Source code for appknox.client

# (c) 2017, XYSec Labs

from functools import lru_cache
from functools import partial
from io import BytesIO
import logging
import os
import time
import typing
from typing import Dict, List
from urllib.parse import urljoin

import requests

from appknox.exceptions import AppknoxError
from appknox.exceptions import CredentialError
from appknox.exceptions import OneTimePasswordError
from appknox.exceptions import OrganizationError
from appknox.exceptions import ProfileReportPreferenceError
from appknox.exceptions import ReportError
from appknox.exceptions import RescanError
from appknox.exceptions import SubmissionError
from appknox.exceptions import SubmissionFileTimeoutError
from appknox.exceptions import SubmissionNotFound
from appknox.exceptions import UploadError
from appknox.mapper import Analysis
from appknox.mapper import File
from appknox.mapper import mapper_drf_api
from appknox.mapper import mapper_json_api
from appknox.mapper import Organization
from appknox.mapper import OWASP
from appknox.mapper import PCIDSS
from appknox.mapper import PersonalToken
from appknox.mapper import ProfileReportPreference
from appknox.mapper import Project
from appknox.mapper import Report
from appknox.mapper import ReportPreferenceMapper
from appknox.mapper import Submission
from appknox.mapper import User
from appknox.mapper import Vulnerability
from appknox.mapper import Whoami
from appknox.version import __version__

if typing.TYPE_CHECKING:
    from requests import Response

DEFAULT_API_HOST = "https://api.appknox.com"
API_BASE = "/api"
JSON_API_HEADERS = {"Accept": "application/vnd.api+json"}
DRF_API_HEADERS = {"Accept": "application/json"}


