Create Asset Compliance: CURRENT_OUTSTANDING policy

[1]:
# Define a compliance policy that alerts when an asset spends too long in a bad state.

# Main function establishes a connection to DataTrails using an App Registration then uses that
# to create an access policy, test it in good and bad states, then cleans up.
[2]:
from json import dumps as json_dumps
from os import getenv
from time import sleep
from uuid import uuid4
from warnings import filterwarnings

from archivist.archivist import Archivist
from archivist.compliance_policy_requests import CompliancePolicyCurrentOutstanding
from archivist.constants import ASSET_BEHAVIOURS
from archivist.logger import set_logger

filterwarnings("ignore", message="Unverified HTTPS request")
[3]:
%reload_ext dotenv
%dotenv -o notebooks.env
[4]:
# URL, CLIENT, SECRET are environment variables that represent connection parameters.
#
# URL = represents the url to the DataTrails application
# CLIENT = represents the client ID from an Application Registration
# SECRET = represents the client secret from an Application Registration
DATATRAILS_URL = getenv("DATATRAILS_URL")
DATATRAILS_APPREG_CLIENT = getenv("DATATRAILS_APPREG_CLIENT")
DATATRAILS_APPREG_SECRET = getenv("DATATRAILS_APPREG_SECRET")
[5]:
"""
Main function of Asset and Event creation.

* Connect to DataTrails with client ID and client secret
* Creates an Asset and two Events
* Prints response of Asset and Event creation
"""

# Optional call to set the logger level.  The argument can be either
# "INFO" or "DEBUG".  For more sophisticated logging control see our
# documentation.
set_logger("INFO")

# Initialize connection to DATATRAILS
print("Connecting to DATATRAILS")
print("DATATRAILS_URL", DATATRAILS_URL)
arch = Archivist(
    DATATRAILS_URL, (DATATRAILS_APPREG_CLIENT, DATATRAILS_APPREG_SECRET), max_time=300
)
Connecting to DATATRAILS
DATATRAILS_URL https://app.datatrails.ai
[6]:
def create_compliance_policy(arch):
    """Compliance policy which notices when process steps are
    not executed - eg 'you must close the door after you open it'
    or 'candidate software build must be approved before release'

    This example creates a policy that requires doors to be closed
    after they are opened.
    """
    compliance_policy = arch.compliance_policies.create(
        CompliancePolicyCurrentOutstanding(
            description="Vault doors should be closed according to site security policy section Phys.Integ.02",
            display_name="Phys.Integ.02",
            asset_filter=[
                ["attributes.arc_display_type=Vault Door"],
            ],
            event_display_type="Open",
            closing_event_display_type="Close",
        )
    )
    print("CURRENT_OUTSTANDING_POLICY:", json_dumps(compliance_policy, indent=4))
    return compliance_policy
[7]:
def create_door(arch):
    """
    Creates an Asset record to track a particular door.
    """

    door, _ = arch.assets.create_if_not_exists(
        {
            "selector": [
                {
                    "attributes": [
                        "arc_display_name",
                        "arc_display_type",
                    ]
                },
            ],
            "behaviours": ASSET_BEHAVIOURS,
            "attributes": {
                "arc_display_name": "Gringott's Vault 2",
                "arc_description": "Main door to the second level security vault in Gringott's Wizarding Bank",
                "arc_display_type": "Vault Door",
            },
        },
    )
    print("DOOR:", json_dumps(door, indent=4))
    return door
[8]:
def open_door(arch, door, tag):
    """
    Open the vault door
    """
    door_opened = arch.events.create(
        door["identity"],
        {
            "operation": "Record",
            "behaviour": "RecordEvidence",
        },
        {
            "arc_description": "Open the door for Lucius Malfoy",
            "arc_display_type": "Open",
            "arc_correlation_value": f"{tag}",
        },
    )
    print("DOOR_OPENED:", json_dumps(door_opened, indent=4))
[9]:
def close_door(arch, door, tag):
    """
    Close the vault door
    """
    door_closed = arch.events.create(
        door["identity"],
        {
            "operation": "Record",
            "behaviour": "RecordEvidence",
        },
        {
            "arc_description": "Closed the door after Lucius Malfoy exited the vault",
            "arc_display_type": "Close",
            "arc_correlation_value": f"{tag}",
        },
    )
    print("DOOR_CLOSED:", json_dumps(door_closed, indent=4))
