Files
git.stella-ops.org/tools/slntools/nuget_vuln_checker.py
StellaOps Bot 3acc0ef0cd save progress
2025-12-28 03:09:52 +02:00

621 lines
22 KiB
Python

#!/usr/bin/env python3
"""
StellaOps NuGet Vulnerability Checker.
Scans NuGet packages for security vulnerabilities and suggests/applies fixes.
Usage:
python nuget_vuln_checker.py [OPTIONS]
Options:
--solution PATH Path to .sln file (default: src/StellaOps.sln)
--min-severity LEVEL Minimum severity: low|moderate|high|critical (default: high)
--fix Auto-fix by updating to non-vulnerable versions
--dry-run Show what would be fixed without modifying files
--report PATH Write JSON report to file
--include-transitive Include transitive dependency vulnerabilities
--exclude PACKAGE Exclude package from checks (repeatable)
-v, --verbose Verbose output
Exit Codes:
0 - No vulnerabilities found (or all below threshold)
1 - Vulnerabilities found above threshold
2 - Error during execution
"""
import argparse
import json
import logging
import re
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from lib.nuget_api import NuGetApiClient, NuGetApiError
from lib.vulnerability_models import (
SEVERITY_LEVELS,
VulnerabilityDetail,
VulnerabilityReport,
VulnerablePackage,
meets_severity_threshold,
)
from lib.version_utils import parse_version
# Module-level logger used by the functions in this script.
logger = logging.getLogger(__name__)
def setup_logging(verbose: bool) -> None:
    """Configure logging output: DEBUG when *verbose* is set, INFO otherwise."""
    if verbose:
        threshold = logging.DEBUG
    else:
        threshold = logging.INFO
    logging.basicConfig(format="%(levelname)s: %(message)s", level=threshold)
def check_dotnet_available() -> bool:
    """Report whether a ``dotnet`` executable can be located via ``shutil.which``."""
    dotnet_path = shutil.which("dotnet")
    return dotnet_path is not None
def run_vulnerability_check(
    solution_path: Path, include_transitive: bool
) -> dict | None:
    """
    Invoke ``dotnet list package --vulnerable`` for a solution and decode its JSON.

    Returns the decoded JSON document, a ``{"version": 1, "projects": []}`` stub
    when the command printed nothing, or None when the command exits non-zero,
    times out, raises, or prints something that is not valid JSON.
    """
    command = ["dotnet", "list", str(solution_path), "package", "--vulnerable"]
    command += ["--format", "json", "--output-version", "1"]
    if include_transitive:
        command += ["--include-transitive"]

    logger.info(f"Running: {' '.join(command)}")

    try:
        completed = subprocess.run(
            command,
            timeout=600,  # 10 minute ceiling for large solutions
            text=True,
            capture_output=True,
        )

        # Per the original author's note, dotnet exits 0 even when
        # vulnerabilities are reported, so non-zero is treated as a failed run.
        if completed.returncode != 0:
            logger.error(f"dotnet command failed: {completed.stderr}")
            return None

        raw_output = completed.stdout
        if raw_output.strip():
            return json.loads(raw_output)

        # Nothing on stdout: hand back an empty but well-formed document.
        logger.warning("Empty output from dotnet list package")
        return {"version": 1, "projects": []}
    except subprocess.TimeoutExpired:
        logger.error("dotnet command timed out")
        return None
    except json.JSONDecodeError as exc:
        logger.error(f"Failed to parse dotnet output as JSON: {exc}")
        logger.debug(f"Output was: {raw_output[:500]}...")
        return None
    except Exception as exc:
        logger.error(f"Error running dotnet command: {exc}")
        return None
