-- Scheduler: Consolidated init from migrations 001-003 -- Auto-generated for docker-compose postgres-init -- Creates all tables required by stellaops-scheduler-worker -- ============================================================================ -- 001_initial_schema.sql - Complete scheduler schema -- ============================================================================ CREATE SCHEMA IF NOT EXISTS scheduler; CREATE SCHEMA IF NOT EXISTS scheduler_app; -- Enum types DO $$ BEGIN CREATE TYPE scheduler.job_status AS ENUM ( 'pending', 'scheduled', 'leased', 'running', 'succeeded', 'failed', 'canceled', 'timed_out' ); EXCEPTION WHEN duplicate_object THEN null; END $$; DO $$ BEGIN CREATE TYPE scheduler.graph_job_type AS ENUM ('build', 'overlay'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN CREATE TYPE scheduler.graph_job_status AS ENUM ('pending', 'queued', 'running', 'completed', 'failed', 'canceled'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN CREATE TYPE scheduler.run_state AS ENUM ('planning','queued','running','completed','error','cancelled'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; DO $$ BEGIN CREATE TYPE scheduler.policy_run_status AS ENUM ('pending','submitted','retrying','failed','completed','cancelled'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; -- Helper functions CREATE OR REPLACE FUNCTION scheduler.update_updated_at() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = NOW(); RETURN NEW; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION scheduler_app.require_current_tenant() RETURNS TEXT LANGUAGE plpgsql STABLE SECURITY DEFINER AS $$ DECLARE v_tenant TEXT; BEGIN v_tenant := current_setting('app.tenant_id', true); IF v_tenant IS NULL OR v_tenant = '' THEN RAISE EXCEPTION 'app.tenant_id session variable not set' USING HINT = 'Set via: SELECT set_config(''app.tenant_id'', '''', false)', ERRCODE = 'P0001'; END IF; RETURN v_tenant; END; $$; REVOKE ALL ON FUNCTION scheduler_app.require_current_tenant() FROM PUBLIC; -- Core tables: jobs CREATE TABLE IF NOT EXISTS scheduler.jobs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), tenant_id TEXT NOT NULL, project_id TEXT, job_type TEXT NOT NULL, status scheduler.job_status NOT NULL DEFAULT 'pending', priority INT NOT NULL DEFAULT 0, payload JSONB NOT NULL DEFAULT '{}', payload_digest TEXT NOT NULL, idempotency_key TEXT NOT NULL, correlation_id TEXT, attempt INT NOT NULL DEFAULT 0, max_attempts INT NOT NULL DEFAULT 3, lease_id UUID, worker_id TEXT, lease_until TIMESTAMPTZ, not_before TIMESTAMPTZ, reason TEXT, result JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), scheduled_at TIMESTAMPTZ, leased_at TIMESTAMPTZ, started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, created_by TEXT, UNIQUE(tenant_id, idempotency_key) ); CREATE INDEX IF NOT EXISTS idx_jobs_tenant_status ON scheduler.jobs(tenant_id, status); CREATE INDEX IF NOT EXISTS idx_jobs_tenant_type ON scheduler.jobs(tenant_id, job_type); CREATE INDEX IF NOT EXISTS idx_jobs_scheduled ON scheduler.jobs(tenant_id, status, not_before, priority DESC, created_at) WHERE status = 'scheduled'; CREATE INDEX IF NOT EXISTS idx_jobs_leased ON scheduler.jobs(tenant_id, status, lease_until) WHERE status = 'leased'; CREATE INDEX IF NOT EXISTS idx_jobs_project ON scheduler.jobs(tenant_id, project_id); CREATE INDEX IF NOT EXISTS idx_jobs_correlation ON scheduler.jobs(correlation_id); -- Triggers table (cron-based job triggers) CREATE TABLE IF NOT EXISTS scheduler.triggers ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), tenant_id TEXT NOT NULL, name TEXT NOT NULL, description TEXT, job_type TEXT NOT NULL, job_payload JSONB NOT NULL DEFAULT '{}', cron_expression TEXT NOT NULL, timezone TEXT NOT NULL DEFAULT 'UTC', enabled BOOLEAN NOT NULL DEFAULT TRUE, next_fire_at TIMESTAMPTZ, last_fire_at TIMESTAMPTZ, last_job_id UUID REFERENCES scheduler.jobs(id), fire_count BIGINT NOT NULL DEFAULT 0, misfire_count INT NOT NULL DEFAULT 0, metadata JSONB NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_by TEXT, UNIQUE(tenant_id, name) ); CREATE INDEX IF NOT EXISTS idx_triggers_tenant_id ON scheduler.triggers(tenant_id); CREATE INDEX IF NOT EXISTS idx_triggers_next_fire ON scheduler.triggers(enabled, next_fire_at) WHERE enabled = TRUE; CREATE INDEX IF NOT EXISTS idx_triggers_job_type ON scheduler.triggers(tenant_id, job_type); CREATE TRIGGER trg_triggers_updated_at BEFORE UPDATE ON scheduler.triggers FOR EACH ROW EXECUTE FUNCTION scheduler.update_updated_at(); -- Workers table (global, NOT RLS-protected) CREATE TABLE IF NOT EXISTS scheduler.workers ( id TEXT PRIMARY KEY, tenant_id TEXT, hostname TEXT NOT NULL, process_id INT, job_types TEXT[] NOT NULL DEFAULT '{}', max_concurrent_jobs INT NOT NULL DEFAULT 1, current_jobs INT NOT NULL DEFAULT 0, status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active', 'draining', 'stopped')), last_heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), metadata JSONB NOT NULL DEFAULT '{}' ); CREATE INDEX IF NOT EXISTS idx_workers_status ON scheduler.workers(status); CREATE INDEX IF NOT EXISTS idx_workers_heartbeat ON scheduler.workers(last_heartbeat_at); CREATE INDEX IF NOT EXISTS idx_workers_tenant ON scheduler.workers(tenant_id); COMMENT ON TABLE scheduler.workers IS 'Global worker registry. Not RLS-protected - workers serve all tenants.'; -- Distributed locks CREATE TABLE IF NOT EXISTS scheduler.locks ( lock_key TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, holder_id TEXT NOT NULL, acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), expires_at TIMESTAMPTZ NOT NULL, metadata JSONB NOT NULL DEFAULT '{}' ); CREATE INDEX IF NOT EXISTS idx_locks_tenant ON scheduler.locks(tenant_id); CREATE INDEX IF NOT EXISTS idx_locks_expires ON scheduler.locks(expires_at); -- Job history CREATE TABLE IF NOT EXISTS scheduler.job_history ( id BIGSERIAL PRIMARY KEY, job_id UUID NOT NULL, tenant_id TEXT NOT NULL, project_id TEXT, job_type TEXT NOT NULL, status scheduler.job_status NOT NULL, attempt INT NOT NULL, payload_digest TEXT NOT NULL, result JSONB, reason TEXT, worker_id TEXT, duration_ms BIGINT, created_at TIMESTAMPTZ NOT NULL, completed_at TIMESTAMPTZ NOT NULL, archived_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_job_history_tenant ON scheduler.job_history(tenant_id); CREATE INDEX IF NOT EXISTS idx_job_history_job_id ON scheduler.job_history(job_id); CREATE INDEX IF NOT EXISTS idx_job_history_type ON scheduler.job_history(tenant_id, job_type); CREATE INDEX IF NOT EXISTS idx_job_history_completed ON scheduler.job_history(tenant_id, completed_at); -- Metrics table CREATE TABLE IF NOT EXISTS scheduler.metrics ( id BIGSERIAL PRIMARY KEY, tenant_id TEXT NOT NULL, job_type TEXT NOT NULL, period_start TIMESTAMPTZ NOT NULL, period_end TIMESTAMPTZ NOT NULL, jobs_created BIGINT NOT NULL DEFAULT 0, jobs_completed BIGINT NOT NULL DEFAULT 0, jobs_failed BIGINT NOT NULL DEFAULT 0, jobs_timed_out BIGINT NOT NULL DEFAULT 0, avg_duration_ms BIGINT, p50_duration_ms BIGINT, p95_duration_ms BIGINT, p99_duration_ms BIGINT, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(tenant_id, job_type, period_start) ); CREATE INDEX IF NOT EXISTS idx_metrics_tenant_period ON scheduler.metrics(tenant_id, period_start); -- Schedules and runs CREATE TABLE IF NOT EXISTS scheduler.schedules ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, name TEXT NOT NULL, description TEXT, enabled BOOLEAN NOT NULL DEFAULT TRUE, cron_expression TEXT, timezone TEXT NOT NULL DEFAULT 'UTC', mode TEXT NOT NULL CHECK (mode IN ('analysisonly', 'contentrefresh')), selection JSONB NOT NULL DEFAULT '{}', only_if JSONB NOT NULL DEFAULT '{}', notify JSONB NOT NULL DEFAULT '{}', limits JSONB NOT NULL DEFAULT '{}', subscribers TEXT[] NOT NULL DEFAULT '{}', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_by TEXT NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_by TEXT NOT NULL, deleted_at TIMESTAMPTZ, deleted_by TEXT ); CREATE INDEX IF NOT EXISTS idx_schedules_tenant ON scheduler.schedules(tenant_id) WHERE deleted_at IS NULL; CREATE INDEX IF NOT EXISTS idx_schedules_enabled ON scheduler.schedules(tenant_id, enabled) WHERE deleted_at IS NULL; CREATE UNIQUE INDEX IF NOT EXISTS uq_schedules_tenant_name_active ON scheduler.schedules(tenant_id, name) WHERE deleted_at IS NULL; -- Runs table CREATE TABLE IF NOT EXISTS scheduler.runs ( id TEXT NOT NULL, tenant_id TEXT NOT NULL, schedule_id TEXT, trigger JSONB NOT NULL, state scheduler.run_state NOT NULL, stats JSONB NOT NULL, reason JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL, started_at TIMESTAMPTZ, finished_at TIMESTAMPTZ, error TEXT, deltas JSONB NOT NULL, retry_of TEXT, schema_version TEXT, finding_count INT GENERATED ALWAYS AS (NULLIF((stats->>'findingCount'), '')::int) STORED, critical_count INT GENERATED ALWAYS AS (NULLIF((stats->>'criticalCount'), '')::int) STORED, high_count INT GENERATED ALWAYS AS (NULLIF((stats->>'highCount'), '')::int) STORED, new_finding_count INT GENERATED ALWAYS AS (NULLIF((stats->>'newFindingCount'), '')::int) STORED, component_count INT GENERATED ALWAYS AS (NULLIF((stats->>'componentCount'), '')::int) STORED, PRIMARY KEY (tenant_id, id) ); CREATE INDEX IF NOT EXISTS idx_runs_state ON scheduler.runs(state); CREATE INDEX IF NOT EXISTS idx_runs_schedule ON scheduler.runs(tenant_id, schedule_id); CREATE INDEX IF NOT EXISTS idx_runs_created ON scheduler.runs(created_at); CREATE INDEX IF NOT EXISTS ix_runs_with_findings ON scheduler.runs(tenant_id, created_at DESC) WHERE finding_count > 0; CREATE INDEX IF NOT EXISTS ix_runs_critical ON scheduler.runs(tenant_id, created_at DESC, critical_count) WHERE critical_count > 0; CREATE INDEX IF NOT EXISTS ix_runs_summary_cover ON scheduler.runs(tenant_id, state, created_at DESC) INCLUDE (finding_count, critical_count, high_count, new_finding_count); CREATE INDEX IF NOT EXISTS ix_runs_tenant_findings ON scheduler.runs(tenant_id, finding_count DESC, created_at DESC) WHERE state = 'completed'; -- Impact snapshots CREATE TABLE IF NOT EXISTS scheduler.impact_snapshots ( snapshot_id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, run_id TEXT, impact JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_impact_snapshots_run ON scheduler.impact_snapshots(run_id); -- Run summaries CREATE TABLE IF NOT EXISTS scheduler.run_summaries ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, schedule_id TEXT REFERENCES scheduler.schedules(id), period_start TIMESTAMPTZ NOT NULL, period_end TIMESTAMPTZ NOT NULL, total_runs INT NOT NULL DEFAULT 0, successful_runs INT NOT NULL DEFAULT 0, failed_runs INT NOT NULL DEFAULT 0, cancelled_runs INT NOT NULL DEFAULT 0, avg_duration_seconds NUMERIC(10,2), max_duration_seconds INT, min_duration_seconds INT, total_findings_detected INT NOT NULL DEFAULT 0, new_criticals INT NOT NULL DEFAULT 0, computed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE (tenant_id, schedule_id, period_start) ); CREATE INDEX IF NOT EXISTS idx_run_summaries_tenant ON scheduler.run_summaries(tenant_id, period_start DESC); -- Execution logs CREATE TABLE IF NOT EXISTS scheduler.execution_logs ( id BIGSERIAL PRIMARY KEY, run_id TEXT NOT NULL, logged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), level TEXT NOT NULL, message TEXT NOT NULL, logger TEXT, data JSONB NOT NULL DEFAULT '{}' ); CREATE INDEX IF NOT EXISTS idx_execution_logs_run ON scheduler.execution_logs(run_id); -- Graph jobs (v2 schema) CREATE TABLE IF NOT EXISTS scheduler.graph_jobs ( id UUID PRIMARY KEY, tenant_id TEXT NOT NULL, type scheduler.graph_job_type NOT NULL, status scheduler.graph_job_status NOT NULL, payload JSONB NOT NULL, correlation_id TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_graph_jobs_tenant_status ON scheduler.graph_jobs(tenant_id, status, created_at DESC); CREATE INDEX IF NOT EXISTS idx_graph_jobs_tenant_type_status ON scheduler.graph_jobs(tenant_id, type, status, created_at DESC); CREATE TABLE IF NOT EXISTS scheduler.graph_job_events ( id BIGSERIAL PRIMARY KEY, job_id UUID NOT NULL REFERENCES scheduler.graph_jobs(id) ON DELETE CASCADE, tenant_id TEXT NOT NULL, status scheduler.graph_job_status NOT NULL, payload JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_graph_job_events_job ON scheduler.graph_job_events(job_id, created_at DESC); -- Policy run jobs CREATE TABLE IF NOT EXISTS scheduler.policy_jobs ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, policy_pack_id TEXT NOT NULL, policy_version INT, target_type TEXT NOT NULL, target_id TEXT NOT NULL, status TEXT NOT NULL CHECK (status IN ('pending','queued','running','completed','failed','cancelled')), priority INT NOT NULL DEFAULT 100, run_id TEXT, requested_by TEXT, mode TEXT, metadata JSONB NOT NULL DEFAULT '{}', inputs JSONB NOT NULL DEFAULT '{}', attempt_count INT NOT NULL DEFAULT 0, max_attempts INT NOT NULL DEFAULT 3, queued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), available_at TIMESTAMPTZ, submitted_at TIMESTAMPTZ, started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, cancellation_requested BOOLEAN NOT NULL DEFAULT FALSE, cancellation_reason TEXT, cancelled_at TIMESTAMPTZ, last_attempt_at TIMESTAMPTZ, last_error TEXT, lease_owner TEXT, lease_expires_at TIMESTAMPTZ, correlation_id TEXT ); CREATE INDEX IF NOT EXISTS idx_policy_jobs_tenant_status ON scheduler.policy_jobs(tenant_id, status); CREATE INDEX IF NOT EXISTS idx_policy_jobs_run ON scheduler.policy_jobs(run_id); CREATE TABLE IF NOT EXISTS scheduler.policy_run_jobs ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, policy_id TEXT NOT NULL, policy_version INT, mode TEXT NOT NULL, priority INT NOT NULL, priority_rank INT NOT NULL, run_id TEXT, requested_by TEXT, correlation_id TEXT, metadata JSONB, inputs JSONB NOT NULL, queued_at TIMESTAMPTZ, status scheduler.policy_run_status NOT NULL, attempt_count INT NOT NULL, last_attempt_at TIMESTAMPTZ, last_error TEXT, created_at TIMESTAMPTZ NOT NULL, updated_at TIMESTAMPTZ NOT NULL, available_at TIMESTAMPTZ NOT NULL, submitted_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, lease_owner TEXT, lease_expires_at TIMESTAMPTZ, cancellation_requested BOOLEAN NOT NULL DEFAULT FALSE, cancellation_requested_at TIMESTAMPTZ, cancellation_reason TEXT, cancelled_at TIMESTAMPTZ, schema_version TEXT ); CREATE INDEX IF NOT EXISTS idx_policy_run_jobs_tenant ON scheduler.policy_run_jobs(tenant_id); CREATE INDEX IF NOT EXISTS idx_policy_run_jobs_status ON scheduler.policy_run_jobs(status); CREATE INDEX IF NOT EXISTS idx_policy_run_jobs_run ON scheduler.policy_run_jobs(run_id); CREATE INDEX IF NOT EXISTS idx_policy_run_jobs_policy ON scheduler.policy_run_jobs(tenant_id, policy_id); -- Partitioned audit table CREATE TABLE IF NOT EXISTS scheduler.audit ( id BIGSERIAL, tenant_id TEXT NOT NULL, user_id UUID, action TEXT NOT NULL, resource_type TEXT NOT NULL, resource_id TEXT, old_value JSONB, new_value JSONB, correlation_id TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (id, created_at) ) PARTITION BY RANGE (created_at); DO $$ DECLARE v_start DATE; v_end DATE; v_partition_name TEXT; BEGIN v_start := date_trunc('month', NOW() - INTERVAL '6 months')::DATE; WHILE v_start <= date_trunc('month', NOW() + INTERVAL '3 months')::DATE LOOP v_end := (v_start + INTERVAL '1 month')::DATE; v_partition_name := 'audit_' || to_char(v_start, 'YYYY_MM'); IF NOT EXISTS ( SELECT 1 FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = 'scheduler' AND c.relname = v_partition_name ) THEN EXECUTE format( 'CREATE TABLE scheduler.%I PARTITION OF scheduler.audit FOR VALUES FROM (%L) TO (%L)', v_partition_name, v_start, v_end ); END IF; v_start := v_end; END LOOP; END $$; CREATE TABLE IF NOT EXISTS scheduler.audit_default PARTITION OF scheduler.audit DEFAULT; CREATE INDEX IF NOT EXISTS ix_audit_tenant ON scheduler.audit(tenant_id); CREATE INDEX IF NOT EXISTS ix_audit_resource ON scheduler.audit(resource_type, resource_id); CREATE INDEX IF NOT EXISTS ix_audit_correlation ON scheduler.audit(correlation_id) WHERE correlation_id IS NOT NULL; CREATE INDEX IF NOT EXISTS brin_audit_created ON scheduler.audit USING BRIN(created_at) WITH (pages_per_range = 128); COMMENT ON TABLE scheduler.audit IS 'Audit log for scheduler operations. Partitioned monthly by created_at for retention management.'; -- Row-Level Security ALTER TABLE scheduler.schedules ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.schedules FORCE ROW LEVEL SECURITY; CREATE POLICY schedules_tenant_isolation ON scheduler.schedules FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.runs ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.runs FORCE ROW LEVEL SECURITY; CREATE POLICY runs_tenant_isolation ON scheduler.runs FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.jobs ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.jobs FORCE ROW LEVEL SECURITY; CREATE POLICY jobs_tenant_isolation ON scheduler.jobs FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.triggers ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.triggers FORCE ROW LEVEL SECURITY; CREATE POLICY triggers_tenant_isolation ON scheduler.triggers FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.graph_jobs ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.graph_jobs FORCE ROW LEVEL SECURITY; CREATE POLICY graph_jobs_tenant_isolation ON scheduler.graph_jobs FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.policy_jobs ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.policy_jobs FORCE ROW LEVEL SECURITY; CREATE POLICY policy_jobs_tenant_isolation ON scheduler.policy_jobs FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.locks ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.locks FORCE ROW LEVEL SECURITY; CREATE POLICY locks_tenant_isolation ON scheduler.locks FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.impact_snapshots ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.impact_snapshots FORCE ROW LEVEL SECURITY; CREATE POLICY impact_snapshots_tenant_isolation ON scheduler.impact_snapshots FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.run_summaries ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.run_summaries FORCE ROW LEVEL SECURITY; CREATE POLICY run_summaries_tenant_isolation ON scheduler.run_summaries FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.audit ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.audit FORCE ROW LEVEL SECURITY; CREATE POLICY audit_tenant_isolation ON scheduler.audit FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.job_history ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.job_history FORCE ROW LEVEL SECURITY; CREATE POLICY job_history_tenant_isolation ON scheduler.job_history FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.metrics ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.metrics FORCE ROW LEVEL SECURITY; CREATE POLICY metrics_tenant_isolation ON scheduler.metrics FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant()); ALTER TABLE scheduler.execution_logs ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.execution_logs FORCE ROW LEVEL SECURITY; CREATE POLICY execution_logs_tenant_isolation ON scheduler.execution_logs FOR ALL USING ( run_id IN (SELECT id FROM scheduler.runs WHERE tenant_id = scheduler_app.require_current_tenant()) ); -- Admin bypass role DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'scheduler_admin') THEN CREATE ROLE scheduler_admin WITH NOLOGIN BYPASSRLS; END IF; END $$; -- ============================================================================ -- 002_hlc_queue_chain.sql - HLC-ordered scheduler queue with chain linking -- ============================================================================ CREATE TABLE IF NOT EXISTS scheduler.scheduler_log ( seq_bigint BIGSERIAL PRIMARY KEY, tenant_id TEXT NOT NULL, t_hlc TEXT NOT NULL, partition_key TEXT DEFAULT '', job_id UUID NOT NULL, payload_hash BYTEA NOT NULL CHECK (octet_length(payload_hash) = 32), prev_link BYTEA CHECK (prev_link IS NULL OR octet_length(prev_link) = 32), link BYTEA NOT NULL CHECK (octet_length(link) = 32), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), CONSTRAINT uq_scheduler_log_order UNIQUE (tenant_id, t_hlc, partition_key, job_id) ); CREATE INDEX IF NOT EXISTS idx_scheduler_log_tenant_hlc ON scheduler.scheduler_log (tenant_id, t_hlc ASC); CREATE INDEX IF NOT EXISTS idx_scheduler_log_partition ON scheduler.scheduler_log (tenant_id, partition_key, t_hlc ASC); CREATE INDEX IF NOT EXISTS idx_scheduler_log_job_id ON scheduler.scheduler_log (job_id); CREATE INDEX IF NOT EXISTS idx_scheduler_log_link ON scheduler.scheduler_log (link); CREATE INDEX IF NOT EXISTS idx_scheduler_log_created ON scheduler.scheduler_log (tenant_id, created_at DESC); CREATE TABLE IF NOT EXISTS scheduler.batch_snapshot ( batch_id UUID PRIMARY KEY, tenant_id TEXT NOT NULL, range_start_t TEXT NOT NULL, range_end_t TEXT NOT NULL, head_link BYTEA NOT NULL CHECK (octet_length(head_link) = 32), job_count INT NOT NULL CHECK (job_count >= 0), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), signed_by TEXT, signature BYTEA, CONSTRAINT chk_signature_requires_signer CHECK ( (signature IS NULL AND signed_by IS NULL) OR (signature IS NOT NULL AND signed_by IS NOT NULL) ) ); CREATE INDEX IF NOT EXISTS idx_batch_snapshot_tenant ON scheduler.batch_snapshot (tenant_id, created_at DESC); CREATE INDEX IF NOT EXISTS idx_batch_snapshot_range ON scheduler.batch_snapshot (tenant_id, range_start_t, range_end_t); CREATE TABLE IF NOT EXISTS scheduler.chain_heads ( tenant_id TEXT NOT NULL, partition_key TEXT NOT NULL DEFAULT '', last_link BYTEA NOT NULL CHECK (octet_length(last_link) = 32), last_t_hlc TEXT NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), PRIMARY KEY (tenant_id, partition_key) ); CREATE INDEX IF NOT EXISTS idx_chain_heads_updated ON scheduler.chain_heads (updated_at DESC); CREATE OR REPLACE FUNCTION scheduler.upsert_chain_head( p_tenant_id TEXT, p_partition_key TEXT, p_new_link BYTEA, p_new_t_hlc TEXT ) RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN INSERT INTO scheduler.chain_heads (tenant_id, partition_key, last_link, last_t_hlc, updated_at) VALUES (p_tenant_id, p_partition_key, p_new_link, p_new_t_hlc, NOW()) ON CONFLICT (tenant_id, partition_key) DO UPDATE SET last_link = EXCLUDED.last_link, last_t_hlc = EXCLUDED.last_t_hlc, updated_at = EXCLUDED.updated_at WHERE scheduler.chain_heads.last_t_hlc < EXCLUDED.last_t_hlc; END; $$; -- ============================================================================ -- 003_exception_lifecycle.sql - Exception management tables -- ============================================================================ DO $$ BEGIN CREATE TYPE scheduler.exception_state AS ENUM ('pending', 'active', 'expired', 'revoked'); EXCEPTION WHEN duplicate_object THEN NULL; END $$; CREATE TABLE IF NOT EXISTS scheduler.scheduler_exceptions ( exception_id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, policy_id TEXT NOT NULL, vulnerability_id TEXT NOT NULL, component_purl TEXT, state scheduler.exception_state NOT NULL DEFAULT 'pending', created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), activation_date TIMESTAMPTZ, expiration_date TIMESTAMPTZ, activated_at TIMESTAMPTZ, expired_at TIMESTAMPTZ, justification TEXT, created_by TEXT ); CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_tenant ON scheduler.scheduler_exceptions(tenant_id); CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_state ON scheduler.scheduler_exceptions(state); CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_tenant_state ON scheduler.scheduler_exceptions(tenant_id, state); CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_pending_activation ON scheduler.scheduler_exceptions(activation_date) WHERE state = 'pending'; CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_active_expiration ON scheduler.scheduler_exceptions(expiration_date) WHERE state = 'active'; CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_policy ON scheduler.scheduler_exceptions(tenant_id, policy_id); CREATE INDEX IF NOT EXISTS idx_scheduler_exceptions_vulnerability ON scheduler.scheduler_exceptions(tenant_id, vulnerability_id); ALTER TABLE scheduler.scheduler_exceptions ENABLE ROW LEVEL SECURITY; ALTER TABLE scheduler.scheduler_exceptions FORCE ROW LEVEL SECURITY; CREATE POLICY scheduler_exceptions_tenant_isolation ON scheduler.scheduler_exceptions FOR ALL USING (tenant_id = scheduler_app.require_current_tenant()) WITH CHECK (tenant_id = scheduler_app.require_current_tenant());