PII & CUI with AI Agents · Series · 6 posts

End-to-End Pipeline

The previous five posts each cover one layer: taxonomy, detection, redaction, config, and audit. This post wires them together into a single PiiGuardPipeline class with a .process(text, context) method, the only interface an agent needs. You'll also see how to register it as a pre/post hook in the Claude Agent SDK, handle failures safely, and verify that the system is production-ready before deploying it.

Architecture overview

The pipeline has six stages, each with a defined input, output, and failure mode:

| Stage | Input | Output | Failure mode |
| --- | --- | --- | --- |
| 1. Config load | YAML file + CLI args | PiiGuardConfig | Raise on missing API key; warn on invalid threshold |
| 2. Pass 1 detect | Raw text | Candidate span list | Presidio error → log + treat as no candidates (fail-open) |
| 3. Pass 2 confirm | Candidates + context | Confirmed entity list | API error → retry x2, then fail-closed (block output) |
| 4. Redact | Text + confirmed entities | Redacted text + redaction records | Strategy error → fail-closed (block output; never return unredacted text) |
| 5. Audit | Entities + redaction records | Audit log entries written | Disk full → write to SIEM only; log warning locally |
| 6. Output | Redacted text | Caller receives safe text | If any upstream stage failed closed, return sentinel value |
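
The "retry x2, then fail-closed" rule for Pass 2 can be sketched as a small wrapper. This is a minimal illustration, not the series' implementation: `call_with_retries`, `Pass2Unavailable`, and the backoff values are hypothetical names and assumptions introduced here.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class Pass2Unavailable(RuntimeError):
    """Hypothetical error: confirmation still failed after all retries."""


def call_with_retries(
    fn: Callable[[], T],
    retries: int = 2,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn once, then retry up to `retries` times with exponential backoff."""
    last_exc: Exception | None = None
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:  # in real code, narrow this to the API client's errors
            last_exc = exc
            if attempt < retries:
                sleep(base_delay * (2 ** attempt))
    # All attempts failed: raise so the caller can fail closed.
    raise Pass2Unavailable(
        f"confirmation failed after {retries + 1} attempts"
    ) from last_exc
```

The caller catches `Pass2Unavailable` and returns the blocked sentinel instead of the text. Injecting `sleep` keeps unit tests fast.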

Fail-open vs fail-closed

Every pipeline that sits between an agent and its data source must decide what to do when a stage fails. The choice is not aesthetic — it determines your default security posture:

Fail-open
On error, pass the original unredacted text through. Maintains agent functionality. Acceptable for low-sensitivity data where availability matters more than protection.
Fail-closed
On error, block the text entirely — return an error sentinel. Protects sensitive data at the cost of agent availability. Required for CUI and healthcare.
Stage-dependent
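The recommended approach: fail-open for Pass 1 and fail-closed for Pass 2. Failing open on Pass 1 is safe only if Pass 2 still examines the text when Pass 1 returns no candidates; otherwise a Pass 1 error silently disables detection. A Pass 2 error must block, because passing text through would let unverified data reach the agent. The reference implementation below applies a single fail_closed flag to both passes, because detect() runs them together.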
Config-driven
Expose the fail behavior as a config key so operators can choose per-environment: fail-open in staging, fail-closed in production.
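
The stage-dependent and config-driven options combine naturally. The sketch below is one possible shape, assuming hypothetical stage names (`pass1_detect`, `pass2_confirm`, `redact`) and a hypothetical `FailurePolicy` class that is not part of the PiiGuardConfig from the earlier posts.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum


class FailMode(str, Enum):
    OPEN = "open"
    CLOSED = "closed"


# Stage-dependent defaults taken from the list above; stage names are hypothetical.
_DEFAULTS = {
    "pass1_detect": FailMode.OPEN,
    "pass2_confirm": FailMode.CLOSED,
    "redact": FailMode.CLOSED,
}


@dataclass(frozen=True)
class FailurePolicy:
    """Per-stage failure behavior with operator overrides."""

    overrides: dict[str, FailMode] = field(default_factory=dict)

    def mode_for(self, stage: str) -> FailMode:
        # Unknown stages fail closed, so a typo never weakens protection.
        return self.overrides.get(stage, _DEFAULTS.get(stage, FailMode.CLOSED))

    @classmethod
    def from_config(cls, raw: dict[str, str]) -> "FailurePolicy":
        # Raises ValueError on anything other than "open" or "closed".
        return cls({stage: FailMode(value) for stage, value in raw.items()})
```

A production config might pass `{"pass1_detect": "closed"}` to tighten the default, while staging leaves the overrides empty.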

Python reference implementation

pipeline.py — Complete PII guard pipeline
from __future__ import annotations
import hashlib
import logging
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# Imports from previous posts in this series
from .config import PiiGuardConfig, build_config
from .detector import detect, DetectedEntity
from .redactor import redact, RedactionRecord
from .audit_log import AuditLogger, make_detection_event, AuditEvent

logger = logging.getLogger(__name__)

# ── Pipeline result ───────────────────────────────────────────────────────────

@dataclass
class PipelineResult:
    safe_text: str
    entities_detected: list[DetectedEntity]
    redaction_records: list[RedactionRecord]
    scan_id: str
    blocked: bool = False   # True when fail-closed triggered
    error: str | None = None

_BLOCKED_SENTINEL = "[PIPELINE_BLOCKED: detection failed, output suppressed]"

# ── Main pipeline class ───────────────────────────────────────────────────────

class PiiGuardPipeline:
    """
    Single entry point for PII/CUI detection and redaction.
    Call .process(text, context) from your agent code.
    """

    def __init__(
        self,
        config: PiiGuardConfig,
        audit_logger: AuditLogger,
        agent_id: str | None = None,
    ) -> None:
        self.config = config
        self.audit = audit_logger
        self.agent_id = agent_id or f"pii-guard-{uuid.uuid4().hex[:8]}"

    def process(
        self,
        text: str,
        context: dict[str, Any] | None = None,
    ) -> PipelineResult:
        """
        Run the full pipeline synchronously.
        context: optional metadata (document_id, operator_id, etc.)
        """
        scan_id = str(uuid.uuid4())
        ctx = context or {}
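        # Fall back to a short content hash when the caller supplies no ID.
        # SHA-256 rather than MD5: MD5 is unavailable on some FIPS-mode systems.
        document_id = ctx.get("document_id", hashlib.sha256(text.encode()).hexdigest()[:12])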

        # ── Stages 2-3: Detect (Pass 1 sweep + Pass 2 confirmation) ──────────
        try:
            # detect() in Post 02 runs both passes internally, so the single
            # fail_closed setting below governs a failure in either pass
            entities = detect(
                text,
                confidence_threshold=self.config.confidence_threshold,
            )
        except Exception as exc:
            logger.error("Detection failed for %s: %s", document_id, exc)
            if self.config.fail_closed:
                self._audit_blocked(scan_id, document_id, str(exc))
                return PipelineResult(
                    safe_text=_BLOCKED_SENTINEL,
                    entities_detected=[],
                    redaction_records=[],
                    scan_id=scan_id,
                    blocked=True,
                    error=str(exc),
                )
            # Fail-open: no entities detected, pass through original text
            entities = []

        # ── Stage 5 (first half): Audit detection ─────────────────────────────
        if entities:
            event = make_detection_event(self.agent_id, document_id, entities)
            event.scan_id = scan_id
            self.audit.emit(event)

        # ── Stage 4: Redact ───────────────────────────────────────────────────
        try:
            safe_text, redaction_records = redact(
                text,
                entities,
                strategy=self.config.redaction_strategy,
            )
        except Exception as exc:
            logger.error("Redaction failed for %s: %s", document_id, exc)
            # Redaction failure always fail-closes — never pass unredacted text
            self._audit_blocked(scan_id, document_id, str(exc))
            return PipelineResult(
                safe_text=_BLOCKED_SENTINEL,
                entities_detected=entities,
                redaction_records=[],
                scan_id=scan_id,
                blocked=True,
                error=str(exc),
            )

        # ── Stage 5: Audit redaction ──────────────────────────────────────────
        if redaction_records:
            self.audit.emit(AuditEvent(
                event_type="REDACTION",
                agent_id=self.agent_id,
                data_classification="PII" if entities else "NONE",
                action_taken=f"Applied {self.config.redaction_strategy} to {len(redaction_records)} entities",
                document_id=document_id,
                scan_id=scan_id,
                strategy=self.config.redaction_strategy,
                entity_hashes=[r.original_hash for r in redaction_records],
                entity_count=len(redaction_records),
            ))

        # ── Stage 6: Return safe output ───────────────────────────────────────
        return PipelineResult(
            safe_text=safe_text,
            entities_detected=entities,
            redaction_records=redaction_records,
            scan_id=scan_id,
            blocked=False,
        )

    def _audit_blocked(self, scan_id: str, document_id: str, reason: str) -> None:
        self.audit.emit(AuditEvent(
            event_type="ACCESS",
            agent_id=self.agent_id,
            data_classification="NONE",
            action_taken=f"Output blocked: {reason[:200]}",
            document_id=document_id,
            scan_id=scan_id,
        ))

# ── Factory ───────────────────────────────────────────────────────────────────

def build_pipeline(config_path: Path | None = None) -> PiiGuardPipeline:
    """Build a pipeline from the standard config + environment."""
    import argparse
    # An empty Namespace stands in for CLI args, so no CLI overrides apply here.
    args = argparse.Namespace()
    cfg = build_config(args, config_path=config_path or Path("pii-guard.yaml"))
    audit = AuditLogger(
        log_path=Path("logs/pii-guard-audit.jsonl"),
        # siem_url is optional on the config object
        siem_url=getattr(cfg, "siem_url", None),
    )
    return PiiGuardPipeline(config=cfg, audit_logger=audit)

Claude Agent SDK integration

The pipeline fits naturally as a pre/post hook in the Claude Agent SDK. The pre-processing hook redacts PII from any user input before it reaches the model; the post-processing hook catches any PII-shaped content the model might have introduced in its output.

agent_hook.py — Claude Agent SDK integration
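from __future__ import annotations
# NOTE: ClaudeCodeSDK, HookContext, add_pre_hook, and add_post_hook are used as
# written in this series. Check them against the hook API of the SDK version
# you have installed before relying on this module.
from claude_code_sdk import ClaudeCodeSDK, HookContext

from .pipeline import build_pipeline, PipelineResult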

# Build the pipeline once at module load (reads config + opens audit log)
_pipeline = build_pipeline()

def pii_pre_hook(ctx: HookContext) -> str:
    """
    Pre-processing hook: redact PII from user input before the model sees it.
    Registered via ClaudeCodeSDK.add_pre_hook().
    """
    result: PipelineResult = _pipeline.process(
        text=ctx.user_message,
        context={"document_id": ctx.session_id, "operator_id": ctx.user_id},
    )

    if result.blocked:
        # Fail-closed: replace user message with a safe error
        return "[Input blocked: PII detection failed. Please try again.]"

    if result.entities_detected:
        # Optionally inform the model what was redacted
        note = f"[{len(result.entities_detected)} PII entities redacted from input]"
        return result.safe_text + "\n\n" + note

    return result.safe_text

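def pii_post_hook(ctx: HookContext) -> str:
    """
    Post-processing hook: scan model output for residual PII before returning to user.
    Model outputs are lower risk but should still be checked for hallucinated PII.
    A higher threshold suits output, since false positives there are more disruptive.
    This sketch reuses the shared pipeline; build a second pipeline with a higher
    confidence_threshold if you want that behavior.
    """
    result: PipelineResult = _pipeline.process(
        text=ctx.model_response,
        context={"document_id": f"{ctx.session_id}-out"},
    )
    if result.blocked:
        # Fail-open for output: on a blocked scan, safe_text holds the sentinel,
        # so return the original response explicitly. For CUI or healthcare
        # data, return result.safe_text here instead to fail closed.
        return ctx.model_response
    return result.safe_text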

# ── SDK registration ──────────────────────────────────────────────────────────

def register_hooks(sdk: ClaudeCodeSDK) -> None:
    sdk.add_pre_hook(pii_pre_hook)
    sdk.add_post_hook(pii_post_hook)
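
If your SDK version exposes a different hook API, the same pre/post pattern can be applied around any model call. The sketch below is SDK-agnostic and makes several assumptions: `guarded_call`, `model_fn`, and the two blocked messages are hypothetical, and the only thing it requires of the pipeline is a `.process(text, context)` method returning an object with `safe_text` and `blocked`. It fails closed in both directions, which is the stricter choice.

```python
from __future__ import annotations

from typing import Callable, Protocol


class _Result(Protocol):
    safe_text: str
    blocked: bool


class _Pipeline(Protocol):
    def process(self, text: str, context: dict | None = None) -> _Result: ...


INPUT_BLOCKED = "[Input blocked: PII detection failed. Please try again.]"
OUTPUT_BLOCKED = "[Output blocked: PII detection failed.]"


def guarded_call(
    pipeline: _Pipeline,
    model_fn: Callable[[str], str],
    user_text: str,
    session_id: str,
) -> str:
    """Redact input, call the model, then scan the reply before returning it."""
    pre = pipeline.process(user_text, {"document_id": session_id})
    if pre.blocked:
        return INPUT_BLOCKED  # the model is never called with unscanned input

    reply = model_fn(pre.safe_text)

    post = pipeline.process(reply, {"document_id": f"{session_id}-out"})
    return OUTPUT_BLOCKED if post.blocked else post.safe_text
```

Because the wrapper depends only on the pipeline's interface, it can be tested with a fake pipeline and a lambda standing in for the model.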

Performance: async processing and caching

For high-throughput scenarios (bulk document processing, CI/CD pipeline scans), two optimizations matter most:

  • Async Pass 2: The model confirmation calls are I/O-bound. Run them concurrently with asyncio.gather() so that a document with 10 candidates issues all 10 confirmations in parallel rather than serially. As a rough illustration, at about 1 s per call the wall time drops from ~10s to ~1.5s, subject to API rate limits.
  • Entity map caching: When the same document is processed multiple times (e.g., a template that appears in many output files), cache the confirmed entity map by a hash of the document content. Skip Pass 2 on cache hit. Use an LRU cache with a short TTL (5 minutes) to avoid stale results.
cache.py — Simple entity map cache
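from __future__ import annotations
import hashlib
import time
from collections import OrderedDict

# Maps document hash -> (entities, insertion time), least recently used first.
_CACHE: OrderedDict[str, tuple[list, float]] = OrderedDict()
_TTL_SECONDS = 300  # 5 minutes
_MAX_ENTRIES = 1024  # assumed LRU bound so the cache cannot grow without limit

def _doc_hash(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()

def get_cached_entities(text: str) -> list | None:
    key = _doc_hash(text)
    if key in _CACHE:
        entities, ts = _CACHE[key]
        if time.monotonic() - ts < _TTL_SECONDS:
            _CACHE.move_to_end(key)  # mark as most recently used
            return entities
        del _CACHE[key]  # entry expired
    return None

def set_cached_entities(text: str, entities: list) -> None:
    key = _doc_hash(text)
    _CACHE[key] = (entities, time.monotonic())
    _CACHE.move_to_end(key)
    while len(_CACHE) > _MAX_ENTRIES:
        _CACHE.popitem(last=False)  # evict the least recently used entry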
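
The async Pass 2 idea from the list above can be sketched as follows. This assumes a hypothetical `confirm_one` coroutine that wraps one model confirmation call; the semaphore and its limit of 5 are additions here, meant to keep the burst within API rate limits.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Sequence


async def confirm_all(
    candidates: Sequence[str],
    confirm_one: Callable[[str], Awaitable[bool]],
    max_concurrency: int = 5,
) -> list[bool]:
    """Confirm every candidate concurrently, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _bounded(candidate: str) -> bool:
        async with semaphore:
            return await confirm_one(candidate)

    # gather() returns results in input order. An exception from any call
    # propagates to the caller, which can then fail closed.
    return list(await asyncio.gather(*(_bounded(c) for c in candidates)))
```

A synchronous caller can run it with `asyncio.run(confirm_all(candidates, confirm_one))`. Results line up with the input order, so they can be zipped back onto the candidate list.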

Testing strategy

Test each pipeline stage independently before integration testing the full flow. This makes failures easy to localize:

  • Unit: Pass 1 regex. Test each regex pattern against a set of true positives (synthetic but validly formatted SSNs, emails, etc.) and true negatives (schema files, test fixtures). Never put real personal data in a test suite. Aim for 100% recall on the true positive set; accept some false positives, which Pass 2 will filter out.
  • Unit: Pass 2 confirmation. Mock the API call. Test that the context-window logic correctly distinguishes "ssn" as a column header from "ssn" as a value. Verify that the response schema is enforced and that invalid model responses are handled.
  • Unit: Redaction. For each strategy, test that the output format is correct and that the original value is not present in the output. Test overlap handling.
  • Unit: Audit logger. Test that the log file grows monotonically (never truncated), that PII values are absent from log entries, and that each event is valid JSON that parses independently.
  • Integration: Full pipeline. Run a sample document through the full PiiGuardPipeline.process() method. Verify that the output is safe, that the audit log has DETECTION and REDACTION events, and that the redaction records match the detected entities.
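
As an illustration of the mocked-failure tests described above, here is a minimal unittest sketch. To stay self-contained it tests `guard`, a simplified stand-in written for this example, not the real PiiGuardPipeline; the SSN is a synthetic placeholder value.

```python
import unittest
from unittest import mock

BLOCKED = "[PIPELINE_BLOCKED: detection failed, output suppressed]"


def guard(text, detect, fail_closed):
    """Simplified stand-in for process(): returns (text, blocked)."""
    try:
        spans = detect(text)
    except Exception:
        if fail_closed:
            return BLOCKED, True
        spans = []
    # Replace spans from right to left so earlier offsets stay valid.
    for start, end in sorted(spans, reverse=True):
        text = text[:start] + "[REDACTED]" + text[end:]
    return text, False


class FailureModeTests(unittest.TestCase):
    def test_fail_closed_blocks_output(self):
        detect = mock.Mock(side_effect=TimeoutError("API down"))
        out, blocked = guard("SSN 123-45-6789", detect, fail_closed=True)
        self.assertTrue(blocked)
        self.assertNotIn("123-45-6789", out)

    def test_fail_open_passes_text_through(self):
        detect = mock.Mock(side_effect=TimeoutError("API down"))
        out, blocked = guard("SSN 123-45-6789", detect, fail_closed=False)
        self.assertFalse(blocked)
        self.assertIn("123-45-6789", out)  # documents the risk of fail-open

    def test_redacted_value_is_absent(self):
        detect = mock.Mock(return_value=[(4, 15)])
        out, _ = guard("SSN 123-45-6789", detect, fail_closed=True)
        self.assertEqual(out, "SSN [REDACTED]")
```

Run it with `python -m unittest`. The same three assertions carry over to the real pipeline once `detect` is patched with `mock.patch`.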

The graduation checklist

Before going to production, verify all 8 items:

  • [ ] Secrets are external. ANTHROPIC_API_KEY and PII_PSEUDO_SECRET are injected via environment or secret manager. No secrets in YAML, no secrets in code.
  • [ ] Fail-closed is configured. Pass 2 API failures block output rather than passing unredacted text through. Verify this with a mock API error in your test suite.
  • [ ] Audit log is append-only. Verify that the log file has the append-only attribute set on the filesystem (set with chattr +a and checked with lsattr on Linux) or is writing to WORM storage.
  • [ ] PII values are absent from the audit log. Grep the log for known test PII values after a test run. None should appear.
  • [ ] Confidence threshold is documented. The chosen threshold and the precision/recall tradeoff at that threshold are written down and approved by whoever owns the compliance requirement.
  • [ ] Exclusion patterns are reviewed. Every pattern in exclusion_patterns is intentional and has a named owner who can be asked "why is this excluded?"
  • [ ] SIEM integration is tested. Send a test event and confirm it appears in the SIEM dashboard. Verify the SIEM_TOKEN is rotated from the default.
  • [ ] Retention policy is in place. Log rotation is configured, and the retention period matches your contract or regulatory requirement.
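
Two of these items, external secrets and a PII-free audit log, lend themselves to an automated preflight check. The sketch below is one way to do it; `missing_secrets` and `audit_log_problems` are hypothetical helpers, and the check assumes the audit log is JSON Lines as described in the previous post.

```python
from __future__ import annotations

import json
import os
from typing import Iterable, Mapping

REQUIRED_SECRETS = ("ANTHROPIC_API_KEY", "PII_PSEUDO_SECRET")


def missing_secrets(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names of required secrets that are unset or empty."""
    return [name for name in REQUIRED_SECRETS if not env.get(name)]


def audit_log_problems(lines: Iterable[str], known_pii: Iterable[str]) -> list[str]:
    """Flag log lines that are not valid JSON or that contain a known test value."""
    needles = list(known_pii)
    problems: list[str] = []
    for number, line in enumerate(lines, start=1):
        try:
            json.loads(line)
        except json.JSONDecodeError:
            problems.append(f"line {number}: not valid JSON")
        if any(value in line for value in needles):
            # Report the line number only, never the leaked value itself.
            problems.append(f"line {number}: contains test PII value")
    return problems
```

Run both after a test pass that feeds known synthetic values through the pipeline; an empty list from each means the two items hold for that run.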

This checklist is an engineering verification, not a legal certification. Run it past your security team before handling production data. The pipeline provides the technical controls — your legal and compliance teams define the policies those controls need to express.
