"""Subjects interface
Access to the subjects endpoint.
The user is not expected to use this class directly. It is an attribute of the
:class:`Archivist` class.
For example instantiate an Archivist instance and execute the methods of the class:
.. code-block:: python
with open(".auth_token", mode="r", encoding="utf-8") as tokenfile:
authtoken = tokenfile.read().strip()
# Initialize connection to Archivist
arch = Archivist(
"https://app.datatrails.ai",
authtoken,
)
subject = arch.subjects.create(...)
"""
from base64 import b64decode
from json import loads as json_loads
from logging import getLogger
from typing import TYPE_CHECKING, Any
# pylint:disable=cyclic-import # but pylint doesn't understand this feature
from . import subjects_confirmer
from .constants import (
SUBJECTS_LABEL,
SUBJECTS_SELF_ID,
SUBJECTS_SUBPATH,
)
from .dictmerge import _deepmerge
if TYPE_CHECKING:
from .archivist import Archivist
LOGGER = getLogger(__name__)
[docs]
class Subject(dict):
"""Subject object"""
[docs]
class _SubjectsClient:
"""SubjectsClient
Access to subjects entities using CRUD interface. This class is usually
accessed as an attribute of the Archivist class.
Args:
archivist (Archivist): :class:`Archivist` instance
"""
maxDiff = None
def __init__(self, archivist_instance: "Archivist"):
self._archivist = archivist_instance
self._subpath = f"{archivist_instance.root}/{SUBJECTS_SUBPATH}"
self._label = f"{self._subpath}/{SUBJECTS_LABEL}"
def __str__(self) -> str:
return f"SubjectsClient({self._archivist.url})"
[docs]
def create(
self,
display_name: str,
wallet_pub_key: "list[str]",
tessera_pub_key: "list[str]",
) -> Subject:
"""Create subject
Creates subject with defined attributes.
Args:
display_name (str): display name of subject.
wallet_pub_key (list): wallet public keys
tessera_pub_key (list): tessera public keys
Returns:
:class:`Subject` instance
"""
LOGGER.debug("Create Subject %s", display_name)
return self.create_from_data(
self.__params(
display_name=display_name,
wallet_pub_key=wallet_pub_key,
tessera_pub_key=tessera_pub_key,
),
)
[docs]
def share(
self, name: str, other_name: str, other_archivist: "Archivist"
) -> "tuple[Subject, Subject]":
"""Import the self subjects from the foreign archivist connection
from another organization - mutually share.
Args:
name (str): display_name of the foreign self subject in this archivist
other_name (str): display_name of the self subject in other archivist
other_archivist (Archivist): Archivist object
Returns:
2-tuple of :class:`Subject` instance
"""
subject1 = self.import_subject(
name, other_archivist.subjects.read(SUBJECTS_SELF_ID)
)
subject2 = other_archivist.subjects.import_subject(
other_name, self.read(SUBJECTS_SELF_ID)
)
subject1 = self.wait_for_confirmation(subject1["identity"])
subject2 = other_archivist.subjects.wait_for_confirmation(subject2["identity"])
return subject1, subject2
[docs]
def import_subject(self, display_name: str, subject: Subject) -> Subject:
"""Create subject from another subject usually
from another organization.
Args:
display_name (str): display_name of the subject
subject (Subject): Subject object
Returns:
:class:`Subject` instance
"""
return self.create(
display_name,
subject["wallet_pub_key"],
subject["tessera_pub_key"],
)
[docs]
def create_from_data(self, data: "dict[str, Any]") -> Subject:
"""Create subject
Creates subject with request body from data stream.
Suitable for reading data from a file using json.load or yaml.load
Args:
data (dict): request body of subject.
Returns:
:class:`Subject` instance
"""
LOGGER.debug("Create Subject from data %s", data)
return Subject(**self._archivist.post(self._label, data))
[docs]
def create_from_b64(self, data: "dict[str, Any]") -> Subject:
"""Create subject
Creates subject with request body from b64 encoded string
Args:
data (dict): Dictionary with 2 fields:
A YAML representation of the data argument would be:
.. code-block:: yaml
display_name: An imported subject
subject_string: ey66...
Returns:
:class:`Subject` instance
"""
decoded = b64decode(data["subject_string"])
LOGGER.debug("decoded %s", decoded)
outdata = {
k: v
for k, v in json_loads(decoded).items()
if k in ("wallet_pub_key", "tessera_pub_key")
}
outdata["display_name"] = data["display_name"]
LOGGER.debug("data %s", outdata)
return Subject(**self._archivist.post(self._label, outdata))
[docs]
def wait_for_confirmation(self, identity: str) -> Subject:
"""Wait for subject to be confirmed.
Waits for subject to be confirmed.
Args:
identity (str): identity of asset
Returns:
True if subject is confirmed.
"""
subjects_confirmer.MAX_TIME = self._archivist.max_time
# pylint: disable=protected-access
return subjects_confirmer._wait_for_confirmation(self, identity)
[docs]
def read(self, identity: str) -> Subject:
"""Read Subject
Reads subject.
Args:
identity (str): subjects identity e.g. subjects/xxxxxxxxxxxxxxxxxxxxxxx
Returns:
:class:`Subject` instance
"""
return Subject(**self._archivist.get(f"{self._subpath}/{identity}"))
[docs]
def update(
self,
identity: str,
*,
display_name: "str|None" = None,
wallet_pub_key: "list[str]|None" = None,
tessera_pub_key: "list[str]|None" = None,
) -> Subject:
"""Update Subject
Update subject.
Args:
identity (str): subjects identity e.g. subjects/xxxxxxxxxxxxxxxxxxxxxxx
display_name (str): display name of subject.
wallet_pub_key (list): wallet public keys
tessera_pub_key (list): tessera public keys
Returns:
:class:`Subject` instance
"""
return Subject(
**self._archivist.patch(
f"{self._subpath}/{identity}",
self.__params(
display_name=display_name,
wallet_pub_key=wallet_pub_key,
tessera_pub_key=tessera_pub_key,
),
)
)
[docs]
def delete(self, identity: str) -> "dict[str, Any]":
"""Delete Subject
Deletes subject.
Args:
identity (str): subjects identity e.g. subjects/xxxxxxxxxxxxxxxxxxxxxxx
Returns:
:class:`Subject` instance - empty?
"""
return self._archivist.delete(f"{self._subpath}/{identity}")
def __params(
self,
*,
display_name: "str|None" = None,
wallet_pub_key: "list[str]|None" = None,
tessera_pub_key: "list[str]|None" = None,
) -> "dict[str, Any]":
params = {}
if display_name is not None:
params["display_name"] = display_name
if wallet_pub_key is not None:
params["wallet_pub_key"] = wallet_pub_key
if tessera_pub_key is not None:
params["tessera_pub_key"] = tessera_pub_key
return _deepmerge(self._archivist.fixtures.get(SUBJECTS_LABEL), params)
[docs]
def count(self, *, display_name: "str|None" = None) -> int:
"""Count subjects.
Counts number of subjects that match criteria.
Args:
display_name (str): display name (optional)
Returns:
integer count of subjects.
"""
return self._archivist.count(
self._label,
params=self.__params(display_name=display_name),
)
[docs]
def list(
self,
*,
page_size: "int|None" = None,
display_name: "str|None" = None,
):
"""List subjects.
List subjects that match criteria.
Args:
display_name (str): display name (optional)
page_size (int): optional page size. (Rarely used).
Returns:
iterable that returns :class:`Subject` instances
"""
LOGGER.debug("List '%s'", display_name)
return (
Subject(**a)
for a in self._archivist.list(
self._label,
SUBJECTS_LABEL,
page_size=page_size,
params=self.__params(display_name=display_name),
)
)