using System.Globalization;
using System.Security.Cryptography;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MongoDB.Bson;
using MongoDB.Driver;
using StellaOps.Feedser.Source.Common;
using StellaOps.Feedser.Source.Common.Fetch;
using StellaOps.Feedser.Storage.Mongo;
using StellaOps.Feedser.Storage.Mongo.Documents;

namespace SourceStateSeeder;

/// <summary>
/// Command-line entry point: loads a JSON seed specification, uploads each referenced content
/// file, upserts one document record per seed entry and merges the outcome into the stored
/// cursor of the target source.
/// </summary>
internal static class Program
{
    // Lenient reader settings so hand-edited seed files may contain comments and trailing commas.
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
    {
        PropertyNameCaseInsensitive = true,
        ReadCommentHandling = JsonCommentHandling.Skip,
        AllowTrailingCommas = true,
    };

    /// <summary>
    /// Parses the arguments, seeds every document listed in the input file and updates the cursor.
    /// </summary>
    /// <param name="args">Raw command-line arguments (see <see cref="SeedOptions.Parse"/>).</param>
    /// <returns>
    /// <c>0</c> on success; <c>1</c> when usage is printed, the source name is missing, or any
    /// exception is raised while seeding.
    /// </returns>
    public static async Task<int> Main(string[] args)
    {
        try
        {
            var options = SeedOptions.Parse(args);
            if (options is null)
            {
                SeedOptions.PrintUsage();
                return 1;
            }

            var seed = await LoadSpecificationAsync(options.InputPath).ConfigureAwait(false);

            // The seed file wins over --source; a blank value in the file falls back to the CLI.
            var sourceName = string.IsNullOrWhiteSpace(seed.Source) ? options.SourceName : seed.Source;
            if (string.IsNullOrWhiteSpace(sourceName))
            {
                Console.Error.WriteLine("Source name must be supplied via --source or the seed file.");
                return 1;
            }

            var client = new MongoClient(options.ConnectionString);
            var database = client.GetDatabase(options.DatabaseName);
            var loggerFactory = NullLoggerFactory.Instance;

            var documentStore = new DocumentStore(database, loggerFactory.CreateLogger<DocumentStore>());
            var rawStorage = new RawDocumentStorage(database);
            var stateRepository = new MongoSourceStateRepository(
                database,
                loggerFactory.CreateLogger<MongoSourceStateRepository>());

            var pendingDocumentIds = new List<Guid>();
            var pendingMappingIds = new List<Guid>();
            var knownAdvisories = new List<string>();
            var now = DateTimeOffset.UtcNow;

            // Relative content file paths are resolved against the directory of the seed file.
            var baseDirectory = Path.GetDirectoryName(Path.GetFullPath(options.InputPath))
                ?? Directory.GetCurrentDirectory();

            // An explicit "documents": null in the file overrides the property initializer.
            var documents = seed.Documents ?? new List<DocumentSeed>();

            foreach (var document in documents)
            {
                var (record, addedToPendingDocs, addedToPendingMaps, known) = await UpsertDocumentAsync(
                    documentStore,
                    rawStorage,
                    sourceName,
                    baseDirectory,
                    now,
                    document,
                    cancellationToken: default).ConfigureAwait(false);

                if (addedToPendingDocs)
                {
                    pendingDocumentIds.Add(record.Id);
                }

                if (addedToPendingMaps)
                {
                    pendingMappingIds.Add(record.Id);
                }

                if (known is not null)
                {
                    knownAdvisories.AddRange(known);
                }
            }

            await UpdateCursorAsync(
                stateRepository,
                sourceName,
                seed.Cursor,
                pendingDocumentIds,
                pendingMappingIds,
                knownAdvisories,
                now).ConfigureAwait(false);

            // Report the number of seed entries processed. Summing the two pending lists would
            // count a document twice when it is in both and not at all when it is in neither.
            Console.WriteLine($"Seeded {documents.Count} documents for {sourceName}.");
            return 0;
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"Error: {ex.Message}");
            return 1;
        }
    }

    /// <summary>
    /// Reads and deserializes the seed specification from <paramref name="inputPath"/>.
    /// </summary>
    /// <param name="inputPath">Path of the JSON seed file.</param>
    /// <returns>The deserialized specification.</returns>
    /// <exception cref="InvalidOperationException">The file content deserialized to <c>null</c>.</exception>
    private static async Task<StateSeed> LoadSpecificationAsync(string inputPath)
    {
        await using var stream = File.OpenRead(inputPath);
        var seed = await JsonSerializer.DeserializeAsync<StateSeed>(stream, JsonOptions).ConfigureAwait(false)
            ?? throw new InvalidOperationException("Input file deserialized to null.");
        return seed;
    }

    /// <summary>
    /// Uploads the content file of one seed entry and upserts the matching document record.
    /// </summary>
    /// <param name="documentStore">Store that receives the document record.</param>
    /// <param name="rawStorage">Storage that receives the raw content bytes.</param>
    /// <param name="sourceName">Name of the source the document belongs to.</param>
    /// <param name="baseDirectory">Directory used to resolve a relative <c>contentFile</c>.</param>
    /// <param name="fetchedAt">Timestamp recorded as the fetch time of the document.</param>
    /// <param name="seed">Seed entry describing the document.</param>
    /// <param name="cancellationToken">Token forwarded to file, upload and upsert calls.</param>
    /// <returns>
    /// The record returned by the store, the entry's two pending flags, and its known identifiers
    /// (possibly <c>null</c>).
    /// </returns>
    /// <exception cref="InvalidOperationException">The entry has no <c>uri</c> or no <c>contentFile</c>.</exception>
    /// <exception cref="FileNotFoundException">The resolved content file does not exist.</exception>
    private static async Task<(DocumentRecord Record, bool PendingDoc, bool PendingMap, IReadOnlyCollection<string>? Known)> UpsertDocumentAsync(
        DocumentStore documentStore,
        RawDocumentStorage rawStorage,
        string sourceName,
        string baseDirectory,
        DateTimeOffset fetchedAt,
        DocumentSeed seed,
        CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(seed.Uri))
        {
            throw new InvalidOperationException("Seed entry missing 'uri'.");
        }

        if (string.IsNullOrWhiteSpace(seed.ContentFile))
        {
            throw new InvalidOperationException($"Seed entry for '{seed.Uri}' missing 'contentFile'.");
        }

        var contentPath = Path.IsPathRooted(seed.ContentFile)
            ? seed.ContentFile
            : Path.GetFullPath(Path.Combine(baseDirectory, seed.ContentFile));

        if (!File.Exists(contentPath))
        {
            throw new FileNotFoundException($"Content file not found for '{seed.Uri}'.", contentPath);
        }

        var contentBytes = await File.ReadAllBytesAsync(contentPath, cancellationToken).ConfigureAwait(false);

        // Lower-case hex SHA-256 digest of the content bytes.
        var sha256 = Convert.ToHexString(SHA256.HashData(contentBytes)).ToLowerInvariant();

        var gridId = await rawStorage.UploadAsync(
            sourceName,
            seed.Uri,
            contentBytes,
            seed.ContentType,
            seed.ExpiresAt,
            cancellationToken).ConfigureAwait(false);

        // Copy into case-insensitive dictionaries so key lookups ignore casing.
        var metadata = seed.Metadata is null
            ? new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
            : new Dictionary<string, string>(seed.Metadata, StringComparer.OrdinalIgnoreCase);

        var headers = seed.Headers is null
            ? new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
            : new Dictionary<string, string>(seed.Headers, StringComparer.OrdinalIgnoreCase);

        // Mirror the declared content type into the headers unless the seed already supplies one.
        if (!headers.ContainsKey("content-type") && !string.IsNullOrWhiteSpace(seed.ContentType))
        {
            headers["content-type"] = seed.ContentType!;
        }

        var lastModified = seed.LastModified is null
            ? (DateTimeOffset?)null
            : DateTimeOffset.Parse(
                seed.LastModified,
                CultureInfo.InvariantCulture,
                DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);

        var record = new DocumentRecord(
            Guid.NewGuid(),
            sourceName,
            seed.Uri,
            fetchedAt,
            sha256,
            string.IsNullOrWhiteSpace(seed.Status) ? DocumentStatuses.PendingParse : seed.Status,
            seed.ContentType,
            headers,
            metadata,
            seed.Etag,
            lastModified,
            gridId,
            seed.ExpiresAt);

        // Callers use the id of the returned record, not of the freshly built one.
        var upserted = await documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);

        return (upserted, seed.AddToPendingDocuments, seed.AddToPendingMappings, seed.KnownIdentifiers);
    }

    /// <summary>
    /// Merges the seeded ids, known advisories and optional cursor overrides into the stored
    /// cursor of <paramref name="sourceName"/> and writes it back.
    /// </summary>
    /// <param name="repository">Repository used to read the current state and persist the cursor.</param>
    /// <param name="sourceName">Name of the source whose cursor is updated.</param>
    /// <param name="cursorSeed">Optional explicit cursor values from the seed file.</param>
    /// <param name="pendingDocuments">Ids merged into the <c>pendingDocuments</c> array.</param>
    /// <param name="pendingMappings">Ids merged into the <c>pendingMappings</c> array.</param>
    /// <param name="knownAdvisories">Identifiers merged into the <c>knownAdvisories</c> array.</param>
    /// <param name="completedAt">Timestamp stored as <c>lastSeededAt</c> and passed to the repository.</param>
    private static async Task UpdateCursorAsync(
        ISourceStateRepository repository,
        string sourceName,
        CursorSeed? cursorSeed,
        IReadOnlyCollection<Guid> pendingDocuments,
        IReadOnlyCollection<Guid> pendingMappings,
        IReadOnlyCollection<string> knownAdvisories,
        DateTimeOffset completedAt)
    {
        var state = await repository.TryGetAsync(sourceName, CancellationToken.None).ConfigureAwait(false);
        var cursor = state?.Cursor ?? new BsonDocument();

        MergeGuidArray(cursor, "pendingDocuments", pendingDocuments);
        MergeGuidArray(cursor, "pendingMappings", pendingMappings);

        if (knownAdvisories.Count > 0)
        {
            MergeStringArray(cursor, "knownAdvisories", knownAdvisories);
        }

        if (cursorSeed is not null)
        {
            if (cursorSeed.LastModifiedCursor.HasValue)
            {
                cursor["lastModifiedCursor"] = cursorSeed.LastModifiedCursor.Value.UtcDateTime;
            }

            if (cursorSeed.LastFetchAt.HasValue)
            {
                cursor["lastFetchAt"] = cursorSeed.LastFetchAt.Value.UtcDateTime;
            }

            if (cursorSeed.Additional is not null)
            {
                // Additional entries are applied last and therefore overwrite same-named fields.
                foreach (var kvp in cursorSeed.Additional)
                {
                    cursor[kvp.Key] = kvp.Value;
                }
            }
        }

        cursor["lastSeededAt"] = completedAt.UtcDateTime;

        await repository.UpdateCursorAsync(sourceName, cursor, completedAt, CancellationToken.None).ConfigureAwait(false);
    }

    /// <summary>
    /// Unions <paramref name="values"/> with the GUID strings already stored under
    /// <paramref name="field"/> and writes the result back as an array of strings.
    /// Does nothing when <paramref name="values"/> is empty.
    /// </summary>
    /// <param name="cursor">Cursor document that is modified in place.</param>
    /// <param name="field">Name of the array field.</param>
    /// <param name="values">Ids to add.</param>
    private static void MergeGuidArray(BsonDocument cursor, string field, IReadOnlyCollection<Guid> values)
    {
        if (values.Count == 0)
        {
            return;
        }

        var merged = new HashSet<Guid>();

        if (cursor.TryGetValue(field, out var current) && current is BsonArray stored)
        {
            foreach (var item in stored)
            {
                // A BSON null is a non-null BsonValue, so "?." alone would not stop AsString
                // from throwing; skip it explicitly. Other non-string types still fail fast.
                if (item is null || item.IsBsonNull)
                {
                    continue;
                }

                // Unparsable and empty GUID entries are dropped.
                if (Guid.TryParse(item.AsString, out var parsed) && parsed != Guid.Empty)
                {
                    merged.Add(parsed);
                }
            }
        }

        foreach (var guid in values)
        {
            merged.Add(guid);
        }

        cursor[field] = new BsonArray(merged.Select(g => g.ToString()));
    }

    /// <summary>
    /// Unions <paramref name="values"/> (trimmed, blanks ignored) with the strings already stored
    /// under <paramref name="field"/>, compares case-insensitively, and writes the result back
    /// sorted. Does nothing when <paramref name="values"/> is empty.
    /// </summary>
    /// <param name="cursor">Cursor document that is modified in place.</param>
    /// <param name="field">Name of the array field.</param>
    /// <param name="values">Strings to add.</param>
    private static void MergeStringArray(BsonDocument cursor, string field, IReadOnlyCollection<string> values)
    {
        if (values.Count == 0)
        {
            return;
        }

        var merged = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

        if (cursor.TryGetValue(field, out var current) && current is BsonArray stored)
        {
            foreach (var item in stored)
            {
                // Skip BSON nulls instead of letting AsString throw; blank strings are dropped too.
                if (item is null || item.IsBsonNull)
                {
                    continue;
                }

                var text = item.AsString;
                if (!string.IsNullOrWhiteSpace(text))
                {
                    merged.Add(text);
                }
            }
        }

        foreach (var entry in values)
        {
            if (!string.IsNullOrWhiteSpace(entry))
            {
                merged.Add(entry.Trim());
            }
        }

        cursor[field] = new BsonArray(merged.OrderBy(s => s, StringComparer.OrdinalIgnoreCase));
    }
}

