383 lines
13 KiB
C#
383 lines
13 KiB
C#
using System.Globalization;
|
|
using System.Security.Cryptography;
|
|
using System.Text.Json;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using MongoDB.Bson;
|
|
using MongoDB.Driver;
|
|
using StellaOps.Feedser.Source.Common;
|
|
using StellaOps.Feedser.Source.Common.Fetch;
|
|
using StellaOps.Feedser.Storage.Mongo;
|
|
using StellaOps.Feedser.Storage.Mongo.Documents;
|
|
|
|
namespace SourceStateSeeder;
|
|
|
|
/// <summary>
/// Command-line entry point that uploads the documents described by a JSON seed file into Feedser's
/// raw/document storage and merges the resulting document ids into the source's cursor state.
/// </summary>
internal static class Program
{
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
    {
        PropertyNameCaseInsensitive = true,
        ReadCommentHandling = JsonCommentHandling.Skip,
        AllowTrailingCommas = true,
    };

    /// <summary>
    /// Parses the command line, seeds every document from the specification file and updates the
    /// source cursor.
    /// </summary>
    /// <param name="args">Command-line arguments (see <see cref="SeedOptions.PrintUsage"/>).</param>
    /// <returns><c>0</c> on success; <c>1</c> for invalid arguments or any failure while seeding.</returns>
    public static async Task<int> Main(string[] args)
    {
        try
        {
            var options = SeedOptions.Parse(args);
            if (options is null)
            {
                SeedOptions.PrintUsage();
                return 1;
            }

            var seed = await LoadSpecificationAsync(options.InputPath).ConfigureAwait(false);

            // A blank "source" in the seed file is treated as absent so --source can still supply it.
            var sourceName = string.IsNullOrWhiteSpace(seed.Source) ? options.SourceName : seed.Source;
            if (string.IsNullOrWhiteSpace(sourceName))
            {
                Console.Error.WriteLine("Source name must be supplied via --source or the seed file.");
                return 1;
            }

            var client = new MongoClient(options.ConnectionString);
            var database = client.GetDatabase(options.DatabaseName);

            var loggerFactory = NullLoggerFactory.Instance;
            var documentStore = new DocumentStore(database, loggerFactory.CreateLogger<DocumentStore>());
            var rawStorage = new RawDocumentStorage(database);
            var stateRepository = new MongoSourceStateRepository(database, loggerFactory.CreateLogger<MongoSourceStateRepository>());

            var pendingDocumentIds = new List<Guid>();
            var pendingMappingIds = new List<Guid>();
            var knownAdvisories = new List<string>();

            var now = DateTimeOffset.UtcNow;
            var baseDirectory = Path.GetDirectoryName(Path.GetFullPath(options.InputPath)) ?? Directory.GetCurrentDirectory();

            // An explicit `"documents": null` in the JSON overrides the property initializer.
            var documents = seed.Documents ?? new List<DocumentSeed>();
            var seededCount = 0;

            foreach (var document in documents)
            {
                var (record, addedToPendingDocs, addedToPendingMaps, known) = await UpsertDocumentAsync(
                    documentStore,
                    rawStorage,
                    sourceName,
                    baseDirectory,
                    now,
                    document,
                    cancellationToken: default).ConfigureAwait(false);

                seededCount++;

                if (addedToPendingDocs)
                {
                    pendingDocumentIds.Add(record.Id);
                }

                if (addedToPendingMaps)
                {
                    pendingMappingIds.Add(record.Id);
                }

                if (known is not null)
                {
                    knownAdvisories.AddRange(known);
                }
            }

            await UpdateCursorAsync(
                stateRepository,
                sourceName,
                seed.Cursor,
                pendingDocumentIds,
                pendingMappingIds,
                knownAdvisories,
                now).ConfigureAwait(false);

            // Report the number of documents actually upserted; summing the pending lists would
            // double-count documents queued for both and miss documents queued for neither.
            Console.WriteLine($"Seeded {seededCount} documents for {sourceName}.");
            return 0;
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine($"Error: {ex.Message}");
            return 1;
        }
    }

