# Agent Resilience

## Overview

Agent Resilience transforms the deployment agent architecture into a highly available, fault-tolerant system. This enhancement provides agent clustering for high availability, automatic failover during deployments, offline task queuing, and self-healing capabilities. The result is a system in which deployments complete successfully even when individual agents fail, network partitions occur, or agents need maintenance.

---

## Design Principles

1. **Zero Downtime Deployments**: Agent failures don't block deployments
2. **Automatic Recovery**: Self-healing without operator intervention
3. **Graceful Degradation**: Prefer reduced capacity over complete failure
4. **Offline Resilience**: Queue tasks for disconnected agents
5. **Transparent Failover**: Seamless handoff between agents
6. **Predictable Behavior**: Deterministic failover decisions

---

## Architecture

### Component Overview

```
┌────────────────────────────────────────────────────────────────────────┐
│                        Agent Resilience System                         │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │ AgentCluster     │───▶│  FailoverManager  │───▶│   TaskRouter    │  │
│  │    Manager       │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│           │                        │                       │           │
│           ▼                        ▼                       ▼           │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │  HealthMonitor   │    │  LeaderElection   │    │    TaskQueue    │  │
│  │                  │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│           │                        │                       │           │
│           ▼                        ▼                       ▼           │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │    SelfHealer    │    │     StateSync     │    │  RetryManager   │  │
│  │                  │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
```

### Key Components

#### 1. AgentClusterManager

Manages agent clusters for high availability:

```csharp
public sealed class AgentClusterManager
{
    public async Task<AgentCluster> CreateClusterAsync(
        AgentClusterConfig config,
        CancellationToken ct)
    {
        var cluster = new AgentCluster
        {
            Id = Guid.NewGuid(),
            Name = config.Name,
            TargetGroupId = config.TargetGroupId,
            MinimumAgents = config.MinimumAgents,
            DesiredAgents = config.DesiredAgents,
            ReplicationMode = config.ReplicationMode,
            FailoverPolicy = config.FailoverPolicy,
            CreatedAt = _timeProvider.GetUtcNow()
        };

        await _clusterStore.SaveAsync(cluster, ct);
        return cluster;
    }

    public async Task<IReadOnlyList<AgentMember>> GetClusterMembersAsync(
        Guid clusterId,
        CancellationToken ct)
    {
        var cluster = await _clusterStore.GetAsync(clusterId, ct);
        var agents = await _agentStore.GetByClusterAsync(clusterId, ct);

        return agents.Select(a => new AgentMember
        {
            AgentId = a.Id,
            HostName = a.HostName,
            Status = a.Status,
            Role = DetermineRole(a, cluster),
            LastHeartbeat = a.LastHeartbeat,
            Capabilities = a.Capabilities,
            CurrentLoad = a.CurrentTaskCount,
            MaxLoad = a.MaxConcurrentTasks
        }).ToList();
    }

    private AgentRole DetermineRole(Agent agent, AgentCluster cluster)
    {
        if (cluster.LeaderId == agent.Id) return AgentRole.Leader;
        if (cluster.StandbyIds.Contains(agent.Id)) return AgentRole.Standby;
        return AgentRole.Member;
    }
}

public sealed record AgentCluster
{
    public Guid Id { get; init; }
    public string Name { get; init; }
    public Guid TargetGroupId { get; init; }

    // Membership
    public int MinimumAgents { get; init; }
    public int DesiredAgents { get; init; }
    public Guid? LeaderId { get; init; }
    public ImmutableArray<Guid> StandbyIds { get; init; }

    // Configuration
    public ReplicationMode ReplicationMode { get; init; }
    public FailoverPolicy FailoverPolicy { get; init; }

    // Status
    public ClusterStatus Status { get; init; }
    public int HealthyAgentCount { get; init; }
    public DateTimeOffset CreatedAt { get; init; }
}

public enum ReplicationMode
{
    ActivePassive,  // One active, others standby
    ActiveActive,   // All agents handle tasks
    Sharded         // Tasks partitioned across agents
}

public enum AgentRole
{
    Leader,   // Primary agent (ActivePassive mode)
    Standby,  // Ready to take over
    Member    // Active participant (ActiveActive mode)
}
```
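
The sketch below shows how a caller might bootstrap a cluster with this manager. `AgentClusterConfig` is not defined in this section, so its shape is inferred from the property mapping in `CreateClusterAsync`; the `clusterManager`, `targetGroupId`, and `ct` locals are assumed to exist in the calling code.

```csharp
// Illustrative only: AgentClusterConfig's shape is inferred from CreateClusterAsync above.
var config = new AgentClusterConfig
{
    Name = "production-docker-agents",
    TargetGroupId = targetGroupId,                      // group of hosts this cluster serves
    MinimumAgents = 2,
    DesiredAgents = 3,
    ReplicationMode = ReplicationMode.ActiveActive,     // all members handle tasks
    FailoverPolicy = new FailoverPolicy
    {
        SelectionStrategy = FailoverSelectionStrategy.LeastLoaded
    }
};

var cluster = await clusterManager.CreateClusterAsync(config, ct);
var members = await clusterManager.GetClusterMembersAsync(cluster.Id, ct);
```
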
#### 2. HealthMonitor

Monitors agent health using multi-factor detection:

```csharp
public sealed class HealthMonitor
{
    private readonly ConcurrentDictionary<Guid, AgentHealthState> _healthStates = new();

    public async Task ProcessHeartbeatAsync(
        AgentHeartbeat heartbeat,
        CancellationToken ct)
    {
        var state = _healthStates.GetOrAdd(heartbeat.AgentId, _ => new AgentHealthState());

        state.LastHeartbeat = heartbeat.Timestamp;
        state.ReportedHealth = heartbeat.Health;
        state.CurrentLoad = heartbeat.TaskCount;
        state.ResourceMetrics = heartbeat.ResourceMetrics;

        // Update health assessment
        state.AssessedHealth = await AssessHealthAsync(heartbeat, state, ct);

        // Check for degradation
        if (state.AssessedHealth < HealthLevel.Healthy)
        {
            await HandleDegradationAsync(heartbeat.AgentId, state, ct);
        }

        // Emit metrics
        _metricsEmitter.EmitAgentHealth(heartbeat.AgentId, state);
    }

    private async Task<HealthLevel> AssessHealthAsync(
        AgentHeartbeat heartbeat,
        AgentHealthState state,
        CancellationToken ct)
    {
        var factors = new List<HealthFactor>();

        // 1. Self-reported health (normalize the 0-100 enum value to a 0-1 score)
        factors.Add(new HealthFactor("self_reported", (double)heartbeat.Health / 100.0, 0.2));

        // 2. Heartbeat regularity
        var heartbeatScore = CalculateHeartbeatScore(state);
        factors.Add(new HealthFactor("heartbeat_regularity", heartbeatScore, 0.3));

        // 3. Task completion rate
        var completionRate = await GetTaskCompletionRateAsync(heartbeat.AgentId, ct);
        factors.Add(new HealthFactor("task_completion", completionRate, 0.25));

        // 4. Resource utilization
        var resourceScore = CalculateResourceScore(heartbeat.ResourceMetrics);
        factors.Add(new HealthFactor("resource_utilization", resourceScore, 0.15));

        // 5. Error rate
        var errorRate = await GetErrorRateAsync(heartbeat.AgentId, ct);
        factors.Add(new HealthFactor("error_rate", 1.0 - errorRate, 0.1));

        // Weighted average
        var overallScore = factors.Sum(f => f.Score * f.Weight);

        return overallScore switch
        {
            >= 0.9 => HealthLevel.Healthy,
            >= 0.7 => HealthLevel.Degraded,
            >= 0.5 => HealthLevel.Warning,
            >= 0.3 => HealthLevel.Critical,
            _ => HealthLevel.Failed
        };
    }

    public async Task DetectFailuresAsync(CancellationToken ct)
    {
        var now = _timeProvider.GetUtcNow();

        foreach (var (agentId, state) in _healthStates)
        {
            var timeSinceHeartbeat = now - state.LastHeartbeat;

            if (timeSinceHeartbeat > _config.FailureThreshold)
            {
                await HandleAgentFailureAsync(agentId, state, ct);
            }
            else if (timeSinceHeartbeat > _config.WarningThreshold)
            {
                await HandleAgentWarningAsync(agentId, state, ct);
            }
        }
    }

    private async Task HandleAgentFailureAsync(
        Guid agentId,
        AgentHealthState state,
        CancellationToken ct)
    {
        _logger.LogWarning("Agent {AgentId} detected as failed", agentId);

        // Update state
        state.AssessedHealth = HealthLevel.Failed;
        state.FailedAt = _timeProvider.GetUtcNow();

        // Notify failover manager
        await _eventPublisher.PublishAsync(new AgentFailedEvent(agentId, state), ct);

        // Mark agent as offline
        await _agentStore.UpdateStatusAsync(agentId, AgentStatus.Offline, ct);
    }
}

public sealed class AgentHealthState
{
    public DateTimeOffset LastHeartbeat { get; set; }
    public HealthLevel ReportedHealth { get; set; }
    public HealthLevel AssessedHealth { get; set; }
    public int CurrentLoad { get; set; }
    public ResourceMetrics ResourceMetrics { get; set; }
    public DateTimeOffset? FailedAt { get; set; }
    public int ConsecutiveFailures { get; set; }
}

public enum HealthLevel
{
    Healthy = 100,
    Degraded = 75,
    Warning = 50,
    Critical = 25,
    Failed = 0
}
```
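
`HealthFactor` is referenced by `AssessHealthAsync` but not defined in this section; below is a minimal shape inferred from its usage, followed by a worked example of the weighted assessment (the factor scores are illustrative):

```csharp
// Shape inferred from AssessHealthAsync: a name, a normalized score (0-1), and a weight.
public sealed record HealthFactor(string Name, double Score, double Weight);

// Worked example with illustrative scores (the five weights sum to 1.0):
//   self_reported        1.00 * 0.20 = 0.200
//   heartbeat_regularity 0.90 * 0.30 = 0.270
//   task_completion      0.60 * 0.25 = 0.150
//   resource_utilization 0.80 * 0.15 = 0.120
//   error_rate (1 - e)   0.95 * 0.10 = 0.095
//   overall score        0.835  =>  HealthLevel.Degraded (>= 0.7, < 0.9)
```
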
#### 3. FailoverManager

Orchestrates failover between agents:

```csharp
public sealed class FailoverManager
{
    public async Task<FailoverResult> PerformFailoverAsync(
        FailoverRequest request,
        CancellationToken ct)
    {
        var result = new FailoverResult
        {
            RequestId = Guid.NewGuid(),
            FailedAgentId = request.FailedAgentId,
            StartedAt = _timeProvider.GetUtcNow()
        };

        try
        {
            // 1. Find cluster
            var cluster = await _clusterStore.GetByAgentAsync(request.FailedAgentId, ct);
            if (cluster == null)
            {
                result.Status = FailoverStatus.NotInCluster;
                return result;
            }

            // 2. Select failover target
            var target = await SelectFailoverTargetAsync(cluster, request, ct);
            if (target == null)
            {
                result.Status = FailoverStatus.NoTargetAvailable;
                await HandleNoTargetAsync(cluster, request, ct);
                return result;
            }

            result.TargetAgentId = target.AgentId;

            // 3. Transfer in-flight tasks
            var tasksToTransfer = await GetInFlightTasksAsync(request.FailedAgentId, ct);
            result.TasksTransferred = tasksToTransfer.Count;

            foreach (var task in tasksToTransfer)
            {
                await TransferTaskAsync(task, target.AgentId, ct);
            }

            // 4. Update cluster membership
            if (cluster.LeaderId == request.FailedAgentId)
            {
                await PromoteToLeaderAsync(cluster, target.AgentId, ct);
            }

            // 5. Update target assignments
            await ReassignTargetsAsync(request.FailedAgentId, target.AgentId, ct);

            result.Status = FailoverStatus.Succeeded;
            result.CompletedAt = _timeProvider.GetUtcNow();

            // Emit event
            await _eventPublisher.PublishAsync(new FailoverCompletedEvent(result), ct);
        }
        catch (Exception ex)
        {
            result.Status = FailoverStatus.Failed;
            result.Error = ex.Message;
            _logger.LogError(ex, "Failover failed for agent {AgentId}", request.FailedAgentId);
        }

        return result;
    }

    private async Task<AgentMember?> SelectFailoverTargetAsync(
        AgentCluster cluster,
        FailoverRequest request,
        CancellationToken ct)
    {
        var candidates = await _clusterManager.GetClusterMembersAsync(cluster.Id, ct);

        // Filter healthy agents
        candidates = candidates
            .Where(a => a.AgentId != request.FailedAgentId)
            .Where(a => a.Status == AgentStatus.Online)
            .Where(a => a.HasCapability(request.RequiredCapabilities))
            .ToList();

        if (!candidates.Any()) return null;

        // Apply selection strategy
        return cluster.FailoverPolicy.SelectionStrategy switch
        {
            FailoverSelectionStrategy.Standby =>
                candidates.FirstOrDefault(a => a.Role == AgentRole.Standby)
                    ?? candidates.OrderBy(a => a.CurrentLoad).First(),

            FailoverSelectionStrategy.LeastLoaded =>
                candidates.OrderBy(a => a.CurrentLoad / (double)a.MaxLoad).First(),

            FailoverSelectionStrategy.RoundRobin =>
                SelectRoundRobin(cluster, candidates),

            FailoverSelectionStrategy.Affinity =>
                SelectByAffinity(candidates, request.AffinityHints),

            _ => candidates.First()
        };
    }

    private async Task TransferTaskAsync(
        AgentTask task,
        Guid targetAgentId,
        CancellationToken ct)
    {
        // Mark task as transferred
        task.TransferredFrom = task.AssignedAgentId;
        task.AssignedAgentId = targetAgentId;
        task.TransferredAt = _timeProvider.GetUtcNow();

        // Reset task state for retry
        if (task.Status == TaskStatus.Running)
        {
            task.Status = TaskStatus.Pending;
            task.RetryCount++;
        }

        await _taskStore.SaveAsync(task, ct);

        // Notify target agent
        await _agentNotifier.NotifyTaskAssignedAsync(targetAgentId, task, ct);
    }
}

public sealed record FailoverResult
{
    public Guid RequestId { get; init; }
    public Guid FailedAgentId { get; init; }
    public Guid? TargetAgentId { get; set; }
    public FailoverStatus Status { get; set; }
    public int TasksTransferred { get; set; }
    public string? Error { get; set; }
    public DateTimeOffset StartedAt { get; init; }
    public DateTimeOffset? CompletedAt { get; set; }
}

public enum FailoverStatus
{
    Succeeded,
    NotInCluster,
    NoTargetAvailable,
    Failed
}
```
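
`FailoverPolicy` and `FailoverSelectionStrategy` are used by `SelectFailoverTargetAsync` but not defined in this section. A minimal sketch consistent with that usage and with the `failover` configuration block later in this document:

```csharp
// Sketch only: shapes inferred from SelectFailoverTargetAsync and the failover config block.
public sealed record FailoverPolicy
{
    public FailoverSelectionStrategy SelectionStrategy { get; init; }
    public TimeSpan TaskTransferTimeout { get; init; }   // failover.task_transfer_timeout
    public int MaxTransferRetries { get; init; }         // failover.max_transfer_retries
}

public enum FailoverSelectionStrategy
{
    Standby,      // Prefer a designated standby, fall back to the least loaded member
    LeastLoaded,  // Lowest CurrentLoad / MaxLoad ratio wins
    RoundRobin,   // Rotate across healthy members
    Affinity      // Honor affinity hints from the request
}
```
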
#### 4. LeaderElection

Manages leader election for ActivePassive clusters:

```csharp
public sealed class LeaderElection
{
    private readonly IDistributedLockProvider _lockProvider;

    public async Task RunElectionAsync(
        Guid clusterId,
        CancellationToken ct)
    {
        var cluster = await _clusterStore.GetAsync(clusterId, ct);
        var members = await _clusterManager.GetClusterMembersAsync(clusterId, ct);

        var healthyMembers = members
            .Where(m => m.Status == AgentStatus.Online)
            .OrderByDescending(m => m.Role == AgentRole.Standby)  // Prefer standbys
            .ThenBy(m => m.CurrentLoad)                           // Then least loaded
            .ToList();

        if (!healthyMembers.Any())
        {
            _logger.LogWarning("No healthy members for cluster {ClusterId}", clusterId);
            return;
        }

        // Acquire distributed lock for election
        await using var @lock = await _lockProvider.AcquireAsync(
            $"cluster:{clusterId}:election", ct);

        // Re-read cluster state under lock
        cluster = await _clusterStore.GetAsync(clusterId, ct);

        // Check if current leader is healthy
        var currentLeader = healthyMembers.FirstOrDefault(m => m.AgentId == cluster.LeaderId);
        if (currentLeader != null)
        {
            _logger.LogDebug("Current leader {LeaderId} is healthy", cluster.LeaderId);
            return;
        }

        // Elect new leader
        var newLeader = healthyMembers.First();
        await PromoteToLeaderAsync(cluster, newLeader.AgentId, ct);

        _logger.LogInformation(
            "Elected new leader {NewLeaderId} for cluster {ClusterId}",
            newLeader.AgentId, clusterId);
    }

    private async Task PromoteToLeaderAsync(
        AgentCluster cluster,
        Guid newLeaderId,
        CancellationToken ct)
    {
        var previousLeaderId = cluster.LeaderId;

        // Update cluster
        cluster = cluster with { LeaderId = newLeaderId };

        // Update standby list
        var newStandbys = cluster.StandbyIds
            .Where(id => id != newLeaderId)
            .ToImmutableArray();

        if (previousLeaderId.HasValue)
        {
            // Demote previous leader to standby if still healthy
            var previousLeader = await _agentStore.GetAsync(previousLeaderId.Value, ct);
            if (previousLeader?.Status == AgentStatus.Online)
            {
                newStandbys = newStandbys.Add(previousLeaderId.Value);
            }
        }

        cluster = cluster with { StandbyIds = newStandbys };
        await _clusterStore.SaveAsync(cluster, ct);

        // Notify agents
        await _agentNotifier.NotifyLeaderChangeAsync(cluster.Id, newLeaderId, ct);

        // Emit event
        await _eventPublisher.PublishAsync(new LeaderElectedEvent(
            cluster.Id, newLeaderId, previousLeaderId), ct);
    }
}
```
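
`RunElectionAsync` is written to be invoked periodically; one way to drive it is a hosted background loop matching the `leader_election.election_interval` setting shown in the configuration section. The sketch below is illustrative only: `IClusterStore` is an assumed name for the store behind `_clusterStore`, and `GetByReplicationModeAsync` is a hypothetical query.

```csharp
// Sketch of a periodic election driver. GetByReplicationModeAsync is a hypothetical
// cluster-store query; the 15-second interval mirrors leader_election.election_interval.
public sealed class LeaderElectionService : BackgroundService
{
    private static readonly TimeSpan ElectionInterval = TimeSpan.FromSeconds(15);

    private readonly LeaderElection _election;
    private readonly IClusterStore _clusterStore;

    public LeaderElectionService(LeaderElection election, IClusterStore clusterStore)
    {
        _election = election;
        _clusterStore = clusterStore;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(ElectionInterval);
        while (await timer.WaitForNextTickAsync(stoppingToken))
        {
            // Only ActivePassive clusters have a leader to elect.
            var clusters = await _clusterStore.GetByReplicationModeAsync(
                ReplicationMode.ActivePassive, stoppingToken);

            foreach (var cluster in clusters)
            {
                await _election.RunElectionAsync(cluster.Id, stoppingToken);
            }
        }
    }
}
```
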
#### 5. TaskQueue

Durable task queue for offline agents:

```csharp
public sealed class TaskQueue
{
    private readonly ITaskQueueStore _store;

    public async Task<Guid> EnqueueAsync(
        AgentTask task,
        EnqueueOptions options,
        CancellationToken ct)
    {
        var queuedTask = new QueuedTask
        {
            Id = Guid.NewGuid(),
            Task = task,
            Priority = options.Priority,
            EnqueuedAt = _timeProvider.GetUtcNow(),
            ExpiresAt = options.ExpiresAt,
            TargetAgentId = options.TargetAgentId,
            TargetClusterId = options.TargetClusterId,
            RequiredCapabilities = options.RequiredCapabilities,
            DeliveryAttempts = 0,
            MaxDeliveryAttempts = options.MaxDeliveryAttempts
        };

        await _store.SaveAsync(queuedTask, ct);
        return queuedTask.Id;
    }

    public async Task<QueuedTask?> DequeueAsync(
        Guid agentId,
        ImmutableArray<string> capabilities,
        CancellationToken ct)
    {
        // Find eligible tasks
        var tasks = await _store.GetPendingTasksAsync(agentId, capabilities, ct);

        foreach (var task in tasks.OrderByDescending(t => t.Priority))
        {
            // Check expiration
            if (task.ExpiresAt.HasValue && task.ExpiresAt < _timeProvider.GetUtcNow())
            {
                await ExpireTaskAsync(task, ct);
                continue;
            }

            // Try to claim task
            var claimed = await _store.TryClaimAsync(task.Id, agentId, ct);
            if (claimed)
            {
                task.DeliveryAttempts++;
                task.LastAttemptAt = _timeProvider.GetUtcNow();
                task.ClaimedBy = agentId;
                await _store.SaveAsync(task, ct);
                return task;
            }
        }

        return null;
    }

    public async Task CompleteAsync(Guid taskId, TaskResult result, CancellationToken ct)
    {
        var task = await _store.GetAsync(taskId, ct);
        if (task == null) return;

        task.CompletedAt = _timeProvider.GetUtcNow();
        task.Result = result;
        task.Status = result.Success ? QueuedTaskStatus.Completed : QueuedTaskStatus.Failed;

        await _store.SaveAsync(task, ct);

        // Archive or retry
        if (task.Status == QueuedTaskStatus.Completed)
        {
            await _store.ArchiveAsync(taskId, ct);
        }
        else if (task.DeliveryAttempts < task.MaxDeliveryAttempts)
        {
            await RetryAsync(task, ct);
        }
        else
        {
            await _store.MoveToDeadLetterAsync(taskId, ct);
        }
    }

    private async Task RetryAsync(QueuedTask task, CancellationToken ct)
    {
        var delay = CalculateBackoff(task.DeliveryAttempts);

        task.Status = QueuedTaskStatus.Pending;
        task.ClaimedBy = null;
        task.NextAttemptAt = _timeProvider.GetUtcNow().Add(delay);

        await _store.SaveAsync(task, ct);
    }

    private TimeSpan CalculateBackoff(int attempts)
    {
        // Exponential backoff with jitter, capped at 5 minutes
        var baseDelay = TimeSpan.FromSeconds(Math.Pow(2, attempts));
        var jitter = TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000));
        var maxDelay = TimeSpan.FromMinutes(5);

        var delay = baseDelay + jitter;
        return delay < maxDelay ? delay : maxDelay;
    }
}

public sealed record QueuedTask
{
    public Guid Id { get; init; }
    public AgentTask Task { get; init; }
    public TaskPriority Priority { get; init; }
    public QueuedTaskStatus Status { get; set; }

    // Targeting
    public Guid? TargetAgentId { get; init; }
    public Guid? TargetClusterId { get; init; }
    public ImmutableArray<string> RequiredCapabilities { get; init; }

    // Timing
    public DateTimeOffset EnqueuedAt { get; init; }
    public DateTimeOffset? ExpiresAt { get; init; }
    public DateTimeOffset? NextAttemptAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }

    // Delivery
    public int DeliveryAttempts { get; set; }
    public int MaxDeliveryAttempts { get; init; }
    public DateTimeOffset? LastAttemptAt { get; set; }
    public Guid? ClaimedBy { get; set; }

    // Result
    public TaskResult? Result { get; set; }
}
```
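
`EnqueueOptions` and `QueuedTaskStatus` are referenced above but not defined in this section; minimal sketches inferred from how `EnqueueAsync`, `CompleteAsync`, and `RetryAsync` use them (`TaskPriority` is assumed to be defined elsewhere):

```csharp
// Sketch only: shapes inferred from EnqueueAsync, CompleteAsync, and RetryAsync above.
public sealed record EnqueueOptions
{
    public TaskPriority Priority { get; init; }
    public DateTimeOffset? ExpiresAt { get; init; }
    public Guid? TargetAgentId { get; init; }
    public Guid? TargetClusterId { get; init; }
    public ImmutableArray<string> RequiredCapabilities { get; init; } = ImmutableArray<string>.Empty;
    public int MaxDeliveryAttempts { get; init; } = 3;   // mirrors task_queue.max_delivery_attempts
}

public enum QueuedTaskStatus
{
    Pending,     // Waiting to be claimed (initial state and after a retry)
    Completed,   // Finished successfully and archived
    Failed       // Result reported failure; may be retried or dead-lettered
}
```
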
#### 6. SelfHealer

Automatic recovery and self-healing:

```csharp
public sealed class SelfHealer
{
    public async Task RunHealingCycleAsync(CancellationToken ct)
    {
        var healingActions = new List<HealingAction>();

        // 1. Detect unhealthy agents
        var unhealthyAgents = await DetectUnhealthyAgentsAsync(ct);
        foreach (var agent in unhealthyAgents)
        {
            var action = await DetermineHealingActionAsync(agent, ct);
            if (action != null)
            {
                healingActions.Add(action);
            }
        }

        // 2. Detect orphaned tasks
        var orphanedTasks = await DetectOrphanedTasksAsync(ct);
        foreach (var task in orphanedTasks)
        {
            healingActions.Add(new HealingAction
            {
                Type = HealingActionType.ReassignTask,
                TargetId = task.Id,
                Reason = "Task orphaned after agent failure"
            });
        }

        // 3. Detect under-replicated clusters
        var underReplicatedClusters = await DetectUnderReplicatedClustersAsync(ct);
        foreach (var cluster in underReplicatedClusters)
        {
            healingActions.Add(new HealingAction
            {
                Type = HealingActionType.RebalanceCluster,
                TargetId = cluster.Id,
                Reason = $"Cluster has {cluster.HealthyAgentCount}/{cluster.DesiredAgents} agents"
            });
        }

        // 4. Execute healing actions
        foreach (var action in healingActions.OrderByDescending(a => a.Priority))
        {
            await ExecuteHealingActionAsync(action, ct);
        }
    }

    private async Task<HealingAction?> DetermineHealingActionAsync(
        Agent agent,
        CancellationToken ct)
    {
        var health = await _healthMonitor.GetHealthStateAsync(agent.Id, ct);

        return health.AssessedHealth switch
        {
            HealthLevel.Degraded => new HealingAction
            {
                Type = HealingActionType.DrainAgent,
                TargetId = agent.Id,
                Reason = "Agent degraded, draining tasks"
            },
            HealthLevel.Warning => new HealingAction
            {
                Type = HealingActionType.ReduceLoad,
                TargetId = agent.Id,
                Reason = "Agent showing warnings, reducing load"
            },
            HealthLevel.Critical or HealthLevel.Failed => new HealingAction
            {
                Type = HealingActionType.FailoverAgent,
                TargetId = agent.Id,
                Reason = $"Agent health critical: {health.AssessedHealth}"
            },
            _ => null
        };
    }

    private async Task ExecuteHealingActionAsync(
        HealingAction action,
        CancellationToken ct)
    {
        _logger.LogInformation(
            "Executing healing action {ActionType} on {TargetId}: {Reason}",
            action.Type, action.TargetId, action.Reason);

        switch (action.Type)
        {
            case HealingActionType.FailoverAgent:
                await _failoverManager.PerformFailoverAsync(
                    new FailoverRequest { FailedAgentId = action.TargetId }, ct);
                break;

            case HealingActionType.DrainAgent:
                await DrainAgentAsync(action.TargetId, ct);
                break;

            case HealingActionType.ReduceLoad:
                await ReduceAgentLoadAsync(action.TargetId, ct);
                break;

            case HealingActionType.ReassignTask:
                await ReassignTaskAsync(action.TargetId, ct);
                break;

            case HealingActionType.RebalanceCluster:
                await RebalanceClusterAsync(action.TargetId, ct);
                break;
        }

        // Record healing action
        await _healingStore.RecordAsync(action, ct);
    }

    private async Task DrainAgentAsync(Guid agentId, CancellationToken ct)
    {
        // Stop accepting new tasks
        await _agentStore.UpdateStatusAsync(agentId, AgentStatus.Draining, ct);

        // Wait for in-flight tasks to complete (with timeout)
        var timeout = _timeProvider.GetUtcNow().AddMinutes(5);
        while (_timeProvider.GetUtcNow() < timeout)
        {
            var inFlightTasks = await _taskStore.GetInFlightTasksAsync(agentId, ct);
            if (!inFlightTasks.Any()) break;

            await Task.Delay(TimeSpan.FromSeconds(5), ct);
        }

        // Force transfer remaining tasks
        var remainingTasks = await _taskStore.GetInFlightTasksAsync(agentId, ct);
        foreach (var task in remainingTasks)
        {
            await _failoverManager.TransferTaskAsync(task, ct);
        }
    }
}
```
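
`HealingAction` and `HealingActionType` are used throughout the healer but not defined in this section; a minimal sketch inferred from that usage (the `Priority` field is only known to exist because `RunHealingCycleAsync` orders by it, so its type here is an assumption):

```csharp
// Sketch only: shapes inferred from how SelfHealer constructs and orders healing actions.
public sealed record HealingAction
{
    public HealingActionType Type { get; init; }
    public Guid TargetId { get; init; }        // agent, task, or cluster id depending on Type
    public string Reason { get; init; } = "";
    public int Priority { get; init; }         // ordering key for execution (type assumed)
}

public enum HealingActionType
{
    FailoverAgent,      // Hand the agent's work to another cluster member
    DrainAgent,         // Stop new work, let in-flight tasks finish, then transfer the rest
    ReduceLoad,         // Shed some load from a struggling agent
    ReassignTask,       // Move an orphaned task to a healthy agent
    RebalanceCluster    // Bring an under-replicated cluster back to its desired size
}
```
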
#### 7. StateSync

Synchronizes state across cluster members:

```csharp
public sealed class StateSync
{
    public async Task SyncClusterStateAsync(
        Guid clusterId,
        CancellationToken ct)
    {
        var cluster = await _clusterStore.GetAsync(clusterId, ct);
        var members = await _clusterManager.GetClusterMembersAsync(clusterId, ct);

        var leader = members.FirstOrDefault(m => m.Role == AgentRole.Leader);
        if (leader == null)
        {
            _logger.LogWarning("No leader for cluster {ClusterId}, skipping sync", clusterId);
            return;
        }

        // Get leader's state
        var leaderState = await GetAgentStateAsync(leader.AgentId, ct);

        // Sync to other members
        foreach (var member in members.Where(m => m.Role != AgentRole.Leader))
        {
            await SyncToMemberAsync(member.AgentId, leaderState, ct);
        }
    }

    private async Task SyncToMemberAsync(
        Guid agentId,
        AgentState leaderState,
        CancellationToken ct)
    {
        var memberState = await GetAgentStateAsync(agentId, ct);
        var diff = CalculateStateDiff(leaderState, memberState);

        if (diff.HasChanges)
        {
            _logger.LogDebug(
                "Syncing {ChangeCount} changes to agent {AgentId}",
                diff.Changes.Count, agentId);

            await _agentNotifier.SendStateSyncAsync(agentId, diff, ct);
        }
    }
}

public sealed record AgentState
{
    public Guid AgentId { get; init; }
    public DateTimeOffset CapturedAt { get; init; }

    // Target assignments
    public ImmutableArray<Guid> AssignedTargets { get; init; }

    // Task state
    public ImmutableArray<AgentTaskState> TaskStates { get; init; }

    // Configuration
    public AgentConfiguration Configuration { get; init; }

    // Cached data
    public ImmutableDictionary<string, string> CachedDigests { get; init; }
}
```

---

## Cluster Topologies

### Active-Passive

```
┌─────────────────────────────────────────┐
│              Agent Cluster              │
│                                         │
│   ┌─────────┐        ┌─────────┐        │
│   │ LEADER  │        │ STANDBY │        │
│   │ Agent A │        │ Agent B │        │
│   │ (Active)│        │(Passive)│        │
│   └────┬────┘        └────┬────┘        │
│        │                  │             │
│        ▼                  │ (failover)  │
│   ┌─────────┐             │             │
│   │ Targets │◄────────────┘             │
│   └─────────┘                           │
└─────────────────────────────────────────┘
```

### Active-Active

```
┌─────────────────────────────────────────┐
│              Agent Cluster              │
│                                         │
│   ┌─────────┐        ┌─────────┐        │
│   │ Agent A │        │ Agent B │        │
│   │ (Active)│        │ (Active)│        │
│   └────┬────┘        └────┬────┘        │
│        │                  │             │
│        └────────┬─────────┘             │
│                 ▼                       │
│      ┌─────────────────────┐            │
│      │  Targets (balanced) │            │
│      └─────────────────────┘            │
└─────────────────────────────────────────┘
```

### Sharded

```
┌─────────────────────────────────────────┐
│              Agent Cluster              │
│                                         │
│   ┌─────────┐        ┌─────────┐        │
│   │ Agent A │        │ Agent B │        │
│   │ Shard 0 │        │ Shard 1 │        │
│   └────┬────┘        └────┬────┘        │
│        │                  │             │
│        ▼                  ▼             │
│   ┌─────────┐        ┌─────────┐        │
│   │ Targets │        │ Targets │        │
│   │  0-49   │        │  50-99  │        │
│   └─────────┘        └─────────┘        │
└─────────────────────────────────────────┘
```

---

## API Design

### REST Endpoints

```
# Clusters
POST   /api/v1/agents/clusters                      # Create cluster
GET    /api/v1/agents/clusters                      # List clusters
GET    /api/v1/agents/clusters/{id}                 # Get cluster
PUT    /api/v1/agents/clusters/{id}                 # Update cluster
DELETE /api/v1/agents/clusters/{id}                 # Delete cluster
GET    /api/v1/agents/clusters/{id}/members         # Get members
POST   /api/v1/agents/clusters/{id}/rebalance       # Trigger rebalance

# Failover
POST   /api/v1/agents/{id}/failover                 # Manual failover
GET    /api/v1/agents/failovers                     # Failover history
GET    /api/v1/agents/failovers/{id}                # Failover details

# Health
GET    /api/v1/agents/{id}/health                   # Get agent health
GET    /api/v1/agents/clusters/{id}/health          # Get cluster health

# Task Queue
GET    /api/v1/agents/tasks/queue                   # View queue
GET    /api/v1/agents/tasks/queue/dead-letter       # Dead letter queue
POST   /api/v1/agents/tasks/{id}/retry              # Retry task

# Self-Healing
GET    /api/v1/agents/healing/actions               # Healing history
GET    /api/v1/agents/healing/status                # Current healing status
```
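
As an example of the API surface, the snippet below triggers a manual failover and then reads the failover history. Only the routes come from the table above; the base address, request payload, response schema, and the `agentId` and `ct` locals are assumptions.

```csharp
// Illustrative client call only; requires System.Net.Http.Json.
// Payload and host are assumptions, routes are from the API table above.
using var http = new HttpClient { BaseAddress = new Uri("https://stella.example.internal") };

// Kick off a manual failover for a specific agent.
var response = await http.PostAsJsonAsync(
    $"/api/v1/agents/{agentId}/failover",
    new { reason = "planned maintenance" },
    ct);
response.EnsureSuccessStatusCode();

// Inspect recent failovers (assumes the endpoint returns FailoverResult records as JSON).
var history = await http.GetFromJsonAsync<List<FailoverResult>>("/api/v1/agents/failovers", ct);
```
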
---

## Metrics & Observability

### Prometheus Metrics

```
# Cluster Health
stella_agent_cluster_members{cluster_id, status}
stella_agent_cluster_leader{cluster_id, agent_id}
stella_agent_cluster_health{cluster_id}

# Failover
stella_agent_failovers_total{cluster_id, status}
stella_agent_failover_duration_seconds{cluster_id}
stella_agent_tasks_transferred_total{cluster_id}

# Task Queue
stella_agent_queue_depth{cluster_id, priority}
stella_agent_queue_latency_seconds{cluster_id}
stella_agent_dead_letter_queue_depth{cluster_id}

# Self-Healing
stella_agent_healing_actions_total{action_type, status}
stella_agent_healing_cycle_duration_seconds

# Agent Health
stella_agent_health_score{agent_id}
stella_agent_heartbeat_age_seconds{agent_id}
stella_agent_task_completion_rate{agent_id}
```

---

## Configuration

```yaml
agent_cluster:
  name: "production-docker-agents"
  target_group_id: "prod-docker-hosts"

  membership:
    minimum_agents: 2
    desired_agents: 3
    max_agents: 5
    replication_mode: active_active

  failover:
    selection_strategy: least_loaded
    task_transfer_timeout: "00:05:00"
    max_transfer_retries: 3

  health_monitoring:
    heartbeat_interval: "00:00:30"
    warning_threshold: "00:01:00"
    failure_threshold: "00:01:30"
    health_check_interval: "00:00:10"

  task_queue:
    max_delivery_attempts: 3
    default_expiration: "01:00:00"
    dead_letter_retention: "7.00:00:00"

  self_healing:
    enabled: true
    cycle_interval: "00:01:00"
    drain_timeout: "00:05:00"

  leader_election:
    enabled: true  # For ActivePassive mode
    election_interval: "00:00:15"
    lease_duration: "00:00:30"
```

---

## Test Strategy

### Unit Tests

- Health score calculation
- Failover target selection
- Task queue operations
- Backoff calculation

### Integration Tests

- Full failover flow
- Leader election
- State synchronization
- Task transfer

### Chaos Tests

- Random agent failures
- Network partitions
- Split-brain scenarios
- Cascading failures

### Load Tests

- High task throughput
- Many concurrent agents
- Rapid failover cycles

---

## Migration Path

### Phase 1: Foundation (Week 1-2)

- Cluster data model
- Basic cluster management
- Health monitoring enhancements

### Phase 2: Failover (Week 3-4)

- Failover manager
- Task transfer
- Target reassignment

### Phase 3: Leader Election (Week 5-6)

- Distributed lock integration
- Election algorithm
- ActivePassive support

### Phase 4: Task Queue (Week 7-8)

- Durable queue implementation
- Dead letter handling
- Retry logic

### Phase 5: Self-Healing (Week 9-10)

- Healing cycle
- Automatic actions
- Monitoring integration

### Phase 6: State Sync (Week 11-12)

- State diffing
- Sync protocol
- Consistency verification