Agent Resilience
Overview
Agent Resilience transforms the deployment agent architecture into a highly available, fault-tolerant system. This enhancement provides agent clustering for high availability, automatic failover during deployments, offline task queuing, and self-healing capabilities.
The design ensures deployments complete successfully even when individual agents fail, network partitions occur, or agents are taken offline for maintenance.
Design Principles
- Zero Downtime Deployments: Agent failures don't block deployments
- Automatic Recovery: Self-healing without operator intervention
- Graceful Degradation: Prefer reduced capacity over complete failure
- Offline Resilience: Queue tasks for disconnected agents
- Transparent Failover: Seamless handoff between agents
- Predictable Behavior: Deterministic failover decisions
Architecture
Component Overview
┌────────────────────────────────────────────────────────────────────────┐
│ Agent Resilience System │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌───────────────────┐ ┌─────────────────┐ │
│ │ AgentCluster │───▶│ FailoverManager │───▶│ TaskRouter │ │
│ │ Manager │ │ │ │ │ │
│ └──────────────────┘ └───────────────────┘ └─────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────┐ ┌───────────────────┐ ┌─────────────────┐ │
│ │ HealthMonitor │ │ LeaderElection │ │ TaskQueue │ │
│ │ │ │ │ │ │ │
│ └──────────────────┘ └───────────────────┘ └─────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────┐ ┌───────────────────┐ ┌─────────────────┐ │
│ │ SelfHealer │ │ StateSync │ │ RetryManager │ │
│ │ │ │ │ │ │ │
│ └──────────────────┘ └───────────────────┘ └─────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
Key Components
1. AgentClusterManager
Manages agent clusters for high availability:
public sealed class AgentClusterManager
{
public async Task<AgentCluster> CreateClusterAsync(
AgentClusterConfig config,
CancellationToken ct)
{
var cluster = new AgentCluster
{
Id = Guid.NewGuid(),
Name = config.Name,
TargetGroupId = config.TargetGroupId,
MinimumAgents = config.MinimumAgents,
DesiredAgents = config.DesiredAgents,
ReplicationMode = config.ReplicationMode,
FailoverPolicy = config.FailoverPolicy,
CreatedAt = _timeProvider.GetUtcNow()
};
await _clusterStore.SaveAsync(cluster, ct);
return cluster;
}
public async Task<IReadOnlyList<AgentMember>> GetClusterMembersAsync(
Guid clusterId,
CancellationToken ct)
{
var cluster = await _clusterStore.GetAsync(clusterId, ct);
var agents = await _agentStore.GetByClusterAsync(clusterId, ct);
return agents.Select(a => new AgentMember
{
AgentId = a.Id,
HostName = a.HostName,
Status = a.Status,
Role = DetermineRole(a, cluster),
LastHeartbeat = a.LastHeartbeat,
Capabilities = a.Capabilities,
CurrentLoad = a.CurrentTaskCount,
MaxLoad = a.MaxConcurrentTasks
}).ToList();
}
private AgentRole DetermineRole(Agent agent, AgentCluster cluster)
{
if (cluster.LeaderId == agent.Id)
return AgentRole.Leader;
if (cluster.StandbyIds.Contains(agent.Id))
return AgentRole.Standby;
return AgentRole.Member;
}
}
public sealed record AgentCluster
{
public Guid Id { get; init; }
public string Name { get; init; }
public Guid TargetGroupId { get; init; }
// Membership
public int MinimumAgents { get; init; }
public int DesiredAgents { get; init; }
public Guid? LeaderId { get; init; }
public ImmutableArray<Guid> StandbyIds { get; init; }
// Configuration
public ReplicationMode ReplicationMode { get; init; }
public FailoverPolicy FailoverPolicy { get; init; }
// Status
public ClusterStatus Status { get; init; }
public int HealthyAgentCount { get; init; }
public DateTimeOffset CreatedAt { get; init; }
}
public enum ReplicationMode
{
ActivePassive, // One active, others standby
ActiveActive, // All agents handle tasks
Sharded // Tasks partitioned across agents
}
public enum AgentRole
{
Leader, // Primary agent (ActivePassive mode)
Standby, // Ready to take over
Member // Active participant (ActiveActive mode)
}
2. HealthMonitor
Monitors agent health using a weighted score over several signals (self-reported health, heartbeat regularity, task completion, resource utilization, and error rate):
public sealed class HealthMonitor
{
private readonly ConcurrentDictionary<Guid, AgentHealthState> _healthStates = new();
public async Task ProcessHeartbeatAsync(
AgentHeartbeat heartbeat,
CancellationToken ct)
{
var state = _healthStates.GetOrAdd(heartbeat.AgentId, _ => new AgentHealthState());
state.LastHeartbeat = heartbeat.Timestamp;
state.ReportedHealth = heartbeat.Health;
state.CurrentLoad = heartbeat.TaskCount;
state.ResourceMetrics = heartbeat.ResourceMetrics;
// Update health assessment
state.AssessedHealth = await AssessHealthAsync(heartbeat, state, ct);
// Check for degradation
if (state.AssessedHealth < HealthLevel.Healthy)
{
await HandleDegradationAsync(heartbeat.AgentId, state, ct);
}
// Emit metrics
_metricsEmitter.EmitAgentHealth(heartbeat.AgentId, state);
}
private async Task<HealthLevel> AssessHealthAsync(
AgentHeartbeat heartbeat,
AgentHealthState state,
CancellationToken ct)
{
var factors = new List<HealthFactor>();
// 1. Self-reported health (normalize the 0-100 enum value to the 0-1 scale used by the other factors)
factors.Add(new HealthFactor("self_reported", (double)heartbeat.Health / 100.0, 0.2));
// 2. Heartbeat regularity
var heartbeatScore = CalculateHeartbeatScore(state);
factors.Add(new HealthFactor("heartbeat_regularity", heartbeatScore, 0.3));
// 3. Task completion rate
var completionRate = await GetTaskCompletionRateAsync(heartbeat.AgentId, ct);
factors.Add(new HealthFactor("task_completion", completionRate, 0.25));
// 4. Resource utilization
var resourceScore = CalculateResourceScore(heartbeat.ResourceMetrics);
factors.Add(new HealthFactor("resource_utilization", resourceScore, 0.15));
// 5. Error rate
var errorRate = await GetErrorRateAsync(heartbeat.AgentId, ct);
factors.Add(new HealthFactor("error_rate", 1.0 - errorRate, 0.1));
// Weighted average
var overallScore = factors.Sum(f => f.Score * f.Weight);
return overallScore switch
{
>= 0.9 => HealthLevel.Healthy,
>= 0.7 => HealthLevel.Degraded,
>= 0.5 => HealthLevel.Warning,
>= 0.3 => HealthLevel.Critical,
_ => HealthLevel.Failed
};
}
public async Task DetectFailuresAsync(CancellationToken ct)
{
var now = _timeProvider.GetUtcNow();
foreach (var (agentId, state) in _healthStates)
{
var timeSinceHeartbeat = now - state.LastHeartbeat;
if (timeSinceHeartbeat > _config.FailureThreshold)
{
await HandleAgentFailureAsync(agentId, state, ct);
}
else if (timeSinceHeartbeat > _config.WarningThreshold)
{
await HandleAgentWarningAsync(agentId, state, ct);
}
}
}
private async Task HandleAgentFailureAsync(
Guid agentId,
AgentHealthState state,
CancellationToken ct)
{
_logger.LogWarning("Agent {AgentId} detected as failed", agentId);
// Update state
state.AssessedHealth = HealthLevel.Failed;
state.FailedAt = _timeProvider.GetUtcNow();
// Notify failover manager
await _eventPublisher.PublishAsync(new AgentFailedEvent(agentId, state), ct);
// Mark agent as offline
await _agentStore.UpdateStatusAsync(agentId, AgentStatus.Offline, ct);
}
}
public sealed class AgentHealthState
{
public DateTimeOffset LastHeartbeat { get; set; }
public HealthLevel ReportedHealth { get; set; }
public HealthLevel AssessedHealth { get; set; }
public int CurrentLoad { get; set; }
public ResourceMetrics ResourceMetrics { get; set; }
public DateTimeOffset? FailedAt { get; set; }
public int ConsecutiveFailures { get; set; }
}
public enum HealthLevel
{
Healthy = 100,
Degraded = 75,
Warning = 50,
Critical = 25,
Failed = 0
}
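To make the weighting concrete, a short worked example, assuming HealthFactor is a simple (Name, Score, Weight) record as used above; all input values are illustrative.
// Worked example of the weighted assessment in AssessHealthAsync.
var factors = new[]
{
    new HealthFactor("self_reported",        1.00, 0.20),
    new HealthFactor("heartbeat_regularity", 0.90, 0.30),
    new HealthFactor("task_completion",      0.80, 0.25),
    new HealthFactor("resource_utilization", 0.70, 0.15),
    new HealthFactor("error_rate",           0.95, 0.10)   // 1.0 - 0.05 error rate
};
var overall = factors.Sum(f => f.Score * f.Weight);
// 0.20 + 0.27 + 0.20 + 0.105 + 0.095 = 0.87, which falls in [0.7, 0.9) => HealthLevel.Degraded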
3. FailoverManager
Orchestrates failover between agents:
public sealed class FailoverManager
{
public async Task<FailoverResult> PerformFailoverAsync(
FailoverRequest request,
CancellationToken ct)
{
var result = new FailoverResult
{
RequestId = Guid.NewGuid(),
FailedAgentId = request.FailedAgentId,
StartedAt = _timeProvider.GetUtcNow()
};
try
{
// 1. Find cluster
var cluster = await _clusterStore.GetByAgentAsync(request.FailedAgentId, ct);
if (cluster == null)
{
result.Status = FailoverStatus.NotInCluster;
return result;
}
// 2. Select failover target
var target = await SelectFailoverTargetAsync(cluster, request, ct);
if (target == null)
{
result.Status = FailoverStatus.NoTargetAvailable;
await HandleNoTargetAsync(cluster, request, ct);
return result;
}
result.TargetAgentId = target.AgentId;
// 3. Transfer in-flight tasks
var tasksToTransfer = await GetInFlightTasksAsync(request.FailedAgentId, ct);
result.TasksTransferred = tasksToTransfer.Count;
foreach (var task in tasksToTransfer)
{
await TransferTaskAsync(task, target.AgentId, ct);
}
// 4. Update cluster membership
if (cluster.LeaderId == request.FailedAgentId)
{
await PromoteToLeaderAsync(cluster, target.AgentId, ct);
}
// 5. Update target assignments
await ReassignTargetsAsync(request.FailedAgentId, target.AgentId, ct);
result.Status = FailoverStatus.Succeeded;
result.CompletedAt = _timeProvider.GetUtcNow();
// Emit event
await _eventPublisher.PublishAsync(new FailoverCompletedEvent(result), ct);
}
catch (Exception ex)
{
result.Status = FailoverStatus.Failed;
result.Error = ex.Message;
_logger.LogError(ex, "Failover failed for agent {AgentId}", request.FailedAgentId);
}
return result;
}
private async Task<AgentMember?> SelectFailoverTargetAsync(
AgentCluster cluster,
FailoverRequest request,
CancellationToken ct)
{
var candidates = await _clusterManager.GetClusterMembersAsync(cluster.Id, ct);
// Filter healthy agents
candidates = candidates
.Where(a => a.AgentId != request.FailedAgentId)
.Where(a => a.Status == AgentStatus.Online)
.Where(a => a.HasCapability(request.RequiredCapabilities))
.ToList();
if (!candidates.Any())
return null;
// Apply selection strategy
return cluster.FailoverPolicy.SelectionStrategy switch
{
FailoverSelectionStrategy.Standby =>
candidates.FirstOrDefault(a => a.Role == AgentRole.Standby) ??
candidates.OrderBy(a => a.CurrentLoad).First(),
FailoverSelectionStrategy.LeastLoaded =>
candidates.OrderBy(a => a.CurrentLoad / (double)a.MaxLoad).First(),
FailoverSelectionStrategy.RoundRobin =>
SelectRoundRobin(cluster, candidates),
FailoverSelectionStrategy.Affinity =>
SelectByAffinity(candidates, request.AffinityHints),
_ => candidates.First()
};
}
private async Task TransferTaskAsync(
AgentTask task,
Guid targetAgentId,
CancellationToken ct)
{
// Mark task as transferred
task.TransferredFrom = task.AssignedAgentId;
task.AssignedAgentId = targetAgentId;
task.TransferredAt = _timeProvider.GetUtcNow();
// Reset task state for retry
if (task.Status == TaskStatus.Running)
{
task.Status = TaskStatus.Pending;
task.RetryCount++;
}
await _taskStore.SaveAsync(task, ct);
// Notify target agent
await _agentNotifier.NotifyTaskAssignedAsync(targetAgentId, task, ct);
}
}
public sealed record FailoverResult
{
    public Guid RequestId { get; init; }
    public Guid FailedAgentId { get; init; }
    public DateTimeOffset StartedAt { get; init; }
    // The remaining properties are filled in as the failover progresses, so they use set rather than init
    public Guid? TargetAgentId { get; set; }
    public FailoverStatus Status { get; set; }
    public int TasksTransferred { get; set; }
    public string? Error { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }
}
public enum FailoverStatus
{
Succeeded,
NotInCluster,
NoTargetAvailable,
Failed
}
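SelectRoundRobin and SelectByAffinity are referenced by SelectFailoverTargetAsync but not shown above. A minimal round-robin sketch, assumed to live inside FailoverManager, is below; the per-cluster _lastSelection cursor is an assumption of this sketch and not part of the AgentCluster record defined earlier.
// Minimal sketch of a round-robin selector over the healthy candidates.
private readonly ConcurrentDictionary<Guid, int> _lastSelection = new();
private AgentMember SelectRoundRobin(AgentCluster cluster, IReadOnlyList<AgentMember> candidates)
{
    // Advance the per-cluster cursor and wrap around the candidate list.
    var next = _lastSelection.AddOrUpdate(cluster.Id, 0, (_, previous) => previous + 1);
    return candidates[next % candidates.Count];
}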
4. LeaderElection
Manages leader election for ActivePassive clusters:
public sealed class LeaderElection
{
private readonly IDistributedLockProvider _lockProvider;
public async Task RunElectionAsync(
Guid clusterId,
CancellationToken ct)
{
var cluster = await _clusterStore.GetAsync(clusterId, ct);
var members = await _clusterManager.GetClusterMembersAsync(clusterId, ct);
var healthyMembers = members
.Where(m => m.Status == AgentStatus.Online)
.OrderByDescending(m => m.Role == AgentRole.Standby) // Prefer standbys
.ThenBy(m => m.CurrentLoad) // Then least loaded
.ToList();
if (!healthyMembers.Any())
{
_logger.LogWarning("No healthy members for cluster {ClusterId}", clusterId);
return;
}
// Acquire distributed lock for election
await using var @lock = await _lockProvider.AcquireAsync(
$"cluster:{clusterId}:election", ct);
// Re-read cluster state under lock
cluster = await _clusterStore.GetAsync(clusterId, ct);
// Check if current leader is healthy
var currentLeader = healthyMembers.FirstOrDefault(m => m.AgentId == cluster.LeaderId);
if (currentLeader != null)
{
_logger.LogDebug("Current leader {LeaderId} is healthy", cluster.LeaderId);
return;
}
// Elect new leader
var newLeader = healthyMembers.First();
await PromoteToLeaderAsync(cluster, newLeader.AgentId, ct);
_logger.LogInformation(
"Elected new leader {NewLeaderId} for cluster {ClusterId}",
newLeader.AgentId, clusterId);
}
private async Task PromoteToLeaderAsync(
AgentCluster cluster,
Guid newLeaderId,
CancellationToken ct)
{
var previousLeaderId = cluster.LeaderId;
// Update cluster
cluster = cluster with { LeaderId = newLeaderId };
// Update standby list
var newStandbys = cluster.StandbyIds
.Where(id => id != newLeaderId)
.ToImmutableArray();
if (previousLeaderId.HasValue)
{
// Demote previous leader to standby if still healthy
var previousLeader = await _agentStore.GetAsync(previousLeaderId.Value, ct);
if (previousLeader?.Status == AgentStatus.Online)
{
newStandbys = newStandbys.Add(previousLeaderId.Value);
}
}
cluster = cluster with { StandbyIds = newStandbys };
await _clusterStore.SaveAsync(cluster, ct);
// Notify agents
await _agentNotifier.NotifyLeaderChangeAsync(cluster.Id, newLeaderId, ct);
// Emit event
await _eventPublisher.PublishAsync(new LeaderElectedEvent(
cluster.Id, newLeaderId, previousLeaderId), ct);
}
}
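Elections are expected to run on a schedule (see election_interval under leader_election in the configuration section below). A hedged sketch of a hosted loop that periodically re-runs the election for ActivePassive clusters; the ElectionLoop type and the IClusterStore.GetByReplicationModeAsync query are assumptions of this sketch.
// Illustrative background loop that re-runs leader election every election_interval.
public sealed class ElectionLoop : BackgroundService
{
    private readonly LeaderElection _election;
    private readonly IClusterStore _clusterStore;
    private readonly TimeSpan _interval = TimeSpan.FromSeconds(15); // election_interval
    public ElectionLoop(LeaderElection election, IClusterStore clusterStore)
    {
        _election = election;
        _clusterStore = clusterStore;
    }
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            // Only ActivePassive clusters need a leader (assumed store query).
            var clusters = await _clusterStore.GetByReplicationModeAsync(ReplicationMode.ActivePassive, ct);
            foreach (var cluster in clusters)
            {
                await _election.RunElectionAsync(cluster.Id, ct);
            }
            await Task.Delay(_interval, ct);
        }
    }
}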
5. TaskQueue
Durable task queue for offline agents:
public sealed class TaskQueue
{
private readonly ITaskQueueStore _store;
public async Task<Guid> EnqueueAsync(
AgentTask task,
EnqueueOptions options,
CancellationToken ct)
{
var queuedTask = new QueuedTask
{
Id = Guid.NewGuid(),
Task = task,
Priority = options.Priority,
EnqueuedAt = _timeProvider.GetUtcNow(),
ExpiresAt = options.ExpiresAt,
TargetAgentId = options.TargetAgentId,
TargetClusterId = options.TargetClusterId,
RequiredCapabilities = options.RequiredCapabilities,
DeliveryAttempts = 0,
MaxDeliveryAttempts = options.MaxDeliveryAttempts
};
await _store.SaveAsync(queuedTask, ct);
return queuedTask.Id;
}
public async Task<QueuedTask?> DequeueAsync(
Guid agentId,
ImmutableArray<string> capabilities,
CancellationToken ct)
{
// Find eligible tasks
var tasks = await _store.GetPendingTasksAsync(agentId, capabilities, ct);
foreach (var task in tasks.OrderByDescending(t => t.Priority))
{
// Check expiration
if (task.ExpiresAt.HasValue && task.ExpiresAt < _timeProvider.GetUtcNow())
{
await ExpireTaskAsync(task, ct);
continue;
}
// Try to claim task
var claimed = await _store.TryClaimAsync(task.Id, agentId, ct);
if (claimed)
{
task.DeliveryAttempts++;
task.LastAttemptAt = _timeProvider.GetUtcNow();
task.ClaimedBy = agentId;
await _store.SaveAsync(task, ct);
return task;
}
}
return null;
}
public async Task CompleteAsync(Guid taskId, TaskResult result, CancellationToken ct)
{
var task = await _store.GetAsync(taskId, ct);
if (task == null)
return;
task.CompletedAt = _timeProvider.GetUtcNow();
task.Result = result;
task.Status = result.Success ? QueuedTaskStatus.Completed : QueuedTaskStatus.Failed;
await _store.SaveAsync(task, ct);
// Archive or retry
if (task.Status == QueuedTaskStatus.Completed)
{
await _store.ArchiveAsync(taskId, ct);
}
else if (task.DeliveryAttempts < task.MaxDeliveryAttempts)
{
await RetryAsync(task, ct);
}
else
{
await _store.MoveToDeadLetterAsync(taskId, ct);
}
}
private async Task RetryAsync(QueuedTask task, CancellationToken ct)
{
var delay = CalculateBackoff(task.DeliveryAttempts);
task.Status = QueuedTaskStatus.Pending;
task.ClaimedBy = null;
task.NextAttemptAt = _timeProvider.GetUtcNow().Add(delay);
await _store.SaveAsync(task, ct);
}
private TimeSpan CalculateBackoff(int attempts)
{
    // Exponential backoff with jitter, capped at 5 minutes
    var baseDelay = TimeSpan.FromSeconds(Math.Pow(2, attempts));
    var jitter = TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000));
    var maxDelay = TimeSpan.FromMinutes(5);
    var delay = baseDelay + jitter;
    return delay < maxDelay ? delay : maxDelay;
}
}
public sealed record QueuedTask
{
public Guid Id { get; init; }
public AgentTask Task { get; init; }
public TaskPriority Priority { get; init; }
public QueuedTaskStatus Status { get; set; }   // mutated as the task moves through the queue
// Targeting
public Guid? TargetAgentId { get; init; }
public Guid? TargetClusterId { get; init; }
public ImmutableArray<string> RequiredCapabilities { get; init; }
// Timing
public DateTimeOffset EnqueuedAt { get; init; }
public DateTimeOffset? ExpiresAt { get; init; }
public DateTimeOffset? NextAttemptAt { get; set; }
public DateTimeOffset? CompletedAt { get; set; }
// Delivery
public int DeliveryAttempts { get; set; }
public int MaxDeliveryAttempts { get; init; }
public DateTimeOffset? LastAttemptAt { get; set; }
public Guid? ClaimedBy { get; set; }
// Result
public TaskResult? Result { get; set; }
}
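A short usage sketch of the queue for an offline agent, assuming EnqueueOptions exposes the properties read in EnqueueAsync above; the deployTask, agentCapabilities, and agent-side ExecuteAsync names are illustrative.
// Illustrative only: queue a deployment task for a specific (possibly offline) agent,
// then drain the queue when the agent reconnects.
var queuedId = await taskQueue.EnqueueAsync(deployTask, new EnqueueOptions
{
    Priority = TaskPriority.High,
    TargetAgentId = agentId,
    ExpiresAt = timeProvider.GetUtcNow().AddHours(1),   // matches default_expiration below
    MaxDeliveryAttempts = 3
}, ct);
// On reconnect, the agent claims work matching its capabilities.
QueuedTask? next;
while ((next = await taskQueue.DequeueAsync(agentId, agentCapabilities, ct)) != null)
{
    var result = await ExecuteAsync(next.Task, ct);      // agent-side execution (assumed)
    await taskQueue.CompleteAsync(next.Id, result, ct);
}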
6. SelfHealer
Automatic recovery and self-healing:
public sealed class SelfHealer
{
public async Task RunHealingCycleAsync(CancellationToken ct)
{
var healingActions = new List<HealingAction>();
// 1. Detect unhealthy agents
var unhealthyAgents = await DetectUnhealthyAgentsAsync(ct);
foreach (var agent in unhealthyAgents)
{
var action = await DetermineHealingActionAsync(agent, ct);
if (action != null)
{
healingActions.Add(action);
}
}
// 2. Detect orphaned tasks
var orphanedTasks = await DetectOrphanedTasksAsync(ct);
foreach (var task in orphanedTasks)
{
healingActions.Add(new HealingAction
{
Type = HealingActionType.ReassignTask,
TargetId = task.Id,
Reason = "Task orphaned after agent failure"
});
}
// 3. Detect under-replicated clusters
var underReplicatedClusters = await DetectUnderReplicatedClustersAsync(ct);
foreach (var cluster in underReplicatedClusters)
{
healingActions.Add(new HealingAction
{
Type = HealingActionType.RebalanceCluster,
TargetId = cluster.Id,
Reason = $"Cluster has {cluster.HealthyAgentCount}/{cluster.DesiredAgents} agents"
});
}
// 4. Execute healing actions
foreach (var action in healingActions.OrderByDescending(a => a.Priority))
{
await ExecuteHealingActionAsync(action, ct);
}
}
private async Task<HealingAction?> DetermineHealingActionAsync(
Agent agent,
CancellationToken ct)
{
var health = await _healthMonitor.GetHealthStateAsync(agent.Id, ct);
return health.AssessedHealth switch
{
HealthLevel.Degraded => new HealingAction
{
Type = HealingActionType.DrainAgent,
TargetId = agent.Id,
Reason = "Agent degraded, draining tasks"
},
HealthLevel.Warning => new HealingAction
{
Type = HealingActionType.ReduceLoad,
TargetId = agent.Id,
Reason = "Agent showing warnings, reducing load"
},
HealthLevel.Critical or HealthLevel.Failed => new HealingAction
{
Type = HealingActionType.FailoverAgent,
TargetId = agent.Id,
Reason = $"Agent health critical: {health.AssessedHealth}"
},
_ => null
};
}
private async Task ExecuteHealingActionAsync(
HealingAction action,
CancellationToken ct)
{
_logger.LogInformation(
"Executing healing action {ActionType} on {TargetId}: {Reason}",
action.Type, action.TargetId, action.Reason);
switch (action.Type)
{
case HealingActionType.FailoverAgent:
await _failoverManager.PerformFailoverAsync(
new FailoverRequest { FailedAgentId = action.TargetId }, ct);
break;
case HealingActionType.DrainAgent:
await DrainAgentAsync(action.TargetId, ct);
break;
case HealingActionType.ReduceLoad:
await ReduceAgentLoadAsync(action.TargetId, ct);
break;
case HealingActionType.ReassignTask:
await ReassignTaskAsync(action.TargetId, ct);
break;
case HealingActionType.RebalanceCluster:
await RebalanceClusterAsync(action.TargetId, ct);
break;
}
// Record healing action
await _healingStore.RecordAsync(action, ct);
}
private async Task DrainAgentAsync(Guid agentId, CancellationToken ct)
{
// Stop accepting new tasks
await _agentStore.UpdateStatusAsync(agentId, AgentStatus.Draining, ct);
// Wait for in-flight tasks to complete (with timeout)
var timeout = _timeProvider.GetUtcNow().AddMinutes(5);
while (_timeProvider.GetUtcNow() < timeout)
{
var inFlightTasks = await _taskStore.GetInFlightTasksAsync(agentId, ct);
if (!inFlightTasks.Any())
break;
await Task.Delay(TimeSpan.FromSeconds(5), ct);
}
// Force transfer remaining tasks
var remainingTasks = await _taskStore.GetInFlightTasksAsync(agentId, ct);
foreach (var task in remainingTasks)
{
await _failoverManager.TransferTaskAsync(task, ct);
}
}
}
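HealingAction and HealingActionType are used throughout the healing cycle but not defined above. A minimal sketch consistent with that usage follows; the exact field set (including Priority, implied by the OrderByDescending call) is an assumption.
// Minimal shapes implied by the healing cycle above.
public sealed record HealingAction
{
    public HealingActionType Type { get; init; }
    public Guid TargetId { get; init; }
    public string Reason { get; init; } = "";
    public int Priority { get; init; }            // higher values are executed first
    public DateTimeOffset CreatedAt { get; init; }
}
public enum HealingActionType
{
    FailoverAgent,     // full failover of a critical/failed agent
    DrainAgent,        // stop new tasks, transfer in-flight work
    ReduceLoad,        // shed load from a warning-level agent
    ReassignTask,      // re-home an orphaned task
    RebalanceCluster   // add/redistribute agents in an under-replicated cluster
}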
7. StateSync
Synchronizes state across cluster members:
public sealed class StateSync
{
public async Task SyncClusterStateAsync(
Guid clusterId,
CancellationToken ct)
{
var cluster = await _clusterStore.GetAsync(clusterId, ct);
var members = await _clusterManager.GetClusterMembersAsync(clusterId, ct);
var leader = members.FirstOrDefault(m => m.Role == AgentRole.Leader);
if (leader == null)
{
_logger.LogWarning("No leader for cluster {ClusterId}, skipping sync", clusterId);
return;
}
// Get leader's state
var leaderState = await GetAgentStateAsync(leader.AgentId, ct);
// Sync to other members
foreach (var member in members.Where(m => m.Role != AgentRole.Leader))
{
await SyncToMemberAsync(member.AgentId, leaderState, ct);
}
}
private async Task SyncToMemberAsync(
Guid agentId,
AgentState leaderState,
CancellationToken ct)
{
var memberState = await GetAgentStateAsync(agentId, ct);
var diff = CalculateStateDiff(leaderState, memberState);
if (diff.HasChanges)
{
_logger.LogDebug(
"Syncing {ChangeCount} changes to agent {AgentId}",
diff.Changes.Count, agentId);
await _agentNotifier.SendStateSyncAsync(agentId, diff, ct);
}
}
}
public sealed record AgentState
{
public Guid AgentId { get; init; }
public DateTimeOffset CapturedAt { get; init; }
// Target assignments
public ImmutableArray<Guid> AssignedTargets { get; init; }
// Task state
public ImmutableArray<TaskState> TaskStates { get; init; }
// Configuration
public AgentConfiguration Configuration { get; init; }
// Cached data
public ImmutableDictionary<string, string> CachedDigests { get; init; }
}
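CalculateStateDiff returns a StateDiff that is not defined above. A hedged sketch of its shape and a minimal diff limited to target assignments (a full implementation would also diff task states, configuration, and cached digests); these types and the diff logic are assumptions consistent with the SyncToMemberAsync usage.
// Illustrative StateDiff shape and a minimal diff, assumed to live inside StateSync.
public sealed record StateDiff
{
    public ImmutableList<StateChange> Changes { get; init; } = ImmutableList<StateChange>.Empty;
    public bool HasChanges => Changes.Count > 0;
}
public sealed record StateChange(string Path, string? OldValue, string? NewValue);
private static StateDiff CalculateStateDiff(AgentState leader, AgentState member)
{
    var changes = ImmutableList.CreateBuilder<StateChange>();
    // Targets present on the leader but missing on the member, and vice versa.
    foreach (var target in leader.AssignedTargets.Except(member.AssignedTargets))
        changes.Add(new StateChange($"targets/{target}", null, "assigned"));
    foreach (var target in member.AssignedTargets.Except(leader.AssignedTargets))
        changes.Add(new StateChange($"targets/{target}", "assigned", null));
    return new StateDiff { Changes = changes.ToImmutable() };
}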
Cluster Topologies
Active-Passive
┌─────────────────────────────────────────┐
│ Agent Cluster │
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ LEADER │ │ STANDBY │ │
│ │ Agent A │ │ Agent B │ │
│ │ (Active)│ │(Passive)│ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ ▼ │ (failover) │
│ ┌─────────┐ │ │
│ │ Targets │◄────────┘ │
│ └─────────┘ │
└─────────────────────────────────────────┘
Active-Active
┌─────────────────────────────────────────┐
│ Agent Cluster │
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ Agent A │ │ Agent B │ │
│ │ (Active)│ │ (Active)│ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ └──────┬───────┘ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Targets (balanced) │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────┘
Sharded
┌─────────────────────────────────────────┐
│ Agent Cluster │
│ │
│ ┌─────────┐ ┌─────────┐ │
│ │ Agent A │ │ Agent B │ │
│ │ Shard 0 │ │ Shard 1 │ │
│ └────┬────┘ └────┬────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │Targets │ │Targets │ │
│ │ 0-49 │ │ 50-99 │ │
│ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘
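In Sharded mode, each target must map deterministically to exactly one agent. A minimal sketch using a stable hash of the target ID modulo the shard count; the specific hashing choice is an assumption, not prescribed by the design.
// Minimal sketch of deterministic shard assignment.
private static int GetShard(Guid targetId, int shardCount)
{
    // Hash the Guid's bytes rather than relying on Guid.GetHashCode(), which is not
    // guaranteed to be stable across runtimes.
    var bytes = targetId.ToByteArray();
    var hash = BitConverter.ToUInt32(bytes, 0) ^ BitConverter.ToUInt32(bytes, 4)
             ^ BitConverter.ToUInt32(bytes, 8) ^ BitConverter.ToUInt32(bytes, 12);
    return (int)(hash % (uint)shardCount);
}
// With two shards: Agent A handles targets where GetShard(targetId, 2) == 0,
// Agent B handles the rest.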
API Design
REST Endpoints
# Clusters
POST /api/v1/agents/clusters # Create cluster
GET /api/v1/agents/clusters # List clusters
GET /api/v1/agents/clusters/{id} # Get cluster
PUT /api/v1/agents/clusters/{id} # Update cluster
DELETE /api/v1/agents/clusters/{id} # Delete cluster
GET /api/v1/agents/clusters/{id}/members # Get members
POST /api/v1/agents/clusters/{id}/rebalance # Trigger rebalance
# Failover
POST /api/v1/agents/{id}/failover # Manual failover
GET /api/v1/agents/failovers # Failover history
GET /api/v1/agents/failovers/{id} # Failover details
# Health
GET /api/v1/agents/{id}/health # Get agent health
GET /api/v1/agents/clusters/{id}/health # Get cluster health
# Task Queue
GET /api/v1/agents/tasks/queue # View queue
GET /api/v1/agents/tasks/queue/dead-letter # Dead letter queue
POST /api/v1/agents/tasks/{id}/retry # Retry task
# Self-Healing
GET /api/v1/agents/healing/actions # Healing history
GET /api/v1/agents/healing/status # Current healing status
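A hedged client-side sketch of the manual failover endpoint: the request body shape and base address are assumptions; the response is deserialized into the FailoverResult record defined earlier.
// Illustrative call to POST /api/v1/agents/{id}/failover.
using var http = new HttpClient { BaseAddress = new Uri("https://stella.example.com") };
var response = await http.PostAsJsonAsync(
    $"/api/v1/agents/{agentId}/failover",
    new { reason = "planned maintenance" },   // assumed request body
    ct);
response.EnsureSuccessStatusCode();
var result = await response.Content.ReadFromJsonAsync<FailoverResult>(cancellationToken: ct);
Console.WriteLine($"Failover {result!.Status}: {result.TasksTransferred} tasks moved to {result.TargetAgentId}");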
Metrics & Observability
Prometheus Metrics
# Cluster Health
stella_agent_cluster_members{cluster_id, status}
stella_agent_cluster_leader{cluster_id, agent_id}
stella_agent_cluster_health{cluster_id}
# Failover
stella_agent_failovers_total{cluster_id, status}
stella_agent_failover_duration_seconds{cluster_id}
stella_agent_tasks_transferred_total{cluster_id}
# Task Queue
stella_agent_queue_depth{cluster_id, priority}
stella_agent_queue_latency_seconds{cluster_id}
stella_agent_dead_letter_queue_depth{cluster_id}
# Self-Healing
stella_agent_healing_actions_total{action_type, status}
stella_agent_healing_cycle_duration_seconds
# Agent Health
stella_agent_health_score{agent_id}
stella_agent_heartbeat_age_seconds{agent_id}
stella_agent_task_completion_rate{agent_id}
Configuration
agent_cluster:
name: "production-docker-agents"
target_group_id: "prod-docker-hosts"
membership:
minimum_agents: 2
desired_agents: 3
max_agents: 5
replication_mode: active_active
failover:
selection_strategy: least_loaded
task_transfer_timeout: "00:05:00"
max_transfer_retries: 3
health_monitoring:
heartbeat_interval: "00:00:30"
warning_threshold: "00:01:00"
failure_threshold: "00:01:30"
health_check_interval: "00:00:10"
task_queue:
max_delivery_attempts: 3
default_expiration: "01:00:00"
dead_letter_retention: "7.00:00:00"
self_healing:
enabled: true
cycle_interval: "00:01:00"
drain_timeout: "00:05:00"
leader_election:
enabled: true # For ActivePassive mode
election_interval: "00:00:15"
lease_duration: "00:00:30"
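One way to surface this configuration in code is a strongly typed options shape mirroring the YAML keys; the following is a sketch of a subset, assuming standard options binding, and the class and property names are illustrative rather than part of the design.
// Illustrative options subset mirroring the YAML above (membership and health monitoring only).
public sealed class AgentClusterOptions
{
    public string Name { get; set; } = "";
    public string TargetGroupId { get; set; } = "";
    public MembershipOptions Membership { get; set; } = new();
    public HealthMonitoringOptions HealthMonitoring { get; set; } = new();
}
public sealed class MembershipOptions
{
    public int MinimumAgents { get; set; } = 2;
    public int DesiredAgents { get; set; } = 3;
    public int MaxAgents { get; set; } = 5;
    public ReplicationMode ReplicationMode { get; set; } = ReplicationMode.ActiveActive;
}
public sealed class HealthMonitoringOptions
{
    public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(30);
    public TimeSpan WarningThreshold { get; set; } = TimeSpan.FromMinutes(1);
    public TimeSpan FailureThreshold { get; set; } = TimeSpan.FromSeconds(90);
    public TimeSpan HealthCheckInterval { get; set; } = TimeSpan.FromSeconds(10);
}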
Test Strategy
Unit Tests
- Health score calculation
- Failover target selection
- Task queue operations
- Backoff calculation (see the sketch below)
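As one example of these unit tests, a minimal xUnit-style sketch of the backoff behaviour; it assumes the calculation is exposed through a test seam (TaskQueueBackoff.Calculate here is hypothetical) since CalculateBackoff is private on TaskQueue.
// Illustrative unit test: backoff grows exponentially, includes jitter, and is capped at 5 minutes.
[Theory]
[InlineData(1)]
[InlineData(3)]
[InlineData(10)]   // large attempt counts must still respect the cap
public void Backoff_is_exponential_with_jitter_and_capped(int attempts)
{
    var delay = TaskQueueBackoff.Calculate(attempts);    // assumed test seam
    var floor = TimeSpan.FromSeconds(Math.Pow(2, attempts));
    var cap = TimeSpan.FromMinutes(5);
    Assert.True(delay >= (floor < cap ? floor : cap));    // at least the base delay (or the cap)
    Assert.True(delay <= cap);                            // never exceeds the cap
}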
Integration Tests
- Full failover flow
- Leader election
- State synchronization
- Task transfer
Chaos Tests
- Random agent failures
- Network partitions
- Split-brain scenarios
- Cascading failures
Load Tests
- High task throughput
- Many concurrent agents
- Rapid failover cycles
Migration Path
Phase 1: Foundation (Week 1-2)
- Cluster data model
- Basic cluster management
- Health monitoring enhancements
Phase 2: Failover (Week 3-4)
- Failover manager
- Task transfer
- Target reassignment
Phase 3: Leader Election (Week 5-6)
- Distributed lock integration
- Election algorithm
- ActivePassive support
Phase 4: Task Queue (Week 7-8)
- Durable queue implementation
- Dead letter handling
- Retry logic
Phase 5: Self-Healing (Week 9-10)
- Healing cycle
- Automatic actions
- Monitoring integration
Phase 6: State Sync (Week 11-12)
- State diffing
- Sync protocol
- Consistency verification