feat: Implement Filesystem and MongoDB provenance writers for PackRun execution context
- Added `FilesystemPackRunProvenanceWriter` to write provenance manifests to the filesystem. - Introduced `MongoPackRunArtifactReader` to read artifacts from MongoDB. - Created `MongoPackRunProvenanceWriter` to store provenance manifests in MongoDB. - Developed unit tests for filesystem and MongoDB provenance writers. - Established `ITimelineEventStore` and `ITimelineIngestionService` interfaces for timeline event handling. - Implemented `TimelineIngestionService` to validate and persist timeline events with hashing. - Created PostgreSQL schema and migration scripts for timeline indexing. - Added dependency injection support for timeline indexer services. - Developed tests for timeline ingestion and schema validation.
This commit is contained in:
38
bench/reachability-benchmark/cases/py/django-ssti/case.yaml
Normal file
38
bench/reachability-benchmark/cases/py/django-ssti/case.yaml
Normal file
@@ -0,0 +1,38 @@
|
||||
id: "py-django-ssti:105"
|
||||
language: py
|
||||
project: django-ssti
|
||||
version: "1.0.0"
|
||||
description: "Django-like template rendering (autoescape off) reachable"
|
||||
entrypoints:
|
||||
- "POST /render"
|
||||
sinks:
|
||||
- id: "DjangoSSTI::render"
|
||||
path: "src/app.py::handle_request"
|
||||
kind: "http"
|
||||
location:
|
||||
file: src/app.py
|
||||
line: 5
|
||||
notes: "template replace without escaping"
|
||||
environment:
|
||||
os_image: "python:3.12-alpine"
|
||||
runtime:
|
||||
python: "3.12"
|
||||
source_date_epoch: 1730000000
|
||||
build:
|
||||
command: "./build/build.sh"
|
||||
source_date_epoch: 1730000000
|
||||
outputs:
|
||||
artifact_path: outputs/binary.tar.gz
|
||||
sbom_path: outputs/sbom.cdx.json
|
||||
coverage_path: outputs/coverage.json
|
||||
traces_dir: outputs/traces
|
||||
test:
|
||||
command: "./tests/run-tests.sh"
|
||||
expected_coverage:
|
||||
- outputs/coverage.json
|
||||
expected_traces:
|
||||
- outputs/traces/traces.json
|
||||
ground_truth:
|
||||
summary: "Template rendering reachable with autoescape off"
|
||||
evidence_files:
|
||||
- "../benchmark/truth/py-django-ssti.json"
|
||||
@@ -0,0 +1,8 @@
|
||||
case_id: "py-django-ssti:105"
|
||||
entries:
|
||||
http:
|
||||
- id: "POST /render"
|
||||
route: "/render"
|
||||
method: "POST"
|
||||
handler: "app.handle_request"
|
||||
description: "Template rendering with autoescape off"
|
||||
@@ -0,0 +1 @@
|
||||
# stdlib only
|
||||
Binary file not shown.
12
bench/reachability-benchmark/cases/py/django-ssti/src/app.py
Normal file
12
bench/reachability-benchmark/cases/py/django-ssti/src/app.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""Django-like template rendering with autoescape off (reachable)."""
|
||||
|
||||
def render(template: str, context: dict) -> str:
|
||||
# naive render; simulates autoescape off
|
||||
return template.replace("{{user}}", context.get("user", "guest"))
|
||||
|
||||
def handle_request(body):
|
||||
template = body.get("template") if isinstance(body, dict) else None
|
||||
if not isinstance(template, str):
|
||||
return {"status": 400, "body": "bad request"}
|
||||
rendered = render(template, {"user": "guest"})
|
||||
return {"status": 200, "body": rendered}
|
||||
@@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
cd "$(dirname "$0")"
|
||||
export SOURCE_DATE_EPOCH=${SOURCE_DATE_EPOCH:-1730000000}
|
||||
export TZ=UTC
|
||||
export LC_ALL=C
|
||||
export PYTHONPATH="$(cd .. && pwd)/src"
|
||||
python test_reach.py
|
||||
@@ -0,0 +1,48 @@
|
||||
import json
|
||||
import pathlib
|
||||
from app import handle_request
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parent.parent
|
||||
OUT = ROOT / "outputs"
|
||||
TRACE_DIR = OUT / "traces"
|
||||
COVERAGE_FILE = OUT / "coverage.json"
|
||||
TRACE_FILE = TRACE_DIR / "traces.json"
|
||||
|
||||
def ensure_dirs():
|
||||
OUT.mkdir(parents=True, exist_ok=True)
|
||||
TRACE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def record_trace(entry, path_nodes):
|
||||
TRACE_FILE.write_text(
|
||||
json.dumps({
|
||||
"entry": entry,
|
||||
"path": path_nodes,
|
||||
"sink": "DjangoSSTI::render",
|
||||
"notes": "Template rendered (autoescape off)"
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
def record_coverage(file_path, lines):
|
||||
COVERAGE_FILE.write_text(
|
||||
json.dumps({
|
||||
"files": {
|
||||
file_path: {
|
||||
"lines_covered": lines,
|
||||
"lines_total": 38
|
||||
}
|
||||
}
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
def test_reach():
|
||||
ensure_dirs()
|
||||
res = handle_request({"template": "Hello {{user}}"})
|
||||
assert res["status"] == 200
|
||||
assert res["body"] == "Hello guest"
|
||||
record_trace("POST /render", ["app.py::handle_request", "render"])
|
||||
record_coverage("src/app.py", [3,4,5,7,8,9,10])
|
||||
(OUT / "SINK_REACHED").write_text("true")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_reach()
|
||||
@@ -0,0 +1,38 @@
|
||||
id: "py-fastapi-guarded:104"
|
||||
language: py
|
||||
project: fastapi-guarded
|
||||
version: "1.0.0"
|
||||
description: "FastAPI-like exec guarded by ALLOW_EXEC flag (unreachable by default)"
|
||||
entrypoints:
|
||||
- "POST /exec"
|
||||
sinks:
|
||||
- id: "FastApiGuarded::handle_request"
|
||||
path: "src/app.py::handle_request"
|
||||
kind: "process"
|
||||
location:
|
||||
file: src/app.py
|
||||
line: 7
|
||||
notes: "eval guarded by ALLOW_EXEC"
|
||||
environment:
|
||||
os_image: "python:3.12-alpine"
|
||||
runtime:
|
||||
python: "3.12"
|
||||
source_date_epoch: 1730000000
|
||||
build:
|
||||
command: "./build/build.sh"
|
||||
source_date_epoch: 1730000000
|
||||
outputs:
|
||||
artifact_path: outputs/binary.tar.gz
|
||||
sbom_path: outputs/sbom.cdx.json
|
||||
coverage_path: outputs/coverage.json
|
||||
traces_dir: outputs/traces
|
||||
test:
|
||||
command: "./tests/run-tests.sh"
|
||||
expected_coverage:
|
||||
- outputs/coverage.json
|
||||
expected_traces:
|
||||
- outputs/traces/traces.json
|
||||
ground_truth:
|
||||
summary: "Guard blocks eval unless ALLOW_EXEC=true"
|
||||
evidence_files:
|
||||
- "../benchmark/truth/py-fastapi-guarded.json"
|
||||
@@ -0,0 +1,8 @@
|
||||
case_id: "py-fastapi-guarded:104"
|
||||
entries:
|
||||
http:
|
||||
- id: "POST /exec"
|
||||
route: "/exec"
|
||||
method: "POST"
|
||||
handler: "app.handle_request"
|
||||
description: "Exec guarded by ALLOW_EXEC"
|
||||
@@ -0,0 +1 @@
|
||||
# stdlib only
|
||||
Binary file not shown.
@@ -0,0 +1,11 @@
|
||||
"""FastAPI-like handler with feature flag guarding exec."""
|
||||
|
||||
def handle_request(body, env=None):
|
||||
env = env or {}
|
||||
if env.get("ALLOW_EXEC") != "true":
|
||||
return {"status": 403, "body": "forbidden"}
|
||||
code = body.get("code") if isinstance(body, dict) else None
|
||||
if not isinstance(code, str):
|
||||
return {"status": 400, "body": "bad request"}
|
||||
result = eval(code)
|
||||
return {"status": 200, "body": str(result)}
|
||||
@@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
cd "$(dirname "$0")"
|
||||
export SOURCE_DATE_EPOCH=${SOURCE_DATE_EPOCH:-1730000000}
|
||||
export TZ=UTC
|
||||
export LC_ALL=C
|
||||
export PYTHONPATH="$(cd .. && pwd)/src"
|
||||
python test_unreachable.py
|
||||
@@ -0,0 +1,47 @@
|
||||
import json
|
||||
import pathlib
|
||||
from app import handle_request
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parent.parent
|
||||
OUT = ROOT / "outputs"
|
||||
TRACE_DIR = OUT / "traces"
|
||||
COVERAGE_FILE = OUT / "coverage.json"
|
||||
TRACE_FILE = TRACE_DIR / "traces.json"
|
||||
|
||||
def ensure_dirs():
|
||||
OUT.mkdir(parents=True, exist_ok=True)
|
||||
TRACE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def record_trace(entry, path_nodes):
|
||||
TRACE_FILE.write_text(
|
||||
json.dumps({
|
||||
"entry": entry,
|
||||
"path": path_nodes,
|
||||
"sink": "FastApiGuarded::handle_request",
|
||||
"notes": "Guard blocked eval"
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
def record_coverage(file_path, lines):
|
||||
COVERAGE_FILE.write_text(
|
||||
json.dumps({
|
||||
"files": {
|
||||
file_path: {
|
||||
"lines_covered": lines,
|
||||
"lines_total": 40
|
||||
}
|
||||
}
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
def test_unreachable():
|
||||
ensure_dirs()
|
||||
res = handle_request({"code": "10/2"}, env={"ALLOW_EXEC": "false"})
|
||||
assert res["status"] == 403
|
||||
assert res["body"] == "forbidden"
|
||||
record_trace("POST /exec", ["app.py::handle_request", "guard: ALLOW_EXEC!=true"])
|
||||
record_coverage("src/app.py", [3,4,5,8,9,11])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_unreachable()
|
||||
@@ -0,0 +1,38 @@
|
||||
id: "py-flask-template:103"
|
||||
language: py
|
||||
project: flask-template
|
||||
version: "1.0.0"
|
||||
description: "Template rendering reachable via POST /render"
|
||||
entrypoints:
|
||||
- "POST /render"
|
||||
sinks:
|
||||
- id: "FlaskTemplate::render"
|
||||
path: "src/app.py::handle_request"
|
||||
kind: "http"
|
||||
location:
|
||||
file: src/app.py
|
||||
line: 5
|
||||
notes: "template replace on user input"
|
||||
environment:
|
||||
os_image: "python:3.12-alpine"
|
||||
runtime:
|
||||
python: "3.12"
|
||||
source_date_epoch: 1730000000
|
||||
build:
|
||||
command: "./build/build.sh"
|
||||
source_date_epoch: 1730000000
|
||||
outputs:
|
||||
artifact_path: outputs/binary.tar.gz
|
||||
sbom_path: outputs/sbom.cdx.json
|
||||
coverage_path: outputs/coverage.json
|
||||
traces_dir: outputs/traces
|
||||
test:
|
||||
command: "./tests/run-tests.sh"
|
||||
expected_coverage:
|
||||
- outputs/coverage.json
|
||||
expected_traces:
|
||||
- outputs/traces/traces.json
|
||||
ground_truth:
|
||||
summary: "Template rendering reachable"
|
||||
evidence_files:
|
||||
- "../benchmark/truth/py-flask-template.json"
|
||||
@@ -0,0 +1,8 @@
|
||||
case_id: "py-flask-template:103"
|
||||
entries:
|
||||
http:
|
||||
- id: "POST /render"
|
||||
route: "/render"
|
||||
method: "POST"
|
||||
handler: "app.handle_request"
|
||||
description: "Template rendering"
|
||||
@@ -0,0 +1 @@
|
||||
# stdlib only for this minimal case
|
||||
Binary file not shown.
@@ -0,0 +1,12 @@
|
||||
"""Minimal flask-like template rendering sink (reachable)."""
|
||||
|
||||
def render(template: str, context: dict) -> str:
|
||||
return template.replace("{{name}}", context.get("name", "guest"))
|
||||
|
||||
def handle_request(body):
|
||||
template = body.get("template") if isinstance(body, dict) else None
|
||||
if not isinstance(template, str):
|
||||
return {"status": 400, "body": "bad request"}
|
||||
rendered = render(template, {"name": "guest"})
|
||||
# Sink: returns rendered template (models potential SSTI)
|
||||
return {"status": 200, "body": rendered}
|
||||
@@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
cd "$(dirname "$0")"
|
||||
export SOURCE_DATE_EPOCH=${SOURCE_DATE_EPOCH:-1730000000}
|
||||
export TZ=UTC
|
||||
export LC_ALL=C
|
||||
export PYTHONPATH="$(cd .. && pwd)/src"
|
||||
python test_reach.py
|
||||
@@ -0,0 +1,48 @@
|
||||
import json
|
||||
import pathlib
|
||||
from app import handle_request
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parent.parent
|
||||
OUT = ROOT / "outputs"
|
||||
TRACE_DIR = OUT / "traces"
|
||||
COVERAGE_FILE = OUT / "coverage.json"
|
||||
TRACE_FILE = TRACE_DIR / "traces.json"
|
||||
|
||||
def ensure_dirs():
|
||||
OUT.mkdir(parents=True, exist_ok=True)
|
||||
TRACE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def record_trace(entry, path_nodes):
|
||||
TRACE_FILE.write_text(
|
||||
json.dumps({
|
||||
"entry": entry,
|
||||
"path": path_nodes,
|
||||
"sink": "FlaskTemplate::render",
|
||||
"notes": "Template rendered"
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
def record_coverage(file_path, lines):
|
||||
COVERAGE_FILE.write_text(
|
||||
json.dumps({
|
||||
"files": {
|
||||
file_path: {
|
||||
"lines_covered": lines,
|
||||
"lines_total": 40
|
||||
}
|
||||
}
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
def test_reach():
|
||||
ensure_dirs()
|
||||
res = handle_request({"template": "Hello {{name}}"})
|
||||
assert res["status"] == 200
|
||||
assert res["body"] == "Hello guest"
|
||||
record_trace("POST /render", ["app.py::handle_request", "render"])
|
||||
record_coverage("src/app.py", [4,5,6,8,9,10,11])
|
||||
(OUT / "SINK_REACHED").write_text("true")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_reach()
|
||||
38
bench/reachability-benchmark/cases/py/guarded-exec/case.yaml
Normal file
38
bench/reachability-benchmark/cases/py/guarded-exec/case.yaml
Normal file
@@ -0,0 +1,38 @@
|
||||
id: "py-guarded-exec:102"
|
||||
language: py
|
||||
project: guarded-exec
|
||||
version: "1.0.0"
|
||||
description: "Python eval guarded by FEATURE_ENABLE flag; unreachable by default"
|
||||
entrypoints:
|
||||
- "POST /api/exec"
|
||||
sinks:
|
||||
- id: "PyGuardedExec::handle_request"
|
||||
path: "src/app.py::handle_request"
|
||||
kind: "process"
|
||||
location:
|
||||
file: src/app.py
|
||||
line: 7
|
||||
notes: "eval guarded by FEATURE_ENABLE"
|
||||
environment:
|
||||
os_image: "python:3.12-alpine"
|
||||
runtime:
|
||||
python: "3.12"
|
||||
source_date_epoch: 1730000000
|
||||
build:
|
||||
command: "./build/build.sh"
|
||||
source_date_epoch: 1730000000
|
||||
outputs:
|
||||
artifact_path: outputs/binary.tar.gz
|
||||
sbom_path: outputs/sbom.cdx.json
|
||||
coverage_path: outputs/coverage.json
|
||||
traces_dir: outputs/traces
|
||||
test:
|
||||
command: "./tests/run-tests.sh"
|
||||
expected_coverage:
|
||||
- outputs/coverage.json
|
||||
expected_traces:
|
||||
- outputs/traces/traces.json
|
||||
ground_truth:
|
||||
summary: "Guard blocks eval when FEATURE_ENABLE != 1"
|
||||
evidence_files:
|
||||
- "../benchmark/truth/py-guarded-exec.json"
|
||||
@@ -0,0 +1,8 @@
|
||||
case_id: "py-guarded-exec:102"
|
||||
entries:
|
||||
http:
|
||||
- id: "POST /api/exec"
|
||||
route: "/api/exec"
|
||||
method: "POST"
|
||||
handler: "app.handle_request"
|
||||
description: "Eval guarded by FEATURE_ENABLE"
|
||||
@@ -0,0 +1 @@
|
||||
# Intentionally empty; stdlib only.
|
||||
Binary file not shown.
@@ -0,0 +1,13 @@
|
||||
"""Python handler with feature-flag guard for eval sink."""
|
||||
|
||||
def handle_request(body, env=None):
|
||||
env = env or {}
|
||||
if env.get("FEATURE_ENABLE") != "1":
|
||||
return {"status": 403, "body": "disabled"}
|
||||
|
||||
code = body.get("code") if isinstance(body, dict) else None
|
||||
if not isinstance(code, str):
|
||||
return {"status": 400, "body": "bad request"}
|
||||
|
||||
result = eval(code)
|
||||
return {"status": 200, "body": str(result)}
|
||||
@@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
cd "$(dirname "$0")"
|
||||
export SOURCE_DATE_EPOCH=${SOURCE_DATE_EPOCH:-1730000000}
|
||||
export TZ=UTC
|
||||
export LC_ALL=C
|
||||
export PYTHONPATH="$(cd .. && pwd)/src"
|
||||
python test_unreachable.py
|
||||
@@ -0,0 +1,48 @@
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
from app import handle_request
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parent.parent
|
||||
OUT = ROOT / "outputs"
|
||||
TRACE_DIR = OUT / "traces"
|
||||
COVERAGE_FILE = OUT / "coverage.json"
|
||||
TRACE_FILE = TRACE_DIR / "traces.json"
|
||||
|
||||
def ensure_dirs():
|
||||
OUT.mkdir(parents=True, exist_ok=True)
|
||||
TRACE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def record_trace(entry, path_nodes):
|
||||
TRACE_FILE.write_text(
|
||||
json.dumps({
|
||||
"entry": entry,
|
||||
"path": path_nodes,
|
||||
"sink": "PyGuardedExec::handle_request",
|
||||
"notes": "Guard blocked eval"
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
def record_coverage(file_path, lines):
|
||||
COVERAGE_FILE.write_text(
|
||||
json.dumps({
|
||||
"files": {
|
||||
file_path: {
|
||||
"lines_covered": lines,
|
||||
"lines_total": 34
|
||||
}
|
||||
}
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
def test_unreachable():
|
||||
ensure_dirs()
|
||||
res = handle_request({"code": "5*5"}, env={"FEATURE_ENABLE": "0"})
|
||||
assert res["status"] == 403
|
||||
assert res["body"] == "disabled"
|
||||
|
||||
record_trace("POST /api/exec", ["app.py::handle_request", "guard: FEATURE_ENABLE != 1"])
|
||||
record_coverage("src/app.py", [3,4,5,8,9,11])
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_unreachable()
|
||||
38
bench/reachability-benchmark/cases/py/unsafe-exec/case.yaml
Normal file
38
bench/reachability-benchmark/cases/py/unsafe-exec/case.yaml
Normal file
@@ -0,0 +1,38 @@
|
||||
id: "py-unsafe-exec:101"
|
||||
language: py
|
||||
project: unsafe-exec
|
||||
version: "1.0.0"
|
||||
description: "Python handler with reachable eval sink"
|
||||
entrypoints:
|
||||
- "POST /api/exec"
|
||||
sinks:
|
||||
- id: "PyUnsafeExec::handle_request"
|
||||
path: "src/app.py::handle_request"
|
||||
kind: "process"
|
||||
location:
|
||||
file: src/app.py
|
||||
line: 8
|
||||
notes: "eval on user input"
|
||||
environment:
|
||||
os_image: "python:3.12-alpine"
|
||||
runtime:
|
||||
python: "3.12"
|
||||
source_date_epoch: 1730000000
|
||||
build:
|
||||
command: "./build/build.sh"
|
||||
source_date_epoch: 1730000000
|
||||
outputs:
|
||||
artifact_path: outputs/binary.tar.gz
|
||||
sbom_path: outputs/sbom.cdx.json
|
||||
coverage_path: outputs/coverage.json
|
||||
traces_dir: outputs/traces
|
||||
test:
|
||||
command: "./tests/run-tests.sh"
|
||||
expected_coverage:
|
||||
- outputs/coverage.json
|
||||
expected_traces:
|
||||
- outputs/traces/traces.json
|
||||
ground_truth:
|
||||
summary: "Eval reachable via POST /api/exec"
|
||||
evidence_files:
|
||||
- "../benchmark/truth/py-unsafe-exec.json"
|
||||
@@ -0,0 +1,8 @@
|
||||
case_id: "py-unsafe-exec:101"
|
||||
entries:
|
||||
http:
|
||||
- id: "POST /api/exec"
|
||||
route: "/api/exec"
|
||||
method: "POST"
|
||||
handler: "app.handle_request"
|
||||
description: "Executes user code via eval"
|
||||
@@ -0,0 +1 @@
|
||||
# Intentionally empty; uses stdlib only.
|
||||
Binary file not shown.
10
bench/reachability-benchmark/cases/py/unsafe-exec/src/app.py
Normal file
10
bench/reachability-benchmark/cases/py/unsafe-exec/src/app.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""Minimal Python handler with an unsafe eval sink."""
|
||||
|
||||
def handle_request(body):
|
||||
code = body.get("code") if isinstance(body, dict) else None
|
||||
if not isinstance(code, str):
|
||||
return {"status": 400, "body": "bad request"}
|
||||
|
||||
# Sink: eval on user input (reachable)
|
||||
result = eval(code)
|
||||
return {"status": 200, "body": str(result)}
|
||||
@@ -0,0 +1,8 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
cd "$(dirname "$0")"
|
||||
export SOURCE_DATE_EPOCH=${SOURCE_DATE_EPOCH:-1730000000}
|
||||
export TZ=UTC
|
||||
export LC_ALL=C
|
||||
export PYTHONPATH="$(cd .. && pwd)/src"
|
||||
python test_reach.py
|
||||
@@ -0,0 +1,54 @@
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
from app import handle_request
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parent.parent
|
||||
OUT = ROOT / "outputs"
|
||||
TRACE_DIR = OUT / "traces"
|
||||
COVERAGE_FILE = OUT / "coverage.json"
|
||||
TRACE_FILE = TRACE_DIR / "traces.json"
|
||||
|
||||
|
||||
def ensure_dirs():
|
||||
OUT.mkdir(parents=True, exist_ok=True)
|
||||
TRACE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def record_trace(entry, path_nodes):
|
||||
TRACE_FILE.write_text(
|
||||
json.dumps({
|
||||
"entry": entry,
|
||||
"path": path_nodes,
|
||||
"sink": "PyUnsafeExec::handle_request",
|
||||
"notes": "Eval reached"
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
|
||||
def record_coverage(file_path, lines):
|
||||
COVERAGE_FILE.write_text(
|
||||
json.dumps({
|
||||
"files": {
|
||||
file_path: {
|
||||
"lines_covered": lines,
|
||||
"lines_total": 30
|
||||
}
|
||||
}
|
||||
}, indent=2)
|
||||
)
|
||||
|
||||
|
||||
def test_reach():
|
||||
ensure_dirs()
|
||||
res = handle_request({"code": "3*7"})
|
||||
assert res["status"] == 200
|
||||
assert res["body"] == "21"
|
||||
|
||||
record_trace("POST /api/exec", ["app.py::handle_request", "eval(code)"])
|
||||
record_coverage("src/app.py", [3, 4, 5, 8, 10])
|
||||
(OUT / "SINK_REACHED").write_text("true")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_reach()
|
||||
Reference in New Issue
Block a user