#!/usr/bin/python
"""Generate JSON metadata for staged ``*.pkg.tar.xz`` packages.

For every ``staging/<repo>/*.pkg.tar.xz`` file located next to this script,
the embedded ``.PKGINFO`` and the archive file list are read via ``bsdtar``
and one ``meta/<repo>/<pkgbase>.json`` file is written per pkgbase.
"""

import asyncio
import base64
import hashlib
import json
from asyncio.subprocess import PIPE
from fcntl import LOCK_EX, flock
from pathlib import Path
from sys import argv, exit


def parse_pkginfo(pkginfo: str) -> dict:
    """Parse ``key = value`` text into a dict mapping each key to a list.

    Blank lines and lines starting with ``#`` are ignored.  A key that occurs
    several times collects all of its values in order of appearance.  A field
    with an empty value (``"url = "``) is stored as an empty string.

    Raises:
        ValueError: if a non-blank, non-comment line contains no ``=``.
    """
    fields = {}
    for line in pkginfo.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Partition on the first "=" instead of splitting on " = ": after
        # strip() an empty-valued field reads "key =" and has no trailing
        # space left to match.
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"Malformed .PKGINFO line: {line!r}")
        fields.setdefault(key.strip(), []).append(value.strip())
    return fields


def parse_pkgfiles(pkginfo: str) -> list:
    """Return the sorted entries of a newline-separated archive listing.

    Entries whose name starts with ``.`` and blank lines are dropped.  The
    parameter keeps its historical name; it carries the listing text.
    """
    files = []
    for line in pkginfo.splitlines():
        line = line.strip()
        if line and not line.startswith("."):
            files.append(line)
    return sorted(files)


def cleandict(obj):
    """Recursively drop ``None``-valued entries from nested dicts.

    Anything that is not a dict (including lists) is returned unchanged.
    """
    if not isinstance(obj, dict):
        return obj
    return {k: cleandict(v) for k, v in obj.items() if v is not None}


async def run(*args):
    """Run a command and raise ``RuntimeError`` on a non-zero exit status."""
    args = [str(a) for a in args]
    proc = await asyncio.create_subprocess_exec(*args)
    if await proc.wait() != 0:
        raise RuntimeError(f"Command failed: {args!r}")


async def get_output(*args) -> str:
    """Run a command and return its decoded, stripped standard output.

    Raises:
        RuntimeError: if the command exits with a non-zero status.
    """
    args = [str(a) for a in args]
    proc = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
    stdout, _ = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"Command failed: {args!r}")
    return stdout.decode().strip()


async def put_input(*args, stdin: str):
    """Run a command, feeding the encoded ``stdin`` text to its input.

    Raises:
        RuntimeError: if the command exits with a non-zero status.
    """
    args = [str(a) for a in args]
    proc = await asyncio.create_subprocess_exec(*args, stdin=PIPE)
    await proc.communicate(stdin.encode())
    if proc.returncode != 0:
        raise RuntimeError(f"Command failed: {args!r}")


async def get_pkginfo(pkgfile: Path) -> dict:
    """Extract ``.PKGINFO`` from ``pkgfile`` with bsdtar and parse it."""
    return parse_pkginfo(await get_output("bsdtar", "-xOf", pkgfile, ".PKGINFO"))


async def get_pkgfiles(pkgfile: Path) -> list:
    """List the entries of ``pkgfile`` with bsdtar as a sorted list."""
    return parse_pkgfiles(await get_output("bsdtar", "-tf", pkgfile))


