Source code for archivist.errors

"""Archivist exceptions

   All exceptions are derived from a base ArchivistError class.


import json
from logging import getLogger

from requests import Response

from .constants import HEADERS_RETRY_AFTER
from .headers import _headers_get

LOGGER = getLogger(__name__)

[docs] class ArchivistError(Exception): """Base exception for archivist package"""
[docs] class ArchivistBadFieldError(ArchivistError): """Incorrect field name in list() method"""
[docs] class ArchivistUnconfirmedError(ArchivistError): """asset or event failed to confirm after fixed timeout"""
[docs] class ArchivistInvalidOperationError(ArchivistError): """Runner Operation is invalid"""
[docs] class ArchivistBadRequestError(ArchivistError): """Ill-formed request or validation error (400)"""
[docs] class ArchivistDuplicateError(ArchivistError): """Read by signature returns more than one asset"""
[docs] class ArchivistUnauthenticatedError(ArchivistError): """user is unknown (401)"""
[docs] class ArchivistPaymentRequiredError(ArchivistError): """A quota has been reached (402)"""
[docs] class ArchivistForbiddenError(ArchivistError): """User does not have permission (403)"""
[docs] class ArchivistNotFoundError(ArchivistError): """Entity does not exist (404)"""
[docs] class ArchivistTooManyRequestsError(ArchivistError): """Too many requests in too short a time (429)""" def __init__(self, retry: "str|None", *args): self.retry = float(retry) if retry is not None else 0 super().__init__(*args)
[docs] class Archivist4xxError(ArchivistError): """Any other 4xx error"""
[docs] class ArchivistNotImplementedError(ArchivistError): """Illegal REST verb (501) or option"""
[docs] class ArchivistHeaderError(ArchivistError): """When the expected header is not received"""
[docs] class ArchivistUnavailableError(ArchivistError): """Service is unavailable (503)"""
[docs] class Archivist5xxError(ArchivistError): """Any other 5xx error"""
def __identity(response: Response) -> str: identity = "unknown" if response.request: LOGGER.debug("Request %s", response.request) req = response.request body = getattr(req, "body", None) if body: # when uploading a file the body attribute is a # MultiPartEncoder try: body = json.loads(body) except (TypeError, json.decoder.JSONDecodeError): pass else: identity = body.get("identity", "unknown") return identity def __description(response: Response) -> str: status_code = response.status_code if status_code == 404: return f"{__identity(response)} not found ({status_code})" text = response.text or "" url = getattr(response, "url", "") return f"{url}: {text} ({status_code})" def _parse_response(response: Response): """Parse REST response Validates REST response. This is a convenience function called by all REST calls. Args: response (response): response from underlying REST call Returns: suitable exception if validation fails, None otherwise """ status_code = response.status_code LOGGER.debug("Status %s", status_code) if status_code < 400: return None desc = __description(response) if status_code == 429: return ArchivistTooManyRequestsError( _headers_get(response.headers, HEADERS_RETRY_AFTER), desc, ) if 400 <= status_code < 500: err, arg = { 400: (ArchivistBadRequestError, desc), 401: (ArchivistUnauthenticatedError, desc), 402: (ArchivistPaymentRequiredError, desc), 403: (ArchivistForbiddenError, desc), 404: (ArchivistNotFoundError, desc), }.get(status_code, (Archivist4xxError, desc)) return err(arg) if 500 <= status_code < 600: err, arg = { 501: (ArchivistNotImplementedError, desc), 503: (ArchivistUnavailableError, desc), }.get(status_code, (Archivist5xxError, desc)) return err(arg) return ArchivistError(desc)