[docs]class Appknox(object): """ """ def __init__( self, username: str = None, password: str = None, user_id: int = None, organization_id: int = None, token: str = None, access_token: str = None, host: str = DEFAULT_API_HOST, log_level: int = logging.INFO, http_proxy: str = None, https_proxy: str = None, insecure: bool = False, ): """ Initialise Appknox client :param username: Username used to authenticate and fetch token :param password: Password used to authenticate and fetch token :param user_id: User ID. Set this only if a token is available :param token: Token. Set this only if a token is available :param host: API host. By default, https://api.appknox.com If a token is not available, set ``username`` and ``password`` and use the ``login`` method to authenticate. Otherwise, ``user_id`` and ``token`` can be used. """ logging.basicConfig(level=log_level) self.host = host self.username = username self.password = password self.user_id = user_id self.organization_id = organization_id self.token = token self.access_token = access_token self.request_session = requests.session() self.request_session.verify = not insecure # if insecure don't verify self.proxies = {} if http_proxy: self.proxies["http"] = http_proxy if https_proxy: self.proxies["https"] = https_proxy if self.proxies: self.request_session.proxies.update(self.proxies) if self.host is None: self.host = DEFAULT_API_HOST if self.access_token: token_header = {"Authorization": "Token {}".format(self.access_token)} self.json_api = ApiResource( self.request_session, host=self.host, headers={**JSON_API_HEADERS, **token_header}, ) self.drf_api = ApiResource( self.request_session, host=self.host, headers={**DRF_API_HEADERS, **token_header}, ) self.organization_id = self.get_organization_id() elif self.user_id and self.token: self.json_api = ApiResource( self.request_session, host=self.host, headers={**JSON_API_HEADERS}, auth=(self.user_id, self.token), ) self.drf_api = ApiResource( self.request_session, host=self.host, headers={**DRF_API_HEADERS}, auth=(self.user_id, self.token), ) self.organization_id = self.get_organization_id()
[docs] def login(self, otp: int = None): """ Authenticate with server and create session :param otp: One-time password, if account has MFA enabled """ if not self.username or not self.password: raise CredentialError("Both username and password are required") data = { "username": self.username, "password": self.password, } if otp: data["otp"] = str(otp) response = self.request_session.post(urljoin(self.host, "api/login"), data=data) if response.status_code == 401: raise OneTimePasswordError(response.json()["message"]) elif response.status_code == 403: raise CredentialError(response.json()["message"]) elif response.status_code != 200: raise AppknoxError("Unknown error") json = response.json() self.token = json["token"] self.user_id = str(json["user_id"]) self.access_token = self.generate_access_token().key self.json_api = ApiResource( self.request_session, host=self.host, headers={ **JSON_API_HEADERS, **{"Authorization": "Token {}".format(self.access_token)}, }, ) self.drf_api = ApiResource( self.request_session, host=self.host, headers={ **DRF_API_HEADERS, **{"Authorization": "Token {}".format(self.access_token)}, }, ) self.organization_id = self.get_organization_id()
[docs] def get_organization_id(self, organization_id: int = None) -> int: """ Return organization id if exists otherwise first entry of organizations """ orgs = self.get_organizations() try: if organization_id: filtered_orgs = [o for o in orgs if o.id == organization_id] else: filtered_orgs = orgs return filtered_orgs[0].id except Exception: raise OrganizationError("User doesn't have organization. Login Failed!")
[docs] def switch_organization(self, organization_id: int = None) -> bool: """ Switch organization_id of client instance """ org_id = int(organization_id) orgs = [o.id for o in self.get_organizations()] if len(orgs) and (org_id not in orgs): return False self.organization_id = organization_id return True
[docs] def generate_access_token(self): """ Generates personal access token """ access_token = self.request_session.post( urljoin(self.host, "api/personaltokens"), auth=(self.user_id, self.token), data={ "name": "appknox-python for {} @{}".format( self.username, str(int(time.time())) ) }, ) return mapper_json_api(PersonalToken, access_token.json())
[docs] def revoke_access_token(self): """ Revokes existing personal access token """ resp = self.request_session.get( urljoin(self.host, "api/personaltokens?key=" + self.access_token), auth=(self.user_id, self.token), ) resp_json = resp.json() personal_token = next((p for p in resp_json.get("data")), None) if not personal_token: return token_id = personal_token["id"] return self.request_session.delete( urljoin(self.host, "api/personaltokens/" + token_id), auth=(self.user_id, self.token), )
[docs] def get_user(self, user_id: int) -> User: """ Fetch user by user ID :param user_id: User ID """ user = self.json_api.users(user_id).get() return mapper_json_api(User, user)
[docs] def get_whoami(self) -> Whoami: """ Show session info """ whoami = self.drf_api.me().get() return mapper_drf_api(Whoami, whoami)
[docs] def paginated_data(self, response, mapper_class): """ """ initial_data = [ mapper_json_api(mapper_class, dict(data=value)) for value in response["data"] ] if not response.get("links"): return initial_data link = response["links"]["next"] while link is not None: resp = self.drf_api.direct_get(urljoin(self.host, link)) link = resp["links"]["next"] initial_data += [ mapper_json_api(mapper_class, dict(data=value)) for value in resp["data"] ] return initial_data
[docs] def paginated_drf_data(self, response, mapper_class): """ """ initial_data = [ mapper_drf_api(mapper_class, value) for value in response["results"] ] if not response.get("next"): return initial_data nxt = response["next"] while nxt is not None: resp = self.drf_api.direct_get(nxt) nxt = resp["next"] initial_data += [ mapper_drf_api(mapper_class, value) for value in resp["results"] ] return initial_data
[docs] def get_organizations(self) -> List[Organization]: """ List organizations for currently authenticated user """ organizations = self.drf_api.organizations().get(limit=-1) return self.paginated_drf_data(organizations, Organization)
[docs] def get_project(self, project_id: int) -> Project: """ Fetch project by project ID :param project_id: Project ID """ project = self.drf_api["v2/projects/{}".format(project_id)]().get() return mapper_drf_api(Project, project)
[docs] def get_projects( self, platform: int = None, package_name: str = "", search: str = "" ) -> List[Project]: """ List projects for currently authenticated user in the given organizations """ projects = self.drf_api[ "organizations/{}/projects".format(self.organization_id) ]().get(platform=platform, package_name=package_name, q=search) return self.paginated_drf_data(projects, Project)
[docs] def get_last_file(self, project_id: int, version_code: int = None) -> File: """ Fetch latest file for the project :param project_id: Project ID """ files = ( self.drf_api["projects/{}/files".format(project_id)]() .get(version_code=version_code, limit=1) .get("results", []) ) if not files: return None return mapper_drf_api(File, files[0])
[docs] def get_file(self, file_id: int) -> File: """ Fetch file by file ID :param file_id: File ID """ file = self.drf_api["v2/files/{}".format(file_id)]().get() return mapper_drf_api(File, file)
[docs] def get_files(self, project_id: int, version_code: int = None) -> List[File]: """ List files in project :param project_id: Project ID """ files = self.drf_api["projects/{}/files".format(project_id)]().get( version_code=version_code ) return self.paginated_drf_data(files, File)
[docs] def get_analyses(self, file_id: int) -> List[Analysis]: """ List analyses for file :param file_id: File ID """ analyses = self.drf_api["v2/files/{}/analyses".format(file_id)]().get() return self.paginated_drf_data(analyses, Analysis)
[docs] @lru_cache(maxsize=1) def get_vulnerabilities(self) -> List[Vulnerability]: """ """ total_vulnerabilities = self.drf_api["v2/vulnerabilities"]().get(limit=1)[ "count" ] # limit is 1 just to get total count vulnerabilities_raw = self.drf_api["v2/vulnerabilities"]().get( limit=total_vulnerabilities + 1 ) vulnerabilities = self.paginated_drf_data(vulnerabilities_raw, Vulnerability) return vulnerabilities
[docs] def get_vulnerability(self, vulnerability_id: int) -> Vulnerability: """ Fetch vulnerability by vulnerability ID :param vulnerability_id: vulnerability ID """ vulnerabilities = self.get_vulnerabilities() vulnerability = next( (x for x in vulnerabilities if x.id == vulnerability_id), None ) if vulnerability: return vulnerability vulnerability = self.drf_api["v2/vulnerabilities"](vulnerability_id).get() return mapper_drf_api(Vulnerability, vulnerability)
[docs] @lru_cache(maxsize=1) def get_owasps(self) -> List[OWASP]: """ """ owasps_raw = self.drf_api["v2/owasps"]().get() owasps = self.paginated_drf_data(owasps_raw, OWASP) return owasps
[docs] def get_owasp(self, owasp_id: str) -> OWASP: """ Fetch OWASP by ID :param owasp_id: OWASP ID """ owasps = self.get_owasps() owasp = next((x for x in owasps if x.id == owasp_id), None) if owasp: return owasp owasp = self.drf_api["v2/owasps"](owasp_id).get() return mapper_drf_api(OWASP, owasp)
[docs] @lru_cache(maxsize=1) def get_pcidsses(self) -> List[PCIDSS]: """ """ pcidsss_raw = self.drf_api["v2/pcidsses"]().get() pcidsss = self.paginated_drf_data(pcidsss_raw, PCIDSS) return pcidsss
[docs] def get_pcidss(self, pcidss_id: str) -> PCIDSS: """ Fetch pcidss by ID :param pcidss_id: pcidss ID """ pcidsses = self.get_pcidsses() pcidss = next((x for x in pcidsses if x.id == pcidss_id), None) if pcidss: return pcidss pcidss = self.drf_api["v2/pcidsses"](pcidss_id).get() return mapper_drf_api(PCIDSS, pcidss)
[docs] def upload_file(self, file_data: str) -> int: """ Upload and scan a package and returns the file_id :param file: Package file content to be uploaded and scanned """ response = self.drf_api[ "organizations/{}/upload_app".format(self.organization_id) ]().get() url = response["url"] self.request_session.put(url, data=file_data) response2 = self.drf_api[ "organizations/{}/upload_app".format(self.organization_id) ]().post( data=dict( file_key=response["file_key"], file_key_signed=response["file_key_signed"], url=response["url"], ) ) submission_id = response2["submission_id"] try: file_id = self.poll_for_file_from_submission_id(submission_id) except (SubmissionNotFound, SubmissionError): raise UploadError( "Something went wrong, try uploading the\ file again." ) return file_id
[docs] def poll_for_file_from_submission_id(self, submission_id: int) -> int: """ Using the submission id, keep checking its status. Returns file instance when it's available :param submission_id: The ID of the submission object created for the scan :return: The File ID """ file = None timeout = time.time() + 10 while file is None: submission = self.drf_api.submissions(submission_id).get() if submission.get("detail") == "Not found.": raise SubmissionNotFound() submission_obj = mapper_drf_api(Submission, submission) file = submission_obj.file if submission_obj.reason: raise SubmissionError(submission_obj.reason) if time.time() > timeout: raise SubmissionFileTimeoutError() return file
[docs] def recent_uploads(self) -> List[Submission]: """ List details of recent file uploads """ submissions = self.drf_api.submissions().get() return self.paginated_drf_data(submissions, Submission)[0:10]
[docs] def rescan(self, file_id: int) -> int: """ Start a rescan for a file id :param filed_id: File ID :return: The ID of the File created in rescan """ endpoint = "v2/files/{}/rescan".format(file_id) response = self.drf_api[endpoint]().post(data=dict()) submission_id = response["submission_id"] try: file = self.poll_for_file_from_submission_id(submission_id) except (SubmissionNotFound, SubmissionError): raise RescanError("Something went wrong, retry rescan") return file
[docs] def get_profile_report_preference(self, profile_id: int) -> ProfileReportPreference: """ Fetch profile report preference """ try: profile_report_preference = self.drf_api[ "profiles/{}/report_preference".format(profile_id) ]().get() except: raise ProfileReportPreferenceError( "Could not fetch profile report preference" ) return ProfileReportPreference.from_json(profile_report_preference)
[docs] def get_unselected_report_preference(self, file_id: int) -> list: """ Get a list of unselected report preference items """ file = self.get_file(file_id) profile_report_preference = self.get_profile_report_preference(file.profile) unselected_report_pref = list() if not profile_report_preference.show_gdpr.value: unselected_report_pref.append(ReportPreferenceMapper["show_gdpr"]) if not profile_report_preference.show_hipaa.value: unselected_report_pref.append(ReportPreferenceMapper["show_hipaa"]) if not profile_report_preference.show_pcidss.value: unselected_report_pref.append(ReportPreferenceMapper["show_pcidss"]) return unselected_report_pref
[docs] def list_reports(self, file_id: int) -> typing.List["Report"]: """ Lists the latest reports for a given file ID """ report_list_data = self.drf_api["v2/files/{}/reports".format(file_id)]().get() # successful API response will have paginated results if report_list_data.get("results", None) is None: raise ReportError("Could not fetch report list") return [Report.from_json(report) for report in report_list_data["results"]]
[docs] def create_report(self, file_id: int) -> "Report": """ Create a Report object and returns a report ID. The report ID can then be used to download reports in different formats. """ report_data = self.drf_api["v2/files/{}/reports".format(file_id)]().post( data={} ) if not report_data.get("id"): raise ReportError("Failed to create a report") return Report.from_json(report_data)
[docs] def get_summary_csv_report_url(self, report_id: int) -> str: """ Returns the absolute URL to download Report Summary in CSV format """ csv_url_data = self.drf_api[ "v2/reports/{}/summary_csv".format(report_id) ]().get() if csv_url_data.get("url") is None: raise ReportError("Failed to get report summary URL") return csv_url_data["url"]
[docs] def get_summary_excel_report_url(self, report_id: int) -> str: """ Returns the absolute URL to download Report Summary in Excel format """ excel_url_data = self.drf_api[ "v2/reports/{}/summary_excel".format(report_id) ]().get() if excel_url_data.get("url") is None: raise ReportError("Failed to get report summary URL") return excel_url_data["url"]
[docs] def download_report_data(self, url: str) -> "bytes": """ Downloads the resppnse body in bytes format for a given absolute URL """ report_data = self.drf_api[url]().direct_get_http_response(url) if report_data.status_code >= 400: raise ReportError("Could not Download Report Data") return BytesIO(report_data.content).getvalue()
[docs] def write_data_to_file(self, data: bytes, output_file_path: str) -> None: """ Write any data in bytes format to a given file location """ output_dir_path = os.path.dirname(output_file_path) if not output_dir_path: output_dir_path = "." try: os.makedirs(output_dir_path, exist_ok=True) with open(output_file_path, "wb") as f: f.write(data) except OSError: raise ValueError("Failed to write data to {}".format(output_file_path)) except: raise ValueError("Failed to write data to {}".format(output_file_path))
[docs]class ApiResource(object): """ Class to perform API requests """ def __init__( self, request_session: object, host: str = DEFAULT_API_HOST, headers: object = None, auth: Dict[str, str] = None, ): """ Initialise APIResource object """ self.host = host self.headers = {**headers} self.headers["User-Agent"] = f"appknox-python/{__version__}" self.auth = auth self.request_session = request_session self.endpoint = urljoin(host, API_BASE) def __getitem__(self, resource): """ """ return partial(self.set_endpoint, resource) def __getattr__(self, resource): """ """ return partial(self.set_endpoint, resource)
[docs] def set_endpoint(self, resource, resource_id=None): """ """ self.endpoint = "{}/{}".format(urljoin(self.host, API_BASE), resource) if resource_id: self.endpoint += "/{}".format(str(resource_id)) return self
[docs] def get(self, **kwargs): """ """ resp = self.request_session.get( self.endpoint, headers=self.headers, auth=self.auth, params=kwargs, ) return resp.json()
[docs] def post(self, data, content_type=None, **kwargs): """ """ resp = self.request_session.post( self.endpoint, headers=self.headers, auth=self.auth, params=kwargs, data=data, ) return resp.json()
[docs] def direct_get(self, url, **kwargs): """ """ resp = self.request_session.get( url, headers=self.headers, auth=self.auth, params=kwargs, ) return resp.json()
[docs] def direct_get_http_response(self, url: str, **kwargs) -> "Response": """ Return the raw HTTP Response from a given absolute URL """ return self.request_session.get( url, headers=self.headers, auth=self.auth, params=kwargs, )