def parse_vulnerability_output(
    data: dict, min_severity: str, exclude_packages: set[str]
) -> list[VulnerablePackage]:
    """
    Parse ``dotnet list package --vulnerable`` JSON output.

    Walks ``projects`` -> ``frameworks`` -> ``topLevelPackages`` /
    ``transitivePackages`` and collects every package that has at least one
    vulnerability accepted by ``meets_severity_threshold``.

    Args:
        data: Parsed JSON document produced by the dotnet command.
        min_severity: Threshold passed to ``meets_severity_threshold``.
        exclude_packages: Package ids to ignore (compared case-insensitively).

    Returns:
        One VulnerablePackage per distinct ``id@resolvedVersion`` pair. Each
        affected project path is listed once, even when the package shows up
        under several target frameworks of the same project.
    """
    # Normalise the exclusion list once instead of once per package.
    excluded = {name.lower() for name in exclude_packages}
    vulnerable_packages: dict[str, VulnerablePackage] = {}

    for project in data.get("projects", []):
        project_path = Path(project.get("path", "unknown"))
        for framework in project.get("frameworks", []):
            # Check both topLevelPackages and transitivePackages
            for package_list_key in ("topLevelPackages", "transitivePackages"):
                for package in framework.get(package_list_key, []):
                    package_id = package.get("id", "")

                    if package_id.lower() in excluded:
                        logger.debug(f"Skipping excluded package: {package_id}")
                        continue

                    # Keep only the vulnerabilities that reach the threshold.
                    matching_vulns = []
                    for vuln in package.get("vulnerabilities", []):
                        severity = vuln.get("severity", "unknown")
                        if meets_severity_threshold(severity, min_severity):
                            matching_vulns.append(
                                VulnerabilityDetail(
                                    severity=severity,
                                    advisory_url=vuln.get("advisoryurl", ""),
                                )
                            )
                    if not matching_vulns:
                        continue

                    resolved_version = package.get("resolvedVersion", "")
                    key = f"{package_id}@{resolved_version}"
                    entry = vulnerable_packages.get(key)
                    if entry is None:
                        entry = VulnerablePackage(
                            package_id=package_id,
                            resolved_version=resolved_version,
                            requested_version=package.get("requestedVersion", ""),
                            vulnerabilities=matching_vulns,
                        )
                        vulnerable_packages[key] = entry

                    # A multi-targeted project reports the same package once per
                    # framework; record the project file only once so later
                    # stages do not process it repeatedly.
                    if project_path not in entry.affected_projects:
                        entry.affected_projects.append(project_path)

    return list(vulnerable_packages.values())
def find_suggested_fixes(
    vulnerable_packages: list[VulnerablePackage],
    api_client: NuGetApiClient | None,
) -> None:
    """
    Look up a non-vulnerable replacement version for every package.

    Each package is updated in place: ``suggested_version`` and ``fix_risk``
    are set when the API client returns a safe version. Without an API client
    the function only logs a warning. A NuGetApiError for one package is
    logged and the loop moves on to the next package.
    """
    if api_client is None:
        logger.warning("NuGet API client not available, cannot suggest fixes")
        return

    for package in vulnerable_packages:
        logger.debug(
            f"Finding safe version for {package.package_id} {package.resolved_version}"
        )
        try:
            candidate = api_client.find_safe_version(
                package.package_id, package.resolved_version
            )
            if not candidate:
                logger.warning(
                    f"No safe version found for {package.package_id} {package.resolved_version}"
                )
                continue

            package.suggested_version = candidate
            package.fix_risk = api_client.get_fix_risk(
                package.resolved_version, candidate
            )
            logger.info(
                f"Found safe version for {package.package_id}: "
                f"{package.resolved_version} -> {candidate} (risk: {package.fix_risk})"
            )
        except NuGetApiError as exc:
            logger.warning(
                f"Failed to query NuGet API for {package.package_id}: {exc}"
            )
