Refactor SurfaceCacheValidator to simplify oldest entry calculation
Add global using for Xunit in test project. Enhance ImportValidatorTests with async validation and quarantine checks. Implement FileSystemQuarantineServiceTests for quarantine functionality. Add integration tests for ImportValidator to check monotonicity. Create BundleVersionTests to validate version parsing and comparison logic. Implement VersionMonotonicityCheckerTests for monotonicity checks and activation logic.
This commit is contained in:
@@ -0,0 +1,44 @@
|
||||
namespace StellaOps.Signals.Options;
|
||||
|
||||
/// <summary>
/// Tunable settings for the background worker that rescans unknowns.
/// This type only stores values; it performs no range validation itself.
/// </summary>
public sealed class UnknownsRescanOptions
{
    /// <summary>
    /// Name of the configuration section for these options
    /// (presumably the section they are bound from; verify against the host setup).
    /// </summary>
    public const string SectionName = "Signals:UnknownsRescan";

    /// <summary>
    /// Turns the rescan worker on or off. Defaults to <c>true</c>.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Time between checks for items that are due. Defaults to 60 seconds.
    /// </summary>
    public TimeSpan PollInterval { get; set; } = TimeSpan.FromSeconds(60);

    /// <summary>
    /// Upper bound on HOT items handled in one poll cycle. Defaults to 50.
    /// </summary>
    public int HotBatchSize { get; set; } = 50;

    /// <summary>
    /// Upper bound on WARM items handled in one poll cycle. Defaults to 100.
    /// </summary>
    public int WarmBatchSize { get; set; } = 100;

    /// <summary>
    /// Upper bound on COLD items handled in the weekly batch. Defaults to 500.
    /// </summary>
    public int ColdBatchSize { get; set; } = 500;

    /// <summary>
    /// Weekday on which the COLD batch is processed. Defaults to <see cref="DayOfWeek.Sunday"/>.
    /// </summary>
    public DayOfWeek ColdBatchDay { get; set; } = DayOfWeek.Sunday;

    /// <summary>
    /// Hour of day, in UTC, at which the COLD batch is processed. Defaults to 3 (03:00 UTC).
    /// </summary>
    public int ColdBatchHourUtc { get; set; } = 3;
}
|
||||
@@ -251,6 +251,102 @@ public sealed class SimpleJsonCallgraphParser : ICallgraphParser
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
/// Attempts to read <paramref name="root"/> as a "flat graph": a JSON array whose entries are edges.
/// Nodes are not listed explicitly; one node is synthesised per distinct edge endpoint.
/// </summary>
/// <param name="root">Root element of the parsed document.</param>
/// <param name="result">The parsed graph when the method returns <c>true</c>; otherwise <c>default</c>.</param>
/// <returns>
/// <c>true</c> when <paramref name="root"/> is an array containing at least one usable edge;
/// <c>false</c> otherwise.
/// </returns>
private static bool TryParseFlatGraph(JsonElement root, out CallgraphParseResult result)
{
    result = default!;

    // Flat graph format: array of edges only, nodes derived from edge endpoints.
    if (root.ValueKind != JsonValueKind.Array)
    {
        return false;
    }

    var edges = new List<CallgraphEdge>();
    var nodes = new List<CallgraphNode>();
    var seenNodeIds = new HashSet<string>(StringComparer.Ordinal);

    // Registers a node the first time its id is seen, so node order follows first appearance
    // in the edge list instead of depending on HashSet enumeration order.
    void AddNodeIfNew(string nodeId)
    {
        if (seenNodeIds.Add(nodeId))
        {
            nodes.Add(new CallgraphNode { Id = nodeId, Name = nodeId, Kind = "function" });
        }
    }

    foreach (var edgeElement in root.EnumerateArray())
    {
        // Only JSON objects can carry named properties; skip scalars, nulls and nested arrays
        // rather than handing them to the property helpers.
        if (edgeElement.ValueKind != JsonValueKind.Object)
        {
            continue;
        }

        // Trim once up front; a whitespace-only endpoint becomes empty and is rejected below.
        var source = GetString(edgeElement, "source", "from")?.Trim();
        var target = GetString(edgeElement, "target", "to")?.Trim();
        if (string.IsNullOrEmpty(source) || string.IsNullOrEmpty(target))
        {
            continue;
        }

        AddNodeIfNew(source);
        AddNodeIfNew(target);

        edges.Add(new CallgraphEdge
        {
            SourceId = source,
            TargetId = target,
            Type = GetString(edgeElement, "type", "kind") ?? "call",
            Purl = GetString(edgeElement, "purl"),
            SymbolDigest = GetString(edgeElement, "symbol_digest", "symbolDigest"),
            Candidates = GetStringArray(edgeElement, "candidates"),
            Confidence = GetNullableDouble(edgeElement, "confidence"),
            Evidence = GetStringArray(edgeElement, "evidence")
        });
    }

    // An array without a single usable edge is not treated as a flat graph.
    if (edges.Count == 0)
    {
        return false;
    }

    // NOTE(review): the two "1.0" arguments and the trailing null are passed exactly as before;
    // they look like format/schema versions plus an optional extra - confirm against CallgraphParseResult.
    result = new CallgraphParseResult(
        nodes,
        edges,
        Array.Empty<CallgraphRoot>(),
        "1.0",
        "1.0",
        null);
    return true;
}
|
||||
|
||||
/// <summary>
/// Reads the optional <c>entrypoints</c> array from <paramref name="root"/>.
/// </summary>
/// <param name="root">Root element of the parsed document.</param>
/// <returns>
/// One entry per array element that is an object with a non-blank <c>nodeId</c>/<c>node_id</c>,
/// in document order; an empty list when the property is absent or is not an array.
/// </returns>
private static IReadOnlyList<CallgraphEntrypoint> ParseEntrypoints(JsonElement root)
{
    // Enum.TryParse also accepts numeric text ("42") and returns a value that is not a declared
    // member; treat those the same as unrecognised names and use the fallback.
    // NOTE(review): assumes the entrypoint enums are not [Flags] enums - confirm.
    static TEnum ParseEnumOrDefault<TEnum>(string? text, TEnum fallback)
        where TEnum : struct, Enum
        => Enum.TryParse<TEnum>(text, ignoreCase: true, out var parsed) && Enum.IsDefined(parsed)
            ? parsed
            : fallback;

    // TryGetProperty throws on non-object elements, so check the kind first.
    if (root.ValueKind != JsonValueKind.Object
        || !root.TryGetProperty("entrypoints", out var entrypointsEl)
        || entrypointsEl.ValueKind != JsonValueKind.Array)
    {
        return Array.Empty<CallgraphEntrypoint>();
    }

    var entrypoints = new List<CallgraphEntrypoint>(entrypointsEl.GetArrayLength());
    var order = 0;

    foreach (var ep in entrypointsEl.EnumerateArray())
    {
        // Entries that are not JSON objects cannot carry a node id; skip them.
        if (ep.ValueKind != JsonValueKind.Object)
        {
            continue;
        }

        var nodeId = GetString(ep, "nodeId", "node_id");
        if (string.IsNullOrWhiteSpace(nodeId))
        {
            continue;
        }

        entrypoints.Add(new CallgraphEntrypoint
        {
            NodeId = nodeId.Trim(),
            Kind = ParseEnumOrDefault(GetString(ep, "kind"), EntrypointKind.Unknown),
            Phase = ParseEnumOrDefault(GetString(ep, "phase"), EntrypointPhase.Runtime),
            Framework = ParseEnumOrDefault(GetString(ep, "framework"), EntrypointFramework.Unknown),
            Route = GetString(ep, "route"),
            HttpMethod = GetString(ep, "httpMethod", "http_method"),
            Source = GetString(ep, "source"),
            // Order counts accepted entries only, so skipped elements leave no gaps.
            Order = order++
        });
    }

    return entrypoints;
}
|
||||
|
||||
private static IReadOnlyList<CallgraphRoot> ParseRoots(JsonElement root)
|
||||
{
|
||||
if (!root.TryGetProperty("roots", out var rootsEl) || rootsEl.ValueKind != JsonValueKind.Array)
|
||||
|
||||
@@ -28,4 +28,18 @@ public interface IUnknownsRepository
|
||||
UnknownsBand band,
|
||||
int limit,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>
/// Returns a page of unknowns, optionally restricted to a single band.
/// </summary>
/// <param name="band">Band to filter on, or <c>null</c> to include every band.</param>
/// <param name="limit">Maximum number of documents to return.</param>
/// <param name="offset">Number of matching documents to skip before the page starts.</param>
/// <param name="cancellationToken">Token used to cancel the query.</param>
/// <returns>The documents on the requested page.</returns>
Task<IReadOnlyList<UnknownSymbolDocument>> QueryAsync(UnknownsBand? band, int limit, int offset, CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>
/// Looks up one unknown by its identifier.
/// </summary>
/// <param name="id">Identifier of the unknown to fetch.</param>
/// <param name="cancellationToken">Token used to cancel the lookup.</param>
/// <returns>The matching document, or <c>null</c> when none exists.</returns>
Task<UnknownSymbolDocument?> GetByIdAsync(
    string id,
    CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
@@ -94,6 +94,44 @@ public sealed class InMemoryUnknownsRepository : IUnknownsRepository
|
||||
return Task.FromResult<IReadOnlyList<UnknownSymbolDocument>>(results);
|
||||
}
|
||||
|
||||
/// <summary>
/// Returns a page of stored unknowns, highest score first (ties broken by id, case-insensitively),
/// optionally limited to one band. Every returned document is a copy produced by <c>Clone</c>.
/// </summary>
/// <param name="band">Band to filter on, or <c>null</c> for all bands.</param>
/// <param name="limit">Maximum number of documents to return.</param>
/// <param name="offset">Number of ordered documents to skip first.</param>
/// <param name="cancellationToken">Checked once on entry.</param>
public Task<IReadOnlyList<UnknownSymbolDocument>> QueryAsync(UnknownsBand? band, int limit, int offset, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();

    // Flatten every stored bucket into one sequence of documents.
    var candidates = _store.Values.SelectMany(bucket => bucket);

    if (band is { } requiredBand)
    {
        candidates = candidates.Where(doc => doc.Band == requiredBand);
    }

    var ordered = candidates
        .OrderByDescending(doc => doc.Score)
        .ThenBy(doc => doc.Id, StringComparer.OrdinalIgnoreCase);

    var page = ordered
        .Skip(offset)
        .Take(limit)
        .Select(Clone)
        .ToList();

    return Task.FromResult<IReadOnlyList<UnknownSymbolDocument>>(page);
}
|
||||
|
||||
/// <summary>
/// Finds the first stored unknown whose id equals <paramref name="id"/> (case-insensitive)
/// and returns a copy of it produced by <c>Clone</c>.
/// </summary>
/// <param name="id">Identifier to look for; must not be null, empty or whitespace.</param>
/// <param name="cancellationToken">Checked once on entry, after argument validation.</param>
/// <returns>A copy of the matching document, or <c>null</c> when nothing matches.</returns>
public Task<UnknownSymbolDocument?> GetByIdAsync(string id, CancellationToken cancellationToken)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(id);
    cancellationToken.ThrowIfCancellationRequested();

    foreach (var bucket in _store.Values)
    {
        foreach (var candidate in bucket)
        {
            if (string.Equals(candidate.Id, id, StringComparison.OrdinalIgnoreCase))
            {
                return Task.FromResult<UnknownSymbolDocument?>(Clone(candidate));
            }
        }
    }

    return Task.FromResult<UnknownSymbolDocument?>(null);
}
|
||||
|
||||
private static UnknownSymbolDocument Clone(UnknownSymbolDocument source) => new()
|
||||
{
|
||||
Id = source.Id,
|
||||
|
||||
@@ -743,6 +743,91 @@ signalsGroup.MapGet("/unknowns/{subjectKey}", async Task<IResult> (
|
||||
return items.Count == 0 ? Results.NotFound() : Results.Ok(items);
|
||||
}).WithName("SignalsUnknownsGet");
|
||||
|
||||
// GET /unknowns - paged listing of unknowns with an optional band filter.
// Query parameters:
//   band   - optional band name (case-insensitive); an unrecognised value yields 400.
//   limit  - page size, clamped to 1..1000 (default 100).
//   offset - number of items to skip, negative values are treated as 0 (default 0).
signalsGroup.MapGet("/unknowns", async Task<IResult> (
    HttpContext context,
    SignalsOptions options,
    IUnknownsRepository repository,
    SignalsSealedModeMonitor sealedModeMonitor,
    [FromQuery] string? band,
    [FromQuery] int limit = 100,
    [FromQuery] int offset = 0,
    CancellationToken cancellationToken = default) =>
{
    if (!Program.TryAuthorize(context, SignalsPolicies.Read, options.Authority.AllowAnonymousFallback, out var authFailure))
    {
        return authFailure ?? Results.Unauthorized();
    }

    if (!Program.TryEnsureSealedMode(sealedModeMonitor, out var sealedFailure))
    {
        return sealedFailure ?? Results.StatusCode(StatusCodes.Status503ServiceUnavailable);
    }

    limit = Math.Clamp(limit, 1, 1000);
    offset = Math.Max(0, offset);

    UnknownsBand? bandFilter = null;
    if (!string.IsNullOrWhiteSpace(band))
    {
        // Reject a filter we cannot honour instead of silently returning unfiltered data.
        // Enum.TryParse also accepts numeric text, so require a declared member as well.
        if (!Enum.TryParse<UnknownsBand>(band, ignoreCase: true, out var parsedBand) || !Enum.IsDefined(parsedBand))
        {
            return Results.BadRequest(new { error = $"Unknown band '{band}'." });
        }

        bandFilter = parsedBand;
    }

    var items = await repository.QueryAsync(bandFilter, limit, offset, cancellationToken).ConfigureAwait(false);
    return Results.Ok(new
    {
        items,
        // Number of items on this page, not the total number of matches.
        count = items.Count,
        limit,
        offset,
        band = bandFilter?.ToString().ToLowerInvariant()
    });
}).WithName("SignalsUnknownsQuery");
|
||||
|
||||
// GET /unknowns/{id}/explain - returns the scoring details stored on a single unknown.
// Responds 400 when the id is blank and 404 when no unknown has that id.
signalsGroup.MapGet("/unknowns/{id}/explain", async Task<IResult> (
    HttpContext context,
    SignalsOptions options,
    string id,
    IUnknownsRepository repository,
    IUnknownsScoringService scoringService,
    SignalsSealedModeMonitor sealedModeMonitor,
    CancellationToken cancellationToken) =>
{
    var authorized = Program.TryAuthorize(context, SignalsPolicies.Read, options.Authority.AllowAnonymousFallback, out var authFailure);
    if (!authorized)
    {
        return authFailure ?? Results.Unauthorized();
    }

    var sealedOk = Program.TryEnsureSealedMode(sealedModeMonitor, out var sealedFailure);
    if (!sealedOk)
    {
        return sealedFailure ?? Results.StatusCode(StatusCodes.Status503ServiceUnavailable);
    }

    if (string.IsNullOrWhiteSpace(id))
    {
        return Results.BadRequest(new { error = "id is required." });
    }

    // The lookup uses the trimmed id; the 404 message echoes the id as it was received.
    var unknown = await repository.GetByIdAsync(id.Trim(), cancellationToken).ConfigureAwait(false);

    return unknown is null
        ? Results.NotFound(new { error = $"Unknown with id '{id}' not found." })
        : Results.Ok(new
        {
            id = unknown.Id,
            subjectKey = unknown.SubjectKey,
            band = unknown.Band.ToString().ToLowerInvariant(),
            score = unknown.Score,
            normalizationTrace = unknown.NormalizationTrace,
            flags = unknown.Flags,
            nextScheduledRescan = unknown.NextScheduledRescan,
            rescanAttempts = unknown.RescanAttempts,
            createdAt = unknown.CreatedAt,
            updatedAt = unknown.UpdatedAt
        });
}).WithName("SignalsUnknownsExplain");
|
||||
|
||||
signalsGroup.MapPost("/reachability/recompute", async Task<IResult> (
|
||||
HttpContext context,
|
||||
SignalsOptions options,
|
||||
|
||||
@@ -330,20 +330,6 @@ internal sealed class CallgraphIngestionService : ICallgraphIngestionService
|
||||
return ordered.ToString();
|
||||
}
|
||||
|
||||
/// <summary>
/// Flattens a dictionary into a single <c>key=value;</c> string with keys in ordinal order.
/// </summary>
/// <param name="values">Entries to join; may be <c>null</c>.</param>
/// <returns>
/// The concatenated <c>key=value;</c> pairs, or an empty string when <paramref name="values"/>
/// is <c>null</c> or has no entries.
/// </returns>
private static string JoinDict(IReadOnlyDictionary<string, string>? values)
    => values is null
        ? string.Empty
        : string.Concat(
            values
                .OrderBy(pair => pair.Key, StringComparer.Ordinal)
                .Select(pair => $"{pair.Key}={pair.Value};"));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
using StellaOps.Signals.Models;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Entry point for requesting rescans of unknowns, either one at a time or as a batch.
/// </summary>
public interface IRescanOrchestrator
{
    /// <summary>
    /// Requests a rescan of one unknown.
    /// </summary>
    /// <param name="unknown">The unknown to rescan.</param>
    /// <param name="priority">Priority to attach to the request.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>The outcome for <paramref name="unknown"/>.</returns>
    Task<RescanResult> TriggerRescanAsync(UnknownSymbolDocument unknown, RescanPriority priority, CancellationToken cancellationToken = default);

    /// <summary>
    /// Requests a rescan of several unknowns in one call.
    /// </summary>
    /// <param name="unknowns">The unknowns to rescan.</param>
    /// <param name="priority">Priority applied to every item in the batch.</param>
    /// <param name="cancellationToken">Token used to cancel the request.</param>
    /// <returns>Aggregate counts together with the per-item outcomes.</returns>
    Task<BatchRescanResult> TriggerBatchRescanAsync(IReadOnlyList<UnknownSymbolDocument> unknowns, RescanPriority priority, CancellationToken cancellationToken = default);
}
|
||||
|
||||
/// <summary>
/// How urgently a rescan should be handled.
/// </summary>
public enum RescanPriority
{
    /// <summary>
    /// Handle right away; used for HOT items.
    /// </summary>
    Immediate = 0,

    /// <summary>
    /// Handle on the regular schedule; used for WARM items.
    /// </summary>
    Scheduled = 1,

    /// <summary>
    /// Handle as part of a batch; used for COLD items.
    /// </summary>
    Batch = 2
}
|
||||
|
||||
/// <summary>
/// Outcome of one rescan request.
/// </summary>
/// <param name="UnknownId">Identifier of the unknown the request was made for.</param>
/// <param name="Success">Whether the request succeeded.</param>
/// <param name="ErrorMessage">Optional failure description; defaults to <c>null</c>.</param>
/// <param name="NextScheduledRescan">Optional time of the next rescan; defaults to <c>null</c>.</param>
public sealed record RescanResult(string UnknownId, bool Success, string? ErrorMessage = null, DateTimeOffset? NextScheduledRescan = null);
|
||||
|
||||
/// <summary>
/// Outcome of one batch rescan request.
/// </summary>
/// <param name="TotalRequested">Number of items in the request.</param>
/// <param name="SuccessCount">Number of items reported as succeeded.</param>
/// <param name="FailureCount">Number of items reported as failed.</param>
/// <param name="Results">Per-item outcomes.</param>
public sealed record BatchRescanResult(int TotalRequested, int SuccessCount, int FailureCount, IReadOnlyList<RescanResult> Results);
|
||||
@@ -0,0 +1,85 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Signals.Models;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Logging-only implementation of <see cref="IRescanOrchestrator"/>.
/// Placeholder until actual rescan integration is implemented: it logs each request, reports
/// success for every item and proposes a next-rescan time derived from the priority.
/// </summary>
public sealed class LoggingRescanOrchestrator : IRescanOrchestrator
{
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<LoggingRescanOrchestrator> _logger;

    /// <summary>
    /// Creates the orchestrator.
    /// </summary>
    /// <param name="timeProvider">Clock used to compute next-rescan times.</param>
    /// <param name="logger">Destination for the rescan log entries.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public LoggingRescanOrchestrator(
        TimeProvider timeProvider,
        ILogger<LoggingRescanOrchestrator> logger)
    {
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Logs the request and returns a successful result carrying the proposed next-rescan time.
    /// </summary>
    /// <param name="unknown">The unknown to rescan.</param>
    /// <param name="priority">Priority that selects the next-rescan delay.</param>
    /// <param name="cancellationToken">Checked once on entry.</param>
    /// <exception cref="ArgumentNullException"><paramref name="unknown"/> is <c>null</c>.</exception>
    /// <exception cref="OperationCanceledException">Cancellation was already requested.</exception>
    public Task<RescanResult> TriggerRescanAsync(
        UnknownSymbolDocument unknown,
        RescanPriority priority,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(unknown);
        cancellationToken.ThrowIfCancellationRequested();

        _logger.LogInformation(
            "Rescan triggered for unknown {UnknownId} with priority {Priority} (Band={Band}, Score={Score:F2})",
            unknown.Id,
            priority,
            unknown.Band,
            unknown.Score);

        return Task.FromResult(new RescanResult(
            unknown.Id,
            Success: true,
            NextScheduledRescan: ComputeNextRescan(priority)));
    }

    /// <summary>
    /// Logs the batch request and returns a result that marks every item as succeeded.
    /// All items share one next-rescan time computed once for the batch.
    /// </summary>
    /// <param name="unknowns">The unknowns to rescan.</param>
    /// <param name="priority">Priority that selects the next-rescan delay.</param>
    /// <param name="cancellationToken">Checked once on entry.</param>
    /// <exception cref="ArgumentNullException"><paramref name="unknowns"/> is <c>null</c>.</exception>
    /// <exception cref="OperationCanceledException">Cancellation was already requested.</exception>
    public Task<BatchRescanResult> TriggerBatchRescanAsync(
        IReadOnlyList<UnknownSymbolDocument> unknowns,
        RescanPriority priority,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(unknowns);
        cancellationToken.ThrowIfCancellationRequested();

        _logger.LogInformation(
            "Batch rescan triggered for {Count} unknowns with priority {Priority}",
            unknowns.Count,
            priority);

        var nextRescan = ComputeNextRescan(priority);
        var results = new List<RescanResult>(unknowns.Count);
        foreach (var unknown in unknowns)
        {
            results.Add(new RescanResult(
                unknown.Id,
                Success: true,
                NextScheduledRescan: nextRescan));
        }

        return Task.FromResult(new BatchRescanResult(
            TotalRequested: unknowns.Count,
            SuccessCount: unknowns.Count,
            FailureCount: 0,
            Results: results));
    }

    // Single source of truth for the priority -> delay mapping used by both entry points:
    // Immediate = 15 minutes, Scheduled = 24 hours, anything else = 7 days.
    private DateTimeOffset ComputeNextRescan(RescanPriority priority)
    {
        var now = _timeProvider.GetUtcNow();
        return priority switch
        {
            RescanPriority.Immediate => now.AddMinutes(15),
            RescanPriority.Scheduled => now.AddHours(24),
            _ => now.AddDays(7)
        };
    }
}
|
||||
107
src/Signals/StellaOps.Signals/Services/UnknownsRescanMetrics.cs
Normal file
107
src/Signals/StellaOps.Signals/Services/UnknownsRescanMetrics.cs
Normal file
@@ -0,0 +1,107 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Instruments for unknowns rescans (counters, duration histograms) and for the current
/// band distribution (observable gauges), all published on the "StellaOps.Signals.Rescan" meter.
/// </summary>
internal static class UnknownsRescanMetrics
{
    // Latest band distribution; written by SetBandDistribution and read by the gauge callbacks.
    private static int _hotCount;
    private static int _warmCount;
    private static int _coldCount;

    private static readonly Meter Meter = new("StellaOps.Signals.Rescan", "1.0.0");

    // ----- Rescan counters (tagged with "band" by the Record* helpers) -----

    public static readonly Counter<long> RescansTriggered = Meter.CreateCounter<long>(
        "stellaops_unknowns_rescans_triggered_total",
        description: "Total rescans triggered by band");

    public static readonly Counter<long> RescansSucceeded = Meter.CreateCounter<long>(
        "stellaops_unknowns_rescans_succeeded_total",
        description: "Total successful rescans by band");

    public static readonly Counter<long> RescansFailed = Meter.CreateCounter<long>(
        "stellaops_unknowns_rescans_failed_total",
        description: "Total failed rescans by band");

    // ----- Batch cycle counters -----

    public static readonly Counter<long> HotBatchesProcessed = Meter.CreateCounter<long>(
        "stellaops_unknowns_hot_batches_processed_total",
        description: "Total HOT band batch processing cycles");

    public static readonly Counter<long> WarmBatchesProcessed = Meter.CreateCounter<long>(
        "stellaops_unknowns_warm_batches_processed_total",
        description: "Total WARM band batch processing cycles");

    public static readonly Counter<long> ColdBatchesProcessed = Meter.CreateCounter<long>(
        "stellaops_unknowns_cold_batches_processed_total",
        description: "Total COLD band weekly batch runs");

    // ----- Duration histograms (seconds) -----

    public static readonly Histogram<double> RescanDurationSeconds = Meter.CreateHistogram<double>(
        "stellaops_unknowns_rescan_duration_seconds",
        unit: "s",
        description: "Duration of individual rescan operations");

    public static readonly Histogram<double> BatchDurationSeconds = Meter.CreateHistogram<double>(
        "stellaops_unknowns_rescan_batch_duration_seconds",
        unit: "s",
        description: "Duration of rescan batch cycles");

    // ----- Band distribution gauges -----

    public static readonly ObservableGauge<int> HotCount = Meter.CreateObservableGauge(
        "stellaops_unknowns_band_hot_count",
        () => _hotCount,
        description: "Current count of HOT band unknowns");

    public static readonly ObservableGauge<int> WarmCount = Meter.CreateObservableGauge(
        "stellaops_unknowns_band_warm_count",
        () => _warmCount,
        description: "Current count of WARM band unknowns");

    public static readonly ObservableGauge<int> ColdCount = Meter.CreateObservableGauge(
        "stellaops_unknowns_band_cold_count",
        () => _coldCount,
        description: "Current count of COLD band unknowns");

    /// <summary>
    /// Replaces the values reported by the three band gauges.
    /// Each value is stored with its own atomic exchange; the three are not updated as one unit.
    /// </summary>
    /// <param name="hot">New HOT count.</param>
    /// <param name="warm">New WARM count.</param>
    /// <param name="cold">New COLD count.</param>
    public static void SetBandDistribution(int hot, int warm, int cold)
    {
        Interlocked.Exchange(ref _hotCount, hot);
        Interlocked.Exchange(ref _warmCount, warm);
        Interlocked.Exchange(ref _coldCount, cold);
    }

    /// <summary>
    /// Adds one to the triggered counter, tagged with <paramref name="band"/>.
    /// </summary>
    public static void RecordRescanTriggered(string band)
        => RescansTriggered.Add(1, BandTag(band));

    /// <summary>
    /// Adds one to the succeeded counter, tagged with <paramref name="band"/>.
    /// </summary>
    public static void RecordRescanSuccess(string band)
        => RescansSucceeded.Add(1, BandTag(band));

    /// <summary>
    /// Adds one to the failed counter, tagged with <paramref name="band"/>.
    /// </summary>
    public static void RecordRescanFailure(string band)
        => RescansFailed.Add(1, BandTag(band));

    // Builds the "band" tag shared by the per-band counters.
    private static KeyValuePair<string, object?> BandTag(string band)
        => new("band", band);
}
|
||||
263
src/Signals/StellaOps.Signals/Services/UnknownsRescanWorker.cs
Normal file
263
src/Signals/StellaOps.Signals/Services/UnknownsRescanWorker.cs
Normal file
@@ -0,0 +1,263 @@
|
||||
using System.Diagnostics;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Options;
|
||||
using StellaOps.Signals.Persistence;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Background worker that processes unknowns rescans based on band scheduling.
/// On every poll cycle, HOT and WARM items that the repository reports as due are triggered one
/// by one. COLD items are handed to the orchestrator as a single batch, at most once per day,
/// during the configured weekday and UTC hour.
/// </summary>
public sealed class UnknownsRescanWorker : BackgroundService
{
    // Used when the configured poll interval is zero or negative: zero would spin the loop
    // without pause and a negative delay makes Task.Delay throw.
    private static readonly TimeSpan FallbackPollInterval = TimeSpan.FromSeconds(60);

    private readonly IUnknownsRepository _repository;
    private readonly IRescanOrchestrator _orchestrator;
    private readonly IOptions<UnknownsRescanOptions> _options;
    private readonly TimeProvider _timeProvider;
    private readonly ILogger<UnknownsRescanWorker> _logger;

    // Calendar date (per the time provider) of the last COLD batch that completed without
    // throwing. Only touched from the ExecuteAsync loop, so no synchronization is needed.
    private DateTime? _lastColdBatchDate;

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public UnknownsRescanWorker(
        IUnknownsRepository repository,
        IRescanOrchestrator orchestrator,
        IOptions<UnknownsRescanOptions> options,
        TimeProvider timeProvider,
        ILogger<UnknownsRescanWorker> logger)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _orchestrator = orchestrator ?? throw new ArgumentNullException(nameof(orchestrator));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Runs the poll loop until <paramref name="stoppingToken"/> is cancelled.
    /// Returns immediately when the worker is disabled. Errors inside a cycle are logged and
    /// the loop continues with the next cycle.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Options are read once; later configuration changes are not picked up.
        var opts = _options.Value;

        if (!opts.Enabled)
        {
            _logger.LogInformation("Unknowns rescan worker is disabled.");
            return;
        }

        var pollInterval = opts.PollInterval;
        if (pollInterval <= TimeSpan.Zero)
        {
            _logger.LogWarning(
                "Configured poll interval {PollInterval} is not positive; using {FallbackPollInterval} instead.",
                pollInterval,
                FallbackPollInterval);
            pollInterval = FallbackPollInterval;
        }

        _logger.LogInformation(
            "Unknowns rescan worker started. Poll interval: {PollInterval}",
            pollInterval);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessHotBandAsync(opts, stoppingToken).ConfigureAwait(false);
                await ProcessWarmBandAsync(opts, stoppingToken).ConfigureAwait(false);
                await ProcessColdBandAsync(opts, stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                // Keep the worker alive: one failed cycle must not stop future cycles.
                _logger.LogError(ex, "Error in unknowns rescan worker cycle.");
            }

            try
            {
                await Task.Delay(pollInterval, _timeProvider, stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
        }

        _logger.LogInformation("Unknowns rescan worker stopping.");
    }

    // Triggers an immediate rescan for each HOT item that is due, recording per-item metrics.
    private async Task ProcessHotBandAsync(UnknownsRescanOptions opts, CancellationToken cancellationToken)
    {
        var sw = Stopwatch.StartNew();
        var hotItems = await _repository.GetDueForRescanAsync(
            UnknownsBand.Hot,
            opts.HotBatchSize,
            cancellationToken).ConfigureAwait(false);

        if (hotItems.Count == 0)
        {
            return;
        }

        _logger.LogInformation(
            "Processing {Count} HOT unknowns for immediate rescan.",
            hotItems.Count);

        foreach (var item in hotItems)
        {
            UnknownsRescanMetrics.RecordRescanTriggered("hot");
            try
            {
                var result = await _orchestrator.TriggerRescanAsync(
                    item,
                    RescanPriority.Immediate,
                    cancellationToken).ConfigureAwait(false);

                if (result.Success)
                {
                    UnknownsRescanMetrics.RecordRescanSuccess("hot");
                    _logger.LogDebug(
                        "HOT unknown {UnknownId} rescan triggered successfully.",
                        item.Id);
                }
                else
                {
                    UnknownsRescanMetrics.RecordRescanFailure("hot");
                    _logger.LogWarning(
                        "HOT unknown {UnknownId} rescan failed: {Error}",
                        item.Id,
                        result.ErrorMessage);
                }
            }
            // Cancellation propagates to ExecuteAsync; any other failure only affects this item.
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                UnknownsRescanMetrics.RecordRescanFailure("hot");
                _logger.LogError(ex, "Failed to trigger rescan for HOT unknown {UnknownId}.", item.Id);
            }
        }

        sw.Stop();
        UnknownsRescanMetrics.HotBatchesProcessed.Add(1);
        UnknownsRescanMetrics.BatchDurationSeconds.Record(sw.Elapsed.TotalSeconds, new KeyValuePair<string, object?>("band", "hot"));
    }

    // Triggers a scheduled rescan for each WARM item that is due, recording per-item metrics.
    private async Task ProcessWarmBandAsync(UnknownsRescanOptions opts, CancellationToken cancellationToken)
    {
        var sw = Stopwatch.StartNew();
        var warmItems = await _repository.GetDueForRescanAsync(
            UnknownsBand.Warm,
            opts.WarmBatchSize,
            cancellationToken).ConfigureAwait(false);

        if (warmItems.Count == 0)
        {
            return;
        }

        _logger.LogInformation(
            "Processing {Count} WARM unknowns for scheduled rescan.",
            warmItems.Count);

        foreach (var item in warmItems)
        {
            UnknownsRescanMetrics.RecordRescanTriggered("warm");
            try
            {
                var result = await _orchestrator.TriggerRescanAsync(
                    item,
                    RescanPriority.Scheduled,
                    cancellationToken).ConfigureAwait(false);

                if (result.Success)
                {
                    UnknownsRescanMetrics.RecordRescanSuccess("warm");
                    _logger.LogDebug(
                        "WARM unknown {UnknownId} rescan scheduled.",
                        item.Id);
                }
                else
                {
                    UnknownsRescanMetrics.RecordRescanFailure("warm");
                    _logger.LogWarning(
                        "WARM unknown {UnknownId} rescan scheduling failed: {Error}",
                        item.Id,
                        result.ErrorMessage);
                }
            }
            // Cancellation propagates to ExecuteAsync; any other failure only affects this item.
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                UnknownsRescanMetrics.RecordRescanFailure("warm");
                _logger.LogError(ex, "Failed to schedule rescan for WARM unknown {UnknownId}.", item.Id);
            }
        }

        sw.Stop();
        UnknownsRescanMetrics.WarmBatchesProcessed.Add(1);
        UnknownsRescanMetrics.BatchDurationSeconds.Record(sw.Elapsed.TotalSeconds, new KeyValuePair<string, object?>("band", "warm"));
    }

    // Hands due COLD items to the orchestrator as one batch. Runs only on the configured weekday
    // and hour, and at most once per day: without the per-day guard the batch would repeat on
    // every poll that falls inside the configured hour.
    private async Task ProcessColdBandAsync(UnknownsRescanOptions opts, CancellationToken cancellationToken)
    {
        var now = _timeProvider.GetUtcNow();

        // COLD items are processed in weekly batches on the configured day and hour.
        if (now.DayOfWeek != opts.ColdBatchDay || now.Hour != opts.ColdBatchHourUtc)
        {
            return;
        }

        // Only process once per day during the configured hour.
        var today = now.Date;
        if (_lastColdBatchDate == today)
        {
            return;
        }

        var sw = Stopwatch.StartNew();
        var coldItems = await _repository.GetDueForRescanAsync(
            UnknownsBand.Cold,
            opts.ColdBatchSize,
            cancellationToken).ConfigureAwait(false);

        if (coldItems.Count == 0)
        {
            // Not marked as done: items that become due later in the hour are still picked up.
            _logger.LogDebug("No COLD unknowns due for weekly batch processing.");
            return;
        }

        _logger.LogInformation(
            "Processing weekly COLD batch: {Count} unknowns.",
            coldItems.Count);

        try
        {
            foreach (var item in coldItems)
            {
                UnknownsRescanMetrics.RecordRescanTriggered("cold");
            }

            var result = await _orchestrator.TriggerBatchRescanAsync(
                coldItems,
                RescanPriority.Batch,
                cancellationToken).ConfigureAwait(false);

            // The batch was accepted; do not run it again today. A batch that throws is left
            // unmarked so the next poll inside the hour retries it.
            _lastColdBatchDate = today;

            // Record success/failure metrics, one increment per item.
            for (var i = 0; i < result.SuccessCount; i++)
            {
                UnknownsRescanMetrics.RecordRescanSuccess("cold");
            }
            for (var i = 0; i < result.FailureCount; i++)
            {
                UnknownsRescanMetrics.RecordRescanFailure("cold");
            }

            _logger.LogInformation(
                "COLD batch completed: {Success}/{Total} succeeded.",
                result.SuccessCount,
                result.TotalRequested);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // The whole batch failed, so every item counts as a failure.
            foreach (var item in coldItems)
            {
                UnknownsRescanMetrics.RecordRescanFailure("cold");
            }
            _logger.LogError(ex, "Failed to process COLD batch rescan.");
        }

        sw.Stop();
        UnknownsRescanMetrics.ColdBatchesProcessed.Add(1);
        UnknownsRescanMetrics.BatchDurationSeconds.Record(sw.Elapsed.TotalSeconds, new KeyValuePair<string, object?>("band", "cold"));
    }
}
|
||||
Reference in New Issue
Block a user