asgi.py 4.86 KB
Newer Older
1
import asyncio
2
import http
3
import typing
4

5
6
from urllib.parse import quote_plus

7
from fastapi import FastAPI, HTTPException, Request
8
from fastapi.responses import HTMLResponse, RedirectResponse
9
from fastapi.staticfiles import StaticFiles
10
from sqlalchemy import and_, or_
11
from starlette.middleware.authentication import AuthenticationMiddleware
12
13
14
from starlette.middleware.sessions import SessionMiddleware

import aurweb.config
Kevin Morris's avatar
Kevin Morris committed
15
import aurweb.logging
16

17
from aurweb.auth import BasicAuthBackend
18
19
20
from aurweb.db import get_engine, query
from aurweb.models.accepted_term import AcceptedTerm
from aurweb.models.term import Term
21
from aurweb.routers import accounts, auth, errors, html, rss, sso, trusted_user
22

23
# Setup the FastAPI app.
Marcus Andersson's avatar
Marcus Andersson committed
24
# Exception handlers are supplied by the aurweb.routers.errors module.
app = FastAPI(exception_handlers=errors.exceptions)
Kevin Morris's avatar
Kevin Morris committed
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42


@app.on_event("startup")
async def app_startup():
    """ Configure the application when the ASGI server starts.

    Checks that the [fastapi] session_secret option is set, mounts the
    static asset directories, registers the middlewares and routers, and
    finally initializes the database engine.

    Raises Exception if session_secret is empty.
    """
    secret = aurweb.config.get("fastapi", "session_secret")
    if not secret:
        raise Exception("[fastapi] session_secret must not be empty")

    # (URL prefix, directory on disk, mount name) for each static tree.
    static_mounts = (
        ("/static/css", "web/html/css", "static_css"),
        ("/static/js", "web/html/js", "static_js"),
        ("/static/images", "web/html/images", "static_images"),
    )
    for url_prefix, directory, mount_name in static_mounts:
        app.mount(url_prefix,
                  StaticFiles(directory=directory),
                  name=mount_name)

    # Add application middlewares; the registration order is kept as-is.
    app.add_middleware(AuthenticationMiddleware, backend=BasicAuthBackend())
    app.add_middleware(SessionMiddleware, secret_key=secret)

    # Add application routes, in the same order as before.
    for module in (sso, html, auth, accounts, trusted_user, rss):
        app.include_router(module.router)

    # Initialize the database engine and ORM.
    get_engine()
57

58
59
60
61
62
63
64
65
66
67

@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """
    Dirty HTML error page to replace the default JSON error responses.
    In the future this should use a proper Arch-themed HTML template.

    The page shows the status code, its standard reason phrase and the
    exception's detail. The detail is HTML-escaped before being embedded,
    so text that originated from a request cannot inject markup.

    :param request: The request being handled (unused)
    :param exc: The raised HTTPException
    :return: HTMLResponse carrying exc.status_code
    """
    # Imported locally under an alias: at module level in this file the
    # name `html` is bound to aurweb.routers.html, not the stdlib module.
    from html import escape as escape_html

    phrase = http.HTTPStatus(exc.status_code).phrase
    detail = escape_html(str(exc.detail))
    return HTMLResponse(f"<h1>{exc.status_code} {phrase}</h1><p>{detail}</p>",
                        status_code=exc.status_code)
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86


@app.middleware("http")
async def add_security_headers(request: Request, call_next: typing.Callable):
    """ Attach security headers to the response produced for request.

    The downstream handler is run as a task and awaited; its response
    then receives the following headers before being returned:

    CSP: Content-Security-Policy
    XCTO: X-Content-Type-Options
    RP: Referrer-Policy
    XFO: X-Frame-Options
    """
    pending = asyncio.create_task(call_next(request))
    await asyncio.wait({pending}, return_when=asyncio.FIRST_COMPLETED)
    response = pending.result()

    # CSP: scripts need the per-user nonce; no extra script hosts are
    # allowed at the moment. It's fine if css is inlined.
    script_hosts = []
    script_src = (f"script-src 'self' 'nonce-{request.user.nonce}' "
                  + ' '.join(script_hosts))
    style_src = "style-src 'self' 'unsafe-inline'"
    csp = "default-src 'self'; " + script_src + "; " + style_src

    # Header name -> value, applied in this order.
    security_headers = {
        "Content-Security-Policy": csp,
        "X-Content-Type-Options": "nosniff",
        "Referrer-Policy": "same-origin",
        "X-Frame-Options": "SAMEORIGIN",
    }
    for header, value in security_headers.items():
        response.headers[header] = value

    return response
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124


@app.middleware("http")
async def check_terms_of_service(request: Request, call_next: typing.Callable):
    """ Send authenticated users to /tos when they have outstanding Terms.

    Requests for /tos itself and requests from unauthenticated users are
    passed straight through to the next handler. """
    user = request.user
    if user.is_authenticated() and request.url.path != "/tos":
        # Matches AcceptedTerm rows that belong to another user, or that
        # belong to this user but record an older Revision of the Term.
        stale_or_foreign = or_(
            AcceptedTerm.UsersID != user.ID,
            and_(AcceptedTerm.UsersID == user.ID,
                 AcceptedTerm.TermsID == Term.ID,
                 AcceptedTerm.Revision < Term.Revision))
        unaccepted = query(Term).join(AcceptedTerm).filter(stale_or_foreign)

        total_terms = query(Term).count()
        if total_terms > unaccepted.count():
            see_other = int(http.HTTPStatus.SEE_OTHER)
            return RedirectResponse("/tos", status_code=see_other)

    pending = asyncio.create_task(call_next(request))
    await asyncio.wait({pending}, return_when=asyncio.FIRST_COMPLETED)
    return pending.result()
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144


@app.middleware("http")
async def id_redirect_middleware(request: Request, call_next: typing.Callable):
    """ Redirect `<path>?id=<value>` requests to `<path>/<value>`.

    Every query parameter other than `id` is carried over to the redirect
    target, with both names and values URL-encoded. Requests without an
    `id` parameter are passed on to the next handler unchanged.
    """
    target_id = request.query_params.get("id")

    if target_id is not None:
        # Preserve query string. multi_items() yields every value of a
        # repeated parameter, whereas items() keeps only one per name.
        pairs = [
            f"{quote_plus(key)}={quote_plus(str(value))}"
            for key, value in request.query_params.multi_items()
            if key != "id"
        ]
        qs = '?' + '&'.join(pairs) if pairs else str()

        path = request.url.path.rstrip('/')
        return RedirectResponse(f"{path}/{target_id}{qs}")

    task = asyncio.create_task(call_next(request))
    await asyncio.wait({task}, return_when=asyncio.FIRST_COMPLETED)
    return task.result()