def has_direct_package_reference(content: str, package_id: str) -> bool:
    """
    Check if the csproj has a direct PackageReference for the package.

    Args:
        content: Full text of the project file.
        package_id: Package id to look for; matched case-insensitively and
            literally (regex metacharacters in the id are escaped).

    Returns:
        True when a ``<PackageReference ... Include=...>`` element names the
        package, whether the attribute value is written with double or single
        quotes (both are valid XML).
    """
    pattern = re.compile(
        # ([\"']) captures the opening quote; \1 demands the same closing quote.
        rf"<PackageReference\s+[^>]*Include\s*=\s*([\"']){re.escape(package_id)}\1",
        re.IGNORECASE,
    )
    return pattern.search(content) is not None
def add_package_reference(content: str, package_id: str, version: str) -> str:
    """
    Add a new PackageReference to a csproj file.

    Placement, in order of preference:
      1. directly after the opening tag of the first ItemGroup that itself
         contains a PackageReference;
      2. directly after the opening tag of the first ItemGroup of any kind;
      3. in a newly created ItemGroup just before the closing </Project>.

    Self-closing ``<ItemGroup />`` elements are never used as an insertion
    point, because text placed after them would sit outside any ItemGroup.

    Args:
        content: Full text of the project file.
        package_id: Id written to the ``Include`` attribute.
        version: Version written to the ``Version`` attribute.

    Returns:
        The updated text, or ``content`` unchanged when no insertion point
        could be found.
    """
    new_ref = f'\n <PackageReference Include="{package_id}" Version="{version}" />'

    # Opening <ItemGroup ...> tag; the look-behind rejects the self-closing form.
    opening_tag = r'<ItemGroup\b[^>]*(?<!/)>'

    # The tempered dot stops at </ItemGroup>, so the PackageReference must live
    # inside the same ItemGroup whose opening tag was captured.
    with_references = re.compile(
        rf'({opening_tag})(?:(?!</ItemGroup\s*>).)*?<PackageReference\s',
        re.IGNORECASE | re.DOTALL,
    )
    match = with_references.search(content)
    if match is None:
        # No ItemGroup with PackageReferences found, look for any ItemGroup
        match = re.search(rf'({opening_tag})', content, re.IGNORECASE)

    if match is not None:
        # Insert after the opening ItemGroup tag
        insert_pos = match.end(1)
        return content[:insert_pos] + new_ref + content[insert_pos:]

    # No usable ItemGroup at all, add one before closing </Project>
    project_close = content.rfind('</Project>')
    if project_close > 0:
        new_itemgroup = f'\n <ItemGroup>\n <PackageReference Include="{package_id}" Version="{version}" />\n </ItemGroup>\n'
        return content[:project_close] + new_itemgroup + content[project_close:]

    # Fallback - shouldn't happen for valid csproj
    return content
def _read_project_text(project_path: Path) -> tuple[str, str]:
    """Return the file's text with LF line endings plus the newline style to restore."""
    raw = project_path.read_bytes().decode("utf-8")
    # Files containing any CRLF are treated as CRLF files.
    newline = "\r\n" if "\r\n" in raw else "\n"
    return raw.replace("\r\n", "\n"), newline


def _write_project_text(project_path: Path, content: str, newline: str) -> None:
    """Write LF-normalised text back using the file's original newline style."""
    if newline != "\n":
        content = content.replace("\n", newline)
    # Bytes are written directly so the platform never translates line endings.
    project_path.write_bytes(content.encode("utf-8"))


def _replace_reference_version(
    content: str, package_id: str, old_version: str, new_version: str
) -> tuple[str, int]:
    """Swap old_version for new_version in the package's PackageReference; return (text, count)."""
    pid = re.escape(package_id)
    old = re.escape(old_version)
    quote = "[\"']"
    patterns = (
        # Include attribute before Version attribute
        rf"(<PackageReference\s+[^>]*Include\s*=\s*{quote}{pid}{quote}"
        rf"[^>]*Version\s*=\s*{quote}){old}({quote})",
        # Version attribute before Include attribute
        rf"(<PackageReference\s+[^>]*Version\s*=\s*{quote}){old}"
        rf"({quote}[^>]*Include\s*=\s*{quote}{pid}{quote})",
    )
    for pattern in patterns:
        # A function replacement inserts the version literally, so it can never
        # be misread as a regex replacement template.
        updated, count = re.subn(
            pattern,
            lambda m: f"{m.group(1)}{new_version}{m.group(2)}",
            content,
            flags=re.IGNORECASE,
        )
        if count:
            return updated, count
    return content, 0


