T5: Add PostgresExceptionApplicationRepository implementation

This commit is contained in:
StellaOps Bot
2025-12-21 09:35:48 +02:00
parent 94ea6c5e88
commit 14746936a9

View File

@@ -0,0 +1,174 @@
using System.Collections.Immutable;
using System.Text;
using System.Text.Json;
using Npgsql;
using NpgsqlTypes;
using StellaOps.Policy.Exceptions.Models;
namespace StellaOps.Policy.Exceptions.Repositories;
public sealed class PostgresExceptionApplicationRepository : IExceptionApplicationRepository
{
private readonly NpgsqlDataSource _ds;
private static readonly JsonSerializerOptions JO = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
public PostgresExceptionApplicationRepository(NpgsqlDataSource dataSource) => _ds = dataSource ?? throw new ArgumentNullException(nameof(dataSource));
public async Task<ExceptionApplication> RecordAsync(ExceptionApplication app, CancellationToken ct = default)
{
const string sql = "INSERT INTO policy.exception_applications (id,tenant_id,exception_id,finding_id,vulnerability_id,original_status,applied_status,effect_name,effect_type,evaluation_run_id,policy_bundle_digest,applied_at,metadata) VALUES (@id,@tid,@eid,@fid,@vid,@os,@as,@en,@et,@rid,@pbd,@at,@md) RETURNING id";
await using var cmd = _ds.CreateCommand(sql);
AddP(cmd, app);
await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false);
return app;
}
public async Task<IReadOnlyList<ExceptionApplication>> RecordBatchAsync(IEnumerable<ExceptionApplication> apps, CancellationToken ct = default)
{
var list = apps.ToList();
if (list.Count == 0) return Array.Empty<ExceptionApplication>();
await using var conn = await _ds.OpenConnectionAsync(ct).ConfigureAwait(false);
await using var w = await conn.BeginBinaryImportAsync("COPY policy.exception_applications (id,tenant_id,exception_id,finding_id,vulnerability_id,original_status,applied_status,effect_name,effect_type,evaluation_run_id,policy_bundle_digest,applied_at,metadata) FROM STDIN (FORMAT BINARY)", ct).ConfigureAwait(false);
foreach (var a in list)
{
await w.StartRowAsync(ct).ConfigureAwait(false);
await w.WriteAsync(a.Id, NpgsqlDbType.Uuid, ct).ConfigureAwait(false);
await w.WriteAsync(a.TenantId, NpgsqlDbType.Uuid, ct).ConfigureAwait(false);
await w.WriteAsync(a.ExceptionId, NpgsqlDbType.Text, ct).ConfigureAwait(false);
await w.WriteAsync(a.FindingId, NpgsqlDbType.Text, ct).ConfigureAwait(false);
await w.WriteAsync(a.VulnerabilityId ?? (object)DBNull.Value, NpgsqlDbType.Text, ct).ConfigureAwait(false);
await w.WriteAsync(a.OriginalStatus, NpgsqlDbType.Text, ct).ConfigureAwait(false);
await w.WriteAsync(a.AppliedStatus, NpgsqlDbType.Text, ct).ConfigureAwait(false);
await w.WriteAsync(a.EffectName, NpgsqlDbType.Text, ct).ConfigureAwait(false);
await w.WriteAsync(a.EffectType, NpgsqlDbType.Text, ct).ConfigureAwait(false);
await w.WriteAsync(a.EvaluationRunId ?? (object)DBNull.Value, NpgsqlDbType.Uuid, ct).ConfigureAwait(false);
await w.WriteAsync(a.PolicyBundleDigest ?? (object)DBNull.Value, NpgsqlDbType.Text, ct).ConfigureAwait(false);
await w.WriteAsync(a.AppliedAt, NpgsqlDbType.TimestampTz, ct).ConfigureAwait(false);
await w.WriteAsync(JsonSerializer.Serialize(a.Metadata, JO), NpgsqlDbType.Jsonb, ct).ConfigureAwait(false);
}
await w.CompleteAsync(ct).ConfigureAwait(false);
return list;
}
public async Task<IReadOnlyList<ExceptionApplication>> GetByExceptionIdAsync(Guid tid, string eid, int? limit = null, CancellationToken ct = default)
{
var sql = Sel("WHERE tenant_id=@tid AND exception_id=@eid", limit);
await using var cmd = _ds.CreateCommand(sql);
cmd.Parameters.AddWithValue("tid", tid);
cmd.Parameters.AddWithValue("eid", eid);
return await Read(cmd, ct).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ExceptionApplication>> GetByFindingIdAsync(Guid tid, string fid, int? limit = null, CancellationToken ct = default)
{
var sql = Sel("WHERE tenant_id=@tid AND finding_id=@fid", limit);
await using var cmd = _ds.CreateCommand(sql);
cmd.Parameters.AddWithValue("tid", tid);
cmd.Parameters.AddWithValue("fid", fid);
return await Read(cmd, ct).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ExceptionApplication>> GetByVulnerabilityIdAsync(Guid tid, string vid, int? limit = null, CancellationToken ct = default)
{
var sql = Sel("WHERE tenant_id=@tid AND vulnerability_id=@vid", limit);
await using var cmd = _ds.CreateCommand(sql);
cmd.Parameters.AddWithValue("tid", tid);
cmd.Parameters.AddWithValue("vid", vid);
return await Read(cmd, ct).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ExceptionApplication>> GetByEvaluationRunIdAsync(Guid tid, Guid rid, CancellationToken ct = default)
{
var sql = Sel("WHERE tenant_id=@tid AND evaluation_run_id=@rid", null);
await using var cmd = _ds.CreateCommand(sql);
cmd.Parameters.AddWithValue("tid", tid);
cmd.Parameters.AddWithValue("rid", rid);
return await Read(cmd, ct).ConfigureAwait(false);
}
public async Task<IReadOnlyList<ExceptionApplication>> GetByTimeRangeAsync(Guid tid, DateTimeOffset from, DateTimeOffset to, int? limit = null, CancellationToken ct = default)
{
var sql = Sel("WHERE tenant_id=@tid AND applied_at>=@f AND applied_at<=@t", limit);
await using var cmd = _ds.CreateCommand(sql);
cmd.Parameters.AddWithValue("tid", tid);
cmd.Parameters.AddWithValue("f", from);
cmd.Parameters.AddWithValue("t", to);
return await Read(cmd, ct).ConfigureAwait(false);
}
public async Task<ExceptionApplicationStatistics> GetStatisticsAsync(Guid tid, ExceptionApplicationFilter? filter = null, CancellationToken ct = default)
{
var (wc, ps) = Where(tid, filter);
var sql = $"SELECT COUNT(*),COUNT(DISTINCT exception_id),COUNT(DISTINCT finding_id),COUNT(DISTINCT vulnerability_id),MIN(applied_at),MAX(applied_at) FROM policy.exception_applications {wc}";
await using var cmd = _ds.CreateCommand(sql);
foreach (var (n, v) in ps) cmd.Parameters.AddWithValue(n, v);
int tot = 0, ue = 0, uf = 0, uv = 0;
DateTimeOffset? ea = null, la = null;
await using (var r = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false))
{
if (await r.ReadAsync(ct).ConfigureAwait(false)) { tot = r.GetInt32(0); ue = r.GetInt32(1); uf = r.GetInt32(2); uv = r.GetInt32(3); ea = r.IsDBNull(4) ? null : r.GetDateTime(4); la = r.IsDBNull(5) ? null : r.GetDateTime(5); }
}
var bet = await GC(tid, "effect_type", filter, ct).ConfigureAwait(false);
var bas = await GC(tid, "applied_status", filter, ct).ConfigureAwait(false);
return new ExceptionApplicationStatistics(tot, ue, uf, uv, bet, bas, ea, la);
}
public async Task<int> CountAsync(Guid tid, ExceptionApplicationFilter? filter = null, CancellationToken ct = default)
{
var (wc, ps) = Where(tid, filter);
var sql = $"SELECT COUNT(*) FROM policy.exception_applications {wc}";
await using var cmd = _ds.CreateCommand(sql);
foreach (var (n, v) in ps) cmd.Parameters.AddWithValue(n, v);
return Convert.ToInt32(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false));
}
private async Task<Dictionary<string, int>> GC(Guid tid, string col, ExceptionApplicationFilter? f, CancellationToken ct)
{
var (wc, ps) = Where(tid, f);
var sql = $"SELECT {col},COUNT(*) FROM policy.exception_applications {wc} GROUP BY {col}";
await using var cmd = _ds.CreateCommand(sql);
foreach (var (n, v) in ps) cmd.Parameters.AddWithValue(n, v);
var r = new Dictionary<string, int>();
await using var rd = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
while (await rd.ReadAsync(ct).ConfigureAwait(false)) r[rd.GetString(0)] = rd.GetInt32(1);
return r;
}
private static (string, List<(string, object)>) Where(Guid tid, ExceptionApplicationFilter? f)
{
var c = new List<string> { "tenant_id=@tid" };
var p = new List<(string, object)> { ("tid", tid) };
if (f is null) return ($"WHERE {string.Join(" AND ", c)}", p);
if (!string.IsNullOrEmpty(f.ExceptionId)) { c.Add("exception_id=@eid"); p.Add(("eid", f.ExceptionId)); }
if (!string.IsNullOrEmpty(f.FindingId)) { c.Add("finding_id=@fid"); p.Add(("fid", f.FindingId)); }
if (!string.IsNullOrEmpty(f.VulnerabilityId)) { c.Add("vulnerability_id=@vid"); p.Add(("vid", f.VulnerabilityId)); }
if (f.EvaluationRunId.HasValue) { c.Add("evaluation_run_id=@rid"); p.Add(("rid", f.EvaluationRunId.Value)); }
if (!string.IsNullOrEmpty(f.EffectType)) { c.Add("effect_type=@et"); p.Add(("et", f.EffectType)); }
if (!string.IsNullOrEmpty(f.AppliedStatus)) { c.Add("applied_status=@as"); p.Add(("as", f.AppliedStatus)); }
if (f.FromDate.HasValue) { c.Add("applied_at>=@fd"); p.Add(("fd", f.FromDate.Value)); }
if (f.ToDate.HasValue) { c.Add("applied_at<=@td"); p.Add(("td", f.ToDate.Value)); }
return ($"WHERE {string.Join(" AND ", c)}", p);
}
private static string Sel(string w, int? l)
{
var sb = new StringBuilder("SELECT id,tenant_id,exception_id,finding_id,vulnerability_id,original_status,applied_status,effect_name,effect_type,evaluation_run_id,policy_bundle_digest,applied_at,metadata FROM policy.exception_applications ");
sb.Append(w).Append(" ORDER BY applied_at DESC");
if (l.HasValue) sb.Append(" LIMIT ").Append(l.Value);
return sb.ToString();
}
private static async Task<IReadOnlyList<ExceptionApplication>> Read(NpgsqlCommand cmd, CancellationToken ct)
{
var res = new List<ExceptionApplication>();
await using var r = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
while (await r.ReadAsync(ct).ConfigureAwait(false)) res.Add(Map(r));
return res;
}
private static ExceptionApplication Map(NpgsqlDataReader r)
{
var mj = r.IsDBNull(12) ? "{}" : r.GetString(12);
var md = JsonSerializer.Deserialize<Dictionary<string, string>>(mj, JO) ?? new();
return new ExceptionApplication { Id = r.GetGuid(0), TenantId = r.GetGuid(1), ExceptionId = r.GetString(2), FindingId = r.GetString(3), VulnerabilityId = r.IsDBNull(4) ? null : r.GetString(4), OriginalStatus = r.GetString(5), AppliedStatus = r.GetString(6), EffectName = r.GetString(7), EffectType = r.GetString(8), EvaluationRunId = r.IsDBNull(9) ? null : r.GetGuid(9), PolicyBundleDigest = r.IsDBNull(10) ? null : r.GetString(10), AppliedAt = r.GetDateTime(11), Metadata = md.ToImmutableDictionary() };
}
private static void AddP(NpgsqlCommand cmd, ExceptionApplication a)
{
cmd.Parameters.AddWithValue("id", a.Id);
cmd.Parameters.AddWithValue("tid", a.TenantId);
cmd.Parameters.AddWithValue("eid", a.ExceptionId);
cmd.Parameters.AddWithValue("fid", a.FindingId);
cmd.Parameters.AddWithValue("vid", (object?)a.VulnerabilityId ?? DBNull.Value);
cmd.Parameters.AddWithValue("os", a.OriginalStatus);
cmd.Parameters.AddWithValue("as", a.AppliedStatus);
cmd.Parameters.AddWithValue("en", a.EffectName);
cmd.Parameters.AddWithValue("et", a.EffectType);
cmd.Parameters.AddWithValue("rid", (object?)a.EvaluationRunId ?? DBNull.Value);
cmd.Parameters.AddWithValue("pbd", (object?)a.PolicyBundleDigest ?? DBNull.Value);
cmd.Parameters.AddWithValue("at", a.AppliedAt);
cmd.Parameters.AddWithValue("md", NpgsqlDbType.Jsonb, JsonSerializer.Serialize(a.Metadata, JO));
}
}