Overview
Python is the most widely used language in cybersecurity — from penetration testing tools to SIEM integrations and malware analysis scripts. Its extensive standard library, readable syntax, and vast ecosystem of security packages make it ideal for automating repetitive security tasks.
Who Should Use This Guide:
- SOC analysts automating daily tasks
- Security engineers building custom tools
- IT administrators writing security scripts
- Students preparing for security certifications (PNPT, OSCP)
What You Will Learn:
| Skill | Application |
|---|---|
| Network Scanning | Port scanning, host discovery |
| Log Parsing | Analyze security logs at scale |
| Hash Operations | File integrity, IOC matching |
| API Integration | VirusTotal, Shodan, AbuseIPDB |
| Packet Analysis | Parse pcap files with Scapy |
| Web Scraping | Gather OSINT data |
Requirements
# Create a virtual environment
python3 -m venv security-scripts
source security-scripts/bin/activate # Linux/macOS
# security-scripts\Scripts\activate # Windows
# Install security libraries
pip install requests scapy python-nmap shodan # ipaddress is part of the Python 3 standard library; no install needed
Part 1: Network Scanning
Basic Port Scanner
"""
Simple TCP port scanner using sockets.
Usage: python port_scanner.py <target_ip>
"""
import socket
import sys
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
def scan_port(target: str, port: int, timeout: float = 1.0) -> tuple[int, bool]:
    """Attempt a TCP connection to a single port.

    Args:
        target: IPv4 address or hostname to probe.
        port: TCP port number to test.
        timeout: Seconds to wait for the connection attempt. Defaults to
            1 second, the value that used to be hard-coded.

    Returns:
        ``(port, is_open)``; ``is_open`` is True only when the connection
        attempt succeeded.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex() reports failure as an error code instead of
            # raising; 0 means the connection was established.
            return port, sock.connect_ex((target, port)) == 0
    except OSError:
        # Socket-level failures (e.g. the name cannot be resolved) are
        # reported as a closed port rather than aborting the whole scan.
        return port, False
def scan_target(target: str, ports: range, max_threads: int = 100) -> list[int]:
    """Probe every port in *ports* on *target* using a thread pool.

    Progress is printed while the scan runs. The open ports are returned
    in ascending order.
    """
    print(f"Scanning {target} ({len(ports)} ports)...")
    print(f"Start time: {datetime.now().strftime('%H:%M:%S')}")

    found = []
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # Queue every probe up front, then collect results in submission order.
        pending = [pool.submit(scan_port, target, candidate) for candidate in ports]
        for job in pending:
            port, is_open = job.result()
            if not is_open:
                continue
            found.append(port)
            print(f" [+] Port {port}/tcp — OPEN")

    print(f"\nScan complete: {len(found)} open ports found")
    found.sort()
    return found
if __name__ == "__main__":
    # The script takes exactly one argument: the host to scan.
    if len(sys.argv) == 2:
        target_ip = sys.argv[1]
    else:
        print("Usage: python port_scanner.py <target_ip>")
        sys.exit(1)

    # Resolve hostname if needed
    try:
        target_ip = socket.gethostbyname(target_ip)
    except socket.gaierror:
        print(f"Cannot resolve hostname: {target_ip}")
        sys.exit(1)

    # Scan ports 1 through 1024.
    open_ports = scan_target(target_ip, range(1, 1025))

# Service Banner Grabbing
"""Grab service banners from open ports."""
import socket
def grab_banner(
    target: str,
    port: int,
    timeout: float = 3,
    http_ports: tuple[int, ...] = (80, 443, 8080, 8443),
) -> str:
    """Connect to a port and retrieve the service banner.

    The function first waits for the service to speak on its own. If
    nothing arrives and *port* is listed in *http_ports*, it sends an HTTP
    ``HEAD`` request and returns the start of the response instead.

    The probe is sent in plaintext over the raw TCP socket; no TLS
    handshake is performed, so TLS-only listeners are unlikely to return a
    useful response.

    Args:
        target: Host name or IP address to connect to.
        port: TCP port to connect to.
        timeout: Seconds allowed for the connect and for each read.
        http_ports: Ports that receive the HTTP probe when silent.

    Returns:
        The banner text (at most the first 1024 bytes, decoded leniently),
        ``"No banner received"`` for a silent non-HTTP port, or
        ``"Error: <reason>"`` when the connection or exchange fails.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((target, port))

            # Some services send a banner immediately
            try:
                banner = sock.recv(1024).decode("utf-8", errors="replace").strip()
                if banner:
                    return banner
            except socket.timeout:
                pass

            # For HTTP, send a request
            if port in http_ports:
                # The Host header must name the real target; a fixed
                # placeholder value is answered by the wrong virtual host
                # or rejected outright.
                request = f"HEAD / HTTP/1.1\r\nHost: {target}\r\n\r\n"
                # sendall() keeps writing until the whole request is out.
                sock.sendall(request.encode("ascii", errors="replace"))
                return sock.recv(1024).decode("utf-8", errors="replace").strip()

            return "No banner received"
    except Exception as e:
        # Deliberate best effort: any failure is reported as text so a
        # loop over many ports keeps going.
        return f"Error: {e}"
# Example usage
# Guarded like the other examples in this guide so that importing the
# module does not open network connections.
if __name__ == "__main__":
    target = "192.168.1.100"
    ports = [22, 80, 443, 3306, 8080]
    for port in ports:
        banner = grab_banner(target, port)
        # Only the first 100 characters are printed per port.
        print(f"Port {port}: {banner[:100]}")

# Part 2: Log Analysis
Parse Windows Security Event Logs
"""Parse exported Windows Security event logs (CSV format)."""
import csv
from collections import Counter
from pathlib import Path
def parse_security_log(log_file: str) -> list[dict]:
    """Parse a Windows Security event log exported as CSV.

    Args:
        log_file: Path to the CSV export. The first row is used as the
            column names.

    Returns:
        One dict per data row, keyed by column name.
    """
    # utf-8-sig drops a leading byte-order mark if present. newline="" is
    # what the csv module requires so that line breaks inside quoted
    # fields reach the parser unchanged.
    with open(log_file, "r", encoding="utf-8-sig", newline="") as f:
        return list(csv.DictReader(f))
def find_failed_logins(events: list[dict]) -> list[dict]:
    """Return the events whose ``EventID`` is ``"4625"`` (failed logon)."""
    failed = []
    for event in events:
        # EventID is compared as text, matching how csv delivers it.
        if event.get("EventID") == "4625":
            failed.append(event)
    return failed
def find_brute_force(events: list[dict], threshold: int = 5) -> dict:
    """Flag source IPs with at least *threshold* failed logins.

    Events without a ``SourceIP`` field are counted under ``"unknown"``.
    Returns a mapping of IP to failed-login count.
    """
    attempts_by_ip = Counter()
    for event in find_failed_logins(events):
        attempts_by_ip[event.get("SourceIP", "unknown")] += 1

    flagged = {}
    for ip, count in attempts_by_ip.items():
        if count >= threshold:
            flagged[ip] = count
    return flagged
def find_privilege_escalation(events: list[dict]) -> list[dict]:
    """Return the events whose ``EventID`` is ``"4672"`` (Special Logon)."""
    return list(filter(lambda event: event.get("EventID") == "4672", events))
# Example usage
if __name__ == "__main__":
    log_file = "security_events.csv"
    # Nothing is printed when the export is not present.
    if Path(log_file).exists():
        events = parse_security_log(log_file)
        print(f"Total events: {len(events)}")

        brute_force = find_brute_force(events)
        if brute_force:
            print("\n[!] Potential brute force detected:")
            # Highest failure count first.
            ranked = sorted(brute_force.items(), key=lambda item: item[1], reverse=True)
            for ip, count in ranked:
                print(f" {ip}: {count} failed attempts")

        priv_esc = find_privilege_escalation(events)
        print(f"\nPrivilege escalation events: {len(priv_esc)}")

# Parse Apache/Nginx Access Logs
"""Analyze web server access logs for suspicious activity."""
import re
from collections import Counter
# One access-log line: client, identity, user, [date], "request", status, size.
# The identity and user fields may hold real values rather than "-", the
# size is "-" when no body was sent, and the client may be an IPv6 address.
LOG_PATTERN = re.compile(
    r'(?P<ip>[0-9A-Fa-f:.]+) \S+ \S+ '
    r'\[(?P<date>[^\]]+)\] '
    r'"(?P<method>\w+) (?P<path>[^\s]+) [^"]*" '
    r'(?P<status>\d+) (?P<size>\d+|-)'
)
# Regular-expression fragments searched for in each requested path.
SUSPICIOUS_PATHS = [
    # WordPress admin and login pages
    r"/wp-admin",
    r"/wp-login",
    # Exposed environment file
    r"\.env",
    # Admin panels
    r"/admin",
    r"/phpmyadmin",
    # Exposed Git metadata
    r"\.git",
    # File disclosure and directory traversal
    r"/etc/passwd",
    r"\.\./",
    # Script injection
    r"<script",
]
def parse_access_log(log_file: str) -> list[dict]:
    """Parse access log lines into structured data.

    Args:
        log_file: Path to the access log.

    Returns:
        One dict of named ``LOG_PATTERN`` groups per matching line. Lines
        that do not match the pattern are skipped.
    """
    # Request lines are written by remote clients and can contain bytes
    # that are not valid text. Decoding explicitly as UTF-8 and replacing
    # undecodable bytes stops one bad line from aborting the whole parse.
    with open(log_file, "r", encoding="utf-8", errors="replace") as f:
        matches = (LOG_PATTERN.match(line) for line in f)
        return [match.groupdict() for match in matches if match]
def detect_scanning(entries: list[dict], threshold: int = 50) -> dict:
    """Flag IPs with at least *threshold* requests answered with status 404.

    Returns a mapping of IP to 404 count.
    """
    not_found = Counter(
        entry["ip"] for entry in entries if entry["status"] == "404"
    )
    flagged = {}
    for ip, hits in not_found.items():
        if hits >= threshold:
            flagged[ip] = hits
    return flagged
def detect_suspicious_requests(entries: list[dict]) -> list[dict]:
    """Return the entries whose path matches any ``SUSPICIOUS_PATHS`` pattern.

    Matching is case-insensitive, and each entry is reported at most once.
    """
    return [
        entry
        for entry in entries
        if any(
            re.search(pattern, entry["path"], re.IGNORECASE)
            for pattern in SUSPICIOUS_PATHS
        )
    ]
# Example usage
if __name__ == "__main__":
    entries = parse_access_log("access.log")
    print(f"Total requests: {len(entries)}")

    scanners = detect_scanning(entries)
    print(f"\nPotential scanners ({len(scanners)} IPs):")
    # Show the ten IPs with the most 404 responses.
    busiest = sorted(scanners.items(), key=lambda item: item[1], reverse=True)
    for ip, count in busiest[:10]:
        print(f" {ip}: {count} 404 responses")

    suspicious = detect_suspicious_requests(entries)
    print(f"\nSuspicious requests: {len(suspicious)}")
    # Only the first ten suspicious requests are listed.
    for req in suspicious[:10]:
        print(f" [{req['ip']}] {req['method']} {req['path']} -> {req['status']}")

# Part 3: Hash Operations and IOC Matching
File Hash Calculator
"""Calculate file hashes for integrity verification and IOC matching."""
import hashlib
from pathlib import Path
def hash_file(filepath: str) -> dict[str, str]:
    """Compute the MD5, SHA1 and SHA256 digests of a file in one pass.

    The file is read in 8192-byte chunks. Returns a dict with the keys
    ``"md5"``, ``"sha1"`` and ``"sha256"`` mapped to hex digests.
    """
    digests = {
        "md5": hashlib.md5(),
        "sha1": hashlib.sha1(),
        "sha256": hashlib.sha256(),
    }
    with open(filepath, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                break
            # Feed the same chunk to every hasher.
            for digest in digests.values():
                digest.update(block)
    return {name: digest.hexdigest() for name, digest in digests.items()}
def _load_iocs(ioc_file: str) -> set[str]:
    """Read an IOC list into a set of lowercased entries.

    Blank lines and lines starting with ``#`` are skipped.
    """
    with open(ioc_file, "r") as f:
        candidates = (line.strip().lower() for line in f)
        return {entry for entry in candidates if entry and not entry.startswith("#")}


def check_against_iocs(file_hash: str, ioc_file: str) -> bool:
    """Check if a hash matches known IOCs.

    Args:
        file_hash: Hex digest to look up; compared case-insensitively.
        ioc_file: Path to a text file with one hash per line.

    Returns:
        True when *file_hash* is listed in *ioc_file*.
    """
    return file_hash.lower() in _load_iocs(ioc_file)


def scan_directory(directory: str, ioc_file: str) -> list[dict]:
    """Scan all files under a directory (recursively) against an IOC list.

    Args:
        directory: Root directory to walk.
        ioc_file: Path to a text file with one hash per line.

    Returns:
        One dict per matching digest, with the keys ``"file"``,
        ``"hash_type"`` and ``"hash"``.

    Raises:
        OSError: If *ioc_file* cannot be read. The list is loaded before
            the walk starts, so this is raised even for an empty directory.
    """
    # Load the IOC list once instead of re-reading it for every digest of
    # every file.
    iocs = _load_iocs(ioc_file)

    matches = []
    for filepath in Path(directory).rglob("*"):
        if not filepath.is_file():
            continue
        try:
            hashes = hash_file(str(filepath))
        except PermissionError:
            # Unreadable files are skipped so the scan can continue.
            continue
        for hash_type, hash_value in hashes.items():
            if hash_value.lower() in iocs:
                matches.append({
                    "file": str(filepath),
                    "hash_type": hash_type,
                    "hash": hash_value,
                })
    return matches
# Example usage
if __name__ == "__main__":
    # Hash a single file
    hashes = hash_file("/path/to/suspicious_file.exe")
    # Print one line per algorithm, with the algorithm name upper-cased.
    for algo, value in hashes.items():
        print(f" {algo.upper()}: {value}")

# Part 4: API Integrations
VirusTotal File Check
"""Check file hashes against VirusTotal."""
import requests
def check_virustotal(file_hash: str, api_key: str) -> dict:
    """Look up a file hash in the VirusTotal v3 files endpoint.

    Returns a dict with the keys ``name``, ``malicious``, ``undetected``,
    ``total`` and ``reputation`` when the API answers 200. Any other
    status yields a dict with a single ``error`` key.
    """
    response = requests.get(
        f"https://www.virustotal.com/api/v3/files/{file_hash}",
        headers={"x-apikey": api_key},
        timeout=30,
    )

    # Deal with the failure cases first.
    if response.status_code == 404:
        return {"error": "Hash not found in VirusTotal"}
    if response.status_code != 200:
        return {"error": f"API error: {response.status_code}"}

    attributes = response.json()["data"]["attributes"]
    stats = attributes.get("last_analysis_stats", {})
    return {
        "name": attributes.get("meaningful_name", "Unknown"),
        "malicious": stats.get("malicious", 0),
        "undetected": stats.get("undetected", 0),
        # Sum of every verdict category reported in the stats.
        "total": sum(stats.values()),
        "reputation": attributes.get("reputation", "N/A"),
    }
# Example usage (requires API key)
# result = check_virustotal("hash_here", "<YOUR_API_KEY>")
# print(f"Detections: {result['malicious']}/{result['total']}")
AbuseIPDB IP Reputation Check
"""Check IP reputation against AbuseIPDB."""
import requests
def check_ip_reputation(ip: str, api_key: str) -> dict:
    """Look up an IP address with the AbuseIPDB v2 ``check`` endpoint.

    Reports from the last 90 days are requested. Returns a dict with the
    keys ``ip``, ``abuse_score``, ``country``, ``isp``, ``total_reports``
    and ``is_tor`` when the API answers 200; otherwise a dict with a
    single ``error`` key.
    """
    response = requests.get(
        "https://api.abuseipdb.com/api/v2/check",
        headers={"Key": api_key, "Accept": "application/json"},
        params={"ipAddress": ip, "maxAgeInDays": 90},
        timeout=30,
    )
    if response.status_code != 200:
        return {"error": f"API error: {response.status_code}"}

    report = response.json()["data"]
    return {
        "ip": report["ipAddress"],
        "abuse_score": report["abuseConfidenceScore"],
        "country": report["countryCode"],
        "isp": report["isp"],
        "total_reports": report["totalReports"],
        # Falls back to False when the response has no isTor field.
        "is_tor": report.get("isTor", False),
    }
# Example usage (requires API key)
# result = check_ip_reputation("1.2.3.4", "<YOUR_API_KEY>")
# print(f"Abuse Score: {result['abuse_score']}%")
Part 5: Useful One-Liners
Quick Security Checks
# Check if a password appears in known breach lists (k-anonymity)
import hashlib, requests
def check_pwned(password: str) -> int:
    """Return how many times a password appears in the Pwned Passwords data.

    Only the first five hex characters of the password's SHA-1 digest are
    sent to the API; the remainder is compared locally against the
    returned list.

    Args:
        password: The password to check.

    Returns:
        The count reported for the password, or 0 when it is not listed.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            API answers with an HTTP error status.
    """
    # SHA-1 is used here because the API is keyed on SHA-1 digests.
    sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
    prefix, suffix = sha1[:5], sha1[5:]
    resp = requests.get(f"https://api.pwnedpasswords.com/range/{prefix}", timeout=30)
    # Without this check an error response has no matching line and would
    # be reported as "0 breaches", which wrongly reads as a safe password.
    resp.raise_for_status()
    for line in resp.text.splitlines():
        candidate, _, count = line.partition(":")
        if candidate == suffix:
            return int(count)
    return 0
# Generate a random secure password
import secrets, string
def generate_password(length: int = 20) -> str:
    """Build a random password of *length* characters.

    Characters are drawn with ``secrets.choice`` from ASCII letters,
    digits and punctuation.
    """
    alphabet = string.ascii_letters + string.digits + string.punctuation
    picked = [secrets.choice(alphabet) for _ in range(length)]
    return "".join(picked)
# Validate IP address
import ipaddress
def is_valid_ip(ip: str) -> bool:
    """Report whether *ip* parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return False
    else:
        return True
# Check if IP is private/internal
def is_private_ip(ip: str) -> bool:
    """Report whether *ip* is a private address.

    Raises ``ValueError`` when *ip* is not a valid IP address.
    """
    address = ipaddress.ip_address(ip)
    return address.is_private
# Base64 encode/decode
import base64
def b64_encode(data: str) -> str:
    """Return the Base64 text for *data*."""
    raw = data.encode()
    encoded = base64.b64encode(raw)
    return encoded.decode()
def b64_decode(data: str) -> str:
    """Return the text decoded from the Base64 string *data*."""
    raw = base64.b64decode(data.encode())
    return raw.decode()

# Next Steps
| Skill | Tools to Learn |
|---|---|
| Packet Analysis | Scapy, dpkt, pyshark |
| Web Security | Burp Suite extensions, Selenium |
| Malware Analysis | pefile, yara-python, oletools |
| Forensics | volatility3, plaso |
| Automation | ansible, paramiko (SSH) |