
mkpkglists improvements

Merged: Kevin Morris requested to merge kevr/aurweb:teapot into master
4 files changed: +120 -17
@@ -2,15 +2,43 @@
 import datetime
 import gzip
+import json
+import os
 
 import aurweb.config
 import aurweb.db
 
 packagesfile = aurweb.config.get('mkpkglists', 'packagesfile')
+packagesmetafile = aurweb.config.get('mkpkglists', 'packagesmetafile')
 pkgbasefile = aurweb.config.get('mkpkglists', 'pkgbasefile')
 userfile = aurweb.config.get('mkpkglists', 'userfile')
 
 
+def should_update(tablename: str) -> tuple:
+    if aurweb.config.get("database", "backend") != "mysql":
+        # Without MySQL's information_schema there is no cheap change check;
+        # always regenerate.
+        return (False, 0)
+
+    conn = aurweb.db.Connection()
+
+    # The table's AUTO_INCREMENT counter is used as a cheap indicator of
+    # new rows since the last run.
+    db_name = aurweb.config.get("database", "name")
+    cur = conn.execute("SELECT auto_increment FROM information_schema.tables "
+                       "WHERE table_schema = ? AND table_name = ?",
+                       (db_name, tablename,))
+    update_time = cur.fetchone()[0]
+
+    cached_update_time = 0
+    if os.path.exists(f"/tmp/{tablename}.update-time.cache"):
+        with open(f"/tmp/{tablename}.update-time.cache") as f:
+            cached_update_time = int(f.read().strip())
+
+    # First element is True when the cached value still matches, i.e. the
+    # archives for this table are up to date.
+    return (cached_update_time == update_time, update_time)
+
+
+def update_cache(tablename: str, update_time: int) -> None:
+    with open(f"/tmp/{tablename}.update-time.cache", "w") as f:
+        f.write(str(update_time))
+
+
 def main():
     conn = aurweb.db.Connection()
@@ -19,24 +47,96 @@ def main():
     pkgbaselist_header = "# AUR package base list, generated on " + datestr
     userlist_header = "# AUR user name list, generated on " + datestr
 
-    with gzip.open(packagesfile, "w") as f:
-        f.write(bytes(pkglist_header + "\n", "UTF-8"))
-        cur = conn.execute("SELECT Packages.Name FROM Packages " +
-                           "INNER JOIN PackageBases " +
-                           "ON PackageBases.ID = Packages.PackageBaseID " +
-                           "WHERE PackageBases.PackagerUID IS NOT NULL")
-        f.writelines([bytes(x[0] + "\n", "UTF-8") for x in cur.fetchall()])
+    updated, update_time = should_update("Packages")
+    if not updated:
+        print("Updating Packages...")
+
+        columns = ("Packages.ID, PackageBaseID, Packages.Name, "
+                   "Version, Description, URL")
+        cur = conn.execute(f"SELECT {columns} FROM Packages "
+                           "INNER JOIN PackageBases "
+                           "ON PackageBases.ID = Packages.PackageBaseID "
+                           "WHERE PackageBases.PackagerUID IS NOT NULL")
+        results = cur.fetchall()
+
+        with gzip.open(packagesfile, "w") as f:
+            f.write(bytes(pkglist_header + "\n", "UTF-8"))
+            f.writelines([bytes(x[2] + "\n", "UTF-8") for x in results])
+
+        with gzip.open(packagesmetafile, "wt") as f:
+            """ The output "data" JSON key points to a list of dictionaries,
+            each representing a single result, filled with column names as
+            keys and column values as values.
+
+            The output "mapping" JSON key points to a dictionary of package
+            name -> "data"-list index pairs. This gives users of the meta
+            archive a way to perform O(1) lookups by package name, while
+            still providing a sequential list to iterate over:
+
+                i = json_data["mapping"]["package_name"]
+                package_data = json_data["data"][i]
+                name = package_data.get("Name")
+                version = package_data.get("Version")
+
+            Example:
+
+                {
+                    "data": [
+                        {
+                            "ID": 123,
+                            "Name": "package_name",
+                            "PackageBaseID": 234,
+                            "Version": "0.1.1",
+                            "Description": "Some description...",
+                            "URL": "https://some.url"
+                        },
+                        ...
+                    ],
+                    "mapping": {
+                        "package_name": 0,
+                        ...
+                    }
+                }
+            """
+            json.dump({
+                "warning": ("This is experimental! It can be removed "
+                            "or modified without warning!"),
+                "mapping": {
+                    result[2]: i
+                    for i, result in enumerate(results)
+                },
+                "data": [{
+                    column[0]: result[i]
+                    for i, column in enumerate(cur.description)
+                } for result in results]
+            }, f)
+
+        update_cache("Packages", update_time)
+    else:
+        print("Packages have not been updated; skipping.")
 
-    with gzip.open(pkgbasefile, "w") as f:
-        f.write(bytes(pkgbaselist_header + "\n", "UTF-8"))
-        cur = conn.execute("SELECT Name FROM PackageBases " +
-                           "WHERE PackagerUID IS NOT NULL")
-        f.writelines([bytes(x[0] + "\n", "UTF-8") for x in cur.fetchall()])
+    updated, update_time = should_update("PackageBases")
+    if not updated:
+        print("Updating PackageBases...")
+        with gzip.open(pkgbasefile, "w") as f:
+            f.write(bytes(pkgbaselist_header + "\n", "UTF-8"))
+            cur = conn.execute("SELECT Name FROM PackageBases " +
+                               "WHERE PackagerUID IS NOT NULL")
+            f.writelines([bytes(x[0] + "\n", "UTF-8") for x in cur.fetchall()])
+        update_cache("PackageBases", update_time)
+    else:
+        print("PackageBases have not been updated; skipping.")
 
-    with gzip.open(userfile, "w") as f:
-        f.write(bytes(userlist_header + "\n", "UTF-8"))
-        cur = conn.execute("SELECT UserName FROM Users")
-        f.writelines([bytes(x[0] + "\n", "UTF-8") for x in cur.fetchall()])
+    updated, update_time = should_update("Users")
+    if not updated:
+        print("Updating Users...")
+        with gzip.open(userfile, "w") as f:
+            f.write(bytes(userlist_header + "\n", "UTF-8"))
+            cur = conn.execute("SELECT UserName FROM Users")
+            f.writelines([bytes(x[0] + "\n", "UTF-8") for x in cur.fetchall()])
+        update_cache("Users", update_time)
+    else:
+        print("Users have not been updated; skipping.")
 
     conn.close()
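
A minimal consumer-side sketch of the "mapping"/"data" layout described in the docstring above, not code from this change: it reads a gzipped JSON archive written by the script and uses the "mapping" index for an O(1) lookup by package name. The local path and the package name below are placeholders; the real output location is whatever the packagesmetafile option points at.

import gzip
import json

# Placeholder path; the script writes the archive to the location configured
# by the 'packagesmetafile' option in the [mkpkglists] section.
ARCHIVE = "./packages-meta.json.gz"

with gzip.open(ARCHIVE, "rt") as f:
    meta = json.load(f)

def lookup(name: str):
    """Return the metadata dict for `name`, or None if it is not listed."""
    i = meta["mapping"].get(name)          # O(1) index lookup
    return None if i is None else meta["data"][i]

pkg = lookup("package_name")               # placeholder package name
if pkg is not None:
    print(pkg["Name"], pkg["Version"], pkg.get("Description"))

# The "data" key is still a plain list, so sequential iteration works too.
for entry in meta["data"][:5]:
    print(entry["Name"])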