[10]:
# Compliance policies with related events (eg open/close, order/ship/deliver
# type situations) require events to be linked through a correlation value.
# In many cases this will be obvious (a CVE tag for vulnerability management,
# or a works ticket number for maintenance, or even a timestamp) but here
# we'll just make a UUID to make sure it's unique and this test is repeatable
tag = uuid4()
print(f"Tag for this run: {tag}")
Tag for this run: 5597da73-19e4-448b-80c0-33c79253961a
[11]:
# make a compliance policy that alerts when doors are left open
compliance_policy = create_compliance_policy(arch)
print("compliance_policy", json_dumps(compliance_policy, indent=4))
Refresh token
CURRENT_OUTSTANDING_POLICY: {
    "identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
    "compliance_type": "COMPLIANCE_CURRENT_OUTSTANDING",
    "description": "Vault doors should be closed according to site security policy section Phys.Integ.02",
    "display_name": "Phys.Integ.02",
    "asset_filter": [
        {
            "or": [
                "attributes.arc_display_type=Vault Door"
            ]
        }
    ],
    "event_display_type": "Open",
    "closing_event_display_type": "Close",
    "time_period_seconds": "0",
    "dynamic_window": "0",
    "dynamic_variability": 0,
    "richness_assertions": []
}
compliance_policy {
    "identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
    "compliance_type": "COMPLIANCE_CURRENT_OUTSTANDING",
    "description": "Vault doors should be closed according to site security policy section Phys.Integ.02",
    "display_name": "Phys.Integ.02",
    "asset_filter": [
        {
            "or": [
                "attributes.arc_display_type=Vault Door"
            ]
        }
    ],
    "event_display_type": "Open",
    "closing_event_display_type": "Close",
    "time_period_seconds": "0",
    "dynamic_window": "0",
    "dynamic_variability": 0,
    "richness_assertions": []
}
[12]:
# create an asset that matches the assets_filter field in the
# compliance policy.
gringotts_vault = create_door(arch)
print("gringotts_vault", json_dumps(gringotts_vault, indent=4))
asset with selector {},{'arc_display_name': "Gringott's Vault 2", 'arc_display_type': 'Vault Door'} already exists
DOOR: {
    "identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c",
    "behaviours": [
        "AssetCreator",
        "RecordEvidence",
        "Builtin"
    ],
    "attributes": {
        "arc_display_name": "Gringott's Vault 2",
        "arc_display_type": "Vault Door",
        "arc_description": "Main door to the second level security vault in Gringott's Wizarding Bank"
    },
    "confirmation_status": "CONFIRMED",
    "tracked": "TRACKED",
    "owner": "0xe889E67FdBa658C6f27ccBDa98D9d1B5500Dbbce",
    "at_time": "2023-01-16T11:51:30Z",
    "storage_integrity": "TENANT_STORAGE",
    "chain_id": "827586838445807967",
    "public": false,
    "tenant_identity": "tenant/9bfb80ee-81f6-40dc-b5c7-1c7fb2fb9866"
}
gringotts_vault {
    "identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c",
    "behaviours": [
        "AssetCreator",
        "RecordEvidence",
        "Builtin"
    ],
    "attributes": {
        "arc_display_name": "Gringott's Vault 2",
        "arc_display_type": "Vault Door",
        "arc_description": "Main door to the second level security vault in Gringott's Wizarding Bank"
    },
    "confirmation_status": "CONFIRMED",
    "tracked": "TRACKED",
    "owner": "0xe889E67FdBa658C6f27ccBDa98D9d1B5500Dbbce",
    "at_time": "2023-01-16T11:51:30Z",
    "storage_integrity": "TENANT_STORAGE",
    "chain_id": "827586838445807967",
    "public": false,
    "tenant_identity": "tenant/9bfb80ee-81f6-40dc-b5c7-1c7fb2fb9866"
}
[13]:
# Open the door
open_door(arch, gringotts_vault, tag)

