mroe completeness
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
# Sprint 3104 · Signals callgraph projection completion
|
||||
|
||||
**Status:** TODO
|
||||
**Status:** DONE
|
||||
**Priority:** P2 - MEDIUM
|
||||
**Module:** Signals
|
||||
**Working directory:** `src/Signals/`
|
||||
@@ -22,11 +22,11 @@
|
||||
## Delivery Tracker
|
||||
| # | Task ID | Status | Key dependency / next step | Owners | Task Definition |
|
||||
| --- | --- | --- | --- | --- | --- |
|
||||
| 1 | SIG-CG-3104-001 | TODO | Define contract | Signals · Storage | Define `ICallGraphSyncService` for projecting a canonical callgraph into `signals.*` relational tables. |
|
||||
| 2 | SIG-CG-3104-002 | TODO | Implement projection | Signals · Storage | Implement `CallGraphSyncService` with idempotent, transactional projection and stable ordering. |
|
||||
| 3 | SIG-CG-3104-003 | TODO | Trigger on ingest | Signals · Service | Wire projection trigger from callgraph ingestion path (post-upsert). |
|
||||
| 4 | SIG-CG-3104-004 | TODO | Integration tests | Signals · QA | Add integration tests for projection + `PostgresCallGraphQueryRepository` queries. |
|
||||
| 5 | SIG-CG-3104-005 | TODO | Close bookkeeping | Signals · Storage | Update local `TASKS.md` and sprint status with evidence. |
|
||||
| 1 | SIG-CG-3104-001 | DONE | Define contract | Signals · Storage | Define `ICallGraphSyncService` for projecting a canonical callgraph into `signals.*` relational tables. |
|
||||
| 2 | SIG-CG-3104-002 | DONE | Implement projection | Signals · Storage | Implement `CallGraphSyncService` with idempotent, transactional projection and stable ordering. |
|
||||
| 3 | SIG-CG-3104-003 | DONE | Trigger on ingest | Signals · Service | Wire projection trigger from callgraph ingestion path (post-upsert). |
|
||||
| 4 | SIG-CG-3104-004 | DONE | Integration tests | Signals · QA | Add integration tests for projection + `PostgresCallGraphQueryRepository` queries. |
|
||||
| 5 | SIG-CG-3104-005 | DONE | Close bookkeeping | Signals · Storage | Update local `TASKS.md` and sprint status with evidence. |
|
||||
|
||||
## Wave Coordination
|
||||
- Wave A: projection contract + service
|
||||
@@ -52,7 +52,8 @@
|
||||
| Date (UTC) | Update | Owner |
|
||||
| --- | --- | --- |
|
||||
| 2025-12-18 | Sprint created; awaiting staffing. | Planning |
|
||||
| 2025-12-18 | Started SIG-CG-3104-001 (projection contract + implementation). | Agent |
|
||||
| 2025-12-18 | Completed SIG-CG-3104-001..005; validated via `dotnet test src/Signals/StellaOps.Signals.Storage.Postgres.Tests/StellaOps.Signals.Storage.Postgres.Tests.csproj -c Release` (5 tests). | Agent |
|
||||
|
||||
## Next Checkpoints
|
||||
- 2025-12-18: Projection service skeleton + first passing integration test (if staffed).
|
||||
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
# Sprint 3105 · ProofSpine CBOR accept
|
||||
|
||||
**Status:** DOING
|
||||
**Priority:** P2 - MEDIUM
|
||||
**Module:** Scanner.WebService
|
||||
**Working directory:** `src/Scanner/StellaOps.Scanner.WebService/`
|
||||
|
||||
## Topic & Scope
|
||||
- Pick up deferred ProofSpine API work from `docs/implplan/archived/SPRINT_3100_0001_0001_proof_spine_system.md`:
|
||||
- add support for `Accept: application/cbor` on `GET /api/v1/spines/{spineId}` and `GET /api/v1/scans/{scanId}/spines`.
|
||||
- Keep outputs deterministic (canonical/stable ordering) and add tests for content negotiation.
|
||||
|
||||
## Dependencies & Concurrency
|
||||
- No schema changes required.
|
||||
- Keep scope inside Scanner WebService + its test project.
|
||||
|
||||
## Documentation Prerequisites
|
||||
- `docs/implplan/archived/SPRINT_3100_0001_0001_proof_spine_system.md`
|
||||
|
||||
## Delivery Tracker
|
||||
| # | Task ID | Status | Key dependency / next step | Owners | Task Definition |
|
||||
| --- | --- | --- | --- | --- | --- |
|
||||
| 1 | PROOF-CBOR-3105-001 | DOING | ProofSpine endpoints | Scanner · WebService | Add `Accept: application/cbor` support to ProofSpine endpoints with deterministic encoding. |
|
||||
| 2 | PROOF-CBOR-3105-002 | TODO | Encoder helper | Scanner · WebService | Add a shared CBOR encoder helper (JSON→CBOR) with stable key ordering. |
|
||||
| 3 | PROOF-CBOR-3105-003 | TODO | Integration tests | Scanner · QA | Add endpoint tests validating CBOR content-type and decoding key fields. |
|
||||
| 4 | PROOF-CBOR-3105-004 | TODO | Close bookkeeping | Scanner · WebService | Update local `TASKS.md`, sprint status, and execution log with evidence (test run). |
|
||||
|
||||
## Decisions & Risks
|
||||
- **Decision:** CBOR payload shape matches JSON DTO shape (same property names).
|
||||
- **Risk:** CBOR library availability on `net10.0`. **Mitigation:** use `System.Formats.Cbor` (BCL) and add package reference only if required by build.
|
||||
|
||||
## Execution Log
|
||||
| Date (UTC) | Update | Owner |
|
||||
| --- | --- | --- |
|
||||
| 2025-12-18 | Sprint created; started PROOF-CBOR-3105-001. | Agent |
|
||||
|
||||
@@ -2,8 +2,9 @@
|
||||
|
||||
| Task ID | Sprint | Status | Notes |
|
||||
| --- | --- | --- | --- |
|
||||
| `SCAN-API-3101-001` | `docs/implplan/SPRINT_3101_0001_0001_scanner_api_standardization.md` | DOING | Align Scanner OpenAPI spec with current endpoints and include ProofSpine routes; compose into `src/Api/StellaOps.Api.OpenApi/stella.yaml`. |
|
||||
| `PROOFSPINE-3100-API` | `docs/implplan/SPRINT_3100_0001_0001_proof_spine_system.md` | DOING | Implement and test `/api/v1/spines/*` endpoints and wire verification output. |
|
||||
| `SCAN-API-3101-001` | `docs/implplan/archived/SPRINT_3101_0001_0001_scanner_api_standardization.md` | DOING | Align Scanner OpenAPI spec with current endpoints and include ProofSpine routes; compose into `src/Api/StellaOps.Api.OpenApi/stella.yaml`. |
|
||||
| `PROOFSPINE-3100-API` | `docs/implplan/archived/SPRINT_3100_0001_0001_proof_spine_system.md` | DONE | Implemented and tested `/api/v1/spines/*` endpoints with verification output (CBOR accept tracked in SPRINT_3105). |
|
||||
| `PROOF-CBOR-3105-001` | `docs/implplan/SPRINT_3105_0001_0001_proofspine_cbor_accept.md` | DOING | Add `Accept: application/cbor` support for ProofSpine endpoints + tests. |
|
||||
| `SCAN-AIRGAP-0340-001` | `docs/implplan/SPRINT_0340_0001_0001_scanner_offline_config.md` | DONE | Offline kit import + DSSE/offline Rekor verification wired; integration tests cover success/failure/audit. |
|
||||
| `DRIFT-3600-API` | `docs/implplan/SPRINT_3600_0003_0001_drift_detection_engine.md` | DONE | Add reachability drift endpoints (`/api/v1/scans/{id}/drift`, `/api/v1/drift/{id}/sinks`) + integration tests. |
|
||||
| `SCAN-API-3103-001` | `docs/implplan/SPRINT_3103_0001_0001_scanner_api_ingestion_completion.md` | DONE | Implement missing ingestion services + DI for callgraph/SBOM endpoints and add deterministic integration tests. |
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using FluentAssertions;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using MicrosoftOptions = Microsoft.Extensions.Options;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Services;
|
||||
using StellaOps.Signals.Storage.Postgres.Repositories;
|
||||
using StellaOps.Signals.Storage.Postgres.Services;
|
||||
using Xunit;
|
||||
|
||||
namespace StellaOps.Signals.Storage.Postgres.Tests;
|
||||
|
||||
[Collection(SignalsPostgresCollection.Name)]
|
||||
public sealed class CallGraphSyncServiceTests : IAsyncLifetime
|
||||
{
|
||||
private readonly SignalsPostgresFixture _fixture;
|
||||
private readonly SignalsDataSource _dataSource;
|
||||
private readonly CallGraphSyncService _syncService;
|
||||
private readonly PostgresCallGraphQueryRepository _queryRepository;
|
||||
|
||||
public CallGraphSyncServiceTests(SignalsPostgresFixture fixture)
|
||||
{
|
||||
_fixture = fixture;
|
||||
|
||||
var options = fixture.Fixture.CreateOptions();
|
||||
options.SchemaName = fixture.SchemaName;
|
||||
_dataSource = new SignalsDataSource(MicrosoftOptions.Options.Create(options), NullLogger<SignalsDataSource>.Instance);
|
||||
|
||||
_syncService = new CallGraphSyncService(_dataSource, TimeProvider.System, NullLogger<CallGraphSyncService>.Instance);
|
||||
_queryRepository = new PostgresCallGraphQueryRepository(_dataSource, NullLogger<PostgresCallGraphQueryRepository>.Instance);
|
||||
}
|
||||
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
await _fixture.ExecuteSqlAsync("TRUNCATE TABLE signals.scans CASCADE;");
|
||||
}
|
||||
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _dataSource.DisposeAsync();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SyncAsync_ProjectsCallgraph_AndQueryRepositoryReturnsStats()
|
||||
{
|
||||
var document = new CallgraphDocument
|
||||
{
|
||||
Id = "callgraph-1",
|
||||
Language = "nodejs",
|
||||
Component = "pkg:npm/demo-app@1.0.0",
|
||||
Version = "1.0.0",
|
||||
Artifact = new CallgraphArtifactMetadata
|
||||
{
|
||||
Hash = "deadbeef",
|
||||
Path = "cas/reachability/graphs/deadbeef/callgraph.json",
|
||||
Length = 128,
|
||||
ContentType = "application/json"
|
||||
},
|
||||
Nodes = new List<CallgraphNode>
|
||||
{
|
||||
new("n1", "Main", "function", "Demo", "index.js", 1)
|
||||
{
|
||||
SymbolKey = "Demo::Main()",
|
||||
Visibility = SymbolVisibility.Public,
|
||||
IsEntrypointCandidate = true,
|
||||
},
|
||||
new("n2", "Helper", "function", "Demo", "lib.js", 10)
|
||||
{
|
||||
SymbolKey = "Demo::Helper()",
|
||||
Visibility = SymbolVisibility.Internal,
|
||||
Purl = "pkg:npm/lodash@4.17.21",
|
||||
IsEntrypointCandidate = false,
|
||||
}
|
||||
},
|
||||
Edges = new List<CallgraphEdge>
|
||||
{
|
||||
new("n1", "n2", "call")
|
||||
{
|
||||
Kind = EdgeKind.Static,
|
||||
Reason = EdgeReason.DirectCall,
|
||||
Weight = 1.0,
|
||||
IsResolved = true
|
||||
}
|
||||
},
|
||||
Entrypoints = new List<CallgraphEntrypoint>
|
||||
{
|
||||
new()
|
||||
{
|
||||
NodeId = "n1",
|
||||
Kind = EntrypointKind.Http,
|
||||
Framework = EntrypointFramework.Express,
|
||||
Route = "/",
|
||||
HttpMethod = "GET",
|
||||
Phase = EntrypointPhase.Runtime,
|
||||
Order = 0
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var request = new CallGraphSyncRequest(
|
||||
ArtifactDigest: document.Artifact.Hash,
|
||||
SbomDigest: null,
|
||||
RepoUri: null,
|
||||
CommitSha: null,
|
||||
PolicyDigest: null,
|
||||
Document: document);
|
||||
|
||||
var result1 = await _syncService.SyncAsync(request, CancellationToken.None);
|
||||
result1.WasApplied.Should().BeTrue();
|
||||
result1.ScanId.Should().NotBeEmpty();
|
||||
|
||||
var stats = await _queryRepository.GetStatsAsync(result1.ScanId, CancellationToken.None);
|
||||
stats.NodeCount.Should().Be(2);
|
||||
stats.EdgeCount.Should().Be(1);
|
||||
stats.EntrypointCount.Should().Be(1);
|
||||
stats.UniquePurls.Should().Be(1);
|
||||
stats.HeuristicEdgeCount.Should().Be(0);
|
||||
stats.UnresolvedEdgeCount.Should().Be(0);
|
||||
|
||||
var reachable = await _queryRepository.GetReachableSymbolsAsync(result1.ScanId, "n1", cancellationToken: CancellationToken.None);
|
||||
reachable.Should().Contain("n2");
|
||||
|
||||
var result2 = await _syncService.SyncAsync(request, CancellationToken.None);
|
||||
result2.ScanId.Should().Be(result1.ScanId);
|
||||
|
||||
var stats2 = await _queryRepository.GetStatsAsync(result1.ScanId, CancellationToken.None);
|
||||
stats2.NodeCount.Should().Be(2);
|
||||
stats2.EdgeCount.Should().Be(1);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
-- ============================================================
|
||||
-- Signals Storage: database extensions bootstrap
|
||||
-- ============================================================
|
||||
|
||||
CREATE EXTENSION IF NOT EXISTS pgcrypto;
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||
using StellaOps.Infrastructure.Postgres.Migrations;
|
||||
using StellaOps.Infrastructure.Postgres.Options;
|
||||
using StellaOps.Signals.Persistence;
|
||||
using StellaOps.Signals.Services;
|
||||
using StellaOps.Signals.Storage.Postgres.Repositories;
|
||||
using StellaOps.Signals.Storage.Postgres.Services;
|
||||
|
||||
namespace StellaOps.Signals.Storage.Postgres;
|
||||
|
||||
@@ -24,8 +28,14 @@ public static class ServiceCollectionExtensions
|
||||
string sectionName = "Postgres:Signals")
|
||||
{
|
||||
services.Configure<PostgresOptions>(configuration.GetSection(sectionName));
|
||||
services.TryAddSingleton(TimeProvider.System);
|
||||
services.AddSingleton<SignalsDataSource>();
|
||||
|
||||
services.AddStartupMigrations(
|
||||
SignalsDataSource.DefaultSchemaName,
|
||||
"Signals.Storage",
|
||||
typeof(SignalsDataSource).Assembly);
|
||||
|
||||
// Register repositories
|
||||
services.AddSingleton<ICallgraphRepository, PostgresCallgraphRepository>();
|
||||
services.AddSingleton<IReachabilityFactRepository, PostgresReachabilityFactRepository>();
|
||||
@@ -35,6 +45,8 @@ public static class ServiceCollectionExtensions
|
||||
services.AddSingleton<IGraphMetricsRepository, PostgresGraphMetricsRepository>();
|
||||
services.AddSingleton<ICallGraphQueryRepository, PostgresCallGraphQueryRepository>();
|
||||
|
||||
services.AddSingleton<ICallGraphSyncService, CallGraphSyncService>();
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
@@ -49,8 +61,14 @@ public static class ServiceCollectionExtensions
|
||||
Action<PostgresOptions> configureOptions)
|
||||
{
|
||||
services.Configure(configureOptions);
|
||||
services.TryAddSingleton(TimeProvider.System);
|
||||
services.AddSingleton<SignalsDataSource>();
|
||||
|
||||
services.AddStartupMigrations(
|
||||
SignalsDataSource.DefaultSchemaName,
|
||||
"Signals.Storage",
|
||||
typeof(SignalsDataSource).Assembly);
|
||||
|
||||
// Register repositories
|
||||
services.AddSingleton<ICallgraphRepository, PostgresCallgraphRepository>();
|
||||
services.AddSingleton<IReachabilityFactRepository, PostgresReachabilityFactRepository>();
|
||||
@@ -60,6 +78,8 @@ public static class ServiceCollectionExtensions
|
||||
services.AddSingleton<IGraphMetricsRepository, PostgresGraphMetricsRepository>();
|
||||
services.AddSingleton<ICallGraphQueryRepository, PostgresCallGraphQueryRepository>();
|
||||
|
||||
services.AddSingleton<ICallGraphSyncService, CallGraphSyncService>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,565 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using System.Text.Json;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Npgsql;
|
||||
using NpgsqlTypes;
|
||||
using StellaOps.Infrastructure.Postgres.Repositories;
|
||||
using StellaOps.Signals.Models;
|
||||
using StellaOps.Signals.Services;
|
||||
|
||||
namespace StellaOps.Signals.Storage.Postgres.Services;
|
||||
|
||||
public sealed class CallGraphSyncService : RepositoryBase<SignalsDataSource>, ICallGraphSyncService
|
||||
{
|
||||
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
|
||||
{
|
||||
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
|
||||
WriteIndented = false
|
||||
};
|
||||
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public CallGraphSyncService(
|
||||
SignalsDataSource dataSource,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<CallGraphSyncService> logger)
|
||||
: base(dataSource, logger)
|
||||
{
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
}
|
||||
|
||||
public async Task<CallGraphSyncResult> SyncAsync(CallGraphSyncRequest request, CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
ArgumentNullException.ThrowIfNull(request.Document);
|
||||
|
||||
var artifactDigest = NormalizeRequired(request.ArtifactDigest);
|
||||
var sbomDigest = NormalizeOptionalDigest(request.SbomDigest) ?? string.Empty;
|
||||
|
||||
try
|
||||
{
|
||||
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
await using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var scanId = await UpsertScanAsync(connection, transaction, artifactDigest, sbomDigest, request, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await DeleteProjectedRowsAsync(connection, transaction, scanId, cancellationToken).ConfigureAwait(false);
|
||||
await InsertScanArtifactAsync(connection, transaction, scanId, artifactDigest, request.Document, cancellationToken).ConfigureAwait(false);
|
||||
await InsertNodesAsync(connection, transaction, scanId, request.Document, cancellationToken).ConfigureAwait(false);
|
||||
await InsertEdgesAsync(connection, transaction, scanId, request.Document, cancellationToken).ConfigureAwait(false);
|
||||
await InsertEntrypointsAsync(connection, transaction, scanId, request.Document, cancellationToken).ConfigureAwait(false);
|
||||
await InsertSymbolComponentMappingsAsync(connection, transaction, scanId, request.Document, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await MarkScanCompletedAsync(connection, transaction, scanId, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
Logger.LogInformation(
|
||||
"Projected callgraph artifact={ArtifactDigest} sbom={SbomDigest} scan={ScanId} nodes={NodeCount} edges={EdgeCount}",
|
||||
artifactDigest,
|
||||
sbomDigest,
|
||||
scanId,
|
||||
request.Document.Nodes.Count,
|
||||
request.Document.Edges.Count);
|
||||
|
||||
return new CallGraphSyncResult(scanId, WasApplied: true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError(ex, "Callgraph projection failed for artifact={ArtifactDigest} sbom={SbomDigest}", artifactDigest, sbomDigest);
|
||||
await TryMarkScanFailedAsync(artifactDigest, sbomDigest, request, ex, cancellationToken).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<Guid> UpsertScanAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
string artifactDigest,
|
||||
string sbomDigest,
|
||||
CallGraphSyncRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
INSERT INTO signals.scans (artifact_digest, repo_uri, commit_sha, sbom_digest, policy_digest, status, completed_at, error_message)
|
||||
VALUES (@artifact_digest, @repo_uri, @commit_sha, @sbom_digest, @policy_digest, 'processing', NULL, NULL)
|
||||
ON CONFLICT (artifact_digest, sbom_digest)
|
||||
DO UPDATE SET
|
||||
repo_uri = EXCLUDED.repo_uri,
|
||||
commit_sha = EXCLUDED.commit_sha,
|
||||
policy_digest = EXCLUDED.policy_digest,
|
||||
status = 'processing',
|
||||
completed_at = NULL,
|
||||
error_message = NULL
|
||||
RETURNING scan_id
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
command.Transaction = transaction;
|
||||
|
||||
AddParameter(command, "@artifact_digest", artifactDigest);
|
||||
AddParameter(command, "@repo_uri", NormalizeOptional(request.RepoUri));
|
||||
AddParameter(command, "@commit_sha", NormalizeOptional(request.CommitSha)?.ToLowerInvariant());
|
||||
AddParameter(command, "@sbom_digest", sbomDigest);
|
||||
AddParameter(command, "@policy_digest", NormalizeOptionalDigest(request.PolicyDigest));
|
||||
|
||||
var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
|
||||
return (Guid)(result ?? throw new InvalidOperationException("scan_id was not returned from signals.scans upsert."));
|
||||
}
|
||||
|
||||
private async Task DeleteProjectedRowsAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid scanId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
DELETE FROM signals.symbol_component_map WHERE scan_id = @scan_id;
|
||||
DELETE FROM signals.entrypoints WHERE scan_id = @scan_id;
|
||||
DELETE FROM signals.cg_edges WHERE scan_id = @scan_id;
|
||||
DELETE FROM signals.cg_nodes WHERE scan_id = @scan_id;
|
||||
DELETE FROM signals.artifacts WHERE scan_id = @scan_id;
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
command.Transaction = transaction;
|
||||
AddParameter(command, "@scan_id", scanId);
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task InsertScanArtifactAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid scanId,
|
||||
string artifactDigest,
|
||||
CallgraphDocument document,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
INSERT INTO signals.artifacts (
|
||||
scan_id,
|
||||
artifact_key,
|
||||
kind,
|
||||
sha256,
|
||||
purl,
|
||||
build_id,
|
||||
file_path,
|
||||
size_bytes)
|
||||
VALUES (
|
||||
@scan_id,
|
||||
@artifact_key,
|
||||
@kind,
|
||||
@sha256,
|
||||
@purl,
|
||||
@build_id,
|
||||
@file_path,
|
||||
@size_bytes)
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
command.Transaction = transaction;
|
||||
|
||||
AddParameter(command, "@scan_id", scanId);
|
||||
AddParameter(command, "@artifact_key", NormalizeOptional(document.Component) ?? "callgraph");
|
||||
AddParameter(command, "@kind", InferArtifactKind(document.LanguageType));
|
||||
AddParameter(command, "@sha256", artifactDigest);
|
||||
AddParameter(command, "@purl", IsPurl(document.Component) ? document.Component.Trim().ToLowerInvariant() : null);
|
||||
AddParameter(command, "@build_id", NormalizeOptional(document.GraphMetadata?.BuildId));
|
||||
AddParameter(command, "@file_path", NormalizeOptional(document.Artifact.Path));
|
||||
AddParameter(command, "@size_bytes", document.Artifact.Length > 0 ? document.Artifact.Length : null);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task InsertNodesAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid scanId,
|
||||
CallgraphDocument document,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
INSERT INTO signals.cg_nodes (
|
||||
scan_id,
|
||||
node_id,
|
||||
artifact_key,
|
||||
symbol_key,
|
||||
visibility,
|
||||
is_entrypoint_candidate,
|
||||
purl,
|
||||
symbol_digest,
|
||||
flags,
|
||||
attributes)
|
||||
VALUES (
|
||||
@scan_id,
|
||||
@node_id,
|
||||
@artifact_key,
|
||||
@symbol_key,
|
||||
@visibility,
|
||||
@is_entrypoint_candidate,
|
||||
@purl,
|
||||
@symbol_digest,
|
||||
@flags,
|
||||
@attributes)
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
command.Transaction = transaction;
|
||||
|
||||
command.Parameters.Add(new NpgsqlParameter<Guid>("@scan_id", NpgsqlDbType.Uuid) { TypedValue = scanId });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@node_id", NpgsqlDbType.Text) { TypedValue = string.Empty });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@artifact_key", NpgsqlDbType.Text) { TypedValue = null });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@symbol_key", NpgsqlDbType.Text) { TypedValue = string.Empty });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@visibility", NpgsqlDbType.Text) { TypedValue = "unknown" });
|
||||
command.Parameters.Add(new NpgsqlParameter<bool>("@is_entrypoint_candidate", NpgsqlDbType.Boolean) { TypedValue = false });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@purl", NpgsqlDbType.Text) { TypedValue = null });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@symbol_digest", NpgsqlDbType.Text) { TypedValue = null });
|
||||
command.Parameters.Add(new NpgsqlParameter<int>("@flags", NpgsqlDbType.Integer) { TypedValue = 0 });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@attributes", NpgsqlDbType.Jsonb) { TypedValue = null });
|
||||
|
||||
foreach (var node in document.Nodes.OrderBy(n => n.Id, StringComparer.Ordinal))
|
||||
{
|
||||
command.Parameters["@node_id"].Value = NormalizeRequired(node.Id);
|
||||
command.Parameters["@artifact_key"].Value = NormalizeOptional(node.ArtifactKey);
|
||||
command.Parameters["@symbol_key"].Value = NormalizeRequired(node.SymbolKey ?? node.Name);
|
||||
command.Parameters["@visibility"].Value = MapVisibility(node.Visibility);
|
||||
command.Parameters["@is_entrypoint_candidate"].Value = node.IsEntrypointCandidate;
|
||||
command.Parameters["@purl"].Value = NormalizeOptionalDigest(node.Purl);
|
||||
command.Parameters["@symbol_digest"].Value = NormalizeOptionalDigest(node.SymbolDigest);
|
||||
command.Parameters["@flags"].Value = node.Flags;
|
||||
command.Parameters["@attributes"].Value = SerializeAttributes(node.Attributes);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task InsertEdgesAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid scanId,
|
||||
CallgraphDocument document,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
INSERT INTO signals.cg_edges (
|
||||
scan_id,
|
||||
from_node_id,
|
||||
to_node_id,
|
||||
kind,
|
||||
reason,
|
||||
weight,
|
||||
offset_bytes,
|
||||
is_resolved,
|
||||
provenance)
|
||||
VALUES (
|
||||
@scan_id,
|
||||
@from_node_id,
|
||||
@to_node_id,
|
||||
@kind,
|
||||
@reason,
|
||||
@weight,
|
||||
@offset_bytes,
|
||||
@is_resolved,
|
||||
@provenance)
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
command.Transaction = transaction;
|
||||
|
||||
command.Parameters.Add(new NpgsqlParameter<Guid>("@scan_id", NpgsqlDbType.Uuid) { TypedValue = scanId });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@from_node_id", NpgsqlDbType.Text) { TypedValue = string.Empty });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@to_node_id", NpgsqlDbType.Text) { TypedValue = string.Empty });
|
||||
command.Parameters.Add(new NpgsqlParameter<short>("@kind", NpgsqlDbType.Smallint) { TypedValue = 0 });
|
||||
command.Parameters.Add(new NpgsqlParameter<short>("@reason", NpgsqlDbType.Smallint) { TypedValue = 0 });
|
||||
command.Parameters.Add(new NpgsqlParameter<float>("@weight", NpgsqlDbType.Real) { TypedValue = 1.0f });
|
||||
command.Parameters.Add(new NpgsqlParameter<int?>("@offset_bytes", NpgsqlDbType.Integer) { TypedValue = null });
|
||||
command.Parameters.Add(new NpgsqlParameter<bool>("@is_resolved", NpgsqlDbType.Boolean) { TypedValue = true });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@provenance", NpgsqlDbType.Text) { TypedValue = null });
|
||||
|
||||
foreach (var edge in document.Edges
|
||||
.OrderBy(e => e.SourceId, StringComparer.Ordinal)
|
||||
.ThenBy(e => e.TargetId, StringComparer.Ordinal)
|
||||
.ThenBy(e => (int)e.Kind)
|
||||
.ThenBy(e => (int)e.Reason)
|
||||
.ThenBy(e => e.Offset ?? -1))
|
||||
{
|
||||
command.Parameters["@from_node_id"].Value = NormalizeRequired(edge.SourceId);
|
||||
command.Parameters["@to_node_id"].Value = NormalizeRequired(edge.TargetId);
|
||||
command.Parameters["@kind"].Value = (short)edge.Kind;
|
||||
command.Parameters["@reason"].Value = (short)edge.Reason;
|
||||
command.Parameters["@weight"].Value = (float)Math.Clamp(edge.Weight, 0.0, 1.0);
|
||||
command.Parameters["@offset_bytes"].Value = edge.Offset;
|
||||
command.Parameters["@is_resolved"].Value = edge.IsResolved;
|
||||
command.Parameters["@provenance"].Value = NormalizeOptional(edge.Provenance);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task InsertEntrypointsAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid scanId,
|
||||
CallgraphDocument document,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
INSERT INTO signals.entrypoints (
|
||||
scan_id,
|
||||
node_id,
|
||||
kind,
|
||||
framework,
|
||||
route,
|
||||
http_method,
|
||||
phase,
|
||||
order_idx)
|
||||
VALUES (
|
||||
@scan_id,
|
||||
@node_id,
|
||||
@kind,
|
||||
@framework,
|
||||
@route,
|
||||
@http_method,
|
||||
@phase,
|
||||
@order_idx)
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
command.Transaction = transaction;
|
||||
|
||||
command.Parameters.Add(new NpgsqlParameter<Guid>("@scan_id", NpgsqlDbType.Uuid) { TypedValue = scanId });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@node_id", NpgsqlDbType.Text) { TypedValue = string.Empty });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@kind", NpgsqlDbType.Text) { TypedValue = "unknown" });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@framework", NpgsqlDbType.Text) { TypedValue = null });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@route", NpgsqlDbType.Text) { TypedValue = null });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@http_method", NpgsqlDbType.Text) { TypedValue = null });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@phase", NpgsqlDbType.Text) { TypedValue = "runtime" });
|
||||
command.Parameters.Add(new NpgsqlParameter<int>("@order_idx", NpgsqlDbType.Integer) { TypedValue = 0 });
|
||||
|
||||
foreach (var entrypoint in document.Entrypoints
|
||||
.OrderBy(e => (int)e.Phase)
|
||||
.ThenBy(e => e.Order)
|
||||
.ThenBy(e => e.NodeId, StringComparer.Ordinal))
|
||||
{
|
||||
command.Parameters["@node_id"].Value = NormalizeRequired(entrypoint.NodeId);
|
||||
command.Parameters["@kind"].Value = MapEntrypointKind(entrypoint.Kind);
|
||||
command.Parameters["@framework"].Value = entrypoint.Framework == EntrypointFramework.Unknown
|
||||
? null
|
||||
: entrypoint.Framework.ToString().ToLowerInvariant();
|
||||
command.Parameters["@route"].Value = NormalizeOptional(entrypoint.Route);
|
||||
command.Parameters["@http_method"].Value = NormalizeOptional(entrypoint.HttpMethod)?.ToUpperInvariant();
|
||||
command.Parameters["@phase"].Value = MapEntrypointPhase(entrypoint.Phase);
|
||||
command.Parameters["@order_idx"].Value = entrypoint.Order;
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task InsertSymbolComponentMappingsAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid scanId,
|
||||
CallgraphDocument document,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
INSERT INTO signals.symbol_component_map (
|
||||
scan_id,
|
||||
node_id,
|
||||
purl,
|
||||
mapping_kind,
|
||||
confidence,
|
||||
evidence)
|
||||
VALUES (
|
||||
@scan_id,
|
||||
@node_id,
|
||||
@purl,
|
||||
@mapping_kind,
|
||||
@confidence,
|
||||
@evidence)
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
command.Transaction = transaction;
|
||||
|
||||
command.Parameters.Add(new NpgsqlParameter<Guid>("@scan_id", NpgsqlDbType.Uuid) { TypedValue = scanId });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@node_id", NpgsqlDbType.Text) { TypedValue = string.Empty });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@purl", NpgsqlDbType.Text) { TypedValue = string.Empty });
|
||||
command.Parameters.Add(new NpgsqlParameter<string>("@mapping_kind", NpgsqlDbType.Text) { TypedValue = "exact" });
|
||||
command.Parameters.Add(new NpgsqlParameter<float>("@confidence", NpgsqlDbType.Real) { TypedValue = 1.0f });
|
||||
command.Parameters.Add(new NpgsqlParameter<string?>("@evidence", NpgsqlDbType.Jsonb) { TypedValue = null });
|
||||
|
||||
foreach (var node in document.Nodes
|
||||
.Where(n => !string.IsNullOrWhiteSpace(n.Purl))
|
||||
.OrderBy(n => n.Id, StringComparer.Ordinal))
|
||||
{
|
||||
command.Parameters["@node_id"].Value = NormalizeRequired(node.Id);
|
||||
command.Parameters["@purl"].Value = NormalizeRequired(node.Purl!).ToLowerInvariant();
|
||||
command.Parameters["@mapping_kind"].Value = "exact";
|
||||
command.Parameters["@confidence"].Value = 1.0f;
|
||||
command.Parameters["@evidence"].Value = SerializeEvidence(node.Evidence);
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task MarkScanCompletedAsync(
|
||||
NpgsqlConnection connection,
|
||||
NpgsqlTransaction transaction,
|
||||
Guid scanId,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
const string sql = """
|
||||
UPDATE signals.scans
|
||||
SET status = 'completed',
|
||||
completed_at = @completed_at,
|
||||
error_message = NULL
|
||||
WHERE scan_id = @scan_id
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
command.Transaction = transaction;
|
||||
AddParameter(command, "@scan_id", scanId);
|
||||
AddParameter(command, "@completed_at", _timeProvider.GetUtcNow());
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task TryMarkScanFailedAsync(
|
||||
string artifactDigest,
|
||||
string sbomDigest,
|
||||
CallGraphSyncRequest request,
|
||||
Exception exception,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await using var connection = await DataSource.OpenSystemConnectionAsync(cancellationToken).ConfigureAwait(false);
|
||||
const string sql = """
|
||||
INSERT INTO signals.scans (artifact_digest, repo_uri, commit_sha, sbom_digest, policy_digest, status, completed_at, error_message)
|
||||
VALUES (@artifact_digest, @repo_uri, @commit_sha, @sbom_digest, @policy_digest, 'failed', @completed_at, @error_message)
|
||||
ON CONFLICT (artifact_digest, sbom_digest)
|
||||
DO UPDATE SET
|
||||
repo_uri = EXCLUDED.repo_uri,
|
||||
commit_sha = EXCLUDED.commit_sha,
|
||||
policy_digest = EXCLUDED.policy_digest,
|
||||
status = 'failed',
|
||||
completed_at = EXCLUDED.completed_at,
|
||||
error_message = EXCLUDED.error_message
|
||||
""";
|
||||
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
AddParameter(command, "@artifact_digest", artifactDigest);
|
||||
AddParameter(command, "@repo_uri", NormalizeOptional(request.RepoUri));
|
||||
AddParameter(command, "@commit_sha", NormalizeOptional(request.CommitSha)?.ToLowerInvariant());
|
||||
AddParameter(command, "@sbom_digest", sbomDigest);
|
||||
AddParameter(command, "@policy_digest", NormalizeOptionalDigest(request.PolicyDigest));
|
||||
AddParameter(command, "@completed_at", _timeProvider.GetUtcNow());
|
||||
AddParameter(command, "@error_message", NormalizeOptional(exception.Message));
|
||||
|
||||
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Failed to mark scan as failed (artifact={ArtifactDigest}, sbom={SbomDigest}).", artifactDigest, sbomDigest);
|
||||
}
|
||||
}
|
||||
|
||||
private static string InferArtifactKind(CallgraphLanguage language)
|
||||
=> language switch
|
||||
{
|
||||
CallgraphLanguage.DotNet => "assembly",
|
||||
CallgraphLanguage.Java => "jar",
|
||||
CallgraphLanguage.Binary => "binary",
|
||||
CallgraphLanguage.Node or CallgraphLanguage.Python or CallgraphLanguage.Ruby or CallgraphLanguage.Php => "script",
|
||||
_ => "module"
|
||||
};
|
||||
|
||||
private static string MapVisibility(SymbolVisibility visibility)
|
||||
=> visibility switch
|
||||
{
|
||||
SymbolVisibility.Public => "public",
|
||||
SymbolVisibility.Internal => "internal",
|
||||
SymbolVisibility.Protected => "protected",
|
||||
SymbolVisibility.Private => "private",
|
||||
_ => "unknown"
|
||||
};
|
||||
|
||||
private static string MapEntrypointKind(EntrypointKind kind)
|
||||
=> kind switch
|
||||
{
|
||||
EntrypointKind.Http => "http",
|
||||
EntrypointKind.Grpc => "grpc",
|
||||
EntrypointKind.Cli => "cli",
|
||||
EntrypointKind.Job => "job",
|
||||
EntrypointKind.Event => "event",
|
||||
EntrypointKind.MessageQueue => "message_queue",
|
||||
EntrypointKind.Timer => "timer",
|
||||
EntrypointKind.Test => "test",
|
||||
EntrypointKind.Main => "main",
|
||||
EntrypointKind.ModuleInit => "module_init",
|
||||
EntrypointKind.StaticConstructor => "static_constructor",
|
||||
_ => "unknown"
|
||||
};
|
||||
|
||||
private static string MapEntrypointPhase(EntrypointPhase phase)
|
||||
=> phase switch
|
||||
{
|
||||
EntrypointPhase.ModuleInit => "module_init",
|
||||
EntrypointPhase.AppStart => "app_start",
|
||||
EntrypointPhase.Shutdown => "shutdown",
|
||||
_ => "runtime"
|
||||
};
|
||||
|
||||
private static string NormalizeRequired(string value)
|
||||
=> string.IsNullOrWhiteSpace(value)
|
||||
? throw new ArgumentException("Value is required.", nameof(value))
|
||||
: value.Trim();
|
||||
|
||||
private static string? NormalizeOptional(string? value)
|
||||
=> string.IsNullOrWhiteSpace(value) ? null : value.Trim();
|
||||
|
||||
private static string? NormalizeOptionalDigest(string? value)
|
||||
=> string.IsNullOrWhiteSpace(value) ? null : value.Trim().ToLowerInvariant();
|
||||
|
||||
private static string? SerializeAttributes(IReadOnlyDictionary<string, string>? attributes)
|
||||
{
|
||||
if (attributes is null || attributes.Count == 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var ordered = attributes
|
||||
.Where(kv => !string.IsNullOrWhiteSpace(kv.Key))
|
||||
.OrderBy(kv => kv.Key, StringComparer.Ordinal)
|
||||
.ToDictionary(
|
||||
kv => kv.Key.Trim(),
|
||||
kv => kv.Value,
|
||||
StringComparer.Ordinal);
|
||||
|
||||
return ordered.Count == 0 ? null : JsonSerializer.Serialize(ordered, JsonOptions);
|
||||
}
|
||||
|
||||
private static string? SerializeEvidence(IReadOnlyList<string>? evidence)
|
||||
{
|
||||
if (evidence is null || evidence.Count == 0)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var ordered = evidence
|
||||
.Where(v => !string.IsNullOrWhiteSpace(v))
|
||||
.Select(v => v.Trim())
|
||||
.Distinct(StringComparer.Ordinal)
|
||||
.OrderBy(v => v, StringComparer.Ordinal)
|
||||
.ToArray();
|
||||
|
||||
return ordered.Length == 0 ? null : JsonSerializer.Serialize(ordered, JsonOptions);
|
||||
}
|
||||
|
||||
private static bool IsPurl(string? value)
|
||||
=> !string.IsNullOrWhiteSpace(value) && value.TrimStart().StartsWith("pkg:", StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
@@ -9,4 +9,9 @@
|
||||
<ProjectReference Include="../StellaOps.Signals/StellaOps.Signals.csproj" />
|
||||
<ProjectReference Include="../../__Libraries/StellaOps.Infrastructure.Postgres/StellaOps.Infrastructure.Postgres.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<EmbeddedResource Include="Migrations\V0000_001__extensions.sql" LogicalName="%(Filename)%(Extension)" />
|
||||
<EmbeddedResource Include="Migrations\V3102_001__callgraph_relational_tables.sql" LogicalName="%(Filename)%(Extension)" />
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
|
||||
@@ -2,5 +2,5 @@
|
||||
|
||||
| Task ID | Sprint | Status | Notes |
|
||||
| --- | --- | --- | --- |
|
||||
| `SIG-PG-3102-001` | `docs/implplan/SPRINT_3102_0001_0001_postgres_callgraph_tables.md` | DOING | Add relational call graph tables + migrations wiring; register query repository and add integration coverage. |
|
||||
| `SIG-CG-3104-001` | `docs/implplan/SPRINT_3104_0001_0001_signals_callgraph_projection_completion.md` | TODO | Resume deferred sync/projection so `signals.*` relational callgraph tables become populated and queryable. |
|
||||
| `SIG-PG-3102-001` | `docs/implplan/archived/SPRINT_3102_0001_0001_postgres_callgraph_tables.md` | DONE | Added relational callgraph tables + query repository; deferred projection work picked up in `docs/implplan/SPRINT_3104_0001_0001_signals_callgraph_projection_completion.md`. |
|
||||
| `SIG-CG-3104-001` | `docs/implplan/SPRINT_3104_0001_0001_signals_callgraph_projection_completion.md` | DONE | Resume deferred sync/projection so `signals.*` relational callgraph tables become populated and queryable. |
|
||||
|
||||
@@ -117,6 +117,7 @@ builder.Services.AddSingleton<ICallgraphParser>(new SimpleJsonCallgraphParser("p
|
||||
builder.Services.AddSingleton<ICallgraphParser>(new SimpleJsonCallgraphParser("go"));
|
||||
builder.Services.AddSingleton<ICallgraphParserResolver, CallgraphParserResolver>();
|
||||
builder.Services.AddSingleton<ICallgraphIngestionService, CallgraphIngestionService>();
|
||||
builder.Services.AddSingleton<ICallGraphSyncService, NullCallGraphSyncService>();
|
||||
builder.Services.AddSingleton<IReachabilityCache>(sp =>
|
||||
{
|
||||
var options = sp.GetRequiredService<IOptions<SignalsOptions>>().Value;
|
||||
|
||||
@@ -32,6 +32,7 @@ internal sealed class CallgraphIngestionService : ICallgraphIngestionService
|
||||
private readonly ICallgraphRepository repository;
|
||||
private readonly IReachabilityStoreRepository reachabilityStore;
|
||||
private readonly ICallgraphNormalizationService normalizer;
|
||||
private readonly ICallGraphSyncService syncService;
|
||||
private readonly ILogger<CallgraphIngestionService> logger;
|
||||
private readonly SignalsOptions options;
|
||||
private readonly TimeProvider timeProvider;
|
||||
@@ -43,6 +44,7 @@ internal sealed class CallgraphIngestionService : ICallgraphIngestionService
|
||||
ICallgraphRepository repository,
|
||||
IReachabilityStoreRepository reachabilityStore,
|
||||
ICallgraphNormalizationService normalizer,
|
||||
ICallGraphSyncService syncService,
|
||||
IOptions<SignalsOptions> options,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<CallgraphIngestionService> logger)
|
||||
@@ -52,6 +54,7 @@ internal sealed class CallgraphIngestionService : ICallgraphIngestionService
|
||||
this.repository = repository ?? throw new ArgumentNullException(nameof(repository));
|
||||
this.reachabilityStore = reachabilityStore ?? throw new ArgumentNullException(nameof(reachabilityStore));
|
||||
this.normalizer = normalizer ?? throw new ArgumentNullException(nameof(normalizer));
|
||||
this.syncService = syncService ?? throw new ArgumentNullException(nameof(syncService));
|
||||
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
this.timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
this.options = options?.Value ?? throw new ArgumentNullException(nameof(options));
|
||||
@@ -161,6 +164,8 @@ internal sealed class CallgraphIngestionService : ICallgraphIngestionService
|
||||
document.Edges,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
await TrySyncRelationalProjectionAsync(document, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
logger.LogInformation(
|
||||
"Ingested callgraph {Language}:{Component}:{Version} (id={Id}) with {NodeCount} nodes and {EdgeCount} edges.",
|
||||
document.Language,
|
||||
@@ -183,6 +188,45 @@ internal sealed class CallgraphIngestionService : ICallgraphIngestionService
|
||||
document.Roots?.Count ?? 0);
|
||||
}
|
||||
|
||||
private async Task TrySyncRelationalProjectionAsync(CallgraphDocument document, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
var syncRequest = new CallGraphSyncRequest(
|
||||
ArtifactDigest: document.Artifact.Hash,
|
||||
SbomDigest: GetMetadataValue(document.Metadata, "sbomDigest"),
|
||||
RepoUri: GetMetadataValue(document.Metadata, "repoUri"),
|
||||
CommitSha: GetMetadataValue(document.Metadata, "commitSha"),
|
||||
PolicyDigest: GetMetadataValue(document.Metadata, "policyDigest"),
|
||||
Document: document);
|
||||
|
||||
var result = await syncService.SyncAsync(syncRequest, cancellationToken).ConfigureAwait(false);
|
||||
if (result.WasApplied)
|
||||
{
|
||||
logger.LogInformation("Projected callgraph {CallgraphId} into relational tables (scanId={ScanId}).", document.Id, result.ScanId);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Callgraph projection failed for callgraph {CallgraphId}.", document.Id);
|
||||
}
|
||||
}
|
||||
|
||||
private static string? GetMetadataValue(IReadOnlyDictionary<string, string?>? metadata, string key)
|
||||
{
|
||||
if (metadata is null || string.IsNullOrWhiteSpace(key))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!metadata.TryGetValue(key, out var value))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return string.IsNullOrWhiteSpace(value) ? null : value.Trim();
|
||||
}
|
||||
|
||||
private static void ValidateRequest(CallgraphIngestRequest request)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using StellaOps.Signals.Models;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
public interface ICallGraphSyncService
|
||||
{
|
||||
Task<CallGraphSyncResult> SyncAsync(CallGraphSyncRequest request, CancellationToken cancellationToken = default);
|
||||
}
|
||||
|
||||
public sealed record CallGraphSyncRequest(
|
||||
string ArtifactDigest,
|
||||
string? SbomDigest,
|
||||
string? RepoUri,
|
||||
string? CommitSha,
|
||||
string? PolicyDigest,
|
||||
CallgraphDocument Document);
|
||||
|
||||
public sealed record CallGraphSyncResult(Guid ScanId, bool WasApplied);
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace StellaOps.Signals.Services;
|
||||
|
||||
internal sealed class NullCallGraphSyncService : ICallGraphSyncService
|
||||
{
|
||||
public Task<CallGraphSyncResult> SyncAsync(CallGraphSyncRequest request, CancellationToken cancellationToken = default)
|
||||
=> Task.FromResult(new CallGraphSyncResult(ScanId: default, WasApplied: false));
|
||||
}
|
||||
|
||||
@@ -12,3 +12,4 @@ This file mirrors sprint work for the Signals module.
|
||||
| `GATE-3405-011` | `docs/implplan/SPRINT_3405_0001_0001_gate_multipliers.md` | DONE (2025-12-18) | Applied gate multipliers in `ReachabilityScoringService` using path gate evidence from callgraph edges. |
|
||||
| `GATE-3405-012` | `docs/implplan/SPRINT_3405_0001_0001_gate_multipliers.md` | DONE (2025-12-18) | Extended reachability fact evidence contract + digest to include `GateMultiplierBps` and `Gates`. |
|
||||
| `GATE-3405-016` | `docs/implplan/SPRINT_3405_0001_0001_gate_multipliers.md` | DONE (2025-12-18) | Added deterministic parser/normalizer/scoring coverage for gate propagation + multiplier effect. |
|
||||
| `SIG-CG-3104-003` | `docs/implplan/SPRINT_3104_0001_0001_signals_callgraph_projection_completion.md` | DONE (2025-12-18) | Added callgraph projection trigger via `ICallGraphSyncService` (default no-op implementation). |
|
||||
|
||||
Reference in New Issue
Block a user