save checkpoint: save features
This commit is contained in:
@@ -136,7 +136,29 @@ public sealed record SurfacePointersPayload
|
||||
public SurfaceManifestDocument Manifest { get; init; } = new();
|
||||
}
|
||||
|
||||
public sealed record ScanCompletedEventPayload
|
||||
{
|
||||
[JsonPropertyName("scanId")]
|
||||
public string ScanId { get; init; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("reportId")]
|
||||
public string? ReportId { get; init; }
|
||||
|
||||
[JsonPropertyName("imageDigest")]
|
||||
public string? ImageDigest { get; init; }
|
||||
|
||||
[JsonPropertyName("generatedAt")]
|
||||
public DateTimeOffset? GeneratedAt { get; init; }
|
||||
|
||||
[JsonPropertyName("report")]
|
||||
public ReportDocumentPayload? Report { get; init; }
|
||||
|
||||
[JsonPropertyName("reportReady")]
|
||||
public ReportReadyEventPayload? ReportReady { get; init; }
|
||||
}
|
||||
|
||||
public static class OrchestratorEventKinds
|
||||
{
|
||||
public const string ScannerReportReady = "scanner.event.report.ready";
|
||||
public const string ScannerScanCompleted = "scanner.scan.completed";
|
||||
}
|
||||
|
||||
@@ -55,6 +55,8 @@ public sealed class AnalyticsStreamOptions
|
||||
public string ConcelierLinksetStream { get; set; } = "concelier:advisory.linkset.updated:v1";
|
||||
public string AttestorStream { get; set; } = "attestor:events";
|
||||
public bool StartFromBeginning { get; set; } = false;
|
||||
public bool ResumeFromCheckpoint { get; set; } = true;
|
||||
public string? ScannerCheckpointFilePath { get; set; }
|
||||
|
||||
public void Normalize()
|
||||
{
|
||||
@@ -62,6 +64,9 @@ public sealed class AnalyticsStreamOptions
|
||||
ConcelierObservationStream = NormalizeName(ConcelierObservationStream);
|
||||
ConcelierLinksetStream = NormalizeName(ConcelierLinksetStream);
|
||||
AttestorStream = NormalizeName(AttestorStream);
|
||||
ScannerCheckpointFilePath = string.IsNullOrWhiteSpace(ScannerCheckpointFilePath)
|
||||
? null
|
||||
: ScannerCheckpointFilePath.Trim();
|
||||
}
|
||||
|
||||
private static string NormalizeName(string value)
|
||||
|
||||
@@ -31,6 +31,8 @@ public sealed class AnalyticsIngestionService : BackgroundService
|
||||
private readonly IVulnerabilityCorrelationService? _correlationService;
|
||||
private readonly ILogger<AnalyticsIngestionService> _logger;
|
||||
private readonly IEventStream<OrchestratorEventEnvelope>? _eventStream;
|
||||
private readonly string? _scannerCheckpointFilePath;
|
||||
private readonly SemaphoreSlim _scannerCheckpointLock = new(1, 1);
|
||||
private readonly JsonSerializerOptions _jsonOptions = new()
|
||||
{
|
||||
PropertyNameCaseInsensitive = true
|
||||
@@ -52,6 +54,7 @@ public sealed class AnalyticsIngestionService : BackgroundService
|
||||
_sbomParser = sbomParser ?? throw new ArgumentNullException(nameof(sbomParser));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_correlationService = correlationService;
|
||||
_scannerCheckpointFilePath = ResolveScannerCheckpointPath(_options.Streams, _options.Cas);
|
||||
|
||||
if (eventStreamFactory is not null && !string.IsNullOrWhiteSpace(_options.Streams.ScannerStream))
|
||||
{
|
||||
@@ -76,9 +79,7 @@ public sealed class AnalyticsIngestionService : BackgroundService
|
||||
return;
|
||||
}
|
||||
|
||||
var position = _options.Streams.StartFromBeginning
|
||||
? StreamPosition.Beginning
|
||||
: StreamPosition.End;
|
||||
var position = await ResolveScannerSubscriptionPositionAsync(stoppingToken).ConfigureAwait(false);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Analytics ingestion started; subscribing to {StreamName} from {Position}.",
|
||||
@@ -90,6 +91,7 @@ public sealed class AnalyticsIngestionService : BackgroundService
|
||||
await foreach (var streamEvent in _eventStream.SubscribeAsync(position, stoppingToken))
|
||||
{
|
||||
await HandleEventAsync(streamEvent.Event, stoppingToken).ConfigureAwait(false);
|
||||
await PersistScannerCheckpointAsync(streamEvent.EntryId, stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
@@ -105,7 +107,7 @@ public sealed class AnalyticsIngestionService : BackgroundService
|
||||
|
||||
private async Task HandleEventAsync(OrchestratorEventEnvelope envelope, CancellationToken cancellationToken)
|
||||
{
|
||||
if (!string.Equals(envelope.Kind, OrchestratorEventKinds.ScannerReportReady, StringComparison.OrdinalIgnoreCase))
|
||||
if (!IsSupportedScannerEventKind(envelope.Kind))
|
||||
{
|
||||
return;
|
||||
}
|
||||
@@ -116,32 +118,393 @@ public sealed class AnalyticsIngestionService : BackgroundService
|
||||
return;
|
||||
}
|
||||
|
||||
if (envelope.Payload is null || envelope.Payload.Value.ValueKind == JsonValueKind.Undefined)
|
||||
if (!TryResolveScannerPayload(envelope, _jsonOptions, out var payload, out var parseError))
|
||||
{
|
||||
_logger.LogWarning("Scanner report event {EventId} missing payload.", envelope.EventId);
|
||||
return;
|
||||
}
|
||||
|
||||
ReportReadyEventPayload? payload;
|
||||
try
|
||||
{
|
||||
payload = envelope.Payload.Value.Deserialize<ReportReadyEventPayload>(_jsonOptions);
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to parse scanner report payload for event {EventId}.", envelope.EventId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (payload is null)
|
||||
{
|
||||
_logger.LogWarning("Scanner report payload empty for event {EventId}.", envelope.EventId);
|
||||
_logger.LogWarning(
|
||||
"Failed to parse scanner payload for event {EventId} ({Kind}); reason={Reason}.",
|
||||
envelope.EventId,
|
||||
envelope.Kind,
|
||||
parseError ?? "unknown");
|
||||
return;
|
||||
}
|
||||
|
||||
await IngestSbomAsync(envelope, payload, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task<StreamPosition> ResolveScannerSubscriptionPositionAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var checkpointEntryId = await ReadScannerCheckpointAsync(cancellationToken).ConfigureAwait(false);
|
||||
var position = ResolveScannerSubscriptionPosition(
|
||||
_options.Streams.StartFromBeginning,
|
||||
_options.Streams.ResumeFromCheckpoint,
|
||||
checkpointEntryId);
|
||||
|
||||
if (position != StreamPosition.Beginning && position != StreamPosition.End)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Resuming scanner ingestion from checkpoint entry {EntryId}.",
|
||||
position.Value);
|
||||
}
|
||||
|
||||
return position;
|
||||
}
|
||||
|
||||
internal static bool IsSupportedScannerEventKind(string? eventKind)
|
||||
{
|
||||
return string.Equals(eventKind, OrchestratorEventKinds.ScannerReportReady, StringComparison.OrdinalIgnoreCase)
|
||||
|| string.Equals(eventKind, OrchestratorEventKinds.ScannerScanCompleted, StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
internal static bool TryResolveScannerPayload(
|
||||
OrchestratorEventEnvelope envelope,
|
||||
JsonSerializerOptions serializerOptions,
|
||||
out ReportReadyEventPayload payload,
|
||||
out string? error)
|
||||
{
|
||||
payload = new ReportReadyEventPayload();
|
||||
error = null;
|
||||
|
||||
if (envelope.Payload is null ||
|
||||
envelope.Payload.Value.ValueKind == JsonValueKind.Undefined ||
|
||||
envelope.Payload.Value.ValueKind == JsonValueKind.Null)
|
||||
{
|
||||
error = "missing_payload";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!TryDeserializeScannerPayload(envelope.Payload.Value, envelope.Kind, serializerOptions, out payload))
|
||||
{
|
||||
error = "payload_parse_failed";
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
internal static bool TryDeserializeScannerPayload(
|
||||
JsonElement payloadElement,
|
||||
string eventKind,
|
||||
JsonSerializerOptions serializerOptions,
|
||||
out ReportReadyEventPayload payload)
|
||||
{
|
||||
payload = new ReportReadyEventPayload();
|
||||
|
||||
if (TryDeserializeReportReadyPayload(payloadElement, serializerOptions, out payload))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!string.Equals(eventKind, OrchestratorEventKinds.ScannerScanCompleted, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return TryMapScanCompletedPayload(payloadElement, serializerOptions, out payload);
|
||||
}
|
||||
|
||||
internal static bool TryDeserializeReportReadyPayload(
|
||||
JsonElement payloadElement,
|
||||
JsonSerializerOptions serializerOptions,
|
||||
out ReportReadyEventPayload payload)
|
||||
{
|
||||
payload = new ReportReadyEventPayload();
|
||||
|
||||
if (TryDeserializeReportReadyObject(payloadElement, serializerOptions, out payload))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (payloadElement.TryGetProperty("reportReady", out var reportReadyElement) &&
|
||||
TryDeserializeReportReadyObject(reportReadyElement, serializerOptions, out payload))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (payloadElement.TryGetProperty("dsseEnvelope", out var dsseEnvelopeElement) &&
|
||||
TryDeserializeReportReadyDsseEnvelope(dsseEnvelopeElement, serializerOptions, out payload))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return TryDeserializeReportReadyDsseEnvelope(payloadElement, serializerOptions, out payload);
|
||||
}
|
||||
|
||||
internal static bool TryDeserializeReportReadyDsseEnvelope(
|
||||
JsonElement envelopeElement,
|
||||
JsonSerializerOptions serializerOptions,
|
||||
out ReportReadyEventPayload payload)
|
||||
{
|
||||
payload = new ReportReadyEventPayload();
|
||||
|
||||
try
|
||||
{
|
||||
if (!TryExtractDssePayload(envelopeElement, out var payloadBytes, out _))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
var parsed = JsonSerializer.Deserialize<ReportReadyEventPayload>(payloadBytes, serializerOptions);
|
||||
if (!IsUsableReportReadyPayload(parsed))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
payload = parsed!;
|
||||
return true;
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
catch (FormatException)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
internal static bool TryExtractDssePayload(
|
||||
JsonElement envelopeElement,
|
||||
out byte[] payloadBytes,
|
||||
out string? payloadType)
|
||||
{
|
||||
payloadBytes = Array.Empty<byte>();
|
||||
payloadType = null;
|
||||
|
||||
if (!envelopeElement.TryGetProperty("payload", out var payloadElement) ||
|
||||
payloadElement.ValueKind != JsonValueKind.String)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
var payloadValue = payloadElement.GetString();
|
||||
if (string.IsNullOrWhiteSpace(payloadValue))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
payloadBytes = Convert.FromBase64String(payloadValue);
|
||||
if (envelopeElement.TryGetProperty("payloadType", out var payloadTypeElement) &&
|
||||
payloadTypeElement.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
payloadType = payloadTypeElement.GetString();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
internal static bool TryMapScanCompletedPayload(
|
||||
JsonElement payloadElement,
|
||||
JsonSerializerOptions serializerOptions,
|
||||
out ReportReadyEventPayload payload)
|
||||
{
|
||||
payload = new ReportReadyEventPayload();
|
||||
|
||||
try
|
||||
{
|
||||
var completed = JsonSerializer.Deserialize<ScanCompletedEventPayload>(payloadElement, serializerOptions);
|
||||
if (completed is null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (IsUsableReportReadyPayload(completed.ReportReady))
|
||||
{
|
||||
payload = completed.ReportReady!;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (completed.Report is { Surface: not null } report)
|
||||
{
|
||||
payload = new ReportReadyEventPayload
|
||||
{
|
||||
ReportId = FirstNonEmpty(completed.ReportId, report.ReportId, completed.ScanId),
|
||||
ScanId = string.IsNullOrWhiteSpace(completed.ScanId) ? null : completed.ScanId,
|
||||
ImageDigest = FirstNonEmpty(completed.ImageDigest, report.ImageDigest),
|
||||
GeneratedAt = completed.GeneratedAt ?? report.GeneratedAt,
|
||||
Summary = new ReportSummaryPayload(),
|
||||
Report = report
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return TryDeserializeReportReadyDsseEnvelope(payloadElement, serializerOptions, out payload);
|
||||
}
|
||||
|
||||
internal static StreamPosition ResolveScannerSubscriptionPosition(
|
||||
bool startFromBeginning,
|
||||
bool resumeFromCheckpoint,
|
||||
string? checkpointEntryId)
|
||||
{
|
||||
if (startFromBeginning)
|
||||
{
|
||||
return StreamPosition.Beginning;
|
||||
}
|
||||
|
||||
if (!resumeFromCheckpoint)
|
||||
{
|
||||
return StreamPosition.End;
|
||||
}
|
||||
|
||||
var normalizedEntryId = NormalizeScannerCheckpointEntryId(checkpointEntryId);
|
||||
return string.IsNullOrWhiteSpace(normalizedEntryId)
|
||||
? StreamPosition.End
|
||||
: StreamPosition.After(normalizedEntryId);
|
||||
}
|
||||
|
||||
internal static string? NormalizeScannerCheckpointEntryId(string? entryId)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(entryId))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var trimmed = entryId.Trim();
|
||||
if (trimmed.Length == 0 ||
|
||||
trimmed.Equals(StreamPosition.Beginning.Value, StringComparison.Ordinal) ||
|
||||
trimmed.Equals(StreamPosition.End.Value, StringComparison.Ordinal))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
internal static string? ResolveScannerCheckpointPath(
|
||||
AnalyticsStreamOptions streamOptions,
|
||||
AnalyticsCasOptions casOptions)
|
||||
{
|
||||
if (!streamOptions.ResumeFromCheckpoint)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(streamOptions.ScannerCheckpointFilePath))
|
||||
{
|
||||
var configuredPath = streamOptions.ScannerCheckpointFilePath!;
|
||||
if (Path.IsPathRooted(configuredPath) || string.IsNullOrWhiteSpace(casOptions.RootPath))
|
||||
{
|
||||
return configuredPath;
|
||||
}
|
||||
|
||||
return Path.Combine(casOptions.RootPath!, configuredPath);
|
||||
}
|
||||
|
||||
if (string.IsNullOrWhiteSpace(casOptions.RootPath))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return Path.Combine(casOptions.RootPath, ".state", "platform-scanner-stream.checkpoint");
|
||||
}
|
||||
|
||||
private async Task<string?> ReadScannerCheckpointAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(_scannerCheckpointFilePath) || !File.Exists(_scannerCheckpointFilePath))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var entryId = await File.ReadAllTextAsync(_scannerCheckpointFilePath, cancellationToken).ConfigureAwait(false);
|
||||
return NormalizeScannerCheckpointEntryId(entryId);
|
||||
}
|
||||
catch (IOException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to read scanner checkpoint from {Path}.", _scannerCheckpointFilePath);
|
||||
return null;
|
||||
}
|
||||
catch (UnauthorizedAccessException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Unauthorized to read scanner checkpoint from {Path}.", _scannerCheckpointFilePath);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task PersistScannerCheckpointAsync(string entryId, CancellationToken cancellationToken)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(_scannerCheckpointFilePath))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var normalizedEntryId = NormalizeScannerCheckpointEntryId(entryId);
|
||||
if (string.IsNullOrWhiteSpace(normalizedEntryId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await _scannerCheckpointLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var directory = Path.GetDirectoryName(_scannerCheckpointFilePath);
|
||||
if (!string.IsNullOrWhiteSpace(directory))
|
||||
{
|
||||
Directory.CreateDirectory(directory);
|
||||
}
|
||||
|
||||
await File.WriteAllTextAsync(_scannerCheckpointFilePath, normalizedEntryId, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (IOException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Failed to persist scanner checkpoint to {Path}.", _scannerCheckpointFilePath);
|
||||
}
|
||||
catch (UnauthorizedAccessException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Unauthorized to persist scanner checkpoint to {Path}.", _scannerCheckpointFilePath);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_scannerCheckpointLock.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private static bool IsUsableReportReadyPayload(ReportReadyEventPayload? payload)
|
||||
=> payload is not null && payload.Report is { Surface: not null };
|
||||
|
||||
private static bool TryDeserializeReportReadyObject(
|
||||
JsonElement payloadElement,
|
||||
JsonSerializerOptions serializerOptions,
|
||||
out ReportReadyEventPayload payload)
|
||||
{
|
||||
payload = new ReportReadyEventPayload();
|
||||
|
||||
try
|
||||
{
|
||||
var parsed = JsonSerializer.Deserialize<ReportReadyEventPayload>(payloadElement, serializerOptions);
|
||||
if (!IsUsableReportReadyPayload(parsed))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
payload = parsed!;
|
||||
return true;
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static string FirstNonEmpty(params string?[] values)
|
||||
{
|
||||
foreach (var value in values)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(value))
|
||||
{
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
private async Task IngestSbomAsync(
|
||||
OrchestratorEventEnvelope envelope,
|
||||
ReportReadyEventPayload payload,
|
||||
|
||||
@@ -1,8 +1,16 @@
|
||||
# StellaOps.Platform.Analytics Task Board
|
||||
# StellaOps.Platform.Analytics Task Board
|
||||
This board mirrors active sprint tasks for this module.
|
||||
Source of truth: `docs/implplan/SPRINT_20260130_002_Tools_csproj_remediation_solid_review.md`.
|
||||
|
||||
| Task ID | Status | Notes |
|
||||
| --- | --- | --- |
|
||||
| QA-PLATFORM-VERIFY-001 | DONE | run-002 verification captured analytics rollup/materialized-view behavior evidence; feature terminalized as `not_implemented` due missing advisory lock/LISTEN-NOTIFY parity. |
|
||||
| QA-PLATFORM-VERIFY-002 | DONE | run-001 verification passed with maintenance, endpoint paths, analytics service behavior, and Docker schema integration (`38/38` scoped tests). |
|
||||
| QA-PLATFORM-VERIFY-003 | DONE | `platform-service-aggregation-layer` verified with run-001 Tier 0/1/2 endpoint evidence and moved to `docs/features/checked/platform/`. |
|
||||
| QA-PLATFORM-VERIFY-004 | DONE | `platform-setup-wizard-backend-api` verified with run-001 setup endpoint behavior evidence and moved to `docs/features/checked/platform/`. |
|
||||
| QA-PLATFORM-VERIFY-005 | DONE | `sbom-analytics-lake` verified with run-001 Tier 0/1/2 analytics ingestion/schema integration evidence plus tenant allowlist normalization coverage (`171/171` full-suite execution under MTP), moved to `docs/features/checked/platform/`. |
|
||||
| QA-PLATFORM-VERIFY-006 | DONE | `scanner-platform-events` remediated and reverified in run-003: scanner ingestion now supports `scanner.scan.completed`, DSSE scanner payload parsing, and persisted checkpoint resume semantics; Tier 1 (`185/185`) and Tier 2 (`38/38`) passed; feature moved to `docs/features/checked/platform/scanner-platform-events.md`. |
|
||||
| REMED-05 | TODO | Remediation checklist: docs/implplan/audits/csproj-standards/remediation/checklists/src/Platform/StellaOps.Platform.Analytics/StellaOps.Platform.Analytics.md. |
|
||||
| REMED-06 | DONE | SOLID review notes captured for SPRINT_20260130_002. |
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user