"""Archivist exceptions
All exceptions are derived from a base ArchivistError class.
"""
import json
from logging import getLogger
from requests import Response
from .constants import HEADERS_RETRY_AFTER
from .headers import _headers_get
LOGGER = getLogger(__name__)
[docs]
class ArchivistError(Exception):
"""Base exception for archivist package"""
[docs]
class ArchivistBadFieldError(ArchivistError):
"""Incorrect field name in list() method"""
[docs]
class ArchivistUnconfirmedError(ArchivistError):
"""asset or event failed to confirm after fixed timeout"""
[docs]
class ArchivistInvalidOperationError(ArchivistError):
"""Runner Operation is invalid"""
[docs]
class ArchivistBadRequestError(ArchivistError):
"""Ill-formed request or validation error (400)"""
[docs]
class ArchivistDuplicateError(ArchivistError):
"""Read by signature returns more than one asset"""
[docs]
class ArchivistUnauthenticatedError(ArchivistError):
"""user is unknown (401)"""
[docs]
class ArchivistPaymentRequiredError(ArchivistError):
"""A quota has been reached (402)"""
[docs]
class ArchivistForbiddenError(ArchivistError):
"""User does not have permission (403)"""
[docs]
class ArchivistNotFoundError(ArchivistError):
"""Entity does not exist (404)"""
[docs]
class ArchivistTooManyRequestsError(ArchivistError):
"""Too many requests in too short a time (429)"""
def __init__(self, retry: "str|None", *args):
self.retry = float(retry) if retry is not None else 0
super().__init__(*args)
[docs]
class Archivist4xxError(ArchivistError):
"""Any other 4xx error"""
[docs]
class ArchivistNotImplementedError(ArchivistError):
"""Illegal REST verb (501) or option"""
[docs]
class ArchivistUnavailableError(ArchivistError):
"""Service is unavailable (503)"""
[docs]
class Archivist5xxError(ArchivistError):
"""Any other 5xx error"""
def __identity(response: Response) -> str:
identity = "unknown"
if response.request:
LOGGER.debug("Request %s", response.request)
req = response.request
body = getattr(req, "body", None)
if body:
# when uploading a file the body attribute is a
# MultiPartEncoder
try:
body = json.loads(body)
except (TypeError, json.decoder.JSONDecodeError):
pass
else:
identity = body.get("identity", "unknown")
return identity
def __description(response: Response) -> str:
status_code = response.status_code
if status_code == 404:
return f"{__identity(response)} not found ({status_code})"
text = response.text or ""
url = getattr(response, "url", "")
return f"{url}: {text} ({status_code})"
def _parse_response(response: Response):
"""Parse REST response
Validates REST response. This is a convenience function called
by all REST calls.
Args:
response (response): response from underlying REST call
Returns:
suitable exception if validation fails, None otherwise
"""
status_code = response.status_code
LOGGER.debug("Status %s", status_code)
if status_code < 400:
return None
desc = __description(response)
if status_code == 429:
return ArchivistTooManyRequestsError(
_headers_get(response.headers, HEADERS_RETRY_AFTER),
desc,
)
if 400 <= status_code < 500:
err, arg = {
400: (ArchivistBadRequestError, desc),
401: (ArchivistUnauthenticatedError, desc),
402: (ArchivistPaymentRequiredError, desc),
403: (ArchivistForbiddenError, desc),
404: (ArchivistNotFoundError, desc),
}.get(status_code, (Archivist4xxError, desc))
return err(arg)
if 500 <= status_code < 600:
err, arg = {
501: (ArchivistNotImplementedError, desc),
503: (ArchivistUnavailableError, desc),
}.get(status_code, (Archivist5xxError, desc))
return err(arg)
return ArchivistError(desc)