End-to-End Pipeline
This post wraps the components from the earlier posts in a single Pipeline class with a .process(text, context) method, the only interface an agent needs. You'll also see how to register it as a pre/post hook in the Claude Agent SDK, handle failures safely, and verify the system is production-ready before deploying it.
Architecture overview
The pipeline has six stages, each with a defined input, output, and failure mode:
| Stage | Input | Output | Failure mode |
|---|---|---|---|
| 1. Config load | YAML file + CLI args | PiiGuardConfig | Raise on missing API key; warn on invalid threshold |
| 2. Pass 1 detect | Raw text | Candidate span list | Presidio error → log + treat as no candidates (fail-open) |
| 3. Pass 2 confirm | Candidates + context | Confirmed entity list | API error → retry x2, then fail-closed (block output) |
| 4. Redact | Text + confirmed entities | Redacted text + redaction records | Strategy error → fail-closed (return original text redacted with masks) |
| 5. Audit | Entities + redaction records | Audit log entries written | Disk full → write to SIEM only; log warning locally |
| 6. Output | Redacted text | Caller receives safe text | If any upstream stage failed-closed, return sentinel value |
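The Pass 2 row ("retry x2, then fail-closed") can be sketched as a small wrapper. This is an illustration under stated assumptions, not the series' implementation: confirm_with_retry, BLOCKED, the confirm callable, and the linear backoff are all hypothetical.

```python
from __future__ import annotations

import time
from typing import Callable

# Hypothetical sentinel; the reference implementation defines its own.
BLOCKED = "[PIPELINE_BLOCKED]"


def confirm_with_retry(
    confirm: Callable[[list[str]], list[str]],
    candidates: list[str],
    retries: int = 2,
    backoff_seconds: float = 0.0,
) -> tuple[list[str], bool]:
    """Call confirm(); retry up to `retries` times, then fail closed.

    Returns (confirmed_entities, blocked).
    """
    attempts = 1 + retries  # one initial call plus the retries
    for attempt in range(attempts):
        try:
            return confirm(candidates), False
        except Exception:
            if attempt < attempts - 1:
                # Assumed linear backoff between attempts.
                time.sleep(backoff_seconds * (attempt + 1))
    # Every attempt failed: block instead of passing unconfirmed text through.
    return [], True
```

A caller would map blocked=True to the sentinel (BLOCKED here) rather than returning any part of the original text.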
Fail-open vs fail-closed
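Every pipeline that sits between an agent and its data source must decide what to do when a stage fails. The choice is not aesthetic: it determines your default security posture. Failing open lets the text continue as if the failed stage had found nothing, which preserves availability but can let unredacted data through. Failing closed suppresses the output, which costs availability but cannot leak.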
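One way to make the policy explicit is to attach a failure mode to each stage. The sketch below is a hypothetical helper (FailMode, StageBlocked, and run_stage are not part of the series' code) that shows the two behaviors side by side.

```python
from __future__ import annotations

import logging
from enum import Enum
from typing import Callable, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)


class FailMode(Enum):
    OPEN = "open"      # on error, continue with a harmless default
    CLOSED = "closed"  # on error, stop and suppress the output


class StageBlocked(Exception):
    """Raised when a fail-closed stage errors out."""


def run_stage(name: str, fn: Callable[[], T], mode: FailMode, default: T) -> T:
    """Run one stage and apply its failure policy."""
    try:
        return fn()
    except Exception as exc:
        log.warning("stage %s failed: %s", name, exc)
        if mode is FailMode.CLOSED:
            raise StageBlocked(f"{name} failed: {exc}") from exc
        return default


def broken_detector() -> list[str]:
    raise RuntimeError("detector crashed")


# Fail-open: the error is logged and the default (no candidates) is used.
candidates = run_stage("pass1", broken_detector, FailMode.OPEN, default=[])
```

With FailMode.CLOSED the same call raises StageBlocked, and the caller is expected to return a sentinel instead of the text.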
Python reference implementation
from __future__ import annotations

import hashlib
import logging
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# Imports from previous posts in this series
from .config import PiiGuardConfig, build_config
from .detector import detect, DetectedEntity
from .redactor import redact, RedactionRecord
from .audit_log import AuditLogger, make_detection_event, AuditEvent

logger = logging.getLogger(__name__)


# ── Pipeline result ───────────────────────────────────────────────────────────
@dataclass
class PipelineResult:
    safe_text: str
    entities_detected: list[DetectedEntity]
    redaction_records: list[RedactionRecord]
    scan_id: str
    blocked: bool = False  # True when fail-closed triggered
    error: str | None = None


_BLOCKED_SENTINEL = "[PIPELINE_BLOCKED: detection failed, output suppressed]"


# ── Main pipeline class ───────────────────────────────────────────────────────
class PiiGuardPipeline:
    """
    Single entry point for PII/CUI detection and redaction.
    Call .process(text, context) from your agent code.
    """

    def __init__(
        self,
        config: PiiGuardConfig,
        audit_logger: AuditLogger,
        agent_id: str | None = None,
    ) -> None:
        self.config = config
        self.audit = audit_logger
        self.agent_id = agent_id or f"pii-guard-{uuid.uuid4().hex[:8]}"

    def process(
        self,
        text: str,
        context: dict[str, Any] | None = None,
    ) -> PipelineResult:
        """
        Run the full pipeline synchronously.
        context: optional metadata (document_id, operator_id, etc.)
        """
        scan_id = str(uuid.uuid4())
        ctx = context or {}
        # Fall back to a short content hash when the caller gives no document_id
        document_id = ctx.get("document_id", hashlib.md5(text.encode()).hexdigest()[:12])

        # ── Stages 2-3: Detect (Pass 1 + Pass 2) ─────────────────────────────
        try:
            # detect() in Post 02 runs both passes internally, so the pipeline
            # cannot tell which pass failed; config.fail_closed decides.
            entities = detect(
                text,
                confidence_threshold=self.config.confidence_threshold,
            )
        except Exception as exc:
            logger.error("Detection failed for %s: %s", document_id, exc)
            if self.config.fail_closed:
                self._audit_blocked(scan_id, document_id, str(exc))
                return PipelineResult(
                    safe_text=_BLOCKED_SENTINEL,
                    entities_detected=[],
                    redaction_records=[],
                    scan_id=scan_id,
                    blocked=True,
                    error=str(exc),
                )
            # Fail-open: no entities detected, pass through original text
            entities = []

        # ── Stage 5 (part 1): Audit detection ────────────────────────────────
        if entities:
            event = make_detection_event(self.agent_id, document_id, entities)
            event.scan_id = scan_id
            self.audit.emit(event)

        # ── Stage 4: Redact ──────────────────────────────────────────────────
        try:
            safe_text, redaction_records = redact(
                text,
                entities,
                strategy=self.config.redaction_strategy,
            )
        except Exception as exc:
            logger.error("Redaction failed for %s: %s", document_id, exc)
            # Redaction failure always fails closed: never pass unredacted text
            self._audit_blocked(scan_id, document_id, str(exc))
            return PipelineResult(
                safe_text=_BLOCKED_SENTINEL,
                entities_detected=entities,
                redaction_records=[],
                scan_id=scan_id,
                blocked=True,
                error=str(exc),
            )

        # ── Stage 5 (part 2): Audit redaction ────────────────────────────────
        if redaction_records:
            self.audit.emit(AuditEvent(
                event_type="REDACTION",
                agent_id=self.agent_id,
                data_classification="PII" if entities else "NONE",
                action_taken=f"Applied {self.config.redaction_strategy} to {len(redaction_records)} entities",
                document_id=document_id,
                scan_id=scan_id,
                strategy=self.config.redaction_strategy,
                entity_hashes=[r.original_hash for r in redaction_records],
                entity_count=len(redaction_records),
            ))

        # ── Stage 6: Return safe output ──────────────────────────────────────
        return PipelineResult(
            safe_text=safe_text,
            entities_detected=entities,
            redaction_records=redaction_records,
            scan_id=scan_id,
            blocked=False,
        )

    def _audit_blocked(self, scan_id: str, document_id: str, reason: str) -> None:
        # Record the block; the reason is truncated to keep log entries small
        self.audit.emit(AuditEvent(
            event_type="ACCESS",
            agent_id=self.agent_id,
            data_classification="NONE",
            action_taken=f"Output blocked: {reason[:200]}",
            document_id=document_id,
            scan_id=scan_id,
        ))


# ── Factory ───────────────────────────────────────────────────────────────────
def build_pipeline(config_path: Path | None = None) -> PiiGuardPipeline:
    """Build a pipeline from the standard config + environment."""
    import argparse

    args = argparse.Namespace()  # empty namespace: no CLI overrides
    cfg = build_config(args, config_path=config_path or Path("pii-guard.yaml"))
    audit = AuditLogger(
        log_path=Path("logs/pii-guard-audit.jsonl"),
        siem_url=getattr(cfg, "siem_url", None),
    )
    return PiiGuardPipeline(config=cfg, audit_logger=audit)

Claude Agent SDK integration
The pipeline fits naturally as a pre/post hook in the Claude Agent SDK. The pre-processing hook redacts PII from any user input before it reaches the model; the post-processing hook catches any PII-shaped content the model might have introduced in its output.
from __future__ import annotations

# NOTE: ClaudeCodeSDK, HookContext, add_pre_hook, and add_post_hook are the names
# used in this post. Check them against the hook API of the SDK version you install.
from claude_code_sdk import ClaudeCodeSDK, HookContext

from .pipeline import build_pipeline, PipelineResult

# Build the pipeline once at module load (reads config + opens audit log)
_pipeline = build_pipeline()


def pii_pre_hook(ctx: HookContext) -> str:
    """
    Pre-processing hook: redact PII from user input before the model sees it.
    Registered via ClaudeCodeSDK.add_pre_hook().
    """
    result: PipelineResult = _pipeline.process(
        text=ctx.user_message,
        context={"document_id": ctx.session_id, "operator_id": ctx.user_id},
    )
    if result.blocked:
        # Fail-closed: replace user message with a safe error
        return "[Input blocked: PII detection failed. Please try again.]"
    if result.entities_detected:
        # Optionally inform the model what was redacted
        note = f"[{len(result.entities_detected)} PII entities redacted from input]"
        return result.safe_text + "\n\n" + note
    return result.safe_text


def pii_post_hook(ctx: HookContext) -> str:
    """
    Post-processing hook: scan model output for residual PII before returning to user.
    Model outputs are lower risk but should still be checked for hallucinated PII.
    Use a higher threshold here: false positives in output are more disruptive.
    """
    result: PipelineResult = _pipeline.process(
        text=ctx.model_response,
        context={"document_id": f"{ctx.session_id}-out"},
    )
    # When the pipeline blocks, safe_text is the blocked sentinel, so the raw
    # model output is never returned. For fail-open output, return
    # ctx.model_response when result.blocked is True instead.
    return result.safe_text


# ── SDK registration ──────────────────────────────────────────────────────────
def register_hooks(sdk: ClaudeCodeSDK) -> None:
    sdk.add_pre_hook(pii_pre_hook)
    sdk.add_post_hook(pii_post_hook)

Performance: async processing and caching
For high-throughput scenarios (bulk document processing, CI/CD pipeline scans), two optimizations matter most:
- Async Pass 2: The model confirmation calls are I/O-bound. Run them concurrently with asyncio.gather(): a document with 10 candidates can run all 10 confirmations in parallel rather than serially. Wall time drops from ~10s to ~1.5s at typical API latencies.
- Entity map caching: When the same document is processed multiple times (e.g., a template that appears in many output files), cache the confirmed entity map by a hash of the document content. Skip Pass 2 on cache hit. Use an LRU cache with a short TTL (5 minutes) to avoid stale results.
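A minimal sketch of the concurrent Pass 2 idea, assuming each confirmation is an independent awaitable. Here confirm_candidate is a hypothetical stand-in for the real model call (its isdigit() check is placeholder logic), and the max_concurrency cap is an added assumption to stay under API rate limits.

```python
from __future__ import annotations

import asyncio


async def confirm_candidate(candidate: str) -> bool:
    """Stand-in for one Pass 2 model call (hypothetical)."""
    await asyncio.sleep(0.01)   # simulates network latency
    return candidate.isdigit()  # placeholder logic, not real confirmation


async def confirm_all(candidates: list[str], max_concurrency: int = 5) -> list[str]:
    """Confirm candidates concurrently; any failure propagates to the caller."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(candidate: str) -> bool:
        async with semaphore:
            return await confirm_candidate(candidate)

    # gather() returns results in the same order as its arguments.
    verdicts = await asyncio.gather(*(guarded(c) for c in candidates))
    return [c for c, ok in zip(candidates, verdicts) if ok]


confirmed = asyncio.run(confirm_all(["123456789", "not-pii", "987654321"]))
```

Because gather() is called without return_exceptions=True, the first failed confirmation raises in the caller, which can then apply the fail-closed policy.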
from __future__ import annotations

import hashlib
import time

# Entity map cache keyed by content hash. Entries expire after the TTL, but
# there is no size bound here; add LRU eviction for long-running processes.
_CACHE: dict[str, tuple[list, float]] = {}
_TTL_SECONDS = 300  # 5 minutes


def _doc_hash(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()


def get_cached_entities(text: str) -> list | None:
    key = _doc_hash(text)
    if key in _CACHE:
        entities, ts = _CACHE[key]
        if time.monotonic() - ts < _TTL_SECONDS:
            return entities
        del _CACHE[key]  # expired: drop the stale entry
    return None


def set_cached_entities(text: str, entities: list) -> None:
    key = _doc_hash(text)
    _CACHE[key] = (entities, time.monotonic())

Testing strategy
Test each pipeline stage independently before integration testing the full flow. This makes failures easy to localize:
- Unit: Pass 1 regex — Test each regex pattern against a set of true positives (real SSNs, emails, etc.) and true negatives (schema files, test fixtures). Aim for 100% recall on the true positive set; accept some false positives to be filtered by Pass 2.
- Unit: Pass 2 confirmation — Mock the API call. Test that context-window logic correctly distinguishes "ssn" as a column header from "ssn" as a value. Verify the schema is enforced and invalid model responses are handled.
- Unit: Redaction — For each strategy, test that the output format is correct and that the original value is not present in the output. Test overlap handling.
- Unit: Audit logger — Test that the log file grows monotonically (never truncated), that PII values are absent from log entries, and that each event is valid JSON parseable independently.
- Integration: Full pipeline — Run a sample document through the full Pipeline.process() method. Verify the output is safe, the audit log has DETECTION and REDACTION events, and the redaction records match the detected entities.
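As an illustration of the redaction unit tests, the sketch below checks that the original value is gone and that overlapping spans are handled. mask_spans is a hypothetical stand-in for a mask strategy, not the redact() function from this series; the test functions run under pytest or when called directly.

```python
from __future__ import annotations


def mask_spans(text: str, spans: list[tuple[int, int]], mask: str = "[REDACTED]") -> str:
    """Hypothetical mask strategy: merge overlapping spans, then replace them."""
    merged: list[tuple[int, int]] = []
    for start, end in sorted(spans):
        if merged and start <= merged[-1][1]:
            # Overlapping or touching: extend the previous span.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    # Replace from the end so earlier offsets stay valid.
    for start, end in reversed(merged):
        text = text[:start] + mask + text[end:]
    return text


def test_original_value_absent() -> None:
    text = "SSN: 123-45-6789"  # synthetic value
    out = mask_spans(text, [(5, 16)])
    assert "123-45-6789" not in out
    assert out == "SSN: [REDACTED]"


def test_overlapping_spans_are_merged() -> None:
    text = "Call John Smith at 555-0100"
    # "John Smith" (5-15) and "Smith" (10-15) overlap.
    out = mask_spans(text, [(5, 15), (10, 15)])
    assert out == "Call [REDACTED] at 555-0100"
```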
The graduation checklist
Before going to production, verify all 8 items:
- [ ] Secrets are external. ANTHROPIC_API_KEY and PII_PSEUDO_SECRET are injected via environment or secret manager. No secrets in YAML, no secrets in code.
- [ ] Fail-closed is configured. Pass 2 API failures block output rather than passing unredacted text through. Verify this with a mock API error in your test suite.
- [ ] Audit log is append-only. Verify the log file has the append-only attribute set on the filesystem (lsattr on Linux) or is writing to WORM storage.
- [ ] PII values are absent from the audit log. Grep the log for known test PII values after a test run. None should appear.
- [ ] Confidence threshold is documented. The chosen threshold and the precision/recall tradeoff at that threshold are written down and approved by whoever owns the compliance requirement.
- [ ] Exclusion patterns are reviewed. Every pattern in exclusion_patterns is intentional and has a named owner who can be asked "why is this excluded?"
- [ ] SIEM integration is tested. Send a test event and confirm it appears in the SIEM dashboard. Verify the SIEM_TOKEN is rotated from the default.
- [ ] Retention policy is in place. Log rotation is configured, and the retention period matches your contract or regulatory requirement.
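The "PII values are absent from the audit log" item can be automated rather than grepped by hand. The helper below is a sketch: find_leaks is a hypothetical name, and it assumes a JSONL log (one JSON object per line). It also confirms that every line parses on its own.

```python
from __future__ import annotations

import json
from pathlib import Path


def find_leaks(log_path: Path, known_pii: list[str]) -> list[tuple[int, str]]:
    """Return (line_number, value) for every known test value found in the log.

    Raises ValueError if any non-empty line is not a standalone JSON document.
    Matching is done on the raw line, so values that JSON would escape
    (non-ASCII text, quotes, backslashes) need their escaped form as well.
    """
    leaks: list[tuple[int, str]] = []
    with log_path.open(encoding="utf-8") as handle:
        for lineno, line in enumerate(handle, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"line {lineno} is not valid JSON") from exc
            leaks.extend((lineno, value) for value in known_pii if value in line)
    return leaks
```

In a test suite, run the pipeline over fixtures containing synthetic values, then assert that find_leaks(log_path, fixture_values) returns an empty list.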
This checklist is an engineering verification, not a legal certification. Run it past your security team before handling production data. The pipeline provides the technical controls — your legal and compliance teams define the policies those controls need to express.