Qualys VM Prioritization
Pipeline
A Python pipeline that parses Qualys VM CSV exports, deduplicates findings across CVE-level rows, applies risk-based priority scoring using CVSSv3 and Qualys RTI signals, and outputs a ranked remediation queue mapped to SLA targets. Built against a real cloud agent scan of a local macOS host.
Why this exists
Real scan output -- cloud agent
| Severity | Finding | CVSS | Score | SLA |
|---|---|---|---|---|
| Critical |
Microsoft Office Security Update for November 2025
Denial Of Service, Easy Exploit, High Data Loss, Remote Code Execution
|
9.8 | 218.8 | Within 15 days |
| High |
Microsoft Office Security Update for October 2025
Denial Of Service, Easy Exploit, Remote Code Execution
|
8.8 | 172.8 | Within 30 days |
| High |
Microsoft Office Security Update for January 2026
Denial Of Service, Easy Exploit, Remote Code Execution
|
8.4 | 170.4 | Within 30 days |
| High |
Microsoft Office Security Update for March 2026
Denial Of Service, Easy Exploit, Remote Code Execution, Privilege Escalation
|
7.8 | 166.8 | Within 30 days |
| High |
Microsoft Office Security Update for February 2026
Denial Of Service, Easy Exploit, High Data Loss, Privilege Escalation
|
7.8 | 166.8 | Within 30 days |
Pipeline stages
Reads Qualys VM CSV exports, skipping the 3 metadata rows Qualys prepends before the actual column headers. Extracts QID, CVE, title, CVSSv3, severity, RTI signals, asset info, detection age, and solution per row.
Qualys exports one row per CVE, so a single finding tied to 12 CVEs produces 12 rows. Pipeline deduplicates on QID + asset IP, keeping the row with the highest CVSSv3 score as the representative finding.
Calculates composite priority score: (severity_weight x 40) + (cvss_score x 6). Severity mapped from Qualys CVSS Rating Label field (Critical/High/Medium/Low), falling back to the numeric Qualys severity scale if the label is absent.
Maps each finding to a remediation SLA based on severity. Critical: 15 days, High: 30 days, Medium: 90 days, Low: 180 days. Qualys RTI signals (exploit availability, lateral movement risk, RCE) are preserved in output for analyst triage context.
Outputs a timestamped CSV with priority score, severity, CVSS, QID, CVE, title, asset, detection age, SLA target, RTI signals, and solution. Terminal summary prints severity distribution and top 5 findings on every run.
parse_qualys.py
#!/usr/bin/env python3 """ Qualys Vulnerability Prioritization Pipeline --------------------------------------------- Parses Qualys VM CSV exports, deduplicates findings by QID + asset, applies risk-based scoring, and outputs a prioritized remediation CSV. Usage: python3 parse_qualys.py export.csv python3 parse_qualys.py export1.csv export2.csv """ import sys import csv from datetime import datetime from pathlib import Path # ----------------------------------------------------------------------- # STEP 1: PARSE # Qualys exports have 3 metadata rows before the header row # ----------------------------------------------------------------------- def parse_qualys_csv(filepath): """Reads a Qualys VM CSV export and extracts all findings.""" findings = [] with open(filepath, "r", encoding="utf-8") as f: lines = f.readlines() # Skip the 3 metadata rows, row 4 is the real header reader = csv.DictReader(lines[3:]) for row in reader: if not row.get("QID", "").strip(): continue cvss3 = row.get("CVSSv3.1 Base", "0.0").strip() cvss2 = row.get("CVSSv2 Base", "0.0").strip() try: cvss_score = float(cvss3) if cvss3 and cvss3 != "'-" else float(cvss2) except ValueError: cvss_score = 0.0 sev_int = int(row.get("Severity", "0").strip() or 0) risk_factor = qualys_severity_to_risk(sev_int, row.get("CVSS Rating Labels", "")) finding = { "qid": row.get("QID", "").strip(), "cve": row.get("CVE", "").strip(), "title": row.get("Title", "").strip(), "risk_factor": risk_factor, "cvss_score": cvss_score, "asset_name": row.get("Asset Name", "").strip(), "asset_ip": row.get("Asset IPV4", "").strip(), "status": row.get("Status", "").strip(), "category": row.get("Category", "").strip(), "rti": row.get("RTI", "").strip(), "first_detected": row.get("First Detected", "").strip(), "last_detected": row.get("Last Detected", "").strip(), "detection_age": row.get("Detection AGE", "0").strip(), "patchable": row.get("Vuln Patchable", "").strip(), "solution": row.get("Solution", "")[:300].strip(), } finding["severity"] = risk_to_severity(risk_factor) findings.append(finding) return findings def qualys_severity_to_risk(sev_int, cvss_label): """Map Qualys severity to standard risk label.""" label = cvss_label.strip().upper() if label in ("CRITICAL", "HIGH", "MEDIUM", "LOW"): return label.capitalize() mapping = {5: "Critical", 4: "High", 3: "Medium", 2: "Low", 1: "Low"} return mapping.get(sev_int, "Low") def risk_to_severity(risk_factor): """Map risk factor to numeric weight for scoring.""" return {"Critical": 4, "High": 3, "Medium": 2, "Low": 1}.get(risk_factor, 0) # ----------------------------------------------------------------------- # STEP 2: DEDUPLICATE # ----------------------------------------------------------------------- def deduplicate(findings): """Remove duplicate QID + asset combos. Keep highest CVSSv3.""" seen = {} for f in findings: key = (f["qid"], f["asset_ip"]) if key not in seen or f["cvss_score"] > seen[key]["cvss_score"]: seen[key] = f return list(seen.values()), len(findings) - len(seen) # ----------------------------------------------------------------------- # STEP 3 + 4: SCORE AND SLA # ----------------------------------------------------------------------- def prioritize(findings): """Score findings and map to SLA targets. Sort highest risk first.""" sla_map = {"Critical": 15, "High": 30, "Medium": 90, "Low": 180} for f in findings: f["priority_score"] = round((f["severity"] * 40) + (f["cvss_score"] * 6), 2) sla = sla_map.get(f["risk_factor"]) f["sla_days"] = sla f["remediation_due"] = f"Within {sla} days" if sla else "Informational" findings.sort(key=lambda x: x["priority_score"], reverse=True) return findings # ----------------------------------------------------------------------- # STEP 5: EXPORT # ----------------------------------------------------------------------- def export_csv(findings, output_path): """Write prioritized findings to a timestamped CSV.""" fields = [ "priority_score", "risk_factor", "cvss_score", "qid", "cve", "title", "asset_name", "asset_ip", "status", "category", "rti", "detection_age", "first_detected", "last_detected", "sla_days", "remediation_due", "patchable", "solution" ] with open(output_path, "w", newline="", encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore") w.writeheader() w.writerows(findings) print(f"\nExported {len(findings)} findings to: {output_path}") # ----------------------------------------------------------------------- # MAIN # ----------------------------------------------------------------------- def main(): if len(sys.argv) < 2: print("Usage: python3 parse_qualys.py export.csv [export2.csv ...]") sys.exit(1) all_findings = [] for filepath in sys.argv[1:]: findings = parse_qualys_csv(filepath) print(f" {filepath}: {len(findings)} findings extracted") all_findings.extend(findings) unique, dupes = deduplicate(all_findings) print(f" {dupes} duplicates removed, {len(unique)} unique findings remain") prioritized = prioritize(unique) ts = datetime.now().strftime("%Y%m%d_%H%M%S") export_csv(prioritized, f"qualys_prioritized_{ts}.csv") if __name__ == "__main__": main()
Scoring in this pipeline is CVSSv3-weighted for demonstration purposes. In practice, vulnerability prioritization requires broader context -- asset criticality, exploitability in the wild, compensating controls, and business impact. Qualys RTI signals (exploit availability, lateral movement risk) are preserved in the output as additional triage context, but this pipeline is a lab exercise in automation and data normalization, not a production remediation workflow.