8.4 KiB
Sync Ledger Schema
Sprint: SPRINT_8200_0014_0001_DB_sync_ledger_schema
Migration: 008_sync_ledger.sql
Working Directory: src/Concelier/__Libraries/StellaOps.Concelier.Storage.Postgres/
Overview
The sync ledger schema provides federation cursor tracking for multi-site vulnerability advisory synchronization. It enables air-gapped and distributed deployments to:
- Track sync state per remote site using monotonic cursors
- Prevent duplicate bundle imports via hash-based deduplication
- Enforce per-site policies (source filtering, size limits, signature requirements)
This is critical infrastructure for the Concelier module's federation capabilities.
Tables
vuln.sync_ledger
Tracks federation sync state per remote site. Each record represents a successfully imported bundle.
| Column | Type | Description |
|---|---|---|
id |
UUID | Primary key (auto-generated) |
site_id |
TEXT | Remote site identifier (e.g., site-us-west, airgap-dc2) |
cursor |
TEXT | Position marker for incremental sync (ISO8601#sequence format) |
bundle_hash |
TEXT | SHA256 hash of imported bundle (for deduplication) |
items_count |
INT | Number of advisory items in the bundle |
signed_at |
TIMESTAMPTZ | Timestamp when bundle was signed by remote site |
imported_at |
TIMESTAMPTZ | Timestamp when bundle was imported locally (auto-generated) |
Constraints:
uq_sync_ledger_site_cursor: Unique constraint on(site_id, cursor)- prevents duplicate cursor positionsuq_sync_ledger_bundle: Unique constraint onbundle_hash- prevents reimporting same bundle
vuln.site_policy
Per-site federation governance policies. Controls what data can be imported from each remote site.
| Column | Type | Description |
|---|---|---|
id |
UUID | Primary key (auto-generated) |
site_id |
TEXT | Site identifier (unique) |
display_name |
TEXT | Human-readable site name |
allowed_sources |
TEXT[] | Source allow list (empty = allow all, supports wildcards) |
denied_sources |
TEXT[] | Source deny list (takes precedence over allow list) |
max_bundle_size_mb |
INT | Maximum bundle size in MB (default: 100) |
max_items_per_bundle |
INT | Maximum items per bundle (default: 10000) |
require_signature |
BOOLEAN | Require cryptographic signature on bundles (default: TRUE) |
allowed_signers |
TEXT[] | Allowed signer key IDs or issuer names |
enabled |
BOOLEAN | Policy enabled status (default: TRUE) |
created_at |
TIMESTAMPTZ | Record creation time |
updated_at |
TIMESTAMPTZ | Last modification time (auto-updated via trigger) |
Indexes
| Index | Columns | Purpose |
|---|---|---|
idx_sync_ledger_site |
site_id |
Fast site-based lookups |
idx_sync_ledger_site_time |
site_id, signed_at DESC |
Get latest entries per site |
idx_site_policy_enabled |
enabled (partial, WHERE enabled = TRUE) |
Filter active policies |
Cursor Format
Cursors use a combined timestamp + sequence format for total ordering:
2025-01-15T10:30:00.000Z#0042
Components:
- ISO8601 timestamp with timezone (UTC)
#separator- 4-digit sequence number (allows multiple bundles per timestamp)
Ordering rules:
- Compare timestamps first
- If equal, compare sequence numbers
- Higher values indicate later positions
CursorFormat Utility
// Create cursor
string cursor = CursorFormat.Create(DateTimeOffset.UtcNow, sequence: 0);
// Result: "2025-01-15T10:30:00.0000000+00:00#0000"
// Parse cursor
var (timestamp, sequence) = CursorFormat.Parse(cursor);
// Compare cursors
bool isNewer = CursorFormat.IsAfter(cursor1, cursor2);
Repository Operations
ISyncLedgerRepository Interface
public interface ISyncLedgerRepository
{
// Ledger CRUD
Task<SyncLedgerEntity?> GetLatestAsync(string siteId, CancellationToken ct = default);
Task<IReadOnlyList<SyncLedgerEntity>> GetHistoryAsync(string siteId, int limit = 10, CancellationToken ct = default);
Task<SyncLedgerEntity?> GetByBundleHashAsync(string bundleHash, CancellationToken ct = default);
Task<Guid> InsertAsync(SyncLedgerEntity entry, CancellationToken ct = default);
// Cursor operations
Task<string?> GetCursorAsync(string siteId, CancellationToken ct = default);
Task AdvanceCursorAsync(string siteId, string newCursor, string bundleHash,
int itemsCount, DateTimeOffset signedAt, CancellationToken ct = default);
Task<bool> IsCursorConflictAsync(string siteId, string cursor, CancellationToken ct = default);
// Policy operations
Task<SitePolicyEntity?> GetPolicyAsync(string siteId, CancellationToken ct = default);
Task UpsertPolicyAsync(SitePolicyEntity policy, CancellationToken ct = default);
Task<IReadOnlyList<SitePolicyEntity>> GetAllPoliciesAsync(bool enabledOnly = true, CancellationToken ct = default);
// Statistics
Task<SyncStatistics> GetStatisticsAsync(CancellationToken ct = default);
}
Policy Enforcement
The SitePolicyEnforcementService validates imports against site policies.
Source Validation
Sources are validated against allow/deny lists with wildcard support:
// Pattern matching
"github.com/*" // Matches github.com/advisories, github.com/osv
"*.redhat.com" // Matches access.redhat.com, bugzilla.redhat.com
"nvd.nist.gov" // Exact match only
Evaluation order:
- Check deny list first (if matched, reject)
- If allow list is empty, allow all
- If allow list has entries, source must match at least one
Bundle Size Validation
var result = await policyService.ValidateBundleSizeAsync(
siteId: "site-us-west",
bundleSizeMb: 50,
itemsCount: 5000,
ct: cancellationToken);
if (!result.IsValid)
{
// result.SizeExceedsLimit or result.ItemsExceedLimit
}
Budget Tracking
var budget = await policyService.GetBudgetInfoAsync("site-us-west", ct);
// Returns: TotalImported, ItemsImported, LastImportAt, RemainingBudgetMb
Usage Examples
Check for duplicate bundle
SELECT EXISTS(
SELECT 1 FROM vuln.sync_ledger
WHERE bundle_hash = :hash
) AS already_imported;
Get latest cursor for incremental sync
SELECT cursor
FROM vuln.sync_ledger
WHERE site_id = :site_id
ORDER BY signed_at DESC
LIMIT 1;
Import history with pagination
SELECT id, cursor, bundle_hash, items_count, signed_at, imported_at
FROM vuln.sync_ledger
WHERE site_id = :site_id
ORDER BY signed_at DESC
LIMIT :limit OFFSET :offset;
Get active site policies
SELECT site_id, display_name, allowed_sources, denied_sources,
max_bundle_size_mb, max_items_per_bundle
FROM vuln.site_policy
WHERE enabled = TRUE;
Site sync statistics
SELECT
site_id,
COUNT(*) as bundle_count,
SUM(items_count) as total_items,
MAX(signed_at) as latest_bundle,
MIN(imported_at) as first_import
FROM vuln.sync_ledger
GROUP BY site_id;
Error Handling
Duplicate Bundle Import
If a bundle with the same hash is imported twice, the unique constraint prevents insertion. The repository returns the existing entry ID:
var existingEntry = await repository.GetByBundleHashAsync(bundleHash, ct);
if (existingEntry != null)
{
// Bundle already imported, skip
return existingEntry.Id;
}
Cursor Conflict Detection
Out-of-order imports are detected via cursor comparison:
bool hasConflict = await repository.IsCursorConflictAsync(siteId, newCursor, ct);
if (hasConflict)
{
// newCursor is not newer than current position
throw new CursorConflictException(...);
}
Migration
Migration file: src/Concelier/__Libraries/StellaOps.Concelier.Storage.Postgres/Migrations/008_sync_ledger.sql
Prerequisites:
vulnschema must existvuln.update_timestamp()trigger function must exist (from canonical schema migration)
Apply with:
psql -d stellaops -f 008_sync_ledger.sql
Rollback (if needed):
DROP TRIGGER IF EXISTS trg_site_policy_updated ON vuln.site_policy;
DROP TABLE IF EXISTS vuln.site_policy;
DROP TABLE IF EXISTS vuln.sync_ledger;