def _apply_fix_to_project(pkg, project_path: Path, dry_run: bool) -> bool:
    """Fix one package in one project file; return True when the file counts as modified."""
    content, newline = _read_project_text(project_path)

    if has_direct_package_reference(content, pkg.package_id):
        # Direct dependency - update version in place
        if dry_run:
            logger.info(
                f"Would update {pkg.package_id} in {project_path.name}: "
                f"{pkg.resolved_version} -> {pkg.suggested_version}"
            )
            return True

        new_content, count = _replace_reference_version(
            content, pkg.package_id, pkg.resolved_version, pkg.suggested_version
        )
        if count == 0:
            logger.warning(
                f"Could not find {pkg.package_id} {pkg.resolved_version} "
                f"in {project_path}"
            )
            return False

        _write_project_text(project_path, new_content, newline)
        logger.info(
            f"Updated {pkg.package_id} in {project_path.name}: "
            f"{pkg.resolved_version} -> {pkg.suggested_version}"
        )
        return True

    # Transitive dependency - add explicit PackageReference to override
    if dry_run:
        logger.info(
            f"Would add explicit PackageReference for transitive dependency "
            f"{pkg.package_id} {pkg.suggested_version} in {project_path.name} "
            f"(overrides vulnerable {pkg.resolved_version})"
        )
        return True

    new_content = add_package_reference(
        content, pkg.package_id, pkg.suggested_version
    )
    if new_content == content:
        logger.warning(
            f"Failed to add PackageReference for {pkg.package_id} "
            f"in {project_path}"
        )
        return False

    _write_project_text(project_path, new_content, newline)
    logger.info(
        f"Added explicit PackageReference for {pkg.package_id} "
        f"{pkg.suggested_version} in {project_path.name} "
        f"(overrides vulnerable transitive {pkg.resolved_version})"
    )
    return True


def apply_fixes(
    vulnerable_packages: list[VulnerablePackage],
    dry_run: bool = False,
) -> int:
    """
    Apply suggested fixes to csproj files.

    For direct dependencies: updates the version in place.
    For transitive dependencies: adds an explicit PackageReference to override.

    The file's existing line-ending style (LF or CRLF) is preserved when it is
    rewritten, so only the touched lines change.

    Args:
        vulnerable_packages: Packages to fix; entries without a
            ``suggested_version`` are skipped.
        dry_run: When True, only log what would change and write nothing.

    Returns:
        Number of distinct files modified (or, in dry-run mode, that would be
        touched).
    """
    files_modified: set[Path] = set()

    for pkg in vulnerable_packages:
        if not pkg.suggested_version:
            continue

        # De-duplicate while keeping order: a project listed twice must not be
        # patched twice for the same package.
        for project_path in dict.fromkeys(pkg.affected_projects):
            if not project_path.exists():
                logger.warning(f"Project file not found: {project_path}")
                continue

            try:
                if _apply_fix_to_project(pkg, project_path, dry_run):
                    files_modified.add(project_path)
            except Exception as e:
                # Best effort per file: report the failure and carry on.
                logger.error(f"Failed to update {project_path}: {e}")

    return len(files_modified)
