Files
git.stella-ops.org/docs/modules/eventing/event-envelope-schema.md

14 KiB

Event Envelope Schema

Version: 1.0.0 Status: Draft Sprint: SPRINT_20260107_003_001_LB

This document specifies the canonical event envelope schema for the StellaOps Unified Event Timeline.


Overview

The event envelope provides a standardized format for all events emitted across StellaOps services. It enables:

  • Unified Timeline: Cross-service correlation with HLC ordering
  • Deterministic Replay: Reproducible event streams for forensics
  • Audit Compliance: DSSE-signed event bundles for export
  • Causal Analysis: Stage latency measurement and bottleneck identification

Envelope Schema (v1)

JSON Schema

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://stellaops.org/schemas/timeline-event.v1.json",
  "title": "TimelineEvent",
  "description": "Canonical event envelope for StellaOps Unified Event Timeline",
  "type": "object",
  "required": [
    "eventId",
    "tHlc",
    "tsWall",
    "service",
    "correlationId",
    "kind",
    "payload",
    "payloadDigest",
    "engineVersion",
    "schemaVersion"
  ],
  "properties": {
    "eventId": {
      "type": "string",
      "description": "Deterministic event ID: SHA-256(correlationId || tHlc || service || kind)[0:32] hex",
      "pattern": "^[a-f0-9]{32}$"
    },
    "tHlc": {
      "type": "string",
      "description": "HLC timestamp in sortable string format: <physicalTimeMs>:<logicalCounter>:<nodeId>",
      "pattern": "^\\d+:\\d+:[a-zA-Z0-9_-]+$"
    },
    "tsWall": {
      "type": "string",
      "format": "date-time",
      "description": "Wall-clock time in ISO 8601 format (informational only)"
    },
    "service": {
      "type": "string",
      "description": "Service name that emitted the event",
      "enum": ["Scheduler", "AirGap", "Attestor", "Policy", "VexLens", "Scanner", "Concelier", "Platform"]
    },
    "traceParent": {
      "type": ["string", "null"],
      "description": "W3C Trace Context traceparent header",
      "pattern": "^[0-9a-f]{2}-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$"
    },
    "correlationId": {
      "type": "string",
      "description": "Correlation ID linking related events (e.g., scanId, jobId, artifactDigest)"
    },
    "kind": {
      "type": "string",
      "description": "Event kind/type",
      "enum": [
        "ENQUEUE", "DEQUEUE", "EXECUTE", "COMPLETE", "FAIL",
        "IMPORT", "EXPORT", "MERGE", "CONFLICT",
        "ATTEST", "VERIFY",
        "EVALUATE", "GATE_PASS", "GATE_FAIL",
        "CONSENSUS", "OVERRIDE",
        "SCAN_START", "SCAN_COMPLETE",
        "EMIT", "ACK", "ERR"
      ]
    },
    "payload": {
      "type": "string",
      "description": "RFC 8785 canonicalized JSON payload"
    },
    "payloadDigest": {
      "type": "string",
      "description": "SHA-256 digest of payload as hex string",
      "pattern": "^[a-f0-9]{64}$"
    },
    "engineVersion": {
      "type": "object",
      "description": "Engine/resolver version for reproducibility",
      "required": ["engineName", "version", "sourceDigest"],
      "properties": {
        "engineName": {
          "type": "string",
          "description": "Name of the engine/service"
        },
        "version": {
          "type": "string",
          "description": "Semantic version string"
        },
        "sourceDigest": {
          "type": "string",
          "description": "SHA-256 digest of engine source/binary"
        }
      }
    },
    "dsseSig": {
      "type": ["string", "null"],
      "description": "Optional DSSE signature in format keyId:base64Signature"
    },
    "schemaVersion": {
      "type": "integer",
      "description": "Schema version for envelope evolution",
      "const": 1
    }
  }
}

C# Record Definition

/// <summary>
/// Canonical event envelope for unified timeline.
/// </summary>
public sealed record TimelineEvent
{
    /// <summary>
    /// Deterministic event ID: SHA-256(correlationId || tHlc || service || kind)[0:32] hex.
    /// NOT a random ULID - ensures replay determinism.
    /// </summary>
    [Required]
    [RegularExpression("^[a-f0-9]{32}$")]
    public required string EventId { get; init; }

    /// <summary>
    /// HLC timestamp from StellaOps.HybridLogicalClock library.
    /// </summary>
    [Required]
    public required HlcTimestamp THlc { get; init; }

