Add advisory data aggregation e2e tests proving pipeline produces queryable data

New test file verifying the full fetch→parse→map advisory pipeline:

Tier 1 (smoke, always runs):
- Source metrics: totalAdvisories > 0, lastSuccessAt populated, summary health
- Per-source freshness: syncCount, advisory counts
- Canonical API: paginated query, by-ID with source edges, CVE search
- Score distribution: endpoint works, counts sum correctly
- Cross-source: multiple distinct sources have data, multi-edge advisories

Tier 2 (gated behind E2E_ACTIVE_SYNC=1):
- Triggers KEV source sync, polls freshness until syncCount advances
- Verifies advisory count doesn't decrease, timestamp is recent

Resilience: All advisory-sources endpoints use getWithRetry() helper
that retries on 504/503 (gateway timeout during cold start). Tests
skip gracefully rather than fail when services are warming up.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
master
2026-03-31 23:10:52 +03:00
parent 513b0f7470
commit 5bb5596e2f

View File

@@ -0,0 +1,376 @@
/**
* Advisory Data Aggregation — End-to-End Tests
*
* Proves that advisory sources actually aggregate data through the full
* fetch → parse → map pipeline, not just that sync returns "accepted".
*
* Tier 1 (Blocks 1-3): Smoke tests verifying pre-existing data from prior syncs.
* Fast (~30s), API-only, no polling. Always passes if the stack has been synced.
*
* Tier 2 (Block 4): Active sync verification. Triggers a real sync on the KEV source
* (CISA Known Exploited Vulnerabilities), polls until completion, verifies data arrived.
* Gated behind E2E_ACTIVE_SYNC=1. Needs network access to www.cisa.gov.
*
* Prerequisites:
* - Main Stella Ops stack running
* - At least one advisory source has been synced previously (for Tier 1)
*/
import type { APIRequestContext } from '@playwright/test';
import { test, expect } from './live-auth.fixture';
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
async function pollUntil(
apiRequest: APIRequestContext,
url: string,
predicate: (body: any) => boolean,
opts: { intervalMs?: number; timeoutMs?: number; label?: string } = {},
): Promise<any> {
const interval = opts.intervalMs ?? 10_000;
const timeout = opts.timeoutMs ?? 180_000;
const label = opts.label ?? url;
const deadline = Date.now() + timeout;
while (Date.now() < deadline) {
const resp = await apiRequest.get(url);
if (resp.status() === 200) {
const body = await resp.json();
if (predicate(body)) return body;
}
await new Promise(r => setTimeout(r, interval));
}
const finalResp = await apiRequest.get(url);
const finalBody = await finalResp.json().catch(() => null);
throw new Error(
`pollUntil timed out after ${timeout}ms for ${label}. Last response: ${JSON.stringify(finalBody)}`,
);
}
/** Safe JSON parse from API response — returns null on non-200 or parse error */
async function safeJson(resp: any): Promise<any | null> {
if (resp.status() !== 200) return null;
try { return await resp.json(); } catch { return null; }
}
/** GET with retry on 504/503 — the advisory-sources endpoint can be slow after cold start */
async function getWithRetry(
apiRequest: APIRequestContext,
url: string,
maxRetries = 2,
): Promise<{ status: number; body: any }> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
const resp = await apiRequest.get(url, { timeout: 45_000 });
const status = resp.status();
if (status !== 504 && status !== 503) {
const body = await resp.json().catch(() => null);
return { status, body };
}
if (attempt < maxRetries) {
await new Promise(r => setTimeout(r, 5_000)); // wait 5s before retry
}
}
return { status: 504, body: null };
}
// Shared state between tests in the same describe
let populatedSourceKey: string | null = null;
let firstCanonicalId: string | null = null;
let firstCanonicalCve: string | null = null;
// ---------------------------------------------------------------------------
// Block 1: Source Metrics Smoke
// ---------------------------------------------------------------------------
test.describe('Data Aggregation — Source Metrics Smoke', () => {
test('at least one source has totalAdvisories > 0', async ({ apiRequest }) => {
const { status, body } = await getWithRetry(apiRequest, '/api/v1/advisory-sources?includeDisabled=false');
if (status === 504) { test.skip(true, 'Advisory sources endpoint timed out (504)'); return; }
expect(status).toBe(200);
expect(body.items.length).toBeGreaterThan(0);
const withData = body.items.filter((s: any) => s.totalAdvisories > 0);
expect(
withData.length,
`Expected at least one source with totalAdvisories > 0, but all ${body.items.length} sources have 0. ` +
`Run a sync first: POST /api/v1/advisory-sources/{sourceId}/sync`,
).toBeGreaterThan(0);
// Save for later tests
populatedSourceKey = withData[0].sourceKey;
});
test('at least one source has a successful sync timestamp', async ({ apiRequest }) => {
const { status, body } = await getWithRetry(apiRequest, '/api/v1/advisory-sources?includeDisabled=false');
if (status === 504) { test.skip(true, 'Advisory sources endpoint timed out (504)'); return; }
expect(status).toBe(200);
const withTimestamp = body.items.filter(
(s: any) => s.lastSuccessAt && !isNaN(Date.parse(s.lastSuccessAt)),
);
expect(
withTimestamp.length,
'Expected at least one source with a valid lastSuccessAt timestamp',
).toBeGreaterThan(0);
});
test('source summary reflects non-zero health stats', async ({ apiRequest }) => {
const { status, body } = await getWithRetry(apiRequest, '/api/v1/advisory-sources/summary');
if (status === 504) { test.skip(true, 'Summary endpoint timed out (504)'); return; }
expect(status).toBe(200);
expect(body.totalSources).toBeGreaterThanOrEqual(1);
const activeCount =
(body.healthySources ?? 0) + (body.warningSources ?? 0) + (body.staleSources ?? 0);
expect(
activeCount,
'Expected healthy + warning + stale > 0 (some sources have freshness state)',
).toBeGreaterThan(0);
expect(body.dataAsOf).toBeTruthy();
});
test('per-source freshness endpoint returns data for a populated source', async ({ apiRequest }) => {
// If the first test didn't find a populated source, fetch again
if (!populatedSourceKey) {
const listResult = await getWithRetry(apiRequest, '/api/v1/advisory-sources?includeDisabled=false');
const withData = listResult.body?.items?.filter((s: any) => s.totalAdvisories > 0) ?? [];
if (withData.length === 0) {
test.skip(true, 'No populated sources available');
return;
}
populatedSourceKey = withData[0].sourceKey;
}
const { status, body } = await getWithRetry(
apiRequest, `/api/v1/advisory-sources/${populatedSourceKey}/freshness`,
);
if (status === 504) { test.skip(true, 'Freshness endpoint timed out (504)'); return; }
expect(status).toBe(200);
expect(body.source.totalAdvisories).toBeGreaterThan(0);
expect(body.syncCount).toBeGreaterThanOrEqual(1);
expect(body.dataAsOf).toBeTruthy();
});
});
// ---------------------------------------------------------------------------
// Block 2: Canonical API Queryability
// ---------------------------------------------------------------------------
test.describe('Data Aggregation — Canonical API Queryability', () => {
test('paginated canonical query returns advisories', async ({ apiRequest }) => {
const resp = await apiRequest.get('/api/v1/canonical?offset=0&limit=5');
// Canonical service may not be configured (503) — skip gracefully
if (resp.status() === 503 || resp.status() === 404) {
test.skip(true, `Canonical service not available (${resp.status()})`);
return;
}
expect(resp.status()).toBe(200);
const body = await resp.json();
if (body.totalCount === 0) {
test.skip(true, 'No canonical advisories in database (pipeline may not have run yet)');
return;
}
expect(body.items.length).toBeGreaterThan(0);
expect(body.items.length).toBeLessThanOrEqual(5);
const first = body.items[0];
expect(first.id).toBeTruthy();
expect(first.cve).toMatch(/CVE-\d{4}-\d+/);
expect(first.affectsKey).toBeTruthy();
expect(first.mergeHash).toBeTruthy();
expect(first.status).toBeTruthy();
expect(first.createdAt).toBeTruthy();
firstCanonicalId = first.id;
firstCanonicalCve = first.cve;
});
test('canonical advisory by ID returns full record with source edges', async ({ apiRequest }) => {
if (!firstCanonicalId) {
const listResp = await apiRequest.get('/api/v1/canonical?offset=0&limit=1');
const listBody = await safeJson(listResp);
if (!listBody?.items?.length) {
test.skip(true, 'No canonical advisories available');
return;
}
firstCanonicalId = listBody.items[0].id;
firstCanonicalCve = listBody.items[0].cve;
}
const resp = await apiRequest.get(`/api/v1/canonical/${firstCanonicalId}`);
if (resp.status() === 503) {
test.skip(true, 'Canonical service not available');
return;
}
expect(resp.status()).toBe(200);
const body = await resp.json();
expect(body.id).toBe(firstCanonicalId);
expect(body.cve).toBe(firstCanonicalCve);
expect(body.sourceEdges).toBeDefined();
expect(body.sourceEdges.length).toBeGreaterThanOrEqual(1);
const edge = body.sourceEdges[0];
expect(edge.sourceName).toBeTruthy();
expect(edge.sourceAdvisoryId).toBeTruthy();
});
test('CVE-based query returns matching advisory', async ({ apiRequest }) => {
if (!firstCanonicalCve) {
const listResp = await apiRequest.get('/api/v1/canonical?offset=0&limit=1');
const listBody = await safeJson(listResp);
if (!listBody?.items?.length) {
test.skip(true, 'No canonical advisories available');
return;
}
firstCanonicalCve = listBody.items[0].cve;
}
const resp = await apiRequest.get(`/api/v1/canonical?cve=${firstCanonicalCve}`);
if (resp.status() === 503) {
test.skip(true, 'Canonical service not available');
return;
}
expect(resp.status()).toBe(200);
const body = await resp.json();
expect(body.totalCount).toBeGreaterThanOrEqual(1);
expect(body.items[0].cve).toBe(firstCanonicalCve);
});
test('score distribution endpoint works', async ({ apiRequest }) => {
const resp = await apiRequest.get('/api/v1/scores/distribution');
if (resp.status() === 503 || resp.status() === 404) {
test.skip(true, `Score distribution endpoint not available (${resp.status()})`);
return;
}
expect(resp.status()).toBe(200);
const body = await resp.json();
expect(typeof body.totalCount).toBe('number');
if (body.totalCount > 0) {
const sum =
(body.highCount ?? 0) + (body.mediumCount ?? 0) +
(body.lowCount ?? 0) + (body.noneCount ?? 0);
expect(sum).toBe(body.totalCount);
}
});
});
// ---------------------------------------------------------------------------
// Block 3: Cross-Source Correlation
// ---------------------------------------------------------------------------
test.describe('Data Aggregation — Cross-Source Correlation', () => {
test('advisory sources with data span multiple distinct sources', async ({ apiRequest }) => {
const { status, body } = await getWithRetry(apiRequest, '/api/v1/advisory-sources?includeDisabled=false');
if (status === 504) { test.skip(true, 'Advisory sources endpoint timed out (504)'); return; }
expect(status).toBe(200);
const withData = body.items.filter((s: any) => s.totalAdvisories > 0);
const uniqueKeys = new Set(withData.map((s: any) => s.sourceKey));
expect(
uniqueKeys.size,
`Expected >= 2 distinct sources with advisory data, got ${uniqueKeys.size}: ${[...uniqueKeys].join(', ')}`,
).toBeGreaterThanOrEqual(2);
});
test('canonical advisory with multi-source edges (soft check)', async ({ apiRequest }) => {
const resp = await apiRequest.get('/api/v1/canonical?offset=0&limit=50');
if (resp.status() !== 200) {
// Canonical service not available — skip silently
return;
}
const body = await resp.json();
const multiEdge = body.items?.find((a: any) => a.sourceEdges?.length >= 2);
if (multiEdge) {
const sourceNames = multiEdge.sourceEdges.map((e: any) => e.sourceName);
const uniqueSources = new Set(sourceNames);
expect(
uniqueSources.size,
'Multi-edge advisory should have distinct source names',
).toBeGreaterThanOrEqual(2);
}
// Not finding a multi-edge advisory is OK — depends on CVE overlap
});
});
// ---------------------------------------------------------------------------
// Block 4: Active Sync Verification (gated)
// ---------------------------------------------------------------------------
test.describe('Data Aggregation — Active Sync (KEV)', () => {
const activeSyncEnabled = process.env['E2E_ACTIVE_SYNC'] === '1';
test.skip(!activeSyncEnabled, 'Active sync tests disabled (set E2E_ACTIVE_SYNC=1 to enable)');
test('kev sync completes and advisory count does not decrease', async ({ apiRequest }) => {
test.setTimeout(300_000); // 5 min for active sync
// Record baseline
const beforeResp = await apiRequest.get('/api/v1/advisory-sources/kev/freshness');
expect(beforeResp.status()).toBe(200);
const before = await beforeResp.json();
const beforeSyncCount = before.syncCount ?? 0;
const beforeAdvisories = before.source?.totalAdvisories ?? 0;
// Ensure source is enabled
await apiRequest.post('/api/v1/advisory-sources/kev/enable');
// Trigger sync
const syncResp = await apiRequest.post('/api/v1/advisory-sources/kev/sync');
expect(syncResp.status()).toBeLessThan(500);
const syncBody = await syncResp.json();
expect(['accepted', 'already_running']).toContain(syncBody.outcome);
// Poll until syncCount advances
const after = await pollUntil(
apiRequest,
'/api/v1/advisory-sources/kev/freshness',
(body) => (body.syncCount ?? 0) > beforeSyncCount,
{ intervalMs: 10_000, timeoutMs: 180_000, label: 'kev sync completion' },
);
// Verify advisory count did not decrease
expect(after.source.totalAdvisories).toBeGreaterThanOrEqual(beforeAdvisories);
});
test('kev freshness timestamp is recent after sync', async ({ apiRequest }) => {
const resp = await apiRequest.get('/api/v1/advisory-sources/kev/freshness');
expect(resp.status()).toBe(200);
const body = await resp.json();
if (body.lastSuccessAt) {
const lastSuccess = new Date(body.lastSuccessAt);
const fiveMinAgo = new Date(Date.now() - 5 * 60 * 1000);
expect(
lastSuccess.getTime(),
`lastSuccessAt should be within last 5 minutes, got ${body.lastSuccessAt}`,
).toBeGreaterThan(fiveMinAgo.getTime());
}
expect(
['healthy', 'warning'],
`freshnessStatus should be healthy or warning, got ${body.freshnessStatus}`,
).toContain(body.freshnessStatus);
});
});