Add call graph fixtures for various languages and scenarios
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Findings Ledger CI / build-test (push) Has been cancelled
Findings Ledger CI / migration-validation (push) Has been cancelled
Findings Ledger CI / generate-manifest (push) Has been cancelled
Lighthouse CI / Lighthouse Audit (push) Has been cancelled
Lighthouse CI / Axe Accessibility Audit (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
Reachability Corpus Validation / validate-corpus (push) Has been cancelled
Reachability Corpus Validation / validate-ground-truths (push) Has been cancelled
Scanner Analyzers / Discover Analyzers (push) Has been cancelled
Scanner Analyzers / Validate Test Fixtures (push) Has been cancelled
Signals CI & Image / signals-ci (push) Has been cancelled
Signals Reachability Scoring & Events / reachability-smoke (push) Has been cancelled
Reachability Corpus Validation / determinism-check (push) Has been cancelled
Scanner Analyzers / Build Analyzers (push) Has been cancelled
Scanner Analyzers / Test Language Analyzers (push) Has been cancelled
Scanner Analyzers / Verify Deterministic Output (push) Has been cancelled
Signals Reachability Scoring & Events / sign-and-upload (push) Has been cancelled

- Introduced `all-edge-reasons.json` to test edge resolution reasons in .NET.
- Added `all-visibility-levels.json` to validate method visibility levels in .NET.
- Created `dotnet-aspnetcore-minimal.json` for a minimal ASP.NET Core application.
- Included `go-gin-api.json` for a Go Gin API application structure.
- Added `java-spring-boot.json` for the Spring PetClinic application in Java.
- Introduced `legacy-no-schema.json` for legacy application structure without schema.
- Created `node-express-api.json` for an Express.js API application structure.
This commit is contained in:
master
2025-12-16 10:44:24 +02:00
parent 4391f35d8a
commit 5a480a3c2a
223 changed files with 19367 additions and 727 deletions

View File

@@ -0,0 +1,74 @@
namespace StellaOps.Orchestrator.Core.Domain;
/// <summary>
/// Represents the first meaningful signal for a job/run.
/// </summary>
public sealed record FirstSignal
{
public required string Version { get; init; } = "1.0";
public required string SignalId { get; init; }
public required Guid JobId { get; init; }
public required DateTimeOffset Timestamp { get; init; }
public required FirstSignalKind Kind { get; init; }
public required FirstSignalPhase Phase { get; init; }
public required FirstSignalScope Scope { get; init; }
public required string Summary { get; init; }
public int? EtaSeconds { get; init; }
public LastKnownOutcome? LastKnownOutcome { get; init; }
public IReadOnlyList<NextAction>? NextActions { get; init; }
public required FirstSignalDiagnostics Diagnostics { get; init; }
}
public enum FirstSignalKind
{
Queued,
Started,
Phase,
Blocked,
Failed,
Succeeded,
Canceled,
Unavailable
}
public enum FirstSignalPhase
{
Resolve,
Fetch,
Restore,
Analyze,
Policy,
Report,
Unknown
}
public sealed record FirstSignalScope
{
public required string Type { get; init; } // "repo" | "image" | "artifact"
public required string Id { get; init; }
}
public sealed record LastKnownOutcome
{
public required string SignatureId { get; init; }
public string? ErrorCode { get; init; }
public required string Token { get; init; }
public string? Excerpt { get; init; }
public required string Confidence { get; init; } // "low" | "medium" | "high"
public required DateTimeOffset FirstSeenAt { get; init; }
public required int HitCount { get; init; }
}
public sealed record NextAction
{
public required string Type { get; init; } // "open_logs" | "open_job" | "docs" | "retry" | "cli_command"
public required string Label { get; init; }
public required string Target { get; init; }
}
public sealed record FirstSignalDiagnostics
{
public required bool CacheHit { get; init; }
public required string Source { get; init; } // "snapshot" | "failure_index" | "cold_start"
public required string CorrelationId { get; init; }
}

View File

@@ -0,0 +1,37 @@
namespace StellaOps.Orchestrator.Core.Repositories;
public interface IFirstSignalSnapshotRepository
{
Task<FirstSignalSnapshot?> GetByRunIdAsync(
string tenantId,
Guid runId,
CancellationToken cancellationToken = default);
Task UpsertAsync(
FirstSignalSnapshot snapshot,
CancellationToken cancellationToken = default);
Task DeleteByRunIdAsync(
string tenantId,
Guid runId,
CancellationToken cancellationToken = default);
}
public sealed record FirstSignalSnapshot
{
public required string TenantId { get; init; }
public required Guid RunId { get; init; }
public required Guid JobId { get; init; }
public required DateTimeOffset CreatedAt { get; init; }
public required DateTimeOffset UpdatedAt { get; init; }
public required string Kind { get; init; }
public required string Phase { get; init; }
public required string Summary { get; init; }
public int? EtaSeconds { get; init; }
public string? LastKnownOutcomeJson { get; init; }
public string? NextActionsJson { get; init; }
public required string DiagnosticsJson { get; init; }
public required string SignalJson { get; init; }
}

View File

@@ -0,0 +1,50 @@
using StellaOps.Orchestrator.Core.Domain;
namespace StellaOps.Orchestrator.Core.Services;
public interface IFirstSignalService
{
/// <summary>
/// Gets the first signal for a run, checking cache first.
/// </summary>
Task<FirstSignalResult> GetFirstSignalAsync(
Guid runId,
string tenantId,
string? ifNoneMatch = null,
CancellationToken cancellationToken = default);
/// <summary>
/// Updates the first signal snapshot for a run and invalidates any cached copies.
/// </summary>
Task UpdateSnapshotAsync(
Guid runId,
string tenantId,
FirstSignal signal,
CancellationToken cancellationToken = default);
/// <summary>
/// Invalidates cached first signal for a run.
/// </summary>
Task InvalidateCacheAsync(
Guid runId,
string tenantId,
CancellationToken cancellationToken = default);
}
public sealed record FirstSignalResult
{
public required FirstSignalResultStatus Status { get; init; }
public FirstSignal? Signal { get; init; }
public string? ETag { get; init; }
public bool CacheHit { get; init; }
public string? Source { get; init; }
}
public enum FirstSignalResultStatus
{
Found,
NotModified,
NotFound,
NotAvailable,
Error
}

View File

@@ -0,0 +1,149 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Messaging;
using StellaOps.Messaging.Abstractions;
using StellaOps.Orchestrator.Core.Domain;
using StellaOps.Orchestrator.Infrastructure.Options;
namespace StellaOps.Orchestrator.Infrastructure.Caching;
public interface IFirstSignalCache
{
string ProviderName { get; }
ValueTask<CacheResult<FirstSignalCacheEntry>> GetAsync(
string tenantId,
Guid runId,
CancellationToken cancellationToken = default);
ValueTask SetAsync(
string tenantId,
Guid runId,
FirstSignalCacheEntry entry,
CancellationToken cancellationToken = default);
ValueTask<bool> InvalidateAsync(
string tenantId,
Guid runId,
CancellationToken cancellationToken = default);
}
public sealed record FirstSignalCacheEntry
{
public required FirstSignal Signal { get; init; }
public required string ETag { get; init; }
public required string Origin { get; init; }
}
public sealed class FirstSignalCache : IFirstSignalCache
{
private readonly IDistributedCache<FirstSignalCacheEntry>? _cache;
private readonly FirstSignalCacheOptions _options;
private readonly ILogger<FirstSignalCache> _logger;
public FirstSignalCache(
IOptions<FirstSignalOptions> options,
ILogger<FirstSignalCache> logger,
IDistributedCacheFactory? cacheFactory = null)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value.Cache ?? new FirstSignalCacheOptions();
var configuredBackend = _options.Backend?.Trim().ToLowerInvariant();
if (configuredBackend == "none")
{
ProviderName = "none";
return;
}
if (cacheFactory is null)
{
ProviderName = "none";
return;
}
try
{
ProviderName = cacheFactory.ProviderName;
if (!string.IsNullOrWhiteSpace(configuredBackend) &&
!string.Equals(configuredBackend, ProviderName, StringComparison.OrdinalIgnoreCase))
{
_logger.LogWarning(
"FirstSignal cache backend is configured as {ConfiguredBackend} but active cache provider is {ProviderName}.",
configuredBackend,
ProviderName);
}
_cache = cacheFactory.Create<FirstSignalCacheEntry>(new CacheOptions
{
KeyPrefix = _options.KeyPrefix,
DefaultTtl = TimeSpan.FromSeconds(_options.TtlSeconds),
SlidingExpiration = _options.SlidingExpiration
});
}
catch (Exception ex)
{
ProviderName = "none";
_logger.LogWarning(ex, "Failed to initialize distributed cache; disabling first-signal caching.");
}
}
public string ProviderName { get; }
public async ValueTask<CacheResult<FirstSignalCacheEntry>> GetAsync(
string tenantId,
Guid runId,
CancellationToken cancellationToken = default)
{
if (_cache is null)
{
return CacheResult<FirstSignalCacheEntry>.Miss();
}
var key = BuildKey(tenantId, runId);
return await _cache.GetAsync(key, cancellationToken).ConfigureAwait(false);
}
public async ValueTask SetAsync(
string tenantId,
Guid runId,
FirstSignalCacheEntry entry,
CancellationToken cancellationToken = default)
{
if (_cache is null)
{
return;
}
ArgumentNullException.ThrowIfNull(entry);
var key = BuildKey(tenantId, runId);
await _cache.SetAsync(key, entry, null, cancellationToken).ConfigureAwait(false);
}
public async ValueTask<bool> InvalidateAsync(
string tenantId,
Guid runId,
CancellationToken cancellationToken = default)
{
if (_cache is null)
{
return false;
}
var key = BuildKey(tenantId, runId);
return await _cache.InvalidateAsync(key, cancellationToken).ConfigureAwait(false);
}
private static string BuildKey(string tenantId, Guid runId)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
if (runId == Guid.Empty)
{
throw new ArgumentException("Run ID must be a non-empty GUID.", nameof(runId));
}
return $"tenant:{tenantId.Trim()}:signal:run:{runId:D}";
}
}

