Files
git.stella-ops.org/src/Timeline/StellaOps.Timeline.WebService/Audit/PostgresUnifiedAuditEventStore.cs
master f5a9f874d0 feat(audit): wire AddAuditEmission into 9 services (AUDIT-002)
- Wire StellaOps.Audit.Emission DI in: Authority, Policy, Release-Orchestrator,
  EvidenceLocker, Notify, Scanner, Scheduler, Integrations, Platform
- Add AuditEmission__TimelineBaseUrl to compose defaults
- Endpoint filter annotation deferred to follow-up pass

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 16:20:39 +03:00

484 lines
20 KiB
C#

// Copyright (c) StellaOps. Licensed under the BUSL-1.1.
using System.Data;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Timeline.Core.Postgres;
namespace StellaOps.Timeline.WebService.Audit;
/// <summary>
/// PostgreSQL-backed audit event store with SHA-256 hash chain integrity.
/// Replaces the in-memory <see cref="IngestAuditEventStore"/> to ensure audit events
/// survive service restarts and provide tamper-evident chain verification.
/// </summary>
/// <remarks>
/// Hash chain pattern follows the JobEngine audit_entries design:
/// - Each event gets a content_hash computed from canonical JSON of its fields.
/// - Each event links to the previous event's content_hash via previous_entry_hash.
/// - Sequence numbers are monotonically increasing per tenant.
/// - SERIALIZABLE isolation ensures chain integrity under concurrent writes.
/// </remarks>
public sealed class PostgresUnifiedAuditEventStore
{
private readonly TimelineCoreDataSource _dataSource;
private readonly ILogger<PostgresUnifiedAuditEventStore> _logger;
private static readonly JsonSerializerOptions CanonicalJsonOptions = new()
{
DefaultIgnoreCondition = JsonIgnoreCondition.Never,
WriteIndented = false,
PropertyNamingPolicy = null,
Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase) }
};
public PostgresUnifiedAuditEventStore(
TimelineCoreDataSource dataSource,
ILogger<PostgresUnifiedAuditEventStore> logger)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Persists an audit event with hash chain integrity.
/// Uses SERIALIZABLE isolation to prevent concurrent chain corruption.
/// </summary>
public async Task AddAsync(UnifiedAuditEvent auditEvent, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(auditEvent);
var tenantId = auditEvent.TenantId ?? "default";
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken)
.ConfigureAwait(false);
await using var transaction = await connection.BeginTransactionAsync(
IsolationLevel.Serializable, cancellationToken).ConfigureAwait(false);
try
{
// Step 1: Get next sequence number and previous hash atomically
var (sequenceNumber, previousHash) = await GetNextSequenceAsync(
connection, transaction, tenantId, cancellationToken).ConfigureAwait(false);
// Step 2: Compute content hash from canonical JSON
var contentHash = ComputeContentHash(auditEvent, tenantId, sequenceNumber);
// Step 3: Insert the audit event
await InsertEventAsync(
connection, transaction, auditEvent, tenantId,
sequenceNumber, previousHash, contentHash, cancellationToken).ConfigureAwait(false);
// Step 4: Update the sequence tracker with the new hash
await UpdateSequenceHashAsync(
connection, transaction, tenantId, contentHash, cancellationToken).ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
_logger.LogDebug(
"Persisted audit event {EventId} for tenant {TenantId} (seq={Seq}, hash={Hash})",
auditEvent.Id, tenantId, sequenceNumber, contentHash[..16]);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogError(ex,
"Failed to persist audit event {EventId} for tenant {TenantId}",
auditEvent.Id, tenantId);
try
{
await transaction.RollbackAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception rollbackEx)
{
_logger.LogWarning(rollbackEx, "Rollback failed for audit event {EventId}", auditEvent.Id);
}
throw;
}
}
/// <summary>
/// Returns all persisted audit events, ordered by timestamp descending.
/// </summary>
public async Task<IReadOnlyList<UnifiedAuditEvent>> GetAllAsync(CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenSystemConnectionAsync(cancellationToken)
.ConfigureAwait(false);
const string sql = """
SELECT id, tenant_id, timestamp, module, action, severity,
actor_id, actor_name, actor_email, actor_type, actor_ip, actor_user_agent,
resource_type, resource_id, resource_name,
description, details_jsonb, diff_jsonb,
correlation_id, parent_event_id, tags
FROM timeline.unified_audit_events
ORDER BY timestamp DESC, id ASC
LIMIT 10000
""";
await using var command = new NpgsqlCommand(sql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
var events = new List<UnifiedAuditEvent>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
events.Add(MapReaderToEvent(reader));
}
return events;
}
/// <summary>
/// Returns persisted audit events for a specific tenant, ordered by timestamp descending.
/// </summary>
public async Task<IReadOnlyList<UnifiedAuditEvent>> GetByTenantAsync(
string tenantId, int limit = 1000, CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken)
.ConfigureAwait(false);
const string sql = """
SELECT id, tenant_id, timestamp, module, action, severity,
actor_id, actor_name, actor_email, actor_type, actor_ip, actor_user_agent,
resource_type, resource_id, resource_name,
description, details_jsonb, diff_jsonb,
correlation_id, parent_event_id, tags
FROM timeline.unified_audit_events
WHERE tenant_id = @tenantId
ORDER BY timestamp DESC, id ASC
LIMIT @limit
""";
await using var command = new NpgsqlCommand(sql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenantId", tenantId);
command.Parameters.AddWithValue("limit", Math.Clamp(limit, 1, 10000));
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
var events = new List<UnifiedAuditEvent>();
while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
events.Add(MapReaderToEvent(reader));
}
return events;
}
/// <summary>
/// Verifies the hash chain integrity for a tenant.
/// </summary>
public async Task<AuditChainVerificationResult> VerifyChainAsync(
string tenantId,
long? startSequence = null,
long? endSequence = null,
CancellationToken cancellationToken = default)
{
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken)
.ConfigureAwait(false);
const string sql = """
SELECT is_valid, invalid_event_id, invalid_sequence, error_message
FROM timeline.verify_unified_audit_chain(@tenantId, @startSeq, @endSeq)
""";
await using var command = new NpgsqlCommand(sql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenantId", tenantId);
command.Parameters.AddWithValue("startSeq", startSequence ?? 1L);
command.Parameters.AddWithValue("endSeq", (object?)endSequence ?? DBNull.Value);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return new AuditChainVerificationResult
{
IsValid = reader.GetBoolean(0),
InvalidEventId = reader.IsDBNull(1) ? null : reader.GetString(1),
InvalidSequence = reader.IsDBNull(2) ? null : reader.GetInt64(2),
ErrorMessage = reader.IsDBNull(3) ? null : reader.GetString(3)
};
}
// No rows returned means empty chain, which is valid
return new AuditChainVerificationResult { IsValid = true };
}
// ── Private helpers ──────────────────────────────────────────────────────
private static async Task<(long SequenceNumber, string? PreviousHash)> GetNextSequenceAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
string tenantId,
CancellationToken cancellationToken)
{
const string sql = "SELECT next_seq, prev_hash FROM timeline.next_unified_audit_sequence(@tenantId)";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("tenantId", tenantId);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
throw new InvalidOperationException($"Failed to acquire audit sequence for tenant {tenantId}");
}
var sequenceNumber = reader.GetInt64(0);
var previousHash = reader.IsDBNull(1) ? null : reader.GetString(1);
return (sequenceNumber, previousHash);
}
private static async Task InsertEventAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
UnifiedAuditEvent auditEvent,
string tenantId,
long sequenceNumber,
string? previousHash,
string contentHash,
CancellationToken cancellationToken)
{
const string sql = """
INSERT INTO timeline.unified_audit_events (
id, tenant_id, timestamp, module, action, severity,
actor_id, actor_name, actor_email, actor_type, actor_ip, actor_user_agent,
resource_type, resource_id, resource_name,
description, details_jsonb, diff_jsonb,
correlation_id, parent_event_id, tags,
content_hash, previous_entry_hash, sequence_number,
created_at
) VALUES (
@id, @tenantId, @timestamp, @module, @action, @severity,
@actorId, @actorName, @actorEmail, @actorType, @actorIp, @actorUserAgent,
@resourceType, @resourceId, @resourceName,
@description, @detailsJsonb::jsonb, @diffJsonb::jsonb,
@correlationId, @parentEventId, @tags,
@contentHash, @previousHash, @sequenceNumber,
NOW()
)
ON CONFLICT (id, tenant_id) DO NOTHING
""";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("id", auditEvent.Id);
command.Parameters.AddWithValue("tenantId", tenantId);
command.Parameters.AddWithValue("timestamp", auditEvent.Timestamp);
command.Parameters.AddWithValue("module", auditEvent.Module);
command.Parameters.AddWithValue("action", auditEvent.Action);
command.Parameters.AddWithValue("severity", auditEvent.Severity);
command.Parameters.AddWithValue("actorId", auditEvent.Actor.Id);
command.Parameters.AddWithValue("actorName", auditEvent.Actor.Name);
command.Parameters.AddWithValue("actorEmail", (object?)auditEvent.Actor.Email ?? DBNull.Value);
command.Parameters.AddWithValue("actorType", auditEvent.Actor.Type);
command.Parameters.AddWithValue("actorIp", (object?)auditEvent.Actor.IpAddress ?? DBNull.Value);
command.Parameters.AddWithValue("actorUserAgent", (object?)auditEvent.Actor.UserAgent ?? DBNull.Value);
command.Parameters.AddWithValue("resourceType", auditEvent.Resource.Type);
command.Parameters.AddWithValue("resourceId", auditEvent.Resource.Id);
command.Parameters.AddWithValue("resourceName", (object?)auditEvent.Resource.Name ?? DBNull.Value);
command.Parameters.AddWithValue("description", auditEvent.Description);
command.Parameters.AddWithValue("detailsJsonb", SerializeDetails(auditEvent.Details));
command.Parameters.AddWithValue("diffJsonb", (object?)SerializeDiff(auditEvent.Diff) ?? DBNull.Value);
command.Parameters.AddWithValue("correlationId", (object?)auditEvent.CorrelationId ?? DBNull.Value);
command.Parameters.AddWithValue("parentEventId", (object?)auditEvent.ParentEventId ?? DBNull.Value);
command.Parameters.Add(new NpgsqlParameter("tags", NpgsqlDbType.Array | NpgsqlDbType.Text)
{
Value = auditEvent.Tags.ToArray()
});
command.Parameters.AddWithValue("contentHash", contentHash);
command.Parameters.AddWithValue("previousHash", (object?)previousHash ?? DBNull.Value);
command.Parameters.AddWithValue("sequenceNumber", sequenceNumber);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static async Task UpdateSequenceHashAsync(
NpgsqlConnection connection,
NpgsqlTransaction transaction,
string tenantId,
string contentHash,
CancellationToken cancellationToken)
{
const string sql = "SELECT timeline.update_unified_audit_sequence_hash(@tenantId, @contentHash)";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("tenantId", tenantId);
command.Parameters.AddWithValue("contentHash", contentHash);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Computes a SHA-256 content hash from the canonical JSON representation of an audit event.
/// Fields are sorted lexicographically for deterministic hashing.
/// </summary>
internal static string ComputeContentHash(UnifiedAuditEvent auditEvent, string tenantId, long sequenceNumber)
{
// Build canonical representation with sorted keys
var canonical = new
{
Action = auditEvent.Action,
ActorId = auditEvent.Actor.Id,
ActorName = auditEvent.Actor.Name,
ActorType = auditEvent.Actor.Type,
CorrelationId = auditEvent.CorrelationId,
Description = auditEvent.Description,
Id = auditEvent.Id,
Module = auditEvent.Module,
ResourceId = auditEvent.Resource.Id,
ResourceType = auditEvent.Resource.Type,
SequenceNumber = sequenceNumber,
Severity = auditEvent.Severity,
TenantId = tenantId,
Timestamp = auditEvent.Timestamp
};
var canonicalJson = ToCanonicalJson(canonical);
var bytes = Encoding.UTF8.GetBytes(canonicalJson);
var hash = SHA256.HashData(bytes);
return Convert.ToHexStringLower(hash);
}
private static string ToCanonicalJson<T>(T value)
{
var node = JsonSerializer.SerializeToNode(value, CanonicalJsonOptions) ?? new JsonObject();
var ordered = OrderNode(node.DeepClone());
return ordered.ToJsonString(CanonicalJsonOptions);
}
private static JsonNode OrderNode(JsonNode node)
{
switch (node)
{
case JsonObject obj:
var orderedObj = new JsonObject();
foreach (var kvp in obj.OrderBy(x => x.Key, StringComparer.Ordinal))
{
orderedObj.Add(kvp.Key, kvp.Value is null ? null : OrderNode(kvp.Value.DeepClone()));
}
return orderedObj;
case JsonArray arr:
var orderedArr = new JsonArray();
foreach (var item in arr)
{
orderedArr.Add(item is null ? null : OrderNode(item.DeepClone()));
}
return orderedArr;
default:
return node.DeepClone();
}
}
private static string SerializeDetails(IReadOnlyDictionary<string, object?> details)
{
return JsonSerializer.Serialize(details, CanonicalJsonOptions);
}
private static string? SerializeDiff(UnifiedAuditDiff? diff)
{
if (diff is null)
{
return null;
}
return JsonSerializer.Serialize(diff, CanonicalJsonOptions);
}
private static UnifiedAuditEvent MapReaderToEvent(NpgsqlDataReader reader)
{
var detailsJson = reader.IsDBNull(16) ? "{}" : reader.GetString(16);
var diffJson = reader.IsDBNull(17) ? null : reader.GetString(17);
var tagsArray = reader.IsDBNull(20) ? Array.Empty<string>() : reader.GetFieldValue<string[]>(20);
return new UnifiedAuditEvent
{
Id = reader.GetString(0),
TenantId = reader.GetString(1),
Timestamp = reader.GetFieldValue<DateTimeOffset>(2),
Module = reader.GetString(3),
Action = reader.GetString(4),
Severity = reader.GetString(5),
Actor = new UnifiedAuditActor
{
Id = reader.GetString(6),
Name = reader.GetString(7),
Email = reader.IsDBNull(8) ? null : reader.GetString(8),
Type = reader.GetString(9),
IpAddress = reader.IsDBNull(10) ? null : reader.GetString(10),
UserAgent = reader.IsDBNull(11) ? null : reader.GetString(11)
},
Resource = new UnifiedAuditResource
{
Type = reader.GetString(12),
Id = reader.GetString(13),
Name = reader.IsDBNull(14) ? null : reader.GetString(14)
},
Description = reader.GetString(15),
Details = DeserializeDetails(detailsJson),
Diff = DeserializeDiff(diffJson),
CorrelationId = reader.IsDBNull(18) ? null : reader.GetString(18),
ParentEventId = reader.IsDBNull(19) ? null : reader.GetString(19),
Tags = tagsArray
};
}
private static IReadOnlyDictionary<string, object?> DeserializeDetails(string json)
{
try
{
return JsonSerializer.Deserialize<Dictionary<string, object?>>(json) ??
new Dictionary<string, object?>();
}
catch (JsonException)
{
return new Dictionary<string, object?>();
}
}
private static UnifiedAuditDiff? DeserializeDiff(string? json)
{
if (string.IsNullOrWhiteSpace(json))
{
return null;
}
try
{
return JsonSerializer.Deserialize<UnifiedAuditDiff>(json);
}
catch (JsonException)
{
return null;
}
}
}
/// <summary>
/// Result of hash chain verification for a tenant's audit events.
/// </summary>
public sealed record AuditChainVerificationResult
{
/// <summary>Whether the chain is intact.</summary>
public required bool IsValid { get; init; }
/// <summary>Event ID where the chain breaks, if any.</summary>
public string? InvalidEventId { get; init; }
/// <summary>Sequence number where the chain breaks, if any.</summary>
public long? InvalidSequence { get; init; }
/// <summary>Human-readable error message if the chain is broken.</summary>
public string? ErrorMessage { get; init; }
}