Add tests and implement timeline ingestion options with NATS and Redis subscribers
- Introduced `BinaryReachabilityLifterTests` to validate binary lifting functionality. - Created `PackRunWorkerOptions` for configuring worker paths and execution persistence. - Added `TimelineIngestionOptions` for configuring NATS and Redis ingestion transports. - Implemented `NatsTimelineEventSubscriber` for subscribing to NATS events. - Developed `RedisTimelineEventSubscriber` for reading from Redis Streams. - Added `TimelineEnvelopeParser` to normalize incoming event envelopes. - Created unit tests for `TimelineEnvelopeParser` to ensure correct field mapping. - Implemented `TimelineAuthorizationAuditSink` for logging authorization outcomes.
This commit is contained in:
@@ -17,4 +17,12 @@ public sealed class TimelineEventView
|
||||
public string? Actor { get; init; }
|
||||
public string Severity { get; init; } = "info";
|
||||
public string? PayloadHash { get; init; }
|
||||
public IDictionary<string, string>? Attributes { get; init; }
|
||||
public string? RawPayloadJson { get; init; }
|
||||
public string? NormalizedPayloadJson { get; init; }
|
||||
public Guid? BundleId { get; init; }
|
||||
public string? BundleDigest { get; init; }
|
||||
public string? AttestationSubject { get; init; }
|
||||
public string? AttestationDigest { get; init; }
|
||||
public string? ManifestUri { get; init; }
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ public sealed record TimelineQueryOptions
|
||||
public string? EventType { get; init; }
|
||||
public string? CorrelationId { get; init; }
|
||||
public string? TraceId { get; init; }
|
||||
public string? Source { get; init; }
|
||||
public string? Severity { get; init; }
|
||||
public DateTimeOffset? Since { get; init; }
|
||||
public long? AfterEventSeq { get; init; }
|
||||
|
||||
@@ -35,6 +35,8 @@ public sealed class TimelineIngestionService(ITimelineEventStore store) : ITimel
|
||||
throw new ArgumentException("event_type is required", nameof(envelope));
|
||||
if (string.IsNullOrWhiteSpace(envelope.Source))
|
||||
throw new ArgumentException("source is required", nameof(envelope));
|
||||
if (string.IsNullOrWhiteSpace(envelope.RawPayloadJson))
|
||||
throw new ArgumentException("raw payload is required", nameof(envelope));
|
||||
}
|
||||
|
||||
internal static string ComputePayloadHash(string payloadJson)
|
||||
|
||||
@@ -45,7 +45,7 @@ CREATE TABLE IF NOT EXISTS timeline.timeline_events
|
||||
trace_id text,
|
||||
actor text,
|
||||
severity timeline.event_severity NOT NULL DEFAULT 'info',
|
||||
payload_hash text CHECK (payload_hash IS NULL OR payload_hash ~ '^[0-9a-f]{64}$'),
|
||||
payload_hash text CHECK (payload_hash IS NULL OR payload_hash ~ '^sha256:[0-9a-f]{64}$'),
|
||||
attributes jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
UNIQUE (tenant_id, event_id)
|
||||
);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Npgsql;
|
||||
using System.Text.Json;
|
||||
using StellaOps.Infrastructure.Postgres.Repositories;
|
||||
using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Core.Models;
|
||||
@@ -22,6 +23,7 @@ public sealed class TimelineQueryStore(TimelineIndexerDataSource dataSource, ILo
|
||||
var sql = new System.Text.StringBuilder(BaseSelect);
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(options.EventType)) sql.Append(" AND event_type = @event_type");
|
||||
if (!string.IsNullOrWhiteSpace(options.Source)) sql.Append(" AND source = @source");
|
||||
if (!string.IsNullOrWhiteSpace(options.CorrelationId)) sql.Append(" AND correlation_id = @correlation_id");
|
||||
if (!string.IsNullOrWhiteSpace(options.TraceId)) sql.Append(" AND trace_id = @trace_id");
|
||||
if (!string.IsNullOrWhiteSpace(options.Severity)) sql.Append(" AND severity = @severity");
|
||||
@@ -37,6 +39,7 @@ public sealed class TimelineQueryStore(TimelineIndexerDataSource dataSource, ILo
|
||||
{
|
||||
AddParameter(cmd, "tenant_id", tenantId);
|
||||
AddParameter(cmd, "event_type", options.EventType);
|
||||
AddParameter(cmd, "source", options.Source);
|
||||
AddParameter(cmd, "correlation_id", options.CorrelationId);
|
||||
AddParameter(cmd, "trace_id", options.TraceId);
|
||||
AddParameter(cmd, "severity", options.Severity);
|
||||
@@ -51,9 +54,13 @@ public sealed class TimelineQueryStore(TimelineIndexerDataSource dataSource, ILo
|
||||
public async Task<TimelineEventView?> GetAsync(string tenantId, string eventId, CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
SELECT event_seq, event_id, tenant_id, event_type, source, occurred_at, received_at, correlation_id, trace_id, actor, severity, payload_hash
|
||||
FROM timeline.timeline_events
|
||||
WHERE tenant_id = @tenant_id AND event_id = @event_id
|
||||
SELECT e.event_seq, e.event_id, e.tenant_id, e.event_type, e.source, e.occurred_at, e.received_at, e.correlation_id, e.trace_id, e.actor, e.severity, e.payload_hash,
|
||||
e.attributes, d.raw_payload, d.normalized_payload,
|
||||
dig.bundle_id, dig.bundle_digest, dig.attestation_subject, dig.attestation_digest, dig.manifest_uri
|
||||
FROM timeline.timeline_events e
|
||||
LEFT JOIN timeline.timeline_event_details d ON d.event_id = e.event_id AND d.tenant_id = e.tenant_id
|
||||
LEFT JOIN timeline.timeline_event_digests dig ON dig.event_id = e.event_id AND dig.tenant_id = e.tenant_id
|
||||
WHERE e.tenant_id = @tenant_id AND e.event_id = @event_id
|
||||
""";
|
||||
|
||||
return await QuerySingleOrDefaultAsync(
|
||||
@@ -64,7 +71,7 @@ public sealed class TimelineQueryStore(TimelineIndexerDataSource dataSource, ILo
|
||||
AddParameter(cmd, "tenant_id", tenantId);
|
||||
AddParameter(cmd, "event_id", eventId);
|
||||
},
|
||||
MapEvent,
|
||||
MapEventDetail,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -83,4 +90,48 @@ public sealed class TimelineQueryStore(TimelineIndexerDataSource dataSource, ILo
|
||||
Severity = reader.GetString(10),
|
||||
PayloadHash = GetNullableString(reader, 11)
|
||||
};
|
||||
|
||||
private static TimelineEventView MapEventDetail(NpgsqlDataReader reader)
|
||||
{
|
||||
return new TimelineEventView
|
||||
{
|
||||
EventSeq = reader.GetInt64(0),
|
||||
EventId = reader.GetString(1),
|
||||
TenantId = reader.GetString(2),
|
||||
EventType = reader.GetString(3),
|
||||
Source = reader.GetString(4),
|
||||
OccurredAt = reader.GetFieldValue<DateTimeOffset>(5),
|
||||
ReceivedAt = reader.GetFieldValue<DateTimeOffset>(6),
|
||||
CorrelationId = GetNullableString(reader, 7),
|
||||
TraceId = GetNullableString(reader, 8),
|
||||
Actor = GetNullableString(reader, 9),
|
||||
Severity = reader.GetString(10),
|
||||
PayloadHash = GetNullableString(reader, 11),
|
||||
Attributes = DeserializeAttributes(reader, 12),
|
||||
RawPayloadJson = GetNullableString(reader, 13),
|
||||
NormalizedPayloadJson = GetNullableString(reader, 14),
|
||||
BundleId = GetNullableGuid(reader, 15),
|
||||
BundleDigest = GetNullableString(reader, 16),
|
||||
AttestationSubject = GetNullableString(reader, 17),
|
||||
AttestationDigest = GetNullableString(reader, 18),
|
||||
ManifestUri = GetNullableString(reader, 19)
|
||||
};
|
||||
}
|
||||
|
||||
private static IDictionary<string, string>? DeserializeAttributes(NpgsqlDataReader reader, int ordinal)
|
||||
{
|
||||
if (reader.IsDBNull(ordinal)) return null;
|
||||
|
||||
var raw = reader.GetString(ordinal);
|
||||
if (string.IsNullOrWhiteSpace(raw)) return null;
|
||||
|
||||
try
|
||||
{
|
||||
return JsonSerializer.Deserialize<Dictionary<string, string>>(raw);
|
||||
}
|
||||
catch
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
using System;
|
||||
|
||||
namespace StellaOps.TimelineIndexer.Infrastructure.Options;
|
||||
|
||||
/// <summary>
|
||||
/// Configuration for timeline ingestion transports (NATS, Redis).
|
||||
/// </summary>
|
||||
public sealed class TimelineIngestionOptions
|
||||
{
|
||||
public NatsIngestionOptions Nats { get; init; } = new();
|
||||
public RedisIngestionOptions Redis { get; init; } = new();
|
||||
}
|
||||
|
||||
public sealed class NatsIngestionOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Enables NATS subscription when true.
|
||||
/// </summary>
|
||||
public bool Enabled { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// NATS server URL (e.g., nats://localhost:4222).
|
||||
/// </summary>
|
||||
public string Url { get; init; } = "nats://localhost:4222";
|
||||
|
||||
/// <summary>
|
||||
/// Subject to subscribe to for orchestrator events.
|
||||
/// </summary>
|
||||
public string Subject { get; init; } = "orch.event";
|
||||
|
||||
/// <summary>
|
||||
/// Queue group for shared subscriptions to preserve ordering per subject.
|
||||
/// </summary>
|
||||
public string QueueGroup { get; init; } = "timeline-indexer";
|
||||
|
||||
/// <summary>
|
||||
/// Maximum in-flight messages per subscriber.
|
||||
/// </summary>
|
||||
public int Prefetch { get; init; } = 64;
|
||||
}
|
||||
|
||||
public sealed class RedisIngestionOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Enables Redis Stream subscription when true.
|
||||
/// </summary>
|
||||
public bool Enabled { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Redis connection string (e.g., localhost:6379 or rediss://...).
|
||||
/// </summary>
|
||||
public string ConnectionString { get; init; } = "localhost:6379";
|
||||
|
||||
/// <summary>
|
||||
/// Stream name carrying timeline events.
|
||||
/// </summary>
|
||||
public string Stream { get; init; } = "timeline.events";
|
||||
|
||||
/// <summary>
|
||||
/// Consumer group used for ordered consumption.
|
||||
/// </summary>
|
||||
public string ConsumerGroup { get; init; } = "timeline-indexer";
|
||||
|
||||
/// <summary>
|
||||
/// Consumer name used when reading from the group.
|
||||
/// </summary>
|
||||
public string ConsumerName { get; init; } = Environment.MachineName ?? "timeline-indexer";
|
||||
|
||||
/// <summary>
|
||||
/// Field that contains the JSON payload within the stream entry.
|
||||
/// </summary>
|
||||
public string ValueField { get; init; } = "data";
|
||||
|
||||
/// <summary>
|
||||
/// Maximum entries fetched per polling iteration.
|
||||
/// </summary>
|
||||
public int MaxBatchSize { get; init; } = 128;
|
||||
|
||||
/// <summary>
|
||||
/// Polling interval in milliseconds when no entries are available.
|
||||
/// </summary>
|
||||
public int PollIntervalMilliseconds { get; init; } = 250;
|
||||
}
|
||||
@@ -32,7 +32,9 @@
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0-rc.2.25502.107" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0-rc.2.25502.107" />
|
||||
<PackageReference Include="NATS.Client.Core" Version="2.0.0" />
|
||||
<PackageReference Include="Npgsql" Version="9.0.2" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.7.10" />
|
||||
</ItemGroup>
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using NATS.Client.Core;
|
||||
using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Core.Models;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Options;
|
||||
|
||||
namespace StellaOps.TimelineIndexer.Infrastructure.Subscriptions;
|
||||
|
||||
/// <summary>
|
||||
/// NATS-based subscriber that yields orchestrator envelopes for ingestion.
|
||||
/// </summary>
|
||||
public sealed class NatsTimelineEventSubscriber : ITimelineEventSubscriber
|
||||
{
|
||||
private readonly IOptions<TimelineIngestionOptions> _options;
|
||||
private readonly TimelineEnvelopeParser _parser;
|
||||
private readonly ILogger<NatsTimelineEventSubscriber> _logger;
|
||||
|
||||
public NatsTimelineEventSubscriber(
|
||||
IOptions<TimelineIngestionOptions> options,
|
||||
TimelineEnvelopeParser parser,
|
||||
ILogger<NatsTimelineEventSubscriber> logger)
|
||||
{
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_parser = parser ?? throw new ArgumentNullException(nameof(parser));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<TimelineEventEnvelope> SubscribeAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
var cfg = _options.Value.Nats;
|
||||
if (!cfg.Enabled)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
await using var connection = new NatsConnection(new NatsOpts
|
||||
{
|
||||
Url = cfg.Url,
|
||||
Name = "timeline-indexer"
|
||||
});
|
||||
|
||||
await foreach (var msg in connection.SubscribeAsync<byte[]>(
|
||||
cfg.Subject,
|
||||
queueGroup: cfg.QueueGroup,
|
||||
cancellationToken: cancellationToken))
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var json = msg.Data is { Length: > 0 }
|
||||
? Encoding.UTF8.GetString(msg.Data)
|
||||
: string.Empty;
|
||||
|
||||
if (_parser.TryParse(json, out var envelope, out var reason))
|
||||
{
|
||||
yield return envelope;
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning("Dropped NATS event on {Subject}: {Reason}", cfg.Subject, reason);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StackExchange.Redis;
|
||||
using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Core.Models;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Options;
|
||||
|
||||
namespace StellaOps.TimelineIndexer.Infrastructure.Subscriptions;
|
||||
|
||||
/// <summary>
|
||||
/// Redis Stream subscriber that reads orchestrator events and yields timeline envelopes.
|
||||
/// </summary>
|
||||
public sealed class RedisTimelineEventSubscriber : ITimelineEventSubscriber, IAsyncDisposable
|
||||
{
|
||||
private readonly IOptions<TimelineIngestionOptions> _options;
|
||||
private readonly TimelineEnvelopeParser _parser;
|
||||
private readonly ILogger<RedisTimelineEventSubscriber> _logger;
|
||||
private ConnectionMultiplexer? _connection;
|
||||
|
||||
public RedisTimelineEventSubscriber(
|
||||
IOptions<TimelineIngestionOptions> options,
|
||||
TimelineEnvelopeParser parser,
|
||||
ILogger<RedisTimelineEventSubscriber> logger)
|
||||
{
|
||||
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
_parser = parser ?? throw new ArgumentNullException(nameof(parser));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<TimelineEventEnvelope> SubscribeAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||
{
|
||||
var cfg = _options.Value.Redis;
|
||||
if (!cfg.Enabled)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
|
||||
_connection = await ConnectionMultiplexer.ConnectAsync(cfg.ConnectionString);
|
||||
var db = _connection.GetDatabase();
|
||||
|
||||
await EnsureGroupAsync(db, cfg, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
StreamEntry[] entries;
|
||||
try
|
||||
{
|
||||
entries = await db.StreamReadGroupAsync(
|
||||
cfg.Stream,
|
||||
cfg.ConsumerGroup,
|
||||
cfg.ConsumerName,
|
||||
">",
|
||||
count: cfg.MaxBatchSize,
|
||||
flags: CommandFlags.DemandMaster).ConfigureAwait(false);
|
||||
}
|
||||
catch (RedisServerException ex) when (ex.Message.Contains("NOGROUP", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
await EnsureGroupAsync(db, cfg, cancellationToken).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entries.Length == 0)
|
||||
{
|
||||
await Task.Delay(cfg.PollIntervalMilliseconds, cancellationToken).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach (var entry in entries)
|
||||
{
|
||||
if (!TryGetValue(entry, cfg.ValueField, out var json))
|
||||
{
|
||||
_logger.LogWarning("Redis entry {EntryId} missing expected field {Field}", entry.Id, cfg.ValueField);
|
||||
await db.StreamAcknowledgeAsync(cfg.Stream, cfg.ConsumerGroup, entry.Id).ConfigureAwait(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (_parser.TryParse(json!, out var envelope, out var reason))
|
||||
{
|
||||
yield return envelope;
|
||||
}
|
||||
else
|
||||
{
|
||||
_logger.LogWarning("Redis entry {EntryId} dropped: {Reason}", entry.Id, reason);
|
||||
}
|
||||
|
||||
await db.StreamAcknowledgeAsync(cfg.Stream, cfg.ConsumerGroup, entry.Id).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task EnsureGroupAsync(IDatabase db, RedisIngestionOptions cfg, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await db.StreamCreateConsumerGroupAsync(cfg.Stream, cfg.ConsumerGroup, "$", true).ConfigureAwait(false);
|
||||
}
|
||||
catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
// Group already exists; nothing to do.
|
||||
}
|
||||
}
|
||||
|
||||
private static bool TryGetValue(in StreamEntry entry, string fieldName, out string? value)
|
||||
{
|
||||
foreach (var nv in entry.Values)
|
||||
{
|
||||
if (string.Equals(nv.Name, fieldName, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
value = nv.Value.HasValue ? nv.Value.ToString() : null;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
value = null;
|
||||
return false;
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_connection is not null)
|
||||
{
|
||||
await _connection.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,209 @@
|
||||
using System.Text.Json;
|
||||
using StellaOps.TimelineIndexer.Core.Models;
|
||||
|
||||
namespace StellaOps.TimelineIndexer.Infrastructure.Subscriptions;
|
||||
|
||||
/// <summary>
|
||||
/// Normalises incoming orchestrator/notification envelopes into <see cref="TimelineEventEnvelope"/> instances.
|
||||
/// </summary>
|
||||
public sealed class TimelineEnvelopeParser
|
||||
{
|
||||
private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
|
||||
{
|
||||
WriteIndented = false,
|
||||
PropertyNameCaseInsensitive = true
|
||||
};
|
||||
|
||||
public bool TryParse(string rawJson, out TimelineEventEnvelope envelope, out string? failureReason)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(rawJson))
|
||||
{
|
||||
envelope = default!;
|
||||
failureReason = "Payload was empty";
|
||||
return false;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(rawJson);
|
||||
var root = doc.RootElement;
|
||||
|
||||
var eventId = FirstString(root, "eventId", "event_id", "id", "messageId");
|
||||
var tenantId = FirstString(root, "tenant", "tenantId", "tenant_id");
|
||||
var eventType = FirstString(root, "kind", "eventType", "event_type", "type");
|
||||
var source = FirstString(root, "source", "producer") ?? "unknown";
|
||||
var correlationId = FirstString(root, "correlationId", "correlation_id");
|
||||
var traceId = FirstString(root, "traceId", "trace_id");
|
||||
var actor = ExtractActor(root);
|
||||
var severity = (FirstString(root, "severity") ?? "info").ToLowerInvariant();
|
||||
var occurredAt = FirstDateTime(root, "occurredAt", "occurred_at", "timestamp", "ts") ?? DateTimeOffset.UtcNow;
|
||||
|
||||
var normalizedPayload = ExtractNormalizedPayload(root);
|
||||
var attributes = ExtractAttributes(root);
|
||||
|
||||
envelope = new TimelineEventEnvelope
|
||||
{
|
||||
EventId = eventId ?? throw new InvalidOperationException("event_id is required"),
|
||||
TenantId = tenantId ?? throw new InvalidOperationException("tenant_id is required"),
|
||||
EventType = eventType ?? throw new InvalidOperationException("event_type is required"),
|
||||
Source = source,
|
||||
OccurredAt = occurredAt,
|
||||
CorrelationId = correlationId,
|
||||
TraceId = traceId,
|
||||
Actor = actor,
|
||||
Severity = severity,
|
||||
RawPayloadJson = JsonSerializer.Serialize(root, SerializerOptions),
|
||||
NormalizedPayloadJson = normalizedPayload,
|
||||
Attributes = attributes,
|
||||
BundleDigest = FirstString(root, "bundleDigest"),
|
||||
BundleId = FirstGuid(root, "bundleId"),
|
||||
AttestationSubject = FirstString(root, "attestationSubject"),
|
||||
AttestationDigest = FirstString(root, "attestationDigest"),
|
||||
ManifestUri = FirstString(root, "manifestUri")
|
||||
};
|
||||
|
||||
failureReason = null;
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
envelope = default!;
|
||||
failureReason = ex.Message;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static string? ExtractActor(JsonElement root)
|
||||
{
|
||||
if (TryGetProperty(root, "actor", out var actorElement))
|
||||
{
|
||||
if (actorElement.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
return actorElement.GetString();
|
||||
}
|
||||
|
||||
if (actorElement.ValueKind == JsonValueKind.Object)
|
||||
{
|
||||
if (actorElement.TryGetProperty("subject", out var subject) && subject.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
return subject.GetString();
|
||||
}
|
||||
|
||||
if (actorElement.TryGetProperty("user", out var user) && user.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
return user.GetString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static string? ExtractNormalizedPayload(JsonElement root)
|
||||
{
|
||||
if (TryGetProperty(root, "payload", out var payload))
|
||||
{
|
||||
return JsonSerializer.Serialize(payload, SerializerOptions);
|
||||
}
|
||||
|
||||
if (TryGetProperty(root, "data", out var data) && data.ValueKind is JsonValueKind.Object)
|
||||
{
|
||||
return JsonSerializer.Serialize(data, SerializerOptions);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static IDictionary<string, string>? ExtractAttributes(JsonElement root)
|
||||
{
|
||||
if (!TryGetProperty(root, "attributes", out var attributes) || attributes.ValueKind != JsonValueKind.Object)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var dict = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
|
||||
foreach (var property in attributes.EnumerateObject())
|
||||
{
|
||||
var value = property.Value.ValueKind switch
|
||||
{
|
||||
JsonValueKind.String => property.Value.GetString(),
|
||||
JsonValueKind.Number => property.Value.ToString(),
|
||||
JsonValueKind.True => "true",
|
||||
JsonValueKind.False => "false",
|
||||
_ => null
|
||||
};
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(value))
|
||||
{
|
||||
dict[property.Name] = value!;
|
||||
}
|
||||
}
|
||||
|
||||
return dict.Count == 0 ? null : dict;
|
||||
}
|
||||
|
||||
private static bool TryGetProperty(JsonElement root, string name, out JsonElement value)
|
||||
{
|
||||
if (root.TryGetProperty(name, out value))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
foreach (var property in root.EnumerateObject())
|
||||
{
|
||||
if (string.Equals(property.Name, name, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
value = property.Value;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
value = default;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static string? FirstString(JsonElement root, params string[] names)
|
||||
{
|
||||
foreach (var name in names)
|
||||
{
|
||||
if (TryGetProperty(root, name, out var value) && value.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
var str = value.GetString();
|
||||
if (!string.IsNullOrWhiteSpace(str))
|
||||
{
|
||||
return str;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static Guid? FirstGuid(JsonElement root, params string[] names)
|
||||
{
|
||||
var text = FirstString(root, names);
|
||||
if (Guid.TryParse(text, out var guid))
|
||||
{
|
||||
return guid;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static DateTimeOffset? FirstDateTime(JsonElement root, params string[] names)
|
||||
{
|
||||
foreach (var name in names)
|
||||
{
|
||||
if (TryGetProperty(root, name, out var value) && value.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
var text = value.GetString();
|
||||
if (!string.IsNullOrWhiteSpace(text) && DateTimeOffset.TryParse(text, out var dto))
|
||||
{
|
||||
return dto;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Subscriptions;
|
||||
|
||||
namespace StellaOps.TimelineIndexer.Tests;
|
||||
|
||||
public class TimelineEnvelopeParserTests
|
||||
{
|
||||
[Fact]
|
||||
public void Parser_Maps_Required_Fields()
|
||||
{
|
||||
const string json = """
|
||||
{
|
||||
"eventId": "11111111-1111-1111-1111-111111111111",
|
||||
"tenant": "tenant-a",
|
||||
"kind": "scanner.event.report.ready",
|
||||
"occurredAt": "2025-12-01T12:00:00Z",
|
||||
"source": "scanner.webservice",
|
||||
"correlationId": "corr-1",
|
||||
"traceId": "trace-1",
|
||||
"attributes": {"key":"value"},
|
||||
"payload": {"reportId": "report-1"}
|
||||
}
|
||||
""";
|
||||
|
||||
var parser = new TimelineEnvelopeParser();
|
||||
|
||||
var parsed = parser.TryParse(json, out var envelope, out var reason);
|
||||
|
||||
Assert.True(parsed, reason);
|
||||
Assert.Equal("tenant-a", envelope.TenantId);
|
||||
Assert.Equal("scanner.event.report.ready", envelope.EventType);
|
||||
Assert.Equal("trace-1", envelope.TraceId);
|
||||
Assert.Equal("corr-1", envelope.CorrelationId);
|
||||
Assert.NotNull(envelope.Attributes);
|
||||
Assert.Equal("value", envelope.Attributes!["key"]);
|
||||
Assert.NotNull(envelope.RawPayloadJson);
|
||||
Assert.NotNull(envelope.NormalizedPayloadJson);
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,12 @@
|
||||
using Microsoft.AspNetCore.Authorization;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using StellaOps.Cryptography.Audit;
|
||||
using StellaOps.Auth.Abstractions;
|
||||
using StellaOps.Auth.ServerIntegration;
|
||||
using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Core.Models;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.DependencyInjection;
|
||||
using StellaOps.TimelineIndexer.WebService;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
@@ -13,6 +15,7 @@ builder.Configuration.AddJsonFile("appsettings.Development.json", optional: true
|
||||
builder.Configuration.AddEnvironmentVariables(prefix: "TIMELINE_");
|
||||
|
||||
builder.Services.AddTimelineIndexerPostgres(builder.Configuration);
|
||||
builder.Services.AddSingleton<IAuthEventSink, TimelineAuthorizationAuditSink>();
|
||||
|
||||
builder.Services.AddStellaOpsResourceServerAuthentication(
|
||||
builder.Configuration,
|
||||
@@ -48,6 +51,7 @@ app.MapGet("/timeline", async (
|
||||
HttpContext ctx,
|
||||
ITimelineQueryService service,
|
||||
[FromQuery] string? eventType,
|
||||
[FromQuery] string? source,
|
||||
[FromQuery] string? correlationId,
|
||||
[FromQuery] string? traceId,
|
||||
[FromQuery] string? severity,
|
||||
@@ -60,6 +64,7 @@ app.MapGet("/timeline", async (
|
||||
var options = new TimelineQueryOptions
|
||||
{
|
||||
EventType = eventType,
|
||||
Source = source,
|
||||
CorrelationId = correlationId,
|
||||
TraceId = traceId,
|
||||
Severity = severity,
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
using System.Linq;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Cryptography.Audit;
|
||||
|
||||
namespace StellaOps.TimelineIndexer.WebService;
|
||||
|
||||
/// <summary>
|
||||
/// Logs authorization outcomes for timeline read/write operations.
|
||||
/// </summary>
|
||||
public sealed class TimelineAuthorizationAuditSink(ILogger<TimelineAuthorizationAuditSink> logger) : IAuthEventSink
|
||||
{
|
||||
public ValueTask WriteAsync(AuthEventRecord record, CancellationToken cancellationToken)
|
||||
{
|
||||
logger.LogInformation(
|
||||
"Auth {Outcome} for {EventType} tenant={Tenant} scopes={Scopes} subject={Subject} correlation={Correlation}",
|
||||
record.Outcome,
|
||||
record.EventType,
|
||||
record.Tenant.Value ?? "<none>",
|
||||
record.Scopes.Any() ? string.Join(" ", record.Scopes) : "<none>",
|
||||
record.Subject?.SubjectId.Value ?? "<unknown>",
|
||||
record.CorrelationId ?? "<none>");
|
||||
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,27 @@
|
||||
{
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Information",
|
||||
"Microsoft.AspNetCore": "Warning"
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Information",
|
||||
"Microsoft.AspNetCore": "Information"
|
||||
}
|
||||
},
|
||||
"Authority": {
|
||||
"ResourceServer": {
|
||||
"Authority": "https://authority.localtest.me",
|
||||
"Audiences": [
|
||||
"api://timeline-indexer"
|
||||
],
|
||||
"RequiredTenants": [
|
||||
"tenant-default"
|
||||
]
|
||||
}
|
||||
},
|
||||
"Postgres": {
|
||||
"Timeline": {
|
||||
"ConnectionString": "Host=localhost;Database=timeline;Username=timeline;Password=timeline;",
|
||||
"SchemaName": "timeline",
|
||||
"CommandTimeoutSeconds": 30
|
||||
}
|
||||
},
|
||||
"AllowedHosts": "*"
|
||||
}
|
||||
|
||||
@@ -1,26 +1,27 @@
|
||||
{
|
||||
Logging: {
|
||||
LogLevel: {
|
||||
Default: Information,
|
||||
Microsoft.AspNetCore: Warning
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Information",
|
||||
"Microsoft.AspNetCore": "Warning"
|
||||
}
|
||||
},
|
||||
Authority: {
|
||||
ResourceServer: {
|
||||
Authority: https://authority.localtest.me,
|
||||
Audiences: [
|
||||
api://timeline-indexer
|
||||
"Authority": {
|
||||
"ResourceServer": {
|
||||
"Authority": "https://authority.localtest.me",
|
||||
"Audiences": [
|
||||
"api://timeline-indexer"
|
||||
],
|
||||
RequiredTenants: [
|
||||
tenant-default
|
||||
"RequiredTenants": [
|
||||
"tenant-default"
|
||||
]
|
||||
}
|
||||
},
|
||||
Postgres: {
|
||||
Timeline: {
|
||||
ConnectionString: Host=localhost;Database=timeline;Username=timeline;Password=timeline;
|
||||
SchemaName: timeline
|
||||
"Postgres": {
|
||||
"Timeline": {
|
||||
"ConnectionString": "Host=localhost;Database=timeline;Username=timeline;Password=timeline;",
|
||||
"SchemaName": "timeline",
|
||||
"CommandTimeoutSeconds": 30
|
||||
}
|
||||
},
|
||||
AllowedHosts: *
|
||||
"AllowedHosts": "*"
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using StellaOps.TimelineIndexer.Core.Abstractions;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.DependencyInjection;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Options;
|
||||
using StellaOps.TimelineIndexer.Infrastructure.Subscriptions;
|
||||
using StellaOps.TimelineIndexer.Worker;
|
||||
|
||||
@@ -11,6 +12,12 @@ builder.Configuration.AddJsonFile("appsettings.Development.json", optional: true
|
||||
builder.Configuration.AddEnvironmentVariables(prefix: "TIMELINE_");
|
||||
|
||||
builder.Services.AddTimelineIndexerPostgres(builder.Configuration);
|
||||
builder.Services.AddOptions<TimelineIngestionOptions>()
|
||||
.Bind(builder.Configuration.GetSection("Ingestion"));
|
||||
|
||||
builder.Services.AddSingleton<TimelineEnvelopeParser>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, NatsTimelineEventSubscriber>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, RedisTimelineEventSubscriber>();
|
||||
builder.Services.AddSingleton<ITimelineEventSubscriber, NullTimelineEventSubscriber>();
|
||||
builder.Services.AddHostedService<TimelineIngestionWorker>();
|
||||
|
||||
|
||||
@@ -12,17 +12,20 @@ namespace StellaOps.TimelineIndexer.Worker;
|
||||
public sealed class TimelineIngestionWorker(
|
||||
IEnumerable<ITimelineEventSubscriber> subscribers,
|
||||
ITimelineIngestionService ingestionService,
|
||||
ILogger<TimelineIngestionWorker> logger) : BackgroundService
|
||||
ILogger<TimelineIngestionWorker> logger,
|
||||
TimeProvider? timeProvider = null) : BackgroundService
|
||||
{
|
||||
private static readonly Meter Meter = new("StellaOps.TimelineIndexer", "1.0.0");
|
||||
private static readonly Counter<long> IngestedCounter = Meter.CreateCounter<long>("timeline.ingested");
|
||||
private static readonly Counter<long> DuplicateCounter = Meter.CreateCounter<long>("timeline.duplicates");
|
||||
private static readonly Counter<long> FailedCounter = Meter.CreateCounter<long>("timeline.failed");
|
||||
private static readonly Histogram<double> LagHistogram = Meter.CreateHistogram<double>("timeline.ingest.lag.seconds");
|
||||
|
||||
private readonly IEnumerable<ITimelineEventSubscriber> _subscribers = subscribers;
|
||||
private readonly ITimelineIngestionService _ingestion = ingestionService;
|
||||
private readonly ILogger<TimelineIngestionWorker> _logger = logger;
|
||||
private readonly ConcurrentDictionary<(string tenant, string eventId), byte> _sessionSeen = new();
|
||||
private readonly TimeProvider _timeProvider = timeProvider ?? TimeProvider.System;
|
||||
|
||||
protected override Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
@@ -48,6 +51,7 @@ public sealed class TimelineIngestionWorker(
|
||||
if (result.Inserted)
|
||||
{
|
||||
IngestedCounter.Add(1);
|
||||
LagHistogram.Record((_timeProvider.GetUtcNow() - envelope.OccurredAt).TotalSeconds);
|
||||
_logger.LogInformation("Ingested timeline event {EventId} from {Source} (tenant {Tenant})", envelope.EventId, envelope.Source, envelope.TenantId);
|
||||
}
|
||||
else
|
||||
|
||||
@@ -1,8 +1,34 @@
|
||||
{
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Information",
|
||||
"Microsoft.Hosting.Lifetime": "Information"
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Debug",
|
||||
"Microsoft.Hosting.Lifetime": "Information"
|
||||
}
|
||||
},
|
||||
"Postgres": {
|
||||
"Timeline": {
|
||||
"ConnectionString": "Host=localhost;Database=timeline;Username=timeline;Password=timeline;",
|
||||
"SchemaName": "timeline",
|
||||
"CommandTimeoutSeconds": 30
|
||||
}
|
||||
},
|
||||
"Ingestion": {
|
||||
"Nats": {
|
||||
"Enabled": false,
|
||||
"Url": "nats://localhost:4222",
|
||||
"Subject": "orch.event",
|
||||
"QueueGroup": "timeline-indexer",
|
||||
"Prefetch": 64
|
||||
},
|
||||
"Redis": {
|
||||
"Enabled": false,
|
||||
"ConnectionString": "localhost:6379",
|
||||
"Stream": "timeline.events",
|
||||
"ConsumerGroup": "timeline-indexer",
|
||||
"ConsumerName": "timeline-worker",
|
||||
"ValueField": "data",
|
||||
"MaxBatchSize": 128,
|
||||
"PollIntervalMilliseconds": 250
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,34 @@
|
||||
{
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Information",
|
||||
"Microsoft.Hosting.Lifetime": "Information"
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Information",
|
||||
"Microsoft.Hosting.Lifetime": "Information"
|
||||
}
|
||||
},
|
||||
"Postgres": {
|
||||
"Timeline": {
|
||||
"ConnectionString": "Host=localhost;Database=timeline;Username=timeline;Password=timeline;",
|
||||
"SchemaName": "timeline",
|
||||
"CommandTimeoutSeconds": 30
|
||||
}
|
||||
},
|
||||
"Ingestion": {
|
||||
"Nats": {
|
||||
"Enabled": false,
|
||||
"Url": "nats://localhost:4222",
|
||||
"Subject": "orch.event",
|
||||
"QueueGroup": "timeline-indexer",
|
||||
"Prefetch": 64
|
||||
},
|
||||
"Redis": {
|
||||
"Enabled": false,
|
||||
"ConnectionString": "localhost:6379",
|
||||
"Stream": "timeline.events",
|
||||
"ConsumerGroup": "timeline-indexer",
|
||||
"ConsumerName": "timeline-worker",
|
||||
"ValueField": "data",
|
||||
"MaxBatchSize": 128,
|
||||
"PollIntervalMilliseconds": 250
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user