    /// <summary>Reads and deserializes the JSON seed specification at <paramref name="inputPath"/>.</summary>
    /// <exception cref="InvalidOperationException">The file contains the JSON literal <c>null</c>.</exception>
    private static async Task<StateSeed> LoadSpecificationAsync(string inputPath)
    {
        await using var stream = File.OpenRead(inputPath);
        var seed = await JsonSerializer.DeserializeAsync<StateSeed>(stream, JsonOptions).ConfigureAwait(false)
            ?? throw new InvalidOperationException("Input file deserialized to null.");
        return seed;
    }

    /// <summary>
    /// Validates one seed entry, uploads its content to raw storage and upserts the matching document record.
    /// </summary>
    /// <param name="baseDirectory">Directory that relative <c>contentFile</c> paths are resolved against.</param>
    /// <param name="fetchedAt">Timestamp recorded as the document's fetch time.</param>
    /// <returns>
    /// The upserted record, whether its id should be queued in <c>pendingDocuments</c> / <c>pendingMappings</c>,
    /// and any known advisory identifiers declared for it.
    /// </returns>
    /// <exception cref="InvalidOperationException">Required fields are missing or <c>lastModified</c> is unparseable.</exception>
    /// <exception cref="FileNotFoundException">The content file does not exist.</exception>
    private static async Task<(DocumentRecord Record, bool PendingDoc, bool PendingMap, IReadOnlyCollection<string>? Known)> UpsertDocumentAsync(
        DocumentStore documentStore,
        RawDocumentStorage rawStorage,
        string sourceName,
        string baseDirectory,
        DateTimeOffset fetchedAt,
        DocumentSeed seed,
        CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(seed.Uri))
        {
            throw new InvalidOperationException("Seed entry missing 'uri'.");
        }

        if (string.IsNullOrWhiteSpace(seed.ContentFile))
        {
            throw new InvalidOperationException($"Seed entry for '{seed.Uri}' missing 'contentFile'.");
        }

        var contentPath = Path.IsPathRooted(seed.ContentFile)
            ? seed.ContentFile
            : Path.GetFullPath(Path.Combine(baseDirectory, seed.ContentFile));

        if (!File.Exists(contentPath))
        {
            // Main only prints ex.Message, so the resolved path must be part of the message to be visible.
            throw new FileNotFoundException($"Content file '{contentPath}' not found for '{seed.Uri}'.", contentPath);
        }

        // Validate/derive every remaining input before uploading, so a bad entry cannot leave an
        // orphaned raw-storage upload behind.
        var lastModified = ParseLastModified(seed);
        var metadata = CopyCaseInsensitive(seed.Metadata);
        var headers = CopyCaseInsensitive(seed.Headers);

        if (!headers.ContainsKey("content-type") && !string.IsNullOrWhiteSpace(seed.ContentType))
        {
            headers["content-type"] = seed.ContentType!;
        }

        var contentBytes = await File.ReadAllBytesAsync(contentPath, cancellationToken).ConfigureAwait(false);
        var sha256 = Convert.ToHexString(SHA256.HashData(contentBytes)).ToLowerInvariant();
        var gridId = await rawStorage.UploadAsync(
            sourceName,
            seed.Uri,
            contentBytes,
            seed.ContentType,
            seed.ExpiresAt,
            cancellationToken).ConfigureAwait(false);

        var record = new DocumentRecord(
            Guid.NewGuid(),
            sourceName,
            seed.Uri,
            fetchedAt,
            sha256,
            string.IsNullOrWhiteSpace(seed.Status) ? DocumentStatuses.PendingParse : seed.Status,
            seed.ContentType,
            headers,
            metadata,
            seed.Etag,
            lastModified,
            gridId,
            seed.ExpiresAt);

        var upserted = await documentStore.UpsertAsync(record, cancellationToken).ConfigureAwait(false);

