-- 002_backfill.sql
-- Backfill and watermark tables for event-time window tracking (ORCH-SVC-33-003)
-- Adds watermarks, backfill_requests, processed_events (duplicate suppression),
-- backfill_checkpoints (resumable batch state), and the
-- cleanup_expired_processed_events() maintenance function.
-- All statements are idempotent so the migration is safe on pre-existing databases.

-- Backfill request status
-- CREATE TYPE has no IF NOT EXISTS form, so an already-existing type is
-- tolerated by swallowing the resulting error.
DO $$ BEGIN
    CREATE TYPE backfill_status AS ENUM (
        'pending',
        'validating',
        'running',
        'paused',
        'completed',
        'failed',
        'canceled'
    );
EXCEPTION
    WHEN duplicate_object OR SQLSTATE '42P17' THEN NULL;
END $$;

-- Watermarks: Per-source/job-type event-time cursors
CREATE TABLE IF NOT EXISTS watermarks (
    watermark_id    UUID        NOT NULL,
    tenant_id       TEXT        NOT NULL,
    source_id       UUID,
    job_type        TEXT,
    scope_key       TEXT        NOT NULL,               -- Normalized scope identifier
    high_watermark  TIMESTAMPTZ NOT NULL,               -- Latest processed event time
    low_watermark   TIMESTAMPTZ,                        -- Earliest event time in current window
    sequence_number BIGINT      NOT NULL DEFAULT 0,
    processed_count BIGINT      NOT NULL DEFAULT 0,
    last_batch_hash CHAR(64),                           -- SHA-256 of last processed batch for integrity
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_by      TEXT        NOT NULL,
    -- Keys include tenant_id because it is the partition key.
    CONSTRAINT pk_watermarks PRIMARY KEY (tenant_id, watermark_id),
    CONSTRAINT uq_watermarks_scope UNIQUE (tenant_id, scope_key),
    -- Hash, when present, must be exactly 64 lowercase hex characters.
    CONSTRAINT ck_watermarks_hash_hex
        CHECK (last_batch_hash IS NULL OR last_batch_hash ~ '^[0-9a-f]{64}$')
) PARTITION BY LIST (tenant_id);

-- Default partition for watermarks.
-- duplicate_table (42P07) is what a re-run raises when the partition already
-- exists; without it the migration is not re-runnable. The other conditions
-- are kept from the original handler (42P17 is invalid_object_definition,
-- presumably to tolerate a default partition attached under another name).
DO $$ BEGIN
    CREATE TABLE watermarks_default PARTITION OF watermarks DEFAULT;
EXCEPTION
    WHEN duplicate_object OR duplicate_table OR SQLSTATE '42P17' THEN NULL;
END $$;

CREATE INDEX IF NOT EXISTS ix_watermarks_source
    ON watermarks (tenant_id, source_id)
    WHERE source_id IS NOT NULL;

CREATE INDEX IF NOT EXISTS ix_watermarks_job_type
    ON watermarks (tenant_id, job_type)
    WHERE job_type IS NOT NULL;

-- Backfill Requests: Batch reprocessing operations
CREATE TABLE IF NOT EXISTS backfill_requests (
    backfill_id        UUID            NOT NULL,
    tenant_id          TEXT            NOT NULL,
    source_id          UUID,
    job_type           TEXT,
    scope_key          TEXT            NOT NULL,
    status             backfill_status NOT NULL DEFAULT 'pending',

    -- Time window for backfill
    window_start       TIMESTAMPTZ     NOT NULL,
    window_end         TIMESTAMPTZ     NOT NULL,

    -- Progress tracking
    current_position   TIMESTAMPTZ,
    total_events       BIGINT,
    processed_events   BIGINT          NOT NULL DEFAULT 0,
    skipped_events     BIGINT          NOT NULL DEFAULT 0,      -- Duplicates skipped
    failed_events      BIGINT          NOT NULL DEFAULT 0,

    -- Configuration
    batch_size         INTEGER         NOT NULL DEFAULT 100,
    dry_run            BOOLEAN         NOT NULL DEFAULT FALSE,
    force_reprocess    BOOLEAN         NOT NULL DEFAULT FALSE,  -- Ignore duplicate suppression

    -- Safety validations
    estimated_duration INTERVAL,
    max_duration       INTERVAL,
    safety_checks      JSONB,                                   -- Validation results

    -- Audit
    reason             TEXT            NOT NULL,
    ticket             TEXT,
    created_at         TIMESTAMPTZ     NOT NULL DEFAULT NOW(),
    started_at         TIMESTAMPTZ,
    completed_at       TIMESTAMPTZ,
    created_by         TEXT            NOT NULL,
    updated_by         TEXT            NOT NULL,
    error_message      TEXT,

    CONSTRAINT pk_backfill_requests PRIMARY KEY (tenant_id, backfill_id),
    CONSTRAINT ck_backfill_window_order CHECK (window_end > window_start),
    CONSTRAINT ck_backfill_batch_size CHECK (batch_size > 0 AND batch_size <= 10000)
) PARTITION BY LIST (tenant_id);

-- Default partition for backfill_requests (duplicate_table makes re-runs safe).
DO $$ BEGIN
    CREATE TABLE backfill_requests_default PARTITION OF backfill_requests DEFAULT;
EXCEPTION
    WHEN duplicate_object OR duplicate_table OR SQLSTATE '42P17' THEN NULL;
END $$;

CREATE INDEX IF NOT EXISTS ix_backfill_status
    ON backfill_requests (tenant_id, status, created_at DESC);

