Files
git.stella-ops.org/src/Concelier/StellaOps.Concelier.WebService/Extensions/FederationEndpointExtensions.cs
2026-01-08 20:46:43 +02:00

461 lines
18 KiB
C#

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using StellaOps.Concelier.Federation.Export;
using StellaOps.Concelier.Federation.Import;
using StellaOps.Concelier.Federation.Models;
using StellaOps.Concelier.WebService.Options;
using StellaOps.Concelier.WebService.Results;
using HttpResults = Microsoft.AspNetCore.Http.Results;
namespace StellaOps.Concelier.WebService.Extensions;
/// <summary>
/// Endpoint extensions for Federation functionality.
/// Per SPRINT_8200_0014_0002_CONCEL_delta_bundle_export.
/// </summary>
internal static class FederationEndpointExtensions
{
/// <summary>
/// Maps the federation HTTP endpoints (export, import, status and site
/// management) onto a route group rooted at <c>/api/v1/federation</c>.
/// </summary>
/// <param name="app">The web application the route group is created on.</param>
public static void MapConcelierFederationEndpoints(this WebApplication app)
{
// Every federation route shares this URL prefix and the "Federation" tag.
var group = app.MapGroup("/api/v1/federation")
.WithTags("Federation");
// GET /api/v1/federation/export - Export delta bundle.
// Streams a zstd bundle straight into the response body. Query parameters:
//   since_cursor   - optional cursor passed through to the export service
//   sign           - forwarded as BundleExportOptions.Sign (default true)
//   max_items      - 1..100000 (default 10000)
//   compress_level - 1..19 (default 3)
// Bundle hash, export cursor and item count are only known once the export
// has finished, so they are delivered as response headers when the response
// has not started yet, and as HTTP trailers otherwise.
group.MapGet("/export", async (
    HttpContext context,
    [FromServices] IBundleExportService exportService,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor,
    CancellationToken cancellationToken,
    [FromQuery(Name = "since_cursor")] string? sinceCursor = null,
    [FromQuery] bool sign = true,
    [FromQuery(Name = "max_items")] int maxItems = 10000,
    [FromQuery(Name = "compress_level")] int compressLevel = 3) =>
{
    var options = optionsMonitor.CurrentValue;
    if (!options.Federation.Enabled)
    {
        return ConcelierProblemResultFactory.FederationDisabled(context);
    }

    // Validate parameters before anything is written to the response.
    if (maxItems < 1 || maxItems > 100_000)
    {
        return HttpResults.BadRequest(new { error = "max_items must be between 1 and 100000" });
    }

    if (compressLevel < 1 || compressLevel > 19)
    {
        return HttpResults.BadRequest(new { error = "compress_level must be between 1 and 19" });
    }

    var exportOptions = new BundleExportOptions
    {
        Sign = sign,
        MaxItems = maxItems,
        CompressionLevel = compressLevel
    };

    // Set response headers for streaming.
    var response = context.Response;
    response.ContentType = "application/zstd";
    response.Headers.ContentDisposition =
        $"attachment; filename=\"feedser-bundle-{DateTime.UtcNow:yyyyMMdd-HHmmss}.zst\"";

    // Writing the bundle starts the response, after which the header
    // collection is read-only and Headers.Append throws. Trailers must be
    // announced before the first body byte, so declare them now.
    var trailersSupported = response.SupportsTrailers();
    if (trailersSupported)
    {
        response.DeclareTrailer("X-Bundle-Hash");
        response.DeclareTrailer("X-Export-Cursor");
        response.DeclareTrailer("X-Items-Count");
    }

    // Export directly to response stream.
    var result = await exportService.ExportToStreamAsync(
        response.Body,
        sinceCursor,
        exportOptions,
        cancellationToken);

    var itemsCount = result.Counts.Total.ToString();

    if (!response.HasStarted)
    {
        // Nothing has been sent yet: regular headers still work.
        response.Headers.Append("X-Bundle-Hash", result.BundleHash);
        response.Headers.Append("X-Export-Cursor", result.ExportCursor);
        response.Headers.Append("X-Items-Count", itemsCount);
    }
    else if (trailersSupported)
    {
        // Headers are already sent; deliver the metadata as trailers.
        response.AppendTrailer("X-Bundle-Hash", result.BundleHash);
        response.AppendTrailer("X-Export-Cursor", result.ExportCursor);
        response.AppendTrailer("X-Items-Count", itemsCount);
    }

    // The body has been written by the export service; nothing else to emit.
    return HttpResults.Empty;
})
    .WithName("ExportFederationBundle")
    .WithSummary("Export delta bundle for federation sync")
    .Produces(200, contentType: "application/zstd")
    .ProducesProblem(400)
    .ProducesProblem(503);
// GET /api/v1/federation/export/preview - Preview export statistics.
// Returns the export service's estimates for the given cursor as JSON;
// no bundle is written to the response.
group.MapGet("/export/preview", async (
    HttpContext context,
    [FromServices] IBundleExportService exportService,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor,
    CancellationToken cancellationToken,
    [FromQuery(Name = "since_cursor")] string? sinceCursor = null) =>
{
    // Federation switched off: hand back the shared "disabled" result.
    if (!optionsMonitor.CurrentValue.Federation.Enabled)
    {
        return ConcelierProblemResultFactory.FederationDisabled(context);
    }

    var estimate = await exportService.PreviewAsync(sinceCursor, cancellationToken);

    // Bytes -> MB (1024-based), rounded to two decimals for display.
    var estimatedSizeMb = Math.Round(estimate.EstimatedSizeBytes / 1024.0 / 1024.0, 2);

    var payload = new
    {
        since_cursor = sinceCursor,
        estimated_canonicals = estimate.EstimatedCanonicals,
        estimated_edges = estimate.EstimatedEdges,
        estimated_deletions = estimate.EstimatedDeletions,
        estimated_size_bytes = estimate.EstimatedSizeBytes,
        estimated_size_mb = estimatedSizeMb
    };

    return HttpResults.Ok(payload);
})
    .WithName("PreviewFederationExport")
    .WithSummary("Preview export statistics without creating bundle")
    .Produces<object>(200)
    .ProducesProblem(503);
// GET /api/v1/federation/status - Federation status.
// Always answers 200 with the current federation settings, including
// whether federation is enabled at all.
group.MapGet("/status", (
    HttpContext context,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor) =>
{
    // Take one snapshot so every reported value comes from the same options instance.
    var federation = optionsMonitor.CurrentValue.Federation;

    var payload = new
    {
        enabled = federation.Enabled,
        site_id = federation.SiteId,
        default_compression_level = federation.DefaultCompressionLevel,
        default_max_items = federation.DefaultMaxItems
    };

    return HttpResults.Ok(payload);
})
    .WithName("GetFederationStatus")
    .WithSummary("Get federation configuration status")
    .Produces<object>(200);
// POST /api/v1/federation/import - Import a bundle.
// Per SPRINT_8200_0014_0003_CONCEL_bundle_import_merge Task 25-26.
// The request body is streamed to the import service. Query parameters:
//   dry_run        - forwarded as BundleImportOptions.DryRun
//   skip_signature - forwarded as BundleImportOptions.SkipSignatureVerification
//   on_conflict    - name of a ConflictResolution member (case-insensitive);
//                    defaults to PreferRemote when omitted
//   force          - forwarded as BundleImportOptions.Force
// Responds 400 on bad input, 422 when the import service reports failure,
// and 200 with counts and conflicts on success.
// NOTE(review): skip_signature is caller-controlled; confirm this route is
// protected by authorization elsewhere before relying on signature checks.
group.MapPost("/import", async (
    HttpContext context,
    [FromServices] IBundleImportService importService,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor,
    CancellationToken cancellationToken,
    [FromQuery(Name = "dry_run")] bool dryRun = false,
    [FromQuery(Name = "skip_signature")] bool skipSignature = false,
    [FromQuery(Name = "on_conflict")] string? onConflict = null,
    [FromQuery] bool force = false) =>
{
    var options = optionsMonitor.CurrentValue;
    if (!options.Federation.Enabled)
    {
        return ConcelierProblemResultFactory.FederationDisabled(context);
    }

    // Validate content type. Media types are case-insensitive, so compare
    // without regard to case; Contains tolerates parameters such as charset.
    var contentType = context.Request.ContentType;
    if (string.IsNullOrEmpty(contentType) ||
        (!contentType.Contains("application/zstd", StringComparison.OrdinalIgnoreCase) &&
         !contentType.Contains("application/octet-stream", StringComparison.OrdinalIgnoreCase)))
    {
        return HttpResults.BadRequest(new { error = "Content-Type must be application/zstd or application/octet-stream" });
    }

    // Parse conflict resolution. Enum.TryParse also succeeds for numeric
    // text (e.g. "42"), so additionally require a declared enum member.
    var conflictResolution = ConflictResolution.PreferRemote;
    if (!string.IsNullOrEmpty(onConflict))
    {
        if (!Enum.TryParse<ConflictResolution>(onConflict, ignoreCase: true, out conflictResolution) ||
            !Enum.IsDefined(conflictResolution))
        {
            return HttpResults.BadRequest(new { error = "on_conflict must be one of: PreferRemote, PreferLocal, Fail" });
        }
    }

    var importOptions = new BundleImportOptions
    {
        DryRun = dryRun,
        SkipSignatureVerification = skipSignature,
        OnConflict = conflictResolution,
        Force = force
    };

    // Stream request body directly to import service.
    var result = await importService.ImportAsync(
        context.Request.Body,
        importOptions,
        cancellationToken);

    if (!result.Success)
    {
        return HttpResults.UnprocessableEntity(new
        {
            success = false,
            bundle_hash = result.BundleHash,
            failure_reason = result.FailureReason,
            duration_ms = result.Duration.TotalMilliseconds
        });
    }

    return HttpResults.Ok(new
    {
        success = true,
        bundle_hash = result.BundleHash,
        imported_cursor = result.ImportedCursor,
        counts = new
        {
            canonical_created = result.Counts.CanonicalCreated,
            canonical_updated = result.Counts.CanonicalUpdated,
            canonical_skipped = result.Counts.CanonicalSkipped,
            edges_added = result.Counts.EdgesAdded,
            deletions_processed = result.Counts.DeletionsProcessed,
            total = result.Counts.Total
        },
        conflicts = result.Conflicts.Select(c => new
        {
            merge_hash = c.MergeHash,
            field = c.Field,
            local_value = c.LocalValue,
            remote_value = c.RemoteValue,
            resolution = c.Resolution.ToString().ToLowerInvariant()
        }),
        duration_ms = result.Duration.TotalMilliseconds,
        dry_run = dryRun
    });
})
    .WithName("ImportFederationBundle")
    .WithSummary("Import a federation bundle")
    .Accepts<Stream>("application/zstd")
    .Produces<object>(200)
    .ProducesProblem(400)
    .ProducesProblem(422)
    .ProducesProblem(503)
    .DisableAntiforgery();
// POST /api/v1/federation/import/validate - Validate bundle without importing.
// Passes the request body to the import service's ValidateAsync and reports
// its findings; the answer is 200 whether or not the bundle is valid.
group.MapPost("/import/validate", async (
    HttpContext context,
    [FromServices] IBundleImportService importService,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor,
    CancellationToken cancellationToken) =>
{
    // Federation switched off: hand back the shared "disabled" result.
    if (!optionsMonitor.CurrentValue.Federation.Enabled)
    {
        return ConcelierProblemResultFactory.FederationDisabled(context);
    }

    var bundleStream = context.Request.Body;
    var validation = await importService.ValidateAsync(bundleStream, cancellationToken);

    var payload = new
    {
        is_valid = validation.IsValid,
        errors = validation.Errors,
        warnings = validation.Warnings,
        hash_valid = validation.HashValid,
        signature_valid = validation.SignatureValid,
        cursor_valid = validation.CursorValid
    };

    return HttpResults.Ok(payload);
})
    .WithName("ValidateFederationBundle")
    .WithSummary("Validate a bundle without importing")
    .Accepts<Stream>("application/zstd")
    .Produces<object>(200)
    .ProducesProblem(503)
    .DisableAntiforgery();
// POST /api/v1/federation/import/preview - Preview import.
// Passes the request body to the import service's PreviewAsync and echoes
// the manifest details, duplicate flag and any errors/warnings it reports.
group.MapPost("/import/preview", async (
    HttpContext context,
    [FromServices] IBundleImportService importService,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor,
    CancellationToken cancellationToken) =>
{
    // Federation switched off: hand back the shared "disabled" result.
    if (!optionsMonitor.CurrentValue.Federation.Enabled)
    {
        return ConcelierProblemResultFactory.FederationDisabled(context);
    }

    var bundleStream = context.Request.Body;
    var inspection = await importService.PreviewAsync(bundleStream, cancellationToken);

    // NOTE(review): Manifest is dereferenced unconditionally; confirm the
    // service always populates it, even for bundles it reports as invalid.
    var manifest = inspection.Manifest;
    var manifestCounts = manifest.Counts;

    // Counts may be absent from the manifest; report zeros in that case.
    var countsPayload = new
    {
        canonicals = manifestCounts?.Canonicals ?? 0,
        edges = manifestCounts?.Edges ?? 0,
        deletions = manifestCounts?.Deletions ?? 0,
        total = manifestCounts?.Total ?? 0
    };

    var manifestPayload = new
    {
        version = manifest.Version,
        site_id = manifest.SiteId,
        export_cursor = manifest.ExportCursor,
        bundle_hash = manifest.BundleHash,
        exported_at = manifest.ExportedAt,
        counts = countsPayload
    };

    return HttpResults.Ok(new
    {
        is_valid = inspection.IsValid,
        is_duplicate = inspection.IsDuplicate,
        current_cursor = inspection.CurrentCursor,
        manifest = manifestPayload,
        errors = inspection.Errors,
        warnings = inspection.Warnings
    });
})
    .WithName("PreviewFederationImport")
    .WithSummary("Preview what import would do")
    .Accepts<Stream>("application/zstd")
    .Produces<object>(200)
    .ProducesProblem(503)
    .DisableAntiforgery();
// GET /api/v1/federation/sites - List all federation sites.
// Per SPRINT_8200_0014_0003_CONCEL_bundle_import_merge Task 30.
// enabled_only is handed to the ledger repository unchanged; the response
// carries one summary per returned policy plus the total count.
group.MapGet("/sites", async (
    HttpContext context,
    [FromServices] ISyncLedgerRepository ledgerRepository,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor,
    CancellationToken cancellationToken,
    [FromQuery(Name = "enabled_only")] bool enabledOnly = false) =>
{
    // Federation switched off: hand back the shared "disabled" result.
    if (!optionsMonitor.CurrentValue.Federation.Enabled)
    {
        return ConcelierProblemResultFactory.FederationDisabled(context);
    }

    var policies = await ledgerRepository.GetAllPoliciesAsync(enabledOnly, cancellationToken);

    // Project each stored policy onto the snake_case wire shape.
    var siteSummaries =
        from policy in policies
        select new
        {
            site_id = policy.SiteId,
            display_name = policy.DisplayName,
            enabled = policy.Enabled,
            last_sync_at = policy.LastSyncAt,
            last_cursor = policy.LastCursor,
            total_imports = policy.TotalImports,
            allowed_sources = policy.AllowedSources,
            max_bundle_size_bytes = policy.MaxBundleSizeBytes
        };

    return HttpResults.Ok(new
    {
        sites = siteSummaries,
        count = policies.Count
    });
})
    .WithName("ListFederationSites")
    .WithSummary("List all federation sites")
    .Produces<object>(200)
    .ProducesProblem(503);
// GET /api/v1/federation/sites/{siteId} - Get site details.
// Answers 404 when the ledger has no policy for the site; otherwise returns
// the policy together with up to ten history entries from the ledger.
group.MapGet("/sites/{siteId}", async (
    HttpContext context,
    [FromServices] ISyncLedgerRepository ledgerRepository,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor,
    string siteId,
    CancellationToken cancellationToken) =>
{
    // Federation switched off: hand back the shared "disabled" result.
    if (!optionsMonitor.CurrentValue.Federation.Enabled)
    {
        return ConcelierProblemResultFactory.FederationDisabled(context);
    }

    var policy = await ledgerRepository.GetPolicyAsync(siteId, cancellationToken);
    if (policy is null)
    {
        return HttpResults.NotFound(new { error = $"Site '{siteId}' not found" });
    }

    // Collect the sync history entries requested from the ledger.
    const int HistoryLimit = 10;
    List<object> recentHistory = new();
    await foreach (var record in ledgerRepository.GetHistoryAsync(siteId, HistoryLimit, cancellationToken))
    {
        var historyItem = new
        {
            cursor = record.Cursor,
            bundle_hash = record.BundleHash,
            item_count = record.ItemCount,
            exported_at = record.ExportedAt,
            imported_at = record.ImportedAt
        };
        recentHistory.Add(historyItem);
    }

    var payload = new
    {
        site_id = policy.SiteId,
        display_name = policy.DisplayName,
        enabled = policy.Enabled,
        last_sync_at = policy.LastSyncAt,
        last_cursor = policy.LastCursor,
        total_imports = policy.TotalImports,
        allowed_sources = policy.AllowedSources,
        max_bundle_size_bytes = policy.MaxBundleSizeBytes,
        recent_history = recentHistory
    };

    return HttpResults.Ok(payload);
})
    .WithName("GetFederationSite")
    .WithSummary("Get federation site details")
    .Produces<object>(200)
    .ProducesProblem(404)
    .ProducesProblem(503);
// PUT /api/v1/federation/sites/{siteId}/policy - Update site policy.
// Per SPRINT_8200_0014_0003_CONCEL_bundle_import_merge Task 31.
// Creates or updates the policy for siteId. Fields omitted from the request
// body keep the stored value; for a site with no stored policy, Enabled
// defaults to true and the remaining omitted fields stay null.
// Responds 400 when the request carries a negative MaxBundleSizeBytes.
group.MapPut("/sites/{siteId}/policy", async (
    HttpContext context,
    [FromServices] ISyncLedgerRepository ledgerRepository,
    [FromServices] IOptionsMonitor<ConcelierOptions> optionsMonitor,
    string siteId,
    [FromBody] SitePolicyUpdateRequest request,
    CancellationToken cancellationToken) =>
{
    var options = optionsMonitor.CurrentValue;
    if (!options.Federation.Enabled)
    {
        return ConcelierProblemResultFactory.FederationDisabled(context);
    }

    // A negative size limit is meaningless; reject it instead of persisting it.
    if (request.MaxBundleSizeBytes is < 0)
    {
        return HttpResults.BadRequest(new { error = "MaxBundleSizeBytes must not be negative" });
    }

    // NOTE(review): read-then-upsert is not atomic here; confirm whether the
    // repository guards against concurrent updates to the same site.
    var existing = await ledgerRepository.GetPolicyAsync(siteId, cancellationToken);

    var policy = new SitePolicy
    {
        SiteId = siteId,
        DisplayName = request.DisplayName ?? existing?.DisplayName,
        Enabled = request.Enabled ?? existing?.Enabled ?? true,
        AllowedSources = request.AllowedSources ?? existing?.AllowedSources,
        MaxBundleSizeBytes = request.MaxBundleSizeBytes ?? existing?.MaxBundleSizeBytes,
        // Sync bookkeeping is never client-supplied; carry it over unchanged.
        LastSyncAt = existing?.LastSyncAt,
        LastCursor = existing?.LastCursor,
        TotalImports = existing?.TotalImports ?? 0
    };

    await ledgerRepository.UpsertPolicyAsync(policy, cancellationToken);

    return HttpResults.Ok(new
    {
        site_id = policy.SiteId,
        display_name = policy.DisplayName,
        enabled = policy.Enabled,
        allowed_sources = policy.AllowedSources,
        max_bundle_size_bytes = policy.MaxBundleSizeBytes
    });
})
    .WithName("UpdateFederationSitePolicy")
    .WithSummary("Update federation site policy")
    .Produces<object>(200)
    .ProducesProblem(400)
    .ProducesProblem(503);
}
}
/// <summary>
/// Request body for updating site policy.
/// Every property is optional; the policy update endpoint falls back to the
/// stored policy's value for any property left <c>null</c>.
/// </summary>
public sealed record SitePolicyUpdateRequest
{
/// <summary>New display name for the site, or <c>null</c> to keep the stored one.</summary>
public string? DisplayName { get; init; }
/// <summary>
/// Whether the site is enabled. When <c>null</c> the stored value is kept;
/// the update endpoint uses <c>true</c> if no policy is stored yet.
/// </summary>
public bool? Enabled { get; init; }
/// <summary>Replacement list of allowed sources, or <c>null</c> to keep the stored list.</summary>
public List<string>? AllowedSources { get; init; }
/// <summary>New maximum bundle size in bytes, or <c>null</c> to keep the stored limit.</summary>
public long? MaxBundleSizeBytes { get; init; }
}