-- Files
-- git.stella-ops.org/devops/compose/postgres-init/03-scheduler-tables.sql
-- 2026-02-20 23:32:20 +02:00
--
-- 691 lines
-- 27 KiB
-- PL/PgSQL

-- Scheduler: Consolidated init from migrations 001-003
-- Auto-generated for docker-compose postgres-init
-- Creates all tables required by stellaops-scheduler-worker
-- ============================================================================
-- 001_initial_schema.sql - Complete scheduler schema
-- ============================================================================
-- Namespaces used by everything below.
CREATE SCHEMA IF NOT EXISTS scheduler;
CREATE SCHEMA IF NOT EXISTS scheduler_app;

-- Enum types.
-- Each CREATE TYPE runs inside a DO block that swallows duplicate_object,
-- so re-running the script against an existing database is a no-op here.
DO $$
BEGIN
    CREATE TYPE scheduler.job_status AS ENUM (
        'pending',
        'scheduled',
        'leased',
        'running',
        'succeeded',
        'failed',
        'canceled',
        'timed_out'
    );
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

DO $$
BEGIN
    CREATE TYPE scheduler.graph_job_type AS ENUM (
        'build',
        'overlay'
    );
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

DO $$
BEGIN
    CREATE TYPE scheduler.graph_job_status AS ENUM (
        'pending',
        'queued',
        'running',
        'completed',
        'failed',
        'canceled'
    );
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- NOTE: run_state and policy_run_status spell 'cancelled' with two Ls, while
-- job_status and graph_job_status use 'canceled'. The labels are runtime
-- values and are kept exactly as defined.
DO $$
BEGIN
    CREATE TYPE scheduler.run_state AS ENUM (
        'planning',
        'queued',
        'running',
        'completed',
        'error',
        'cancelled'
    );
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

DO $$
BEGIN
    CREATE TYPE scheduler.policy_run_status AS ENUM (
        'pending',
        'submitted',
        'retrying',
        'failed',
        'completed',
        'cancelled'
    );
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;
-- Helper functions
-- Trigger function: overwrites NEW.updated_at with NOW() and returns the
-- modified row so the triggering statement proceeds with it.
CREATE OR REPLACE FUNCTION scheduler.update_updated_at()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $fn$
BEGIN
    NEW.updated_at := NOW();
    RETURN NEW;
END;
$fn$;
-- Returns the tenant id stored in the app.tenant_id setting.
--
-- Raises an exception (ERRCODE P0001) when the setting is absent or empty.
-- current_setting(..., true) yields NULL rather than an error for an unknown
-- setting, which is why both NULL and '' are checked.
--
-- The row-level security policies in this file compare tenant_id against this
-- function's result.
CREATE OR REPLACE FUNCTION scheduler_app.require_current_tenant()
RETURNS TEXT
LANGUAGE plpgsql STABLE SECURITY DEFINER
-- A SECURITY DEFINER function runs with its owner's privileges, so the
-- search_path is pinned: the caller's search_path must not be able to
-- substitute objects for the ones referenced in the body.
SET search_path = pg_catalog, pg_temp
AS $$
DECLARE
    v_tenant TEXT;
BEGIN
    v_tenant := current_setting('app.tenant_id', true);
    IF v_tenant IS NULL OR v_tenant = '' THEN
        RAISE EXCEPTION 'app.tenant_id session variable not set'
            USING HINT = 'Set via: SELECT set_config(''app.tenant_id'', ''<tenant>'', false)',
                  ERRCODE = 'P0001';
    END IF;
    RETURN v_tenant;
END;
$$;

-- Remove the default PUBLIC execute privilege.
-- NOTE(review): no GRANT EXECUTE appears in this part of the script; roles
-- that query RLS-protected tables need EXECUTE on this function — confirm it
-- is granted elsewhere.
REVOKE ALL ON FUNCTION scheduler_app.require_current_tenant() FROM PUBLIC;
-- Core tables: jobs
-- One row per job. (tenant_id, idempotency_key) is unique, so a second insert
-- with the same key for the same tenant is rejected.
CREATE TABLE IF NOT EXISTS scheduler.jobs (
    id              UUID                 PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       TEXT                 NOT NULL,
    project_id      TEXT,
    job_type        TEXT                 NOT NULL,
    status          scheduler.job_status NOT NULL DEFAULT 'pending',
    priority        INT                  NOT NULL DEFAULT 0,
    payload         JSONB                NOT NULL DEFAULT '{}',
    payload_digest  TEXT                 NOT NULL,
    idempotency_key TEXT                 NOT NULL,
    correlation_id  TEXT,
    attempt         INT                  NOT NULL DEFAULT 0,
    max_attempts    INT                  NOT NULL DEFAULT 3,
    lease_id        UUID,
    worker_id       TEXT,
    lease_until     TIMESTAMPTZ,
    not_before      TIMESTAMPTZ,
    reason          TEXT,
    result          JSONB,
    created_at      TIMESTAMPTZ          NOT NULL DEFAULT NOW(),
    scheduled_at    TIMESTAMPTZ,
    leased_at       TIMESTAMPTZ,
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    created_by      TEXT,
    UNIQUE (tenant_id, idempotency_key)
);

