sprints work

This commit is contained in:
master
2026-01-10 11:15:28 +02:00
parent a21d3dbc1f
commit 701eb6b21c
71 changed files with 10854 additions and 136 deletions

View File

@@ -0,0 +1,165 @@
// <copyright file="AgentRegistration.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
// </copyright>
using System.Collections.Immutable;
namespace StellaOps.Signals.RuntimeAgent;
/// <summary>
/// Immutable snapshot describing one registered runtime agent.
/// Sprint: SPRINT_20260109_009_004 Task: Implement AgentRegistrationService
/// </summary>
public sealed record AgentRegistration
{
    /// <summary>Identifier that uniquely names the agent.</summary>
    public required string AgentId { get; init; }

    /// <summary>Runtime platform the agent targets.</summary>
    public required RuntimePlatform Platform { get; init; }

    /// <summary>Name of the host the agent runs on.</summary>
    public required string Hostname { get; init; }

    /// <summary>Container ID, when the agent runs inside a container; otherwise null.</summary>
    public string? ContainerId { get; init; }

    /// <summary>Kubernetes namespace, when the agent runs in K8s; otherwise null.</summary>
    public string? KubernetesNamespace { get; init; }

    /// <summary>Kubernetes pod name, when the agent runs in K8s; otherwise null.</summary>
    public string? KubernetesPodName { get; init; }

    /// <summary>Name of the application the agent observes, if known.</summary>
    public string? ApplicationName { get; init; }

    /// <summary>Process ID of the observed process, if known.</summary>
    public int? ProcessId { get; init; }

    /// <summary>Version string reported by the agent.</summary>
    public required string AgentVersion { get; init; }

    /// <summary>Instant at which the agent was registered.</summary>
    public required DateTimeOffset RegisteredAt { get; init; }

    /// <summary>Instant of the most recent heartbeat; left at its default value when never set.</summary>
    public DateTimeOffset LastHeartbeat { get; init; }

    /// <summary>Agent state; defaults to <see cref="AgentState.Stopped"/>.</summary>
    public AgentState State { get; init; } = AgentState.Stopped;

    /// <summary>Collection posture; defaults to <see cref="RuntimePosture.Sampled"/>.</summary>
    public RuntimePosture Posture { get; init; } = RuntimePosture.Sampled;

    /// <summary>Free-form key/value tags for grouping and filtering; empty by default.</summary>
    public ImmutableDictionary<string, string> Tags { get; init; } =
        ImmutableDictionary<string, string>.Empty;

    /// <summary>
    /// Tells whether the last heartbeat is recent enough for the agent to count as healthy.
    /// </summary>
    /// <param name="now">Instant to measure the heartbeat age against.</param>
    /// <param name="heartbeatTimeout">Exclusive upper bound on the heartbeat age.</param>
    /// <returns>
    /// <c>true</c> when the time elapsed between <see cref="LastHeartbeat"/> and
    /// <paramref name="now"/> is strictly less than <paramref name="heartbeatTimeout"/>.
    /// </returns>
    public bool IsHealthy(DateTimeOffset now, TimeSpan heartbeatTimeout)
        => now.Subtract(LastHeartbeat) < heartbeatTimeout;
}
/// <summary>
/// Agent registration request: the data an agent supplies about itself when registering.
/// </summary>
/// <remarks>
/// The fields mirror <see cref="AgentRegistration"/> except for the timestamps and state,
/// which are not part of the request.
/// </remarks>
public sealed record AgentRegistrationRequest
{
/// <summary>Unique agent identifier (generated by agent).</summary>
public required string AgentId { get; init; }
/// <summary>Target platform.</summary>
public required RuntimePlatform Platform { get; init; }
/// <summary>Hostname where agent is running.</summary>
public required string Hostname { get; init; }
/// <summary>Container ID if running in container; null otherwise.</summary>
public string? ContainerId { get; init; }
/// <summary>Kubernetes namespace if running in K8s; null otherwise.</summary>
public string? KubernetesNamespace { get; init; }
/// <summary>Kubernetes pod name if running in K8s; null otherwise.</summary>
public string? KubernetesPodName { get; init; }
/// <summary>Target application name, if known.</summary>
public string? ApplicationName { get; init; }
/// <summary>Target process ID, if known.</summary>
public int? ProcessId { get; init; }
/// <summary>Agent version.</summary>
public required string AgentVersion { get; init; }
/// <summary>Initial posture; defaults to <see cref="RuntimePosture.Sampled"/>.</summary>
public RuntimePosture InitialPosture { get; init; } = RuntimePosture.Sampled;
/// <summary>Tags for grouping/filtering; defaults to an empty dictionary.</summary>
public ImmutableDictionary<string, string> Tags { get; init; } =
ImmutableDictionary<string, string>.Empty;
}
/// <summary>
/// Agent heartbeat request: the periodic status report sent by a registered agent.
/// </summary>
public sealed record AgentHeartbeatRequest
{
/// <summary>Identifier of the agent sending the heartbeat.</summary>
public required string AgentId { get; init; }
/// <summary>Agent state as reported by the agent at heartbeat time.</summary>
public required AgentState State { get; init; }
/// <summary>Posture as reported by the agent at heartbeat time.</summary>
public required RuntimePosture Posture { get; init; }
/// <summary>Optional statistics snapshot; null when the agent sends none.</summary>
public AgentStatistics? Statistics { get; init; }
}
/// <summary>
/// Agent heartbeat response: instructions returned to the agent in reply to a heartbeat.
/// </summary>
public sealed record AgentHeartbeatResponse
{
/// <summary>Whether the agent should continue; defaults to <c>true</c>.</summary>
public bool Continue { get; init; } = true;
/// <summary>New posture if changed; null when no posture change is requested.</summary>
public RuntimePosture? NewPosture { get; init; }
/// <summary>Command to execute; null when there is no command.</summary>
public AgentCommand? Command { get; init; }
}
/// <summary>
/// Commands that can be sent to agents. The numeric values are assigned explicitly.
/// </summary>
public enum AgentCommand
{
/// <summary>No command (the zero/default value).</summary>
None = 0,
/// <summary>Start collection.</summary>
Start = 1,
/// <summary>Stop collection.</summary>
Stop = 2,
/// <summary>Pause collection.</summary>
Pause = 3,
/// <summary>Resume collection.</summary>
Resume = 4,
/// <summary>Update configuration.</summary>
UpdateConfig = 5,
/// <summary>Terminate agent.</summary>
Terminate = 6
}

View File

@@ -0,0 +1,264 @@
// <copyright file="AgentRegistrationService.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
// </copyright>
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
namespace StellaOps.Signals.RuntimeAgent;
/// <summary>
/// In-memory implementation of agent registration service.
/// Sprint: SPRINT_20260109_009_004 Task: Implement AgentRegistrationService
/// </summary>
/// <remarks>
/// This implementation uses in-memory storage. For production use with persistence,
/// implement a database-backed version using the same interface.
/// </remarks>
public sealed class AgentRegistrationService : IAgentRegistrationService
{
// Source of the current time for registration and heartbeat timestamps.
private readonly TimeProvider _timeProvider;
private readonly ILogger<AgentRegistrationService> _logger;

// Current registrations, keyed by agent id.
private readonly ConcurrentDictionary<string, AgentRegistration> _registrations = new();

// At most one queued command per agent; removed when handed out by a heartbeat.
private readonly ConcurrentDictionary<string, AgentCommand> _pendingCommands = new();

// At most one queued posture change per agent; removed when handed out by a heartbeat.
private readonly ConcurrentDictionary<string, RuntimePosture> _pendingPostureChanges = new();

/// <summary>
/// Maximum heartbeat age before an agent is treated as unhealthy. Defaults to two minutes.
/// </summary>
public TimeSpan HeartbeatTimeout { get; set; } = TimeSpan.FromMinutes(2);

/// <summary>
/// Creates the service with the clock and logger it will use.
/// </summary>
/// <param name="timeProvider">Clock used for all timestamps.</param>
/// <param name="logger">Logger for registration lifecycle messages.</param>
public AgentRegistrationService(TimeProvider timeProvider, ILogger<AgentRegistrationService> logger)
    => (_timeProvider, _logger) = (timeProvider, logger);
/// <inheritdoc/>
/// <remarks>
/// Registering an id that is already present replaces the stored registration and logs a
/// warning. The new registration always starts in <see cref="AgentState.Stopped"/> with
/// both timestamps set to the current time.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
/// <exception cref="ArgumentException">
/// The agent id, hostname or agent version of <paramref name="request"/> is null or whitespace.
/// </exception>
public Task<AgentRegistration> RegisterAsync(AgentRegistrationRequest request, CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(request);
    ArgumentException.ThrowIfNullOrWhiteSpace(request.AgentId);
    ArgumentException.ThrowIfNullOrWhiteSpace(request.Hostname);
    ArgumentException.ThrowIfNullOrWhiteSpace(request.AgentVersion);

    var now = _timeProvider.GetUtcNow();
    var registration = new AgentRegistration
    {
        AgentId = request.AgentId,
        Platform = request.Platform,
        Hostname = request.Hostname,
        ContainerId = request.ContainerId,
        KubernetesNamespace = request.KubernetesNamespace,
        KubernetesPodName = request.KubernetesPodName,
        ApplicationName = request.ApplicationName,
        ProcessId = request.ProcessId,
        AgentVersion = request.AgentVersion,
        RegisteredAt = now,
        LastHeartbeat = now,
        State = AgentState.Stopped,
        Posture = request.InitialPosture,
        Tags = request.Tags
    };

    // The factories may run more than once under contention, so they only record which
    // path was taken; the warning is logged once, after the dictionary has settled.
    AgentRegistration? replaced = null;
    _registrations.AddOrUpdate(
        request.AgentId,
        _ =>
        {
            replaced = null;
            return registration;
        },
        (_, existing) =>
        {
            replaced = existing;
            return registration;
        });

    if (replaced is not null)
    {
        _logger.LogWarning(
            "Agent {AgentId} re-registered (previous: {PreviousRegistration})",
            request.AgentId,
            replaced.RegisteredAt);
    }

    _logger.LogInformation(
        "Agent {AgentId} registered: Platform={Platform}, Host={Hostname}, App={Application}",
        request.AgentId,
        request.Platform,
        request.Hostname,
        request.ApplicationName ?? "N/A");

    return Task.FromResult(registration);
}
/// <inheritdoc/>
/// <remarks>
/// Records the heartbeat time, state and posture on the stored registration, then hands out
/// (and removes) any queued command and posture change. A heartbeat from an agent that is
/// not registered is answered with <c>Continue = false</c>.
/// </remarks>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
/// <exception cref="ArgumentException">The agent id is null or whitespace.</exception>
public Task<AgentHeartbeatResponse> HeartbeatAsync(AgentHeartbeatRequest request, CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(request);
    ArgumentException.ThrowIfNullOrWhiteSpace(request.AgentId);

    var now = _timeProvider.GetUtcNow();

    // Optimistic update: retry when another thread changed the registration between the
    // read and the write, so the heartbeat is never silently dropped.
    while (true)
    {
        if (!_registrations.TryGetValue(request.AgentId, out var existing))
        {
            _logger.LogWarning("Heartbeat from unknown agent {AgentId}", request.AgentId);
            return Task.FromResult(new AgentHeartbeatResponse { Continue = false });
        }

        var updated = existing with
        {
            LastHeartbeat = now,
            State = request.State,
            Posture = request.Posture
        };

        if (_registrations.TryUpdate(request.AgentId, updated, existing))
        {
            break;
        }
    }

    // A queued None command carries no instruction and is reported as "no command".
    AgentCommand? command =
        _pendingCommands.TryRemove(request.AgentId, out var queuedCommand) && queuedCommand != AgentCommand.None
            ? queuedCommand
            : null;

    // Use TryRemove's result rather than comparing with default: the posture whose
    // numeric value is zero is a legitimate queued change.
    RuntimePosture? newPosture =
        _pendingPostureChanges.TryRemove(request.AgentId, out var queuedPosture)
            ? queuedPosture
            : null;

    var response = new AgentHeartbeatResponse
    {
        Continue = true,
        Command = command,
        NewPosture = newPosture
    };

    _logger.LogDebug(
        "Heartbeat from {AgentId}: State={State}, Posture={Posture}, Events={Events}",
        request.AgentId,
        request.State,
        request.Posture,
        request.Statistics?.TotalEventsCollected ?? 0);

    return Task.FromResult(response);
}
/// <inheritdoc/>
/// <remarks>
/// Also discards any command or posture change still queued for the agent. Unregistering an
/// id that is not registered is a no-op apart from a debug log entry.
/// </remarks>
public Task UnregisterAsync(string agentId, CancellationToken ct = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(agentId);

    if (!_registrations.TryRemove(agentId, out var removed))
    {
        _logger.LogDebug("Attempted to unregister unknown agent {AgentId}", agentId);
        return Task.CompletedTask;
    }

    _pendingCommands.TryRemove(agentId, out _);
    _pendingPostureChanges.TryRemove(agentId, out _);
    _logger.LogInformation(
        "Agent {AgentId} unregistered (was registered since {RegisteredAt})",
        agentId,
        removed.RegisteredAt);

    return Task.CompletedTask;
}
/// <inheritdoc/>
/// <exception cref="ArgumentException"><paramref name="agentId"/> is null or whitespace.</exception>
public Task<AgentRegistration?> GetAsync(string agentId, CancellationToken ct = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(agentId);

    AgentRegistration? found = _registrations.TryGetValue(agentId, out var stored) ? stored : null;
    return Task.FromResult(found);
}
/// <inheritdoc/>
/// <remarks>Returns a copy of the registrations present at the time of the call.</remarks>
public Task<IReadOnlyList<AgentRegistration>> ListAsync(CancellationToken ct = default)
{
    IReadOnlyList<AgentRegistration> snapshot = new List<AgentRegistration>(_registrations.Values);
    return Task.FromResult(snapshot);
}
/// <inheritdoc/>
/// <remarks>Returns the registrations whose platform equals <paramref name="platform"/>.</remarks>
public Task<IReadOnlyList<AgentRegistration>> ListByPlatformAsync(RuntimePlatform platform, CancellationToken ct = default)
{
    var matches = new List<AgentRegistration>();
    foreach (var candidate in _registrations.Values)
    {
        if (candidate.Platform == platform)
        {
            matches.Add(candidate);
        }
    }

    return Task.FromResult<IReadOnlyList<AgentRegistration>>(matches);
}
/// <inheritdoc/>
/// <remarks>
/// An agent is included when <see cref="AgentRegistration.IsHealthy"/> holds for the current
/// time and <see cref="HeartbeatTimeout"/>.
/// </remarks>
public Task<IReadOnlyList<AgentRegistration>> ListHealthyAsync(CancellationToken ct = default)
{
    var now = _timeProvider.GetUtcNow();

    var healthy = new List<AgentRegistration>();
    foreach (var candidate in _registrations.Values)
    {
        if (candidate.IsHealthy(now, HeartbeatTimeout))
        {
            healthy.Add(candidate);
        }
    }

    return Task.FromResult<IReadOnlyList<AgentRegistration>>(healthy);
}
/// <inheritdoc/>
/// <remarks>
/// The command is queued, replacing any command already queued for the agent, and is handed
/// out on the agent's next heartbeat. Commands for unknown agents are dropped with a warning.
/// </remarks>
public Task SendCommandAsync(string agentId, AgentCommand command, CancellationToken ct = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(agentId);

    if (_registrations.ContainsKey(agentId))
    {
        _pendingCommands[agentId] = command;
        _logger.LogInformation(
            "Queued command {Command} for agent {AgentId}",
            command,
            agentId);
    }
    else
    {
        _logger.LogWarning("Cannot send command to unknown agent {AgentId}", agentId);
    }

    return Task.CompletedTask;
}
/// <inheritdoc/>
/// <remarks>
/// The posture change is queued, replacing any change already queued for the agent, and is
/// handed out on the agent's next heartbeat. Requests for unknown agents are dropped with a
/// warning.
/// </remarks>
public Task UpdatePostureAsync(string agentId, RuntimePosture posture, CancellationToken ct = default)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(agentId);

    if (_registrations.ContainsKey(agentId))
    {
        _pendingPostureChanges[agentId] = posture;
        _logger.LogInformation(
            "Queued posture change to {Posture} for agent {AgentId}",
            posture,
            agentId);
    }
    else
    {
        _logger.LogWarning("Cannot update posture for unknown agent {AgentId}", agentId);
    }

    return Task.CompletedTask;
}
/// <summary>
/// Prune stale registrations (no heartbeat within <see cref="HeartbeatTimeout"/>), together
/// with any command or posture change still queued for them.
/// </summary>
/// <remarks>
/// A registration is removed only if the stored value is still the stale snapshot that was
/// inspected, so an agent that heartbeats or re-registers while pruning is in progress is
/// not evicted.
/// </remarks>
/// <returns>Number of pruned registrations.</returns>
public int PruneStale()
{
    var now = _timeProvider.GetUtcNow();
    var pruned = 0;

    foreach (var entry in _registrations)
    {
        var registration = entry.Value;
        if (registration.IsHealthy(now, HeartbeatTimeout))
        {
            continue;
        }

        // Key+value removal: fails (and skips the agent) when the registration was
        // refreshed after the staleness check above.
        if (!_registrations.TryRemove(entry))
        {
            continue;
        }

        _pendingCommands.TryRemove(entry.Key, out _);
        _pendingPostureChanges.TryRemove(entry.Key, out _);
        pruned++;
        _logger.LogWarning(
            "Pruned stale agent {AgentId} (last heartbeat: {LastHeartbeat})",
            entry.Key,
            registration.LastHeartbeat);
    }

    return pruned;
}
/// <summary>
/// Gets count of registered agents, healthy or not, at the time of the call.
/// </summary>
public int Count => _registrations.Count;
}

View File

@@ -0,0 +1,294 @@
// <copyright file="ClrMethodResolver.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
// </copyright>
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Globalization;
using System.Text.RegularExpressions;
using Microsoft.Extensions.Logging;
namespace StellaOps.Signals.RuntimeAgent;
/// <summary>
/// Resolves CLR method IDs from ETW/EventPipe events to readable method names.
/// Sprint: SPRINT_20260109_009_004 Task: Implement ClrMethodResolver
/// </summary>
/// <remarks>
/// CLR runtime events use method IDs (MethodID/FunctionID) that need to be resolved
/// to human-readable names. This class maintains caches from MethodLoad/MethodILToNativeMap
/// events and provides resolution services.
/// </remarks>
public sealed partial class ClrMethodResolver
{
private readonly ILogger<ClrMethodResolver> _logger;

// Supplies the ResolvedAt timestamp stamped on each registered method.
private readonly TimeProvider _timeProvider;

// Method ID -> resolved method (populated from MethodLoad events).
private readonly ConcurrentDictionary<ulong, ResolvedMethod> _methodIdCache = new();

// Module ID -> module info (populated from ModuleLoad events).
private readonly ConcurrentDictionary<ulong, ModuleInfo> _moduleIdCache = new();

// Assembly ID -> assembly name (populated from AssemblyLoad events).
private readonly ConcurrentDictionary<ulong, string> _assemblyIdCache = new();

/// <summary>
/// Creates a resolver with empty caches.
/// </summary>
/// <param name="timeProvider">Clock used to timestamp registered methods.</param>
/// <param name="logger">Logger for cache registration and lookup messages.</param>
public ClrMethodResolver(TimeProvider timeProvider, ILogger<ClrMethodResolver> logger)
    => (_timeProvider, _logger) = (timeProvider, logger);

/// <summary>
/// Number of methods currently held in the method cache.
/// </summary>
public int CachedMethodCount
{
    get { return _methodIdCache.Count; }
}

/// <summary>
/// Number of modules currently held in the module cache.
/// </summary>
public int CachedModuleCount
{
    get { return _moduleIdCache.Count; }
}
/// <summary>
/// Records a module reported by a ModuleLoad event, replacing any entry with the same ID.
/// </summary>
/// <param name="moduleId">Module ID used as the cache key.</param>
/// <param name="assemblyId">ID of the assembly that owns the module.</param>
/// <param name="modulePath">Path of the module as reported by the event.</param>
/// <param name="simpleName">Short module name; used as a fallback assembly name.</param>
public void RegisterModule(ulong moduleId, ulong assemblyId, string modulePath, string simpleName)
{
    _moduleIdCache[moduleId] = new ModuleInfo(
        ModuleId: moduleId,
        AssemblyId: assemblyId,
        ModulePath: modulePath,
        SimpleName: simpleName);

    _logger.LogDebug("Registered module {ModuleId}: {SimpleName}", moduleId, simpleName);
}
/// <summary>
/// Records an assembly reported by an AssemblyLoad event, replacing any entry with the same ID.
/// </summary>
/// <param name="assemblyId">Assembly ID used as the cache key.</param>
/// <param name="assemblyName">Assembly name to associate with the ID.</param>
public void RegisterAssembly(ulong assemblyId, string assemblyName)
{
    _assemblyIdCache.AddOrUpdate(assemblyId, assemblyName, (_, _) => assemblyName);
    _logger.LogDebug("Registered assembly {AssemblyId}: {AssemblyName}", assemblyId, assemblyName);
}
/// <summary>
/// Records a method reported by a MethodLoad event, replacing any entry with the same ID.
/// </summary>
/// <param name="methodId">Method ID used as the cache key.</param>
/// <param name="moduleId">ID of the module that contains the method.</param>
/// <param name="methodNamespace">Namespace-qualified type name reported for the method.</param>
/// <param name="methodName">Simple method name.</param>
/// <param name="methodSignature">Method signature text.</param>
public void RegisterMethod(
    ulong methodId,
    ulong moduleId,
    string methodNamespace,
    string methodName,
    string methodSignature)
{
    // The type name is derived from the trailing segment of the namespace string.
    var typeName = ExtractTypeName(methodNamespace);
    var resolvedAt = _timeProvider.GetUtcNow();

    _methodIdCache[methodId] = new ResolvedMethod(
        methodId,
        moduleId,
        methodNamespace,
        typeName,
        methodName,
        methodSignature,
        resolvedAt);

    _logger.LogDebug(
        "Registered method {MethodId}: {Namespace}.{Method}",
        methodId, methodNamespace, methodName);
}
/// <summary>
/// Looks up a previously registered method by its ID.
/// </summary>
/// <param name="methodId">Method ID to look up.</param>
/// <returns>The cached method info, or null when the ID has not been registered.</returns>
public ResolvedMethod? ResolveMethod(ulong methodId)
{
    if (_methodIdCache.TryGetValue(methodId, out var cached))
    {
        return cached;
    }

    return null;
}
/// <summary>
/// Builds a <see cref="RuntimeMethodEvent"/> for a registered method ID.
/// </summary>
/// <param name="methodId">Method ID to resolve.</param>
/// <param name="eventKind">Kind to stamp on the produced event.</param>
/// <param name="eventId">Identifier to stamp on the produced event.</param>
/// <param name="timestamp">Timestamp to stamp on the produced event.</param>
/// <param name="processId">Optional process ID.</param>
/// <param name="threadId">Optional thread ID.</param>
/// <returns>
/// The populated event, or null when <paramref name="methodId"/> is not in the method cache.
/// The assembly falls back to "Unknown" when the method's module is not registered.
/// </returns>
public RuntimeMethodEvent? ResolveToEvent(
    ulong methodId,
    RuntimeEventKind eventKind,
    string eventId,
    DateTimeOffset timestamp,
    int? processId = null,
    string? threadId = null)
{
    if (!_methodIdCache.TryGetValue(methodId, out var method))
    {
        _logger.LogDebug("Method {MethodId} not found in cache", methodId);
        return null;
    }

    var owner = GetAssemblyForModule(method.ModuleId) ?? "Unknown";

    // Extra details that have no dedicated property on the event.
    var context = ImmutableDictionary.CreateBuilder<string, string>();
    context["MethodId"] = methodId.ToString("X16", CultureInfo.InvariantCulture);
    context["Namespace"] = method.Namespace;
    context["Signature"] = method.Signature;

    return new RuntimeMethodEvent
    {
        EventId = eventId,
        SymbolId = FormatSymbolId(methodId),
        MethodName = method.MethodName,
        TypeName = method.TypeName,
        AssemblyOrModule = owner,
        Timestamp = timestamp,
        Kind = eventKind,
        Platform = RuntimePlatform.DotNet,
        ProcessId = processId,
        ThreadId = threadId,
        Context = context.ToImmutable()
    };
}
/// <summary>
/// Tries to parse an ETW-style method reference like "MethodID=0x06000123".
/// </summary>
/// <param name="etwString">Text to search; may contain other content around the reference.</param>
/// <param name="methodId">The parsed method ID, or 0 when parsing fails.</param>
/// <returns>
/// <c>true</c> when a "MethodID=0x..." reference was found and its hexadecimal value fits in
/// a <see cref="ulong"/>; otherwise <c>false</c>, including for null or empty input.
/// </returns>
public bool TryParseEtwMethodId(string etwString, out ulong methodId)
{
    methodId = 0;

    // A Try-method reports failure instead of throwing on missing input.
    if (string.IsNullOrEmpty(etwString))
    {
        return false;
    }

    var match = EtwMethodIdRegex().Match(etwString);
    if (!match.Success)
    {
        return false;
    }

    return ulong.TryParse(
        match.Groups["id"].Value,
        NumberStyles.HexNumber,
        CultureInfo.InvariantCulture,
        out methodId);
}
/// <summary>
/// Parses a full ETW method string with module info.
/// Example: "MethodID=0x06000123 ModuleID=0x00007FF8ABC12340"
/// </summary>
/// <param name="etwString">Text containing the method reference and, optionally, a module reference.</param>
/// <returns>
/// The parsed IDs, or null when the input is null or empty or holds no parsable method ID.
/// The module ID is 0 when no module reference is present or its value cannot be parsed.
/// </returns>
public (ulong MethodId, ulong ModuleId)? ParseEtwMethodWithModule(string etwString)
{
    if (string.IsNullOrEmpty(etwString))
    {
        return null;
    }

    var methodMatch = EtwMethodIdRegex().Match(etwString);
    if (!methodMatch.Success)
    {
        return null;
    }

    if (!ulong.TryParse(
            methodMatch.Groups["id"].Value,
            NumberStyles.HexNumber,
            CultureInfo.InvariantCulture,
            out var methodId))
    {
        return null;
    }

    // The module reference is optional, so it is only looked for once the method ID is known.
    ulong moduleId = 0;
    var moduleMatch = EtwModuleIdRegex().Match(etwString);
    if (moduleMatch.Success
        && !ulong.TryParse(
            moduleMatch.Groups["id"].Value,
            NumberStyles.HexNumber,
            CultureInfo.InvariantCulture,
            out moduleId))
    {
        // An unparsable module ID is reported as 0 rather than failing the whole parse.
        moduleId = 0;
    }

    return (methodId, moduleId);
}
/// <summary>
/// Builds the symbol ID string for a method ID: the prefix "clr:method:" followed by the
/// ID as 16 upper-case hexadecimal digits.
/// </summary>
/// <param name="methodId">Method ID to format.</param>
/// <returns>The symbol ID, e.g. "clr:method:0000000006000123".</returns>
public static string FormatSymbolId(ulong methodId)
    => "clr:method:" + methodId.ToString("X16", CultureInfo.InvariantCulture);
/// <summary>
/// Empties the method, module and assembly caches.
/// </summary>
public void Clear()
{
    _assemblyIdCache.Clear();
    _moduleIdCache.Clear();
    _methodIdCache.Clear();

    _logger.LogDebug("Cleared all method resolution caches");
}
/// <summary>
/// Captures the current sizes of the three caches.
/// </summary>
/// <returns>Counts of cached methods, modules and assemblies.</returns>
public ClrMethodResolverStats GetStatistics()
{
    var methods = _methodIdCache.Count;
    var modules = _moduleIdCache.Count;
    var assemblies = _assemblyIdCache.Count;

    return new ClrMethodResolverStats(methods, modules, assemblies);
}
/// <summary>
/// Finds the assembly name for a module, falling back to the module's simple name when
/// the owning assembly has not been registered.
/// </summary>
/// <returns>The name, or null when the module itself is not registered.</returns>
private string? GetAssemblyForModule(ulong moduleId)
{
    if (_moduleIdCache.TryGetValue(moduleId, out var module))
    {
        return _assemblyIdCache.TryGetValue(module.AssemblyId, out var assemblyName)
            ? assemblyName
            : module.SimpleName;
    }

    return null;
}
/// <summary>
/// Extracts the type name (the segment after the last namespace dot) from a
/// namespace-qualified type name.
/// </summary>
/// <param name="fullNamespace">Namespace-qualified type name; may be null or empty.</param>
/// <returns>
/// "_" for null or empty input; the input unchanged when it has no separating dot or ends
/// with one; otherwise the text after the last dot that precedes any generic argument list.
/// </returns>
private static string ExtractTypeName(string fullNamespace)
{
    if (string.IsNullOrEmpty(fullNamespace))
    {
        return "_";
    }

    // Generic instantiations carry their type arguments in brackets, e.g.
    // "System.Collections.Generic.List`1[System.String]". Dots inside that argument list
    // belong to the arguments, so the search stops at the first opening bracket.
    var genericStart = fullNamespace.IndexOfAny(new[] { '[', '<' });
    var searchEnd = genericStart >= 0 ? genericStart : fullNamespace.Length;
    var lastDot = searchEnd > 0 ? fullNamespace.LastIndexOf('.', searchEnd - 1) : -1;

    return lastDot >= 0 && lastDot < fullNamespace.Length - 1
        ? fullNamespace[(lastDot + 1)..]
        : fullNamespace;
}
// Matches "MethodID=0x<hex>" and captures the hexadecimal digits in the "id" group.
// RegexOptions.Compiled is omitted: the regex source generator ignores that flag.
[GeneratedRegex(@"MethodID=0x(?<id>[0-9A-Fa-f]+)")]
private static partial Regex EtwMethodIdRegex();

// Matches "ModuleID=0x<hex>" and captures the hexadecimal digits in the "id" group.
[GeneratedRegex(@"ModuleID=0x(?<id>[0-9A-Fa-f]+)")]
private static partial Regex EtwModuleIdRegex();
}
/// <summary>
/// A method whose CLR method ID has been resolved to names.
/// </summary>
/// <param name="MethodId">CLR method ID.</param>
/// <param name="ModuleId">ID of the module containing the method.</param>
/// <param name="Namespace">Namespace-qualified type name reported for the method.</param>
/// <param name="TypeName">Type name derived from <paramref name="Namespace"/>.</param>
/// <param name="MethodName">Simple method name.</param>
/// <param name="Signature">Method signature text.</param>
/// <param name="ResolvedAt">Instant at which the method was registered.</param>
public sealed record ResolvedMethod(
    ulong MethodId,
    ulong ModuleId,
    string Namespace,
    string TypeName,
    string MethodName,
    string Signature,
    DateTimeOffset ResolvedAt)
{
    /// <summary>
    /// The namespace and method name joined by a dot, or just the method name when the
    /// namespace is null or empty.
    /// </summary>
    public string FullyQualifiedName
    {
        get
        {
            if (string.IsNullOrEmpty(Namespace))
            {
                return MethodName;
            }

            return string.Concat(Namespace, ".", MethodName);
        }
    }

    /// <summary>
    /// The fully qualified name immediately followed by the signature.
    /// </summary>
    public string DisplayName => string.Concat(FullyQualifiedName, Signature);
}
/// <summary>
/// Module information.
/// </summary>
/// <param name="ModuleId">Module ID.</param>
/// <param name="AssemblyId">ID of the assembly the module belongs to.</param>
/// <param name="ModulePath">Module path.</param>
/// <param name="SimpleName">Short module name.</param>
public sealed record ModuleInfo(
ulong ModuleId,
ulong AssemblyId,
string ModulePath,
string SimpleName);
/// <summary>
/// Statistics about the method resolver.
/// </summary>
/// <param name="CachedMethods">Number of cached methods.</param>
/// <param name="CachedModules">Number of cached modules.</param>
/// <param name="CachedAssemblies">Number of cached assemblies.</param>
public sealed record ClrMethodResolverStats(
int CachedMethods,
int CachedModules,
int CachedAssemblies);

View File

@@ -0,0 +1,81 @@
// <copyright file="IAgentRegistrationService.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
// </copyright>
namespace StellaOps.Signals.RuntimeAgent;
/// <summary>
/// Service for managing runtime agent registrations: registration lifecycle, heartbeats,
/// queries, and queuing of commands and posture changes for agents.
/// Sprint: SPRINT_20260109_009_004 Task: Implement AgentRegistrationService
/// </summary>
public interface IAgentRegistrationService
{
/// <summary>
/// Register a new agent.
/// </summary>
/// <param name="request">Registration request.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>The registration record.</returns>
Task<AgentRegistration> RegisterAsync(AgentRegistrationRequest request, CancellationToken ct = default);
/// <summary>
/// Process agent heartbeat.
/// </summary>
/// <param name="request">Heartbeat request.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>Response with commands, a posture change, and whether the agent should continue.</returns>
Task<AgentHeartbeatResponse> HeartbeatAsync(AgentHeartbeatRequest request, CancellationToken ct = default);
/// <summary>
/// Unregister an agent.
/// </summary>
/// <param name="agentId">Agent identifier.</param>
/// <param name="ct">Cancellation token.</param>
Task UnregisterAsync(string agentId, CancellationToken ct = default);
/// <summary>
/// Get registration by agent ID.
/// </summary>
/// <param name="agentId">Agent identifier.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>Registration or null if not found.</returns>
Task<AgentRegistration?> GetAsync(string agentId, CancellationToken ct = default);
/// <summary>
/// List all registered agents.
/// </summary>
/// <param name="ct">Cancellation token.</param>
/// <returns>All registrations.</returns>
Task<IReadOnlyList<AgentRegistration>> ListAsync(CancellationToken ct = default);
/// <summary>
/// List agents by platform.
/// </summary>
/// <param name="platform">Platform filter.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>Matching registrations.</returns>
Task<IReadOnlyList<AgentRegistration>> ListByPlatformAsync(RuntimePlatform platform, CancellationToken ct = default);
/// <summary>
/// List healthy agents (recent heartbeat).
/// </summary>
/// <param name="ct">Cancellation token.</param>
/// <returns>Healthy registrations.</returns>
Task<IReadOnlyList<AgentRegistration>> ListHealthyAsync(CancellationToken ct = default);
/// <summary>
/// Send command to an agent.
/// </summary>
/// <param name="agentId">Agent identifier.</param>
/// <param name="command">Command to send.</param>
/// <param name="ct">Cancellation token.</param>
Task SendCommandAsync(string agentId, AgentCommand command, CancellationToken ct = default);
/// <summary>
/// Update agent posture.
/// </summary>
/// <param name="agentId">Agent identifier.</param>
/// <param name="posture">New posture.</param>
/// <param name="ct">Cancellation token.</param>
Task UpdatePostureAsync(string agentId, RuntimePosture posture, CancellationToken ct = default);
}

View File

@@ -44,42 +44,3 @@ public interface IRuntimeFactsIngest
/// <param name="ct">Cancellation token.</param>
Task UnregisterAgentAsync(string agentId, CancellationToken ct);
}
/// <summary>
/// Agent registration information.
/// </summary>
public sealed record AgentRegistration
{
/// <summary>Unique agent ID.</summary>
public required string AgentId { get; init; }
/// <summary>Target platform.</summary>
public required RuntimePlatform Platform { get; init; }
/// <summary>Agent version.</summary>
public required string AgentVersion { get; init; }
/// <summary>Hostname.</summary>
public required string Hostname { get; init; }
/// <summary>Container ID if applicable; null otherwise.</summary>
public string? ContainerId { get; init; }
/// <summary>Kubernetes pod name if applicable; null otherwise.</summary>
public string? PodName { get; init; }
/// <summary>Kubernetes namespace if applicable; null otherwise.</summary>
public string? Namespace { get; init; }
/// <summary>Target process name, if known.</summary>
public string? ProcessName { get; init; }
/// <summary>Target process ID, if known.</summary>
public int? ProcessId { get; init; }
/// <summary>Registration timestamp.</summary>
public required DateTimeOffset RegisteredAt { get; init; }
/// <summary>Initial posture; defaults to <see cref="RuntimePosture.Sampled"/>.</summary>
public RuntimePosture Posture { get; init; } = RuntimePosture.Sampled;
}

View File

@@ -0,0 +1,303 @@
// <copyright file="RuntimeFactsIngestService.cs" company="StellaOps">
// Copyright (c) StellaOps. Licensed under the AGPL-3.0-or-later.
// </copyright>
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
namespace StellaOps.Signals.RuntimeAgent;
/// <summary>
/// Service for ingesting and processing runtime facts from agents.
/// Sprint: SPRINT_20260109_009_004 Task: Implement RuntimeFactsIngestService
/// </summary>
/// <remarks>
/// This implementation buffers events in memory and aggregates them by symbol.
/// For production use, integrate with persistence and the Signals module.
/// </remarks>
public sealed class RuntimeFactsIngestService : IRuntimeFactsIngest, IAsyncDisposable
{
// Clock used to stamp the receive time on each queued batch.
private readonly TimeProvider _timeProvider;
private readonly ILogger<RuntimeFactsIngestService> _logger;
// Stored at construction; not read by any member visible in this file.
private readonly IAgentRegistrationService _registrationService;
// Event buffer channel for async processing
private readonly Channel<IngestBatch> _ingestChannel;
// The single background reader that drains _ingestChannel.
private readonly Task _processingTask;
// Cancels the background reader on disposal.
private readonly CancellationTokenSource _cts = new();
// Symbol observation tracking
private readonly ConcurrentDictionary<string, SymbolObservation> _observations = new();
// Statistics
private long _totalEventsIngested;
private long _totalBatchesProcessed;
/// <summary>
/// Creates the service and immediately starts the background task that processes queued batches.
/// </summary>
/// <param name="timeProvider">Clock used for batch receive timestamps.</param>
/// <param name="registrationService">Agent registration service.</param>
/// <param name="logger">Logger for ingest messages.</param>
public RuntimeFactsIngestService(
TimeProvider timeProvider,
IAgentRegistrationService registrationService,
ILogger<RuntimeFactsIngestService> logger)
{
_timeProvider = timeProvider;
_registrationService = registrationService;
_logger = logger;
// Create bounded channel to prevent memory issues
// Capacity is 1000 batches; with FullMode = Wait, writers wait for space when it is full.
_ingestChannel = Channel.CreateBounded<IngestBatch>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
});
// Start background processing
// Must come after the channel is created: the reader uses it right away.
_processingTask = ProcessIngestChannelAsync(_cts.Token);
}
/// <inheritdoc/>
/// <remarks>
/// The events are queued for background processing. The returned count is the number of
/// events accepted into the queue, not the number already aggregated. An empty list is
/// accepted and returns 0 without queuing anything.
/// </remarks>
/// <exception cref="ArgumentException"><paramref name="agentId"/> is null or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
public async Task<int> IngestAsync(
    string agentId,
    IReadOnlyList<RuntimeMethodEvent> events,
    CancellationToken ct)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(agentId);
    ArgumentNullException.ThrowIfNull(events);

    if (events.Count == 0)
    {
        return 0;
    }

    var batch = new IngestBatch(agentId, events, _timeProvider.GetUtcNow());

    // Waits for space when the bounded channel is full.
    await _ingestChannel.Writer.WriteAsync(batch, ct).ConfigureAwait(false);

    _logger.LogDebug(
        "Queued {Count} events from agent {AgentId}",
        events.Count,
        agentId);

    return events.Count;
}
/// <inheritdoc/>
/// <remarks>This implementation only logs the registration; it keeps no per-agent state.</remarks>
public Task RegisterAgentAsync(AgentRegistration registration, CancellationToken ct)
{
    var agentId = registration.AgentId;
    _logger.LogInformation("Agent {AgentId} registered for fact ingestion", agentId);

    return Task.CompletedTask;
}
/// <inheritdoc/>
/// <remarks>This implementation only logs the reported total event count.</remarks>
public Task HeartbeatAsync(string agentId, AgentStatistics statistics, CancellationToken ct)
{
    var totalEvents = statistics.TotalEventsCollected;
    _logger.LogDebug("Heartbeat from {AgentId}: {TotalEvents} events collected", agentId, totalEvents);

    return Task.CompletedTask;
}
/// <inheritdoc/>
/// <remarks>This implementation only logs the unregistration; recorded observations are kept.</remarks>
public Task UnregisterAgentAsync(string agentId, CancellationToken ct)
{
    _logger.LogInformation("Agent {AgentId} unregistered from fact ingestion", agentId);

    return Task.CompletedTask;
}
/// <summary>
/// Looks up the aggregated observation for one symbol.
/// </summary>
/// <param name="symbolId">Symbol identifier to look up.</param>
/// <returns>The observation, or null when the symbol has not been observed.</returns>
public SymbolObservation? GetObservation(string symbolId)
    => _observations.TryGetValue(symbolId, out var found) ? found : null;
/// <summary>
/// Lists the observations whose most recent sighting is at or after a given time.
/// </summary>
/// <param name="since">Inclusive lower bound on <see cref="SymbolObservation.LastObserved"/>.</param>
/// <returns>The matching observations; empty when none match.</returns>
public IReadOnlyList<SymbolObservation> GetObservationsSince(DateTimeOffset since)
{
    var recent = new List<SymbolObservation>();
    foreach (var observation in _observations.Values)
    {
        if (observation.LastObserved >= since)
        {
            recent.Add(observation);
        }
    }

    return recent;
}
/// <summary>
/// Lists the identifiers of every symbol observed so far.
/// </summary>
/// <returns>A copy of the symbol identifiers at the time of the call.</returns>
public IReadOnlyList<string> GetObservedSymbols()
    => new List<string>(_observations.Keys);
/// <summary>
/// Captures the current ingest counters.
/// </summary>
/// <returns>
/// Totals of events and batches processed, the number of distinct symbols observed, and
/// the number of batches still waiting in the channel.
/// </returns>
public RuntimeFactsIngestStats GetStatistics()
{
    var events = Interlocked.Read(ref _totalEventsIngested);
    var batches = Interlocked.Read(ref _totalBatchesProcessed);
    var symbols = _observations.Count;
    var pending = _ingestChannel.Reader.Count;

    return new RuntimeFactsIngestStats(events, batches, symbols, pending);
}
/// <summary>
/// Background loop: reads batches from the ingest channel and processes them one at a time
/// until the channel is completed or <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="ct">Token that stops the loop.</param>
private async Task ProcessIngestChannelAsync(CancellationToken ct)
{
try
{
await foreach (var batch in _ingestChannel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
{
try
{
ProcessBatch(batch);
// Counted only when the whole batch was processed without an exception.
Interlocked.Increment(ref _totalBatchesProcessed);
}
catch (Exception ex)
{
// A failing batch is logged and skipped so the loop keeps running.
_logger.LogError(ex, "Error processing batch from agent {AgentId}", batch.AgentId);
}
}
}
catch (OperationCanceledException)
{
// Expected during shutdown
}
}
/// <summary>
/// Folds every event of a batch into the per-symbol observations and bumps the event counter.
/// </summary>
/// <param name="batch">Batch to aggregate.</param>
private void ProcessBatch(IngestBatch batch)
{
    var events = batch.Events;

    foreach (var observed in events)
    {
        // The first sighting of a symbol creates its observation; later ones merge into it.
        _observations.AddOrUpdate(
            observed.SymbolId,
            addValueFactory: _ => CreateObservation(observed, batch),
            updateValueFactory: (_, current) => UpdateObservation(current, observed, batch));

        Interlocked.Increment(ref _totalEventsIngested);
    }

    _logger.LogDebug(
        "Processed batch of {Count} events from agent {AgentId}",
        events.Count,
        batch.AgentId);
}
/// <summary>
/// Builds the initial observation for a symbol from the first event that mentions it.
/// </summary>
/// <param name="event">First event seen for the symbol.</param>
/// <param name="batch">Batch the event arrived in; supplies the agent id.</param>
/// <returns>An observation with a count of one and both timestamps set to the event time.</returns>
private SymbolObservation CreateObservation(RuntimeMethodEvent @event, IngestBatch batch)
{
    var seenAt = @event.Timestamp;

    return new SymbolObservation
    {
        SymbolId = @event.SymbolId,
        MethodName = @event.MethodName,
        TypeName = @event.TypeName,
        AssemblyOrModule = @event.AssemblyOrModule,
        Platform = @event.Platform,
        FirstObserved = seenAt,
        LastObserved = seenAt,
        ObservationCount = 1,
        AgentIds = ImmutableHashSet.Create(batch.AgentId),
        EventKinds = ImmutableHashSet.Create(@event.Kind)
    };
}
/// <summary>
/// Merges one more event into an existing observation.
/// </summary>
/// <param name="existing">Observation accumulated so far.</param>
/// <param name="event">Event to merge in.</param>
/// <param name="batch">Batch the event arrived in; supplies the agent id.</param>
/// <returns>
/// A copy of <paramref name="existing"/> with the count incremented, the observation window
/// widened to include the event's timestamp, and the agent id and event kind added.
/// </returns>
private static SymbolObservation UpdateObservation(
    SymbolObservation existing,
    RuntimeMethodEvent @event,
    IngestBatch batch)
{
    var timestamp = @event.Timestamp;

    return existing with
    {
        // Events can arrive out of order (different agents, different batches), so both
        // ends of the window are adjusted, not only the upper one.
        FirstObserved = timestamp < existing.FirstObserved ? timestamp : existing.FirstObserved,
        LastObserved = timestamp > existing.LastObserved ? timestamp : existing.LastObserved,
        ObservationCount = existing.ObservationCount + 1,
        // ImmutableHashSet.Add returns the same set when the item is already present.
        AgentIds = existing.AgentIds.Add(batch.AgentId),
        EventKinds = existing.EventKinds.Add(@event.Kind)
    };
}
/// <summary>
/// Stops accepting batches, cancels the background reader and waits for it to finish.
/// </summary>
/// <remarks>
/// Safe to call more than once: only the first call completes the channel and cancels and
/// disposes the token source; later calls just wait for the background reader to finish.
/// Batches still queued when disposal starts may be discarded.
/// </remarks>
public async ValueTask DisposeAsync()
{
    // TryComplete succeeds for exactly one caller, which makes it a dispose-once gate
    // without needing an extra field.
    var isFirstDispose = _ingestChannel.Writer.TryComplete();
    if (isFirstDispose)
    {
        _cts.Cancel();
    }

    try
    {
        await _processingTask.ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Expected
    }

    if (isFirstDispose)
    {
        _cts.Dispose();
    }
}
// One queued unit of work: the events submitted by a single IngestAsync call, the id of
// the agent that sent them, and the time the call was received.
private sealed record IngestBatch(
string AgentId,
IReadOnlyList<RuntimeMethodEvent> Events,
DateTimeOffset ReceivedAt);
}
/// <summary>
/// Aggregated observation for a symbol: what was observed, when, how often, and by whom.
/// </summary>
public sealed record SymbolObservation
{
/// <summary>Symbol identifier.</summary>
public required string SymbolId { get; init; }
/// <summary>Method name.</summary>
public required string MethodName { get; init; }
/// <summary>Type/class name.</summary>
public required string TypeName { get; init; }
/// <summary>Assembly/module.</summary>
public required string AssemblyOrModule { get; init; }
/// <summary>Platform.</summary>
public required RuntimePlatform Platform { get; init; }
/// <summary>First observation timestamp.</summary>
public required DateTimeOffset FirstObserved { get; init; }
/// <summary>Most recent observation timestamp.</summary>
public required DateTimeOffset LastObserved { get; init; }
/// <summary>Total observation count.</summary>
public required long ObservationCount { get; init; }
/// <summary>Agents that observed this symbol.</summary>
public required ImmutableHashSet<string> AgentIds { get; init; }
/// <summary>Event kinds observed.</summary>
public required ImmutableHashSet<RuntimeEventKind> EventKinds { get; init; }
}
/// <summary>
/// Statistics for the ingest service.
/// </summary>
/// <param name="TotalEventsIngested">Total number of events ingested.</param>
/// <param name="TotalBatchesProcessed">Total number of batches processed.</param>
/// <param name="UniqueSymbolsObserved">Number of unique symbols observed.</param>
/// <param name="PendingBatches">Number of batches pending.</param>
public sealed record RuntimeFactsIngestStats(
long TotalEventsIngested,
long TotalBatchesProcessed,
int UniqueSymbolsObserved,
int PendingBatches);