feat: Add DigestUpsertRequest and LockEntity models
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled
Some checks failed
AOC Guard CI / aoc-guard (push) Has been cancelled
AOC Guard CI / aoc-verify (push) Has been cancelled
Concelier Attestation Tests / attestation-tests (push) Has been cancelled
Docs CI / lint-and-preview (push) Has been cancelled
Export Center CI / export-ci (push) Has been cancelled
Mirror Thin Bundle Sign & Verify / mirror-sign (push) Has been cancelled
- Introduced DigestUpsertRequest for handling digest upsert requests with properties like ChannelId, Recipient, DigestKey, Events, and CollectUntil. - Created LockEntity to represent a lightweight distributed lock entry with properties such as Id, TenantId, Resource, Owner, ExpiresAt, and CreatedAt. feat: Implement ILockRepository interface and LockRepository class - Defined ILockRepository interface with methods for acquiring and releasing locks. - Implemented LockRepository class with methods to try acquiring a lock and releasing it, using SQL for upsert operations. feat: Add SurfaceManifestPointer record for manifest pointers - Introduced SurfaceManifestPointer to represent a minimal pointer to a Surface.FS manifest associated with an image digest. feat: Create PolicySimulationInputLock and related validation logic - Added PolicySimulationInputLock record to describe policy simulation inputs and expected digests. - Implemented validation logic for policy simulation inputs, including checks for digest drift and shadow mode requirements. test: Add unit tests for ReplayVerificationService and ReplayVerifier - Created ReplayVerificationServiceTests to validate the behavior of the ReplayVerificationService under various scenarios. - Developed ReplayVerifierTests to ensure the correctness of the ReplayVerifier logic. test: Implement PolicySimulationInputLockValidatorTests - Added tests for PolicySimulationInputLockValidator to verify the validation logic against expected inputs and conditions. chore: Add cosign key example and signing scripts - Included a placeholder cosign key example for development purposes. - Added a script for signing Signals artifacts using cosign with support for both v2 and v3. chore: Create script for uploading evidence to the evidence locker - Developed a script to upload evidence to the evidence locker, ensuring required environment variables are set.
This commit is contained in:
@@ -291,6 +291,20 @@ CREATE TABLE IF NOT EXISTS notify.audit (
|
||||
CREATE INDEX idx_audit_tenant ON notify.audit(tenant_id);
|
||||
CREATE INDEX idx_audit_created ON notify.audit(tenant_id, created_at);
|
||||
|
||||
-- Locks table (lightweight distributed locks)
|
||||
CREATE TABLE IF NOT EXISTS notify.locks (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
tenant_id TEXT NOT NULL,
|
||||
resource TEXT NOT NULL,
|
||||
owner TEXT NOT NULL,
|
||||
expires_at TIMESTAMPTZ NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
UNIQUE(tenant_id, resource)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_locks_tenant ON notify.locks(tenant_id);
|
||||
CREATE INDEX idx_locks_expiry ON notify.locks(expires_at);
|
||||
|
||||
-- Update timestamp function
|
||||
CREATE OR REPLACE FUNCTION notify.update_updated_at()
|
||||
RETURNS TRIGGER AS $$
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
namespace StellaOps.Notify.Storage.Postgres.Models;
|
||||
|
||||
/// <summary>
|
||||
/// Represents a lightweight distributed lock entry.
|
||||
/// </summary>
|
||||
public sealed class LockEntity
|
||||
{
|
||||
public required Guid Id { get; init; }
|
||||
public required string TenantId { get; init; }
|
||||
public required string Resource { get; init; }
|
||||
public required string Owner { get; init; }
|
||||
public required DateTimeOffset ExpiresAt { get; init; }
|
||||
public DateTimeOffset CreatedAt { get; init; }
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Npgsql;
|
||||
using StellaOps.Infrastructure.Postgres.Repositories;
|
||||
@@ -62,6 +63,128 @@ public sealed class DeliveryRepository : RepositoryBase<NotifyDataSource>, IDeli
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<DeliveryEntity> UpsertAsync(DeliveryEntity delivery, CancellationToken cancellationToken = default)
|
||||
{
|
||||
const string sql = """
|
||||
INSERT INTO notify.deliveries (
|
||||
id, tenant_id, channel_id, rule_id, template_id, status, recipient, subject, body,
|
||||
event_type, event_payload, attempt, max_attempts, next_retry_at, error_message,
|
||||
external_id, correlation_id, created_at, queued_at, sent_at, delivered_at, failed_at
|
||||
) VALUES (
|
||||
@id, @tenant_id, @channel_id, @rule_id, @template_id, @status::notify.delivery_status, @recipient, @subject, @body,
|
||||
@event_type, @event_payload::jsonb, @attempt, @max_attempts, @next_retry_at, @error_message,
|
||||
@external_id, @correlation_id, @created_at, @queued_at, @sent_at, @delivered_at, @failed_at
|
||||
)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
status = EXCLUDED.status,
|
||||
recipient = EXCLUDED.recipient,
|
||||
subject = EXCLUDED.subject,
|
||||
body = EXCLUDED.body,
|
||||
event_type = EXCLUDED.event_type,
|
||||
event_payload = EXCLUDED.event_payload,
|
||||
attempt = EXCLUDED.attempt,
|
||||
max_attempts = EXCLUDED.max_attempts,
|
||||
next_retry_at = EXCLUDED.next_retry_at,
|
||||
error_message = EXCLUDED.error_message,
|
||||
external_id = COALESCE(EXCLUDED.external_id, notify.deliveries.external_id),
|
||||
correlation_id = EXCLUDED.correlation_id,
|
||||
rule_id = EXCLUDED.rule_id,
|
||||
template_id = EXCLUDED.template_id,
|
||||
channel_id = EXCLUDED.channel_id,
|
||||
queued_at = EXCLUDED.queued_at,
|
||||
sent_at = EXCLUDED.sent_at,
|
||||
delivered_at = EXCLUDED.delivered_at,
|
||||
failed_at = EXCLUDED.failed_at
|
||||
RETURNING *
|
||||
""";
|
||||
|
||||
await using var connection = await DataSource.OpenConnectionAsync(delivery.TenantId, "writer", cancellationToken).ConfigureAwait(false);
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
AddDeliveryParameters(command, delivery);
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);
|
||||
await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
|
||||
return MapDelivery(reader);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<IReadOnlyList<DeliveryEntity>> QueryAsync(
|
||||
string tenantId,
|
||||
DeliveryStatus? status = null,
|
||||
Guid? channelId = null,
|
||||
string? eventType = null,
|
||||
DateTimeOffset? since = null,
|
||||
DateTimeOffset? until = null,
|
||||
int limit = 100,
|
||||
int offset = 0,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
var sql = new StringBuilder("SELECT * FROM notify.deliveries WHERE tenant_id = @tenant_id");
|
||||
|
||||
if (status is not null)
|
||||
{
|
||||
sql.Append(" AND status = @status::notify.delivery_status");
|
||||
}
|
||||
|
||||
if (channelId is not null)
|
||||
{
|
||||
sql.Append(" AND channel_id = @channel_id");
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(eventType))
|
||||
{
|
||||
sql.Append(" AND event_type = @event_type");
|
||||
}
|
||||
|
||||
if (since is not null)
|
||||
{
|
||||
sql.Append(" AND created_at >= @since");
|
||||
}
|
||||
|
||||
if (until is not null)
|
||||
{
|
||||
sql.Append(" AND created_at <= @until");
|
||||
}
|
||||
|
||||
sql.Append(" ORDER BY created_at DESC, id LIMIT @limit OFFSET @offset");
|
||||
|
||||
return await QueryAsync(
|
||||
tenantId,
|
||||
sql.ToString(),
|
||||
cmd =>
|
||||
{
|
||||
AddParameter(cmd, "tenant_id", tenantId);
|
||||
if (status is not null)
|
||||
{
|
||||
AddParameter(cmd, "status", StatusToString(status.Value));
|
||||
}
|
||||
|
||||
if (channelId is not null)
|
||||
{
|
||||
AddParameter(cmd, "channel_id", channelId.Value);
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(eventType))
|
||||
{
|
||||
AddParameter(cmd, "event_type", eventType);
|
||||
}
|
||||
|
||||
if (since is not null)
|
||||
{
|
||||
AddParameter(cmd, "since", since.Value);
|
||||
}
|
||||
|
||||
if (until is not null)
|
||||
{
|
||||
AddParameter(cmd, "until", until.Value);
|
||||
}
|
||||
|
||||
AddParameter(cmd, "limit", limit);
|
||||
AddParameter(cmd, "offset", offset);
|
||||
},
|
||||
MapDelivery,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<IReadOnlyList<DeliveryEntity>> GetPendingAsync(
|
||||
string tenantId,
|
||||
|
||||
@@ -124,6 +124,23 @@ public sealed class DigestRepository : RepositoryBase<NotifyDataSource>, IDigest
|
||||
return await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<bool> DeleteByKeyAsync(string tenantId, Guid channelId, string recipient, string digestKey, CancellationToken cancellationToken = default)
|
||||
{
|
||||
const string sql = "DELETE FROM notify.digests WHERE tenant_id = @tenant_id AND channel_id = @channel_id AND recipient = @recipient AND digest_key = @digest_key";
|
||||
var rows = await ExecuteAsync(
|
||||
tenantId,
|
||||
sql,
|
||||
cmd =>
|
||||
{
|
||||
AddParameter(cmd, "tenant_id", tenantId);
|
||||
AddParameter(cmd, "channel_id", channelId);
|
||||
AddParameter(cmd, "recipient", recipient);
|
||||
AddParameter(cmd, "digest_key", digestKey);
|
||||
},
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
return rows > 0;
|
||||
}
|
||||
|
||||
private static DigestEntity MapDigest(NpgsqlDataReader reader) => new()
|
||||
{
|
||||
Id = reader.GetGuid(0),
|
||||
|
||||
@@ -43,6 +43,25 @@ public interface IDeliveryRepository
|
||||
string correlationId,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
/// <summary>
|
||||
/// Queries deliveries with optional filters.
|
||||
/// </summary>
|
||||
Task<IReadOnlyList<DeliveryEntity>> QueryAsync(
|
||||
string tenantId,
|
||||
DeliveryStatus? status = null,
|
||||
Guid? channelId = null,
|
||||
string? eventType = null,
|
||||
DateTimeOffset? since = null,
|
||||
DateTimeOffset? until = null,
|
||||
int limit = 100,
|
||||
int offset = 0,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
/// <summary>
|
||||
/// Inserts or updates a delivery row by id.
|
||||
/// </summary>
|
||||
Task<DeliveryEntity> UpsertAsync(DeliveryEntity delivery, CancellationToken cancellationToken = default);
|
||||
|
||||
/// <summary>
|
||||
/// Marks a delivery as queued.
|
||||
/// </summary>
|
||||
|
||||
@@ -12,4 +12,5 @@ public interface IDigestRepository
|
||||
Task<bool> MarkSendingAsync(string tenantId, Guid id, CancellationToken cancellationToken = default);
|
||||
Task<bool> MarkSentAsync(string tenantId, Guid id, CancellationToken cancellationToken = default);
|
||||
Task<int> DeleteOldAsync(DateTimeOffset cutoff, CancellationToken cancellationToken = default);
|
||||
Task<bool> DeleteByKeyAsync(string tenantId, Guid channelId, string recipient, string digestKey, CancellationToken cancellationToken = default);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
using StellaOps.Notify.Storage.Postgres.Models;
|
||||
|
||||
namespace StellaOps.Notify.Storage.Postgres.Repositories;
|
||||
|
||||
/// <summary>
|
||||
/// Repository for distributed locks in the notify schema.
|
||||
/// </summary>
|
||||
public interface ILockRepository
|
||||
{
|
||||
/// <summary>
|
||||
/// Attempts to acquire a lock for the given resource. If the existing lock is expired or already owned by the caller, it is replaced.
|
||||
/// </summary>
|
||||
Task<bool> TryAcquireAsync(string tenantId, string resource, string owner, TimeSpan ttl, CancellationToken cancellationToken = default);
|
||||
|
||||
/// <summary>
|
||||
/// Releases a lock owned by the caller.
|
||||
/// </summary>
|
||||
Task<bool> ReleaseAsync(string tenantId, string resource, string owner, CancellationToken cancellationToken = default);
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using StellaOps.Infrastructure.Postgres.Repositories;
|
||||
using StellaOps.Notify.Storage.Postgres.Models;
|
||||
|
||||
namespace StellaOps.Notify.Storage.Postgres.Repositories;
|
||||
|
||||
public sealed class LockRepository : RepositoryBase<NotifyDataSource>, ILockRepository
|
||||
{
|
||||
public LockRepository(NotifyDataSource dataSource, ILogger<LockRepository> logger)
|
||||
: base(dataSource, logger) { }
|
||||
|
||||
public async Task<bool> TryAcquireAsync(string tenantId, string resource, string owner, TimeSpan ttl, CancellationToken cancellationToken = default)
|
||||
{
|
||||
const string sql = """
|
||||
WITH upsert AS (
|
||||
INSERT INTO notify.locks (id, tenant_id, resource, owner, expires_at)
|
||||
VALUES (gen_random_uuid(), @tenant_id, @resource, @owner, NOW() + @ttl)
|
||||
ON CONFLICT (tenant_id, resource) DO UPDATE SET
|
||||
owner = EXCLUDED.owner,
|
||||
expires_at = EXCLUDED.expires_at
|
||||
WHERE notify.locks.expires_at < NOW() OR notify.locks.owner = EXCLUDED.owner
|
||||
RETURNING 1
|
||||
)
|
||||
SELECT EXISTS(SELECT 1 FROM upsert) AS acquired;
|
||||
""";
|
||||
|
||||
await using var connection = await DataSource.OpenConnectionAsync(tenantId, "writer", cancellationToken).ConfigureAwait(false);
|
||||
await using var command = CreateCommand(sql, connection);
|
||||
AddParameter(command, "tenant_id", tenantId);
|
||||
AddParameter(command, "resource", resource);
|
||||
AddParameter(command, "owner", owner);
|
||||
AddParameter(command, "ttl", ttl);
|
||||
|
||||
var result = await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false);
|
||||
return result is bool acquired && acquired;
|
||||
}
|
||||
|
||||
public async Task<bool> ReleaseAsync(string tenantId, string resource, string owner, CancellationToken cancellationToken = default)
|
||||
{
|
||||
const string sql = "DELETE FROM notify.locks WHERE tenant_id = @tenant_id AND resource = @resource AND owner = @owner";
|
||||
var rows = await ExecuteAsync(
|
||||
tenantId,
|
||||
sql,
|
||||
cmd =>
|
||||
{
|
||||
AddParameter(cmd, "tenant_id", tenantId);
|
||||
AddParameter(cmd, "resource", resource);
|
||||
AddParameter(cmd, "owner", owner);
|
||||
},
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return rows > 0;
|
||||
}
|
||||
}
|
||||
@@ -40,6 +40,7 @@ public static class ServiceCollectionExtensions
|
||||
services.AddScoped<IInboxRepository, InboxRepository>();
|
||||
services.AddScoped<IIncidentRepository, IncidentRepository>();
|
||||
services.AddScoped<INotifyAuditRepository, NotifyAuditRepository>();
|
||||
services.AddScoped<ILockRepository, LockRepository>();
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user