Agent Resilience

Overview

Agent Resilience transforms the deployment agent architecture into a highly available, fault-tolerant system. This enhancement provides agent clustering for high availability, automatic failover during deployments, offline task queuing, and self-healing capabilities.

The design ensures deployments complete successfully even when individual agents fail, network partitions occur, or agents are taken offline for maintenance.


Design Principles

  1. Zero Downtime Deployments: Agent failures don't block deployments
  2. Automatic Recovery: Self-healing without operator intervention
  3. Graceful Degradation: Prefer reduced capacity over complete failure
  4. Offline Resilience: Queue tasks for disconnected agents
  5. Transparent Failover: Seamless handoff between agents
  6. Predictable Behavior: Deterministic failover decisions

Architecture

Component Overview

┌────────────────────────────────────────────────────────────────────────┐
│                     Agent Resilience System                            │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │ AgentCluster     │───▶│ FailoverManager   │───▶│ TaskRouter      │  │
│  │ Manager          │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│           │                       │                        │           │
│           ▼                       ▼                        ▼           │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │ HealthMonitor    │    │ LeaderElection    │    │ TaskQueue       │  │
│  │                  │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│           │                       │                        │           │
│           ▼                       ▼                        ▼           │
│  ┌──────────────────┐    ┌───────────────────┐    ┌─────────────────┐  │
│  │ SelfHealer       │    │ StateSync         │    │ RetryManager    │  │
│  │                  │    │                   │    │                 │  │
│  └──────────────────┘    └───────────────────┘    └─────────────────┘  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Key Components

1. AgentClusterManager

Manages agent clusters for high availability:

public sealed class AgentClusterManager
{
    public async Task<AgentCluster> CreateClusterAsync(
        AgentClusterConfig config,
        CancellationToken ct)
    {
        var cluster = new AgentCluster
        {
            Id = Guid.NewGuid(),
            Name = config.Name,
            TargetGroupId = config.TargetGroupId,
            MinimumAgents = config.MinimumAgents,
            DesiredAgents = config.DesiredAgents,
            ReplicationMode = config.ReplicationMode,
            FailoverPolicy = config.FailoverPolicy,
            CreatedAt = _timeProvider.GetUtcNow()
        };

        await _clusterStore.SaveAsync(cluster, ct);
        return cluster;
    }

    public async Task<IReadOnlyList<AgentMember>> GetClusterMembersAsync(
        Guid clusterId,
        CancellationToken ct)
    {
        var cluster = await _clusterStore.GetAsync(clusterId, ct);
        var agents = await _agentStore.GetByClusterAsync(clusterId, ct);

        return agents.Select(a => new AgentMember
        {
            AgentId = a.Id,
            HostName = a.HostName,
            Status = a.Status,
            Role = DetermineRole(a, cluster),
            LastHeartbeat = a.LastHeartbeat,
            Capabilities = a.Capabilities,
            CurrentLoad = a.CurrentTaskCount,
            MaxLoad = a.MaxConcurrentTasks
        }).ToList();
    }

    private AgentRole DetermineRole(Agent agent, AgentCluster cluster)
    {
        if (cluster.LeaderId == agent.Id)
            return AgentRole.Leader;

        if (cluster.StandbyIds.Contains(agent.Id))
            return AgentRole.Standby;

        return AgentRole.Member;
    }
}

public sealed record AgentCluster
{
    public Guid Id { get; init; }
    public string Name { get; init; }
    public Guid TargetGroupId { get; init; }

    // Membership
    public int MinimumAgents { get; init; }
    public int DesiredAgents { get; init; }
    public Guid? LeaderId { get; init; }
    public ImmutableArray<Guid> StandbyIds { get; init; }

    // Configuration
    public ReplicationMode ReplicationMode { get; init; }
    public FailoverPolicy FailoverPolicy { get; init; }

    // Status
    public ClusterStatus Status { get; init; }
    public int HealthyAgentCount { get; init; }
    public DateTimeOffset CreatedAt { get; init; }
}

public enum ReplicationMode
{
    ActivePassive,   // One active, others standby
    ActiveActive,    // All agents handle tasks
    Sharded          // Tasks partitioned across agents
}

public enum AgentRole
{
    Leader,   // Primary agent (ActivePassive mode)
    Standby,  // Ready to take over
    Member    // Active participant (ActiveActive mode)
}
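
A usage sketch. AgentClusterConfig is referenced above but not defined; the record below is a minimal assumption that mirrors the fields CreateClusterAsync reads, with defaults matching the configuration example later in this document. The variables clusterManager, prodDockerHostsGroupId, and ct are assumed to be in scope:

public sealed record AgentClusterConfig
{
    public required string Name { get; init; }
    public Guid TargetGroupId { get; init; }
    public int MinimumAgents { get; init; } = 2;
    public int DesiredAgents { get; init; } = 3;
    public ReplicationMode ReplicationMode { get; init; } = ReplicationMode.ActiveActive;
    public FailoverPolicy FailoverPolicy { get; init; } = new();
}

// Create a three-agent active-active cluster for a target group:
var cluster = await clusterManager.CreateClusterAsync(new AgentClusterConfig
{
    Name = "production-docker-agents",
    TargetGroupId = prodDockerHostsGroupId,   // hypothetical variable
    MinimumAgents = 2,
    DesiredAgents = 3,
    ReplicationMode = ReplicationMode.ActiveActive
}, ct);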

2. HealthMonitor

Monitors agent health using a weighted, multi-factor assessment:

public sealed class HealthMonitor
{
    private readonly ConcurrentDictionary<Guid, AgentHealthState> _healthStates = new();

    public async Task ProcessHeartbeatAsync(
        AgentHeartbeat heartbeat,
        CancellationToken ct)
    {
        var state = _healthStates.GetOrAdd(heartbeat.AgentId, _ => new AgentHealthState());

        state.LastHeartbeat = heartbeat.Timestamp;
        state.ReportedHealth = heartbeat.Health;
        state.CurrentLoad = heartbeat.TaskCount;
        state.ResourceMetrics = heartbeat.ResourceMetrics;

        // Update health assessment
        state.AssessedHealth = await AssessHealthAsync(heartbeat, state, ct);

        // Check for degradation
        if (state.AssessedHealth < HealthLevel.Healthy)
        {
            await HandleDegradationAsync(heartbeat.AgentId, state, ct);
        }

        // Emit metrics
        _metricsEmitter.EmitAgentHealth(heartbeat.AgentId, state);
    }

    private async Task<HealthLevel> AssessHealthAsync(
        AgentHeartbeat heartbeat,
        AgentHealthState state,
        CancellationToken ct)
    {
        var factors = new List<HealthFactor>();

        // 1. Self-reported health (HealthLevel value 0-100, normalized to a 0-1 score)
        factors.Add(new HealthFactor("self_reported", (double)heartbeat.Health / 100.0, 0.2));

        // 2. Heartbeat regularity
        var heartbeatScore = CalculateHeartbeatScore(state);
        factors.Add(new HealthFactor("heartbeat_regularity", heartbeatScore, 0.3));

        // 3. Task completion rate
        var completionRate = await GetTaskCompletionRateAsync(heartbeat.AgentId, ct);
        factors.Add(new HealthFactor("task_completion", completionRate, 0.25));

        // 4. Resource utilization
        var resourceScore = CalculateResourceScore(heartbeat.ResourceMetrics);
        factors.Add(new HealthFactor("resource_utilization", resourceScore, 0.15));

        // 5. Error rate
        var errorRate = await GetErrorRateAsync(heartbeat.AgentId, ct);
        factors.Add(new HealthFactor("error_rate", 1.0 - errorRate, 0.1));

        // Weighted average
        var overallScore = factors.Sum(f => f.Score * f.Weight);

        return overallScore switch
        {
            >= 0.9 => HealthLevel.Healthy,
            >= 0.7 => HealthLevel.Degraded,
            >= 0.5 => HealthLevel.Warning,
            >= 0.3 => HealthLevel.Critical,
            _ => HealthLevel.Failed
        };
    }

    public async Task DetectFailuresAsync(CancellationToken ct)
    {
        var now = _timeProvider.GetUtcNow();

        foreach (var (agentId, state) in _healthStates)
        {
            var timeSinceHeartbeat = now - state.LastHeartbeat;

            if (timeSinceHeartbeat > _config.FailureThreshold)
            {
                await HandleAgentFailureAsync(agentId, state, ct);
            }
            else if (timeSinceHeartbeat > _config.WarningThreshold)
            {
                await HandleAgentWarningAsync(agentId, state, ct);
            }
        }
    }

    private async Task HandleAgentFailureAsync(
        Guid agentId,
        AgentHealthState state,
        CancellationToken ct)
    {
        _logger.LogWarning("Agent {AgentId} detected as failed", agentId);

        // Update state
        state.AssessedHealth = HealthLevel.Failed;
        state.FailedAt = _timeProvider.GetUtcNow();

        // Notify failover manager
        await _eventPublisher.PublishAsync(new AgentFailedEvent(agentId, state), ct);

        // Mark agent as offline
        await _agentStore.UpdateStatusAsync(agentId, AgentStatus.Offline, ct);
    }
}

public sealed class AgentHealthState
{
    public DateTimeOffset LastHeartbeat { get; set; }
    public HealthLevel ReportedHealth { get; set; }
    public HealthLevel AssessedHealth { get; set; }
    public int CurrentLoad { get; set; }
    public ResourceMetrics ResourceMetrics { get; set; }
    public DateTimeOffset? FailedAt { get; set; }
    public int ConsecutiveFailures { get; set; }
}

public enum HealthLevel
{
    Healthy = 100,
    Degraded = 75,
    Warning = 50,
    Critical = 25,
    Failed = 0
}
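
To make the weighting concrete, here is an illustrative calculation. HealthFactor is used above but not defined, so a plausible shape is sketched first; the factor scores are assumed values for one heartbeat:

public sealed record HealthFactor(string Name, double Score, double Weight);

// Assumed scores for one heartbeat:
var factors = new[]
{
    new HealthFactor("self_reported",        1.00, 0.2),   // agent reports Healthy
    new HealthFactor("heartbeat_regularity", 0.90, 0.3),   // one late heartbeat
    new HealthFactor("task_completion",      0.80, 0.25),  // 80% of recent tasks succeeded
    new HealthFactor("resource_utilization", 0.70, 0.15),  // elevated CPU/memory
    new HealthFactor("error_rate",           0.95, 0.1)    // 5% error rate
};

var overall = factors.Sum(f => f.Score * f.Weight);
// 0.20 + 0.27 + 0.20 + 0.105 + 0.095 = 0.87 → Degraded (below the 0.9 Healthy cutoff)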

3. FailoverManager

Orchestrates failover between agents:

public sealed class FailoverManager
{
    public async Task<FailoverResult> PerformFailoverAsync(
        FailoverRequest request,
        CancellationToken ct)
    {
        var result = new FailoverResult
        {
            RequestId = Guid.NewGuid(),
            FailedAgentId = request.FailedAgentId,
            StartedAt = _timeProvider.GetUtcNow()
        };

        try
        {
            // 1. Find cluster
            var cluster = await _clusterStore.GetByAgentAsync(request.FailedAgentId, ct);
            if (cluster == null)
            {
                result.Status = FailoverStatus.NotInCluster;
                return result;
            }

            // 2. Select failover target
            var target = await SelectFailoverTargetAsync(cluster, request, ct);
            if (target == null)
            {
                result.Status = FailoverStatus.NoTargetAvailable;
                await HandleNoTargetAsync(cluster, request, ct);
                return result;
            }

            result.TargetAgentId = target.AgentId;

            // 3. Transfer in-flight tasks
            var tasksToTransfer = await GetInFlightTasksAsync(request.FailedAgentId, ct);
            result.TasksTransferred = tasksToTransfer.Count;

            foreach (var task in tasksToTransfer)
            {
                await TransferTaskAsync(task, target.AgentId, ct);
            }

            // 4. Update cluster membership
            if (cluster.LeaderId == request.FailedAgentId)
            {
                await PromoteToLeaderAsync(cluster, target.AgentId, ct);
            }

            // 5. Update target assignments
            await ReassignTargetsAsync(request.FailedAgentId, target.AgentId, ct);

            result.Status = FailoverStatus.Succeeded;
            result.CompletedAt = _timeProvider.GetUtcNow();

            // Emit event
            await _eventPublisher.PublishAsync(new FailoverCompletedEvent(result), ct);
        }
        catch (Exception ex)
        {
            result.Status = FailoverStatus.Failed;
            result.Error = ex.Message;
            _logger.LogError(ex, "Failover failed for agent {AgentId}", request.FailedAgentId);
        }

        return result;
    }

    private async Task<AgentMember?> SelectFailoverTargetAsync(
        AgentCluster cluster,
        FailoverRequest request,
        CancellationToken ct)
    {
        var candidates = await _clusterManager.GetClusterMembersAsync(cluster.Id, ct);

        // Filter healthy agents
        candidates = candidates
            .Where(a => a.AgentId != request.FailedAgentId)
            .Where(a => a.Status == AgentStatus.Online)
            .Where(a => a.HasCapability(request.RequiredCapabilities))
            .ToList();

        if (!candidates.Any())
            return null;

        // Apply selection strategy
        return cluster.FailoverPolicy.SelectionStrategy switch
        {
            FailoverSelectionStrategy.Standby =>
                candidates.FirstOrDefault(a => a.Role == AgentRole.Standby) ??
                candidates.OrderBy(a => a.CurrentLoad).First(),

            FailoverSelectionStrategy.LeastLoaded =>
                candidates.OrderBy(a => a.CurrentLoad / (double)a.MaxLoad).First(),

            FailoverSelectionStrategy.RoundRobin =>
                SelectRoundRobin(cluster, candidates),

            FailoverSelectionStrategy.Affinity =>
                SelectByAffinity(candidates, request.AffinityHints),

            _ => candidates.First()
        };
    }

    private async Task TransferTaskAsync(
        AgentTask task,
        Guid targetAgentId,
        CancellationToken ct)
    {
        // Mark task as transferred
        task.TransferredFrom = task.AssignedAgentId;
        task.AssignedAgentId = targetAgentId;
        task.TransferredAt = _timeProvider.GetUtcNow();

        // Reset task state for retry
        if (task.Status == TaskStatus.Running)
        {
            task.Status = TaskStatus.Pending;
            task.RetryCount++;
        }

        await _taskStore.SaveAsync(task, ct);

        // Notify target agent
        await _agentNotifier.NotifyTaskAssignedAsync(targetAgentId, task, ct);
    }
}

public sealed record FailoverResult
{
    public Guid RequestId { get; init; }
    public Guid FailedAgentId { get; init; }

    // Settable properties are filled in as PerformFailoverAsync progresses.
    public Guid? TargetAgentId { get; set; }
    public FailoverStatus Status { get; set; }
    public int TasksTransferred { get; set; }
    public string? Error { get; set; }
    public DateTimeOffset StartedAt { get; init; }
    public DateTimeOffset? CompletedAt { get; set; }
}

public enum FailoverStatus
{
    Succeeded,
    NotInCluster,
    NoTargetAvailable,
    Failed
}
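
FailoverRequest, FailoverPolicy, and FailoverSelectionStrategy are referenced above without definitions. The sketch below infers minimal shapes from how FailoverManager uses them, with defaults borrowed from the configuration example later in this document:

public sealed record FailoverRequest
{
    public Guid FailedAgentId { get; init; }
    public ImmutableArray<string> RequiredCapabilities { get; init; } =
        ImmutableArray<string>.Empty;
    public ImmutableDictionary<string, string> AffinityHints { get; init; } =
        ImmutableDictionary<string, string>.Empty;
}

public sealed record FailoverPolicy
{
    public FailoverSelectionStrategy SelectionStrategy { get; init; } =
        FailoverSelectionStrategy.LeastLoaded;
    public TimeSpan TaskTransferTimeout { get; init; } = TimeSpan.FromMinutes(5);
    public int MaxTransferRetries { get; init; } = 3;
}

public enum FailoverSelectionStrategy
{
    Standby,
    LeastLoaded,
    RoundRobin,
    Affinity
}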

4. LeaderElection

Manages leader election for ActivePassive clusters:

public sealed class LeaderElection
{
    private readonly IDistributedLockProvider _lockProvider;

    public async Task RunElectionAsync(
        Guid clusterId,
        CancellationToken ct)
    {
        var cluster = await _clusterStore.GetAsync(clusterId, ct);
        var members = await _clusterManager.GetClusterMembersAsync(clusterId, ct);

        var healthyMembers = members
            .Where(m => m.Status == AgentStatus.Online)
            .OrderByDescending(m => m.Role == AgentRole.Standby)  // Prefer standbys
            .ThenBy(m => m.CurrentLoad)                           // Then least loaded
            .ToList();

        if (!healthyMembers.Any())
        {
            _logger.LogWarning("No healthy members for cluster {ClusterId}", clusterId);
            return;
        }

        // Acquire distributed lock for election
        await using var @lock = await _lockProvider.AcquireAsync(
            $"cluster:{clusterId}:election", ct);

        // Re-read cluster state under lock
        cluster = await _clusterStore.GetAsync(clusterId, ct);

        // Check if current leader is healthy
        var currentLeader = healthyMembers.FirstOrDefault(m => m.AgentId == cluster.LeaderId);
        if (currentLeader != null)
        {
            _logger.LogDebug("Current leader {LeaderId} is healthy", cluster.LeaderId);
            return;
        }

        // Elect new leader
        var newLeader = healthyMembers.First();
        await PromoteToLeaderAsync(cluster, newLeader.AgentId, ct);

        _logger.LogInformation(
            "Elected new leader {NewLeaderId} for cluster {ClusterId}",
            newLeader.AgentId, clusterId);
    }

    private async Task PromoteToLeaderAsync(
        AgentCluster cluster,
        Guid newLeaderId,
        CancellationToken ct)
    {
        var previousLeaderId = cluster.LeaderId;

        // Update cluster
        cluster = cluster with { LeaderId = newLeaderId };

        // Update standby list
        var newStandbys = cluster.StandbyIds
            .Where(id => id != newLeaderId)
            .ToImmutableArray();

        if (previousLeaderId.HasValue)
        {
            // Demote previous leader to standby if still healthy
            var previousLeader = await _agentStore.GetAsync(previousLeaderId.Value, ct);
            if (previousLeader?.Status == AgentStatus.Online)
            {
                newStandbys = newStandbys.Add(previousLeaderId.Value);
            }
        }

        cluster = cluster with { StandbyIds = newStandbys };
        await _clusterStore.SaveAsync(cluster, ct);

        // Notify agents
        await _agentNotifier.NotifyLeaderChangeAsync(cluster.Id, newLeaderId, ct);

        // Emit event
        await _eventPublisher.PublishAsync(new LeaderElectedEvent(
            cluster.Id, newLeaderId, previousLeaderId), ct);
    }
}
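
Elections need a periodic driver. A minimal sketch using a BackgroundService, assuming a hypothetical IClusterStore.GetAllAsync method and the election_interval from the configuration section below:

public sealed class LeaderElectionService : BackgroundService
{
    private readonly LeaderElection _election;
    private readonly IClusterStore _clusterStore;
    private readonly TimeSpan _interval = TimeSpan.FromSeconds(15);   // election_interval

    public LeaderElectionService(LeaderElection election, IClusterStore clusterStore)
        => (_election, _clusterStore) = (election, clusterStore);

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        using var timer = new PeriodicTimer(_interval);
        while (await timer.WaitForNextTickAsync(ct))
        {
            // Only ActivePassive clusters elect a leader.
            foreach (var cluster in await _clusterStore.GetAllAsync(ct))
            {
                if (cluster.ReplicationMode == ReplicationMode.ActivePassive)
                    await _election.RunElectionAsync(cluster.Id, ct);
            }
        }
    }
}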

5. TaskQueue

Durable task queue for offline agents:

public sealed class TaskQueue
{
    private readonly ITaskQueueStore _store;

    public async Task<Guid> EnqueueAsync(
        AgentTask task,
        EnqueueOptions options,
        CancellationToken ct)
    {
        var queuedTask = new QueuedTask
        {
            Id = Guid.NewGuid(),
            Task = task,
            Priority = options.Priority,
            EnqueuedAt = _timeProvider.GetUtcNow(),
            ExpiresAt = options.ExpiresAt,
            TargetAgentId = options.TargetAgentId,
            TargetClusterId = options.TargetClusterId,
            RequiredCapabilities = options.RequiredCapabilities,
            DeliveryAttempts = 0,
            MaxDeliveryAttempts = options.MaxDeliveryAttempts
        };

        await _store.SaveAsync(queuedTask, ct);
        return queuedTask.Id;
    }

    public async Task<QueuedTask?> DequeueAsync(
        Guid agentId,
        ImmutableArray<string> capabilities,
        CancellationToken ct)
    {
        // Find eligible tasks
        var tasks = await _store.GetPendingTasksAsync(agentId, capabilities, ct);

        foreach (var task in tasks.OrderByDescending(t => t.Priority))
        {
            // Check expiration
            if (task.ExpiresAt.HasValue && task.ExpiresAt < _timeProvider.GetUtcNow())
            {
                await ExpireTaskAsync(task, ct);
                continue;
            }

            // Respect the retry backoff set by RetryAsync
            if (task.NextAttemptAt.HasValue && task.NextAttemptAt > _timeProvider.GetUtcNow())
                continue;

            // Try to claim task
            var claimed = await _store.TryClaimAsync(task.Id, agentId, ct);
            if (claimed)
            {
                task.DeliveryAttempts++;
                task.LastAttemptAt = _timeProvider.GetUtcNow();
                task.ClaimedBy = agentId;
                await _store.SaveAsync(task, ct);
                return task;
            }
        }

        return null;
    }

    public async Task CompleteAsync(Guid taskId, TaskResult result, CancellationToken ct)
    {
        var task = await _store.GetAsync(taskId, ct);
        if (task == null)
            return;

        task.CompletedAt = _timeProvider.GetUtcNow();
        task.Result = result;
        task.Status = result.Success ? QueuedTaskStatus.Completed : QueuedTaskStatus.Failed;

        await _store.SaveAsync(task, ct);

        // Archive or retry
        if (task.Status == QueuedTaskStatus.Completed)
        {
            await _store.ArchiveAsync(taskId, ct);
        }
        else if (task.DeliveryAttempts < task.MaxDeliveryAttempts)
        {
            await RetryAsync(task, ct);
        }
        else
        {
            await _store.MoveToDeadLetterAsync(taskId, ct);
        }
    }

    private async Task RetryAsync(QueuedTask task, CancellationToken ct)
    {
        var delay = CalculateBackoff(task.DeliveryAttempts);
        task.Status = QueuedTaskStatus.Pending;
        task.ClaimedBy = null;
        task.NextAttemptAt = _timeProvider.GetUtcNow().Add(delay);
        await _store.SaveAsync(task, ct);
    }

    private TimeSpan CalculateBackoff(int attempts)
    {
        // Exponential backoff with jitter: ~2s, ~4s, ~8s, ... capped at 5 minutes
        var baseDelay = TimeSpan.FromSeconds(Math.Pow(2, attempts));
        var jitter = TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000));
        var maxDelay = TimeSpan.FromMinutes(5);
        var delay = baseDelay + jitter;
        return delay < maxDelay ? delay : maxDelay;
    }
}

public sealed record QueuedTask
{
    public Guid Id { get; init; }
    public AgentTask Task { get; init; }
    public TaskPriority Priority { get; init; }
    public QueuedTaskStatus Status { get; set; }

    // Targeting
    public Guid? TargetAgentId { get; init; }
    public Guid? TargetClusterId { get; init; }
    public ImmutableArray<string> RequiredCapabilities { get; init; }

    // Timing
    public DateTimeOffset EnqueuedAt { get; init; }
    public DateTimeOffset? ExpiresAt { get; init; }
    public DateTimeOffset? NextAttemptAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }

    // Delivery
    public int DeliveryAttempts { get; set; }
    public int MaxDeliveryAttempts { get; init; }
    public DateTimeOffset? LastAttemptAt { get; set; }
    public Guid? ClaimedBy { get; set; }

    // Result
    public TaskResult? Result { get; set; }
}
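
EnqueueOptions is not shown above; a minimal assumed shape and a usage example follow (the TaskPriority values and the taskQueue, deployTask, clusterId, and timeProvider variables are hypothetical):

public sealed record EnqueueOptions
{
    public TaskPriority Priority { get; init; } = TaskPriority.Normal;
    public DateTimeOffset? ExpiresAt { get; init; }
    public Guid? TargetAgentId { get; init; }
    public Guid? TargetClusterId { get; init; }
    public ImmutableArray<string> RequiredCapabilities { get; init; } =
        ImmutableArray<string>.Empty;
    public int MaxDeliveryAttempts { get; init; } = 3;
}

// Queue a deployment task for whichever cluster member claims it first:
var taskId = await taskQueue.EnqueueAsync(deployTask, new EnqueueOptions
{
    Priority = TaskPriority.High,
    TargetClusterId = clusterId,
    ExpiresAt = timeProvider.GetUtcNow().AddHours(1)   // matches default_expiration
}, ct);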

6. SelfHealer

Automatic recovery and self-healing:

public sealed class SelfHealer
{
    public async Task RunHealingCycleAsync(CancellationToken ct)
    {
        var healingActions = new List<HealingAction>();

        // 1. Detect unhealthy agents
        var unhealthyAgents = await DetectUnhealthyAgentsAsync(ct);
        foreach (var agent in unhealthyAgents)
        {
            var action = await DetermineHealingActionAsync(agent, ct);
            if (action != null)
            {
                healingActions.Add(action);
            }
        }

        // 2. Detect orphaned tasks
        var orphanedTasks = await DetectOrphanedTasksAsync(ct);
        foreach (var task in orphanedTasks)
        {
            healingActions.Add(new HealingAction
            {
                Type = HealingActionType.ReassignTask,
                TargetId = task.Id,
                Reason = "Task orphaned after agent failure"
            });
        }

        // 3. Detect under-replicated clusters
        var underReplicatedClusters = await DetectUnderReplicatedClustersAsync(ct);
        foreach (var cluster in underReplicatedClusters)
        {
            healingActions.Add(new HealingAction
            {
                Type = HealingActionType.RebalanceCluster,
                TargetId = cluster.Id,
                Reason = $"Cluster has {cluster.HealthyAgentCount}/{cluster.DesiredAgents} agents"
            });
        }

        // 4. Execute healing actions
        foreach (var action in healingActions.OrderByDescending(a => a.Priority))
        {
            await ExecuteHealingActionAsync(action, ct);
        }
    }

    private async Task<HealingAction?> DetermineHealingActionAsync(
        Agent agent,
        CancellationToken ct)
    {
        var health = await _healthMonitor.GetHealthStateAsync(agent.Id, ct);

        return health.AssessedHealth switch
        {
            HealthLevel.Degraded => new HealingAction
            {
                Type = HealingActionType.DrainAgent,
                TargetId = agent.Id,
                Reason = "Agent degraded, draining tasks"
            },

            HealthLevel.Warning => new HealingAction
            {
                Type = HealingActionType.ReduceLoad,
                TargetId = agent.Id,
                Reason = "Agent showing warnings, reducing load"
            },

            HealthLevel.Critical or HealthLevel.Failed => new HealingAction
            {
                Type = HealingActionType.FailoverAgent,
                TargetId = agent.Id,
                Reason = $"Agent health critical: {health.AssessedHealth}"
            },

            _ => null
        };
    }

    private async Task ExecuteHealingActionAsync(
        HealingAction action,
        CancellationToken ct)
    {
        _logger.LogInformation(
            "Executing healing action {ActionType} on {TargetId}: {Reason}",
            action.Type, action.TargetId, action.Reason);

        switch (action.Type)
        {
            case HealingActionType.FailoverAgent:
                await _failoverManager.PerformFailoverAsync(
                    new FailoverRequest { FailedAgentId = action.TargetId }, ct);
                break;

            case HealingActionType.DrainAgent:
                await DrainAgentAsync(action.TargetId, ct);
                break;

            case HealingActionType.ReduceLoad:
                await ReduceAgentLoadAsync(action.TargetId, ct);
                break;

            case HealingActionType.ReassignTask:
                await ReassignTaskAsync(action.TargetId, ct);
                break;

            case HealingActionType.RebalanceCluster:
                await RebalanceClusterAsync(action.TargetId, ct);
                break;
        }

        // Record healing action
        await _healingStore.RecordAsync(action, ct);
    }

    private async Task DrainAgentAsync(Guid agentId, CancellationToken ct)
    {
        // Stop accepting new tasks
        await _agentStore.UpdateStatusAsync(agentId, AgentStatus.Draining, ct);

        // Wait for in-flight tasks to complete (with timeout)
        var timeout = _timeProvider.GetUtcNow().AddMinutes(5);
        while (_timeProvider.GetUtcNow() < timeout)
        {
            var inFlightTasks = await _taskStore.GetInFlightTasksAsync(agentId, ct);
            if (!inFlightTasks.Any())
                break;

            await Task.Delay(TimeSpan.FromSeconds(5), ct);
        }

        // Force transfer remaining tasks (assumes a single-task TransferTaskAsync
        // overload that selects a target internally)
        var remainingTasks = await _taskStore.GetInFlightTasksAsync(agentId, ct);
        foreach (var task in remainingTasks)
        {
            await _failoverManager.TransferTaskAsync(task, ct);
        }
    }
}
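
HealingAction and HealingActionType are referenced above without definitions; the minimal shapes below are inferred from how SelfHealer constructs and dispatches actions:

public sealed record HealingAction
{
    public HealingActionType Type { get; init; }
    public Guid TargetId { get; init; }        // agent, task, or cluster id depending on Type
    public string Reason { get; init; } = "";
    public int Priority { get; init; }         // higher values execute first
}

public enum HealingActionType
{
    FailoverAgent,
    DrainAgent,
    ReduceLoad,
    ReassignTask,
    RebalanceCluster
}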

7. StateSync

Synchronizes state across cluster members:

public sealed class StateSync
{
    public async Task SyncClusterStateAsync(
        Guid clusterId,
        CancellationToken ct)
    {
        var cluster = await _clusterStore.GetAsync(clusterId, ct);
        var members = await _clusterManager.GetClusterMembersAsync(clusterId, ct);
        var leader = members.FirstOrDefault(m => m.Role == AgentRole.Leader);

        if (leader == null)
        {
            _logger.LogWarning("No leader for cluster {ClusterId}, skipping sync", clusterId);
            return;
        }

        // Get leader's state
        var leaderState = await GetAgentStateAsync(leader.AgentId, ct);

        // Sync to other members
        foreach (var member in members.Where(m => m.Role != AgentRole.Leader))
        {
            await SyncToMemberAsync(member.AgentId, leaderState, ct);
        }
    }

    private async Task SyncToMemberAsync(
        Guid agentId,
        AgentState leaderState,
        CancellationToken ct)
    {
        var memberState = await GetAgentStateAsync(agentId, ct);
        var diff = CalculateStateDiff(leaderState, memberState);

        if (diff.HasChanges)
        {
            _logger.LogDebug(
                "Syncing {ChangeCount} changes to agent {AgentId}",
                diff.Changes.Count, agentId);

            await _agentNotifier.SendStateSyncAsync(agentId, diff, ct);
        }
    }
}

public sealed record AgentState
{
    public Guid AgentId { get; init; }
    public DateTimeOffset CapturedAt { get; init; }

    // Target assignments
    public ImmutableArray<Guid> AssignedTargets { get; init; }

    // Task state
    public ImmutableArray<TaskState> TaskStates { get; init; }

    // Configuration
    public AgentConfiguration Configuration { get; init; }

    // Cached data
    public ImmutableDictionary<string, string> CachedDigests { get; init; }
}
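
StateDiff and CalculateStateDiff are referenced but not shown. One plausible sketch that diffs target assignments and cached digests (a real implementation would also cover task states and configuration):

public sealed record StateChange(string Path, string? OldValue, string? NewValue);

public sealed record StateDiff
{
    public ImmutableArray<StateChange> Changes { get; init; } =
        ImmutableArray<StateChange>.Empty;
    public bool HasChanges => Changes.Length > 0;
}

private static StateDiff CalculateStateDiff(AgentState leader, AgentState member)
{
    var changes = ImmutableArray.CreateBuilder<StateChange>();

    // Targets assigned on the leader but missing on the member.
    foreach (var target in leader.AssignedTargets.Except(member.AssignedTargets))
        changes.Add(new StateChange($"targets/{target}", null, "assigned"));

    // Cached digests that are missing or stale on the member.
    foreach (var (key, digest) in leader.CachedDigests)
        if (!member.CachedDigests.TryGetValue(key, out var existing) || existing != digest)
            changes.Add(new StateChange($"digests/{key}", existing, digest));

    return new StateDiff { Changes = changes.ToImmutable() };
}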

Cluster Topologies

Active-Passive

┌─────────────────────────────────────────┐
│              Agent Cluster              │
│                                         │
│   ┌─────────┐    ┌─────────┐            │
│   │ LEADER  │    │ STANDBY │            │
│   │ Agent A │    │ Agent B │            │
│   │ (Active)│    │(Passive)│            │
│   └────┬────┘    └────┬────┘            │
│        │              │                 │
│        ▼              │ (failover)      │
│   ┌─────────┐         │                 │
│   │ Targets │◄────────┘                 │
│   └─────────┘                           │
└─────────────────────────────────────────┘

Active-Active

┌─────────────────────────────────────────┐
│              Agent Cluster              │
│                                         │
│   ┌─────────┐    ┌─────────┐            │
│   │ Agent A │    │ Agent B │            │
│   │ (Active)│    │ (Active)│            │
│   └────┬────┘    └────┬────┘            │
│        │              │                 │
│        └──────┬───────┘                 │
│               ▼                         │
│   ┌─────────────────────┐               │
│   │ Targets (balanced)  │               │
│   └─────────────────────┘               │
└─────────────────────────────────────────┘

Sharded

┌─────────────────────────────────────────┐
│              Agent Cluster              │
│                                         │
│   ┌─────────┐    ┌─────────┐            │
│   │ Agent A │    │ Agent B │            │
│   │ Shard 0 │    │ Shard 1 │            │
│   └────┬────┘    └────┬────┘            │
│        │              │                 │
│        ▼              ▼                 │
│   ┌─────────┐    ┌─────────┐            │
│   │Targets  │    │Targets  │            │
│   │ 0-49    │    │ 50-99   │            │
│   └─────────┘    └─────────┘            │
└─────────────────────────────────────────┘
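
In Sharded mode each target must map deterministically to exactly one shard. The document does not prescribe a partitioning function; a stable hash of the target id is one common choice, sketched here as an assumption:

// Assumption: shard assignment by stable hash (not specified above).
static int GetShard(Guid targetId, int shardCount)
{
    // Guid bytes are stable across processes, unlike string GetHashCode().
    var hash = BitConverter.ToUInt32(targetId.ToByteArray(), 0);
    return (int)(hash % (uint)shardCount);
}

// An agent with shard index 0 handles roughly half the targets in a 2-shard cluster:
bool mine = GetShard(targetId, shardCount: 2) == myShardIndex;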

API Design

REST Endpoints

# Clusters
POST   /api/v1/agents/clusters                    # Create cluster
GET    /api/v1/agents/clusters                    # List clusters
GET    /api/v1/agents/clusters/{id}               # Get cluster
PUT    /api/v1/agents/clusters/{id}               # Update cluster
DELETE /api/v1/agents/clusters/{id}               # Delete cluster
GET    /api/v1/agents/clusters/{id}/members       # Get members
POST   /api/v1/agents/clusters/{id}/rebalance     # Trigger rebalance

# Failover
POST   /api/v1/agents/{id}/failover               # Manual failover
GET    /api/v1/agents/failovers                   # Failover history
GET    /api/v1/agents/failovers/{id}              # Failover details

# Health
GET    /api/v1/agents/{id}/health                 # Get agent health
GET    /api/v1/agents/clusters/{id}/health        # Get cluster health

# Task Queue
GET    /api/v1/agents/tasks/queue                 # View queue
GET    /api/v1/agents/tasks/queue/dead-letter     # Dead letter queue
POST   /api/v1/agents/tasks/{id}/retry            # Retry task

# Self-Healing
GET    /api/v1/agents/healing/actions             # Healing history
GET    /api/v1/agents/healing/status              # Current healing status
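
An illustrative create-cluster request; the JSON field names mirror AgentClusterConfig and are an assumption, not a confirmed wire format:

POST /api/v1/agents/clusters
Content-Type: application/json

{
  "name": "production-docker-agents",
  "targetGroupId": "prod-docker-hosts",
  "minimumAgents": 2,
  "desiredAgents": 3,
  "replicationMode": "activeActive",
  "failoverPolicy": { "selectionStrategy": "leastLoaded" }
}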

Metrics & Observability

Prometheus Metrics

# Cluster Health
stella_agent_cluster_members{cluster_id, status}
stella_agent_cluster_leader{cluster_id, agent_id}
stella_agent_cluster_health{cluster_id}

# Failover
stella_agent_failovers_total{cluster_id, status}
stella_agent_failover_duration_seconds{cluster_id}
stella_agent_tasks_transferred_total{cluster_id}

# Task Queue
stella_agent_queue_depth{cluster_id, priority}
stella_agent_queue_latency_seconds{cluster_id}
stella_agent_dead_letter_queue_depth{cluster_id}

# Self-Healing
stella_agent_healing_actions_total{action_type, status}
stella_agent_healing_cycle_duration_seconds

# Agent Health
stella_agent_health_score{agent_id}
stella_agent_heartbeat_age_seconds{agent_id}
stella_agent_task_completion_rate{agent_id}
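
These metrics support alerting. For example, an illustrative Prometheus rule, with the 90-second threshold taken from failure_threshold in the configuration below:

groups:
  - name: agent-resilience
    rules:
      - alert: AgentHeartbeatStale
        expr: stella_agent_heartbeat_age_seconds > 90
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Agent {{ $labels.agent_id }} heartbeat is older than 90s"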

Configuration

agent_cluster:
  name: "production-docker-agents"
  target_group_id: "prod-docker-hosts"

  membership:
    minimum_agents: 2
    desired_agents: 3
    max_agents: 5

  replication_mode: active_active

  failover:
    selection_strategy: least_loaded
    task_transfer_timeout: "00:05:00"
    max_transfer_retries: 3

  health_monitoring:
    heartbeat_interval: "00:00:30"
    warning_threshold: "00:01:00"
    failure_threshold: "00:01:30"
    health_check_interval: "00:00:10"

  task_queue:
    max_delivery_attempts: 3
    default_expiration: "01:00:00"
    dead_letter_retention: "7.00:00:00"

  self_healing:
    enabled: true
    cycle_interval: "00:01:00"
    drain_timeout: "00:05:00"

  leader_election:
    enabled: true  # For ActivePassive mode
    election_interval: "00:00:15"
    lease_duration: "00:00:30"

Test Strategy

Unit Tests

  • Health score calculation
  • Failover target selection
  • Task queue operations
  • Backoff calculation (test sketch below)
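
A sketch of the backoff unit test, assuming CalculateBackoff is made internal (e.g. via InternalsVisibleTo) so it can be exercised directly, and a hypothetical test helper for construction:

[Fact]
public void Backoff_is_capped_at_five_minutes()
{
    var queue = CreateTaskQueueWithTestDoubles();   // hypothetical helper

    for (var attempts = 1; attempts <= 20; attempts++)
    {
        var delay = queue.CalculateBackoff(attempts);
        Assert.InRange(delay, TimeSpan.FromSeconds(2), TimeSpan.FromMinutes(5));
    }
}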

Integration Tests

  • Full failover flow
  • Leader election
  • State synchronization
  • Task transfer

Chaos Tests

  • Random agent failures
  • Network partitions
  • Split-brain scenarios
  • Cascading failures

Load Tests

  • High task throughput
  • Many concurrent agents
  • Rapid failover cycles

Migration Path

Phase 1: Foundation (Week 1-2)

  • Cluster data model
  • Basic cluster management
  • Health monitoring enhancements

Phase 2: Failover (Week 3-4)

  • Failover manager
  • Task transfer
  • Target reassignment

Phase 3: Leader Election (Week 5-6)

  • Distributed lock integration
  • Election algorithm
  • ActivePassive support

Phase 4: Task Queue (Week 7-8)

  • Durable queue implementation
  • Dead letter handling
  • Retry logic

Phase 5: Self-Healing (Week 9-10)

  • Healing cycle
  • Automatic actions
  • Monitoring integration

Phase 6: State Sync (Week 11-12)

  • State diffing
  • Sync protocol
  • Consistency verification