def generate_report(
    solution: Path,
    min_severity: str,
    total_packages: int,
    vulnerable_packages: list[VulnerablePackage],
) -> dict:
    """
    Build the report dictionary for a scan.

    The result holds a UTC ISO-8601 timestamp, the solution path, the severity
    threshold, summary counters, one entry per vulnerable package, and a list
    of the packages that have no suggested version.
    """
    with_fix = [pkg for pkg in vulnerable_packages if pkg.suggested_version]
    without_fix = [pkg for pkg in vulnerable_packages if not pkg.suggested_version]

    findings = []
    for pkg in vulnerable_packages:
        if pkg.suggested_version:
            suggested_fix = {
                "version": pkg.suggested_version,
                "risk": pkg.fix_risk,
            }
        else:
            suggested_fix = None
        findings.append(
            {
                "package": pkg.package_id,
                "current_version": pkg.resolved_version,
                "severity": pkg.highest_severity,
                "advisory_urls": pkg.advisory_urls,
                "affected_projects": [str(path) for path in pkg.affected_projects],
                "suggested_fix": suggested_fix,
            }
        )

    unfixable = []
    for pkg in without_fix:
        unfixable.append(
            {
                "package": pkg.package_id,
                "version": pkg.resolved_version,
                "reason": "No non-vulnerable version available",
            }
        )

    summary = {
        "total_packages_scanned": total_packages,
        "vulnerable_packages": len(vulnerable_packages),
        "fixable_packages": len(with_fix),
        "unfixable_packages": len(without_fix),
    }

    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "solution": str(solution),
        "min_severity": min_severity,
        "summary": summary,
        "vulnerabilities": findings,
        "unfixable": unfixable,
    }
def print_summary(
    vulnerable_packages: list[VulnerablePackage],
    min_severity: str,
    dry_run: bool,
    fix_mode: bool,
) -> None:
    """
    Write the scan results to stdout in human-readable form.

    Packages are listed by descending severity rank (looked up in
    SEVERITY_LEVELS, unknown severities rank 0) and then by package id. At
    most three affected projects are named per package. A trailing hint
    depends on ``dry_run`` and ``fix_mode``.
    """
    heavy_rule = "=" * 70
    print("\n" + heavy_rule)
    print("NuGet Vulnerability Scan Results")
    print(heavy_rule)

    if not vulnerable_packages:
        print(f"\nNo vulnerabilities found at or above '{min_severity}' severity.")
        return

    def severity_then_name(entry):
        # Negated rank puts the most severe packages first.
        rank = SEVERITY_LEVELS.get(entry.highest_severity.lower(), 0)
        return (-rank, entry.package_id)

    print(f"\nFound {len(vulnerable_packages)} vulnerable package(s):\n")

    for pkg in sorted(vulnerable_packages, key=severity_then_name):
        print(f" [{pkg.highest_severity.upper()}] {pkg.package_id} {pkg.resolved_version}")

        for vuln in pkg.vulnerabilities:
            print(f" Advisory: {vuln.advisory_url}")

        if not pkg.suggested_version:
            print(" No fix available")
        else:
            if pkg.fix_risk != "unknown":
                risk_str = f" (risk: {pkg.fix_risk})"
            else:
                risk_str = ""
            print(f" Suggested fix: {pkg.suggested_version}{risk_str}")

        project_count = len(pkg.affected_projects)
        print(f" Affected projects: {project_count}")
        for proj in pkg.affected_projects[:3]:  # Show first 3
            print(f" - {proj.name}")
        if project_count > 3:
            print(f" - ... and {project_count - 3} more")
        print()

    # Summary counts
    total = len(vulnerable_packages)
    fixable = len([p for p in vulnerable_packages if p.suggested_version])
    unfixable = total - fixable
    print("-" * 70)
    print(f"Summary: {total} vulnerable, {fixable} fixable, {unfixable} unfixable")

    if dry_run:
        print("\n[DRY RUN - No files were modified]")
        return
    if not fix_mode:
        print("\nRun with --fix to apply suggested fixes, or --dry-run to preview changes")