        return (upserted, seed.AddToPendingDocuments, seed.AddToPendingMappings, seed.KnownIdentifiers);
    }

    /// <summary>
    /// Parses <see cref="DocumentSeed.LastModified"/> with the invariant culture, assuming UTC when no
    /// offset is present. Returns <c>null</c> when the value is null, empty or whitespace.
    /// </summary>
    /// <exception cref="InvalidOperationException">The value is present but not a valid timestamp.</exception>
    private static DateTimeOffset? ParseLastModified(DocumentSeed seed)
    {
        if (string.IsNullOrWhiteSpace(seed.LastModified))
        {
            return null;
        }

        if (DateTimeOffset.TryParse(
                seed.LastModified,
                CultureInfo.InvariantCulture,
                DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
                out var parsed))
        {
            return parsed;
        }

        throw new InvalidOperationException($"Seed entry for '{seed.Uri}' has an invalid 'lastModified' value '{seed.LastModified}'.");
    }

    /// <summary>
    /// Copies <paramref name="source"/> into a new case-insensitive dictionary (empty when null).
    /// Keys that differ only by case are collapsed, with the later entry winning, instead of throwing
    /// a duplicate-key exception as the dictionary copy constructor would.
    /// </summary>
    private static Dictionary<string, string> CopyCaseInsensitive(Dictionary<string, string>? source)
    {
        var copy = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        if (source is null)
        {
            return copy;
        }

        foreach (var (key, value) in source)
        {
            copy[key] = value;
        }

        return copy;
    }

    /// <summary>
    /// Loads the current cursor for <paramref name="sourceName"/> (or starts an empty one), merges the
    /// seeded ids/identifiers, applies optional overrides from <paramref name="cursorSeed"/>, stamps
    /// <c>lastSeededAt</c>, and writes the cursor back.
    /// </summary>
    private static async Task UpdateCursorAsync(
        ISourceStateRepository repository,
        string sourceName,
        CursorSeed? cursorSeed,
        IReadOnlyCollection<Guid> pendingDocuments,
        IReadOnlyCollection<Guid> pendingMappings,
        IReadOnlyCollection<string> knownAdvisories,
        DateTimeOffset completedAt)
    {
        var state = await repository.TryGetAsync(sourceName, CancellationToken.None).ConfigureAwait(false);
        var cursor = state?.Cursor ?? new BsonDocument();

        MergeGuidArray(cursor, "pendingDocuments", pendingDocuments);
        MergeGuidArray(cursor, "pendingMappings", pendingMappings);

        if (knownAdvisories.Count > 0)
        {
            MergeStringArray(cursor, "knownAdvisories", knownAdvisories);
        }

        if (cursorSeed is not null)
        {
            if (cursorSeed.LastModifiedCursor.HasValue)
            {
                cursor["lastModifiedCursor"] = cursorSeed.LastModifiedCursor.Value.UtcDateTime;
            }

            if (cursorSeed.LastFetchAt.HasValue)
            {
                cursor["lastFetchAt"] = cursorSeed.LastFetchAt.Value.UtcDateTime;
            }

            if (cursorSeed.Additional is not null)
            {
                // Applied after the merges above, so an "Additional" key can overwrite them by design.
                foreach (var kvp in cursorSeed.Additional)
                {
                    // A JSON null value would otherwise become a null BsonValue assignment.
                    cursor[kvp.Key] = kvp.Value is null ? BsonNull.Value : new BsonString(kvp.Value);
                }
            }
        }

        cursor["lastSeededAt"] = completedAt.UtcDateTime;

        await repository.UpdateCursorAsync(sourceName, cursor, completedAt, CancellationToken.None).ConfigureAwait(false);
    }

    /// <summary>
    /// Unions <paramref name="values"/> into the string-encoded GUID array stored at <paramref name="field"/>.
    /// Existing order is preserved and new ids are appended. Existing elements that are not parseable
    /// GUID strings (including <c>BsonNull</c> or non-string values, on which <c>AsString</c> would throw)
    /// and <see cref="Guid.Empty"/> are dropped. No-op when <paramref name="values"/> is empty.
    /// </summary>
    private static void MergeGuidArray(BsonDocument cursor, string field, IReadOnlyCollection<Guid> values)
    {
        if (values.Count == 0)
        {
            return;
        }

        var merged = new List<Guid>();
        var seen = new HashSet<Guid>();

        if (cursor.TryGetValue(field, out var value) && value is BsonArray array)
        {
            foreach (var element in array)
            {
                if (element is BsonString text
                    && Guid.TryParse(text.Value, out var parsed)
                    && parsed != Guid.Empty
                    && seen.Add(parsed))
                {
                    merged.Add(parsed);
                }
            }
        }

        foreach (var guid in values)
        {
            if (seen.Add(guid))
            {
                merged.Add(guid);
            }
        }

        cursor[field] = new BsonArray(merged.Select(g => g.ToString()));
    }

    /// <summary>
    /// Unions <paramref name="values"/> into the string array stored at <paramref name="field"/>,
    /// trimming entries, dropping blank and non-string elements, de-duplicating case-insensitively,
    /// and sorting the result (ordinal, case-insensitive). No-op when <paramref name="values"/> is empty.
    /// </summary>
    private static void MergeStringArray(BsonDocument cursor, string field, IReadOnlyCollection<string> values)
    {
        if (values.Count == 0)
        {
            return;
        }

        var merged = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

        if (cursor.TryGetValue(field, out var value) && value is BsonArray array)
        {
            foreach (var element in array)
            {
                // Trim existing entries too, so " X" already stored and "X" being added do not both survive.
                if (element is BsonString text && !string.IsNullOrWhiteSpace(text.Value))
                {
                    merged.Add(text.Value.Trim());
                }
            }
        }

        foreach (var entry in values)
        {
            if (!string.IsNullOrWhiteSpace(entry))
            {
                merged.Add(entry.Trim());
            }
        }

        cursor[field] = new BsonArray(merged.OrderBy(s => s, StringComparer.OrdinalIgnoreCase));
    }
}
|
|
|
|
/// <summary>
/// Options controlling a seeding run, parsed from the process command line.
/// </summary>
internal sealed record SeedOptions
{
    /// <summary>MongoDB connection string (<c>--connection-string</c> / <c>-c</c>).</summary>
    public required string ConnectionString { get; init; }

