files.py 7.31 KB
Newer Older
1
import io
David Runge's avatar
David Runge committed
2
import re
David Runge's avatar
David Runge committed
3
import tarfile
4
import time
David Runge's avatar
David Runge committed
5
from pathlib import Path
David Runge's avatar
David Runge committed
6
from typing import Iterator
David Runge's avatar
David Runge committed
7

8
9
10
11
import orjson
from pydantic.error_wrappers import ValidationError

from repo_management import convert, defaults, errors, models
12

David Runge's avatar
David Runge committed
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

def _read_db_file(db_path: Path, compression: str = "gz") -> tarfile.TarFile:
    """Open a repository database file for reading

    The file is handed to tarfile.open() with a read mode that is built from the requested compression type
    (e.g. 'r:gz').

    Parameters
    ----------
    db_path: Path
        A pathlib.Path instance, representing the location of the database file
    compression: str
        The compression used for the database file (defaults to 'gz')

    Raises
    ------
    FileNotFoundError
        If the file represented by db_path does not exist
    tarfile.ReadError
        If the file can not be read as a tar archive using the given compression
    tarfile.CompressionError
        If the compression type is unknown or not supported

    Returns
    -------
    tarfile.TarFile
        An instance of TarFile, opened for reading (closing it is up to the caller)
    """

    read_mode = f"r:{compression}"
    return tarfile.open(name=db_path, mode=read_mode)
David Runge's avatar
David Runge committed
40
41


42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def _extract_db_member_package_name(name: str) -> str:
    """Derive the package name from the name of a repository database member

    A trailing '/desc' or '/files' is removed first. Afterwards the last two hyphen separated fields are cut off and
    whatever precedes them is the package name (e.g. 'foo-bar-1.0.0-1/desc' becomes 'foo-bar').

    Parameters
    ----------
    name: str
        The name of a member of a repository database (i.e. one of tarfile.TarFile.getnames())

    Returns
    -------
    str
        The package name extracted from name, or an empty string if name contains fewer than two hyphens
    """

    stem = re.sub("(/desc|/files)$", "", name)
    # split from the right: [package name, second to last field, last field]
    fields = stem.rsplit("-", 2)
    if len(fields) < 3:
        # not enough hyphen separated fields to cut off two of them
        return ""
    return fields[0]


def _db_file_member_as_model(
    db_file: tarfile.TarFile, regex: str = "(/desc|/files)$"
) -> Iterator[models.RepoDbMemberData]:
    """Yield the members of a repository database as instances of models.RepoDbMemberData

    Only members whose name matches a regular expression are considered. The member type is derived from the member
    name: names ending in '/desc' are of type defaults.RepoDbMemberType.DESC, names ending in '/files' are of type
    defaults.RepoDbMemberType.FILES and all other names are of type defaults.RepoDbMemberType.UNKNOWN.

    Parameters
    ----------
    db_file: tarfile.TarFile
        An instance of TarFile representing a repository database
    regex: str
        A regular expression used to filter the names of the members contained in db_file (defaults to
        '(/desc|/files)$')

    Returns
    -------
    Iterator[models.RepoDbMemberData]
        An iterator over the matching members. Each item carries the member type, the package name extracted from
        the member name and the UTF-8 decoded contents of the member wrapped in an io.StringIO
    """

    matching_names = [member_name for member_name in db_file.getnames() if re.search(regex, member_name)]

    for member_name in matching_names:
        if re.search("(/files)$", member_name):
            member_type = defaults.RepoDbMemberType.FILES
        elif re.search("(/desc)$", member_name):
            member_type = defaults.RepoDbMemberType.DESC
        else:
            member_type = defaults.RepoDbMemberType.UNKNOWN

        package_name = _extract_db_member_package_name(name=member_name)
        # extractfile() is annotated as returning an Optional, hence the ignore
        raw_content = db_file.extractfile(member_name).read()  # type: ignore

        yield models.RepoDbMemberData(
            member_type=member_type,
            name=package_name,
            data=io.StringIO(raw_content.decode("utf-8")),
        )
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229


def _json_files_in_directory(path: Path) -> Iterator[Path]:
    """Yield the JSON files located directly in a directory, in sorted order

    Only entries matching '*.json' in the directory itself are considered (no recursion into subdirectories).
    As this is a generator, the directory is only searched (and the error below only raised) once the returned
    iterator is consumed.

    Parameters
    ----------
    path: Path
        A Path to search in for JSON files

    Raises
    ------
    errors.RepoManagementFileNotFoundError
        If there are no JSON files found in path

    Returns
    -------
    Iterator[Path]
        An iterator over the sorted JSON files found in the directory defined by path
    """

    matches = sorted(path.glob("*.json"))
    if not matches:
        raise errors.RepoManagementFileNotFoundError(f"There are no JSON files in {path}!")

    yield from matches


