Skip to content
Snippets Groups Projects
Commit c2d3dc1d authored by Hunter Wittenborn's avatar Hunter Wittenborn
Browse files

Added info and multiinfo types for /rpc

parent db2718fc
No related branches found
No related tags found
No related merge requests found
Pipeline #11118 passed
......@@ -18,7 +18,7 @@ from aurweb.auth import BasicAuthBackend
from aurweb.db import get_engine, query
from aurweb.models.accepted_term import AcceptedTerm
from aurweb.models.term import Term
from aurweb.routers import accounts, auth, errors, html, packages, rss, sso, trusted_user
from aurweb.routers import accounts, auth, errors, html, packages, rpc, rss, sso, trusted_user
# Setup the FastAPI app.
app = FastAPI(exception_handlers=errors.exceptions)
......@@ -52,7 +52,7 @@ async def app_startup():
app.include_router(trusted_user.router)
app.include_router(rss.router)
app.include_router(packages.router)
app.include_router(rpc.router)
# Initialize the database engine and ORM.
get_engine()
......
from typing import List, Optional
from fastapi import APIRouter, Query, Request
from aurweb.rpc import RPC
router = APIRouter()
def arg_legacy_gen(request):
# '[]' characters in the path randomly kept getting transformed to (what
# appears to be) their HTML-formatted variants, so we keep that behavior
# just in case.
arguments = request.url.query.replace("%5B%5D", "[]").split("&")
arguments.reverse()
temp_args = []
for i in arguments:
# We only want to deal with 'arg' and 'arg[]' strings, so only take those.
if i.split("=")[0] in ("arg", "arg[]"):
temp_args += [i]
returned_arguments = []
argument_bracketed = False
for i in temp_args:
# Split argument on first occurance of '='.
current_argument = i.split("=")
argument_name = current_argument[0]
argument_value = "".join(current_argument[1:])
# Process argument.
if argument_name == "arg[]":
returned_arguments += [argument_value]
argument_bracketed = True
elif argument_name == "arg":
# Only set this argument if 'arg[]' hasen't previously been found.
if not argument_bracketed:
returned_arguments = [argument_value]
break
return returned_arguments
@router.get("/rpc")
async def rpc(request: Request,
v: Optional[int] = Query(None),
type: Optional[str] = Query(None),
arg: Optional[str] = Query(None),
args: Optional[List[str]] = Query(None, alias="arg[]")):
# Ensure valid version was passed
if v is None:
return {"error": "Please specify an API version."}
elif v != 5:
return {"error": "Invalid version specified."}
# The PHP implementation sets the type to 'multiinfo' when the type is set to 'info'.
if type == "info":
type = "multiinfo"
# Defaults for returned data
returned_data = {}
returned_data["version"] = v
returned_data["results"] = []
returned_data["resultcount"] = 0
returned_data["type"] = type
# Ensure type is valid.
if type is None:
returned_data["type"] = "error"
returned_data["error"] = "No request type/data specified."
return returned_data
elif type not in ("info", "multiinfo"):
returned_data["type"] = "error"
returned_data["error"] = "Incorrect request type specified."
return returned_data
# Take arguments from either 'args' or 'args[]' and put them into 'argument_list'.
argument_list = []
# In the PHP implementation, aurweb uses the last 'arg' value or all the
# last 'arg[]' values when both 'arg' and 'arg[]' are part of the query
# request. We thus preserve that behavior here for legacy purposes.
if arg is not None and args is not None:
argument_list = arg_legacy_gen(request)
elif arg is not None:
argument_list = [arg]
elif args is not None:
argument_list = args
else:
# Abort because no package arguments were passed.
returned_data["type"] = "error"
returned_data["error"] = "No request type/data specified."
return returned_data
# Process and return data
returned_data = RPC(v=v,
type=type,
argument_list=argument_list,
returned_data=returned_data)
return returned_data
import aurweb.config as config
from aurweb import db
from aurweb.models.dependency_type import CHECKDEPENDS_ID, DEPENDS_ID, MAKEDEPENDS_ID, OPTDEPENDS_ID
from aurweb.models.license import License
from aurweb.models.package import Package
from aurweb.models.package_base import PackageBase
from aurweb.models.package_dependency import PackageDependency
from aurweb.models.package_keyword import PackageKeyword
from aurweb.models.package_license import PackageLicense
from aurweb.models.package_relation import PackageRelation
from aurweb.models.package_vote import PackageVote
from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID
from aurweb.models.user import User
# Define dependency types.
DEP_TYPES = {
DEPENDS_ID: "Depends",
MAKEDEPENDS_ID: "MakeDepends",
CHECKDEPENDS_ID: "CheckDepends",
OPTDEPENDS_ID: "OptDepends"
}
# Define relationship types.
REL_TYPES = {
CONFLICTS_ID: "Conflicts",
PROVIDES_ID: "Provides",
REPLACES_ID: "Replaces"
}
# Define functions for request types.
def add_deps(current_array, db_dep):
if db_dep.count() > 0:
# Create lists for all dependency types.
for i in DEP_TYPES.values():
current_array[i] = []
# Generate each dependency item in list.
for i in db_dep.all():
dep_string = i.DepName
# Add relationship version restrictor (i.e. '<=5') if it exists.
if i.DepCondition is not None:
dep_string += i.DepCondition
# Add item to list.
current_deptype = DEP_TYPES.get(i.DepTypeID)
current_array[current_deptype] += [dep_string]
# Remove any dependency lists that are empty.
for i in DEP_TYPES.values():
if current_array[i] == []:
current_array.pop(i)
return current_array
def add_rels(current_array, db_rel):
if db_rel.count() > 0:
# Create lists for all relationship types.
for i in REL_TYPES.values():
current_array[i] = []
# Generate each relationship item in list.
for i in db_rel.all():
rel_string = i.RelName
# Add relationship version restrictor (i.e. '<=5') if it exists.
if i.RelCondition is not None:
rel_string += i.RelCondition
# Add item to list.
current_reltype = REL_TYPES.get(i.RelTypeID)
current_array[current_reltype] += [rel_string]
# Remove any relationship lists that are empty.
for i in REL_TYPES.values():
if current_array[i] == []:
current_array.pop(i)
return current_array
def run_info(returned_data, package_name, snapshot_uri):
# Get package name.
db_package = db.query(Package).filter(Package.Name == package_name)
if db_package.count() == 0:
return returned_data
db_package = db_package.first()
# Get name of package under PackageBaseID.
db_package_baseid = db.query(PackageBase).filter(PackageBase.ID == db_package.PackageBaseID).first()
# Get maintainer info.
db_package_maintainer = db.query(User).filter(User.ID == db_package_baseid.MaintainerUID).first()
current_array = {}
returned_data["resultcount"] = returned_data["resultcount"] + 1
# Data from the Packages table.
current_array["ID"] = db_package.ID
current_array["Name"] = db_package.Name
current_array["PackageBaseID"] = db_package.PackageBaseID
current_array["Version"] = db_package.Version
current_array["Description"] = db_package.Description
current_array["URL"] = db_package.URL
# PackageBase table.
current_array["PackageBase"] = db_package_baseid.Name
current_array["NumVotes"] = db_package_baseid.NumVotes
current_array["Popularity"] = db_package_baseid.Popularity
current_array["OutOfDate"] = db_package_baseid.OutOfDateTS
current_array["FirstSubmitted"] = db_package_baseid.SubmittedTS
current_array["LastModified"] = db_package_baseid.ModifiedTS
# User table.
try:
current_array["Maintainer"] = db_package_maintainer.Username
except AttributeError:
current_array["Maintainer"] = None
# Generate and add snapshot_uri.
current_array["URLPath"] = snapshot_uri.replace("%s", package_name)
# Add package votes.
current_array["NumVotes"] = db.query(PackageVote).count()
# Generate dependency listing.
db_dep = db.query(PackageDependency).filter(PackageDependency.PackageID == db_package.ID)
current_array = add_deps(current_array, db_dep)
# Generate relationship listing.
db_rel = db.query(PackageRelation).filter(PackageRelation.PackageID == db_package.ID)
current_array = add_rels(current_array, db_rel)
# License table.
current_array["License"] = []
for i in db.query(PackageLicense).filter(PackageLicense.PackageID == db_package.ID):
current_array["License"] += [db.query(License).first().Name]
# Keywords table.
current_array["Keywords"] = []
for i in db.query(PackageKeyword).filter(PackageKeyword.PackageBaseID == db_package_baseid.ID):
current_array["Keywords"] += [i.Keyword]
# Add current array to returned results.
returned_data["results"] += [current_array]
return returned_data
def RPC(**function_args):
# Get arguments.
#
# We'll use 'v' in the future when we add v6.
# v = function_args.get("v")
type = function_args.get("type")
args = function_args.get("argument_list")
returned_data = function_args.get("returned_data")
# Get Snapshot URI
snapshot_uri = config.get("options", "snapshot_uri")
# Remove duplicate arguments if type is 'multiinfo' so we don't fetch
# results for a package multiple times.
#
# Note that the type is set to 'multiinfo' when 'type=info' is passed.
if type == "multiinfo":
args = set(args)
# Set request type to run.
type_actions = {
"multiinfo": run_info
}
# This if statement should always be executed, as we checked if the
# specified type was valid in aurweb/routers/rpc.py.
if type in type_actions:
run_request = type_actions.get(type)
for i in args:
returned_data = run_request(returned_data, i, snapshot_uri)
return returned_data
import orjson
from fastapi.testclient import TestClient
from aurweb.asgi import app
from aurweb.db import begin, create, query
from aurweb.models.account_type import AccountType
from aurweb.models.dependency_type import DependencyType
from aurweb.models.license import License
from aurweb.models.package import Package
from aurweb.models.package_base import PackageBase
from aurweb.models.package_dependency import PackageDependency
from aurweb.models.package_keyword import PackageKeyword
from aurweb.models.package_license import PackageLicense
from aurweb.models.package_relation import PackageRelation
from aurweb.models.package_vote import PackageVote
from aurweb.models.relation_type import RelationType
from aurweb.models.user import User
from aurweb.testing import setup_test_db
def make_request(path):
with TestClient(app) as request:
return request.get(path)
def setup():
# Set up tables.
setup_test_db("Users", "PackageBases", "Packages", "Licenses",
"PackageDepends", "PackageRelations", "PackageLicenses",
"PackageKeywords", "PackageVotes")
# Create test package details.
with begin():
# Get ID types.
account_type = query(AccountType, AccountType.AccountType == "User").first()
dependency_depends = query(DependencyType, DependencyType.Name == "depends").first()
dependency_optdepends = query(DependencyType, DependencyType.Name == "optdepends").first()
dependency_makedepends = query(DependencyType, DependencyType.Name == "makedepends").first()
dependency_checkdepends = query(DependencyType, DependencyType.Name == "checkdepends").first()
relation_conflicts = query(RelationType, RelationType.Name == "conflicts").first()
relation_provides = query(RelationType, RelationType.Name == "provides").first()
relation_replaces = query(RelationType, RelationType.Name == "replaces").first()
# Create database info.
user1 = create(User,
Username="user1",
Email="user1@example.com",
RealName="Test User 1",
Passwd="testPassword",
AccountType=account_type)
user2 = create(User,
Username="user2",
Email="user2@example.com",
RealName="Test User 2",
Passwd="testPassword",
AccountType=account_type)
user3 = create(User,
Username="user3",
Email="user3@example.com",
RealName="Test User 3",
Passwd="testPassword",
AccountType=account_type)
pkgbase1 = create(PackageBase, Name="big-chungus", Maintainer=user1)
pkgname1 = create(Package,
PackageBase=pkgbase1,
Name=pkgbase1.Name,
Description="Bunny bunny around bunny",
URL="https://example.com/")
pkgbase2 = create(PackageBase, Name="chungy-chungus", Maintainer=user1)
pkgname2 = create(Package,
PackageBase=pkgbase2,
Name=pkgbase2.Name,
Description="Wubby wubby on wobba wuubu",
URL="https://example.com/")
pkgbase3 = create(PackageBase, Name="gluggly-chungus", Maintainer=user1)
create(Package,
PackageBase=pkgbase3,
Name=pkgbase3.Name,
Description="glurrba glurrba gur globba",
URL="https://example.com/")
pkgbase4 = create(PackageBase, Name="woogly-chungus", Maintainer=None)
create(Package,
PackageBase=pkgbase4,
Name=pkgbase4.Name,
Description="wuggla woblabeloop shemashmoop",
URL="https://example.com/")
# Dependencies.
create(PackageDependency,
Package=pkgname1,
DependencyType=dependency_depends,
DepName="chungus-depends")
create(PackageDependency,
Package=pkgname2,
DependencyType=dependency_depends,
DepName="chungy-depends")
create(PackageDependency,
Package=pkgname1,
DependencyType=dependency_optdepends,
DepName="chungus-optdepends",
DepCondition="=50")
create(PackageDependency,
Package=pkgname1,
DependencyType=dependency_makedepends,
DepName="chungus-makedepends")
create(PackageDependency,
Package=pkgname1,
DependencyType=dependency_checkdepends,
DepName="chungus-checkdepends")
# Relations.
create(PackageRelation,
Package=pkgname1,
RelationType=relation_conflicts,
RelName="chungus-conflicts")
create(PackageRelation,
Package=pkgname2,
RelationType=relation_conflicts,
RelName="chungy-conflicts")
create(PackageRelation,
Package=pkgname1,
RelationType=relation_provides,
RelName="chungus-provides",
RelCondition="<=200")
create(PackageRelation,
Package=pkgname1,
RelationType=relation_replaces,
RelName="chungus-replaces",
RelCondition="<=200")
license = create(License, Name="GPL")
create(PackageLicense,
Package=pkgname1,
License=license)
for i in ["big-chungus", "smol-chungus", "sizeable-chungus"]:
create(PackageKeyword,
PackageBase=pkgbase1,
Keyword=i)
for i in [user1, user2, user3]:
create(PackageVote,
User=i,
PackageBase=pkgbase1,
VoteTS=5000)
def test_rpc_singular_info():
# Define expected response.
expected_data = {
"version": 5,
"results": [{
"Name": "big-chungus",
"Version": "",
"Description": "Bunny bunny around bunny",
"URL": "https://example.com/",
"PackageBase": "big-chungus",
"NumVotes": 3,
"Popularity": 0.0,
"OutOfDate": None,
"Maintainer": "user1",
"URLPath": "/cgit/aur.git/snapshot/big-chungus.tar.gz",
"Depends": ["chungus-depends"],
"OptDepends": ["chungus-optdepends=50"],
"MakeDepends": ["chungus-makedepends"],
"CheckDepends": ["chungus-checkdepends"],
"Conflicts": ["chungus-conflicts"],
"Provides": ["chungus-provides<=200"],
"Replaces": ["chungus-replaces<=200"],
"License": ["GPL"],
"Keywords": [
"big-chungus",
"sizeable-chungus",
"smol-chungus"
]
}],
"resultcount": 1,
"type": "multiinfo"
}
# Make dummy request.
response_arg = make_request("/rpc/?v=5&type=info&arg=chungy-chungus&arg=big-chungus")
# Load request response into Python dictionary.
response_info_arg = orjson.loads(response_arg.content.decode())
# Remove the FirstSubmitted LastModified, ID and PackageBaseID keys from
# reponse, as the key's values aren't guaranteed to match between the two
# (the keys are already removed from 'expected_data').
for i in ["FirstSubmitted", "LastModified", "ID", "PackageBaseID"]:
response_info_arg["results"][0].pop(i)
# Validate that the new dictionaries are the same.
assert response_info_arg == expected_data
def test_rpc_nonexistent_package():
# Make dummy request.
response = make_request("/rpc/?v=5&type=info&arg=nonexistent-package")
# Load request response into Python dictionary.
response_data = orjson.loads(response.content.decode())
# Validate data.
assert response_data["resultcount"] == 0
def test_rpc_multiinfo():
# Make dummy request.
request_packages = ["big-chungus", "chungy-chungus"]
response = make_request("/rpc/?v=5&type=info&arg[]=big-chungus&arg[]=chungy-chungus")
# Load request response into Python dictionary.
response_data = orjson.loads(response.content.decode())
# Validate data.
for i in response_data["results"]:
request_packages.remove(i["Name"])
assert request_packages == []
def test_rpc_mixedargs():
# Make dummy request.
response1_packages = ["gluggly-chungus"]
response2_packages = ["gluggly-chungus", "chungy-chungus"]
response1 = make_request("/rpc/?v=5&arg[]=big-chungus&arg=gluggly-chungus&type=info")
response2 = make_request("/rpc/?v=5&arg=big-chungus&arg[]=gluggly-chungus&type=info&arg[]=chungy-chungus")
# Load request response into Python dictionary.
response1_data = orjson.loads(response1.content.decode())
response2_data = orjson.loads(response2.content.decode())
# Validate data.
for i in response1_data["results"]:
response1_packages.remove(i["Name"])
for i in response2_data["results"]:
response2_packages.remove(i["Name"])
for i in [response1_packages, response2_packages]:
assert i == []
def test_rpc_no_dependencies():
"""This makes sure things like 'MakeDepends' get removed from JSON strings
when they don't have set values."""
expected_response = {
'version': 5,
'results': [{
'Name': 'chungy-chungus',
'Version': '',
'Description': 'Wubby wubby on wobba wuubu',
'URL': 'https://example.com/',
'PackageBase': 'chungy-chungus',
'NumVotes': 3,
'Popularity': 0.0,
'OutOfDate': None,
'Maintainer': 'user1',
'URLPath': '/cgit/aur.git/snapshot/chungy-chungus.tar.gz',
'Depends': ['chungy-depends'],
'Conflicts': ['chungy-conflicts'],
'License': [],
'Keywords': []
}],
'resultcount': 1,
'type': 'multiinfo'
}
# Make dummy request.
response = make_request("/rpc/?v=5&type=info&arg=chungy-chungus")
response_data = orjson.loads(response.content.decode())
# Remove inconsistent keys.
for i in ["ID", "PackageBaseID", "FirstSubmitted", "LastModified"]:
response_data["results"][0].pop(i)
assert response_data == expected_response
def test_rpc_bad_type():
# Define expected response.
expected_data = {
'version': 5,
'results': [],
'resultcount': 0,
'type': 'error',
'error': 'Incorrect request type specified.'
}
# Make dummy request.
response = make_request("/rpc/?v=5&type=invalid-type&arg=big-chungus")
# Load request response into Python dictionary.
response_data = orjson.loads(response.content.decode())
# Validate data.
assert expected_data == response_data
def test_rpc_bad_version():
# Define expected response.
expected_data = {'error': 'Invalid version specified.'}
# Make dummy request.
response = make_request("/rpc/?v=0&type=info&arg=big-chungus")
# Load request response into Python dictionary.
response_data = orjson.loads(response.content.decode())
# Validate data.
assert expected_data == response_data
def test_rpc_no_version():
# Define expected response.
expected_data = {'error': 'Please specify an API version.'}
# Make dummy request.
response = make_request("/rpc/?type=info&arg=big-chungus")
# Load request response into Python dictionary.
response_data = orjson.loads(response.content.decode())
# Validate data.
assert expected_data == response_data
def test_rpc_no_type():
# Define expected response.
expected_data = {
'version': 5,
'results': [],
'resultcount': 0,
'type': 'error',
'error': 'No request type/data specified.'
}
# Make dummy request.
response = make_request("/rpc/?v=5&arg=big-chungus")
# Load request response into Python dictionary.
response_data = orjson.loads(response.content.decode())
# Validate data.
assert expected_data == response_data
def test_rpc_no_args():
# Define expected response.
expected_data = {
'version': 5,
'results': [],
'resultcount': 0,
'type': 'error',
'error': 'No request type/data specified.'
}
# Make dummy request.
response = make_request("/rpc/?v=5&type=info")
# Load request response into Python dictionary.
response_data = orjson.loads(response.content.decode())
# Validate data.
assert expected_data == response_data
def test_rpc_no_maintainer():
# Make dummy request.
response = make_request("/rpc/?v=5&type=info&arg=woogly-chungus")
# Load request response into Python dictionary.
response_data = orjson.loads(response.content.decode())
# Validate data.
assert response_data["results"][0]["Maintainer"] is None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment