up
This commit is contained in:
@@ -5,5 +5,5 @@ namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Publishes notifications about reachability facts that have been updated.
/// </summary>
public interface IEventsPublisher
{
    /// <summary>
    /// Publishes an update event for the supplied reachability fact.
    /// </summary>
    /// <param name="fact">The fact document the event is built from.</param>
    /// <param name="cancellationToken">Token used to cancel the publish operation.</param>
    /// <returns>A task that completes when publishing has finished.</returns>
    /// <remarks>
    /// The parameter type is spelled with <c>global::</c> on purpose: inside the
    /// <c>StellaOps.Signals.Services</c> namespace a bare <c>Models.</c> prefix binds to
    /// <c>StellaOps.Signals.Services.Models</c>, not to <c>StellaOps.Signals.Models</c>.
    /// </remarks>
    Task PublishFactUpdatedAsync(global::StellaOps.Signals.Models.ReachabilityFactDocument fact, CancellationToken cancellationToken);
}
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Signals.Services.Models;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Contract for ingesting a runtime+static union bundle and normalizing it into the
/// reachability CAS layout.
/// </summary>
public interface IReachabilityUnionIngestionService
{
    /// <summary>
    /// Ingests a zipped union bundle for the given analysis.
    /// </summary>
    /// <param name="analysisId">Identifier of the analysis the bundle belongs to.</param>
    /// <param name="zipStream">Stream positioned at the start of the zip archive.</param>
    /// <param name="cancellationToken">Token used to cancel the ingestion.</param>
    /// <returns>A description of the ingested bundle and the files it contained.</returns>
    Task<ReachabilityUnionIngestResponse> IngestAsync(
        string analysisId,
        Stream zipStream,
        CancellationToken cancellationToken);
}
|
||||
@@ -0,0 +1,10 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Signals.Models;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Contract for ingesting unknown symbols reported for a subject.
/// </summary>
public interface IUnknownsIngestionService
{
    /// <summary>
    /// Ingests the unknowns carried by <paramref name="request"/>.
    /// </summary>
    /// <param name="request">The ingest request to process.</param>
    /// <param name="cancellationToken">Token used to cancel the ingestion.</param>
    /// <returns>A response summarizing what was ingested.</returns>
    Task<UnknownsIngestResponse> IngestAsync(
        UnknownsIngestRequest request,
        CancellationToken cancellationToken);
}
|
||||
@@ -31,6 +31,17 @@ internal sealed class InMemoryEventsPublisher : IEventsPublisher
|
||||
|
||||
var (reachable, unreachable) = CountStates(fact);
|
||||
var runtimeFactsCount = fact.RuntimeFacts?.Count ?? 0;
|
||||
var avgConfidence = fact.States.Count > 0 ? fact.States.Average(s => s.Confidence) : 0;
|
||||
var score = fact.Score;
|
||||
var unknownsCount = fact.UnknownsCount;
|
||||
var unknownsPressure = fact.UnknownsPressure;
|
||||
var topBucket = fact.States.Count > 0
|
||||
? fact.States
|
||||
.GroupBy(s => s.Bucket, StringComparer.OrdinalIgnoreCase)
|
||||
.OrderByDescending(g => g.Count())
|
||||
.ThenByDescending(g => g.Average(s => s.Weight))
|
||||
.First()
|
||||
: null;
|
||||
var payload = new ReachabilityFactUpdatedEvent(
|
||||
Version: "signals.fact.updated@v1",
|
||||
SubjectKey: fact.SubjectKey,
|
||||
@@ -39,7 +50,15 @@ internal sealed class InMemoryEventsPublisher : IEventsPublisher
|
||||
ReachableCount: reachable,
|
||||
UnreachableCount: unreachable,
|
||||
RuntimeFactsCount: runtimeFactsCount,
|
||||
ComputedAtUtc: fact.ComputedAt);
|
||||
Bucket: topBucket?.Key ?? "unknown",
|
||||
Weight: topBucket?.Average(s => s.Weight) ?? 0,
|
||||
StateCount: fact.States.Count,
|
||||
FactScore: score,
|
||||
UnknownsCount: unknownsCount,
|
||||
UnknownsPressure: unknownsPressure,
|
||||
AverageConfidence: avgConfidence,
|
||||
ComputedAtUtc: fact.ComputedAt,
|
||||
Targets: fact.States.Select(s => s.Target).ToArray());
|
||||
|
||||
var json = JsonSerializer.Serialize(payload, new JsonSerializerOptions(JsonSerializerDefaults.Web));
|
||||
logger.LogInformation("{Topic} {Payload}", topic, json);
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace StellaOps.Signals.Services.Models;
|
||||
|
||||
/// <summary>
/// Result of ingesting a reachability union bundle.
/// </summary>
/// <param name="AnalysisId">Identifier of the analysis the bundle was ingested for.</param>
/// <param name="CasRoot">Location of the ingested bundle, e.g. <c>cas://reachability_graphs/{analysisId}</c>.</param>
/// <param name="Files">The files that were ingested from the bundle.</param>
public sealed record ReachabilityUnionIngestResponse(
    string AnalysisId,
    string CasRoot,
    IReadOnlyList<ReachabilityUnionFile> Files);
