From 1e53976ffbc99c8e10352f83ed272e4550ec3da0 Mon Sep 17 00:00:00 2001 From: master <> Date: Mon, 9 Mar 2026 08:38:20 +0200 Subject: [PATCH] fix(jobengine): make all orchestrator migration SQL idempotent and PostgreSQL-compatible Fix 6 classes of issues that prevented JobEngine from auto-migrating: 1. Non-idempotent DDL: add IF NOT EXISTS to CREATE TABLE, wrap CREATE TYPE in DO blocks with EXCEPTION WHEN duplicate_object, wrap partition creation with EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' 2. Reserved keyword: quote `window` column name in 004_slo_quotas.sql 3. Invalid syntax: replace DELETE...LIMIT with ctid subquery pattern in 004_slo_quotas.sql and 005_audit_ledger.sql 4. Partition constraint: add tenant_id to UNIQUE(log_id) constraint on pack_run_logs in 006_pack_runs.sql (partitioned tables require partition key in all unique constraints) 5. Non-immutable index predicate: remove NOW() from partial index predicate in 002_backfill.sql 6. Remove BEGIN/COMMIT wrappers from all migration files (the StartupMigrationHost already wraps each migration in a transaction) All 8 orchestrator migrations (001-008) now apply cleanly on fresh DB. 
Co-Authored-By: Claude Opus 4.6 --- .../migrations/001_initial.sql | 193 ++++++++++-------- .../migrations/002_backfill.sql | 67 +++--- .../migrations/003_dead_letter.sql | 105 +++++----- .../migrations/004_slo_quotas.sql | 16 +- .../migrations/005_audit_ledger.sql | 14 +- .../migrations/006_pack_runs.sql | 52 ++--- .../007_pack_run_logs_integrity.sql | 12 +- .../migrations/008_first_signal_snapshots.sql | 15 +- 8 files changed, 266 insertions(+), 208 deletions(-) diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/001_initial.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/001_initial.sql index da7f0ff53..60bc43d82 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/001_initial.sql +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/001_initial.sql @@ -1,43 +1,50 @@ -- 001_initial.sql -- Orchestrator bootstrap schema (ORCH-SVC-32-001) -- Creates core tables for sources, runs, jobs, DAG edges, artifacts, quotas, schedules, and incidents. - -BEGIN; +-- All statements are idempotent so the migration is safe on databases bootstrapped by postgres-init scripts. 
-- Enum types for job and run statuses -CREATE TYPE job_status AS ENUM ( - 'pending', - 'scheduled', - 'leased', - 'succeeded', - 'failed', - 'canceled', - 'timed_out' -); +DO $$ BEGIN + CREATE TYPE job_status AS ENUM ( + 'pending', + 'scheduled', + 'leased', + 'succeeded', + 'failed', + 'canceled', + 'timed_out' + ); +EXCEPTION WHEN duplicate_object THEN NULL; END $$; -CREATE TYPE run_status AS ENUM ( - 'pending', - 'running', - 'succeeded', - 'partially_succeeded', - 'failed', - 'canceled' -); +DO $$ BEGIN + CREATE TYPE run_status AS ENUM ( + 'pending', + 'running', + 'succeeded', + 'partially_succeeded', + 'failed', + 'canceled' + ); +EXCEPTION WHEN duplicate_object THEN NULL; END $$; -CREATE TYPE incident_status AS ENUM ( - 'open', - 'acknowledged', - 'resolved' -); +DO $$ BEGIN + CREATE TYPE incident_status AS ENUM ( + 'open', + 'acknowledged', + 'resolved' + ); +EXCEPTION WHEN duplicate_object THEN NULL; END $$; -CREATE TYPE dag_edge_type AS ENUM ( - 'success', - 'always', - 'failure' -); +DO $$ BEGIN + CREATE TYPE dag_edge_type AS ENUM ( + 'success', + 'always', + 'failure' + ); +EXCEPTION WHEN duplicate_object THEN NULL; END $$; -- Sources: Job producers (Concelier, Scanner, Export, etc.) 
-CREATE TABLE sources ( +CREATE TABLE IF NOT EXISTS sources ( source_id UUID NOT NULL, tenant_id TEXT NOT NULL, name TEXT NOT NULL, @@ -54,13 +61,15 @@ CREATE TABLE sources ( CONSTRAINT uq_sources_name UNIQUE (tenant_id, name) ) PARTITION BY LIST (tenant_id); -CREATE TABLE sources_default PARTITION OF sources DEFAULT; +DO $$ BEGIN + CREATE TABLE sources_default PARTITION OF sources DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_sources_type ON sources (tenant_id, source_type); -CREATE INDEX ix_sources_enabled ON sources (tenant_id, enabled) WHERE enabled = TRUE; +CREATE INDEX IF NOT EXISTS ix_sources_type ON sources (tenant_id, source_type); +CREATE INDEX IF NOT EXISTS ix_sources_enabled ON sources (tenant_id, enabled) WHERE enabled = TRUE; -- Runs: Batch/workflow executions containing jobs -CREATE TABLE runs ( +CREATE TABLE IF NOT EXISTS runs ( run_id UUID NOT NULL, tenant_id TEXT NOT NULL, project_id TEXT, @@ -81,15 +90,17 @@ CREATE TABLE runs ( CONSTRAINT fk_runs_source FOREIGN KEY (tenant_id, source_id) REFERENCES sources (tenant_id, source_id) ) PARTITION BY LIST (tenant_id); -CREATE TABLE runs_default PARTITION OF runs DEFAULT; +DO $$ BEGIN + CREATE TABLE runs_default PARTITION OF runs DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_runs_status ON runs (tenant_id, status, created_at DESC); -CREATE INDEX ix_runs_source ON runs (tenant_id, source_id, created_at DESC); -CREATE INDEX ix_runs_project ON runs (tenant_id, project_id, created_at DESC) WHERE project_id IS NOT NULL; -CREATE INDEX ix_runs_correlation ON runs (tenant_id, correlation_id) WHERE correlation_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_runs_status ON runs (tenant_id, status, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_runs_source ON runs (tenant_id, source_id, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_runs_project ON runs (tenant_id, project_id, created_at DESC) WHERE project_id 
IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_runs_correlation ON runs (tenant_id, correlation_id) WHERE correlation_id IS NOT NULL; -- Jobs: Individual units of work -CREATE TABLE jobs ( +CREATE TABLE IF NOT EXISTS jobs ( job_id UUID NOT NULL, tenant_id TEXT NOT NULL, project_id TEXT, @@ -122,19 +133,21 @@ CREATE TABLE jobs ( CONSTRAINT ck_jobs_max_attempts_positive CHECK (max_attempts >= 1) ) PARTITION BY LIST (tenant_id); -CREATE TABLE jobs_default PARTITION OF jobs DEFAULT; +DO $$ BEGIN + CREATE TABLE jobs_default PARTITION OF jobs DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_jobs_status ON jobs (tenant_id, status, priority DESC, created_at); -CREATE INDEX ix_jobs_type_status ON jobs (tenant_id, job_type, status, created_at); -CREATE INDEX ix_jobs_run ON jobs (tenant_id, run_id) WHERE run_id IS NOT NULL; -CREATE INDEX ix_jobs_lease ON jobs (tenant_id, lease_id) WHERE lease_id IS NOT NULL; -CREATE INDEX ix_jobs_lease_expiry ON jobs (tenant_id, lease_until) WHERE status = 'leased' AND lease_until IS NOT NULL; -CREATE INDEX ix_jobs_not_before ON jobs (tenant_id, not_before) WHERE status = 'pending' AND not_before IS NOT NULL; -CREATE INDEX ix_jobs_scheduled ON jobs (tenant_id, job_type, status, scheduled_at) WHERE status = 'scheduled'; -CREATE INDEX ix_jobs_replay ON jobs (tenant_id, replay_of) WHERE replay_of IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_jobs_status ON jobs (tenant_id, status, priority DESC, created_at); +CREATE INDEX IF NOT EXISTS ix_jobs_type_status ON jobs (tenant_id, job_type, status, created_at); +CREATE INDEX IF NOT EXISTS ix_jobs_run ON jobs (tenant_id, run_id) WHERE run_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_jobs_lease ON jobs (tenant_id, lease_id) WHERE lease_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_jobs_lease_expiry ON jobs (tenant_id, lease_until) WHERE status = 'leased' AND lease_until IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_jobs_not_before ON jobs (tenant_id, 
not_before) WHERE status = 'pending' AND not_before IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_jobs_scheduled ON jobs (tenant_id, job_type, status, scheduled_at) WHERE status = 'scheduled'; +CREATE INDEX IF NOT EXISTS ix_jobs_replay ON jobs (tenant_id, replay_of) WHERE replay_of IS NOT NULL; -- Job History: Immutable audit trail for job state changes -CREATE TABLE job_history ( +CREATE TABLE IF NOT EXISTS job_history ( history_id UUID NOT NULL, tenant_id TEXT NOT NULL, job_id UUID NOT NULL, @@ -153,12 +166,14 @@ CREATE TABLE job_history ( CONSTRAINT ck_job_history_actor_type CHECK (actor_type IN ('system', 'operator', 'worker')) ) PARTITION BY LIST (tenant_id); -CREATE TABLE job_history_default PARTITION OF job_history DEFAULT; +DO $$ BEGIN + CREATE TABLE job_history_default PARTITION OF job_history DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_job_history_occurred ON job_history (tenant_id, job_id, occurred_at DESC); +CREATE INDEX IF NOT EXISTS ix_job_history_occurred ON job_history (tenant_id, job_id, occurred_at DESC); -- DAG Edges: Job dependencies within a run -CREATE TABLE dag_edges ( +CREATE TABLE IF NOT EXISTS dag_edges ( edge_id UUID NOT NULL, tenant_id TEXT NOT NULL, run_id UUID NOT NULL, @@ -174,14 +189,16 @@ CREATE TABLE dag_edges ( CONSTRAINT ck_dag_edges_no_self_loop CHECK (parent_job_id <> child_job_id) ) PARTITION BY LIST (tenant_id); -CREATE TABLE dag_edges_default PARTITION OF dag_edges DEFAULT; +DO $$ BEGIN + CREATE TABLE dag_edges_default PARTITION OF dag_edges DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_dag_edges_run ON dag_edges (tenant_id, run_id); -CREATE INDEX ix_dag_edges_parent ON dag_edges (tenant_id, parent_job_id); -CREATE INDEX ix_dag_edges_child ON dag_edges (tenant_id, child_job_id); +CREATE INDEX IF NOT EXISTS ix_dag_edges_run ON dag_edges (tenant_id, run_id); +CREATE INDEX IF NOT EXISTS ix_dag_edges_parent ON dag_edges 
(tenant_id, parent_job_id); +CREATE INDEX IF NOT EXISTS ix_dag_edges_child ON dag_edges (tenant_id, child_job_id); -- Artifacts: Job outputs with provenance -CREATE TABLE artifacts ( +CREATE TABLE IF NOT EXISTS artifacts ( artifact_id UUID NOT NULL, tenant_id TEXT NOT NULL, job_id UUID NOT NULL, @@ -198,15 +215,17 @@ CREATE TABLE artifacts ( CONSTRAINT ck_artifacts_digest_hex CHECK (digest ~ '^[0-9a-f]{64}$') ) PARTITION BY LIST (tenant_id); -CREATE TABLE artifacts_default PARTITION OF artifacts DEFAULT; +DO $$ BEGIN + CREATE TABLE artifacts_default PARTITION OF artifacts DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_artifacts_job ON artifacts (tenant_id, job_id); -CREATE INDEX ix_artifacts_run ON artifacts (tenant_id, run_id) WHERE run_id IS NOT NULL; -CREATE INDEX ix_artifacts_type ON artifacts (tenant_id, artifact_type, created_at DESC); -CREATE INDEX ix_artifacts_digest ON artifacts (tenant_id, digest); +CREATE INDEX IF NOT EXISTS ix_artifacts_job ON artifacts (tenant_id, job_id); +CREATE INDEX IF NOT EXISTS ix_artifacts_run ON artifacts (tenant_id, run_id) WHERE run_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_artifacts_type ON artifacts (tenant_id, artifact_type, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_artifacts_digest ON artifacts (tenant_id, digest); -- Quotas: Rate-limit and concurrency controls -CREATE TABLE quotas ( +CREATE TABLE IF NOT EXISTS quotas ( quota_id UUID NOT NULL, tenant_id TEXT NOT NULL, job_type TEXT, @@ -233,13 +252,15 @@ CREATE TABLE quotas ( CONSTRAINT ck_quotas_refill_positive CHECK (refill_rate > 0) ) PARTITION BY LIST (tenant_id); -CREATE TABLE quotas_default PARTITION OF quotas DEFAULT; +DO $$ BEGIN + CREATE TABLE quotas_default PARTITION OF quotas DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_quotas_type ON quotas (tenant_id, job_type); -CREATE INDEX ix_quotas_paused ON quotas (tenant_id, paused) WHERE paused = TRUE; 
+CREATE INDEX IF NOT EXISTS ix_quotas_type ON quotas (tenant_id, job_type); +CREATE INDEX IF NOT EXISTS ix_quotas_paused ON quotas (tenant_id, paused) WHERE paused = TRUE; -- Schedules: Cron-based job triggers -CREATE TABLE schedules ( +CREATE TABLE IF NOT EXISTS schedules ( schedule_id UUID NOT NULL, tenant_id TEXT NOT NULL, project_id TEXT, @@ -264,14 +285,16 @@ CREATE TABLE schedules ( CONSTRAINT ck_schedules_max_attempts_positive CHECK (max_attempts >= 1) ) PARTITION BY LIST (tenant_id); -CREATE TABLE schedules_default PARTITION OF schedules DEFAULT; +DO $$ BEGIN + CREATE TABLE schedules_default PARTITION OF schedules DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_schedules_enabled ON schedules (tenant_id, enabled, next_trigger_at) WHERE enabled = TRUE; -CREATE INDEX ix_schedules_next_trigger ON schedules (tenant_id, next_trigger_at) WHERE enabled = TRUE AND next_trigger_at IS NOT NULL; -CREATE INDEX ix_schedules_source ON schedules (tenant_id, source_id); +CREATE INDEX IF NOT EXISTS ix_schedules_enabled ON schedules (tenant_id, enabled, next_trigger_at) WHERE enabled = TRUE; +CREATE INDEX IF NOT EXISTS ix_schedules_next_trigger ON schedules (tenant_id, next_trigger_at) WHERE enabled = TRUE AND next_trigger_at IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_schedules_source ON schedules (tenant_id, source_id); -- Incidents: Operational alerts and escalations -CREATE TABLE incidents ( +CREATE TABLE IF NOT EXISTS incidents ( incident_id UUID NOT NULL, tenant_id TEXT NOT NULL, incident_type TEXT NOT NULL, @@ -292,14 +315,16 @@ CREATE TABLE incidents ( CONSTRAINT ck_incidents_severity CHECK (severity IN ('warning', 'critical')) ) PARTITION BY LIST (tenant_id); -CREATE TABLE incidents_default PARTITION OF incidents DEFAULT; +DO $$ BEGIN + CREATE TABLE incidents_default PARTITION OF incidents DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_incidents_status ON incidents 
(tenant_id, status, created_at DESC); -CREATE INDEX ix_incidents_type ON incidents (tenant_id, incident_type, status); -CREATE INDEX ix_incidents_open ON incidents (tenant_id, severity, created_at DESC) WHERE status = 'open'; +CREATE INDEX IF NOT EXISTS ix_incidents_status ON incidents (tenant_id, status, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_incidents_type ON incidents (tenant_id, incident_type, status); +CREATE INDEX IF NOT EXISTS ix_incidents_open ON incidents (tenant_id, severity, created_at DESC) WHERE status = 'open'; -- Throttles: Dynamic rate-limit overrides (pause/resume per source or job type) -CREATE TABLE throttles ( +CREATE TABLE IF NOT EXISTS throttles ( throttle_id UUID NOT NULL, tenant_id TEXT NOT NULL, source_id UUID, @@ -314,10 +339,10 @@ CREATE TABLE throttles ( CONSTRAINT ck_throttles_scope CHECK (source_id IS NOT NULL OR job_type IS NOT NULL) ) PARTITION BY LIST (tenant_id); -CREATE TABLE throttles_default PARTITION OF throttles DEFAULT; +DO $$ BEGIN + CREATE TABLE throttles_default PARTITION OF throttles DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_throttles_active ON throttles (tenant_id, active, expires_at) WHERE active = TRUE; -CREATE INDEX ix_throttles_source ON throttles (tenant_id, source_id) WHERE source_id IS NOT NULL; -CREATE INDEX ix_throttles_type ON throttles (tenant_id, job_type) WHERE job_type IS NOT NULL; - -COMMIT; +CREATE INDEX IF NOT EXISTS ix_throttles_active ON throttles (tenant_id, active, expires_at) WHERE active = TRUE; +CREATE INDEX IF NOT EXISTS ix_throttles_source ON throttles (tenant_id, source_id) WHERE source_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_throttles_type ON throttles (tenant_id, job_type) WHERE job_type IS NOT NULL; diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/002_backfill.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/002_backfill.sql index 
ef4d7e194..e886e9747 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/002_backfill.sql +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/002_backfill.sql @@ -1,22 +1,23 @@ -- 002_backfill.sql -- Backfill and watermark tables for event-time window tracking (ORCH-SVC-33-003) -- Adds watermarks, backfill_requests, and processed_events for duplicate suppression. - -BEGIN; +-- All statements are idempotent so the migration is safe on pre-existing databases. -- Backfill request status -CREATE TYPE backfill_status AS ENUM ( - 'pending', - 'validating', - 'running', - 'paused', - 'completed', - 'failed', - 'canceled' -); +DO $$ BEGIN + CREATE TYPE backfill_status AS ENUM ( + 'pending', + 'validating', + 'running', + 'paused', + 'completed', + 'failed', + 'canceled' + ); +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -- Watermarks: Per-source/job-type event-time cursors -CREATE TABLE watermarks ( +CREATE TABLE IF NOT EXISTS watermarks ( watermark_id UUID NOT NULL, tenant_id TEXT NOT NULL, source_id UUID, @@ -35,13 +36,15 @@ CREATE TABLE watermarks ( CONSTRAINT ck_watermarks_hash_hex CHECK (last_batch_hash IS NULL OR last_batch_hash ~ '^[0-9a-f]{64}$') ) PARTITION BY LIST (tenant_id); -CREATE TABLE watermarks_default PARTITION OF watermarks DEFAULT; +DO $$ BEGIN + CREATE TABLE watermarks_default PARTITION OF watermarks DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_watermarks_source ON watermarks (tenant_id, source_id) WHERE source_id IS NOT NULL; -CREATE INDEX ix_watermarks_job_type ON watermarks (tenant_id, job_type) WHERE job_type IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_watermarks_source ON watermarks (tenant_id, source_id) WHERE source_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_watermarks_job_type ON watermarks (tenant_id, job_type) WHERE job_type IS NOT NULL; -- Backfill Requests: Batch reprocessing 
operations -CREATE TABLE backfill_requests ( +CREATE TABLE IF NOT EXISTS backfill_requests ( backfill_id UUID NOT NULL, tenant_id TEXT NOT NULL, source_id UUID, @@ -79,14 +82,16 @@ CREATE TABLE backfill_requests ( CONSTRAINT ck_backfill_batch_size CHECK (batch_size > 0 AND batch_size <= 10000) ) PARTITION BY LIST (tenant_id); -CREATE TABLE backfill_requests_default PARTITION OF backfill_requests DEFAULT; +DO $$ BEGIN + CREATE TABLE backfill_requests_default PARTITION OF backfill_requests DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_backfill_status ON backfill_requests (tenant_id, status, created_at DESC); -CREATE INDEX ix_backfill_scope ON backfill_requests (tenant_id, scope_key, created_at DESC); -CREATE INDEX ix_backfill_running ON backfill_requests (tenant_id, source_id, job_type) WHERE status IN ('running', 'validating'); +CREATE INDEX IF NOT EXISTS ix_backfill_status ON backfill_requests (tenant_id, status, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_backfill_scope ON backfill_requests (tenant_id, scope_key, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_backfill_running ON backfill_requests (tenant_id, source_id, job_type) WHERE status IN ('running', 'validating'); -- Processed Events: Duplicate suppression tracking (TTL-managed) -CREATE TABLE processed_events ( +CREATE TABLE IF NOT EXISTS processed_events ( tenant_id TEXT NOT NULL, scope_key TEXT NOT NULL, event_key TEXT NOT NULL, -- Unique identifier for deduplication @@ -97,14 +102,16 @@ CREATE TABLE processed_events ( CONSTRAINT pk_processed_events PRIMARY KEY (tenant_id, scope_key, event_key) ) PARTITION BY LIST (tenant_id); -CREATE TABLE processed_events_default PARTITION OF processed_events DEFAULT; +DO $$ BEGIN + CREATE TABLE processed_events_default PARTITION OF processed_events DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_processed_events_expires ON processed_events (expires_at) WHERE 
expires_at < NOW() + INTERVAL '1 day'; -CREATE INDEX ix_processed_events_time ON processed_events (tenant_id, scope_key, event_time DESC); -CREATE INDEX ix_processed_events_batch ON processed_events (tenant_id, batch_id) WHERE batch_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_processed_events_expires ON processed_events (expires_at); +CREATE INDEX IF NOT EXISTS ix_processed_events_time ON processed_events (tenant_id, scope_key, event_time DESC); +CREATE INDEX IF NOT EXISTS ix_processed_events_batch ON processed_events (tenant_id, batch_id) WHERE batch_id IS NOT NULL; -- Backfill Checkpoints: Resumable batch processing state -CREATE TABLE backfill_checkpoints ( +CREATE TABLE IF NOT EXISTS backfill_checkpoints ( checkpoint_id UUID NOT NULL, tenant_id TEXT NOT NULL, backfill_id UUID NOT NULL, @@ -126,9 +133,11 @@ CREATE TABLE backfill_checkpoints ( CONSTRAINT ck_backfill_checkpoints_hash_hex CHECK (batch_hash IS NULL OR batch_hash ~ '^[0-9a-f]{64}$') ) PARTITION BY LIST (tenant_id); -CREATE TABLE backfill_checkpoints_default PARTITION OF backfill_checkpoints DEFAULT; +DO $$ BEGIN + CREATE TABLE backfill_checkpoints_default PARTITION OF backfill_checkpoints DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_backfill_checkpoints_request ON backfill_checkpoints (tenant_id, backfill_id, batch_number); +CREATE INDEX IF NOT EXISTS ix_backfill_checkpoints_request ON backfill_checkpoints (tenant_id, backfill_id, batch_number); -- Function to clean up expired processed events (called by background job) CREATE OR REPLACE FUNCTION cleanup_expired_processed_events(batch_limit INTEGER DEFAULT 10000) @@ -150,5 +159,3 @@ BEGIN RETURN deleted_count; END; $$ LANGUAGE plpgsql; - -COMMIT; diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/003_dead_letter.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/003_dead_letter.sql index ee3333a4d..073c75607 100644 --- 
a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/003_dead_letter.sql +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/003_dead_letter.sql @@ -1,35 +1,38 @@ -- 003_dead_letter.sql -- Dead-letter store for failed jobs with error classification and replay (ORCH-SVC-33-004) -- Adds dead_letter_entries, replay_audit, and notification_rules tables. - -BEGIN; +-- All statements are idempotent so the migration is safe on pre-existing databases. -- Dead-letter entry status -CREATE TYPE dead_letter_status AS ENUM ( - 'pending', -- Awaiting operator action or auto-replay - 'replaying', -- Currently being replayed - 'replayed', -- Successfully replayed as new job - 'resolved', -- Manually resolved without replay - 'exhausted', -- All replay attempts exhausted - 'expired' -- Expired and eligible for purge -); +DO $$ BEGIN + CREATE TYPE dead_letter_status AS ENUM ( + 'pending', -- Awaiting operator action or auto-replay + 'replaying', -- Currently being replayed + 'replayed', -- Successfully replayed as new job + 'resolved', -- Manually resolved without replay + 'exhausted', -- All replay attempts exhausted + 'expired' -- Expired and eligible for purge + ); +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -- Error classification category -CREATE TYPE error_category AS ENUM ( - 'unknown', -- Unclassified error - 'transient', -- Transient infrastructure error - 'not_found', -- Resource not found - 'auth_failure', -- Authentication/authorization failure - 'rate_limited', -- Rate limiting or quota exceeded - 'validation_error', -- Invalid input or configuration - 'upstream_error', -- External service error - 'internal_error', -- Internal processing error - 'conflict', -- Resource conflict - 'canceled' -- Operation canceled -); +DO $$ BEGIN + CREATE TYPE error_category AS ENUM ( + 'unknown', -- Unclassified error + 'transient', -- Transient infrastructure error + 'not_found', -- Resource not 
found + 'auth_failure', -- Authentication/authorization failure + 'rate_limited', -- Rate limiting or quota exceeded + 'validation_error', -- Invalid input or configuration + 'upstream_error', -- External service error + 'internal_error', -- Internal processing error + 'conflict', -- Resource conflict + 'canceled' -- Operation canceled + ); +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -- Dead-letter Entries: Failed jobs awaiting remediation -CREATE TABLE dead_letter_entries ( +CREATE TABLE IF NOT EXISTS dead_letter_entries ( entry_id UUID NOT NULL, tenant_id TEXT NOT NULL, -- Original job reference @@ -69,21 +72,23 @@ CREATE TABLE dead_letter_entries ( CONSTRAINT ck_dead_letter_attempts CHECK (replay_attempts >= 0 AND replay_attempts <= max_replay_attempts + 1) ) PARTITION BY LIST (tenant_id); -CREATE TABLE dead_letter_entries_default PARTITION OF dead_letter_entries DEFAULT; +DO $$ BEGIN + CREATE TABLE dead_letter_entries_default PARTITION OF dead_letter_entries DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -- Indexes for common query patterns -CREATE INDEX ix_dead_letter_status ON dead_letter_entries (tenant_id, status, created_at DESC); -CREATE INDEX ix_dead_letter_job ON dead_letter_entries (tenant_id, original_job_id); -CREATE INDEX ix_dead_letter_job_type ON dead_letter_entries (tenant_id, job_type, status, created_at DESC); -CREATE INDEX ix_dead_letter_category ON dead_letter_entries (tenant_id, category, status); -CREATE INDEX ix_dead_letter_error_code ON dead_letter_entries (tenant_id, error_code, status); -CREATE INDEX ix_dead_letter_expires ON dead_letter_entries (expires_at) WHERE status NOT IN ('replayed', 'resolved', 'exhausted'); -CREATE INDEX ix_dead_letter_source ON dead_letter_entries (tenant_id, source_id, status) WHERE source_id IS NOT NULL; -CREATE INDEX ix_dead_letter_run ON dead_letter_entries (tenant_id, run_id, status) WHERE run_id IS NOT NULL; -CREATE INDEX 
ix_dead_letter_retryable ON dead_letter_entries (tenant_id, is_retryable, status) WHERE is_retryable = TRUE AND status = 'pending'; +CREATE INDEX IF NOT EXISTS ix_dead_letter_status ON dead_letter_entries (tenant_id, status, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_dead_letter_job ON dead_letter_entries (tenant_id, original_job_id); +CREATE INDEX IF NOT EXISTS ix_dead_letter_job_type ON dead_letter_entries (tenant_id, job_type, status, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_dead_letter_category ON dead_letter_entries (tenant_id, category, status); +CREATE INDEX IF NOT EXISTS ix_dead_letter_error_code ON dead_letter_entries (tenant_id, error_code, status); +CREATE INDEX IF NOT EXISTS ix_dead_letter_expires ON dead_letter_entries (expires_at) WHERE status NOT IN ('replayed', 'resolved', 'exhausted'); +CREATE INDEX IF NOT EXISTS ix_dead_letter_source ON dead_letter_entries (tenant_id, source_id, status) WHERE source_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_dead_letter_run ON dead_letter_entries (tenant_id, run_id, status) WHERE run_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_dead_letter_retryable ON dead_letter_entries (tenant_id, is_retryable, status) WHERE is_retryable = TRUE AND status = 'pending'; -- Replay Audit: Track replay attempts for auditing and debugging -CREATE TABLE dead_letter_replay_audit ( +CREATE TABLE IF NOT EXISTS dead_letter_replay_audit ( audit_id UUID NOT NULL, tenant_id TEXT NOT NULL, entry_id UUID NOT NULL, @@ -104,13 +109,15 @@ CREATE TABLE dead_letter_replay_audit ( CONSTRAINT uq_dead_letter_replay_audit_attempt UNIQUE (tenant_id, entry_id, attempt_number) ) PARTITION BY LIST (tenant_id); -CREATE TABLE dead_letter_replay_audit_default PARTITION OF dead_letter_replay_audit DEFAULT; +DO $$ BEGIN + CREATE TABLE dead_letter_replay_audit_default PARTITION OF dead_letter_replay_audit DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_dead_letter_replay_audit_entry ON 
dead_letter_replay_audit (tenant_id, entry_id, attempt_number); -CREATE INDEX ix_dead_letter_replay_audit_job ON dead_letter_replay_audit (tenant_id, new_job_id) WHERE new_job_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_dead_letter_replay_audit_entry ON dead_letter_replay_audit (tenant_id, entry_id, attempt_number); +CREATE INDEX IF NOT EXISTS ix_dead_letter_replay_audit_job ON dead_letter_replay_audit (tenant_id, new_job_id) WHERE new_job_id IS NOT NULL; -- Notification Rules: Configure alerting for dead-letter events -CREATE TABLE dead_letter_notification_rules ( +CREATE TABLE IF NOT EXISTS dead_letter_notification_rules ( rule_id UUID NOT NULL, tenant_id TEXT NOT NULL, -- Filter criteria (all optional - match any if not specified) @@ -140,14 +147,16 @@ CREATE TABLE dead_letter_notification_rules ( CONSTRAINT ck_dead_letter_notification_max_per_hour CHECK (max_per_hour > 0) ) PARTITION BY LIST (tenant_id); -CREATE TABLE dead_letter_notification_rules_default PARTITION OF dead_letter_notification_rules DEFAULT; +DO $$ BEGIN + CREATE TABLE dead_letter_notification_rules_default PARTITION OF dead_letter_notification_rules DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_dead_letter_notification_rules_enabled ON dead_letter_notification_rules (tenant_id, enabled) WHERE enabled = TRUE; -CREATE INDEX ix_dead_letter_notification_rules_source ON dead_letter_notification_rules (tenant_id, source_id) WHERE source_id IS NOT NULL; -CREATE INDEX ix_dead_letter_notification_rules_category ON dead_letter_notification_rules (tenant_id, category) WHERE category IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_dead_letter_notification_rules_enabled ON dead_letter_notification_rules (tenant_id, enabled) WHERE enabled = TRUE; +CREATE INDEX IF NOT EXISTS ix_dead_letter_notification_rules_source ON dead_letter_notification_rules (tenant_id, source_id) WHERE source_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS 
ix_dead_letter_notification_rules_category ON dead_letter_notification_rules (tenant_id, category) WHERE category IS NOT NULL; -- Notification Log: Track sent notifications for throttling and auditing -CREATE TABLE dead_letter_notification_log ( +CREATE TABLE IF NOT EXISTS dead_letter_notification_log ( log_id UUID NOT NULL, tenant_id TEXT NOT NULL, rule_id UUID NOT NULL, @@ -166,10 +175,12 @@ CREATE TABLE dead_letter_notification_log ( REFERENCES dead_letter_notification_rules (tenant_id, rule_id) ON DELETE CASCADE ) PARTITION BY LIST (tenant_id); -CREATE TABLE dead_letter_notification_log_default PARTITION OF dead_letter_notification_log DEFAULT; +DO $$ BEGIN + CREATE TABLE dead_letter_notification_log_default PARTITION OF dead_letter_notification_log DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_dead_letter_notification_log_rule ON dead_letter_notification_log (tenant_id, rule_id, sent_at DESC); -CREATE INDEX ix_dead_letter_notification_log_sent ON dead_letter_notification_log (tenant_id, sent_at DESC); +CREATE INDEX IF NOT EXISTS ix_dead_letter_notification_log_rule ON dead_letter_notification_log (tenant_id, rule_id, sent_at DESC); +CREATE INDEX IF NOT EXISTS ix_dead_letter_notification_log_sent ON dead_letter_notification_log (tenant_id, sent_at DESC); -- Dead-letter statistics view CREATE OR REPLACE VIEW dead_letter_stats AS @@ -274,5 +285,3 @@ BEGIN LIMIT p_limit; END; $$ LANGUAGE plpgsql STABLE; - -COMMIT; diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/004_slo_quotas.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/004_slo_quotas.sql index e1b4ec394..ab6c5e522 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/004_slo_quotas.sql +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/004_slo_quotas.sql @@ -11,7 +11,7 @@ CREATE TABLE IF NOT EXISTS slos ( 
job_type TEXT, source_id UUID, target DOUBLE PRECISION NOT NULL CHECK (target > 0 AND target <= 1), - window TEXT NOT NULL CHECK (window IN ('one_hour', 'one_day', 'seven_days', 'thirty_days')), + "window" TEXT NOT NULL CHECK ("window" IN ('one_hour', 'one_day', 'seven_days', 'thirty_days')), latency_percentile DOUBLE PRECISION CHECK (latency_percentile IS NULL OR (latency_percentile >= 0 AND latency_percentile <= 1)), latency_target_seconds DOUBLE PRECISION CHECK (latency_target_seconds IS NULL OR latency_target_seconds > 0), throughput_minimum INTEGER CHECK (throughput_minimum IS NULL OR throughput_minimum > 0), @@ -184,8 +184,11 @@ DECLARE BEGIN WITH deleted AS ( DELETE FROM slo_state_snapshots - WHERE computed_at < NOW() - (p_retention_days || ' days')::INTERVAL - LIMIT p_batch_limit + WHERE ctid IN ( + SELECT ctid FROM slo_state_snapshots + WHERE computed_at < NOW() - (p_retention_days || ' days')::INTERVAL + LIMIT p_batch_limit + ) RETURNING 1 ) SELECT COUNT(*) INTO deleted_count FROM deleted; @@ -204,8 +207,11 @@ DECLARE BEGIN WITH deleted AS ( DELETE FROM quota_audit_log - WHERE performed_at < NOW() - (p_retention_days || ' days')::INTERVAL - LIMIT p_batch_limit + WHERE ctid IN ( + SELECT ctid FROM quota_audit_log + WHERE performed_at < NOW() - (p_retention_days || ' days')::INTERVAL + LIMIT p_batch_limit + ) RETURNING 1 ) SELECT COUNT(*) INTO deleted_count FROM deleted; diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/005_audit_ledger.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/005_audit_ledger.sql index 769948cbc..37cb539d4 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/005_audit_ledger.sql +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/005_audit_ledger.sql @@ -376,8 +376,11 @@ DECLARE BEGIN WITH deleted AS ( DELETE FROM audit_entries - WHERE occurred_at < NOW() - (p_retention_days || ' 
days')::INTERVAL - LIMIT p_batch_limit + WHERE ctid IN ( + SELECT ctid FROM audit_entries + WHERE occurred_at < NOW() - (p_retention_days || ' days')::INTERVAL + LIMIT p_batch_limit + ) RETURNING 1 ) SELECT COUNT(*) INTO deleted_count FROM deleted; @@ -396,8 +399,11 @@ DECLARE BEGIN WITH deleted AS ( DELETE FROM run_ledger_entries - WHERE ledger_created_at < NOW() - (p_retention_days || ' days')::INTERVAL - LIMIT p_batch_limit + WHERE ctid IN ( + SELECT ctid FROM run_ledger_entries + WHERE ledger_created_at < NOW() - (p_retention_days || ' days')::INTERVAL + LIMIT p_batch_limit + ) RETURNING 1 ) SELECT COUNT(*) INTO deleted_count FROM deleted; diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/006_pack_runs.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/006_pack_runs.sql index 27c63823b..98a3f6c80 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/006_pack_runs.sql +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/006_pack_runs.sql @@ -1,22 +1,23 @@ -- 006_pack_runs.sql -- Pack run persistence and log streaming schema (ORCH-SVC-41/42-101) - -BEGIN; +-- All statements are idempotent so the migration is safe on pre-existing databases. 
-- Enum for pack run lifecycle -CREATE TYPE pack_run_status AS ENUM ( - 'pending', - 'scheduled', - 'leased', - 'running', - 'succeeded', - 'failed', - 'canceled', - 'timed_out' -); +DO $$ BEGIN + CREATE TYPE pack_run_status AS ENUM ( + 'pending', + 'scheduled', + 'leased', + 'running', + 'succeeded', + 'failed', + 'canceled', + 'timed_out' + ); +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -- Pack runs -CREATE TABLE pack_runs ( +CREATE TABLE IF NOT EXISTS pack_runs ( pack_run_id UUID NOT NULL, tenant_id TEXT NOT NULL, project_id TEXT, @@ -51,15 +52,17 @@ CREATE TABLE pack_runs ( CONSTRAINT ck_pack_runs_parameters_digest_hex CHECK (parameters_digest ~ '^[0-9a-f]{64}$') ) PARTITION BY LIST (tenant_id); -CREATE TABLE pack_runs_default PARTITION OF pack_runs DEFAULT; +DO $$ BEGIN + CREATE TABLE pack_runs_default PARTITION OF pack_runs DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_pack_runs_status ON pack_runs (tenant_id, status, priority DESC, created_at); -CREATE INDEX ix_pack_runs_pack ON pack_runs (tenant_id, pack_id, status, created_at DESC); -CREATE INDEX ix_pack_runs_not_before ON pack_runs (tenant_id, not_before) WHERE not_before IS NOT NULL; -CREATE INDEX ix_pack_runs_lease_until ON pack_runs (tenant_id, lease_until) WHERE status = 'leased' AND lease_until IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_pack_runs_status ON pack_runs (tenant_id, status, priority DESC, created_at); +CREATE INDEX IF NOT EXISTS ix_pack_runs_pack ON pack_runs (tenant_id, pack_id, status, created_at DESC); +CREATE INDEX IF NOT EXISTS ix_pack_runs_not_before ON pack_runs (tenant_id, not_before) WHERE not_before IS NOT NULL; +CREATE INDEX IF NOT EXISTS ix_pack_runs_lease_until ON pack_runs (tenant_id, lease_until) WHERE status = 'leased' AND lease_until IS NOT NULL; -- Pack run logs -CREATE TABLE pack_run_logs ( +CREATE TABLE IF NOT EXISTS pack_run_logs ( log_id UUID NOT NULL, tenant_id TEXT NOT NULL, 
pack_run_id UUID NOT NULL, @@ -70,12 +73,13 @@ CREATE TABLE pack_run_logs ( data JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), CONSTRAINT pk_pack_run_logs PRIMARY KEY (tenant_id, pack_run_id, sequence), - CONSTRAINT uq_pack_run_logs_log_id UNIQUE (log_id), + CONSTRAINT uq_pack_run_logs_log_id UNIQUE (tenant_id, log_id), CONSTRAINT fk_pack_run_logs_run FOREIGN KEY (tenant_id, pack_run_id) REFERENCES pack_runs (tenant_id, pack_run_id) ) PARTITION BY LIST (tenant_id); -CREATE TABLE pack_run_logs_default PARTITION OF pack_run_logs DEFAULT; +DO $$ BEGIN + CREATE TABLE pack_run_logs_default PARTITION OF pack_run_logs DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_pack_run_logs_level ON pack_run_logs (tenant_id, pack_run_id, log_level, sequence); -CREATE INDEX ix_pack_run_logs_created ON pack_run_logs (tenant_id, pack_run_id, created_at); -COMMIT; +CREATE INDEX IF NOT EXISTS ix_pack_run_logs_level ON pack_run_logs (tenant_id, pack_run_id, log_level, sequence); +CREATE INDEX IF NOT EXISTS ix_pack_run_logs_created ON pack_run_logs (tenant_id, pack_run_id, created_at); diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/007_pack_run_logs_integrity.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/007_pack_run_logs_integrity.sql index fdf8522a1..7f1b7412c 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/007_pack_run_logs_integrity.sql +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/007_pack_run_logs_integrity.sql @@ -1,10 +1,12 @@ -- Pack run log integrity (ORCH-GAPS-151-016, OR1/OR8/OR10) -- Adds canonical hash + size bytes to support tamper-evident streaming and audit linkage. -ALTER TABLE pack_run_logs - ADD COLUMN digest TEXT NOT NULL DEFAULT '', - ADD COLUMN size_bytes BIGINT NOT NULL DEFAULT 0; +-- Idempotent: skips if columns already exist. 
+ +DO $$ BEGIN + ALTER TABLE pack_run_logs + ADD COLUMN digest TEXT NOT NULL DEFAULT '', + ADD COLUMN size_bytes BIGINT NOT NULL DEFAULT 0; +EXCEPTION WHEN duplicate_column THEN NULL; END $$; COMMENT ON COLUMN pack_run_logs.digest IS 'Canonical SHA-256 hash of log payload (tenant+packRun+sequence+level+source+message+data)'; COMMENT ON COLUMN pack_run_logs.size_bytes IS 'UTF-8 byte length of canonical log payload'; - -COMMIT; diff --git a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/008_first_signal_snapshots.sql b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/008_first_signal_snapshots.sql index 6e9ab8b1b..e5258cc05 100644 --- a/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/008_first_signal_snapshots.sql +++ b/src/JobEngine/StellaOps.JobEngine/StellaOps.JobEngine.Infrastructure/migrations/008_first_signal_snapshots.sql @@ -1,9 +1,8 @@ -- 008_first_signal_snapshots.sql -- First Signal snapshots for TTFS fast-path (SPRINT_0339_0001_0001_first_signal_api.md) +-- All statements are idempotent so the migration is safe on pre-existing databases. 
-BEGIN; - -CREATE TABLE first_signal_snapshots ( +CREATE TABLE IF NOT EXISTS first_signal_snapshots ( tenant_id TEXT NOT NULL, run_id UUID NOT NULL, job_id UUID NOT NULL, @@ -40,14 +39,14 @@ CREATE TABLE first_signal_snapshots ( CONSTRAINT pk_first_signal_snapshots PRIMARY KEY (tenant_id, run_id) ) PARTITION BY LIST (tenant_id); -CREATE TABLE first_signal_snapshots_default PARTITION OF first_signal_snapshots DEFAULT; +DO $$ BEGIN + CREATE TABLE first_signal_snapshots_default PARTITION OF first_signal_snapshots DEFAULT; +EXCEPTION WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL; END $$; -CREATE INDEX ix_first_signal_snapshots_job ON first_signal_snapshots (tenant_id, job_id); -CREATE INDEX ix_first_signal_snapshots_updated ON first_signal_snapshots (tenant_id, updated_at DESC); +CREATE INDEX IF NOT EXISTS ix_first_signal_snapshots_job ON first_signal_snapshots (tenant_id, job_id); +CREATE INDEX IF NOT EXISTS ix_first_signal_snapshots_updated ON first_signal_snapshots (tenant_id, updated_at DESC); COMMENT ON TABLE first_signal_snapshots IS 'Per-run cached first-signal payload for TTFS fast path.'; COMMENT ON COLUMN first_signal_snapshots.kind IS 'Current signal kind.'; COMMENT ON COLUMN first_signal_snapshots.phase IS 'Current execution phase.'; COMMENT ON COLUMN first_signal_snapshots.signal_json IS 'Full first-signal payload for ETag and response mapping.'; - -COMMIT;