Qualys Vulnerability Pipeline — Carissa Durko
Vulnerability Management

Qualys VM Prioritization Pipeline

A Python pipeline that parses Qualys VM CSV exports, deduplicates findings across CVE-level rows, applies risk-based priority scoring using CVSSv3 and Qualys RTI signals, and outputs a ranked remediation queue mapped to SLA targets. Built against a real cloud agent scan of a local macOS host.

Python · Qualys VM · Cloud Agent · CVSSv3 · RTI · Deduplication · SLA Tracking · Remediation Prioritization

Why this exists

Problem
Qualys exports one row per CVE per finding, meaning a single vulnerability with 10 associated CVEs produces 10 rows. Raw output is not usable as a remediation queue without normalization.
Approach
The pipeline deduplicates on QID + asset IP, keeping the highest-CVSSv3 instance of each finding. Scoring combines the CVSS score with a severity weight. RTI (Real-Time Threat Indicators) signals from Qualys are preserved in the output for additional analyst context.
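The dedup rule can be sketched on its own, independent of the full script below. The sample rows here are invented for illustration (the QID and IP are not from the real scan):

```python
# Minimal sketch of the dedup rule: one finding per (QID, asset IP),
# keeping the row with the highest CVSSv3 score. Sample rows are invented.
rows = [
    {"qid": "100001", "asset_ip": "10.0.0.5", "cvss_score": 8.8},
    {"qid": "100001", "asset_ip": "10.0.0.5", "cvss_score": 9.8},  # same QID, higher CVSS
    {"qid": "100001", "asset_ip": "10.0.0.5", "cvss_score": 7.5},
]

best = {}
for r in rows:
    key = (r["qid"], r["asset_ip"])
    # First time we see the key, or a higher CVSS for it: keep this row
    if key not in best or r["cvss_score"] > best[key]["cvss_score"]:
        best[key] = r

unique = list(best.values())
print(len(unique), unique[0]["cvss_score"])  # 1 9.8
```

Three CVE-level rows collapse to one representative finding carrying the highest CVSS.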
Platform note
Qualys VM cloud agent scan on macOS. Qualys does not support loopback scanning, so the cloud agent authenticates locally and reports to the Qualys platform -- the same model enterprise deployments typically use at scale.

Real scan output -- cloud agent

Severity distribution: 1 Critical · 5 High · 1 Medium · 0 Low · 0 Info
67 raw rows were extracted from the Qualys export. The pipeline removed 60 duplicates -- the same QID appearing once per associated CVE -- leaving 7 unique findings across 7 distinct QIDs on a single asset.
| Severity | Finding | RTI Signals | CVSS | Score | SLA |
|---|---|---|---|---|---|
| Critical | Microsoft Office Security Update for November 2025 | Denial Of Service, Easy Exploit, High Data Loss, Remote Code Execution | 9.8 | 218.8 | Within 15 days |
| High | Microsoft Office Security Update for October 2025 | Denial Of Service, Easy Exploit, Remote Code Execution | 8.8 | 172.8 | Within 30 days |
| High | Microsoft Office Security Update for January 2026 | Denial Of Service, Easy Exploit, Remote Code Execution | 8.4 | 170.4 | Within 30 days |
| High | Microsoft Office Security Update for March 2026 | Denial Of Service, Easy Exploit, Remote Code Execution, Privilege Escalation | 7.8 | 166.8 | Within 30 days |
| High | Microsoft Office Security Update for February 2026 | Denial Of Service, Easy Exploit, High Data Loss, Privilege Escalation | 7.8 | 166.8 | Within 30 days |
Raw Export
- Total rows: 67
- Unique QIDs: 7
- Asset scanned: Carissas-MacBook-Air.local
- Scan method: Cloud Agent

After Pipeline
- Duplicates removed: 60
- Unique findings: 7
- Critical: 1
- Patchable findings: 6 of 7

Pipeline stages

01 Ingest -- Qualys CSV Parsing

Reads Qualys VM CSV exports, skipping the 3 metadata rows Qualys prepends before the actual column headers. Extracts QID, CVE, title, CVSSv3, severity, RTI signals, asset info, detection age, and solution per row.

parse_qualys_csv() -- skips 3 metadata rows, maps 37 columns
02 Deduplicate -- QID + Asset Normalization

Qualys exports one row per CVE, so a single finding tied to 12 CVEs produces 12 rows. Pipeline deduplicates on QID + asset IP, keeping the row with the highest CVSSv3 score as the representative finding.

deduplicate() -- key: (QID, asset_ip)
03 Score -- Risk-Based Prioritization

Calculates composite priority score: (severity_weight x 40) + (cvss_score x 6). Severity mapped from Qualys CVSS Rating Label field (Critical/High/Medium/Low), falling back to the numeric Qualys severity scale if the label is absent.

priority_score = (severity_weight x 40) + (cvss x 6)
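Plugging the top findings from the scan above into this formula reproduces the scores shown in the remediation queue (severity weights: Critical = 4, High = 3, Medium = 2, Low = 1):

```python
# Worked example of the composite priority score used by the pipeline.
weights = {"Critical": 4, "High": 3, "Medium": 2, "Low": 1}

def priority_score(severity, cvss):
    # (severity_weight x 40) + (cvss x 6), rounded to 2 decimals
    return round(weights[severity] * 40 + cvss * 6, 2)

print(priority_score("Critical", 9.8))  # 218.8  (4*40 + 9.8*6)
print(priority_score("High", 8.8))      # 172.8  (3*40 + 8.8*6)
```

The severity weight dominates the score, so a Critical always outranks a High regardless of CVSS; CVSS only orders findings within a severity band.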
04 SLA Mapping -- Remediation Targets

Maps each finding to a remediation SLA based on severity. Critical: 15 days, High: 30 days, Medium: 90 days, Low: 180 days. Qualys RTI signals (exploit availability, lateral movement risk, RCE) are preserved in output for analyst triage context.

Critical: 15d / High: 30d / Medium: 90d / Low: 180d
05 Export -- Remediation Queue CSV

Outputs a timestamped CSV with priority score, severity, CVSS, QID, CVE, title, asset, detection age, SLA target, RTI signals, and solution. Terminal summary prints severity distribution and top 5 findings on every run.

qualys_prioritized_YYYYMMDD_HHMMSS.csv

parse_qualys.py

#!/usr/bin/env python3
"""
Qualys Vulnerability Prioritization Pipeline
---------------------------------------------
Parses Qualys VM CSV exports, deduplicates findings by QID + asset,
applies risk-based scoring, and outputs a prioritized remediation CSV.

Usage:
    python3 parse_qualys.py export.csv
    python3 parse_qualys.py export1.csv export2.csv
"""

import sys
import csv
from datetime import datetime
from pathlib import Path


# -----------------------------------------------------------------------
# STEP 1: PARSE
# Qualys exports have 3 metadata rows before the header row
# -----------------------------------------------------------------------

def parse_qualys_csv(filepath):
    """Reads a Qualys VM CSV export and extracts all findings."""
    findings = []

    with open(filepath, "r", encoding="utf-8") as f:
        lines = f.readlines()

    # Skip the 3 metadata rows, row 4 is the real header
    reader = csv.DictReader(lines[3:])

    for row in reader:
        if not row.get("QID", "").strip():
            continue

        cvss3 = row.get("CVSSv3.1 Base", "0.0").strip()
        cvss2 = row.get("CVSSv2 Base", "0.0").strip()
        try:
            cvss_score = float(cvss3) if cvss3 and cvss3 != "'-" else float(cvss2)
        except ValueError:
            cvss_score = 0.0

        sev_int = int(row.get("Severity", "0").strip() or 0)
        risk_factor = qualys_severity_to_risk(sev_int, row.get("CVSS Rating Labels", ""))

        finding = {
            "qid":          row.get("QID", "").strip(),
            "cve":          row.get("CVE", "").strip(),
            "title":        row.get("Title", "").strip(),
            "risk_factor":  risk_factor,
            "cvss_score":   cvss_score,
            "asset_name":   row.get("Asset Name", "").strip(),
            "asset_ip":     row.get("Asset IPV4", "").strip(),
            "status":       row.get("Status", "").strip(),
            "category":     row.get("Category", "").strip(),
            "rti":          row.get("RTI", "").strip(),
            "first_detected": row.get("First Detected", "").strip(),
            "last_detected":  row.get("Last Detected", "").strip(),
            "detection_age":  row.get("Detection AGE", "0").strip(),
            "patchable":    row.get("Vuln Patchable", "").strip(),
            "solution":     row.get("Solution", "")[:300].strip(),
        }

        finding["severity"] = risk_to_severity(risk_factor)
        findings.append(finding)

    return findings


def qualys_severity_to_risk(sev_int, cvss_label):
    """Map Qualys severity to standard risk label."""
    label = cvss_label.strip().upper()
    if label in ("CRITICAL", "HIGH", "MEDIUM", "LOW"):
        return label.capitalize()
    mapping = {5: "Critical", 4: "High", 3: "Medium", 2: "Low", 1: "Low"}
    return mapping.get(sev_int, "Low")


def risk_to_severity(risk_factor):
    """Map risk factor to numeric weight for scoring."""
    return {"Critical": 4, "High": 3, "Medium": 2, "Low": 1}.get(risk_factor, 0)


# -----------------------------------------------------------------------
# STEP 2: DEDUPLICATE
# -----------------------------------------------------------------------

def deduplicate(findings):
    """Remove duplicate QID + asset combos. Keep highest CVSSv3."""
    seen = {}
    for f in findings:
        key = (f["qid"], f["asset_ip"])
        if key not in seen or f["cvss_score"] > seen[key]["cvss_score"]:
            seen[key] = f
    return list(seen.values()), len(findings) - len(seen)


# -----------------------------------------------------------------------
# STEP 3 + 4: SCORE AND SLA
# -----------------------------------------------------------------------

def prioritize(findings):
    """Score findings and map to SLA targets. Sort highest risk first."""
    sla_map = {"Critical": 15, "High": 30, "Medium": 90, "Low": 180}
    for f in findings:
        f["priority_score"] = round((f["severity"] * 40) + (f["cvss_score"] * 6), 2)
        sla = sla_map.get(f["risk_factor"])
        f["sla_days"] = sla
        f["remediation_due"] = f"Within {sla} days" if sla else "Informational"
    findings.sort(key=lambda x: x["priority_score"], reverse=True)
    return findings


# -----------------------------------------------------------------------
# STEP 5: EXPORT
# -----------------------------------------------------------------------

def export_csv(findings, output_path):
    """Write prioritized findings to a timestamped CSV."""
    fields = [
        "priority_score", "risk_factor", "cvss_score", "qid", "cve",
        "title", "asset_name", "asset_ip", "status", "category",
        "rti", "detection_age", "first_detected", "last_detected",
        "sla_days", "remediation_due", "patchable", "solution"
    ]
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        w.writerows(findings)
    print(f"\nExported {len(findings)} findings to: {output_path}")


# -----------------------------------------------------------------------
# MAIN
# -----------------------------------------------------------------------

def main():
    if len(sys.argv) < 2:
        print("Usage: python3 parse_qualys.py export.csv [export2.csv ...]")
        sys.exit(1)

    all_findings = []
    for filepath in sys.argv[1:]:
        findings = parse_qualys_csv(filepath)
        print(f"  {filepath}: {len(findings)} findings extracted")
        all_findings.extend(findings)

    unique, dupes = deduplicate(all_findings)
    print(f"  {dupes} duplicates removed, {len(unique)} unique findings remain")

    prioritized = prioritize(unique)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    export_csv(prioritized, f"qualys_prioritized_{ts}.csv")


if __name__ == "__main__":
    main()
Lab simulation note

Scoring in this pipeline is CVSSv3-weighted for demonstration purposes. In practice, vulnerability prioritization requires broader context -- asset criticality, exploitability in the wild, compensating controls, and business impact. Qualys RTI signals (exploit availability, lateral movement risk) are preserved in the output as additional triage context, but this pipeline is a lab exercise in automation and data normalization, not a production remediation workflow.