View File

@@ -0,0 +1,32 @@
namespace StellaOps.Orchestrator.Infrastructure.Options;
public sealed class FirstSignalOptions
{
public const string SectionName = "FirstSignal";
public FirstSignalCacheOptions Cache { get; set; } = new();
public FirstSignalColdPathOptions ColdPath { get; set; } = new();
public FirstSignalSnapshotWriterOptions SnapshotWriter { get; set; } = new();
}
public sealed class FirstSignalCacheOptions
{
public string Backend { get; set; } = "inmemory"; // inmemory | valkey | postgres | none
public int TtlSeconds { get; set; } = 86400;
public bool SlidingExpiration { get; set; } = true;
public string KeyPrefix { get; set; } = "orchestrator:first_signal:";
}
public sealed class FirstSignalColdPathOptions
{
public int TimeoutMs { get; set; } = 3000;
}
public sealed class FirstSignalSnapshotWriterOptions
{
public bool Enabled { get; set; }
public string? TenantId { get; set; }
public int PollIntervalSeconds { get; set; } = 10;
public int MaxRunsPerTick { get; set; } = 50;
public int LookbackMinutes { get; set; } = 60;
}

View File

@@ -0,0 +1,171 @@
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Orchestrator.Core.Repositories;
namespace StellaOps.Orchestrator.Infrastructure.Postgres;
public sealed class PostgresFirstSignalSnapshotRepository : IFirstSignalSnapshotRepository
{
private const string SelectColumns = """
tenant_id, run_id, job_id, created_at, updated_at,
kind, phase, summary, eta_seconds,
last_known_outcome, next_actions, diagnostics, signal_json
""";
private const string SelectByRunIdSql = $"""
SELECT {SelectColumns}
FROM first_signal_snapshots
WHERE tenant_id = @tenant_id AND run_id = @run_id
LIMIT 1
""";
private const string DeleteByRunIdSql = """
DELETE FROM first_signal_snapshots
WHERE tenant_id = @tenant_id AND run_id = @run_id
""";
private const string UpsertSql = """
INSERT INTO first_signal_snapshots (
tenant_id, run_id, job_id, created_at, updated_at,
kind, phase, summary, eta_seconds,
last_known_outcome, next_actions, diagnostics, signal_json)
VALUES (
@tenant_id, @run_id, @job_id, @created_at, @updated_at,
@kind, @phase, @summary, @eta_seconds,
@last_known_outcome, @next_actions, @diagnostics, @signal_json)
ON CONFLICT (tenant_id, run_id) DO UPDATE SET
job_id = EXCLUDED.job_id,
updated_at = EXCLUDED.updated_at,
kind = EXCLUDED.kind,
phase = EXCLUDED.phase,
summary = EXCLUDED.summary,
eta_seconds = EXCLUDED.eta_seconds,
last_known_outcome = EXCLUDED.last_known_outcome,
next_actions = EXCLUDED.next_actions,
diagnostics = EXCLUDED.diagnostics,
signal_json = EXCLUDED.signal_json
""";
private readonly OrchestratorDataSource _dataSource;
private readonly ILogger<PostgresFirstSignalSnapshotRepository> _logger;
public PostgresFirstSignalSnapshotRepository(
OrchestratorDataSource dataSource,
ILogger<PostgresFirstSignalSnapshotRepository> logger)
{
_dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<FirstSignalSnapshot?> GetByRunIdAsync(string tenantId, Guid runId, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
if (runId == Guid.Empty)
{
throw new ArgumentException("Run ID must be a non-empty GUID.", nameof(runId));
}
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "reader", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(SelectByRunIdSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("run_id", runId);
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
return null;
}
return MapSnapshot(reader);
}
public async Task UpsertAsync(FirstSignalSnapshot snapshot, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(snapshot);
ArgumentException.ThrowIfNullOrWhiteSpace(snapshot.TenantId);
if (snapshot.RunId == Guid.Empty)
{
throw new ArgumentException("Run ID must be a non-empty GUID.", nameof(snapshot));
}
await using var connection = await _dataSource.OpenConnectionAsync(snapshot.TenantId, "writer", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(UpsertSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", snapshot.TenantId);
command.Parameters.AddWithValue("run_id", snapshot.RunId);
command.Parameters.AddWithValue("job_id", snapshot.JobId);
command.Parameters.AddWithValue("created_at", snapshot.CreatedAt);
command.Parameters.AddWithValue("updated_at", snapshot.UpdatedAt);
command.Parameters.AddWithValue("kind", snapshot.Kind);
command.Parameters.AddWithValue("phase", snapshot.Phase);
command.Parameters.AddWithValue("summary", snapshot.Summary);
command.Parameters.AddWithValue("eta_seconds", (object?)snapshot.EtaSeconds ?? DBNull.Value);
command.Parameters.Add(new NpgsqlParameter("last_known_outcome", NpgsqlDbType.Jsonb)
{
Value = (object?)snapshot.LastKnownOutcomeJson ?? DBNull.Value
});
command.Parameters.Add(new NpgsqlParameter("next_actions", NpgsqlDbType.Jsonb)
{
Value = (object?)snapshot.NextActionsJson ?? DBNull.Value
});
command.Parameters.Add(new NpgsqlParameter("diagnostics", NpgsqlDbType.Jsonb)
{
Value = snapshot.DiagnosticsJson
});
command.Parameters.Add(new NpgsqlParameter("signal_json", NpgsqlDbType.Jsonb)
{
Value = snapshot.SignalJson
});
try
{
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
catch (PostgresException ex)
{
_logger.LogError(ex, "Failed to upsert first signal snapshot for tenant {TenantId} run {RunId}.", snapshot.TenantId, snapshot.RunId);
throw;
}
}
public async Task DeleteByRunIdAsync(string tenantId, Guid runId, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
if (runId == Guid.Empty)
{
throw new ArgumentException("Run ID must be a non-empty GUID.", nameof(runId));
}
await using var connection = await _dataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken).ConfigureAwait(false);
await using var command = new NpgsqlCommand(DeleteByRunIdSql, connection);
command.CommandTimeout = _dataSource.CommandTimeoutSeconds;
command.Parameters.AddWithValue("tenant_id", tenantId);
command.Parameters.AddWithValue("run_id", runId);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
private static FirstSignalSnapshot MapSnapshot(NpgsqlDataReader reader)
{
return new FirstSignalSnapshot
{
TenantId = reader.GetString(0),
RunId = reader.GetGuid(1),
JobId = reader.GetGuid(2),
CreatedAt = reader.GetFieldValue<DateTimeOffset>(3),
UpdatedAt = reader.GetFieldValue<DateTimeOffset>(4),
Kind = reader.GetString(5),
Phase = reader.GetString(6),
Summary = reader.GetString(7),
EtaSeconds = reader.IsDBNull(8) ? null : reader.GetInt32(8),
LastKnownOutcomeJson = reader.IsDBNull(9) ? null : reader.GetString(9),
NextActionsJson = reader.IsDBNull(10) ? null : reader.GetString(10),
DiagnosticsJson = reader.GetString(11),
SignalJson = reader.GetString(12),
};
}
}

View File

@@ -2,11 +2,14 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using StellaOps.Orchestrator.Core.Backfill;
using StellaOps.Orchestrator.Core.Observability;
using StellaOps.Orchestrator.Core.Repositories;
using StellaOps.Orchestrator.Infrastructure.Ledger;
using StellaOps.Orchestrator.Infrastructure.Observability;
using StellaOps.Orchestrator.Infrastructure.Caching;
using StellaOps.Orchestrator.Infrastructure.Options;
using StellaOps.Orchestrator.Infrastructure.Postgres;
using StellaOps.Orchestrator.Infrastructure.Repositories;
using StellaOps.Orchestrator.Infrastructure.Services;
namespace StellaOps.Orchestrator.Infrastructure;
@@ -44,6 +47,7 @@ public static class ServiceCollectionExtensions
services.AddScoped<IPackRunRepository, PostgresPackRunRepository>();
services.AddScoped<IPackRunLogRepository, PostgresPackRunLogRepository>();
services.AddScoped<IPackRegistryRepository, PostgresPackRegistryRepository>();
services.AddScoped<IFirstSignalSnapshotRepository, PostgresFirstSignalSnapshotRepository>();
// Register audit and ledger repositories
services.AddScoped<IAuditRepository, PostgresAuditRepository>();
@@ -67,6 +71,11 @@ public static class ServiceCollectionExtensions
services.AddSingleton(incidentModeOptions);
services.AddSingleton<IIncidentModeHooks, IncidentModeHooks>();
// First signal (TTFS) services
services.Configure<FirstSignalOptions>(configuration.GetSection(FirstSignalOptions.SectionName));
services.AddSingleton<IFirstSignalCache, FirstSignalCache>();
services.AddScoped<StellaOps.Orchestrator.Core.Services.IFirstSignalService, FirstSignalService>();
return services;
}
}

View File

@@ -0,0 +1,571 @@
using System.Diagnostics;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Orchestrator.Core.Domain;
using StellaOps.Orchestrator.Core.Hashing;
using StellaOps.Orchestrator.Core.Repositories;
using StellaOps.Orchestrator.Infrastructure.Caching;
using StellaOps.Orchestrator.Infrastructure.Options;
using StellaOps.Orchestrator.Infrastructure.Repositories;
using StellaOps.Telemetry.Core;
using CoreServices = StellaOps.Orchestrator.Core.Services;
namespace StellaOps.Orchestrator.Infrastructure.Services;
public sealed class FirstSignalService : CoreServices.IFirstSignalService
{
private static readonly JsonSerializerOptions SignalJsonOptions = new()
{
PropertyNameCaseInsensitive = true,
Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase) }
};
private readonly IFirstSignalCache _cache;
private readonly IFirstSignalSnapshotRepository _snapshotRepository;
private readonly IRunRepository _runRepository;
private readonly IJobRepository _jobRepository;
private readonly TimeProvider _timeProvider;
private readonly TimeToFirstSignalMetrics _ttfsMetrics;
private readonly FirstSignalOptions _options;
private readonly ILogger<FirstSignalService> _logger;
public FirstSignalService(
IFirstSignalCache cache,
IFirstSignalSnapshotRepository snapshotRepository,
IRunRepository runRepository,
IJobRepository jobRepository,
TimeProvider timeProvider,
TimeToFirstSignalMetrics ttfsMetrics,
IOptions<FirstSignalOptions> options,
ILogger<FirstSignalService> logger)
{
_cache = cache ?? throw new ArgumentNullException(nameof(cache));
_snapshotRepository = snapshotRepository ?? throw new ArgumentNullException(nameof(snapshotRepository));
_runRepository = runRepository ?? throw new ArgumentNullException(nameof(runRepository));
_jobRepository = jobRepository ?? throw new ArgumentNullException(nameof(jobRepository));
_timeProvider = timeProvider ?? TimeProvider.System;
_ttfsMetrics = ttfsMetrics ?? throw new ArgumentNullException(nameof(ttfsMetrics));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<CoreServices.FirstSignalResult> GetFirstSignalAsync(
Guid runId,
string tenantId,
string? ifNoneMatch = null,
CancellationToken cancellationToken = default)
{
if (runId == Guid.Empty)
{
throw new ArgumentException("Run ID must be a non-empty GUID.", nameof(runId));
}
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
var overallStopwatch = Stopwatch.StartNew();
// 1) Cache fast path
var cacheLookupStopwatch = Stopwatch.StartNew();
var cacheResult = await _cache.GetAsync(tenantId, runId, cancellationToken).ConfigureAwait(false);
cacheLookupStopwatch.Stop();
if (cacheResult.HasValue)
{
var cached = cacheResult.Value;
var signal = cached.Signal;
var etag = cached.ETag;
var origin = string.IsNullOrWhiteSpace(cached.Origin) ? "snapshot" : cached.Origin.Trim().ToLowerInvariant();
_ttfsMetrics.RecordCacheLookup(
cacheLookupStopwatch.Elapsed.TotalSeconds,
surface: "api",
cacheHit: true,
signalSource: origin,
kind: MapKind(signal.Kind),
phase: MapPhase(signal.Phase),
tenantId: tenantId);
if (IsNotModified(ifNoneMatch, etag))
{
RecordSignalRendered(overallStopwatch, cacheHit: true, origin, signal.Kind, signal.Phase, tenantId);
return new CoreServices.FirstSignalResult
{
Status = CoreServices.FirstSignalResultStatus.NotModified,
CacheHit = true,
Source = origin,
ETag = etag,
Signal = signal with
{
Diagnostics = signal.Diagnostics with
{
CacheHit = true,
Source = origin,
}
}
};
}
RecordSignalRendered(overallStopwatch, cacheHit: true, origin, signal.Kind, signal.Phase, tenantId);
return new CoreServices.FirstSignalResult
{
Status = CoreServices.FirstSignalResultStatus.Found,
CacheHit = true,
Source = origin,
ETag = etag,
Signal = signal with
{
Diagnostics = signal.Diagnostics with
{
CacheHit = true,
Source = origin,
}
}
};
}
_ttfsMetrics.RecordCacheLookup(
cacheLookupStopwatch.Elapsed.TotalSeconds,
surface: "api",
cacheHit: false,
signalSource: null,
kind: TtfsSignalKind.Unavailable,
phase: TtfsPhase.Unknown,
tenantId: tenantId);
// 2) Snapshot fast path
var snapshot = await _snapshotRepository.GetByRunIdAsync(tenantId, runId, cancellationToken).ConfigureAwait(false);
if (snapshot is not null)
{
var signal = TryDeserializeSignal(snapshot.SignalJson);
if (signal is not null)
{
var etag = GenerateEtag(signal);
var origin = "snapshot";
if (IsNotModified(ifNoneMatch, etag))
{
RecordSignalRendered(overallStopwatch, cacheHit: false, origin, signal.Kind, signal.Phase, tenantId);
return new CoreServices.FirstSignalResult
{
Status = CoreServices.FirstSignalResultStatus.NotModified,
CacheHit = false,
Source = origin,
ETag = etag,
Signal = signal with
{
Diagnostics = signal.Diagnostics with
{
CacheHit = false,
Source = origin,
}
}
};
}
await _cache.SetAsync(
tenantId,
runId,
new FirstSignalCacheEntry
{
Signal = signal,
ETag = etag,
Origin = origin,
},
cancellationToken)
.ConfigureAwait(false);
RecordSignalRendered(overallStopwatch, cacheHit: false, origin, signal.Kind, signal.Phase, tenantId);
return new CoreServices.FirstSignalResult
{
Status = CoreServices.FirstSignalResultStatus.Found,
CacheHit = false,
Source = origin,
ETag = etag,
Signal = signal with
{
Diagnostics = signal.Diagnostics with
{
CacheHit = false,
Source = origin,
}
}
};
}
_logger.LogWarning(
"Invalid first signal snapshot JSON for tenant {TenantId} run {RunId}; deleting snapshot row.",
tenantId, runId);
await _snapshotRepository.DeleteByRunIdAsync(tenantId, runId, cancellationToken).ConfigureAwait(false);
}
// 3) Cold path
using var coldPathCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
if (_options.ColdPath.TimeoutMs > 0)
{
coldPathCts.CancelAfter(TimeSpan.FromMilliseconds(_options.ColdPath.TimeoutMs));
}
var coldStopwatch = Stopwatch.StartNew();
var run = await _runRepository.GetByIdAsync(tenantId, runId, coldPathCts.Token).ConfigureAwait(false);
if (run is null)
{
RecordSignalRendered(overallStopwatch, cacheHit: false, origin: null, TtfsSignalKind.Unavailable, TtfsPhase.Unknown, tenantId);
return new CoreServices.FirstSignalResult
{
Status = CoreServices.FirstSignalResultStatus.NotFound,
CacheHit = false,
Source = null,
ETag = null,
Signal = null,
};
}
var jobs = await _jobRepository.GetByRunIdAsync(tenantId, runId, coldPathCts.Token).ConfigureAwait(false);
coldStopwatch.Stop();
if (jobs.Count == 0)
{
RecordSignalRendered(overallStopwatch, cacheHit: false, origin: "cold_start", TtfsSignalKind.Unavailable, TtfsPhase.Unknown, tenantId);
return new CoreServices.FirstSignalResult
{
Status = CoreServices.FirstSignalResultStatus.NotAvailable,
CacheHit = false,
Source = "cold_start",
ETag = null,
Signal = null,
};
}
var signalComputed = ComputeSignal(run, jobs, cacheHit: false, origin: "cold_start");
var computedEtag = GenerateEtag(signalComputed);
_ttfsMetrics.RecordColdPathComputation(
coldStopwatch.Elapsed.TotalSeconds,
surface: "api",
signalSource: "cold_start",
kind: MapKind(signalComputed.Kind),
phase: MapPhase(signalComputed.Phase),
tenantId: tenantId);
await UpdateSnapshotAsyncInternal(runId, tenantId, signalComputed, cancellationToken).ConfigureAwait(false);
await _cache.SetAsync(
tenantId,
runId,
new FirstSignalCacheEntry
{
Signal = signalComputed,
ETag = computedEtag,
Origin = "cold_start",
},
cancellationToken)
.ConfigureAwait(false);
if (IsNotModified(ifNoneMatch, computedEtag))
{
RecordSignalRendered(overallStopwatch, cacheHit: false, origin: "cold_start", signalComputed.Kind, signalComputed.Phase, tenantId);
return new CoreServices.FirstSignalResult
{
Status = CoreServices.FirstSignalResultStatus.NotModified,
CacheHit = false,
Source = "cold_start",
ETag = computedEtag,
Signal = signalComputed,
};
}
RecordSignalRendered(overallStopwatch, cacheHit: false, origin: "cold_start", signalComputed.Kind, signalComputed.Phase, tenantId);
return new CoreServices.FirstSignalResult
{
Status = CoreServices.FirstSignalResultStatus.Found,
CacheHit = false,
Source = "cold_start",
ETag = computedEtag,
Signal = signalComputed,
};
}
public async Task UpdateSnapshotAsync(Guid runId, string tenantId, FirstSignal signal, CancellationToken cancellationToken = default)
{
if (runId == Guid.Empty)
{
throw new ArgumentException("Run ID must be a non-empty GUID.", nameof(runId));
}
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
ArgumentNullException.ThrowIfNull(signal);
await UpdateSnapshotAsyncInternal(runId, tenantId, signal with
{
Diagnostics = signal.Diagnostics with
{
CacheHit = false,
Source = "snapshot",
}
}, cancellationToken).ConfigureAwait(false);
await _cache.InvalidateAsync(tenantId, runId, cancellationToken).ConfigureAwait(false);
}
public async Task InvalidateCacheAsync(Guid runId, string tenantId, CancellationToken cancellationToken = default)
{
if (runId == Guid.Empty)
{
throw new ArgumentException("Run ID must be a non-empty GUID.", nameof(runId));
}
ArgumentException.ThrowIfNullOrWhiteSpace(tenantId);
await _cache.InvalidateAsync(tenantId, runId, cancellationToken).ConfigureAwait(false);
}
private async Task UpdateSnapshotAsyncInternal(Guid runId, string tenantId, FirstSignal signal, CancellationToken cancellationToken)
{
var now = _timeProvider.GetUtcNow();
var signalJson = CanonicalJsonHasher.ToCanonicalJson(signal);
var snapshot = new FirstSignalSnapshot
{
TenantId = tenantId,
RunId = runId,
JobId = signal.JobId,
CreatedAt = now,
UpdatedAt = now,
Kind = signal.Kind.ToString().ToLowerInvariant(),
Phase = signal.Phase.ToString().ToLowerInvariant(),
Summary = signal.Summary,
EtaSeconds = signal.EtaSeconds,
LastKnownOutcomeJson = signal.LastKnownOutcome is null
? null
: JsonSerializer.Serialize(signal.LastKnownOutcome, SignalJsonOptions),
NextActionsJson = signal.NextActions is null
? null
: JsonSerializer.Serialize(signal.NextActions, SignalJsonOptions),
DiagnosticsJson = JsonSerializer.Serialize(signal.Diagnostics, SignalJsonOptions),
SignalJson = signalJson,
};
await _snapshotRepository.UpsertAsync(snapshot, cancellationToken).ConfigureAwait(false);
}
private static FirstSignal ComputeSignal(Run run, IReadOnlyList<Job> jobs, bool cacheHit, string origin)
{
ArgumentNullException.ThrowIfNull(run);
ArgumentNullException.ThrowIfNull(jobs);
var job = SelectRepresentativeJob(run, jobs);
var hasLeasedJob = jobs.Any(j => j.Status == JobStatus.Leased);
var kind = hasLeasedJob
? FirstSignalKind.Started
: run.Status switch
{
RunStatus.Failed => FirstSignalKind.Failed,
RunStatus.Canceled => FirstSignalKind.Canceled,
RunStatus.Succeeded or RunStatus.PartiallySucceeded => FirstSignalKind.Succeeded,
_ => FirstSignalKind.Queued
};
var phase = FirstSignalPhase.Unknown;
var timestamp = ResolveTimestamp(run, job, kind);
var correlationId = run.CorrelationId ?? job.CorrelationId ?? string.Empty;
var signalId = $"{run.RunId:D}:{job.JobId:D}:{kind.ToString().ToLowerInvariant()}:{phase.ToString().ToLowerInvariant()}:{timestamp.ToUnixTimeMilliseconds()}";
var summary = kind switch
{
FirstSignalKind.Queued => "Run queued",
FirstSignalKind.Started => "Run started",
FirstSignalKind.Succeeded => "Run completed",
FirstSignalKind.Failed => "Run failed",
FirstSignalKind.Canceled => "Run canceled",
_ => "Run update"
};
return new FirstSignal
{
Version = "1.0",
SignalId = signalId,
JobId = job.JobId,
Timestamp = timestamp,
Kind = kind,
Phase = phase,
Scope = new FirstSignalScope { Type = "run", Id = run.RunId.ToString("D") },
Summary = summary,
EtaSeconds = null,
LastKnownOutcome = null,
NextActions = null,
Diagnostics = new FirstSignalDiagnostics
{
CacheHit = cacheHit,
Source = origin,
CorrelationId = correlationId
}
};
}
private static Job SelectRepresentativeJob(Run run, IReadOnlyList<Job> jobs)
{
// Prefer an in-flight job to surface "started" quickly, even if Run.Status hasn't transitioned yet.
var leased = jobs
.Where(j => j.Status == JobStatus.Leased)
.OrderBy(j => j.LeasedAt ?? DateTimeOffset.MaxValue)
.ThenBy(j => j.CreatedAt)
.FirstOrDefault();
if (leased is not null)
{
return leased;
}
// Prefer earliest completed job when run is terminal.
if (run.Status is RunStatus.Succeeded or RunStatus.PartiallySucceeded or RunStatus.Failed or RunStatus.Canceled)
{
var terminal = jobs
.Where(j => j.Status is JobStatus.Succeeded or JobStatus.Failed or JobStatus.Canceled or JobStatus.TimedOut)
.OrderBy(j => j.CompletedAt ?? DateTimeOffset.MaxValue)
.ThenBy(j => j.CreatedAt)
.FirstOrDefault();
if (terminal is not null)
{
return terminal;
}
}
// Otherwise, use the earliest-created job as representative.
return jobs.OrderBy(j => j.CreatedAt).First();
}
private static DateTimeOffset ResolveTimestamp(Run run, Job job, FirstSignalKind kind)
{
return kind switch
{
FirstSignalKind.Started => job.LeasedAt ?? run.StartedAt ?? run.CreatedAt,
FirstSignalKind.Succeeded or FirstSignalKind.Failed or FirstSignalKind.Canceled => job.CompletedAt ?? run.CompletedAt ?? run.CreatedAt,
_ => job.ScheduledAt ?? job.CreatedAt
};
}
private static FirstSignal? TryDeserializeSignal(string json)
{
if (string.IsNullOrWhiteSpace(json))
{
return null;
}
try
{
return JsonSerializer.Deserialize<FirstSignal>(json, SignalJsonOptions);
}
catch
{
return null;
}
}
private static string GenerateEtag(FirstSignal signal)
{
var material = new
{
signal.Version,
signal.JobId,
signal.Timestamp,
signal.Kind,
signal.Phase,
signal.Scope,
signal.Summary,
signal.EtaSeconds,
signal.LastKnownOutcome,
signal.NextActions
};
var canonicalJson = CanonicalJsonHasher.ToCanonicalJson(material);
var hash = SHA256.HashData(Encoding.UTF8.GetBytes(canonicalJson));
var base64 = Convert.ToBase64String(hash.AsSpan(0, 8));
return $"W/\"{base64}\"";
}
private static bool IsNotModified(string? ifNoneMatch, string etag)
{
if (string.IsNullOrWhiteSpace(ifNoneMatch))
{
return false;
}
var candidates = ifNoneMatch
.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
.Select(t => t.Trim())
.ToList();
if (candidates.Any(t => t == "*"))
{
return true;
}
return candidates.Any(t => string.Equals(t, etag, StringComparison.Ordinal));
}
private static TtfsSignalKind MapKind(FirstSignalKind kind) => kind switch
{
FirstSignalKind.Queued => TtfsSignalKind.Queued,
FirstSignalKind.Started => TtfsSignalKind.Started,
FirstSignalKind.Phase => TtfsSignalKind.Phase,
FirstSignalKind.Blocked => TtfsSignalKind.Blocked,
FirstSignalKind.Failed => TtfsSignalKind.Failed,
FirstSignalKind.Succeeded => TtfsSignalKind.Succeeded,
FirstSignalKind.Canceled => TtfsSignalKind.Canceled,
_ => TtfsSignalKind.Unavailable,
};
private static TtfsPhase MapPhase(FirstSignalPhase phase) => phase switch
{
FirstSignalPhase.Resolve => TtfsPhase.Resolve,
FirstSignalPhase.Fetch => TtfsPhase.Fetch,
FirstSignalPhase.Restore => TtfsPhase.Restore,
FirstSignalPhase.Analyze => TtfsPhase.Analyze,
FirstSignalPhase.Policy => TtfsPhase.Policy,
FirstSignalPhase.Report => TtfsPhase.Report,
_ => TtfsPhase.Unknown,
};
private void RecordSignalRendered(
Stopwatch overallStopwatch,
bool cacheHit,
string? origin,
FirstSignalKind kind,
FirstSignalPhase phase,
string tenantId)
{
_ttfsMetrics.RecordSignalRendered(
latencySeconds: overallStopwatch.Elapsed.TotalSeconds,
surface: "api",
cacheHit: cacheHit,
signalSource: origin,
kind: MapKind(kind),
phase: MapPhase(phase),
tenantId: tenantId);
}
private void RecordSignalRendered(
Stopwatch overallStopwatch,
bool cacheHit,
string? origin,
TtfsSignalKind kind,
TtfsPhase phase,
string tenantId)
{
_ttfsMetrics.RecordSignalRendered(
latencySeconds: overallStopwatch.Elapsed.TotalSeconds,
surface: "api",
cacheHit: cacheHit,
signalSource: origin,
kind: kind,
phase: phase,
tenantId: tenantId);
}
}

View File

@@ -0,0 +1,130 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Orchestrator.Core.Domain;
using StellaOps.Orchestrator.Infrastructure.Options;
using StellaOps.Orchestrator.Infrastructure.Repositories;
using CoreServices = StellaOps.Orchestrator.Core.Services;
namespace StellaOps.Orchestrator.Infrastructure.Services;
public sealed class FirstSignalSnapshotWriter : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly FirstSignalSnapshotWriterOptions _options;
private readonly ILogger<FirstSignalSnapshotWriter> _logger;
public FirstSignalSnapshotWriter(
IServiceScopeFactory scopeFactory,
IOptions<FirstSignalOptions> options,
ILogger<FirstSignalSnapshotWriter> logger)
{
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value.SnapshotWriter;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Enabled)
{
_logger.LogDebug("FirstSignalSnapshotWriter is disabled.");
return;
}
if (string.IsNullOrWhiteSpace(_options.TenantId))
{
_logger.LogWarning(
"FirstSignalSnapshotWriter enabled but no tenant configured; set {Section}:{Key}.",
FirstSignalOptions.SectionName,
$"{nameof(FirstSignalOptions.SnapshotWriter)}:{nameof(FirstSignalSnapshotWriterOptions.TenantId)}");
return;
}
var tenantId = _options.TenantId.Trim();
var lookback = TimeSpan.FromMinutes(Math.Max(1, _options.LookbackMinutes));
var pollInterval = TimeSpan.FromSeconds(Math.Max(1, _options.PollIntervalSeconds));
var maxRuns = Math.Max(1, _options.MaxRunsPerTick);
using var timer = new PeriodicTimer(pollInterval);
while (await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
{
try
{
await WarmTenantAsync(tenantId, lookback, maxRuns, stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "FirstSignalSnapshotWriter tick failed for tenant {TenantId}.", tenantId);
}
}
}
private async Task WarmTenantAsync(
string tenantId,
TimeSpan lookback,
int maxRuns,
CancellationToken cancellationToken)
{
using var scope = _scopeFactory.CreateScope();
var runRepository = scope.ServiceProvider.GetRequiredService<IRunRepository>();
var firstSignalService = scope.ServiceProvider.GetRequiredService<CoreServices.IFirstSignalService>();
var createdAfter = DateTimeOffset.UtcNow.Subtract(lookback);
var pending = await runRepository.ListAsync(
tenantId,
sourceId: null,
runType: null,
status: RunStatus.Pending,
projectId: null,
createdAfter: createdAfter,
createdBefore: null,
limit: maxRuns,
offset: 0,
cancellationToken: cancellationToken).ConfigureAwait(false);
var running = await runRepository.ListAsync(
tenantId,
sourceId: null,
runType: null,
status: RunStatus.Running,
projectId: null,
createdAfter: createdAfter,
createdBefore: null,
limit: maxRuns,
offset: 0,
cancellationToken: cancellationToken).ConfigureAwait(false);
var candidates = pending
.Concat(running)
.GroupBy(r => r.RunId)
.Select(g => g.First())
.OrderBy(r => r.CreatedAt)
.ThenBy(r => r.RunId)
.Take(maxRuns)
.ToList();
foreach (var run in candidates)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
await firstSignalService
.GetFirstSignalAsync(run.RunId, tenantId, ifNoneMatch: null, cancellationToken)
.ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed warming first signal for tenant {TenantId} run {RunId}.", tenantId, run.RunId);
}
}
}
}

