release orchestration strengthening

This commit is contained in:
master
2026-01-17 21:32:03 +02:00
parent 195dff2457
commit da27b9faa9
256 changed files with 94634 additions and 2269 deletions

File diff suppressed because it is too large

@@ -0,0 +1,749 @@
# Drift Remediation Automation
## Overview
Drift Remediation Automation extends the existing drift detection system with intelligent, policy-driven automatic remediation. While drift detection identifies divergence between expected and actual state, remediation automation closes the loop by taking corrective action without manual intervention.
The design balances automation with safety through configurable remediation strategies, severity-based prioritization, and comprehensive audit trails.
---
## Design Principles
1. **Safety First**: Auto-remediation never executes without explicit policy authorization
2. **Gradual Escalation**: Start with notifications, escalate to remediation based on drift age/severity
3. **Deterministic Actions**: Remediation produces identical outcomes for identical drift states
4. **Full Auditability**: Every remediation action generates signed evidence packets
5. **Blast Radius Control**: Limit concurrent remediations; prevent cascading failures
6. **Human Override**: Operators can pause, cancel, or override any remediation
---
## Architecture
### Component Overview
```
┌──────────────────────────────────────────────────────────────────────┐
│                       Drift Remediation System                       │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌─────────────────┐    ┌────────────────────┐    ┌───────────────┐  │
│  │ DriftDetector   │───▶│ RemediationEngine  │───▶│ ActionExecutor│  │
│  │ (existing)      │    │                    │    │               │  │
│  └─────────────────┘    └────────────────────┘    └───────────────┘  │
│           │                       │                       │          │
│           ▼                       ▼                       ▼          │
│  ┌─────────────────┐    ┌────────────────────┐    ┌───────────────┐  │
│  │ SeverityScorer  │    │ PolicyEvaluator    │    │ EvidenceWriter│  │
│  │                 │    │                    │    │               │  │
│  └─────────────────┘    └────────────────────┘    └───────────────┘  │
│           │                       │                       │          │
│           ▼                       ▼                       ▼          │
│  ┌─────────────────┐    ┌────────────────────┐    ┌───────────────┐  │
│  │ AlertRouter     │    │ ReconcileScheduler │    │ MetricsEmitter│  │
│  │                 │    │                    │    │               │  │
│  └─────────────────┘    └────────────────────┘    └───────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
```
### Key Components
#### 1. SeverityScorer
Calculates drift severity based on multiple weighted factors:
```csharp
public sealed record DriftSeverity
{
public DriftSeverityLevel Level { get; init; } // Critical, High, Medium, Low, Info
public int Score { get; init; } // 0-100 numeric score
public ImmutableArray<SeverityFactor> Factors { get; init; }
public TimeSpan DriftAge { get; init; }
public bool RequiresImmediate { get; init; }
}
public enum DriftSeverityLevel
{
Info = 0, // Cosmetic differences (labels, annotations)
Low = 25, // Non-critical drift (resource limits changed)
Medium = 50, // Functional drift (ports, volumes)
High = 75, // Security drift (image digest mismatch)
Critical = 100 // Severe drift (container missing, wrong image)
}
```
**Severity Factors:**
| Factor | Weight | Description |
|--------|--------|-------------|
| Drift Type | 30% | Missing > Digest Mismatch > Status Mismatch > Unexpected |
| Drift Age | 25% | Older drift = higher severity |
| Environment Criticality | 20% | Production > Staging > Development |
| Component Criticality | 15% | Core services weighted higher |
| Blast Radius | 10% | Number of dependent services affected |
#### 2. RemediationPolicy
Defines when and how to remediate drift:
```csharp
public sealed record RemediationPolicy
{
public Guid Id { get; init; }
public string Name { get; init; }
public Guid EnvironmentId { get; init; }
// Triggers
public RemediationTrigger Trigger { get; init; }
public DriftSeverityLevel MinimumSeverity { get; init; }
public TimeSpan MinimumDriftAge { get; init; }
public TimeSpan MaximumDriftAge { get; init; } // Escalate to manual if exceeded
// Actions
public RemediationAction Action { get; init; }
public RemediationStrategy Strategy { get; init; }
// Safety limits
public int MaxConcurrentRemediations { get; init; }
public int MaxRemediationsPerHour { get; init; }
public TimeSpan CooldownPeriod { get; init; }
// Schedule
public RemediationWindow? MaintenanceWindow { get; init; }
public ImmutableArray<DayOfWeek> AllowedDays { get; init; }
public TimeOnly AllowedStartTime { get; init; }
public TimeOnly AllowedEndTime { get; init; }
// Notifications
public NotificationConfig Notifications { get; init; }
}
public enum RemediationTrigger
{
Immediate, // Remediate as soon as detected
Scheduled, // Wait for maintenance window
AgeThreshold, // Remediate after drift exceeds age
SeverityEscalation, // Remediate when severity increases
Manual // Notification only, human initiates
}
public enum RemediationAction
{
NotifyOnly, // Alert but don't act
Reconcile, // Restore to expected state
Rollback, // Rollback to previous known-good release
Scale, // Adjust replica count
Restart, // Restart containers
Quarantine // Isolate drifted targets from traffic
}
public enum RemediationStrategy
{
AllAtOnce, // Remediate all drifted targets simultaneously
Rolling, // Remediate one at a time with health checks
Canary, // Remediate one, verify, then proceed
BlueGreen // Deploy to standby, switch traffic
}
```
#### 3. RemediationEngine
Orchestrates the remediation process:
```csharp
public sealed class RemediationEngine
{
public async Task<RemediationPlan> CreatePlanAsync(
DriftReport driftReport,
RemediationPolicy policy,
CancellationToken ct)
{
// 1. Score severity for each drift item
var scoredDrifts = await _severityScorer.ScoreAsync(driftReport.Items, ct);
// 2. Filter by policy thresholds
var actionable = scoredDrifts
.Where(d => d.Severity.Level >= policy.MinimumSeverity)
.Where(d => d.Severity.DriftAge >= policy.MinimumDriftAge)
.ToImmutableArray();
// 3. Check maintenance window
if (!IsWithinMaintenanceWindow(policy))
return RemediationPlan.Deferred(actionable, policy.MaintenanceWindow);
// 4. Check rate limits
var allowed = await CheckRateLimitsAsync(actionable, policy, ct);
// 5. Build execution plan
return BuildExecutionPlan(allowed, policy);
}
public async Task<RemediationResult> ExecuteAsync(
RemediationPlan plan,
CancellationToken ct)
{
// Execute with blast radius control; the semaphore is disposed when execution ends
using var semaphore = new SemaphoreSlim(plan.Policy.MaxConcurrentRemediations);
// Results are appended only here, between batches, so a plain list is sufficient
// (ConcurrentBag<T> has no AddRange method)
var results = new List<TargetRemediationResult>();
foreach (var batch in plan.Batches)
{
var tasks = batch.Targets.Select(async target =>
{
await semaphore.WaitAsync(ct);
try
{
return await RemediateTargetAsync(target, plan, ct);
}
finally
{
semaphore.Release();
}
});
var batchResults = await Task.WhenAll(tasks);
results.AddRange(batchResults);
// Health check between batches for rolling strategy
if (plan.Policy.Strategy == RemediationStrategy.Rolling)
{
await VerifyBatchHealthAsync(batchResults, ct);
}
}
// Generate evidence
var evidence = await _evidenceWriter.WriteAsync(plan, results, ct);
return new RemediationResult(plan.Id, results.ToImmutableArray(), evidence);
}
}
```
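`CreatePlanAsync` calls `IsWithinMaintenanceWindow(policy)` without showing it. The following is a minimal sketch of such a check, assuming the window is expressed through the policy's `AllowedDays`, `AllowedStartTime`, and `AllowedEndTime` and evaluated in UTC. The class name `MaintenanceWindowCheck`, the method signature, and the treatment of an end time earlier than the start time as an overnight window are assumptions for illustration, not part of the design above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class MaintenanceWindowCheck
{
    // Hypothetical helper: true when `now` (converted to UTC) falls on an
    // allowed day and inside [start, end). If end is earlier than start, the
    // window is treated as crossing midnight (for example 22:00 to 02:00).
    public static bool IsWithinWindow(
        DateTimeOffset now,
        IReadOnlyCollection<DayOfWeek> allowedDays,
        TimeOnly start,
        TimeOnly end)
    {
        var utc = now.UtcDateTime;
        var time = TimeOnly.FromDateTime(utc);

        if (start <= end)
        {
            // Same-day window
            return allowedDays.Contains(utc.DayOfWeek) && time >= start && time < end;
        }

        // Overnight window, evening part: the window opened today
        if (time >= start)
            return allowedDays.Contains(utc.DayOfWeek);

        // Overnight window, morning part: the window opened yesterday
        if (time < end)
            return allowedDays.Contains(utc.AddDays(-1).DayOfWeek);

        return false;
    }
}
```

Passing the current time in as a parameter, rather than reading a clock inside the helper, keeps the check deterministic, which fits the golden tests listed in the test strategy.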
#### 4. ReconcileScheduler
Manages scheduled reconciliation runs:
```csharp
public sealed class ReconcileScheduler
{
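private readonly TimeProvider _timeProvider;
private readonly IRemediationPolicyStore _policyStore;
private readonly IDriftDetector _driftDetector;
private readonly IInventoryService _inventoryService; // assumed interface name
private readonly IReleaseService _releaseService; // assumed interface name
private readonly RemediationEngine _engine;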
public async Task RunScheduledReconciliationAsync(CancellationToken ct)
{
var policies = await _policyStore.GetScheduledPoliciesAsync(ct);
foreach (var policy in policies)
{
if (!IsWithinWindow(policy))
continue;
// Detect drift
var inventory = await _inventoryService.GetCurrentAsync(policy.EnvironmentId, ct);
var expected = await _releaseService.GetExpectedStateAsync(policy.EnvironmentId, ct);
var drift = _driftDetector.Detect(inventory, expected);
if (drift.HasDrift)
{
var plan = await _engine.CreatePlanAsync(drift, policy, ct);
await _engine.ExecuteAsync(plan, ct);
}
}
}
}
```
---
## Data Models
### RemediationPlan
```csharp
public sealed record RemediationPlan
{
public Guid Id { get; init; }
public Guid DriftReportId { get; init; }
public RemediationPolicy Policy { get; init; }
public RemediationPlanStatus Status { get; init; }
public ImmutableArray<RemediationBatch> Batches { get; init; }
public DateTimeOffset CreatedAt { get; init; }
public DateTimeOffset? ScheduledFor { get; init; }
public DateTimeOffset? StartedAt { get; init; }
public DateTimeOffset? CompletedAt { get; init; }
public string? DeferralReason { get; init; }
}
public enum RemediationPlanStatus
{
Created,
Scheduled,
Deferred, // Waiting for maintenance window
Running,
Paused, // Human intervention requested
Succeeded,
PartialSuccess, // Some targets remediated, some failed
Failed,
Cancelled
}
public sealed record RemediationBatch
{
public int Order { get; init; }
public ImmutableArray<RemediationTarget> Targets { get; init; }
public TimeSpan? DelayAfter { get; init; }
public bool RequiresHealthCheck { get; init; }
}
public sealed record RemediationTarget
{
public Guid TargetId { get; init; }
public string TargetName { get; init; }
public DriftItem Drift { get; init; }
public DriftSeverity Severity { get; init; }
public RemediationAction Action { get; init; }
public string? ActionPayload { get; init; } // Compose file, rollback digest, etc.
}
```
### RemediationResult
```csharp
public sealed record RemediationResult
{
public Guid PlanId { get; init; }
public RemediationResultStatus Status { get; init; }
public ImmutableArray<TargetRemediationResult> TargetResults { get; init; }
public Guid EvidencePacketId { get; init; }
public TimeSpan Duration { get; init; }
public RemediationMetrics Metrics { get; init; }
}
public sealed record TargetRemediationResult
{
public Guid TargetId { get; init; }
public RemediationTargetStatus Status { get; init; }
public string? Error { get; init; }
public TimeSpan Duration { get; init; }
public string? PreviousDigest { get; init; }
public string? CurrentDigest { get; init; }
public ImmutableArray<string> Logs { get; init; }
}
public sealed record RemediationMetrics
{
public int TotalTargets { get; init; }
public int Succeeded { get; init; }
public int Failed { get; init; }
public int Skipped { get; init; }
public TimeSpan TotalDuration { get; init; }
public TimeSpan AverageTargetDuration { get; init; }
}
```
---
## API Design
### REST Endpoints
```
# Policies
POST /api/v1/remediation/policies # Create policy
GET /api/v1/remediation/policies # List policies
GET /api/v1/remediation/policies/{id} # Get policy
PUT /api/v1/remediation/policies/{id} # Update policy
DELETE /api/v1/remediation/policies/{id} # Delete policy
POST /api/v1/remediation/policies/{id}/activate # Activate policy
POST /api/v1/remediation/policies/{id}/deactivate # Deactivate policy
# Plans
GET /api/v1/remediation/plans # List plans
GET /api/v1/remediation/plans/{id} # Get plan details
POST /api/v1/remediation/plans/{id}/execute # Execute deferred plan
POST /api/v1/remediation/plans/{id}/pause # Pause running plan
POST /api/v1/remediation/plans/{id}/resume # Resume paused plan
POST /api/v1/remediation/plans/{id}/cancel # Cancel plan
# On-demand
POST /api/v1/remediation/preview # Preview remediation (dry-run)
POST /api/v1/remediation/execute # Execute immediate remediation
# History
GET /api/v1/remediation/history # List remediation history
GET /api/v1/remediation/history/{id} # Get remediation result
GET /api/v1/remediation/history/{id}/evidence # Get evidence packet
```
### WebSocket Events
```typescript
// Real-time remediation updates
interface RemediationEvent {
type: 'plan.created' | 'plan.started' | 'plan.completed' |
'target.started' | 'target.completed' | 'target.failed';
planId: string;
targetId?: string;
status: string;
progress?: number;
message?: string;
timestamp: string;
}
```
---
## Severity Scoring Algorithm
```csharp
public sealed class SeverityScorer
{
private readonly SeverityScoringConfig _config;
public DriftSeverity Score(DriftItem drift, ScoringContext context)
{
var factors = new List<SeverityFactor>();
var score = 0.0;
// Factor 1: Drift Type (30%)
var typeScore = drift.Type switch
{
DriftType.Missing => 100,
DriftType.DigestMismatch => 80,
DriftType.StatusMismatch => 50,
DriftType.Unexpected => 30,
_ => 10
};
factors.Add(new SeverityFactor("DriftType", typeScore, 0.30));
score += typeScore * 0.30;
// Factor 2: Drift Age (25%)
var ageScore = CalculateAgeScore(drift.DetectedAt, context.Now);
factors.Add(new SeverityFactor("DriftAge", ageScore, 0.25));
score += ageScore * 0.25;
// Factor 3: Environment Criticality (20%)
var envScore = context.Environment.Criticality switch
{
EnvironmentCriticality.Production => 100,
EnvironmentCriticality.Staging => 60,
EnvironmentCriticality.Development => 20,
_ => 10
};
factors.Add(new SeverityFactor("EnvironmentCriticality", envScore, 0.20));
score += envScore * 0.20;
// Factor 4: Component Criticality (15%)
var componentScore = context.ComponentCriticality.GetValueOrDefault(drift.ComponentId, 50);
factors.Add(new SeverityFactor("ComponentCriticality", componentScore, 0.15));
score += componentScore * 0.15;
// Factor 5: Blast Radius (10%)
var blastScore = CalculateBlastRadius(drift, context.DependencyGraph);
factors.Add(new SeverityFactor("BlastRadius", blastScore, 0.10));
score += blastScore * 0.10;
return new DriftSeverity
{
Level = ScoreToLevel((int)score),
Score = (int)score,
Factors = factors.ToImmutableArray(),
DriftAge = context.Now - drift.DetectedAt,
RequiresImmediate = score >= 90
};
}
private int CalculateAgeScore(DateTimeOffset detectedAt, DateTimeOffset now)
{
var age = now - detectedAt;
return age.TotalMinutes switch
{
< 5 => 10, // Under 5 minutes: very fresh, low urgency
< 30 => 30, // Under 30 minutes: recent
< 60 => 50, // Under 1 hour
< 240 => 70, // Under 4 hours
< 1440 => 85, // Under 24 hours
_ => 100 // 24 hours or more: critical
};
}
private int CalculateBlastRadius(DriftItem drift, DependencyGraph graph)
{
var dependents = graph.GetDependents(drift.ComponentId);
return dependents.Count switch
{
0 => 10,
< 3 => 30,
< 10 => 60,
< 25 => 80,
_ => 100
};
}
}
```
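The scorer calls `ScoreToLevel` but does not define it. The sketch below shows the weighted sum from the Severity Factors table together with one possible mapping from the 0-100 score to a level. The thresholds are an assumption, chosen so that the `Critical` boundary matches the `score >= 90` test used for `RequiresImmediate`; the class name `SeverityMath` is hypothetical.

```csharp
using System;

public enum DriftSeverityLevel { Info = 0, Low = 25, Medium = 50, High = 75, Critical = 100 }

public static class SeverityMath
{
    // Weights come from the Severity Factors table and sum to 1.0.
    public static double WeightedScore(int type, int age, int environment, int component, int blast) =>
        type * 0.30 + age * 0.25 + environment * 0.20 + component * 0.15 + blast * 0.10;

    // Assumed thresholds (not specified in the design): Critical starts at 90
    // to line up with RequiresImmediate; the other levels start at their enum value.
    public static DriftSeverityLevel ScoreToLevel(int score) => score switch
    {
        >= 90 => DriftSeverityLevel.Critical,
        >= 75 => DriftSeverityLevel.High,
        >= 50 => DriftSeverityLevel.Medium,
        >= 25 => DriftSeverityLevel.Low,
        _ => DriftSeverityLevel.Info
    };
}
```

As a worked case, a missing container (100) detected 45 minutes ago (50) in production (100), with the default component criticality (50) and two dependents (30), scores 30 + 12.5 + 20 + 7.5 + 3 = 73, which this assumed mapping classifies as `Medium`. Note that the `(int)score` cast in the scorer truncates rather than rounds, so a score of 74.9 would also map to `Medium`.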
---
## Safety Mechanisms
### 1. Rate Limiting
```csharp
public sealed class RemediationRateLimiter
{
public async Task<RateLimitResult> CheckAsync(
RemediationPolicy policy,
int requestedCount,
CancellationToken ct)
{
var hourlyCount = await GetHourlyRemediationCountAsync(policy.Id, ct);
// The policy defines only an hourly cap, so no daily count is queried here
if (hourlyCount + requestedCount > policy.MaxRemediationsPerHour)
{
return RateLimitResult.Exceeded(
$"Hourly limit exceeded: {hourlyCount} used + {requestedCount} requested > {policy.MaxRemediationsPerHour}");
}
var lastRemediation = await GetLastRemediationAsync(policy.Id, ct);
if (lastRemediation != null)
{
var timeSinceLast = _timeProvider.GetUtcNow() - lastRemediation.CompletedAt;
if (timeSinceLast < policy.CooldownPeriod)
{
return RateLimitResult.Cooldown(policy.CooldownPeriod - timeSinceLast);
}
}
return RateLimitResult.Allowed(requestedCount);
}
}
```
### 2. Blast Radius Control
```csharp
// Maximum share of targets, as a whole-number percentage (0-100), that one operation may remediate
public const int MaxTargetPercentage = 25;
// Never remediate more than this many targets at once
public const int AbsoluteMaxTargets = 10;
// Minimum share of healthy targets, as a fraction (0.0-1.0), required before remediation
public const double MinHealthyPercentage = 0.75;
```
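The three constants can be combined into a single cap on how many targets one operation may touch. The following sketch is one way to do that, under the assumption that the health check is a hard precondition and that the percentage cap rounds down. The helper name `AllowedTargets` and its parameters are hypothetical.

```csharp
using System;

public static class BlastRadiusLimits
{
    public const int MaxTargetPercentage = 25;        // whole-number percentage
    public const int AbsoluteMaxTargets = 10;
    public const double MinHealthyPercentage = 0.75;  // fraction

    // Hypothetical helper: how many of the drifted targets may be remediated
    // in one operation, given the size and health of the whole fleet.
    public static int AllowedTargets(int totalTargets, int healthyTargets, int driftedTargets)
    {
        if (totalTargets <= 0)
            return 0;

        // Refuse to act when too small a share of the fleet is healthy
        if ((double)healthyTargets / totalTargets < MinHealthyPercentage)
            return 0;

        // Integer division rounds down, so small fleets get a smaller cap
        var byPercentage = totalTargets * MaxTargetPercentage / 100;
        var cap = Math.Min(byPercentage, AbsoluteMaxTargets);
        return Math.Min(cap, driftedTargets);
    }
}
```

With these numbers a fleet of fewer than four targets yields a cap of zero. Whether such fleets should instead be allowed one remediation at a time is a policy decision the constants alone do not settle.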
### 3. Circuit Breaker
```csharp
public sealed class RemediationCircuitBreaker
{
private int _consecutiveFailures;
private DateTimeOffset? _openedAt;
public bool IsOpen => _openedAt != null &&
(_timeProvider.GetUtcNow() - _openedAt.Value) < _config.OpenDuration;
public void RecordSuccess()
{
_consecutiveFailures = 0;
_openedAt = null;
}
public void RecordFailure()
{
_consecutiveFailures++;
if (_consecutiveFailures >= _config.FailureThreshold)
{
_openedAt = _timeProvider.GetUtcNow();
_logger.LogWarning("Remediation circuit breaker opened after {Failures} failures",
_consecutiveFailures);
}
}
}
```
---
## Metrics & Observability
### Prometheus Metrics
```
# Counters
stella_remediation_plans_total{environment, policy, status}
stella_remediation_targets_total{environment, action, status}
stella_remediation_rate_limit_hits_total{policy}
# Histograms
stella_remediation_plan_duration_seconds{environment, strategy}
stella_remediation_target_duration_seconds{environment, action}
stella_remediation_detection_to_action_seconds{environment, severity}
# Gauges
stella_drift_items_pending_remediation{environment, severity}
stella_remediation_circuit_breaker_open{policy}
```
### Structured Logging
```json
{
"event": "remediation.target.completed",
"plan_id": "abc-123",
"target_id": "target-456",
"environment": "production",
"action": "reconcile",
"drift_type": "digest_mismatch",
"severity": "high",
"duration_ms": 4532,
"status": "succeeded",
"previous_digest": "sha256:abc...",
"current_digest": "sha256:def...",
"correlation_id": "xyz-789"
}
```
---
## Evidence Generation
Every remediation produces a sealed evidence packet:
```csharp
public sealed record RemediationEvidence
{
// What drifted
public ImmutableArray<DriftItem> DetectedDrift { get; init; }
public ImmutableArray<DriftSeverity> Severities { get; init; }
// Policy applied
public RemediationPolicy Policy { get; init; }
// Plan executed
public RemediationPlan Plan { get; init; }
// Results
public ImmutableArray<TargetRemediationResult> Results { get; init; }
// Who/when
public string InitiatedBy { get; init; } // "system:auto" or user ID
public DateTimeOffset InitiatedAt { get; init; }
public DateTimeOffset CompletedAt { get; init; }
// Artifacts
public ImmutableArray<string> GeneratedArtifacts { get; init; } // Compose files, scripts
}
```
---
## Configuration
### Default Policy Template
```yaml
name: "production-auto-remediation"
environment_id: "prod-001"
trigger: age_threshold
minimum_severity: high
minimum_drift_age: "00:15:00" # 15 minutes
# 24 hours, then escalate to manual (written with a day component because
# .NET TimeSpan parsing rejects hour values above 23)
maximum_drift_age: "1.00:00:00"
action: reconcile
strategy: rolling
safety:
  max_concurrent_remediations: 2
  max_remediations_per_hour: 10
  cooldown_period: "00:05:00" # 5 minutes between remediations
schedule:
  maintenance_window:
    enabled: true
    start: "02:00"
    end: "06:00"
    timezone: "UTC"
  allowed_days: [monday, tuesday, wednesday, thursday, friday]
notifications:
  on_plan_created: true
  on_remediation_started: true
  on_remediation_completed: true
  on_remediation_failed: true
  channels:
    - type: slack
      channel: "#ops-alerts"
    - type: email
      recipients: ["ops-team@example.com"]
```
---
## Test Strategy
### Unit Tests
- Severity scoring with various drift combinations
- Rate limiting logic
- Circuit breaker state transitions
- Policy evaluation with edge cases
### Integration Tests
- Full remediation flow: detect → plan → execute → verify
- Maintenance window enforcement
- Rate limit enforcement across multiple requests
- Evidence packet generation and signing
### Chaos Tests
- Agent failure during remediation
- Database unavailability during plan execution
- Concurrent remediation requests
- Clock skew handling
### Golden Tests
- Deterministic severity scores for fixed inputs
- Deterministic plan generation for fixed drift reports
- Evidence packet structure validation
---
## Migration Path
### Phase 1: Foundation (Week 1-2)
- Severity scoring service
- Remediation policy model and store
- Basic API endpoints
### Phase 2: Engine (Week 3-4)
- Remediation engine implementation
- Plan creation and execution
- Target remediation logic
### Phase 3: Safety (Week 5)
- Rate limiting
- Circuit breaker
- Blast radius controls
### Phase 4: Scheduling (Week 6)
- Maintenance window support
- Scheduled reconciliation
- Age-based escalation
### Phase 5: Observability (Week 7)
- Metrics emission
- Evidence generation
- Alert integration
### Phase 6: UI & Polish (Week 8)
- Web console integration
- Real-time updates
- Policy management UI

File diff suppressed because it is too large

@@ -0,0 +1,951 @@
# Performance Optimizations
## Overview
Performance Optimizations prepares the Release Orchestrator for enterprise-scale deployments. The enhancement provides parallel gate evaluation, bulk digest resolution, agent task batching, optimized database queries, and multi-level caching.
The implementation focuses on reducing latency, increasing throughput, and scaling efficiently under load.
---
## Design Principles
1. **Measure First**: Optimize based on profiling data, not assumptions
2. **Parallel by Default**: Concurrent execution where dependencies allow
3. **Cache Intelligently**: Cache at the right level with proper invalidation
4. **Batch Operations**: Reduce round-trips through batching
5. **Async Everything**: Non-blocking operations throughout
6. **Graceful Degradation**: Performance degrades linearly, not exponentially
---
## Architecture
### Component Overview
```
┌────────────────────────────────────────────────────────────────────────┐
│                    Performance Optimization System                     │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │ ParallelGate     │    │ BulkDigestResolver│    │ QueryOptimizer  │  │
│  │ Evaluator        │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│           │                        │                       │           │
│           ▼                        ▼                       ▼           │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │ TaskBatcher      │    │ CacheManager      │    │ ConnectionPool  │  │
│  │                  │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│           │                        │                       │           │
│           ▼                        ▼                       ▼           │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │ Prefetcher       │    │ IndexManager      │    │ LoadBalancer    │  │
│  │                  │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
```
### Key Components
#### 1. ParallelGateEvaluator
Evaluates multiple gates concurrently:
```csharp
public sealed class ParallelGateEvaluator
{
private readonly ImmutableArray<IGateEvaluator> _evaluators;
private readonly SemaphoreSlim _concurrencyLimiter;
private readonly IGateResultCache _cache;
public ParallelGateEvaluator(ParallelGateConfig config)
{
_concurrencyLimiter = new SemaphoreSlim(config.MaxConcurrentEvaluations);
}
public async Task<GateEvaluationResult> EvaluateAllAsync(
PromotionContext context,
IReadOnlyList<GateDefinition> gates,
CancellationToken ct)
{
var result = new GateEvaluationResult
{
PromotionId = context.PromotionId,
StartedAt = _timeProvider.GetUtcNow()
};
// Group gates by dependency
var executionPlan = BuildExecutionPlan(gates);
foreach (var stage in executionPlan.Stages)
{
// Execute all gates in this stage concurrently
var stageTasks = stage.Gates.Select(async gate =>
{
await _concurrencyLimiter.WaitAsync(ct);
try
{
return await EvaluateSingleGateAsync(gate, context, ct);
}
finally
{
_concurrencyLimiter.Release();
}
});
var stageResults = await Task.WhenAll(stageTasks);
result.GateResults.AddRange(stageResults);
// Check for failures that should stop evaluation
var failures = stageResults.Where(r => r.Status == GateStatus.Failed && r.Gate.StopOnFailure);
if (failures.Any())
{
result.Status = GateEvaluationStatus.Failed;
result.FailedGates = failures.Select(f => f.Gate.Id).ToImmutableArray();
break;
}
}
result.CompletedAt = _timeProvider.GetUtcNow();
return result;
}
private async Task<SingleGateResult> EvaluateSingleGateAsync(
GateDefinition gate,
PromotionContext context,
CancellationToken ct)
{
// Check cache first
var cacheKey = BuildCacheKey(gate, context);
var cached = await _cache.GetAsync(cacheKey, ct);
if (cached != null && !IsExpired(cached, gate.CacheTtl))
{
return cached with { FromCache = true };
}
// Evaluate
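var sw = Stopwatch.StartNew();
try
{
// First() throws when no evaluator supports the gate type; inside the try
// that becomes a GateStatus.Error result instead of faulting the whole stage
var evaluator = _evaluators.First(e => e.CanEvaluate(gate.Type));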
var result = await evaluator.EvaluateAsync(gate, context, ct);
sw.Stop();
result = result with
{
EvaluationDuration = sw.Elapsed,
EvaluatedAt = _timeProvider.GetUtcNow()
};
// Cache result
await _cache.SetAsync(cacheKey, result, gate.CacheTtl, ct);
return result;
}
catch (Exception ex) when (ex is not OperationCanceledException) // let cancellation propagate
{
return new SingleGateResult
{
GateId = gate.Id,
Status = GateStatus.Error,
Error = ex.Message,
EvaluationDuration = sw.Elapsed
};
}
}
private GateExecutionPlan BuildExecutionPlan(IReadOnlyList<GateDefinition> gates)
{
var plan = new GateExecutionPlan();
var remaining = gates.ToList();
var completed = new HashSet<Guid>();
while (remaining.Any())
{
// Find gates with all dependencies satisfied
var ready = remaining
.Where(g => g.DependsOn.All(d => completed.Contains(d)))
.ToList();
if (!ready.Any())
{
throw new CircularDependencyException(remaining.Select(g => g.Id));
}
plan.Stages.Add(new GateExecutionStage { Gates = ready.ToImmutableArray() });
foreach (var gate in ready)
{
completed.Add(gate.Id);
remaining.Remove(gate);
}
}
return plan;
}
}
```
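The staging logic in `BuildExecutionPlan` can be tried in isolation. The sketch below reproduces the same layering idea on plain strings: each pass collects every gate whose dependencies are already complete, and an empty pass signals a cycle or a dependency on a gate that is not in the input. The class name `GateStaging`, the dictionary-based input, and the gate names in the usage note are assumptions for illustration. The ordinal sort inside each stage is an addition that makes the output deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GateStaging
{
    // Input maps each gate name to the names it depends on.
    // Output is a list of stages; gates within a stage can run concurrently.
    public static List<List<string>> BuildStages(IReadOnlyDictionary<string, string[]> dependsOn)
    {
        var stages = new List<List<string>>();
        var remaining = new HashSet<string>(dependsOn.Keys);
        var completed = new HashSet<string>();

        while (remaining.Count > 0)
        {
            // Gates whose dependencies have all completed in earlier stages
            var ready = remaining
                .Where(g => dependsOn[g].All(completed.Contains))
                .OrderBy(g => g, StringComparer.Ordinal)
                .ToList();

            if (ready.Count == 0)
            {
                // No progress is possible: a cycle, or a dependency that is not a known gate
                throw new InvalidOperationException(
                    "Circular or unresolved dependency among: " +
                    string.Join(", ", remaining.OrderBy(g => g, StringComparer.Ordinal)));
            }

            stages.Add(ready);
            foreach (var gate in ready)
            {
                completed.Add(gate);
                remaining.Remove(gate);
            }
        }

        return stages;
    }
}
```

For four hypothetical gates where `integration-tests` depends on `security-scan` and `freeze-check` depends on both `approval` and `integration-tests`, the result has three stages: `approval` and `security-scan` together, then `integration-tests`, then `freeze-check`.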
#### 2. BulkDigestResolver
Resolves multiple image digests in parallel:
```csharp
public sealed class BulkDigestResolver
{
private readonly IRegistryClientPool _clientPool;
private readonly IDigestCache _cache;
private readonly int _maxConcurrency;
public async Task<IReadOnlyDictionary<string, string>> ResolveAllAsync(
IReadOnlyList<ImageReference> images,
CancellationToken ct)
{
var results = new ConcurrentDictionary<string, string>();
// Check cache first
var uncached = new List<ImageReference>();
foreach (var image in images)
{
var cached = await _cache.GetAsync(image.FullReference, ct);
if (cached != null)
{
results[image.FullReference] = cached;
}
else
{
uncached.Add(image);
}
}
if (!uncached.Any())
{
return results.ToImmutableDictionary();
}
// Group by registry for connection reuse
var byRegistry = uncached.GroupBy(i => i.Registry);
await Parallel.ForEachAsync(
byRegistry,
new ParallelOptions { MaxDegreeOfParallelism = _maxConcurrency, CancellationToken = ct },
async (group, ct) =>
{
var client = await _clientPool.GetClientAsync(group.Key, ct);
try
{
// Batch resolve for this registry
var digests = await client.ResolveDigestsAsync(
group.Select(i => (i.Repository, i.Tag)).ToList(), ct);
foreach (var (image, digest) in group.Zip(digests))
{
results[image.FullReference] = digest;
await _cache.SetAsync(image.FullReference, digest, _cacheTtl, ct);
}
}
finally
{
_clientPool.ReturnClient(client);
}
});
return results.ToImmutableDictionary();
}
}
public interface IRegistryClient
{
// Single resolution
Task<string> ResolveDigestAsync(string repository, string tag, CancellationToken ct);
// Batch resolution (more efficient)
Task<IReadOnlyList<string>> ResolveDigestsAsync(
IReadOnlyList<(string Repository, string Tag)> images,
CancellationToken ct);
}
```
#### 3. TaskBatcher
Batches agent tasks for efficiency:
```csharp
public sealed class TaskBatcher
{
private readonly ConcurrentDictionary<Guid, TaskBatch> _batches = new();
private readonly TimeSpan _batchWindow;
private readonly int _maxBatchSize;
public async Task<Guid> EnqueueAsync(
AgentTask task,
CancellationToken ct)
{
var agentId = task.TargetAgentId;
// Get or create batch for this agent
var batch = _batches.GetOrAdd(agentId, _ => new TaskBatch
{
AgentId = agentId,
CreatedAt = _timeProvider.GetUtcNow(),
Tasks = new ConcurrentBag<AgentTask>()
});
batch.Tasks.Add(task);
// Check if batch should be sent
if (ShouldFlushBatch(batch))
{
await FlushBatchAsync(agentId, ct);
}
return batch.Id;
}
private bool ShouldFlushBatch(TaskBatch batch)
{
// Flush if max size reached
if (batch.Tasks.Count >= _maxBatchSize)
return true;
// Flush if batch window expired
if (_timeProvider.GetUtcNow() - batch.CreatedAt >= _batchWindow)
return true;
// Flush if high-priority task added
if (batch.Tasks.Any(t => t.Priority == TaskPriority.Immediate))
return true;
return false;
}
private async Task FlushBatchAsync(Guid agentId, CancellationToken ct)
{
if (!_batches.TryRemove(agentId, out var batch))
return;
var tasks = batch.Tasks.ToArray();
if (!tasks.Any())
return;
_logger.LogDebug(
"Flushing batch of {Count} tasks to agent {AgentId}",
tasks.Length, agentId);
// Group tasks by type for optimized execution
var grouped = tasks.GroupBy(t => t.TaskType);
foreach (var group in grouped)
{
var batchedPayload = CreateBatchedPayload(group.ToList());
await _agentClient.SendBatchAsync(agentId, batchedPayload, ct);
}
}
private BatchedTaskPayload CreateBatchedPayload(IReadOnlyList<AgentTask> tasks)
{
// Optimize payload based on task type
return tasks.First().TaskType switch
{
TaskType.Deploy => CreateDeployBatch(tasks),
TaskType.HealthCheck => CreateHealthCheckBatch(tasks),
TaskType.WriteSticker => CreateStickerBatch(tasks),
_ => CreateGenericBatch(tasks)
};
}
private BatchedTaskPayload CreateDeployBatch(IReadOnlyList<AgentTask> tasks)
{
// Deduplicate image pulls
var uniqueImages = tasks
.SelectMany(t => t.Payload.Images)
.Distinct()
.ToList();
return new BatchedTaskPayload
{
Type = BatchType.Deploy,
Images = uniqueImages, // Pull once, deploy many
Tasks = tasks.Select(t => new SlimTaskPayload
{
TaskId = t.Id,
ContainerName = t.Payload.ContainerName,
ImageIndex = uniqueImages.IndexOf(t.Payload.Image)
}).ToImmutableArray()
};
}
}
```
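The "pull once, deploy many" step in `CreateDeployBatch` amounts to building a list of distinct images plus a per-task index into that list. A minimal sketch of that step follows, assuming each task references exactly one image by string reference. The class name `DeployBatching` and the tuple return shape are hypothetical. A dictionary lookup replaces the repeated `IndexOf` scan, which matters only for large batches.

```csharp
using System.Collections.Generic;

public static class DeployBatching
{
    // Returns the distinct images in first-seen order and, for each task,
    // the index of its image in that list.
    public static (List<string> Images, List<int> ImageIndexes) Deduplicate(
        IEnumerable<string> taskImages)
    {
        var images = new List<string>();
        var indexByImage = new Dictionary<string, int>();
        var imageIndexes = new List<int>();

        foreach (var image in taskImages)
        {
            if (!indexByImage.TryGetValue(image, out var index))
            {
                // First time this image is seen: append it and remember its position
                index = images.Count;
                images.Add(image);
                indexByImage[image] = index;
            }
            imageIndexes.Add(index);
        }

        return (images, imageIndexes);
    }
}
```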
#### 4. CacheManager
Multi-level caching with intelligent invalidation:
```csharp
public sealed class CacheManager
{
private readonly IMemoryCache _l1Cache; // In-process
private readonly IDistributedCache _l2Cache; // Redis
private readonly ICacheInvalidator _invalidator;
public async Task<T?> GetOrSetAsync<T>(
string key,
Func<CancellationToken, Task<T>> factory,
CacheOptions options,
CancellationToken ct) where T : class
{
// L1 check
if (_l1Cache.TryGetValue(key, out T? l1Value))
{
_metrics.RecordHit("l1");
return l1Value;
}
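// L2 check. IDistributedCache itself stores byte[] values, so the generic
// GetAsync<T>/SetAsync<T> calls here are assumed to be serialization extension methods.
var l2Value = await _l2Cache.GetAsync<T>(key, ct);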
if (l2Value != null)
{
_metrics.RecordHit("l2");
// Populate L1
_l1Cache.Set(key, l2Value, new MemoryCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = options.L1Ttl,
Size = EstimateSize(l2Value)
});
return l2Value;
}
// Cache miss - compute value
_metrics.RecordMiss();
var value = await factory(ct);
if (value != null)
{
// Set L1
_l1Cache.Set(key, value, new MemoryCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = options.L1Ttl,
Size = EstimateSize(value)
});
// Set L2
await _l2Cache.SetAsync(key, value, new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = options.L2Ttl
}, ct);
// Register for invalidation
if (options.InvalidationTags != null)
{
await _invalidator.RegisterAsync(key, options.InvalidationTags, ct);
}
}
return value;
}
public async Task InvalidateByTagAsync(string tag, CancellationToken ct)
{
var keys = await _invalidator.GetKeysByTagAsync(tag, ct);
foreach (var key in keys)
{
_l1Cache.Remove(key);
await _l2Cache.RemoveAsync(key, ct);
}
await _invalidator.UnregisterTagAsync(tag, ct);
}
}
public sealed record CacheOptions
{
public TimeSpan L1Ttl { get; init; } = TimeSpan.FromMinutes(5);
public TimeSpan L2Ttl { get; init; } = TimeSpan.FromHours(1);
public ImmutableArray<string>? InvalidationTags { get; init; }
public bool AllowStale { get; init; }
}
```
#### 5. QueryOptimizer
Optimizes database queries:
```csharp
public sealed class QueryOptimizer
{
    public async Task<IReadOnlyList<Release>> GetReleasesOptimizedAsync(
        ReleaseQuery query,
        CancellationToken ct)
    {
        // Build optimized query.
        // Note: the LEFT JOIN on release_components yields one row per component,
        // so LIMIT counts joined rows rather than distinct releases.
        var sql = new StringBuilder();
        sql.AppendLine(@"
            SELECT r.*,
                   c.name as component_name, c.digest as component_digest,
                   e.name as env_name, e.status as env_status
            FROM releases r");

        // Use indexed join strategy based on query
        if (query.EnvironmentId.HasValue)
        {
            // Use environment index
            sql.AppendLine(@"
                INNER JOIN release_environments re ON r.id = re.release_id
                    AND re.environment_id = @EnvironmentId");
        }

        sql.AppendLine(@"
            LEFT JOIN release_components c ON r.id = c.release_id
            LEFT JOIN environments e ON r.current_environment_id = e.id
            WHERE r.tenant_id = @TenantId");

        // Apply index-friendly filters. PostgreSQL has no built-in index hints;
        // the planner picks from the indexes created in EnsureIndexes.
        if (query.Status.HasValue)
        {
            sql.AppendLine("AND r.status = @Status"); // Can use idx_releases_tenant_status
        }

        if (query.CreatedAfter.HasValue)
        {
            sql.AppendLine("AND r.created_at >= @CreatedAfter"); // Can use idx_releases_tenant_created
        }

        // Keyset pagination (faster than OFFSET). The cursor predicate is part of
        // the WHERE clause, so it must be appended before ORDER BY. A row
        // comparison keeps rows that share created_at but have a smaller id.
        if (query.Cursor != null)
        {
            sql.AppendLine("AND (r.created_at, r.id) < (@CursorCreatedAt, @CursorId)");
        }

        // Order by the same columns as the cursor so page boundaries are stable
        sql.AppendLine("ORDER BY r.created_at DESC, r.id DESC");
        sql.AppendLine("LIMIT @Limit");

        // Execute with read replica if available; dispose to return the
        // connection to the pool
        await using var connection = query.AllowStale
            ? await _connectionPool.GetReadReplicaAsync(ct)
            : await _connectionPool.GetPrimaryAsync(ct);

        return await connection.QueryAsync<Release>(sql.ToString(), query, ct);
    }

    public void EnsureIndexes()
    {
        // Ensure critical indexes exist.
        // CREATE INDEX CONCURRENTLY cannot run inside a transaction block,
        // so each statement must be executed outside of one.
        var requiredIndexes = new[]
        {
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_releases_tenant_status ON releases(tenant_id, status)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_releases_tenant_created ON releases(tenant_id, created_at DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_releases_env ON releases(current_environment_id) WHERE current_environment_id IS NOT NULL",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_components_release ON release_components(release_id)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_deployments_release ON deployments(release_id, created_at DESC)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_promotions_release ON promotions(release_id, status)",
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_evidence_subject ON evidence_packets(subject_id, subject_type)"
        };

        foreach (var index in requiredIndexes)
        {
            _migrationRunner.EnsureIndex(index);
        }
    }
}
```
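Keyset pagination needs a cursor that carries the sort key of the last row returned, which is what the `@CursorCreatedAt` and `@CursorId` parameters above consume. The sketch below shows one way such a cursor could be serialized into an opaque token for API clients. The `ReleaseCursor` type and its token layout are assumptions made for illustration; the design itself only shows `query.Cursor` being null-checked.

```csharp
#nullable enable
using System;
using System.Globalization;
using System.Text;

// Hypothetical cursor type: the (created_at, id) pair of the last row on a page.
public sealed record ReleaseCursor(DateTimeOffset CreatedAt, Guid Id)
{
    // Encode as an opaque, URL-safe token: base64url of "<utc ticks>:<guid>".
    public string Encode()
    {
        var raw = $"{CreatedAt.UtcTicks.ToString(CultureInfo.InvariantCulture)}:{Id:N}";
        return Convert.ToBase64String(Encoding.UTF8.GetBytes(raw))
            .TrimEnd('=').Replace('+', '-').Replace('/', '_');
    }

    // Decode a token produced by Encode; returns false for malformed input.
    public static bool TryDecode(string token, out ReleaseCursor? cursor)
    {
        cursor = null;
        try
        {
            var base64 = token.Replace('-', '+').Replace('_', '/');
            base64 = base64.PadRight(base64.Length + (4 - base64.Length % 4) % 4, '=');
            var parts = Encoding.UTF8.GetString(Convert.FromBase64String(base64)).Split(':');

            if (parts.Length != 2
                || !long.TryParse(parts[0], NumberStyles.None, CultureInfo.InvariantCulture, out var ticks)
                || !Guid.TryParseExact(parts[1], "N", out var id))
            {
                return false;
            }

            cursor = new ReleaseCursor(new DateTimeOffset(ticks, TimeSpan.Zero), id);
            return true;
        }
        catch (Exception ex) when (ex is FormatException or ArgumentException)
        {
            // Invalid base64 or a tick count outside the DateTimeOffset range
            return false;
        }
    }
}
```

A caller would encode the cursor from the last row of each page and pass the decoded `CreatedAt` and `Id` back as the query parameters for the next page. Because the token is only encoded, not signed, the server should still treat decoded values as untrusted input.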
#### 6. Prefetcher
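Warms the caches a promotion will need in parallel, and speculatively runs the dashboard queries a tenant is predicted to issue next: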
```csharp
public sealed class Prefetcher
{
    public async Task PrefetchForPromotionAsync(
        Guid releaseId,
        Guid targetEnvironmentId,
        CancellationToken ct)
    {
        // Prefetch in parallel
        var tasks = new List<Task>
        {
            // Release and components
            _releaseCache.WarmAsync(releaseId, ct),
            // Target environment
            _environmentCache.WarmAsync(targetEnvironmentId, ct),
            // Gates for this environment
            _gateCache.WarmForEnvironmentAsync(targetEnvironmentId, ct),
            // Recent scan results
            _scanCache.WarmForReleaseAsync(releaseId, ct),
            // Approval policies
            _policyCache.WarmForEnvironmentAsync(targetEnvironmentId, ct),
            // Available agents
            _agentCache.WarmForEnvironmentAsync(targetEnvironmentId, ct)
        };

        await Task.WhenAll(tasks);
    }

    public async Task PrefetchForDashboardAsync(
        Guid tenantId,
        CancellationToken ct)
    {
        // Predictive prefetch based on user behavior
        var recentQueries = await _queryHistoryStore.GetRecentAsync(tenantId, ct);
        var predictedQueries = _predictor.Predict(recentQueries);

        foreach (var query in predictedQueries.Take(10))
        {
            // Fire and forget. The continuation observes any failure so that a
            // faulted prefetch never becomes an unobserved task exception.
            // Prefetches share the caller's token and stop when it is cancelled.
            _ = ExecuteAndCacheAsync(query, ct).ContinueWith(
                t => { _ = t.Exception; },
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);
        }
    }
}
```
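The design does not specify how `_predictor.Predict` ranks queries. As one possible reading, the sketch below scores each distinct query by how often it ran, with older executions decaying exponentially. `FrequencyPredictor`, the string query key, and the `halfLife` parameter are all assumptions introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical predictor: ranks queries by recency-weighted frequency.
public static class FrequencyPredictor
{
    public static IReadOnlyList<string> Predict(
        IEnumerable<(string QueryKey, DateTimeOffset ExecutedAt)> history,
        DateTimeOffset now,
        TimeSpan halfLife,
        int depth)
    {
        return history
            .GroupBy(h => h.QueryKey)
            .Select(g => new
            {
                QueryKey = g.Key,
                // Each execution contributes 0.5^(age / halfLife) to the score
                Score = g.Sum(h => Math.Pow(0.5, (now - h.ExecutedAt).TotalSeconds / halfLife.TotalSeconds))
            })
            .OrderByDescending(x => x.Score)
            // Deterministic tie-break so equal scores always rank the same way
            .ThenBy(x => x.QueryKey, StringComparer.Ordinal)
            .Take(depth)
            .Select(x => x.QueryKey)
            .ToList();
    }
}
```

With a one-hour half-life, a query run once just now scores 1.0, while a query run three times two hours ago scores 0.75, so the fresh query is prefetched first. The `depth` argument plays the same role as the `Take(10)` limit above.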
#### 7. ConnectionPool
Pools primary and read-replica PostgreSQL connections and hands them out as disposable leases that return to the pool on disposal:
```csharp
public sealed class ConnectionPool
{
    private readonly ObjectPool<NpgsqlConnection> _primaryPool;
    private readonly ObjectPool<NpgsqlConnection> _replicaPool;
    private readonly ILoadBalancer _replicaBalancer;

    public async Task<PooledConnection> GetPrimaryAsync(CancellationToken ct)
    {
        var connection = _primaryPool.Get();
        try
        {
            if (connection.State != ConnectionState.Open)
            {
                await connection.OpenAsync(ct);
            }
        }
        catch
        {
            // Do not leak the pooled object if opening fails
            _primaryPool.Return(connection);
            throw;
        }

        return new PooledConnection(connection, () => _primaryPool.Return(connection));
    }

    public async Task<PooledConnection> GetReadReplicaAsync(CancellationToken ct)
    {
        // Select replica based on load
        var replica = _replicaBalancer.SelectReplica();
        var connection = _replicaPool.Get();
        try
        {
            // The connection string cannot be changed while a connection is open,
            // so close a reused connection before retargeting it at the replica.
            if (connection.State != ConnectionState.Closed)
            {
                await connection.CloseAsync();
            }

            connection.ConnectionString = replica.ConnectionString;
            await connection.OpenAsync(ct);
        }
        catch
        {
            _replicaPool.Return(connection);
            throw;
        }

        return new PooledConnection(connection, () => _replicaPool.Return(connection));
    }

    public void WarmPool()
    {
        // Pre-create connections
        Parallel.For(0, _config.MinPoolSize, _ =>
        {
            var connection = new NpgsqlConnection(_config.ConnectionString);
            connection.Open();
            _primaryPool.Return(connection);
        });
    }
}

public sealed class PooledConnection : IAsyncDisposable
{
    private readonly NpgsqlConnection _connection;
    private readonly Action _returnAction;
    private int _disposed;

    public PooledConnection(NpgsqlConnection connection, Action returnAction)
    {
        _connection = connection;
        _returnAction = returnAction;
    }

    public NpgsqlConnection Connection => _connection;

    public ValueTask DisposeAsync()
    {
        // Return the connection exactly once, even if disposed twice
        if (Interlocked.Exchange(ref _disposed, 1) == 0)
        {
            _returnAction();
        }

        return ValueTask.CompletedTask;
    }
}
```
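The `ILoadBalancer` used by `GetReadReplicaAsync` is not defined in this section. As an illustration of what a replica selector could look like, the sketch below implements smooth weighted round-robin over host names. `WeightedRoundRobinBalancer` is a hypothetical name, and it returns a host string rather than the replica object the pool expects, purely to stay self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical replica selector using smooth weighted round-robin.
public sealed class WeightedRoundRobinBalancer
{
    private sealed class Node
    {
        public string Host = "";
        public int Weight;
        public int Current; // running score, adjusted on every selection
    }

    private readonly List<Node> _nodes;
    private readonly int _totalWeight;
    private readonly object _lock = new();

    public WeightedRoundRobinBalancer(IEnumerable<(string Host, int Weight)> replicas)
    {
        _nodes = replicas.Select(r => new Node { Host = r.Host, Weight = r.Weight }).ToList();
        if (_nodes.Count == 0 || _nodes.Any(n => n.Weight <= 0))
        {
            throw new ArgumentException("At least one replica is required and all weights must be positive.", nameof(replicas));
        }

        _totalWeight = _nodes.Sum(n => n.Weight);
    }

    public string SelectReplica()
    {
        lock (_lock)
        {
            // Raise every score by its weight, pick the highest score,
            // then lower the winner by the total weight.
            var best = _nodes[0];
            foreach (var node in _nodes)
            {
                node.Current += node.Weight;
                if (node.Current > best.Current)
                {
                    best = node;
                }
            }

            best.Current -= _totalWeight;
            return best.Host;
        }
    }
}
```

With equal weights this degenerates to plain round-robin, and with unequal weights each replica is chosen in proportion to its weight while selections of the heavier replica are spread out rather than issued back to back.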
---
## Performance Benchmarks
### Target Metrics
| Operation | Current | Target | Optimization |
|-----------|---------|--------|--------------|
| Gate evaluation (5 gates) | 5s (sequential) | 1.5s (parallel) | ParallelGateEvaluator |
| Digest resolution (10 images) | 10s | 2s | BulkDigestResolver |
| Promotion creation | 500ms | 100ms | Prefetching |
| Dashboard load | 2s | 500ms | Caching + Query optimization |
| Deployment start | 3s | 500ms | Task batching |
| Agent task throughput | 100/s | 1000/s | Connection pooling |
### Load Test Scenarios
```csharp
public sealed class PerformanceTests
{
    [Fact]
    public async Task Gate_Evaluation_Should_Complete_Under_Target()
    {
        // Arrange: matches the target metric of 5 gates evaluated in parallel
        var gates = CreateGates(count: 5);
        var context = CreatePromotionContext();

        // Act
        var sw = Stopwatch.StartNew();
        var result = await _evaluator.EvaluateAllAsync(context, gates, CancellationToken.None);
        sw.Stop();

        // Assert: target is 1.5s for 5 gates
        Assert.True(sw.Elapsed < TimeSpan.FromSeconds(1.5));
        Assert.Equal(GateEvaluationStatus.Succeeded, result.Status);
    }

    [Fact]
    public async Task Concurrent_Promotions_Should_Scale_Linearly()
    {
        // Warm up so one-time costs (JIT, connection setup) do not inflate the baseline
        await _promotionService.CreateAsync(CreatePromotionRequest(), CancellationToken.None);

        // Test with 1, 10, 50, 100 concurrent promotions
        var results = new List<(int Count, TimeSpan Duration)>();

        foreach (var count in new[] { 1, 10, 50, 100 })
        {
            var promotions = Enumerable.Range(0, count)
                .Select(_ => CreatePromotionRequest())
                .ToList();

            var sw = Stopwatch.StartNew();
            await Task.WhenAll(promotions.Select(p =>
                _promotionService.CreateAsync(p, CancellationToken.None)));
            sw.Stop();

            results.Add((count, sw.Elapsed));
        }

        // Assert at worst linear scaling: total time for N concurrent promotions
        // stays below 2x the time N sequential promotions would take
        var baseline = results[0].Duration.TotalMilliseconds;
        foreach (var (count, duration) in results.Skip(1))
        {
            var expectedMax = baseline * count * 2;
            Assert.True(duration.TotalMilliseconds < expectedMax,
                $"Count {count}: {duration.TotalMilliseconds}ms exceeded {expectedMax}ms");
        }
    }
}
```
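Single wall-clock assertions like the ones above can be noisy on shared CI hardware, so latency targets are often checked against percentiles over many samples instead. The helper below is a small sketch of a nearest-rank percentile calculation that such tests could use. `LatencyStats` is a hypothetical name and is not part of the test suite described here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for asserting on latency percentiles in performance tests.
public static class LatencyStats
{
    // Nearest-rank percentile: the smallest sample such that at least
    // p percent of all samples are less than or equal to it.
    public static TimeSpan Percentile(IReadOnlyCollection<TimeSpan> samples, double p)
    {
        if (samples.Count == 0)
        {
            throw new ArgumentException("At least one sample is required.", nameof(samples));
        }

        if (p <= 0 || p > 100)
        {
            throw new ArgumentOutOfRangeException(nameof(p), "Percentile must be in the range (0, 100].");
        }

        var sorted = samples.OrderBy(s => s).ToArray();
        // Multiply before dividing to limit floating-point rounding error
        var rank = (int)Math.Ceiling(p * sorted.Length / 100.0);
        return sorted[rank - 1];
    }
}
```

A test could run an operation a few hundred times, collect the `Stopwatch` readings, and assert that `Percentile(samples, 95)` stays below the target instead of asserting on a single run.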
---
## Configuration
### Performance Tuning Options
```yaml
performance:
  # Gate evaluation
  gates:
    max_concurrent_evaluations: 10
    evaluation_timeout: "00:00:30"
    cache_ttl: "00:05:00"

  # Digest resolution
  digest_resolution:
    max_concurrent_registries: 5
    max_concurrent_per_registry: 10
    cache_ttl: "01:00:00"
    timeout: "00:00:30"

  # Task batching
  task_batching:
    enabled: true
    batch_window: "00:00:01"
    max_batch_size: 50

  # Caching
  cache:
    l1:
      enabled: true
      max_size_mb: 256
      default_ttl: "00:05:00"
    l2:
      enabled: true
      provider: redis
      connection_string: "redis://localhost:6379"
      default_ttl: "01:00:00"

  # Database
  database:
    primary:
      min_pool_size: 10
      max_pool_size: 100
      connection_timeout: "00:00:05"
    read_replicas:
      enabled: true
      hosts:
        - host: replica1.db.local
          weight: 50
        - host: replica2.db.local
          weight: 50
      load_balancing: round_robin

  # Prefetching
  prefetch:
    enabled: true
    promotion_warmup: true
    dashboard_prediction: true
    prediction_depth: 10

  # Connection pooling
  http_client:
    max_connections_per_host: 100
    connection_lifetime: "00:05:00"
    keep_alive_timeout: "00:00:30"

  # gRPC
  grpc:
    max_concurrent_streams: 100
    keepalive_time: "00:01:00"
    keepalive_timeout: "00:00:20"
```
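Durations in the configuration above are written as `hh:mm:ss` strings, which matches the constant (`"c"`) `TimeSpan` format in .NET. The sketch below shows how the two cache TTLs might be parsed and sanity-checked at startup. `CacheTtlOptions` is a hypothetical type, and the rule that the L1 TTL should not exceed the L2 TTL is an assumed validation policy, not something the design states.

```csharp
using System;
using System.Globalization;

// Hypothetical options type mirroring cache.l1.default_ttl and cache.l2.default_ttl.
public sealed record CacheTtlOptions(TimeSpan L1Ttl, TimeSpan L2Ttl)
{
    public static CacheTtlOptions Parse(string l1DefaultTtl, string l2DefaultTtl)
    {
        // "c" is the constant TimeSpan format: [-][d.]hh:mm:ss[.fffffff]
        var l1 = TimeSpan.ParseExact(l1DefaultTtl, "c", CultureInfo.InvariantCulture);
        var l2 = TimeSpan.ParseExact(l2DefaultTtl, "c", CultureInfo.InvariantCulture);

        if (l1 <= TimeSpan.Zero || l2 <= TimeSpan.Zero)
        {
            throw new ArgumentException("Cache TTLs must be positive.");
        }

        // Assumed policy: the in-memory tier should expire no later than the shared tier
        if (l1 > l2)
        {
            throw new ArgumentException("L1 TTL should not exceed L2 TTL.");
        }

        return new CacheTtlOptions(l1, l2);
    }
}
```

Failing fast on a malformed or inconsistent TTL at startup is usually easier to diagnose than a cache that silently never expires or never retains entries.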
---
## Metrics & Observability
### Prometheus Metrics
```
# Latency histograms
stella_gate_evaluation_duration_seconds{gate_type}
stella_digest_resolution_duration_seconds{registry}
stella_promotion_creation_duration_seconds
stella_deployment_start_duration_seconds
# Cache metrics
stella_cache_hits_total{level, cache}
stella_cache_misses_total{cache}
stella_cache_size_bytes{level, cache}
stella_cache_evictions_total{cache, reason}
# Connection pools
stella_connection_pool_size{pool}
stella_connection_pool_active{pool}
stella_connection_pool_wait_seconds{pool}
# Batching
stella_batch_size{operation}
stella_batch_flush_total{operation, reason}
stella_batch_latency_seconds{operation}
# Query performance
stella_query_duration_seconds{query_type}
stella_query_rows_returned{query_type}
stella_index_scan_total{table, index}
# Throughput
stella_operations_per_second{operation}
stella_concurrent_operations{operation}
```
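The cache counters listed above could be emitted through the built-in `System.Diagnostics.Metrics` API (available since .NET 6), as in the sketch below. The `CacheMetrics` class, the meter name `Stella.Performance`, and the method signatures are assumptions for illustration; how instrument names map to the final Prometheus metric names depends on the exporter in use.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical metrics wrapper for the cache hit and miss counters.
public sealed class CacheMetrics : IDisposable
{
    public const string MeterName = "Stella.Performance"; // assumed meter name

    private readonly Meter _meter = new(MeterName);
    private readonly Counter<long> _hits;
    private readonly Counter<long> _misses;

    public CacheMetrics()
    {
        _hits = _meter.CreateCounter<long>("stella_cache_hits_total");
        _misses = _meter.CreateCounter<long>("stella_cache_misses_total");
    }

    // A hit is recorded per cache level ("l1" or "l2") and logical cache name.
    public void RecordHit(string level, string cache) =>
        _hits.Add(1,
            new KeyValuePair<string, object?>("level", level),
            new KeyValuePair<string, object?>("cache", cache));

    // A miss means every level was checked, so it carries no level tag.
    public void RecordMiss(string cache) =>
        _misses.Add(1, new KeyValuePair<string, object?>("cache", cache));

    public void Dispose() => _meter.Dispose();
}
```

Exposing counters and deriving rates at query time tends to be more robust than publishing precomputed per-second values, since the scrape interval then does not affect the result.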
---
## API Design
### Performance-Optimized Endpoints
```
# Batch operations
POST /api/v1/batch/digests # Bulk digest resolution
POST /api/v1/batch/releases # Bulk release creation
POST /api/v1/batch/gates # Parallel gate evaluation
# Prefetch hints
POST /api/v1/prefetch/promotion # Warm cache for promotion
POST /api/v1/prefetch/dashboard # Warm cache for dashboard
# Cache management
DELETE /api/v1/cache/invalidate # Invalidate cache entries
GET /api/v1/cache/stats # Cache statistics
# Health & metrics
GET /api/v1/performance/stats # Performance statistics
GET /api/v1/performance/slow-queries # Recent slow queries
```
---
## Test Strategy
### Unit Tests
- Parallel evaluation logic
- Batch sizing algorithms
- Cache key generation
- Query optimization rules
### Integration Tests
- Full parallel gate flow
- Cache hit/miss scenarios
- Connection pool behavior
- Batch flush triggers
### Performance Tests
- Load testing with concurrent users
- Throughput benchmarks
- Latency percentiles
- Memory usage under load
### Chaos Tests
- Cache failure scenarios
- Database failover
- Connection pool exhaustion
---
## Migration Path
### Phase 1: Measurement (Week 1)
- Add performance metrics
- Establish baselines
- Identify bottlenecks
### Phase 2: Parallel Gates (Week 2-3)
- ParallelGateEvaluator
- Execution plan builder
- Gate result caching
### Phase 3: Bulk Operations (Week 4-5)
- BulkDigestResolver
- Task batching
- Batch optimization
### Phase 4: Caching (Week 6-7)
- Multi-level cache
- Cache invalidation
- Prefetching
### Phase 5: Database (Week 8-9)
- Query optimization
- Index tuning
- Connection pooling
- Read replicas
### Phase 6: Tuning (Week 10)
- Load testing
- Parameter tuning
- Documentation
