Add authority bootstrap flows and Concelier ops runbooks
This commit is contained in:
		
							
								
								
									
										382
									
								
								tools/SourceStateSeeder/Program.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										382
									
								
								tools/SourceStateSeeder/Program.cs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,382 @@
 | 
			
		||||
using System.Globalization;
 | 
			
		||||
using System.Security.Cryptography;
 | 
			
		||||
using System.Text.Json;
 | 
			
		||||
using Microsoft.Extensions.Logging;
 | 
			
		||||
using Microsoft.Extensions.Logging.Abstractions;
 | 
			
		||||
using MongoDB.Bson;
 | 
			
		||||
using MongoDB.Driver;
 | 
			
		||||
using StellaOps.Feedser.Source.Common;
 | 
			
		||||
using StellaOps.Feedser.Source.Common.Fetch;
 | 
			
		||||
using StellaOps.Feedser.Storage.Mongo;
 | 
			
		||||
using StellaOps.Feedser.Storage.Mongo.Documents;
 | 
			
		||||
 | 
			
		||||
namespace SourceStateSeeder;
 | 
			
		||||
 | 
			
		||||
internal static class Program
 | 
			
		||||
{
 | 
			
		||||
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
 | 
			
		||||
    {
 | 
			
		||||
        PropertyNameCaseInsensitive = true,
 | 
			
		||||
        ReadCommentHandling = JsonCommentHandling.Skip,
 | 
			
		||||
        AllowTrailingCommas = true,
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    public static async Task<int> Main(string[] args)
 | 
			
		||||
    {
 | 
			
		||||
        try
 | 
			
		||||
        {
 | 
			
		||||
            var options = SeedOptions.Parse(args);
 | 
			
		||||
            if (options is null)
 | 
			
		||||
            {
 | 
			
		||||
                SeedOptions.PrintUsage();
 | 
			
		||||
                return 1;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            var seed = await LoadSpecificationAsync(options.InputPath).ConfigureAwait(false);
 | 
			
		||||
            var sourceName = seed.Source ?? options.SourceName;
 | 
			
		||||
            if (string.IsNullOrWhiteSpace(sourceName))
 | 
			
		||||
            {
 | 
			
		||||
                Console.Error.WriteLine("Source name must be supplied via --source or the seed file.");
 | 
			
		||||
                return 1;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            var client = new MongoClient(options.ConnectionString);
 | 
			
		||||
            var database = client.GetDatabase(options.DatabaseName);
 | 
			
		||||
 | 
			
		||||
            var loggerFactory = NullLoggerFactory.Instance;
 | 
			
		||||
            var documentStore = new DocumentStore(database, loggerFactory.CreateLogger<DocumentStore>());
 | 
			
		||||
            var rawStorage = new RawDocumentStorage(database);
 | 
			
		||||
            var stateRepository = new MongoSourceStateRepository(database, loggerFactory.CreateLogger<MongoSourceStateRepository>());
 | 
			
		||||
 | 
			
		||||
            var pendingDocumentIds = new List<Guid>();
 | 
			
		||||
            var pendingMappingIds = new List<Guid>();
 | 
			
		||||
            var knownAdvisories = new List<string>();
 | 
			
		||||
 | 
			
		||||
            var now = DateTimeOffset.UtcNow;
 | 
			
		||||
            var baseDirectory = Path.GetDirectoryName(Path.GetFullPath(options.InputPath)) ?? Directory.GetCurrentDirectory();
 | 
			
		||||
 | 
			
		||||
            foreach (var document in seed.Documents)
 | 
			
		||||
            {
 | 
			
		||||
                var (record, addedToPendingDocs, addedToPendingMaps, known) = await UpsertDocumentAsync(
 | 
			
		||||
                    documentStore,
 | 
			
		||||
                    rawStorage,
 | 
			
		||||
                    sourceName,
 | 
			
		||||
                    baseDirectory,
 | 
			
		||||
                    now,
 | 
			
		||||
                    document,
 | 
			
		||||
                    cancellationToken: default).ConfigureAwait(false);
 | 
			
		||||
 | 
			
		||||
                if (addedToPendingDocs)
 | 
			
		||||
                {
 | 
			
		||||
                    pendingDocumentIds.Add(record.Id);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                if (addedToPendingMaps)
 | 
			
		||||
                {
 | 
			
		||||
                    pendingMappingIds.Add(record.Id);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                if (known is not null)
 | 
			
		||||
                {
 | 
			
		||||
                    knownAdvisories.AddRange(known);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            await UpdateCursorAsync(
 | 
			
		||||
                stateRepository,
 | 
			
		||||
                sourceName,
 | 
			
		||||
                seed.Cursor,
 | 
			
		||||
                pendingDocumentIds,
 | 
			
		||||
                pendingMappingIds,
 | 
			
		||||
                knownAdvisories,
 | 
			
		||||
                now).ConfigureAwait(false);
 | 
			
		||||
 | 
			
		||||
            Console.WriteLine($"Seeded {pendingDocumentIds.Count + pendingMappingIds.Count} documents for {sourceName}.");
 | 
			
		||||
            return 0;
 | 
			
		||||
        }
 | 
			
		||||
        catch (Exception ex)
 | 
			
		||||
        {
 | 
			
		||||
            Console.Error.WriteLine($"Error: {ex.Message}");
 | 
			
		||||
            return 1;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static async Task<StateSeed> LoadSpecificationAsync(string inputPath)
 | 
			
		||||
    {
 | 
			
		||||
        await using var stream = File.OpenRead(inputPath);
 | 
			
		||||
        var seed = await JsonSerializer.DeserializeAsync<StateSeed>(stream, JsonOptions).ConfigureAwait(false)
 | 
			
		||||
                   ?? throw new InvalidOperationException("Input file deserialized to null.");
 | 
			
		||||
        return seed;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static async Task<(DocumentRecord Record, bool PendingDoc, bool PendingMap, IReadOnlyCollection<string>? Known)> UpsertDocumentAsync(
 | 
			
		||||
        DocumentStore documentStore,
 | 
			
		||||
        RawDocumentStorage rawStorage,
 | 
			
		||||
        string sourceName,
 | 
			
		||||
        string baseDirectory,
 | 
			
		||||
        DateTimeOffset fetchedAt,
 | 
			
		||||
        DocumentSeed seed,
 | 
			
		||||
        CancellationToken cancellationToken)
 | 
			
		||||
    {
 | 
			
		||||
        if (string.IsNullOrWhiteSpace(seed.Uri))
 | 
			
		||||
        {
 | 
			
		||||
            throw new InvalidOperationException("Seed entry missing 'uri'.");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (string.IsNullOrWhiteSpace(seed.ContentFile))
 | 
			
		||||
        {
 | 
			
		||||
            throw new InvalidOperationException($"Seed entry for '{seed.Uri}' missing 'contentFile'.");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        var contentPath = Path.IsPathRooted(seed.ContentFile)
 | 
			
		||||
            ? seed.ContentFile
 | 
			
		||||
            : Path.GetFullPath(Path.Combine(baseDirectory, seed.ContentFile));
 | 
			
		||||
 | 
			
		||||
        if (!File.Exists(contentPath))
 | 
			
		||||
        {
 | 
			
		||||
            throw new FileNotFoundException($"Content file not found for '{seed.Uri}'.", contentPath);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        var contentBytes = await File.ReadAllBytesAsync(contentPath, cancellationToken).ConfigureAwait(false);
 | 
			
		||||
        var sha256 = Convert.ToHexString(SHA256.HashData(contentBytes)).ToLowerInvariant();
 | 
			
		||||
        var gridId = await rawStorage.UploadAsync(
 | 
			
		||||
            sourceName,
 | 
			
		||||
            seed.Uri,
 | 
			
		||||
            contentBytes,
 | 
			
		||||
            seed.ContentType,
 | 
			
		||||
            seed.ExpiresAt,
 | 
			
		||||
            cancellationToken).ConfigureAwait(false);
 | 
			
		||||
 | 
			
		||||
        var metadata = seed.Metadata is null
 | 
			
		||||
            ? new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
 | 
			
		||||
            : new Dictionary<string, string>(seed.Metadata, StringComparer.OrdinalIgnoreCase);
 | 
			
		||||
 | 
			
		||||
        var headers = seed.Headers is null
 | 
			
		||||
            ? new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
 | 
			
		||||
            : new Dictionary<string, string>(seed.Headers, StringComparer.OrdinalIgnoreCase);
 | 
			
		||||
 | 
			
		||||
        if (!headers.ContainsKey("content-type") && !string.IsNullOrWhiteSpace(seed.ContentType))
 | 
			
		||||
        {
 | 
			
		||||
            headers["content-type"] = seed.ContentType!;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        var lastModified = seed.LastModified is null
 | 
			
		||||
            ? (DateTimeOffset?)null
 | 
			
		||||
            : DateTimeOffset.Parse(seed.LastModified, CultureInfo.InvariantCulture, DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
 | 
			
		||||
 | 
			
		||||
        var record = new DocumentRecord(
 | 
			
		||||
            Guid.NewGuid(),
 | 
			
		||||
            sourceName,
 | 
			
		||||
            seed.Uri,
 | 
			
		||||
            fetchedAt,
 | 
			
		||||
            sha256,
 | 
			
		||||
            string.IsNullOrWhiteSpace(seed.Status) ? DocumentStatuses.PendingParse : seed.Status,
 | 
			
		||||
            seed.ContentType,
 | 
			
		||||
            headers,
 | 
			
		||||
            metadata,
 | 
			
		||||
            seed.Etag,
 | 
			
		||||
            lastModified,
 | 
			
		||||
            gridId,
 | 
			
		||||
            seed.ExpiresAt);
 | 
			
		||||
 | 
			
		||||
        var upserted = await documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
 | 
			
		||||
 | 
			
		||||
        return (upserted, seed.AddToPendingDocuments, seed.AddToPendingMappings, seed.KnownIdentifiers);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static async Task UpdateCursorAsync(
 | 
			
		||||
        ISourceStateRepository repository,
 | 
			
		||||
        string sourceName,
 | 
			
		||||
        CursorSeed? cursorSeed,
 | 
			
		||||
        IReadOnlyCollection<Guid> pendingDocuments,
 | 
			
		||||
        IReadOnlyCollection<Guid> pendingMappings,
 | 
			
		||||
        IReadOnlyCollection<string> knownAdvisories,
 | 
			
		||||
        DateTimeOffset completedAt)
 | 
			
		||||
    {
 | 
			
		||||
        var state = await repository.TryGetAsync(sourceName, CancellationToken.None).ConfigureAwait(false);
 | 
			
		||||
        var cursor = state?.Cursor ?? new BsonDocument();
 | 
			
		||||
 | 
			
		||||
        MergeGuidArray(cursor, "pendingDocuments", pendingDocuments);
 | 
			
		||||
        MergeGuidArray(cursor, "pendingMappings", pendingMappings);
 | 
			
		||||
 | 
			
		||||
        if (knownAdvisories.Count > 0)
 | 
			
		||||
        {
 | 
			
		||||
            MergeStringArray(cursor, "knownAdvisories", knownAdvisories);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (cursorSeed is not null)
 | 
			
		||||
        {
 | 
			
		||||
            if (cursorSeed.LastModifiedCursor.HasValue)
 | 
			
		||||
            {
 | 
			
		||||
                cursor["lastModifiedCursor"] = cursorSeed.LastModifiedCursor.Value.UtcDateTime;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (cursorSeed.LastFetchAt.HasValue)
 | 
			
		||||
            {
 | 
			
		||||
                cursor["lastFetchAt"] = cursorSeed.LastFetchAt.Value.UtcDateTime;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if (cursorSeed.Additional is not null)
 | 
			
		||||
            {
 | 
			
		||||
                foreach (var kvp in cursorSeed.Additional)
 | 
			
		||||
                {
 | 
			
		||||
                    cursor[kvp.Key] = kvp.Value;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        cursor["lastSeededAt"] = completedAt.UtcDateTime;
 | 
			
		||||
 | 
			
		||||
        await repository.UpdateCursorAsync(sourceName, cursor, completedAt, CancellationToken.None).ConfigureAwait(false);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static void MergeGuidArray(BsonDocument cursor, string field, IReadOnlyCollection<Guid> values)
 | 
			
		||||
    {
 | 
			
		||||
        if (values.Count == 0)
 | 
			
		||||
        {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        var existing = cursor.TryGetValue(field, out var value) && value is BsonArray array
 | 
			
		||||
            ? array.Select(v => Guid.TryParse(v?.AsString, out var parsed) ? parsed : Guid.Empty)
 | 
			
		||||
                .Where(g => g != Guid.Empty)
 | 
			
		||||
                .ToHashSet()
 | 
			
		||||
            : new HashSet<Guid>();
 | 
			
		||||
 | 
			
		||||
        foreach (var guid in values)
 | 
			
		||||
        {
 | 
			
		||||
            existing.Add(guid);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        cursor[field] = new BsonArray(existing.Select(g => g.ToString()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static void MergeStringArray(BsonDocument cursor, string field, IReadOnlyCollection<string> values)
 | 
			
		||||
    {
 | 
			
		||||
        if (values.Count == 0)
 | 
			
		||||
        {
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        var existing = cursor.TryGetValue(field, out var value) && value is BsonArray array
 | 
			
		||||
            ? array.Select(v => v?.AsString ?? string.Empty)
 | 
			
		||||
                .Where(s => !string.IsNullOrWhiteSpace(s))
 | 
			
		||||
                .ToHashSet(StringComparer.OrdinalIgnoreCase)
 | 
			
		||||
            : new HashSet<string>(StringComparer.OrdinalIgnoreCase);
 | 
			
		||||
 | 
			
		||||
        foreach (var entry in values)
 | 
			
		||||
        {
 | 
			
		||||
            if (!string.IsNullOrWhiteSpace(entry))
 | 
			
		||||
            {
 | 
			
		||||
                existing.Add(entry.Trim());
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        cursor[field] = new BsonArray(existing.OrderBy(s => s, StringComparer.OrdinalIgnoreCase));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
internal sealed record SeedOptions
 | 
			
		||||
{
 | 
			
		||||
    public required string ConnectionString { get; init; }
 | 
			
		||||
    public required string DatabaseName { get; init; }
 | 
			
		||||
    public required string InputPath { get; init; }
 | 
			
		||||
    public string? SourceName { get; init; }
 | 
			
		||||
 | 
			
		||||
    public static SeedOptions? Parse(string[] args)
 | 
			
		||||
    {
 | 
			
		||||
        string? connectionString = null;
 | 
			
		||||
        string? database = null;
 | 
			
		||||
        string? input = null;
 | 
			
		||||
        string? source = null;
 | 
			
		||||
 | 
			
		||||
        for (var i = 0; i < args.Length; i++)
 | 
			
		||||
        {
 | 
			
		||||
            var arg = args[i];
 | 
			
		||||
            switch (arg)
 | 
			
		||||
            {
 | 
			
		||||
                case "--connection-string":
 | 
			
		||||
                case "-c":
 | 
			
		||||
                    connectionString = TakeValue(args, ref i, arg);
 | 
			
		||||
                    break;
 | 
			
		||||
                case "--database":
 | 
			
		||||
                case "-d":
 | 
			
		||||
                    database = TakeValue(args, ref i, arg);
 | 
			
		||||
                    break;
 | 
			
		||||
                case "--input":
 | 
			
		||||
                case "-i":
 | 
			
		||||
                    input = TakeValue(args, ref i, arg);
 | 
			
		||||
                    break;
 | 
			
		||||
                case "--source":
 | 
			
		||||
                case "-s":
 | 
			
		||||
                    source = TakeValue(args, ref i, arg);
 | 
			
		||||
                    break;
 | 
			
		||||
                case "--help":
 | 
			
		||||
                case "-h":
 | 
			
		||||
                    return null;
 | 
			
		||||
                default:
 | 
			
		||||
                    Console.Error.WriteLine($"Unrecognized argument '{arg}'.");
 | 
			
		||||
                    return null;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (string.IsNullOrWhiteSpace(connectionString) || string.IsNullOrWhiteSpace(database) || string.IsNullOrWhiteSpace(input))
 | 
			
		||||
        {
 | 
			
		||||
            return null;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return new SeedOptions
 | 
			
		||||
        {
 | 
			
		||||
            ConnectionString = connectionString,
 | 
			
		||||
            DatabaseName = database,
 | 
			
		||||
            InputPath = input,
 | 
			
		||||
            SourceName = source,
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static void PrintUsage()
 | 
			
		||||
    {
 | 
			
		||||
        Console.WriteLine("Usage: dotnet run --project tools/SourceStateSeeder -- --connection-string <connection> --database <name> --input <seed.json> [--source <source>]");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private static string TakeValue(string[] args, ref int index, string arg)
 | 
			
		||||
    {
 | 
			
		||||
        if (index + 1 >= args.Length)
 | 
			
		||||
        {
 | 
			
		||||
            throw new ArgumentException($"Missing value for {arg}.");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        index++;
 | 
			
		||||
        return args[index];
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
internal sealed record StateSeed
 | 
			
		||||
{
 | 
			
		||||
    public string? Source { get; init; }
 | 
			
		||||
    public List<DocumentSeed> Documents { get; init; } = new();
 | 
			
		||||
    public CursorSeed? Cursor { get; init; }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
internal sealed record DocumentSeed
 | 
			
		||||
{
 | 
			
		||||
    public string Uri { get; init; } = string.Empty;
 | 
			
		||||
    public string ContentFile { get; init; } = string.Empty;
 | 
			
		||||
    public string? ContentType { get; init; }
 | 
			
		||||
    public Dictionary<string, string>? Metadata { get; init; }
 | 
			
		||||
    public Dictionary<string, string>? Headers { get; init; }
 | 
			
		||||
    public string Status { get; init; } = DocumentStatuses.PendingParse;
 | 
			
		||||
    public bool AddToPendingDocuments { get; init; } = true;
 | 
			
		||||
    public bool AddToPendingMappings { get; init; }
 | 
			
		||||
    public string? LastModified { get; init; }
 | 
			
		||||
    public string? Etag { get; init; }
 | 
			
		||||
    public DateTimeOffset? ExpiresAt { get; init; }
 | 
			
		||||
    public IReadOnlyCollection<string>? KnownIdentifiers { get; init; }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
internal sealed record CursorSeed
 | 
			
		||||
{
 | 
			
		||||
    public DateTimeOffset? LastModifiedCursor { get; init; }
 | 
			
		||||
    public DateTimeOffset? LastFetchAt { get; init; }
 | 
			
		||||
    public Dictionary<string, string>? Additional { get; init; }
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user