View File

@@ -16,6 +16,7 @@
<ItemGroup>
<ProjectReference Include="..\StellaOps.Orchestrator.Core\StellaOps.Orchestrator.Core.csproj"/>
<ProjectReference Include="..\..\..\Telemetry\StellaOps.Telemetry.Core\StellaOps.Telemetry.Core\StellaOps.Telemetry.Core.csproj"/>
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Messaging\StellaOps.Messaging.csproj" />
</ItemGroup>
<ItemGroup>

View File

@@ -0,0 +1,53 @@
-- 008_first_signal_snapshots.sql
-- First Signal snapshots for TTFS fast-path (SPRINT_0339_0001_0001_first_signal_api.md)
BEGIN;
CREATE TABLE first_signal_snapshots (
tenant_id TEXT NOT NULL,
run_id UUID NOT NULL,
job_id UUID NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
kind TEXT NOT NULL CHECK (kind IN (
'queued',
'started',
'phase',
'blocked',
'failed',
'succeeded',
'canceled',
'unavailable'
)),
phase TEXT NOT NULL CHECK (phase IN (
'resolve',
'fetch',
'restore',
'analyze',
'policy',
'report',
'unknown'
)),
summary TEXT NOT NULL,
eta_seconds INT NULL,
last_known_outcome JSONB NULL,
next_actions JSONB NULL,
diagnostics JSONB NOT NULL DEFAULT '{}'::jsonb,
signal_json JSONB NOT NULL,
CONSTRAINT pk_first_signal_snapshots PRIMARY KEY (tenant_id, run_id)
) PARTITION BY LIST (tenant_id);
CREATE TABLE first_signal_snapshots_default PARTITION OF first_signal_snapshots DEFAULT;
CREATE INDEX ix_first_signal_snapshots_job ON first_signal_snapshots (tenant_id, job_id);
CREATE INDEX ix_first_signal_snapshots_updated ON first_signal_snapshots (tenant_id, updated_at DESC);
COMMENT ON TABLE first_signal_snapshots IS 'Per-run cached first-signal payload for TTFS fast path.';
COMMENT ON COLUMN first_signal_snapshots.kind IS 'Current signal kind.';
COMMENT ON COLUMN first_signal_snapshots.phase IS 'Current execution phase.';
COMMENT ON COLUMN first_signal_snapshots.signal_json IS 'Full first-signal payload for ETag and response mapping.';
COMMIT;

View File

@@ -0,0 +1,59 @@
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Options;
using StellaOps.Orchestrator.Infrastructure.Options;
using StellaOps.Orchestrator.WebService.Services;
namespace StellaOps.Orchestrator.Tests.ControlPlane;
public sealed class TenantResolverTests
{
[Fact]
public void ResolveForStreaming_PrefersHeaderWhenPresent()
{
var resolver = new TenantResolver(Options.Create(new OrchestratorServiceOptions
{
TenantHeader = "X-StellaOps-Tenant",
}));
var context = new DefaultHttpContext();
context.Request.Headers["X-StellaOps-Tenant"] = " acme ";
context.Request.QueryString = new QueryString("?tenant=ignored");
var tenant = resolver.ResolveForStreaming(context);
Assert.Equal("acme", tenant);
}
[Fact]
public void ResolveForStreaming_FallsBackToQueryParam()
{
var resolver = new TenantResolver(Options.Create(new OrchestratorServiceOptions
{
TenantHeader = "X-StellaOps-Tenant",
}));
var context = new DefaultHttpContext();
context.Request.QueryString = new QueryString("?tenant=%20acme%20");
var tenant = resolver.ResolveForStreaming(context);
Assert.Equal("acme", tenant);
}
[Fact]
public void ResolveForStreaming_ThrowsWhenTenantMissing()
{
var resolver = new TenantResolver(Options.Create(new OrchestratorServiceOptions
{
TenantHeader = "X-StellaOps-Tenant",
}));
var context = new DefaultHttpContext();
var ex = Assert.Throws<InvalidOperationException>(() => resolver.ResolveForStreaming(context));
Assert.Contains("X-StellaOps-Tenant", ex.Message);
Assert.Contains("tenant", ex.Message);
}
}

View File

@@ -0,0 +1,473 @@
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using StellaOps.Messaging;
using StellaOps.Orchestrator.Core.Domain;
using StellaOps.Orchestrator.Core.Hashing;
using StellaOps.Orchestrator.Core.Repositories;
using StellaOps.Orchestrator.Infrastructure.Caching;
using StellaOps.Orchestrator.Infrastructure.Options;
using StellaOps.Orchestrator.Infrastructure.Repositories;
using StellaOps.Orchestrator.Infrastructure.Services;
using StellaOps.Telemetry.Core;
namespace StellaOps.Orchestrator.Tests.Ttfs;
public sealed class FirstSignalServiceTests
{
private const string TenantId = "test-tenant";
[Fact]
public async Task GetFirstSignalAsync_ColdPathThenCacheHit_IfNoneMatch_Returns304()
{
var runId = Guid.NewGuid();
var jobId = Guid.NewGuid();
var now = new DateTimeOffset(2025, 12, 15, 12, 0, 0, TimeSpan.Zero);
var run = new Run(
RunId: runId,
TenantId: TenantId,
ProjectId: null,
SourceId: Guid.NewGuid(),
RunType: "scan",
Status: RunStatus.Pending,
CorrelationId: "corr-1",
TotalJobs: 1,
CompletedJobs: 0,
SucceededJobs: 0,
FailedJobs: 0,
CreatedAt: now,
StartedAt: null,
CompletedAt: null,
CreatedBy: "system",
Metadata: null);
var job = new Job(
JobId: jobId,
TenantId: TenantId,
ProjectId: null,
RunId: runId,
JobType: "scan.image",
Status: JobStatus.Scheduled,
Priority: 0,
Attempt: 1,
MaxAttempts: 3,
PayloadDigest: new string('a', 64),
Payload: "{}",
IdempotencyKey: "idem-1",
CorrelationId: null,
LeaseId: null,
WorkerId: null,
TaskRunnerId: null,
LeaseUntil: null,
CreatedAt: now,
ScheduledAt: now,
LeasedAt: null,
CompletedAt: null,
NotBefore: null,
Reason: null,
ReplayOf: null,
CreatedBy: "system");
var cache = new FakeFirstSignalCache();
var snapshots = new FakeFirstSignalSnapshotRepository();
var runs = new FakeRunRepository(run);
var jobs = new FakeJobRepository(job);
using var ttfs = new TimeToFirstSignalMetrics();
var options = Options.Create(new FirstSignalOptions());
var service = new FirstSignalService(
cache,
snapshots,
runs,
jobs,
TimeProvider.System,
ttfs,
options,
NullLogger<FirstSignalService>.Instance);
var first = await service.GetFirstSignalAsync(runId, TenantId);
Assert.Equal(StellaOps.Orchestrator.Core.Services.FirstSignalResultStatus.Found, first.Status);
Assert.NotNull(first.ETag);
Assert.False(first.CacheHit);
var second = await service.GetFirstSignalAsync(runId, TenantId, ifNoneMatch: first.ETag);
Assert.Equal(StellaOps.Orchestrator.Core.Services.FirstSignalResultStatus.NotModified, second.Status);
Assert.True(second.CacheHit);
}
[Fact]
public async Task GetFirstSignalAsync_RunPendingButJobLeased_ReturnsStarted()
{
var runId = Guid.NewGuid();
var jobId = Guid.NewGuid();
var now = new DateTimeOffset(2025, 12, 15, 12, 0, 0, TimeSpan.Zero);
var run = new Run(
RunId: runId,
TenantId: TenantId,
ProjectId: null,
SourceId: Guid.NewGuid(),
RunType: "scan",
Status: RunStatus.Pending,
CorrelationId: null,
TotalJobs: 1,
CompletedJobs: 0,
SucceededJobs: 0,
FailedJobs: 0,
CreatedAt: now,
StartedAt: null,
CompletedAt: null,
CreatedBy: "system",
Metadata: null);
var job = new Job(
JobId: jobId,
TenantId: TenantId,
ProjectId: null,
RunId: runId,
JobType: "scan.image",
Status: JobStatus.Leased,
Priority: 0,
Attempt: 1,
MaxAttempts: 3,
PayloadDigest: new string('a', 64),
Payload: "{}",
IdempotencyKey: "idem-1",
CorrelationId: null,
LeaseId: Guid.NewGuid(),
WorkerId: "worker-1",
TaskRunnerId: null,
LeaseUntil: now.AddMinutes(5),
CreatedAt: now,
ScheduledAt: now,
LeasedAt: now.AddSeconds(10),
CompletedAt: null,
NotBefore: null,
Reason: null,
ReplayOf: null,
CreatedBy: "system");
using var ttfs = new TimeToFirstSignalMetrics();
var service = new FirstSignalService(
cache: new FakeFirstSignalCache(),
snapshotRepository: new FakeFirstSignalSnapshotRepository(),
runRepository: new FakeRunRepository(run),
jobRepository: new FakeJobRepository(job),
timeProvider: TimeProvider.System,
ttfsMetrics: ttfs,
options: Options.Create(new FirstSignalOptions()),
logger: NullLogger<FirstSignalService>.Instance);
var result = await service.GetFirstSignalAsync(runId, TenantId);
Assert.Equal(StellaOps.Orchestrator.Core.Services.FirstSignalResultStatus.Found, result.Status);
Assert.NotNull(result.Signal);
Assert.Equal(FirstSignalKind.Started, result.Signal!.Kind);
}
[Fact]
public async Task GetFirstSignalAsync_RunMissing_Returns404()
{
using var ttfs = new TimeToFirstSignalMetrics();
var service = new FirstSignalService(
cache: new FakeFirstSignalCache(),
snapshotRepository: new FakeFirstSignalSnapshotRepository(),
runRepository: new FakeRunRepository(null),
jobRepository: new FakeJobRepository(),
timeProvider: TimeProvider.System,
ttfsMetrics: ttfs,
options: Options.Create(new FirstSignalOptions()),
logger: NullLogger<FirstSignalService>.Instance);
var result = await service.GetFirstSignalAsync(Guid.NewGuid(), TenantId);
Assert.Equal(StellaOps.Orchestrator.Core.Services.FirstSignalResultStatus.NotFound, result.Status);
}
[Fact]
public async Task GetFirstSignalAsync_RunWithNoJobs_Returns204()
{
var run = new Run(
RunId: Guid.NewGuid(),
TenantId: TenantId,
ProjectId: null,
SourceId: Guid.NewGuid(),
RunType: "scan",
Status: RunStatus.Pending,
CorrelationId: null,
TotalJobs: 0,
CompletedJobs: 0,
SucceededJobs: 0,
FailedJobs: 0,
CreatedAt: DateTimeOffset.UtcNow,
StartedAt: null,
CompletedAt: null,
CreatedBy: "system",
Metadata: null);
using var ttfs = new TimeToFirstSignalMetrics();
var service = new FirstSignalService(
cache: new FakeFirstSignalCache(),
snapshotRepository: new FakeFirstSignalSnapshotRepository(),
runRepository: new FakeRunRepository(run),
jobRepository: new FakeJobRepository(),
timeProvider: TimeProvider.System,
ttfsMetrics: ttfs,
options: Options.Create(new FirstSignalOptions()),
logger: NullLogger<FirstSignalService>.Instance);
var result = await service.GetFirstSignalAsync(run.RunId, TenantId);
Assert.Equal(StellaOps.Orchestrator.Core.Services.FirstSignalResultStatus.NotAvailable, result.Status);
}
[Fact]
public async Task GetFirstSignalAsync_SnapshotHit_PopulatesCache()
{
var runId = Guid.NewGuid();
var jobId = Guid.NewGuid();
var signal = new FirstSignal
{
Version = "1.0",
SignalId = "sig-1",
JobId = jobId,
Timestamp = new DateTimeOffset(2025, 12, 15, 12, 0, 0, TimeSpan.Zero),
Kind = FirstSignalKind.Queued,
Phase = FirstSignalPhase.Unknown,
Scope = new FirstSignalScope { Type = "run", Id = runId.ToString("D") },
Summary = "Run queued",
EtaSeconds = null,
LastKnownOutcome = null,
NextActions = null,
Diagnostics = new FirstSignalDiagnostics
{
CacheHit = false,
Source = "snapshot",
CorrelationId = string.Empty
}
};
var snapshotRepo = new FakeFirstSignalSnapshotRepository();
await snapshotRepo.UpsertAsync(new FirstSignalSnapshot
{
TenantId = TenantId,
RunId = runId,
JobId = jobId,
CreatedAt = DateTimeOffset.UtcNow,
UpdatedAt = DateTimeOffset.UtcNow,
Kind = "queued",
Phase = "unknown",
Summary = "Run queued",
EtaSeconds = null,
LastKnownOutcomeJson = null,
NextActionsJson = null,
DiagnosticsJson = "{}",
SignalJson = CanonicalJsonHasher.ToCanonicalJson(signal),
});
var cache = new FakeFirstSignalCache();
using var ttfs = new TimeToFirstSignalMetrics();
var service = new FirstSignalService(
cache,
snapshotRepo,
runRepository: new FakeRunRepository(null),
jobRepository: new FakeJobRepository(),
timeProvider: TimeProvider.System,
ttfsMetrics: ttfs,
options: Options.Create(new FirstSignalOptions()),
logger: NullLogger<FirstSignalService>.Instance);
var first = await service.GetFirstSignalAsync(runId, TenantId);
Assert.Equal(StellaOps.Orchestrator.Core.Services.FirstSignalResultStatus.Found, first.Status);
Assert.False(first.CacheHit);
Assert.True(cache.TryGet(TenantId, runId, out _));
var second = await service.GetFirstSignalAsync(runId, TenantId);
Assert.Equal(StellaOps.Orchestrator.Core.Services.FirstSignalResultStatus.Found, second.Status);
Assert.True(second.CacheHit);
}
private sealed class FakeFirstSignalCache : IFirstSignalCache
{
private readonly Dictionary<(string TenantId, Guid RunId), FirstSignalCacheEntry> _entries = new();
public string ProviderName => "fake";
public ValueTask<CacheResult<FirstSignalCacheEntry>> GetAsync(string tenantId, Guid runId, CancellationToken cancellationToken = default)
{
if (_entries.TryGetValue((tenantId, runId), out var entry))
{
return ValueTask.FromResult(CacheResult<FirstSignalCacheEntry>.Found(entry));
}
return ValueTask.FromResult(CacheResult<FirstSignalCacheEntry>.Miss());
}
public ValueTask SetAsync(string tenantId, Guid runId, FirstSignalCacheEntry entry, CancellationToken cancellationToken = default)
{
_entries[(tenantId, runId)] = entry;
return ValueTask.CompletedTask;
}
public ValueTask<bool> InvalidateAsync(string tenantId, Guid runId, CancellationToken cancellationToken = default)
{
return ValueTask.FromResult(_entries.Remove((tenantId, runId)));
}
public bool TryGet(string tenantId, Guid runId, out FirstSignalCacheEntry? entry)
{
if (_entries.TryGetValue((tenantId, runId), out var value))
{
entry = value;
return true;
}
entry = null;
return false;
}
}
private sealed class FakeFirstSignalSnapshotRepository : IFirstSignalSnapshotRepository
{
private readonly Dictionary<(string TenantId, Guid RunId), FirstSignalSnapshot> _rows = new();
public Task<FirstSignalSnapshot?> GetByRunIdAsync(string tenantId, Guid runId, CancellationToken cancellationToken = default)
{
_rows.TryGetValue((tenantId, runId), out var snapshot);
return Task.FromResult(snapshot);
}
public Task UpsertAsync(FirstSignalSnapshot snapshot, CancellationToken cancellationToken = default)
{
_rows[(snapshot.TenantId, snapshot.RunId)] = snapshot;
return Task.CompletedTask;
}
public Task DeleteByRunIdAsync(string tenantId, Guid runId, CancellationToken cancellationToken = default)
{
_rows.Remove((tenantId, runId));
return Task.CompletedTask;
}
}
private sealed class FakeRunRepository : IRunRepository
{
private readonly Run? _run;
public FakeRunRepository(Run? run) => _run = run;
public Task<Run?> GetByIdAsync(string tenantId, Guid runId, CancellationToken cancellationToken)
=> Task.FromResult(_run);
public Task CreateAsync(Run run, CancellationToken cancellationToken) => throw new NotImplementedException();
public Task UpdateStatusAsync(
string tenantId,
Guid runId,
RunStatus status,
int totalJobs,
int completedJobs,
int succeededJobs,
int failedJobs,
DateTimeOffset? startedAt,
DateTimeOffset? completedAt,
CancellationToken cancellationToken) => throw new NotImplementedException();
public Task IncrementJobCountsAsync(string tenantId, Guid runId, bool succeeded, CancellationToken cancellationToken)
=> throw new NotImplementedException();
public Task<IReadOnlyList<Run>> ListAsync(
string tenantId,
Guid? sourceId,
string? runType,
RunStatus? status,
string? projectId,
DateTimeOffset? createdAfter,
DateTimeOffset? createdBefore,
int limit,
int offset,
CancellationToken cancellationToken) => throw new NotImplementedException();
public Task<int> CountAsync(
string tenantId,
Guid? sourceId,
string? runType,
RunStatus? status,
string? projectId,
CancellationToken cancellationToken) => throw new NotImplementedException();
}
private sealed class FakeJobRepository : IJobRepository
{
private readonly IReadOnlyList<Job> _jobs;
public FakeJobRepository(params Job[] jobs) => _jobs = jobs;
public Task<IReadOnlyList<Job>> GetByRunIdAsync(string tenantId, Guid runId, CancellationToken cancellationToken)
=> Task.FromResult(_jobs.Where(j => j.RunId == runId).ToList() as IReadOnlyList<Job>);
public Task<Job?> GetByIdAsync(string tenantId, Guid jobId, CancellationToken cancellationToken)
=> throw new NotImplementedException();
public Task<Job?> GetByIdempotencyKeyAsync(string tenantId, string idempotencyKey, CancellationToken cancellationToken)
=> throw new NotImplementedException();
public Task CreateAsync(Job job, CancellationToken cancellationToken)
=> throw new NotImplementedException();
public Task UpdateStatusAsync(
string tenantId,
Guid jobId,
JobStatus status,
int attempt,
Guid? leaseId,
string? workerId,
string? taskRunnerId,
DateTimeOffset? leaseUntil,
DateTimeOffset? scheduledAt,
DateTimeOffset? leasedAt,
DateTimeOffset? completedAt,
DateTimeOffset? notBefore,
string? reason,
CancellationToken cancellationToken) => throw new NotImplementedException();
public Task<Job?> LeaseNextAsync(
string tenantId,
string? jobType,
Guid leaseId,
string workerId,
DateTimeOffset leaseUntil,
CancellationToken cancellationToken) => throw new NotImplementedException();
public Task<bool> ExtendLeaseAsync(
string tenantId,
Guid jobId,
Guid leaseId,
DateTimeOffset newLeaseUntil,
CancellationToken cancellationToken) => throw new NotImplementedException();
public Task<IReadOnlyList<Job>> GetExpiredLeasesAsync(string tenantId, DateTimeOffset cutoff, int limit, CancellationToken cancellationToken)
=> throw new NotImplementedException();
public Task<IReadOnlyList<Job>> ListAsync(
string tenantId,
JobStatus? status,
string? jobType,
string? projectId,
DateTimeOffset? createdAfter,
DateTimeOffset? createdBefore,
int limit,
int offset,
CancellationToken cancellationToken) => throw new NotImplementedException();
public Task<int> CountAsync(
string tenantId,
JobStatus? status,
string? jobType,
string? projectId,
CancellationToken cancellationToken) => throw new NotImplementedException();
}
}

View File

@@ -0,0 +1,33 @@
namespace StellaOps.Orchestrator.WebService.Contracts;
/// <summary>
/// API response for first signal endpoint.
/// </summary>
public sealed record FirstSignalResponse
{
public required Guid RunId { get; init; }
public required FirstSignalDto? FirstSignal { get; init; }
public required string SummaryEtag { get; init; }
}
public sealed record FirstSignalDto
{
public required string Type { get; init; }
public string? Stage { get; init; }
public string? Step { get; init; }
public required string Message { get; init; }
public required DateTimeOffset At { get; init; }
public FirstSignalArtifactDto? Artifact { get; init; }
}
public sealed record FirstSignalArtifactDto
{
public required string Kind { get; init; }
public FirstSignalRangeDto? Range { get; init; }
}
public sealed record FirstSignalRangeDto
{
public required int Start { get; init; }
public required int End { get; init; }
}

View File

@@ -0,0 +1,104 @@
using Microsoft.AspNetCore.Mvc;
using StellaOps.Orchestrator.Core.Services;
using StellaOps.Orchestrator.WebService.Contracts;
using StellaOps.Orchestrator.WebService.Services;
namespace StellaOps.Orchestrator.WebService.Endpoints;
/// <summary>
/// REST API endpoint for first signal (TTFS).
/// </summary>
public static class FirstSignalEndpoints
{
public static RouteGroupBuilder MapFirstSignalEndpoints(this IEndpointRouteBuilder app)
{
var group = app.MapGroup("/api/v1/orchestrator/runs")
.WithTags("Orchestrator Runs");
group.MapGet("{runId:guid}/first-signal", GetFirstSignal)
.WithName("Orchestrator_GetFirstSignal")
.WithDescription("Gets the first meaningful signal for a run");
return group;
}
private static async Task<IResult> GetFirstSignal(
HttpContext context,
[FromRoute] Guid runId,
[FromHeader(Name = "If-None-Match")] string? ifNoneMatch,
[FromServices] TenantResolver tenantResolver,
[FromServices] IFirstSignalService firstSignalService,
CancellationToken cancellationToken)
{
try
{
var tenantId = tenantResolver.Resolve(context);
var result = await firstSignalService
.GetFirstSignalAsync(runId, tenantId, ifNoneMatch, cancellationToken)
.ConfigureAwait(false);
context.Response.Headers["Cache-Status"] = result.CacheHit ? "hit" : "miss";
if (!string.IsNullOrWhiteSpace(result.Source))
{
context.Response.Headers["X-FirstSignal-Source"] = result.Source;
}
if (!string.IsNullOrWhiteSpace(result.ETag))
{
context.Response.Headers.ETag = result.ETag;
context.Response.Headers.CacheControl = "private, max-age=60";
}
return result.Status switch
{
FirstSignalResultStatus.Found => Results.Ok(MapToResponse(runId, result)),
FirstSignalResultStatus.NotModified => Results.StatusCode(StatusCodes.Status304NotModified),
FirstSignalResultStatus.NotFound => Results.NotFound(),
FirstSignalResultStatus.NotAvailable => Results.NoContent(),
_ => Results.Problem("Internal error")
};
}
catch (InvalidOperationException ex)
{
return Results.BadRequest(new { error = ex.Message });
}
catch (ArgumentException ex)
{
return Results.BadRequest(new { error = ex.Message });
}
}
private static FirstSignalResponse MapToResponse(Guid runId, FirstSignalResult result)
{
if (result.Signal is null)
{
return new FirstSignalResponse
{
RunId = runId,
FirstSignal = null,
SummaryEtag = result.ETag ?? string.Empty
};
}
var signal = result.Signal;
return new FirstSignalResponse
{
RunId = runId,
SummaryEtag = result.ETag ?? string.Empty,
FirstSignal = new FirstSignalDto
{
Type = signal.Kind.ToString().ToLowerInvariant(),
Stage = signal.Phase.ToString().ToLowerInvariant(),
Step = null,
Message = signal.Summary,
At = signal.Timestamp,
Artifact = new FirstSignalArtifactDto
{
Kind = signal.Scope.Type,
Range = null
}
}
};
}
}

View File

@@ -47,7 +47,7 @@ public static class StreamEndpoints
{
try
{
var tenantId = tenantResolver.Resolve(context);
var tenantId = tenantResolver.ResolveForStreaming(context);
var job = await jobRepository.GetByIdAsync(tenantId, jobId, cancellationToken).ConfigureAwait(false);
if (job is null)
@@ -83,7 +83,7 @@ public static class StreamEndpoints
{
try
{
var tenantId = tenantResolver.Resolve(context);
var tenantId = tenantResolver.ResolveForStreaming(context);
var run = await runRepository.GetByIdAsync(tenantId, runId, cancellationToken).ConfigureAwait(false);
if (run is null)
@@ -119,7 +119,7 @@ public static class StreamEndpoints
{
try
{
var tenantId = tenantResolver.Resolve(context);
var tenantId = tenantResolver.ResolveForStreaming(context);
var packRun = await packRunRepository.GetByIdAsync(tenantId, packRunId, cancellationToken).ConfigureAwait(false);
if (packRun is null)
{
@@ -158,7 +158,7 @@ public static class StreamEndpoints
return;
}
var tenantId = tenantResolver.Resolve(context);
var tenantId = tenantResolver.ResolveForStreaming(context);
var packRun = await packRunRepository.GetByIdAsync(tenantId, packRunId, cancellationToken).ConfigureAwait(false);
if (packRun is null)
{

View File

@@ -1,5 +1,10 @@
using StellaOps.Messaging.DependencyInjection;
using StellaOps.Messaging.Transport.InMemory;
using StellaOps.Messaging.Transport.Postgres;
using StellaOps.Messaging.Transport.Valkey;
using StellaOps.Orchestrator.Core.Scale;
using StellaOps.Orchestrator.Infrastructure;
using StellaOps.Orchestrator.Infrastructure.Services;
using StellaOps.Orchestrator.WebService.Endpoints;
using StellaOps.Orchestrator.WebService.Services;
using StellaOps.Orchestrator.WebService.Streaming;
@@ -11,6 +16,27 @@ builder.Services.AddRouting(options => options.LowercaseUrls = true);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddOpenApi();
// Register messaging transport (used for distributed caching primitives).
// Defaults to in-memory unless explicitly configured.
var configuredCacheBackend = builder.Configuration["FirstSignal:Cache:Backend"]?.Trim().ToLowerInvariant();
var configuredTransport = builder.Configuration["messaging:transport"]?.Trim().ToLowerInvariant();
var transport = string.IsNullOrWhiteSpace(configuredCacheBackend) ? configuredTransport : configuredCacheBackend;
switch (transport)
{
case "none":
break;
case "valkey":
builder.Services.AddMessagingTransport<ValkeyTransportPlugin>(builder.Configuration);
break;
case "postgres":
builder.Services.AddMessagingTransport<PostgresTransportPlugin>(builder.Configuration);
break;
default:
builder.Services.AddMessagingTransport<InMemoryTransportPlugin>(builder.Configuration);
break;
}
// Register StellaOps telemetry with OpenTelemetry integration
// Per ORCH-OBS-50-001: Wire StellaOps.Telemetry.Core into orchestrator host
builder.Services.AddStellaOpsTelemetry(
@@ -35,6 +61,9 @@ builder.Services.AddTelemetryContextPropagation();
// Register golden signal metrics for scheduler instrumentation
builder.Services.AddGoldenSignalMetrics();
// Register TTFS metrics for first-signal endpoint/service
builder.Services.AddTimeToFirstSignalMetrics();
// Register incident mode for enhanced telemetry during incidents
builder.Services.AddIncidentMode(builder.Configuration);
@@ -50,9 +79,12 @@ builder.Services.AddSingleton(TimeProvider.System);
// Register streaming options and coordinators
builder.Services.Configure<StreamOptions>(builder.Configuration.GetSection(StreamOptions.SectionName));
builder.Services.AddSingleton<IJobStreamCoordinator, JobStreamCoordinator>();
builder.Services.AddSingleton<IRunStreamCoordinator, RunStreamCoordinator>();
builder.Services.AddSingleton<IPackRunStreamCoordinator, PackRunStreamCoordinator>();
builder.Services.AddScoped<IJobStreamCoordinator, JobStreamCoordinator>();
builder.Services.AddScoped<IRunStreamCoordinator, RunStreamCoordinator>();
builder.Services.AddScoped<IPackRunStreamCoordinator, PackRunStreamCoordinator>();
// Optional TTFS snapshot writer (disabled by default via config)
builder.Services.AddHostedService<FirstSignalSnapshotWriter>();
// Register scale metrics and load shedding services
builder.Services.AddSingleton<ScaleMetrics>();
@@ -85,6 +117,7 @@ app.MapScaleEndpoints();
// Register API endpoints
app.MapSourceEndpoints();
app.MapRunEndpoints();
app.MapFirstSignalEndpoints();
app.MapJobEndpoints();
app.MapDagEndpoints();
app.MapPackRunEndpoints();

View File

@@ -10,6 +10,7 @@ public sealed class TenantResolver
{
private readonly OrchestratorServiceOptions _options;
private const string DefaultTenantHeader = "X-Tenant-Id";
private const string DefaultTenantQueryParam = "tenant";
public TenantResolver(IOptions<OrchestratorServiceOptions> options)
{
@@ -44,6 +45,31 @@ public sealed class TenantResolver
return tenantId.Trim();
}
/// <summary>
/// Resolves the tenant ID for streaming endpoints.
/// EventSource cannot set custom headers, so we allow a query string fallback.
/// </summary>
/// <param name="context">HTTP context.</param>
/// <returns>Tenant ID.</returns>
public string ResolveForStreaming(HttpContext context)
{
ArgumentNullException.ThrowIfNull(context);
if (TryResolve(context, out var tenantId) && !string.IsNullOrWhiteSpace(tenantId))
{
return tenantId;
}
if (TryResolveFromQuery(context, out tenantId) && !string.IsNullOrWhiteSpace(tenantId))
{
return tenantId;
}
var headerName = _options.TenantHeader ?? DefaultTenantHeader;
throw new InvalidOperationException(
$"Tenant header '{headerName}' or query parameter '{DefaultTenantQueryParam}' is required for Orchestrator streaming operations.");
}
/// <summary>
/// Tries to resolve the tenant ID from the request headers.
/// </summary>
@@ -75,4 +101,23 @@ public sealed class TenantResolver
tenantId = value.Trim();
return true;
}
private static bool TryResolveFromQuery(HttpContext context, out string? tenantId)
{
tenantId = null;
if (context is null)
{
return false;
}
var value = context.Request.Query[DefaultTenantQueryParam].ToString();
if (string.IsNullOrWhiteSpace(value))
{
return false;
}
tenantId = value.Trim();
return true;
}
}

View File

@@ -35,6 +35,11 @@
<ProjectReference Include="..\..\..\Telemetry\StellaOps.Telemetry.Core\StellaOps.Telemetry.Core\StellaOps.Telemetry.Core.csproj"/>
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Messaging\StellaOps.Messaging.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Messaging.Transport.InMemory\StellaOps.Messaging.Transport.InMemory.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Messaging.Transport.Postgres\StellaOps.Messaging.Transport.Postgres.csproj" />
<ProjectReference Include="..\..\..\__Libraries\StellaOps.Messaging.Transport.Valkey\StellaOps.Messaging.Transport.Valkey.csproj" />
</ItemGroup>

View File

@@ -1,6 +1,7 @@
using System.Text.Json;
using Microsoft.Extensions.Options;
using StellaOps.Orchestrator.Core.Domain;
using StellaOps.Orchestrator.Core.Services;
using StellaOps.Orchestrator.Infrastructure.Repositories;
namespace StellaOps.Orchestrator.WebService.Streaming;
@@ -24,17 +25,20 @@ public sealed class RunStreamCoordinator : IRunStreamCoordinator
private static readonly JsonSerializerOptions SerializerOptions = new(JsonSerializerDefaults.Web);
private readonly IRunRepository _runRepository;
private readonly IFirstSignalService _firstSignalService;
private readonly TimeProvider _timeProvider;
private readonly ILogger<RunStreamCoordinator> _logger;
private readonly StreamOptions _options;
public RunStreamCoordinator(
IRunRepository runRepository,
IFirstSignalService firstSignalService,
IOptions<StreamOptions> options,
TimeProvider? timeProvider,
ILogger<RunStreamCoordinator> logger)
{
_runRepository = runRepository ?? throw new ArgumentNullException(nameof(runRepository));
_firstSignalService = firstSignalService ?? throw new ArgumentNullException(nameof(firstSignalService));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value.Validate();
@@ -49,9 +53,12 @@ public sealed class RunStreamCoordinator : IRunStreamCoordinator
SseWriter.ConfigureSseHeaders(response);
await SseWriter.WriteRetryAsync(response, _options.ReconnectDelay, cancellationToken).ConfigureAwait(false);
string? lastFirstSignalEtag = null;
var lastRun = initialRun;
await SseWriter.WriteEventAsync(response, "initial", RunSnapshotPayload.FromRun(lastRun), SerializerOptions, cancellationToken).ConfigureAwait(false);
await SseWriter.WriteEventAsync(response, "heartbeat", HeartbeatPayload.Create(_timeProvider.GetUtcNow(), lastRun.RunId.ToString()), SerializerOptions, cancellationToken).ConfigureAwait(false);
lastFirstSignalEtag = await EmitFirstSignalIfUpdatedAsync(response, tenantId, lastRun.RunId, lastFirstSignalEtag, cancellationToken).ConfigureAwait(false);
// If already terminal, send completed and exit
if (IsTerminal(lastRun.Status))
@@ -91,6 +98,8 @@ public sealed class RunStreamCoordinator : IRunStreamCoordinator
break;
}
lastFirstSignalEtag = await EmitFirstSignalIfUpdatedAsync(response, tenantId, current.RunId, lastFirstSignalEtag, cancellationToken).ConfigureAwait(false);
if (HasChanged(lastRun, current))
{
await EmitProgressAsync(response, current, cancellationToken).ConfigureAwait(false);
@@ -162,6 +171,45 @@ public sealed class RunStreamCoordinator : IRunStreamCoordinator
await SseWriter.WriteEventAsync(response, "completed", payload, SerializerOptions, cancellationToken).ConfigureAwait(false);
}
private async Task<string?> EmitFirstSignalIfUpdatedAsync(
HttpResponse response,
string tenantId,
Guid runId,
string? lastFirstSignalEtag,
CancellationToken cancellationToken)
{
try
{
var result = await _firstSignalService
.GetFirstSignalAsync(runId, tenantId, lastFirstSignalEtag, cancellationToken)
.ConfigureAwait(false);
if (result.Status != FirstSignalResultStatus.Found || result.Signal is null || string.IsNullOrWhiteSpace(result.ETag))
{
return lastFirstSignalEtag;
}
await SseWriter.WriteEventAsync(
response,
"first_signal",
new { runId, signal = result.Signal, etag = result.ETag },
SerializerOptions,
cancellationToken)
.ConfigureAwait(false);
return result.ETag;
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
return lastFirstSignalEtag;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to emit first_signal event for run {RunId}.", runId);
return lastFirstSignalEtag;
}
}
private static bool IsTerminal(RunStatus status) =>
status is RunStatus.Succeeded or RunStatus.PartiallySucceeded or RunStatus.Failed or RunStatus.Canceled;
}

View File

@@ -21,3 +21,13 @@ Status mirror for `docs/implplan/SPRINT_0152_0001_0002_orchestrator_ii.md`. Upda
| 15 | ORCH-SVC-37-101 | DONE | Scheduled exports, pruning, failure alerting. |
Last synced: 2025-11-30 (UTC).
## SPRINT_0339_0001_0001 First Signal API
Status mirror for `docs/implplan/SPRINT_0339_0001_0001_first_signal_api.md`. Update alongside the sprint file to avoid drift.
| # | Task ID | Status | Notes |
| --- | --- | --- | --- |
| 1 | ORCH-TTFS-0339-001 | DONE | First signal API delivered (service/repo/cache/endpoint/ETag/SSE/tests/docs). |
Last synced: 2025-12-15 (UTC).