# Nessus Vulnerability Prioritization Pipeline
A Python pipeline that ingests Nessus HTML scan reports, deduplicates findings across credentialed and uncredentialed scans, applies risk-based priority scoring, and outputs a ranked remediation queue mapped to SLA targets. Built against real scan output from Nessus Essentials; logic designed to translate directly to Qualys exports.
## Why this exists
## Real scan output (localhost)
| Severity | Finding | CVSS | Priority Score | SLA |
|---|---|---|---|---|
| Critical | Security Updates for Microsoft Office Products (Nov 2024) | 10.0 | 220.0 | Within 15 days |
| High | Security Updates for Microsoft Office Products (Dec 2024) | 7.2 | 163.2 | Within 30 days |
| High | Security Updates for Microsoft Office Products (Feb 2025) | 7.2 | 163.2 | Within 30 days |
| High | Security Updates for Microsoft Office Products (Jan 2025) | 7.2 | 163.2 | Within 30 days |
| High | Security Updates for Microsoft Office Products (Mar 2025) | 7.2 | 163.2 | Within 30 days |
## Pipeline stages
**Stage 1: Parse.** Reads one or more Nessus HTML report files using BeautifulSoup. Extracts plugin ID, plugin name, risk factor, CVSS score, synopsis, solution, host, and port for every finding across all provided reports.
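The plugin header parse hinges on a single regex that splits the Nessus-rendered "ID - name" header line. A quick standalone sketch (the plugin ID here is invented for illustration; real IDs come from the report):

```python
import re

# Nessus HTML renders each finding header as "<plugin_id> - <plugin_name>".
# The pattern tolerates both a hyphen and an en dash as the separator.
header_text = "193101 - Security Updates for Microsoft Office Products (Nov 2024)"
match = re.match(r"(\d+)\s*[-–]\s*(.+)", header_text)
if match:
    plugin_id, plugin_name = match.group(1), match.group(2)
    print(plugin_id)    # 193101
    print(plugin_name)  # Security Updates for Microsoft Office Products (Nov 2024)
```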
**Stage 2: Deduplicate.** Identifies duplicate findings across scan types using a composite key of `plugin_id + host + port`. When duplicates exist, keeps the instance with the highest CVSS score and reports the exact count of removed duplicates.
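The dedup rule in isolation, on toy findings (the plugin IDs and ports below are made up for illustration, not real scan data):

```python
# Two copies of the same plugin on the same host/port (e.g. one from a
# credentialed scan, one uncredentialed), plus one unrelated finding.
findings = [
    {"plugin_id": "91234", "host": "127.0.0.1", "port": "445/tcp", "cvss_score": 7.2},
    {"plugin_id": "91234", "host": "127.0.0.1", "port": "445/tcp", "cvss_score": 10.0},
    {"plugin_id": "55555", "host": "127.0.0.1", "port": "0/tcp", "cvss_score": 5.0},
]

seen = {}
for f in findings:
    key = (f["plugin_id"], f["host"], f["port"])  # composite dedup key
    if key not in seen or f["cvss_score"] > seen[key]["cvss_score"]:
        seen[key] = f  # keep the instance with the highest CVSS

unique, removed = list(seen.values()), len(findings) - len(seen)
print(removed)  # 1
print(sorted(f["cvss_score"] for f in unique))  # [5.0, 10.0]
```

The CVSS 10.0 instance survives because it overwrites the 7.2 copy under the same key.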
**Stage 3: Score.** Calculates a composite priority score per finding: `(severity_weight × 40) + (cvss_score × 6)`, with severity weights Critical = 4, High = 3, Medium = 2, Low = 1. The maximum score is 220, and the heavy severity weighting keeps the tiers well separated. Output is sorted highest-risk first.
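The formula can be checked against the scan-output table above:

```python
def priority_score(severity_weight, cvss_score):
    """Composite score: (severity_weight * 40) + (cvss_score * 6)."""
    return round(severity_weight * 40 + cvss_score * 6, 2)

# Critical (weight 4) with CVSS 10.0 reproduces the table's top row
print(priority_score(4, 10.0))  # 220.0
# High (weight 3) with CVSS 7.2 reproduces the 163.2 rows
print(priority_score(3, 7.2))   # 163.2
```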
**Stage 4: SLA mapping.** Maps each finding to industry-standard SLA remediation targets based on severity. Critical: 15 days, High: 30 days, Medium: 90 days, Low: 180 days. Informational findings are flagged separately with no SLA assigned.
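The same mapping as a standalone lookup, mirroring how the script handles findings whose risk factor has no SLA entry:

```python
# Severity-to-SLA lookup; anything not in the map (e.g. "None") gets no SLA.
sla_map = {"critical": 15, "high": 30, "medium": 90, "low": 180}

def remediation_due(risk_factor):
    sla = sla_map.get(risk_factor.lower())
    return f"Within {sla} days" if sla else "Informational"

print(remediation_due("High"))  # Within 30 days
print(remediation_due("None"))  # Informational
```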
**Stage 5: Export.** Outputs a timestamped CSV containing all prioritized findings with priority score, severity, CVSS, SLA target, synopsis, and recommended solution. A terminal summary prints the severity distribution and top 5 findings on every run.
### `parse_nessus.py`
```python
#!/usr/bin/env python3
"""
Nessus HTML Report Parser & Vulnerability Prioritization Pipeline
-----------------------------------------------------------------
Parses Nessus Essentials HTML reports, deduplicates findings,
applies risk-based scoring, and outputs a prioritized remediation CSV.

Usage: python3 parse_nessus.py report1.html report2.html
"""
import sys
import csv
import re
from collections import Counter
from datetime import datetime
from pathlib import Path

from bs4 import BeautifulSoup


# -----------------------------------------------------------------------
# STEP 1: PARSE
# -----------------------------------------------------------------------
def parse_nessus_html(filepath):
    """Extract all findings from a Nessus HTML report."""
    findings = []
    scan_name = Path(filepath).stem
    with open(filepath, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f, "html.parser")

    sections = soup.find_all("div", class_="section-wrapper")
    for section in sections:
        finding = {
            "scan_name": scan_name,
            "plugin_id": "",
            "plugin_name": "",
            "risk_factor": "None",
            "cvss_score": 0.0,
            "synopsis": "",
            "solution": "",
            "host": "127.0.0.1",
            "port": "",
        }

        # Plugin header precedes the section: "<plugin_id> - <plugin_name>"
        header = section.find_previous_sibling("div")
        if header:
            match = re.match(r"(\d+)\s*[-–]\s*(.+)", header.get_text(strip=True))
            if match:
                finding["plugin_id"] = match.group(1).strip()
                finding["plugin_name"] = match.group(2).strip()

        # Labeled detail blocks: synopsis, solution, risk factor, CVSS
        for dh in section.find_all("div", class_="details-header"):
            label = dh.get_text(strip=True).lower()
            val_div = dh.find_next_sibling("div")
            if not val_div:
                continue
            value = val_div.get_text(separator=" ", strip=True)
            if "synopsis" in label:
                finding["synopsis"] = value
            elif "solution" in label:
                finding["solution"] = value
            elif "risk factor" in label:
                finding["risk_factor"] = value.strip()
            elif "cvss" in label and "base" in label:
                m = re.search(r"(\d+\.\d+)", value)
                if m:
                    finding["cvss_score"] = float(m.group(1))

        port_tag = section.find("h2")
        if port_tag:
            finding["port"] = port_tag.get_text(strip=True)

        finding["severity"] = risk_to_severity(finding["risk_factor"])
        if finding["plugin_id"]:
            findings.append(finding)
    return findings


def risk_to_severity(risk_factor):
    """Map risk factor string to numeric weight for scoring."""
    return {"critical": 4, "high": 3, "medium": 2, "low": 1, "none": 0}.get(
        risk_factor.lower(), 0
    )


# -----------------------------------------------------------------------
# STEP 2: DEDUPLICATE
# -----------------------------------------------------------------------
def deduplicate(findings):
    """Remove duplicate plugin/host/port combos. Keep highest CVSS."""
    seen = {}
    for f in findings:
        key = (f["plugin_id"], f["host"], f["port"])
        if key not in seen or f["cvss_score"] > seen[key]["cvss_score"]:
            seen[key] = f
    return list(seen.values()), len(findings) - len(seen)


# -----------------------------------------------------------------------
# STEP 3 + 4: SCORE & SLA
# -----------------------------------------------------------------------
def prioritize(findings):
    """Score findings and map to SLA targets. Sort highest risk first."""
    sla_map = {"critical": 15, "high": 30, "medium": 90, "low": 180}
    for f in findings:
        f["priority_score"] = round((f["severity"] * 40) + (f["cvss_score"] * 6), 2)
        sla = sla_map.get(f["risk_factor"].lower())
        f["sla_days"] = sla
        f["remediation_due"] = f"Within {sla} days" if sla else "Informational"
    findings.sort(key=lambda x: x["priority_score"], reverse=True)
    return findings


# -----------------------------------------------------------------------
# STEP 5: EXPORT
# -----------------------------------------------------------------------
def export_csv(findings, output_path):
    """Write prioritized findings to a timestamped CSV."""
    fields = ["priority_score", "risk_factor", "cvss_score", "plugin_id",
              "plugin_name", "host", "port", "sla_days", "remediation_due",
              "synopsis", "solution", "scan_name"]
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        w.writerows(findings)
    print(f"\n✅ Exported {len(findings)} findings → {output_path}")


# -----------------------------------------------------------------------
# MAIN
# -----------------------------------------------------------------------
def main():
    if len(sys.argv) < 2:
        print("Usage: python3 parse_nessus.py report1.html [report2.html ...]")
        sys.exit(1)

    all_findings = []
    for filepath in sys.argv[1:]:
        findings = parse_nessus_html(filepath)
        print(f"✓ {filepath}: {len(findings)} findings")
        all_findings.extend(findings)

    unique, dupes = deduplicate(all_findings)
    print(f"✓ {dupes} duplicates removed, {len(unique)} unique findings")

    prioritized = prioritize(unique)

    # Terminal summary: severity distribution and top 5 findings
    print("\nSeverity distribution:")
    for sev, count in Counter(f["risk_factor"] for f in prioritized).most_common():
        print(f"  {sev}: {count}")
    print("\nTop 5 findings:")
    for f in prioritized[:5]:
        print(f"  [{f['priority_score']}] {f['plugin_name']}")

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    export_csv(prioritized, f"prioritized_findings_{ts}.csv")


if __name__ == "__main__":
    main()
```
Scoring in this pipeline is CVSS-weighted for demonstration purposes. In practice, vulnerability prioritization requires broader context - asset criticality, exploitability in the wild, compensating controls, and business impact - none of which a CVSS score alone captures. This pipeline is a lab exercise in automation and data normalization, not a production remediation workflow.