Files
git.stella-ops.org/docs/db/analytics_schema.sql
2026-01-22 19:08:46 +02:00

1161 lines
43 KiB
PL/PgSQL

-- Stella Ops Analytics Schema (PostgreSQL)
-- System of record: PostgreSQL
-- Purpose: Star-schema analytics layer for SBOM and attestation data
-- Sprint: 20260120_030
-- Version: 1.0.0
BEGIN;

-- =============================================================================
-- EXTENSIONS
-- =============================================================================
-- Tables in this file default their UUID keys to gen_random_uuid().
CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- =============================================================================
-- SCHEMA
-- =============================================================================
CREATE SCHEMA IF NOT EXISTS analytics;

COMMENT ON SCHEMA analytics IS
    'Analytics star-schema for SBOM, attestation, and vulnerability data';

-- =============================================================================
-- VERSION TRACKING
-- =============================================================================
-- One row per applied schema version; re-running the script keeps the
-- existing row (and its applied_at) untouched.
CREATE TABLE IF NOT EXISTS analytics.schema_version (
    version     TEXT        PRIMARY KEY,
    applied_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    description TEXT
);

INSERT INTO analytics.schema_version (version, description)
VALUES ('1.0.0', 'Initial analytics schema - SBOM analytics lake')
ON CONFLICT (version) DO NOTHING;
-- =============================================================================
-- ENUMS
-- =============================================================================
-- Create the analytics enum types if they are missing.
-- Each type is created only when pg_type holds no row with that name, which
-- keeps the script re-runnable. The types are created without a schema
-- qualifier, so they are not placed in the analytics schema explicitly.
DO $$
BEGIN
    PERFORM 1 FROM pg_type WHERE typname = 'analytics_component_type';
    IF NOT FOUND THEN
        CREATE TYPE analytics_component_type AS ENUM (
            'library',
            'application',
            'container',
            'framework',
            'operating-system',
            'device',
            'firmware',
            'file'
        );
    END IF;

    PERFORM 1 FROM pg_type WHERE typname = 'analytics_license_category';
    IF NOT FOUND THEN
        CREATE TYPE analytics_license_category AS ENUM (
            'permissive',
            'copyleft-weak',
            'copyleft-strong',
            'proprietary',
            'unknown'
        );
    END IF;

    PERFORM 1 FROM pg_type WHERE typname = 'analytics_severity';
    IF NOT FOUND THEN
        CREATE TYPE analytics_severity AS ENUM (
            'critical',
            'high',
            'medium',
            'low',
            'none',
            'unknown'
        );
    END IF;

    PERFORM 1 FROM pg_type WHERE typname = 'analytics_attestation_type';
    IF NOT FOUND THEN
        CREATE TYPE analytics_attestation_type AS ENUM (
            'provenance',
            'sbom',
            'vex',
            'build',
            'scan',
            'policy'
        );
    END IF;
END $$;
-- =============================================================================
-- NORMALIZATION FUNCTIONS
-- =============================================================================
-- Normalize supplier names for consistent grouping.
--   raw_supplier: supplier string as found in the source data; may be NULL.
--   Returns: the name lower-cased, with runs of whitespace collapsed to one
--   space and one trailing legal suffix (Inc, LLC, Ltd, Corp, GmbH, B.V., S.A.,
--   PLC, Co.) removed; NULL when the input is NULL or nothing is left.
CREATE OR REPLACE FUNCTION analytics.normalize_supplier(raw_supplier TEXT)
RETURNS TEXT AS $$
DECLARE
    cleaned TEXT;
BEGIN
    IF raw_supplier IS NULL THEN
        RETURN NULL;
    END IF;

    -- Collapse whitespace and trim FIRST: the suffix pattern below is anchored
    -- at the end of the string, so trailing whitespace would otherwise keep
    -- 'Acme Inc. ' from losing its suffix.
    cleaned := BTRIM(REGEXP_REPLACE(raw_supplier, '\s+', ' ', 'g'));

    -- Remove one trailing legal-entity suffix, together with an optional comma
    -- in front of it ('Acme, Inc.' -> 'Acme').
    cleaned := REGEXP_REPLACE(
        cleaned,
        '\s*,?\s+(Inc\.?|LLC|Ltd\.?|Corp\.?|GmbH|B\.V\.|S\.A\.|PLC|Co\.)$',
        '',
        'i'
    );

    -- Whitespace-only input normalizes to NULL rather than '' so it is not
    -- grouped as a supplier of its own.
    RETURN NULLIF(LOWER(BTRIM(cleaned)), '');
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
COMMENT ON FUNCTION analytics.normalize_supplier IS 'Normalize supplier names by removing legal suffixes and standardizing case/whitespace';
-- Categorize SPDX license expressions into a risk category.
--   license_expr: SPDX expression or free-form license string; may be NULL.
--   Returns: 'copyleft-strong', 'copyleft-weak', 'permissive', 'proprietary',
--   or 'unknown' (also for NULL / empty input).
-- Rules are evaluated from most to least restrictive, so a compound expression
-- is classified by its most restrictive recognised license regardless of the
-- order in which the licenses are written.
CREATE OR REPLACE FUNCTION analytics.categorize_license(license_expr TEXT)
RETURNS analytics_license_category AS $$
DECLARE
    -- Prefix that lets a license token match only where it starts: at the
    -- beginning of the string or right after a non-letter. This keeps 'GPL'
    -- from matching inside 'LGPL' and 'MIT' from matching inside 'Limited'.
    token_start CONSTANT TEXT := '(^|[^[:alpha:]])';
    has_exception BOOLEAN;
BEGIN
    IF license_expr IS NULL OR license_expr = '' THEN
        RETURN 'unknown';
    END IF;

    has_exception := license_expr ~* 'WITH.*exception';

    -- Strong copyleft (without exceptions), anywhere in the expression
    IF NOT has_exception
       AND license_expr ~* (token_start || '(GPL|AGPL|OSL|SSPL|EUPL|RPL|QPL|Sleepycat)') THEN
        RETURN 'copyleft-strong';
    END IF;

    -- GPL with an exception (e.g. Classpath) is treated as weak copyleft.
    -- Checked before the permissive rule so a permissive license elsewhere in
    -- the expression cannot mask it.
    IF license_expr ~* 'GPL.*WITH.*exception' THEN
        RETURN 'copyleft-weak';
    END IF;

    -- Weak copyleft
    IF license_expr ~* (token_start || '(LGPL|MPL|EPL|CPL|CDDL|Artistic|MS-RL|APSL|IPL|SPL)') THEN
        RETURN 'copyleft-weak';
    END IF;

    -- Permissive licenses
    IF license_expr ~* (token_start || '(MIT|Apache|BSD|ISC|Zlib|Unlicense|CC0|WTFPL|0BSD|PostgreSQL|X11|Beerware|FTL|HPND|NTP|UPL)') THEN
        RETURN 'permissive';
    END IF;

    -- Proprietary indicators
    IF license_expr ~* '(proprietary|commercial|all.rights.reserved|see.license|custom|confidential)' THEN
        RETURN 'proprietary';
    END IF;

    RETURN 'unknown';
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
COMMENT ON FUNCTION analytics.categorize_license IS 'Categorize SPDX license expressions into risk categories';
-- Extract PURL components.
--   purl: Package URL of the form pkg:type/namespace/name@version?qualifiers#subpath
--         (namespace, version, qualifiers and subpath are optional).
--   Returns: exactly one row (purl_type, purl_namespace, purl_name, purl_version);
--   all four columns are NULL for NULL or empty input. Values are returned as
--   written in the input (no percent-decoding or case folding).
CREATE OR REPLACE FUNCTION analytics.parse_purl(purl TEXT)
RETURNS TABLE (purl_type TEXT, purl_namespace TEXT, purl_name TEXT, purl_version TEXT) AS $$
DECLARE
    remainder TEXT;  -- input with qualifiers, subpath and the pkg:type/ prefix removed
    path_part TEXT;  -- remainder with the @version suffix removed
