using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Scanner.Storage.Postgres;
using StellaOps.Scanner.WebService.Contracts;
using StellaOps.Scanner.WebService.Domain;

namespace StellaOps.Scanner.WebService.Services;

/// <summary>
/// Validates <c>stella.callgraph.v1</c> documents and stores them in the
/// <c>callgraph_ingestions</c> table. Rows are keyed by
/// (tenant_id, scan_id, content_digest); re-ingesting the same digest for the same scan
/// returns the row that is already stored.
/// </summary>
internal sealed class CallGraphIngestionService : ICallGraphIngestionService
{
    // All reads and writes in this service use this single, fixed tenant.
    private const string TenantContext = "00000000-0000-0000-0000-000000000001";

    // The only schema identifier accepted by Validate.
    private const string ExpectedSchema = "stella.callgraph.v1";

    private static readonly Guid TenantId = Guid.Parse(TenantContext);

    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
    {
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
    };

    private readonly ScannerDataSource _dataSource;
    private readonly TimeProvider _timeProvider;

    // NOTE(review): if the DI container only registers ILogger<T>, this was probably meant
    // to be ILogger<CallGraphIngestionService> — confirm against the service registration.
    private readonly ILogger _logger;

    private string SchemaName => _dataSource.SchemaName ?? ScannerDataSource.DefaultSchema;

    private string CallGraphIngestionsTable => $"{SchemaName}.callgraph_ingestions";

    /// <summary>Creates the service.</summary>
    /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
    public CallGraphIngestionService(
        ScannerDataSource dataSource,
        TimeProvider timeProvider,
        ILogger logger)
    {
        _dataSource = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
        _timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Checks the required fields of a call graph document and collects every problem found
    /// rather than stopping at the first one.
    /// </summary>
    /// <param name="callGraph">The document to validate.</param>
    /// <returns>A success result when no problems were found; otherwise a failure carrying all messages.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callGraph"/> is <c>null</c>.</exception>
    public CallGraphValidationResult Validate(CallGraphV1Dto callGraph)
    {
        ArgumentNullException.ThrowIfNull(callGraph);

        var errors = new List<string>();

        if (string.IsNullOrWhiteSpace(callGraph.Schema))
        {
            errors.Add("Schema is required.");
        }
        else if (!string.Equals(callGraph.Schema, ExpectedSchema, StringComparison.Ordinal))
        {
            errors.Add($"Unsupported schema '{callGraph.Schema}'. Expected '{ExpectedSchema}'.");
        }

        if (string.IsNullOrWhiteSpace(callGraph.ScanKey))
        {
            errors.Add("ScanKey is required.");
        }

        if (string.IsNullOrWhiteSpace(callGraph.Language))
        {
            errors.Add("Language is required.");
        }

        if (callGraph.Nodes is null || callGraph.Nodes.Count == 0)
        {
            errors.Add("At least one node is required.");
        }

        if (callGraph.Edges is null || callGraph.Edges.Count == 0)
        {
            errors.Add("At least one edge is required.");
        }

        return errors.Count == 0
            ? CallGraphValidationResult.Success()
            : CallGraphValidationResult.Failure(errors.ToArray());
    }

    /// <summary>
    /// Looks up a previously stored call graph for the given scan and content digest.
    /// </summary>
    /// <param name="scanId">Scan identifier; surrounding whitespace is trimmed before the lookup.</param>
    /// <param name="contentDigest">Content digest; surrounding whitespace is trimmed before the lookup.</param>
    /// <param name="cancellationToken">Cancels the database round trip.</param>
    /// <returns>
    /// The stored row's id, digest and creation time, or <c>null</c> when either argument is
    /// blank or no matching row exists.
    /// </returns>
    public async Task<ExistingCallGraphDto?> FindByDigestAsync(
        ScanId scanId,
        string contentDigest,
        CancellationToken cancellationToken = default)
    {
        // Blank keys can never match a row, so skip the round trip.
        if (string.IsNullOrWhiteSpace(scanId.Value) || string.IsNullOrWhiteSpace(contentDigest))
        {
            return null;
        }

        var sql = $"""
            SELECT id, content_digest, created_at_utc
            FROM {CallGraphIngestionsTable}
            WHERE tenant_id = @tenant_id
              AND scan_id = @scan_id
              AND content_digest = @content_digest
            LIMIT 1
            """;

        await using var connection = await _dataSource
            .OpenConnectionAsync(TenantContext, "reader", cancellationToken)
            .ConfigureAwait(false);

        await using var command = new NpgsqlCommand(sql, connection);
        AddKeyParameters(command, scanId.Value.Trim(), contentDigest.Trim());

        await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            return null;
        }

        // NOTE(review): the generic argument is assumed to be DateTimeOffset — confirm it
        // matches the declared type of ExistingCallGraphDto.CreatedAt.
        return new ExistingCallGraphDto(
            Id: reader.GetString(0),
            Digest: reader.GetString(1),
            CreatedAt: reader.GetFieldValue<DateTimeOffset>(2));
    }

    /// <summary>
    /// Stores a call graph for a scan. The insert uses <c>ON CONFLICT DO NOTHING</c> on
    /// (tenant_id, scan_id, content_digest), and the result is always read back from the
    /// table, so a repeated call reports the row that was stored first.
    /// </summary>
    /// <param name="scanId">Scan identifier; surrounding whitespace is trimmed.</param>
    /// <param name="callGraph">The document to serialize and store as JSONB.</param>
    /// <param name="contentDigest">Content digest; surrounding whitespace is trimmed.</param>
    /// <param name="cancellationToken">Cancels the database round trips.</param>
    /// <returns>The id, node/edge counts and digest of the persisted row.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="callGraph"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">The scan id value or <paramref name="contentDigest"/> is null or blank.</exception>
    /// <exception cref="InvalidOperationException">No row could be read back after the insert.</exception>
    public async Task<CallGraphIngestionResult> IngestAsync(
        ScanId scanId,
        CallGraphV1Dto callGraph,
        string contentDigest,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(callGraph);
        ArgumentException.ThrowIfNullOrWhiteSpace(scanId.Value);
        ArgumentException.ThrowIfNullOrWhiteSpace(contentDigest);

        var normalizedScanId = scanId.Value.Trim();
        var normalizedDigest = contentDigest.Trim();
        var callgraphId = CreateCallGraphId(scanId, normalizedDigest);
        var now = _timeProvider.GetUtcNow();
        var nodeCount = callGraph.Nodes?.Count ?? 0;
        var edgeCount = callGraph.Edges?.Count ?? 0;
        var language = callGraph.Language?.Trim() ?? string.Empty;
        var payload = JsonSerializer.Serialize(callGraph, JsonOptions);

        var insertSql = $"""
            INSERT INTO {CallGraphIngestionsTable} (
                id,
                tenant_id,
                scan_id,
                content_digest,
                language,
                node_count,
                edge_count,
                created_at_utc,
                callgraph_json
            ) VALUES (
                @id,
                @tenant_id,
                @scan_id,
                @content_digest,
                @language,
                @node_count,
                @edge_count,
                @created_at_utc,
                @callgraph_json::jsonb
            )
            ON CONFLICT (tenant_id, scan_id, content_digest) DO NOTHING
            """;

        var selectSql = $"""
            SELECT id, content_digest, node_count, edge_count
            FROM {CallGraphIngestionsTable}
            WHERE tenant_id = @tenant_id
              AND scan_id = @scan_id
              AND content_digest = @content_digest
            LIMIT 1
            """;

        await using var connection = await _dataSource
            .OpenConnectionAsync(TenantContext, "writer", cancellationToken)
            .ConfigureAwait(false);

        int insertedRows;
        await using (var insert = new NpgsqlCommand(insertSql, connection))
        {
            insert.Parameters.AddWithValue("id", callgraphId);
            AddKeyParameters(insert, normalizedScanId, normalizedDigest);
            insert.Parameters.AddWithValue("language", language);
            insert.Parameters.AddWithValue("node_count", nodeCount);
            insert.Parameters.AddWithValue("edge_count", edgeCount);
            insert.Parameters.AddWithValue("created_at_utc", now.UtcDateTime);
            // TypedValue only exists on the generic parameter type.
            insert.Parameters.Add(
                new NpgsqlParameter<string>("callgraph_json", NpgsqlDbType.Jsonb) { TypedValue = payload });

            // Zero affected rows means the conflict clause skipped the insert.
            insertedRows = await insert.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
        }

        // Read the row back so the result reflects what is stored, including the case
        // where an earlier ingestion already created it.
        await using var select = new NpgsqlCommand(selectSql, connection);
        AddKeyParameters(select, normalizedScanId, normalizedDigest);

        await using var reader = await select.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
        if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            throw new InvalidOperationException("Call graph ingestion row was not persisted.");
        }

        var persistedId = reader.GetString(0);
        var persistedDigest = reader.GetString(1);
        var persistedNodeCount = reader.GetInt32(2);
        var persistedEdgeCount = reader.GetInt32(3);

        if (insertedRows == 0)
        {
            // Do not claim an ingestion happened when the row was already present.
            _logger.LogInformation(
                "Callgraph already present scan={ScanId} digest={Digest}; returning existing row",
                scanId.Value,
                persistedDigest);
        }
        else
        {
            _logger.LogInformation(
                "Ingested callgraph scan={ScanId} lang={Language} nodes={Nodes} edges={Edges} digest={Digest}",
                scanId.Value,
                language,
                persistedNodeCount,
                persistedEdgeCount,
                persistedDigest);
        }

        return new CallGraphIngestionResult(
            CallgraphId: persistedId,
            NodeCount: persistedNodeCount,
            EdgeCount: persistedEdgeCount,
            Digest: persistedDigest);
    }

    /// <summary>
    /// Adds the three parameters that identify a row: tenant, scan and content digest.
    /// </summary>
    private static void AddKeyParameters(NpgsqlCommand command, string scanId, string contentDigest)
    {
        command.Parameters.AddWithValue("tenant_id", TenantId);
        command.Parameters.AddWithValue("scan_id", scanId);
        command.Parameters.AddWithValue("content_digest", contentDigest);
    }

    /// <summary>
    /// Derives the row id from the trimmed scan id and digest: <c>cg_</c> followed by the
    /// lowercase hex SHA-256 of <c>"{scanId}:{contentDigest}"</c> encoded as UTF-8.
    /// The same inputs always yield the same id.
    /// </summary>
    private static string CreateCallGraphId(ScanId scanId, string contentDigest)
    {
        var bytes = Encoding.UTF8.GetBytes($"{scanId.Value.Trim()}:{contentDigest.Trim()}");
        var hash = SHA256.HashData(bytes);
        return $"cg_{Convert.ToHexString(hash).ToLowerInvariant()}";
    }
}