    /// <summary>Database name (<c>--database</c> / <c>-d</c>).</summary>
    public required string DatabaseName { get; init; }

    /// <summary>Path to the JSON seed file (<c>--input</c> / <c>-i</c>).</summary>
    public required string InputPath { get; init; }

    /// <summary>Optional source name (<c>--source</c> / <c>-s</c>).</summary>
    public string? SourceName { get; init; }

    /// <summary>Parses <paramref name="args"/> into a <see cref="SeedOptions"/> instance.</summary>
    /// <returns>
    /// The parsed options; <c>null</c> when help was requested, an argument was not recognized
    /// (reported on stderr), or a required option was missing or blank.
    /// </returns>
    /// <exception cref="ArgumentException">A flag appeared as the last argument with no value.</exception>
    public static SeedOptions? Parse(string[] args)
    {
        string? connection = null;
        string? databaseName = null;
        string? inputPath = null;
        string? sourceName = null;

        var position = 0;
        while (position < args.Length)
        {
            var flag = args[position];
            switch (flag)
            {
                case "--connection-string" or "-c":
                    connection = TakeValue(args, ref position, flag);
                    break;
                case "--database" or "-d":
                    databaseName = TakeValue(args, ref position, flag);
                    break;
                case "--input" or "-i":
                    inputPath = TakeValue(args, ref position, flag);
                    break;
                case "--source" or "-s":
                    sourceName = TakeValue(args, ref position, flag);
                    break;
                case "--help" or "-h":
                    return null;
                default:
                    Console.Error.WriteLine($"Unrecognized argument '{flag}'.");
                    return null;
            }

            position++;
        }

        if (!string.IsNullOrWhiteSpace(connection)
            && !string.IsNullOrWhiteSpace(databaseName)
            && !string.IsNullOrWhiteSpace(inputPath))
        {
            return new SeedOptions
            {
                ConnectionString = connection,
                DatabaseName = databaseName,
                InputPath = inputPath,
                SourceName = sourceName,
            };
        }

        return null;
    }