BEGIN
    IF purl IS NULL OR purl = '' THEN
        -- Output columns are still NULL here; emit them as the single row.
        RETURN NEXT;
        RETURN;
    END IF;

    purl_type := SUBSTRING(purl FROM 'pkg:([^/]+)/');

    -- Drop qualifiers (?...) and subpath (#...) before looking for the version,
    -- so an '@' inside a qualifier value is never mistaken for the separator.
    remainder := REGEXP_REPLACE(purl, '[?#].*$', '');
    remainder := REGEXP_REPLACE(remainder, '^pkg:[^/]+/', '');

    -- The version follows the LAST '@'. An '@' that opens a path segment
    -- (an un-encoded scope such as @angular/core) is not a version separator,
    -- hence the required non-'/' character in front of it.
    purl_version := SUBSTRING(remainder FROM '^.*[^/]@([^@]+)$');
    path_part := REGEXP_REPLACE(remainder, '^(.*[^/])@[^@]+$', '\1');

    IF path_part ~ '/' THEN
        -- Namespace is everything before the last '/', so multi-segment
        -- namespaces (github.com/gorilla) are kept whole.
        purl_namespace := SUBSTRING(path_part FROM '^(.*)/[^/]*$');
        purl_name := SUBSTRING(path_part FROM '/([^/]+)$');
    ELSE
        purl_namespace := NULL;
        purl_name := path_part;
    END IF;

    RETURN NEXT;
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
COMMENT ON FUNCTION analytics.parse_purl IS 'Parse Package URL into components (type, namespace, name, version)';
-- =============================================================================
-- CORE TABLES
-- =============================================================================
-- Unified component registry (dimension table).
-- Rows are unique per (purl, hash_sha256).
-- NOTE(review): hash_sha256 is nullable and NULLs compare as distinct in a
-- UNIQUE constraint, so rows with a NULL hash are not deduplicated by it;
-- confirm that is intended.
CREATE TABLE IF NOT EXISTS analytics.components (
    component_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Canonical identifiers
    purl                TEXT NOT NULL,  -- Package URL (canonical identifier)
    purl_type           TEXT NOT NULL,  -- Extracted: maven, npm, pypi, golang, etc.
    purl_namespace      TEXT,           -- Extracted: group/org/scope
    purl_name           TEXT NOT NULL,  -- Extracted: package name
    purl_version        TEXT,           -- Extracted: version
    hash_sha256         TEXT,           -- Content hash for deduplication

    -- Display fields
    name                TEXT NOT NULL,  -- Display name
    version             TEXT,           -- Display version
    description         TEXT,

    -- Classification
    component_type      analytics_component_type NOT NULL DEFAULT 'library',

    -- Supplier/maintainer
    supplier            TEXT,           -- Raw supplier name
    supplier_normalized TEXT,           -- Normalized for grouping

    -- Licensing
    license_declared    TEXT,           -- Raw license string from SBOM
    license_concluded   TEXT,           -- Concluded SPDX expression
    license_category    analytics_license_category DEFAULT 'unknown',

    -- Additional identifiers
    cpe                 TEXT,           -- CPE identifier if available

    -- Usage metrics
    first_seen_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
    last_seen_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    sbom_count          INT NOT NULL DEFAULT 1,  -- Number of SBOMs containing this
    artifact_count      INT NOT NULL DEFAULT 1,  -- Number of artifacts containing this

    -- Audit
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now(),

    UNIQUE (purl, hash_sha256)
);

CREATE INDEX IF NOT EXISTS ix_components_purl
    ON analytics.components (purl);
CREATE INDEX IF NOT EXISTS ix_components_purl_type
    ON analytics.components (purl_type);
CREATE INDEX IF NOT EXISTS ix_components_supplier
    ON analytics.components (supplier_normalized);
CREATE INDEX IF NOT EXISTS ix_components_license
    ON analytics.components (license_category, license_concluded);
CREATE INDEX IF NOT EXISTS ix_components_type
    ON analytics.components (component_type);
CREATE INDEX IF NOT EXISTS ix_components_hash
    ON analytics.components (hash_sha256)
    WHERE hash_sha256 IS NOT NULL;
CREATE INDEX IF NOT EXISTS ix_components_last_seen
    ON analytics.components (last_seen_at DESC);

COMMENT ON TABLE analytics.components IS 'Unified component registry - canonical source for all SBOM components';
-- Artifacts (container images, applications) - dimension table.
-- Rows are unique per digest; the count columns are stored on the row
-- rather than computed at query time.
CREATE TABLE IF NOT EXISTS analytics.artifacts (
    artifact_id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Identity
    artifact_type       TEXT NOT NULL,  -- container, application, library, firmware
    name                TEXT NOT NULL,  -- Image/app name
    version             TEXT,           -- Tag/version
    digest              TEXT,           -- SHA256 digest
    purl                TEXT,           -- Package URL if applicable

    -- Source
    source_repo         TEXT,           -- Git repo URL
    source_ref          TEXT,           -- Git ref (branch/tag/commit)
    registry            TEXT,           -- Container registry

    -- Organization
    environment         TEXT,           -- dev, stage, prod
    team                TEXT,           -- Owning team
    service             TEXT,           -- Service name
    deployed_at         TIMESTAMPTZ,    -- Last deployment timestamp

    -- SBOM metadata
    sbom_digest         TEXT,           -- SHA256 of associated SBOM
    sbom_format         TEXT,           -- cyclonedx, spdx
    sbom_spec_version   TEXT,           -- 1.7, 3.0, etc.

    -- Pre-computed counts
    component_count     INT DEFAULT 0,  -- Number of components in SBOM
    vulnerability_count INT DEFAULT 0,  -- Total vulns (pre-VEX)
    critical_count      INT DEFAULT 0,  -- Critical severity vulns
    high_count          INT DEFAULT 0,  -- High severity vulns
    medium_count        INT DEFAULT 0,  -- Medium severity vulns
    low_count           INT DEFAULT 0,  -- Low severity vulns

    -- Attestation status
    provenance_attested BOOLEAN DEFAULT FALSE,
    slsa_level          INT,            -- 0-4

    -- Audit
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now(),

    UNIQUE (digest)
);

CREATE INDEX IF NOT EXISTS ix_artifacts_name_version
    ON analytics.artifacts (name, version);
CREATE INDEX IF NOT EXISTS ix_artifacts_environment
    ON analytics.artifacts (environment);
CREATE INDEX IF NOT EXISTS ix_artifacts_environment_name
    ON analytics.artifacts (environment, name);
CREATE INDEX IF NOT EXISTS ix_artifacts_team
    ON analytics.artifacts (team);
CREATE INDEX IF NOT EXISTS ix_artifacts_deployed
    ON analytics.artifacts (deployed_at DESC);
-- No separate index on (digest): UNIQUE (digest) above already creates one,
-- and a second identical index would only add write and storage cost.
CREATE INDEX IF NOT EXISTS ix_artifacts_service
    ON analytics.artifacts (service);

COMMENT ON TABLE analytics.artifacts IS 'Container images and applications with SBOM and attestation metadata';
-- Artifact-component bridge (fact table).
-- One row per (artifact, component); rows are removed automatically when
-- either side is deleted.
CREATE TABLE IF NOT EXISTS analytics.artifact_components (
    artifact_id     UUID NOT NULL
        REFERENCES analytics.artifacts(artifact_id) ON DELETE CASCADE,
    component_id    UUID NOT NULL
        REFERENCES analytics.components(component_id) ON DELETE CASCADE,

    -- SBOM reference
    bom_ref         TEXT,           -- Original bom-ref for round-trips

    -- Dependency metadata
    scope           TEXT,           -- required, optional, excluded
    dependency_path TEXT[],         -- Path from root (for transitive deps)
    depth           INT DEFAULT 0,  -- Dependency depth (0=direct)
    introduced_via  TEXT,           -- Direct dependency that introduced this

    -- Audit
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),

    PRIMARY KEY (artifact_id, component_id)
);

