Audit Trails & Logging

Detection and redaction protect data in motion. Audit logs protect you in a compliance review. Reviewers don't care whether your regex was clever — they want to know: who accessed what data, when, what was detected, and what was done about it. This post designs an append-only audit system that provides those answers without ever storing the raw PII values that would make the log itself a liability.

What compliance reviewers actually look for

Whether the review is a HIPAA audit, a GDPR data subject access request, or a CUI spillage investigation, reviewers consistently ask the same four questions:

  1. Who accessed the data? — Agent identity, operator identity, timestamp
  2. What data was accessed? — Document identifier, classification level
  3. What was detected? — PII/CUI types found, count, confidence scores
  4. What action was taken? — Redaction strategy applied, output destination

Every entry that records access to data should answer all four. A log that answers only three invites the suspicion that the fourth is being concealed, which is harder to defend than a documented gap. Structure your schema around these questions, not around what's convenient to log.

Why append-only matters

An audit log that can be modified after the fact provides no tamper evidence. Anyone with write access to the log file can delete entries that show a problematic access, or insert entries to fabricate an approval trail. Append-only storage addresses this risk, and it can be enforced at three layers:

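  • File system level: Set the Linux append-only file attribute (chattr +a). Existing content cannot be overwritten or deleted until the attribute is removed, which requires root (CAP_LINUX_IMMUTABLE) and can be recorded as an OS-level audit event if auditd is configured to watch for it.
  • Application level: Open the log file in append mode (the O_APPEND flag) and never open it with truncation. With O_APPEND the kernel places every write at the end of the file, regardless of earlier seeks.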
  • Infrastructure level: Write to a WORM (Write Once Read Many) storage bucket (S3 Object Lock, Azure Immutable Blob) for the highest assurance. SIEM ingestion provides a secondary append-only copy off-system.
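
The application-level rule can be sketched in a few lines of Python. This is a minimal illustration, not the series' logger: the helper name append_line and the 0o640 file mode are assumptions made for the example.

```python
import os

def append_line(path: str, line: str) -> None:
    """Append one line to a log file opened with O_APPEND."""
    # O_CREAT creates the file if it is missing; O_TRUNC is deliberately absent,
    # so existing entries are never discarded when the file is opened.
    fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o640)
    try:
        # Normalize to exactly one trailing newline so each entry is one line.
        os.write(fd, (line.rstrip("\n") + "\n").encode("utf-8"))
        os.fsync(fd)  # ask the OS to push the entry to disk before returning
    finally:
        os.close(fd)
```

Opening and closing the descriptor per entry keeps the sketch simple; a long-running service would more likely hold the descriptor open, as the logger later in this post does.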

The 5 mandatory event fields

| Field | Type | Format | Purpose |
| --- | --- | --- | --- |
| timestamp | string | ISO 8601 UTC (2026-07-01T14:23:05.123Z) | Establishes when the event occurred; enables timeline reconstruction |
| event_type | string | Enum (see event types table) | What kind of operation occurred |
| agent_id | string | UUID or service identity string | Which agent or service generated the event |
| data_classification | string | Enum: PII / CUI / BOTH / NONE | Highest classification level of data involved in this event |
| action_taken | string | Human-readable action description | What the system did in response to the detection |

Event type schema

| Event Type | Required Additional Fields | Triggered by |
| --- | --- | --- |
| DETECTION | document_id, entity_types[], entity_count, confidence_scores[] | Pass 2 confirms at least one PII/CUI entity |
| REDACTION | document_id, strategy, entity_hashes[], entities_redacted | Redaction applied to a document |
| ACCESS | document_id, access_type (read/write), operator_id | Agent or operator reads a classified document |
| EXPORT | document_id, destination, redacted (bool), operator_id | Processed document is written to output or transmitted |
| CONFIG_CHANGE | changed_keys[], old_values, new_values, operator_id | Any change to the pii-guard.yaml config or CLI threshold override |
| SCAN_START | scan_id, target_path, config_hash | A scan session begins |
| SCAN_COMPLETE | scan_id, documents_scanned, detections_total, duration_ms | A scan session finishes (success or error) |
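
The two tables translate directly into a validation step that can run before an event is written. The sketch below assumes events are plain dicts; the names MANDATORY_FIELDS, REQUIRED_BY_TYPE, and missing_fields are illustrative and not part of the series' code.

```python
from typing import Any

# The five fields every event carries, in table order.
MANDATORY_FIELDS = (
    "timestamp", "event_type", "agent_id", "data_classification", "action_taken",
)

# Additional fields required per event type, copied from the table above.
REQUIRED_BY_TYPE = {
    "DETECTION": ("document_id", "entity_types", "entity_count", "confidence_scores"),
    "REDACTION": ("document_id", "strategy", "entity_hashes", "entities_redacted"),
    "ACCESS": ("document_id", "access_type", "operator_id"),
    "EXPORT": ("document_id", "destination", "redacted", "operator_id"),
    "CONFIG_CHANGE": ("changed_keys", "old_values", "new_values", "operator_id"),
    "SCAN_START": ("scan_id", "target_path", "config_hash"),
    "SCAN_COMPLETE": ("scan_id", "documents_scanned", "detections_total", "duration_ms"),
}

def missing_fields(event: dict[str, Any]) -> list[str]:
    """Return required field names absent from the event, mandatory fields first."""
    event_type = event.get("event_type")
    if event_type is not None and event_type not in REQUIRED_BY_TYPE:
        raise ValueError(f"unknown event_type: {event_type!r}")
    required = MANDATORY_FIELDS + REQUIRED_BY_TYPE.get(event_type, ())
    return [name for name in required if name not in event]
```

Rejecting an event with a non-empty result at write time is cheaper than discovering the gap during a review.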

Never log the raw PII value itself. If you log "detected_value": "078-05-1120", your audit log becomes a PII repository subject to the same compliance requirements as the original document, and because it is centralized it is easier to exfiltrate. Log a SHA-256 hash of the value (for traceability) and the type and position (for evidence). The hash lets you prove you detected a specific value without storing it. One caveat: identifiers drawn from a small value space, such as SSNs, can be recovered from an unkeyed hash by enumerating candidates, so prefer a keyed hash (HMAC-SHA-256) with the key stored outside the log.
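
A minimal sketch of hashing with a secret key, using Python's standard hmac module. The function name keyed_pii_hash is hypothetical, and where the key comes from (a secrets manager, an environment variable) is left as an assumption; the point is only that someone who obtains the log without the key cannot confirm guesses by hashing candidate values.

```python
import hashlib
import hmac

def keyed_pii_hash(value: str, key: bytes) -> str:
    """Return the HMAC-SHA-256 hex digest of a PII value under a secret key."""
    if not key:
        raise ValueError("key must be non-empty")
    return hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()

def plain_sha256(value: str) -> str:
    """Unkeyed SHA-256, shown only for comparison with the keyed digest."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()
```

The same value and key always yield the same digest, so correlation across events still works; rotating the key breaks correlation with older entries, which is a trade-off to decide deliberately.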

Log schema and Python implementation

audit_log.py — Structured append-only logger
from __future__ import annotations
import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Literal
import urllib.request

EventType = Literal[
    "DETECTION", "REDACTION", "ACCESS", "EXPORT",
    "CONFIG_CHANGE", "SCAN_START", "SCAN_COMPLETE"
]

DataClassification = Literal["PII", "CUI", "BOTH", "NONE"]

# ── Audit event dataclass ─────────────────────────────────────────────────────

@dataclass
class AuditEvent:
    event_type: EventType
    agent_id: str
    data_classification: DataClassification
    action_taken: str
    # Mandatory fields set automatically
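    timestamp: str = field(
        # Millisecond precision with a "Z" suffix, matching the documented format
        default_factory=lambda: datetime.now(timezone.utc)
        .isoformat(timespec="milliseconds")
        .replace("+00:00", "Z")
    )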
    # Optional context fields
    document_id: str | None = None
    entity_types: list[str] = field(default_factory=list)
    entity_hashes: list[str] = field(default_factory=list)
    entity_count: int = 0
    confidence_scores: list[float] = field(default_factory=list)
    strategy: str | None = None
    operator_id: str | None = None
    scan_id: str | None = None
    extra: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        d = {k: v for k, v in asdict(self).items() if v is not None and v != [] and v != {}}
        return json.dumps(d, separators=(",", ":"))

# ── Audit logger ──────────────────────────────────────────────────────────────

class AuditLogger:
    """Append-only structured audit log with optional SIEM forwarding."""

    def __init__(
        self,
        log_path: Path,
        siem_url: str | None = None,
        siem_token: str | None = None,
    ) -> None:
        self.log_path = log_path
        self.siem_url = siem_url
        self.siem_token = siem_token or os.getenv("SIEM_TOKEN")
        log_path.parent.mkdir(parents=True, exist_ok=True)
        # Open in append mode — never truncate
        self._file = log_path.open("a", encoding="utf-8", buffering=1)

    def emit(self, event: AuditEvent) -> None:
        line = event.to_json() + "\n"
        self._file.write(line)
        self._file.flush()
        if self.siem_url:
            self._forward_to_siem(line)

    def _forward_to_siem(self, line: str) -> None:
        """Best-effort HTTP forward to SIEM (Splunk HEC, Elastic, etc.)."""
        headers = {"Content-Type": "application/json"}
        if self.siem_token:
            # "Splunk <token>" is the Splunk HEC scheme; other SIEMs expect different headers
            headers["Authorization"] = f"Splunk {self.siem_token}"
        try:
            req = urllib.request.Request(
                self.siem_url,
                data=line.encode("utf-8"),
                method="POST",
                headers=headers,
            )
            with urllib.request.urlopen(req, timeout=2):
                pass
        except Exception as exc:
            # A SIEM outage must never crash the main pipeline. This call is
            # synchronous, so it can still stall emit() for up to the 2 s timeout.
            logging.warning("SIEM forward failed: %s", exc)

    def close(self) -> None:
        self._file.close()

# ── Convenience helpers ───────────────────────────────────────────────────────

def hash_pii_value(value: str) -> str:
    """SHA-256 of the PII value for audit traceability without storing the value."""
    return hashlib.sha256(value.encode()).hexdigest()

def make_detection_event(
    agent_id: str,
    document_id: str,
    entities: list,   # list of DetectedEntity from detector.py
) -> AuditEvent:
    classification: DataClassification = "NONE"
    cui_types = {"CUI_EXPORT", "CUI_LAW_ENFORCEMENT", "CUI_GOVERNMENT_CONTRACT"}
    has_pii = any(e.pii_type not in cui_types for e in entities)
    has_cui = any(e.pii_type in cui_types for e in entities)
    if has_pii and has_cui:
        classification = "BOTH"
    elif has_pii:
        classification = "PII"
    elif has_cui:
        classification = "CUI"

    return AuditEvent(
        event_type="DETECTION",
        agent_id=agent_id,
        data_classification=classification,
        action_taken="Detection recorded; awaiting redaction policy",
        document_id=document_id,
        entity_types=[e.pii_type for e in entities],
        entity_hashes=[hash_pii_value(e.text) for e in entities],
        entity_count=len(entities),
        confidence_scores=[round(e.confidence, 3) for e in entities],
    )
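
Because the logger writes one JSON object per line, reading the log back for a review is straightforward. The sketch below is one way to reconstruct a per-document timeline; the function name document_timeline is an assumption, and it relies on every timestamp using the same ISO 8601 UTC format so that string order matches time order.

```python
import json
from typing import Any, Iterable

def document_timeline(lines: Iterable[str], document_id: str) -> list[dict[str, Any]]:
    """Return the events recorded for one document, oldest first."""
    events = []
    for raw in lines:
        raw = raw.strip()
        if not raw:
            continue  # tolerate blank lines between entries
        event = json.loads(raw)
        if event.get("document_id") == document_id:
            events.append(event)
    # Timestamps in one consistent ISO 8601 UTC format sort correctly as strings.
    return sorted(events, key=lambda e: e.get("timestamp", ""))
```

Passing an open file object as lines works, since iterating a text file yields one line at a time.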

Log retention and GDPR considerations

The audit log is the last place where the connection between a real person and an event is traceable (via the entity hash). This makes the log itself subject to data protection rules:

  • GDPR Article 17 (right to erasure): An append-only log cannot be edited to remove a data subject's entries, and Article 17(3) allows retention where a legal obligation requires it. What you can do is break the link: if entity hashes are computed with a per-subject salt or key stored outside the log, deleting that salt on an erasure request leaves the entries intact but no longer traceable to the person.
  • CUI retention: NARA CUI policy does not mandate a specific log retention period, but contracts often do (three years is a common figure). Check your contract and keep logs in encrypted storage for at least the period it specifies.
  • Operational recommendation: Rotate active log files daily (compressed), retain compressed logs for 90 days on fast storage, then move to cold storage (S3 Glacier, Azure Archive) for the remainder of the contractual retention period.
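
The erasure approach in the first bullet is sometimes called crypto-shredding. The sketch below illustrates the idea under two assumptions: the pipeline knows which data subject a detected value belongs to at logging time, and per-subject keys live in a protected store outside the log. SubjectKeyStore and subject_hash are hypothetical names, and the in-memory dict stands in for a real key store.

```python
import hashlib
import hmac
import secrets

class SubjectKeyStore:
    """Hypothetical per-subject key store; a real one would be a protected database or KMS."""

    def __init__(self) -> None:
        self._keys: dict[str, bytes] = {}

    def key_for(self, subject_id: str) -> bytes:
        # Create a random 32-byte key the first time a subject is seen.
        if subject_id not in self._keys:
            self._keys[subject_id] = secrets.token_bytes(32)
        return self._keys[subject_id]

    def erase(self, subject_id: str) -> bool:
        """Delete the subject's key; return True if a key existed."""
        return self._keys.pop(subject_id, None) is not None

def subject_hash(store: SubjectKeyStore, subject_id: str, value: str) -> str:
    """HMAC-SHA-256 of the value under the subject's key, for use as an entity hash."""
    key = store.key_for(subject_id)
    return hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()
```

Once erase has been called, the hashes already in the log cannot be recomputed from the original value, so the entries stay in place without pointing back to the person. Whether this satisfies a particular erasure request is a question for your legal counsel.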

SIEM integration: Shipping events to Splunk, Elastic, or Datadog via HTTP gives you a second copy that is off-system and harder to tamper with. The logger above uses the authorization scheme of Splunk's HTTP Event Collector (HEC); Elastic and Datadog expose their own HTTP ingestion APIs with different authentication, so adjust the headers and payload accordingly. For syslog-based SIEMs, serialize the JSON event to the syslog message field and send over TLS. Never ship raw PII values to your SIEM; the entity hash is sufficient for correlation queries.
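
For Splunk specifically, the HEC event endpoint expects each event wrapped in a JSON envelope with an "event" key and optional metadata such as "time" (epoch seconds) and "sourcetype". A minimal sketch of building that envelope from an audit event dict follows; the function name to_hec_payload and the sourcetype value pii_guard:audit are assumptions for illustration.

```python
import json
from datetime import datetime
from typing import Any

def to_hec_payload(event: dict[str, Any], sourcetype: str = "pii_guard:audit") -> bytes:
    """Wrap an audit event in a Splunk HEC envelope and return the request body."""
    envelope: dict[str, Any] = {"event": event, "sourcetype": sourcetype}
    ts = event.get("timestamp")
    if ts:
        # Older Python versions do not parse a trailing "Z", so normalize it first.
        parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
        envelope["time"] = parsed.timestamp()  # epoch seconds, fractional part kept
    return json.dumps(envelope, separators=(",", ":")).encode("utf-8")
```

Setting "time" from the event's own timestamp keeps Splunk's index time aligned with when the event occurred rather than when it was forwarded.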
