nuget reorganization
This commit is contained in:
@@ -0,0 +1,13 @@
|
||||
using System;
|
||||
|
||||
namespace StellaOps.Signals.Models;
|
||||
|
||||
public sealed record ReachabilityFactUpdatedEvent(
|
||||
string Version,
|
||||
string SubjectKey,
|
||||
string? CallgraphId,
|
||||
DateTimeOffset OccurredAtUtc,
|
||||
int ReachableCount,
|
||||
int UnreachableCount,
|
||||
int RuntimeFactsCount,
|
||||
DateTimeOffset ComputedAtUtc);
|
||||
@@ -4,6 +4,12 @@ namespace StellaOps.Signals.Options;
|
||||
|
||||
public sealed class SignalsAirGapOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Optional override for the reachability fact update event topic when running in air-gap mode.
|
||||
/// If not set, defaults to <c>signals.fact.updated</c>.
|
||||
/// </summary>
|
||||
public string? EventTopic { get; set; }
|
||||
|
||||
public SignalsSealedModeOptions SealedMode { get; } = new();
|
||||
|
||||
public void Validate()
|
||||
|
||||
29
src/Signals/StellaOps.Signals/Options/SignalsCacheOptions.cs
Normal file
29
src/Signals/StellaOps.Signals/Options/SignalsCacheOptions.cs
Normal file
@@ -0,0 +1,29 @@
|
||||
using System;
|
||||
|
||||
namespace StellaOps.Signals.Options;
|
||||
|
||||
public sealed class SignalsCacheOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Redis connection string (e.g., "localhost:6379").
|
||||
/// </summary>
|
||||
public string ConnectionString { get; set; } = "localhost:6379";
|
||||
|
||||
/// <summary>
|
||||
/// Default time-to-live for cached reachability facts.
|
||||
/// </summary>
|
||||
public int DefaultTtlSeconds { get; set; } = 600;
|
||||
|
||||
public void Validate()
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(ConnectionString))
|
||||
{
|
||||
throw new ArgumentException("Cache connection string is required.", nameof(ConnectionString));
|
||||
}
|
||||
|
||||
if (DefaultTtlSeconds <= 0)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(DefaultTtlSeconds), "Default TTL must be greater than zero.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -34,6 +34,11 @@ public sealed class SignalsOptions
|
||||
/// Reachability scoring configuration.
|
||||
/// </summary>
|
||||
public SignalsScoringOptions Scoring { get; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Cache configuration.
|
||||
/// </summary>
|
||||
public SignalsCacheOptions Cache { get; } = new();
|
||||
|
||||
/// <summary>
|
||||
/// Validates configured options.
|
||||
@@ -45,5 +50,6 @@ public sealed class SignalsOptions
|
||||
Storage.Validate();
|
||||
AirGap.Validate();
|
||||
Scoring.Validate();
|
||||
Cache.Validate();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,16 +9,16 @@ using StellaOps.Auth.Abstractions;
|
||||
using StellaOps.Auth.ServerIntegration;
|
||||
using StellaOps.Configuration;
|
||||
using StellaOps.Signals.Authentication;
|
||||
using StellaOps.Signals.Hosting;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Options;
|
||||
using StellaOps.Signals.Parsing;
|
||||
using StellaOps.Signals.Hosting;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Options;
|
||||
using StellaOps.Signals.Parsing;
|
||||
using StellaOps.Signals.Persistence;
|
||||
using StellaOps.Signals.Routing;
|
||||
using StellaOps.Signals.Services;
|
||||
using StellaOps.Signals.Storage;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
builder.Configuration.AddStellaOpsDefaults(options =>
|
||||
{
|
||||
@@ -122,7 +122,19 @@ builder.Services.AddSingleton<ICallgraphParser>(new SimpleJsonCallgraphParser("p
|
||||
builder.Services.AddSingleton<ICallgraphParser>(new SimpleJsonCallgraphParser("go"));
|
||||
builder.Services.AddSingleton<ICallgraphParserResolver, CallgraphParserResolver>();
|
||||
builder.Services.AddSingleton<ICallgraphIngestionService, CallgraphIngestionService>();
|
||||
builder.Services.AddSingleton<IReachabilityFactRepository, MongoReachabilityFactRepository>();
|
||||
builder.Services.AddSingleton<MongoReachabilityFactRepository>();
|
||||
builder.Services.AddSingleton<IReachabilityCache>(sp =>
|
||||
{
|
||||
var options = sp.GetRequiredService<IOptions<SignalsOptions>>().Value;
|
||||
return new RedisReachabilityCache(options.Cache);
|
||||
});
|
||||
builder.Services.AddSingleton<IEventsPublisher, InMemoryEventsPublisher>();
|
||||
builder.Services.AddSingleton<IReachabilityFactRepository>(sp =>
|
||||
{
|
||||
var inner = sp.GetRequiredService<MongoReachabilityFactRepository>();
|
||||
var cache = sp.GetRequiredService<IReachabilityCache>();
|
||||
return new ReachabilityFactCacheDecorator(inner, cache);
|
||||
});
|
||||
builder.Services.AddSingleton<IReachabilityScoringService, ReachabilityScoringService>();
|
||||
builder.Services.AddSingleton<IRuntimeFactsIngestionService, RuntimeFactsIngestionService>();
|
||||
|
||||
|
||||
@@ -2,3 +2,4 @@ using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("StellaOps.Signals.Reachability.Tests")]
|
||||
[assembly: InternalsVisibleTo("StellaOps.ScannerSignals.IntegrationTests")]
|
||||
[assembly: InternalsVisibleTo("StellaOps.Signals.Tests")]
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
public interface IEventsPublisher
|
||||
{
|
||||
Task PublishFactUpdatedAsync(Models.ReachabilityFactDocument fact, CancellationToken cancellationToken);
|
||||
}
|
||||
14
src/Signals/StellaOps.Signals/Services/IReachabilityCache.cs
Normal file
14
src/Signals/StellaOps.Signals/Services/IReachabilityCache.cs
Normal file
@@ -0,0 +1,14 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Signals.Models;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
public interface IReachabilityCache
|
||||
{
|
||||
Task<ReachabilityFactDocument?> GetAsync(string subjectKey, CancellationToken cancellationToken);
|
||||
|
||||
Task SetAsync(ReachabilityFactDocument document, CancellationToken cancellationToken);
|
||||
|
||||
Task InvalidateAsync(string subjectKey, CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Text.Json;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Options;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
|
||||
/// Placeholder events publisher; replace with real bus integration when contracts are finalized.
|
||||
/// </summary>
|
||||
internal sealed class InMemoryEventsPublisher : IEventsPublisher
|
||||
{
|
||||
private readonly ILogger<InMemoryEventsPublisher> logger;
|
||||
private readonly string topic;
|
||||
|
||||
public InMemoryEventsPublisher(ILogger<InMemoryEventsPublisher> logger, SignalsOptions options)
|
||||
{
|
||||
this.logger = logger;
|
||||
topic = string.IsNullOrWhiteSpace(options?.AirGap?.EventTopic)
|
||||
? "signals.fact.updated"
|
||||
: options!.AirGap.EventTopic!;
|
||||
}
|
||||
|
||||
public Task PublishFactUpdatedAsync(ReachabilityFactDocument fact, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(fact);
|
||||
|
||||
var (reachable, unreachable) = CountStates(fact);
|
||||
var runtimeFactsCount = fact.RuntimeFacts?.Count ?? 0;
|
||||
var payload = new ReachabilityFactUpdatedEvent(
|
||||
Version: "signals.fact.updated@v1",
|
||||
SubjectKey: fact.SubjectKey,
|
||||
CallgraphId: string.IsNullOrWhiteSpace(fact.CallgraphId) ? null : fact.CallgraphId,
|
||||
OccurredAtUtc: DateTimeOffset.UtcNow,
|
||||
ReachableCount: reachable,
|
||||
UnreachableCount: unreachable,
|
||||
RuntimeFactsCount: runtimeFactsCount,
|
||||
ComputedAtUtc: fact.ComputedAt);
|
||||
|
||||
var json = JsonSerializer.Serialize(payload, new JsonSerializerOptions(JsonSerializerDefaults.Web));
|
||||
logger.LogInformation("{Topic} {Payload}", topic, json);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private static (int reachable, int unreachable) CountStates(ReachabilityFactDocument fact)
|
||||
{
|
||||
if (fact.States is null || fact.States.Count == 0)
|
||||
{
|
||||
return (0, 0);
|
||||
}
|
||||
|
||||
var reachable = fact.States.Count(state => state.Reachable);
|
||||
var unreachable = fact.States.Count - reachable;
|
||||
return (reachable, unreachable);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Persistence;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
/// <summary>
|
||||
/// Decorator that adds cache lookups/updates to the reachability fact repository.
|
||||
/// </summary>
|
||||
internal sealed class ReachabilityFactCacheDecorator : IReachabilityFactRepository
|
||||
{
|
||||
private readonly IReachabilityFactRepository inner;
|
||||
private readonly IReachabilityCache cache;
|
||||
|
||||
public ReachabilityFactCacheDecorator(IReachabilityFactRepository inner, IReachabilityCache cache)
|
||||
{
|
||||
this.inner = inner ?? throw new ArgumentNullException(nameof(inner));
|
||||
this.cache = cache ?? throw new ArgumentNullException(nameof(cache));
|
||||
}
|
||||
|
||||
public async Task<ReachabilityFactDocument?> GetBySubjectAsync(string subjectKey, CancellationToken cancellationToken)
|
||||
{
|
||||
var cached = await cache.GetAsync(subjectKey, cancellationToken).ConfigureAwait(false);
|
||||
if (cached != null)
|
||||
{
|
||||
return cached;
|
||||
}
|
||||
|
||||
var document = await inner.GetBySubjectAsync(subjectKey, cancellationToken).ConfigureAwait(false);
|
||||
if (document != null)
|
||||
{
|
||||
await cache.SetAsync(document, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return document;
|
||||
}
|
||||
|
||||
public async Task<ReachabilityFactDocument> UpsertAsync(ReachabilityFactDocument document, CancellationToken cancellationToken)
|
||||
{
|
||||
var result = await inner.UpsertAsync(document, cancellationToken).ConfigureAwait(false);
|
||||
await cache.SetAsync(result, cancellationToken).ConfigureAwait(false);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,8 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
private readonly IReachabilityFactRepository factRepository;
|
||||
private readonly TimeProvider timeProvider;
|
||||
private readonly SignalsScoringOptions scoringOptions;
|
||||
private readonly IReachabilityCache cache;
|
||||
private readonly IEventsPublisher eventsPublisher;
|
||||
private readonly ILogger<ReachabilityScoringService> logger;
|
||||
|
||||
public ReachabilityScoringService(
|
||||
@@ -24,12 +26,16 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
IReachabilityFactRepository factRepository,
|
||||
TimeProvider timeProvider,
|
||||
IOptions<SignalsOptions> options,
|
||||
IReachabilityCache cache,
|
||||
IEventsPublisher eventsPublisher,
|
||||
ILogger<ReachabilityScoringService> logger)
|
||||
{
|
||||
this.callgraphRepository = callgraphRepository ?? throw new ArgumentNullException(nameof(callgraphRepository));
|
||||
this.factRepository = factRepository ?? throw new ArgumentNullException(nameof(factRepository));
|
||||
this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
this.scoringOptions = options?.Value?.Scoring ?? throw new ArgumentNullException(nameof(options));
|
||||
this.cache = cache ?? throw new ArgumentNullException(nameof(cache));
|
||||
this.eventsPublisher = eventsPublisher ?? throw new ArgumentNullException(nameof(eventsPublisher));
|
||||
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
@@ -126,7 +132,10 @@ public sealed class ReachabilityScoringService : IReachabilityScoringService
|
||||
};
|
||||
|
||||
logger.LogInformation("Computed reachability fact for subject {SubjectKey} with {StateCount} targets.", document.SubjectKey, states.Count);
|
||||
return await factRepository.UpsertAsync(document, cancellationToken).ConfigureAwait(false);
|
||||
var persisted = await factRepository.UpsertAsync(document, cancellationToken).ConfigureAwait(false);
|
||||
await cache.SetAsync(persisted, cancellationToken).ConfigureAwait(false);
|
||||
await eventsPublisher.PublishFactUpdatedAsync(persisted, cancellationToken).ConfigureAwait(false);
|
||||
return persisted;
|
||||
}
|
||||
|
||||
private static void ValidateRequest(ReachabilityRecomputeRequest request)
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
using System;
|
||||
using System.Text.Json;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StackExchange.Redis;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Options;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
internal sealed class RedisReachabilityCache : IReachabilityCache, IDisposable
|
||||
{
|
||||
private readonly SignalsCacheOptions options;
|
||||
private readonly ConnectionMultiplexer multiplexer;
|
||||
private readonly IDatabase database;
|
||||
private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web)
|
||||
{
|
||||
WriteIndented = false
|
||||
};
|
||||
|
||||
public RedisReachabilityCache(SignalsCacheOptions options)
|
||||
{
|
||||
this.options = options ?? throw new ArgumentNullException(nameof(options));
|
||||
multiplexer = ConnectionMultiplexer.Connect(options.ConnectionString);
|
||||
database = multiplexer.GetDatabase();
|
||||
}
|
||||
|
||||
public async Task<ReachabilityFactDocument?> GetAsync(string subjectKey, CancellationToken cancellationToken)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(subjectKey))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var value = await database.StringGetAsync(GetCacheKey(subjectKey)).ConfigureAwait(false);
|
||||
if (value.IsNullOrEmpty)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
// Explicitly pick the string overload to avoid ambiguity between span/string overloads on RedisValue
|
||||
return JsonSerializer.Deserialize<ReachabilityFactDocument>(value!.ToString(), SerializerOptions);
|
||||
}
|
||||
|
||||
public async Task SetAsync(ReachabilityFactDocument document, CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(document);
|
||||
if (string.IsNullOrWhiteSpace(document.SubjectKey))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var json = JsonSerializer.Serialize(document, SerializerOptions);
|
||||
await database.StringSetAsync(
|
||||
GetCacheKey(document.SubjectKey),
|
||||
json,
|
||||
TimeSpan.FromSeconds(options.DefaultTtlSeconds)).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task InvalidateAsync(string subjectKey, CancellationToken cancellationToken)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(subjectKey))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await database.KeyDeleteAsync(GetCacheKey(subjectKey)).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
multiplexer?.Dispose();
|
||||
}
|
||||
|
||||
private static string GetCacheKey(string subjectKey) => $"reachability_cache:{subjectKey}";
|
||||
}
|
||||
@@ -14,17 +14,23 @@ public sealed class RuntimeFactsIngestionService : IRuntimeFactsIngestionService
|
||||
{
|
||||
private readonly IReachabilityFactRepository factRepository;
|
||||
private readonly TimeProvider timeProvider;
|
||||
private readonly IReachabilityCache cache;
|
||||
private readonly IEventsPublisher eventsPublisher;
|
||||
private readonly IReachabilityScoringService scoringService;
|
||||
private readonly ILogger<RuntimeFactsIngestionService> logger;
|
||||
|
||||
public RuntimeFactsIngestionService(
|
||||
IReachabilityFactRepository factRepository,
|
||||
TimeProvider timeProvider,
|
||||
IReachabilityCache cache,
|
||||
IEventsPublisher eventsPublisher,
|
||||
IReachabilityScoringService scoringService,
|
||||
ILogger<RuntimeFactsIngestionService> logger)
|
||||
{
|
||||
this.factRepository = factRepository ?? throw new ArgumentNullException(nameof(factRepository));
|
||||
this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
this.cache = cache ?? throw new ArgumentNullException(nameof(cache));
|
||||
this.eventsPublisher = eventsPublisher ?? throw new ArgumentNullException(nameof(eventsPublisher));
|
||||
this.scoringService = scoringService ?? throw new ArgumentNullException(nameof(scoringService));
|
||||
this.logger = logger ?? NullLogger<RuntimeFactsIngestionService>.Instance;
|
||||
}
|
||||
@@ -57,6 +63,8 @@ public sealed class RuntimeFactsIngestionService : IRuntimeFactsIngestionService
|
||||
document.Metadata["provenance.callgraphId"] = request.CallgraphId;
|
||||
|
||||
var persisted = await factRepository.UpsertAsync(document, cancellationToken).ConfigureAwait(false);
|
||||
await cache.SetAsync(persisted, cancellationToken).ConfigureAwait(false);
|
||||
await eventsPublisher.PublishFactUpdatedAsync(persisted, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await RecomputeReachabilityAsync(persisted, aggregated, request, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="MongoDB.Driver" Version="2.24.0" />
|
||||
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
@@ -18,4 +19,4 @@
|
||||
<ProjectReference Include="../../Authority/StellaOps.Authority/StellaOps.Auth.Abstractions/StellaOps.Auth.Abstractions.csproj" />
|
||||
<ProjectReference Include="../../Authority/StellaOps.Authority/StellaOps.Auth.ServerIntegration/StellaOps.Auth.ServerIntegration.csproj" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
</Project>
|
||||
|
||||
Reference in New Issue
Block a user