Nessus Vulnerability Pipeline - Carissa Durko
Vulnerability Management

Nessus Vulnerability Prioritization Pipeline

A Python pipeline that ingests Nessus HTML scan reports, deduplicates findings across credentialed and uncredentialed scans, applies risk-based priority scoring, and outputs a ranked remediation queue mapped to SLA targets. Built against real scan output from Nessus Essentials; logic designed to translate directly to Qualys exports.

Python · BeautifulSoup · Nessus Essentials · Vulnerability Management · CVSS Scoring · SLA Tracking · Deduplication · Remediation Prioritization

Why this exists

Problem
Raw scanner output requires manual triage. Findings from multiple scan types duplicate, severity labels don't account for asset context, and there's no built-in SLA tracking.
Approach
Automated pipeline that parses, deduplicates, scores, and exports findings into a ranked remediation queue - removing the manual sorting step entirely.
Platform note
Built against Nessus Essentials HTML exports, since CSV export is gated behind the paid tier. The logic is designed to map to Qualys exports: both tools report CVSS scores, a per-check identifier (plugin ID in Nessus, QID in Qualys), and asset metadata in a comparable structure.

Real scan output - localhost

Critical: 1 · High: 4 · Medium: 4 · Low: 0 · Info: 100
218 total findings extracted across two scans (credentialed + uncredentialed). Pipeline removed 109 exact duplicates - plugin/host/port combinations present in both scan types - leaving 109 unique findings for remediation review.
Severity | Finding | CVSS | Priority Score | SLA
Critical | Security Updates for Microsoft Office Products (Nov 2024) | 10.0 | 220.0 | Within 15 days
High | Security Updates for Microsoft Office Products (Dec 2024) | 7.2 | 163.2 | Within 30 days
High | Security Updates for Microsoft Office Products (Feb 2025) | 7.2 | 163.2 | Within 30 days
High | Security Updates for Microsoft Office Products (Jan 2025) | 7.2 | 163.2 | Within 30 days
High | Security Updates for Microsoft Office Products (Mar 2025) | 7.2 | 163.2 | Within 30 days
PROJECT-1 - Unauthenticated
Findings extracted: 109
Critical: 1
High: 4
Medium: 4
Target: 127.0.0.1

PROJECT-2 - Credentialed (SSH)
Findings extracted: 109
Duplicates removed: 109
Auth method: SSH + sudo
Elevated privileges: Yes

Pipeline stages

01 · Ingest - HTML Report Parsing

Reads one or more Nessus HTML report files using BeautifulSoup. Extracts plugin ID, plugin name, risk factor, CVSS base score, synopsis, solution, and port for every finding across all provided reports. The host is set to 127.0.0.1 because both scans target localhost.

parse_nessus_html() → list of finding dicts
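
The two text-extraction steps inside the parser can be tried in isolation. This is a minimal sketch using the same regular expressions as parse_nessus.py; the helper names split_header and extract_cvss and the sample strings are invented for illustration and are not part of the pipeline.

```python
import re

# Same patterns as parse_nessus.py; the helper names below are hypothetical.
HEADER_RE = re.compile(r"(\d+)\s*[-–]\s*(.+)")
CVSS_RE = re.compile(r"(\d+\.\d+)")


def split_header(text):
    """Return (plugin_id, plugin_name), or ("", "") if the header does not match."""
    match = HEADER_RE.match(text.strip())
    if not match:
        return "", ""
    return match.group(1).strip(), match.group(2).strip()


def extract_cvss(text):
    """Return the first decimal number in the text as a float, else 0.0."""
    match = CVSS_RE.search(text)
    return float(match.group(1)) if match else 0.0


# Invented sample strings, not taken from a real report.
print(split_header("12345 - Example Plugin Name"))
print(extract_cvss("7.2 (CVSS2#AV:L/AC:L/Au:N/C:C/I:C/A:C)"))
```

One thing this makes visible: the CVSS pattern requires a decimal point, so a value written as a bare "10" would fall back to 0.0.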
02 · Deduplicate - Cross-Scan Normalization

Identifies duplicate findings across scan types using a composite key of plugin_id + host + port. When duplicates exist, keeps the instance with the highest CVSS score. Reports exact count of removed duplicates.

deduplicate() → unique findings + dupe_count
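
A small run of the composite-key logic shows what counts as a duplicate. The function body mirrors deduplicate() from parse_nessus.py; the three sample findings, including their plugin IDs and ports, are invented for illustration.

```python
def deduplicate(findings):
    """Keep one finding per (plugin_id, host, port); prefer the higher CVSS."""
    seen = {}
    for f in findings:
        key = (f["plugin_id"], f["host"], f["port"])
        if key not in seen or f["cvss_score"] > seen[key]["cvss_score"]:
            seen[key] = f
    return list(seen.values()), len(findings) - len(seen)


# Invented sample data: the first two rows share plugin, host, and port.
sample = [
    {"plugin_id": "1001", "host": "127.0.0.1", "port": "tcp/445",
     "cvss_score": 7.2, "scan_name": "uncredentialed"},
    {"plugin_id": "1001", "host": "127.0.0.1", "port": "tcp/445",
     "cvss_score": 7.2, "scan_name": "credentialed"},
    {"plugin_id": "1002", "host": "127.0.0.1", "port": "tcp/22",
     "cvss_score": 5.0, "scan_name": "credentialed"},
]

unique, dupes = deduplicate(sample)
print(len(unique), dupes)
print(unique[0]["scan_name"])
```

Because the comparison is strictly greater-than, a tie on CVSS keeps whichever copy was seen first, so the order of the report files on the command line decides which scan_name survives.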
03 · Score - Risk-Based Prioritization

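Calculates a composite priority score per finding: (severity_weight × 40) + (cvss_score × 6). Severity weights: Critical=4, High=3, Medium=2, Low=1, Info (risk factor None)=0. The maximum score is 220 (Critical at CVSS 10.0), and the 40-point step per severity level keeps tiers apart whenever the risk factor and CVSS score agree. Sorts output highest-risk first.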

priority_score = (severity_weight × 40) + (cvss × 6)
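
The formula can be checked against the two score values that appear in the scan output table. This is a minimal sketch; the standalone priority_score function is a hypothetical wrapper around the same arithmetic that prioritize() applies.

```python
# Weights as stated in the stage description; "none" covers informational findings.
SEVERITY_WEIGHT = {"critical": 4, "high": 3, "medium": 2, "low": 1, "none": 0}


def priority_score(risk_factor, cvss):
    """(severity_weight x 40) + (cvss x 6), rounded to two decimals."""
    weight = SEVERITY_WEIGHT.get(risk_factor.lower(), 0)
    return round(weight * 40 + cvss * 6, 2)


print(priority_score("Critical", 10.0))  # 220.0
print(priority_score("High", 7.2))       # 163.2

# Edge case: a mismatched pair can cross tiers.
print(priority_score("High", 10.0))      # 180.0
print(priority_score("Critical", 0.0))   # 160.0
```

The last two lines show the limit of the weighting: a High finding with CVSS 10.0 outranks a Critical finding whose CVSS was not parsed and defaulted to 0.0.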
04 · SLA Mapping - Remediation Targets

Maps each finding to a remediation SLA target based on severity, using commonly used default windows. Critical: 15 days, High: 30 days, Medium: 90 days, Low: 180 days. Informational findings are flagged separately with no SLA assigned.

Critical: 15d · High: 30d · Medium: 90d · Low: 180d
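
The pipeline records the SLA as a day count ("Within 30 days"). As a possible extension, the same mapping could produce a calendar due date. This is a sketch only: the remediation_due function and its found_on parameter are assumptions, since the pipeline as written does not track a discovery date.

```python
from datetime import date, timedelta

# Same day counts as the SLA mapping stage.
SLA_DAYS = {"critical": 15, "high": 30, "medium": 90, "low": 180}


def remediation_due(risk_factor, found_on):
    """Return the due date for a finding, or None for informational findings.

    found_on is a hypothetical discovery date supplied by the caller.
    """
    days = SLA_DAYS.get(risk_factor.lower())
    if days is None:
        return None
    return found_on + timedelta(days=days)


print(remediation_due("Critical", date(2025, 1, 1)))  # 2025-01-16
print(remediation_due("High", date(2025, 1, 1)))      # 2025-01-31
print(remediation_due("None", date(2025, 1, 1)))      # None
```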
05 · Export - Remediation Queue CSV

Outputs a timestamped CSV containing all prioritized findings with priority score, severity, CVSS, SLA target, synopsis, and recommended solution. Terminal summary prints severity distribution and top 5 findings on every run.

prioritized_findings_YYYYMMDD_HHMMSS.csv
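
A terminal summary of the kind described above could be built as follows. This is a minimal sketch, assuming finding dicts that carry the risk_factor, priority_score, and plugin_name keys used by the pipeline; the summarize function and the sample rows are hypothetical.

```python
from collections import Counter

SEVERITY_ORDER = ("critical", "high", "medium", "low", "none")


def summarize(findings, top_n=5):
    """Build a text summary: count per risk factor, then the top-N by score."""
    counts = Counter(f["risk_factor"].lower() for f in findings)
    lines = ["Severity distribution:"]
    for level in SEVERITY_ORDER:
        lines.append(f"  {level:<8} {counts.get(level, 0)}")

    ranked = sorted(findings, key=lambda f: f["priority_score"], reverse=True)
    lines.append(f"Top {top_n} findings:")
    for f in ranked[:top_n]:
        lines.append(f"  {f['priority_score']:>6.1f}  {f['plugin_name']}")
    return "\n".join(lines)


# Invented sample rows; only the keys mirror the pipeline's finding dicts.
sample = [
    {"risk_factor": "High", "priority_score": 163.2, "plugin_name": "Example finding B"},
    {"risk_factor": "Critical", "priority_score": 220.0, "plugin_name": "Example finding A"},
    {"risk_factor": "None", "priority_score": 0.0, "plugin_name": "Example finding C"},
]
print(summarize(sample, top_n=2))
```

Returning the summary as a string rather than printing inside the function keeps it easy to test and to redirect into a log.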

parse_nessus.py

Python 3
#!/usr/bin/env python3
"""
Nessus HTML Report Parser & Vulnerability Prioritization Pipeline
-----------------------------------------------------------------
Parses Nessus Essentials HTML reports, deduplicates findings,
applies risk-based scoring, and outputs a prioritized remediation CSV.

Usage:
    python3 parse_nessus.py report1.html report2.html
"""

import sys
import csv
import re
from datetime import datetime
from pathlib import Path
from bs4 import BeautifulSoup


# -----------------------------------------------------------------------
# STEP 1: PARSE
# -----------------------------------------------------------------------

def parse_nessus_html(filepath):
    """Extract all findings from a Nessus HTML report."""
    findings = []
    scan_name = Path(filepath).stem

    with open(filepath, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f, "html.parser")

    sections = soup.find_all("div", class_="section-wrapper")

    for section in sections:
        finding = {
            "scan_name": scan_name,
            "plugin_id": "",
            "plugin_name": "",
            "risk_factor": "None",
            "cvss_score": 0.0,
            "synopsis": "",
            "solution": "",
            "host": "127.0.0.1",  # fixed value: scans target localhost; host is not parsed
            "port": "",
        }

        header = section.find_previous_sibling("div")
        if header:
            match = re.match(r"(\d+)\s*[-–]\s*(.+)", header.get_text(strip=True))
            if match:
                finding["plugin_id"]   = match.group(1).strip()
                finding["plugin_name"] = match.group(2).strip()

        for dh in section.find_all("div", class_="details-header"):
            label = dh.get_text(strip=True).lower()
            val_div = dh.find_next_sibling("div")
            if not val_div:
                continue
            value = val_div.get_text(separator=" ", strip=True)

            if "synopsis" in label:
                finding["synopsis"] = value
            elif "solution" in label:
                finding["solution"] = value
            elif "risk factor" in label:
                finding["risk_factor"] = value.strip()
            elif "cvss" in label and "base" in label:
                # Take the first decimal number in the value as the base score.
                m = re.search(r"(\d+\.\d+)", value)
                if m:
                    finding["cvss_score"] = float(m.group(1))

        # The first <h2> inside the section is treated as the port label.
        port_tag = section.find("h2")
        if port_tag:
            finding["port"] = port_tag.get_text(strip=True)

        finding["severity"] = risk_to_severity(finding["risk_factor"])

        if finding["plugin_id"]:
            findings.append(finding)

    return findings


def risk_to_severity(risk_factor):
    """Map risk factor string to numeric weight for scoring."""
    return {"critical": 4, "high": 3, "medium": 2, "low": 1, "none": 0}.get(
        risk_factor.lower(), 0
    )


# -----------------------------------------------------------------------
# STEP 2: DEDUPLICATE
# -----------------------------------------------------------------------

def deduplicate(findings):
    """Remove duplicate plugin/host/port combos. Keep highest CVSS."""
    seen = {}
    for f in findings:
        key = (f["plugin_id"], f["host"], f["port"])
        if key not in seen or f["cvss_score"] > seen[key]["cvss_score"]:
            seen[key] = f
    return list(seen.values()), len(findings) - len(seen)


# -----------------------------------------------------------------------
# STEP 3 + 4: SCORE & SLA
# -----------------------------------------------------------------------

def prioritize(findings):
    """Score findings and map to SLA targets. Sort highest risk first."""
    sla_map = {"critical": 15, "high": 30, "medium": 90, "low": 180}
    for f in findings:
        f["priority_score"] = round((f["severity"] * 40) + (f["cvss_score"] * 6), 2)
        sla = sla_map.get(f["risk_factor"].lower())
        f["sla_days"]        = sla
        f["remediation_due"] = f"Within {sla} days" if sla else "Informational"
    findings.sort(key=lambda x: x["priority_score"], reverse=True)
    return findings


# -----------------------------------------------------------------------
# STEP 5: EXPORT
# -----------------------------------------------------------------------

def export_csv(findings, output_path):
    """Write prioritized findings to a timestamped CSV."""
    fields = ["priority_score", "risk_factor", "cvss_score", "plugin_id",
              "plugin_name", "host", "port", "sla_days", "remediation_due",
              "synopsis", "solution", "scan_name"]
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        w.writeheader()
        w.writerows(findings)
    print(f"\n✅ Exported {len(findings)} findings → {output_path}")


# -----------------------------------------------------------------------
# MAIN
# -----------------------------------------------------------------------

def main():
    if len(sys.argv) < 2:
        print("Usage: python3 parse_nessus.py report1.html [report2.html ...]")
        sys.exit(1)

    all_findings = []
    for filepath in sys.argv[1:]:
        findings = parse_nessus_html(filepath)
        print(f"✓ {filepath}: {len(findings)} findings")
        all_findings.extend(findings)

    unique, dupes = deduplicate(all_findings)
    print(f"✓ {dupes} duplicates removed, {len(unique)} unique findings")

    prioritized = prioritize(unique)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    export_csv(prioritized, f"prioritized_findings_{ts}.csv")


if __name__ == "__main__":
    main()

Lab simulation note

Scoring in this pipeline is CVSS-weighted for demonstration purposes. In practice, vulnerability prioritization requires broader context - asset criticality, exploitability in the wild, compensating controls, and business impact - none of which a CVSS score alone captures. This pipeline is a lab exercise in automation and data normalization, not a production remediation workflow.
