@@ -0,0 +1,527 @@
using System.Collections.Immutable;
using System.IO.Compression;
using System.Net;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StellaOps.BinaryIndex.GroundTruth.Abstractions;
using StellaOps.BinaryIndex.GroundTruth.Ddeb.Configuration;
using StellaOps.BinaryIndex.GroundTruth.Ddeb.Internal;

namespace StellaOps.BinaryIndex.GroundTruth.Ddeb;

/// <summary>
/// Ubuntu ddeb debug symbol package connector.
/// Fetches .ddeb packages containing DWARF debug symbols.
/// </summary>
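/// <remarks>
/// Work happens in three phases: <see cref="FetchAsync"/> downloads the debug Packages
/// indexes and the referenced .ddeb payloads, <see cref="ParseAsync"/> extracts each
/// stored payload, and <see cref="MapAsync"/> turns the extracted debug binaries into
/// symbol observations, validated by the AOC write guard before insertion.
/// </remarks>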
public sealed class DdebConnector : SymbolSourceConnectorBase, ISymbolSourceCapability
{
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly ISymbolRawDocumentRepository _documentRepository;
    private readonly ISymbolObservationRepository _observationRepository;
    private readonly ISymbolSourceStateRepository _stateRepository;
    private readonly ISymbolObservationWriteGuard _writeGuard;
    private readonly DdebOptions _options;
    private readonly DdebDiagnostics _diagnostics;

    /// <summary>
    /// Source ID for this connector.
    /// </summary>
    public const string SourceName = "ddeb-ubuntu";

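    // Registration sketch (illustrative only; the actual composition root may differ and
    // the configuration section name is an assumption):
    //
    //   services.AddOptions<DdebOptions>().BindConfiguration("GroundTruth:Ddeb");
    //   services.AddHttpClient(DdebOptions.HttpClientName);
    //   services.AddSingleton<DdebDiagnostics>();
    //   services.AddSingleton<DdebConnector>();
    //
    // The repositories and the AOC write guard are expected to come from the GroundTruth
    // storage registration.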
    public DdebConnector(
        IHttpClientFactory httpClientFactory,
        ISymbolRawDocumentRepository documentRepository,
        ISymbolObservationRepository observationRepository,
        ISymbolSourceStateRepository stateRepository,
        ISymbolObservationWriteGuard writeGuard,
        IOptions<DdebOptions> options,
        DdebDiagnostics diagnostics,
        ILogger<DdebConnector> logger,
        TimeProvider? timeProvider = null)
        : base(logger, timeProvider)
    {
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
        _documentRepository = documentRepository ?? throw new ArgumentNullException(nameof(documentRepository));
        _observationRepository = observationRepository ?? throw new ArgumentNullException(nameof(observationRepository));
        _stateRepository = stateRepository ?? throw new ArgumentNullException(nameof(stateRepository));
        _writeGuard = writeGuard ?? throw new ArgumentNullException(nameof(writeGuard));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _options.Validate();
        _diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
    }

    /// <inheritdoc/>
    public override string SourceId => SourceName;

    /// <inheritdoc/>
    public override string DisplayName => "Ubuntu ddebs";

    /// <inheritdoc/>
    public override IReadOnlyList<string> SupportedDistros => ["ubuntu"];

    /// <inheritdoc/>
    public override async Task FetchAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        var state = await _stateRepository.GetOrCreateAsync(SourceId, cancellationToken);

        // Check backoff
        if (state.BackoffUntil.HasValue && state.BackoffUntil.Value > UtcNow)
        {
            Logger.LogInformation(
                "Ddeb fetch skipped due to backoff until {BackoffUntil}",
                state.BackoffUntil.Value);
            return;
        }

        var httpClient = _httpClientFactory.CreateClient(DdebOptions.HttpClientName);
        var fetchedCount = 0;
        var errorCount = 0;

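        // Walk every configured distribution/component/architecture combination and pull
        // the matching debug Packages index from the mirror.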
        foreach (var distribution in _options.Distributions)
        {
            foreach (var component in _options.Components)
            {
                foreach (var architecture in _options.Architectures)
                {
                    cancellationToken.ThrowIfCancellationRequested();

                    try
                    {
                        var packagesIndexed = await FetchPackagesIndexAsync(
                            httpClient,
                            distribution,
                            component,
                            architecture,
                            state,
                            cancellationToken);

                        fetchedCount += packagesIndexed;
                    }
                    catch (HttpRequestException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
                    {
                        Logger.LogDebug(
                            "Packages index not found for {Distro}/{Component}/{Arch}",
                            distribution, component, architecture);
                    }
                    catch (Exception ex)
                    {
                        LogError(ex, "Fetch", $"Failed to fetch index for {distribution}/{component}/{architecture}");
                        errorCount++;
                        _diagnostics.RecordFetchError();
                    }
                }
            }
        }

        state = state with { LastSuccessAt = UtcNow };
        await _stateRepository.UpdateAsync(state, cancellationToken);

        Logger.LogInformation(
            "Ddeb fetch completed: {FetchedCount} packages indexed, {ErrorCount} errors",
            fetchedCount, errorCount);
    }

    /// <inheritdoc/>
    public override async Task ParseAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        var state = await _stateRepository.GetOrCreateAsync(SourceId, cancellationToken);

        if (state.PendingParse.Length == 0)
        {
            Logger.LogDebug("No documents pending parse for ddeb");
            return;
        }

        var debExtractor = services.GetRequiredService<IDebPackageExtractor>();
        var parsedCount = 0;

        foreach (var digest in state.PendingParse)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentRepository.FindByDigestAsync(digest, cancellationToken);
            if (document is null)
            {
                Logger.LogWarning("Document {Digest} not found for parse", digest);
                state = state.RemovePendingParse(digest);
                continue;
            }

            try
            {
                // Extract .ddeb package
                var extractionResult = await debExtractor.ExtractAsync(
                    document.PayloadId!.Value,
                    cancellationToken);

                LogParse(digest, extractionResult.SymbolCount);

                // Update document status and move to map phase
                await _documentRepository.UpdateStatusAsync(digest, DocumentStatus.PendingMap, cancellationToken);
                state = state.MoveToPendingMap(digest);
                parsedCount++;
                _diagnostics.RecordParseSuccess(extractionResult.SymbolCount);
            }
            catch (Exception ex)
            {
                LogError(ex, "Parse", $"Failed to parse document {digest}");
                await _documentRepository.UpdateStatusAsync(digest, DocumentStatus.Failed, cancellationToken);
                state = state.RemovePendingParse(digest);
                _diagnostics.RecordParseError();
            }
        }

        await _stateRepository.UpdateAsync(state, cancellationToken);

        Logger.LogInformation("Ddeb parse completed: {ParsedCount} packages parsed", parsedCount);
    }

    /// <inheritdoc/>
    public override async Task MapAsync(IServiceProvider services, CancellationToken cancellationToken)
    {
        var state = await _stateRepository.GetOrCreateAsync(SourceId, cancellationToken);

        if (state.PendingMap.Length == 0)
        {
            Logger.LogDebug("No documents pending map for ddeb");
            return;
        }

        var debExtractor = services.GetRequiredService<IDebPackageExtractor>();
        var mappedCount = 0;

        foreach (var digest in state.PendingMap)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var document = await _documentRepository.FindByDigestAsync(digest, cancellationToken);
            if (document is null)
            {
                Logger.LogWarning("Document {Digest} not found for map", digest);
                state = state.MarkMapped(digest);
                continue;
            }

            try
            {
                // Extract symbols from stored payload
                var extractionResult = await debExtractor.ExtractAsync(
                    document.PayloadId!.Value,
                    cancellationToken);

                // Build observations for each debug binary in the package
                foreach (var binary in extractionResult.Binaries)
                {
                    var observation = BuildObservation(document, binary);

                    // Validate against AOC
                    _writeGuard.EnsureValid(observation);

                    // Check for existing observation
                    var existingId = await _observationRepository.FindByContentHashAsync(
                        SourceId,
                        observation.DebugId,
                        observation.ContentHash,
                        cancellationToken);

                    if (existingId is not null)
                    {
                        Logger.LogDebug(
                            "Observation already exists with hash {Hash}, skipping",
                            observation.ContentHash);
                    }
                    else
                    {
                        await _observationRepository.InsertAsync(observation, cancellationToken);
                        LogMap(observation.ObservationId);
                        _diagnostics.RecordMapSuccess(binary.Symbols.Count);
                    }
                }

                await _documentRepository.UpdateStatusAsync(digest, DocumentStatus.Mapped, cancellationToken);
                state = state.MarkMapped(digest);
                mappedCount++;
            }
            catch (GroundTruthAocGuardException ex)
            {
                Logger.LogError(
                    "AOC violation mapping document {Digest}: {Violations}",
                    digest,
                    string.Join(", ", ex.Violations.Select(v => v.Code)));
                await _documentRepository.UpdateStatusAsync(digest, DocumentStatus.Quarantined, cancellationToken);
                state = state.MarkMapped(digest);
                _diagnostics.RecordMapAocViolation();
            }
            catch (Exception ex)
            {
                LogError(ex, "Map", $"Failed to map document {digest}");
                await _documentRepository.UpdateStatusAsync(digest, DocumentStatus.Failed, cancellationToken);
                state = state.MarkMapped(digest);
                _diagnostics.RecordMapError();
            }
        }

        await _stateRepository.UpdateAsync(state, cancellationToken);

        Logger.LogInformation("Ddeb map completed: {MappedCount} packages mapped", mappedCount);
    }

    /// <inheritdoc/>
    public async Task<SymbolSourceConnectivityResult> TestConnectivityAsync(CancellationToken ct = default)
    {
        var startTime = UtcNow;
        try
        {
            var httpClient = _httpClientFactory.CreateClient(DdebOptions.HttpClientName);
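            // Probe the Release file of the first configured distribution as a cheap
            // reachability check.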
            var testUrl = $"/dists/{_options.Distributions[0]}/Release";
            var response = await httpClient.GetAsync(testUrl, ct);
            response.EnsureSuccessStatusCode();

            var latency = UtcNow - startTime;
            return new SymbolSourceConnectivityResult(
                IsConnected: true,
                Latency: latency,
                ErrorMessage: null,
                TestedAt: UtcNow);
        }
        catch (Exception ex)
        {
            var latency = UtcNow - startTime;
            return new SymbolSourceConnectivityResult(
                IsConnected: false,
                Latency: latency,
                ErrorMessage: ex.Message,
                TestedAt: UtcNow);
        }
    }

    /// <inheritdoc/>
    public async Task<SymbolSourceMetadata> GetMetadataAsync(CancellationToken ct = default)
    {
        var stats = await _observationRepository.GetStatsAsync(ct);
        return new SymbolSourceMetadata(
            SourceId: SourceId,
            DisplayName: DisplayName,
            BaseUrl: _options.MirrorUrl.ToString(),
            LastSyncAt: stats.NewestObservation,
            ObservationCount: (int)stats.TotalObservations,
            DebugIdCount: (int)stats.UniqueDebugIds,
            AdditionalInfo: new Dictionary<string, string>
            {
                ["distributions"] = string.Join(",", _options.Distributions),
                ["total_symbols"] = stats.TotalSymbols.ToString()
            });
    }

    /// <inheritdoc/>
    public async Task<SymbolData?> FetchByDebugIdAsync(string debugId, CancellationToken ct = default)
    {
        // Ddebs don't support direct debug-ID lookup; symbols are ingested via the
        // package index, so serve this request from already-stored observations.
        var observations = await _observationRepository.FindByDebugIdAsync(debugId, ct);
        var observation = observations.FirstOrDefault();

        if (observation is null)
            return null;

        return new SymbolData(
            DebugId: debugId,
            BinaryName: observation.BinaryName,
            Architecture: observation.Architecture,
            Symbols: observation.Symbols.Select(s => new SymbolEntry(
                Name: s.Name,
                DemangledName: s.DemangledName,
                Address: s.Address,
                SizeBytes: (int)Math.Min(s.Size, int.MaxValue),
                Type: s.Type,
                Binding: s.Binding,
                SourceFile: s.SourceFile,
                SourceLine: s.SourceLine)).ToList(),
            BuildInfo: observation.BuildMetadata is not null
                ? new BuildMetadata(
                    Compiler: observation.BuildMetadata.Compiler,
                    CompilerVersion: observation.BuildMetadata.CompilerVersion,
                    OptimizationLevel: observation.BuildMetadata.OptimizationLevel,
                    BuildFlags: observation.BuildMetadata.BuildFlags.ToList(),
                    SourceArchiveSha256: observation.BuildMetadata.SourceSha256,
                    BuildTimestamp: observation.BuildMetadata.BuildTimestamp)
                : null,
            Provenance: new SymbolDataProvenance(
                SourceId: SourceId,
                DocumentUri: observation.Provenance.DocumentUri,
                FetchedAt: observation.Provenance.FetchedAt,
                ContentHash: observation.ContentHash,
                SignatureState: observation.Provenance.SignatureState,
                SignatureDetails: observation.Provenance.SignatureDetails));
    }

    private async Task<int> FetchPackagesIndexAsync(
        HttpClient httpClient,
        string distribution,
        string component,
        string architecture,
        SymbolSourceState state,
        CancellationToken ct)
    {
        // Fetch Packages.gz index
        // URL pattern: /dists/{dist}/{component}/debug/binary-{arch}/Packages.gz
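        // e.g. /dists/jammy/main/debug/binary-amd64/Packages.gz
        //      ("jammy", "main", and "amd64" are illustrative values only).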
        var indexUrl = $"/dists/{distribution}/{component}/debug/binary-{architecture}/Packages.gz";
        LogFetch(indexUrl);

        var response = await httpClient.GetAsync(indexUrl, ct);
        response.EnsureSuccessStatusCode();

        var compressedContent = await response.Content.ReadAsByteArrayAsync(ct);

        // Decompress gzip
        using var compressedStream = new MemoryStream(compressedContent);
        using var gzipStream = new GZipStream(compressedStream, CompressionMode.Decompress);
        using var reader = new StreamReader(gzipStream);
        var content = await reader.ReadToEndAsync(ct);

        // Parse Packages index
        var parser = new PackagesIndexParser();
        var packages = parser.Parse(content, distribution, component, architecture);

        Logger.LogDebug(
            "Found {Count} ddeb packages in {Dist}/{Component}/{Arch}",
            packages.Count, distribution, component, architecture);

        // Filter to dbgsym packages and limit
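        // ("-dbgsym" marks the automatically generated debug-symbol packages, while
        // "-dbg" covers the older, manually maintained debug packages.)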
        var dbgsymPackages = packages
            .Where(p => p.PackageName.EndsWith("-dbgsym") || p.PackageName.EndsWith("-dbg"))
            .Take(_options.MaxPackagesPerSync)
            .ToList();

        var fetchedCount = 0;
        foreach (var pkg in dbgsymPackages)
        {
            ct.ThrowIfCancellationRequested();

            // Check if we already have this package version
            var existing = await _documentRepository.FindByUriAsync(SourceId, pkg.PoolUrl, ct);
            if (existing is not null)
                continue;

            try
            {
                var document = await FetchPackageAsync(httpClient, pkg, ct);
                if (document is not null)
                {
                    await _documentRepository.UpsertAsync(document, ct);
                    state = state.AddPendingParse(document.Digest);
                    fetchedCount++;
                    _diagnostics.RecordFetchSuccess();
                }
            }
            catch (Exception ex)
            {
                Logger.LogWarning(
                    ex,
                    "Failed to fetch ddeb package {Package}",
                    pkg.PackageName);
                _diagnostics.RecordFetchError();
            }
        }

        await _stateRepository.UpdateAsync(state, ct);
        return fetchedCount;
    }

    private async Task<SymbolRawDocument?> FetchPackageAsync(
        HttpClient httpClient,
        DdebPackageInfo package,
        CancellationToken ct)
    {
        LogFetch(package.PoolUrl, package.PackageName);

        var response = await httpClient.GetAsync(package.PoolUrl, ct);
        response.EnsureSuccessStatusCode();

        var content = await response.Content.ReadAsByteArrayAsync(ct);
        var digest = ComputeDocumentDigest(content);

        // Verify SHA256 if provided
        if (!string.IsNullOrEmpty(package.Sha256))
        {
            var expectedDigest = $"sha256:{package.Sha256.ToLowerInvariant()}";
            if (!digest.Equals(expectedDigest, StringComparison.OrdinalIgnoreCase))
            {
                Logger.LogWarning(
                    "SHA256 mismatch for package {Package}: expected {Expected}, got {Actual}",
                    package.PackageName, expectedDigest, digest);
                return null;
            }
        }

        return new SymbolRawDocument
        {
            Digest = digest,
            SourceId = SourceId,
            DocumentUri = $"{_options.MirrorUrl}{package.PoolUrl}",
            FetchedAt = UtcNow,
            RecordedAt = UtcNow,
            ContentType = "application/vnd.debian.binary-package",
            ContentSize = content.Length,
            ETag = response.Headers.ETag?.Tag,
            Status = DocumentStatus.PendingParse,
            PayloadId = null, // Will be set by blob storage
            Metadata = ImmutableDictionary<string, string>.Empty
                .Add("package_name", package.PackageName)
                .Add("package_version", package.Version)
                .Add("distribution", package.Distribution)
                .Add("component", package.Component)
                .Add("architecture", package.Architecture)
        };
    }

    private SymbolObservation BuildObservation(
        SymbolRawDocument document,
        ExtractedBinary binary)
    {
        var packageName = document.Metadata.GetValueOrDefault("package_name", "unknown");
        var packageVersion = document.Metadata.GetValueOrDefault("package_version", "unknown");
        var distribution = document.Metadata.GetValueOrDefault("distribution", "unknown");
        var architecture = document.Metadata.GetValueOrDefault("architecture", "amd64");

        // Determine revision number
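        // (One more than the number of observations already stored for this build ID.
        // The lookup blocks on the async repository call because this helper is synchronous.)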
        var existingObservations = _observationRepository
            .FindByDebugIdAsync(binary.BuildId, CancellationToken.None)
            .GetAwaiter()
            .GetResult();
        var revision = existingObservations.Length + 1;

        var observation = new SymbolObservation
        {
            ObservationId = GenerateObservationId(binary.BuildId, revision),
            SourceId = SourceId,
            DebugId = binary.BuildId,
            BinaryName = binary.BinaryName,
            BinaryPath = binary.BinaryPath,
            Architecture = architecture,
            Distro = "ubuntu",
            DistroVersion = distribution,
            PackageName = packageName.Replace("-dbgsym", "").Replace("-dbg", ""),
            PackageVersion = packageVersion,
            Symbols = binary.Symbols.ToImmutableArray(),
            SymbolCount = binary.Symbols.Count,
            BuildMetadata = binary.BuildMetadata,
            Provenance = new ObservationProvenance
            {
                SourceId = SourceId,
                DocumentUri = document.DocumentUri,
                FetchedAt = document.FetchedAt,
                RecordedAt = UtcNow,
                DocumentHash = document.Digest,
                SignatureState = SignatureState.None,
                ConnectorVersion = "1.0.0"
            },
            ContentHash = "",
            CreatedAt = UtcNow
        };

        var contentHash = ComputeContentHash(observation);
        return observation with { ContentHash = contentHash };
    }
}