/// <summary>
/// Parsed command-line options of the seeder.
/// </summary>
internal sealed record SeedOptions
{
    /// <summary>Connection string passed to the Mongo client.</summary>
    public required string ConnectionString { get; init; }

    /// <summary>Name of the database to seed.</summary>
    public required string DatabaseName { get; init; }

    /// <summary>Path of the JSON seed file.</summary>
    public required string InputPath { get; init; }

    /// <summary>Optional source name, used when the seed file does not provide one.</summary>
    public string? SourceName { get; init; }

    /// <summary>
    /// Parses the command line.
    /// </summary>
    /// <param name="args">Raw command-line arguments.</param>
    /// <returns>
    /// The parsed options, or <c>null</c> when help was requested, an argument was not recognized,
    /// or one of the connection string, database or input values is missing.
    /// </returns>
    /// <exception cref="ArgumentException">An option was given without its value.</exception>
    public static SeedOptions? Parse(string[] args)
    {
        string? connectionString = null;
        string? database = null;
        string? input = null;
        string? source = null;

        for (var i = 0; i < args.Length; i++)
        {
            var arg = args[i];
            switch (arg)
            {
                case "--connection-string":
                case "-c":
                    connectionString = TakeValue(args, ref i, arg);
                    break;

                case "--database":
                case "-d":
                    database = TakeValue(args, ref i, arg);
                    break;

                case "--input":
                case "-i":
                    input = TakeValue(args, ref i, arg);
                    break;

                case "--source":
                case "-s":
                    source = TakeValue(args, ref i, arg);
                    break;

                case "--help":
                case "-h":
                    return null;

                default:
                    Console.Error.WriteLine($"Unrecognized argument '{arg}'.");
                    return null;
            }
        }

        if (string.IsNullOrWhiteSpace(connectionString)
            || string.IsNullOrWhiteSpace(database)
            || string.IsNullOrWhiteSpace(input))
        {
            return null;
        }

        return new SeedOptions
        {
            ConnectionString = connectionString,
            DatabaseName = database,
            InputPath = input,
            SourceName = source,
        };
    }

    /// <summary>
    /// Writes the command-line usage summary to standard output.
    /// </summary>
    public static void PrintUsage()
    {
        Console.WriteLine("Usage: dotnet run --project tools/SourceStateSeeder -- --connection-string <connection> --database <name> --input <seed.json> [--source <source>]");
    }

    /// <summary>
    /// Returns the argument following position <paramref name="index"/> and advances the index past it.
    /// </summary>
    /// <exception cref="ArgumentException">No argument follows <paramref name="arg"/>.</exception>
    private static string TakeValue(string[] args, ref int index, string arg)
    {
        if (index + 1 >= args.Length)
        {
            throw new ArgumentException($"Missing value for {arg}.");
        }

        index++;
        return args[index];
    }
}

/// <summary>
/// Root object of the seed file.
/// </summary>
internal sealed record StateSeed
{
    /// <summary>Source name; takes precedence over the <c>--source</c> option when not blank.</summary>
    public string? Source { get; init; }

    /// <summary>Documents to seed.</summary>
    public List<DocumentSeed> Documents { get; init; } = new();

    /// <summary>Optional explicit cursor values.</summary>
    public CursorSeed? Cursor { get; init; }
}

/// <summary>
/// One document entry of the seed file.
/// </summary>
internal sealed record DocumentSeed
{
    /// <summary>Document URI; required.</summary>
    public string Uri { get; init; } = string.Empty;

