Commit 4e9ef6fb authored by Kevin Morris's avatar Kevin Morris
Browse files

add account edit (settings) routes



* Added account_url filter to jinja2 environment. This produces a path
  to the user's account url (/account/{username}).
* Updated archdev-navbar to link to new edit route.
+ Added migrate_cookies(request, response) to aurweb.util, a function
  that simply migrates the request cookies to response and returns it.
+ Added account_edit tests to test_accounts_routes.py.
Signed-off-by: Kevin Morris's avatarKevin Morris <kevr@0cost.org>
parent d323c1f9
import copy
from datetime import datetime
from http import HTTPStatus
from fastapi import APIRouter, Form, Request
......@@ -284,6 +285,7 @@ def make_account_form_context(context: dict,
context["cn"] = args.get("CN", user.CommentNotify)
context["un"] = args.get("UN", user.UpdateNotify)
context["on"] = args.get("ON", user.OwnershipNotify)
context["inactive"] = args.get("J", user.InactivityTS != 0)
else:
context["username"] = args.get("U", str())
context["account_type"] = args.get("T", user_account_type_id)
......@@ -301,6 +303,7 @@ def make_account_form_context(context: dict,
context["cn"] = args.get("CN", True)
context["un"] = args.get("UN", False)
context["on"] = args.get("ON", True)
context["inactive"] = args.get("J", False)
context["password"] = args.get("P", str())
context["confirm"] = args.get("C", str())
......@@ -409,3 +412,145 @@ async def account_register_post(request: Request,
context["complete"] = True
context["user"] = user
return render_template(request, "register.html", context)
def cannot_edit(request, user):
""" Return a 401 HTMLResponse if the request user doesn't
have authorization, otherwise None. """
has_dev_cred = request.user.has_credential("CRED_ACCOUNT_EDIT_DEV",
approved=[user])
if not has_dev_cred:
return HTMLResponse(status_code=int(HTTPStatus.UNAUTHORIZED))
return None
@router.get("/account/{username}/edit", response_class=HTMLResponse)
@auth_required(True)
async def account_edit(request: Request,
username: str):
user = db.query(User, User.Username == username).first()
response = cannot_edit(request, user)
if response:
return response
context = await make_variable_context(request, "Accounts")
context["user"] = user
context = make_account_form_context(context, request, user, dict())
return render_template(request, "account/edit.html", context)
@router.post("/account/{username}/edit", response_class=HTMLResponse)
@auth_required(True)
async def account_edit_post(request: Request,
username: str,
U: str = Form(default=str()), # Username
J: bool = Form(default=False),
E: str = Form(default=str()), # Email
H: str = Form(default=False), # Hide Email
BE: str = Form(default=None), # Backup Email
R: str = Form(default=None), # Real Name
HP: str = Form(default=None), # Homepage
I: str = Form(default=None), # IRC Nick
K: str = Form(default=None), # PGP Key
L: str = Form(aurweb.config.get(
"options", "default_lang")),
TZ: str = Form(aurweb.config.get(
"options", "default_timezone")),
P: str = Form(default=str()), # New Password
C: str = Form(default=None), # Password Confirm
PK: str = Form(default=None), # PubKey
CN: bool = Form(default=False), # Comment Notify
UN: bool = Form(default=False), # Update Notify
ON: bool = Form(default=False), # Owner Notify
passwd: str = Form(default=str())):
from aurweb.db import session
user = session.query(User).filter(User.Username == username).first()
response = cannot_edit(request, user)
if response:
return response
context = await make_variable_context(request, "Accounts")
context["user"] = user
if not passwd:
context["errors"] = ["Invalid password."]
return render_template(request, "account/edit.html", context,
status_code=int(HTTPStatus.BAD_REQUEST))
args = dict(await request.form())
context = make_account_form_context(context, request, user, args)
ok, errors = process_account_form(request, user, args)
if not ok:
context["errors"] = errors
return render_template(request, "account/edit.html", context,
status_code=int(HTTPStatus.BAD_REQUEST))
# Set all updated fields as needed.
user.Username = U or user.Username
user.Email = E or user.Email
user.HideEmail = bool(H)
user.BackupEmail = BE or user.BackupEmail
user.RealName = R or user.RealName
user.Homepage = HP or user.Homepage
user.IRCNick = I or user.IRCNick
user.PGPKey = K or user.PGPKey
user.InactivityTS = datetime.utcnow().timestamp() if J else 0
# If we update the language, update the cookie as well.
if L and L != user.LangPreference:
request.cookies["AURLANG"] = L
user.LangPreference = L
context["language"] = L
# If we update the timezone, also update the cookie.
if TZ and TZ != user.Timezone:
user.Timezone = TZ
request.cookies["AURTZ"] = TZ
context["timezone"] = TZ
user.CommentNotify = bool(CN)
user.UpdateNotify = bool(UN)
user.OwnershipNotify = bool(ON)
# If a PK is given, compare it against the target user's PK.
if PK:
# Get the second token in the public key, which is the actual key.
pubkey = PK.strip().rstrip()
fingerprint = get_fingerprint(pubkey)
if not user.ssh_pub_key:
# No public key exists, create one.
user.ssh_pub_key = SSHPubKey(UserID=user.ID,
PubKey=PK,
Fingerprint=fingerprint)
elif user.ssh_pub_key.Fingerprint != fingerprint:
# A public key already exists, update it.
user.ssh_pub_key.PubKey = PK
user.ssh_pub_key.Fingerprint = fingerprint
elif user.ssh_pub_key:
# Else, if the user has a public key already, delete it.
session.delete(user.ssh_pub_key)
# Commit changes, if any.
session.commit()
if P and not user.valid_password(P):
# Remove the fields we consumed for passwords.
context["P"] = context["C"] = str()
# If a password was given and it doesn't match the user's, update it.
user.update_password(P)
if user == request.user:
# If the target user is the request user, login with
# the updated password and update AURSID.
request.cookies["AURSID"] = user.login(request, P)
if not errors:
context["complete"] = True
# Update cookies with requests, in case they were changed.
response = render_template(request, "account/edit.html", context)
return util.migrate_cookies(request, response)
>>>>>> > dddd1137... add account edit(settings) routes
......@@ -12,7 +12,7 @@ from fastapi.responses import HTMLResponse
import aurweb.config
from aurweb import captcha, l10n, time
from aurweb import captcha, l10n, time, util
# Prepare jinja2 objects.
loader = jinja2.FileSystemLoader(os.path.join(
......@@ -27,6 +27,9 @@ env.filters["tr"] = l10n.tr
env.filters["captcha_salt"] = captcha.captcha_salt_filter
env.filters["captcha_cmdline"] = captcha.captcha_cmdline_filter
# Add account utility filters.
env.filters["account_url"] = util.account_url
def make_context(request: Request, title: str, next: str = None):
""" Create a context for a jinja2 TemplateResponse. """
......
......@@ -82,6 +82,12 @@ def valid_ssh_pubkey(pk):
return base64.b64encode(base64.b64decode(tokens[1])).decode() == tokens[1]
def migrate_cookies(request, response):
for k, v in request.cookies.items():
response.set_cookie(k, v)
return response
@jinja2.contextfilter
def account_url(context, user):
request = context.get("request")
......
{% extends "partials/layout.html" %}
{% block pageContent %}
<div class="box">
<h2>{% trans %}Accounts{% endtrans %}</h2>
{% if complete %}
{{
"The account, %s%s%s, has been successfully modified."
| tr
| format("<strong>", user.Username, "</strong>")
| safe
}}
{% else %}
{% if errors %}
{% include "partials/error.html" %}
{% else %}
<p>
{{ "Click %shere%s if you want to permanently delete this account."
| tr
| format('<a href="%s/delete">' | format(user | account_url),
"</a>")
| safe
}}
{{ "Click %shere%s for user details."
| tr
| format('<a href="%s">' | format(user | account_url),
"</a>")
| safe
}}
{{ "Click %shere%s to list the comments made by this account."
| tr
| format('<a href="%s/comments">' | format(user | account_url),
"</a>")
| safe
}}
</p>
{% endif %}
{% set form_type = "UpdateAccount" %}
{% include "partials/account_form.html" %}
{% endif %}
</div>
{% endblock %}
......@@ -42,6 +42,15 @@
"account is inactive." | tr }}</em>
</p>
<p>
<label for="id_inactive">{% trans %}Inactive{% endtrans %}:</label>
<input id="id_inactive" type="checkbox" name="J"
{% if inactive %}
checked="checked"
{% endif %}
>
</p>
{% if request.user.has_credential("CRED_ACCOUNT_CHANGE_TYPE") %}
<p>
<label for="id_type">
......
......@@ -6,16 +6,28 @@
<li><a href="/">AUR {% trans %}Home{% endtrans %}</a></li>
{% endif %}
<li><a href="/packages/">{% trans %}Packages{% endtrans %}</a></li>
<li><a href="/register/">{% trans %}Register{% endtrans %}</a></li>
<li>
{% if request.user.is_authenticated() %}
{% if request.user.is_authenticated() %}
<li>
<a href="/account/{{ request.user.Username }}/edit">
{% trans %} My Account{% endtrans %}
</a>
</li>
<li>
<a href="/logout/?next={{ next }}">
{% trans %}Logout{% endtrans %}
</a>
{% else %}
</li>
{% else %}
<li>
<a href="/register">
{% trans %}Register{% endtrans %}
</a>
</li>
<li>
<a href="/login/?next={{ next }}">
{% trans %}Login{% endtrans %}
</a>
</li>
{% endif %}
</li>
</ul>
......
......@@ -5,6 +5,7 @@ from datetime import datetime
from http import HTTPStatus
from subprocess import Popen
import lxml.html
import pytest
from fastapi.testclient import TestClient
......@@ -574,3 +575,298 @@ def test_post_register_with_ssh_pubkey():
response = post_register(request, PK=pk)
assert response.status_code == int(HTTPStatus.OK)
def test_get_account_edit():
request = Request()
sid = user.login(request, "testPassword")
with client as request:
response = request.get("/account/test/edit", cookies={
"AURSID": sid
}, allow_redirects=False)
assert response.status_code == int(HTTPStatus.OK)
def test_get_account_edit_unauthorized():
request = Request()
sid = user.login(request, "testPassword")
create(User, Username="test2", Email="test2@example.org",
Passwd="testPassword")
with client as request:
# Try to edit `test2` while authenticated as `test`.
response = request.get("/account/test2/edit", cookies={
"AURSID": sid
}, allow_redirects=False)
assert response.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_post_account_edit():
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test666@example.org",
"passwd": "testPassword"
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.OK)
expected = "The account, <strong>test</strong>, "
expected += "has been successfully modified."
assert expected in response.content.decode()
def test_post_account_edit_dev():
from aurweb.db import session
# Modify our user to be a "Trusted User & Developer"
name = "Trusted User & Developer"
tu_or_dev = query(AccountType, AccountType.AccountType == name).first()
user.AccountType = tu_or_dev
session.commit()
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test666@example.org",
"passwd": "testPassword"
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.OK)
expected = "The account, <strong>test</strong>, "
expected += "has been successfully modified."
assert expected in response.content.decode()
def test_post_account_edit_language():
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test@example.org",
"L": "de", # German
"passwd": "testPassword"
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.OK)
# Parse the response content html into an lxml root, then make
# sure we see a 'de' option selected on the page.
content = response.content.decode()
root = lxml.html.fromstring(content)
lang_nodes = root.xpath('//option[@value="de"]/@selected')
assert lang_nodes and len(lang_nodes) != 0
assert lang_nodes[0] == "selected"
def test_post_account_edit_timezone():
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test@example.org",
"TZ": "CET",
"passwd": "testPassword"
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.OK)
def test_post_account_edit_error_missing_password():
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test@example.org",
"TZ": "CET",
"passwd": ""
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.BAD_REQUEST)
content = response.content.decode()
assert "Invalid password." in content
def test_post_account_edit_error_invalid_password():
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test@example.org",
"TZ": "CET",
"passwd": "invalid"
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.BAD_REQUEST)
content = response.content.decode()
assert "Invalid password." in content
def test_post_account_edit_error_unauthorized():
request = Request()
sid = user.login(request, "testPassword")
test2 = create(User, Username="test2", Email="test2@example.org",
Passwd="testPassword")
post_data = {
"U": "test",
"E": "test@example.org",
"TZ": "CET",
"passwd": "testPassword"
}
with client as request:
# Attempt to edit 'test2' while logged in as 'test'.
response = request.post("/account/test2/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.UNAUTHORIZED)
def test_post_account_edit_ssh_pub_key():
pk = str()
# Create a public key with ssh-keygen (this adds ssh-keygen as a
# dependency to passing this test).
with tempfile.TemporaryDirectory() as tmpdir:
with open("/dev/null", "w") as null:
proc = Popen(["ssh-keygen", "-f", f"{tmpdir}/test.ssh", "-N", ""],
stdout=null, stderr=null)
proc.wait()
assert proc.returncode == 0
# Read in the public key, then delete the temp dir we made.
pk = open(f"{tmpdir}/test.ssh.pub").read().rstrip()
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test@example.org",
"PK": pk,
"passwd": "testPassword"
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.OK)
# Now let's update what's already there to gain coverage over that path.
pk = str()
with tempfile.TemporaryDirectory() as tmpdir:
with open("/dev/null", "w") as null:
proc = Popen(["ssh-keygen", "-f", f"{tmpdir}/test.ssh", "-N", ""],
stdout=null, stderr=null)
proc.wait()
assert proc.returncode == 0
# Read in the public key, then delete the temp dir we made.
pk = open(f"{tmpdir}/test.ssh.pub").read().rstrip()
post_data["PK"] = pk
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.OK)
def test_post_account_edit_invalid_ssh_pubkey():
pubkey = "ssh-rsa fake key"
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test@example.org",
"P": "newPassword",
"C": "newPassword",
"PK": pubkey,
"passwd": "testPassword"
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.BAD_REQUEST)
def test_post_account_edit_password():
request = Request()
sid = user.login(request, "testPassword")
post_data = {
"U": "test",
"E": "test@example.org",
"P": "newPassword",
"C": "newPassword",
"passwd": "testPassword"
}
with client as request:
response = request.post("/account/test/edit", cookies={
"AURSID": sid
}, data=post_data, allow_redirects=False)
assert response.status_code == int(HTTPStatus.OK)
assert user.valid_password("newPassword")
>>>>>> > dddd1137... add account edit(settings) routes