155 lines
5.3 KiB
C#
155 lines
5.3 KiB
C#
// Copyright (c) StellaOps. Licensed under the BUSL-1.1.
|
|
|
|
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using StellaOps.Eventing.Internal;
|
|
using StellaOps.Eventing.Models;
|
|
using StellaOps.Eventing.Signing;
|
|
using StellaOps.Eventing.Storage;
|
|
using StellaOps.HybridLogicalClock;
|
|
using System.Diagnostics;
|
|
using System.Text.Json;
|
|
|
|
namespace StellaOps.Eventing;
|
|
|
|
/// <summary>
/// Implementation of <see cref="ITimelineEventEmitter"/> for emitting timeline events.
/// </summary>
/// <remarks>
/// For every event the emitter takes a hybrid-logical-clock tick and a wall-clock timestamp,
/// serializes the payload to JSON, computes a payload digest and an event ID, optionally signs
/// the event, and appends it to the injected <see cref="ITimelineEventStore"/>.
/// </remarks>
public sealed class TimelineEventEmitter : ITimelineEventEmitter
{
    /// <summary>
    /// Serializer settings shared by every payload serialization.
    /// </summary>
    /// <remarks>
    /// Kept as a single cached instance: System.Text.Json caches type metadata per
    /// <see cref="JsonSerializerOptions"/> instance, so allocating a new instance per call
    /// forces that metadata to be rebuilt each time (analyzer rule CA1869).
    /// </remarks>
    private static readonly JsonSerializerOptions PayloadSerializerOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
        WriteIndented = false
    };

    private readonly IHybridLogicalClock _hlc;
    private readonly TimeProvider _timeProvider;
    private readonly ITimelineEventStore _eventStore;
    private readonly IEventSigner? _eventSigner;
    private readonly IOptions<EventingOptions> _options;
    private readonly ILogger<TimelineEventEmitter> _logger;
    private readonly EngineVersionRef _engineVersion;

    /// <summary>
    /// Initializes a new instance of the <see cref="TimelineEventEmitter"/> class.
    /// </summary>
    /// <param name="hlc">Hybrid logical clock ticked once per created event.</param>
    /// <param name="timeProvider">Source of the wall-clock timestamp stored on each event.</param>
    /// <param name="eventStore">Store that created events are appended to.</param>
    /// <param name="options">Eventing options (service name, signing flag, engine version).</param>
    /// <param name="logger">Logger used for debug-level emission messages.</param>
    /// <param name="eventSigner">Optional signer; when <c>null</c>, events are never signed.</param>
    /// <exception cref="ArgumentNullException">
    /// Any argument other than <paramref name="eventSigner"/> is <c>null</c>.
    /// </exception>
    public TimelineEventEmitter(
        IHybridLogicalClock hlc,
        TimeProvider timeProvider,
        ITimelineEventStore eventStore,
        IOptions<EventingOptions> options,
        ILogger<TimelineEventEmitter> logger,
        IEventSigner? eventSigner = null)
    {
        _hlc = hlc ?? throw new ArgumentNullException(nameof(hlc));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _eventSigner = eventSigner;

        // Resolved once at construction: use the configured engine version when present,
        // otherwise fall back to the one derived from the entry assembly.
        _engineVersion = options.Value.EngineVersion ?? EngineVersionRef.FromEntryAssembly();
    }

    /// <inheritdoc/>
    public async Task<TimelineEvent> EmitAsync<TPayload>(
        string correlationId,
        string kind,
        TPayload payload,
        CancellationToken cancellationToken = default) where TPayload : notnull
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(correlationId);
        ArgumentException.ThrowIfNullOrWhiteSpace(kind);
        ArgumentNullException.ThrowIfNull(payload);

        var timelineEvent = CreateEvent(correlationId, kind, payload);

        await _eventStore.AppendAsync(timelineEvent, cancellationToken).ConfigureAwait(false);

        _logger.LogDebug(
            "Emitted timeline event {EventId} for {CorrelationId} [{Kind}]",
            timelineEvent.EventId,
            correlationId,
            kind);

        return timelineEvent;
    }

    /// <inheritdoc/>
    public async Task<IReadOnlyList<TimelineEvent>> EmitBatchAsync(
        IEnumerable<PendingEvent> events,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(events);

        // Materialize once so the input sequence is enumerated a single time and every
        // event is created (clock ticked, digest computed) before anything is stored.
        var timelineEvents = events
            .Select(e => CreateEvent(e.CorrelationId, e.Kind, e.Payload))
            .ToList();

        // Nothing to store: skip the store call and the log line.
        if (timelineEvents.Count == 0)
        {
            return Array.Empty<TimelineEvent>();
        }

        await _eventStore.AppendBatchAsync(timelineEvents, cancellationToken).ConfigureAwait(false);

        _logger.LogDebug("Emitted batch of {Count} timeline events", timelineEvents.Count);

        return timelineEvents;
    }

    /// <summary>
    /// Builds a <see cref="TimelineEvent"/> for the given correlation ID, kind and payload,
    /// signing it when a signer was injected and signing is enabled in the options.
    /// </summary>
    /// <typeparam name="TPayload">Declared type of the payload being serialized.</typeparam>
    /// <param name="correlationId">Correlation ID stored on the event and used for the event ID.</param>
    /// <param name="kind">Event kind stored on the event and used for the event ID.</param>
    /// <param name="payload">Payload to serialize and digest.</param>
    /// <returns>The created event, carrying a signature if one was produced.</returns>
    private TimelineEvent CreateEvent<TPayload>(
        string correlationId,
        string kind,
        TPayload payload) where TPayload : notnull
    {
        var tHlc = _hlc.Tick();
        var tsWall = _timeProvider.GetUtcNow();

        // Read the options once so the service name and the signing flag come from the same snapshot.
        var options = _options.Value;
        var service = options.ServiceName;

        // Serialize the payload and digest the exact string that is stored on the event.
        var canonicalPayload = CanonicalizePayload(payload);
        var payloadDigest = EventIdGenerator.ComputePayloadDigest(canonicalPayload);

        // The event ID is derived from correlation ID, HLC timestamp, service and kind;
        // the payload is not an input to it.
        var eventId = EventIdGenerator.Generate(correlationId, tHlc, service, kind);

        // Capture trace context if an activity is current; otherwise this stays null.
        var traceParent = Activity.Current?.Id;

        var timelineEvent = new TimelineEvent
        {
            EventId = eventId,
            THlc = tHlc,
            TsWall = tsWall,
            Service = service,
            TraceParent = traceParent,
            CorrelationId = correlationId,
            Kind = kind,
            Payload = canonicalPayload,
            PayloadDigest = payloadDigest,
            EngineVersion = _engineVersion,
            SchemaVersion = 1
        };

        // Sign only if a signer is available and signing is enabled.
        if (_eventSigner is not null && options.SignEvents)
        {
            // The signature is computed over the unsigned event, then attached to a copy.
            var signature = _eventSigner.Sign(timelineEvent);
            return timelineEvent with { DsseSig = signature };
        }

        return timelineEvent;
    }

    /// <summary>
    /// Serializes <paramref name="payload"/> to compact JSON with snake_case property names.
    /// </summary>
    /// <remarks>
    /// NOTE: this is not RFC 8785 canonicalization. It uses the standard serializer with a
    /// naming policy and no indentation, and no key sorting is applied here.
    /// TODO: switch to StellaOps.Canonical.Json for production use.
    /// </remarks>
    /// <typeparam name="TPayload">Declared type of the payload.</typeparam>
    /// <param name="payload">Payload to serialize.</param>
    /// <returns>The JSON text of the payload.</returns>
    private static string CanonicalizePayload<TPayload>(TPayload payload)
    {
        return JsonSerializer.Serialize(payload, PayloadSerializerOptions);
    }
}
|