CREATE INDEX IF NOT EXISTS ix_backfill_scope
    ON backfill_requests (tenant_id, scope_key, created_at DESC);

-- Partial index restricted to requests in the 'running' or 'validating' state.
CREATE INDEX IF NOT EXISTS ix_backfill_running
    ON backfill_requests (tenant_id, source_id, job_type)
    WHERE status IN ('running', 'validating');

-- Processed Events: Duplicate suppression tracking (TTL-managed)
CREATE TABLE IF NOT EXISTS processed_events (
    tenant_id    TEXT        NOT NULL,
    scope_key    TEXT        NOT NULL,
    event_key    TEXT        NOT NULL,               -- Unique identifier for deduplication
    event_time   TIMESTAMPTZ NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    batch_id     UUID,                               -- Backfill batch or run ID
    expires_at   TIMESTAMPTZ NOT NULL,               -- TTL for automatic cleanup
    CONSTRAINT pk_processed_events PRIMARY KEY (tenant_id, scope_key, event_key)
) PARTITION BY LIST (tenant_id);

-- Default partition for processed_events (duplicate_table makes re-runs safe).
DO $$ BEGIN
    CREATE TABLE processed_events_default PARTITION OF processed_events DEFAULT;
EXCEPTION
    WHEN duplicate_object OR duplicate_table OR SQLSTATE '42P17' THEN NULL;
END $$;

-- Supports the expires_at scan in cleanup_expired_processed_events().
CREATE INDEX IF NOT EXISTS ix_processed_events_expires
    ON processed_events (expires_at);

CREATE INDEX IF NOT EXISTS ix_processed_events_time
    ON processed_events (tenant_id, scope_key, event_time DESC);

CREATE INDEX IF NOT EXISTS ix_processed_events_batch
    ON processed_events (tenant_id, batch_id)
    WHERE batch_id IS NOT NULL;

-- Backfill Checkpoints: Resumable batch processing state
CREATE TABLE IF NOT EXISTS backfill_checkpoints (
    checkpoint_id    UUID        NOT NULL,
    tenant_id        TEXT        NOT NULL,
    backfill_id      UUID        NOT NULL,
    batch_number     INTEGER     NOT NULL,
    batch_start      TIMESTAMPTZ NOT NULL,
    batch_end        TIMESTAMPTZ NOT NULL,
    events_in_batch  INTEGER     NOT NULL,
    events_processed INTEGER     NOT NULL DEFAULT 0,
    events_skipped   INTEGER     NOT NULL DEFAULT 0,
    events_failed    INTEGER     NOT NULL DEFAULT 0,
    batch_hash       CHAR(64),
    started_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at     TIMESTAMPTZ,
    error_message    TEXT,
    CONSTRAINT pk_backfill_checkpoints PRIMARY KEY (tenant_id, checkpoint_id),
    -- Checkpoints are removed together with their parent backfill request.
    CONSTRAINT fk_backfill_checkpoints_request
        FOREIGN KEY (tenant_id, backfill_id)
        REFERENCES backfill_requests (tenant_id, backfill_id)
        ON DELETE CASCADE,
    CONSTRAINT uq_backfill_checkpoints_batch UNIQUE (tenant_id, backfill_id, batch_number),
    -- Hash, when present, must be exactly 64 lowercase hex characters.
    CONSTRAINT ck_backfill_checkpoints_hash_hex
        CHECK (batch_hash IS NULL OR batch_hash ~ '^[0-9a-f]{64}$')
) PARTITION BY LIST (tenant_id);

-- Default partition for backfill_checkpoints (duplicate_table makes re-runs safe).
DO $$ BEGIN
    CREATE TABLE backfill_checkpoints_default PARTITION OF backfill_checkpoints DEFAULT;
EXCEPTION
    WHEN duplicate_object OR duplicate_table OR SQLSTATE '42P17' THEN NULL;
END $$;

-- NOTE(review): this index covers the same columns as the
-- uq_backfill_checkpoints_batch unique constraint and looks redundant; it is
-- kept because later migrations may reference it by name.
CREATE INDEX IF NOT EXISTS ix_backfill_checkpoints_request
    ON backfill_checkpoints (tenant_id, backfill_id, batch_number);

-- Function to clean up expired processed events (called by background job).
--
-- Deletes up to batch_limit rows of processed_events whose expires_at lies
-- before NOW() and returns the number of rows deleted.
--
-- Parameters:
--   batch_limit  maximum number of rows removed per call (default 10000).
-- Returns:
--   count of rows deleted by this call (0 when nothing has expired).
--
-- Rows are matched on the primary key rather than on ctid: processed_events
-- is partitioned, and a ctid only identifies a row within one partition, so
-- matching on ctid alone could delete unexpired rows that share a ctid with
-- an expired row in a different partition.
CREATE OR REPLACE FUNCTION cleanup_expired_processed_events(batch_limit INTEGER DEFAULT 10000)
RETURNS INTEGER AS $$
DECLARE
    deleted_count INTEGER;
BEGIN
    DELETE FROM processed_events AS pe
    USING (
        SELECT
            tenant_id,
            scope_key,
            event_key
        FROM processed_events
        WHERE expires_at < NOW()
        LIMIT batch_limit
    ) AS expired
    WHERE pe.tenant_id = expired.tenant_id
      AND pe.scope_key = expired.scope_key
      AND pe.event_key = expired.event_key
      -- Re-check expiry on the row actually being deleted.
      AND pe.expires_at < NOW();

    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;