keyringctl 42.3 KB
Newer Older
1
#!/usr/bin/env python
David Runge's avatar
David Runge committed
2
3
#
# SPDX-License-Identifier: GPL-3.0-or-later
4

David Runge's avatar
David Runge committed
5
from argparse import ArgumentParser
6
from collections import defaultdict
7
8
from collections.abc import Iterable
from collections.abc import Iterator
David Runge's avatar
David Runge committed
9
from contextlib import contextmanager
10
from itertools import chain
11
12
13
14
15
16
from logging import DEBUG
from logging import basicConfig
from logging import debug
from logging import error
from os import chdir
from os import getcwd
17
from pathlib import Path
18
19
20
from re import escape
from re import split
from re import sub
21
from shutil import copytree
22
23
24
25
26
27
28
from subprocess import PIPE
from subprocess import CalledProcessError
from subprocess import check_output
from sys import exit
from sys import stderr
from tempfile import TemporaryDirectory
from tempfile import mkdtemp
29
from traceback import print_stack
30
31
from typing import Dict
from typing import List
32
from typing import NewType
33
34
35
36
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
37

38
39
40
# Distinct names for the different kinds of strings handled in this file, so that a type checker can tell them apart.
# All three are declared on top of str.
# Fingerprint: used for the fingerprints of public keys and subkeys (and for signature issuers)
Fingerprint = NewType("Fingerprint", str)
# Uid: used for User ID strings (the keys of the per User ID dictionaries and directory names below "uid")
Uid = NewType("Uid", str)
# Username: used for the name of the per-user directory
Username = NewType("Username", str)
41
42


43
@contextmanager
def cwd(new_dir: Path) -> Iterator[None]:
    """Temporarily switch the current working directory of the process

    The working directory that was active when the context was entered is restored when the context is left, also if
    the body of the context raises.

    Parameters
    ----------
    new_dir: Path
        The directory to change into for the duration of the context
    """

    origin = getcwd()
    chdir(new_dir)
    try:
        yield
    finally:
        # always go back, regardless of how the body of the context terminated
        chdir(origin)


def natural_sort_path(_list: Iterable[Path]) -> Iterable[Path]:
    """Sort an Iterable of Paths naturally

    Only the final component (the name) of each path is considered. Runs of ASCII digits are compared by their numeric
    value and all other text is compared in lower-case, so that e.g. "2" is sorted before "10".

    Parameters
    ----------
    _list: Iterable[Path]
        An iterable containing paths to be sorted

    Return
    ------
    Iterable[Path]
        An Iterable of paths that are naturally sorted
    """

    def convert_text_chunk(text: str) -> Union[int, str]:
        """Convert input text to int or str

        Parameters
        ----------
        text: str
            An input string

        Returns
        -------
        Union[int, str]
            Either an integer if text consists of ASCII digits only, else text in lower-case representation
        """

        # str.isdigit() alone is also True for characters such as "²", which int() rejects, and for non-ASCII digits,
        # which the "[0-9]+" split below never isolates. Converting those would either raise ValueError or place an
        # int at a position where the keys of other paths hold a str, so only ASCII digits are converted.
        return int(text) if text.isascii() and text.isdigit() else text.lower()

    def alphanum_key(key: Path) -> List[Union[int, str]]:
        """Retrieve an alphanumeric key from a Path, that can be used in sorted()

        Parameters
        ----------
        key: Path
            A path for which to create a key

        Returns
        -------
        List[Union[int, str]]
            A list of either int or str objects that may serve as 'key' argument for sorted()
        """

        # splitting on a capturing group keeps the digit runs: the result alternates text, digits, text, ...
        return [convert_text_chunk(c) for c in split("([0-9]+)", str(key.name))]

    return sorted(_list, key=alphanum_key)


def system(cmd: List[str], exit_on_error: bool = True) -> str:
    """Run a command with check_output and hand back what it printed to stdout

    If the command fails, its captured stderr and the current call stack are written to stderr before the failure is
    turned into either an exit of the script or a re-raised exception.

    Parameters
    ----------
    cmd: List[str]
        The command and its arguments, passed on to check_output
    exit_on_error: bool
        Whether to exit the script with the return code of the command when it fails (defaults to True)

    Raises
    ------
    CalledProcessError
        If the command fails and exit_on_error is False

    Returns
    -------
    str
        The decoded stdout of cmd
    """

    try:
        output = check_output(cmd, stderr=PIPE)
    except CalledProcessError as failure:
        # forward the stderr of the failed command unmodified and show where the call came from
        stderr.buffer.write(failure.stderr)
        print_stack()
        if not exit_on_error:
            raise
        exit(failure.returncode)
    return output.decode()