|
||||
|
||||
/// <summary>
/// Describes a single file of an ingested reachability union bundle.
/// </summary>
/// <param name="Path">Path of the file relative to the bundle root.</param>
/// <param name="Sha256">SHA-256 digest of the file content as a hex string.</param>
/// <param name="Records">Number of records in the file, or <c>null</c> when not reported.</param>
public sealed record ReachabilityUnionFile(
    string Path,
    string Sha256,
    int? Records);
|
||||
@@ -18,6 +18,7 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
private readonly TimeProvider timeProvider;
|
||||
private readonly SignalsScoringOptions scoringOptions;
|
||||
private readonly IReachabilityCache cache;
|
||||
private readonly IUnknownsRepository unknownsRepository;
|
||||
private readonly IEventsPublisher eventsPublisher;
|
||||
private readonly ILogger<ReachabilityScoringService> logger;
|
||||
|
||||
@@ -27,6 +28,7 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
TimeProvider timeProvider,
|
||||
IOptions<SignalsOptions> options,
|
||||
IReachabilityCache cache,
|
||||
IUnknownsRepository unknownsRepository,
|
||||
IEventsPublisher eventsPublisher,
|
||||
ILogger<ReachabilityScoringService> logger)
|
||||
{
|
||||
@@ -35,6 +37,7 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
this.scoringOptions = options?.Value?.Scoring ?? throw new ArgumentNullException(nameof(options));
|
||||
this.cache = cache ?? throw new ArgumentNullException(nameof(cache));
|
||||
this.unknownsRepository = unknownsRepository ?? throw new ArgumentNullException(nameof(unknownsRepository));
|
||||
this.eventsPublisher = eventsPublisher ?? throw new ArgumentNullException(nameof(eventsPublisher));
|
||||
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
@@ -94,22 +97,25 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
{
|
||||
var path = FindPath(entryPoints, target, graph.Adjacency);
|
||||
var reachable = path is not null;
|
||||
var confidence = reachable ? scoringOptions.ReachableConfidence : scoringOptions.UnreachableConfidence;
|
||||
var runtimeEvidence = runtimeHits.Where(hit => path?.Contains(hit, StringComparer.Ordinal) == true).ToList();
|
||||
|
||||
var runtimeEvidence = runtimeHits.Where(hit => path?.Contains(hit, StringComparer.Ordinal) == true)
|
||||
.ToList();
|
||||
if (runtimeEvidence.Count > 0)
|
||||
{
|
||||
confidence = Math.Min(scoringOptions.MaxConfidence, confidence + scoringOptions.RuntimeBonus);
|
||||
}
|
||||
var (bucket, weight, confidence) = ComputeScores(
|
||||
reachable,
|
||||
entryPoints,
|
||||
target,
|
||||
path,
|
||||
runtimeEvidence.Count);
|
||||
|
||||
confidence = Math.Clamp(confidence, scoringOptions.MinConfidence, scoringOptions.MaxConfidence);
|
||||
var score = confidence * weight;
|
||||
|
||||
states.Add(new ReachabilityStateDocument
|
||||
{
|
||||
Target = target,
|
||||
Reachable = reachable,
|
||||
Confidence = confidence,
|
||||
Bucket = bucket,
|
||||
Weight = weight,
|
||||
Score = score,
|
||||
Path = path ?? new List<string>(),
|
||||
Evidence = new ReachabilityEvidenceDocument
|
||||
{
|
||||
@@ -119,6 +125,14 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
});
|
||||
}
|
||||
|
||||
var baseScore = states.Count > 0 ? states.Average(s => s.Score) : 0;
|
||||
var unknownsCount = await unknownsRepository.CountBySubjectAsync(subjectKey, cancellationToken).ConfigureAwait(false);
|
||||
var pressure = states.Count + unknownsCount == 0
|
||||
? 0
|
||||
: Math.Min(1.0, Math.Max(0.0, unknownsCount / (double)(states.Count + unknownsCount)));
|
||||
var pressurePenalty = Math.Min(scoringOptions.UnknownsPenaltyCeiling, pressure);
|
||||
var finalScore = baseScore * (1 - pressurePenalty);
|
||||
|
||||
var document = new ReachabilityFactDocument
|
||||
{
|
||||
CallgraphId = request.CallgraphId,
|
||||
@@ -126,6 +140,9 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
EntryPoints = entryPoints,
|
||||
States = states,
|
||||
Metadata = request.Metadata,
|
||||
Score = finalScore,
|
||||
UnknownsCount = unknownsCount,
|
||||
UnknownsPressure = pressure,
|
||||
ComputedAt = timeProvider.GetUtcNow(),
|
||||
SubjectKey = subjectKey,
|
||||
RuntimeFacts = existingFact?.RuntimeFacts
|
||||
@@ -278,6 +295,51 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
return path;
|
||||
}
|
||||
|
||||
/// <summary>
/// Classifies a target into a reachability bucket and derives the weight and confidence for it.
/// </summary>
/// <param name="reachable">Whether a path to the target was found.</param>
/// <param name="entryPoints">Entry points of the analysed graph.</param>
/// <param name="target">The target being scored.</param>
/// <param name="path">The path found to the target, or <c>null</c> when there is none.</param>
/// <param name="runtimeEvidenceCount">Number of runtime hits observed for the target.</param>
/// <returns>The bucket name, the configured weight for that bucket, and the clamped confidence.</returns>
private (string bucket, double weight, double confidence) ComputeScores(
    bool reachable,
    List<string> entryPoints,
    string target,
    List<string>? path,
    int runtimeEvidenceCount)
{
    // Rules are evaluated top to bottom; the first one that matches decides the bucket.
    var bucket = reachable switch
    {
        false => "unreachable",
        true when entryPoints.Contains(target, StringComparer.Ordinal) => "entrypoint",
        true when runtimeEvidenceCount > 0 => "runtime",
        true when path is { Count: <= 2 } => "direct",
        true => "unknown",
    };

    // Weight lookup: the bucket's own weight, else the "unknown" weight, else 1.0.
    var configuredWeights = scoringOptions.ReachabilityBuckets;
    double weight;
    if (configuredWeights.TryGetValue(bucket, out var bucketWeight))
    {
        weight = bucketWeight;
    }
    else if (configuredWeights.TryGetValue("unknown", out var unknownWeight))
    {
        weight = unknownWeight;
    }
    else
    {
        weight = 1.0;
    }

    // Runtime evidence only boosts confidence for reachable targets, capped at the maximum.
    var baseline = reachable ? scoringOptions.ReachableConfidence : scoringOptions.UnreachableConfidence;
    var boosted = reachable && runtimeEvidenceCount > 0
        ? Math.Min(scoringOptions.MaxConfidence, baseline + scoringOptions.RuntimeBonus)
        : baseline;
    var confidence = Math.Clamp(boosted, scoringOptions.MinConfidence, scoringOptions.MaxConfidence);

    return (bucket, weight, confidence);
}
|
||||
|
||||
private sealed record ReachabilityGraph(
|
||||
HashSet<string> Nodes,
|
||||
Dictionary<string, HashSet<string>> Adjacency,
|
||||
|
||||
@@ -0,0 +1,126 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.IO.Compression;
|
||||
using System.Linq;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text.Json;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Options;
|
||||
using StellaOps.Signals.Services.Models;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Writes reachability union bundles (runtime + static) into the CAS layout
/// <c>reachability_graphs/{analysisId}/</c> below the configured storage root.
/// Every file listed in the bundle's <c>meta.json</c> is extracted into a staging directory and its
/// SHA-256 is compared with the manifest; the staged tree replaces a previously ingested bundle
/// only after all files have been verified.
/// </summary>
public sealed class ReachabilityUnionIngestionService : IReachabilityUnionIngestionService
{
    private const string GraphsDirectoryName = "reachability_graphs";
    private const string StagingDirectoryName = "reachability_graphs.staging";
    private const string ManifestFileName = "meta.json";

    private static readonly string[] RequiredFiles = { "nodes.ndjson", "edges.ndjson", ManifestFileName };

    private readonly ILogger<ReachabilityUnionIngestionService> logger;
    private readonly SignalsOptions options;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="logger">Logger used for ingestion diagnostics.</param>
    /// <param name="options">Signals options; <c>Storage.RootPath</c> is the base of the CAS layout.</param>
    /// <exception cref="ArgumentNullException">An argument, or the options value, is <c>null</c>.</exception>
    public ReachabilityUnionIngestionService(
        ILogger<ReachabilityUnionIngestionService> logger,
        IOptions<SignalsOptions> options)
    {
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.options = options?.Value ?? throw new ArgumentNullException(nameof(options));
    }

    /// <summary>
    /// Extracts and verifies the files listed in the bundle manifest and stores them under
    /// <c>reachability_graphs/{analysisId}</c>, replacing any earlier bundle for the same analysis.
    /// </summary>
    /// <param name="analysisId">Analysis identifier; surrounding whitespace is ignored.</param>
    /// <param name="zipStream">Stream containing the zip bundle. It is left open.</param>
    /// <param name="cancellationToken">Token used to cancel the ingestion.</param>
    /// <returns>The trimmed analysis id, the CAS URI of the bundle and the verified files.</returns>
    /// <exception cref="ArgumentException">
    /// <paramref name="analysisId"/> is blank or does not resolve to a directory inside the graph store.
    /// </exception>
    /// <exception cref="ArgumentNullException"><paramref name="zipStream"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">
    /// The bundle is missing a required file, has duplicate entries, has a malformed manifest,
    /// references a file outside the bundle root, or a file hash does not match the manifest.
    /// </exception>
    public async Task<ReachabilityUnionIngestResponse> IngestAsync(string analysisId, Stream zipStream, CancellationToken cancellationToken)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(analysisId);
        ArgumentNullException.ThrowIfNull(zipStream);

        var normalizedId = analysisId.Trim();
        var storageRoot = Path.GetFullPath(options.Storage.RootPath);
        var graphsRoot = Path.Combine(storageRoot, GraphsDirectoryName);
        var casRoot = Path.TrimEndingDirectorySeparator(Path.GetFullPath(Path.Combine(graphsRoot, normalizedId)));

        // The id becomes part of a path that is deleted and recreated, so ids such as "." or "../x"
        // must never escape (or equal) the graph store directory.
        if (!IsStrictlyInside(graphsRoot, casRoot))
        {
            throw new ArgumentException("Analysis id must resolve to a directory inside the reachability graph store.", nameof(analysisId));
        }

        using var archive = new ZipArchive(zipStream, ZipArchiveMode.Read, leaveOpen: true);
        var entries = IndexEntries(archive);

        foreach (var required in RequiredFiles)
        {
            if (!entries.ContainsKey(required))
            {
                throw new InvalidOperationException($"Union bundle missing required file: {required}");
            }
        }

        var manifest = await ReadManifestAsync(entries[ManifestFileName], cancellationToken).ConfigureAwait(false);
        var filesForResponse = new List<ReachabilityUnionFile>(manifest.Count);

        // Stage outside the graph store so a failed ingest never destroys the previous bundle
        // and never leaves unverified files in the CAS.
        var stagingRoot = Path.Combine(storageRoot, StagingDirectoryName, Guid.NewGuid().ToString("N"));
        Directory.CreateDirectory(stagingRoot);

        try
        {
            foreach (var file in manifest)
            {
                if (!entries.TryGetValue(file.RelativePath, out var zipEntry))
                {
                    throw new InvalidOperationException($"meta.json references missing file '{file.RelativePath}'.");
                }

                var destPath = Path.GetFullPath(Path.Combine(stagingRoot, file.RelativePath.Replace('/', Path.DirectorySeparatorChar)));
                if (!IsStrictlyInside(stagingRoot, destPath))
                {
                    // Zip-slip guard: absolute paths and ".." segments would write outside the bundle.
                    throw new InvalidOperationException($"meta.json references file '{file.RelativePath}' outside the bundle root.");
                }

                Directory.CreateDirectory(Path.GetDirectoryName(destPath)!);

                var actualSha = await ExtractAndHashAsync(zipEntry, destPath, cancellationToken).ConfigureAwait(false);
                if (!string.Equals(actualSha, file.Sha256, StringComparison.OrdinalIgnoreCase))
                {
                    throw new InvalidOperationException($"SHA mismatch for {file.RelativePath}: expected {file.Sha256}, actual {actualSha}.");
                }

                filesForResponse.Add(new ReachabilityUnionFile(file.RelativePath, actualSha, file.Records));
            }

            // Everything verified: swap the staged tree in place of any previous bundle.
            if (Directory.Exists(casRoot))
            {
                Directory.Delete(casRoot, recursive: true);
            }

            Directory.CreateDirectory(Path.GetDirectoryName(casRoot)!);
            Directory.Move(stagingRoot, casRoot);
        }
        finally
        {
            TryDeleteDirectory(stagingRoot);
        }

        logger.LogInformation("Ingested reachability union bundle {AnalysisId} with {FileCount} files.", normalizedId, filesForResponse.Count);

        return new ReachabilityUnionIngestResponse(normalizedId, $"cas://reachability_graphs/{normalizedId}", filesForResponse);
    }

    /// <summary>Indexes archive entries by full name (case-insensitive), rejecting duplicates.</summary>
    private static Dictionary<string, ZipArchiveEntry> IndexEntries(ZipArchive archive)
    {
        var index = new Dictionary<string, ZipArchiveEntry>(StringComparer.OrdinalIgnoreCase);
        foreach (var entry in archive.Entries)
        {
            if (!index.TryAdd(entry.FullName, entry))
            {
                throw new InvalidOperationException($"Union bundle contains duplicate entry '{entry.FullName}'.");
            }
        }

        return index;
    }

    /// <summary>Parses <c>meta.json</c> and returns the entries of its <c>files</c> array.</summary>
    private static async Task<List<ManifestEntry>> ReadManifestAsync(ZipArchiveEntry metaEntry, CancellationToken cancellationToken)
    {
        using var metaStream = metaEntry.Open();
        using var metaDoc = await JsonDocument.ParseAsync(metaStream, cancellationToken: cancellationToken).ConfigureAwait(false);

        var root = metaDoc.RootElement;
        if (root.ValueKind != JsonValueKind.Object
            || !root.TryGetProperty("files", out var files)
            || files.ValueKind != JsonValueKind.Array)
        {
            throw new InvalidOperationException("meta.json is missing required 'files' array");
        }

        var result = new List<ManifestEntry>(files.GetArrayLength());
        foreach (var element in files.EnumerateArray())
        {
            var path = ReadRequiredString(element, "path");
            var sha = ReadRequiredString(element, "sha256");

            // "records" is optional; a non-numeric value is treated as absent.
            int? records = null;
            if (element.TryGetProperty("records", out var recordsElement) && recordsElement.ValueKind == JsonValueKind.Number)
            {
                if (!recordsElement.TryGetInt32(out var count))
                {
                    throw new InvalidOperationException($"meta.json 'records' for '{path}' is not a 32-bit integer.");
                }

                records = count;
            }

            result.Add(new ManifestEntry(path, sha, records));
        }

        return result;
    }

    /// <summary>Reads a string property of a manifest file entry or fails with a bundle error.</summary>
    private static string ReadRequiredString(JsonElement element, string propertyName)
    {
        if (element.ValueKind == JsonValueKind.Object
            && element.TryGetProperty(propertyName, out var value)
            && value.ValueKind == JsonValueKind.String)
        {
            return value.GetString()!;
        }

        throw new InvalidOperationException($"meta.json file entry is missing required string property '{propertyName}'.");
    }

    /// <summary>Copies a zip entry to <paramref name="destinationPath"/> and returns the lowercase hex SHA-256 of the copied bytes.</summary>
    private static async Task<string> ExtractAndHashAsync(ZipArchiveEntry entry, string destinationPath, CancellationToken cancellationToken)
    {
        using var hasher = IncrementalHash.CreateHash(HashAlgorithmName.SHA256);
        var buffer = new byte[81920];

        using var source = entry.Open();
        await using var destination = File.Create(destinationPath);

        int read;
        while ((read = await source.ReadAsync(buffer.AsMemory(), cancellationToken).ConfigureAwait(false)) > 0)
        {
            hasher.AppendData(buffer, 0, read);
            await destination.WriteAsync(buffer.AsMemory(0, read), cancellationToken).ConfigureAwait(false);
        }

        return Convert.ToHexString(hasher.GetHashAndReset()).ToLowerInvariant();
    }

    /// <summary>Returns <c>true</c> when <paramref name="candidate"/> is a proper descendant of <paramref name="root"/>; both must be full paths.</summary>
    private static bool IsStrictlyInside(string root, string candidate)
    {
        var prefix = Path.EndsInDirectorySeparator(root) ? root : root + Path.DirectorySeparatorChar;
        return candidate.Length > prefix.Length && candidate.StartsWith(prefix, StringComparison.Ordinal);
    }

    /// <summary>Best-effort removal of a staging directory; failures are logged so they do not mask the original error.</summary>
    private void TryDeleteDirectory(string directory)
    {
        try
        {
            if (Directory.Exists(directory))
            {
                Directory.Delete(directory, recursive: true);
            }
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            logger.LogWarning(ex, "Failed to remove staging directory {StagingDirectory}.", directory);
        }
    }

    /// <summary>One entry of the <c>files</c> array in <c>meta.json</c>.</summary>
    private sealed record ManifestEntry(string RelativePath, string Sha256, int? Records);
}
|
||||
@@ -0,0 +1,99 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Persistence;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Validates unknown-symbol ingest requests, normalizes their entries and stores them
/// through the unknowns repository.
/// </summary>
internal sealed class UnknownsIngestionService : IUnknownsIngestionService
{
    private readonly IUnknownsRepository repository;
    private readonly TimeProvider timeProvider;
    private readonly ILogger<UnknownsIngestionService> logger;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="repository">Repository the normalized unknowns are upserted into.</param>
    /// <param name="timeProvider">Clock used to stamp the stored documents.</param>
    /// <param name="logger">Logger used for ingestion diagnostics.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public UnknownsIngestionService(IUnknownsRepository repository, TimeProvider timeProvider, ILogger<UnknownsIngestionService> logger)
    {
        ArgumentNullException.ThrowIfNull(repository);
        ArgumentNullException.ThrowIfNull(timeProvider);
        ArgumentNullException.ThrowIfNull(logger);

        this.repository = repository;
        this.timeProvider = timeProvider;
        this.logger = logger;
    }

    /// <summary>
    /// Validates the request, keeps the entries that carry at least one identifying field,
    /// trims their values and upserts them for the request's subject.
    /// </summary>
    /// <param name="request">The ingest request.</param>
    /// <param name="cancellationToken">Token used to cancel the repository call.</param>
    /// <returns>The subject key and the number of stored unknowns.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is <c>null</c>.</exception>
    /// <exception cref="UnknownsValidationException">The request fails validation.</exception>
    public async Task<UnknownsIngestResponse> IngestAsync(UnknownsIngestRequest request, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);

        if (request.Subject is null)
        {
            throw new UnknownsValidationException("Subject is required.");
        }

        if (string.IsNullOrWhiteSpace(request.CallgraphId))
        {
            throw new UnknownsValidationException("callgraphId is required.");
        }

        if (request.Unknowns is null or { Count: 0 })
        {
            throw new UnknownsValidationException("Unknowns list must not be empty.");
        }

        var subjectKey = request.Subject.ToSubjectKey();
        if (string.IsNullOrWhiteSpace(subjectKey))
        {
            throw new UnknownsValidationException("Subject must include scanId, imageDigest, or component/version.");
        }

        // All documents of one request share the same creation timestamp.
        var timestamp = timeProvider.GetUtcNow();
        var documents = new List<UnknownSymbolDocument>();

        foreach (var candidate in request.Unknowns)
        {
            if (candidate is null)
            {
                continue;
            }

            // An entry is kept when at least one identifying field is non-blank.
            var carriesIdentity = !string.IsNullOrWhiteSpace(candidate.SymbolId)
                || !string.IsNullOrWhiteSpace(candidate.CodeId)
                || !string.IsNullOrWhiteSpace(candidate.Purl)
                || !string.IsNullOrWhiteSpace(candidate.EdgeFrom)
                || !string.IsNullOrWhiteSpace(candidate.EdgeTo);

            if (carriesIdentity)
            {
                documents.Add(new UnknownSymbolDocument
                {
                    SubjectKey = subjectKey,
                    CallgraphId = request.CallgraphId,
                    SymbolId = candidate.SymbolId?.Trim(),
                    CodeId = candidate.CodeId?.Trim(),
                    Purl = candidate.Purl?.Trim(),
                    EdgeFrom = candidate.EdgeFrom?.Trim(),
                    EdgeTo = candidate.EdgeTo?.Trim(),
                    Reason = candidate.Reason?.Trim(),
                    CreatedAt = timestamp
                });
            }
        }

        if (documents.Count == 0)
        {
            throw new UnknownsValidationException("Unknown entries must include at least one symbolId, codeId, purl, or edge.");
        }

        await repository.UpsertAsync(subjectKey, documents, cancellationToken).ConfigureAwait(false);
        logger.LogInformation("Stored {Count} unknown symbols for subject {SubjectKey}", documents.Count, subjectKey);

        return new UnknownsIngestResponse
        {
            SubjectKey = subjectKey,
            UnknownsCount = documents.Count
        };
    }
}
|
||||
@@ -0,0 +1,10 @@
|
||||
using System;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
/// Thrown when an unknowns ingest request fails validation.
/// </summary>
public sealed class UnknownsValidationException : Exception
{
    /// <summary>
    /// Creates the exception with a message describing the validation failure.
    /// </summary>
    /// <param name="message">Description of the validation failure.</param>
    public UnknownsValidationException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Creates the exception with a message and the exception that caused the validation failure.
    /// </summary>
    /// <param name="message">Description of the validation failure.</param>
    /// <param name="innerException">The underlying cause, preserved for diagnostics.</param>
    public UnknownsValidationException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
|
||||
Reference in New Issue
Block a user