    /// <summary>Writes the command-line synopsis to standard output.</summary>
    public static void PrintUsage() =>
        Console.WriteLine("Usage: dotnet run --project tools/SourceStateSeeder -- --connection-string <connection> --database <name> --input <seed.json> [--source <source>]");

    /// <summary>
    /// Consumes the argument following <paramref name="index"/> as the value for flag <paramref name="arg"/>,
    /// advancing <paramref name="index"/> past it.
    /// </summary>
    private static string TakeValue(string[] args, ref int index, string arg)
    {
        var next = index + 1;
        if (next < args.Length)
        {
            index = next;
            return args[next];
        }

        throw new ArgumentException($"Missing value for {arg}.");
    }
}
|
|
|
|
/// <summary>
/// Root object of the JSON seed specification read by the seeder.
/// </summary>
internal sealed record StateSeed
{
    /// <summary>Optional source name; the seeder falls back to <c>--source</c> when it is not provided.</summary>
    public string? Source { get; init; }

    /// <summary>Documents to upload; starts as an empty list when the JSON omits the property.</summary>
    public List<DocumentSeed> Documents { get; init; } = new List<DocumentSeed>();

    /// <summary>Optional cursor overrides applied after the document ids are merged.</summary>
    public CursorSeed? Cursor { get; init; }
}
|
|
|
|
/// <summary>
/// One document entry in the seed specification.
/// </summary>
internal sealed record DocumentSeed
{
    /// <summary>Document URI; the seeder rejects blank values.</summary>
    public string Uri { get; init; } = "";

    /// <summary>File whose bytes become the document content; relative paths resolve against the seed file's directory.</summary>
    public string ContentFile { get; init; } = "";

    /// <summary>Optional MIME type; also used as the <c>content-type</c> header when <see cref="Headers"/> has none.</summary>
    public string? ContentType { get; init; }

    /// <summary>Optional metadata copied (case-insensitively keyed) onto the stored document.</summary>
    public Dictionary<string, string>? Metadata { get; init; }

    /// <summary>Optional headers copied (case-insensitively keyed) onto the stored document.</summary>
    public Dictionary<string, string>? Headers { get; init; }

    /// <summary>Document status; <see cref="DocumentStatuses.PendingParse"/> unless the seed says otherwise.</summary>
    public string Status { get; init; } = DocumentStatuses.PendingParse;

    /// <summary>When true (the default), the document id is queued in the cursor's <c>pendingDocuments</c>.</summary>
    public bool AddToPendingDocuments { get; init; } = true;

    /// <summary>When true, the document id is queued in the cursor's <c>pendingMappings</c>; false by default.</summary>
    public bool AddToPendingMappings { get; init; }

    /// <summary>Optional Last-Modified timestamp text, parsed with the invariant culture and assumed UTC without an offset.</summary>
    public string? LastModified { get; init; }

    /// <summary>Optional ETag stored on the document record.</summary>
    public string? Etag { get; init; }

    /// <summary>Optional expiry passed to raw storage and stored on the document record.</summary>
    public DateTimeOffset? ExpiresAt { get; init; }

    /// <summary>Optional advisory identifiers merged into the cursor's <c>knownAdvisories</c>.</summary>
    public IReadOnlyCollection<string>? KnownIdentifiers { get; init; }
}
|
|
|
|
/// <summary>
/// Optional cursor values from the seed file, written into the source cursor after the
/// pending/known lists have been merged.
/// </summary>
internal sealed record CursorSeed
{
    /// <summary>When set, stored in the cursor as <c>lastModifiedCursor</c> (converted to UTC).</summary>
    public DateTimeOffset? LastModifiedCursor { get; init; }

    /// <summary>When set, stored in the cursor as <c>lastFetchAt</c> (converted to UTC).</summary>
    public DateTimeOffset? LastFetchAt { get; init; }

    /// <summary>Extra string-valued cursor fields, written under their own keys.</summary>
    public Dictionary<string, string>? Additional { get; init; }
}
|