db.py 4.2 KB
Newer Older
Kevin Morris's avatar
Kevin Morris committed
1
2
import math

3
import aurweb.config
4

5
6
# Global SQLAlchemy engine. Stays None until the first get_engine() call
# creates it and stores it here.
engine = None  # See get_engine

Kevin Morris's avatar
Kevin Morris committed
7
8
9
10
11
12
# ORM Session class. None until get_engine() creates the engine, at which
# point it is set to a sessionmaker bound to that engine.
Session = None

# Global ORM Session object. None until get_engine() creates the engine, at
# which point it is set to an instance of Session.
session = None

13

14
15
16
17
18
def get_sqlalchemy_url():
    """
    Build an SQLAlchemy URL for use with create_engine, based on the aurweb
    configuration.

    The backend is read from the `backend` option of the `[database]`
    section. For 'mysql', the URL uses the mysql+mysqlconnector driver with
    the configured user, password, host, database name and unix socket. For
    'sqlite', the URL only carries the configured database name.

    Returns the URL object produced by SQLAlchemy.

    Raises ValueError if the configured backend is neither 'mysql' nor
    'sqlite'.
    """
    import sqlalchemy

    constructor = sqlalchemy.engine.url.URL

    # SQLAlchemy 1.4 introduced URL.create() as the way to build URL objects.
    # Compare (major, minor) as a tuple so that every later release
    # (including 2.x) takes this path, not only the 1.x series.
    parts = sqlalchemy.__version__.split('.')
    major = int(parts[0])
    minor = int(parts[1])
    if (major, minor) >= (1, 4):  # pragma: no cover
        constructor = sqlalchemy.engine.url.URL.create

    aur_db_backend = aurweb.config.get('database', 'backend')
    if aur_db_backend == 'mysql':
        return constructor(
            'mysql+mysqlconnector',
            username=aurweb.config.get('database', 'user'),
            password=aurweb.config.get('database', 'password'),
            host=aurweb.config.get('database', 'host'),
            database=aurweb.config.get('database', 'name'),
            query={
                'unix_socket': aurweb.config.get('database', 'socket'),
            },
        )
    elif aur_db_backend == 'sqlite':
        return constructor(
            'sqlite',
            database=aurweb.config.get('database', 'name'),
        )
    else:
        raise ValueError('unsupported database backend')


49
50
51
52
53
54
55
56
def get_engine():
    """
    Return the global SQLAlchemy engine.

    The engine is created on the first call to get_engine and then stored in the
    `engine` global variable for the next calls. Creating the engine also
    sets the `Session` global to a sessionmaker bound to it, and the `session`
    global to an instance of that Session class.
    """
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    global engine, session, Session

    if engine is None:
        connect_args = {}
        if aurweb.config.get('database', 'backend') == 'sqlite':
            # check_same_thread is for a SQLite technicality
            # https://fastapi.tiangolo.com/tutorial/sql-databases/#note
            # It is an sqlite3-specific connect() argument, so it is only
            # passed for the sqlite backend and never forwarded to other
            # database drivers.
            connect_args["check_same_thread"] = False

        engine = create_engine(get_sqlalchemy_url(),
                               connect_args=connect_args)
        Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        session = Session()

    return engine


def connect():
    """
    Open a connection on the global SQLAlchemy engine and return it.

    The engine comes from get_engine(), so it is created on demand.
    Connections are usually pooled; see
    <https://docs.sqlalchemy.org/en/13/core/connections.html>.

    SQLAlchemy connections are also context managers: use the result in a
    `with` statement, or hand it out through FastAPI's dependency injection.
    """
    db_engine = get_engine()
    return db_engine.connect()


83
84
class Connection:
    """
    Thin wrapper around a raw DB-API connection to the configured database.

    The backend ('mysql' or 'sqlite') is taken from the `backend` option of
    the `[database]` configuration section. Queries passed to execute() use
    `?` placeholders; they are rewritten when the driver's paramstyle
    requires it.
    """

    # Underlying DB-API connection object, set by __init__.
    _conn = None
    # DB-API paramstyle reported by the driver module in use.
    _paramstyle = None

    def __init__(self):
        """
        Open the connection described by the aurweb configuration.

        Raises ValueError if the configured backend is not supported.
        """
        backend = aurweb.config.get('database', 'backend')

        if backend not in ('mysql', 'sqlite'):
            raise ValueError('unsupported database backend')

        if backend == 'sqlite':
            import sqlite3

            db_path = aurweb.config.get('database', 'name')
            self._conn = sqlite3.connect(db_path)
            # Register a two-argument POWER() SQL function backed by math.pow.
            self._conn.create_function("POWER", 2, math.pow)
            self._paramstyle = sqlite3.paramstyle
            return

        import mysql.connector

        connect_kwargs = {
            'host': aurweb.config.get('database', 'host'),
            'db': aurweb.config.get('database', 'name'),
            'user': aurweb.config.get('database', 'user'),
            'passwd': aurweb.config.get('database', 'password'),
            'unix_socket': aurweb.config.get('database', 'socket'),
            'buffered': True,
        }
        self._conn = mysql.connector.connect(**connect_kwargs)
        self._paramstyle = mysql.connector.paramstyle

    def execute(self, query, params=()):
        """
        Run `query` with `params` on a new cursor and return that cursor.

        `query` is written with `?` placeholders. For 'format' and 'pyformat'
        drivers, literal `%` characters are doubled and each `?` becomes
        `%s`; for 'qmark' drivers the query is passed through unchanged.

        Raises ValueError for any other paramstyle.
        """
        style = self._paramstyle

        if style == 'qmark':
            statement = query
        elif style in ('format', 'pyformat'):
            escaped = query.replace('%', '%%')
            statement = escaped.replace('?', '%s')
        else:
            raise ValueError('unsupported paramstyle')

        cursor = self._conn.cursor()
        cursor.execute(statement, params)
        return cursor

    def commit(self):
        """Commit the current transaction on the underlying connection."""
        self._conn.commit()

    def close(self):
        """Close the underlying connection."""
        self._conn.close()