# Check compliance: should fail because the door is open
sleep(5)
compliance_nok = arch.compliance.compliant_at(
    gringotts_vault["identity"],
)
print("COMPLIANCE (should be false):", json_dumps(compliance_nok, indent=4))
DOOR_OPENED: {
    "identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c/events/f4355906-bd65-4d3a-b726-4967936012dd",
    "asset_identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c",
    "event_attributes": {
        "arc_display_type": "Open",
        "arc_correlation_value": "5597da73-19e4-448b-80c0-33c79253961a",
        "arc_description": "Open the door for Lucius Malfoy"
    },
    "asset_attributes": {},
    "operation": "Record",
    "behaviour": "RecordEvidence",
    "timestamp_declared": "2023-01-16T11:51:36Z",
    "timestamp_accepted": "2023-01-16T11:51:36Z",
    "timestamp_committed": "2023-01-16T11:51:36.744394895Z",
    "principal_declared": {
        "issuer": "https://app.datatrails.ai/appidpv1",
        "subject": "437bd138-dade-4346-aadd-dfdfee51ddf4",
        "display_name": "Test Notebooks",
        "email": ""
    },
    "principal_accepted": {
        "issuer": "https://app.datatrails.ai/appidpv1",
        "subject": "437bd138-dade-4346-aadd-dfdfee51ddf4",
        "display_name": "Test Notebooks",
        "email": ""
    },
    "confirmation_status": "CONFIRMED",
    "transaction_id": "",
    "block_number": 0,
    "transaction_index": 0,
    "from": "0xe889E67FdBa658C6f27ccBDa98D9d1B5500Dbbce",
    "tenant_identity": "tenant/9bfb80ee-81f6-40dc-b5c7-1c7fb2fb9866"
}
COMPLIANCE (should be false): {
    "compliant": false,
    "compliance": [
        {
            "compliance_policy_identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
            "compliant": false,
            "reason": "No closing event for Open"
        }
    ],
    "next_page_token": "",
    "compliant_at": "2023-01-16T11:51:42Z"
}
[14]:
# Now close the door
close_door(arch, gringotts_vault, tag)

# Check compliance - should be OK because the door is now closed
sleep(5)
compliance_ok = arch.compliance.compliant_at(
    gringotts_vault["identity"],
)
print("COMPLIANCE (should be true):", json_dumps(compliance_ok, indent=4))
DOOR_CLOSED: {
    "identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c/events/e1d5c641-c1a4-4a8c-b07b-621ff054e86c",
    "asset_identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c",
    "event_attributes": {
        "arc_correlation_value": "5597da73-19e4-448b-80c0-33c79253961a",
        "arc_description": "Closed the door after Lucius Malfoy exited the vault",
        "arc_display_type": "Close"
    },
    "asset_attributes": {},
    "operation": "Record",
    "behaviour": "RecordEvidence",
    "timestamp_declared": "2023-01-16T11:51:42Z",
    "timestamp_accepted": "2023-01-16T11:51:42Z",
    "timestamp_committed": "2023-01-16T11:51:42.809012247Z",
    "principal_declared": {
        "issuer": "https://app.datatrails.ai/appidpv1",
        "subject": "437bd138-dade-4346-aadd-dfdfee51ddf4",
        "display_name": "Test Notebooks",
        "email": ""
    },
    "principal_accepted": {
        "issuer": "https://app.datatrails.ai/appidpv1",
        "subject": "437bd138-dade-4346-aadd-dfdfee51ddf4",
        "display_name": "Test Notebooks",
        "email": ""
    },
    "confirmation_status": "CONFIRMED",
    "transaction_id": "",
    "block_number": 0,
    "transaction_index": 0,
    "from": "0xe889E67FdBa658C6f27ccBDa98D9d1B5500Dbbce",
    "tenant_identity": "tenant/9bfb80ee-81f6-40dc-b5c7-1c7fb2fb9866"
}
COMPLIANCE (should be true): {
    "compliant": true,
    "compliance": [
        {
            "compliance_policy_identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
            "compliant": true,
            "reason": ""
        }
    ],
    "next_page_token": "",
    "compliant_at": "2023-01-16T11:51:49Z"
}
[15]:
# However the fact that it is OK *now* is a bit of a red herring. It
# was non-compliant in the past and this may be an issue that needs to
# be verified during an investigation, insurance claim, or other dispute.
# We can check the audit history for compliance *at a point in time* and
# get a verifiable answer to the state of that asset at that time.

# To make sure the example works with such short time frames we grab the
# time from the previous not OK compliance call, but you can choose any
# arbitrary time in a real forensic process
time_of_suspicion = compliance_nok["compliant_at"]
compliance_nok = arch.compliance.compliant_at(
    gringotts_vault["identity"], compliant_at=time_of_suspicion
)
print("HISTORICAL COMPLIANCE (should be false):", json_dumps(compliance_nok, indent=4))
HISTORICAL COMPLIANCE (should be false): {
    "compliant": false,
    "compliance": [
        {
            "compliance_policy_identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
            "compliant": false,
            "reason": "No closing event for Open"
        }
    ],
    "next_page_token": "",
    "compliant_at": "2023-01-16T11:51:42Z"
}
[16]:
# finally clean up by deleting the compliance_policy
_ = arch.compliance_policies.delete(
    compliance_policy["identity"],
)
[ ]: