using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.Telemetry.Federation.Aggregation;
using StellaOps.Telemetry.Federation.Bundles;
using StellaOps.Telemetry.Federation.Consent;
using StellaOps.Telemetry.Federation.Privacy;

namespace StellaOps.Telemetry.Federation.Sync;

/// <summary>
/// Background service that, once per configured aggregation interval, runs a sync cycle:
/// it drains the buffered telemetry facts, aggregates them, builds a bundle and checks the
/// egress policy for that bundle. This class contains no transmission step; a cycle ends
/// after the egress check with a log entry.
/// </summary>
public sealed class FederatedTelemetrySyncService : BackgroundService
{
    // Tenant used for the consent lookup and the consent proof.
    // Placeholder: a real implementation iterates tenants.
    private const string DefaultTenantId = "default";

    private readonly FederatedTelemetryOptions _options;
    private readonly IPrivacyBudgetTracker _budgetTracker;
    private readonly ITelemetryAggregator _aggregator;
    private readonly IConsentManager _consentManager;
    private readonly IFederatedTelemetryBundleBuilder _bundleBuilder;
    private readonly IEgressPolicyIntegration _egressPolicy;
    private readonly ILogger<FederatedTelemetrySyncService> _logger;

    // In-memory fact buffer; production implementation would read from persistent store
    private readonly List<TelemetryFact> _factBuffer = new();
    private readonly object _bufferLock = new();

    /// <summary>
    /// Creates the sync service and captures the current options value.
    /// </summary>
    /// <exception cref="ArgumentNullException">Any dependency is <c>null</c>.</exception>
    public FederatedTelemetrySyncService(
        IOptions<FederatedTelemetryOptions> options,
        IPrivacyBudgetTracker budgetTracker,
        ITelemetryAggregator aggregator,
        IConsentManager consentManager,
        IFederatedTelemetryBundleBuilder bundleBuilder,
        IEgressPolicyIntegration egressPolicy,
        ILogger<FederatedTelemetrySyncService> logger)
    {
        // Fail at construction time instead of with a NullReferenceException mid-cycle.
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(budgetTracker);
        ArgumentNullException.ThrowIfNull(aggregator);
        ArgumentNullException.ThrowIfNull(consentManager);
        ArgumentNullException.ThrowIfNull(bundleBuilder);
        ArgumentNullException.ThrowIfNull(egressPolicy);
        ArgumentNullException.ThrowIfNull(logger);

        _options = options.Value;
        _budgetTracker = budgetTracker;
        _aggregator = aggregator;
        _consentManager = consentManager;
        _bundleBuilder = bundleBuilder;
        _egressPolicy = egressPolicy;
        _logger = logger;
    }

    /// <summary>
    /// Main loop: waits for the aggregation interval, then runs one sync cycle, until
    /// <paramref name="stoppingToken"/> is cancelled. The wait happens before every cycle,
    /// including the first one. A failing cycle is logged and the loop continues.
    /// </summary>
    /// <param name="stoppingToken">Signals that the host is shutting the service down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "Federation sync service started. Interval: {Interval}, SealedMode: {SealedMode}",
            _options.AggregationInterval,
            _options.SealedModeEnabled);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(_options.AggregationInterval, stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (ArgumentOutOfRangeException ex)
            {
                // Task.Delay rejected the configured interval. Retrying would fail the same
                // way immediately, turning this loop into a busy spin that floods the log,
                // so stop the loop instead.
                _logger.LogCritical(
                    ex,
                    "Federation sync interval {Interval} is not a valid delay; stopping federation sync service",
                    _options.AggregationInterval);
                break;
            }

            try
            {
                await RunSyncCycleAsync(stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                // Keep the service alive; the next interval gets a fresh attempt.
                _logger.LogError(ex, "Federation sync cycle failed");
            }
        }

        _logger.LogInformation("Federation sync service stopped");
    }

    /// <summary>
    /// Runs a single sync cycle. The cycle is skipped (with a debug log) when sealed mode is
    /// enabled, the privacy budget is exhausted, consent is not granted for the default
    /// tenant, or the fact buffer is empty. Otherwise the buffered facts are aggregated, a
    /// bundle is built and the egress policy is consulted for the bundle's envelope size.
    /// </summary>
    /// <param name="ct">Token passed to every awaited collaborator call.</param>
    public async Task RunSyncCycleAsync(CancellationToken ct)
    {
        if (_options.SealedModeEnabled)
        {
            _logger.LogDebug("Sealed mode active; skipping federation sync cycle");
            return;
        }

        if (_budgetTracker.IsBudgetExhausted)
        {
            _logger.LogDebug("Privacy budget exhausted; skipping federation sync cycle");
            return;
        }

        // Check consent for the default tenant (placeholder: real implementation iterates tenants)
        var consent = await _consentManager.GetConsentStateAsync(DefaultTenantId, ct).ConfigureAwait(false);
        if (!consent.Granted)
        {
            _logger.LogDebug("No consent granted; skipping federation sync cycle");
            return;
        }

        // Drain fact buffer: copy and clear under the lock so producers can keep enqueueing
        // while this cycle runs.
        // NOTE(review): nothing below puts the drained facts back, so they are dropped if
        // aggregation, bundling or the egress check fails - confirm that loss is acceptable.
        List<TelemetryFact> facts;
        lock (_bufferLock)
        {
            facts = new List<TelemetryFact>(_factBuffer);
            _factBuffer.Clear();
        }

        if (facts.Count == 0)
        {
            _logger.LogDebug("No telemetry facts to aggregate");
            return;
        }

        // Aggregate
        var aggregation = await _aggregator.AggregateAsync(facts, ct).ConfigureAwait(false);

        // Build bundle
        // NOTE(review): the proof is obtained by calling GrantConsentAsync on every cycle,
        // even though consent was already verified above - confirm that re-granting does not
        // alter the tenant's existing consent record.
        var consentProof = await _consentManager
            .GrantConsentAsync(DefaultTenantId, "sync-service", null, ct)
            .ConfigureAwait(false);
        var bundle = await _bundleBuilder.BuildAsync(aggregation, consentProof, ct).ConfigureAwait(false);

        // Check egress policy
        var egressCheck = await _egressPolicy
            .CheckEgressAsync("federation-mesh", bundle.Envelope.Length, ct)
            .ConfigureAwait(false);
        if (!egressCheck.Allowed)
        {
            _logger.LogWarning("Egress blocked: {Reason}", egressCheck.Reason);
            return;
        }

        _logger.LogInformation(
            "Federation sync cycle complete. Bundle {BundleId}: {BucketCount} buckets, {Suppressed} suppressed, epsilon spent: {EpsilonSpent:F4}",
            bundle.Id,
            aggregation.Buckets.Count,
            aggregation.SuppressedBuckets,
            aggregation.EpsilonSpent);
    }

    /// <summary>
    /// Enqueue a telemetry fact for the next aggregation cycle.
    /// </summary>
    /// <param name="fact">The fact to append to the in-memory buffer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="fact"/> is <c>null</c>.</exception>
    public void EnqueueFact(TelemetryFact fact)
    {
        // Reject null here so it cannot reach the aggregator with the rest of the batch.
        ArgumentNullException.ThrowIfNull(fact);

        lock (_bufferLock)
        {
            _factBuffer.Add(fact);
        }
    }
}