    /// <summary>
    /// Wall-clock time (informational only, not used for ordering).
    /// </summary>
    [Required]
    public required DateTimeOffset TsWall { get; init; }

    /// <summary>
    /// Service name that emitted the event.
    /// </summary>
    [Required]
    public required string Service { get; init; }

    /// <summary>
    /// W3C Trace Context traceparent for OpenTelemetry correlation.
    /// </summary>
    public string? TraceParent { get; init; }

    /// <summary>
    /// Correlation ID linking related events.
    /// </summary>
    [Required]
    public required string CorrelationId { get; init; }

    /// <summary>
    /// Event kind (ENQUEUE, EXECUTE, ATTEST, etc.).
    /// </summary>
    [Required]
    public required string Kind { get; init; }

    /// <summary>
    /// RFC 8785 canonicalized JSON payload.
    /// </summary>
    [Required]
    public required string Payload { get; init; }

    /// <summary>
    /// SHA-256 digest of Payload.
    /// </summary>
    [Required]
    public required byte[] PayloadDigest { get; init; }

    /// <summary>
    /// Engine version for reproducibility (per CLAUDE.md Rule 8.2.1).
    /// </summary>
    [Required]
    public required EngineVersionRef EngineVersion { get; init; }

    /// <summary>
    /// Optional DSSE signature (keyId:base64Signature).
    /// </summary>
    public string? DsseSig { get; init; }

    /// <summary>
    /// Schema version (current: 1).
    /// </summary>
    public int SchemaVersion { get; init; } = 1;
}

public sealed record EngineVersionRef(
    string EngineName,
    string Version,
    string SourceDigest);

Field Specifications

eventId

Purpose: Unique, deterministic identifier for each event.

Computation:

public static string GenerateEventId(
    string correlationId,
    HlcTimestamp tHlc,
    string service,
    string kind)
{
    using var hasher = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);
    hasher.AppendData(Encoding.UTF8.GetBytes(correlationId));
    hasher.AppendData(Encoding.UTF8.GetBytes(tHlc.ToSortableString()));
    hasher.AppendData(Encoding.UTF8.GetBytes(service));
    hasher.AppendData(Encoding.UTF8.GetBytes(kind));
    var hash = hasher.GetHashAndReset();
    return Convert.ToHexString(hash.AsSpan(0, 16)).ToLowerInvariant();
}

Rationale: Unlike ULID or UUID, this deterministic approach ensures that:

  • The same event produces the same ID across replays
  • Duplicate events can be detected and deduplicated
  • Event ordering is verifiable

tHlc

Purpose: Primary ordering timestamp using Hybrid Logical Clock.

Format: <physicalTimeMs>:<logicalCounter>:<nodeId>

Example: 1704585600000:42:scheduler-node-1

Ordering: Lexicographic comparison produces correct temporal order:

  1. Compare physical time (milliseconds since Unix epoch)
  2. If equal, compare logical counter
  3. If equal, compare node ID (for uniqueness)

Implementation: Uses existing StellaOps.HybridLogicalClock.HlcTimestamp type.

tsWall

Purpose: Human-readable wall-clock timestamp for debugging.

Format: ISO 8601 with UTC timezone (e.g., 2026-01-07T12:00:00.000Z)

Important: This field is informational only. Never use for ordering or comparison. The tHlc field is the authoritative timestamp.

service

Purpose: Identifies the StellaOps service that emitted the event.

Allowed Values:

Value Description
Scheduler Job scheduling and queue management
AirGap Offline/air-gap sync operations
Attestor DSSE attestation and verification
Policy Policy engine evaluation
VexLens VEX consensus computation
Scanner Container scanning
Concelier Advisory ingestion
Platform Console backend aggregation

traceParent

Purpose: W3C Trace Context correlation for OpenTelemetry integration.

Format: 00-{trace-id}-{span-id}-{trace-flags}

Example: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01

Population: Automatically captured from Activity.Current?.Id during event emission.

correlationId

Purpose: Links related events across services.

Common Patterns:

Pattern Example Usage
Scan ID scan-abc123 Container scan lifecycle
Job ID job-xyz789 Scheduled job lifecycle
Artifact Digest sha256:abc... Artifact processing
Bundle ID bundle-def456 Air-gap bundle operations

kind

Purpose: Categorizes the event type.

Event Kinds by Service:

Service Kinds
Scheduler ENQUEUE, DEQUEUE, EXECUTE, COMPLETE, FAIL
AirGap IMPORT, EXPORT, MERGE, CONFLICT
Attestor ATTEST, VERIFY
Policy EVALUATE, GATE_PASS, GATE_FAIL
VexLens CONSENSUS, OVERRIDE
Scanner SCAN_START, SCAN_COMPLETE
Generic EMIT, ACK, ERR

