sprints work
This commit is contained in:
@@ -189,15 +189,18 @@ public sealed class DefaultBackfillSafetyValidator : IBackfillSafetyValidator
|
||||
{
|
||||
private readonly ISourceValidator _sourceValidator;
|
||||
private readonly IOverlapChecker _overlapChecker;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly BackfillManagerOptions _options;
|
||||
|
||||
public DefaultBackfillSafetyValidator(
|
||||
ISourceValidator sourceValidator,
|
||||
IOverlapChecker overlapChecker,
|
||||
TimeProvider timeProvider,
|
||||
BackfillManagerOptions options)
|
||||
{
|
||||
_sourceValidator = sourceValidator;
|
||||
_overlapChecker = overlapChecker;
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_options = options;
|
||||
}
|
||||
|
||||
@@ -236,7 +239,7 @@ public sealed class DefaultBackfillSafetyValidator : IBackfillSafetyValidator
|
||||
}
|
||||
|
||||
// Check retention period
|
||||
var retentionLimit = DateTimeOffset.UtcNow - _options.RetentionPeriod;
|
||||
var retentionLimit = _timeProvider.GetUtcNow() - _options.RetentionPeriod;
|
||||
var withinRetention = request.WindowStart >= retentionLimit;
|
||||
if (!withinRetention)
|
||||
{
|
||||
@@ -325,6 +328,7 @@ public sealed class BackfillManager : IBackfillManager
|
||||
private readonly IBackfillSafetyValidator _safetyValidator;
|
||||
private readonly IBackfillEventCounter _eventCounter;
|
||||
private readonly IDuplicateSuppressor _duplicateSuppressor;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly BackfillManagerOptions _options;
|
||||
private readonly ILogger<BackfillManager> _logger;
|
||||
|
||||
@@ -333,6 +337,7 @@ public sealed class BackfillManager : IBackfillManager
|
||||
IBackfillSafetyValidator safetyValidator,
|
||||
IBackfillEventCounter eventCounter,
|
||||
IDuplicateSuppressor duplicateSuppressor,
|
||||
TimeProvider timeProvider,
|
||||
BackfillManagerOptions options,
|
||||
ILogger<BackfillManager> logger)
|
||||
{
|
||||
@@ -340,6 +345,7 @@ public sealed class BackfillManager : IBackfillManager
|
||||
_safetyValidator = safetyValidator;
|
||||
_eventCounter = eventCounter;
|
||||
_duplicateSuppressor = duplicateSuppressor;
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_options = options;
|
||||
_logger = logger;
|
||||
}
|
||||
@@ -367,6 +373,7 @@ public sealed class BackfillManager : IBackfillManager
|
||||
windowEnd: windowEnd,
|
||||
reason: reason,
|
||||
createdBy: createdBy,
|
||||
timestamp: _timeProvider.GetUtcNow(),
|
||||
batchSize: batchSize,
|
||||
dryRun: dryRun,
|
||||
forceReprocess: forceReprocess,
|
||||
@@ -446,7 +453,7 @@ public sealed class BackfillManager : IBackfillManager
|
||||
// Run safety checks
|
||||
var tempRequest = BackfillRequest.Create(
|
||||
tenantId, sourceId, jobType, windowStart, windowEnd,
|
||||
"preview", "system", batchSize);
|
||||
"preview", "system", _timeProvider.GetUtcNow(), batchSize);
|
||||
|
||||
var safetyChecks = await _safetyValidator.ValidateAsync(
|
||||
tempRequest, estimatedEvents, estimatedDuration, cancellationToken);
|
||||
@@ -473,7 +480,7 @@ public sealed class BackfillManager : IBackfillManager
|
||||
var request = await _backfillRepository.GetByIdAsync(tenantId, backfillId, cancellationToken)
|
||||
?? throw new InvalidOperationException($"Backfill request {backfillId} not found.");
|
||||
|
||||
request = request.Start(updatedBy);
|
||||
request = request.Start(updatedBy, _timeProvider.GetUtcNow());
|
||||
await _backfillRepository.UpdateAsync(request, cancellationToken);
|
||||
|
||||
_logger.LogInformation("Started backfill request {BackfillId}", backfillId);
|
||||
@@ -524,7 +531,7 @@ public sealed class BackfillManager : IBackfillManager
|
||||
var request = await _backfillRepository.GetByIdAsync(tenantId, backfillId, cancellationToken)
|
||||
?? throw new InvalidOperationException($"Backfill request {backfillId} not found.");
|
||||
|
||||
request = request.Cancel(updatedBy);
|
||||
request = request.Cancel(updatedBy, _timeProvider.GetUtcNow());
|
||||
await _backfillRepository.UpdateAsync(request, cancellationToken);
|
||||
|
||||
_logger.LogInformation("Canceled backfill request {BackfillId}", backfillId);
|
||||
|
||||
@@ -90,8 +90,18 @@ public sealed record ProcessedEvent(
|
||||
public sealed class InMemoryDuplicateSuppressor : IDuplicateSuppressor
|
||||
{
|
||||
private readonly Dictionary<string, Dictionary<string, ProcessedEventEntry>> _store = new();
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly object _lock = new();
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new in-memory duplicate suppressor.
|
||||
/// </summary>
|
||||
/// <param name="timeProvider">Time provider for deterministic time.</param>
|
||||
public InMemoryDuplicateSuppressor(TimeProvider? timeProvider = null)
|
||||
{
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
private sealed record ProcessedEventEntry(
|
||||
DateTimeOffset EventTime,
|
||||
DateTimeOffset ProcessedAt,
|
||||
@@ -109,7 +119,7 @@ public sealed class InMemoryDuplicateSuppressor : IDuplicateSuppressor
|
||||
return Task.FromResult(false);
|
||||
|
||||
// Check if expired
|
||||
if (entry.ExpiresAt < DateTimeOffset.UtcNow)
|
||||
if (entry.ExpiresAt < _timeProvider.GetUtcNow())
|
||||
{
|
||||
scopeStore.Remove(eventKey);
|
||||
return Task.FromResult(false);
|
||||
@@ -121,7 +131,7 @@ public sealed class InMemoryDuplicateSuppressor : IDuplicateSuppressor
|
||||
|
||||
public Task<IReadOnlySet<string>> GetProcessedAsync(string scopeKey, IEnumerable<string> eventKeys, CancellationToken cancellationToken)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var result = new HashSet<string>();
|
||||
|
||||
lock (_lock)
|
||||
@@ -149,7 +159,7 @@ public sealed class InMemoryDuplicateSuppressor : IDuplicateSuppressor
|
||||
TimeSpan ttl,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var entry = new ProcessedEventEntry(eventTime, now, batchId, now + ttl);
|
||||
|
||||
lock (_lock)
|
||||
@@ -173,7 +183,7 @@ public sealed class InMemoryDuplicateSuppressor : IDuplicateSuppressor
|
||||
TimeSpan ttl,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var expiresAt = now + ttl;
|
||||
|
||||
lock (_lock)
|
||||
@@ -195,7 +205,7 @@ public sealed class InMemoryDuplicateSuppressor : IDuplicateSuppressor
|
||||
|
||||
public Task<long> CountProcessedAsync(string scopeKey, DateTimeOffset from, DateTimeOffset to, CancellationToken cancellationToken)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
long count = 0;
|
||||
|
||||
lock (_lock)
|
||||
@@ -212,7 +222,7 @@ public sealed class InMemoryDuplicateSuppressor : IDuplicateSuppressor
|
||||
|
||||
public Task<int> CleanupExpiredAsync(int batchLimit, CancellationToken cancellationToken)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var removed = 0;
|
||||
|
||||
lock (_lock)
|
||||
|
||||
@@ -71,19 +71,17 @@ public sealed record EventTimeWindow(
|
||||
/// <summary>
|
||||
/// Creates a window covering the last N hours from now.
|
||||
/// </summary>
|
||||
public static EventTimeWindow LastHours(int hours, DateTimeOffset? now = null)
|
||||
public static EventTimeWindow LastHours(int hours, DateTimeOffset now)
|
||||
{
|
||||
var endTime = now ?? DateTimeOffset.UtcNow;
|
||||
return FromDuration(endTime, TimeSpan.FromHours(hours));
|
||||
return FromDuration(now, TimeSpan.FromHours(hours));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a window covering the last N days from now.
|
||||
/// </summary>
|
||||
public static EventTimeWindow LastDays(int days, DateTimeOffset? now = null)
|
||||
public static EventTimeWindow LastDays(int days, DateTimeOffset now)
|
||||
{
|
||||
var endTime = now ?? DateTimeOffset.UtcNow;
|
||||
return FromDuration(endTime, TimeSpan.FromDays(days));
|
||||
return FromDuration(now, TimeSpan.FromDays(days));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ public sealed record NotificationRule(
|
||||
NotificationChannel channel,
|
||||
string endpoint,
|
||||
string createdBy,
|
||||
DateTimeOffset createdAt,
|
||||
string? jobTypePattern = null,
|
||||
string? errorCodePattern = null,
|
||||
ErrorCategory? category = null,
|
||||
@@ -52,7 +53,6 @@ public sealed record NotificationRule(
|
||||
int maxPerHour = 10,
|
||||
bool aggregate = true)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
return new NotificationRule(
|
||||
RuleId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -68,8 +68,8 @@ public sealed record NotificationRule(
|
||||
Aggregate: aggregate,
|
||||
LastNotifiedAt: null,
|
||||
NotificationsSent: 0,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: createdAt,
|
||||
CreatedBy: createdBy,
|
||||
UpdatedBy: createdBy);
|
||||
}
|
||||
|
||||
@@ -80,6 +80,7 @@ public sealed record AuditEntry(
|
||||
string actorId,
|
||||
ActorType actorType,
|
||||
string description,
|
||||
DateTimeOffset occurredAt,
|
||||
string? oldState = null,
|
||||
string? newState = null,
|
||||
string? actorIp = null,
|
||||
@@ -94,7 +95,6 @@ public sealed record AuditEntry(
|
||||
ArgumentNullException.ThrowIfNull(hasher);
|
||||
|
||||
var entryId = Guid.NewGuid();
|
||||
var occurredAt = DateTimeOffset.UtcNow;
|
||||
|
||||
// Compute canonical hash from immutable content
|
||||
// Use the same property names and fields as VerifyIntegrity to keep the hash stable.
|
||||
|
||||
@@ -113,6 +113,7 @@ public sealed record BackfillRequest(
|
||||
DateTimeOffset windowEnd,
|
||||
string reason,
|
||||
string createdBy,
|
||||
DateTimeOffset timestamp,
|
||||
int batchSize = 100,
|
||||
bool dryRun = false,
|
||||
bool forceReprocess = false,
|
||||
@@ -133,7 +134,6 @@ public sealed record BackfillRequest(
|
||||
_ => throw new ArgumentException("Either sourceId or jobType must be specified.")
|
||||
};
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
return new BackfillRequest(
|
||||
BackfillId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -156,7 +156,7 @@ public sealed record BackfillRequest(
|
||||
SafetyChecks: null,
|
||||
Reason: reason,
|
||||
Ticket: ticket,
|
||||
CreatedAt: now,
|
||||
CreatedAt: timestamp,
|
||||
StartedAt: null,
|
||||
CompletedAt: null,
|
||||
CreatedBy: createdBy,
|
||||
@@ -196,7 +196,7 @@ public sealed record BackfillRequest(
|
||||
/// <summary>
|
||||
/// Transitions to running status.
|
||||
/// </summary>
|
||||
public BackfillRequest Start(string updatedBy)
|
||||
public BackfillRequest Start(string updatedBy, DateTimeOffset timestamp)
|
||||
{
|
||||
if (Status != BackfillStatus.Validating)
|
||||
throw new InvalidOperationException($"Cannot start from status {Status}.");
|
||||
@@ -207,7 +207,7 @@ public sealed record BackfillRequest(
|
||||
return this with
|
||||
{
|
||||
Status = BackfillStatus.Running,
|
||||
StartedAt = DateTimeOffset.UtcNow,
|
||||
StartedAt = timestamp,
|
||||
CurrentPosition = WindowStart,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
@@ -269,7 +269,7 @@ public sealed record BackfillRequest(
|
||||
/// <summary>
|
||||
/// Completes the backfill successfully.
|
||||
/// </summary>
|
||||
public BackfillRequest Complete(string updatedBy)
|
||||
public BackfillRequest Complete(string updatedBy, DateTimeOffset timestamp)
|
||||
{
|
||||
if (Status != BackfillStatus.Running)
|
||||
throw new InvalidOperationException($"Cannot complete from status {Status}.");
|
||||
@@ -277,7 +277,7 @@ public sealed record BackfillRequest(
|
||||
return this with
|
||||
{
|
||||
Status = BackfillStatus.Completed,
|
||||
CompletedAt = DateTimeOffset.UtcNow,
|
||||
CompletedAt = timestamp,
|
||||
CurrentPosition = WindowEnd,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
@@ -286,12 +286,12 @@ public sealed record BackfillRequest(
|
||||
/// <summary>
|
||||
/// Fails the backfill with an error.
|
||||
/// </summary>
|
||||
public BackfillRequest Fail(string error, string updatedBy)
|
||||
public BackfillRequest Fail(string error, string updatedBy, DateTimeOffset timestamp)
|
||||
{
|
||||
return this with
|
||||
{
|
||||
Status = BackfillStatus.Failed,
|
||||
CompletedAt = DateTimeOffset.UtcNow,
|
||||
CompletedAt = timestamp,
|
||||
ErrorMessage = error,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
@@ -300,7 +300,7 @@ public sealed record BackfillRequest(
|
||||
/// <summary>
|
||||
/// Cancels the backfill.
|
||||
/// </summary>
|
||||
public BackfillRequest Cancel(string updatedBy)
|
||||
public BackfillRequest Cancel(string updatedBy, DateTimeOffset timestamp)
|
||||
{
|
||||
if (IsTerminal)
|
||||
throw new InvalidOperationException($"Cannot cancel from terminal status {Status}.");
|
||||
@@ -308,7 +308,7 @@ public sealed record BackfillRequest(
|
||||
return this with
|
||||
{
|
||||
Status = BackfillStatus.Canceled,
|
||||
CompletedAt = DateTimeOffset.UtcNow,
|
||||
CompletedAt = timestamp,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
}
|
||||
|
||||
@@ -58,6 +58,7 @@ public sealed record EventEnvelope(
|
||||
OrchestratorEventType eventType,
|
||||
string tenantId,
|
||||
EventActor actor,
|
||||
DateTimeOffset occurredAt,
|
||||
string? correlationId = null,
|
||||
string? projectId = null,
|
||||
EventJob? job = null,
|
||||
@@ -65,14 +66,14 @@ public sealed record EventEnvelope(
|
||||
EventNotifier? notifier = null,
|
||||
JsonElement? payload = null)
|
||||
{
|
||||
var eventId = GenerateEventId();
|
||||
var eventId = GenerateEventId(occurredAt);
|
||||
var idempotencyKey = GenerateIdempotencyKey(eventType, job?.Id, job?.Attempt ?? 0);
|
||||
|
||||
return new EventEnvelope(
|
||||
SchemaVersion: CurrentSchemaVersion,
|
||||
EventId: eventId,
|
||||
EventType: eventType,
|
||||
OccurredAt: DateTimeOffset.UtcNow,
|
||||
OccurredAt: occurredAt,
|
||||
IdempotencyKey: idempotencyKey,
|
||||
CorrelationId: correlationId,
|
||||
TenantId: tenantId,
|
||||
@@ -90,6 +91,7 @@ public sealed record EventEnvelope(
|
||||
string tenantId,
|
||||
EventActor actor,
|
||||
EventJob job,
|
||||
DateTimeOffset occurredAt,
|
||||
string? correlationId = null,
|
||||
string? projectId = null,
|
||||
EventMetrics? metrics = null,
|
||||
@@ -99,6 +101,7 @@ public sealed record EventEnvelope(
|
||||
eventType: eventType,
|
||||
tenantId: tenantId,
|
||||
actor: actor,
|
||||
occurredAt: occurredAt,
|
||||
correlationId: correlationId,
|
||||
projectId: projectId,
|
||||
job: job,
|
||||
@@ -112,6 +115,7 @@ public sealed record EventEnvelope(
|
||||
string tenantId,
|
||||
EventActor actor,
|
||||
EventJob exportJob,
|
||||
DateTimeOffset occurredAt,
|
||||
string? correlationId = null,
|
||||
string? projectId = null,
|
||||
EventMetrics? metrics = null,
|
||||
@@ -122,6 +126,7 @@ public sealed record EventEnvelope(
|
||||
tenantId: tenantId,
|
||||
actor: actor,
|
||||
job: exportJob,
|
||||
occurredAt: occurredAt,
|
||||
correlationId: correlationId,
|
||||
projectId: projectId,
|
||||
metrics: metrics,
|
||||
@@ -133,6 +138,7 @@ public sealed record EventEnvelope(
|
||||
OrchestratorEventType eventType,
|
||||
string tenantId,
|
||||
EventActor actor,
|
||||
DateTimeOffset occurredAt,
|
||||
string? correlationId = null,
|
||||
string? projectId = null,
|
||||
JsonElement? payload = null)
|
||||
@@ -141,18 +147,19 @@ public sealed record EventEnvelope(
|
||||
eventType: eventType,
|
||||
tenantId: tenantId,
|
||||
actor: actor,
|
||||
occurredAt: occurredAt,
|
||||
correlationId: correlationId,
|
||||
projectId: projectId,
|
||||
payload: payload);
|
||||
}
|
||||
|
||||
/// <summary>Generates a UUIDv7-style event ID.</summary>
|
||||
private static string GenerateEventId()
|
||||
private static string GenerateEventId(DateTimeOffset timestamp)
|
||||
{
|
||||
// UUIDv7: timestamp-based with random suffix
|
||||
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||
var timestampMs = timestamp.ToUnixTimeMilliseconds();
|
||||
var random = Guid.NewGuid().ToString("N")[..16];
|
||||
return $"urn:orch:event:{timestamp:x}-{random}";
|
||||
return $"urn:orch:event:{timestampMs:x}-{random}";
|
||||
}
|
||||
|
||||
/// <summary>Generates an idempotency key for deduplication.</summary>
|
||||
|
||||
@@ -188,8 +188,15 @@ public sealed class NullEventPublisher : IEventPublisher
|
||||
public sealed class InMemoryIdempotencyStore : IIdempotencyStore
|
||||
{
|
||||
private readonly Dictionary<string, DateTimeOffset> _keys = new();
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly object _lock = new();
|
||||
|
||||
/// <summary>Creates a new in-memory idempotency store.</summary>
|
||||
public InMemoryIdempotencyStore(TimeProvider? timeProvider = null)
|
||||
{
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public Task<bool> TryMarkAsync(string key, TimeSpan ttl, CancellationToken cancellationToken = default)
|
||||
{
|
||||
lock (_lock)
|
||||
@@ -198,7 +205,7 @@ public sealed class InMemoryIdempotencyStore : IIdempotencyStore
|
||||
if (_keys.ContainsKey(key))
|
||||
return Task.FromResult(false);
|
||||
|
||||
_keys[key] = DateTimeOffset.UtcNow.Add(ttl);
|
||||
_keys[key] = _timeProvider.GetUtcNow().Add(ttl);
|
||||
return Task.FromResult(true);
|
||||
}
|
||||
}
|
||||
@@ -223,7 +230,7 @@ public sealed class InMemoryIdempotencyStore : IIdempotencyStore
|
||||
|
||||
private void CleanupExpired()
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var expired = _keys.Where(kv => kv.Value <= now).Select(kv => kv.Key).ToList();
|
||||
foreach (var key in expired)
|
||||
{
|
||||
|
||||
@@ -273,10 +273,10 @@ public sealed record ExportDistribution(
|
||||
}
|
||||
|
||||
/// <summary>Creates a download URL with expiration.</summary>
|
||||
public ExportDistribution WithDownloadUrl(string url, TimeSpan validity) => this with
|
||||
public ExportDistribution WithDownloadUrl(string url, TimeSpan validity, DateTimeOffset timestamp) => this with
|
||||
{
|
||||
DownloadUrl = url,
|
||||
DownloadUrlExpiresAt = DateTimeOffset.UtcNow.Add(validity)
|
||||
DownloadUrlExpiresAt = timestamp.Add(validity)
|
||||
};
|
||||
|
||||
/// <summary>Adds a replication target.</summary>
|
||||
@@ -432,29 +432,29 @@ public sealed record ExportRetention(
|
||||
ExtensionCount: 0,
|
||||
Metadata: null);
|
||||
|
||||
/// <summary>Whether the export is expired.</summary>
|
||||
public bool IsExpired => ExpiresAt.HasValue && DateTimeOffset.UtcNow >= ExpiresAt.Value && !LegalHold;
|
||||
/// <summary>Whether the export is expired at the given timestamp.</summary>
|
||||
public bool IsExpiredAt(DateTimeOffset timestamp) => ExpiresAt.HasValue && timestamp >= ExpiresAt.Value && !LegalHold;
|
||||
|
||||
/// <summary>Whether the export should be archived.</summary>
|
||||
public bool ShouldArchive => ArchiveAt.HasValue && DateTimeOffset.UtcNow >= ArchiveAt.Value && !ArchivedAt.HasValue;
|
||||
/// <summary>Whether the export should be archived at the given timestamp.</summary>
|
||||
public bool ShouldArchiveAt(DateTimeOffset timestamp) => ArchiveAt.HasValue && timestamp >= ArchiveAt.Value && !ArchivedAt.HasValue;
|
||||
|
||||
/// <summary>Whether the export can be deleted.</summary>
|
||||
public bool CanDelete => IsExpired && (!RequiresRelease || ReleasedAt.HasValue) && !LegalHold;
|
||||
/// <summary>Whether the export can be deleted at the given timestamp.</summary>
|
||||
public bool CanDeleteAt(DateTimeOffset timestamp) => IsExpiredAt(timestamp) && (!RequiresRelease || ReleasedAt.HasValue) && !LegalHold;
|
||||
|
||||
/// <summary>Extends the retention period.</summary>
|
||||
public ExportRetention ExtendRetention(TimeSpan extension, string? reason = null)
|
||||
public ExportRetention ExtendRetention(TimeSpan extension, DateTimeOffset timestamp, string? reason = null)
|
||||
{
|
||||
var metadata = Metadata is null
|
||||
? new Dictionary<string, string>()
|
||||
: new Dictionary<string, string>(Metadata);
|
||||
|
||||
metadata[$"extension_{ExtensionCount + 1}_at"] = DateTimeOffset.UtcNow.ToString("o");
|
||||
metadata[$"extension_{ExtensionCount + 1}_at"] = timestamp.ToString("o");
|
||||
if (reason is not null)
|
||||
metadata[$"extension_{ExtensionCount + 1}_reason"] = reason;
|
||||
|
||||
return this with
|
||||
{
|
||||
ExpiresAt = (ExpiresAt ?? DateTimeOffset.UtcNow).Add(extension),
|
||||
ExpiresAt = (ExpiresAt ?? timestamp).Add(extension),
|
||||
ArchiveAt = ArchiveAt?.Add(extension),
|
||||
ExtensionCount = ExtensionCount + 1,
|
||||
Metadata = metadata
|
||||
@@ -476,22 +476,22 @@ public sealed record ExportRetention(
|
||||
};
|
||||
|
||||
/// <summary>Releases the export for deletion.</summary>
|
||||
public ExportRetention Release(string releasedBy) => this with
|
||||
public ExportRetention Release(string releasedBy, DateTimeOffset timestamp) => this with
|
||||
{
|
||||
ReleasedBy = releasedBy,
|
||||
ReleasedAt = DateTimeOffset.UtcNow
|
||||
ReleasedAt = timestamp
|
||||
};
|
||||
|
||||
/// <summary>Marks the export as archived.</summary>
|
||||
public ExportRetention MarkArchived() => this with
|
||||
public ExportRetention MarkArchived(DateTimeOffset timestamp) => this with
|
||||
{
|
||||
ArchivedAt = DateTimeOffset.UtcNow
|
||||
ArchivedAt = timestamp
|
||||
};
|
||||
|
||||
/// <summary>Marks the export as deleted.</summary>
|
||||
public ExportRetention MarkDeleted() => this with
|
||||
public ExportRetention MarkDeleted(DateTimeOffset timestamp) => this with
|
||||
{
|
||||
DeletedAt = DateTimeOffset.UtcNow
|
||||
DeletedAt = timestamp
|
||||
};
|
||||
|
||||
/// <summary>Serializes retention to JSON.</summary>
|
||||
|
||||
@@ -127,6 +127,18 @@ public static class ExportJobPolicy
|
||||
string tenantId,
|
||||
string? jobType = null,
|
||||
string createdBy = "system")
|
||||
{
|
||||
throw new NotImplementedException("ExportJobPolicy.CreateDefaultQuota requires a timestamp parameter for deterministic behavior. Use the overload with DateTimeOffset now parameter.");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a default quota for export jobs with explicit timestamp.
|
||||
/// </summary>
|
||||
public static Quota CreateDefaultQuota(
|
||||
string tenantId,
|
||||
DateTimeOffset now,
|
||||
string? jobType = null,
|
||||
string createdBy = "system")
|
||||
{
|
||||
var rateLimit = jobType is not null && ExportJobTypes.IsExportJob(jobType)
|
||||
? RateLimits.GetForJobType(jobType)
|
||||
@@ -135,8 +147,6 @@ public static class ExportJobPolicy
|
||||
QuotaDefaults.MaxPerHour,
|
||||
QuotaDefaults.DefaultLeaseSeconds);
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
|
||||
return new Quota(
|
||||
QuotaId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
|
||||
@@ -87,6 +87,7 @@ public sealed record ExportSchedule(
|
||||
string cronExpression,
|
||||
ExportJobPayload payloadTemplate,
|
||||
string createdBy,
|
||||
DateTimeOffset timestamp,
|
||||
string? description = null,
|
||||
string timezone = "UTC",
|
||||
string retentionPolicy = "default",
|
||||
@@ -94,8 +95,6 @@ public sealed record ExportSchedule(
|
||||
int maxConcurrent = 1,
|
||||
bool skipIfRunning = true)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
|
||||
return new ExportSchedule(
|
||||
ScheduleId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -117,8 +116,8 @@ public sealed record ExportSchedule(
|
||||
TotalRuns: 0,
|
||||
SuccessfulRuns: 0,
|
||||
FailedRuns: 0,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
CreatedAt: timestamp,
|
||||
UpdatedAt: timestamp,
|
||||
CreatedBy: createdBy,
|
||||
UpdatedBy: createdBy);
|
||||
}
|
||||
@@ -129,63 +128,63 @@ public sealed record ExportSchedule(
|
||||
: 0;
|
||||
|
||||
/// <summary>Enables the schedule.</summary>
|
||||
public ExportSchedule Enable() => this with
|
||||
public ExportSchedule Enable(DateTimeOffset timestamp) => this with
|
||||
{
|
||||
Enabled = true,
|
||||
UpdatedAt = DateTimeOffset.UtcNow
|
||||
UpdatedAt = timestamp
|
||||
};
|
||||
|
||||
/// <summary>Disables the schedule.</summary>
|
||||
public ExportSchedule Disable() => this with
|
||||
public ExportSchedule Disable(DateTimeOffset timestamp) => this with
|
||||
{
|
||||
Enabled = false,
|
||||
UpdatedAt = DateTimeOffset.UtcNow
|
||||
UpdatedAt = timestamp
|
||||
};
|
||||
|
||||
/// <summary>Records a successful run.</summary>
|
||||
public ExportSchedule RecordSuccess(Guid jobId, DateTimeOffset? nextRun = null) => this with
|
||||
public ExportSchedule RecordSuccess(Guid jobId, DateTimeOffset timestamp, DateTimeOffset? nextRun = null) => this with
|
||||
{
|
||||
LastRunAt = DateTimeOffset.UtcNow,
|
||||
LastRunAt = timestamp,
|
||||
LastJobId = jobId,
|
||||
LastRunStatus = "completed",
|
||||
NextRunAt = nextRun,
|
||||
TotalRuns = TotalRuns + 1,
|
||||
SuccessfulRuns = SuccessfulRuns + 1,
|
||||
UpdatedAt = DateTimeOffset.UtcNow
|
||||
UpdatedAt = timestamp
|
||||
};
|
||||
|
||||
/// <summary>Records a failed run.</summary>
|
||||
public ExportSchedule RecordFailure(Guid jobId, string? reason = null, DateTimeOffset? nextRun = null) => this with
|
||||
public ExportSchedule RecordFailure(Guid jobId, DateTimeOffset timestamp, string? reason = null, DateTimeOffset? nextRun = null) => this with
|
||||
{
|
||||
LastRunAt = DateTimeOffset.UtcNow,
|
||||
LastRunAt = timestamp,
|
||||
LastJobId = jobId,
|
||||
LastRunStatus = $"failed: {reason ?? "unknown"}",
|
||||
NextRunAt = nextRun,
|
||||
TotalRuns = TotalRuns + 1,
|
||||
FailedRuns = FailedRuns + 1,
|
||||
UpdatedAt = DateTimeOffset.UtcNow
|
||||
UpdatedAt = timestamp
|
||||
};
|
||||
|
||||
/// <summary>Updates the next run time.</summary>
|
||||
public ExportSchedule WithNextRun(DateTimeOffset nextRun) => this with
|
||||
public ExportSchedule WithNextRun(DateTimeOffset nextRun, DateTimeOffset timestamp) => this with
|
||||
{
|
||||
NextRunAt = nextRun,
|
||||
UpdatedAt = DateTimeOffset.UtcNow
|
||||
UpdatedAt = timestamp
|
||||
};
|
||||
|
||||
/// <summary>Updates the cron expression.</summary>
|
||||
public ExportSchedule WithCron(string cronExpression, string updatedBy) => this with
|
||||
public ExportSchedule WithCron(string cronExpression, string updatedBy, DateTimeOffset timestamp) => this with
|
||||
{
|
||||
CronExpression = cronExpression,
|
||||
UpdatedAt = DateTimeOffset.UtcNow,
|
||||
UpdatedAt = timestamp,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
|
||||
/// <summary>Updates the payload template.</summary>
|
||||
public ExportSchedule WithPayload(ExportJobPayload payload, string updatedBy) => this with
|
||||
public ExportSchedule WithPayload(ExportJobPayload payload, string updatedBy, DateTimeOffset timestamp) => this with
|
||||
{
|
||||
PayloadTemplate = payload,
|
||||
UpdatedAt = DateTimeOffset.UtcNow,
|
||||
UpdatedAt = timestamp,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
}
|
||||
@@ -247,13 +246,12 @@ public sealed record RetentionPruneConfig(
|
||||
|
||||
/// <summary>Creates a default prune configuration.</summary>
|
||||
public static RetentionPruneConfig Create(
|
||||
DateTimeOffset timestamp,
|
||||
string? tenantId = null,
|
||||
string? exportType = null,
|
||||
string? cronExpression = null,
|
||||
int batchSize = DefaultBatchSize)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
|
||||
return new RetentionPruneConfig(
|
||||
PruneId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -268,17 +266,17 @@ public sealed record RetentionPruneConfig(
|
||||
LastPruneAt: null,
|
||||
LastPruneCount: 0,
|
||||
TotalPruned: 0,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now);
|
||||
CreatedAt: timestamp,
|
||||
UpdatedAt: timestamp);
|
||||
}
|
||||
|
||||
/// <summary>Records a prune operation.</summary>
|
||||
public RetentionPruneConfig RecordPrune(int count) => this with
|
||||
public RetentionPruneConfig RecordPrune(int count, DateTimeOffset timestamp) => this with
|
||||
{
|
||||
LastPruneAt = DateTimeOffset.UtcNow,
|
||||
LastPruneAt = timestamp,
|
||||
LastPruneCount = count,
|
||||
TotalPruned = TotalPruned + count,
|
||||
UpdatedAt = DateTimeOffset.UtcNow
|
||||
UpdatedAt = timestamp
|
||||
};
|
||||
}
|
||||
|
||||
@@ -335,13 +333,12 @@ public sealed record ExportAlertConfig(
|
||||
public static ExportAlertConfig Create(
|
||||
string tenantId,
|
||||
string name,
|
||||
DateTimeOffset timestamp,
|
||||
string? exportType = null,
|
||||
int consecutiveFailuresThreshold = 3,
|
||||
double failureRateThreshold = 50.0,
|
||||
ExportAlertSeverity severity = ExportAlertSeverity.Warning)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
|
||||
return new ExportAlertConfig(
|
||||
AlertConfigId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -356,20 +353,20 @@ public sealed record ExportAlertConfig(
|
||||
Cooldown: TimeSpan.FromMinutes(15),
|
||||
LastAlertAt: null,
|
||||
TotalAlerts: 0,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now);
|
||||
CreatedAt: timestamp,
|
||||
UpdatedAt: timestamp);
|
||||
}
|
||||
|
||||
/// <summary>Whether an alert can be triggered (respects cooldown).</summary>
|
||||
public bool CanAlert => !LastAlertAt.HasValue ||
|
||||
DateTimeOffset.UtcNow >= LastAlertAt.Value.Add(Cooldown);
|
||||
public bool CanAlertAt(DateTimeOffset timestamp) => !LastAlertAt.HasValue ||
|
||||
timestamp >= LastAlertAt.Value.Add(Cooldown);
|
||||
|
||||
/// <summary>Records an alert.</summary>
|
||||
public ExportAlertConfig RecordAlert() => this with
|
||||
public ExportAlertConfig RecordAlert(DateTimeOffset timestamp) => this with
|
||||
{
|
||||
LastAlertAt = DateTimeOffset.UtcNow,
|
||||
LastAlertAt = timestamp,
|
||||
TotalAlerts = TotalAlerts + 1,
|
||||
UpdatedAt = DateTimeOffset.UtcNow
|
||||
UpdatedAt = timestamp
|
||||
};
|
||||
}
|
||||
|
||||
@@ -444,7 +441,8 @@ public sealed record ExportAlert(
|
||||
string exportType,
|
||||
ExportAlertSeverity severity,
|
||||
IReadOnlyList<Guid> failedJobIds,
|
||||
int consecutiveFailures)
|
||||
int consecutiveFailures,
|
||||
DateTimeOffset timestamp)
|
||||
{
|
||||
return new ExportAlert(
|
||||
AlertId: Guid.NewGuid(),
|
||||
@@ -456,7 +454,7 @@ public sealed record ExportAlert(
|
||||
FailedJobIds: failedJobIds,
|
||||
ConsecutiveFailures: consecutiveFailures,
|
||||
FailureRate: 0,
|
||||
TriggeredAt: DateTimeOffset.UtcNow,
|
||||
TriggeredAt: timestamp,
|
||||
AcknowledgedAt: null,
|
||||
AcknowledgedBy: null,
|
||||
ResolvedAt: null,
|
||||
@@ -470,7 +468,8 @@ public sealed record ExportAlert(
|
||||
string exportType,
|
||||
ExportAlertSeverity severity,
|
||||
double failureRate,
|
||||
IReadOnlyList<Guid> recentFailedJobIds)
|
||||
IReadOnlyList<Guid> recentFailedJobIds,
|
||||
DateTimeOffset timestamp)
|
||||
{
|
||||
return new ExportAlert(
|
||||
AlertId: Guid.NewGuid(),
|
||||
@@ -482,7 +481,7 @@ public sealed record ExportAlert(
|
||||
FailedJobIds: recentFailedJobIds,
|
||||
ConsecutiveFailures: 0,
|
||||
FailureRate: failureRate,
|
||||
TriggeredAt: DateTimeOffset.UtcNow,
|
||||
TriggeredAt: timestamp,
|
||||
AcknowledgedAt: null,
|
||||
AcknowledgedBy: null,
|
||||
ResolvedAt: null,
|
||||
@@ -490,16 +489,16 @@ public sealed record ExportAlert(
|
||||
}
|
||||
|
||||
/// <summary>Acknowledges the alert.</summary>
|
||||
public ExportAlert Acknowledge(string acknowledgedBy) => this with
|
||||
public ExportAlert Acknowledge(string acknowledgedBy, DateTimeOffset timestamp) => this with
|
||||
{
|
||||
AcknowledgedAt = DateTimeOffset.UtcNow,
|
||||
AcknowledgedAt = timestamp,
|
||||
AcknowledgedBy = acknowledgedBy
|
||||
};
|
||||
|
||||
/// <summary>Resolves the alert.</summary>
|
||||
public ExportAlert Resolve(string? notes = null) => this with
|
||||
public ExportAlert Resolve(DateTimeOffset timestamp, string? notes = null) => this with
|
||||
{
|
||||
ResolvedAt = DateTimeOffset.UtcNow,
|
||||
ResolvedAt = timestamp,
|
||||
ResolutionNotes = notes
|
||||
};
|
||||
|
||||
|
||||
@@ -246,17 +246,20 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
private readonly ITimelineEventEmitter _timelineEmitter;
|
||||
private readonly IJobCapsuleGenerator _capsuleGenerator;
|
||||
private readonly IMirrorEvidenceStore _evidenceStore;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ILogger<MirrorOperationRecorder> _logger;
|
||||
|
||||
public MirrorOperationRecorder(
|
||||
ITimelineEventEmitter timelineEmitter,
|
||||
IJobCapsuleGenerator capsuleGenerator,
|
||||
IMirrorEvidenceStore evidenceStore,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<MirrorOperationRecorder> logger)
|
||||
{
|
||||
_timelineEmitter = timelineEmitter ?? throw new ArgumentNullException(nameof(timelineEmitter));
|
||||
_capsuleGenerator = capsuleGenerator ?? throw new ArgumentNullException(nameof(capsuleGenerator));
|
||||
_evidenceStore = evidenceStore ?? throw new ArgumentNullException(nameof(evidenceStore));
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
@@ -357,6 +360,7 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
try
|
||||
{
|
||||
// Create evidence entry
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var evidence = new MirrorOperationEvidence(
|
||||
OperationId: context.OperationId,
|
||||
OperationType: MirrorOperationType.BundleExport,
|
||||
@@ -364,8 +368,8 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
ProjectId: context.ProjectId,
|
||||
JobId: context.JobId,
|
||||
Status: MirrorOperationStatus.Completed,
|
||||
StartedAt: DateTimeOffset.UtcNow.AddSeconds(-result.DurationSeconds),
|
||||
CompletedAt: DateTimeOffset.UtcNow,
|
||||
StartedAt: now.AddSeconds(-result.DurationSeconds),
|
||||
CompletedAt: now,
|
||||
SourceEnvironment: context.SourceEnvironment,
|
||||
TargetEnvironment: context.TargetEnvironment,
|
||||
BundleDigest: result.BundleDigest,
|
||||
@@ -471,6 +475,7 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
{
|
||||
try
|
||||
{
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var evidence = new MirrorOperationEvidence(
|
||||
OperationId: context.OperationId,
|
||||
OperationType: MirrorOperationType.BundleExport,
|
||||
@@ -478,8 +483,8 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
ProjectId: context.ProjectId,
|
||||
JobId: context.JobId,
|
||||
Status: MirrorOperationStatus.Failed,
|
||||
StartedAt: DateTimeOffset.UtcNow,
|
||||
CompletedAt: DateTimeOffset.UtcNow,
|
||||
StartedAt: now,
|
||||
CompletedAt: now,
|
||||
SourceEnvironment: context.SourceEnvironment,
|
||||
TargetEnvironment: context.TargetEnvironment,
|
||||
BundleDigest: null,
|
||||
@@ -620,6 +625,7 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
{
|
||||
try
|
||||
{
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var evidence = new MirrorOperationEvidence(
|
||||
OperationId: context.OperationId,
|
||||
OperationType: MirrorOperationType.BundleImport,
|
||||
@@ -627,8 +633,8 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
ProjectId: context.ProjectId,
|
||||
JobId: context.JobId,
|
||||
Status: MirrorOperationStatus.Completed,
|
||||
StartedAt: DateTimeOffset.UtcNow.AddSeconds(-result.DurationSeconds),
|
||||
CompletedAt: DateTimeOffset.UtcNow,
|
||||
StartedAt: now.AddSeconds(-result.DurationSeconds),
|
||||
CompletedAt: now,
|
||||
SourceEnvironment: result.Provenance.SourceEnvironment,
|
||||
TargetEnvironment: context.TargetEnvironment,
|
||||
BundleDigest: result.Provenance.BundleDigest,
|
||||
@@ -693,6 +699,7 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
{
|
||||
try
|
||||
{
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var evidence = new MirrorOperationEvidence(
|
||||
OperationId: context.OperationId,
|
||||
OperationType: MirrorOperationType.BundleImport,
|
||||
@@ -700,8 +707,8 @@ public sealed class MirrorOperationRecorder : IMirrorOperationRecorder
|
||||
ProjectId: context.ProjectId,
|
||||
JobId: context.JobId,
|
||||
Status: MirrorOperationStatus.Failed,
|
||||
StartedAt: DateTimeOffset.UtcNow,
|
||||
CompletedAt: DateTimeOffset.UtcNow,
|
||||
StartedAt: now,
|
||||
CompletedAt: now,
|
||||
SourceEnvironment: context.SourceEnvironment,
|
||||
TargetEnvironment: context.TargetEnvironment,
|
||||
BundleDigest: null,
|
||||
|
||||
@@ -45,7 +45,10 @@ public sealed record Pack(
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(displayName);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(createdBy);
|
||||
|
||||
var now = createdAt ?? DateTimeOffset.UtcNow;
|
||||
if (createdAt is null)
|
||||
throw new ArgumentNullException(nameof(createdAt), "createdAt must be provided for deterministic behavior.");
|
||||
|
||||
var now = createdAt.Value;
|
||||
|
||||
return new Pack(
|
||||
PackId: packId,
|
||||
@@ -96,15 +99,14 @@ public sealed record Pack(
|
||||
/// <summary>
|
||||
/// Creates a copy with updated status.
|
||||
/// </summary>
|
||||
public Pack WithStatus(PackStatus newStatus, string updatedBy, DateTimeOffset? updatedAt = null)
|
||||
public Pack WithStatus(PackStatus newStatus, string updatedBy, DateTimeOffset updatedAt)
|
||||
{
|
||||
var now = updatedAt ?? DateTimeOffset.UtcNow;
|
||||
return this with
|
||||
{
|
||||
Status = newStatus,
|
||||
UpdatedAt = now,
|
||||
UpdatedAt = updatedAt,
|
||||
UpdatedBy = updatedBy,
|
||||
PublishedAt = newStatus == PackStatus.Published ? now : PublishedAt,
|
||||
PublishedAt = newStatus == PackStatus.Published ? updatedAt : PublishedAt,
|
||||
PublishedBy = newStatus == PackStatus.Published ? updatedBy : PublishedBy
|
||||
};
|
||||
}
|
||||
@@ -112,14 +114,13 @@ public sealed record Pack(
|
||||
/// <summary>
|
||||
/// Creates a copy with incremented version count.
|
||||
/// </summary>
|
||||
public Pack WithVersionAdded(string version, string updatedBy, DateTimeOffset? updatedAt = null)
|
||||
public Pack WithVersionAdded(string version, string updatedBy, DateTimeOffset updatedAt)
|
||||
{
|
||||
var now = updatedAt ?? DateTimeOffset.UtcNow;
|
||||
return this with
|
||||
{
|
||||
VersionCount = VersionCount + 1,
|
||||
LatestVersion = version,
|
||||
UpdatedAt = now,
|
||||
UpdatedAt = updatedAt,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
}
|
||||
@@ -215,7 +216,10 @@ public sealed record PackVersion(
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(artifactDigest);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(createdBy);
|
||||
|
||||
var now = createdAt ?? DateTimeOffset.UtcNow;
|
||||
if (createdAt is null)
|
||||
throw new ArgumentNullException(nameof(createdAt), "createdAt must be provided for deterministic behavior.");
|
||||
|
||||
var now = createdAt.Value;
|
||||
|
||||
return new PackVersion(
|
||||
PackVersionId: packVersionId,
|
||||
@@ -278,15 +282,14 @@ public sealed record PackVersion(
|
||||
/// <summary>
|
||||
/// Creates a copy with updated status.
|
||||
/// </summary>
|
||||
public PackVersion WithStatus(PackVersionStatus newStatus, string updatedBy, DateTimeOffset? updatedAt = null)
|
||||
public PackVersion WithStatus(PackVersionStatus newStatus, string updatedBy, DateTimeOffset updatedAt)
|
||||
{
|
||||
var now = updatedAt ?? DateTimeOffset.UtcNow;
|
||||
return this with
|
||||
{
|
||||
Status = newStatus,
|
||||
UpdatedAt = now,
|
||||
UpdatedAt = updatedAt,
|
||||
UpdatedBy = updatedBy,
|
||||
PublishedAt = newStatus == PackVersionStatus.Published ? now : PublishedAt,
|
||||
PublishedAt = newStatus == PackVersionStatus.Published ? updatedAt : PublishedAt,
|
||||
PublishedBy = newStatus == PackVersionStatus.Published ? updatedBy : PublishedBy
|
||||
};
|
||||
}
|
||||
@@ -294,15 +297,14 @@ public sealed record PackVersion(
|
||||
/// <summary>
|
||||
/// Creates a copy with deprecation info.
|
||||
/// </summary>
|
||||
public PackVersion WithDeprecation(string deprecatedBy, string? reason, DateTimeOffset? deprecatedAt = null)
|
||||
public PackVersion WithDeprecation(string deprecatedBy, string? reason, DateTimeOffset deprecatedAt)
|
||||
{
|
||||
var now = deprecatedAt ?? DateTimeOffset.UtcNow;
|
||||
return this with
|
||||
{
|
||||
Status = PackVersionStatus.Deprecated,
|
||||
UpdatedAt = now,
|
||||
UpdatedAt = deprecatedAt,
|
||||
UpdatedBy = deprecatedBy,
|
||||
DeprecatedAt = now,
|
||||
DeprecatedAt = deprecatedAt,
|
||||
DeprecatedBy = deprecatedBy,
|
||||
DeprecationReason = reason
|
||||
};
|
||||
@@ -315,16 +317,15 @@ public sealed record PackVersion(
|
||||
string signatureUri,
|
||||
string signatureAlgorithm,
|
||||
string signedBy,
|
||||
DateTimeOffset? signedAt = null)
|
||||
DateTimeOffset signedAt)
|
||||
{
|
||||
var now = signedAt ?? DateTimeOffset.UtcNow;
|
||||
return this with
|
||||
{
|
||||
SignatureUri = signatureUri,
|
||||
SignatureAlgorithm = signatureAlgorithm,
|
||||
SignedBy = signedBy,
|
||||
SignedAt = now,
|
||||
UpdatedAt = now,
|
||||
SignedAt = signedAt,
|
||||
UpdatedAt = signedAt,
|
||||
UpdatedBy = signedBy
|
||||
};
|
||||
}
|
||||
|
||||
@@ -122,7 +122,7 @@ public sealed record PackRun(
|
||||
LeaseId: null,
|
||||
TaskRunnerId: null,
|
||||
LeaseUntil: null,
|
||||
CreatedAt: createdAt ?? DateTimeOffset.UtcNow,
|
||||
CreatedAt: createdAt ?? throw new ArgumentNullException(nameof(createdAt), "createdAt must be provided for deterministic behavior."),
|
||||
ScheduledAt: null,
|
||||
LeasedAt: null,
|
||||
StartedAt: null,
|
||||
|
||||
@@ -71,7 +71,7 @@ public sealed record PackRunLog(
|
||||
Message: message,
|
||||
Digest: digest,
|
||||
SizeBytes: sizeBytes,
|
||||
Timestamp: timestamp ?? DateTimeOffset.UtcNow,
|
||||
Timestamp: timestamp ?? throw new ArgumentNullException(nameof(timestamp), "timestamp must be provided for deterministic behavior."),
|
||||
Data: data);
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ public sealed record ReplayInputsLock(
|
||||
return new ReplayInputsLock(
|
||||
SchemaVersion: schemaVersion,
|
||||
ManifestHash: manifest.ComputeHash(hasher),
|
||||
CreatedAt: createdAt ?? DateTimeOffset.UtcNow,
|
||||
CreatedAt: createdAt ?? throw new ArgumentNullException(nameof(createdAt), "createdAt must be provided for deterministic behavior."),
|
||||
Inputs: manifest.Inputs,
|
||||
Notes: string.IsNullOrWhiteSpace(notes) ? null : notes);
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ public sealed record ReplayManifest(
|
||||
SchemaVersion: schemaVersion,
|
||||
JobId: jobId,
|
||||
ReplayOf: replayOf,
|
||||
CreatedAt: createdAt ?? DateTimeOffset.UtcNow,
|
||||
CreatedAt: createdAt ?? throw new ArgumentNullException(nameof(createdAt), "createdAt must be provided for deterministic behavior."),
|
||||
Reason: string.IsNullOrWhiteSpace(reason) ? null : reason,
|
||||
Inputs: inputs,
|
||||
Artifacts: artifacts is null ? ImmutableArray<ReplayArtifact>.Empty : ImmutableArray.CreateRange(artifacts));
|
||||
|
||||
@@ -83,6 +83,7 @@ public sealed record RunLedgerEntry(
|
||||
string inputDigest,
|
||||
long sequenceNumber,
|
||||
string? previousEntryHash,
|
||||
DateTimeOffset ledgerCreatedAt,
|
||||
string? metadata = null)
|
||||
{
|
||||
if (run.CompletedAt is null)
|
||||
@@ -91,7 +92,6 @@ public sealed record RunLedgerEntry(
|
||||
}
|
||||
|
||||
var ledgerId = Guid.NewGuid();
|
||||
var ledgerCreatedAt = DateTimeOffset.UtcNow;
|
||||
|
||||
// Build artifact manifest
|
||||
var artifactManifest = BuildArtifactManifest(artifacts);
|
||||
@@ -259,6 +259,7 @@ public sealed record LedgerExport(
|
||||
string tenantId,
|
||||
string format,
|
||||
string requestedBy,
|
||||
DateTimeOffset requestedAt,
|
||||
DateTimeOffset? startTime = null,
|
||||
DateTimeOffset? endTime = null,
|
||||
string? runTypeFilter = null,
|
||||
@@ -289,7 +290,7 @@ public sealed record LedgerExport(
|
||||
OutputDigest: null,
|
||||
OutputSizeBytes: null,
|
||||
RequestedBy: requestedBy,
|
||||
RequestedAt: DateTimeOffset.UtcNow,
|
||||
RequestedAt: requestedAt,
|
||||
StartedAt: null,
|
||||
CompletedAt: null,
|
||||
ErrorMessage: null);
|
||||
@@ -298,33 +299,33 @@ public sealed record LedgerExport(
|
||||
/// <summary>
|
||||
/// Marks the export as started.
|
||||
/// </summary>
|
||||
public LedgerExport Start() => this with
|
||||
public LedgerExport Start(DateTimeOffset startedAt) => this with
|
||||
{
|
||||
Status = LedgerExportStatus.Processing,
|
||||
StartedAt = DateTimeOffset.UtcNow
|
||||
StartedAt = startedAt
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Marks the export as completed.
|
||||
/// </summary>
|
||||
public LedgerExport Complete(string outputUri, string outputDigest, long outputSizeBytes, int entryCount) => this with
|
||||
public LedgerExport Complete(string outputUri, string outputDigest, long outputSizeBytes, int entryCount, DateTimeOffset completedAt) => this with
|
||||
{
|
||||
Status = LedgerExportStatus.Completed,
|
||||
OutputUri = outputUri,
|
||||
OutputDigest = outputDigest,
|
||||
OutputSizeBytes = outputSizeBytes,
|
||||
EntryCount = entryCount,
|
||||
CompletedAt = DateTimeOffset.UtcNow
|
||||
CompletedAt = completedAt
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Marks the export as failed.
|
||||
/// </summary>
|
||||
public LedgerExport Fail(string errorMessage) => this with
|
||||
public LedgerExport Fail(string errorMessage, DateTimeOffset failedAt) => this with
|
||||
{
|
||||
Status = LedgerExportStatus.Failed,
|
||||
ErrorMessage = errorMessage,
|
||||
CompletedAt = DateTimeOffset.UtcNow
|
||||
CompletedAt = failedAt
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -66,6 +66,7 @@ public sealed record SignedManifest(
|
||||
/// </summary>
|
||||
public static SignedManifest CreateFromLedgerEntry(
|
||||
RunLedgerEntry ledger,
|
||||
DateTimeOffset createdAt,
|
||||
string? buildInfo = null,
|
||||
string? metadata = null)
|
||||
{
|
||||
@@ -95,7 +96,7 @@ public sealed record SignedManifest(
|
||||
SignatureAlgorithm: "none",
|
||||
Signature: string.Empty,
|
||||
KeyId: string.Empty,
|
||||
CreatedAt: DateTimeOffset.UtcNow,
|
||||
CreatedAt: createdAt,
|
||||
ExpiresAt: null,
|
||||
Metadata: metadata);
|
||||
}
|
||||
@@ -106,6 +107,7 @@ public sealed record SignedManifest(
|
||||
public static SignedManifest CreateFromExport(
|
||||
LedgerExport export,
|
||||
IReadOnlyList<RunLedgerEntry> entries,
|
||||
DateTimeOffset createdAt,
|
||||
string? buildInfo = null,
|
||||
string? metadata = null)
|
||||
{
|
||||
@@ -114,7 +116,7 @@ public sealed record SignedManifest(
|
||||
throw new InvalidOperationException("Cannot create manifest from incomplete export.");
|
||||
}
|
||||
|
||||
var statements = CreateStatementsFromExport(export, entries);
|
||||
var statements = CreateStatementsFromExport(export, entries, createdAt);
|
||||
var artifacts = CreateExportArtifacts(export);
|
||||
var materials = CreateExportMaterials(entries);
|
||||
|
||||
@@ -140,7 +142,7 @@ public sealed record SignedManifest(
|
||||
SignatureAlgorithm: "none",
|
||||
Signature: string.Empty,
|
||||
KeyId: string.Empty,
|
||||
CreatedAt: DateTimeOffset.UtcNow,
|
||||
CreatedAt: createdAt,
|
||||
ExpiresAt: null,
|
||||
Metadata: metadata);
|
||||
}
|
||||
@@ -180,9 +182,9 @@ public sealed record SignedManifest(
|
||||
public bool IsSigned => !string.IsNullOrEmpty(Signature) && SignatureAlgorithm != "none";
|
||||
|
||||
/// <summary>
|
||||
/// Checks if the manifest has expired.
|
||||
/// Checks if the manifest has expired at the given time.
|
||||
/// </summary>
|
||||
public bool IsExpired => ExpiresAt.HasValue && ExpiresAt.Value < DateTimeOffset.UtcNow;
|
||||
public bool IsExpiredAt(DateTimeOffset now) => ExpiresAt.HasValue && ExpiresAt.Value < now;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the payload digest integrity.
|
||||
@@ -281,8 +283,9 @@ public sealed record SignedManifest(
|
||||
return JsonSerializer.Serialize(materials);
|
||||
}
|
||||
|
||||
private static string CreateStatementsFromExport(LedgerExport export, IReadOnlyList<RunLedgerEntry> entries)
|
||||
private static string CreateStatementsFromExport(LedgerExport export, IReadOnlyList<RunLedgerEntry> entries, DateTimeOffset createdAt)
|
||||
{
|
||||
var timestamp = export.CompletedAt ?? createdAt;
|
||||
var statements = new List<ProvenanceStatement>
|
||||
{
|
||||
new(
|
||||
@@ -290,7 +293,7 @@ public sealed record SignedManifest(
|
||||
Subject: $"export:{export.ExportId}",
|
||||
Predicate: "contains",
|
||||
Object: $"entries:{entries.Count}",
|
||||
Timestamp: export.CompletedAt ?? DateTimeOffset.UtcNow,
|
||||
Timestamp: timestamp,
|
||||
Metadata: JsonSerializer.Serialize(new
|
||||
{
|
||||
export.Format,
|
||||
@@ -314,7 +317,7 @@ public sealed record SignedManifest(
|
||||
Subject: $"export:{export.ExportId}",
|
||||
Predicate: "covers",
|
||||
Object: $"sequence:{first.SequenceNumber}-{last.SequenceNumber}",
|
||||
Timestamp: export.CompletedAt ?? DateTimeOffset.UtcNow,
|
||||
Timestamp: timestamp,
|
||||
Metadata: JsonSerializer.Serialize(new
|
||||
{
|
||||
FirstEntryHash = first.ContentHash,
|
||||
|
||||
@@ -116,13 +116,13 @@ public sealed record Slo(
|
||||
double target,
|
||||
SloWindow window,
|
||||
string createdBy,
|
||||
DateTimeOffset createdAt,
|
||||
string? description = null,
|
||||
string? jobType = null,
|
||||
Guid? sourceId = null)
|
||||
{
|
||||
ValidateTarget(target);
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
return new Slo(
|
||||
SloId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -137,8 +137,8 @@ public sealed record Slo(
|
||||
LatencyTargetSeconds: null,
|
||||
ThroughputMinimum: null,
|
||||
Enabled: true,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: createdAt,
|
||||
CreatedBy: createdBy,
|
||||
UpdatedBy: createdBy);
|
||||
}
|
||||
@@ -152,6 +152,7 @@ public sealed record Slo(
|
||||
double target,
|
||||
SloWindow window,
|
||||
string createdBy,
|
||||
DateTimeOffset createdAt,
|
||||
string? description = null,
|
||||
string? jobType = null,
|
||||
Guid? sourceId = null)
|
||||
@@ -162,7 +163,6 @@ public sealed record Slo(
|
||||
if (targetSeconds <= 0)
|
||||
throw new ArgumentOutOfRangeException(nameof(targetSeconds), "Target latency must be positive");
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
return new Slo(
|
||||
SloId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -177,8 +177,8 @@ public sealed record Slo(
|
||||
LatencyTargetSeconds: targetSeconds,
|
||||
ThroughputMinimum: null,
|
||||
Enabled: true,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: createdAt,
|
||||
CreatedBy: createdBy,
|
||||
UpdatedBy: createdBy);
|
||||
}
|
||||
@@ -191,6 +191,7 @@ public sealed record Slo(
|
||||
double target,
|
||||
SloWindow window,
|
||||
string createdBy,
|
||||
DateTimeOffset createdAt,
|
||||
string? description = null,
|
||||
string? jobType = null,
|
||||
Guid? sourceId = null)
|
||||
@@ -199,7 +200,6 @@ public sealed record Slo(
|
||||
if (minimum <= 0)
|
||||
throw new ArgumentOutOfRangeException(nameof(minimum), "Throughput minimum must be positive");
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
return new Slo(
|
||||
SloId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -214,14 +214,15 @@ public sealed record Slo(
|
||||
LatencyTargetSeconds: null,
|
||||
ThroughputMinimum: minimum,
|
||||
Enabled: true,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: createdAt,
|
||||
CreatedBy: createdBy,
|
||||
UpdatedBy: createdBy);
|
||||
}
|
||||
|
||||
/// <summary>Updates the SLO with new values.</summary>
|
||||
public Slo Update(
|
||||
DateTimeOffset updatedAt,
|
||||
string? name = null,
|
||||
string? description = null,
|
||||
double? target = null,
|
||||
@@ -237,26 +238,26 @@ public sealed record Slo(
|
||||
Description = description ?? Description,
|
||||
Target = target ?? Target,
|
||||
Enabled = enabled ?? Enabled,
|
||||
UpdatedAt = DateTimeOffset.UtcNow,
|
||||
UpdatedAt = updatedAt,
|
||||
UpdatedBy = updatedBy ?? UpdatedBy
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>Disables the SLO.</summary>
|
||||
public Slo Disable(string updatedBy) =>
|
||||
public Slo Disable(string updatedBy, DateTimeOffset updatedAt) =>
|
||||
this with
|
||||
{
|
||||
Enabled = false,
|
||||
UpdatedAt = DateTimeOffset.UtcNow,
|
||||
UpdatedAt = updatedAt,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
|
||||
/// <summary>Enables the SLO.</summary>
|
||||
public Slo Enable(string updatedBy) =>
|
||||
public Slo Enable(string updatedBy, DateTimeOffset updatedAt) =>
|
||||
this with
|
||||
{
|
||||
Enabled = true,
|
||||
UpdatedAt = DateTimeOffset.UtcNow,
|
||||
UpdatedAt = updatedAt,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
|
||||
@@ -414,6 +415,7 @@ public sealed record AlertBudgetThreshold(
|
||||
double budgetConsumedThreshold,
|
||||
AlertSeverity severity,
|
||||
string createdBy,
|
||||
DateTimeOffset createdAt,
|
||||
double? burnRateThreshold = null,
|
||||
string? notificationChannel = null,
|
||||
string? notificationEndpoint = null,
|
||||
@@ -422,7 +424,6 @@ public sealed record AlertBudgetThreshold(
|
||||
if (budgetConsumedThreshold < 0 || budgetConsumedThreshold > 1)
|
||||
throw new ArgumentOutOfRangeException(nameof(budgetConsumedThreshold), "Threshold must be between 0 and 1");
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
return new AlertBudgetThreshold(
|
||||
ThresholdId: Guid.NewGuid(),
|
||||
SloId: sloId,
|
||||
@@ -435,8 +436,8 @@ public sealed record AlertBudgetThreshold(
|
||||
NotificationEndpoint: notificationEndpoint,
|
||||
Cooldown: cooldown ?? TimeSpan.FromHours(1),
|
||||
LastTriggeredAt: null,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: createdAt,
|
||||
CreatedBy: createdBy,
|
||||
UpdatedBy: createdBy);
|
||||
}
|
||||
|
||||
@@ -70,7 +70,8 @@ public sealed record Watermark(
|
||||
Guid? sourceId,
|
||||
string? jobType,
|
||||
DateTimeOffset highWatermark,
|
||||
string createdBy)
|
||||
string createdBy,
|
||||
DateTimeOffset createdAt)
|
||||
{
|
||||
var scopeKey = (sourceId, jobType) switch
|
||||
{
|
||||
@@ -80,7 +81,6 @@ public sealed record Watermark(
|
||||
_ => throw new ArgumentException("Either sourceId or jobType must be specified.")
|
||||
};
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
return new Watermark(
|
||||
WatermarkId: Guid.NewGuid(),
|
||||
TenantId: tenantId,
|
||||
@@ -92,8 +92,8 @@ public sealed record Watermark(
|
||||
SequenceNumber: 0,
|
||||
ProcessedCount: 0,
|
||||
LastBatchHash: null,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
CreatedAt: createdAt,
|
||||
UpdatedAt: createdAt,
|
||||
UpdatedBy: createdBy);
|
||||
}
|
||||
|
||||
@@ -104,7 +104,8 @@ public sealed record Watermark(
|
||||
DateTimeOffset newHighWatermark,
|
||||
long eventsProcessed,
|
||||
string? batchHash,
|
||||
string updatedBy)
|
||||
string updatedBy,
|
||||
DateTimeOffset updatedAt)
|
||||
{
|
||||
if (newHighWatermark < HighWatermark)
|
||||
throw new ArgumentException("New high watermark cannot be before current high watermark.", nameof(newHighWatermark));
|
||||
@@ -115,7 +116,7 @@ public sealed record Watermark(
|
||||
SequenceNumber = SequenceNumber + 1,
|
||||
ProcessedCount = ProcessedCount + eventsProcessed,
|
||||
LastBatchHash = batchHash,
|
||||
UpdatedAt = DateTimeOffset.UtcNow,
|
||||
UpdatedAt = updatedAt,
|
||||
UpdatedBy = updatedBy
|
||||
};
|
||||
}
|
||||
@@ -123,7 +124,7 @@ public sealed record Watermark(
|
||||
/// <summary>
|
||||
/// Sets the event-time window bounds.
|
||||
/// </summary>
|
||||
public Watermark WithWindow(DateTimeOffset lowWatermark, DateTimeOffset highWatermark)
|
||||
public Watermark WithWindow(DateTimeOffset lowWatermark, DateTimeOffset highWatermark, DateTimeOffset updatedAt)
|
||||
{
|
||||
if (highWatermark < lowWatermark)
|
||||
throw new ArgumentException("High watermark cannot be before low watermark.");
|
||||
@@ -132,7 +133,7 @@ public sealed record Watermark(
|
||||
{
|
||||
LowWatermark = lowWatermark,
|
||||
HighWatermark = highWatermark,
|
||||
UpdatedAt = DateTimeOffset.UtcNow
|
||||
UpdatedAt = updatedAt
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ public sealed record EventEnvelope(
|
||||
ArgumentNullException.ThrowIfNull(job);
|
||||
ArgumentNullException.ThrowIfNull(actor);
|
||||
|
||||
var occurred = occurredAt ?? DateTimeOffset.UtcNow;
|
||||
var occurred = occurredAt ?? throw new ArgumentNullException(nameof(occurredAt), "occurredAt must be provided for deterministic behavior.");
|
||||
var evtId = string.IsNullOrWhiteSpace(eventId) ? Guid.NewGuid().ToString() : eventId!;
|
||||
var key = string.IsNullOrWhiteSpace(idempotencyKey)
|
||||
? ComputeIdempotencyKey(eventType, job.Id, job.Attempt)
|
||||
|
||||
@@ -195,17 +195,20 @@ public sealed class JobAttestationService : IJobAttestationService
|
||||
private readonly IJobAttestationSigner _signer;
|
||||
private readonly IJobAttestationStore _store;
|
||||
private readonly ITimelineEventEmitter _timelineEmitter;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ILogger<JobAttestationService> _logger;
|
||||
|
||||
public JobAttestationService(
|
||||
IJobAttestationSigner signer,
|
||||
IJobAttestationStore store,
|
||||
ITimelineEventEmitter timelineEmitter,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<JobAttestationService> logger)
|
||||
{
|
||||
_signer = signer ?? throw new ArgumentNullException(nameof(signer));
|
||||
_store = store ?? throw new ArgumentNullException(nameof(store));
|
||||
_timelineEmitter = timelineEmitter ?? throw new ArgumentNullException(nameof(timelineEmitter));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
@@ -229,7 +232,7 @@ public sealed class JobAttestationService : IJobAttestationService
|
||||
Status: request.Status,
|
||||
ExitCode: request.ExitCode,
|
||||
StartedAt: request.StartedAt,
|
||||
CompletedAt: request.CompletedAt ?? DateTimeOffset.UtcNow,
|
||||
CompletedAt: request.CompletedAt ?? _timeProvider.GetUtcNow(),
|
||||
DurationSeconds: request.DurationSeconds,
|
||||
InputHash: ComputePayloadHash(request.InputPayloadJson),
|
||||
OutputHash: ComputePayloadHash(request.OutputPayloadJson),
|
||||
@@ -318,7 +321,7 @@ public sealed class JobAttestationService : IJobAttestationService
|
||||
jobType = request.JobType,
|
||||
tenantId = request.TenantId,
|
||||
projectId = request.ProjectId,
|
||||
scheduledAt = DateTimeOffset.UtcNow,
|
||||
scheduledAt = _timeProvider.GetUtcNow(),
|
||||
inputHash = ComputePayloadHash(request.InputPayloadJson)
|
||||
};
|
||||
|
||||
@@ -379,7 +382,7 @@ public sealed class JobAttestationService : IJobAttestationService
|
||||
runId,
|
||||
tenantId,
|
||||
projectId,
|
||||
completedAt = DateTimeOffset.UtcNow,
|
||||
completedAt = _timeProvider.GetUtcNow(),
|
||||
jobCount = jobAttestations.Count,
|
||||
jobs = jobAttestations.Select(a => new
|
||||
{
|
||||
@@ -486,7 +489,7 @@ public sealed class JobAttestationService : IJobAttestationService
|
||||
var keyId = primarySignature?.KeyId;
|
||||
|
||||
// Check age
|
||||
var age = DateTimeOffset.UtcNow - attestation.CreatedAt;
|
||||
var age = _timeProvider.GetUtcNow() - attestation.CreatedAt;
|
||||
if (age > TimeSpan.FromDays(365))
|
||||
{
|
||||
warnings.Add($"Attestation is older than 1 year ({age.Days} days)");
|
||||
@@ -557,7 +560,7 @@ public sealed class JobAttestationService : IJobAttestationService
|
||||
PredicateType: predicateType,
|
||||
Subjects: subjects.Select(s => new AttestationSubject(s.Name, s.Digest)).ToList(),
|
||||
Envelope: envelope,
|
||||
CreatedAt: DateTimeOffset.UtcNow,
|
||||
CreatedAt: _timeProvider.GetUtcNow(),
|
||||
PayloadDigest: payloadDigest,
|
||||
EvidencePointer: null);
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ public sealed record JobCapsule(
|
||||
string jobType,
|
||||
JobCapsuleKind kind,
|
||||
JobCapsuleInputs inputs,
|
||||
DateTimeOffset createdAt,
|
||||
JobCapsuleOutputs? outputs = null,
|
||||
IReadOnlyList<JobCapsuleArtifact>? artifacts = null,
|
||||
IReadOnlyList<JobCapsuleTimelineEntry>? timelineEntries = null,
|
||||
@@ -85,7 +86,6 @@ public sealed record JobCapsule(
|
||||
IReadOnlyDictionary<string, string>? metadata = null)
|
||||
{
|
||||
var capsuleId = Guid.NewGuid();
|
||||
var createdAt = DateTimeOffset.UtcNow;
|
||||
|
||||
// Compute root hash from all materials
|
||||
var rootHash = ComputeRootHash(
|
||||
|
||||
@@ -113,12 +113,14 @@ public sealed class JobCapsuleGenerator : IJobCapsuleGenerator
|
||||
private readonly IJobCapsuleStore _store;
|
||||
private readonly ITimelineEventEmitter? _timelineEmitter;
|
||||
private readonly ISnapshotHookInvoker? _snapshotHooks;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ILogger<JobCapsuleGenerator> _logger;
|
||||
private readonly JobCapsuleGeneratorOptions _options;
|
||||
|
||||
public JobCapsuleGenerator(
|
||||
IJobRedactionGuard redactionGuard,
|
||||
IJobCapsuleStore store,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<JobCapsuleGenerator> logger,
|
||||
ITimelineEventEmitter? timelineEmitter = null,
|
||||
ISnapshotHookInvoker? snapshotHooks = null,
|
||||
@@ -126,6 +128,7 @@ public sealed class JobCapsuleGenerator : IJobCapsuleGenerator
|
||||
{
|
||||
_redactionGuard = redactionGuard ?? throw new ArgumentNullException(nameof(redactionGuard));
|
||||
_store = store ?? throw new ArgumentNullException(nameof(store));
|
||||
_timeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_timelineEmitter = timelineEmitter;
|
||||
_snapshotHooks = snapshotHooks;
|
||||
@@ -164,6 +167,7 @@ public sealed class JobCapsuleGenerator : IJobCapsuleGenerator
|
||||
jobType: request.JobType,
|
||||
kind: JobCapsuleKind.JobScheduling,
|
||||
inputs: inputs,
|
||||
createdAt: _timeProvider.GetUtcNow(),
|
||||
timelineEntries: timelineEntries,
|
||||
policyResults: request.PolicyResults,
|
||||
projectId: request.ProjectId,
|
||||
@@ -239,6 +243,7 @@ public sealed class JobCapsuleGenerator : IJobCapsuleGenerator
|
||||
jobType: request.JobType,
|
||||
kind: JobCapsuleKind.JobCompletion,
|
||||
inputs: inputs,
|
||||
createdAt: _timeProvider.GetUtcNow(),
|
||||
outputs: outputs,
|
||||
artifacts: artifacts,
|
||||
timelineEntries: timelineEntries,
|
||||
@@ -323,6 +328,7 @@ public sealed class JobCapsuleGenerator : IJobCapsuleGenerator
|
||||
jobType: request.JobType,
|
||||
kind: JobCapsuleKind.JobFailure,
|
||||
inputs: inputs,
|
||||
createdAt: _timeProvider.GetUtcNow(),
|
||||
outputs: outputs,
|
||||
timelineEntries: timelineEntries,
|
||||
policyResults: request.PolicyResults,
|
||||
@@ -409,6 +415,7 @@ public sealed class JobCapsuleGenerator : IJobCapsuleGenerator
|
||||
jobType: "run.completion",
|
||||
kind: JobCapsuleKind.RunCompletion,
|
||||
inputs: inputs,
|
||||
createdAt: _timeProvider.GetUtcNow(),
|
||||
artifacts: jobRefs,
|
||||
projectId: projectId,
|
||||
runId: runId,
|
||||
|
||||
@@ -212,6 +212,7 @@ public sealed record IncidentModeHooksOptions
|
||||
public sealed class IncidentModeHooks : IIncidentModeHooks
|
||||
{
|
||||
private readonly ITimelineEventEmitter _eventEmitter;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly ILogger<IncidentModeHooks> _logger;
|
||||
private readonly IncidentModeHooksOptions _options;
|
||||
private readonly Dictionary<string, IncidentModeState> _tenantStates = new();
|
||||
@@ -220,10 +221,12 @@ public sealed class IncidentModeHooks : IIncidentModeHooks
|
||||
|
||||
public IncidentModeHooks(
|
||||
ITimelineEventEmitter eventEmitter,
|
||||
TimeProvider timeProvider,
|
||||
ILogger<IncidentModeHooks> logger,
|
||||
IncidentModeHooksOptions? options = null)
|
||||
{
|
||||
_eventEmitter = eventEmitter ?? throw new ArgumentNullException(nameof(eventEmitter));
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_options = options ?? new IncidentModeHooksOptions();
|
||||
}
|
||||
@@ -250,7 +253,7 @@ public sealed class IncidentModeHooks : IIncidentModeHooks
|
||||
{
|
||||
if (_lastActivations.TryGetValue(tenantId, out var lastActivation))
|
||||
{
|
||||
var timeSinceLastActivation = DateTimeOffset.UtcNow - lastActivation;
|
||||
var timeSinceLastActivation = _timeProvider.GetUtcNow() - lastActivation;
|
||||
if (timeSinceLastActivation < _options.ReactivationCooldown)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
@@ -298,7 +301,7 @@ public sealed class IncidentModeHooks : IIncidentModeHooks
|
||||
IncidentModeSource source,
|
||||
TimeSpan ttl)
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var expiresAt = now + ttl;
|
||||
|
||||
var newState = new IncidentModeState(
|
||||
@@ -372,7 +375,7 @@ public sealed class IncidentModeHooks : IIncidentModeHooks
|
||||
{
|
||||
// Check if expired
|
||||
if (state.IsActive && state.ExpiresAt.HasValue &&
|
||||
DateTimeOffset.UtcNow >= state.ExpiresAt.Value)
|
||||
_timeProvider.GetUtcNow() >= state.ExpiresAt.Value)
|
||||
{
|
||||
_tenantStates[tenantId] = IncidentModeState.Inactive;
|
||||
return IncidentModeState.Inactive;
|
||||
@@ -422,7 +425,7 @@ public sealed class IncidentModeHooks : IIncidentModeHooks
|
||||
TenantId: tenantId,
|
||||
EventType: eventType,
|
||||
Source: "orchestrator",
|
||||
OccurredAt: DateTimeOffset.UtcNow,
|
||||
OccurredAt: _timeProvider.GetUtcNow(),
|
||||
ReceivedAt: null,
|
||||
CorrelationId: Guid.NewGuid().ToString(),
|
||||
TraceId: null,
|
||||
@@ -462,8 +465,9 @@ public sealed class IncidentModeHooks : IIncidentModeHooks
|
||||
string actor,
|
||||
string reason)
|
||||
{
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
var duration = previousState.ActivatedAt.HasValue
|
||||
? DateTimeOffset.UtcNow - previousState.ActivatedAt.Value
|
||||
? now - previousState.ActivatedAt.Value
|
||||
: TimeSpan.Zero;
|
||||
|
||||
var @event = new TimelineEvent(
|
||||
@@ -472,7 +476,7 @@ public sealed class IncidentModeHooks : IIncidentModeHooks
|
||||
TenantId: tenantId,
|
||||
EventType: "orchestrator.incident_mode.deactivated",
|
||||
Source: "orchestrator",
|
||||
OccurredAt: DateTimeOffset.UtcNow,
|
||||
OccurredAt: now,
|
||||
ReceivedAt: null,
|
||||
CorrelationId: Guid.NewGuid().ToString(),
|
||||
TraceId: null,
|
||||
|
||||
@@ -361,7 +361,11 @@ public sealed class HourlyCounter
|
||||
|
||||
MaxPerHour = maxPerHour;
|
||||
_currentCount = currentCount;
|
||||
_hourStart = hourStart ?? TruncateToHour(DateTimeOffset.UtcNow);
|
||||
|
||||
if (hourStart is null)
|
||||
throw new ArgumentNullException(nameof(hourStart), "hourStart must be provided for deterministic behavior.");
|
||||
|
||||
_hourStart = hourStart.Value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -69,7 +69,11 @@ public sealed class TokenBucket
|
||||
BurstCapacity = burstCapacity;
|
||||
RefillRate = refillRate;
|
||||
_currentTokens = Math.Min(initialTokens ?? burstCapacity, burstCapacity);
|
||||
_lastRefillAt = lastRefillAt ?? DateTimeOffset.UtcNow;
|
||||
|
||||
if (lastRefillAt is null)
|
||||
throw new ArgumentNullException(nameof(lastRefillAt), "lastRefillAt must be provided for deterministic behavior.");
|
||||
|
||||
_lastRefillAt = lastRefillAt.Value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -6,15 +6,18 @@ namespace StellaOps.Orchestrator.Core.Scale;
|
||||
public sealed class LoadShedder
|
||||
{
|
||||
private readonly ScaleMetrics _scaleMetrics;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly LoadShedderOptions _options;
|
||||
private volatile LoadShedState _currentState = LoadShedState.Normal;
|
||||
private DateTimeOffset _lastStateChange = DateTimeOffset.UtcNow;
|
||||
private DateTimeOffset _lastStateChange;
|
||||
private readonly object _lock = new();
|
||||
|
||||
public LoadShedder(ScaleMetrics scaleMetrics, LoadShedderOptions? options = null)
|
||||
public LoadShedder(ScaleMetrics scaleMetrics, TimeProvider? timeProvider = null, LoadShedderOptions? options = null)
|
||||
{
|
||||
_scaleMetrics = scaleMetrics;
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
_options = options ?? LoadShedderOptions.Default;
|
||||
_lastStateChange = _timeProvider.GetUtcNow();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -108,7 +111,7 @@ public sealed class LoadShedder
|
||||
lock (_lock)
|
||||
{
|
||||
// Hysteresis: require sustained condition for state changes
|
||||
var timeSinceLastChange = DateTimeOffset.UtcNow - _lastStateChange;
|
||||
var timeSinceLastChange = _timeProvider.GetUtcNow() - _lastStateChange;
|
||||
|
||||
// Going up (worse) is immediate; going down (better) requires cooldown
|
||||
var isImproving = newState < _currentState;
|
||||
@@ -119,7 +122,7 @@ public sealed class LoadShedder
|
||||
}
|
||||
|
||||
_currentState = newState;
|
||||
_lastStateChange = DateTimeOffset.UtcNow;
|
||||
_lastStateChange = _timeProvider.GetUtcNow();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,7 +134,7 @@ public sealed class LoadShedder
|
||||
lock (_lock)
|
||||
{
|
||||
_currentState = state;
|
||||
_lastStateChange = DateTimeOffset.UtcNow;
|
||||
_lastStateChange = _timeProvider.GetUtcNow();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,12 +11,22 @@ public sealed class ScaleMetrics
|
||||
private readonly ConcurrentQueue<LatencySample> _dispatchLatencies = new();
|
||||
private readonly ConcurrentDictionary<string, long> _queueDepths = new();
|
||||
private readonly ConcurrentDictionary<string, long> _activeJobs = new();
|
||||
private readonly TimeProvider _timeProvider;
|
||||
private readonly object _lock = new();
|
||||
|
||||
// Keep samples for the last 5 minutes
|
||||
private static readonly TimeSpan SampleWindow = TimeSpan.FromMinutes(5);
|
||||
private const int MaxSamples = 10000;
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new ScaleMetrics instance.
|
||||
/// </summary>
|
||||
/// <param name="timeProvider">Time provider for deterministic time.</param>
|
||||
public ScaleMetrics(TimeProvider? timeProvider = null)
|
||||
{
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Records a dispatch latency sample.
|
||||
/// </summary>
|
||||
@@ -26,7 +36,7 @@ public sealed class ScaleMetrics
|
||||
public void RecordDispatchLatency(TimeSpan latency, string tenantId, string? jobType = null)
|
||||
{
|
||||
var sample = new LatencySample(
|
||||
Timestamp: DateTimeOffset.UtcNow,
|
||||
Timestamp: _timeProvider.GetUtcNow(),
|
||||
LatencyMs: latency.TotalMilliseconds,
|
||||
TenantId: tenantId,
|
||||
JobType: jobType);
|
||||
@@ -88,7 +98,7 @@ public sealed class ScaleMetrics
|
||||
/// <param name="window">Time window for samples (default: 1 minute).</param>
|
||||
public LatencyPercentiles GetDispatchLatencyPercentiles(string? tenantId = null, TimeSpan? window = null)
|
||||
{
|
||||
var cutoff = DateTimeOffset.UtcNow - (window ?? TimeSpan.FromMinutes(1));
|
||||
var cutoff = _timeProvider.GetUtcNow() - (window ?? TimeSpan.FromMinutes(1));
|
||||
|
||||
var samples = _dispatchLatencies
|
||||
.Where(s => s.Timestamp >= cutoff)
|
||||
@@ -122,7 +132,7 @@ public sealed class ScaleMetrics
|
||||
var totalActiveJobs = _activeJobs.Values.Sum();
|
||||
|
||||
return new ScaleSnapshot(
|
||||
Timestamp: DateTimeOffset.UtcNow,
|
||||
Timestamp: _timeProvider.GetUtcNow(),
|
||||
TotalQueueDepth: totalQueueDepth,
|
||||
TotalActiveJobs: totalActiveJobs,
|
||||
DispatchLatency: percentiles,
|
||||
@@ -189,7 +199,7 @@ public sealed class ScaleMetrics
|
||||
// Double-check after acquiring lock
|
||||
if (_dispatchLatencies.Count <= MaxSamples) return;
|
||||
|
||||
var cutoff = DateTimeOffset.UtcNow - SampleWindow;
|
||||
var cutoff = _timeProvider.GetUtcNow() - SampleWindow;
|
||||
var toRemove = _dispatchLatencies.Count - MaxSamples / 2;
|
||||
|
||||
for (var i = 0; i < toRemove; i++)
|
||||
|
||||
@@ -98,13 +98,16 @@ public sealed class ExportJobService : IExportJobService
|
||||
{
|
||||
private readonly IJobRepository _jobRepository;
|
||||
private readonly IQuotaRepository _quotaRepository;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
public ExportJobService(
|
||||
IJobRepository jobRepository,
|
||||
IQuotaRepository quotaRepository)
|
||||
IQuotaRepository quotaRepository,
|
||||
TimeProvider? timeProvider = null)
|
||||
{
|
||||
_jobRepository = jobRepository;
|
||||
_quotaRepository = quotaRepository;
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
public async Task<Job> CreateExportJobAsync(
|
||||
@@ -128,7 +131,7 @@ public sealed class ExportJobService : IExportJobService
|
||||
|
||||
var payloadJson = payload.ToJson();
|
||||
var payloadDigest = payload.ComputeDigest();
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
|
||||
var job = new Job(
|
||||
JobId: Guid.NewGuid(),
|
||||
|
||||
Reference in New Issue
Block a user