    /// <summary>Path of the content file, absolute or relative to the seed file; required.</summary>
    public string ContentFile { get; init; } = string.Empty;

    /// <summary>Content type; also copied into the headers when they carry no <c>content-type</c>.</summary>
    public string? ContentType { get; init; }

    /// <summary>Optional metadata stored with the record.</summary>
    public Dictionary<string, string>? Metadata { get; init; }

    /// <summary>Optional headers stored with the record.</summary>
    public Dictionary<string, string>? Headers { get; init; }

    /// <summary>Document status; a blank value is replaced by <see cref="DocumentStatuses.PendingParse"/>.</summary>
    public string Status { get; init; } = DocumentStatuses.PendingParse;

    /// <summary>Whether the document id is added to the cursor's <c>pendingDocuments</c> array.</summary>
    public bool AddToPendingDocuments { get; init; } = true;

    /// <summary>Whether the document id is added to the cursor's <c>pendingMappings</c> array.</summary>
    public bool AddToPendingMappings { get; init; }

    /// <summary>Last-modified timestamp as text; parsed with the invariant culture and assumed UTC.</summary>
    public string? LastModified { get; init; }

    /// <summary>Optional entity tag stored with the record.</summary>
    public string? Etag { get; init; }

    /// <summary>Optional expiry passed to both the raw upload and the record.</summary>
    public DateTimeOffset? ExpiresAt { get; init; }

    /// <summary>Identifiers merged into the cursor's <c>knownAdvisories</c> array.</summary>
    public IReadOnlyCollection<string>? KnownIdentifiers { get; init; }
}

/// <summary>
/// Explicit cursor values applied on top of the merged cursor.
/// </summary>
internal sealed record CursorSeed
{
    /// <summary>Value written to <c>lastModifiedCursor</c> (as UTC) when present.</summary>
    public DateTimeOffset? LastModifiedCursor { get; init; }

    /// <summary>Value written to <c>lastFetchAt</c> (as UTC) when present.</summary>
    public DateTimeOffset? LastFetchAt { get; init; }

    /// <summary>Extra string fields copied verbatim into the cursor document.</summary>
    public Dictionary<string, string>? Additional { get; init; }
}