Skip to main content
COSMICBYTEZLABS
NewsSecurityHOWTOsToolsStudyTraining
ProjectsChecklistsAI RankingsNewsletterStatusTagsAbout
Subscribe

Press Enter to search or Esc to close

News
Security
HOWTOs
Tools
Study
Training
Projects
Checklists
AI Rankings
Newsletter
Status
Tags
About
RSS Feed
Reading List
Subscribe

Stay in the Loop

Get the latest security alerts, tutorials, and tech insights delivered to your inbox.

Subscribe NowFree forever. No spam.
COSMICBYTEZLABS

Your trusted source for IT intelligence, cybersecurity insights, and hands-on technical guides.

429+ Articles
114+ Guides

CONTENT

  • Latest News
  • Security Alerts
  • HOWTOs
  • Projects
  • Exam Prep

RESOURCES

  • Search
  • Browse Tags
  • Newsletter Archive
  • Reading List
  • RSS Feed

COMPANY

  • About Us
  • Contact
  • Privacy Policy
  • Terms of Service

© 2026 CosmicBytez Labs. All rights reserved.

System Status: Operational
  1. Home
  2. HOWTOs
  3. Python for Security Automation: Essential Scripting
Python for Security Automation: Essential Scripting
HOWTOBeginner

Python for Security Automation: Essential Scripting

Learn Python security scripting fundamentals including network scanning, log parsing, hash analysis, API integration, and automated threat detection for...

Dylan H.

Security Engineering

February 2, 2026
8 min read

Prerequisites

  • Python 3.10+ installed
  • Basic programming concepts (variables, loops, functions)
  • Command line familiarity
  • Understanding of TCP/IP networking basics

Overview

Python is the most widely used language in cybersecurity — from penetration testing tools to SIEM integrations and malware analysis scripts. Its extensive standard library, readable syntax, and vast ecosystem of security packages make it ideal for automating repetitive security tasks.

Who Should Use This Guide:

  • SOC analysts automating daily tasks
  • Security engineers building custom tools
  • IT administrators writing security scripts
  • Students preparing for security certifications (PNPT, OSCP)

What You Will Learn:

SkillApplication
Network ScanningPort scanning, host discovery
Log ParsingAnalyze security logs at scale
Hash OperationsFile integrity, IOC matching
API IntegrationVirusTotal, Shodan, AbuseIPDB
Packet AnalysisParse pcap files with Scapy
Web ScrapingGather OSINT data

Requirements

# Create a virtual environment
python3 -m venv security-scripts
source security-scripts/bin/activate  # Linux/macOS
# security-scripts\Scripts\activate   # Windows
 
# Install security libraries
pip install requests scapy python-nmap shodan ipaddress

Part 1: Network Scanning

Basic Port Scanner

"""
Simple TCP port scanner using sockets.
Usage: python port_scanner.py <target_ip>
"""
 
import socket
import sys
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
 
 
def scan_port(target: str, port: int) -> tuple[int, bool]:
    """Attempt TCP connection to a single port."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            result = sock.connect_ex((target, port))
            return port, result == 0
    except socket.error:
        return port, False
 
 
def scan_target(target: str, ports: range, max_threads: int = 100) -> list[int]:
    """Scan multiple ports concurrently."""
    open_ports = []
 
    print(f"Scanning {target} ({len(ports)} ports)...")
    print(f"Start time: {datetime.now().strftime('%H:%M:%S')}")
 
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {executor.submit(scan_port, target, p): p for p in ports}
        for future in futures:
            port, is_open = future.result()
            if is_open:
                open_ports.append(port)
                print(f"  [+] Port {port}/tcp — OPEN")
 
    print(f"\nScan complete: {len(open_ports)} open ports found")
    return sorted(open_ports)
 
 
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python port_scanner.py <target_ip>")
        sys.exit(1)
 
    target_ip = sys.argv[1]
 
    # Resolve hostname if needed
    try:
        target_ip = socket.gethostbyname(target_ip)
    except socket.gaierror:
        print(f"Cannot resolve hostname: {target_ip}")
        sys.exit(1)
 
    open_ports = scan_target(target_ip, range(1, 1025))

Service Banner Grabbing

"""Grab service banners from open ports."""
 
import socket
 
 
def grab_banner(target: str, port: int, timeout: int = 3) -> str:
    """Connect to a port and retrieve the service banner."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((target, port))
 
            # Some services send a banner immediately
            try:
                banner = sock.recv(1024).decode("utf-8", errors="replace").strip()
                if banner:
                    return banner
            except socket.timeout:
                pass
 
            # For HTTP, send a request
            if port in (80, 443, 8080, 8443):
                sock.send(b"HEAD / HTTP/1.1\r\nHost: target\r\n\r\n")
                return sock.recv(1024).decode("utf-8", errors="replace").strip()
 
            return "No banner received"
    except Exception as e:
        return f"Error: {e}"
 
 
