using System.Globalization; using System.Net; using System.Net.Http.Headers; using Amazon; using Amazon.Runtime; using Amazon.S3; using Amazon.S3.Model; var options = MigrationOptions.Parse(args); if (options is null) { MigrationOptions.PrintUsage(); return 1; } Console.WriteLine($"RustFS migrator starting (prefix: '{options.Prefix ?? ""}')"); if (options.DryRun) { Console.WriteLine("Dry-run enabled. No objects will be written to RustFS."); } var s3Config = new AmazonS3Config { ForcePathStyle = true, }; if (!string.IsNullOrWhiteSpace(options.S3ServiceUrl)) { s3Config.ServiceURL = options.S3ServiceUrl; s3Config.UseHttp = options.S3ServiceUrl.StartsWith("http://", StringComparison.OrdinalIgnoreCase); } if (!string.IsNullOrWhiteSpace(options.S3Region)) { s3Config.RegionEndpoint = RegionEndpoint.GetBySystemName(options.S3Region); } using var s3Client = CreateS3Client(options, s3Config); using var httpClient = CreateRustFsClient(options); using var cts = options.TimeoutSeconds > 0 ? new CancellationTokenSource(TimeSpan.FromSeconds(options.TimeoutSeconds)) : null; var cancellationToken = cts?.Token ?? CancellationToken.None; var listRequest = new ListObjectsV2Request { BucketName = options.S3Bucket, Prefix = options.Prefix, MaxKeys = 1000, }; var migrated = 0; var skipped = 0; try { do { var response = await ExecuteWithRetriesAsync( token => s3Client.ListObjectsV2Async(listRequest, token), "ListObjectsV2", options, cancellationToken).ConfigureAwait(false); foreach (var entry in response.S3Objects) { if (entry.Size == 0 && entry.Key.EndsWith("/", StringComparison.Ordinal)) { skipped++; continue; } Console.WriteLine($"Migrating {entry.Key} ({entry.Size} bytes)..."); if (options.DryRun) { migrated++; continue; } try { await UploadObjectAsync(s3Client, httpClient, options, entry, cancellationToken).ConfigureAwait(false); migrated++; } catch (Exception ex) when (ex is not OperationCanceledException) { Console.Error.WriteLine($"Failed to upload {entry.Key}: {ex.Message}"); return 2; } } listRequest.ContinuationToken = response.NextContinuationToken; } while (!string.IsNullOrEmpty(listRequest.ContinuationToken)); } catch (OperationCanceledException) { Console.Error.WriteLine("Migration canceled."); return 3; } Console.WriteLine($"Migration complete. Migrated {migrated} objects. Skipped {skipped} directory markers."); return 0; static AmazonS3Client CreateS3Client(MigrationOptions options, AmazonS3Config config) { if (!string.IsNullOrWhiteSpace(options.S3AccessKey) && !string.IsNullOrWhiteSpace(options.S3SecretKey)) { var credentials = new BasicAWSCredentials(options.S3AccessKey, options.S3SecretKey); return new AmazonS3Client(credentials, config); } return new AmazonS3Client(config); } static HttpClient CreateRustFsClient(MigrationOptions options) { var client = new HttpClient { BaseAddress = new Uri(options.RustFsEndpoint, UriKind.Absolute), Timeout = TimeSpan.FromMinutes(5), }; if (!string.IsNullOrWhiteSpace(options.RustFsApiKeyHeader) && !string.IsNullOrWhiteSpace(options.RustFsApiKey)) { client.DefaultRequestHeaders.TryAddWithoutValidation(options.RustFsApiKeyHeader, options.RustFsApiKey); } return client; } static async Task UploadObjectAsync(IAmazonS3 s3Client, HttpClient httpClient, MigrationOptions options, S3Object entry, CancellationToken cancellationToken) { await ExecuteWithRetriesAsync(async token => { using var getResponse = await s3Client.GetObjectAsync(new GetObjectRequest { BucketName = options.S3Bucket, Key = entry.Key, }, token).ConfigureAwait(false); using var request = BuildRustFsRequest(options, entry.Key, getResponse); using var responseMessage = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token).ConfigureAwait(false); if (!responseMessage.IsSuccessStatusCode) { var error = await responseMessage.Content.ReadAsStringAsync(token).ConfigureAwait(false); if (ShouldRetry(responseMessage.StatusCode)) { throw new RetryableException($"RustFS upload returned {(int)responseMessage.StatusCode} {responseMessage.ReasonPhrase}: {error}"); } throw new InvalidOperationException($"RustFS upload returned {(int)responseMessage.StatusCode} {responseMessage.ReasonPhrase}: {error}"); } return null!; }, $"Upload {entry.Key}", options, cancellationToken).ConfigureAwait(false); } static HttpRequestMessage BuildRustFsRequest(MigrationOptions options, string key, GetObjectResponse getResponse) { var request = new HttpRequestMessage(HttpMethod.Put, RustFsMigratorPaths.BuildRustFsUri(options, key)) { Content = new StreamContent(getResponse.ResponseStream), }; request.Content.Headers.ContentType = MediaTypeHeaderValue.Parse("application/octet-stream"); if (getResponse.Headers.ContentLength > 0) { request.Content.Headers.ContentLength = getResponse.Headers.ContentLength; } if (options.Immutable) { request.Headers.TryAddWithoutValidation("X-RustFS-Immutable", "true"); } if (options.RetentionSeconds is { } retainSeconds) { request.Headers.TryAddWithoutValidation("X-RustFS-Retain-Seconds", retainSeconds.ToString(CultureInfo.InvariantCulture)); } if (!string.IsNullOrWhiteSpace(options.RustFsApiKeyHeader) && !string.IsNullOrWhiteSpace(options.RustFsApiKey)) { request.Headers.TryAddWithoutValidation(options.RustFsApiKeyHeader!, options.RustFsApiKey!); } return request; } static async Task ExecuteWithRetriesAsync(Func> action, string operation, MigrationOptions options, CancellationToken cancellationToken) { Exception? last = null; for (var attempt = 1; attempt <= options.RetryAttempts; attempt++) { cancellationToken.ThrowIfCancellationRequested(); try { return await action(cancellationToken).ConfigureAwait(false); } catch (Exception ex) when (ShouldRetryException(ex) && attempt < options.RetryAttempts) { last = ex; Console.Error.WriteLine($"[WARN] {operation} attempt {attempt} failed: {ex.Message}"); await Task.Delay(ComputeBackoffDelay(attempt, options.RetryDelayMs), cancellationToken).ConfigureAwait(false); } } if (last is not null) { throw last; } return await action(cancellationToken).ConfigureAwait(false); } static TimeSpan ComputeBackoffDelay(int attempt, int retryDelayMs) { var multiplier = Math.Pow(2, Math.Max(0, attempt - 1)); var delayMs = Math.Min(retryDelayMs * multiplier, 5000); return TimeSpan.FromMilliseconds(delayMs); } static bool ShouldRetryException(Exception ex) => ex is RetryableException or HttpRequestException or AmazonS3Exception or IOException; static bool ShouldRetry(HttpStatusCode statusCode) => statusCode == HttpStatusCode.RequestTimeout || statusCode == (HttpStatusCode)429 || (int)statusCode >= 500; internal sealed class RetryableException : Exception { public RetryableException(string message) : base(message) { } } internal sealed record MigrationOptions { public string S3Bucket { get; init; } = string.Empty; public string? S3ServiceUrl { get; init; } = null; public string? S3Region { get; init; } = null; public string? S3AccessKey { get; init; } = null; public string? S3SecretKey { get; init; } = null; public string RustFsEndpoint { get; init; } = string.Empty; public string RustFsBucket { get; init; } = string.Empty; public string? RustFsApiKeyHeader { get; init; } = null; public string? RustFsApiKey { get; init; } = null; public string? Prefix { get; init; } = null; public bool Immutable { get; init; } = false; public int? RetentionSeconds { get; init; } = null; public bool DryRun { get; init; } = false; public int RetryAttempts { get; init; } = 3; public int RetryDelayMs { get; init; } = 250; public int TimeoutSeconds { get; init; } = 0; public static MigrationOptions? Parse(string[] args) { var builder = new Dictionary(StringComparer.OrdinalIgnoreCase); for (var i = 0; i < args.Length; i++) { var key = args[i]; if (key.StartsWith("--", StringComparison.OrdinalIgnoreCase)) { var normalized = key[2..]; if (string.Equals(normalized, "immutable", StringComparison.OrdinalIgnoreCase) || string.Equals(normalized, "dry-run", StringComparison.OrdinalIgnoreCase)) { builder[normalized] = "true"; continue; } if (i + 1 >= args.Length) { Console.Error.WriteLine($"Missing value for argument '{key}'."); return null; } builder[normalized] = args[++i]; } } if (!builder.TryGetValue("s3-bucket", out var bucket) || string.IsNullOrWhiteSpace(bucket)) { Console.Error.WriteLine("--s3-bucket is required."); return null; } if (!builder.TryGetValue("rustfs-endpoint", out var rustFsEndpoint) || string.IsNullOrWhiteSpace(rustFsEndpoint)) { Console.Error.WriteLine("--rustfs-endpoint is required."); return null; } if (!builder.TryGetValue("rustfs-bucket", out var rustFsBucket) || string.IsNullOrWhiteSpace(rustFsBucket)) { Console.Error.WriteLine("--rustfs-bucket is required."); return null; } int? retentionSeconds = null; if (builder.TryGetValue("retain-days", out var retainStr) && !string.IsNullOrWhiteSpace(retainStr)) { if (double.TryParse(retainStr, NumberStyles.Float, CultureInfo.InvariantCulture, out var days) && days > 0) { retentionSeconds = (int)Math.Ceiling(days * 24 * 60 * 60); } else { Console.Error.WriteLine("--retain-days must be a positive number."); return null; } } var retryAttempts = ParseIntOption(builder, "retry-attempts", 3, min: 1, max: 10); var retryDelayMs = ParseIntOption(builder, "retry-delay-ms", 250, min: 50, max: 2000); var timeoutSeconds = ParseIntOption(builder, "timeout-seconds", 0, min: 0, max: 3600); return new MigrationOptions { S3Bucket = bucket, S3ServiceUrl = builder.TryGetValue("s3-endpoint", out var s3Endpoint) ? s3Endpoint : null, S3Region = builder.TryGetValue("s3-region", out var s3Region) ? s3Region : null, S3AccessKey = builder.TryGetValue("s3-access-key", out var s3AccessKey) ? s3AccessKey : null, S3SecretKey = builder.TryGetValue("s3-secret-key", out var s3SecretKey) ? s3SecretKey : null, RustFsEndpoint = rustFsEndpoint!, RustFsBucket = rustFsBucket!, RustFsApiKeyHeader = builder.TryGetValue("rustfs-api-key-header", out var apiKeyHeader) ? apiKeyHeader : null, RustFsApiKey = builder.TryGetValue("rustfs-api-key", out var apiKey) ? apiKey : null, Prefix = builder.TryGetValue("prefix", out var prefix) ? prefix : null, Immutable = builder.ContainsKey("immutable"), RetentionSeconds = retentionSeconds, DryRun = builder.ContainsKey("dry-run"), RetryAttempts = retryAttempts, RetryDelayMs = retryDelayMs, TimeoutSeconds = timeoutSeconds, }; } public static void PrintUsage() { Console.WriteLine(@"Usage: dotnet run --project src/Tools/RustFsMigrator -- \ --s3-bucket \ [--s3-endpoint http://minio:9000] \ [--s3-region us-east-1] \ [--s3-access-key key --s3-secret-key secret] \ --rustfs-endpoint http://rustfs:8080 \ --rustfs-bucket scanner-artifacts \ [--rustfs-api-key-header X-API-Key --rustfs-api-key token] \ [--prefix scanner/] \ [--immutable] \ [--retain-days 365] \ [--retry-attempts 3] \ [--retry-delay-ms 250] \ [--timeout-seconds 0] \ [--dry-run]"); } private static int ParseIntOption(Dictionary values, string name, int fallback, int min, int max) { if (!values.TryGetValue(name, out var raw) || string.IsNullOrWhiteSpace(raw)) { return fallback; } if (!int.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed)) { return fallback; } if (parsed < min) { return min; } return parsed > max ? max : parsed; } }