Sharing Asset Example

"""Create an asset for Archivist with token.

   Create an access_policy that shares an asset when certain criteria are met.

   Access the asset from another Archivist connection using a second token with different
   access rights.
"""

from json import dumps as json_dumps
from os import getenv
from warnings import filterwarnings

from archivist.archivist import Archivist
from archivist.constants import ASSET_BEHAVIOURS, SUBJECTS_SELF_ID
from archivist.logger import set_logger
from archivist.utils import get_auth

filterwarnings("ignore", message="Unverified HTTPS request")


def create_example_asset(arch, label):
    """Create an asset using Archivist Connection.

    Args:
        arch: archivist connection.
        label: convenience label to easily distinguish the 2 organizations.

    Returns:
        Asset: a new asset created.

    """
    attrs = {
        "arc_display_name": f"{label}_display_name",  # Asset's display name
        "arc_description": f"{label}_display_description",  # Asset's description
        "arc_display_type": f"{label}_display_type",  # Arc_display_type is a free text field
        "ext_vendor_name": label,
    }

    # The first argument is the attributes of the asset
    #
    return arch.assets.create(attrs=attrs)


def create_archivist(label):
    """Create connection to archivist"""
    # Get authorization token. The token grants certain rights and access permissions.
    # The token can represent the root principal or user in an organization. Different tokens
    # could indicate different users in the same organization or membership of different
    # organiastions.
    auth = get_auth(
        auth_token=getenv(f"DATATRAILS_AUTHTOKEN_{label}"),
    )
    # Initialize connection to Archivist. max_time is the time to wait for confirmation
    # of an asset or event creation - the default is 300 seconds but one can optionally
    # specify a different value.
    return Archivist(
        "https://app.datatrails.ai",
        auth,
        max_time=300,
    )


def import_subject(acme, weyland):
    """Add subjects record for weyland in acme's environment"""
    subject = acme.subjects.import_subject(
        "weyland",
        weyland.subjects.read(SUBJECTS_SELF_ID),
    )

    # must wait for confirmation
    acme.subjects.wait_for_confirmation(subject["identity"])

    return subject


def create_example_access_policy(arch, label, subject):
    """Create access policy"""
    # consists of a filter selection entry and a selection criteria to restrict/redact
    # values of the asset attributes available to the sharee.

    # values pertaining to the access polcy itself.
    props = {
        "display_name": f"{label} access policy",
        "description": f"{label} Policy description",
    }

    # Filtering - access will be allowed to any asset that contains both these
    # attributes that equal these values. This happens to match the asset created
    # previously.
    filters = [
        {
            "or": [
                f"attributes.arc_display_type={label}_display_type",
            ]
        },
        {
            "or": [
                f"attributes.ext_vendor_name={label}",
            ]
        },
    ]

    # one must be the subject to gain access and only those fields
    # specified in include_attributes will be emitted.
    access_permissions = [
        {
            "subjects": [
                subject["identity"],
            ],
            "behaviours": ASSET_BEHAVIOURS,
            "include_attributes": [
                "arc_display_name",
            ],
        },
    ]

    return arch.access_policies.create(
        props,
        filters,
        access_permissions,
    )


def main():
    """Main function of share-asset."""
    # optional call to set the logger level for all subsystems. The argument can
    # be either "INFO" or "DEBUG". For more sophisticated logging control see the
    # documentation.
    set_logger("INFO")

    # For demonstration purposes we are going to assume that 2 organizations are
    # going to share an asset. The 2 organizations are ACME Corp and Weyland-Yutani
    # Corporation.
    acme = create_archivist("acme")
    weyland = create_archivist("weyland")

    # set a subject for weyland in acme's environment. The identity will be used as a
    # filter in the access permissions of the access_policy.
    weyland_subject_on_acme = import_subject(acme, weyland)
    print("weyland_subject on acme", json_dumps(weyland_subject_on_acme, indent=4))

    # acme creates an asset
    acme_asset = create_example_asset(acme, "acme")
    print("asset created in acme", json_dumps(acme_asset, indent=4))

    # now we want acme to share this asset to weyland via an access policy.
    access_policy = create_example_access_policy(acme, "acme", weyland_subject_on_acme)
    print("access policy created in acme", json_dumps(access_policy, indent=4))

    # display the asset as retrieved by weyland
    # NB: the attributes dict is redacted...
    weyland_asset = weyland.assets.read(acme_asset["identity"])
    print("asset read from weyland", json_dumps(weyland_asset, indent=4))

    # list matching access policies
    access_policies = list(
        acme.access_policies.list_matching_access_policies(acme_asset["identity"])
    )
    print("access policies read from acme", json_dumps(access_policies, indent=4))

    # delete all the access policies
    for access_policy in access_policies:
        acme.access_policies.delete(access_policy["identity"])

    # list matching access policies
    access_policies = list(
        acme.access_policies.list_matching_access_policies(acme_asset["identity"])
    )
    print("access policies read from acme", json_dumps(access_policies, indent=4))

    # display the asset as retrieved by the sharee
    # NB the asset is still shared even though there are no access policies
    weyland_asset = weyland.assets.read(acme_asset["identity"])
    print("asset read from weyland", json_dumps(weyland_asset, indent=4))

    acme.close()
    weyland.close()


if __name__ == "__main__":
    main()