payload

Purpose: Domain-specific event data.

Requirements:

  1. RFC 8785 Canonicalization: Must use CanonJson.Serialize() from StellaOps.Canonical.Json
  2. No Non-Deterministic Fields: No random IDs, current timestamps, or environment-specific data
  3. Bounded Size: Payload should be < 1MB; use references for large data

Example:

{
  "artifactDigest": "sha256:abc123...",
  "jobId": "job-xyz789",
  "status": "completed",
  "findingsCount": 42
}

payloadDigest

Purpose: Integrity verification of payload.

Computation:

var digest = SHA256.HashData(Encoding.UTF8.GetBytes(payload));

Format: 64-character lowercase hex string.

engineVersion

Purpose: Records the engine/resolver version for reproducibility verification (per CLAUDE.md Rule 8.2.1).

Fields:

Field Description Example
engineName Service/engine name "Scheduler"
version Semantic version "2.5.0"
sourceDigest Build artifact hash "sha256:abc..."

Population: Use EngineVersionRef.FromAssembly(Assembly.GetExecutingAssembly()).

dsseSig

Purpose: Optional cryptographic signature for audit compliance.

Format: {keyId}:{base64Signature}

Example: signing-key-001:MEUCIQD...

Integration: Uses existing StellaOps.Attestation.DsseHelper for signature generation.

schemaVersion

Purpose: Enables schema evolution without breaking compatibility.

Current Value: 1

Migration Strategy: When schema changes:

  1. Increment version number
  2. Add migration logic for older versions
  3. Document breaking changes

Database Schema

CREATE SCHEMA IF NOT EXISTS timeline;

CREATE TABLE timeline.events (
    event_id        TEXT PRIMARY KEY,
    t_hlc           TEXT NOT NULL,
    ts_wall         TIMESTAMPTZ NOT NULL,
    service         TEXT NOT NULL,
    trace_parent    TEXT,
    correlation_id  TEXT NOT NULL,
    kind            TEXT NOT NULL,
    payload         JSONB NOT NULL,
    payload_digest  BYTEA NOT NULL,
    engine_name     TEXT NOT NULL,
    engine_version  TEXT NOT NULL,
    engine_digest   TEXT NOT NULL,
    dsse_sig        TEXT,
    schema_version  INTEGER NOT NULL DEFAULT 1,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Primary query: events by correlation, HLC ordered
CREATE INDEX idx_events_corr_hlc ON timeline.events (correlation_id, t_hlc);

-- Service-specific queries
CREATE INDEX idx_events_svc_hlc ON timeline.events (service, t_hlc);

-- Payload search (JSONB GIN index)
CREATE INDEX idx_events_payload ON timeline.events USING GIN (payload);

-- Kind filtering
CREATE INDEX idx_events_kind ON timeline.events (kind);

Usage Examples

Emitting an Event

public class SchedulerService
{
    private readonly ITimelineEventEmitter _emitter;

    public async Task EnqueueJobAsync(Job job, CancellationToken ct)
    {
        // Business logic...
        await _queue.EnqueueAsync(job, ct);

        // Emit timeline event
        await _emitter.EmitAsync(
            correlationId: job.Id.ToString(),
            kind: "ENQUEUE",
            payload: new { jobId = job.Id, priority = job.Priority },
            ct);
    }
}

Querying Timeline

public async Task<IReadOnlyList<TimelineEvent>> GetJobTimelineAsync(
    string jobId,
    CancellationToken ct)
{
    return await _timelineService.GetEventsAsync(
        correlationId: jobId,
        options: new TimelineQueryOptions
        {
            Services = ["Scheduler", "Attestor"],
            Kinds = ["ENQUEUE", "EXECUTE", "COMPLETE", "ATTEST"]
        },
        ct);
}

Compatibility Notes

Relation to Existing HLC Infrastructure

This schema builds on the existing StellaOps.HybridLogicalClock library:

  • Uses HlcTimestamp type directly
  • Integrates with IHybridLogicalClock.Tick() for timestamp generation
  • Compatible with air-gap merge algorithms

Relation to Existing Replay Infrastructure

This schema integrates with StellaOps.Replay.Core:

  • KnowledgeSnapshot can include timeline event references
  • Replay uses FakeTimeProvider with HLC timestamps
  • Verification compares payload digests

References