CREATE INDEX IF NOT EXISTS ix_artifact_components_component
    ON analytics.artifact_components (component_id);
CREATE INDEX IF NOT EXISTS ix_artifact_components_depth
    ON analytics.artifact_components (depth);

COMMENT ON TABLE analytics.artifact_components IS 'Bridge table linking artifacts to their SBOM components';
-- Component-vulnerability bridge (fact table).
-- One row per (component, vulnerability id); rows are removed when the
-- component is deleted.
CREATE TABLE IF NOT EXISTS analytics.component_vulns (
    component_id      UUID NOT NULL
        REFERENCES analytics.components(component_id) ON DELETE CASCADE,
    vuln_id           TEXT NOT NULL,  -- CVE-YYYY-NNNNN or GHSA-xxxx-xxxx-xxxx

    -- Source
    source            TEXT NOT NULL,  -- nvd, ghsa, osv, vendor

    -- Severity
    severity          analytics_severity NOT NULL,
    cvss_score        NUMERIC(3,1),   -- 0.0-10.0
    cvss_vector       TEXT,           -- CVSS vector string

    -- Exploitability
    epss_score        NUMERIC(5,4),   -- 0.0000-1.0000
    kev_listed        BOOLEAN DEFAULT FALSE,  -- CISA KEV

    -- Applicability
    affects           BOOLEAN NOT NULL DEFAULT TRUE,  -- Does this vuln affect this component?
    affected_versions TEXT,           -- Version range expression

    -- Remediation
    fixed_version     TEXT,           -- First fixed version
    fix_available     BOOLEAN DEFAULT FALSE,

    -- Provenance
    introduced_via    TEXT,           -- How vulnerability was introduced
    published_at      TIMESTAMPTZ,

    -- Audit
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at        TIMESTAMPTZ NOT NULL DEFAULT now(),

    PRIMARY KEY (component_id, vuln_id)
);

CREATE INDEX IF NOT EXISTS ix_component_vulns_vuln
    ON analytics.component_vulns (vuln_id);
CREATE INDEX IF NOT EXISTS ix_component_vulns_severity
    ON analytics.component_vulns (severity, cvss_score DESC);
CREATE INDEX IF NOT EXISTS ix_component_vulns_fixable
    ON analytics.component_vulns (fix_available)
    WHERE fix_available = TRUE;
CREATE INDEX IF NOT EXISTS ix_component_vulns_kev
    ON analytics.component_vulns (kev_listed)
    WHERE kev_listed = TRUE;
CREATE INDEX IF NOT EXISTS ix_component_vulns_epss
    ON analytics.component_vulns (epss_score DESC)
    WHERE epss_score IS NOT NULL;
CREATE INDEX IF NOT EXISTS ix_component_vulns_published
    ON analytics.component_vulns (published_at DESC)
    WHERE published_at IS NOT NULL;

COMMENT ON TABLE analytics.component_vulns IS 'Component-to-vulnerability mapping with severity and remediation data';
-- Attestations analytics table.
-- Rows are unique per DSSE payload hash. Deleting the referenced artifact
-- keeps the attestation row and nulls its artifact_id.
CREATE TABLE IF NOT EXISTS analytics.attestations (
    attestation_id     UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    artifact_id        UUID
        REFERENCES analytics.artifacts(artifact_id) ON DELETE SET NULL,

    -- Predicate
    predicate_type     analytics_attestation_type NOT NULL,
    predicate_uri      TEXT NOT NULL,  -- Full predicate type URI

    -- Issuer
    issuer             TEXT,           -- Who signed
    issuer_normalized  TEXT,           -- Normalized issuer

    -- Build metadata
    builder_id         TEXT,           -- Build system identifier
    slsa_level         INT,            -- SLSA conformance level (0-4)

    -- DSSE envelope
    dsse_payload_hash  TEXT NOT NULL,  -- SHA256 of payload
    dsse_sig_algorithm TEXT,           -- Signature algorithm

    -- Transparency log
    rekor_log_id       TEXT,           -- Transparency log ID
    rekor_log_index    BIGINT,         -- Log index

    -- Timestamps
    statement_time     TIMESTAMPTZ,    -- When statement was made

    -- Verification
    verified           BOOLEAN DEFAULT FALSE,  -- Signature verified
    verification_time  TIMESTAMPTZ,

    -- Build provenance fields
    materials_hash     TEXT,           -- Hash of build materials
    source_uri         TEXT,           -- Source code URI
    workflow_ref       TEXT,           -- CI workflow reference

    -- Audit
    created_at         TIMESTAMPTZ NOT NULL DEFAULT now(),

    UNIQUE (dsse_payload_hash)
);

CREATE INDEX IF NOT EXISTS ix_attestations_artifact
    ON analytics.attestations (artifact_id);
CREATE INDEX IF NOT EXISTS ix_attestations_type
    ON analytics.attestations (predicate_type);
CREATE INDEX IF NOT EXISTS ix_attestations_artifact_type
    ON analytics.attestations (artifact_id, predicate_type);
CREATE INDEX IF NOT EXISTS ix_attestations_issuer
    ON analytics.attestations (issuer_normalized);
CREATE INDEX IF NOT EXISTS ix_attestations_rekor
    ON analytics.attestations (rekor_log_id)
    WHERE rekor_log_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS ix_attestations_slsa
    ON analytics.attestations (slsa_level)
    WHERE slsa_level IS NOT NULL;

