#!/usr/bin/python

import asyncio
import json
import hashlib
import base64
from asyncio.subprocess import PIPE
from fcntl import LOCK_EX, flock
from pathlib import Path
from sys import argv, exit


def parse_pkginfo(pkginfo: str) -> dict:
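    """Parse .PKGINFO content into a dict mapping each key to a list of values
    (keys such as depend and license may repeat)."""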
    fields = {}
    for line in pkginfo.splitlines():
        line = line.strip()
        if line.startswith("#"):
            continue
        key, value = line.split(" = ", 1)
        fields.setdefault(key, []).append(value)
    return fields


def parse_pkgfiles(pkginfo: str) -> list:
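    """Return the sorted file list from a bsdtar listing, skipping the
    leading-dot metadata entries (.PKGINFO, .MTREE, ...)."""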
    files = []
    for line in pkginfo.splitlines():
        line = line.strip()
        if not line.startswith("."):
            files.append(line)
    return sorted(files)


def cleandict(obj):
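    """Recursively drop dict entries whose value is None."""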
    if not isinstance(obj, dict):
        return obj
    return {k: cleandict(v) for k, v in obj.items() if v is not None}


async def run(*args):
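    """Run a command, raising RuntimeError if it exits non-zero."""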
    args = [str(a) for a in args]
    proc = await asyncio.create_subprocess_exec(*args)
    if await proc.wait() != 0:
        raise RuntimeError(f"Command failed: {args!r}")


async def get_output(*args) -> str:
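    """Run a command and return its decoded, stripped stdout, raising on failure."""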
    args = [str(a) for a in args]
    proc = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
    stdout, _ = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"Command failed: {args!r}")
    return stdout.decode().strip()


async def put_input(*args, stdin: str):
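    """Run a command, feeding `stdin` to its standard input, raising on failure."""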
    args = [str(a) for a in args]
    proc = await asyncio.create_subprocess_exec(*args, stdin=PIPE)
    await proc.communicate(stdin.encode())
    if proc.returncode != 0:
        raise RuntimeError(f"Command failed: {args!r}")


async def get_pkginfo(pkgfile: Path) -> dict:
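    """Extract and parse .PKGINFO from a package archive."""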
    return parse_pkginfo(await get_output("bsdtar", "-xOf", pkgfile, ".PKGINFO"))


async def get_pkgfiles(pkgfile: Path) -> list:
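    """List the payload files contained in a package archive."""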
    return parse_pkgfiles(await get_output("bsdtar", "-tf", pkgfile))


async def build_pkgmeta(pkgpath, pkginfo):
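    """Build the metadata record for a single package file."""
    # hash the package archive incrementally to keep memory use bounded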
    hash_md5 = hashlib.md5()
    hash_sha256 = hashlib.sha256()
    with pkgpath.open(mode="rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
            hash_sha256.update(chunk)

    # embed the detached PGP signature (<pkgfile>.sig), base64-encoded
    with (pkgpath.parent / f"{pkgpath.name}.sig").open(mode="rb") as f:
        pgpsig = base64.b64encode(f.read()).decode()

    return cleandict(
        {
            "filename": pkgpath.name,
            "name": pkginfo["pkgname"][0],
            "desc": pkginfo["pkgdesc"][0],
            "groups": pkginfo.get("group"),
            "csize": pkgpath.stat().st_size,
            "isize": int(pkginfo["size"][0]),
            "md5sum": hash_md5.hexdigest(),
            "sha256sum": hash_sha256.hexdigest(),
            "pgpsig": pgpsig,
            "url": pkginfo["url"][0],
            "licenses": pkginfo.get("license"),
            "arch": pkginfo["arch"][0],
            "builddate": int(pkginfo["builddate"][0]),
            "packager": pkginfo["packager"][0],
            "depends": pkginfo.get("depend"),
            "optdepends": pkginfo.get("optdepend"),
            "makedepends": pkginfo.get("makedepend"),
            "replaces": pkginfo.get("replace"),
            "conflicts": pkginfo.get("conflict"),
            "provides": pkginfo.get("provides"),
            "files": await get_pkgfiles(pkgpath),
        }
    )


async def main() -> int:
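    """Scan staging/<repo>/ for new package files and write per-pkgbase
    metadata to meta/<repo>/<pkgbase>.json."""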
    metadir = (Path(argv[0]).parent / "meta").resolve(strict=True)
    stagingdir = (Path(argv[0]).parent / "staging").resolve(strict=True)

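    # take an exclusive lock so concurrent runs cannot race on the meta dir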
    lockfile = (metadir / "dbscripts.lock").open(mode="w")
    flock(lockfile, LOCK_EX)

    # find pkg files to add/update
    packages = {
        r.name: {p: None for p in r.glob("*.pkg.tar.xz")} for r in stagingdir.glob("*")
    }
    packages = {r: ps for r, ps in packages.items() if ps}
    if not packages:
        return 0

    # load pkginfo
    async def load(ps, p):
        ps[p] = await get_pkginfo(p)

    await asyncio.gather(*(load(ps, p) for ps in packages.values() for p in ps))

    # prepare meta structure
    pkgbases = {r: {} for r in packages.keys()}
    for repo, ps in sorted(packages.items()):
        for pkgpath, pkginfo in ps.items():
            pkgbase = pkginfo["pkgbase"][0]
            if pkgbase not in pkgbases[repo]:
                pkgbases[repo][pkgbase] = {
                    "version": pkginfo["pkgver"][0],
                    "packages": [],
                }
            pkgbases[repo][pkgbase]["packages"].append(
                await build_pkgmeta(pkgpath, pkginfo)
            )

    # save meta info to json files
    for repo, ps in pkgbases.items():
        (metadir / repo).mkdir(exist_ok=True)
        for pkgbase, pkgs in ps.items():
            metafile = metadir / repo / f"{pkgbase}.json"
            with metafile.open(mode="w", encoding="utf-8") as f:
                json.dump(pkgs, f, ensure_ascii=False, indent=4, sort_keys=True)

    return 0


if __name__ == "__main__":
    exit(asyncio.run(main()))