component_architecture_concelier.md — Stella Ops Concelier (Sprint 22)
Scope. Implementation-ready architecture for Concelier: the advisory ingestion and Link-Not-Merge (LNM) observation pipeline that produces deterministic raw observations, correlation linksets, and evidence events consumed by Policy Engine, Console, CLI, and Export centers. Covers domain models, connectors, observation/linkset builders, storage schema, events, APIs, performance, security, and test matrices.
## 0) Mission & boundaries
Mission. Acquire authoritative vulnerability advisories (vendor PSIRTs, distros, OSS ecosystems, CERTs), persist them as immutable observations under the Aggregation-Only Contract (AOC), construct linksets that correlate observations without merging or precedence, and export deterministic evidence bundles (JSON, Trivy DB, Offline Kit) for downstream policy evaluation and operator tooling.
Boundaries.
- Concelier does not sign with private keys. When attestation is required, the export artifact is handed to the Signer/Attestor pipeline (out‑of‑process).
- Concelier does not decide PASS/FAIL; it provides data to the Policy engine.
- Online operation is allowlist‑only; air‑gapped deployments use the Offline Kit.
## 1) Topology & processes
Process shape: single ASP.NET Core service `StellaOps.Concelier.WebService` hosting:
- Scheduler with distributed locks (Mongo backed).
- Connectors (fetch/parse/map) that emit immutable observation candidates.
- Observation writer enforcing AOC invariants via `AOCWriteGuard`.
- Linkset builder that correlates observations into `advisory_linksets` and annotates conflicts.
- Event publisher emitting `advisory.observation.updated` and `advisory.linkset.updated` messages.
- Exporters (JSON, Trivy DB, Offline Kit slices) fed from observation/linkset stores.
- Minimal REST for health/status/trigger/export and observation/linkset reads.
Scale: HA by running N replicas; locks prevent overlapping jobs per source/exporter.
## 2) Canonical domain model
Stored in MongoDB (database `concelier`), serialized with a canonical JSON writer (stable order, camelCase, normalized timestamps).
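As a rough illustration of what a canonical writer involves, the sketch below sorts keys, strips insignificant whitespace, and renders timestamps as UTC ISO-8601 strings before hashing. It is a minimal Python sketch, not the Concelier writer: the function names `normalize`, `canonical_json`, and `content_hash` are hypothetical, the timestamp format is an assumption, and keys are assumed to be camelCase already.

```python
import hashlib
import json
from datetime import datetime, timezone


def normalize(value):
    """Recursively render datetimes as UTC ISO-8601 strings (assumed format)."""
    if isinstance(value, datetime):
        if value.tzinfo is None:
            # Assumption: naive datetimes are already expressed in UTC.
            value = value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    if isinstance(value, dict):
        return {key: normalize(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [normalize(item) for item in value]
    return value


def canonical_json(document: dict) -> bytes:
    """Serialize with sorted keys and no insignificant whitespace."""
    text = json.dumps(
        normalize(document),
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return text.encode("utf-8")


def content_hash(document: dict) -> str:
    """SHA-256 over the canonical bytes, prefixed like the digests in this document."""
    return "sha256:" + hashlib.sha256(canonical_json(document)).hexdigest()
```

Two documents that differ only in key order or in the time zone used to express the same instant serialize to identical bytes, which is what keeps digests stable across runs.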
### 2.1 Core entities
#### AdvisoryObservation
```jsonc
observationId      // deterministic id: {tenant}:{source.vendor}:{upstreamId}:{revision}
tenant             // issuing tenant (lower-case)
source{
  vendor, stream, api, collectorVersion
}
upstream{
  upstreamId, documentVersion, fetchedAt, receivedAt,
  contentHash, signature{present, format?, keyId?, signature?}
}
content{
  format, specVersion, raw, metadata?
}
identifiers{
  cve?, ghsa?, vendorIds[], aliases[]
}
linkset{
  purls[], cpes[], aliases[], references[{type,url}],
  reconciledFrom[]
}
createdAt          // when Concelier recorded the observation
attributes         // optional provenance metadata (batch ids, ingest cursor)
```
#### AdvisoryLinkset
```jsonc
linksetId // sha256 over sorted (tenant, product/vuln tuple, observation ids)
tenant
key{
vulnerabilityId,
productKey,
confidence // low|medium|high
}
observations[] = [
{
observationId,
sourceVendor,
statement{
status?, severity?, references?, notes?
},
collectedAt
}
]
aliases{
primary,
others[]
}
purls[]
cpes[]
conflicts[]? // see AdvisoryLinksetConflict
createdAt
updatedAt
```
#### AdvisoryLinksetConflict
```jsonc
conflictId // deterministic hash
type // severity-mismatch | affected-range-divergence | reference-clash | alias-inconsistency | metadata-gap
field? // optional JSON pointer (e.g., /statement/severity/vector)
observations[] // per-source values contributing to the conflict
confidence // low|medium|high (heuristic weight)
detectedAt
```
#### ObservationEvent / LinksetEvent
```jsonc
eventId // ULID
tenant
type // advisory.observation.updated | advisory.linkset.updated
key{
observationId? // on observation event
linksetId? // on linkset event
vulnerabilityId?,
productKey?
}
delta{
added[], removed[], changed[] // normalized summary for consumers
}
hash // canonical hash of serialized delta payload
occurredAt
```
#### ExportState
```jsonc
exportKind // json | trivydb
baseExportId? // last full baseline
baseDigest? // digest of last full baseline
lastFullDigest? // digest of last full export
lastDeltaDigest? // digest of last delta export
cursor // per-kind incremental cursor
files[] // last manifest snapshot (path → sha256)
```
Legacy `Advisory`, `Affected`, and merge-centric entities remain in the repository for historical exports and replay but are being phased out as Link-Not-Merge takes over. New code paths must interact with `AdvisoryObservation` / `AdvisoryLinkset` exclusively and emit conflicts through the structured payloads described above.
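The deterministic identifiers described for `AdvisoryObservation` and `AdvisoryLinkset` can be sketched as follows. This is a minimal Python illustration: the function names are hypothetical, and the newline delimiter used before hashing is an assumption, since this document only states that `linksetId` is a SHA-256 over the sorted tuple.

```python
import hashlib


def observation_id(tenant: str, vendor: str, upstream_id: str, revision: str) -> str:
    """Build {tenant}:{source.vendor}:{upstreamId}:{revision}; the tenant is lower-cased."""
    return f"{tenant.lower()}:{vendor}:{upstream_id}:{revision}"


def linkset_id(tenant: str, vulnerability_id: str, product_key: str, observation_ids) -> str:
    """SHA-256 over the tenant, the vuln/product tuple, and the sorted observation ids."""
    # Assumption: fields are joined with newlines; the real delimiter and
    # encoding are not specified in this document.
    parts = [tenant.lower(), vulnerability_id, product_key, *sorted(observation_ids)]
    digest = hashlib.sha256("\n".join(parts).encode("utf-8")).hexdigest()
    return "sha256:" + digest
```

Sorting the observation ids before hashing means the same set of contributing observations always yields the same `linksetId`, regardless of arrival order.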
### 2.2 Product identity (`productKey`)
* **Primary:** `purl` (Package URL).
* **OS packages:** RPM (NEVRA→purl:rpm), DEB (dpkg→purl:deb), APK (apk→purl:alpine), with **EVR/NVRA** preserved.
* **Secondary:** `cpe` retained for compatibility; advisory records may carry both.
* **Image/platform:** `oci:<registry>/<repo>@<digest>` for image‑level advisories (rare).
* **Unmappable:** if a source is non‑deterministic, keep native string under `productKey="native:<provider>:<id>"` and mark **non‑joinable**.
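The preference order above could be expressed roughly as in the Python sketch below. The function name `product_key` and the boolean "joinable" flag are hypothetical, and falling back to the CPE string as the key when no purl exists is an assumption; image-level `oci:` keys are left out for brevity.

```python
from typing import Optional, Tuple


def product_key(
    purl: Optional[str],
    cpe: Optional[str],
    provider: str,
    native_id: str,
) -> Tuple[str, bool]:
    """Return (productKey, joinable) following the preference order above."""
    if purl and purl.startswith("pkg:"):
        return purl, True
    if cpe and cpe.startswith("cpe:"):
        # Assumption: a CPE is an acceptable join key when no purl is available.
        return cpe, True
    # Unmappable: keep the native string and mark it non-joinable.
    return f"native:{provider}:{native_id}", False
```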
---
## 3) Source families & precedence
### 3.1 Families
* **Vendor PSIRTs**: Microsoft, Oracle, Cisco, Adobe, Apple, VMware, Chromium…
* **Linux distros**: Red Hat, SUSE, Ubuntu, Debian, Alpine…
* **OSS ecosystems**: OSV, GHSA (GitHub Security Advisories), PyPI, npm, Maven, NuGet, Go.
* **CERTs / national CSIRTs**: CISA (KEV, ICS), JVN, ACSC, CCCS, KISA, CERT‑FR/BUND, etc.
### 3.2 Precedence (when claims conflict)
1. **Vendor PSIRT** (authoritative for their product).
2. **Distro** (authoritative for packages they ship, including backports).
3. **Ecosystem** (OSV/GHSA) for library semantics.
4. **CERTs/aggregators** for enrichment (KEV/known exploited).
> Precedence affects **Affected** ranges and **fixed** info; **severity** is normalized to the **maximum** credible severity unless policy overrides. Conflicts are retained with **source provenance**.
---
## 4) Connectors & normalization
### 4.1 Connector contract
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public interface IFeedConnector {
    string SourceName { get; }
    Task FetchAsync(IServiceProvider sp, CancellationToken ct); // -> document collection
    Task ParseAsync(IServiceProvider sp, CancellationToken ct); // -> dto collection (validated)
    Task MapAsync(IServiceProvider sp, CancellationToken ct);   // -> advisory/alias/affected/reference
}
```
* **Fetch**: windowed (cursor), conditional GET (ETag/Last‑Modified), retry/backoff, rate limiting.
* **Parse**: schema validation (JSON Schema, XSD/CSAF), content type checks; write **DTO** with normalized casing.
* **Map**: build canonical records; all outputs carry **provenance** (doc digest, URI, anchors).
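The conditional GET step of the fetch stage can be illustrated with a small Python sketch. It only builds the standard `If-None-Match` / `If-Modified-Since` request headers from values stored on a previous fetch (the `etag` and `lastModified` fields of `concelier.documents`); the helper names are hypothetical and no HTTP client is assumed.

```python
from typing import Dict, Optional


def conditional_headers(etag: Optional[str], last_modified: Optional[str]) -> Dict[str, str]:
    """Build request headers from the validators stored on the last successful fetch."""
    headers: Dict[str, str] = {}
    if etag:
        headers["If-None-Match"] = etag
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    return headers


def has_fresh_body(status_code: int) -> bool:
    """Only a 200 carries a new payload; 304 means the stored document is current.

    Other status codes are left to the retry/backoff handling.
    """
    return status_code == 200
```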
### 4.2 Version range normalization
* **SemVer** ecosystems (npm, pypi, maven, nuget, golang): normalize to `introduced`/`fixed` semver ranges (use `~`, `^`, `<`, `>=` canonicalized to intervals).
* **RPM EVR**: `epoch:version-release` with `rpmvercmp` semantics; store raw EVR strings and also **computed order keys** for query.
* **DEB**: dpkg version comparison semantics mirrored; store computed keys.
* **APK**: Alpine version semantics; compute order keys.
* **Generic**: if provider uses text, retain raw; do **not** invent ranges.
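To make the SemVer canonicalization concrete, the Python sketch below converts `~` and `^` specifiers into half-open intervals using the npm interpretation of those operators. It is an illustration under stated assumptions: only full `major.minor.patch` versions are handled (no pre-release tags), the names `parse_version` and `to_interval` are hypothetical, and how the bounds map onto `introduced`/`fixed` is left to the connector.

```python
from typing import Tuple

Version = Tuple[int, int, int]


def parse_version(text: str) -> Version:
    """Parse 'major.minor.patch'; raises ValueError for anything else."""
    major, minor, patch = (int(part) for part in text.split("."))
    return major, minor, patch


def to_interval(spec: str) -> Tuple[Version, Version]:
    """Return (lower, upper): lower bound inclusive, upper bound exclusive."""
    operator, version = spec[0], parse_version(spec[1:])
    major, minor, patch = version
    if operator == "~":
        # Tilde allows patch-level changes only.
        return version, (major, minor + 1, 0)
    if operator == "^":
        # Caret keeps the left-most non-zero component fixed.
        if major > 0:
            return version, (major + 1, 0, 0)
        if minor > 0:
            return version, (0, minor + 1, 0)
        return version, (0, 0, patch + 1)
    raise ValueError(f"unsupported range operator: {spec!r}")
```

Because the tuples compare lexicographically, a version `v` is inside the range when `lower <= v < upper`.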
### 4.3 Severity & CVSS
* Normalize **CVSS v2/v3/v4** where available (vector, baseScore, severity).
* If multiple CVSS sources exist, track them all; **effective severity** defaults to **max** by policy (configurable).
* **ExploitKnown** toggled by KEV and equivalent sources; store **evidence** (source, date).
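The default "max" policy for effective severity might look like the Python sketch below. The entry shape (`source`, `baseScore`, `severity`) and the function name are assumptions for illustration; all candidates are retained alongside the chosen value. Scores from different CVSS versions are not strictly comparable, so the base score is used only as a tie-breaker within a severity label.

```python
SEVERITY_RANK = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}


def effective_severity(cvss_entries):
    """Pick the highest severity while keeping every source's entry."""
    if not cvss_entries:
        return None
    top = max(
        cvss_entries,
        key=lambda entry: (SEVERITY_RANK[entry["severity"]], entry["baseScore"]),
    )
    return {
        "severity": top["severity"],
        "baseScore": top["baseScore"],
        "from": top["source"],
        "candidates": list(cvss_entries),  # nothing is discarded
    }
```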
---
## 5) Observation & linkset pipeline
> **Goal:** deterministically ingest raw documents into immutable observations, correlate them into evidence-rich linksets, and broadcast changes without precedence or mutation.
### 5.1 Observation flow
1. **Connector fetch/parse/map** — connectors download upstream payloads, validate signatures, and map to DTOs (identifiers, references, raw payload, provenance).
2. **AOC guard** — `AOCWriteGuard` verifies forbidden keys, provenance completeness, tenant claims, timestamp normalization, and content hash idempotency. Violations raise `ERR_AOC_00x` mapped to structured logs and metrics.
3. **Append-only write** — observations insert into `advisory_observations`; duplicates by `(tenant, source.vendor, upstream.upstreamId, upstream.contentHash)` become no-ops; new content for same upstream id creates a supersedes chain.
4. **Change feed + event** — Mongo change streams trigger `advisory.observation.updated@1` events with deterministic payloads (IDs, hash, supersedes pointer, linkset summary). Policy Engine, Offline Kit builder, and guard dashboards subscribe.
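The append-only write in step 3 can be sketched with an in-memory stand-in for `advisory_observations`. This Python example is illustrative only: the class name is hypothetical, and deriving the revision from a per-upstream counter is an assumption. It shows the two behaviors described above: a repeated content hash is a no-op, and new content for the same upstream id links back to its predecessor.

```python
class ObservationStore:
    """In-memory sketch of the append-only observation write."""

    def __init__(self):
        self._by_dedup_key = {}  # (tenant, vendor, upstreamId, contentHash) -> observation
        self._latest = {}        # (tenant, vendor, upstreamId) -> latest observationId

    def append(self, tenant, vendor, upstream_id, content_hash, raw):
        dedup_key = (tenant, vendor, upstream_id, content_hash)
        if dedup_key in self._by_dedup_key:
            # Same content seen before: idempotent no-op.
            return "noop", self._by_dedup_key[dedup_key]
        chain_key = (tenant, vendor, upstream_id)
        # Assumption: the revision is a simple per-upstream counter.
        revision = sum(1 for key in self._by_dedup_key if key[:3] == chain_key) + 1
        observation = {
            "observationId": f"{tenant}:{vendor}:{upstream_id}:{revision}",
            "contentHash": content_hash,
            "raw": raw,
            "supersedes": self._latest.get(chain_key),
        }
        self._by_dedup_key[dedup_key] = observation
        self._latest[chain_key] = observation["observationId"]
        return "ok", observation
```

Existing observations are never modified; a superseding observation only carries a pointer to the one it replaces.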
### 5.2 Linkset correlation
1. **Queue** — observation deltas enqueue correlation jobs keyed by `(tenant, vulnerabilityId, productKey)` candidates derived from identifiers + alias graph.
2. **Canonical grouping** — builder resolves aliases using Concelier’s alias store and deterministic heuristics (vendor > distro > cert), deriving normalized product keys (purl preferred) and confidence scores.
3. **Linkset materialization** — `advisory_linksets` documents store sorted observation references, alias sets, product keys, range metadata, and conflict payloads. Writes are idempotent; unchanged hashes skip updates.
4. **Conflict detection** — builder emits structured conflicts (`severity-mismatch`, `affected-range-divergence`, `reference-clash`, `alias-inconsistency`, `metadata-gap`). Conflicts carry per-observation values for explainability.
5. **Event emission** — `advisory.linkset.updated@1` summarizes deltas (`added`, `removed`, `changed` observation IDs, conflict updates, confidence changes) and includes a canonical hash for replay validation.
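Conflict detection in step 4 can be illustrated for the `severity-mismatch` case. The Python sketch below is a simplified assumption of how such a check might work: the function name and the fingerprint layout behind `conflictId` are hypothetical, and confidence scoring is omitted. Sorting the contributing observations before hashing keeps the payload deterministic.

```python
import hashlib


def detect_severity_conflict(linkset_id, observations):
    """Return a severity-mismatch conflict, or None when the sources agree."""
    stated = [
        {
            "observationId": obs["observationId"],
            "sourceVendor": obs["sourceVendor"],
            "value": obs["statement"]["severity"],
        }
        for obs in observations
        if obs.get("statement", {}).get("severity") is not None
    ]
    if len({entry["value"] for entry in stated}) < 2:
        return None
    stated.sort(key=lambda entry: entry["observationId"])
    # Assumption: the conflict id hashes the linkset, type, and per-source values.
    fingerprint = "|".join(
        [linkset_id, "severity-mismatch"]
        + [f'{entry["observationId"]}={entry["value"]}' for entry in stated]
    )
    return {
        "conflictId": hashlib.sha256(fingerprint.encode("utf-8")).hexdigest(),
        "type": "severity-mismatch",
        "field": "/statement/severity",
        "observations": stated,  # per-observation values for explainability
    }
```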
### 5.3 Event contract
| Event | Schema | Notes |
|-------|--------|-------|
| `advisory.observation.updated@1` | `events/advisory.observation.updated@1.json` | Fired on new or superseded observations. Includes `observationId`, source metadata, `linksetSummary` (aliases/purls), supersedes pointer (if any), SHA-256 hash, and `traceId`. |
| `advisory.linkset.updated@1` | `events/advisory.linkset.updated@1.json` | Fired when correlation changes. Includes `linksetId`, `key{vulnerabilityId, productKey, confidence}`, observation deltas, conflicts, `updatedAt`, and canonical hash. |
Events are emitted via NATS (primary) and Redis Stream (fallback). Consumers acknowledge idempotently using the hash; duplicates are safe. Offline Kit captures both topics during bundle creation for air-gapped replay.
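Because the same event may arrive over both transports, a consumer can deduplicate on the event hash. The Python sketch below assumes an in-memory set of seen hashes and a hypothetical `IdempotentConsumer` class; a real consumer would persist this state, and no NATS or Redis client is involved here.

```python
class IdempotentConsumer:
    """Processes each (type, hash) pair at most once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # assumption: in-memory; production would persist this

    def receive(self, event: dict) -> bool:
        """Return True if the event was handled, False if it was a duplicate."""
        key = (event["type"], event["hash"])
        if key in self._seen:
            return False
        self._handler(event)
        # Mark as seen only after the handler succeeds, so failures can be retried.
        self._seen.add(key)
        return True
```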
---
## 6) Storage schema (MongoDB)
### Collections & indexes (LNM path)
* `concelier.sources` `{_id, type, baseUrl, enabled, notes}` — connector catalog.
* `concelier.source_state` `{sourceName(unique), enabled, cursor, lastSuccess, backoffUntil, paceOverrides}` — run-state (TTL indexes on `backoffUntil`).
* `concelier.documents` `{_id, sourceName, uri, fetchedAt, sha256, contentType, status, metadata, gridFsId?, etag?, lastModified?}` — raw payload registry.
* Indexes: `{sourceName:1, uri:1}` unique; `{fetchedAt:-1}` for recent fetches.
* `concelier.dto` `{_id, sourceName, documentId, schemaVer, payload, validatedAt}` — normalized connector DTOs used for replay.
* Index: `{sourceName:1, documentId:1}`.
* `concelier.advisory_observations` `{ _id: "tenant:vendor:upstreamId:revision", tenant, source: { vendor, stream, api, collectorVersion }, upstream: { upstreamId, documentVersion, fetchedAt, receivedAt, contentHash, signature }, content: { format, specVersion, raw, metadata? }, identifiers: { cve?, ghsa?, vendorIds[], aliases[] }, linkset: { purls[], cpes[], aliases[], references[], reconciledFrom[] }, supersedes?: "prevObservationId", createdAt, attributes?: object }`
* Indexes: `{tenant:1, upstream.upstreamId:1}`, `{tenant:1, source.vendor:1, linkset.purls:1}`, `{tenant:1, linkset.aliases:1}`, `{tenant:1, createdAt:-1}`.
* `concelier.advisory_linksets` `{ _id: "sha256:...", tenant, key: { vulnerabilityId, productKey, confidence }, observations: [ { observationId, sourceVendor, statement, collectedAt } ], aliases: { primary, others: [] }, purls: [], cpes: [], conflicts: [], createdAt, updatedAt }`
* Indexes: `{tenant:1, key.vulnerabilityId:1, key.productKey:1}`, `{tenant:1, purls:1}`, `{tenant:1, aliases.primary:1}`, `{tenant:1, updatedAt:-1}`.
* `concelier.advisory_events` `{ _id: ObjectId, tenant, type: "advisory.observation.updated" | "advisory.linkset.updated", key, delta, hash, occurredAt }`
* Indexes: TTL index on `occurredAt` (configurable retention); `{type:1, occurredAt:-1}` for replay.
* `concelier.export_state` `{_id(exportKind), baseExportId?, baseDigest?, lastFullDigest?, lastDeltaDigest?, cursor, files[]}`
* `locks` `{_id(jobKey), holder, acquiredAt, heartbeatAt, leaseMs, ttlAt}` (TTL cleans dead locks)
* `jobs` `{_id, type, args, state, startedAt, heartbeatAt, endedAt, error}`
**Legacy collections** (`advisory`, `alias`, `affected`, `reference`, `merge_event`) remain read-only during the migration window to support back-compat exports. New code must not write to them; scheduled cleanup removes them after Link-Not-Merge GA.
**GridFS buckets**: `fs.documents` for raw payloads (immutable); `fs.exports` for historical JSON/Trivy archives.
---
## 7) Exporters
### 7.1 Deterministic JSON (vuln‑list style)
* Folder structure mirroring `/<scheme>/<first-two>/<rest>/…` with one JSON per advisory; deterministic ordering, stable timestamps, normalized whitespace.
* `manifest.json` lists all files with SHA‑256 and a top‑level **export digest**.
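A manifest of this kind can be sketched in a few lines of Python. The field names `files` and `exportDigest` and the choice to hash the canonical file listing are assumptions for illustration; the source only states that the manifest lists every file with its SHA-256 and a top-level export digest.

```python
import hashlib
import json


def build_manifest(files: dict) -> dict:
    """Build a manifest from a mapping of relative path -> file bytes."""
    entries = [
        {"path": path, "sha256": hashlib.sha256(files[path]).hexdigest()}
        for path in sorted(files)  # deterministic ordering
    ]
    # Assumption: the export digest is the SHA-256 of the canonical listing.
    listing = json.dumps(entries, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return {
        "files": entries,
        "exportDigest": "sha256:" + hashlib.sha256(listing).hexdigest(),
    }
```

Since the listing is sorted and serialized canonically, the export digest depends only on the set of paths and their contents, not on the order in which the exporter wrote them.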
### 7.2 Trivy DB exporter
* Builds Bolt DB archives compatible with Trivy; supports **full** and **delta** modes.
* In delta, unchanged blobs are reused from the base; metadata captures:
```json
{
  "mode": "delta|full",
  "baseExportId": "...",
  "baseManifestDigest": "sha256:...",
  "changed": ["path1", "path2"],
  "removed": ["path3"]
}
```
* Optional ORAS push (OCI layout) for registries.
* Offline kit bundles include Trivy DB + JSON tree + export manifest.
* Mirror-ready bundles: when `concelier.trivy.mirror` defines domains, the exporter emits `mirror/index.json` plus per-domain `manifest.json`, `metadata.json`, and `db.tar.gz` files with SHA-256 digests so Concelier mirrors can expose domain-scoped download endpoints.
* Concelier.WebService serves `/concelier/exports/index.json` and `/concelier/exports/mirror/{domain}/…` directly from the export tree with cache budgets (index: 60 s, bundles: 300 s, immutable) and per-domain rate limiting; the endpoints honour Stella Ops Authority or CIDR bypass lists depending on mirror topology.
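The delta metadata for the Trivy DB exporter can be derived by comparing the base manifest with the new one. The Python sketch below assumes both manifests are available as `path -> sha256` mappings and that newly added paths are reported under `changed`; the function name is hypothetical.

```python
def delta_metadata(base_export_id, base_manifest_digest, base_files, new_files):
    """Compare two path -> sha256 mappings and describe the delta."""
    # Assumption: paths that are new in this export count as "changed".
    changed = sorted(
        path for path, digest in new_files.items() if base_files.get(path) != digest
    )
    removed = sorted(path for path in base_files if path not in new_files)
    return {
        "mode": "delta",
        "baseExportId": base_export_id,
        "baseManifestDigest": base_manifest_digest,
        "changed": changed,
        "removed": removed,
    }
```

Paths whose digest is unchanged appear in neither list, which is what lets the exporter reuse those blobs from the base.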
### 7.3 Hand‑off to Signer/Attestor (optional)
* On export completion, if `attest: true` is set in job args, Concelier posts the artifact metadata to Signer/Attestor; Concelier itself does not hold signing keys.
* Export record stores returned `{ uuid, index, url }` from Rekor v2.
## 8) REST APIs
All under `/api/v1/concelier`.

**Health & status**

    GET /healthz | /readyz
    GET /status → sources, last runs, export cursors

**Sources & jobs**

    GET /sources → list of configured sources
    POST /sources/{name}/trigger → { jobId }
    POST /sources/{name}/pause | /resume → toggle
    GET /jobs/{id} → job status

**Exports**

    POST /exports/json { full?:bool, force?:bool, attest?:bool } → { exportId, digest, rekor? }
    POST /exports/trivy { full?:bool, force?:bool, publish?:bool, attest?:bool } → { exportId, digest, rekor? }
    GET /exports/{id} → export metadata (kind, digest, createdAt, rekor?)
    GET /concelier/exports/index.json → mirror index describing available domains/bundles
    GET /concelier/exports/mirror/{domain}/manifest.json
    GET /concelier/exports/mirror/{domain}/bundle.json
    GET /concelier/exports/mirror/{domain}/bundle.json.jws

**Search (operator debugging)**

    GET /advisories/{key}
    GET /advisories?scheme=CVE&value=CVE-2025-12345
    GET /affected?productKey=pkg:rpm/openssl&limit=100

**AuthN/Z:** Authority tokens (OpTok) with roles: `concelier.read`, `concelier.admin`, `concelier.export`.
## 9) Configuration (YAML)

    concelier:
      mongo: { uri: "mongodb://mongo/concelier" }
      s3:
        endpoint: "http://minio:9000"
        bucket: "stellaops-concelier"
      scheduler:
        windowSeconds: 30
        maxParallelSources: 4
      sources:
        - name: redhat
          kind: csaf
          baseUrl: https://access.redhat.com/security/data/csaf/v2/
          signature: { type: pgp, keys: [ "…redhat PGP…" ] }
          enabled: true
          windowDays: 7
        - name: suse
          kind: csaf
          baseUrl: https://ftp.suse.com/pub/projects/security/csaf/
          signature: { type: pgp, keys: [ "…suse PGP…" ] }
        - name: ubuntu
          kind: usn-json
          baseUrl: https://ubuntu.com/security/notices.json
          signature: { type: none }
        - name: osv
          kind: osv
          baseUrl: https://api.osv.dev/v1/
          signature: { type: none }
        - name: ghsa
          kind: ghsa
          baseUrl: https://api.github.com/graphql
          auth: { tokenRef: "env:GITHUB_TOKEN" }
      exporters:
        json:
          enabled: true
          output: s3://stellaops-concelier/json/
        trivy:
          enabled: true
          mode: full
          output: s3://stellaops-concelier/trivy/
          oras:
            enabled: false
            repo: ghcr.io/org/concelier
      precedence:
        vendorWinsOverDistro: true
        distroWinsOverOsv: true
      severity:
        policy: max # or 'vendorPreferred' / 'distroPreferred'

## 10) Security & compliance
- Outbound allowlist per connector (domains, protocols); proxy support; TLS pinning where possible.
- Signature verification for raw docs (PGP/cosign/x509) with results stored in `document.metadata.sig`. Docs failing verification may still be ingested but flagged; Policy Engine or downstream policy can down-weight them.
- No secrets in logs; auth material via `env:` or mounted files; HTTP redaction of `Authorization` headers.
- Multi‑tenant: per‑tenant DBs or prefixes; per‑tenant S3 prefixes; tenant‑scoped API tokens.
- Determinism: canonical JSON writer; export digests stable across runs given same inputs.
## 11) Performance targets & scale
- Ingest: ≥ 5k documents/min on 4 cores (CSAF/OpenVEX/JSON).
- Normalize/map: ≥ 50k observation statements/min on 4 cores.
- Observation write: ≤ 5 ms P95 per document (including guard + Mongo write).
- Linkset build: ≤ 15 ms P95 per `(vulnerabilityId, productKey)` update, even with 20+ contributing observations.
- Export: 1M advisories JSON in ≤ 90 s (streamed, zstd), Trivy DB in ≤ 60 s on 8 cores.
- Memory: hard cap per job; chunked streaming writers; backpressure to avoid GC spikes.
Scale pattern: add Concelier replicas; Mongo scaling via indices and read/write concerns; GridFS only for oversized docs.
## 12) Observability
- Metrics
  - `concelier.fetch.docs_total{source}`
  - `concelier.fetch.bytes_total{source}`
  - `concelier.parse.failures_total{source}`
  - `concelier.map.statements_total{source}`
  - `concelier.observations.write_total{result=ok|noop|error}`
  - `concelier.linksets.updated_total{result=ok|skip|error}`
  - `concelier.linksets.conflicts_total{type}`
  - `concelier.export.bytes{kind}`
  - `concelier.export.duration_seconds{kind}`
- Tracing around fetch/parse/map/observe/linkset/export.
- Logs: structured with `source`, `uri`, `docDigest`, `advisoryKey`, `exportId`.
## 13) Testing matrix
- Connectors: fixture suites for each provider/format (happy path; malformed; signature fail).
- Version semantics: EVR vs dpkg vs semver edge cases (epoch bumps, tilde versions, pre‑releases).
- Linkset correlation: multi-source conflicts (severity, range, alias) produce deterministic conflict payloads; ensure confidence scoring stable.
- Export determinism: byte‑for‑byte stable outputs across runs; digest equality.
- Performance: soak tests with 1M advisories; cap memory; verify backpressure.
- API: pagination, filters, RBAC, error envelopes (RFC 7807).
- Offline kit: bundle build & import correctness.
## 14) Failure modes & recovery
- Source outages: scheduler backs off with exponential delay; `source_state.backoffUntil`; alerts on staleness.
- Schema drifts: parse stage marks DTO invalid; job fails with clear diagnostics; connector version flags track supported schema ranges.
- Partial exports: exporters write to temp prefix; manifest commit is atomic; only then move to final prefix and update `export_state`.
- Resume: all stages idempotent; `source_state.cursor` supports window resume.
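The exponential backoff for source outages could compute `backoffUntil` along these lines. This Python sketch is an assumption-laden illustration: the base delay of 30 seconds, the one-hour cap, and the function name are all hypothetical values chosen for the example, not Concelier settings.

```python
from datetime import datetime, timedelta, timezone


def next_backoff_until(now, consecutive_failures, base_seconds=30, cap_seconds=3600):
    """Double the delay on each consecutive failure, up to a fixed cap."""
    if consecutive_failures < 1:
        raise ValueError("consecutive_failures must be at least 1")
    delay = min(cap_seconds, base_seconds * 2 ** (consecutive_failures - 1))
    return now + timedelta(seconds=delay)
```

The scheduler would skip a source while the current time is earlier than the stored value and reset the failure count after the next successful run.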
## 15) Operator runbook (quick)
- Trigger all sources: `POST /api/v1/concelier/sources/*/trigger`
- Force full export JSON: `POST /api/v1/concelier/exports/json { "full": true, "force": true }`
- Force Trivy DB delta publish: `POST /api/v1/concelier/exports/trivy { "full": false, "publish": true }`
- Inspect observation: `GET /api/v1/concelier/observations/{observationId}`
- Query linkset: `GET /api/v1/concelier/linksets?vulnerabilityId=CVE-2025-12345&productKey=pkg:rpm/redhat/openssl`
- Pause noisy source: `POST /api/v1/concelier/sources/osv/pause`
## 16) Rollout plan
- MVP: Red Hat (CSAF), SUSE (CSAF), Ubuntu (USN JSON), OSV; JSON export.
- Add: GHSA GraphQL, Debian (DSA HTML/JSON), Alpine secdb; Trivy DB export.
- Attestation hand‑off: integrate with Signer/Attestor (optional).
- Scale & diagnostics: provider dashboards, staleness alerts, export cache reuse.
- Offline kit: end‑to‑end verified bundles for air‑gap.