COMMENT ON TABLE analytics.attestations IS 'Attestation metadata for analytics (provenance, SBOM, VEX attestations)';
-- VEX overrides (fact table).
-- An override is removed together with its artifact, but outlives its
-- attestation (attestation_id is nulled on delete). The two partial indexes at
-- the end cover only rows with status = 'not_affected'.
CREATE TABLE IF NOT EXISTS analytics.vex_overrides (
    override_id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    attestation_id       UUID
        REFERENCES analytics.attestations(attestation_id) ON DELETE SET NULL,
    artifact_id          UUID
        REFERENCES analytics.artifacts(artifact_id) ON DELETE CASCADE,

    -- Vulnerability
    vuln_id              TEXT NOT NULL,
    component_purl       TEXT,           -- Optional: specific component

    -- Status
    status               TEXT NOT NULL,  -- not_affected, affected, fixed, under_investigation

    -- Justification
    justification        TEXT,           -- Justification category (CycloneDX/OpenVEX)
    justification_detail TEXT,           -- Human-readable detail
    impact               TEXT,           -- Impact statement
    action_statement     TEXT,           -- Recommended action

    -- Decision metadata
    operator_id          TEXT,           -- Who made the decision
    confidence           NUMERIC(3,2),   -- 0.00-1.00

    -- Validity
    valid_from           TIMESTAMPTZ NOT NULL DEFAULT now(),
    valid_until          TIMESTAMPTZ,    -- Expiration

    -- Review tracking
    last_reviewed        TIMESTAMPTZ,
    review_count         INT DEFAULT 1,

    -- Audit
    created_at           TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at           TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS ix_vex_overrides_artifact_vuln
    ON analytics.vex_overrides (artifact_id, vuln_id);
CREATE INDEX IF NOT EXISTS ix_vex_overrides_vuln
    ON analytics.vex_overrides (vuln_id);
CREATE INDEX IF NOT EXISTS ix_vex_overrides_status
    ON analytics.vex_overrides (status);
CREATE INDEX IF NOT EXISTS ix_vex_overrides_active
    ON analytics.vex_overrides (artifact_id, vuln_id, valid_from, valid_until)
    WHERE status = 'not_affected';
CREATE INDEX IF NOT EXISTS ix_vex_overrides_vuln_active
    ON analytics.vex_overrides (vuln_id, valid_from, valid_until)
    WHERE status = 'not_affected';

COMMENT ON TABLE analytics.vex_overrides IS 'VEX status overrides with justifications and validity periods';
-- =============================================================================
-- RAW PAYLOAD AUDIT TABLES
-- =============================================================================
-- Raw SBOM storage for audit trail.
-- Holds metadata and a storage URI for each payload; rows are unique per
-- content hash. Deleting the referenced artifact keeps the row and nulls
-- artifact_id.
CREATE TABLE IF NOT EXISTS analytics.raw_sboms (
    sbom_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    artifact_id    UUID
        REFERENCES analytics.artifacts(artifact_id) ON DELETE SET NULL,

    -- Format
    format         TEXT NOT NULL,         -- cyclonedx, spdx
    spec_version   TEXT NOT NULL,         -- 1.7, 3.0.1, etc.

    -- Content
    content_hash   TEXT NOT NULL UNIQUE,  -- SHA256 of raw content
    content_size   BIGINT NOT NULL,
    storage_uri    TEXT NOT NULL,         -- Object storage path

    -- Pipeline metadata
    ingest_version TEXT NOT NULL,         -- Pipeline version
    schema_version TEXT NOT NULL,         -- Schema version at ingest

    -- Audit
    ingested_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS ix_raw_sboms_artifact
    ON analytics.raw_sboms (artifact_id);
-- No separate index on (content_hash): the UNIQUE constraint on that column
-- already creates one, and a duplicate would only add write and storage cost.

COMMENT ON TABLE analytics.raw_sboms IS 'Raw SBOM payloads for audit trail and reprocessing';
-- Raw attestation storage.
-- Holds metadata and a storage URI for each payload; rows are unique per
-- content hash. Deleting the referenced attestation keeps the row and nulls
-- attestation_id.
CREATE TABLE IF NOT EXISTS analytics.raw_attestations (
    raw_id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    attestation_id UUID
        REFERENCES analytics.attestations(attestation_id) ON DELETE SET NULL,

    -- Content
    content_hash   TEXT NOT NULL UNIQUE,
    content_size   BIGINT NOT NULL,
    storage_uri    TEXT NOT NULL,

    -- Pipeline metadata
    ingest_version TEXT NOT NULL,
    schema_version TEXT NOT NULL,

    -- Audit
    ingested_at    TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS ix_raw_attestations_attestation
    ON analytics.raw_attestations (attestation_id);
-- No separate index on (content_hash): the UNIQUE constraint on that column
-- already creates one, and a duplicate would only add write and storage cost.

COMMENT ON TABLE analytics.raw_attestations IS 'Raw attestation payloads (DSSE envelopes) for audit trail';
-- =============================================================================
-- TIME-SERIES ROLLUP TABLES
-- =============================================================================
-- Daily vulnerability counts.
-- Logical key: (snapshot_date, environment, team, severity), where a NULL team
-- is treated as the empty string.
CREATE TABLE IF NOT EXISTS analytics.daily_vulnerability_counts (
    snapshot_date       DATE NOT NULL,
    environment         TEXT NOT NULL,
    team                TEXT,
    severity            analytics_severity NOT NULL,

    -- Counts
    total_vulns         INT NOT NULL,
    fixable_vulns       INT NOT NULL,
    vex_mitigated       INT NOT NULL,
    kev_vulns           INT NOT NULL,
    unique_cves         INT NOT NULL,
    affected_artifacts  INT NOT NULL,
    affected_components INT NOT NULL,

    -- Audit
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- A PRIMARY KEY constraint accepts only plain column names, so the
-- COALESCE(team, '') key is enforced with a unique expression index instead.
-- Writers can target it with
--   ON CONFLICT (snapshot_date, environment, COALESCE(team, ''), severity).
CREATE UNIQUE INDEX IF NOT EXISTS ux_daily_vuln_counts_key
    ON analytics.daily_vulnerability_counts (snapshot_date, environment, COALESCE(team, ''), severity);
CREATE INDEX IF NOT EXISTS ix_daily_vuln_counts_date
    ON analytics.daily_vulnerability_counts (snapshot_date DESC);
CREATE INDEX IF NOT EXISTS ix_daily_vuln_counts_env
    ON analytics.daily_vulnerability_counts (environment, snapshot_date DESC);

COMMENT ON TABLE analytics.daily_vulnerability_counts IS 'Daily vulnerability count rollups for trend analysis';
-- Daily component counts.
-- Logical key: (snapshot_date, environment, team, license_category,
-- component_type), where a NULL team is treated as the empty string.
CREATE TABLE IF NOT EXISTS analytics.daily_component_counts (
    snapshot_date    DATE NOT NULL,
    environment      TEXT NOT NULL,
    team             TEXT,
    license_category analytics_license_category NOT NULL,
    component_type   analytics_component_type NOT NULL,

    -- Counts
    total_components INT NOT NULL,
    unique_suppliers INT NOT NULL,

    -- Audit
    created_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- A PRIMARY KEY constraint accepts only plain column names, so the
-- COALESCE(team, '') key is enforced with a unique expression index instead.
-- Writers can target it with ON CONFLICT using the same expression list.
CREATE UNIQUE INDEX IF NOT EXISTS ux_daily_comp_counts_key
    ON analytics.daily_component_counts (snapshot_date, environment, COALESCE(team, ''), license_category, component_type);
CREATE INDEX IF NOT EXISTS ix_daily_comp_counts_date
    ON analytics.daily_component_counts (snapshot_date DESC);
CREATE INDEX IF NOT EXISTS ix_daily_comp_counts_env
    ON analytics.daily_component_counts (environment, snapshot_date DESC);

COMMENT ON TABLE analytics.daily_component_counts IS 'Daily component count rollups by license and type';
-- =============================================================================
-- MATERIALIZED VIEWS
-- =============================================================================
-- Supplier concentration.
-- One row per normalized supplier (components without a normalized supplier
-- are excluded). The joins fan out to component x artifact x vulnerability
-- rows, so every count below is a DISTINCT count.
CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_supplier_concentration AS
SELECT
    c.supplier_normalized AS supplier,
    COUNT(DISTINCT c.component_id) AS component_count,
    COUNT(DISTINCT ac.artifact_id) AS artifact_count,
    COUNT(DISTINCT a.team) AS team_count,
    ARRAY_AGG(DISTINCT a.environment ORDER BY a.environment)
        FILTER (WHERE a.environment IS NOT NULL) AS environments,
    -- Vulnerability counts are per distinct (component, vulnerability) pair.
    -- A plain SUM over the joined rows would multiply each pair by the number
    -- of artifacts that contain the component.
    COUNT(DISTINCT (cv.component_id, cv.vuln_id))
        FILTER (WHERE cv.severity = 'critical') AS critical_vuln_count,
    COUNT(DISTINCT (cv.component_id, cv.vuln_id))
        FILTER (WHERE cv.severity = 'high') AS high_vuln_count,
    MAX(c.last_seen_at) AS last_seen_at
FROM analytics.components AS c
LEFT JOIN analytics.artifact_components AS ac
    ON ac.component_id = c.component_id
LEFT JOIN analytics.artifacts AS a
    ON a.artifact_id = ac.artifact_id
LEFT JOIN analytics.component_vulns AS cv
    ON cv.component_id = c.component_id
   AND cv.affects = TRUE
WHERE c.supplier_normalized IS NOT NULL
GROUP BY c.supplier_normalized
WITH DATA;

CREATE UNIQUE INDEX IF NOT EXISTS ix_mv_supplier_concentration_supplier
    ON analytics.mv_supplier_concentration (supplier);
CREATE INDEX IF NOT EXISTS ix_mv_supplier_concentration_component_count
    ON analytics.mv_supplier_concentration (component_count DESC);

COMMENT ON MATERIALIZED VIEW analytics.mv_supplier_concentration IS 'Pre-computed supplier concentration metrics';
-- License distribution.
-- One row per (license_concluded, license_category), including components
-- that are not linked to any artifact.
CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_license_distribution AS
SELECT
    c.license_concluded,
    c.license_category,
    -- DISTINCT is required: the join yields one row per component x artifact,
    -- so COUNT(*) would count a component once for every artifact using it.
    COUNT(DISTINCT c.component_id) AS component_count,
    COUNT(DISTINCT ac.artifact_id) AS artifact_count,
    ARRAY_AGG(DISTINCT c.purl_type ORDER BY c.purl_type)
        FILTER (WHERE c.purl_type IS NOT NULL) AS ecosystems
FROM analytics.components AS c
LEFT JOIN analytics.artifact_components AS ac
    ON ac.component_id = c.component_id
GROUP BY c.license_concluded, c.license_category
WITH DATA;

CREATE UNIQUE INDEX IF NOT EXISTS ix_mv_license_distribution_license
    ON analytics.mv_license_distribution (COALESCE(license_concluded, ''), license_category);
CREATE INDEX IF NOT EXISTS ix_mv_license_distribution_component_count
    ON analytics.mv_license_distribution (component_count DESC);

COMMENT ON MATERIALIZED VIEW analytics.mv_license_distribution IS 'Pre-computed license distribution metrics';
-- Vulnerability exposure adjusted by VEX.
-- One row per (vuln_id, severity, cvss_score, epss_score, kev_listed,
-- fix_available). "raw" counts cover every component/artifact carrying an
-- applicable vulnerability; "effective" counts leave out artifact rows that
-- have a 'not_affected' VEX override valid at the time the view is populated
-- (now() is evaluated when the view data is built).
CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_vuln_exposure AS
SELECT
    cv.vuln_id,
    cv.severity,
    cv.cvss_score,
    cv.epss_score,
    cv.kev_listed,
    cv.fix_available,
    COUNT(DISTINCT cv.component_id) AS raw_component_count,
    COUNT(DISTINCT ac.artifact_id) AS raw_artifact_count,
    COUNT(DISTINCT cv.component_id) FILTER (WHERE vex.is_exposed) AS effective_component_count,
    COUNT(DISTINCT ac.artifact_id) FILTER (WHERE vex.is_exposed) AS effective_artifact_count
FROM analytics.component_vulns AS cv
INNER JOIN analytics.artifact_components AS ac
    ON ac.component_id = cv.component_id
-- TRUE when no currently valid 'not_affected' override exists for this
-- (artifact, vulnerability) pair.
CROSS JOIN LATERAL (
    SELECT NOT EXISTS (
        SELECT 1
        FROM analytics.vex_overrides AS vo
        WHERE vo.artifact_id = ac.artifact_id
          AND vo.vuln_id = cv.vuln_id
          AND vo.status = 'not_affected'
          AND vo.valid_from <= now()
          AND (vo.valid_until IS NULL OR vo.valid_until > now())
    ) AS is_exposed
) AS vex
WHERE cv.affects = TRUE
GROUP BY
    cv.vuln_id,
    cv.severity,
    cv.cvss_score,
    cv.epss_score,
    cv.kev_listed,
    cv.fix_available
WITH DATA;

CREATE UNIQUE INDEX IF NOT EXISTS ix_mv_vuln_exposure_key
    ON analytics.mv_vuln_exposure (vuln_id, severity, cvss_score, epss_score, kev_listed, fix_available);
CREATE INDEX IF NOT EXISTS ix_mv_vuln_exposure_severity_count
    ON analytics.mv_vuln_exposure (severity, effective_artifact_count DESC);

COMMENT ON MATERIALIZED VIEW analytics.mv_vuln_exposure IS 'CVE exposure with VEX-adjusted impact counts';
-- Attestation coverage by environment/team.
-- One row per (environment, team), NULLs included as their own group.
-- Percentages are rounded to one decimal place.
CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.mv_attestation_coverage AS
SELECT
    a.environment,
    a.team,
    COUNT(*) AS total_artifacts,
    COUNT(*) FILTER (WHERE a.provenance_attested = TRUE) AS with_provenance,
    COUNT(*) FILTER (
        WHERE EXISTS (
            SELECT 1
            FROM analytics.attestations AS att
            WHERE att.artifact_id = a.artifact_id
              AND att.predicate_type = 'sbom'
        )
    ) AS with_sbom_attestation,
    COUNT(*) FILTER (
        WHERE EXISTS (
            SELECT 1
            FROM analytics.attestations AS att
            WHERE att.artifact_id = a.artifact_id
              AND att.predicate_type = 'vex'
        )
    ) AS with_vex_attestation,
    COUNT(*) FILTER (WHERE a.slsa_level >= 2) AS slsa_level_2_plus,
    COUNT(*) FILTER (WHERE a.slsa_level >= 3) AS slsa_level_3_plus,
    ROUND(
        100.0 * COUNT(*) FILTER (WHERE a.provenance_attested = TRUE) / NULLIF(COUNT(*), 0),
        1
    ) AS provenance_pct,
    ROUND(
        100.0 * COUNT(*) FILTER (WHERE a.slsa_level >= 2) / NULLIF(COUNT(*), 0),
        1
    ) AS slsa2_pct
FROM analytics.artifacts AS a
GROUP BY a.environment, a.team
WITH DATA;

CREATE UNIQUE INDEX IF NOT EXISTS ix_mv_attestation_coverage_env_team
    ON analytics.mv_attestation_coverage (COALESCE(environment, ''), COALESCE(team, ''));
CREATE INDEX IF NOT EXISTS ix_mv_attestation_coverage_provenance
    ON analytics.mv_attestation_coverage (provenance_pct ASC);

COMMENT ON MATERIALIZED VIEW analytics.mv_attestation_coverage IS 'Attestation coverage percentages by environment and team';
-- =============================================================================
-- STORED PROCEDURES FOR DAY-1 QUERIES
-- =============================================================================
-- Top suppliers by component count.
--   p_limit:       maximum number of suppliers returned (default 20).
--   p_environment: optional environment filter; NULL or blank means all
--                  environments, served from mv_supplier_concentration.
--   Returns: JSON array of supplier objects ordered by component_count
--   descending, then supplier; NULL when no rows qualify.
CREATE OR REPLACE FUNCTION analytics.sp_top_suppliers(
    p_limit INT DEFAULT 20,
    p_environment TEXT DEFAULT NULL
)
RETURNS JSON AS $$
DECLARE
    v_env TEXT;
BEGIN
    v_env := NULLIF(BTRIM(p_environment), '');

    IF v_env IS NULL THEN
        RETURN (
            SELECT json_agg(row_to_json(t))
            FROM (
                SELECT
                    supplier,
                    component_count,
                    artifact_count,
                    team_count,
                    critical_vuln_count,
                    high_vuln_count,
                    environments
                FROM analytics.mv_supplier_concentration
                ORDER BY component_count DESC, supplier ASC
                LIMIT p_limit
            ) AS t
        );
    END IF;

    -- Environment-specific path: computed live from the base tables.
    -- Columns are listed in the same order as the branch above so both paths
    -- produce identically shaped objects.
    RETURN (
        SELECT json_agg(row_to_json(t))
        FROM (
            SELECT
                c.supplier_normalized AS supplier,
                COUNT(DISTINCT c.component_id) AS component_count,
                COUNT(DISTINCT ac.artifact_id) AS artifact_count,
                COUNT(DISTINCT a.team) AS team_count,
                -- Count distinct (component, vulnerability) pairs: the joined
                -- rows repeat each pair once per artifact, so a plain SUM
                -- would inflate the totals.
                COUNT(DISTINCT (cv.component_id, cv.vuln_id))
                    FILTER (WHERE cv.severity = 'critical') AS critical_vuln_count,
                COUNT(DISTINCT (cv.component_id, cv.vuln_id))
                    FILTER (WHERE cv.severity = 'high') AS high_vuln_count,
                ARRAY_AGG(DISTINCT a.environment ORDER BY a.environment)
                    FILTER (WHERE a.environment IS NOT NULL) AS environments
            FROM analytics.components AS c
            INNER JOIN analytics.artifact_components AS ac
                ON ac.component_id = c.component_id
            INNER JOIN analytics.artifacts AS a
                ON a.artifact_id = ac.artifact_id
            LEFT JOIN analytics.component_vulns AS cv
                ON cv.component_id = c.component_id
               AND cv.affects = TRUE
            WHERE c.supplier_normalized IS NOT NULL
              AND a.environment = v_env
            GROUP BY c.supplier_normalized
            ORDER BY component_count DESC, supplier ASC
            LIMIT p_limit
        ) AS t
    );
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION analytics.sp_top_suppliers IS 'Get top suppliers by component count for supply chain risk analysis';
-- License distribution heatmap.
--   p_environment: optional environment filter; NULL or blank means all
--                  environments, served from mv_license_distribution.
--   Returns: JSON array with one object per (license_category,
--   license_concluded), ordered by component_count descending; NULL when no
--   rows qualify.
CREATE OR REPLACE FUNCTION analytics.sp_license_heatmap(p_environment TEXT DEFAULT NULL)
RETURNS JSON AS $$
DECLARE
    v_env TEXT;
BEGIN
    v_env := NULLIF(BTRIM(p_environment), '');

    IF v_env IS NULL THEN
        RETURN (
            SELECT json_agg(row_to_json(t))
            FROM (
                SELECT
                    license_category,
                    license_concluded,
                    component_count,
                    artifact_count,
                    ecosystems
                FROM analytics.mv_license_distribution
                ORDER BY component_count DESC, license_category, COALESCE(license_concluded, '')
            ) AS t
        );
    END IF;

    -- Environment-specific path: computed live from the base tables.
    RETURN (
        SELECT json_agg(row_to_json(t))
        FROM (
            SELECT
                c.license_category,
                c.license_concluded,
                -- DISTINCT is required: the join yields one row per
                -- component x artifact, so COUNT(*) would count a component
                -- once for every artifact in the environment that uses it.
                COUNT(DISTINCT c.component_id) AS component_count,
                COUNT(DISTINCT ac.artifact_id) AS artifact_count,
                ARRAY_AGG(DISTINCT c.purl_type ORDER BY c.purl_type)
                    FILTER (WHERE c.purl_type IS NOT NULL) AS ecosystems
            FROM analytics.components AS c
            INNER JOIN analytics.artifact_components AS ac
                ON ac.component_id = c.component_id
            INNER JOIN analytics.artifacts AS a
                ON a.artifact_id = ac.artifact_id
            WHERE a.environment = v_env
            GROUP BY c.license_concluded, c.license_category
            ORDER BY component_count DESC, license_category, COALESCE(c.license_concluded, '')
        ) AS t
    );
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION analytics.sp_license_heatmap IS 'Get license distribution for compliance heatmap';
-- CVE exposure adjusted by VEX.
--   p_environment:  optional environment filter; NULL or blank means all
--                   environments, served from mv_vuln_exposure.
--   p_min_severity: least severe level to include (critical, high, medium,
--                   low, none); case-insensitive, surrounding whitespace is
--                   ignored, NULL/blank defaults to 'low'. Any other value
--                   applies no severity filter.
--   Returns: JSON array of at most 50 vulnerabilities that still affect at
--   least one artifact after VEX, ordered by severity rank, then
--   effective_artifact_count descending, then vuln_id; NULL when none qualify.
CREATE OR REPLACE FUNCTION analytics.sp_vuln_exposure(
    p_environment TEXT DEFAULT NULL,
    p_min_severity TEXT DEFAULT 'low'
)
RETURNS JSON AS $$
DECLARE
    v_min_rank INT;
    v_env TEXT;
BEGIN
    v_env := NULLIF(BTRIM(p_environment), '');

    -- Trimmed like p_environment so that ' high' is not treated as an
    -- unrecognised value (which would disable the filter).
    v_min_rank := CASE LOWER(COALESCE(NULLIF(BTRIM(p_min_severity), ''), 'low'))
        WHEN 'critical' THEN 1
        WHEN 'high' THEN 2
        WHEN 'medium' THEN 3
        WHEN 'low' THEN 4
        WHEN 'none' THEN 5
        ELSE 6
    END;

    IF v_env IS NULL THEN
        RETURN (
            SELECT json_agg(row_to_json(t))
            FROM (
                SELECT
                    mv.vuln_id,
                    mv.severity::TEXT,
                    mv.cvss_score,
                    mv.epss_score,
                    mv.kev_listed,
                    mv.fix_available,
                    mv.raw_component_count,
                    mv.raw_artifact_count,
                    mv.effective_component_count,
                    mv.effective_artifact_count,
                    mv.raw_artifact_count - mv.effective_artifact_count AS vex_mitigated
                FROM analytics.mv_vuln_exposure AS mv
                -- Severity rank computed once per row; used for both the
                -- threshold filter and the ordering.
                CROSS JOIN LATERAL (
                    SELECT CASE mv.severity
                        WHEN 'critical' THEN 1
                        WHEN 'high' THEN 2
                        WHEN 'medium' THEN 3
                        WHEN 'low' THEN 4
                        WHEN 'none' THEN 5
                        ELSE 6
                    END AS sev_rank
                ) AS ranked
                WHERE mv.effective_artifact_count > 0
                  AND ranked.sev_rank <= v_min_rank
                ORDER BY
                    ranked.sev_rank,
                    mv.effective_artifact_count DESC,
                    mv.vuln_id
                LIMIT 50
            ) AS t
        );
    END IF;

    -- Environment-specific path: same aggregation as mv_vuln_exposure, computed
    -- live and restricted to artifacts in the requested environment.
    RETURN (
        SELECT json_agg(row_to_json(t))
        FROM (
            SELECT
                exposure.vuln_id,
                exposure.severity::TEXT,
                exposure.cvss_score,
                exposure.epss_score,
                exposure.kev_listed,
                exposure.fix_available,
                exposure.raw_component_count,
                exposure.raw_artifact_count,
                exposure.effective_component_count,
                exposure.effective_artifact_count,
                exposure.raw_artifact_count - exposure.effective_artifact_count AS vex_mitigated
            FROM (
                SELECT
                    cv.vuln_id,
                    cv.severity,
                    cv.cvss_score,
                    cv.epss_score,
                    cv.kev_listed,
                    cv.fix_available,
                    COUNT(DISTINCT cv.component_id) AS raw_component_count,
                    COUNT(DISTINCT ac.artifact_id) AS raw_artifact_count,
                    COUNT(DISTINCT cv.component_id)
                        FILTER (WHERE vex.is_exposed) AS effective_component_count,
                    COUNT(DISTINCT ac.artifact_id)
                        FILTER (WHERE vex.is_exposed) AS effective_artifact_count
                FROM analytics.component_vulns AS cv
                INNER JOIN analytics.artifact_components AS ac
                    ON ac.component_id = cv.component_id
                INNER JOIN analytics.artifacts AS a
                    ON a.artifact_id = ac.artifact_id
                -- TRUE when no currently valid 'not_affected' override exists
                -- for this (artifact, vulnerability) pair.
                CROSS JOIN LATERAL (
                    SELECT NOT EXISTS (
                        SELECT 1
                        FROM analytics.vex_overrides AS vo
                        WHERE vo.artifact_id = ac.artifact_id
                          AND vo.vuln_id = cv.vuln_id
                          AND vo.status = 'not_affected'
                          AND vo.valid_from <= now()
                          AND (vo.valid_until IS NULL OR vo.valid_until > now())
                    ) AS is_exposed
                ) AS vex
                WHERE cv.affects = TRUE
                  AND a.environment = v_env
                GROUP BY
                    cv.vuln_id,
                    cv.severity,
                    cv.cvss_score,
                    cv.epss_score,
                    cv.kev_listed,
                    cv.fix_available
            ) AS exposure
            CROSS JOIN LATERAL (
                SELECT CASE exposure.severity
                    WHEN 'critical' THEN 1
                    WHEN 'high' THEN 2
                    WHEN 'medium' THEN 3
                    WHEN 'low' THEN 4
                    WHEN 'none' THEN 5
                    ELSE 6
                END AS sev_rank
            ) AS ranked
            WHERE exposure.effective_artifact_count > 0
              AND ranked.sev_rank <= v_min_rank
            ORDER BY
                ranked.sev_rank,
                exposure.effective_artifact_count DESC,
                exposure.vuln_id
            LIMIT 50
        ) AS t
    );
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION analytics.sp_vuln_exposure IS
'Get CVE exposure with VEX-adjusted counts, optional environment filter, and severity threshold';
-- Fixable backlog
--
-- Returns a JSON array of at most 100 (artifact, component, vulnerability)
-- rows where the vulnerability affects the component, a fix is available, and
-- no currently valid 'not_affected' VEX override matches the artifact and
-- vulnerability. json_agg yields NULL (not an empty array) when nothing
-- qualifies.
--
-- Parameters:
--   p_environment  Optional environment filter. NULL, empty, or
--                  whitespace-only values disable the filter.
--
-- Ordering: full severity rank (critical, high, medium, low, none, anything
-- else), then service, component, version, and vuln_id. The complete ranking
-- matters because of the LIMIT: if medium/low/none shared one bucket, rows
-- for alphabetically earlier services could push medium findings out of the
-- 100-row window. The ranking mirrors the one used by sp_vuln_exposure.
CREATE OR REPLACE FUNCTION analytics.sp_fixable_backlog(p_environment TEXT DEFAULT NULL)
RETURNS JSON AS $$
DECLARE
    env TEXT;
BEGIN
    -- Normalise blank input to NULL so the filter below becomes a no-op.
    env := NULLIF(BTRIM(p_environment), '');

    RETURN (
        SELECT json_agg(row_to_json(t))
        FROM (
            SELECT
                a.name AS service,
                a.environment,
                c.name AS component,
                c.version,
                cv.vuln_id,
                cv.severity::TEXT,
                cv.fixed_version
            FROM analytics.component_vulns cv
            JOIN analytics.components c
                ON c.component_id = cv.component_id
            JOIN analytics.artifact_components ac
                ON ac.component_id = c.component_id
            JOIN analytics.artifacts a
                ON a.artifact_id = ac.artifact_id
            -- Anti-join: keep only rows with no active 'not_affected' override.
            -- The validity conditions must stay in ON; moving them to WHERE
            -- would turn this into an inner join.
            LEFT JOIN analytics.vex_overrides vo
                ON vo.artifact_id = a.artifact_id
               AND vo.vuln_id = cv.vuln_id
               AND vo.status = 'not_affected'
               AND vo.valid_from <= now()
               AND (vo.valid_until IS NULL OR vo.valid_until > now())
            WHERE cv.affects = TRUE
              AND cv.fix_available = TRUE
              -- NOTE(review): assumes override_id is never NULL on a real
              -- vex_overrides row (e.g. it is the primary key) - confirm.
              AND vo.override_id IS NULL
              AND (env IS NULL OR a.environment = env)
            ORDER BY
                CASE cv.severity
                    WHEN 'critical' THEN 1
                    WHEN 'high' THEN 2
                    WHEN 'medium' THEN 3
                    WHEN 'low' THEN 4
                    WHEN 'none' THEN 5
                    ELSE 6
                END,
                a.name,
                c.name,
                c.version,
                cv.vuln_id
            LIMIT 100
        ) t
    );
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION analytics.sp_fixable_backlog IS 'Get vulnerabilities with available fixes that are not VEX-mitigated';
-- Attestation coverage gaps
--
-- Returns the rows of analytics.mv_attestation_coverage as a JSON array, each
-- with an added missing_provenance column (total_artifacts - with_provenance).
-- Rows are ordered by ascending provenance_pct, then environment and team with
-- NULLs treated as ''. json_agg yields NULL when no rows match.
--
-- Parameters:
--   p_environment  Optional environment filter. NULL, empty, or
--                  whitespace-only values return every environment.
CREATE OR REPLACE FUNCTION analytics.sp_attestation_gaps(p_environment TEXT DEFAULT NULL)
RETURNS JSON AS $$
DECLARE
    -- Blank input collapses to NULL, which disables the environment filter.
    v_env    TEXT := NULLIF(BTRIM(p_environment), '');
    v_result JSON;
BEGIN
    SELECT json_agg(row_to_json(gap))
    INTO v_result
    FROM (
        SELECT
            cov.environment,
            cov.team,
            cov.total_artifacts,
            cov.with_provenance,
            cov.provenance_pct,
            cov.slsa_level_2_plus,
            cov.slsa2_pct,
            cov.total_artifacts - cov.with_provenance AS missing_provenance
        FROM analytics.mv_attestation_coverage AS cov
        WHERE v_env IS NULL
           OR cov.environment = v_env
        ORDER BY
            cov.provenance_pct ASC,
            COALESCE(cov.environment, ''),
            COALESCE(cov.team, '')
    ) AS gap;

    RETURN v_result;
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION analytics.sp_attestation_gaps IS 'Get attestation coverage gaps by environment/team';
-- MTTR by severity (simplified - requires proper remediation tracking)
--
-- For vulnerabilities published within the last p_days days, pairs each
-- component_vulns row with every currently valid 'not_affected' VEX override
-- for the same vuln_id and reports, per severity:
--   total_vulns           number of (component_vulns, override) pairs
--   avg_days_to_mitigate  mean of (override valid_from - published_at) in
--                         days, rounded to NUMERIC(10,2)
-- Rows are ordered critical, high, medium, then remaining severities by
-- their text label. json_agg yields NULL when no rows match.
--
-- NOTE(review): the join matches on vuln_id only (no artifact or component
-- correlation), so total_vulns counts join pairs rather than distinct
-- vulnerabilities, and the average is weighted the same way - confirm this
-- is the intended "simplified" behaviour.
--
-- Parameters:
--   p_days  Look-back window in days applied to published_at (default 90).
CREATE OR REPLACE FUNCTION analytics.sp_mttr_by_severity(p_days INT DEFAULT 90)
RETURNS JSON AS $$
DECLARE
    v_result JSON;
BEGIN
    WITH mitigation_pairs AS (
        -- One row per (component vulnerability, active override) pair.
        SELECT
            cv.severity,
            EXTRACT(EPOCH FROM (vo.valid_from - cv.published_at)) / 86400 AS days_to_mitigate
        FROM analytics.component_vulns AS cv
        INNER JOIN analytics.vex_overrides AS vo
            ON vo.vuln_id = cv.vuln_id
        WHERE vo.status = 'not_affected'
          AND vo.valid_from <= now()
          AND (vo.valid_until IS NULL OR vo.valid_until > now())
          AND cv.published_at IS NOT NULL
          AND cv.published_at >= now() - make_interval(days => p_days)
    )
    SELECT json_agg(row_to_json(per_severity))
    INTO v_result
    FROM (
        SELECT
            mp.severity::TEXT AS severity,
            COUNT(*) AS total_vulns,
            AVG(mp.days_to_mitigate)::NUMERIC(10,2) AS avg_days_to_mitigate
        FROM mitigation_pairs AS mp
        GROUP BY mp.severity
        ORDER BY
            CASE mp.severity
                WHEN 'critical' THEN 1
                WHEN 'high' THEN 2
                WHEN 'medium' THEN 3
                ELSE 4
            END,
            mp.severity::TEXT
    ) AS per_severity;

    RETURN v_result;
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION analytics.sp_mttr_by_severity IS 'Get mean time to remediate by severity (last N days)';
-- =============================================================================
-- REFRESH PROCEDURES
-- =============================================================================
-- Refresh all materialized views
--
-- Issues a plain (non-CONCURRENTLY) REFRESH MATERIALIZED VIEW for each of the
-- four analytics materialized views, one after another in the order listed
-- below. Takes no arguments and returns nothing.
CREATE OR REPLACE FUNCTION analytics.refresh_all_views()
RETURNS VOID AS $$
DECLARE
    v_view TEXT;
BEGIN
    -- Array order is the refresh order.
    FOREACH v_view IN ARRAY ARRAY[
        'mv_supplier_concentration',
        'mv_license_distribution',
        'mv_vuln_exposure',
        'mv_attestation_coverage'
    ]
    LOOP
        EXECUTE format('REFRESH MATERIALIZED VIEW analytics.%I', v_view);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION analytics.refresh_all_views IS 'Refresh all analytics materialized views (non-concurrent; run off-peak or use PlatformAnalyticsMaintenanceService for concurrent refresh)';
-- Daily rollup procedure
--
-- Upserts one snapshot row per group into analytics.daily_vulnerability_counts
-- (grouped by environment, team, severity) and analytics.daily_component_counts
-- (grouped by environment, team, license_category, component_type), then
-- deletes rows from both tables whose snapshot_date is more than 90 days
-- before p_date.
--
-- Parameters:
--   p_date  Snapshot date written to the rollup rows (default CURRENT_DATE).
--           It is also the reference day for VEX override validity and the
--           anchor for the 90-day retention cutoff.
--
-- NOTE(review): the aggregates read the current contents of the source
-- tables; p_date only labels the snapshot (and dates the VEX check), so
-- passing a past date does not reconstruct historical state - confirm callers
-- expect this.
-- NOTE(review): re-running for the same p_date overwrites matching groups via
-- ON CONFLICT, but groups that no longer produce a row are left in place.
CREATE OR REPLACE FUNCTION analytics.compute_daily_rollups(p_date DATE DEFAULT CURRENT_DATE)
RETURNS VOID AS $$
DECLARE
    -- date - interval yields a timestamp; computed once for both purges.
    v_retention_cutoff TIMESTAMP := p_date - INTERVAL '90 days';
BEGIN
    -- Vulnerability counts
    WITH findings AS (
        -- One row per (artifact, component, affecting vulnerability).
        SELECT
            art.artifact_id,
            art.environment,
            art.team,
            vuln.component_id,
            vuln.vuln_id,
            vuln.severity,
            vuln.fix_available,
            vuln.kev_listed,
            -- Day-granular validity: an override counts when p_date falls
            -- between valid_from and valid_until, both ends inclusive.
            EXISTS (
                SELECT 1
                FROM analytics.vex_overrides AS vo
                WHERE vo.artifact_id = art.artifact_id
                  AND vo.vuln_id = vuln.vuln_id
                  AND vo.status = 'not_affected'
                  AND vo.valid_from::DATE <= p_date
                  AND (vo.valid_until IS NULL OR vo.valid_until::DATE >= p_date)
            ) AS is_vex_mitigated
        FROM analytics.component_vulns AS vuln
        INNER JOIN analytics.artifact_components AS link
            ON link.component_id = vuln.component_id
        INNER JOIN analytics.artifacts AS art
            ON art.artifact_id = link.artifact_id
        WHERE vuln.affects = TRUE
    )
    INSERT INTO analytics.daily_vulnerability_counts (
        snapshot_date, environment, team, severity,
        total_vulns, fixable_vulns, vex_mitigated, kev_vulns,
        unique_cves, affected_artifacts, affected_components
    )
    SELECT
        p_date,
        f.environment,
        f.team,
        f.severity,
        COUNT(*) AS total_vulns,
        COUNT(*) FILTER (WHERE f.fix_available = TRUE) AS fixable_vulns,
        COUNT(*) FILTER (WHERE f.is_vex_mitigated) AS vex_mitigated,
        COUNT(*) FILTER (WHERE f.kev_listed = TRUE) AS kev_vulns,
        COUNT(DISTINCT f.vuln_id) AS unique_cves,
        COUNT(DISTINCT f.artifact_id) AS affected_artifacts,
        COUNT(DISTINCT f.component_id) AS affected_components
    FROM findings AS f
    GROUP BY f.environment, f.team, f.severity
    ON CONFLICT (snapshot_date, environment, COALESCE(team, ''), severity)
    DO UPDATE SET
        total_vulns = EXCLUDED.total_vulns,
        fixable_vulns = EXCLUDED.fixable_vulns,
        vex_mitigated = EXCLUDED.vex_mitigated,
        kev_vulns = EXCLUDED.kev_vulns,
        unique_cves = EXCLUDED.unique_cves,
        affected_artifacts = EXCLUDED.affected_artifacts,
        affected_components = EXCLUDED.affected_components,
        created_at = now();

    -- Component counts
    INSERT INTO analytics.daily_component_counts (
        snapshot_date, environment, team, license_category, component_type,
        total_components, unique_suppliers
    )
    SELECT
        p_date,
        art.environment,
        art.team,
        comp.license_category,
        comp.component_type,
        COUNT(DISTINCT comp.component_id) AS total_components,
        COUNT(DISTINCT comp.supplier_normalized) AS unique_suppliers
    FROM analytics.components AS comp
    INNER JOIN analytics.artifact_components AS link
        ON link.component_id = comp.component_id
    INNER JOIN analytics.artifacts AS art
        ON art.artifact_id = link.artifact_id
    GROUP BY art.environment, art.team, comp.license_category, comp.component_type
    ON CONFLICT (snapshot_date, environment, COALESCE(team, ''), license_category, component_type)
    DO UPDATE SET
        total_components = EXCLUDED.total_components,
        unique_suppliers = EXCLUDED.unique_suppliers,
        created_at = now();

    -- Retention: drop snapshots older than 90 days relative to p_date.
    DELETE FROM analytics.daily_vulnerability_counts
    WHERE snapshot_date < v_retention_cutoff;

    DELETE FROM analytics.daily_component_counts
    WHERE snapshot_date < v_retention_cutoff;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION analytics.compute_daily_rollups IS 'Compute daily vulnerability and component rollups for trend analysis';
COMMIT;