def _build_argument_parser() -> argparse.ArgumentParser:
    """Create the command-line parser; the module docstring is shown as the epilog."""
    parser = argparse.ArgumentParser(
        description="Check NuGet packages for security vulnerabilities",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    add = parser.add_argument
    add(
        "--solution",
        type=Path,
        default=Path("src/StellaOps.sln"),
        help="Path to .sln file (default: src/StellaOps.sln)",
    )
    add(
        "--min-severity",
        choices=["low", "moderate", "high", "critical"],
        default="high",
        help="Minimum severity to report (default: high)",
    )
    add("--fix", action="store_true", help="Auto-fix by updating to non-vulnerable versions")
    add("--dry-run", action="store_true", help="Show what would be fixed without modifying files")
    add("--report", type=Path, help="Write JSON report to file")
    add(
        "--include-transitive",
        action="store_true",
        help="Include transitive dependency vulnerabilities",
    )
    add(
        "--exclude",
        action="append",
        dest="exclude_packages",
        default=[],
        help="Exclude package from checks (repeatable)",
    )
    add("-v", "--verbose", action="store_true", help="Verbose output")
    return parser


def main() -> int:
    """
    Main entry point.

    Parses the command line, runs the scan, optionally writes a JSON report
    and applies (or previews) fixes.

    Returns:
        0 when no vulnerable packages were reported, 1 when at least one was,
        2 when the solution file is missing, dotnet is unavailable, or the
        vulnerability check itself failed.
    """
    args = _build_argument_parser().parse_args()
    setup_logging(args.verbose)

    # Preconditions: the solution must exist and dotnet must be installed.
    solution_path = args.solution.resolve()
    if not solution_path.exists():
        logger.error(f"Solution file not found: {solution_path}")
        return 2
    if not check_dotnet_available():
        logger.error("dotnet CLI not found. Please install .NET SDK.")
        return 2

    logger.info(f"Scanning solution: {solution_path}")
    logger.info(f"Minimum severity: {args.min_severity}")

    vuln_data = run_vulnerability_check(solution_path, args.include_transitive)
    if vuln_data is None:
        logger.error("Failed to run vulnerability check")
        return 2

    # Count the packages present in the dotnet output for the report;
    # transitive ones are included only when they were requested.
    counted_keys = ["topLevelPackages"]
    if args.include_transitive:
        counted_keys.append("transitivePackages")
    total_packages = sum(
        len(framework.get(key, []))
        for project in vuln_data.get("projects", [])
        for framework in project.get("frameworks", [])
        for key in counted_keys
    )

    vulnerable_packages = parse_vulnerability_output(
        vuln_data, args.min_severity, set(args.exclude_packages)
    )
    logger.info(f"Found {len(vulnerable_packages)} vulnerable package(s)")

    # Fix suggestions are best effort: any failure here is logged and the
    # scan result is still reported.
    try:
        find_suggested_fixes(vulnerable_packages, NuGetApiClient())
    except ImportError:
        logger.warning(
            "requests library not available, cannot suggest fixes. "
            "Install with: pip install requests"
        )
    except Exception as exc:
        logger.warning(f"NuGet API initialization failed: {exc}")

    report = generate_report(
        solution_path, args.min_severity, total_packages, vulnerable_packages
    )

    if args.report:
        try:
            serialized = json.dumps(report, indent=2, default=str)
            args.report.write_text(serialized, encoding="utf-8")
            logger.info(f"Report written to: {args.report}")
        except Exception as exc:
            logger.error(f"Failed to write report: {exc}")

    print_summary(vulnerable_packages, args.min_severity, args.dry_run, args.fix)

    if args.fix or args.dry_run:
        files_modified = apply_fixes(vulnerable_packages, dry_run=args.dry_run)
        if not args.dry_run:
            print(f"\nModified {files_modified} file(s)")

    # Any reported vulnerable package yields exit code 1.
    return 1 if vulnerable_packages else 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())