Create Asset Compliance: CURRENT_OUTSTANDING policy
[1]:
# Define a compliance policy that alerts when an asset spends too long in a bad state.
# Main function establishes a connection to DataTrails using an App Registration then uses that
# to create an access policy, test it in good and bad states, then cleans up.
[2]:
from json import dumps as json_dumps
from os import getenv
from time import sleep
from uuid import uuid4
from warnings import filterwarnings
from archivist.archivist import Archivist
from archivist.compliance_policy_requests import CompliancePolicyCurrentOutstanding
from archivist.constants import ASSET_BEHAVIOURS
from archivist.logger import set_logger
filterwarnings("ignore", message="Unverified HTTPS request")
[3]:
%reload_ext dotenv
%dotenv -o notebooks.env
[4]:
# URL, CLIENT, SECRET are environment variables that represent connection parameters.
#
# URL = represents the url to the DataTrails application
# CLIENT = represents the client ID from an Application Registration
# SECRET = represents the client secret from an Application Registration
DATATRAILS_URL = getenv("DATATRAILS_URL")
DATATRAILS_APPREG_CLIENT = getenv("DATATRAILS_APPREG_CLIENT")
DATATRAILS_APPREG_SECRET = getenv("DATATRAILS_APPREG_SECRET")
[5]:
"""
Main function of Asset and Event creation.
* Connect to DataTrails with client ID and client secret
* Creates an Asset and two Events
* Prints response of Asset and Event creation
"""
# Optional call to set the logger level. The argument can be either
# "INFO" or "DEBUG". For more sophisticated logging control see our
# documentation.
set_logger("INFO")
# Initialize connection to DATATRAILS
print("Connecting to DATATRAILS")
print("DATATRAILS_URL", DATATRAILS_URL)
arch = Archivist(
DATATRAILS_URL, (DATATRAILS_APPREG_CLIENT, DATATRAILS_APPREG_SECRET), max_time=300
)
Connecting to DATATRAILS
DATATRAILS_URL https://app.datatrails.ai
[6]:
def create_compliance_policy(arch):
"""Compliance policy which notices when process steps are
not executed - eg 'you must close the door after you open it'
or 'candidate software build must be approved before release'
This example creates a policy that requires doors to be closed
after they are opened.
"""
compliance_policy = arch.compliance_policies.create(
CompliancePolicyCurrentOutstanding(
description="Vault doors should be closed according to site security policy section Phys.Integ.02",
display_name="Phys.Integ.02",
asset_filter=[
["attributes.arc_display_type=Vault Door"],
],
event_display_type="Open",
closing_event_display_type="Close",
)
)
print("CURRENT_OUTSTANDING_POLICY:", json_dumps(compliance_policy, indent=4))
return compliance_policy
[7]:
def create_door(arch):
"""
Creates an Asset record to track a particular door.
"""
door, _ = arch.assets.create_if_not_exists(
{
"selector": [
{
"attributes": [
"arc_display_name",
"arc_display_type",
]
},
],
"behaviours": ASSET_BEHAVIOURS,
"attributes": {
"arc_display_name": "Gringott's Vault 2",
"arc_description": "Main door to the second level security vault in Gringott's Wizarding Bank",
"arc_display_type": "Vault Door",
},
},
)
print("DOOR:", json_dumps(door, indent=4))
return door
[8]:
def open_door(arch, door, tag):
"""
Open the vault door
"""
door_opened = arch.events.create(
door["identity"],
{
"operation": "Record",
"behaviour": "RecordEvidence",
},
{
"arc_description": "Open the door for Lucius Malfoy",
"arc_display_type": "Open",
"arc_correlation_value": f"{tag}",
},
)
print("DOOR_OPENED:", json_dumps(door_opened, indent=4))
[9]:
def close_door(arch, door, tag):
"""
Close the vault door
"""
door_closed = arch.events.create(
door["identity"],
{
"operation": "Record",
"behaviour": "RecordEvidence",
},
{
"arc_description": "Closed the door after Lucius Malfoy exited the vault",
"arc_display_type": "Close",
"arc_correlation_value": f"{tag}",
},
)
print("DOOR_CLOSED:", json_dumps(door_closed, indent=4))
[10]:
# Compliance policies with related events (eg open/close, order/ship/deliver
# type situations) require events to be linked through a correlation value.
# In many cases this will be obvious (a CVE tag for vulnerability management,
# or a works ticket number for maintenance, or even a timestamp) but here
# we'll just make a UUID to make sure it's unique and this test is repeatable
tag = uuid4()
print(f"Tag for this run: {tag}")
Tag for this run: 5597da73-19e4-448b-80c0-33c79253961a
[11]:
# make a compliance policy that alerts when doors are left open
compliance_policy = create_compliance_policy(arch)
print("compliance_policy", json_dumps(compliance_policy, indent=4))
Refresh token
CURRENT_OUTSTANDING_POLICY: {
"identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
"compliance_type": "COMPLIANCE_CURRENT_OUTSTANDING",
"description": "Vault doors should be closed according to site security policy section Phys.Integ.02",
"display_name": "Phys.Integ.02",
"asset_filter": [
{
"or": [
"attributes.arc_display_type=Vault Door"
]
}
],
"event_display_type": "Open",
"closing_event_display_type": "Close",
"time_period_seconds": "0",
"dynamic_window": "0",
"dynamic_variability": 0,
"richness_assertions": []
}
compliance_policy {
"identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
"compliance_type": "COMPLIANCE_CURRENT_OUTSTANDING",
"description": "Vault doors should be closed according to site security policy section Phys.Integ.02",
"display_name": "Phys.Integ.02",
"asset_filter": [
{
"or": [
"attributes.arc_display_type=Vault Door"
]
}
],
"event_display_type": "Open",
"closing_event_display_type": "Close",
"time_period_seconds": "0",
"dynamic_window": "0",
"dynamic_variability": 0,
"richness_assertions": []
}
[12]:
# create an asset that matches the assets_filter field in the
# compliance policy.
gringotts_vault = create_door(arch)
print("gringotts_vault", json_dumps(gringotts_vault, indent=4))
asset with selector {},{'arc_display_name': "Gringott's Vault 2", 'arc_display_type': 'Vault Door'} already exists
DOOR: {
"identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c",
"behaviours": [
"AssetCreator",
"RecordEvidence",
"Builtin"
],
"attributes": {
"arc_display_name": "Gringott's Vault 2",
"arc_display_type": "Vault Door",
"arc_description": "Main door to the second level security vault in Gringott's Wizarding Bank"
},
"confirmation_status": "CONFIRMED",
"tracked": "TRACKED",
"owner": "0xe889E67FdBa658C6f27ccBDa98D9d1B5500Dbbce",
"at_time": "2023-01-16T11:51:30Z",
"storage_integrity": "TENANT_STORAGE",
"chain_id": "827586838445807967",
"public": false,
"tenant_identity": "tenant/9bfb80ee-81f6-40dc-b5c7-1c7fb2fb9866"
}
gringotts_vault {
"identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c",
"behaviours": [
"AssetCreator",
"RecordEvidence",
"Builtin"
],
"attributes": {
"arc_display_name": "Gringott's Vault 2",
"arc_display_type": "Vault Door",
"arc_description": "Main door to the second level security vault in Gringott's Wizarding Bank"
},
"confirmation_status": "CONFIRMED",
"tracked": "TRACKED",
"owner": "0xe889E67FdBa658C6f27ccBDa98D9d1B5500Dbbce",
"at_time": "2023-01-16T11:51:30Z",
"storage_integrity": "TENANT_STORAGE",
"chain_id": "827586838445807967",
"public": false,
"tenant_identity": "tenant/9bfb80ee-81f6-40dc-b5c7-1c7fb2fb9866"
}
[13]:
# Open the door
open_door(arch, gringotts_vault, tag)
# Check compliance: should fail because the door is open
sleep(5)
compliance_nok = arch.compliance.compliant_at(
gringotts_vault["identity"],
)
print("COMPLIANCE (should be false):", json_dumps(compliance_nok, indent=4))
DOOR_OPENED: {
"identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c/events/f4355906-bd65-4d3a-b726-4967936012dd",
"asset_identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c",
"event_attributes": {
"arc_display_type": "Open",
"arc_correlation_value": "5597da73-19e4-448b-80c0-33c79253961a",
"arc_description": "Open the door for Lucius Malfoy"
},
"asset_attributes": {},
"operation": "Record",
"behaviour": "RecordEvidence",
"timestamp_declared": "2023-01-16T11:51:36Z",
"timestamp_accepted": "2023-01-16T11:51:36Z",
"timestamp_committed": "2023-01-16T11:51:36.744394895Z",
"principal_declared": {
"issuer": "https://app.datatrails.ai/appidpv1",
"subject": "437bd138-dade-4346-aadd-dfdfee51ddf4",
"display_name": "Test Notebooks",
"email": ""
},
"principal_accepted": {
"issuer": "https://app.datatrails.ai/appidpv1",
"subject": "437bd138-dade-4346-aadd-dfdfee51ddf4",
"display_name": "Test Notebooks",
"email": ""
},
"confirmation_status": "CONFIRMED",
"transaction_id": "",
"block_number": 0,
"transaction_index": 0,
"from": "0xe889E67FdBa658C6f27ccBDa98D9d1B5500Dbbce",
"tenant_identity": "tenant/9bfb80ee-81f6-40dc-b5c7-1c7fb2fb9866"
}
COMPLIANCE (should be false): {
"compliant": false,
"compliance": [
{
"compliance_policy_identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
"compliant": false,
"reason": "No closing event for Open"
}
],
"next_page_token": "",
"compliant_at": "2023-01-16T11:51:42Z"
}
[14]:
# Now close the door
close_door(arch, gringotts_vault, tag)
# Check compliance - should be OK because the door is now closed
sleep(5)
compliance_ok = arch.compliance.compliant_at(
gringotts_vault["identity"],
)
print("COMPLIANCE (should be true):", json_dumps(compliance_ok, indent=4))
DOOR_CLOSED: {
"identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c/events/e1d5c641-c1a4-4a8c-b07b-621ff054e86c",
"asset_identity": "assets/652b3fdf-736e-455f-81ec-fed5088a351c",
"event_attributes": {
"arc_correlation_value": "5597da73-19e4-448b-80c0-33c79253961a",
"arc_description": "Closed the door after Lucius Malfoy exited the vault",
"arc_display_type": "Close"
},
"asset_attributes": {},
"operation": "Record",
"behaviour": "RecordEvidence",
"timestamp_declared": "2023-01-16T11:51:42Z",
"timestamp_accepted": "2023-01-16T11:51:42Z",
"timestamp_committed": "2023-01-16T11:51:42.809012247Z",
"principal_declared": {
"issuer": "https://app.datatrails.ai/appidpv1",
"subject": "437bd138-dade-4346-aadd-dfdfee51ddf4",
"display_name": "Test Notebooks",
"email": ""
},
"principal_accepted": {
"issuer": "https://app.datatrails.ai/appidpv1",
"subject": "437bd138-dade-4346-aadd-dfdfee51ddf4",
"display_name": "Test Notebooks",
"email": ""
},
"confirmation_status": "CONFIRMED",
"transaction_id": "",
"block_number": 0,
"transaction_index": 0,
"from": "0xe889E67FdBa658C6f27ccBDa98D9d1B5500Dbbce",
"tenant_identity": "tenant/9bfb80ee-81f6-40dc-b5c7-1c7fb2fb9866"
}
COMPLIANCE (should be true): {
"compliant": true,
"compliance": [
{
"compliance_policy_identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
"compliant": true,
"reason": ""
}
],
"next_page_token": "",
"compliant_at": "2023-01-16T11:51:49Z"
}
[15]:
# However the fact that it is OK *now* is a bit of a red herring. It
# was non-compliant in the past and this may be an issue that needs to
# be verified during an investigation, insurance claim, or other dispute.
# We can check the audit history for compliance *at a point in time* and
# get a verifiable answer to the state of that asset at that time.
# To make sure the example works with such short time frames we grab the
# time from the previous not OK compliance call, but you can choose any
# arbitrary time in a real forensic process
time_of_suspicion = compliance_nok["compliant_at"]
compliance_nok = arch.compliance.compliant_at(
gringotts_vault["identity"], compliant_at=time_of_suspicion
)
print("HISTORICAL COMPLIANCE (should be false):", json_dumps(compliance_nok, indent=4))
HISTORICAL COMPLIANCE (should be false): {
"compliant": false,
"compliance": [
{
"compliance_policy_identity": "compliance_policies/253a31f2-ef9f-44ba-8940-4dafd23e32f9",
"compliant": false,
"reason": "No closing event for Open"
}
],
"next_page_token": "",
"compliant_at": "2023-01-16T11:51:42Z"
}
[16]:
# finally clean up by deleting the compliance_policy
_ = arch.compliance_policies.delete(
compliance_policy["identity"],
)
[ ]: