sprints work

This commit is contained in:
master
2026-01-10 20:32:13 +02:00
parent 0d5eda86fc
commit 17d0631b8e
189 changed files with 40667 additions and 497 deletions

View File

@@ -0,0 +1,680 @@
// -----------------------------------------------------------------------------
// RuntimeAgentController.cs
// Sprint: SPRINT_20260109_009_004
// Task: API endpoints for runtime agent registration, heartbeat, and facts ingestion
// -----------------------------------------------------------------------------
using System.Collections.Immutable;
using System.ComponentModel.DataAnnotations;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using StellaOps.Signals.RuntimeAgent;
namespace StellaOps.Signals.Api;
/// <summary>
/// API controller for runtime agent management and facts ingestion.
/// Provides endpoints for agent registration, heartbeat, and runtime observation ingestion.
/// </summary>
[ApiController]
[Route("api/v1/agents")]
[Produces("application/json")]
public sealed class RuntimeAgentController : ControllerBase
{
private readonly IAgentRegistrationService _registrationService;
private readonly IRuntimeFactsIngest _factsIngestService;
private readonly ILogger<RuntimeAgentController> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="RuntimeAgentController"/> class.
/// </summary>
public RuntimeAgentController(
IAgentRegistrationService registrationService,
IRuntimeFactsIngest factsIngestService,
ILogger<RuntimeAgentController> logger)
{
_registrationService = registrationService ?? throw new ArgumentNullException(nameof(registrationService));
_factsIngestService = factsIngestService ?? throw new ArgumentNullException(nameof(factsIngestService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Registers a new runtime agent.
/// </summary>
/// <param name="request">Registration request.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>Agent registration response with agent ID.</returns>
[HttpPost("register")]
[ProducesResponseType(typeof(AgentRegistrationApiResponse), StatusCodes.Status201Created)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status400BadRequest)]
public async Task<ActionResult<AgentRegistrationApiResponse>> Register(
[FromBody] RegisterAgentApiRequest request,
CancellationToken ct = default)
{
if (string.IsNullOrWhiteSpace(request.AgentId))
{
return BadRequest(new ProblemDetails
{
Title = "Invalid agent ID",
Detail = "The 'agentId' field is required.",
Status = StatusCodes.Status400BadRequest,
});
}
if (string.IsNullOrWhiteSpace(request.Hostname))
{
return BadRequest(new ProblemDetails
{
Title = "Invalid hostname",
Detail = "The 'hostname' field is required.",
Status = StatusCodes.Status400BadRequest,
});
}
_logger.LogInformation(
"Registering agent {AgentId}, hostname {Hostname}, platform {Platform}",
request.AgentId, request.Hostname, request.Platform);
try
{
var registrationRequest = new AgentRegistrationRequest
{
AgentId = request.AgentId,
Platform = request.Platform,
Hostname = request.Hostname,
ContainerId = request.ContainerId,
KubernetesNamespace = request.KubernetesNamespace,
KubernetesPodName = request.KubernetesPodName,
ApplicationName = request.ApplicationName,
ProcessId = request.ProcessId,
AgentVersion = request.AgentVersion ?? "1.0.0",
InitialPosture = request.InitialPosture,
Tags = request.Tags?.ToImmutableDictionary() ?? ImmutableDictionary<string, string>.Empty,
};
var registration = await _registrationService.RegisterAsync(registrationRequest, ct);
var response = MapToApiResponse(registration);
return CreatedAtAction(
nameof(GetAgent),
new { agentId = registration.AgentId },
response);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error registering agent {AgentId}", request.AgentId);
return StatusCode(StatusCodes.Status500InternalServerError, new ProblemDetails
{
Title = "Internal server error",
Detail = "An error occurred while registering the agent.",
Status = StatusCodes.Status500InternalServerError,
});
}
}
/// <summary>
/// Records an agent heartbeat.
/// </summary>
/// <param name="agentId">Agent ID.</param>
/// <param name="request">Heartbeat request with state and statistics.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>Heartbeat response with commands.</returns>
[HttpPost("{agentId}/heartbeat")]
[ProducesResponseType(typeof(AgentHeartbeatApiResponse), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status404NotFound)]
public async Task<ActionResult<AgentHeartbeatApiResponse>> Heartbeat(
string agentId,
[FromBody] HeartbeatApiRequest request,
CancellationToken ct = default)
{
_logger.LogDebug("Heartbeat received from agent {AgentId}", agentId);
try
{
var heartbeatRequest = new AgentHeartbeatRequest
{
AgentId = agentId,
State = request.State,
Posture = request.Posture,
Statistics = request.Statistics,
};
var response = await _registrationService.HeartbeatAsync(heartbeatRequest, ct);
return Ok(new AgentHeartbeatApiResponse
{
Continue = response.Continue,
NewPosture = response.NewPosture,
Command = response.Command,
});
}
catch (KeyNotFoundException)
{
return NotFound(new ProblemDetails
{
Title = "Agent not found",
Detail = $"No agent found with ID '{agentId}'.",
Status = StatusCodes.Status404NotFound,
});
}
catch (Exception ex)
{
_logger.LogError(ex, "Error recording heartbeat for agent {AgentId}", agentId);
return StatusCode(StatusCodes.Status500InternalServerError, new ProblemDetails
{
Title = "Internal server error",
Detail = "An error occurred while recording the heartbeat.",
Status = StatusCodes.Status500InternalServerError,
});
}
}
/// <summary>
/// Gets agent details.
/// </summary>
/// <param name="agentId">Agent ID.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>Agent details.</returns>
[HttpGet("{agentId}")]
[ProducesResponseType(typeof(AgentRegistrationApiResponse), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status404NotFound)]
public async Task<ActionResult<AgentRegistrationApiResponse>> GetAgent(
string agentId,
CancellationToken ct = default)
{
var registration = await _registrationService.GetAsync(agentId, ct);
if (registration == null)
{
return NotFound(new ProblemDetails
{
Title = "Agent not found",
Detail = $"No agent found with ID '{agentId}'.",
Status = StatusCodes.Status404NotFound,
});
}
return Ok(MapToApiResponse(registration));
}
/// <summary>
/// Lists all registered agents.
/// </summary>
/// <param name="platform">Optional platform filter.</param>
/// <param name="healthyOnly">Only return healthy agents.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>List of agents.</returns>
[HttpGet]
[ProducesResponseType(typeof(AgentListApiResponse), StatusCodes.Status200OK)]
public async Task<ActionResult<AgentListApiResponse>> ListAgents(
[FromQuery(Name = "platform")] RuntimePlatform? platform = null,
[FromQuery(Name = "healthy_only")] bool healthyOnly = false,
CancellationToken ct = default)
{
IReadOnlyList<AgentRegistration> agents;
if (healthyOnly)
{
agents = await _registrationService.ListHealthyAsync(ct);
}
else if (platform.HasValue)
{
agents = await _registrationService.ListByPlatformAsync(platform.Value, ct);
}
else
{
agents = await _registrationService.ListAsync(ct);
}
return Ok(new AgentListApiResponse
{
Agents = agents.Select(MapToApiResponse).ToList(),
TotalCount = agents.Count,
});
}
/// <summary>
/// Deregisters an agent.
/// </summary>
/// <param name="agentId">Agent ID.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>No content on success.</returns>
[HttpDelete("{agentId}")]
[ProducesResponseType(StatusCodes.Status204NoContent)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status404NotFound)]
public async Task<IActionResult> Unregister(
string agentId,
CancellationToken ct = default)
{
_logger.LogInformation("Unregistering agent {AgentId}", agentId);
try
{
await _registrationService.UnregisterAsync(agentId, ct);
return NoContent();
}
catch (KeyNotFoundException)
{
return NotFound(new ProblemDetails
{
Title = "Agent not found",
Detail = $"No agent found with ID '{agentId}'.",
Status = StatusCodes.Status404NotFound,
});
}
}
/// <summary>
/// Sends a command to an agent.
/// </summary>
/// <param name="agentId">Agent ID.</param>
/// <param name="request">Command request.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>Accepted.</returns>
[HttpPost("{agentId}/commands")]
[ProducesResponseType(StatusCodes.Status202Accepted)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status404NotFound)]
public async Task<IActionResult> SendCommand(
string agentId,
[FromBody] CommandApiRequest request,
CancellationToken ct = default)
{
_logger.LogInformation(
"Sending command {Command} to agent {AgentId}",
request.Command, agentId);
try
{
await _registrationService.SendCommandAsync(agentId, request.Command, ct);
return Accepted();
}
catch (KeyNotFoundException)
{
return NotFound(new ProblemDetails
{
Title = "Agent not found",
Detail = $"No agent found with ID '{agentId}'.",
Status = StatusCodes.Status404NotFound,
});
}
}
/// <summary>
/// Updates agent posture.
/// </summary>
/// <param name="agentId">Agent ID.</param>
/// <param name="request">Posture update request.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>No content on success.</returns>
[HttpPatch("{agentId}/posture")]
[ProducesResponseType(StatusCodes.Status204NoContent)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status404NotFound)]
public async Task<IActionResult> UpdatePosture(
string agentId,
[FromBody] PostureUpdateRequest request,
CancellationToken ct = default)
{
_logger.LogInformation(
"Updating posture of agent {AgentId} to {Posture}",
agentId, request.Posture);
try
{
await _registrationService.UpdatePostureAsync(agentId, request.Posture, ct);
return NoContent();
}
catch (KeyNotFoundException)
{
return NotFound(new ProblemDetails
{
Title = "Agent not found",
Detail = $"No agent found with ID '{agentId}'.",
Status = StatusCodes.Status404NotFound,
});
}
}
private static AgentRegistrationApiResponse MapToApiResponse(AgentRegistration registration)
{
return new AgentRegistrationApiResponse
{
AgentId = registration.AgentId,
Platform = registration.Platform,
Hostname = registration.Hostname,
ContainerId = registration.ContainerId,
KubernetesNamespace = registration.KubernetesNamespace,
KubernetesPodName = registration.KubernetesPodName,
ApplicationName = registration.ApplicationName,
ProcessId = registration.ProcessId,
AgentVersion = registration.AgentVersion,
RegisteredAt = registration.RegisteredAt,
LastHeartbeat = registration.LastHeartbeat,
State = registration.State,
Posture = registration.Posture,
Tags = registration.Tags.ToDictionary(kv => kv.Key, kv => kv.Value),
};
}
}
/// <summary>
/// API controller for runtime facts ingestion.
/// </summary>
[ApiController]
[Route("api/v1/agents/{agentId}/facts")]
[Produces("application/json")]
public sealed class RuntimeFactsController : ControllerBase
{
private readonly IRuntimeFactsIngest _factsIngestService;
private readonly ILogger<RuntimeFactsController> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="RuntimeFactsController"/> class.
/// </summary>
public RuntimeFactsController(
IRuntimeFactsIngest factsIngestService,
ILogger<RuntimeFactsController> logger)
{
_factsIngestService = factsIngestService ?? throw new ArgumentNullException(nameof(factsIngestService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Ingests a batch of runtime method events.
/// </summary>
/// <param name="agentId">Agent ID.</param>
/// <param name="request">Batch of events.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>Ingestion result.</returns>
[HttpPost]
[ProducesResponseType(typeof(FactsIngestApiResponse), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(ProblemDetails), StatusCodes.Status400BadRequest)]
public async Task<ActionResult<FactsIngestApiResponse>> IngestFacts(
string agentId,
[FromBody] FactsIngestApiRequest request,
CancellationToken ct = default)
{
if (request.Events == null || request.Events.Count == 0)
{
return BadRequest(new ProblemDetails
{
Title = "Invalid request",
Detail = "At least one event is required.",
Status = StatusCodes.Status400BadRequest,
});
}
_logger.LogDebug(
"Ingesting {EventCount} events from agent {AgentId}",
request.Events.Count, agentId);
try
{
var events = request.Events.Select(e => new RuntimeMethodEvent
{
EventId = e.EventId ?? Guid.NewGuid().ToString("N"),
SymbolId = e.SymbolId,
MethodName = e.MethodName,
TypeName = e.TypeName,
AssemblyOrModule = e.AssemblyOrModule,
Timestamp = e.Timestamp,
Kind = e.Kind,
ContainerId = e.ContainerId,
ProcessId = e.ProcessId,
ThreadId = e.ThreadId,
CallDepth = e.CallDepth,
DurationMicroseconds = e.DurationMicroseconds,
Context = e.Context?.ToImmutableDictionary() ?? ImmutableDictionary<string, string>.Empty,
});
var result = await _factsIngestService.IngestBatchAsync(agentId, events, ct);
return Ok(new FactsIngestApiResponse
{
AcceptedCount = result.AcceptedCount,
RejectedCount = result.RejectedCount,
AggregatedSymbols = result.AggregatedSymbols,
});
}
catch (Exception ex)
{
_logger.LogError(ex, "Error ingesting facts from agent {AgentId}", agentId);
return StatusCode(StatusCodes.Status500InternalServerError, new ProblemDetails
{
Title = "Internal server error",
Detail = "An error occurred while ingesting facts.",
Status = StatusCodes.Status500InternalServerError,
});
}
}
}
#region API DTOs
/// <summary>
/// Agent registration API request.
/// </summary>
public sealed record RegisterAgentApiRequest
{
/// <summary>Unique agent identifier (generated by agent).</summary>
[Required]
public required string AgentId { get; init; }
/// <summary>Target platform.</summary>
public RuntimePlatform Platform { get; init; } = RuntimePlatform.DotNet;
/// <summary>Hostname where agent is running.</summary>
[Required]
public required string Hostname { get; init; }
/// <summary>Container ID if running in container.</summary>
public string? ContainerId { get; init; }
/// <summary>Kubernetes namespace if running in K8s.</summary>
public string? KubernetesNamespace { get; init; }
/// <summary>Kubernetes pod name if running in K8s.</summary>
public string? KubernetesPodName { get; init; }
/// <summary>Target application name.</summary>
public string? ApplicationName { get; init; }
/// <summary>Target process ID.</summary>
public int? ProcessId { get; init; }
/// <summary>Agent version.</summary>
public string? AgentVersion { get; init; }
/// <summary>Initial posture.</summary>
public RuntimePosture InitialPosture { get; init; } = RuntimePosture.Sampled;
/// <summary>Tags for grouping/filtering.</summary>
public Dictionary<string, string>? Tags { get; init; }
}
/// <summary>
/// Agent registration API response.
/// </summary>
public sealed record AgentRegistrationApiResponse
{
/// <summary>Agent ID.</summary>
public required string AgentId { get; init; }
/// <summary>Platform.</summary>
public required RuntimePlatform Platform { get; init; }
/// <summary>Hostname.</summary>
public required string Hostname { get; init; }
/// <summary>Container ID.</summary>
public string? ContainerId { get; init; }
/// <summary>Kubernetes namespace.</summary>
public string? KubernetesNamespace { get; init; }
/// <summary>Kubernetes pod name.</summary>
public string? KubernetesPodName { get; init; }
/// <summary>Application name.</summary>
public string? ApplicationName { get; init; }
/// <summary>Process ID.</summary>
public int? ProcessId { get; init; }
/// <summary>Agent version.</summary>
public required string AgentVersion { get; init; }
/// <summary>Registered timestamp.</summary>
public required DateTimeOffset RegisteredAt { get; init; }
/// <summary>Last heartbeat timestamp.</summary>
public DateTimeOffset LastHeartbeat { get; init; }
/// <summary>State.</summary>
public AgentState State { get; init; }
/// <summary>Posture.</summary>
public RuntimePosture Posture { get; init; }
/// <summary>Tags.</summary>
public Dictionary<string, string>? Tags { get; init; }
}
/// <summary>
/// Agent heartbeat API request.
/// </summary>
public sealed record HeartbeatApiRequest
{
/// <summary>Current agent state.</summary>
public required AgentState State { get; init; }
/// <summary>Current posture.</summary>
public required RuntimePosture Posture { get; init; }
/// <summary>Statistics snapshot.</summary>
public AgentStatistics? Statistics { get; init; }
}
/// <summary>
/// Agent heartbeat API response.
/// </summary>
public sealed record AgentHeartbeatApiResponse
{
/// <summary>Whether the agent should continue.</summary>
public bool Continue { get; init; } = true;
/// <summary>New posture if changed.</summary>
public RuntimePosture? NewPosture { get; init; }
/// <summary>Command to execute.</summary>
public AgentCommand? Command { get; init; }
}
/// <summary>
/// Agent list API response.
/// </summary>
public sealed record AgentListApiResponse
{
/// <summary>List of agents.</summary>
public required IReadOnlyList<AgentRegistrationApiResponse> Agents { get; init; }
/// <summary>Total count.</summary>
public required int TotalCount { get; init; }
}
/// <summary>
/// Command API request.
/// </summary>
public sealed record CommandApiRequest
{
/// <summary>Command to send.</summary>
[Required]
public required AgentCommand Command { get; init; }
}
/// <summary>
/// Posture update request.
/// </summary>
public sealed record PostureUpdateRequest
{
/// <summary>New posture.</summary>
[Required]
public required RuntimePosture Posture { get; init; }
}
/// <summary>
/// Facts ingest API request.
/// </summary>
public sealed record FactsIngestApiRequest
{
/// <summary>Events to ingest.</summary>
[Required]
public required IReadOnlyList<RuntimeEventApiDto> Events { get; init; }
}
/// <summary>
/// Runtime event API DTO.
/// </summary>
public sealed record RuntimeEventApiDto
{
/// <summary>Event ID (optional, will be generated if not provided).</summary>
public string? EventId { get; init; }
/// <summary>Symbol ID.</summary>
[Required]
public required string SymbolId { get; init; }
/// <summary>Method name.</summary>
[Required]
public required string MethodName { get; init; }
/// <summary>Type name.</summary>
[Required]
public required string TypeName { get; init; }
/// <summary>Assembly or module.</summary>
[Required]
public required string AssemblyOrModule { get; init; }
/// <summary>Timestamp.</summary>
[Required]
public required DateTimeOffset Timestamp { get; init; }
/// <summary>Event kind.</summary>
public RuntimeEventKind Kind { get; init; } = RuntimeEventKind.Sample;
/// <summary>Container ID.</summary>
public string? ContainerId { get; init; }
/// <summary>Process ID.</summary>
public int? ProcessId { get; init; }
/// <summary>Thread ID.</summary>
public string? ThreadId { get; init; }
/// <summary>Call depth.</summary>
public int? CallDepth { get; init; }
/// <summary>Duration in microseconds.</summary>
public long? DurationMicroseconds { get; init; }
/// <summary>Additional context.</summary>
public Dictionary<string, string>? Context { get; init; }
}
/// <summary>
/// Facts ingest API response.
/// </summary>
public sealed record FactsIngestApiResponse
{
/// <summary>Number of accepted events.</summary>
public required int AcceptedCount { get; init; }
/// <summary>Number of rejected events.</summary>
public required int RejectedCount { get; init; }
/// <summary>Number of aggregated symbols.</summary>
public required int AggregatedSymbols { get; init; }
}
#endregion

View File

@@ -0,0 +1,241 @@
-- Signals Schema Migration 002: Runtime Agent Framework
-- Sprint: SPRINT_20260109_009_004
-- Creates tables for runtime agent registration, heartbeats, and aggregated facts
-- ============================================================================
-- Runtime Agent Registrations
-- ============================================================================
CREATE TABLE IF NOT EXISTS signals.runtime_agents (
agent_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
artifact_digest TEXT NOT NULL,
platform TEXT NOT NULL CHECK (platform IN ('dotnet', 'java', 'native', 'python', 'nodejs', 'go', 'rust')),
posture TEXT NOT NULL DEFAULT 'sampled'
CHECK (posture IN ('none', 'passive', 'sampled', 'active_tracing', 'deep', 'full')),
metadata JSONB,
registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_heartbeat_at TIMESTAMPTZ,
state TEXT NOT NULL DEFAULT 'registered'
CHECK (state IN ('registered', 'starting', 'running', 'stopping', 'stopped', 'error')),
statistics JSONB,
version TEXT,
hostname TEXT,
container_id TEXT,
pod_name TEXT,
namespace TEXT
);
CREATE INDEX IF NOT EXISTS idx_runtime_agents_tenant ON signals.runtime_agents(tenant_id);
CREATE INDEX IF NOT EXISTS idx_runtime_agents_artifact ON signals.runtime_agents(artifact_digest);
CREATE INDEX IF NOT EXISTS idx_runtime_agents_heartbeat ON signals.runtime_agents(last_heartbeat_at);
CREATE INDEX IF NOT EXISTS idx_runtime_agents_state ON signals.runtime_agents(state);
CREATE INDEX IF NOT EXISTS idx_runtime_agents_active ON signals.runtime_agents(tenant_id, state)
WHERE state = 'running';
COMMENT ON TABLE signals.runtime_agents IS 'Runtime agent registrations for method-level execution trace collection';
COMMENT ON COLUMN signals.runtime_agents.platform IS 'Target platform: dotnet, java, native, python, nodejs, go, rust';
COMMENT ON COLUMN signals.runtime_agents.posture IS 'Collection intensity: none, passive, sampled, active_tracing, deep, full';
COMMENT ON COLUMN signals.runtime_agents.state IS 'Agent lifecycle state: registered, starting, running, stopping, stopped, error';
-- ============================================================================
-- Runtime Facts (Aggregated Observations)
-- ============================================================================
CREATE TABLE IF NOT EXISTS signals.runtime_facts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
artifact_digest TEXT NOT NULL,
canonical_symbol_id TEXT NOT NULL,
display_name TEXT NOT NULL,
hit_count BIGINT NOT NULL DEFAULT 0,
first_seen TIMESTAMPTZ NOT NULL,
last_seen TIMESTAMPTZ NOT NULL,
contexts JSONB NOT NULL DEFAULT '[]'::jsonb,
agent_ids UUID[] NOT NULL DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT runtime_facts_unique UNIQUE (tenant_id, artifact_digest, canonical_symbol_id)
);
CREATE INDEX IF NOT EXISTS idx_runtime_facts_tenant ON signals.runtime_facts(tenant_id);
CREATE INDEX IF NOT EXISTS idx_runtime_facts_artifact ON signals.runtime_facts(tenant_id, artifact_digest);
CREATE INDEX IF NOT EXISTS idx_runtime_facts_symbol ON signals.runtime_facts(canonical_symbol_id);
CREATE INDEX IF NOT EXISTS idx_runtime_facts_last_seen ON signals.runtime_facts(last_seen DESC);
CREATE INDEX IF NOT EXISTS idx_runtime_facts_hit_count ON signals.runtime_facts(hit_count DESC);
CREATE INDEX IF NOT EXISTS idx_runtime_facts_gin_contexts ON signals.runtime_facts USING GIN (contexts);
COMMENT ON TABLE signals.runtime_facts IS 'Aggregated runtime method observations from runtime agents';
COMMENT ON COLUMN signals.runtime_facts.canonical_symbol_id IS 'Canonicalized symbol identifier from symbol normalization pipeline';
COMMENT ON COLUMN signals.runtime_facts.hit_count IS 'Total number of times this symbol was observed executing';
COMMENT ON COLUMN signals.runtime_facts.contexts IS 'JSONB array of runtime contexts (container, route, process) where symbol was observed';
COMMENT ON COLUMN signals.runtime_facts.agent_ids IS 'Array of agent IDs that have reported observations for this symbol';
-- ============================================================================
-- Agent Heartbeat History (for monitoring)
-- ============================================================================
CREATE TABLE IF NOT EXISTS signals.agent_heartbeats (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agent_id UUID NOT NULL REFERENCES signals.runtime_agents(agent_id) ON DELETE CASCADE,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
events_collected BIGINT NOT NULL DEFAULT 0,
events_transmitted BIGINT NOT NULL DEFAULT 0,
events_dropped BIGINT NOT NULL DEFAULT 0,
memory_bytes BIGINT,
cpu_percent REAL,
error_count INT NOT NULL DEFAULT 0,
last_error TEXT
);
CREATE INDEX IF NOT EXISTS idx_agent_heartbeats_agent ON signals.agent_heartbeats(agent_id);
CREATE INDEX IF NOT EXISTS idx_agent_heartbeats_recorded ON signals.agent_heartbeats(recorded_at DESC);
-- Partitioning hint: Consider partitioning by recorded_at for high-volume deployments
COMMENT ON TABLE signals.agent_heartbeats IS 'Agent heartbeat history for monitoring and diagnostics';
-- ============================================================================
-- Agent Commands Queue (for remote control)
-- ============================================================================
CREATE TABLE IF NOT EXISTS signals.agent_commands (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agent_id UUID NOT NULL REFERENCES signals.runtime_agents(agent_id) ON DELETE CASCADE,
command_type TEXT NOT NULL CHECK (command_type IN (
'start', 'stop', 'reconfigure', 'flush', 'update_filters', 'set_posture'
)),
payload JSONB,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'acknowledged', 'executing', 'completed', 'failed')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
acknowledged_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
result JSONB
);
CREATE INDEX IF NOT EXISTS idx_agent_commands_agent ON signals.agent_commands(agent_id);
CREATE INDEX IF NOT EXISTS idx_agent_commands_pending ON signals.agent_commands(agent_id, status)
WHERE status = 'pending';
COMMENT ON TABLE signals.agent_commands IS 'Command queue for remote agent control';
COMMENT ON COLUMN signals.agent_commands.command_type IS 'Command type: start, stop, reconfigure, flush, update_filters, set_posture';
-- ============================================================================
-- Views for Runtime Agent Management
-- ============================================================================
-- Active agents summary
CREATE OR REPLACE VIEW signals.active_agents AS
SELECT
ra.agent_id,
ra.tenant_id,
ra.artifact_digest,
ra.platform,
ra.posture,
ra.state,
ra.hostname,
ra.container_id,
ra.registered_at,
ra.last_heartbeat_at,
(ra.statistics->>'eventsCollected')::bigint AS events_collected,
(ra.statistics->>'eventsTransmitted')::bigint AS events_transmitted,
NOW() - ra.last_heartbeat_at AS time_since_heartbeat
FROM signals.runtime_agents ra
WHERE ra.state = 'running'
AND ra.last_heartbeat_at > NOW() - INTERVAL '5 minutes';
COMMENT ON VIEW signals.active_agents IS 'Currently active runtime agents with recent heartbeats';
-- Runtime facts summary per artifact
CREATE OR REPLACE VIEW signals.runtime_facts_summary AS
SELECT
rf.tenant_id,
rf.artifact_digest,
COUNT(*) AS unique_symbols_observed,
SUM(rf.hit_count) AS total_observations,
MIN(rf.first_seen) AS earliest_observation,
MAX(rf.last_seen) AS latest_observation,
COUNT(DISTINCT unnest(rf.agent_ids)) AS contributing_agents
FROM signals.runtime_facts rf
GROUP BY rf.tenant_id, rf.artifact_digest;
COMMENT ON VIEW signals.runtime_facts_summary IS 'Summary of runtime observations per artifact';
-- ============================================================================
-- Functions for Runtime Agent Management
-- ============================================================================
-- Upsert runtime fact (for batch ingestion)
CREATE OR REPLACE FUNCTION signals.upsert_runtime_fact(
p_tenant_id UUID,
p_artifact_digest TEXT,
p_canonical_symbol_id TEXT,
p_display_name TEXT,
p_hit_count BIGINT,
p_first_seen TIMESTAMPTZ,
p_last_seen TIMESTAMPTZ,
p_contexts JSONB,
p_agent_id UUID
) RETURNS UUID AS $$
DECLARE
v_fact_id UUID;
BEGIN
INSERT INTO signals.runtime_facts (
tenant_id, artifact_digest, canonical_symbol_id, display_name,
hit_count, first_seen, last_seen, contexts, agent_ids
) VALUES (
p_tenant_id, p_artifact_digest, p_canonical_symbol_id, p_display_name,
p_hit_count, p_first_seen, p_last_seen, p_contexts, ARRAY[p_agent_id]
)
ON CONFLICT (tenant_id, artifact_digest, canonical_symbol_id)
DO UPDATE SET
hit_count = signals.runtime_facts.hit_count + EXCLUDED.hit_count,
last_seen = GREATEST(signals.runtime_facts.last_seen, EXCLUDED.last_seen),
first_seen = LEAST(signals.runtime_facts.first_seen, EXCLUDED.first_seen),
contexts = signals.runtime_facts.contexts || EXCLUDED.contexts,
agent_ids = ARRAY(SELECT DISTINCT unnest(signals.runtime_facts.agent_ids || EXCLUDED.agent_ids)),
updated_at = NOW()
RETURNING id INTO v_fact_id;
RETURN v_fact_id;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION signals.upsert_runtime_fact IS 'Upsert a runtime fact, aggregating hit counts and contexts';
-- Clean up stale agents
CREATE OR REPLACE FUNCTION signals.cleanup_stale_agents(
p_stale_threshold INTERVAL DEFAULT INTERVAL '1 hour'
) RETURNS INT AS $$
DECLARE
v_count INT;
BEGIN
UPDATE signals.runtime_agents
SET state = 'stopped'
WHERE state = 'running'
AND last_heartbeat_at < NOW() - p_stale_threshold;
GET DIAGNOSTICS v_count = ROW_COUNT;
RETURN v_count;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION signals.cleanup_stale_agents IS 'Mark agents as stopped if no heartbeat received within threshold';
-- Prune old heartbeat history
CREATE OR REPLACE FUNCTION signals.prune_heartbeat_history(
p_retention_days INT DEFAULT 7
) RETURNS INT AS $$
DECLARE
v_count INT;
BEGIN
DELETE FROM signals.agent_heartbeats
WHERE recorded_at < NOW() - (p_retention_days || ' days')::INTERVAL;
GET DIAGNOSTICS v_count = ROW_COUNT;
RETURN v_count;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION signals.prune_heartbeat_history IS 'Delete heartbeat records older than retention period';