Database Specification
Version: 1.0.0
Status: DRAFT
Last Updated: 2025-12-15
1. Overview
This document specifies the PostgreSQL database design for StellaOps control-plane domains. It defines schemas, naming conventions, data types, indexing strategies, and design patterns that all database work must follow.
2. Database Architecture
2.1 Database Topology
┌─────────────────────────────────────────────────────────────────┐
│ PostgreSQL Cluster │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ stellaops (database) ││
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││
│ │ │authority│ │ vuln │ │ vex │ │scheduler│ ││
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ ││
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││
│ │ │ notify │ │ policy │ │ packs │ │ issuer │ ││
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ ││
│ │ ┌─────────┐ ││
│ │ │ audit │ (cross-cutting audit schema) ││
│ │ └─────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
2.2 Schema Ownership
| Schema | Owner Module | Purpose |
|---|---|---|
| authority | Authority | Identity, authentication, authorization, licensing |
| vuln | Concelier | Vulnerability advisories, CVSS, affected packages |
| vex | Excititor | VEX statements, graphs, observations, evidence |
| scheduler | Scheduler | Job definitions, triggers, execution history |
| notify | Notify | Channels, rules, deliveries, escalations |
| policy | Policy | Policy packs, rules, risk profiles, evaluations |
| packs | PacksRegistry | Package attestations, mirrors, lifecycle |
| issuer | IssuerDirectory | Trust anchors, issuer keys, certificates |
| unknowns | Unknowns | Bitemporal ambiguity tracking for scan gaps |
| audit | Shared | Cross-cutting audit log (optional) |
2.3 Multi-Tenancy Model
Strategy: Single database, single schema set, tenant_id column on all tenant-scoped tables with mandatory Row-Level Security (RLS).
-- Every tenant-scoped table includes:
tenant_id UUID NOT NULL,
-- Session-level tenant context (MUST be set on connection open):
SET app.tenant_id = '<tenant-uuid>';
-- Row-level security policy (MANDATORY for all tenant-scoped tables):
ALTER TABLE <schema>.<table> ENABLE ROW LEVEL SECURITY;
ALTER TABLE <schema>.<table> FORCE ROW LEVEL SECURITY;
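CREATE POLICY <table>_tenant_isolation ON <schema>.<table>
FOR ALL
-- require_current_tenant() returns TEXT; cast it when tenant_id is a UUID column,
-- because PostgreSQL has no uuid = text comparison operator
USING (tenant_id = <schema>_app.require_current_tenant()::uuid)
WITH CHECK (tenant_id = <schema>_app.require_current_tenant()::uuid);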
RLS Helper Function Pattern:
Each schema with tenant-scoped tables has a companion <schema>_app schema containing a require_current_tenant() function that validates app.tenant_id is set.
CREATE SCHEMA IF NOT EXISTS <schema>_app;
CREATE OR REPLACE FUNCTION <schema>_app.require_current_tenant()
RETURNS TEXT
LANGUAGE plpgsql STABLE SECURITY DEFINER
AS $$
DECLARE
v_tenant TEXT;
BEGIN
v_tenant := current_setting('app.tenant_id', true);
IF v_tenant IS NULL OR v_tenant = '' THEN
RAISE EXCEPTION 'app.tenant_id session variable not set';
END IF;
RETURN v_tenant;
END;
$$;
Rationale:
- Defense-in-depth tenant isolation at the database level
- Prevents data leakage even if application bugs bypass tenant checks
- Shared connection pooling compatible
- Admin bypass via BYPASSRLS roles for cross-tenant operations
- Composite indexes on (tenant_id, ...) for query performance
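As an illustration of how the tenant context and the isolation policy interact, the following sketch sets the context for a single transaction and then queries a tenant-scoped table. It assumes the policy template above has been applied to scheduler.schedules; the tenant UUID is a placeholder, and using set_config with a transaction-local scope (instead of a session-level SET on connection open) is an assumption made here for pooled connections, not something this specification mandates.

```sql
BEGIN;

-- Transaction-scoped tenant context: the third argument (is_local = true)
-- makes the setting revert to its previous value at COMMIT or ROLLBACK.
-- The UUID below is a placeholder tenant id.
SELECT set_config('app.tenant_id', '11111111-1111-1111-1111-111111111111', true);

-- With the tenant isolation policy in place, a role subject to RLS sees
-- only rows whose tenant_id matches the setting above.
SELECT id, name, enabled
FROM scheduler.schedules
ORDER BY name;

COMMIT;
```

If no tenant context is set at all, require_current_tenant() raises 'app.tenant_id session variable not set' for any role subject to the policy, so a missing context fails loudly instead of returning another tenant's rows.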
3. Naming Conventions
3.1 Schema Names
- Lowercase, singular noun
- Match module name where applicable
- Examples: authority, vuln, vex, scheduler, notify, policy
3.2 Table Names
| Convention | Example |
|---|---|
| Lowercase with underscores | advisory_aliases |
| Plural nouns for collections | users, advisories, runs |
| Singular for junction tables | user_role, role_permission |
| Prefix with schema context if ambiguous | vex_statements (not just statements) |
3.3 Column Names
| Convention | Example |
|---|---|
| Lowercase with underscores | created_at, tenant_id |
| Primary keys | id (UUID) |
| Foreign keys | <table>_id (e.g., user_id, advisory_id) |
| Timestamps | *_at suffix (e.g., created_at, updated_at, deleted_at) |
| Booleans | is_* or has_* prefix, or adjective (e.g., enabled, is_primary) |
| Counts | *_count suffix |
| JSONB columns | Descriptive noun (e.g., attributes, metadata, config) |
3.4 Index Names
idx_<table>_<column(s)>
idx_<table>_<purpose>
Examples:
- idx_users_tenant - Index on tenant_id
- idx_users_email - Index on email
- idx_advisories_fts - Full-text search index
- idx_runs_tenant_state - Composite index
3.5 Constraint Names
<table>_<column>_<type>
<table>_<purpose>_<type>
| Type | Suffix | Example |
|---|---|---|
| Primary key | _pkey | users_pkey |
| Foreign key | _fkey | users_tenant_id_fkey |
| Unique | _key | users_email_key |
| Check | _check | users_status_check |
| Exclusion | _excl | schedules_time_excl |
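The naming rules in sections 3.2 to 3.5 can be seen together in a small sketch. The example schema and the user_emails table are hypothetical and exist only to show the conventions; the foreign key assumes authority.users as defined in section 5.1.

```sql
-- Hypothetical schema and table, used only to illustrate the naming rules.
CREATE SCHEMA IF NOT EXISTS example;

CREATE TABLE example.user_emails (
    id         UUID NOT NULL DEFAULT gen_random_uuid(),
    user_id    UUID NOT NULL,
    email      TEXT NOT NULL,
    status     TEXT NOT NULL DEFAULT 'active',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- <table>_pkey
    CONSTRAINT user_emails_pkey PRIMARY KEY (id),
    -- <table>_<column>_fkey
    CONSTRAINT user_emails_user_id_fkey
        FOREIGN KEY (user_id) REFERENCES authority.users(id),
    -- <table>_<column>_key
    CONSTRAINT user_emails_email_key UNIQUE (email),
    -- <table>_<column>_check
    CONSTRAINT user_emails_status_check
        CHECK (status IN ('active', 'disabled'))
);

-- idx_<table>_<column(s)>
CREATE INDEX idx_user_emails_user ON example.user_emails (user_id);
```

Naming constraints explicitly keeps them stable across environments, which simplifies later ALTER TABLE ... DROP CONSTRAINT migrations.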
4. Data Types
4.1 Standard Type Mappings
| Domain Concept | PostgreSQL Type | Notes |
|---|---|---|
| Identifiers | UUID | Use gen_random_uuid() for generation |
| Timestamps | TIMESTAMPTZ | Always UTC, never TIMESTAMP |
| Short strings | TEXT | No VARCHAR(n) unless hard limit required |
| Enumerations | TEXT with CHECK | Not ENUM type (easier migrations) |
| Booleans | BOOLEAN | Never INTEGER or TEXT |
| Counts/quantities | INTEGER or BIGINT | Use BIGINT for counters that may exceed 2B |
| Scores/decimals | NUMERIC(p,s) | Explicit precision for CVSS, percentages |
| Arrays | TEXT[], UUID[] | PostgreSQL native arrays |
| Semi-structured | JSONB | Never JSON (always use binary) |
| IP addresses | INET | For IP storage |
| Large text | TEXT | No CLOB equivalent needed |
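To show why TEXT with CHECK is considered easier to migrate than ENUM, here is a minimal sketch of adding a new allowed value to authority.tenants.status. The value 'archived' is hypothetical, and the constraint name tenants_status_check assumes PostgreSQL's default naming for the inline CHECK in section 5.1.

```sql
BEGIN;

-- Swap the CHECK constraint for one that also allows the new value.
ALTER TABLE authority.tenants
    DROP CONSTRAINT tenants_status_check;

-- NOT VALID skips the full-table scan while the constraint is being added;
-- new and updated rows are still checked immediately.
ALTER TABLE authority.tenants
    ADD CONSTRAINT tenants_status_check
    CHECK (status IN ('active', 'suspended', 'trial', 'terminated', 'archived'))
    NOT VALID;

COMMIT;

-- Validate existing rows in a separate step, which takes a weaker lock.
ALTER TABLE authority.tenants
    VALIDATE CONSTRAINT tenants_status_check;
```

Removing a value works the same way, after first updating any rows that still hold it.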
4.2 Identifier Strategy
Primary Keys:
id UUID PRIMARY KEY DEFAULT gen_random_uuid()
Alternative: ULID for time-ordered IDs:
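-- If time-ordering in ID is needed (e.g., for pagination).
-- Note: generate_ulid() is not built into PostgreSQL; it must be provided
-- by an extension or a user-defined function.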
id TEXT PRIMARY KEY DEFAULT generate_ulid()
Surrogate vs Natural Keys:
- Use UUID surrogate keys for all tables
- Natural keys (e.g., advisory_key, username) as unique constraints, not primary keys
- Exception: Junction tables use composite primary keys
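One consequence of keeping natural keys as unique constraints is that idempotent writes can target the natural key while other tables reference the surrogate id. A minimal sketch against vuln.sources from section 5.2, with illustrative row values:

```sql
-- Upsert by natural key (key); callers get the surrogate id back either way.
INSERT INTO vuln.sources (key, display_name, source_type)
VALUES ('nvd', 'National Vulnerability Database', 'nvd')
ON CONFLICT (key) DO UPDATE
    SET display_name = EXCLUDED.display_name,
        updated_at   = NOW()
RETURNING id;
```

Foreign keys such as vuln.feed_snapshots.source_id then store the returned UUID, so renaming the natural key later does not cascade through referencing tables.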
4.3 Timestamp Conventions
-- Standard audit columns (on every mutable table):
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Optional soft-delete:
deleted_at TIMESTAMPTZ,
deleted_by TEXT,
-- Trigger for updated_at (optional, can be application-managed):
CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_<table>_updated_at
BEFORE UPDATE ON <table>
FOR EACH ROW EXECUTE FUNCTION update_updated_at();
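Applied to a concrete table, the conventions above might look like the following sketch. It assumes scheduler.schedules from section 5.4 and that update_updated_at() has already been created; the row id and actor value are placeholders.

```sql
-- Attach the shared trigger function to one table.
CREATE TRIGGER trg_schedules_updated_at
    BEFORE UPDATE ON scheduler.schedules
    FOR EACH ROW EXECUTE FUNCTION update_updated_at();

-- Soft-delete: mark the row instead of removing it.
-- The trigger sets updated_at automatically.
UPDATE scheduler.schedules
SET deleted_at = NOW(),
    deleted_by = 'ops@example.test'
WHERE id = '22222222-2222-2222-2222-222222222222';

-- Readers exclude soft-deleted rows explicitly.
SELECT id, name, updated_at
FROM scheduler.schedules
WHERE deleted_at IS NULL;
```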
4.4 JSONB Usage Guidelines
When to use JSONB:
- Semi-structured data with variable schema
- Audit/provenance trails
- External system payloads that must be preserved exactly
- Configuration objects with optional fields
- Nested arrays of complex objects
When NOT to use JSONB:
- Data that will be frequently queried/filtered
- Data that requires referential integrity
- Simple key-value pairs (use separate columns)
- Data that will be aggregated
JSONB Indexing:
-- GIN index for containment queries (@>, ?, ?&, ?|)
CREATE INDEX idx_<table>_<column>_gin ON <table> USING GIN (<column>);
-- Expression index for specific JSON path
CREATE INDEX idx_<table>_<column>_<path> ON <table> ((<column>->>'path'));
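The two index templates serve different query shapes. The sketch below instantiates both on authority.users.attributes; the JSON keys department and locale are hypothetical and not part of this specification.

```sql
-- GIN index: supports containment and key-existence operators.
CREATE INDEX idx_users_attributes_gin
    ON authority.users USING GIN (attributes);

-- Can use the GIN index (containment).
SELECT id, username
FROM authority.users
WHERE attributes @> '{"department": "security"}';

-- Expression index: supports equality on one extracted path.
CREATE INDEX idx_users_attributes_locale
    ON authority.users ((attributes->>'locale'));

-- Can use the expression index; the predicate must use the same
-- expression as the index definition.
SELECT id, username
FROM authority.users
WHERE attributes->>'locale' = 'en-GB';
```

A predicate written with ->> is not served by the GIN index, which is why a frequently filtered path gets its own expression index or a generated column.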
4.5 Generated Columns for JSONB Hot Fields
When JSONB fields are frequently queried with equality or range filters, use generated columns to extract them as first-class columns. This enables:
- B-tree indexes with accurate statistics
- Index-only scans via covering indexes
- Proper cardinality estimates for query planning
Pattern:
-- Extract hot field as generated column
ALTER TABLE <schema>.<table>
ADD COLUMN IF NOT EXISTS <field_name> <type>
GENERATED ALWAYS AS ((<jsonb_column>->>'<json_key>')::<type>) STORED;
-- Create B-tree index on generated column
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_<table>_<field_name>
ON <schema>.<table> (<field_name>)
WHERE <field_name> IS NOT NULL;
-- Covering index for dashboard queries
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_<table>_listing
ON <schema>.<table> (tenant_id, created_at DESC)
INCLUDE (<generated_col1>, <generated_col2>, <generated_col3>);
-- Update statistics
ANALYZE <schema>.<table>;
Example (scheduler.runs stats extraction):
ALTER TABLE scheduler.runs
ADD COLUMN IF NOT EXISTS finding_count INT
GENERATED ALWAYS AS (NULLIF((stats->>'findingCount'), '')::int) STORED;
CREATE INDEX idx_runs_with_findings
ON scheduler.runs (tenant_id, created_at DESC)
WHERE finding_count > 0;
Guidelines:
- Use NULLIF(<expr>, '') before casting to handle empty strings
- Add WHERE <column> IS NOT NULL to partial indexes for sparse data
- Use INCLUDE clause for covering indexes that return multiple generated columns
- Run ANALYZE after adding generated columns to populate statistics
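A query shaped like the one below is the kind that the scheduler.runs example is meant to serve. This is a sketch only: the tenant UUID is a placeholder, and whether the planner actually picks the partial index depends on table statistics.

```sql
-- Empty strings become NULL before the cast instead of raising an error.
SELECT NULLIF('', '')::int AS parsed;

-- Recent runs with findings for one tenant. The predicate
-- finding_count > 0 matches the partial index condition, so that index
-- is a candidate for this query.
EXPLAIN
SELECT id, created_at, finding_count
FROM scheduler.runs
WHERE tenant_id = '11111111-1111-1111-1111-111111111111'
  AND finding_count > 0
ORDER BY created_at DESC
LIMIT 20;
```

Note that NULLIF only guards against empty strings. A non-numeric value such as 'n/a' in stats->>'findingCount' would still fail the cast, and because the column is STORED, the failing INSERT or UPDATE would be rejected.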
5. Schema Definitions
5.1 Authority Schema
CREATE SCHEMA IF NOT EXISTS authority;
-- Core identity tables
CREATE TABLE authority.tenants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active'
CHECK (status IN ('active', 'suspended', 'trial', 'terminated')),
settings JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE authority.users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES authority.tenants(id),
subject_id UUID NOT NULL UNIQUE,
username TEXT NOT NULL,
normalized_username TEXT NOT NULL,
display_name TEXT,
email TEXT,
email_verified BOOLEAN NOT NULL DEFAULT FALSE,
disabled BOOLEAN NOT NULL DEFAULT FALSE,
plugin TEXT,
attributes JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (tenant_id, normalized_username)
);
CREATE TABLE authority.roles (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID REFERENCES authority.tenants(id),
name TEXT NOT NULL,
description TEXT,
is_system BOOLEAN NOT NULL DEFAULT FALSE,
permissions TEXT[] DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (tenant_id, name)
);
CREATE TABLE authority.user_roles (
user_id UUID NOT NULL REFERENCES authority.users(id) ON DELETE CASCADE,
role_id UUID NOT NULL REFERENCES authority.roles(id) ON DELETE CASCADE,
granted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
granted_by TEXT,
PRIMARY KEY (user_id, role_id)
);
CREATE TABLE authority.service_accounts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES authority.tenants(id),
account_id TEXT NOT NULL,
display_name TEXT NOT NULL,
description TEXT,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
allowed_scopes TEXT[] DEFAULT '{}',
authorized_clients TEXT[] DEFAULT '{}',
attributes JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (tenant_id, account_id)
);
CREATE TABLE authority.clients (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
client_id TEXT NOT NULL UNIQUE,
client_secret_hash TEXT,
display_name TEXT,
type TEXT NOT NULL DEFAULT 'confidential'
CHECK (type IN ('public', 'confidential')),
redirect_uris TEXT[] DEFAULT '{}',
post_logout_redirect_uris TEXT[] DEFAULT '{}',
permissions TEXT[] DEFAULT '{}',
requirements TEXT[] DEFAULT '{}',
settings JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE authority.scopes (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL UNIQUE,
display_name TEXT,
description TEXT,
resources TEXT[] DEFAULT '{}'
);
CREATE TABLE authority.tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
subject_id UUID NOT NULL,
client_id TEXT,
token_type TEXT NOT NULL CHECK (token_type IN ('access', 'refresh', 'authorization_code')),
token_hash TEXT NOT NULL UNIQUE,
scopes TEXT[] DEFAULT '{}',
issued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
revoked_at TIMESTAMPTZ,
revocation_reason TEXT,
metadata JSONB DEFAULT '{}'
);
CREATE TABLE authority.revocations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
token_id UUID REFERENCES authority.tokens(id),
jti TEXT,
revoked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reason TEXT,
revoked_by TEXT
);
CREATE TABLE authority.login_attempts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID REFERENCES authority.tenants(id),
username TEXT NOT NULL,
ip_address INET,
user_agent TEXT,
success BOOLEAN NOT NULL,
failure_reason TEXT,
attempted_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE authority.licenses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES authority.tenants(id),
license_key TEXT NOT NULL UNIQUE,
edition TEXT NOT NULL CHECK (edition IN ('community', 'standard', 'enterprise', 'sovereign')),
max_nodes INT,
max_projects INT,
features JSONB DEFAULT '{}',
start_date DATE NOT NULL,
end_date DATE,
issued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
issued_by TEXT,
revoked_at TIMESTAMPTZ,
revocation_reason TEXT
);
CREATE TABLE authority.license_usage (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
license_id UUID NOT NULL REFERENCES authority.licenses(id),
scanner_node_id TEXT NOT NULL,
project_id TEXT,
scanner_version TEXT,
first_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (license_id, scanner_node_id)
);
-- Offline Kit audit (SPRINT_0341_0001_0001)
CREATE TABLE authority.offline_kit_audit (
event_id UUID PRIMARY KEY,
tenant_id TEXT NOT NULL,
event_type TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
actor TEXT NOT NULL,
details JSONB NOT NULL,
result TEXT NOT NULL
);
-- Indexes
CREATE INDEX idx_users_tenant ON authority.users(tenant_id);
CREATE INDEX idx_users_email ON authority.users(email) WHERE email IS NOT NULL;
CREATE INDEX idx_users_subject ON authority.users(subject_id);
CREATE INDEX idx_service_accounts_tenant ON authority.service_accounts(tenant_id);
CREATE INDEX idx_tokens_subject ON authority.tokens(subject_id);
CREATE INDEX idx_tokens_expires ON authority.tokens(expires_at) WHERE revoked_at IS NULL;
CREATE INDEX idx_tokens_hash ON authority.tokens(token_hash);
CREATE INDEX idx_login_attempts_tenant_time ON authority.login_attempts(tenant_id, attempted_at DESC);
CREATE INDEX idx_licenses_tenant ON authority.licenses(tenant_id);
CREATE INDEX idx_offline_kit_audit_ts ON authority.offline_kit_audit(timestamp DESC);
CREATE INDEX idx_offline_kit_audit_type ON authority.offline_kit_audit(event_type);
CREATE INDEX idx_offline_kit_audit_tenant_ts ON authority.offline_kit_audit(tenant_id, timestamp DESC);
CREATE INDEX idx_offline_kit_audit_result ON authority.offline_kit_audit(tenant_id, result, timestamp DESC);
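Two typical reads against this schema are sketched below: resolving a user's effective permissions through the user_roles junction table, and checking whether a presented token is still usable. The UUID and hash values are placeholders, and the queries are illustrative, not the Authority module's actual access paths.

```sql
-- Effective permissions: expand the permissions array of every role
-- granted to the user and de-duplicate.
SELECT DISTINCT perm AS permission
FROM authority.user_roles AS ur
JOIN authority.roles AS r ON r.id = ur.role_id
CROSS JOIN LATERAL unnest(r.permissions) AS perm
WHERE ur.user_id = '33333333-3333-3333-3333-333333333333'
ORDER BY permission;

-- Token check: look up by hash (unique), then require that the token is
-- neither revoked nor expired.
SELECT id, subject_id, scopes
FROM authority.tokens
WHERE token_hash = '<token-hash>'
  AND revoked_at IS NULL
  AND expires_at > NOW();
```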
5.2 Vulnerability Schema (vuln)
CREATE SCHEMA IF NOT EXISTS vuln;
CREATE TABLE vuln.sources (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
key TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
url TEXT,
source_type TEXT NOT NULL CHECK (source_type IN ('nvd', 'osv', 'ghsa', 'vendor', 'oval', 'custom')),
enabled BOOLEAN NOT NULL DEFAULT TRUE,
priority INT NOT NULL DEFAULT 100,
config JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE vuln.feed_snapshots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID NOT NULL REFERENCES vuln.sources(id),
snapshot_id TEXT NOT NULL,
taken_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
stats JSONB DEFAULT '{}',
checksum TEXT,
error TEXT,
UNIQUE (source_id, snapshot_id)
);
CREATE TABLE vuln.advisory_snapshots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID NOT NULL REFERENCES vuln.sources(id),
source_advisory_id TEXT NOT NULL,
feed_snapshot_id UUID REFERENCES vuln.feed_snapshots(id),
imported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
raw_payload JSONB NOT NULL,
payload_hash TEXT NOT NULL,
is_latest BOOLEAN NOT NULL DEFAULT TRUE,
UNIQUE (source_id, source_advisory_id, payload_hash)
);
CREATE TABLE vuln.advisories (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
advisory_key TEXT NOT NULL UNIQUE,
primary_vuln_id TEXT NOT NULL,
source_id UUID REFERENCES vuln.sources(id),
title TEXT,
summary TEXT,
description TEXT,
language TEXT DEFAULT 'en',
severity TEXT CHECK (severity IN ('critical', 'high', 'medium', 'low', 'none', 'unknown')),
exploit_known BOOLEAN NOT NULL DEFAULT FALSE,
state TEXT NOT NULL DEFAULT 'active' CHECK (state IN ('active', 'rejected', 'withdrawn', 'disputed')),
published_at TIMESTAMPTZ,
modified_at TIMESTAMPTZ,
withdrawn_at TIMESTAMPTZ,
current_snapshot_id UUID REFERENCES vuln.advisory_snapshots(id),
canonical_metric_id UUID,
provenance JSONB DEFAULT '[]',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE vuln.advisory_aliases (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
advisory_id UUID NOT NULL REFERENCES vuln.advisories(id) ON DELETE CASCADE,
alias_type TEXT NOT NULL CHECK (alias_type IN ('cve', 'ghsa', 'osv', 'vendor', 'internal', 'other')),
alias_value TEXT NOT NULL,
provenance JSONB DEFAULT '{}',
UNIQUE (alias_type, alias_value)
);
CREATE TABLE vuln.advisory_cvss (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
advisory_id UUID NOT NULL REFERENCES vuln.advisories(id) ON DELETE CASCADE,
version TEXT NOT NULL CHECK (version IN ('2.0', '3.0', '3.1', '4.0')),
vector TEXT NOT NULL,
base_score NUMERIC(3,1) NOT NULL CHECK (base_score >= 0 AND base_score <= 10),
base_severity TEXT,
temporal_score NUMERIC(3,1) CHECK (temporal_score >= 0 AND temporal_score <= 10),
environmental_score NUMERIC(3,1) CHECK (environmental_score >= 0 AND environmental_score <= 10),
source TEXT,
is_primary BOOLEAN NOT NULL DEFAULT FALSE,
provenance JSONB DEFAULT '{}'
);
CREATE TABLE vuln.advisory_affected (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
advisory_id UUID NOT NULL REFERENCES vuln.advisories(id) ON DELETE CASCADE,
package_type TEXT NOT NULL CHECK (package_type IN ('rpm', 'deb', 'cpe', 'semver', 'vendor', 'ics-vendor', 'generic')),
ecosystem TEXT,
package_name TEXT NOT NULL,
package_purl TEXT,
platform TEXT,
version_ranges JSONB NOT NULL DEFAULT '[]',
statuses JSONB DEFAULT '[]',
normalized_versions JSONB DEFAULT '[]',
provenance JSONB DEFAULT '[]'
);
CREATE TABLE vuln.advisory_references (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
advisory_id UUID NOT NULL REFERENCES vuln.advisories(id) ON DELETE CASCADE,
url TEXT NOT NULL,
title TEXT,
ref_type TEXT,
provenance JSONB DEFAULT '{}'
);
CREATE TABLE vuln.advisory_credits (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
advisory_id UUID NOT NULL REFERENCES vuln.advisories(id) ON DELETE CASCADE,
name TEXT NOT NULL,
contact TEXT,
credit_type TEXT,
provenance JSONB DEFAULT '{}'
);
CREATE TABLE vuln.advisory_weaknesses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
advisory_id UUID NOT NULL REFERENCES vuln.advisories(id) ON DELETE CASCADE,
cwe_id TEXT NOT NULL,
description TEXT,
provenance JSONB DEFAULT '{}'
);
CREATE TABLE vuln.kev_flags (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
cve_id TEXT NOT NULL UNIQUE,
advisory_id UUID REFERENCES vuln.advisories(id),
added_date DATE NOT NULL,
due_date DATE,
vendor_project TEXT,
product TEXT,
vulnerability_name TEXT,
short_description TEXT,
required_action TEXT,
notes TEXT,
known_ransomware_campaign BOOLEAN DEFAULT FALSE,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE vuln.source_states (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_id UUID NOT NULL REFERENCES vuln.sources(id) UNIQUE,
cursor TEXT,
last_fetch_at TIMESTAMPTZ,
last_success_at TIMESTAMPTZ,
consecutive_failures INT DEFAULT 0,
last_error TEXT,
last_error_at TIMESTAMPTZ,
metadata JSONB DEFAULT '{}'
);
CREATE TABLE vuln.merge_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
advisory_id UUID NOT NULL REFERENCES vuln.advisories(id),
event_type TEXT NOT NULL CHECK (event_type IN ('created', 'updated', 'merged', 'superseded', 'withdrawn')),
source_id UUID REFERENCES vuln.sources(id),
changes JSONB DEFAULT '{}',
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes
CREATE INDEX idx_advisories_primary_vuln ON vuln.advisories(primary_vuln_id);
CREATE INDEX idx_advisories_modified ON vuln.advisories(modified_at DESC);
CREATE INDEX idx_advisories_published ON vuln.advisories(published_at DESC);
CREATE INDEX idx_advisories_severity ON vuln.advisories(severity) WHERE state = 'active';
CREATE INDEX idx_advisories_state ON vuln.advisories(state);
CREATE INDEX idx_advisory_aliases_value ON vuln.advisory_aliases(alias_value);
CREATE INDEX idx_advisory_aliases_advisory ON vuln.advisory_aliases(advisory_id);
CREATE INDEX idx_advisory_affected_purl ON vuln.advisory_affected(package_purl) WHERE package_purl IS NOT NULL;
CREATE INDEX idx_advisory_affected_name ON vuln.advisory_affected(ecosystem, package_name);
CREATE INDEX idx_advisory_affected_advisory ON vuln.advisory_affected(advisory_id);
CREATE INDEX idx_advisory_snapshots_latest ON vuln.advisory_snapshots(source_id, source_advisory_id) WHERE is_latest = TRUE;
CREATE INDEX idx_kev_flags_cve ON vuln.kev_flags(cve_id);
CREATE INDEX idx_merge_events_advisory ON vuln.merge_events(advisory_id, occurred_at DESC);
-- Full-text search
CREATE INDEX idx_advisories_fts ON vuln.advisories USING GIN (
to_tsvector('english', COALESCE(title, '') || ' ' || COALESCE(summary, '') || ' ' || COALESCE(description, ''))
);
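The alias and full-text indexes above support lookups like the following. This is a sketch with illustrative search values; the full-text predicate repeats the index expression verbatim, since an expression index is only considered when the query uses the same expression.

```sql
-- Resolve an advisory from one of its aliases (uses the unique
-- constraint on alias_type, alias_value).
SELECT a.advisory_key, a.severity, a.published_at
FROM vuln.advisory_aliases AS al
JOIN vuln.advisories AS a ON a.id = al.advisory_id
WHERE al.alias_type = 'cve'
  AND al.alias_value = 'CVE-2021-44228';

-- Full-text search over title, summary, and description.
SELECT advisory_key, title
FROM vuln.advisories
WHERE to_tsvector('english',
          COALESCE(title, '') || ' ' ||
          COALESCE(summary, '') || ' ' ||
          COALESCE(description, ''))
      @@ plainto_tsquery('english', 'remote code execution')
ORDER BY modified_at DESC
LIMIT 50;
```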
5.3 VEX & Graph Schema (vex)
CREATE SCHEMA IF NOT EXISTS vex;
CREATE TABLE vex.projects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
key TEXT NOT NULL,
display_name TEXT NOT NULL,
description TEXT,
settings JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (tenant_id, key)
);
CREATE TABLE vex.graph_revisions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_id UUID NOT NULL REFERENCES vex.projects(id),
revision_id TEXT NOT NULL UNIQUE,
parent_revision_id TEXT,
sbom_hash TEXT NOT NULL,
sbom_format TEXT NOT NULL CHECK (sbom_format IN ('cyclonedx', 'spdx', 'syft', 'other')),
sbom_location TEXT,
feed_snapshot_id UUID,
lattice_policy_version TEXT,
unknowns_snapshot_id UUID,
node_count INT NOT NULL DEFAULT 0,
edge_count INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by TEXT,
notes TEXT
);
CREATE TABLE vex.graph_nodes (
id BIGSERIAL PRIMARY KEY,
graph_revision_id UUID NOT NULL REFERENCES vex.graph_revisions(id) ON DELETE CASCADE,
node_key TEXT NOT NULL,
node_type TEXT NOT NULL CHECK (node_type IN ('component', 'vulnerability', 'runtime_entity', 'file', 'package', 'service')),
purl TEXT,
name TEXT,
version TEXT,
attributes JSONB DEFAULT '{}',
UNIQUE (graph_revision_id, node_key)
);
CREATE TABLE vex.graph_edges (
id BIGSERIAL PRIMARY KEY,
graph_revision_id UUID NOT NULL REFERENCES vex.graph_revisions(id) ON DELETE CASCADE,
from_node_id BIGINT NOT NULL REFERENCES vex.graph_nodes(id) ON DELETE CASCADE,
to_node_id BIGINT NOT NULL REFERENCES vex.graph_nodes(id) ON DELETE CASCADE,
edge_type TEXT NOT NULL CHECK (edge_type IN (
'depends_on', 'dev_depends_on', 'optional_depends_on',
'contains', 'introduces', 'mitigates', 'affects',
'build_tool', 'test_dependency'
)),
attributes JSONB DEFAULT '{}'
);
CREATE TABLE vex.statements (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
project_id UUID REFERENCES vex.projects(id),
graph_revision_id UUID REFERENCES vex.graph_revisions(id),
advisory_id UUID,
vulnerability_id TEXT NOT NULL,
subject_node_id BIGINT REFERENCES vex.graph_nodes(id),
product_key TEXT,
status TEXT NOT NULL CHECK (status IN ('affected', 'not_affected', 'under_investigation', 'fixed')),
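-- NULL is permitted because the column is nullable. Listing NULL inside IN
-- would make the check evaluate to NULL (treated as passing) for any value.
status_justification TEXT CHECK (status_justification IN (
'component_not_present', 'vulnerable_code_not_present',
'vulnerable_code_not_in_execute_path', 'vulnerable_code_cannot_be_controlled_by_adversary',
'inline_mitigations_already_exist'
)),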
impact_statement TEXT,
action_statement TEXT,
action_statement_timestamp TIMESTAMPTZ,
evidence JSONB DEFAULT '{}',
provenance JSONB DEFAULT '{}',
evaluated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
evaluated_by TEXT,
superseded_by UUID REFERENCES vex.statements(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE vex.observations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
provider_id TEXT NOT NULL,
vulnerability_id TEXT NOT NULL,
product_key TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('affected', 'not_affected', 'under_investigation', 'fixed')),
status_justification TEXT,
content_hash TEXT NOT NULL,
linkset_id UUID,
dsse_envelope_hash TEXT,
provenance JSONB DEFAULT '{}',
observed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ,
UNIQUE (tenant_id, provider_id, vulnerability_id, product_key, content_hash)
);
CREATE TABLE vex.linksets (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
linkset_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
sbom_digest TEXT,
vex_digest TEXT,
sbom_location TEXT,
vex_location TEXT,
status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'superseded', 'revoked')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata JSONB DEFAULT '{}',
UNIQUE (tenant_id, linkset_id)
);
CREATE TABLE vex.linkset_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
linkset_id UUID NOT NULL REFERENCES vex.linksets(id),
event_type TEXT NOT NULL CHECK (event_type IN ('created', 'updated', 'superseded', 'revoked')),
details JSONB DEFAULT '{}',
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE vex.consensus (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
vulnerability_id TEXT NOT NULL,
product_key TEXT NOT NULL,
computed_status TEXT NOT NULL CHECK (computed_status IN ('affected', 'not_affected', 'under_investigation', 'fixed', 'conflict')),
confidence_score NUMERIC(3,2) CHECK (confidence_score >= 0 AND confidence_score <= 1),
contributing_observations UUID[] DEFAULT '{}',
conflict_details JSONB,
computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (tenant_id, vulnerability_id, product_key)
);
CREATE TABLE vex.consensus_holds (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
consensus_id UUID NOT NULL REFERENCES vex.consensus(id),
hold_type TEXT NOT NULL CHECK (hold_type IN ('manual_review', 'conflict_resolution', 'policy_override')),
reason TEXT NOT NULL,
placed_by TEXT NOT NULL,
placed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
released_at TIMESTAMPTZ,
released_by TEXT
);
CREATE TABLE vex.unknowns_snapshots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
project_id UUID NOT NULL REFERENCES vex.projects(id),
graph_revision_id UUID REFERENCES vex.graph_revisions(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by TEXT,
rationale TEXT,
item_count INT NOT NULL DEFAULT 0
);
CREATE TABLE vex.unknown_items (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
snapshot_id UUID NOT NULL REFERENCES vex.unknowns_snapshots(id) ON DELETE CASCADE,
item_key TEXT NOT NULL,
item_type TEXT NOT NULL CHECK (item_type IN (
'missing_sbom', 'ambiguous_package', 'missing_feed',
'unresolved_edge', 'no_version_info', 'unknown_ecosystem'
)),
severity TEXT CHECK (severity IN ('critical', 'high', 'medium', 'low', 'info')),
details JSONB DEFAULT '{}',
resolved_at TIMESTAMPTZ,
resolution TEXT
);
CREATE TABLE vex.evidence_manifests (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
manifest_id TEXT NOT NULL UNIQUE,
merkle_root TEXT NOT NULL,
signature TEXT,
signer_id TEXT,
sealed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
item_count INT NOT NULL DEFAULT 0,
items JSONB NOT NULL DEFAULT '[]',
metadata JSONB DEFAULT '{}'
);
CREATE TABLE vex.cvss_receipts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
statement_id UUID NOT NULL REFERENCES vex.statements(id),
cvss_metric_id UUID,
cvss_version TEXT NOT NULL,
vector TEXT NOT NULL,
score_used NUMERIC(3,1) NOT NULL,
context JSONB DEFAULT '{}',
scored_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE vex.attestations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
statement_id UUID REFERENCES vex.statements(id),
graph_revision_id UUID REFERENCES vex.graph_revisions(id),
attestation_type TEXT NOT NULL CHECK (attestation_type IN ('in-toto', 'dsse', 'sigstore')),
envelope_hash TEXT NOT NULL,
rekor_log_id TEXT,
rekor_log_index BIGINT,
signer_id TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata JSONB DEFAULT '{}'
);
CREATE TABLE vex.timeline_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
project_id UUID REFERENCES vex.projects(id),
event_type TEXT NOT NULL,
entity_type TEXT NOT NULL,
entity_id UUID NOT NULL,
actor TEXT,
details JSONB DEFAULT '{}',
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes
CREATE INDEX idx_projects_tenant ON vex.projects(tenant_id);
CREATE INDEX idx_graph_revisions_project ON vex.graph_revisions(project_id);
CREATE INDEX idx_graph_revisions_sbom ON vex.graph_revisions(sbom_hash);
CREATE INDEX idx_graph_nodes_revision ON vex.graph_nodes(graph_revision_id);
CREATE INDEX idx_graph_nodes_purl ON vex.graph_nodes(purl) WHERE purl IS NOT NULL;
CREATE INDEX idx_graph_edges_revision ON vex.graph_edges(graph_revision_id);
CREATE INDEX idx_graph_edges_from ON vex.graph_edges(from_node_id);
CREATE INDEX idx_graph_edges_to ON vex.graph_edges(to_node_id);
CREATE INDEX idx_statements_tenant_vuln ON vex.statements(tenant_id, vulnerability_id);
CREATE INDEX idx_statements_project ON vex.statements(project_id);
CREATE INDEX idx_statements_graph ON vex.statements(graph_revision_id);
CREATE INDEX idx_observations_tenant_vuln ON vex.observations(tenant_id, vulnerability_id);
CREATE INDEX idx_observations_provider ON vex.observations(provider_id);
CREATE INDEX idx_linksets_tenant ON vex.linksets(tenant_id);
CREATE INDEX idx_consensus_tenant_vuln ON vex.consensus(tenant_id, vulnerability_id);
CREATE INDEX idx_unknowns_project ON vex.unknowns_snapshots(project_id);
CREATE INDEX idx_attestations_tenant ON vex.attestations(tenant_id);
CREATE INDEX idx_attestations_rekor ON vex.attestations(rekor_log_id) WHERE rekor_log_id IS NOT NULL;
CREATE INDEX idx_timeline_tenant_time ON vex.timeline_events(tenant_id, occurred_at DESC);
CREATE INDEX idx_timeline_entity ON vex.timeline_events(entity_type, entity_id);
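Because graph_edges stores adjacency rows, transitive questions such as "what does this component depend on, directly or indirectly" can be answered with a recursive CTE. The sketch below is one possible traversal; the revision UUID and the starting node id 1 are placeholders.

```sql
WITH RECURSIVE reachable AS (
    -- Direct dependencies of the starting node.
    SELECT e.to_node_id AS node_id
    FROM vex.graph_edges AS e
    WHERE e.graph_revision_id = '44444444-4444-4444-4444-444444444444'
      AND e.from_node_id = 1
      AND e.edge_type = 'depends_on'
    UNION
    -- Follow outgoing edges from every node found so far. UNION (not
    -- UNION ALL) discards already-seen nodes, so cycles terminate.
    SELECT e.to_node_id
    FROM vex.graph_edges AS e
    JOIN reachable AS r ON e.from_node_id = r.node_id
    WHERE e.graph_revision_id = '44444444-4444-4444-4444-444444444444'
      AND e.edge_type = 'depends_on'
)
SELECT n.node_key, n.purl, n.version
FROM reachable AS r
JOIN vex.graph_nodes AS n ON n.id = r.node_id
ORDER BY n.node_key;
```

The recursive step joins on from_node_id, which is covered by idx_graph_edges_from.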
5.4 Scheduler Schema
CREATE SCHEMA IF NOT EXISTS scheduler;
CREATE TABLE scheduler.schedules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
name TEXT NOT NULL,
description TEXT,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
cron_expression TEXT,
timezone TEXT NOT NULL DEFAULT 'UTC',
mode TEXT NOT NULL CHECK (mode IN ('scheduled', 'manual', 'on_event', 'continuous')),
selection JSONB NOT NULL DEFAULT '{}',
only_if JSONB DEFAULT '{}',
notify JSONB DEFAULT '{}',
limits JSONB DEFAULT '{}',
subscribers TEXT[] DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by TEXT,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_by TEXT,
deleted_at TIMESTAMPTZ,
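deleted_by TEXT
);
-- Uniqueness among non-deleted schedules; a table-level UNIQUE constraint cannot take a WHERE clause, so use a partial unique index
CREATE UNIQUE INDEX uq_schedules_tenant_name ON scheduler.schedules(tenant_id, name) WHERE deleted_at IS NULL;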
CREATE TABLE scheduler.triggers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
schedule_id UUID NOT NULL REFERENCES scheduler.schedules(id) ON DELETE CASCADE,
trigger_type TEXT NOT NULL CHECK (trigger_type IN ('cron', 'fixed_delay', 'manual', 'on_event', 'webhook')),
cron_expression TEXT,
fixed_delay_seconds INT,
event_filter JSONB,
timezone TEXT DEFAULT 'UTC',
next_fire_time TIMESTAMPTZ,
last_fire_time TIMESTAMPTZ,
misfire_policy TEXT DEFAULT 'skip' CHECK (misfire_policy IN ('skip', 'fire_now', 'queue')),
enabled BOOLEAN NOT NULL DEFAULT TRUE
);
CREATE TABLE scheduler.runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
schedule_id UUID REFERENCES scheduler.schedules(id),
trigger_id UUID REFERENCES scheduler.triggers(id),
state TEXT NOT NULL CHECK (state IN ('pending', 'queued', 'running', 'completed', 'failed', 'cancelled', 'stale', 'timeout')),
reason JSONB DEFAULT '{}',
stats JSONB DEFAULT '{}',
deltas JSONB DEFAULT '[]',
worker_id UUID,
retry_of UUID REFERENCES scheduler.runs(id),
retry_count INT NOT NULL DEFAULT 0,
error TEXT,
error_details JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
timeout_at TIMESTAMPTZ
);
CREATE TABLE scheduler.graph_jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
sbom_id TEXT NOT NULL,
sbom_version_id TEXT,
sbom_digest TEXT NOT NULL,
graph_snapshot_id TEXT,
status TEXT NOT NULL CHECK (status IN ('pending', 'running', 'completed', 'failed', 'cancelled')),
trigger TEXT NOT NULL CHECK (trigger IN ('manual', 'scheduled', 'on_sbom_change', 'on_feed_update')),
priority INT NOT NULL DEFAULT 100,
attempts INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 3,
cartographer_job_id TEXT,
correlation_id TEXT,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
error TEXT,
error_details JSONB
);
CREATE TABLE scheduler.policy_jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
policy_pack_id TEXT NOT NULL,
policy_version INT,
target_type TEXT NOT NULL CHECK (target_type IN ('image', 'sbom', 'project', 'artifact')),
target_id TEXT NOT NULL,
status TEXT NOT NULL CHECK (status IN ('pending', 'running', 'completed', 'failed')),
priority INT NOT NULL DEFAULT 100,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
result JSONB DEFAULT '{}',
error TEXT
);
CREATE TABLE scheduler.impact_snapshots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
run_id UUID NOT NULL REFERENCES scheduler.runs(id),
image_digest TEXT NOT NULL,
image_reference TEXT,
new_findings INT NOT NULL DEFAULT 0,
new_criticals INT NOT NULL DEFAULT 0,
new_high INT NOT NULL DEFAULT 0,
new_medium INT NOT NULL DEFAULT 0,
new_low INT NOT NULL DEFAULT 0,
total_findings INT NOT NULL DEFAULT 0,
kev_hits TEXT[] DEFAULT '{}',
top_findings JSONB DEFAULT '[]',
report_url TEXT,
attestation JSONB DEFAULT '{}',
detected_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE scheduler.workers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
node_id TEXT NOT NULL UNIQUE,
hostname TEXT,
capabilities TEXT[] DEFAULT '{}',
max_concurrent_jobs INT NOT NULL DEFAULT 1,
current_jobs INT NOT NULL DEFAULT 0,
version TEXT,
last_heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'draining', 'paused', 'dead'))
);
CREATE TABLE scheduler.execution_logs (
id BIGSERIAL PRIMARY KEY,
run_id UUID NOT NULL REFERENCES scheduler.runs(id) ON DELETE CASCADE,
logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
level TEXT NOT NULL CHECK (level IN ('trace', 'debug', 'info', 'warn', 'error', 'fatal')),
message TEXT NOT NULL,
logger TEXT,
data JSONB DEFAULT '{}'
);
CREATE TABLE scheduler.locks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
lock_key TEXT NOT NULL UNIQUE,
lock_type TEXT NOT NULL DEFAULT 'exclusive' CHECK (lock_type IN ('exclusive', 'shared')),
holder_id TEXT NOT NULL,
holder_info JSONB DEFAULT '{}',
acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
renewed_at TIMESTAMPTZ
);
CREATE TABLE scheduler.run_summaries (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
schedule_id UUID REFERENCES scheduler.schedules(id),
period_start TIMESTAMPTZ NOT NULL,
period_end TIMESTAMPTZ NOT NULL,
total_runs INT NOT NULL DEFAULT 0,
successful_runs INT NOT NULL DEFAULT 0,
failed_runs INT NOT NULL DEFAULT 0,
cancelled_runs INT NOT NULL DEFAULT 0,
avg_duration_seconds NUMERIC(10,2),
max_duration_seconds INT,
min_duration_seconds INT,
total_findings_detected INT NOT NULL DEFAULT 0,
new_criticals INT NOT NULL DEFAULT 0,
computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (tenant_id, schedule_id, period_start)
);
CREATE TABLE scheduler.audit (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
action TEXT NOT NULL,
entity_type TEXT NOT NULL,
entity_id UUID NOT NULL,
actor TEXT,
actor_type TEXT CHECK (actor_type IN ('user', 'service', 'system')),
old_value JSONB,
new_value JSONB,
details JSONB DEFAULT '{}',
ip_address INET,
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes
CREATE INDEX idx_schedules_tenant ON scheduler.schedules(tenant_id) WHERE deleted_at IS NULL;
CREATE INDEX idx_schedules_enabled ON scheduler.schedules(tenant_id, enabled) WHERE deleted_at IS NULL;
CREATE INDEX idx_triggers_schedule ON scheduler.triggers(schedule_id);
CREATE INDEX idx_triggers_next_fire ON scheduler.triggers(next_fire_time) WHERE enabled = TRUE;
CREATE INDEX idx_runs_tenant_state ON scheduler.runs(tenant_id, state);
CREATE INDEX idx_runs_schedule ON scheduler.runs(schedule_id);
CREATE INDEX idx_runs_created ON scheduler.runs(created_at DESC);
CREATE INDEX idx_runs_state_created ON scheduler.runs(state, created_at) WHERE state IN ('pending', 'queued', 'running');
CREATE INDEX idx_graph_jobs_tenant_status ON scheduler.graph_jobs(tenant_id, status);
CREATE INDEX idx_graph_jobs_sbom ON scheduler.graph_jobs(sbom_digest);
CREATE INDEX idx_policy_jobs_tenant_status ON scheduler.policy_jobs(tenant_id, status);
CREATE INDEX idx_impact_snapshots_run ON scheduler.impact_snapshots(run_id);
CREATE INDEX idx_impact_snapshots_tenant ON scheduler.impact_snapshots(tenant_id, detected_at DESC);
CREATE INDEX idx_workers_status ON scheduler.workers(status);
CREATE INDEX idx_workers_heartbeat ON scheduler.workers(last_heartbeat_at);
CREATE INDEX idx_execution_logs_run ON scheduler.execution_logs(run_id);
CREATE INDEX idx_locks_expires ON scheduler.locks(expires_at);
CREATE INDEX idx_run_summaries_tenant ON scheduler.run_summaries(tenant_id, period_start DESC);
CREATE INDEX idx_audit_tenant_time ON scheduler.audit(tenant_id, occurred_at DESC);
CREATE INDEX idx_audit_entity ON scheduler.audit(entity_type, entity_id);
-- Partitioning for high-volume tables (optional)
-- CREATE TABLE scheduler.runs_partitioned (...) PARTITION BY RANGE (created_at);
-- CREATE TABLE scheduler.execution_logs_partitioned (...) PARTITION BY RANGE (logged_at);
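The partial index idx_runs_state_created hints at a queue-style access pattern over non-terminal runs. The following is a minimal sketch of how a worker might claim the oldest pending run, assuming workers poll scheduler.runs directly. The specification does not define the claiming protocol, and the worker UUID is a placeholder.

```sql
-- Hypothetical claim query; not taken from the StellaOps code base.
WITH next_run AS (
    SELECT id
    FROM scheduler.runs
    WHERE state = 'pending'
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED   -- concurrent workers skip rows that are already locked
)
UPDATE scheduler.runs AS r
SET state      = 'running',
    worker_id  = '00000000-0000-0000-0000-000000000001',  -- placeholder worker UUID
    started_at = NOW()
FROM next_run
WHERE r.id = next_run.id
RETURNING r.id, r.tenant_id;
```

SKIP LOCKED lets several workers run the same statement concurrently without blocking on each other; a statement that returns no row simply means nothing was claimable at that moment.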
5.5 Notify Schema
See schemas/notify.sql for the complete schema definition.
5.6 Policy Schema
See schemas/policy.sql for the complete schema definition.
6. Indexing Strategy
6.1 Index Types
| Index Type | Use Case | Example |
|---|---|---|
| B-tree (default) | Equality, range, sorting | CREATE INDEX idx_x ON t(col) |
| GIN | JSONB containment, arrays, full-text | CREATE INDEX idx_x ON t USING GIN (col) |
| GiST | Geometric, range types | CREATE INDEX idx_x ON t USING GiST (col) |
| Hash | Equality only (rare) | CREATE INDEX idx_x ON t USING HASH (col) |
| BRIN | Large tables with natural ordering | CREATE INDEX idx_x ON t USING BRIN (col) |
6.2 Composite Index Guidelines
-- Order columns by:
-- 1. Equality conditions first
-- 2. Range conditions second
-- 3. Most selective columns first within each group
-- Good: tenant_id always equality, created_at often range
CREATE INDEX idx_runs_tenant_created ON scheduler.runs(tenant_id, created_at DESC);
-- Good: Partial index for active records only
CREATE INDEX idx_schedules_active ON scheduler.schedules(tenant_id, name)
WHERE deleted_at IS NULL AND enabled = TRUE;
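As an illustration of the ordering rules above, the queries below are shaped to match the two example indexes. The tenant UUID is a placeholder and the selected columns are chosen only for the example.

```sql
-- Equality on tenant_id, range on created_at, newest first:
-- this mirrors the column order of idx_runs_tenant_created.
SELECT id, state, created_at
FROM scheduler.runs
WHERE tenant_id = '00000000-0000-0000-0000-000000000001'  -- placeholder tenant
  AND created_at >= NOW() - INTERVAL '7 days'
ORDER BY created_at DESC
LIMIT 50;

-- A partial index is only considered when the query implies its predicate,
-- so both conditions from idx_schedules_active are repeated here.
SELECT id, name
FROM scheduler.schedules
WHERE tenant_id = '00000000-0000-0000-0000-000000000001'
  AND deleted_at IS NULL
  AND enabled = TRUE
ORDER BY name;
```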
6.3 JSONB Indexing
-- GIN index for general JSONB queries
CREATE INDEX idx_advisories_provenance_gin ON vuln.advisories USING GIN (provenance);
-- Expression index for specific paths
CREATE INDEX idx_affected_ecosystem ON vuln.advisory_affected ((attributes->>'ecosystem'));
-- Partial GIN for specific conditions
CREATE INDEX idx_metadata_active ON scheduler.runs USING GIN (stats)
WHERE state = 'completed';
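The queries below sketch the predicate shapes each of these indexes is meant to serve. The JSON keys ("source", "findingCount") and the selected id columns are assumptions for illustration; whether the planner actually picks an index depends on table size and statistics.

```sql
-- Containment (@>) is supported by the default GIN operator class for JSONB.
SELECT id
FROM vuln.advisories
WHERE provenance @> '{"source": "nvd"}';   -- "source" is an assumed key

-- The expression must be written exactly as in the index definition.
SELECT count(*)
FROM vuln.advisory_affected
WHERE attributes->>'ecosystem' = 'npm';

-- Partial GIN: the query has to include the index predicate (state = 'completed').
SELECT id
FROM scheduler.runs
WHERE state = 'completed'
  AND stats ? 'findingCount';              -- key-exists operator, also GIN-indexable
```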
6.4 Generated Columns for JSONB Hot Keys
For frequently-queried JSONB fields, use PostgreSQL generated columns to enable efficient B-tree indexing and query planning statistics.
Problem with expression indexes:
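-- Expression indexes collect statistics only for the exact indexed expression, and only once ANALYZE has run
CREATE INDEX idx_format ON scanner.sbom_documents ((doc->>'bomFormat'));
-- Queries that spell the expression differently fall back to default selectivity estimates and may get suboptimal plans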
Solution: Generated columns (PostgreSQL 12+):
-- Add generated column that extracts JSONB field
ALTER TABLE scanner.sbom_documents
ADD COLUMN bom_format TEXT GENERATED ALWAYS AS ((doc->>'bomFormat')) STORED;
-- Standard B-tree index with full statistics
CREATE INDEX idx_sbom_bom_format ON scanner.sbom_documents(bom_format);
Benefits:
- B-tree indexable: Standard index on generated column
- Statistics: ANALYZE collects cardinality, MCV, histogram
- Index-only scans: Visible to covering indexes
- Zero application changes: Transparent to ORM/queries
When to use generated columns:
- Field queried in >10% of queries against the table
- Cardinality >100 distinct values (worth collecting stats)
- Field used in JOIN conditions or GROUP BY
- Index-only scans are beneficial
Naming convention:
<json_path_snake_case>
Examples:
doc->>'bomFormat' → bom_format
raw->>'schemaVersion' → schema_version
stats->>'findingCount' → finding_count
Migration pattern:
-- Step 1: Add generated column (a STORED column rewrites the table under an ACCESS EXCLUSIVE lock; run in a low-traffic window)
ALTER TABLE scheduler.runs
ADD COLUMN finding_count INT GENERATED ALWAYS AS ((stats->>'findingCount')::int) STORED;
-- Step 2: Create index concurrently
CREATE INDEX CONCURRENTLY idx_runs_finding_count
ON scheduler.runs(tenant_id, finding_count);
-- Step 3: Analyze for statistics
ANALYZE scheduler.runs;
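After Step 3, one way to confirm that statistics exist for the new column is to read the pg_stats view. This is a verification sketch that assumes the finding_count column from the migration above; the follow-up EXPLAIN uses a placeholder tenant UUID.

```sql
-- Statistics gathered by ANALYZE for the generated column.
SELECT attname, n_distinct, most_common_vals
FROM pg_stats
WHERE schemaname = 'scheduler'
  AND tablename  = 'runs'
  AND attname    = 'finding_count';

-- Inspect the plan for a query that filters on the generated column.
EXPLAIN
SELECT id
FROM scheduler.runs
WHERE tenant_id = '00000000-0000-0000-0000-000000000001'  -- placeholder tenant
  AND finding_count > 100;
```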
Reference implementations:
- src/Scheduler/...Storage.Postgres/Migrations/010_generated_columns_runs.sql
- src/Excititor/...Storage.Postgres/Migrations/004_generated_columns_vex.sql
- src/Concelier/...Storage.Postgres/Migrations/007_generated_columns_advisories.sql
7. Partitioning Strategy
7.1 When to Partition
- Tables exceeding 100M rows
- Time-series data with clear retention windows
- Append-heavy tables with date-based queries
7.2 Partition Schemes
Time-based (RANGE):
CREATE TABLE scheduler.runs (
-- columns
) PARTITION BY RANGE (created_at);
CREATE TABLE scheduler.runs_y2024m01 PARTITION OF scheduler.runs
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
Tenant-based (LIST):
CREATE TABLE vex.statements (
-- columns
) PARTITION BY LIST (tenant_id);
-- Only for very large tenants
CREATE TABLE vex.statements_tenant_abc PARTITION OF vex.statements
FOR VALUES IN ('abc-uuid');
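The schemes above elide the column list. The sketch below fills in a reduced, hypothetical variant of scheduler.execution_logs to show two details that are easy to miss: the primary key of a partitioned table must include the partition key, and range upper bounds are exclusive. Partition names and the abbreviated columns are assumptions.

```sql
-- Hypothetical partitioned variant of scheduler.execution_logs (columns abbreviated).
CREATE TABLE scheduler.execution_logs_partitioned (
    id        BIGSERIAL,
    run_id    UUID NOT NULL,
    logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    message   TEXT NOT NULL,
    PRIMARY KEY (id, logged_at)      -- must include the partition key
) PARTITION BY RANGE (logged_at);

CREATE TABLE scheduler.execution_logs_y2025m01
    PARTITION OF scheduler.execution_logs_partitioned
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');   -- upper bound is exclusive

-- Catch-all for rows that fall outside every defined range.
CREATE TABLE scheduler.execution_logs_default
    PARTITION OF scheduler.execution_logs_partitioned DEFAULT;
```

Without a default partition, an insert whose logged_at matches no range fails with an error, so partitions for upcoming months need to be created ahead of time.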
7.3 Retention via Partition Drops
-- Monthly cleanup job
DROP TABLE scheduler.runs_y2023m01;
DROP TABLE scheduler.execution_logs_y2023m01;
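A more cautious variant of the cleanup job detaches the partition before dropping it, which leaves a window to archive or inspect the data. This is a sketch, assuming PostgreSQL 14 or later for the CONCURRENTLY option.

```sql
-- DETACH ... CONCURRENTLY cannot run inside a transaction block and is not
-- available when the parent table has a default partition.
ALTER TABLE scheduler.runs
    DETACH PARTITION scheduler.runs_y2023m01 CONCURRENTLY;

-- The detached table is now a standalone table; archive it here if needed.
DROP TABLE IF EXISTS scheduler.runs_y2023m01;
```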
8. Connection Management
8.1 Connection Pooling
Recommended: PgBouncer in transaction mode
[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 20
reserve_pool_size = 5
8.2 Session Configuration
Every connection must configure:
-- Set on connection open (via DataSource)
SET app.tenant_id = '<tenant-uuid>';
SET app.current_tenant = '<tenant-uuid>'; -- compatibility (legacy)
SET timezone = 'UTC';
SET statement_timeout = '30s'; -- Adjust per use case
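With PgBouncer in transaction mode (section 8.1), a server connection can be handed to a different client after each transaction, so session-level SET values may not stay with the client that issued them. A hedged alternative is to scope the settings to the transaction. The sketch below assumes the application wraps each unit of work in an explicit transaction; the tenant UUID is a placeholder.

```sql
BEGIN;

-- Third argument true = value is local to this transaction (like SET LOCAL).
SELECT set_config('app.tenant_id', '00000000-0000-0000-0000-000000000001', true);
SET LOCAL statement_timeout = '30s';

-- Tenant-scoped work; current_setting reads the value back.
SELECT id, name
FROM scheduler.schedules
WHERE tenant_id = current_setting('app.tenant_id')::uuid
  AND deleted_at IS NULL;

COMMIT;   -- both settings revert automatically here
```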
8.3 Connection String Template
Host=<host>;Port=5432;Database=stellaops;Username=<user>;Password=<pass>;
Pooling=true;MinPoolSize=5;MaxPoolSize=20;ConnectionIdleLifetime=300;
CommandTimeout=30;Timeout=15;
9. Migration Strategy
9.1 Migration Naming
V<version>__<description>.sql
Examples:
V001__create_authority_schema.sql
V002__create_vuln_schema.sql
V003__add_kev_flags_index.sql
9.2 Migration Rules
- Idempotent: Use IF NOT EXISTS, IF EXISTS
- Backward compatible: Add columns as nullable first
- No data loss: Never drop columns without migration path
- Testable: Each migration runs in CI against test database
- Reversible: Include down migration where possible
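The "backward compatible" rule can be sketched as a three-step rollout. The region column on scheduler.workers is hypothetical and exists only for this example.

```sql
-- Release N: add the column as nullable so existing writers keep working.
ALTER TABLE scheduler.workers ADD COLUMN IF NOT EXISTS region TEXT;  -- hypothetical column

-- Release N: backfill existing rows (batch this on large tables).
UPDATE scheduler.workers SET region = 'unknown' WHERE region IS NULL;

-- Release N+1: tighten the constraint once every writer populates the column.
ALTER TABLE scheduler.workers ALTER COLUMN region SET NOT NULL;
```

Splitting the NOT NULL step into a later release keeps older application versions, which do not know about the column, able to insert rows during the rollout.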
9.3 Migration Template
-- V001__create_authority_schema.sql
-- Description: Create initial authority schema
-- Author: <name>
-- Date: 2025-XX-XX
BEGIN;
CREATE SCHEMA IF NOT EXISTS authority;
CREATE TABLE IF NOT EXISTS authority.tenants (
-- ...
);
-- Add indexes
CREATE INDEX IF NOT EXISTS idx_tenants_code ON authority.tenants(code);
COMMIT;
10. Performance Guidelines
10.1 Query Patterns
| Pattern | Guidance |
|---|---|
| Pagination | Use keyset pagination (WHERE id > :last_id), not OFFSET |
| Bulk inserts | Use COPY or multi-value INSERT |
| Existence checks | Use EXISTS, not COUNT(*) |
| Aggregations | Pre-aggregate in summary tables where possible |
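The first and third rows of the table can be sketched against scheduler.runs as follows. The UUIDs and the timestamp are placeholders standing in for the last row of the previous page and a schedule of interest.

```sql
-- Keyset pagination: resume strictly after the last (created_at, id) already seen.
SELECT id, state, created_at
FROM scheduler.runs
WHERE tenant_id = '00000000-0000-0000-0000-000000000001'
  AND (created_at, id) < ('2025-01-15 12:00:00+00'::timestamptz,
                          '00000000-0000-0000-0000-0000000000ff'::uuid)
ORDER BY created_at DESC, id DESC
LIMIT 50;

-- Existence check: stops at the first matching row instead of counting all of them.
SELECT EXISTS (
    SELECT 1
    FROM scheduler.runs
    WHERE schedule_id = '00000000-0000-0000-0000-000000000002'
      AND state = 'running'
) AS has_running;
```

Including id as a tiebreaker keeps the page boundary stable when several rows share the same created_at value.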
10.2 EXPLAIN ANALYZE
All queries in production code should have EXPLAIN ANALYZE output documented for:
- Expected row counts
- Index usage
- Scan types
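A typical invocation for capturing that output might look like the sketch below; the query and tenant UUID are illustrative only. Note that EXPLAIN ANALYZE actually executes the statement, so data-modifying statements are wrapped in a transaction that is rolled back.

```sql
-- Read-only query: safe to run directly.
EXPLAIN (ANALYZE, BUFFERS)
SELECT id, state
FROM scheduler.runs
WHERE tenant_id = '00000000-0000-0000-0000-000000000001'
  AND state = 'failed';

-- Data-modifying statement: roll back so the measurement leaves no changes.
BEGIN;
EXPLAIN (ANALYZE, BUFFERS)
DELETE FROM scheduler.locks WHERE expires_at < NOW();
ROLLBACK;
```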
10.3 Monitoring Queries
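-- Slow queries (requires the pg_stat_statements extension; the column is total_time on PostgreSQL 12 and earlier)
SELECT * FROM pg_stat_statements ORDER BY total_exec_time DESC LIMIT 20;
-- Unused indexes
SELECT * FROM pg_stat_user_indexes WHERE idx_scan = 0;
-- Table bloat (requires the pgstattuple extension)
SELECT * FROM pgstattuple('schema.table');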
Appendix A: Type Reference
A.1 Custom Types (if needed)
-- Advisory status type (use CHECK constraint instead for flexibility)
-- CREATE TYPE advisory_status AS ENUM ('active', 'rejected', 'withdrawn');
-- Prefer CHECK constraints:
status TEXT NOT NULL CHECK (status IN ('active', 'rejected', 'withdrawn'))
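The flexibility argument for CHECK constraints shows up when the allowed set changes. The sketch below adds a new value without a long validating lock. It assumes a status column on vuln.advisories, PostgreSQL's default constraint name (advisories_status_check), and a hypothetical new value 'disputed'.

```sql
BEGIN;
ALTER TABLE vuln.advisories DROP CONSTRAINT IF EXISTS advisories_status_check;

-- NOT VALID skips the full-table scan at creation time; new writes are still checked.
ALTER TABLE vuln.advisories
    ADD CONSTRAINT advisories_status_check
    CHECK (status IN ('active', 'rejected', 'withdrawn', 'disputed')) NOT VALID;
COMMIT;

-- Validate existing rows separately; this takes a weaker lock than ADD CONSTRAINT.
ALTER TABLE vuln.advisories VALIDATE CONSTRAINT advisories_status_check;
```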
A.2 Extension Dependencies
-- Required extensions
CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- UUID generation (optional; gen_random_uuid() is built in since PostgreSQL 13, earlier versions need pgcrypto)
CREATE EXTENSION IF NOT EXISTS "pg_trgm"; -- Trigram similarity for fuzzy search
CREATE EXTENSION IF NOT EXISTS "btree_gin"; -- GIN indexes for scalar types
Appendix B: Schema Diagram
┌─────────────────────────────────────────────────────────────────────────────┐
│ AUTHORITY SCHEMA │
│ ┌─────────┐ ┌───────┐ ┌──────────────────┐ ┌─────────────────┐ │
│ │ tenants │───<│ users │───<│ user_roles │>───│ roles │ │
│ └─────────┘ └───────┘ └──────────────────┘ └─────────────────┘ │
│ │ │ │
│ │ ┌────┴─────┐ │
│ └────────<│ service_ │ ┌─────────┐ ┌────────┐ │
│ │ accounts │ │ clients │ │ scopes │ │
│ └──────────┘ └─────────┘ └────────┘ │
│ │
│ ┌─────────┐ ┌─────────────┐ ┌──────────┐ ┌────────────────┐ │
│ │ tokens │ │ revocations │ │ licenses │ │ license_usage │ │
│ └─────────┘ └─────────────┘ └──────────┘ └────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ VULN SCHEMA │
│ ┌─────────┐ ┌────────────────┐ ┌───────────────────┐ │
│ │ sources │───<│ feed_snapshots │───<│ advisory_snapshots│ │
│ └─────────┘ └────────────────┘ └───────────────────┘ │
│ │ │ │
│ └──────────────────┬─────────────────────┘ │
│ ▼ │
│ ┌────────────┐ │
│ │ advisories │ │
│ └────────────┘ │
│ │ │
│ ┌─────────────────────┼─────────────────────┬──────────────────┐ │
│ ▼ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────┐ ┌───────────┐ │
│ │ aliases │ │advisory_cvss │ │advisory_affected│ │ kev_flags │ │
│ └─────────────┘ └──────────────┘ └────────────────┘ └───────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ VEX SCHEMA │
│ ┌──────────┐ ┌─────────────────┐ ┌─────────────┐ ┌───────────┐ │
│ │ projects │───<│ graph_revisions │───<│ graph_nodes │───<│graph_edges│ │
│ └──────────┘ └─────────────────┘ └─────────────┘ └───────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌────────────────┐ ┌──────────────┐ │
│ │ statements │───<│ cvss_receipts │ │ observations │ │
│ └────────────┘ └────────────────┘ └──────────────┘ │
│ │ │
│ ┌──────────┐ ┌───────────┐ ┌───────────────┴─────┐ │
│ │ linksets │───<│ linkset_ │ │ consensus │ holds │ │
│ └──────────┘ │ events │ └─────────────────────┘ │
│ └───────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Document Version: 1.0.0 Last Updated: 2025-12-15