# Example usage
target = "192.168.1.100"
ports = [22, 80, 443, 3306, 8080]
 
for port in ports:
    banner = grab_banner(target, port)
    print(f"Port {port}: {banner[:100]}")

Part 2: Log Analysis

Parse Windows Security Event Logs

"""Parse exported Windows Security event logs (CSV format)."""
 
import csv
from collections import Counter
from pathlib import Path
 
 
def parse_security_log(log_file: str) -> list[dict]:
    """Parse Windows Security event log exported as CSV."""
    events = []
    with open(log_file, "r", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        for row in reader:
            events.append(row)
    return events
 
 
def find_failed_logins(events: list[dict]) -> list[dict]:
    """Find failed login attempts (Event ID 4625)."""
    return [e for e in events if e.get("EventID") == "4625"]
 
 
def find_brute_force(events: list[dict], threshold: int = 5) -> dict:
    """Detect potential brute force — IPs with failed logins above threshold."""
    failed = find_failed_logins(events)
    ip_counts = Counter(e.get("SourceIP", "unknown") for e in failed)
    return {ip: count for ip, count in ip_counts.items() if count >= threshold}
 
 
def find_privilege_escalation(events: list[dict]) -> list[dict]:
    """Find privilege escalation events (Event ID 4672 — Special Logon)."""
    return [e for e in events if e.get("EventID") == "4672"]
 
 
# Example usage
if __name__ == "__main__":
    log_file = "security_events.csv"
 
    if Path(log_file).exists():
        events = parse_security_log(log_file)
        print(f"Total events: {len(events)}")
 
        brute_force = find_brute_force(events)
        if brute_force:
            print("\n[!] Potential brute force detected:")
            for ip, count in sorted(brute_force.items(), key=lambda x: -x[1]):
                print(f"  {ip}: {count} failed attempts")
 
        priv_esc = find_privilege_escalation(events)
        print(f"\nPrivilege escalation events: {len(priv_esc)}")

Parse Apache/Nginx Access Logs

"""Analyze web server access logs for suspicious activity."""
 
import re
from collections import Counter
 
 
LOG_PATTERN = re.compile(
    r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - '
    r'\[(?P<date>[^\]]+)\] '
    r'"(?P<method>\w+) (?P<path>[^\s]+) [^"]*" '
    r'(?P<status>\d+) (?P<size>\d+)'
)
 
SUSPICIOUS_PATHS = [
    r"/wp-admin", r"/wp-login", r"\.env",
    r"/admin", r"/phpmyadmin", r"\.git",
    r"/etc/passwd", r"\.\./", r"<script",
]
 
 
def parse_access_log(log_file: str) -> list[dict]:
    """Parse access log lines into structured data."""
    entries = []
    with open(log_file, "r") as f:
        for line in f:
            match = LOG_PATTERN.match(line)
            if match:
                entries.append(match.groupdict())
    return entries
 
 
def detect_scanning(entries: list[dict], threshold: int = 50) -> dict:
    """Detect IPs making many 404 requests (directory scanning)."""
    scanner_ips = Counter()
    for entry in entries:
        if entry["status"] == "404":
            scanner_ips[entry["ip"]] += 1
    return {ip: count for ip, count in scanner_ips.items() if count >= threshold}
 
 
def detect_suspicious_requests(entries: list[dict]) -> list[dict]:
    """Find requests matching known attack patterns."""
    suspicious = []
    for entry in entries:
        for pattern in SUSPICIOUS_PATHS:
            if re.search(pattern, entry["path"], re.IGNORECASE):
                suspicious.append(entry)
                break
    return suspicious
 
 
# Example usage
if __name__ == "__main__":
    entries = parse_access_log("access.log")
    print(f"Total requests: {len(entries)}")
 
    scanners = detect_scanning(entries)
    print(f"\nPotential scanners ({len(scanners)} IPs):")
    for ip, count in sorted(scanners.items(), key=lambda x: -x[1])[:10]:
        print(f"  {ip}: {count} 404 responses")
 
    suspicious = detect_suspicious_requests(entries)
    print(f"\nSuspicious requests: {len(suspicious)}")
    for req in suspicious[:10]:
        print(f"  [{req['ip']}] {req['method']} {req['path']} -> {req['status']}")

Part 3: Hash Operations and IOC Matching

File Hash Calculator

"""Calculate file hashes for integrity verification and IOC matching."""
 
import hashlib
from pathlib import Path
 
 
def hash_file(filepath: str) -> dict[str, str]:
    """Calculate MD5, SHA1, and SHA256 hashes for a file."""
    md5 = hashlib.md5()
    sha1 = hashlib.sha1()
    sha256 = hashlib.sha256()
 
    with open(filepath, "rb") as f:
        while chunk := f.read(8192):
            md5.update(chunk)
            sha1.update(chunk)
            sha256.update(chunk)
 
    return {
        "md5": md5.hexdigest(),
        "sha1": sha1.hexdigest(),
        "sha256": sha256.hexdigest(),
    }
 
 
def check_against_iocs(file_hash: str, ioc_file: str) -> bool:
    """Check if a hash matches known IOCs."""
    iocs = set()
    with open(ioc_file, "r") as f:
        for line in f:
            cleaned = line.strip().lower()
            if cleaned and not cleaned.startswith("#"):
                iocs.add(cleaned)
    return file_hash.lower() in iocs
 
 
def scan_directory(directory: str, ioc_file: str) -> list[dict]:
    """Scan all files in a directory against IOC list."""
    matches = []
    for filepath in Path(directory).rglob("*"):
        if filepath.is_file():
            try:
                hashes = hash_file(str(filepath))
                for hash_type, hash_value in hashes.items():
                    if check_against_iocs(hash_value, ioc_file):
                        matches.append({
                            "file": str(filepath),
                            "hash_type": hash_type,
                            "hash": hash_value,
                        })
            except PermissionError:
                continue
    return matches
 
 
# Example usage
if __name__ == "__main__":
    # Hash a single file
    hashes = hash_file("/path/to/suspicious_file.exe")
    for algo, value in hashes.items():
        print(f"  {algo.upper()}: {value}")

Part 4: API Integrations

VirusTotal File Check

"""Check file hashes against VirusTotal."""
 
import requests
 
 
def check_virustotal(file_hash: str, api_key: str) -> dict:
    """Query VirusTotal for a file hash."""
    url = f"https://www.virustotal.com/api/v3/files/{file_hash}"
    headers = {"x-apikey": api_key}
 
    response = requests.get(url, headers=headers, timeout=30)
 
    if response.status_code == 200:
        data = response.json()["data"]["attributes"]
        stats = data.get("last_analysis_stats", {})
        return {
            "name": data.get("meaningful_name", "Unknown"),
            "malicious": stats.get("malicious", 0),
            "undetected": stats.get("undetected", 0),
            "total": sum(stats.values()),
            "reputation": data.get("reputation", "N/A"),
        }
    elif response.status_code == 404:
        return {"error": "Hash not found in VirusTotal"}
    else:
        return {"error": f"API error: {response.status_code}"}
 
 
# Example usage (requires API key)
# result = check_virustotal("hash_here", "<YOUR_API_KEY>")
# print(f"Detections: {result['malicious']}/{result['total']}")

AbuseIPDB IP Reputation Check

"""Check IP reputation against AbuseIPDB."""
 
import requests
 
 
def check_ip_reputation(ip: str, api_key: str) -> dict:
    """Query AbuseIPDB for IP reputation."""
    url = "https://api.abuseipdb.com/api/v2/check"
    headers = {"Key": api_key, "Accept": "application/json"}
    params = {"ipAddress": ip, "maxAgeInDays": 90}
 
    response = requests.get(url, headers=headers, params=params, timeout=30)
 
    if response.status_code == 200:
        data = response.json()["data"]
        return {
            "ip": data["ipAddress"],
            "abuse_score": data["abuseConfidenceScore"],
            "country": data["countryCode"],
            "isp": data["isp"],
            "total_reports": data["totalReports"],
            "is_tor": data.get("isTor", False),
        }
    return {"error": f"API error: {response.status_code}"}
 
 
# Example usage (requires API key)
# result = check_ip_reputation("1.2.3.4", "<YOUR_API_KEY>")
# print(f"Abuse Score: {result['abuse_score']}%")

Part 5: Useful One-Liners

Quick Security Checks

# Check if a password appears in known breach lists (k-anonymity)
import hashlib, requests
def check_pwned(password: str) -> int:
    sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
    prefix, suffix = sha1[:5], sha1[5:]
    resp = requests.get(f"https://api.pwnedpasswords.com/range/{prefix}")
    for line in resp.text.splitlines():
        if line.split(":")[0] == suffix:
            return int(line.split(":")[1])
    return 0
 
# Generate a random secure password
import secrets, string
def generate_password(length: int = 20) -> str:
    chars = string.ascii_letters + string.digits + string.punctuation
    return "".join(secrets.choice(chars) for _ in range(length))
 
# Validate IP address
import ipaddress
def is_valid_ip(ip: str) -> bool:
    try:
        ipaddress.ip_address(ip)
        return True
    except ValueError:
        return False
 
# Check if IP is private/internal
def is_private_ip(ip: str) -> bool:
    return ipaddress.ip_address(ip).is_private
 
# Base64 encode/decode
import base64
def b64_encode(data: str) -> str:
    return base64.b64encode(data.encode()).decode()
def b64_decode(data: str) -> str:
    return base64.b64decode(data.encode()).decode()

Next Steps

SkillTools to Learn
Packet AnalysisScapy, dpkt, pyshark
Web SecurityBurp Suite extensions, Selenium
Malware Analysispefile, yara-python, oletools
Forensicsvolatility3, plaso
Automationansible, paramiko (SSH)

References

  • Python Security Libraries — Awesome List
  • SANS Python for Security Professionals
  • VirusTotal API Documentation
  • AbuseIPDB API Documentation
#Python#Security Automation#Scripting#SOC#Threat Detection#Network Security

Related Articles

How to Deploy Falco for Kubernetes Runtime Security Monitoring

Step-by-step guide to deploying Falco as a Kubernetes runtime security engine. Covers Helm installation, custom rule authoring, Falcosidekick alerting...

12 min read

How to Deploy Wazuh SIEM/XDR for Unified Security Monitoring

Step-by-step guide to deploying Wazuh as an open-source SIEM and XDR platform. Covers server installation, agent deployment across Windows and Linux,...

13 min read

How to Configure Microsoft Sentinel Analytics Rules

End-to-end SOC guide for Microsoft Sentinel: build KQL-based scheduled and NRT analytics rules, wire automation rules for incident triage, and deploy...

15 min read
Back to all HOWTOs