feat(jobengine): dual-write audit entries to Timeline unified sink

Sprint SPRINT_20260408_005 DEPRECATE-001 (JobEngine/ReleaseOrchestrator,
fifth service).

PostgresAuditRepository.AppendAsync now fans out to Timeline via the
optional IAuditEventEmitter after the local transaction commits. The
hash chain (content_hash, previous_entry_hash, sequence_number) stays
in the local audit_entries table as service-level chain-of-custody
evidence; Timeline receives only the summary event for cross-service
correlation, with the content hash surfaced as a detail field.

Same pattern as Authority/Policy/Notify/Scheduler dual-write:
fire-and-forget, optional DI, local write stays authoritative.

Remaining: Attestor dual-write (existing audit is already decorated
with .Audited() on endpoints — verifying the attestor audit log insert
path needs separate review).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-04-19 22:42:39 +03:00
parent 7c69058e19
commit 2f32c7f0c2
2 changed files with 68 additions and 1 deletions

View File

@@ -1,5 +1,6 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using StellaOps.Audit.Emission;
using StellaOps.ReleaseOrchestrator.Persistence.Domain;
using StellaOps.ReleaseOrchestrator.Persistence.Hashing;
using StellaOps.ReleaseOrchestrator.Persistence.Repositories;
@@ -98,17 +99,20 @@ public sealed class PostgresAuditRepository : IAuditRepository
private readonly CanonicalJsonHasher _hasher;
private readonly ILogger<PostgresAuditRepository> _logger;
private readonly TimeProvider _timeProvider;
private readonly IAuditEventEmitter? _timelineEmitter;
public PostgresAuditRepository(
ReleaseOrchestratorDataSource dataSource,
CanonicalJsonHasher hasher,
ILogger<PostgresAuditRepository> logger,
TimeProvider? timeProvider = null)
TimeProvider? timeProvider = null,
IAuditEventEmitter? timelineEmitter = null)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_hasher = hasher ?? throw new ArgumentNullException(nameof(hasher));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_timeProvider = timeProvider ?? TimeProvider.System;
_timelineEmitter = timelineEmitter;
}
public async Task<AuditEntry> AppendAsync(
@@ -198,6 +202,21 @@ public sealed class PostgresAuditRepository : IAuditRepository
_logger.LogDebug("Audit entry {EntryId} appended for tenant {TenantId}, sequence {Sequence}",
entry.EntryId, tenantId, sequenceNumber);
// DEPRECATE-001: dual-write to Timeline. Fire-and-forget; hash chain is
// service-level evidence that stays local (not emitted) — only the audit
// event summary goes to Timeline for cross-service correlation.
if (_timelineEmitter is not null)
{
try
{
await _timelineEmitter.EmitAsync(MapToTimelinePayload(entry), cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to emit jobengine audit event to Timeline (local write succeeded, entryId={EntryId})", entry.EntryId);
}
}
return entry;
}
catch
@@ -207,6 +226,53 @@ public sealed class PostgresAuditRepository : IAuditRepository
}
}
private static AuditEventPayload MapToTimelinePayload(AuditEntry entry)
{
var details = new Dictionary<string, object?>(StringComparer.Ordinal)
{
["localEntryId"] = entry.EntryId,
["sequenceNumber"] = entry.SequenceNumber,
["contentHash"] = entry.ContentHash,
["httpMethod"] = entry.HttpMethod,
["requestPath"] = entry.RequestPath
};
if (!string.IsNullOrWhiteSpace(entry.OldState))
{
details["oldState"] = entry.OldState;
}
if (!string.IsNullOrWhiteSpace(entry.NewState))
{
details["newState"] = entry.NewState;
}
return new AuditEventPayload
{
Id = $"jobengine-{entry.EntryId}",
Timestamp = entry.OccurredAt,
Module = "jobengine",
Action = entry.EventType.ToString().ToLowerInvariant(),
Severity = "info",
Actor = new AuditActorPayload
{
Id = entry.ActorId ?? "jobengine-system",
Name = entry.ActorId ?? "jobengine-system",
Type = entry.ActorType.ToString().ToLowerInvariant(),
IpAddress = entry.ActorIp,
UserAgent = entry.UserAgent
},
Resource = new AuditResourcePayload
{
Type = entry.ResourceType ?? "jobengine_resource",
Id = entry.ResourceId.ToString()
},
Description = entry.Description ?? entry.EventType.ToString(),
Details = details,
CorrelationId = entry.CorrelationId,
TenantId = entry.TenantId,
Tags = new[] { "jobengine", entry.EventType.ToString().ToLowerInvariant() }
};
}
public async Task<AuditEntry?> GetByIdAsync(
string tenantId,
Guid entryId,

View File

@@ -27,6 +27,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Audit.Emission\StellaOps.Audit.Emission.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Cryptography\StellaOps.Cryptography.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Infrastructure.Postgres\StellaOps.Infrastructure.Postgres.csproj" />
</ItemGroup>