David Runge's avatar
David Runge committed
141
142
143
144
# TODO: simplify to lower complexity
def convert_certificate(  # noqa: ignore=C901
    working_dir: Path,
    certificate: Path,
    keyring_dir: Path,
    name_override: Optional[Username] = None,
    fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
    """Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets

    The output directory structure is created per user. The username is taken from `name_override`, else derived from
    the certificate via `derive_username_from_fingerprint`, else taken from the stem of the certificate file name.
    Below the username directory a directory tree describes the public keys components split up into certifications
    and revocations, as well as per subkey and per uid certifications and revocations.

    The certificate is split into packets, which are processed in order: each Signature packet is attributed to the
    PublicKey, UserID or PublicSubkey packet that precedes it.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create split certificates
    certificate: Path
        The path to a public key certificate
    keyring_dir: Path
        The path of the keyring used to try to derive the username from the public key fingerprint
    name_override: Optional[Username]
        An optional string to override the username in the to be created output directory structure
    fingerprint_filter: Optional[Set[Fingerprint]]
        An optional set of fingerprints of PGP public keys. A User ID certification that is not the binding signature
        of the certificate itself is only kept if its issuer matches one of these fingerprints. If None, none of
        these certifications is kept.

    Raises
    ------
    Exception
        If required PGP packets are not found or if an unknown packet or signature type is encountered

    Returns
    -------
    Path
        The path of the user_dir (which is located below working_dir)
    """

    # root packets
    certificate_fingerprint: Optional[Fingerprint] = None
    pubkey: Optional[Path] = None
    direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list)
    direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list)

    # subkey packets
    subkeys: Dict[Fingerprint, Path] = {}
    subkey_binding_sigs: Dict[Fingerprint, Path] = {}
    subkey_revocations: Dict[Fingerprint, Path] = {}

    # uid packets
    uids: Dict[Uid, Path] = {}
    uid_binding_sigs: Dict[Uid, Path] = {}
    certifications: Dict[Uid, List[Path]] = defaultdict(list)
    revocations: Dict[Uid, List[Path]] = defaultdict(list)

    # intermediate variables: they describe the packet that the signatures following it belong to
    current_packet_mode: Optional[str] = None
    current_packet_fingerprint: Optional[Fingerprint] = None
    current_packet_uid: Optional[Uid] = None

    # XXX: PrimaryKeyBinding

    # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean

    debug(f"Processing certificate {certificate}")

    for packet in packet_split(working_dir=working_dir, certificate=certificate):
        debug(f"Processing packet {packet.name}")
        if packet.name.endswith("--PublicKey"):
            current_packet_mode = "pubkey"
            current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
            current_packet_uid = None

            certificate_fingerprint = current_packet_fingerprint
            pubkey = packet
        elif packet.name.endswith("--UserID"):
            current_packet_mode = "uid"
            current_packet_fingerprint = None
            current_packet_uid = simplify_user_id(Uid(packet_dump_field(packet, "Value")))

            uids[current_packet_uid] = packet
        elif packet.name.endswith("--PublicSubkey"):
            current_packet_mode = "subkey"
            current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
            current_packet_uid = None

            subkeys[current_packet_fingerprint] = packet
        elif packet.name.endswith("--Signature"):
            if not certificate_fingerprint:
                raise Exception(f'missing certificate fingerprint for "{packet.name}"')

            issuer: Fingerprint = Fingerprint(packet_dump_field(packet, "Issuer"))
            signature_type = packet_dump_field(packet, "Type")

            if current_packet_mode == "pubkey":
                if not current_packet_fingerprint:
                    raise Exception(f'missing current packet fingerprint for "{packet.name}"')

                # the issuer is matched against the end of the certificate fingerprint
                if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
                    direct_revocations[issuer].append(packet)
                elif signature_type in ["DirectKey", "GenericCertification"]:
                    direct_sigs[issuer].append(packet)
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            elif current_packet_mode == "uid":
                if not current_packet_uid:
                    raise Exception(f'missing current packet uid for "{packet.name}"')

                if signature_type == "CertificationRevocation":
                    revocations[current_packet_uid].append(packet)
                elif signature_type == "PositiveCertification" and certificate_fingerprint.endswith(issuer):
                    # a PositiveCertification issued by the certificate itself binds the User ID to the key
                    uid_binding_sigs[current_packet_uid] = packet
                elif signature_type.endswith("Certification"):
                    # without a filter (None) none of these certifications is kept
                    if fingerprint_filter is not None and any(fp.endswith(issuer) for fp in fingerprint_filter):
                        debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
                        certifications[current_packet_uid].append(packet)
                    else:
                        debug(f"The certification by issuer {issuer} is not appended because it is not in the filter")
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            elif current_packet_mode == "subkey":
                if not current_packet_fingerprint:
                    raise Exception(f'missing current packet fingerprint for "{packet.name}"')

                if signature_type == "SubkeyBinding":
                    subkey_binding_sigs[current_packet_fingerprint] = packet
                elif signature_type == "SubkeyRevocation":
                    # keyed by the subkey the revocation follows (like the binding signature above), so that the
                    # revocations of different subkeys do not overwrite each other and are persisted below the
                    # directory of the subkey they belong to
                    subkey_revocations[current_packet_fingerprint] = packet
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            else:
                raise Exception(f'unknown signature root for "{packet.name}"')
        else:
            raise Exception(f'unknown packet type "{packet.name}"')

    if not certificate_fingerprint:
        raise Exception("missing certificate fingerprint")

    if not pubkey:
        raise Exception("missing certificate public-key")

    # the first of: explicit override, name found in the keyring directory, stem of the certificate file name
    name_override = (
        name_override
        or derive_username_from_fingerprint(keyring_dir=keyring_dir, certificate_fingerprint=certificate_fingerprint)
        or Username(certificate.stem)
    )

    user_dir = working_dir / name_override
    key_dir = user_dir / certificate_fingerprint
    key_dir.mkdir(parents=True, exist_ok=True)

    persist_public_key(
        certificate_fingerprint=certificate_fingerprint,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_uids(
        key_dir=key_dir,
        uid_binding_sigs=uid_binding_sigs,
        uids=uids,
    )

    persist_subkeys(
        key_dir=key_dir,
        subkeys=subkeys,
        subkey_binding_sigs=subkey_binding_sigs,
    )

    persist_subkey_revocations(
        key_dir=key_dir,
        subkey_revocations=subkey_revocations,
    )

    persist_direct_sigs(
        direct_sigs=direct_sigs,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_direct_sigs(
        direct_sigs=direct_revocations,
        pubkey=pubkey,
        key_dir=key_dir,
        sig_type="revocation",
    )

    persist_certifications(
        certifications=certifications,
        pubkey=pubkey,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sigs,
        uids=uids,
    )

    persist_revocations(
        pubkey=pubkey,
        revocations=revocations,
        key_dir=key_dir,
        uid_binding_sig=uid_binding_sigs,
        uids=uids,
    )

    return user_dir
347
348


349
def persist_public_key(
    certificate_fingerprint: Fingerprint,
    pubkey: Path,
    key_dir: Path,
) -> None:
    """Write the Public-Key packet of a root key to a file

    The packet is written to a file named after the fingerprint, directly below key_dir.

    Parameters
    ----------
    certificate_fingerprint: Fingerprint
        The fingerprint of the public key, used as name of the output file
    pubkey: Path
        The path to the Public-Key packet of the root key
    key_dir: Path
        The directory in which the output file is created
    """

    output_file = key_dir / f"{certificate_fingerprint}.asc"
    packets: List[Path] = [pubkey]

    # make sure that the directory the file is written to exists
    output_file.parent.mkdir(parents=True, exist_ok=True)
    debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
    packet_join(packets=packets, output=output_file)


373
374
def persist_uids(
    key_dir: Path,
    uid_binding_sigs: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Write the User IDs of a root key and their binding signatures to file(s)

    For each User ID that has a binding signature, the User ID packet and that binding signature are joined into one
    file. The file is written to a User ID specific directory below key_dir/uid.
    User IDs without an entry in uid_binding_sigs are not written.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the "uid" directory is created
    uid_binding_sigs: Dict[Uid, Path]
        The binding signature packet per User ID
    uids: Dict[Uid, Path]
        The User ID packet per User ID
    """

    for key, binding_sig in uid_binding_sigs.items():
        packets = [uids[key], binding_sig]
        output_file = key_dir / "uid" / key / f"{key}.asc"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
        packet_join(packets=packets, output=output_file)


401
402
def persist_subkeys(
    key_dir: Path,
    subkeys: Dict[Fingerprint, Path],
    subkey_binding_sigs: Dict[Fingerprint, Path],
) -> None:
    """Write each Public-Subkey of a root key, together with its SubkeyBinding signature, to a file

    Every subkey gets its own directory below key_dir/subkey, named after the fingerprint of the subkey.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the "subkey" directory is created
    subkeys: Dict[Fingerprint, Path]
        The Public-Subkey packet per subkey fingerprint
    subkey_binding_sigs: Dict[Fingerprint, Path]
        The SubkeyBinding signature packet per subkey fingerprint (an entry is required for every subkey)
    """

    for fingerprint, subkey in subkeys.items():
        packets: List[Path] = [subkey, subkey_binding_sigs[fingerprint]]
        output_file = key_dir / "subkey" / fingerprint / f"{fingerprint}.asc"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        debug(f"Writing file {output_file} from {[str(packet) for packet in packets]}")
        packet_join(packets=packets, output=output_file)
425
426
427
428


def persist_subkey_revocations(
    key_dir: Path,
    subkey_revocations: Dict[Fingerprint, Path],
) -> None:
    """Write the SubkeyRevocation signatures of a root key to file(s)

    Each revocation is written to a file named after its issuer, in a "revocation" directory below
    key_dir/subkey/<fingerprint>, where fingerprint is the key under which the revocation is listed.

    Parameters
    ----------
    key_dir: Path
        The root directory below which the "subkey" directory is created
    subkey_revocations: Dict[Fingerprint, Path]
        The SubkeyRevocation packet per fingerprint
    """

    for fingerprint, revocation in subkey_revocations.items():
        # the issuer is read from the revocation packet itself
        issuer = packet_dump_field(revocation, "Issuer")
        revocation_dir = key_dir / "subkey" / fingerprint / "revocation"
        output_file = revocation_dir / f"{issuer}.asc"
        output_file.parent.mkdir(parents=True, exist_ok=True)
        debug(f"Writing file {output_file} from {revocation}")
        packet_join(packets=[revocation], output=output_file)
447
448


449
def persist_direct_sigs(
    direct_sigs: Dict[Fingerprint, List[Path]],
    pubkey: Path,
    key_dir: Path,
    sig_type: str = "certification",
) -> None:
    """Write the signatures made directly on a root key to file(s)

    All signatures of one issuer are joined, preceded by the Public-Key packet, into one file named after the issuer.
    The files are located in a directory named sig_type below key_dir.

    Parameters
    ----------
    direct_sigs: Dict[Fingerprint, List[Path]]
        The signature packets per issuer
    pubkey: Path
        The path to the Public-Key packet of the root key
    key_dir: Path
        The root directory below which the sig_type directory is created
    sig_type: str
        The name of the directory to write to (defaults to 'certification')
    """

    for issuer, certifications in direct_sigs.items():
        output_file = key_dir / sig_type / f"{issuer}.asc"
        # the public key always comes first, followed by the signatures of this issuer
        packets = [pubkey, *certifications]
        output_file.parent.mkdir(parents=True, exist_ok=True)
        debug(f"Writing file {output_file} from {[str(cert) for cert in certifications]}")
        packet_join(packets, output_file)
476
477
478


def persist_certifications(
    certifications: Dict[Uid, List[Path]],
    pubkey: Path,
    key_dir: Path,
    uid_binding_sig: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Write the certifications on the User IDs of a root key to file(s)

    Each certification is joined with the Public-Key packet, the User ID packet and the binding signature of the
    User ID and written to a file named after the issuer of the certification. The files are located in a
    "certification" directory below key_dir/uid/<User ID>.
    If the User ID packet or its binding signature is not available, an error is logged and the certification is
    not written.

    Parameters
    ----------
    certifications: Dict[Uid, List[Path]]
        The certification packets per User ID
    pubkey: Path
        The path to the Public-Key packet of the root key
    key_dir: Path
        The root directory below which certifications are persisted
    uid_binding_sig: Dict[Uid, Path]
        The binding signature packet per User ID
    uids: Dict[Uid, Path]
        The User ID packet per User ID
    """

    for key, current_certifications in certifications.items():
        certification_dir = key_dir / "uid" / key / "certification"

        for certification in current_certifications:
            # the directory is created even if the certification turns out to be unusable below
            certification_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(certification, "Issuer")

            if not (uids.get(key) and uid_binding_sig.get(key)):
                error(
                    f"Public key '{pubkey}' does not provide "
                    f"{'the UID binding signature' if not uid_binding_sig.get(key) else ''} for UID '{key}', "
                    "so its certifications can not be used!"
                )
                continue

            output_file = certification_dir / f"{issuer}.asc"
            packets = [pubkey, uids[key], uid_binding_sig[key], certification]
            debug(f"Writing file {output_file} from {certification}")
            packet_join(packets, output_file)
522

523
524
525

def persist_revocations(
    pubkey: Path,
    revocations: Dict[Uid, List[Path]],
    key_dir: Path,
    uid_binding_sig: Dict[Uid, Path],
    uids: Dict[Uid, Path],
) -> None:
    """Write the revocations on the User IDs of a root key to file(s)

    Each revocation is joined with the Public-Key packet, the User ID packet and (if there is one) the binding
    signature of the User ID and written to a file named after the issuer of the revocation. The files are located
    in a "revocation" directory below key_dir/uid/<User ID>.

    Parameters
    ----------
    pubkey: Path
        The path to the Public-Key packet of the root key
    revocations: Dict[Uid, List[Path]]
        The revocation packets per User ID
    key_dir: Path
        The root directory below which revocations will be persisted
    uid_binding_sig: Dict[Uid, Path]
        The binding signature packet per User ID
    uids: Dict[Uid, Path]
        The User ID packet per User ID (an entry is required for every User ID in revocations)
    """

    for key, current_revocations in revocations.items():
        revocation_dir = key_dir / "uid" / key / "revocation"

        for revocation in current_revocations:
            revocation_dir.mkdir(parents=True, exist_ok=True)
            issuer = packet_dump_field(revocation, "Issuer")

            # a User ID may have no binding signature, in which case it is simply left out
            binding_sigs = [uid_binding_sig[key]] if key in uid_binding_sig else []
            packets = [pubkey, uids[key], *binding_sigs, revocation]

            output_file = revocation_dir / f"{issuer}.asc"
            debug(f"Writing file {output_file} from {revocation}")
            packet_join(packets, output_file)


def packet_dump(packet: Path) -> str:
    """Retrieve a textual dump of a PGP packet

    The dump is created by running `sq packet dump` on the file.

    Parameters
    ----------
    packet: Path
        The path to the file containing the PGP packet to dump

    Returns
    -------
    str
        The output of `sq packet dump` for packet
    """

    dump_command = ["sq", "packet", "dump", str(packet)]
    return system(dump_command)
583
584
585


def packet_dump_field(packet: Path, field: str) -> str:
    """Retrieve the value of a field from a PGP packet

    The dump of the packet is searched line by line for the first line that (ignoring surrounding whitespace) starts
    with "<field>: ". Everything following that prefix is the value.

    Parameters
    ----------
    packet: Path
        The path to the PGP packet to retrieve the value from
    field: str
        The name of the field

    Raises
    ------
    Exception
        If the field is not found in the PGP packet

    Returns
    -------
    str
        The value of the field found in packet
    """

    prefix = f"{field}: "
    for line in packet_dump(packet).splitlines():
        stripped = line.strip()
        if stripped.startswith(prefix):
            # cut off exactly the matched prefix instead of splitting on the first whitespace, which would return a
            # wrong value for field names that contain whitespace themselves
            return stripped[len(prefix) :].lstrip()

    raise Exception(f'Packet has no field "{field}"')


614
615
def keyring_split(working_dir: Path, keyring: Path) -> Iterable[Path]:
    """Split a file containing a PGP keyring into one file per certificate

    The split is done by `sq keyring split`, which is run from inside a new temporary directory below working_dir
    (a relative keyring path is therefore resolved against that temporary directory).
    If the split yields exactly one certificate, the resulting file is renamed to the name of the keyring file.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create the output files
    keyring: Path
        The path of a file containing a PGP keyring

    Returns
    -------
    Iterable[Path]
        The naturally sorted list of certificate files derived from keyring
    """

    split_dir = Path(mkdtemp(dir=working_dir, prefix="keyring-")).absolute()

    # sq writes its output files to the current working directory
    with cwd(split_dir):
        system(["sq", "keyring", "split", "--prefix", "", str(keyring)])

    certificates: List[Path] = list(natural_sort_path(split_dir.iterdir()))

    if len(certificates) == 1:
        (only_certificate,) = certificates
        # keep the original file name if there is just a single certificate
        certificates = [only_certificate.rename(only_certificate.parent / keyring.name)]

    return certificates
644
645


646
def keyring_merge(certificates: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
    """Merge several certificates into one keyring using `sq keyring merge`

    The certificates are passed to sq in sorted order.

    Parameters
    ----------
    certificates: List[Path]
        The paths of the certificates to merge
    output: Optional[Path]
        The file to write the keyring to. If None, no output file is passed to sq and the result is returned instead
    force: bool
        Whether to pass --force to sq to overwrite existing files (defaults to False)

    Raises
    ------
    CalledProcessError
        If sq fails

    Returns
    -------
    str
        The output of sq, which is the keyring if no output file has been used
    """

    cmd = ["sq"]
    if force:
        # --force is placed in front of the subcommand
        cmd.append("--force")
    cmd.extend(["keyring", "merge"])
    if output:
        cmd.extend(["--output", str(output)])
    cmd.extend(str(cert) for cert in sorted(certificates))

    return system(cmd, exit_on_error=False)


674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
def packet_split(working_dir: Path, certificate: Path) -> Iterable[Path]:
    """Split a file containing a PGP certificate into one file per packet

    The split is done by `sq packet split`, which is run from inside a new temporary directory below working_dir.

    Parameters
    ----------
    working_dir: Path
        The path of the working directory below which to create the output files
    certificate: Path
        The absolute path of a file containing one PGP certificate

    Returns
    -------
    Iterable[Path]
        The naturally sorted list of packet files derived from certificate
    """

    split_dir = Path(mkdtemp(dir=working_dir, prefix="packet-")).absolute()

    # sq writes its output files to the current working directory
    with cwd(split_dir):
        split_command = ["sq", "packet", "split", "--prefix", "", str(certificate)]
        system(split_command)

    packet_files = split_dir.iterdir()
    return natural_sort_path(packet_files)


699
def packet_join(packets: List[Path], output: Optional[Path] = None, force: bool = False) -> str:
    """Join PGP packet data in files to a single output file

    The packets are joined by `sq packet join` in the order in which they are given.

    Parameters
    ----------
    packets: List[Path]
        A list of paths to files that contain PGP packet data
    output: Optional[Path]
        Path to a file to which all PGP packet data is written, return the result instead if None
    force: bool
        Whether to force overwriting existing files (defaults to False)

    Raises
    ------
    CalledProcessError
        If sq fails

    Returns
    -------
    str
        The result if no output file has been used
    """

    cmd = ["sq", "packet", "join"]
    if force:
        # --force is placed in front of the subcommand
        cmd.insert(1, "--force")
    cmd.extend(str(packet) for packet in packets)
    # only request an output file if one is given: passing str(None) would make sq write to a file named "None"
    # instead of returning the result
    if output is not None:
        cmd.extend(["--output", str(output)])
    return system(cmd, exit_on_error=False)
724
725


726
def simplify_user_id(user_id: Uid) -> Uid:
    """Turn a User ID string into a more filesystem friendly representation

    The conversion is done in three steps: each "@" becomes "_at_", angle brackets are dropped and every remaining
    whitespace or punctuation character of a fixed set is replaced by an underscore.

    Parameters
    ----------
    user_id: Uid
        A User ID string (e.g. 'Foobar McFooface <foobar@foo.face>')

    Returns
    -------
    Uid
        The simplified representation of user_id
    """

    without_at: str = user_id.replace("@", "_at_")
    without_brackets = sub("[<>]", "", without_at)
    # character class matching any single one of the characters to be replaced by an underscore
    special_characters = "[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]"
    return Uid(sub(special_characters, "_", without_brackets))
744
745


746
def derive_username_from_fingerprint(keyring_dir: Path, certificate_fingerprint: Fingerprint) -> Optional[Username]:
    """Try to find out which username a public key fingerprint belongs to by looking at a keyring directory

    The keyring directory is searched for entries one level below its subdirectories whose name ends with the
    fingerprint. The username is the stem of the name of the subdirectory containing the match.

    Parameters
    ----------
    keyring_dir: Path
        The directory in which to look up a username
    certificate_fingerprint: Fingerprint
        The public key fingerprint to derive the username from

    Raises
    ------
    Exception
        If more than one match is found (a public key can only belong to one individual)

    Returns
    -------
    Optional[Username]
        The username the public key certificate belongs to, None if there is no match
    """

    matches = list(keyring_dir.glob(f"*/*{certificate_fingerprint}"))

    if not matches:
        debug(f"Can not derive username from target directory for fingerprint {certificate_fingerprint}")
        return None

    if len(matches) > 1:
        raise Exception(
            f"More than one username found in {keyring_dir} when probing for fingerprint '{certificate_fingerprint}': "
            f"{matches}"
        )

    # exactly one match: the directory above it carries the username
    username = matches[0].parent.stem
    debug(
        f"Successfully derived username '{username}' from target directory for fingerprint "
        f"{certificate_fingerprint}"
    )
    return Username(username)
784
785


786
787
def convert(
    working_dir: Path,
    source: Iterable[Path],
    target_dir: Path,
    name_override: Optional[Username] = None,
    fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
    """Convert a path containing PGP certificate material to a decomposed directory structure

    Each source path that is a directory contributes its direct children, any other source path is used as is.
    Every resulting file is split by `keyring_split()` into individual certificates, each certificate is handed to
    `convert_certificate()` and the directories returned by it are copied (by name) into target_dir.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    source: Iterable[Path]
        A path to a file or directory to decompose
    target_dir: Path
        A directory path to write the new directory structure to
    name_override: Optional[Username]
        An optional username override for the call to `convert_certificate()`
    fingerprint_filter: Optional[Set[Fingerprint]]
        An optional set of strings defining fingerprints of PGP public keys that all certificates will be filtered with

    Returns
    -------
    Path
        The directory that contains the resulting directory structure (target_dir)
    """

    # Collect the unique set of keyring files to process
    keyrings: Set[Path] = set()
    for entry in source:
        if entry.is_dir():
            keyrings.update(entry.iterdir())
        else:
            keyrings.add(entry)

    # Convert all certificates first, copy the results to the target afterwards
    converted: List[Path] = [
        convert_certificate(
            working_dir=working_dir,
            certificate=certificate,
            keyring_dir=target_dir,
            name_override=name_override,
            fingerprint_filter=fingerprint_filter,
        )
        for keyring in keyrings
        for certificate in keyring_split(working_dir=working_dir, keyring=keyring)
    ]

    for converted_dir in converted:
        destination = target_dir / converted_dir.name
        destination.mkdir(parents=True, exist_ok=True)
        copytree(src=converted_dir, dst=destination, dirs_exist_ok=True)

    return target_dir


838
def get_all_and_revoked_certs(certs: List[Path]) -> Tuple[List[Fingerprint], List[Fingerprint]]:
    """Collect the fingerprints of all public keys and of all self-revoked public keys below a list of directories

    Each directory in certs is expected to follow the layout `<user>/<certificate>/`. The name of a certificate
    directory is used as the fingerprint. A certificate counts as self-revoked, if it contains a file
    `revocation/<name>.asc` where `<name>` is a suffix of the certificate's own fingerprint.

    Parameters
    ----------
    certs: List[Path]
        The directories to search for certificates (entries that are not directories are ignored)

    Returns
    -------
    Tuple[List[Fingerprint], List[Fingerprint]]
        A tuple with the first item containing the fingerprints of all public keys and the second item containing the
        fingerprints of all self-revoked public keys
    """

    found: List[Fingerprint] = []
    self_revoked: List[Fingerprint] = []

    # TODO: what about direct key revocations/signatures?

    debug(f"Retrieving all and self-revoked certificates from {[str(cert_dir) for cert_dir in certs]}")
    for collection in certs:
        if not collection.is_dir():
            continue

        for user_dir in collection.iterdir():
            if not user_dir.is_dir():
                continue

            for cert_dir in user_dir.iterdir():
                fingerprint = Fingerprint(cert_dir.stem)
                found.append(fingerprint)

                for revocation in cert_dir.glob("revocation/*.asc"):
                    # a revocation issued by the certificate itself is named after (the end of) its fingerprint
                    if fingerprint.endswith(revocation.stem):
                        debug(f"Revoking {fingerprint} due to self-revocation")
                        self_revoked.append(fingerprint)

    return found, self_revoked
872
873


874
def export_ownertrust(certs: List[Path], output: Path) -> Tuple[List[Fingerprint], List[Fingerprint]]:
    """Export ownertrust from a set of keys

    The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all
    non-revoked keys as fully trusted.
    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: List[Path]
        The certificates to trust
    output: Path
        The file path to write to

    Returns
    -------
    Tuple[List[Fingerprint], List[Fingerprint]]
        A tuple with the first item containing the fingerprints of all trusted (i.e. not self-revoked) public keys
        and the second item containing the fingerprints of all public keys
    """

    all_certs, revoked_certs = get_all_and_revoked_certs(certs=certs)

    # Build the lookup once: membership tests against the list would be linear for every certificate
    revoked_lookup = set(revoked_certs)
    trusted_certs = [cert for cert in all_certs if cert not in revoked_lookup]

    with open(file=output, mode="w") as trusted_certs_file:
        for cert in trusted_certs:
            debug(f"Writing {cert} to {output}")
            trusted_certs_file.write(f"{cert}:4:\n")

    return trusted_certs, all_certs
899
900


901
def export_revoked(certs: List[Path], main_keys: List[Fingerprint], output: Path, min_revoker: int = 2) -> None:
    """Export the PGP revoked status from a set of keys

    The output file contains the fingerprints of all self-revoked keys and all keys for which at least min_revoker
    revocations by distinct main keys exist. The fingerprints are written sorted and without duplicates, so that the
    resulting file is identical between runs.
    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: List[Path]
        A list of directories with keys to check for their revocation status
    main_keys: List[Fingerprint]
        A list of strings representing the fingerprints of (current and/or revoked) main keys
    output: Path
        The file path to write to
    min_revoker: int
        The minimum amount of revocation certificates on a User ID from any main key to deem a public key as revoked
        (defaults to 2)
    """

    _, revoked_certs = get_all_and_revoked_certs(certs=certs)

    debug(f"Retrieving certificates revoked by main keys from {[str(cert_dir) for cert_dir in certs]}")
    for cert_collection in certs:
        if not cert_collection.is_dir():
            continue

        for user_dir in cert_collection.iterdir():
            if not user_dir.is_dir():
                continue

            for cert_dir in user_dir.iterdir():
                debug(f"Inspecting public key {cert_dir.name}")
                # names of all User ID revocations of this certificate, that have been issued by a main key
                # (a revocation file is named after the end of the fingerprint of its issuer)
                main_key_revocations: Set[str] = {
                    revocation_cert.stem
                    for revocation_cert in cert_dir.glob("uid/*/revocation/*.asc")
                    if any(main_key.endswith(revocation_cert.stem) for main_key in main_keys)
                }

                # TODO: find a better (less naive) approach, as this would also match on public certificates,
                # where some UIDs are signed and others are revoked
                if len(main_key_revocations) >= min_revoker:
                    debug(f"Revoking {cert_dir.name} due to {main_key_revocations} being main key revocations")
                    revoked_certs.append(Fingerprint(cert_dir.stem))

    with open(file=output, mode="w") as revoked_certs_file:
        # sort the deduplicated fingerprints: the iteration order of a set of strings differs between interpreter
        # runs, which would make the exported file non-reproducible
        for cert in sorted(set(revoked_certs)):
            debug(f"Writing {cert} to {output}")
            revoked_certs_file.write(f"{cert}\n")


955
def get_fingerprints_from_import_source(working_dir: Path, source: List[Path]) -> List[Fingerprint]:
    """Get all fingerprints of PGP public keys from import file(s)

    Each source path that is a directory contributes its direct children, any other source path is used as is.
    Every resulting file is split into certificates using `keyring_split()` and each certificate into packets using
    `packet_split()`. The "Fingerprint" field of every packet whose file name ends on "--PublicKey" is collected.

    Parameters
    ----------
    working_dir: Path
        A directory to use for temporary files
    source: List[Path]
        The path to a source file or directory

    Returns
    -------
    List[Fingerprint]
        A list of strings representing the fingerprints of PGP public keys found in source
    """

    # Collect the unique set of keyring files to inspect
    keyrings: Set[Path] = set()
    for entry in source:
        if entry.is_dir():
            keyrings.update(entry.iterdir())
        else:
            keyrings.add(entry)

    found: List[Fingerprint] = []
    for keyring in keyrings:
        for cert in keyring_split(working_dir=working_dir, keyring=keyring):
            for packet in packet_split(working_dir=working_dir, certificate=cert):
                if not packet.name.endswith("--PublicKey"):
                    continue
                found.append(Fingerprint(packet_dump_field(packet, "Fingerprint")))

    debug(f"Fingerprints of PGP public keys in {source}: {found}")
    return found


984
def get_fingerprints_from_decomposed_dir(path: Path) -> List[Fingerprint]:
    """Get all fingerprints of PGP public keys from a decomposed directory structure

    Every entry two levels below path (i.e. matching `*/*`) is treated as a certificate and its name (without
    suffix) is used as fingerprint.

    Parameters
    ----------
    path: Path
        The path to a decomposed directory structure

    Returns
    -------
    List[Fingerprint]
        A list of strings representing all fingerprints of PGP public keys below path
    """

    root = path.absolute()
    found: List[Fingerprint] = []
    for cert_dir in root.glob("*/*"):
        found.append(Fingerprint(cert_dir.stem))

    debug(f"Fingerprints of PGP public keys in {path}: {found}")
    return found
For faster browsing, not all history is shown. View entire blame