def _read_pkgbase_json_file(path: Path) -> models.OutputPackageBase:
    """Read a JSON file that represents a pkgbase and return it as models.OutputPackageBase

    The file is read as raw bytes and decoded by orjson, so that decoding does not depend on the locale's default
    text encoding.

    Parameters
    ----------
    path: Path
        A Path to a JSON file

    Raises
    ------
    errors.RepoManagementFileError
        If the JSON file can not be decoded
    errors.RepoManagementValidationError
        If the JSON file can not be validated using models.OutputPackageBase

    Returns
    -------
    models.OutputPackageBase
        A pydantic model representing a pkgbase
    """

    # binary mode: orjson consumes bytes directly, no locale dependent text decoding in between
    with open(path, "rb") as input_file:
        raw_data = input_file.read()

    try:
        decoded_data = orjson.loads(raw_data)
    except orjson.JSONDecodeError as e:
        raise errors.RepoManagementFileError(f"The JSON file '{path}' could not be decoded!\n{e}") from e

    try:
        return models.OutputPackageBase(**decoded_data)
    except ValidationError as e:
        raise errors.RepoManagementValidationError(f"The JSON file '{path}' could not be validated!\n{e}") from e


def _write_db_file(path: Path, compression: str = "gz") -> tarfile.TarFile:
    """Open a repository database file for writing

    The file is handed to tarfile.open() with a write mode that is built from the requested compression type
    (e.g. 'w:gz').

    Parameters
    ----------
    path: Path
        A pathlib.Path instance, representing the location of the database file
    compression: str
        The compression used for the database file (defaults to 'gz')

    Raises
    ------
    tarfile.CompressionError
        If the compression type is unknown or not supported

    Returns
    -------
    tarfile.TarFile
        An instance of TarFile, opened for writing (closing it is up to the caller)
    """

    write_mode = f"w:{compression}"
    return tarfile.open(name=path, mode=write_mode)


def _repo_db_tarinfo(name: str, mode: str, size: int = 0, entry_type: bytes = tarfile.REGTYPE) -> tarfile.TarInfo:
    """Create the tarfile.TarInfo for a repository database entry (owner, group, permissions and mtime set)"""

    info = tarfile.TarInfo(name)
    info.type = entry_type
    info.size = size
    info.mtime = int(time.time())
    info.uname = defaults.DB_USER
    info.gname = defaults.DB_GROUP
    # mode is an octal string (e.g. defaults.DB_FILE_MODE)
    info.mode = int(mode, base=8)
    return info


def _add_text_member_to_db(db: tarfile.TarFile, name: str, content: str) -> None:
    """Add rendered text as a regular file member to a repository database"""

    # encode once: the size in the header must match the bytes that are written
    payload = content.encode()
    db.addfile(_repo_db_tarinfo(name=name, mode=defaults.DB_FILE_MODE, size=len(payload)), io.BytesIO(payload))


def _stream_package_base_to_db(
    db: tarfile.TarFile,
    model: models.OutputPackageBase,
    repodbfile: convert.RepoDbFile,
    db_type: defaults.RepoDbType,
) -> None:
    """Stream descriptor files for packages of a pkgbase to a repository database

    Allows streaming to a default repository database or a files database. For each package of the pkgbase a
    directory '<package name>-<version>' is added, which contains a 'desc' file and (only if db_type is
    defaults.RepoDbType.FILES) a 'files' file.

    Parameters
    ----------
    db: tarfile.TarFile
        The repository database to stream to
    model: models.OutputPackageBase
        The model to use for streaming descriptor files to the repository database
    repodbfile: convert.RepoDbFile
        The object used to render the contents of the 'desc' and 'files' files from their respective models
    db_type: defaults.RepoDbType
        The type of database to stream to
    """

    for desc_model, files_model in model.get_packages_as_models():
        dirname = f"{desc_model.name}-{model.version}"
        db.addfile(_repo_db_tarinfo(name=dirname, mode=defaults.DB_DIR_MODE, entry_type=tarfile.DIRTYPE))

        desc_content = io.StringIO()
        repodbfile.render_desc_template(model=desc_model, output=desc_content)
        _add_text_member_to_db(db=db, name=f"{dirname}/desc", content=desc_content.getvalue())

        # the 'files' entry is only part of files databases
        if db_type == defaults.RepoDbType.FILES:
            files_content = io.StringIO()
            repodbfile.render_files_template(model=files_model, output=files_content)
            _add_text_member_to_db(db=db, name=f"{dirname}/files", content=files_content.getvalue())