"""Assets interface
Access to the assets endpoint.
The user is not expected to use this class directly. It is an attribute of the
:class:`Archivist` class.
For example instantiate an Archivist instance and execute the methods of the class:
.. code-block:: python
with open(".auth_token", mode="r", encoding="utf-8") as tokenfile:
authtoken = tokenfile.read().strip()
# Initialize connection to Archivist
arch = Archivist(
"https://app.datatrails.ai",
authtoken,
)
asset = arch.assets.create(...)
"""
from copy import deepcopy
from logging import getLogger
from typing import TYPE_CHECKING, Any
# pylint:disable=cyclic-import # but pylint doesn't understand this feature
from . import confirmer
from .asset import Asset
from .constants import (
ASSET_BEHAVIOURS,
ASSETS_LABEL,
ASSETS_SUBPATH,
CONFIRMATION_STATUS,
)
from .dictmerge import _deepmerge
from .errors import ArchivistBadFieldError, ArchivistNotFoundError
from .utils import selector_signature
if TYPE_CHECKING:
from .archivist import Archivist
LOGGER = getLogger(__name__)
[docs]
class _AssetsPublic:
"""AssetsReader
Access to assets entities using CRUD interface. This class is usually
accessed as an attribute of the Archivist or Public class.
Args:
archivist (Archivist): :class:`Archivist` instance
"""
def __init__(self, archivist_instance: "Archivist"):
self._archivist = archivist_instance
self._public = archivist_instance.public
self._subpath = f"{archivist_instance.root}/{ASSETS_SUBPATH}"
def __str__(self) -> str:
return "AssetsPublic()"
[docs]
def _identity(self, identity: str) -> str:
"""Return fully qualified identity
If public then expect a full url as argument
"""
if self._public:
return identity
return f"{self._subpath}/{identity}"
[docs]
def read(self, identity: str) -> Asset:
"""Read asset
Reads asset.
Args:
identity (str): assets identity e.g. assets/xxxxxxxxxxxxxxxxxxxxxxx
Returns:
:class:`Asset` instance
"""
return Asset(**self._archivist.get(self._identity(identity)))
[docs]
class _AssetsRestricted(_AssetsPublic):
"""AssetsRestricted
Access to assets entities using CRUD interface. This class is usually
accessed as an attribute of the Archivist or Public class.
Args:
archivist (Archivist): :class:`Archivist` instance
"""
def __init__(self, archivist_instance: "Archivist"):
super().__init__(archivist_instance)
self._label = f"{self._subpath}/{ASSETS_LABEL}"
self.pending_count: int = 0
def __str__(self) -> str:
return f"AssetsRestricted({self._archivist.url})"
def __params(
self, props: "dict[str, Any]|None", attrs: "dict[str, Any]|None"
) -> "dict[str, Any]":
params = deepcopy(props) if props else {}
if attrs:
params["attributes"] = attrs
return _deepmerge(self._archivist.fixtures.get(f"{ASSETS_LABEL}"), params)
[docs]
def create(
self,
*,
props: "dict[str, Any]|None" = None,
attrs: "dict[str, Any]|None" = None,
confirm: bool = False,
) -> Asset:
"""Create asset
Creates asset with defined properties and attributes.
Args:
props (dict): Properties
attrs (dict): attributes of created asset.
confirm (bool): if True wait for asset to be confirmed.
Returns:
:class:`Asset` instance
"""
LOGGER.debug("Create Asset %s", attrs)
# default behaviours are added first - any set in user-specified fixtures or
# in the method args will overide...
newprops = _deepmerge({"behaviours": ASSET_BEHAVIOURS}, props)
data = self.__params(newprops, attrs)
return self.create_from_data(data, confirm=confirm)
[docs]
def create_from_data(
self, data: "dict[str, Any]", *, confirm: bool = False
) -> Asset:
"""Create asset
Creates asset with request body from data stream.
Suitable for reading data from a file using json.load or yaml.load
Args:
data (dict): request body of asset.
confirm (bool): if True wait for asset to be confirmed.
Returns:
:class:`Asset` instance
"""
asset = Asset(**self._archivist.post(self._label, data))
if not confirm:
return asset
return self.wait_for_confirmation(asset["identity"])
[docs]
def create_if_not_exists(
self, data: "dict[str, Any]", *, confirm: bool = False
) -> "tuple[Asset, bool]":
"""
Creates an asset and associated locations and attachments if asset
does not already exist.
Args:
data (dict): request body of asset.
confirm (bool): if True wait for asset to be confirmed.
A YAML representation of the data argument would be:
.. code-block:: yaml
selector:
- attributes:
- arc_display_name
behaviours
- RecordEvidence
attributes:
arc_display_name: DataTrails Front Door
arc_firmware_version: "1.0"
arc_serial_number: das-j1-01
arc_description: Electronic door entry system to DataTrails France
wavestone_asset_id: paris.france.datatrails.das
location:
identity: locations/xxxxxxxxxxxxxxxxxxxxxxxxxx
location:
selector:
- display_name
display_name: DataTrails Paris
description: Sales and sales support for the French region
latitude: 48.8339211,
longitude: 2.371345,
attributes:
address: 5 Parvis Alan Turing, 75013 Paris, France
wavestone_ext: managed
attachments:
- filename: functests/test_resources/doors/assets/entry-terminal.jpg
content_type: image/jpg
attachment: terminal entry
The 'selector' value is required and will usually specify the 'arc_display_name' as a
secondary key. The keys in 'selector' must exist in the attributes of the asset.
If 'location' is specified then the 'selector' value is required and is used as a
secondary key. Likewise the secondary key must exist in the attributes of the location.
Alternatively the identity of the location is specified - both
are shown - choose one.
Returns:
tuple of :class:`Asset` instance, Boolean is True if asset already existed
"""
asset = None
existed = False
data = deepcopy(data)
attachments = data.pop("attachments", None)
location = data.pop("location", None)
selector = data.pop("selector") # must exist
props, attrs = selector_signature(selector, data)
try:
asset = self.read_by_signature(props=props, attrs=attrs)
except ArchivistNotFoundError:
LOGGER.info(
"asset with selector %s,%s does not exist - creating", props, attrs
)
else:
LOGGER.info("asset with selector %s,%s already exists", props, attrs)
return asset, True
# is location present?
if location is not None:
if "identity" in location:
data["attributes"]["arc_home_location_identity"] = location["identity"]
else:
loc, _ = self._archivist.locations.create_if_not_exists(
location,
)
data["attributes"]["arc_home_location_identity"] = loc["identity"]
# any attachments ?
if attachments is not None:
for a in attachments:
# attempt to get attachment to use as a key
attachment_key = a.get("attachment", None)
if attachment_key is None:
# failing that create a key from filename or url
attachment_key = self._archivist.attachments.get_default_key(a)
data["attributes"][attachment_key] = self._archivist.attachments.create(
a
)
asset = self.create_from_data(
data=data,
confirm=confirm,
)
return asset, existed
[docs]
def wait_for_confirmation(self, identity: str) -> Asset:
"""Wait for asset to be confirmed.
Waits for asset to be confirmed.
Args:
identity (str): identity of asset
Returns:
True if asset is confirmed.
"""
confirmer.MAX_TIME = self._archivist.max_time
# pylint: disable=protected-access
return confirmer._wait_for_confirmation(self, identity)
[docs]
def wait_for_confirmed(
self,
*,
props: "dict[str, Any]|None" = None,
attrs: "dict[str, Any]|None" = None,
) -> bool:
"""Wait for assets to be confirmed.
Waits for all assets that match criteria to be confirmed.
Args:
props (dict): e.g. {"tracked": "TRACKED" }
attrs (dict): e.g. {"arc_display_type": "door" }
Returns:
True if all assets are confirmed.
"""
# check that entities exist
newprops = deepcopy(props) if props else {}
newprops.pop(CONFIRMATION_STATUS, None)
LOGGER.debug("Count assets %s", newprops)
count = self.count(props=newprops, attrs=attrs)
if count == 0:
raise ArchivistNotFoundError("No assets exist")
confirmer.MAX_TIME = self._archivist.max_time
# pylint: disable=protected-access
return confirmer._wait_for_confirmed(self, props=newprops, attrs=attrs)
[docs]
def count(
self,
*,
props: "dict[str, Any]|None" = None,
attrs: "dict[str, Any]|None" = None,
) -> int:
"""Count assets.
Counts number of assets that match criteria.
Args:
props (dict): e.g. {"confirmation_status": "CONFIRMED" }
attrs (dict): e.g. {"arc_display_type": "door" }
Returns:
integer count of assets.
"""
return self._archivist.count(self._label, params=self.__params(props, attrs))
[docs]
def list(
self,
*,
page_size: "int|None" = None,
props: "dict[str, Any]|None" = None,
attrs: "dict[str, Any]|None" = None,
):
"""List assets.
Lists assets that match criteria.
Args:
props (dict): optional e.g. {"tracked": "TRACKED" }
attrs (dict): optional e.g. {"arc_display_type": "door" }
page_size (int): optional page size. (Rarely used).
Returns:
iterable that returns :class:`Asset` instances
"""
return (
Asset(**a)
for a in self._archivist.list(
self._label,
ASSETS_LABEL,
page_size=page_size,
params=self.__params(props, attrs),
)
)
[docs]
def read_by_signature(
self,
*,
props: "dict[str, Any]|None" = None,
attrs: "dict[str, Any]|None" = None,
) -> Asset:
"""Read Asset by signature.
Reads asset that meets criteria. Only one asset is expected.
Args:
props (dict): e.g. {"tracked": "TRACKED" }
attrs (dict): e.g. {"arc_display_type": "door" }
Returns:
:class:`Asset` instance
"""
assets_label = f"public{ASSETS_LABEL}" if self._public else ASSETS_LABEL
return Asset(
**self._archivist.get_by_signature(
self._label,
assets_label,
params=self.__params(props, attrs),
)
)
[docs]
def publicurl(self, identity: str) -> str:
"""Read asset public url
Reads assets public url.
Args:
identity (str): assets identity e.g. assets/xxxxxxxxxxxxxxxxxxxxxxx
Returns:
:str: publicurl as string
"""
body = self._archivist.get(f"{self._identity(identity)}:publicurl")
publicurl = body.get("publicurl")
if publicurl is None:
raise ArchivistBadFieldError("No publicurl found in response")
return publicurl