Build fixes and code structure improvements. New but essential UI functionality. CI improvements. Documentation improvements. AI module improvements.

StellaOps Bot
2025-12-26 21:54:17 +02:00
parent 335ff7da16
commit c2b9cd8d1f
3717 changed files with 264714 additions and 48202 deletions


@@ -0,0 +1,37 @@
"""
StellaOps Solution and NuGet Tools Library.
This package provides shared utilities for:
- Parsing .csproj files
- Generating .sln solution files
- Normalizing NuGet package versions
"""
from .models import CsprojProject, SolutionFolder, PackageUsage
from .version_utils import parse_version, compare_versions, is_stable, select_latest_stable
from .csproj_parser import find_all_csproj, parse_csproj, get_deterministic_guid
from .dependency_graph import build_dependency_graph, get_transitive_dependencies, classify_dependencies
from .sln_writer import generate_solution_content, build_folder_hierarchy
__all__ = [
# Models
"CsprojProject",
"SolutionFolder",
"PackageUsage",
# Version utilities
"parse_version",
"compare_versions",
"is_stable",
"select_latest_stable",
# Csproj parsing
"find_all_csproj",
"parse_csproj",
"get_deterministic_guid",
# Dependency graph
"build_dependency_graph",
"get_transitive_dependencies",
"classify_dependencies",
# Solution writer
"generate_solution_content",
"build_folder_hierarchy",
]
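
A usage sketch for these exports; the package's import name is not shown in this diff, so `sln_tools` below is a stand-in:

from pathlib import Path

import sln_tools  # hypothetical import name; only relative imports appear in the diff

src_root = Path("src")
projects = [
    p
    for f in sln_tools.find_all_csproj(src_root)
    if (p := sln_tools.parse_csproj(f, base_path=src_root)) is not None
]
graph = sln_tools.build_dependency_graph(projects)
print(f"{len(projects)} projects, {sum(len(deps) for deps in graph.values())} reference edges")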



@@ -0,0 +1,276 @@
"""
Csproj file parsing utilities.
Provides functions to:
- Find all .csproj files in a directory tree
- Parse csproj files to extract project references and package references
- Generate deterministic GUIDs for projects
"""
import hashlib
import logging
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Optional
from .models import CsprojProject
logger = logging.getLogger(__name__)
# Default patterns to exclude when scanning for csproj files
DEFAULT_EXCLUDE_DIRS = {
"bin",
"obj",
"node_modules",
".git",
".vs",
".idea",
"third_party",
"packages",
".nuget",
".cache",
"Fixtures", # Test fixture files should not be in solutions
"TestData", # Test data files should not be in solutions
}
# Default file-name patterns to exclude (test fixtures, samples, etc.).
# Patterns are matched against the full file name, so they carry the .csproj
# suffix; a bare pattern like "*.Samples" would never match a "*.csproj" file.
DEFAULT_EXCLUDE_PATTERNS = {
    "*.Tests.Fixtures.csproj",
    "*.Samples.csproj",
}
def get_deterministic_guid(path: Path, base_path: Optional[Path] = None) -> str:
"""
Generate a deterministic GUID from a path.
Uses SHA256 hash of the relative path to ensure consistency across runs.
Args:
path: Path to generate GUID for
base_path: Base path to calculate relative path from (optional)
Returns:
GUID string in uppercase format (e.g., "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX")
"""
if base_path:
try:
rel_path = path.relative_to(base_path)
except ValueError:
rel_path = path
else:
rel_path = path
# Normalize path separators and convert to lowercase for consistency
normalized = str(rel_path).replace("\\", "/").lower()
# Generate SHA256 hash
hash_bytes = hashlib.sha256(normalized.encode("utf-8")).digest()
# Format as GUID (use first 16 bytes)
guid_hex = hash_bytes[:16].hex().upper()
guid = f"{guid_hex[:8]}-{guid_hex[8:12]}-{guid_hex[12:16]}-{guid_hex[16:20]}-{guid_hex[20:32]}"
return guid
def find_all_csproj(
root_dir: Path,
exclude_dirs: Optional[set[str]] = None,
exclude_patterns: Optional[set[str]] = None,
) -> list[Path]:
"""
Find all .csproj files under a directory.
Args:
root_dir: Root directory to search
exclude_dirs: Directory names to exclude (defaults to bin, obj, etc.)
exclude_patterns: File name patterns to exclude
Returns:
List of absolute paths to .csproj files, sorted by path
"""
if exclude_dirs is None:
exclude_dirs = DEFAULT_EXCLUDE_DIRS
if exclude_patterns is None:
exclude_patterns = DEFAULT_EXCLUDE_PATTERNS
csproj_files: list[Path] = []
if not root_dir.exists():
logger.warning(f"Directory does not exist: {root_dir}")
return csproj_files
for item in root_dir.rglob("*.csproj"):
# Check if any parent directory should be excluded
skip = False
for parent in item.parents:
if parent.name in exclude_dirs:
skip = True
break
if skip:
continue
# Check file name patterns
for pattern in exclude_patterns:
if item.match(pattern):
skip = True
break
if skip:
continue
csproj_files.append(item.resolve())
return sorted(csproj_files)
def parse_csproj(
csproj_path: Path,
base_path: Optional[Path] = None,
) -> Optional[CsprojProject]:
"""
Parse a .csproj file and extract project information.
Args:
csproj_path: Path to the .csproj file
base_path: Base path for generating deterministic GUID
Returns:
CsprojProject with parsed information, or None if parsing fails
"""
if not csproj_path.exists():
logger.error(f"Csproj file does not exist: {csproj_path}")
return None
try:
tree = ET.parse(csproj_path)
root = tree.getroot()
except ET.ParseError as e:
logger.error(f"Failed to parse XML in {csproj_path}: {e}")
return None
# Extract project name from file name
name = csproj_path.stem
# Generate deterministic GUID
guid = get_deterministic_guid(csproj_path, base_path)
# Parse project references
project_references = _parse_project_references(root, csproj_path)
# Parse package references
package_references = _parse_package_references(root)
return CsprojProject(
path=csproj_path.resolve(),
name=name,
guid=guid,
project_references=project_references,
package_references=package_references,
)
def _parse_project_references(root: ET.Element, csproj_path: Path) -> list[Path]:
"""
Parse ProjectReference elements from csproj XML.
Args:
root: XML root element
csproj_path: Path to the csproj file (for resolving relative paths)
Returns:
List of resolved absolute paths to referenced projects
"""
references: list[Path] = []
csproj_dir = csproj_path.parent
# Handle both with and without namespace
for ref in root.iter():
if ref.tag.endswith("ProjectReference") or ref.tag == "ProjectReference":
include = ref.get("Include")
if include:
# Normalize path separators
include = include.replace("\\", "/")
# Resolve relative path
try:
ref_path = (csproj_dir / include).resolve()
if ref_path.exists():
references.append(ref_path)
else:
logger.warning(
f"Referenced project does not exist: {include} (from {csproj_path})"
)
except Exception as e:
logger.warning(f"Failed to resolve path {include}: {e}")
return references
def _parse_package_references(root: ET.Element) -> dict[str, str]:
"""
Parse PackageReference elements from csproj XML.
Args:
root: XML root element
Returns:
Dictionary mapping package name to version string
"""
packages: dict[str, str] = {}
for ref in root.iter():
if ref.tag.endswith("PackageReference") or ref.tag == "PackageReference":
include = ref.get("Include")
version = ref.get("Version")
if include and version:
packages[include] = version
elif include:
# Version might be in a child element
for child in ref:
if child.tag.endswith("Version") or child.tag == "Version":
if child.text:
packages[include] = child.text.strip()
break
return packages
def get_project_name_from_path(csproj_path: Path) -> str:
"""
Extract project name from csproj file path.
Args:
csproj_path: Path to csproj file
Returns:
Project name (file name without extension)
"""
return csproj_path.stem
def resolve_project_path(
include_path: str,
from_csproj: Path,
) -> Optional[Path]:
"""
Resolve a ProjectReference Include path to an absolute path.
Args:
include_path: The Include attribute value
from_csproj: The csproj file containing the reference
Returns:
Resolved absolute path, or None if resolution fails
"""
# Normalize path separators
include_path = include_path.replace("\\", "/")
try:
resolved = (from_csproj.parent / include_path).resolve()
return resolved if resolved.exists() else None
except Exception:
return None
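
For illustration, a throwaway project file parsed end to end; `csproj_parser` stands in for however the module is actually imported:

import tempfile
from pathlib import Path

import csproj_parser  # assumed import path

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "Demo.csproj"
    path.write_text(
        '<Project Sdk="Microsoft.NET.Sdk">\n'
        '  <ItemGroup>\n'
        '    <PackageReference Include="Serilog" Version="3.1.1" />\n'
        '  </ItemGroup>\n'
        '</Project>\n'
    )
    project = csproj_parser.parse_csproj(path, base_path=Path(tmp))
    print(project.name)                # Demo
    print(project.package_references)  # {'Serilog': '3.1.1'}
    # The same relative path always yields the same GUID:
    assert project.guid == csproj_parser.get_deterministic_guid(path, Path(tmp))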


@@ -0,0 +1,282 @@
"""
Project dependency graph utilities.
Provides functions to:
- Build a dependency graph from parsed projects
- Get transitive dependencies
- Classify dependencies as internal or external to a module
"""
import logging
from pathlib import Path
from typing import Optional
from .models import CsprojProject
logger = logging.getLogger(__name__)
def build_dependency_graph(
projects: list[CsprojProject],
) -> dict[Path, set[Path]]:
"""
Build a dependency graph from a list of projects.
Args:
projects: List of parsed CsprojProject objects
Returns:
Dictionary mapping project path to set of dependency paths
"""
graph: dict[Path, set[Path]] = {}
for project in projects:
graph[project.path] = set(project.project_references)
return graph
def get_transitive_dependencies(
project_path: Path,
graph: dict[Path, set[Path]],
visited: Optional[set[Path]] = None,
) -> set[Path]:
"""
Get all transitive dependencies for a project.
Handles circular dependencies gracefully by tracking visited nodes.
Args:
project_path: Path to the project
graph: Dependency graph from build_dependency_graph
visited: Set of already visited paths (for cycle detection)
Returns:
Set of all transitive dependency paths
"""
if visited is None:
visited = set()
if project_path in visited:
return set() # Cycle detected
visited.add(project_path)
all_deps: set[Path] = set()
direct_deps = graph.get(project_path, set())
all_deps.update(direct_deps)
for dep in direct_deps:
transitive = get_transitive_dependencies(dep, graph, visited.copy())
all_deps.update(transitive)
return all_deps
def classify_dependencies(
project: CsprojProject,
module_dir: Path,
src_root: Path,
) -> dict[str, list[Path]]:
"""
Classify project dependencies as internal or external.
Args:
project: The project to analyze
module_dir: Root directory of the module
src_root: Root of the src/ directory
Returns:
Dictionary with keys:
- 'internal': Dependencies within module_dir
- '__Libraries': Dependencies from src/__Libraries/
- '<ModuleName>': Dependencies from other modules
"""
result: dict[str, list[Path]] = {"internal": []}
module_dir = module_dir.resolve()
src_root = src_root.resolve()
for ref_path in project.project_references:
ref_path = ref_path.resolve()
# Check if internal to module
try:
ref_path.relative_to(module_dir)
result["internal"].append(ref_path)
continue
except ValueError:
pass
# External - classify by source module
category = _get_external_category(ref_path, src_root)
if category not in result:
result[category] = []
result[category].append(ref_path)
return result
def _get_external_category(ref_path: Path, src_root: Path) -> str:
"""
Determine the category for an external dependency.
Args:
ref_path: Path to the referenced project
src_root: Root of the src/ directory
Returns:
Category name (e.g., '__Libraries', 'Authority', 'Scanner')
"""
try:
rel_path = ref_path.relative_to(src_root)
except ValueError:
# Outside of src/ - use 'External'
return "External"
parts = rel_path.parts
if len(parts) == 0:
return "External"
# First part is the module or __Libraries/__Tests etc.
first_part = parts[0]
if first_part == "__Libraries":
return "__Libraries"
elif first_part == "__Tests":
return "__Tests"
elif first_part == "__Analyzers":
return "__Analyzers"
else:
# It's a module name
return first_part
def collect_all_external_dependencies(
projects: list[CsprojProject],
module_dir: Path,
src_root: Path,
project_map: dict[Path, CsprojProject],
) -> dict[str, list[CsprojProject]]:
"""
Collect all external dependencies for a module's projects.
Includes transitive dependencies.
Args:
projects: List of projects in the module
module_dir: Root directory of the module
src_root: Root of the src/ directory
project_map: Map from path to CsprojProject for all known projects
Returns:
Dictionary mapping category to list of external CsprojProject objects
"""
# Build dependency graph for all known projects
all_projects = list(project_map.values())
graph = build_dependency_graph(all_projects)
module_dir = module_dir.resolve()
src_root = src_root.resolve()
# Collect all external dependencies
external_deps: dict[str, set[Path]] = {}
for project in projects:
# Get all transitive dependencies
all_deps = get_transitive_dependencies(project.path, graph)
for dep_path in all_deps:
dep_path = dep_path.resolve()
# Skip if internal to module
try:
dep_path.relative_to(module_dir)
continue
except ValueError:
pass
# External - classify
category = _get_external_category(dep_path, src_root)
if category not in external_deps:
external_deps[category] = set()
external_deps[category].add(dep_path)
# Convert paths to CsprojProject objects
result: dict[str, list[CsprojProject]] = {}
for category, paths in external_deps.items():
result[category] = []
for path in sorted(paths):
if path in project_map:
result[category].append(project_map[path])
else:
logger.warning(f"External dependency not in project map: {path}")
return result
def get_module_projects(
module_dir: Path,
all_projects: list[CsprojProject],
) -> list[CsprojProject]:
"""
Get all projects that belong to a module.
Args:
module_dir: Root directory of the module
all_projects: List of all projects
Returns:
List of projects within the module directory
"""
module_dir = module_dir.resolve()
result: list[CsprojProject] = []
for project in all_projects:
try:
project.path.relative_to(module_dir)
result.append(project)
except ValueError:
pass
return result
def detect_circular_dependencies(
graph: dict[Path, set[Path]],
) -> list[list[Path]]:
"""
Detect circular dependencies in the project graph.
Args:
graph: Dependency graph
Returns:
List of cycles (each cycle is a list of paths)
"""
cycles: list[list[Path]] = []
visited: set[Path] = set()
rec_stack: set[Path] = set()
def dfs(node: Path, path: list[Path]) -> None:
visited.add(node)
rec_stack.add(node)
path.append(node)
for neighbor in graph.get(node, set()):
if neighbor not in visited:
dfs(neighbor, path.copy())
elif neighbor in rec_stack:
# Found a cycle
cycle_start = path.index(neighbor)
cycle = path[cycle_start:] + [neighbor]
cycles.append(cycle)
rec_stack.remove(node)
for node in graph:
if node not in visited:
dfs(node, [])
return cycles
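
The graph helpers take a plain dict, so they can be exercised without real project files; a sketch with a deliberate A -> B -> C -> A cycle (`dependency_graph` is an assumed import name):

from pathlib import Path

from dependency_graph import detect_circular_dependencies, get_transitive_dependencies

a, b, c = Path("A.csproj"), Path("B.csproj"), Path("C.csproj")
graph = {a: {b}, b: {c}, c: {a}}

# Traversal terminates despite the cycle and reports everything reachable,
# including A itself by way of the loop.
print(get_transitive_dependencies(a, graph))  # {A.csproj, B.csproj, C.csproj}
print(detect_circular_dependencies(graph))    # one cycle: [A, B, C, A] (up to rotation)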


@@ -0,0 +1,87 @@
"""
Data models for solution and project management.
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass
class CsprojProject:
"""Represents a .csproj project file."""
path: Path # Absolute path to .csproj file
name: str # Project name (without extension)
guid: str # Project GUID (generated deterministically from path)
project_references: list[Path] = field(default_factory=list) # Resolved absolute paths
package_references: dict[str, str] = field(default_factory=dict) # Package name -> version
def __hash__(self) -> int:
return hash(self.path)
def __eq__(self, other: object) -> bool:
if not isinstance(other, CsprojProject):
return False
return self.path == other.path
@dataclass
class SolutionFolder:
"""Represents a solution folder in a .sln file."""
name: str # Folder display name
guid: str # Folder GUID
path: str # Full path within solution (e.g., "Module/__Libraries")
parent_guid: Optional[str] = None # Parent folder GUID (None for root folders)
children: list["SolutionFolder"] = field(default_factory=list)
projects: list[CsprojProject] = field(default_factory=list)
def __hash__(self) -> int:
return hash(self.path)
def __eq__(self, other: object) -> bool:
if not isinstance(other, SolutionFolder):
return False
return self.path == other.path
@dataclass
class PackageUsage:
"""Tracks usage of a NuGet package across the codebase."""
package_name: str
usages: dict[Path, str] = field(default_factory=dict) # csproj path -> version string
def get_all_versions(self) -> list[str]:
"""Get list of unique versions used."""
return list(set(self.usages.values()))
def get_usage_count(self) -> int:
"""Get number of projects using this package."""
return len(self.usages)
@dataclass
class NormalizationChange:
"""Represents a version change for a package in a project."""
csproj_path: Path
old_version: str
new_version: str
@dataclass
class NormalizationResult:
"""Result of normalizing a package across the codebase."""
package_name: str
target_version: str
changes: list[NormalizationChange] = field(default_factory=list)
skipped_reason: Optional[str] = None
# Constants for solution file format
CSHARP_PROJECT_TYPE_GUID = "FAE04EC0-301F-11D3-BF4B-00C04F79EFBC"
SOLUTION_FOLDER_TYPE_GUID = "2150E333-8FDC-42A3-9474-1A3956D46DE8"
BYPASS_MARKER = "# STELLAOPS-MANUAL-SOLUTION"
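
A quick sketch of how PackageUsage surfaces version drift across projects (`models` is an assumed import name):

from pathlib import Path

from models import PackageUsage

usage = PackageUsage(package_name="Newtonsoft.Json")
usage.usages[Path("src/A/A.csproj")] = "13.0.1"
usage.usages[Path("src/B/B.csproj")] = "13.0.3"

print(usage.get_usage_count())           # 2
print(sorted(usage.get_all_versions()))  # ['13.0.1', '13.0.3'] -- a normalization candidate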


@@ -0,0 +1,416 @@
"""
NuGet API v3 client for package version and vulnerability queries.
"""
import logging
import re
from typing import Any
try:
import requests
except ImportError:
requests = None # type: ignore
from .version_utils import ParsedVersion, is_stable, parse_version
logger = logging.getLogger(__name__)
NUGET_SERVICE_INDEX = "https://api.nuget.org/v3/index.json"
NUGET_VULN_INDEX = "https://api.nuget.org/v3/vulnerabilities/index.json"
class NuGetApiError(Exception):
"""Error communicating with NuGet API."""
pass
class NuGetApiClient:
"""
Client for NuGet API v3 operations.
Provides methods for:
- Fetching available package versions
- Fetching vulnerability data
- Finding non-vulnerable versions
"""
def __init__(self, source: str = "https://api.nuget.org/v3"):
if requests is None:
raise ImportError(
"requests library is required for NuGet API access. "
"Install with: pip install requests"
)
self.source = source.rstrip("/")
self._session = requests.Session()
self._session.headers.update(
{"User-Agent": "StellaOps-NuGetVulnChecker/1.0"}
)
self._service_index: dict | None = None
self._vuln_cache: dict[str, list[dict]] | None = None
self._search_url: str | None = None
self._registration_url: str | None = None
def _get_service_index(self) -> dict:
"""Fetch and cache the NuGet service index."""
if self._service_index is not None:
return self._service_index
try:
response = self._session.get(f"{self.source}/index.json", timeout=30)
response.raise_for_status()
self._service_index = response.json()
return self._service_index
except Exception as e:
raise NuGetApiError(f"Failed to fetch NuGet service index: {e}")
def _get_search_url(self) -> str:
"""Get the SearchQueryService URL from service index."""
if self._search_url:
return self._search_url
index = self._get_service_index()
resources = index.get("resources", [])
# Look for SearchQueryService
for resource in resources:
resource_type = resource.get("@type", "")
if "SearchQueryService" in resource_type:
self._search_url = resource.get("@id", "")
return self._search_url
raise NuGetApiError("SearchQueryService not found in service index")
def _get_registration_url(self) -> str:
"""Get the RegistrationsBaseUrl from service index."""
if self._registration_url:
return self._registration_url
index = self._get_service_index()
resources = index.get("resources", [])
# Look for RegistrationsBaseUrl
for resource in resources:
resource_type = resource.get("@type", "")
if "RegistrationsBaseUrl" in resource_type:
self._registration_url = resource.get("@id", "").rstrip("/")
return self._registration_url
raise NuGetApiError("RegistrationsBaseUrl not found in service index")
def get_available_versions(self, package_id: str) -> list[str]:
"""
Fetch all available versions of a package from NuGet.
Args:
package_id: The NuGet package ID
Returns:
List of version strings, sorted newest first
"""
try:
# Use registration API for complete version list
reg_url = self._get_registration_url()
package_lower = package_id.lower()
url = f"{reg_url}/{package_lower}/index.json"
response = self._session.get(url, timeout=30)
if response.status_code == 404:
logger.warning(f"Package not found on NuGet: {package_id}")
return []
response.raise_for_status()
data = response.json()
versions = []
for page in data.get("items", []):
# Pages may be inline or require fetching
if "items" in page:
items = page["items"]
else:
# Fetch the page
page_url = page.get("@id")
if page_url:
page_response = self._session.get(page_url, timeout=30)
page_response.raise_for_status()
items = page_response.json().get("items", [])
else:
items = []
for item in items:
catalog_entry = item.get("catalogEntry", {})
version = catalog_entry.get("version")
if version:
versions.append(version)
            # Sort by parsed version, newest first; unparseable versions sort last
            def sort_key(v: str) -> ParsedVersion:
                # Mixing tuples and ParsedVersion objects in one sort would raise
                # TypeError, so fall back to a zero version for unparseable input.
                return parse_version(v) or ParsedVersion(0, 0, 0)
versions.sort(key=sort_key, reverse=True)
return versions
except NuGetApiError:
raise
except Exception as e:
logger.warning(f"Failed to fetch versions for {package_id}: {e}")
return []
def get_vulnerability_data(self) -> dict[str, list[dict]]:
"""
Fetch vulnerability data from NuGet VulnerabilityInfo API.
Returns:
Dictionary mapping lowercase package ID to list of vulnerability info dicts.
Each dict contains: severity, advisory_url, versions (affected range)
"""
if self._vuln_cache is not None:
return self._vuln_cache
try:
# Fetch vulnerability index
response = self._session.get(NUGET_VULN_INDEX, timeout=30)
response.raise_for_status()
index = response.json()
vuln_map: dict[str, list[dict]] = {}
# Fetch each vulnerability page
for page_info in index:
page_url = page_info.get("@id")
if not page_url:
continue
try:
page_response = self._session.get(page_url, timeout=60)
page_response.raise_for_status()
page_data = page_response.json()
# Parse vulnerability entries
self._merge_vuln_data(vuln_map, page_data)
except Exception as e:
logger.warning(f"Failed to fetch vulnerability page {page_url}: {e}")
continue
self._vuln_cache = vuln_map
logger.info(f"Loaded vulnerability data for {len(vuln_map)} packages")
return vuln_map
except Exception as e:
logger.warning(f"Failed to fetch vulnerability data: {e}")
return {}
def _merge_vuln_data(
self, vuln_map: dict[str, list[dict]], page_data: Any
) -> None:
"""Merge vulnerability page data into the vulnerability map."""
# The vulnerability data format is a dict mapping package ID (lowercase)
# to list of vulnerability objects
if not isinstance(page_data, dict):
return
for package_id, vulns in page_data.items():
if package_id.startswith("@"):
# Skip metadata fields like @context
continue
package_lower = package_id.lower()
if package_lower not in vuln_map:
vuln_map[package_lower] = []
if isinstance(vulns, list):
vuln_map[package_lower].extend(vulns)
def is_version_vulnerable(
self, package_id: str, version: str, vuln_data: dict[str, list[dict]] | None = None
) -> tuple[bool, list[dict]]:
"""
Check if a specific package version is vulnerable.
Args:
package_id: The package ID
version: The version to check
vuln_data: Optional pre-fetched vulnerability data
Returns:
Tuple of (is_vulnerable, list of matching vulnerabilities)
"""
if vuln_data is None:
vuln_data = self.get_vulnerability_data()
package_lower = package_id.lower()
vulns = vuln_data.get(package_lower, [])
if not vulns:
return False, []
matching = []
parsed_version = parse_version(version)
if parsed_version is None:
return False, []
for vuln in vulns:
# Check version range
version_range = vuln.get("versions", "")
if self._version_in_range(version, parsed_version, version_range):
matching.append(vuln)
return len(matching) > 0, matching
    def _version_in_range(
        self, version: str, parsed: ParsedVersion, range_str: str
    ) -> bool:
"""
Check if a version is in a NuGet version range.
NuGet range formats:
- "[1.0.0, 2.0.0)" - >= 1.0.0 and < 2.0.0
- "(, 1.0.0)" - < 1.0.0
- "[1.0.0,)" - >= 1.0.0
- "1.0.0" - exact match
"""
if not range_str:
return False
range_str = range_str.strip()
        # Handle exact version: compare by SemVer precedence rather than dataclass
        # equality, which would also require the original string forms to match
        if not range_str.startswith(("[", "(")):
            exact_parsed = parse_version(range_str)
            if exact_parsed is None:
                return False
            return not (exact_parsed < parsed) and not (parsed < exact_parsed)
# Parse range
match = re.match(r"([\[\(])([^,]*),([^)\]]*)([\)\]])", range_str)
if not match:
return False
left_bracket, left_ver, right_ver, right_bracket = match.groups()
left_ver = left_ver.strip()
right_ver = right_ver.strip()
# Check lower bound
if left_ver:
left_parsed = parse_version(left_ver)
if left_parsed:
if left_bracket == "[":
if parsed < left_parsed:
return False
else: # "("
if parsed <= left_parsed:
return False
# Check upper bound
if right_ver:
right_parsed = parse_version(right_ver)
if right_parsed:
if right_bracket == "]":
if parsed > right_parsed:
return False
else: # ")"
if parsed >= right_parsed:
return False
return True
def find_safe_version(
self,
package_id: str,
current_version: str,
prefer_upgrade: bool = True,
) -> str | None:
"""
Find the closest non-vulnerable version.
Strategy:
1. Get all available versions
2. Filter out versions with known vulnerabilities
3. Prefer: patch upgrade > minor upgrade > major upgrade > downgrade
Args:
package_id: The package ID
current_version: Current (vulnerable) version
prefer_upgrade: If True, prefer upgrades over downgrades
Returns:
Suggested safe version, or None if not found
"""
available = self.get_available_versions(package_id)
if not available:
return None
vuln_data = self.get_vulnerability_data()
current_parsed = parse_version(current_version)
if current_parsed is None:
return None
# Find safe versions
safe_versions: list[tuple[str, ParsedVersion]] = []
for version in available:
# Skip prereleases unless current is prerelease
if not is_stable(version) and is_stable(current_version):
continue
parsed = parse_version(version)
if parsed is None:
continue
is_vuln, _ = self.is_version_vulnerable(package_id, version, vuln_data)
if not is_vuln:
safe_versions.append((version, parsed))
if not safe_versions:
return None
# Sort by preference: closest upgrade first
def sort_key(item: tuple[str, ParsedVersion]) -> tuple:
version, parsed = item
major_diff = parsed.major - current_parsed.major
minor_diff = parsed.minor - current_parsed.minor
patch_diff = parsed.patch - current_parsed.patch
# Prefer upgrades (positive diff) over downgrades
# Within upgrades, prefer smaller changes
if prefer_upgrade:
if major_diff > 0 or (major_diff == 0 and minor_diff > 0) or \
(major_diff == 0 and minor_diff == 0 and patch_diff > 0):
# Upgrade: prefer smaller version jumps
return (0, major_diff, minor_diff, patch_diff)
elif major_diff == 0 and minor_diff == 0 and patch_diff == 0:
# Same version (shouldn't happen if vulnerable)
return (1, 0, 0, 0)
else:
# Downgrade: prefer smaller version drops
return (2, -major_diff, -minor_diff, -patch_diff)
else:
# Just prefer closest version
return (abs(major_diff), abs(minor_diff), abs(patch_diff))
safe_versions.sort(key=sort_key)
return safe_versions[0][0] if safe_versions else None
def get_fix_risk(
self, current_version: str, suggested_version: str
) -> str:
"""
Estimate the risk of upgrading to a suggested version.
Returns: "low", "medium", or "high"
"""
current = parse_version(current_version)
suggested = parse_version(suggested_version)
if current is None or suggested is None:
return "unknown"
if suggested.major > current.major:
return "high" # Major version change
elif suggested.minor > current.minor:
return "medium" # Minor version change
else:
return "low" # Patch or no change


@@ -0,0 +1,381 @@
"""
Solution file (.sln) generation utilities.
Provides functions to:
- Build solution folder hierarchy from projects
- Generate complete .sln file content
"""
import logging
from pathlib import Path
from typing import Optional
from .csproj_parser import get_deterministic_guid
from .models import (
BYPASS_MARKER,
CSHARP_PROJECT_TYPE_GUID,
SOLUTION_FOLDER_TYPE_GUID,
CsprojProject,
SolutionFolder,
)
logger = logging.getLogger(__name__)
# Solution file header
SOLUTION_HEADER = """\
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
"""
def build_folder_hierarchy(
projects: list[CsprojProject],
base_dir: Path,
prefix: str = "",
) -> dict[str, SolutionFolder]:
"""
Build solution folder hierarchy from project paths.
Creates nested folders matching the physical directory structure.
Args:
projects: List of projects to organize
base_dir: Base directory for calculating relative paths
prefix: Optional prefix for folder paths (e.g., "__External")
Returns:
Dictionary mapping folder path to SolutionFolder object
"""
folders: dict[str, SolutionFolder] = {}
base_dir = base_dir.resolve()
for project in projects:
try:
rel_path = project.path.parent.relative_to(base_dir)
except ValueError:
# Project outside base_dir - skip folder creation
continue
parts = list(rel_path.parts)
if not parts:
continue
# Add prefix if specified
if prefix:
parts = [prefix] + list(parts)
# Create folders for each level
current_path = ""
parent_guid: Optional[str] = None
for part in parts:
if current_path:
current_path = f"{current_path}/{part}"
else:
current_path = part
if current_path not in folders:
folder_guid = get_deterministic_guid(
Path(current_path), Path("")
)
folders[current_path] = SolutionFolder(
name=part,
guid=folder_guid,
path=current_path,
parent_guid=parent_guid,
)
parent_guid = folders[current_path].guid
# Assign project to its folder
if current_path in folders:
folders[current_path].projects.append(project)
return folders
def build_external_folder_hierarchy(
external_groups: dict[str, list[CsprojProject]],
src_root: Path,
) -> dict[str, SolutionFolder]:
"""
Build folder hierarchy for external dependencies.
Organizes external projects under __External/<Source>/<Path>.
Args:
external_groups: Dictionary mapping source category to projects
src_root: Root of the src/ directory
Returns:
Dictionary mapping folder path to SolutionFolder object
"""
folders: dict[str, SolutionFolder] = {}
src_root = src_root.resolve()
# Create __External root folder
external_root_path = "__External"
external_root_guid = get_deterministic_guid(Path(external_root_path), Path(""))
folders[external_root_path] = SolutionFolder(
name="__External",
guid=external_root_guid,
path=external_root_path,
parent_guid=None,
)
for category, projects in sorted(external_groups.items()):
if not projects:
continue
# Create category folder (e.g., __External/__Libraries, __External/Authority)
category_path = f"{external_root_path}/{category}"
category_guid = get_deterministic_guid(Path(category_path), Path(""))
if category_path not in folders:
folders[category_path] = SolutionFolder(
name=category,
guid=category_guid,
path=category_path,
parent_guid=external_root_guid,
)
# For each project, create intermediate folders based on path within source
for project in projects:
try:
if category == "__Libraries":
# Path relative to src/__Libraries/
lib_root = src_root / "__Libraries"
rel_path = project.path.parent.relative_to(lib_root)
else:
# Path relative to src/<Module>/
module_root = src_root / category
rel_path = project.path.parent.relative_to(module_root)
except ValueError:
# Just put directly in category folder
folders[category_path].projects.append(project)
continue
parts = list(rel_path.parts)
if not parts:
folders[category_path].projects.append(project)
continue
# Create intermediate folders
current_path = category_path
parent_guid = category_guid
for part in parts:
current_path = f"{current_path}/{part}"
if current_path not in folders:
folder_guid = get_deterministic_guid(Path(current_path), Path(""))
folders[current_path] = SolutionFolder(
name=part,
guid=folder_guid,
path=current_path,
parent_guid=parent_guid,
)
parent_guid = folders[current_path].guid
# Assign project to deepest folder
folders[current_path].projects.append(project)
return folders
def generate_solution_content(
sln_path: Path,
projects: list[CsprojProject],
folders: dict[str, SolutionFolder],
external_folders: Optional[dict[str, SolutionFolder]] = None,
add_bypass_marker: bool = False,
) -> str:
"""
Generate complete .sln file content.
Args:
sln_path: Path where the solution will be written (for relative paths)
projects: List of internal projects
folders: Internal folder hierarchy
external_folders: External dependency folders (optional)
add_bypass_marker: Whether to add the bypass marker comment
Returns:
Complete .sln file content as string
"""
lines: list[str] = []
sln_dir = sln_path.parent.resolve()
# Add header
if add_bypass_marker:
lines.append(BYPASS_MARKER)
lines.append("")
lines.append(SOLUTION_HEADER.rstrip())
# Merge folders
all_folders = dict(folders)
if external_folders:
all_folders.update(external_folders)
# Collect all projects (internal + external from folders)
all_projects: list[CsprojProject] = list(projects)
project_to_folder: dict[Path, str] = {}
for folder_path, folder in all_folders.items():
for proj in folder.projects:
if proj not in all_projects:
all_projects.append(proj)
project_to_folder[proj.path] = folder_path
# Write solution folder entries
for folder_path in sorted(all_folders.keys()):
folder = all_folders[folder_path]
lines.append(
f'Project("{{{SOLUTION_FOLDER_TYPE_GUID}}}") = "{folder.name}", "{folder.name}", "{{{folder.guid}}}"'
)
lines.append("EndProject")
# Write project entries
for project in sorted(all_projects, key=lambda p: p.name):
rel_path = _get_relative_path(sln_dir, project.path)
lines.append(
f'Project("{{{CSHARP_PROJECT_TYPE_GUID}}}") = "{project.name}", "{rel_path}", "{{{project.guid}}}"'
)
lines.append("EndProject")
# Write Global section
lines.append("Global")
# SolutionConfigurationPlatforms
lines.append("\tGlobalSection(SolutionConfigurationPlatforms) = preSolution")
lines.append("\t\tDebug|Any CPU = Debug|Any CPU")
lines.append("\t\tRelease|Any CPU = Release|Any CPU")
lines.append("\tEndGlobalSection")
# ProjectConfigurationPlatforms
lines.append("\tGlobalSection(ProjectConfigurationPlatforms) = postSolution")
for project in sorted(all_projects, key=lambda p: p.name):
guid = project.guid
lines.append(f"\t\t{{{guid}}}.Debug|Any CPU.ActiveCfg = Debug|Any CPU")
lines.append(f"\t\t{{{guid}}}.Debug|Any CPU.Build.0 = Debug|Any CPU")
lines.append(f"\t\t{{{guid}}}.Release|Any CPU.ActiveCfg = Release|Any CPU")
lines.append(f"\t\t{{{guid}}}.Release|Any CPU.Build.0 = Release|Any CPU")
lines.append("\tEndGlobalSection")
# SolutionProperties
lines.append("\tGlobalSection(SolutionProperties) = preSolution")
lines.append("\t\tHideSolutionNode = FALSE")
lines.append("\tEndGlobalSection")
# NestedProjects - assign folders and projects to parent folders
lines.append("\tGlobalSection(NestedProjects) = preSolution")
# Nest folders under their parents
for folder_path in sorted(all_folders.keys()):
folder = all_folders[folder_path]
if folder.parent_guid:
lines.append(f"\t\t{{{folder.guid}}} = {{{folder.parent_guid}}}")
# Nest projects under their folders
for project in sorted(all_projects, key=lambda p: p.name):
if project.path in project_to_folder:
folder_path = project_to_folder[project.path]
folder = all_folders[folder_path]
lines.append(f"\t\t{{{project.guid}}} = {{{folder.guid}}}")
lines.append("\tEndGlobalSection")
# ExtensibilityGlobals (required by VS)
lines.append("\tGlobalSection(ExtensibilityGlobals) = postSolution")
# Generate a solution GUID
sln_guid = get_deterministic_guid(sln_path, sln_path.parent.parent)
lines.append(f"\t\tSolutionGuid = {{{sln_guid}}}")
lines.append("\tEndGlobalSection")
lines.append("EndGlobal")
lines.append("") # Trailing newline
return "\r\n".join(lines)
def _get_relative_path(from_dir: Path, to_path: Path) -> str:
"""
Get relative path from directory to file, using backslashes.
Args:
from_dir: Directory to calculate from
to_path: Target path
Returns:
Relative path with backslashes (Windows format for .sln)
"""
try:
rel = to_path.relative_to(from_dir)
return str(rel).replace("/", "\\")
except ValueError:
# Different drive or not relative - use absolute with backslashes
return str(to_path).replace("/", "\\")
def has_bypass_marker(sln_path: Path) -> bool:
"""
Check if a solution file has the bypass marker.
Args:
sln_path: Path to the solution file
Returns:
True if the bypass marker is found in the first 10 lines
"""
if not sln_path.exists():
return False
try:
with open(sln_path, "r", encoding="utf-8-sig") as f:
for i, line in enumerate(f):
if i >= 10:
break
if BYPASS_MARKER in line:
return True
except Exception as e:
logger.warning(f"Failed to read solution file {sln_path}: {e}")
return False
def write_solution_file(
sln_path: Path,
content: str,
dry_run: bool = False,
) -> bool:
"""
Write solution content to file.
Args:
sln_path: Path to write to
content: Solution file content
dry_run: If True, don't actually write
Returns:
True if successful (or would be successful in dry run)
"""
if dry_run:
logger.info(f"Would write solution to: {sln_path}")
return True
try:
# Ensure parent directory exists
sln_path.parent.mkdir(parents=True, exist_ok=True)
        # Content already uses CRLF line endings; newline="" disables newline
        # translation (newline="\r\n" would rewrite each "\r\n" as "\r\r\n")
        with open(sln_path, "w", encoding="utf-8-sig", newline="") as f:
f.write(content)
logger.info(f"Wrote solution to: {sln_path}")
return True
except Exception as e:
logger.error(f"Failed to write solution {sln_path}: {e}")
return False
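
End to end, regeneration might look like the following; the module import paths and the `src/Scanner` example are placeholders:

from pathlib import Path

from csproj_parser import find_all_csproj, parse_csproj
from sln_writer import (
    build_folder_hierarchy,
    generate_solution_content,
    has_bypass_marker,
    write_solution_file,
)

module_dir = Path("src/Scanner")
sln_path = module_dir / "StellaOps.Scanner.sln"

if not has_bypass_marker(sln_path):  # respect hand-maintained solutions
    projects = [
        p
        for f in find_all_csproj(module_dir)
        if (p := parse_csproj(f, base_path=module_dir)) is not None
    ]
    folders = build_folder_hierarchy(projects, module_dir)
    content = generate_solution_content(sln_path, projects, folders)
    write_solution_file(sln_path, content, dry_run=True)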


@@ -0,0 +1,237 @@
"""
Version parsing and comparison utilities for NuGet packages.
Handles SemVer versions with prerelease suffixes.
"""
import re
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class ParsedVersion:
"""Parsed semantic version."""
major: int
minor: int
patch: int
prerelease: Optional[str] = None
build_metadata: Optional[str] = None
original: str = ""
def is_stable(self) -> bool:
"""Returns True if this is a stable (non-prerelease) version."""
return self.prerelease is None
def __lt__(self, other: "ParsedVersion") -> bool:
"""Compare versions following SemVer rules."""
# Compare major.minor.patch first
self_tuple = (self.major, self.minor, self.patch)
other_tuple = (other.major, other.minor, other.patch)
if self_tuple != other_tuple:
return self_tuple < other_tuple
# If equal, prerelease versions are less than stable
if self.prerelease is None and other.prerelease is None:
return False
if self.prerelease is None:
return False # stable > prerelease
if other.prerelease is None:
return True # prerelease < stable
# Both have prerelease - compare alphanumerically
return self._compare_prerelease(self.prerelease, other.prerelease) < 0
    def __le__(self, other: "ParsedVersion") -> bool:
        # Derived from __lt__ so that versions equal in precedence (e.g. "1.0"
        # and "1.0.0") compare as expected; the dataclass __eq__ also checks the
        # original string and build metadata, which SemVer ordering ignores.
        return not other < self

    def __gt__(self, other: "ParsedVersion") -> bool:
        return other < self

    def __ge__(self, other: "ParsedVersion") -> bool:
        return not self < other
@staticmethod
def _compare_prerelease(a: str, b: str) -> int:
"""Compare prerelease strings according to SemVer."""
a_parts = a.split(".")
b_parts = b.split(".")
for i in range(max(len(a_parts), len(b_parts))):
if i >= len(a_parts):
return -1
if i >= len(b_parts):
return 1
a_part = a_parts[i]
b_part = b_parts[i]
# Try numeric comparison first
a_is_num = a_part.isdigit()
b_is_num = b_part.isdigit()
if a_is_num and b_is_num:
diff = int(a_part) - int(b_part)
if diff != 0:
return diff
elif a_is_num:
return -1 # Numeric < string
elif b_is_num:
return 1 # String > numeric
else:
# Both strings - compare lexically
if a_part < b_part:
return -1
if a_part > b_part:
return 1
return 0
# Regex for parsing NuGet versions
# Matches: 1.2.3, 1.2.3-beta, 1.2.3-beta.1, 1.2.3-rc.1+build, [1.2.3]
VERSION_PATTERN = re.compile(
r"^\[?" # Optional opening bracket
r"(\d+)" # Major (required)
r"(?:\.(\d+))?" # Minor (optional)
r"(?:\.(\d+))?" # Patch (optional)
r"(?:-([a-zA-Z0-9][a-zA-Z0-9.-]*))?" # Prerelease (optional)
r"(?:\+([a-zA-Z0-9][a-zA-Z0-9.-]*))?" # Build metadata (optional)
r"\]?$" # Optional closing bracket
)
# Pattern for wildcard versions (e.g., 1.0.*)
WILDCARD_PATTERN = re.compile(r"\*")
def parse_version(version_str: str) -> Optional[ParsedVersion]:
"""
Parse a NuGet version string.
Args:
version_str: Version string like "1.2.3", "1.2.3-beta.1", "[1.2.3]"
Returns:
ParsedVersion if valid, None if invalid or wildcard
"""
if not version_str:
return None
version_str = version_str.strip()
# Skip wildcard versions
if WILDCARD_PATTERN.search(version_str):
return None
match = VERSION_PATTERN.match(version_str)
if not match:
return None
major = int(match.group(1))
minor = int(match.group(2)) if match.group(2) else 0
patch = int(match.group(3)) if match.group(3) else 0
prerelease = match.group(4)
build_metadata = match.group(5)
return ParsedVersion(
major=major,
minor=minor,
patch=patch,
prerelease=prerelease,
build_metadata=build_metadata,
original=version_str,
)
def is_stable(version_str: str) -> bool:
"""
Check if a version string represents a stable release.
Args:
version_str: Version string to check
Returns:
True if stable (no prerelease suffix), False otherwise
"""
parsed = parse_version(version_str)
if parsed is None:
return False
return parsed.is_stable()
def compare_versions(v1: str, v2: str) -> int:
"""
Compare two version strings.
Args:
v1: First version string
v2: Second version string
Returns:
-1 if v1 < v2, 0 if equal, 1 if v1 > v2
Returns 0 if either version is unparseable
"""
parsed_v1 = parse_version(v1)
parsed_v2 = parse_version(v2)
if parsed_v1 is None or parsed_v2 is None:
return 0
if parsed_v1 < parsed_v2:
return -1
if parsed_v1 > parsed_v2:
return 1
return 0
def select_latest_stable(versions: list[str]) -> Optional[str]:
"""
Select the latest stable version from a list.
Args:
versions: List of version strings
Returns:
Latest stable version string, or None if no stable versions exist
"""
stable_versions: list[tuple[ParsedVersion, str]] = []
for v in versions:
parsed = parse_version(v)
if parsed is not None and parsed.is_stable():
stable_versions.append((parsed, v))
if not stable_versions:
return None
# Sort by parsed version and return the original string of the max
stable_versions.sort(key=lambda x: x[0], reverse=True)
return stable_versions[0][1]
def normalize_version_string(version_str: str) -> str:
"""
Normalize a version string to a canonical form.
Strips brackets, whitespace, and normalizes format.
Args:
version_str: Version string to normalize
Returns:
Normalized version string
"""
parsed = parse_version(version_str)
if parsed is None:
return version_str.strip()
# Rebuild canonical form
result = f"{parsed.major}.{parsed.minor}.{parsed.patch}"
if parsed.prerelease:
result += f"-{parsed.prerelease}"
if parsed.build_metadata:
result += f"+{parsed.build_metadata}"
return result
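
A few worked comparisons (`version_utils` is an assumed import name):

from version_utils import compare_versions, normalize_version_string, select_latest_stable

# SemVer precedence: numeric prerelease identifiers compare numerically,
# and any prerelease sorts below the corresponding stable release.
print(compare_versions("1.2.3-beta.2", "1.2.3-beta.10"))  # -1 (2 < 10 numerically)
print(compare_versions("1.2.3-rc.1", "1.2.3"))            # -1 (prerelease < stable)
print(compare_versions("2.0", "2.0.0"))                   # 0  (missing parts default to 0)

print(normalize_version_string("[1.2]"))                  # 1.2.0 (brackets stripped)
print(select_latest_stable(["1.0.0", "2.0.0-beta.1", "1.5.2"]))  # 1.5.2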


@@ -0,0 +1,123 @@
"""
Data models for NuGet vulnerability checking.
"""
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class VulnerabilityDetail:
"""Details about a specific vulnerability."""
severity: str # low, moderate, high, critical
advisory_url: str
@dataclass
class VulnerablePackage:
"""A package with known vulnerabilities."""
package_id: str
resolved_version: str
requested_version: str
vulnerabilities: list[VulnerabilityDetail] = field(default_factory=list)
affected_projects: list[Path] = field(default_factory=list)
suggested_version: str | None = None
fix_risk: str = "unknown" # low, medium, high
@property
def highest_severity(self) -> str:
"""Get the highest severity among all vulnerabilities."""
severity_order = {"low": 1, "moderate": 2, "high": 3, "critical": 4}
if not self.vulnerabilities:
return "unknown"
return max(
self.vulnerabilities,
key=lambda v: severity_order.get(v.severity.lower(), 0),
).severity
@property
def advisory_urls(self) -> list[str]:
"""Get all advisory URLs."""
return [v.advisory_url for v in self.vulnerabilities]
@dataclass
class SuggestedFix:
"""Suggested fix for a vulnerable package."""
version: str
is_major_upgrade: bool
is_minor_upgrade: bool
is_patch_upgrade: bool
breaking_change_risk: str # low, medium, high
@classmethod
def from_versions(
cls, current: str, suggested: str, current_parsed: tuple, suggested_parsed: tuple
) -> "SuggestedFix":
"""Create a SuggestedFix from version tuples."""
is_major = suggested_parsed[0] > current_parsed[0]
is_minor = not is_major and suggested_parsed[1] > current_parsed[1]
is_patch = not is_major and not is_minor and suggested_parsed[2] > current_parsed[2]
# Estimate breaking change risk
if is_major:
risk = "high"
elif is_minor:
risk = "medium"
else:
risk = "low"
return cls(
version=suggested,
is_major_upgrade=is_major,
is_minor_upgrade=is_minor,
is_patch_upgrade=is_patch,
breaking_change_risk=risk,
)
@dataclass
class VulnerabilityReport:
"""Complete vulnerability scan report."""
solution: Path
min_severity: str
total_packages: int
vulnerabilities: list[VulnerablePackage] = field(default_factory=list)
unfixable: list[tuple[str, str]] = field(default_factory=list) # (package, reason)
@property
def vulnerable_count(self) -> int:
"""Count of vulnerable packages."""
return len(self.vulnerabilities)
@property
def fixable_count(self) -> int:
"""Count of packages with suggested fixes."""
return sum(1 for v in self.vulnerabilities if v.suggested_version)
@property
def unfixable_count(self) -> int:
"""Count of packages without fixes."""
return len(self.unfixable) + sum(
1 for v in self.vulnerabilities if not v.suggested_version
)
# Severity level mapping for comparisons
SEVERITY_LEVELS = {
"low": 1,
"moderate": 2,
"high": 3,
"critical": 4,
}
def meets_severity_threshold(vuln_severity: str, min_severity: str) -> bool:
"""Check if vulnerability meets minimum severity threshold."""
vuln_level = SEVERITY_LEVELS.get(vuln_severity.lower(), 0)
min_level = SEVERITY_LEVELS.get(min_severity.lower(), 0)
return vuln_level >= min_level
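
A small sketch of the report models and the severity threshold (`vuln_models` is an assumed module name):

from vuln_models import (  # assumed import path
    VulnerabilityDetail,
    VulnerablePackage,
    meets_severity_threshold,
)

pkg = VulnerablePackage(
    package_id="Example.Package",
    resolved_version="1.0.0",
    requested_version="1.0.0",
    vulnerabilities=[
        VulnerabilityDetail(severity="moderate", advisory_url="https://example.test/a"),
        VulnerabilityDetail(severity="critical", advisory_url="https://example.test/b"),
    ],
)

print(pkg.highest_severity)                                    # critical
print(meets_severity_threshold(pkg.highest_severity, "high"))  # True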