-- General lookups by tenant.
CREATE INDEX IF NOT EXISTS idx_jobs_tenant_status
    ON scheduler.jobs (tenant_id, status);
CREATE INDEX IF NOT EXISTS idx_jobs_tenant_type
    ON scheduler.jobs (tenant_id, job_type);

-- Partial indexes restricted to rows in the 'scheduled' / 'leased' states.
CREATE INDEX IF NOT EXISTS idx_jobs_scheduled
    ON scheduler.jobs (tenant_id, status, not_before, priority DESC, created_at)
    WHERE status = 'scheduled';
CREATE INDEX IF NOT EXISTS idx_jobs_leased
    ON scheduler.jobs (tenant_id, status, lease_until)
    WHERE status = 'leased';

CREATE INDEX IF NOT EXISTS idx_jobs_project
    ON scheduler.jobs (tenant_id, project_id);
-- Not tenant-prefixed: indexes correlation_id on its own.
CREATE INDEX IF NOT EXISTS idx_jobs_correlation
    ON scheduler.jobs (correlation_id);
-- Triggers table (cron-based job triggers)
-- Trigger names are unique per tenant. last_job_id references scheduler.jobs
-- with the default referential action (no ON DELETE clause is given).
CREATE TABLE IF NOT EXISTS scheduler.triggers (
    id              UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       TEXT        NOT NULL,
    name            TEXT        NOT NULL,
    description     TEXT,
    job_type        TEXT        NOT NULL,
    job_payload     JSONB       NOT NULL DEFAULT '{}',
    cron_expression TEXT        NOT NULL,
    timezone        TEXT        NOT NULL DEFAULT 'UTC',
    enabled         BOOLEAN     NOT NULL DEFAULT TRUE,
    next_fire_at    TIMESTAMPTZ,
    last_fire_at    TIMESTAMPTZ,
    last_job_id     UUID        REFERENCES scheduler.jobs(id),
    fire_count      BIGINT      NOT NULL DEFAULT 0,
    misfire_count   INT         NOT NULL DEFAULT 0,
    metadata        JSONB       NOT NULL DEFAULT '{}',
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by      TEXT,
    UNIQUE (tenant_id, name)
);

CREATE INDEX IF NOT EXISTS idx_triggers_tenant_id
    ON scheduler.triggers (tenant_id);
-- Partial index: only enabled triggers are indexed by next fire time.
CREATE INDEX IF NOT EXISTS idx_triggers_next_fire
    ON scheduler.triggers (enabled, next_fire_at)
    WHERE enabled = TRUE;
CREATE INDEX IF NOT EXISTS idx_triggers_job_type
    ON scheduler.triggers (tenant_id, job_type);
-- Keep scheduler.triggers.updated_at current on every row update.
-- CREATE TRIGGER has no IF NOT EXISTS form, so the trigger is dropped first;
-- this keeps the statement re-runnable like the rest of the script.
DROP TRIGGER IF EXISTS trg_triggers_updated_at ON scheduler.triggers;
CREATE TRIGGER trg_triggers_updated_at
    BEFORE UPDATE ON scheduler.triggers
    FOR EACH ROW
    EXECUTE FUNCTION scheduler.update_updated_at();
-- Workers table (global, NOT RLS-protected)
-- tenant_id is nullable here, unlike the tenant-scoped tables.
CREATE TABLE IF NOT EXISTS scheduler.workers (
    id                  TEXT        PRIMARY KEY,
    tenant_id           TEXT,
    hostname            TEXT        NOT NULL,
    process_id          INT,
    job_types           TEXT[]      NOT NULL DEFAULT '{}',
    max_concurrent_jobs INT         NOT NULL DEFAULT 1,
    current_jobs        INT         NOT NULL DEFAULT 0,
    status              TEXT        NOT NULL DEFAULT 'active'
                                    CHECK (status IN ('active', 'draining', 'stopped')),
    last_heartbeat_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    registered_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    metadata            JSONB       NOT NULL DEFAULT '{}'
);

CREATE INDEX IF NOT EXISTS idx_workers_status
    ON scheduler.workers (status);
CREATE INDEX IF NOT EXISTS idx_workers_heartbeat
    ON scheduler.workers (last_heartbeat_at);
CREATE INDEX IF NOT EXISTS idx_workers_tenant
    ON scheduler.workers (tenant_id);

COMMENT ON TABLE scheduler.workers
    IS 'Global worker registry. Not RLS-protected - workers serve all tenants.';
-- Distributed locks
-- lock_key alone is the primary key, so a key is unique across all tenants.
CREATE TABLE IF NOT EXISTS scheduler.locks (
    lock_key    TEXT        PRIMARY KEY,
    tenant_id   TEXT        NOT NULL,
    holder_id   TEXT        NOT NULL,
    acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at  TIMESTAMPTZ NOT NULL,
    metadata    JSONB       NOT NULL DEFAULT '{}'
);

