feat: Implement CVSS receipt management client and models
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Policy Lint & Smoke / policy-lint (push) Has been cancelled
devportal-offline / build-offline (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
using System.Collections.Concurrent;
|
||||
using MongoDB.Bson;
|
||||
using System.IO;
|
||||
|
||||
namespace StellaOps.Concelier.Connector.Common.Fetch;
|
||||
|
||||
@@ -8,9 +8,9 @@ namespace StellaOps.Concelier.Connector.Common.Fetch;
|
||||
/// </summary>
|
||||
public sealed class RawDocumentStorage
|
||||
{
|
||||
private readonly ConcurrentDictionary<ObjectId, byte[]> _blobs = new();
|
||||
private readonly ConcurrentDictionary<Guid, byte[]> _blobs = new();
|
||||
|
||||
public Task<ObjectId> UploadAsync(
|
||||
public Task<Guid> UploadAsync(
|
||||
string sourceName,
|
||||
string uri,
|
||||
byte[] content,
|
||||
@@ -18,19 +18,20 @@ public sealed class RawDocumentStorage
|
||||
CancellationToken cancellationToken)
|
||||
=> UploadAsync(sourceName, uri, content, contentType, expiresAt: null, cancellationToken);
|
||||
|
||||
public async Task<ObjectId> UploadAsync(
|
||||
public async Task<Guid> UploadAsync(
|
||||
string sourceName,
|
||||
string uri,
|
||||
byte[] content,
|
||||
string? contentType,
|
||||
DateTimeOffset? expiresAt,
|
||||
CancellationToken cancellationToken)
|
||||
CancellationToken cancellationToken,
|
||||
Guid? documentId = null)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrEmpty(sourceName);
|
||||
ArgumentException.ThrowIfNullOrEmpty(uri);
|
||||
ArgumentNullException.ThrowIfNull(content);
|
||||
|
||||
var id = ObjectId.GenerateNewId();
|
||||
var id = documentId ?? Guid.NewGuid();
|
||||
var copy = new byte[content.Length];
|
||||
Buffer.BlockCopy(content, 0, copy, 0, content.Length);
|
||||
_blobs[id] = copy;
|
||||
@@ -38,17 +39,17 @@ public sealed class RawDocumentStorage
|
||||
return id;
|
||||
}
|
||||
|
||||
public Task<byte[]> DownloadAsync(ObjectId id, CancellationToken cancellationToken)
|
||||
public Task<byte[]> DownloadAsync(Guid id, CancellationToken cancellationToken)
|
||||
{
|
||||
if (_blobs.TryGetValue(id, out var bytes))
|
||||
{
|
||||
return Task.FromResult(bytes);
|
||||
}
|
||||
|
||||
throw new MongoDB.Driver.GridFSFileNotFoundException($"Blob {id} not found.");
|
||||
throw new FileNotFoundException($"Blob {id} not found.");
|
||||
}
|
||||
|
||||
public async Task DeleteAsync(ObjectId id, CancellationToken cancellationToken)
|
||||
public async Task DeleteAsync(Guid id, CancellationToken cancellationToken)
|
||||
{
|
||||
_blobs.TryRemove(id, out _);
|
||||
await Task.CompletedTask.ConfigureAwait(false);
|
||||
|
||||
@@ -147,31 +147,33 @@ public sealed class SourceFetchService
|
||||
}
|
||||
}
|
||||
|
||||
var gridFsId = await _rawDocumentStorage.UploadAsync(
|
||||
var existing = await _documentStore.FindBySourceAndUriAsync(request.SourceName, request.RequestUri.ToString(), cancellationToken).ConfigureAwait(false);
|
||||
var recordId = existing?.Id ?? Guid.NewGuid();
|
||||
|
||||
var payloadId = await _rawDocumentStorage.UploadAsync(
|
||||
request.SourceName,
|
||||
request.RequestUri.ToString(),
|
||||
contentBytes,
|
||||
contentType,
|
||||
expiresAt,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var existing = await _documentStore.FindBySourceAndUriAsync(request.SourceName, request.RequestUri.ToString(), cancellationToken).ConfigureAwait(false);
|
||||
var recordId = existing?.Id ?? Guid.NewGuid();
|
||||
|
||||
var record = new DocumentRecord(
|
||||
recordId,
|
||||
request.SourceName,
|
||||
cancellationToken,
|
||||
recordId).ConfigureAwait(false);
|
||||
|
||||
var record = new DocumentRecord(
|
||||
recordId,
|
||||
request.SourceName,
|
||||
request.RequestUri.ToString(),
|
||||
fetchedAt,
|
||||
contentHash,
|
||||
DocumentStatuses.PendingParse,
|
||||
contentType,
|
||||
headers,
|
||||
metadata,
|
||||
response.Headers.ETag?.Tag,
|
||||
response.Content.Headers.LastModified,
|
||||
gridFsId,
|
||||
expiresAt);
|
||||
headers,
|
||||
metadata,
|
||||
response.Headers.ETag?.Tag,
|
||||
response.Content.Headers.LastModified,
|
||||
payloadId,
|
||||
expiresAt,
|
||||
Payload: contentBytes);
|
||||
|
||||
var upserted = await _documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
|
||||
SourceDiagnostics.RecordHttpRequest(request.SourceName, request.ClientName, response.StatusCode, sendResult.Attempts, duration, contentBytes.LongLength, rateLimitRemaining);
|
||||
|
||||
@@ -4,7 +4,7 @@ using System.Security.Cryptography;
|
||||
using System.Linq;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Concelier.Models;
|
||||
using StellaOps.Concelier.Storage.Mongo.MergeEvents;
|
||||
using StellaOps.Concelier.Storage.Mongo.MergeEvents;
|
||||
|
||||
/// <summary>
|
||||
/// Persists merge events with canonical before/after hashes for auditability.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using StellaOps.Concelier.Models;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo
|
||||
@@ -12,6 +13,17 @@ namespace StellaOps.Concelier.Storage.Mongo
|
||||
public const string Failed = "failed";
|
||||
}
|
||||
|
||||
public static class MongoStorageDefaults
|
||||
{
|
||||
public static class Collections
|
||||
{
|
||||
public const string AdvisoryStatements = "advisory_statements";
|
||||
public const string AdvisoryRaw = "advisory_raw";
|
||||
public const string Alias = "aliases";
|
||||
public const string MergeEvent = "merge_events";
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record MongoStorageOptions
|
||||
{
|
||||
public string DefaultTenant { get; init; } = "default";
|
||||
@@ -87,7 +99,7 @@ namespace StellaOps.Concelier.Storage.Mongo
|
||||
Guid DocumentId,
|
||||
string SourceName,
|
||||
string Format,
|
||||
string Payload,
|
||||
MongoDB.Bson.BsonDocument Payload,
|
||||
DateTimeOffset CreatedAt);
|
||||
|
||||
public interface IDtoStore
|
||||
@@ -117,9 +129,9 @@ namespace StellaOps.Concelier.Storage.Mongo
|
||||
{
|
||||
private readonly ConcurrentDictionary<Guid, byte[]> _blobs = new();
|
||||
|
||||
public Task<Guid> UploadAsync(string sourceName, string uri, byte[] content, string? contentType, DateTimeOffset? expiresAt, CancellationToken cancellationToken)
|
||||
public Task<Guid> UploadAsync(string sourceName, string uri, byte[] content, string? contentType, DateTimeOffset? expiresAt, CancellationToken cancellationToken, Guid? documentId = null)
|
||||
{
|
||||
var id = Guid.NewGuid();
|
||||
var id = documentId ?? Guid.NewGuid();
|
||||
_blobs[id] = content.ToArray();
|
||||
return Task.FromResult(id);
|
||||
}
|
||||
@@ -143,12 +155,12 @@ namespace StellaOps.Concelier.Storage.Mongo
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record SourceStateRecord(string SourceName, string? CursorJson, DateTimeOffset UpdatedAt);
|
||||
public sealed record SourceStateRecord(string SourceName, MongoDB.Bson.BsonDocument? Cursor, DateTimeOffset UpdatedAt);
|
||||
|
||||
public interface ISourceStateRepository
|
||||
{
|
||||
Task<SourceStateRecord?> TryGetAsync(string sourceName, CancellationToken cancellationToken);
|
||||
Task UpdateCursorAsync(string sourceName, string cursorJson, DateTimeOffset completedAt, CancellationToken cancellationToken);
|
||||
Task UpdateCursorAsync(string sourceName, MongoDB.Bson.BsonDocument cursor, DateTimeOffset completedAt, CancellationToken cancellationToken);
|
||||
Task MarkFailureAsync(string sourceName, DateTimeOffset now, TimeSpan backoff, string reason, CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
@@ -162,9 +174,9 @@ namespace StellaOps.Concelier.Storage.Mongo
|
||||
return Task.FromResult<SourceStateRecord?>(record);
|
||||
}
|
||||
|
||||
public Task UpdateCursorAsync(string sourceName, string cursorJson, DateTimeOffset completedAt, CancellationToken cancellationToken)
|
||||
public Task UpdateCursorAsync(string sourceName, MongoDB.Bson.BsonDocument cursor, DateTimeOffset completedAt, CancellationToken cancellationToken)
|
||||
{
|
||||
_states[sourceName] = new SourceStateRecord(sourceName, cursorJson, completedAt);
|
||||
_states[sourceName] = new SourceStateRecord(sourceName, cursor.DeepClone(), completedAt);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
@@ -225,7 +237,15 @@ namespace StellaOps.Concelier.Storage.Mongo.Advisories
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.Aliases
|
||||
{
|
||||
public static class AliasStoreConstants
|
||||
{
|
||||
public const string PrimaryScheme = "PRIMARY";
|
||||
public const string UnscopedScheme = "UNSCOPED";
|
||||
}
|
||||
|
||||
public sealed record AliasEntry(string Scheme, string Value);
|
||||
public sealed record AliasRecord(string AdvisoryKey, string Scheme, string Value);
|
||||
public sealed record AliasCollision(string Scheme, string Value, IReadOnlyList<string> AdvisoryKeys);
|
||||
|
||||
public interface IAliasStore
|
||||
{
|
||||
@@ -387,17 +407,52 @@ namespace StellaOps.Concelier.Storage.Mongo.Exporting
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo.MergeEvents
|
||||
{
|
||||
public sealed record MergeEventRecord(string AdvisoryKey, string EventType, DateTimeOffset CreatedAt);
|
||||
public sealed record MergeEventRecord(
|
||||
Guid Id,
|
||||
string AdvisoryKey,
|
||||
byte[] BeforeHash,
|
||||
byte[] AfterHash,
|
||||
DateTimeOffset MergedAt,
|
||||
IReadOnlyList<Guid> InputDocumentIds,
|
||||
IReadOnlyList<MergeFieldDecision> FieldDecisions);
|
||||
|
||||
public sealed record MergeFieldDecision(
|
||||
string Field,
|
||||
string? SelectedSource,
|
||||
string DecisionReason,
|
||||
DateTimeOffset? SelectedModified,
|
||||
IReadOnlyList<string> ConsideredSources);
|
||||
|
||||
public interface IMergeEventStore
|
||||
{
|
||||
Task AppendAsync(MergeEventRecord record, CancellationToken cancellationToken);
|
||||
Task<IReadOnlyList<MergeEventRecord>> GetRecentAsync(string advisoryKey, int limit, CancellationToken cancellationToken);
|
||||
}
|
||||
|
||||
public sealed class InMemoryMergeEventStore : IMergeEventStore
|
||||
{
|
||||
private readonly ConcurrentBag<MergeEventRecord> _records = new();
|
||||
|
||||
public Task AppendAsync(MergeEventRecord record, CancellationToken cancellationToken)
|
||||
{
|
||||
_records.Add(record);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<MergeEventRecord>> GetRecentAsync(string advisoryKey, int limit, CancellationToken cancellationToken)
|
||||
{
|
||||
var records = _records
|
||||
.Where(r => string.Equals(r.AdvisoryKey, advisoryKey, StringComparison.OrdinalIgnoreCase))
|
||||
.OrderByDescending(r => r.MergedAt)
|
||||
.Take(limit)
|
||||
.ToArray();
|
||||
|
||||
return Task.FromResult<IReadOnlyList<MergeEventRecord>>(records);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Mongo
|
||||
{
|
||||
public static class MongoStorageDefaults
|
||||
{
|
||||
public static class Collections
|
||||
{
|
||||
public const string AdvisoryStatements = "advisory_statements";
|
||||
public const string AdvisoryRaw = "advisory_raw";
|
||||
}
|
||||
}
|
||||
// Already defined above; kept for backward compatibility with legacy using directives.
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ public sealed class PostgresDocumentStore : IDocumentStore
|
||||
MetadataJson: record.Metadata is null ? null : JsonSerializer.Serialize(record.Metadata, _json),
|
||||
Etag: record.Etag,
|
||||
LastModified: record.LastModified,
|
||||
Payload: Array.Empty<byte>(), // payload handled via RawDocumentStorage; keep pointer zero-length here
|
||||
Payload: record.Payload ?? Array.Empty<byte>(),
|
||||
CreatedAt: record.CreatedAt,
|
||||
UpdatedAt: DateTimeOffset.UtcNow,
|
||||
ExpiresAt: record.ExpiresAt);
|
||||
@@ -82,7 +82,8 @@ public sealed class PostgresDocumentStore : IDocumentStore
|
||||
: JsonSerializer.Deserialize<Dictionary<string, string>>(row.MetadataJson, _json),
|
||||
row.Etag,
|
||||
row.LastModified,
|
||||
PayloadId: null,
|
||||
ExpiresAt: row.ExpiresAt);
|
||||
PayloadId: row.Id,
|
||||
ExpiresAt: row.ExpiresAt,
|
||||
Payload: row.Payload);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
using System.Text.Json;
|
||||
using Dapper;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Concelier.Storage.Postgres.Models;
|
||||
using StellaOps.Infrastructure.Postgres;
|
||||
using StellaOps.Infrastructure.Postgres.Connections;
|
||||
using StellaOps.Infrastructure.Postgres.Repositories;
|
||||
|
||||
namespace StellaOps.Concelier.Storage.Postgres.Repositories;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user