Files
git.stella-ops.org/docs/modules/eventing/event-envelope-schema.md

497 lines
14 KiB
Markdown

# Event Envelope Schema
> **Version:** 1.0.0
> **Status:** Draft
> **Sprint:** [SPRINT_20260107_003_001_LB](../../implplan/SPRINT_20260107_003_001_LB_event_envelope_sdk.md)
This document specifies the canonical event envelope schema for the StellaOps Unified Event Timeline.
---
## Overview
The event envelope provides a standardized format for all events emitted across StellaOps services. It enables:
- **Unified Timeline:** Cross-service correlation with HLC ordering
- **Deterministic Replay:** Reproducible event streams for forensics
- **Audit Compliance:** DSSE-signed event bundles for export
- **Causal Analysis:** Stage latency measurement and bottleneck identification
---
## Envelope Schema (v1)
### JSON Schema
```json
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://stellaops.org/schemas/timeline-event.v1.json",
"title": "TimelineEvent",
"description": "Canonical event envelope for StellaOps Unified Event Timeline",
"type": "object",
"required": [
"eventId",
"tHlc",
"tsWall",
"service",
"correlationId",
"kind",
"payload",
"payloadDigest",
"engineVersion",
"schemaVersion"
],
"properties": {
"eventId": {
"type": "string",
"description": "Deterministic event ID: SHA-256(correlationId || tHlc || service || kind)[0:32] hex",
"pattern": "^[a-f0-9]{32}$"
},
"tHlc": {
"type": "string",
"description": "HLC timestamp in sortable string format: <physicalTimeMs>:<logicalCounter>:<nodeId>",
"pattern": "^\\d+:\\d+:[a-zA-Z0-9_-]+$"
},
"tsWall": {
"type": "string",
"format": "date-time",
"description": "Wall-clock time in ISO 8601 format (informational only)"
},
"service": {
"type": "string",
"description": "Service name that emitted the event",
"enum": ["Scheduler", "AirGap", "Attestor", "Policy", "VexLens", "Scanner", "Concelier", "Platform"]
},
"traceParent": {
"type": ["string", "null"],
"description": "W3C Trace Context traceparent header",
"pattern": "^[0-9a-f]{2}-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$"
},
"correlationId": {
"type": "string",
"description": "Correlation ID linking related events (e.g., scanId, jobId, artifactDigest)"
},
"kind": {
"type": "string",
"description": "Event kind/type",
"enum": [
"ENQUEUE", "DEQUEUE", "EXECUTE", "COMPLETE", "FAIL",
"IMPORT", "EXPORT", "MERGE", "CONFLICT",
"ATTEST", "VERIFY",
"EVALUATE", "GATE_PASS", "GATE_FAIL",
"CONSENSUS", "OVERRIDE",
"SCAN_START", "SCAN_COMPLETE",
"EMIT", "ACK", "ERR"
]
},
"payload": {
"type": "string",
"description": "RFC 8785 canonicalized JSON payload"
},
"payloadDigest": {
"type": "string",
"description": "SHA-256 digest of payload as hex string",
"pattern": "^[a-f0-9]{64}$"
},
"engineVersion": {
"type": "object",
"description": "Engine/resolver version for reproducibility",
"required": ["engineName", "version", "sourceDigest"],
"properties": {
"engineName": {
"type": "string",
"description": "Name of the engine/service"
},
"version": {
"type": "string",
"description": "Semantic version string"
},
"sourceDigest": {
"type": "string",
"description": "SHA-256 digest of engine source/binary"
}
}
},
"dsseSig": {
"type": ["string", "null"],
"description": "Optional DSSE signature in format keyId:base64Signature"
},
"schemaVersion": {
"type": "integer",
"description": "Schema version for envelope evolution",
"const": 1
}
}
}
```
### C# Record Definition
```csharp
/// <summary>
/// Canonical event envelope for unified timeline.
/// </summary>
/// <remarks>
/// C# counterpart of the <c>timeline-event.v1.json</c> schema. Members declared
/// <c>required</c> must be set in the object initializer; <see cref="TraceParent"/>
/// and <see cref="DsseSig"/> are optional, and <see cref="SchemaVersion"/> defaults to 1.
/// </remarks>
public sealed record TimelineEvent
{
/// <summary>
/// Deterministic event ID: SHA-256(correlationId || tHlc || service || kind)[0:32] hex.
/// NOT a random ULID - ensures replay determinism.
/// </summary>
/// <remarks>
/// Must be exactly 32 lowercase hexadecimal characters (see the regular expression below).
/// </remarks>
[Required]
[RegularExpression("^[a-f0-9]{32}$")]
public required string EventId { get; init; }
/// <summary>
/// HLC timestamp from StellaOps.HybridLogicalClock library.
/// </summary>
/// <remarks>
/// The JSON schema carries this value as a string of the form
/// <c>physicalTimeMs:logicalCounter:nodeId</c>.
/// </remarks>
[Required]
public required HlcTimestamp THlc { get; init; }
/// <summary>
/// Wall-clock time (informational only, not used for ordering).
/// </summary>
[Required]
public required DateTimeOffset TsWall { get; init; }
/// <summary>
/// Service name that emitted the event.
/// </summary>
/// <remarks>
/// The JSON schema restricts this to: Scheduler, AirGap, Attestor, Policy, VexLens,
/// Scanner, Concelier, Platform. No such restriction is expressed on this property.
/// </remarks>
[Required]
public required string Service { get; init; }
/// <summary>
/// W3C Trace Context traceparent for OpenTelemetry correlation.
/// </summary>
/// <remarks>Optional; <c>null</c> when no trace context is available.</remarks>
public string? TraceParent { get; init; }
/// <summary>
/// Correlation ID linking related events.
/// </summary>
[Required]
public required string CorrelationId { get; init; }
/// <summary>
/// Event kind (ENQUEUE, EXECUTE, ATTEST, etc.).
/// </summary>
/// <remarks>
/// The full set of allowed values is the <c>kind</c> enum in the JSON schema.
/// </remarks>
[Required]
public required string Kind { get; init; }
/// <summary>
/// RFC 8785 canonicalized JSON payload.
/// </summary>
[Required]
public required string Payload { get; init; }
/// <summary>
/// SHA-256 digest of Payload.
/// </summary>
/// <remarks>
/// Held here as raw digest bytes; the JSON schema represents the same value as a
/// 64-character lowercase hex string.
/// NOTE(review): arrays compare by reference in compiler-generated record equality,
/// so two otherwise identical events with separate digest arrays will not be equal -
/// confirm whether value equality on this type is relied upon.
/// </remarks>
[Required]
public required byte[] PayloadDigest { get; init; }
/// <summary>
/// Engine version for reproducibility (per CLAUDE.md Rule 8.2.1).
/// </summary>
[Required]
public required EngineVersionRef EngineVersion { get; init; }
/// <summary>
/// Optional DSSE signature (keyId:base64Signature).
/// </summary>
public string? DsseSig { get; init; }
/// <summary>
/// Schema version (current: 1).
/// </summary>
/// <remarks>
/// Not declared <c>required</c>; initialized to 1 when the caller does not set it.
/// </remarks>
public int SchemaVersion { get; init; } = 1;
}
/// <summary>
/// Engine/resolver version attached to every timeline event for reproducibility.
/// </summary>
/// <param name="EngineName">Name of the engine/service.</param>
/// <param name="Version">Semantic version string.</param>
/// <param name="SourceDigest">SHA-256 digest of engine source/binary.</param>
public sealed record EngineVersionRef(
string EngineName,
string Version,
string SourceDigest);
```
---
## Field Specifications
### eventId
**Purpose:** Unique, deterministic identifier for each event.
**Computation:**
```csharp
/// <summary>
/// Derives the deterministic event ID from the fields that identify an event.
/// </summary>
/// <param name="correlationId">Correlation ID of the event.</param>
/// <param name="tHlc">HLC timestamp; its sortable string form is what gets hashed.</param>
/// <param name="service">Name of the emitting service.</param>
/// <param name="kind">Event kind.</param>
/// <returns>
/// The first 16 bytes of the SHA-256 digest, rendered as 32 lowercase hex characters.
/// </returns>
/// <remarks>
/// The four components are hashed back to back as UTF-8 with no separator between them.
/// NOTE(review): without separators, different splits of the same concatenated text
/// hash identically - confirm the field formats make that impossible in practice.
/// </remarks>
public static string GenerateEventId(
    string correlationId,
    HlcTimestamp tHlc,
    string service,
    string kind)
{
    using var sha256 = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);

    // Order matters: it is part of the ID definition.
    AppendUtf8(correlationId);
    AppendUtf8(tHlc.ToSortableString());
    AppendUtf8(service);
    AppendUtf8(kind);

    byte[] digest = sha256.GetHashAndReset();

    // Truncate to 128 bits, then normalise the upper-case hex to lower case.
    string upperHex = Convert.ToHexString(digest.AsSpan(0, 16));
    return upperHex.ToLowerInvariant();

    void AppendUtf8(string text) => sha256.AppendData(Encoding.UTF8.GetBytes(text));
}
```
**Rationale:** Unlike ULID or UUID, this deterministic approach ensures that:
- The same event produces the same ID across replays
- Duplicate events can be detected and deduplicated
- Event ordering is verifiable
### tHlc
**Purpose:** Primary ordering timestamp using Hybrid Logical Clock.
**Format:** `<physicalTimeMs>:<logicalCounter>:<nodeId>`
**Example:** `1704585600000:42:scheduler-node-1`
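**Ordering:** Lexicographic comparison of the three components — compared as values, not as the raw string, unless the numeric parts are zero-padded to a fixed width — produces correct temporal order: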
1. Compare physical time (milliseconds since Unix epoch)
2. If equal, compare logical counter
3. If equal, compare node ID (for uniqueness)
**Implementation:** Uses existing `StellaOps.HybridLogicalClock.HlcTimestamp` type.
### tsWall
**Purpose:** Human-readable wall-clock timestamp for debugging.
**Format:** ISO 8601 with UTC timezone (e.g., `2026-01-07T12:00:00.000Z`)
**Important:** This field is **informational only**. Never use for ordering or comparison. The `tHlc` field is the authoritative timestamp.
### service
**Purpose:** Identifies the StellaOps service that emitted the event.
**Allowed Values:**
| Value | Description |
|-------|-------------|
| `Scheduler` | Job scheduling and queue management |
| `AirGap` | Offline/air-gap sync operations |
| `Attestor` | DSSE attestation and verification |
| `Policy` | Policy engine evaluation |
| `VexLens` | VEX consensus computation |
| `Scanner` | Container scanning |
| `Concelier` | Advisory ingestion |
| `Platform` | Console backend aggregation |
### traceParent
**Purpose:** W3C Trace Context correlation for OpenTelemetry integration.
**Format:** `00-{trace-id}-{span-id}-{trace-flags}`
**Example:** `00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01`
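**Population:** Automatically captured from `Activity.Current?.Id` during event emission. The value has the `traceparent` shape only when the activity uses the W3C ID format (`ActivityIdFormat.W3C`), and it is `null` when no activity is current.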
### correlationId
**Purpose:** Links related events across services.
**Common Patterns:**
| Pattern | Example | Usage |
|---------|---------|-------|
| Scan ID | `scan-abc123` | Container scan lifecycle |
| Job ID | `job-xyz789` | Scheduled job lifecycle |
| Artifact Digest | `sha256:abc...` | Artifact processing |
| Bundle ID | `bundle-def456` | Air-gap bundle operations |
### kind
**Purpose:** Categorizes the event type.
**Event Kinds by Service:**
| Service | Kinds |
|---------|-------|
| Scheduler | `ENQUEUE`, `DEQUEUE`, `EXECUTE`, `COMPLETE`, `FAIL` |
| AirGap | `IMPORT`, `EXPORT`, `MERGE`, `CONFLICT` |
| Attestor | `ATTEST`, `VERIFY` |
| Policy | `EVALUATE`, `GATE_PASS`, `GATE_FAIL` |
| VexLens | `CONSENSUS`, `OVERRIDE` |
| Scanner | `SCAN_START`, `SCAN_COMPLETE` |
| Generic | `EMIT`, `ACK`, `ERR` |
### payload
**Purpose:** Domain-specific event data.
**Requirements:**
1. **RFC 8785 Canonicalization:** Must use `CanonJson.Serialize()` from `StellaOps.Canonical.Json`
2. **No Non-Deterministic Fields:** No random IDs, current timestamps, or environment-specific data
3. **Bounded Size:** Payload should be < 1MB; use references for large data
**Example:**
```json
{
"artifactDigest": "sha256:abc123...",
"jobId": "job-xyz789",
"status": "completed",
"findingsCount": 42
}
```
### payloadDigest
**Purpose:** Integrity verification of payload.
**Computation:**
```csharp
// Hash the UTF-8 bytes of the canonical payload string; the result is the raw SHA-256 digest.
byte[] payloadBytes = Encoding.UTF8.GetBytes(payload);
var digest = SHA256.HashData(payloadBytes);
```
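**Format:** Raw 32-byte digest in C# (`byte[]`) and in the database (`BYTEA`); encoded as a 64-character lowercase hex string in the JSON envelope.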
### engineVersion
**Purpose:** Records the engine/resolver version for reproducibility verification (per CLAUDE.md Rule 8.2.1).
**Fields:**
| Field | Description | Example |
|-------|-------------|---------|
| `engineName` | Service/engine name | `"Scheduler"` |
| `version` | Semantic version | `"2.5.0"` |
| `sourceDigest` | Build artifact hash | `"sha256:abc..."` |
**Population:** Use `EngineVersionRef.FromAssembly(Assembly.GetExecutingAssembly())` (this factory method is not part of the record definition shown in this document).
### dsseSig
**Purpose:** Optional cryptographic signature for audit compliance.
**Format:** `{keyId}:{base64Signature}`
**Example:** `signing-key-001:MEUCIQD...`
**Integration:** Uses existing `StellaOps.Attestation.DsseHelper` for signature generation.
### schemaVersion
**Purpose:** Enables schema evolution without breaking compatibility.
**Current Value:** `1`
**Migration Strategy:** When schema changes:
1. Increment version number
2. Add migration logic for older versions
3. Document breaking changes
---
## Database Schema
```sql
-- Storage for timeline event envelopes.
-- Every statement uses IF NOT EXISTS so the script can be re-run without failing
-- on objects created by an earlier run.
CREATE SCHEMA IF NOT EXISTS timeline;

CREATE TABLE IF NOT EXISTS timeline.events (
    event_id        TEXT PRIMARY KEY,                   -- deterministic 32-character hex ID
    t_hlc           TEXT NOT NULL,                      -- <physicalTimeMs>:<logicalCounter>:<nodeId>
    ts_wall         TIMESTAMPTZ NOT NULL,               -- informational only; not used for ordering
    service         TEXT NOT NULL,
    trace_parent    TEXT,                               -- optional W3C traceparent
    correlation_id  TEXT NOT NULL,
    kind            TEXT NOT NULL,
    -- NOTE(review): JSONB does not preserve key order, whitespace or duplicate keys,
    -- so the stored value is not the byte-exact RFC 8785 text that payload_digest
    -- was computed over. Confirm that verification re-canonicalizes the payload
    -- (or never re-hashes the stored JSONB text directly).
    payload         JSONB NOT NULL,
    payload_digest  BYTEA NOT NULL,                     -- raw SHA-256 digest of the payload
    engine_name     TEXT NOT NULL,
    engine_version  TEXT NOT NULL,
    engine_digest   TEXT NOT NULL,
    dsse_sig        TEXT,                               -- optional keyId:base64Signature
    schema_version  INTEGER NOT NULL DEFAULT 1,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Primary query: events by correlation, HLC ordered
-- NOTE(review): t_hlc is TEXT, so index order is string order under the column's
-- collation. That equals HLC order only if the numeric components are fixed-width -
-- confirm against the output of HlcTimestamp.ToSortableString().
CREATE INDEX IF NOT EXISTS idx_events_corr_hlc ON timeline.events (correlation_id, t_hlc);

-- Service-specific queries
CREATE INDEX IF NOT EXISTS idx_events_svc_hlc ON timeline.events (service, t_hlc);

-- Payload search (JSONB GIN index)
CREATE INDEX IF NOT EXISTS idx_events_payload ON timeline.events USING GIN (payload);

-- Kind filtering
CREATE INDEX IF NOT EXISTS idx_events_kind ON timeline.events (kind);
```
---
## Usage Examples
### Emitting an Event
```csharp
/// <summary>
/// Example service that records a timeline event after enqueuing a job.
/// </summary>
public class SchedulerService
{
// Emitter used to publish timeline events; its initialization is not shown in this snippet.
private readonly ITimelineEventEmitter _emitter;
/// <summary>
/// Enqueues <paramref name="job"/> and then emits an <c>ENQUEUE</c> timeline event for it.
/// </summary>
/// <param name="job">Job to enqueue; its <c>Id</c> is used as the event's correlation ID.</param>
/// <param name="ct">Cancellation token passed to both the enqueue and the emit call.</param>
public async Task EnqueueJobAsync(Job job, CancellationToken ct)
{
// Business logic...
// NOTE: _queue is not declared in this snippet; it is assumed to be a member of the full class.
await _queue.EnqueueAsync(job, ct);
// Emit timeline event
// The emit call runs only after the enqueue call has completed.
await _emitter.EmitAsync(
correlationId: job.Id.ToString(),
kind: "ENQUEUE",
payload: new { jobId = job.Id, priority = job.Priority },
ct);
}
}
```
### Querying Timeline
```csharp
/// <summary>
/// Loads the timeline events recorded for a single job.
/// </summary>
/// <param name="jobId">Job identifier, used as the correlation ID of the query.</param>
/// <param name="ct">Cancellation token forwarded to the timeline service.</param>
/// <returns>
/// The events returned by the timeline service for that correlation ID, filtered to the
/// Scheduler and Attestor services and to the ENQUEUE, EXECUTE, COMPLETE and ATTEST kinds.
/// </returns>
public async Task<IReadOnlyList<TimelineEvent>> GetJobTimelineAsync(
    string jobId,
    CancellationToken ct)
{
    // Narrow the query to the services and kinds of interest.
    var filter = new TimelineQueryOptions
    {
        Services = ["Scheduler", "Attestor"],
        Kinds = ["ENQUEUE", "EXECUTE", "COMPLETE", "ATTEST"]
    };

    var events = await _timelineService.GetEventsAsync(
        correlationId: jobId,
        options: filter,
        ct);

    return events;
}
```
---
## Compatibility Notes
### Relation to Existing HLC Infrastructure
This schema builds on the existing `StellaOps.HybridLogicalClock` library:
- Uses `HlcTimestamp` type directly
- Integrates with `IHybridLogicalClock.Tick()` for timestamp generation
- Compatible with air-gap merge algorithms
### Relation to Existing Replay Infrastructure
This schema integrates with `StellaOps.Replay.Core`:
- `KnowledgeSnapshot` can include timeline event references
- Replay uses `FakeTimeProvider` with HLC timestamps
- Verification compares payload digests
---
## References
- [SPRINT_20260107_003_000_INDEX](../../implplan/SPRINT_20260107_003_000_INDEX_unified_event_timeline.md) - Parent sprint index
- [SPRINT_20260105_002_000_INDEX](../../implplan/SPRINT_20260105_002_000_INDEX_hlc_audit_safe_ordering.md) - HLC foundation
- [RFC 8785](https://datatracker.ietf.org/doc/html/rfc8785) - JSON Canonicalization Scheme
- [W3C Trace Context](https://www.w3.org/TR/trace-context/) - Distributed tracing
- CLAUDE.md Section 8.2.1 - Engine version tracking
- CLAUDE.md Section 8.7 - RFC 8785 canonicalization