CREATE INDEX IF NOT EXISTS idx_locks_tenant
    ON scheduler.locks (tenant_id);
CREATE INDEX IF NOT EXISTS idx_locks_expires
    ON scheduler.locks (expires_at);
-- Job history
-- job_id carries no foreign key to scheduler.jobs.
CREATE TABLE IF NOT EXISTS scheduler.job_history (
    id             BIGSERIAL            PRIMARY KEY,
    job_id         UUID                 NOT NULL,
    tenant_id      TEXT                 NOT NULL,
    project_id     TEXT,
    job_type       TEXT                 NOT NULL,
    status         scheduler.job_status NOT NULL,
    attempt        INT                  NOT NULL,
    payload_digest TEXT                 NOT NULL,
    result         JSONB,
    reason         TEXT,
    worker_id      TEXT,
    duration_ms    BIGINT,
    created_at     TIMESTAMPTZ          NOT NULL,
    completed_at   TIMESTAMPTZ          NOT NULL,
    archived_at    TIMESTAMPTZ          NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_job_history_tenant
    ON scheduler.job_history (tenant_id);
CREATE INDEX IF NOT EXISTS idx_job_history_job_id
    ON scheduler.job_history (job_id);
CREATE INDEX IF NOT EXISTS idx_job_history_type
    ON scheduler.job_history (tenant_id, job_type);
CREATE INDEX IF NOT EXISTS idx_job_history_completed
    ON scheduler.job_history (tenant_id, completed_at);
-- Metrics table
-- At most one row per (tenant_id, job_type, period_start).
CREATE TABLE IF NOT EXISTS scheduler.metrics (
    id              BIGSERIAL   PRIMARY KEY,
    tenant_id       TEXT        NOT NULL,
    job_type        TEXT        NOT NULL,
    period_start    TIMESTAMPTZ NOT NULL,
    period_end      TIMESTAMPTZ NOT NULL,
    jobs_created    BIGINT      NOT NULL DEFAULT 0,
    jobs_completed  BIGINT      NOT NULL DEFAULT 0,
    jobs_failed     BIGINT      NOT NULL DEFAULT 0,
    jobs_timed_out  BIGINT      NOT NULL DEFAULT 0,
    avg_duration_ms BIGINT,
    p50_duration_ms BIGINT,
    p95_duration_ms BIGINT,
    p99_duration_ms BIGINT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (tenant_id, job_type, period_start)
);

CREATE INDEX IF NOT EXISTS idx_metrics_tenant_period
    ON scheduler.metrics (tenant_id, period_start);
-- Schedules and runs
-- Rows carry deleted_at / deleted_by columns; every index below is partial on
-- deleted_at IS NULL, so rows with deleted_at set are not indexed.
CREATE TABLE IF NOT EXISTS scheduler.schedules (
    id              TEXT        PRIMARY KEY,
    tenant_id       TEXT        NOT NULL,
    name            TEXT        NOT NULL,
    description     TEXT,
    enabled         BOOLEAN     NOT NULL DEFAULT TRUE,
    cron_expression TEXT,
    timezone        TEXT        NOT NULL DEFAULT 'UTC',
    mode            TEXT        NOT NULL
                                CHECK (mode IN ('analysisonly', 'contentrefresh')),
    selection       JSONB       NOT NULL DEFAULT '{}',
    only_if         JSONB       NOT NULL DEFAULT '{}',
    notify          JSONB       NOT NULL DEFAULT '{}',
    limits          JSONB       NOT NULL DEFAULT '{}',
    subscribers     TEXT[]      NOT NULL DEFAULT '{}',
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_by      TEXT        NOT NULL,
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_by      TEXT        NOT NULL,
    deleted_at      TIMESTAMPTZ,
    deleted_by      TEXT
);

CREATE INDEX IF NOT EXISTS idx_schedules_tenant
    ON scheduler.schedules (tenant_id)
    WHERE deleted_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_schedules_enabled
    ON scheduler.schedules (tenant_id, enabled)
    WHERE deleted_at IS NULL;
-- A schedule name must be unique per tenant among rows where deleted_at IS NULL.
CREATE UNIQUE INDEX IF NOT EXISTS uq_schedules_tenant_name_active
    ON scheduler.schedules (tenant_id, name)
    WHERE deleted_at IS NULL;
-- Runs table
-- Primary key is (tenant_id, id); id by itself is not declared unique.
-- The *_count columns are stored generated columns derived from the stats
-- JSON: a missing key or an empty string yields NULL, anything else is cast
-- to int.
CREATE TABLE IF NOT EXISTS scheduler.runs (
    id                TEXT                NOT NULL,
    tenant_id         TEXT                NOT NULL,
    schedule_id       TEXT,
    trigger           JSONB               NOT NULL,
    state             scheduler.run_state NOT NULL,
    stats             JSONB               NOT NULL,
    reason            JSONB               NOT NULL,
    created_at        TIMESTAMPTZ         NOT NULL,
    started_at        TIMESTAMPTZ,
    finished_at       TIMESTAMPTZ,
    error             TEXT,
    deltas            JSONB               NOT NULL,
    retry_of          TEXT,
    schema_version    TEXT,
    finding_count     INT GENERATED ALWAYS AS (NULLIF((stats->>'findingCount'), '')::int) STORED,
    critical_count    INT GENERATED ALWAYS AS (NULLIF((stats->>'criticalCount'), '')::int) STORED,
    high_count        INT GENERATED ALWAYS AS (NULLIF((stats->>'highCount'), '')::int) STORED,
    new_finding_count INT GENERATED ALWAYS AS (NULLIF((stats->>'newFindingCount'), '')::int) STORED,
    component_count   INT GENERATED ALWAYS AS (NULLIF((stats->>'componentCount'), '')::int) STORED,
    PRIMARY KEY (tenant_id, id)
);

-- Basic lookup indexes.
CREATE INDEX IF NOT EXISTS idx_runs_state
    ON scheduler.runs (state);
CREATE INDEX IF NOT EXISTS idx_runs_schedule
    ON scheduler.runs (tenant_id, schedule_id);
CREATE INDEX IF NOT EXISTS idx_runs_created
    ON scheduler.runs (created_at);

-- Indexes over the generated count columns.
CREATE INDEX IF NOT EXISTS ix_runs_with_findings
    ON scheduler.runs (tenant_id, created_at DESC)
    WHERE finding_count > 0;
CREATE INDEX IF NOT EXISTS ix_runs_critical
    ON scheduler.runs (tenant_id, created_at DESC, critical_count)
    WHERE critical_count > 0;
CREATE INDEX IF NOT EXISTS ix_runs_summary_cover
    ON scheduler.runs (tenant_id, state, created_at DESC)
    INCLUDE (finding_count, critical_count, high_count, new_finding_count);
CREATE INDEX IF NOT EXISTS ix_runs_tenant_findings
    ON scheduler.runs (tenant_id, finding_count DESC, created_at DESC)
    WHERE state = 'completed';
-- Impact snapshots
-- run_id is optional and has no foreign key constraint.
CREATE TABLE IF NOT EXISTS scheduler.impact_snapshots (
    snapshot_id TEXT        PRIMARY KEY,
    tenant_id   TEXT        NOT NULL,
    run_id      TEXT,
    impact      JSONB       NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_impact_snapshots_run
    ON scheduler.impact_snapshots (run_id);
-- Run summaries
-- NOTE(review): schedule_id is nullable and part of the UNIQUE key below.
-- NULLs compare as distinct in a default UNIQUE constraint, so several rows
-- with schedule_id NULL may share (tenant_id, period_start) — confirm that is
-- intended.
CREATE TABLE IF NOT EXISTS scheduler.run_summaries (
    id                      TEXT          PRIMARY KEY,
    tenant_id               TEXT          NOT NULL,
    schedule_id             TEXT          REFERENCES scheduler.schedules(id),
    period_start            TIMESTAMPTZ   NOT NULL,
    period_end              TIMESTAMPTZ   NOT NULL,
    total_runs              INT           NOT NULL DEFAULT 0,
    successful_runs         INT           NOT NULL DEFAULT 0,
    failed_runs             INT           NOT NULL DEFAULT 0,
    cancelled_runs          INT           NOT NULL DEFAULT 0,
    avg_duration_seconds    NUMERIC(10,2),
    max_duration_seconds    INT,
    min_duration_seconds    INT,
    total_findings_detected INT           NOT NULL DEFAULT 0,
    new_criticals           INT           NOT NULL DEFAULT 0,
    computed_at             TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    UNIQUE (tenant_id, schedule_id, period_start)
);

CREATE INDEX IF NOT EXISTS idx_run_summaries_tenant
    ON scheduler.run_summaries (tenant_id, period_start DESC);
-- Execution logs
-- This table has no tenant_id column; rows are tied to a run only through
-- run_id, which has no foreign key constraint.
CREATE TABLE IF NOT EXISTS scheduler.execution_logs (
    id        BIGSERIAL   PRIMARY KEY,
    run_id    TEXT        NOT NULL,
    logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    level     TEXT        NOT NULL,
    message   TEXT        NOT NULL,
    logger    TEXT,
    data      JSONB       NOT NULL DEFAULT '{}'
);

CREATE INDEX IF NOT EXISTS idx_execution_logs_run
    ON scheduler.execution_logs (run_id);
-- Graph jobs (v2 schema)
-- id has no default: the writer must supply the UUID.
CREATE TABLE IF NOT EXISTS scheduler.graph_jobs (
    id             UUID                       PRIMARY KEY,
    tenant_id      TEXT                       NOT NULL,
    type           scheduler.graph_job_type   NOT NULL,
    status         scheduler.graph_job_status NOT NULL,
    payload        JSONB                      NOT NULL,
    correlation_id TEXT,
    created_at     TIMESTAMPTZ                NOT NULL DEFAULT NOW(),
    updated_at     TIMESTAMPTZ                NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_graph_jobs_tenant_status
    ON scheduler.graph_jobs (tenant_id, status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_graph_jobs_tenant_type_status
    ON scheduler.graph_jobs (tenant_id, type, status, created_at DESC);
-- Events attached to a graph job; deleting the job deletes its events
-- (ON DELETE CASCADE).
CREATE TABLE IF NOT EXISTS scheduler.graph_job_events (
    id         BIGSERIAL                  PRIMARY KEY,
    job_id     UUID                       NOT NULL
                                          REFERENCES scheduler.graph_jobs(id) ON DELETE CASCADE,
    tenant_id  TEXT                       NOT NULL,
    status     scheduler.graph_job_status NOT NULL,
    payload    JSONB                      NOT NULL,
    created_at TIMESTAMPTZ                NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_graph_job_events_job
    ON scheduler.graph_job_events (job_id, created_at DESC);
-- Policy run jobs
-- status is plain TEXT restricted by a CHECK constraint (not an enum type).
CREATE TABLE IF NOT EXISTS scheduler.policy_jobs (
    id                     TEXT        PRIMARY KEY,
    tenant_id              TEXT        NOT NULL,
    policy_pack_id         TEXT        NOT NULL,
    policy_version         INT,
    target_type            TEXT        NOT NULL,
    target_id              TEXT        NOT NULL,
    status                 TEXT        NOT NULL
                                       CHECK (status IN ('pending','queued','running','completed','failed','cancelled')),
    priority               INT         NOT NULL DEFAULT 100,
    run_id                 TEXT,
    requested_by           TEXT,
    mode                   TEXT,
    metadata               JSONB       NOT NULL DEFAULT '{}',
    inputs                 JSONB       NOT NULL DEFAULT '{}',
    attempt_count          INT         NOT NULL DEFAULT 0,
    max_attempts           INT         NOT NULL DEFAULT 3,
    queued_at              TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    available_at           TIMESTAMPTZ,
    submitted_at           TIMESTAMPTZ,
    started_at             TIMESTAMPTZ,
    completed_at           TIMESTAMPTZ,
    cancellation_requested BOOLEAN     NOT NULL DEFAULT FALSE,
    cancellation_reason    TEXT,
    cancelled_at           TIMESTAMPTZ,
    last_attempt_at        TIMESTAMPTZ,
    last_error             TEXT,
    lease_owner            TEXT,
    lease_expires_at       TIMESTAMPTZ,
    correlation_id         TEXT
);

CREATE INDEX IF NOT EXISTS idx_policy_jobs_tenant_status
    ON scheduler.policy_jobs (tenant_id, status);
CREATE INDEX IF NOT EXISTS idx_policy_jobs_run
    ON scheduler.policy_jobs (run_id);
-- Policy run job queue rows. Unlike scheduler.policy_jobs, status uses the
-- scheduler.policy_run_status enum and most NOT NULL columns have no default,
-- so the writer must supply them.
CREATE TABLE IF NOT EXISTS scheduler.policy_run_jobs (
    id                        TEXT                        PRIMARY KEY,
    tenant_id                 TEXT                        NOT NULL,
    policy_id                 TEXT                        NOT NULL,
    policy_version            INT,
    mode                      TEXT                        NOT NULL,
    priority                  INT                         NOT NULL,
    priority_rank             INT                         NOT NULL,
    run_id                    TEXT,
    requested_by              TEXT,
    correlation_id            TEXT,
    metadata                  JSONB,
    inputs                    JSONB                       NOT NULL,
    queued_at                 TIMESTAMPTZ,
    status                    scheduler.policy_run_status NOT NULL,
    attempt_count             INT                         NOT NULL,
    last_attempt_at           TIMESTAMPTZ,
    last_error                TEXT,
    created_at                TIMESTAMPTZ                 NOT NULL,
    updated_at                TIMESTAMPTZ                 NOT NULL,
    available_at              TIMESTAMPTZ                 NOT NULL,
    submitted_at              TIMESTAMPTZ,
    completed_at              TIMESTAMPTZ,
    lease_owner               TEXT,
    lease_expires_at          TIMESTAMPTZ,
    cancellation_requested    BOOLEAN                     NOT NULL DEFAULT FALSE,
    cancellation_requested_at TIMESTAMPTZ,
    cancellation_reason       TEXT,
    cancelled_at              TIMESTAMPTZ,
    schema_version            TEXT
);

CREATE INDEX IF NOT EXISTS idx_policy_run_jobs_tenant
    ON scheduler.policy_run_jobs (tenant_id);
CREATE INDEX IF NOT EXISTS idx_policy_run_jobs_status
    ON scheduler.policy_run_jobs (status);
CREATE INDEX IF NOT EXISTS idx_policy_run_jobs_run
    ON scheduler.policy_run_jobs (run_id);
CREATE INDEX IF NOT EXISTS idx_policy_run_jobs_policy
    ON scheduler.policy_run_jobs (tenant_id, policy_id);
-- Partitioned audit table
-- Range-partitioned on created_at. The partition key must be part of the
-- primary key, hence PRIMARY KEY (id, created_at).
CREATE TABLE IF NOT EXISTS scheduler.audit (
    id             BIGSERIAL,
    tenant_id      TEXT        NOT NULL,
    user_id        UUID,
    action         TEXT        NOT NULL,
    resource_type  TEXT        NOT NULL,
    resource_id    TEXT,
    old_value      JSONB,
    new_value      JSONB,
    correlation_id TEXT,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (id, created_at)
)
PARTITION BY RANGE (created_at);
-- Create one monthly partition of scheduler.audit for every month from six
-- months before the current month through three months after it. Partitions
-- are named audit_YYYY_MM; months whose partition name already exists in the
-- scheduler schema are skipped.
DO $$
DECLARE
    month_start DATE := date_trunc('month', NOW() - INTERVAL '6 months')::DATE;
    final_month CONSTANT DATE := date_trunc('month', NOW() + INTERVAL '3 months')::DATE;
    month_end   DATE;
    part_name   TEXT;
BEGIN
    LOOP
        EXIT WHEN month_start > final_month;

        -- Upper bound is exclusive: the first day of the following month.
        month_end := (month_start + INTERVAL '1 month')::DATE;
        part_name := 'audit_' || to_char(month_start, 'YYYY_MM');

        IF NOT EXISTS (
            SELECT 1
            FROM pg_class AS rel
            INNER JOIN pg_namespace AS ns
                ON ns.oid = rel.relnamespace
            WHERE ns.nspname = 'scheduler'
              AND rel.relname = part_name
        ) THEN
            EXECUTE format(
                'CREATE TABLE scheduler.%I PARTITION OF scheduler.audit FOR VALUES FROM (%L) TO (%L)',
                part_name, month_start, month_end
            );
        END IF;

        month_start := month_end;
    END LOOP;
END $$;
-- Catch-all partition for rows whose created_at falls outside every monthly
-- range.
CREATE TABLE IF NOT EXISTS scheduler.audit_default
    PARTITION OF scheduler.audit DEFAULT;

-- Indexes declared on the partitioned parent.
CREATE INDEX IF NOT EXISTS ix_audit_tenant
    ON scheduler.audit (tenant_id);
CREATE INDEX IF NOT EXISTS ix_audit_resource
    ON scheduler.audit (resource_type, resource_id);
CREATE INDEX IF NOT EXISTS ix_audit_correlation
    ON scheduler.audit (correlation_id)
    WHERE correlation_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS brin_audit_created
    ON scheduler.audit USING BRIN (created_at)
    WITH (pages_per_range = 128);

COMMENT ON TABLE scheduler.audit
    IS 'Audit log for scheduler operations. Partitioned monthly by created_at for retention management.';
-- Row-Level Security
--
-- Each table below gets RLS enabled and forced (FORCE applies the policy to
-- the table owner as well) plus one FOR ALL policy that only admits rows whose
-- tenant_id equals scheduler_app.require_current_tenant(). That function
-- raises when app.tenant_id is unset, so access without a tenant set errors
-- out instead of returning an empty result.
--
-- CREATE POLICY has no IF NOT EXISTS form, so every policy is preceded by
-- DROP POLICY IF EXISTS; this keeps the section re-runnable like the rest of
-- the script.
--
-- NOTE(review): scheduler.graph_job_events and scheduler.policy_run_jobs have
-- a tenant_id column but receive no policy in this section — confirm that is
-- intentional.

ALTER TABLE scheduler.schedules ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.schedules FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS schedules_tenant_isolation ON scheduler.schedules;
CREATE POLICY schedules_tenant_isolation ON scheduler.schedules FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.runs ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.runs FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS runs_tenant_isolation ON scheduler.runs;
CREATE POLICY runs_tenant_isolation ON scheduler.runs FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.jobs ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.jobs FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS jobs_tenant_isolation ON scheduler.jobs;
CREATE POLICY jobs_tenant_isolation ON scheduler.jobs FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.triggers ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.triggers FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS triggers_tenant_isolation ON scheduler.triggers;
CREATE POLICY triggers_tenant_isolation ON scheduler.triggers FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.graph_jobs ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.graph_jobs FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS graph_jobs_tenant_isolation ON scheduler.graph_jobs;
CREATE POLICY graph_jobs_tenant_isolation ON scheduler.graph_jobs FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.policy_jobs ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.policy_jobs FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS policy_jobs_tenant_isolation ON scheduler.policy_jobs;
CREATE POLICY policy_jobs_tenant_isolation ON scheduler.policy_jobs FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.locks ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.locks FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS locks_tenant_isolation ON scheduler.locks;
CREATE POLICY locks_tenant_isolation ON scheduler.locks FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.impact_snapshots ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.impact_snapshots FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS impact_snapshots_tenant_isolation ON scheduler.impact_snapshots;
CREATE POLICY impact_snapshots_tenant_isolation ON scheduler.impact_snapshots FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.run_summaries ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.run_summaries FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS run_summaries_tenant_isolation ON scheduler.run_summaries;
CREATE POLICY run_summaries_tenant_isolation ON scheduler.run_summaries FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

-- Declared on the partitioned parent table.
ALTER TABLE scheduler.audit ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.audit FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS audit_tenant_isolation ON scheduler.audit;
CREATE POLICY audit_tenant_isolation ON scheduler.audit FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.job_history ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.job_history FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS job_history_tenant_isolation ON scheduler.job_history;
CREATE POLICY job_history_tenant_isolation ON scheduler.job_history FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

ALTER TABLE scheduler.metrics ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.metrics FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS metrics_tenant_isolation ON scheduler.metrics;
CREATE POLICY metrics_tenant_isolation ON scheduler.metrics FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());

-- execution_logs has no tenant_id column: a row is admitted when its run_id
-- matches the id of a run belonging to the current tenant. No WITH CHECK is
-- given, so the USING expression also governs inserted and updated rows.
ALTER TABLE scheduler.execution_logs ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.execution_logs FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS execution_logs_tenant_isolation ON scheduler.execution_logs;
CREATE POLICY execution_logs_tenant_isolation ON scheduler.execution_logs FOR ALL
    USING (
        run_id IN (SELECT id FROM scheduler.runs WHERE tenant_id = scheduler_app.require_current_tenant())
    );
-- Admin bypass role
-- Creates the scheduler_admin role (no login, bypasses RLS) unless a role of
-- that name already exists.
DO $$
BEGIN
    PERFORM 1 FROM pg_roles WHERE rolname = 'scheduler_admin';
    IF NOT FOUND THEN
        CREATE ROLE scheduler_admin WITH NOLOGIN BYPASSRLS;
    END IF;
END
$$;
-- ============================================================================
-- 002_hlc_queue_chain.sql - HLC-ordered scheduler queue with chain linking
-- ============================================================================
-- Scheduler log entries. payload_hash, prev_link and link are constrained to
-- exactly 32 bytes (prev_link may be NULL).
--
-- partition_key is declared NOT NULL: it is part of uq_scheduler_log_order,
-- and a NULL there would be treated as distinct from every other value, which
-- would let duplicate (tenant_id, t_hlc, job_id) rows slip past the
-- constraint. This also matches scheduler.chain_heads.partition_key
-- (NOT NULL DEFAULT '').
-- NOTE(review): any writer that inserts an explicit NULL partition_key must
-- switch to '' (or omit the column) — confirm none does.
CREATE TABLE IF NOT EXISTS scheduler.scheduler_log (
    seq_bigint    BIGSERIAL   PRIMARY KEY,
    tenant_id     TEXT        NOT NULL,
    t_hlc         TEXT        NOT NULL,
    partition_key TEXT        NOT NULL DEFAULT '',
    job_id        UUID        NOT NULL,
    payload_hash  BYTEA       NOT NULL CHECK (octet_length(payload_hash) = 32),
    prev_link     BYTEA       CHECK (prev_link IS NULL OR octet_length(prev_link) = 32),
    link          BYTEA       NOT NULL CHECK (octet_length(link) = 32),
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_scheduler_log_order UNIQUE (tenant_id, t_hlc, partition_key, job_id)
);

CREATE INDEX IF NOT EXISTS idx_scheduler_log_tenant_hlc
    ON scheduler.scheduler_log (tenant_id, t_hlc ASC);
CREATE INDEX IF NOT EXISTS idx_scheduler_log_partition
    ON scheduler.scheduler_log (tenant_id, partition_key, t_hlc ASC);
CREATE INDEX IF NOT EXISTS idx_scheduler_log_job_id
    ON scheduler.scheduler_log (job_id);
CREATE INDEX IF NOT EXISTS idx_scheduler_log_link
    ON scheduler.scheduler_log (link);
CREATE INDEX IF NOT EXISTS idx_scheduler_log_created
    ON scheduler.scheduler_log (tenant_id, created_at DESC);
-- Batch snapshots over a range of the scheduler log (range_start_t ..
-- range_end_t). head_link must be exactly 32 bytes and job_count cannot be
-- negative. signature and signed_by must be both set or both NULL.
CREATE TABLE IF NOT EXISTS scheduler.batch_snapshot (
    batch_id      UUID        PRIMARY KEY,
    tenant_id     TEXT        NOT NULL,
    range_start_t TEXT        NOT NULL,
    range_end_t   TEXT        NOT NULL,
    head_link     BYTEA       NOT NULL CHECK (octet_length(head_link) = 32),
    job_count     INT         NOT NULL CHECK (job_count >= 0),
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    signed_by     TEXT,
    signature     BYTEA,
    CONSTRAINT chk_signature_requires_signer CHECK (
        (signature IS NULL AND signed_by IS NULL) OR
        (signature IS NOT NULL AND signed_by IS NOT NULL)
    )
);

CREATE INDEX IF NOT EXISTS idx_batch_snapshot_tenant
    ON scheduler.batch_snapshot (tenant_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_batch_snapshot_range
    ON scheduler.batch_snapshot (tenant_id, range_start_t, range_end_t);
-- One row per (tenant_id, partition_key) holding the latest link and its HLC
-- value; maintained by scheduler.upsert_chain_head.
CREATE TABLE IF NOT EXISTS scheduler.chain_heads (
    tenant_id     TEXT        NOT NULL,
    partition_key TEXT        NOT NULL DEFAULT '',
    last_link     BYTEA       NOT NULL CHECK (octet_length(last_link) = 32),
    last_t_hlc    TEXT        NOT NULL,
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (tenant_id, partition_key)
);

CREATE INDEX IF NOT EXISTS idx_chain_heads_updated
    ON scheduler.chain_heads (updated_at DESC);
-- Inserts the chain head for (p_tenant_id, p_partition_key), or advances an
-- existing one.
--
-- An existing row is only overwritten when its last_t_hlc sorts strictly
-- before p_new_t_hlc (a TEXT comparison); otherwise the call changes nothing.
--
-- Parameters:
--   p_tenant_id      tenant owning the chain
--   p_partition_key  partition within the tenant
--   p_new_link       new head link (the table requires exactly 32 bytes)
--   p_new_t_hlc      HLC value associated with the new head
CREATE OR REPLACE FUNCTION scheduler.upsert_chain_head(
    p_tenant_id TEXT,
    p_partition_key TEXT,
    p_new_link BYTEA,
    p_new_t_hlc TEXT
)
RETURNS VOID
LANGUAGE plpgsql
AS $fn$
BEGIN
    INSERT INTO scheduler.chain_heads AS head (
        tenant_id,
        partition_key,
        last_link,
        last_t_hlc,
        updated_at
    )
    VALUES (
        p_tenant_id,
        p_partition_key,
        p_new_link,
        p_new_t_hlc,
        NOW()
    )
    ON CONFLICT (tenant_id, partition_key) DO UPDATE
        SET last_link  = EXCLUDED.last_link,
            last_t_hlc = EXCLUDED.last_t_hlc,
            updated_at = EXCLUDED.updated_at
        -- Never move the head backwards.
        WHERE head.last_t_hlc < EXCLUDED.last_t_hlc;
END;
$fn$;
-- ============================================================================
-- 003_exception_lifecycle.sql - Exception management tables
-- ============================================================================
-- Exception state enum; duplicate_object is swallowed so the script can be
-- re-run.
DO $$
BEGIN
    CREATE TYPE scheduler.exception_state AS ENUM (
        'pending',
        'active',
        'expired',
        'revoked'
    );
EXCEPTION
    WHEN duplicate_object THEN NULL;
END $$;

-- Exceptions keyed by exception_id; new rows default to the 'pending' state.
CREATE TABLE IF NOT EXISTS scheduler.scheduler_exceptions (
    exception_id     TEXT                      PRIMARY KEY,
    tenant_id        TEXT                      NOT NULL,
    policy_id        TEXT                      NOT NULL,
    vulnerability_id TEXT                      NOT NULL,
    component_purl   TEXT,
    state            scheduler.exception_state NOT NULL DEFAULT 'pending',
    created_at       TIMESTAMPTZ               NOT NULL DEFAULT NOW(),
    activation_date  TIMESTAMPTZ,
    expiration_date  TIMESTAMPTZ,
    activated_at     TIMESTAMPTZ,
    expired_at       TIMESTAMPTZ,
    justification    TEXT,
    created_by       TEXT
);

CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_tenant
    ON scheduler.scheduler_exceptions (tenant_id);
CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_state
    ON scheduler.scheduler_exceptions (state);
CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_tenant_state
    ON scheduler.scheduler_exceptions (tenant_id, state);

-- Partial indexes: pending rows by activation date, active rows by
-- expiration date.
CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_pending_activation
    ON scheduler.scheduler_exceptions (activation_date)
    WHERE state = 'pending';
CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_active_expiration
    ON scheduler.scheduler_exceptions (expiration_date)
    WHERE state = 'active';

CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_policy
    ON scheduler.scheduler_exceptions (tenant_id, policy_id);
CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_vulnerability
    ON scheduler.scheduler_exceptions (tenant_id, vulnerability_id);
-- Tenant isolation for scheduler_exceptions: RLS is enabled and forced (so it
-- also applies to the table owner), and rows are only visible/writable when
-- tenant_id equals scheduler_app.require_current_tenant().
-- CREATE POLICY has no IF NOT EXISTS form, so the policy is dropped first to
-- keep the script re-runnable.
ALTER TABLE scheduler.scheduler_exceptions ENABLE ROW LEVEL SECURITY;
ALTER TABLE scheduler.scheduler_exceptions FORCE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS scheduler_exceptions_tenant_isolation ON scheduler.scheduler_exceptions;
CREATE POLICY scheduler_exceptions_tenant_isolation ON scheduler.scheduler_exceptions FOR ALL
    USING (tenant_id = scheduler_app.require_current_tenant())
    WITH CHECK (tenant_id = scheduler_app.require_current_tenant());