# Evidence Pipeline Architecture - Consolidation Guide

**Version**: 1.1
**Status**: Reference Architecture
**Last Updated**: 2026-01-14

---

## Overview

This document describes how existing Stella Ops components integrate to form the complete evidence pipeline. The key insight is that **most components already exist** - this guide shows how to wire them together.

---

## Component Map

```
┌───────────────────────────────────────────────────────────────────────────────┐
│                               EVIDENCE PIPELINE                               │
├───────────────────────────────────────────────────────────────────────────────┤
│                                                                               │
│  ┌─────────────┐    ┌──────────────────┐    ┌─────────────────────┐           │
│  │  Concelier  │───▶│ CveSymbolMapping │───▶│  ReachabilityJob    │           │
│  │  (CVE Data) │    │     Service      │    │     Executor        │           │
│  └─────────────┘    └──────────────────┘    └──────────┬──────────┘           │
│         │                    │                         │                      │
│         │                    │                         ▼                      │
│         │                    │              ┌───────────────────────┐         │
│         │                    │              │  ReachabilityAnalyzer │         │
│         │                    │              │   (BFS Call Graph)    │         │
│         │                    │              └──────────┬────────────┘         │
│         │                    │                         │                      │
│         │                    │                         ▼                      │
│         │                    │              ┌───────────────────────┐         │
│         │                    │              │  ReachabilityStack    │         │
│         │                    │              │     Evaluator         │         │
│         │                    │              │   (3-Layer Verdict)   │         │
│         │                    │              └──────────┬────────────┘         │
│         │                    │                         │                      │
│         │                    │         ┌───────────────┼───────────────┐      │
│         │                    │         ▼               ▼               ▼      │
│         │                    │    ┌─────────┐     ┌──────────┐    ┌─────────┐ │
│         │                    │    │ Layer 1 │     │ Layer 2  │    │ Layer 3 │ │
│         │                    │    │ Static  │     │ Binary   │    │ Runtime │ │
│         │                    │    └────┬────┘     └────┬─────┘    └────┬────┘ │
│         │                    │         │               │               │      │
│         │                    │         ▼               ▼               ▼      │
│         │                    │       ┌──────────────────────────────────────┐ │
│         │                    │       │           EvidenceBundle             │ │
│         │                    │       │  - ReachabilityEvidence              │ │
│         │                    │       │  - PatchDiffEvidence                 │ │
│         │                    │       │  - RuntimeObservationEvidence        │ │
│         │                    │       └─────────────────┬────────────────────┘ │
│         │                    │                         │                      │
│         │                    │                         ▼                      │
│         │                    │       ┌──────────────────────────────────────┐ │
│         │                    │       │    ReachabilityWitnessDsseBuilder    │ │
│         │                    │       │        (in-toto Attestation)         │ │
│         │                    │       └─────────────────┬────────────────────┘ │
│         │                    │                         │                      │
│         │                    │                         ▼                      │
│         │                    │       ┌──────────────────────────────────────┐ │
│         │                    │       │          EvidenceDbContext           │ │
│         │                    │       │             (Postgres)               │ │
│         │                    │       └─────────────────┬────────────────────┘ │
│         │                    │                         │                      │
│         │                    │                         ▼                      │
│         │                    │       ┌──────────────────────────────────────┐ │
│         │                    └──────▶│         VexStatusDeterminer          │ │
│         │                            │          (Verdict → VEX)             │ │
│         │                            └─────────────────┬────────────────────┘ │
│         │                                              │                      │
│         │                                              ▼                      │
│         │                            ┌──────────────────────────────────────┐ │
│         └───────────────────────────▶│               VexHub                 │ │
│                                      │          (VEX Documents)             │ │
│                                      └──────────────────────────────────────┘ │
│                                                                               │
└───────────────────────────────────────────────────────────────────────────────┘
```
---

## Existing Components Reference

### Layer 1: Static Call Graph Analysis

**Already Implemented** - Wire together existing pieces.

| Component | File | Purpose |
|-----------|------|---------|
| `ReachabilityAnalyzer` | `Scanner.CallGraph/Analysis/ReachabilityAnalyzer.cs` | BFS from entrypoints to sinks |
| `CallGraphSnapshot` | `Scanner.CallGraph/CallGraphSnapshot.cs` | Immutable graph representation |
| `DotNetCallGraphExtractor` | `Scanner.CallGraph/Extraction/DotNet/` | .NET-specific extraction |
| `DotNetReachabilityLifter` | `Scanner.Reachability/Lifters/` | Lift to union model |
| `ReachabilityUnionGraph` | `Scanner.Reachability/ReachabilityUnionSchemas.cs` | Unified multi-lang graph |

**Integration Point**:
```csharp
// In ReachabilityEvidenceJobExecutor
var snapshot = await _callGraphCache.GetOrComputeAsync(imageDigest, ct);
var result = _analyzer.Analyze(snapshot, new ReachabilityAnalysisOptions
{
    ExplicitSinks = sinks.Select(s => s.CanonicalId).ToImmutableArray()
});

var layer1 = new ReachabilityLayer1
{
    IsReachable = result.ReachableSinkIds.Length > 0,
    Confidence = ConfidenceLevel.High,
    Paths = result.Paths.Select(ToLayer1Path).ToImmutableArray(),
    ReachingEntrypoints = result.Paths.Select(p => p.EntrypointId).Distinct().ToImmutableArray(),
    AnalysisMethod = "BFS"
};
```
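
The table above describes `ReachabilityAnalyzer` as a BFS from entrypoints to sinks. As a rough illustration of that technique only, the sketch below runs a breadth-first search over a plain adjacency dictionary and reconstructs one shortest call path. `BfsSketch`, `FindPath`, and the dictionary-based graph are hypothetical stand-ins and do not reflect the real `CallGraphSnapshot` or `ReachabilityAnalyzer` APIs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for the call-graph BFS; not the real ReachabilityAnalyzer.
public static class BfsSketch
{
    // Returns one shortest call path from any entrypoint to the sink, or null if unreachable.
    public static IReadOnlyList<string>? FindPath(
        IReadOnlyDictionary<string, string[]> edges,
        IEnumerable<string> entrypoints,
        string sink)
    {
        // parent[node] records how each node was first reached (null for entrypoints).
        var parent = new Dictionary<string, string?>();
        var queue = new Queue<string>();
        foreach (var entry in entrypoints)
        {
            if (parent.TryAdd(entry, null)) queue.Enqueue(entry);
        }

        while (queue.Count > 0)
        {
            var node = queue.Dequeue();
            if (node == sink)
            {
                // Walk parent links back to the entrypoint, then reverse.
                var path = new List<string>();
                for (string? cur = node; cur is not null; cur = parent[cur]) path.Add(cur);
                path.Reverse();
                return path;
            }

            if (!edges.TryGetValue(node, out var callees)) continue;
            foreach (var callee in callees)
            {
                // TryAdd doubles as the "visited" check.
                if (parent.TryAdd(callee, node)) queue.Enqueue(callee);
            }
        }

        return null;
    }
}
```

Because BFS visits nodes in order of distance from the entrypoints, the first path found to a sink is a shortest one, which keeps the evidence paths compact.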
### Layer 2: Binary Resolution

**Partially Implemented** - Add patch verification.

| Component | File | Purpose |
|-----------|------|---------|
| `HeuristicScanner` | `Scanner.Analyzers.Native/HeuristicScanner.cs` | Binary format detection |
| `NativeAnalyzer` | `Scanner.Analyzers.Native/Plugin/` | Native binary analysis |
| B2R2 (dependency) | NuGet | Binary lifting/IR |

BinaryIndex ops and configuration (read-only):

- Ops endpoints:
  - GET `/api/v1/ops/binaryindex/health` -> `BinaryIndexOpsHealthResponse`
  - POST `/api/v1/ops/binaryindex/bench/run` -> `BinaryIndexBenchResponse`
  - GET `/api/v1/ops/binaryindex/cache` -> `BinaryIndexFunctionCacheStats`
  - GET `/api/v1/ops/binaryindex/config` -> `BinaryIndexEffectiveConfig`
- Config sections (case-insensitive): `BinaryIndex:B2R2Pool`, `BinaryIndex:SemanticLifting`, `BinaryIndex:FunctionCache` (Valkey), `Postgres:BinaryIndex` (canonical IR persistence).

**New Component Needed**:
```csharp
// IBinaryDiffService implementation using B2R2
public sealed class B2R2BinaryDiffService : IBinaryDiffService
{
    public async Task<PatchDiffResult> DiffAsync(
        Stream vulnerableBinary,
        Stream patchedBinary,
        IReadOnlyList<string> targetSymbols,
        CancellationToken ct)
    {
        // Use B2R2.FrontEnd.BinFile to load binaries
        // Use B2R2.MiddleEnd.BinGraph for CFG comparison
        // Compare function IRs for targeted symbols

        // Placeholder so the skeleton compiles until the steps above are implemented.
        throw new NotImplementedException("B2R2-based binary diff is not implemented yet.");
    }
}
```
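
To make the "compare function IRs for targeted symbols" step more concrete, here is a minimal sketch that assumes the lifting has already happened and each function is available as normalized IR text keyed by symbol name. It does not call B2R2 at all. `PatchDiffSketch` and `PatchDiffSketchResult` are hypothetical names; the result fields mirror the `IsPatched`, `ChangedFunctions`, and `SimilarityScore` fields this guide attributes to `PatchDiffResult`, and the scoring rule is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical stand-in; field names follow the PatchDiffResult shape mentioned in this guide.
public sealed record PatchDiffSketchResult(
    bool IsPatched,
    IReadOnlyList<string> ChangedFunctions,
    double SimilarityScore);

public static class PatchDiffSketch
{
    // Inputs map symbol name -> normalized IR text (assumed to come from a lifter such as B2R2).
    public static PatchDiffSketchResult Diff(
        IReadOnlyDictionary<string, string> vulnerableIr,
        IReadOnlyDictionary<string, string> patchedIr,
        IReadOnlyList<string> targetSymbols)
    {
        // A symbol counts as changed when its IR fingerprint differs between the two builds.
        var changed = targetSymbols
            .Where(s => Fingerprint(vulnerableIr.GetValueOrDefault(s))
                     != Fingerprint(patchedIr.GetValueOrDefault(s)))
            .ToList();

        // Assumption: similarity is the fraction of targeted symbols left unchanged.
        var similarity = targetSymbols.Count == 0
            ? 1.0
            : 1.0 - (double)changed.Count / targetSymbols.Count;

        // Assumption: any change to a targeted symbol is treated as evidence of a patch.
        return new PatchDiffSketchResult(changed.Count > 0, changed, similarity);
    }

    // SHA-256 over the IR text; a missing function maps to the empty fingerprint.
    private static string Fingerprint(string? ir) =>
        ir is null ? "" : Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(ir)));
}
```

Exact fingerprint equality is deliberately strict. A real implementation would likely need IR normalization or CFG-level comparison to tolerate compiler noise between builds.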
### Layer 3: Runtime Gating

**Models Exist** - Adapters needed.

| Component | File | Purpose |
|-----------|------|---------|
| `RuntimeEvidence` | `Scanner.Analyzers.Native/RuntimeCapture/RuntimeEvidence.cs` | Evidence model |
| `RuntimeLoadEvent` | Same file | Individual load event |
| `RuntimeCaptureSession` | Same file | Session container |
| `RuntimeCaptureOptions` | `RuntimeCaptureOptions.cs` | Configuration |

**Existing Model** (no changes needed):
```csharp
// Already defined in RuntimeEvidence.cs
public sealed record RuntimeEvidence(
    IReadOnlyList<RuntimeCaptureSession> Sessions,
    IReadOnlyList<RuntimeLibrarySummary> UniqueLibraries,
    IReadOnlyList<RuntimeDependencyEdge> RuntimeEdges);
```
**New Adapter Needed**:
```csharp
// TetragonAdapter - implements IRuntimeCaptureAdapter
public sealed class TetragonAdapter : IRuntimeCaptureAdapter
{
    private readonly TetragonClient _client;

    public string Platform => "linux";
    public string Method => "ebpf";

    public async IAsyncEnumerable<RuntimeLoadEvent> StreamEventsAsync(
        string sessionId,
        [EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var evt in _client.StreamAsync(sessionId, ct))
        {
            yield return new RuntimeLoadEvent(
                Timestamp: evt.Time.ToDateTime(),
                ProcessId: (int)evt.Process.Pid,
                ThreadId: 0, // Tetragon doesn't track thread
                LoadType: MapLoadType(evt),
                RequestedPath: evt.Args?.FirstOrDefault()?.StringArg ?? "",
                ResolvedPath: null,
                LoadAddress: null,
                Success: true,
                ErrorCode: null,
                CallerModule: evt.Process.Binary,
                CallerAddress: null);
        }
    }
}
```
### Verdict Evaluation

**Already Implemented** - Use as-is.

| Component | File | Purpose |
|-----------|------|---------|
| `ReachabilityStackEvaluator` | `Scanner.Reachability/Stack/ReachabilityStackEvaluator.cs` | 3-layer verdict |
| `ReachabilityVerdict` | Same file | Verdict enum |
| `ReachabilityStack` | `Scanner.Reachability/Stack/ReachabilityStack.cs` | Stack model |

**Verdict Logic** (already implemented):
```csharp
public ReachabilityVerdict DeriveVerdict(
    ReachabilityLayer1 layer1,
    ReachabilityLayer2 layer2,
    ReachabilityLayer3 layer3)
{
    // L1 not reachable → Unreachable
    // L2 not resolved → Unreachable
    // L3 gated → Unreachable
    // All clear → Exploitable
    // Mixed → LikelyExploitable or PossiblyExploitable
}
```
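
The rule summary above can be written out as a small decision function. The sketch below is an illustration under stated assumptions, not the logic in `ReachabilityStackEvaluator.cs`: each layer is reduced to a nullable boolean where `null` means "no result yet", and the way "mixed" inputs split between likely and possibly exploitable is a guess. `VerdictSketch` and `VerdictRules` are hypothetical names.

```csharp
using System;

// Hypothetical enum mirroring the verdict names used in this guide.
public enum VerdictSketch
{
    Unknown,
    Unreachable,
    PossiblyExploitable,
    LikelyExploitable,
    Exploitable
}

public static class VerdictRules
{
    // null for a layer means that layer has not produced a result yet (an assumption of this sketch).
    public static VerdictSketch Derive(bool? l1Reachable, bool? l2Resolved, bool? l3Gated)
    {
        // Any layer that definitively blocks the path wins.
        if (l1Reachable == false || l2Resolved == false || l3Gated == true)
            return VerdictSketch.Unreachable;

        // All three layers confirm the path: "all clear".
        if (l1Reachable == true && l2Resolved == true && l3Gated == false)
            return VerdictSketch.Exploitable;

        // Without a static result there is nothing to grade.
        if (l1Reachable is null)
            return VerdictSketch.Unknown;

        // Mixed (assumed split): L1 is confirmed; one more confirming layer raises
        // the verdict to "likely", otherwise it stays at "possibly".
        return (l2Resolved == true || l3Gated == false)
            ? VerdictSketch.LikelyExploitable
            : VerdictSketch.PossiblyExploitable;
    }
}
```

Checking the blocking conditions first means a single definitive "no" from any layer overrides partial confirmation from the others, which matches the first three rules in the summary.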
### Evidence Storage

**Already Implemented** - Use as-is.

| Component | File | Purpose |
|-----------|------|---------|
| `EvidenceBundle` | `Evidence.Bundle/EvidenceBundle.cs` | Container |
| `ReachabilityEvidence` | `Evidence.Bundle/ReachabilityEvidence.cs` | Reachability proof |
| `EvidenceDbContext` | `Evidence.Persistence/EfCore/Context/` | Postgres |
| `IEvidenceStore` | `Evidence.Core/IEvidenceStore.cs` | Store interface |

### DSSE Attestation

**Already Implemented** - Wire to Authority for real signing.

| Component | File | Purpose |
|-----------|------|---------|
| `ReachabilityWitnessDsseBuilder` | `Scanner.Reachability/Attestation/` | Build in-toto statements |
| `ReachabilityWitnessStatement` | Same dir | Statement model |
| `IDsseEnvelopeSigner` | `Scanner.Worker/Processing/Surface/` | Signing interface |
| `DeterministicDsseEnvelopeSigner` | Same file | Fallback signer |

**Integration**:
```csharp
var statement = _dsseBuilder.BuildStatement(
    graph: richGraph,
    graphHash: graphHash,
    subjectDigest: imageDigest,
    graphCasUri: casUri,
    policyHash: null,
    sourceCommit: commit);

var statementBytes = _dsseBuilder.SerializeStatement(statement);

var envelope = await _signer.SignAsync(
    payloadType: "https://stella.ops/reachabilityWitness/v1",
    content: statementBytes,
    suggestedKind: "reachability",
    merkleRoot: graphHash,
    view: null,
    ct);
```
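
For context on what a DSSE signer actually signs: the DSSE specification defines a Pre-Authentication Encoding (PAE) that binds the payload type to the payload bytes, and the signature is computed over that encoding rather than over the raw payload. The sketch below shows only the PAE step. `DssePae` is a hypothetical helper, and whether `IDsseEnvelopeSigner` performs this step internally is an assumption that should be checked against the implementation.

```csharp
using System;
using System.Text;

// Hypothetical helper illustrating DSSE Pre-Authentication Encoding (PAE).
public static class DssePae
{
    // PAE(type, body) = "DSSEv1" SP LEN(type) SP type SP LEN(body) SP body
    // where LEN is the byte length written as ASCII decimal and SP is a single space.
    public static byte[] Encode(string payloadType, byte[] payload)
    {
        var typeByteCount = Encoding.UTF8.GetByteCount(payloadType);
        var header = Encoding.UTF8.GetBytes(FormattableString.Invariant(
            $"DSSEv1 {typeByteCount} {payloadType} {payload.Length} "));

        // Concatenate the textual header with the raw payload bytes.
        var result = new byte[header.Length + payload.Length];
        header.CopyTo(result, 0);
        payload.CopyTo(result, header.Length);
        return result;
    }
}
```

Under this sketch, the bytes to sign for the call above would be `DssePae.Encode("https://stella.ops/reachabilityWitness/v1", statementBytes)`. Including the payload type in the signed bytes prevents a valid signature from being replayed against the same payload under a different type.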
### VEX Integration

**Partially Implemented** - Add verdict bridge.

| Component | File | Purpose |
|-----------|------|---------|
| `VexHubDbContext` | `VexHub/` | VEX storage |
| `VexLens` | `VexLens/` | VEX analysis |
| `TriageEffectiveVex` | `Scanner.WebService/` | Effective VEX |

**New Bridge Needed**:
```csharp
public sealed class VexStatusDeterminer : IVexStatusDeterminer
{
    public VexStatus DetermineStatus(ReachabilityVerdict verdict) => verdict switch
    {
        ReachabilityVerdict.Exploitable => VexStatus.Affected,
        ReachabilityVerdict.LikelyExploitable => VexStatus.Affected,
        ReachabilityVerdict.PossiblyExploitable => VexStatus.UnderInvestigation,
        ReachabilityVerdict.Unreachable => VexStatus.NotAffected,
        ReachabilityVerdict.Unknown => VexStatus.UnderInvestigation,
        _ => VexStatus.UnderInvestigation
    };

    public VexJustification BuildJustification(
        ReachabilityStack stack,
        IReadOnlyList<string> evidenceUris)
    {
        var detail = stack.Verdict switch
        {
            ReachabilityVerdict.Unreachable =>
                "Vulnerable code is not reachable from application entrypoints.",
            ReachabilityVerdict.Exploitable =>
                $"Vulnerable code is reachable via {stack.StaticCallGraph.Paths.Length} call path(s).",
            _ => "Reachability analysis completed."
        };

        return new VexJustification
        {
            Category = stack.Verdict == ReachabilityVerdict.Unreachable
                ? JustificationCategory.CodeNotReachable
                : JustificationCategory.RequiresDependentCode,
            Detail = detail,
            EvidenceReferences = evidenceUris
        };
    }
}
```
---
## Data Flow Sequence
### 1. Scan Trigger → Reachability Analysis
```
1. Scanner.WebService receives POST /api/reachability/analyze
   └─ { imageDigest, cveId, purl }

2. ReachabilityEvidenceJob queued to Scanner.Queue
   └─ Job contains: imageDigest, cveId, purl, options

3. Scanner.Worker picks up job
   ├─ ICveSymbolMappingService.GetSinksForCveAsync(cveId, purl)
   │  └─ Returns: [VulnerableSymbol{name: "JndiLookup.lookup", ...}]
   ├─ ICallGraphCache.GetOrComputeAsync(imageDigest)
   │  └─ Returns: CallGraphSnapshot (nodes, edges, entrypoints, sinks)
   ├─ ReachabilityAnalyzer.Analyze(snapshot, options{ExplicitSinks})
   │  └─ Returns: ReachabilityAnalysisResult (paths, reachableSinks)
   ├─ BuildLayer1 from analysis result
   │  └─ Layer1{IsReachable, Confidence, Paths, ReachingEntrypoints}
   ├─ ReachabilityStackEvaluator.Evaluate(findingId, symbol, L1, L2, L3)
   │  └─ Returns: ReachabilityStack with Verdict
   └─ IEvidenceStore.StoreAsync(stack.ToEvidenceBundle())
      └─ Returns: EvidenceBundleId

4. Optional: Emit VEX
   ├─ IVexStatusDeterminer.DetermineStatus(verdict)
   ├─ IVexStatusDeterminer.BuildJustification(stack, [evidenceUri])
   └─ VexHub.EmitAsync(vexDocument)
```
### 2. Runtime Observation → Layer 3 Update
```
1. RuntimeEvidenceCollector monitors container
   ├─ TetragonAdapter.StartSessionAsync(options)
   └─ TetragonAdapter.StreamEventsAsync(sessionId)
      └─ Yields: RuntimeLoadEvent[]

2. RuntimeEvidenceCorrelator processes events
   ├─ Match loaded libraries to vulnerable symbols
   ├─ Check if sink functions were called
   └─ Build RuntimeObservation

3. Update ReachabilityStack Layer 3
   ├─ Layer3{IsGated: false/true, Outcome, Conditions}
   └─ Re-evaluate verdict with ReachabilityStackEvaluator

4. Store updated evidence
   └─ IEvidenceStore.UpdateAsync(bundleId, updatedStack)
```
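
The "match loaded libraries" step in the correlator can be sketched as a simple set intersection on file names. This is only an illustration: `LoadEventSketch` is a hypothetical, cut-down stand-in for `RuntimeLoadEvent` that keeps just the two path fields, and matching on file name alone is an assumption (a real correlator would probably also compare build IDs or digests).

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical, reduced stand-in for RuntimeLoadEvent (only the path fields are kept).
public sealed record LoadEventSketch(string RequestedPath, string? ResolvedPath);

public static class RuntimeCorrelationSketch
{
    // Returns the vulnerable library file names that were observed being loaded.
    public static IReadOnlySet<string> ObservedLibraries(
        IEnumerable<LoadEventSketch> events,
        IEnumerable<string> vulnerableLibraryNames)
    {
        var wanted = new HashSet<string>(vulnerableLibraryNames, StringComparer.Ordinal);
        var observed = new HashSet<string>(StringComparer.Ordinal);

        foreach (var evt in events)
        {
            // Prefer the resolved path when the adapter provides one.
            var name = Path.GetFileName(evt.ResolvedPath ?? evt.RequestedPath);
            if (wanted.Contains(name)) observed.Add(name);
        }

        return observed;
    }
}
```

A non-empty result is direct evidence that the library was loaded during the session. An empty result is weaker evidence, since it only covers the capture window, so treating it as "gated" is a policy decision rather than a proof.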
### 3. Patch Verification → Layer 2 Update
```
1. PatchVerificationJob triggered for "distro claims fixed"
   ├─ Download vulnerable binary from upstream
   ├─ Download patched binary from distro
   └─ Get target symbols from CVE mapping

2. IBinaryDiffService.DiffAsync(vulnerable, patched, symbols)
   └─ Returns: PatchDiffResult{IsPatched, ChangedFunctions, SimilarityScore}

3. Update ReachabilityStack Layer 2
   ├─ Layer2{IsResolved: !IsPatched, Resolution, Reason}
   └─ Re-evaluate verdict

4. Store patch diff evidence
   └─ EvidenceBundle += PatchDiffEvidence
```
---
## Database Schema Integration
### Existing Tables Used
```sql
-- CVE-Symbol Mapping (exists in reachability schema)
reachability.cve_symbol_mappings
reachability.vulnerable_symbols
reachability.patch_analysis
-- Evidence Storage (exists in evidence schema)
evidence.evidence_bundles
evidence.evidence_records
evidence.attestations
-- VEX (exists in vex schema)
vex.documents
vex.statements
vex.analysis
```
### New Tables Needed
```sql
-- Runtime observations
CREATE TABLE reachability.runtime_observations (
    observation_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    scan_id UUID NOT NULL,
    image_digest TEXT NOT NULL,
    session_id TEXT NOT NULL,
    symbol_name TEXT,
    observed_at TIMESTAMPTZ NOT NULL,
    load_type TEXT,
    process_id INTEGER,
    correlated_cve_id TEXT,
    correlated_finding_id UUID,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- VEX-Evidence linking
CREATE TABLE reachability.vex_evidence_links (
    link_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    vex_document_id UUID NOT NULL,
    evidence_bundle_id UUID NOT NULL,
    evidence_type TEXT NOT NULL,
    linked_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
```
---
## Configuration
### Scanner.Worker appsettings.json
```json
{
  "Reachability": {
    "EnableL2Analysis": false,
    "EnableL3Analysis": false,
    "MaxPathsPerSink": 5,
    "MaxDepth": 256,
    "CacheGraphs": true,
    "GraphCacheTtlMinutes": 60
  },
  "RuntimeCapture": {
    "Adapter": "tetragon",
    "SessionDurationSeconds": 300,
    "EventBufferSize": 10000
  },
  "BinaryDiff": {
    "MaxBinarySizeMb": 100,
    "TimeoutSeconds": 120
  }
}
```
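
As a rough sketch of how the `Reachability` section above could be read into a typed object and sanity-checked, the example below uses only `System.Text.Json`. `ReachabilityOptionsSketch` and `ReachabilityOptionsLoader` are hypothetical names, and the validation rules are assumptions. In a hosted worker the same shape would more commonly be bound through the standard .NET configuration and options infrastructure.

```csharp
using System;
using System.Text.Json;

// Hypothetical options type mirroring the "Reachability" section of appsettings.json.
public sealed record ReachabilityOptionsSketch(
    bool EnableL2Analysis,
    bool EnableL3Analysis,
    int MaxPathsPerSink,
    int MaxDepth,
    bool CacheGraphs,
    int GraphCacheTtlMinutes);

public static class ReachabilityOptionsLoader
{
    public static ReachabilityOptionsSketch Load(string appsettingsJson)
    {
        using var doc = JsonDocument.Parse(appsettingsJson);

        // Pick out the section, then map it onto the record by property name.
        var section = doc.RootElement.GetProperty("Reachability");
        var options = section.Deserialize<ReachabilityOptionsSketch>()
            ?? throw new InvalidOperationException("The Reachability section is null.");

        // Assumed sanity checks: traversal limits must be positive.
        if (options.MaxDepth <= 0 || options.MaxPathsPerSink <= 0)
            throw new InvalidOperationException("MaxDepth and MaxPathsPerSink must be positive.");

        return options;
    }
}
```

Failing fast on non-positive limits at startup is cheaper than discovering an empty or unbounded traversal during a scan.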
### Tetragon TracingPolicy
```yaml
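apiVersion: cilium.io/v1alpha1
kind: TracingPolicy
metadata:
  name: stella-library-loads
spec:
  # NOTE: kprobes attach to kernel functions only. Verify that each `call` below
  # resolves as a kernel symbol on the target hosts (for example via /proc/kallsyms).
  # dlopen() is implemented in user space by the dynamic loader, so tracing it
  # may require a uprobe instead of a kprobe.
  kprobes:
    - call: "do_dlopen"
      syscall: false
      args:
        - index: 0
          type: "string"
    - call: "load_elf_binary"
      syscall: false
      args:
        - index: 0
          type: "file"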
```
---
## Testing Strategy
### Unit Tests
```csharp
// Test Layer 1 analysis
[Fact]
public void ReachabilityAnalyzer_FindsPath_WhenSinkReachable()
{
    var snapshot = CreateGraphWithPath("entry", "sink");
    var result = _analyzer.Analyze(snapshot, new() { ExplicitSinks = ["sink"] });

    Assert.Single(result.Paths);
    Assert.Equal("entry", result.Paths[0].EntrypointId);
    Assert.Equal("sink", result.Paths[0].SinkId);
}

// Test verdict evaluation
[Theory]
[InlineData(true, true, false, ReachabilityVerdict.Exploitable)]
[InlineData(true, true, true, ReachabilityVerdict.Unreachable)]
[InlineData(false, true, false, ReachabilityVerdict.Unreachable)]
public void StackEvaluator_DeterminesVerdict_Correctly(
    bool l1Reachable, bool l2Resolved, bool l3Gated, ReachabilityVerdict expected)
{
    var layer1 = new ReachabilityLayer1 { IsReachable = l1Reachable, ... };
    var layer2 = new ReachabilityLayer2 { IsResolved = l2Resolved, ... };
    var layer3 = new ReachabilityLayer3 { IsGated = l3Gated, ... };

    var verdict = _evaluator.DeriveVerdict(layer1, layer2, layer3);

    Assert.Equal(expected, verdict);
}
```
### Integration Tests
```csharp
// Test end-to-end reachability job
[Fact]
public async Task ReachabilityJob_ProducesEvidence_ForKnownCve()
{
    var job = new ReachabilityEvidenceJob(
        JobId: Guid.NewGuid().ToString("N"),
        ImageDigest: "sha256:test123",
        CveId: "CVE-2021-44228",
        Purl: "pkg:maven/org.apache.logging.log4j/log4j-core@2.14.1",
        SourceCommit: null,
        Options: new(),
        QueuedAt: DateTimeOffset.UtcNow);

    var stack = await _executor.ExecuteAsync(job, CancellationToken.None);

    Assert.NotNull(stack);
    Assert.Equal("CVE-2021-44228:pkg:maven/...", stack.FindingId);
    Assert.NotNull(stack.StaticCallGraph);
}
```
---
## Monitoring & Observability
### Metrics
```csharp
// Meter: stella.reachability
Counter<long> reachability_jobs_total { status = "completed|failed" }
Histogram<double> reachability_analysis_duration_seconds
Counter<long> reachability_verdicts_total { verdict = "exploitable|unreachable|..." }
Gauge<long> runtime_capture_sessions_active
```
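
The instrument list above is shorthand rather than compilable code. One way to declare the same instruments with the built-in `System.Diagnostics.Metrics` API is sketched below. `ReachabilityMetricsSketch` and its method names are hypothetical, and an observable gauge backed by a counter field is used for the active-session count as one possible choice.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Threading;

// Hypothetical wrapper declaring the instruments listed above on the "stella.reachability" meter.
public sealed class ReachabilityMetricsSketch : IDisposable
{
    private readonly Meter _meter = new("stella.reachability");
    private readonly Counter<long> _jobs;
    private readonly Histogram<double> _duration;
    private readonly Counter<long> _verdicts;
    private long _activeSessions;

    public ReachabilityMetricsSketch()
    {
        _jobs = _meter.CreateCounter<long>("reachability_jobs_total");
        _duration = _meter.CreateHistogram<double>("reachability_analysis_duration_seconds", unit: "s");
        _verdicts = _meter.CreateCounter<long>("reachability_verdicts_total");

        // The gauge is sampled on demand by whichever exporter or listener is attached.
        _meter.CreateObservableGauge(
            "runtime_capture_sessions_active",
            () => Interlocked.Read(ref _activeSessions));
    }

    // Records one finished job with its status tag, duration, and verdict tag.
    public void JobFinished(string status, double seconds, string verdict)
    {
        _jobs.Add(1, new KeyValuePair<string, object?>("status", status));
        _duration.Record(seconds);
        _verdicts.Add(1, new KeyValuePair<string, object?>("verdict", verdict));
    }

    public void SessionStarted() => Interlocked.Increment(ref _activeSessions);

    public void SessionEnded() => Interlocked.Decrement(ref _activeSessions);

    public void Dispose() => _meter.Dispose();
}
```

A call such as `metrics.JobFinished("completed", 1.5, "unreachable")` would then increment both counters with their tags and record one duration sample.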
### Logs
```
[INFO] ReachabilityJob started: jobId={jobId} cveId={cveId} image={digest}
[INFO] CVE sinks resolved: cveId={cveId} sinkCount={count}
[INFO] Call graph loaded: nodes={nodes} edges={edges} entrypoints={eps}
[INFO] Reachability analysis complete: reachable={bool} pathCount={count}
[INFO] Verdict determined: verdict={verdict} findingId={findingId}
[INFO] Evidence stored: bundleId={bundleId}
```
---
## Migration Path
### Phase 1: Enable L1 Only
- Deploy with `EnableL2Analysis: false`, `EnableL3Analysis: false`
- Validate with known CVEs (Log4Shell, Spring4Shell)
- Monitor verdict accuracy
### Phase 2: Add L2 (Binary Resolution)
- Enable patch verification for distro packages
- Deploy B2R2BinaryDiffService
- Set `EnableL2Analysis: true`
### Phase 3: Add L3 (Runtime)
- Deploy Tetragon in cluster
- Enable RuntimeEvidenceCollector
- Set `EnableL3Analysis: true`
### Phase 4: Full VEX Automation
- Enable VexStatusDeterminer auto-population
- Configure VEX refresh triggers
- Monitor VEX accuracy vs manual triage