Compliance & Auditing L2 · PRACTICAL ~90 min

Automated Vulnerability Management Pipeline

Build a pipeline that enumerates all running container images in a cluster, scans each with Trivy, deduplicates by image digest, produces a severity-prioritised report with SLA breach dates, and runs daily as a Kubernetes CronJob.

Objective

A reactive "scan on push" approach misses images already running in production. This exercise builds a cluster-aware scanner that discovers what is actually running, deduplicates by digest to avoid scanning the same image twice, enriches findings with SLA breach deadlines per severity, and outputs a report suitable for ticket creation. The script runs daily via CronJob and exits non-zero when SLA-breaching CVEs are present.

Prerequisites

A running Kubernetes cluster reachable via kubectl, the Trivy CLI on your PATH, and Python 3 with the kubernetes client library installed.

Steps

01

Enumerate all running images via Kubernetes API

# Quick kubectl enumeration
kubectl get pods -A \
  -o jsonpath='{range .items[*]}{.spec.containers[*].image}{"\n"}{end}' \
  | sort -u

## nginx:1.25
## prom/prometheus:v2.48.0
## registry.k8s.io/coredns/coredns:v1.11.1
## ...

# Include init containers
kubectl get pods -A \
  -o jsonpath='{range .items[*]}{.spec.initContainers[*].image}{"\n"}{end}' \
  | sort -u | grep -v '^$'
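The spec-level image field only shows tags, and several tags can alias the same underlying image. The imageID reported in container status carries the digest the runtime actually resolved, which is what the pipeline deduplicates on. A quick way to see those digest-pinned references (a sketch; the docker-pullable:// prefix varies by container runtime):

```shell
# Digest-pinned references, as resolved by the node's runtime
kubectl get pods -A -o json \
  | jq -r '.items[].status.containerStatuses[]?.imageID' \
  | sed 's|^docker-pullable://||' \
  | sort -u
```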
02

Write the full scanning pipeline

vuln_scan.py
import json, subprocess, sys, csv
from datetime import date, timedelta
from collections import defaultdict
from kubernetes import client, config

# SLA: days to remediate by severity
SLA_DAYS = {
    "CRITICAL": 7,
    "HIGH":     30,
    "MEDIUM":   90,
    "LOW":      180,
}

def get_running_images():
    """Return one scannable reference per unique image digest."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    v1 = client.CoreV1Api()
    pods = v1.list_pod_for_all_namespaces(watch=False)

    # Key on the digest the runtime resolved (status.image_id), so the same
    # image running under several tags is only scanned once.
    by_digest = {}
    for pod in pods.items:
        statuses = (pod.status.container_statuses or []) + \
                   (pod.status.init_container_statuses or [])
        for status in statuses:
            # image_id typically looks like "docker-pullable://repo@sha256:..."
            digest = (status.image_id or status.image or "").split("://")[-1]
            if digest:
                by_digest.setdefault(digest, status.image)
    return set(by_digest.values())

def scan_image(image: str) -> list:
    result = subprocess.run(
        ["trivy", "image", "--format", "json",
         "--severity", "CRITICAL,HIGH,MEDIUM",
         "--quiet", image],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"trivy failed for {image}: {result.stderr.strip()}", file=sys.stderr)
        return []
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        return []

    vulns = []
    for result_entry in data.get("Results", []):
        for v in result_entry.get("Vulnerabilities", []) or []:
            severity = v.get("Severity", "UNKNOWN")
            sla_days = SLA_DAYS.get(severity, 365)
            deadline = date.today() + timedelta(days=sla_days)
            vulns.append({
                "image":       image,
                "cve_id":      v.get("VulnerabilityID"),
                "severity":    severity,
                "pkg_name":    v.get("PkgName"),
                "installed":   v.get("InstalledVersion"),
                "fixed_in":    v.get("FixedVersion") or "no fix",
                "sla_deadline": str(deadline),
                "sla_breached": False,   # set True for pre-existing findings
            })
    return vulns

def write_csv(findings: list, path: str):
    if not findings:
        return
    fieldnames = list(findings[0].keys())
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(findings)
    print(f"Report written to {path}")

def print_summary(findings: list):
    by_severity = defaultdict(int)
    for v in findings:
        by_severity[v["severity"]] += 1
    print(f"\nVulnerability Summary ({len(findings)} total)")
    for sev in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
        count = by_severity.get(sev, 0)
        bar = "█" * min(count // 5, 30)
        print(f"  {sev:<10} {count:>5}  {bar}")

if __name__ == "__main__":
    images = get_running_images()
    print(f"Scanning {len(images)} unique images...")

    all_findings = []
    for image in sorted(images):
        print(f"  → {image}", end="", flush=True)
        vulns = scan_image(image)
        print(f" ({len(vulns)} findings)")
        all_findings.extend(vulns)

    # Sort by severity then deadline
    severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    all_findings.sort(key=lambda x: (severity_order.get(x["severity"], 9), x["sla_deadline"]))

    print_summary(all_findings)
    write_csv(all_findings, "vulnerability-report.csv")

    # On a fresh run every deadline is computed from today, so use "any
    # CRITICAL present" as the failure signal for the CronJob.
    critical_count = sum(1 for v in all_findings if v["severity"] == "CRITICAL")
    sys.exit(1 if critical_count > 0 else 0)
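Because each run computes deadlines from today, sla_breached never fires within a single scan. One way to make it meaningful is to carry deadlines forward from the previous report; a minimal sketch (mark_breaches and the previous-report path are hypothetical additions, keyed on the cve_id/image columns that write_csv already emits):

```python
import csv
from datetime import date

def mark_breaches(findings: list, previous_report: str) -> list:
    """Reuse deadlines recorded in an earlier report and flag any that
    have passed. ISO dates compare correctly as plain strings."""
    try:
        with open(previous_report) as f:
            seen = {(r["cve_id"], r["image"]): r["sla_deadline"]
                    for r in csv.DictReader(f)}
    except FileNotFoundError:
        seen = {}  # first run: nothing to carry forward

    today = str(date.today())
    for v in findings:
        prior = seen.get((v["cve_id"], v["image"]))
        if prior:
            v["sla_deadline"] = prior          # keep the original clock
            v["sla_breached"] = prior < today  # already past due?
    return findings
```

Called between scanning and write_csv, this lets the exit code key off sla_breached rather than raw CRITICAL counts, matching the SLA-driven behaviour described in the objective.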
03

Run the pipeline and review output

# The script needs the Kubernetes Python client; Trivy itself must be on PATH
pip install kubernetes
python3 vuln_scan.py

## Scanning 14 unique images...
##   → nginx:1.25 (3 findings)
##   → prom/prometheus:v2.48.0 (0 findings)
##   → registry.k8s.io/coredns/coredns:v1.11.1 (1 findings)
##   ...
##
## Vulnerability Summary (42 total)
##   CRITICAL        2
##   HIGH           15  ███
##   MEDIUM         25  █████
##   LOW             0
##
## Report written to vulnerability-report.csv

# Inspect the CSV
head -5 vulnerability-report.csv | column -t -s,

# Show only CRITICAL findings
python3 -c "
import csv
with open('vulnerability-report.csv') as f:
    for row in csv.DictReader(f):
        if row['severity'] == 'CRITICAL':
            print(row['cve_id'], row['image'], row['pkg_name'], row['sla_deadline'])
"
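The CSV maps straightforwardly onto ticket payloads. A hypothetical sketch that groups findings so one ticket covers each affected image (field names follow the CSV header; the payload shape itself is an assumption, not a specific tracker's API):

```python
import csv
from collections import defaultdict

def tickets_from_report(path: str) -> list:
    """Build one ticket dict per image, listing its CVEs worst-first."""
    by_image = defaultdict(list)
    with open(path) as f:
        for row in csv.DictReader(f):
            by_image[row["image"]].append(row)

    tickets = []
    for image, rows in by_image.items():
        worst = rows[0]["severity"]  # report is already severity-sorted
        tickets.append({
            "title": f"[{worst}] {len(rows)} CVEs in {image}",
            "due":   min(r["sla_deadline"] for r in rows),
            "cves":  [r["cve_id"] for r in rows],
        })
    return tickets
```

The earliest sla_deadline in the group becomes the ticket due date, so a single MEDIUM finding never hides an overdue CRITICAL one.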
04

Schedule as a Kubernetes CronJob

vuln-scanner-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: vuln-scanner
  namespace: platform-ops
spec:
  schedule: "0 6 * * *"          # daily at 06:00 UTC
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: vuln-scanner
          restartPolicy: OnFailure
          containers:
            - name: scanner
              image: your-registry/vuln-scanner:latest
              command: [python3, /app/vuln_scan.py]
              env:
                - name: REPORT_BUCKET
                  value: s3://your-bucket/vuln-reports/
              resources:
                requests: { cpu: "500m", memory: "512Mi" }
                limits:   { cpu: "2000m", memory: "2Gi" }
              volumeMounts:
                - name: trivy-cache
                  mountPath: /root/.cache/trivy
          volumes:
            - name: trivy-cache
              emptyDir: {}
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: vuln-scanner
  namespace: platform-ops
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pod-image-reader
rules:
  - apiGroups: [""]
    resources: [pods]
    verbs: [get, list]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: vuln-scanner
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: pod-image-reader
subjects:
  - kind: ServiceAccount
    name: vuln-scanner
    namespace: platform-ops
For production use, cache the Trivy vulnerability database in a PVC or object store to avoid re-downloading it on every scan. After the initial DB pull, scan with trivy image --cache-dir /cache --skip-db-update (--skip-update on older Trivy releases).
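One way to wire that into the CronJob above is an initContainer that pulls the DB into the shared cache volume before the scan starts; with trivy-cache backed by a PVC instead of emptyDir, the DB then survives across runs. A sketch (the aquasec/trivy image tag is an assumption):

```yaml
# Added under jobTemplate.spec.template.spec, alongside `containers`
initContainers:
  - name: trivy-db-update
    image: aquasec/trivy:latest
    command: [trivy, image, --download-db-only, --cache-dir, /root/.cache/trivy]
    volumeMounts:
      - name: trivy-cache
        mountPath: /root/.cache/trivy
```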

Success Criteria

The script enumerates every image running in the cluster (including init containers), scans each unique digest exactly once, writes vulnerability-report.csv sorted by severity and SLA deadline, and exits non-zero when CRITICAL findings are present. The CronJob manifest applies cleanly and runs daily at 06:00 UTC under the vuln-scanner ServiceAccount.

Further Reading