Some checks failed
		
		
	
	Docs CI / lint-and-preview (push) Has been cancelled
				
			- Implemented PolicyDslValidator with command-line options for strict mode and JSON output. - Created PolicySchemaExporter to generate JSON schemas for policy-related models. - Developed PolicySimulationSmoke tool to validate policy simulations against expected outcomes. - Added project files and necessary dependencies for each tool. - Ensured proper error handling and usage instructions across tools.
		
			
				
	
	
		
			131 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			131 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| Ensure deployment bundles reference the images defined in a release manifest.
 | |
| 
 | |
| Usage:
 | |
|   ./deploy/tools/check-channel-alignment.py \
 | |
|       --release deploy/releases/2025.10-edge.yaml \
 | |
|       --target deploy/helm/stellaops/values-dev.yaml \
 | |
|       --target deploy/compose/docker-compose.dev.yaml
 | |
| 
 | |
| For every target file, the script scans `image:` declarations and verifies that
 | |
| any image belonging to a repository listed in the release manifest matches the
 | |
| exact digest or tag recorded there. Images outside of the manifest (for example,
 | |
| supporting services such as `nats`) are ignored.
 | |
| """
 | |
| 
 | |
| from __future__ import annotations
 | |
| 
 | |
| import argparse
 | |
| import pathlib
 | |
| import re
 | |
| import sys
 | |
| from typing import Dict, Iterable, List, Optional, Set
 | |
| 
 | |
| IMAGE_LINE = re.compile(r"^\s*image:\s*['\"]?(?P<image>\S+)['\"]?\s*$")
 | |
| 
 | |
| 
 | |
| def extract_images(path: pathlib.Path) -> List[str]:
 | |
|     images: List[str] = []
 | |
|     for line in path.read_text(encoding="utf-8").splitlines():
 | |
|         match = IMAGE_LINE.match(line)
 | |
|         if match:
 | |
|             images.append(match.group("image"))
 | |
|     return images
 | |
| 
 | |
| 
 | |
| def image_repo(image: str) -> str:
 | |
|     if "@" in image:
 | |
|         return image.split("@", 1)[0]
 | |
|     # Split on the last colon to preserve registries with ports (e.g. localhost:5000)
 | |
|     if ":" in image:
 | |
|         prefix, tag = image.rsplit(":", 1)
 | |
|         if "/" in tag:
 | |
|             # handle digestive colon inside path (unlikely)
 | |
|             return image
 | |
|         return prefix
 | |
|     return image
 | |
| 
 | |
| 
 | |
| def load_release_map(release_path: pathlib.Path) -> Dict[str, str]:
 | |
|     release_map: Dict[str, str] = {}
 | |
|     for image in extract_images(release_path):
 | |
|         repo = image_repo(image)
 | |
|         release_map[repo] = image
 | |
|     return release_map
 | |
| 
 | |
| 
 | |
| def check_target(
 | |
|     target_path: pathlib.Path,
 | |
|     release_map: Dict[str, str],
 | |
|     ignore_repos: Set[str],
 | |
| ) -> List[str]:
 | |
|     errors: List[str] = []
 | |
|     for image in extract_images(target_path):
 | |
|         repo = image_repo(image)
 | |
|         if repo in ignore_repos:
 | |
|             continue
 | |
|         if repo not in release_map:
 | |
|             continue
 | |
|         expected = release_map[repo]
 | |
|         if image != expected:
 | |
|             errors.append(
 | |
|                 f"{target_path}: {image} does not match release value {expected}"
 | |
|             )
 | |
|     return errors
 | |
| 
 | |
| 
 | |
| def parse_args(argv: Optional[Iterable[str]] = None) -> argparse.Namespace:
 | |
|     parser = argparse.ArgumentParser(description=__doc__)
 | |
|     parser.add_argument(
 | |
|         "--release",
 | |
|         required=True,
 | |
|         type=pathlib.Path,
 | |
|         help="Path to the release manifest (YAML)",
 | |
|     )
 | |
|     parser.add_argument(
 | |
|         "--target",
 | |
|         action="append",
 | |
|         required=True,
 | |
|         type=pathlib.Path,
 | |
|         help="Deployment profile to validate against the release manifest",
 | |
|     )
 | |
|     parser.add_argument(
 | |
|         "--ignore-repo",
 | |
|         action="append",
 | |
|         default=[],
 | |
|         help="Repository prefix to ignore (may be repeated)",
 | |
|     )
 | |
|     return parser.parse_args(argv)
 | |
| 
 | |
| 
 | |
| def main(argv: Optional[Iterable[str]] = None) -> int:
 | |
|     args = parse_args(argv)
 | |
| 
 | |
|     release_map = load_release_map(args.release)
 | |
|     ignore_repos = {repo.rstrip("/") for repo in args.ignore_repo}
 | |
| 
 | |
|     if not release_map:
 | |
|         print(f"error: no images found in release manifest {args.release}", file=sys.stderr)
 | |
|         return 2
 | |
| 
 | |
|     total_errors: List[str] = []
 | |
|     for target in args.target:
 | |
|         if not target.exists():
 | |
|             total_errors.append(f"{target}: file not found")
 | |
|             continue
 | |
|         total_errors.extend(check_target(target, release_map, ignore_repos))
 | |
| 
 | |
|     if total_errors:
 | |
|         print("✖ channel alignment check failed:", file=sys.stderr)
 | |
|         for err in total_errors:
 | |
|             print(f"  - {err}", file=sys.stderr)
 | |
|         return 1
 | |
| 
 | |
|     print("✓ deployment profiles reference release images for the inspected repositories.")
 | |
|     return 0
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     raise SystemExit(main())
 |