async def build_pkgmeta(pkgpath, pkginfo):
    """Build the metadata dict for one package file.

    Args:
        pkgpath: path of the package; ``<name>.sig`` must exist beside it.
        pkginfo: parsed ``.PKGINFO`` as returned by ``parse_pkginfo``.

    Returns:
        A dict of package metadata with ``None``-valued entries removed.

    Raises:
        FileNotFoundError: if the package or its signature file is missing.
        KeyError: if a required ``.PKGINFO`` field is absent.
    """
    hash_md5 = hashlib.md5()
    hash_sha256 = hashlib.sha256()
    # Feed both digests from a single pass over the file.
    with pkgpath.open(mode="rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
            hash_sha256.update(chunk)

    with (pkgpath.parent / f"{pkgpath.name}.sig").open(mode="rb") as f:
        pgpsig = base64.b64encode(f.read()).decode()

    return cleandict(
        {
            "filename": pkgpath.name,
            "name": pkginfo["pkgname"][0],
            "desc": pkginfo["pkgdesc"][0],
            "groups": pkginfo.get("group"),
            "csize": pkgpath.stat().st_size,
            "isize": int(pkginfo["size"][0]),
            "md5sum": hash_md5.hexdigest(),
            "sha256sum": hash_sha256.hexdigest(),
            "pgpsig": pgpsig,
            "url": pkginfo["url"][0],
            "licenses": pkginfo.get("license"),
            "arch": pkginfo["arch"][0],
            "builddate": int(pkginfo["builddate"][0]),
            "packager": pkginfo["packager"][0],
            "depends": pkginfo.get("depend"),
            "optdepends": pkginfo.get("optdepend"),
            "makedepends": pkginfo.get("makedepend"),
            # makepkg writes this field as "replaces"; the former "replace"
            # lookup is kept as a fallback so no existing input regresses.
            "replaces": pkginfo.get("replaces") or pkginfo.get("replace"),
            "conflicts": pkginfo.get("conflict"),
            "provides": pkginfo.get("provides"),
            "files": await get_pkgfiles(pkgpath),
        }
    )


async def main() -> int:
    """Write JSON metadata for all staged packages and return exit status 0.

    The ``meta`` and ``staging`` directories are resolved relative to the
    directory of ``argv[0]`` and must already exist.  An exclusive lock on
    ``meta/dbscripts.lock`` is held while the work is done.
    """
    basedir = Path(argv[0]).parent
    metadir = (basedir / "meta").resolve(strict=True)
    stagingdir = (basedir / "staging").resolve(strict=True)

    # Closing the handle on leaving the block also releases the flock.
    with (metadir / "dbscripts.lock").open(mode="w") as lockfile:
        flock(lockfile, LOCK_EX)

        # find pkg files to add/update
        packages = {
            r.name: {p: None for p in r.glob("*.pkg.tar.xz")}
            for r in stagingdir.glob("*")
        }
        packages = {r: ps for r, ps in packages.items() if ps}
        if not packages:
            return 0

        # load pkginfo
        async def load(ps, p):
            ps[p] = await get_pkginfo(p)

        await asyncio.gather(
            *(load(ps, p) for ps in packages.values() for p in ps)
        )

        # prepare meta structure
        pkgbases = {r: {} for r in packages}
        for repo, ps in sorted(packages.items()):
            for pkgpath, pkginfo in ps.items():
                pkgbase = pkginfo["pkgbase"][0]
                if pkgbase not in pkgbases[repo]:
                    # The version comes from the first package seen for
                    # this pkgbase.
                    pkgbases[repo][pkgbase] = {
                        "version": pkginfo["pkgver"][0],
                        "packages": [],
                    }
                pkgbases[repo][pkgbase]["packages"].append(
                    await build_pkgmeta(pkgpath, pkginfo)
                )

        # save meta info to json files
        for repo, ps in pkgbases.items():
            Path(metadir / repo).mkdir(exist_ok=True)
            for pkgbase, pkgs in ps.items():
                metafile = metadir / repo / f"{pkgbase}.json"
                with metafile.open(mode="w", encoding="utf-8") as f:
                    json.dump(pkgs, f, ensure_ascii=False, indent=4, sort_keys=True)

        return 0


if __name__ == "__main__":
    exit(asyncio.run(main()))