From 14746936a9af573cd526276edf4955836379d05c Mon Sep 17 00:00:00 2001 From: StellaOps Bot Date: Sun, 21 Dec 2025 09:35:48 +0200 Subject: [PATCH] T5: Add PostgresExceptionApplicationRepository implementation --- .../PostgresExceptionApplicationRepository.cs | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 src/Policy/__Libraries/StellaOps.Policy.Exceptions/Repositories/PostgresExceptionApplicationRepository.cs diff --git a/src/Policy/__Libraries/StellaOps.Policy.Exceptions/Repositories/PostgresExceptionApplicationRepository.cs b/src/Policy/__Libraries/StellaOps.Policy.Exceptions/Repositories/PostgresExceptionApplicationRepository.cs new file mode 100644 index 000000000..b6655d840 --- /dev/null +++ b/src/Policy/__Libraries/StellaOps.Policy.Exceptions/Repositories/PostgresExceptionApplicationRepository.cs @@ -0,0 +1,174 @@ +using System.Collections.Immutable; +using System.Text; +using System.Text.Json; +using Npgsql; +using NpgsqlTypes; +using StellaOps.Policy.Exceptions.Models; +namespace StellaOps.Policy.Exceptions.Repositories; +public sealed class PostgresExceptionApplicationRepository : IExceptionApplicationRepository +{ + private readonly NpgsqlDataSource _ds; + private static readonly JsonSerializerOptions JO = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }; + public PostgresExceptionApplicationRepository(NpgsqlDataSource dataSource) => _ds = dataSource ?? throw new ArgumentNullException(nameof(dataSource)); + public async Task RecordAsync(ExceptionApplication app, CancellationToken ct = default) + { + const string sql = "INSERT INTO policy.exception_applications (id,tenant_id,exception_id,finding_id,vulnerability_id,original_status,applied_status,effect_name,effect_type,evaluation_run_id,policy_bundle_digest,applied_at,metadata) VALUES (@id,@tid,@eid,@fid,@vid,@os,@as,@en,@et,@rid,@pbd,@at,@md) RETURNING id"; + await using var cmd = _ds.CreateCommand(sql); + AddP(cmd, app); + await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false); + return app; + } + public async Task> RecordBatchAsync(IEnumerable apps, CancellationToken ct = default) + { + var list = apps.ToList(); + if (list.Count == 0) return Array.Empty(); + await using var conn = await _ds.OpenConnectionAsync(ct).ConfigureAwait(false); + await using var w = await conn.BeginBinaryImportAsync("COPY policy.exception_applications (id,tenant_id,exception_id,finding_id,vulnerability_id,original_status,applied_status,effect_name,effect_type,evaluation_run_id,policy_bundle_digest,applied_at,metadata) FROM STDIN (FORMAT BINARY)", ct).ConfigureAwait(false); + foreach (var a in list) + { + await w.StartRowAsync(ct).ConfigureAwait(false); + await w.WriteAsync(a.Id, NpgsqlDbType.Uuid, ct).ConfigureAwait(false); + await w.WriteAsync(a.TenantId, NpgsqlDbType.Uuid, ct).ConfigureAwait(false); + await w.WriteAsync(a.ExceptionId, NpgsqlDbType.Text, ct).ConfigureAwait(false); + await w.WriteAsync(a.FindingId, NpgsqlDbType.Text, ct).ConfigureAwait(false); + await w.WriteAsync(a.VulnerabilityId ?? (object)DBNull.Value, NpgsqlDbType.Text, ct).ConfigureAwait(false); + await w.WriteAsync(a.OriginalStatus, NpgsqlDbType.Text, ct).ConfigureAwait(false); + await w.WriteAsync(a.AppliedStatus, NpgsqlDbType.Text, ct).ConfigureAwait(false); + await w.WriteAsync(a.EffectName, NpgsqlDbType.Text, ct).ConfigureAwait(false); + await w.WriteAsync(a.EffectType, NpgsqlDbType.Text, ct).ConfigureAwait(false); + await w.WriteAsync(a.EvaluationRunId ?? (object)DBNull.Value, NpgsqlDbType.Uuid, ct).ConfigureAwait(false); + await w.WriteAsync(a.PolicyBundleDigest ?? (object)DBNull.Value, NpgsqlDbType.Text, ct).ConfigureAwait(false); + await w.WriteAsync(a.AppliedAt, NpgsqlDbType.TimestampTz, ct).ConfigureAwait(false); + await w.WriteAsync(JsonSerializer.Serialize(a.Metadata, JO), NpgsqlDbType.Jsonb, ct).ConfigureAwait(false); + } + await w.CompleteAsync(ct).ConfigureAwait(false); + return list; + } + public async Task> GetByExceptionIdAsync(Guid tid, string eid, int? limit = null, CancellationToken ct = default) + { + var sql = Sel("WHERE tenant_id=@tid AND exception_id=@eid", limit); + await using var cmd = _ds.CreateCommand(sql); + cmd.Parameters.AddWithValue("tid", tid); + cmd.Parameters.AddWithValue("eid", eid); + return await Read(cmd, ct).ConfigureAwait(false); + } + public async Task> GetByFindingIdAsync(Guid tid, string fid, int? limit = null, CancellationToken ct = default) + { + var sql = Sel("WHERE tenant_id=@tid AND finding_id=@fid", limit); + await using var cmd = _ds.CreateCommand(sql); + cmd.Parameters.AddWithValue("tid", tid); + cmd.Parameters.AddWithValue("fid", fid); + return await Read(cmd, ct).ConfigureAwait(false); + } + public async Task> GetByVulnerabilityIdAsync(Guid tid, string vid, int? limit = null, CancellationToken ct = default) + { + var sql = Sel("WHERE tenant_id=@tid AND vulnerability_id=@vid", limit); + await using var cmd = _ds.CreateCommand(sql); + cmd.Parameters.AddWithValue("tid", tid); + cmd.Parameters.AddWithValue("vid", vid); + return await Read(cmd, ct).ConfigureAwait(false); + } + public async Task> GetByEvaluationRunIdAsync(Guid tid, Guid rid, CancellationToken ct = default) + { + var sql = Sel("WHERE tenant_id=@tid AND evaluation_run_id=@rid", null); + await using var cmd = _ds.CreateCommand(sql); + cmd.Parameters.AddWithValue("tid", tid); + cmd.Parameters.AddWithValue("rid", rid); + return await Read(cmd, ct).ConfigureAwait(false); + } + public async Task> GetByTimeRangeAsync(Guid tid, DateTimeOffset from, DateTimeOffset to, int? limit = null, CancellationToken ct = default) + { + var sql = Sel("WHERE tenant_id=@tid AND applied_at>=@f AND applied_at<=@t", limit); + await using var cmd = _ds.CreateCommand(sql); + cmd.Parameters.AddWithValue("tid", tid); + cmd.Parameters.AddWithValue("f", from); + cmd.Parameters.AddWithValue("t", to); + return await Read(cmd, ct).ConfigureAwait(false); + } + public async Task GetStatisticsAsync(Guid tid, ExceptionApplicationFilter? filter = null, CancellationToken ct = default) + { + var (wc, ps) = Where(tid, filter); + var sql = $"SELECT COUNT(*),COUNT(DISTINCT exception_id),COUNT(DISTINCT finding_id),COUNT(DISTINCT vulnerability_id),MIN(applied_at),MAX(applied_at) FROM policy.exception_applications {wc}"; + await using var cmd = _ds.CreateCommand(sql); + foreach (var (n, v) in ps) cmd.Parameters.AddWithValue(n, v); + int tot = 0, ue = 0, uf = 0, uv = 0; + DateTimeOffset? ea = null, la = null; + await using (var r = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false)) + { + if (await r.ReadAsync(ct).ConfigureAwait(false)) { tot = r.GetInt32(0); ue = r.GetInt32(1); uf = r.GetInt32(2); uv = r.GetInt32(3); ea = r.IsDBNull(4) ? null : r.GetDateTime(4); la = r.IsDBNull(5) ? null : r.GetDateTime(5); } + } + var bet = await GC(tid, "effect_type", filter, ct).ConfigureAwait(false); + var bas = await GC(tid, "applied_status", filter, ct).ConfigureAwait(false); + return new ExceptionApplicationStatistics(tot, ue, uf, uv, bet, bas, ea, la); + } + public async Task CountAsync(Guid tid, ExceptionApplicationFilter? filter = null, CancellationToken ct = default) + { + var (wc, ps) = Where(tid, filter); + var sql = $"SELECT COUNT(*) FROM policy.exception_applications {wc}"; + await using var cmd = _ds.CreateCommand(sql); + foreach (var (n, v) in ps) cmd.Parameters.AddWithValue(n, v); + return Convert.ToInt32(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false)); + } + private async Task> GC(Guid tid, string col, ExceptionApplicationFilter? f, CancellationToken ct) + { + var (wc, ps) = Where(tid, f); + var sql = $"SELECT {col},COUNT(*) FROM policy.exception_applications {wc} GROUP BY {col}"; + await using var cmd = _ds.CreateCommand(sql); + foreach (var (n, v) in ps) cmd.Parameters.AddWithValue(n, v); + var r = new Dictionary(); + await using var rd = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); + while (await rd.ReadAsync(ct).ConfigureAwait(false)) r[rd.GetString(0)] = rd.GetInt32(1); + return r; + } + private static (string, List<(string, object)>) Where(Guid tid, ExceptionApplicationFilter? f) + { + var c = new List { "tenant_id=@tid" }; + var p = new List<(string, object)> { ("tid", tid) }; + if (f is null) return ($"WHERE {string.Join(" AND ", c)}", p); + if (!string.IsNullOrEmpty(f.ExceptionId)) { c.Add("exception_id=@eid"); p.Add(("eid", f.ExceptionId)); } + if (!string.IsNullOrEmpty(f.FindingId)) { c.Add("finding_id=@fid"); p.Add(("fid", f.FindingId)); } + if (!string.IsNullOrEmpty(f.VulnerabilityId)) { c.Add("vulnerability_id=@vid"); p.Add(("vid", f.VulnerabilityId)); } + if (f.EvaluationRunId.HasValue) { c.Add("evaluation_run_id=@rid"); p.Add(("rid", f.EvaluationRunId.Value)); } + if (!string.IsNullOrEmpty(f.EffectType)) { c.Add("effect_type=@et"); p.Add(("et", f.EffectType)); } + if (!string.IsNullOrEmpty(f.AppliedStatus)) { c.Add("applied_status=@as"); p.Add(("as", f.AppliedStatus)); } + if (f.FromDate.HasValue) { c.Add("applied_at>=@fd"); p.Add(("fd", f.FromDate.Value)); } + if (f.ToDate.HasValue) { c.Add("applied_at<=@td"); p.Add(("td", f.ToDate.Value)); } + return ($"WHERE {string.Join(" AND ", c)}", p); + } + private static string Sel(string w, int? l) + { + var sb = new StringBuilder("SELECT id,tenant_id,exception_id,finding_id,vulnerability_id,original_status,applied_status,effect_name,effect_type,evaluation_run_id,policy_bundle_digest,applied_at,metadata FROM policy.exception_applications "); + sb.Append(w).Append(" ORDER BY applied_at DESC"); + if (l.HasValue) sb.Append(" LIMIT ").Append(l.Value); + return sb.ToString(); + } + private static async Task> Read(NpgsqlCommand cmd, CancellationToken ct) + { + var res = new List(); + await using var r = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); + while (await r.ReadAsync(ct).ConfigureAwait(false)) res.Add(Map(r)); + return res; + } + private static ExceptionApplication Map(NpgsqlDataReader r) + { + var mj = r.IsDBNull(12) ? "{}" : r.GetString(12); + var md = JsonSerializer.Deserialize>(mj, JO) ?? new(); + return new ExceptionApplication { Id = r.GetGuid(0), TenantId = r.GetGuid(1), ExceptionId = r.GetString(2), FindingId = r.GetString(3), VulnerabilityId = r.IsDBNull(4) ? null : r.GetString(4), OriginalStatus = r.GetString(5), AppliedStatus = r.GetString(6), EffectName = r.GetString(7), EffectType = r.GetString(8), EvaluationRunId = r.IsDBNull(9) ? null : r.GetGuid(9), PolicyBundleDigest = r.IsDBNull(10) ? null : r.GetString(10), AppliedAt = r.GetDateTime(11), Metadata = md.ToImmutableDictionary() }; + } + private static void AddP(NpgsqlCommand cmd, ExceptionApplication a) + { + cmd.Parameters.AddWithValue("id", a.Id); + cmd.Parameters.AddWithValue("tid", a.TenantId); + cmd.Parameters.AddWithValue("eid", a.ExceptionId); + cmd.Parameters.AddWithValue("fid", a.FindingId); + cmd.Parameters.AddWithValue("vid", (object?)a.VulnerabilityId ?? DBNull.Value); + cmd.Parameters.AddWithValue("os", a.OriginalStatus); + cmd.Parameters.AddWithValue("as", a.AppliedStatus); + cmd.Parameters.AddWithValue("en", a.EffectName); + cmd.Parameters.AddWithValue("et", a.EffectType); + cmd.Parameters.AddWithValue("rid", (object?)a.EvaluationRunId ?? DBNull.Value); + cmd.Parameters.AddWithValue("pbd", (object?)a.PolicyBundleDigest ?? DBNull.Value); + cmd.Parameters.AddWithValue("at", a.AppliedAt); + cmd.Parameters.AddWithValue("md", NpgsqlDbType.Jsonb, JsonSerializer.Serialize(a.Metadata, JO)); + } +} \ No newline at end of file