Files
xdp-defense/lib/xdp_whitelist.py
kaffa 667c6eac81 Fix 12 code review issues (4 MEDIUM + 8 LOW)
MEDIUM:
- M1: Whitelist direct IP/CIDR additions now persist to direct.txt
- M2: get_map_id() uses 5s TTL cache (single bpftool call for all maps)
- M3: IPv6 extension header parsing in xdp_ddos.c (hop-by-hop/routing/frag/dst)
- M4: Shell injection prevention - sanitize_input() + sys.argv[] for all Python calls

LOW:
- L1: Remove redundant self.running (uses _stop_event only)
- L2: Remove unused config values (rate_limit_after, cooldown_multiplier, retrain_interval)
- L3: Thread poll intervals reloaded on SIGHUP
- L4: batch_map_operation counts only successfully written entries
- L5: Clarify unique_ips_approx comment (per-packet counter)
- L6: Document LRU_HASH multi-CPU race condition as acceptable
- L7: Download Cloudflare IPv6 ranges in whitelist preset
- L8: Fix file handle leak in xdp_country.py list_countries()

Also: SIGHUP now preserves EWMA/violation state, daemon skips whitelisted
IPs in EWMA/AI escalation, deep copy for default config, IHL validation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 09:23:41 +09:00

276 lines
8.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
XDP Whitelist Manager - Fast batch IP whitelisting
Supports presets like Cloudflare, AWS, Google, etc.
"""
import sys
import json
import urllib.request
from pathlib import Path
from xdp_common import get_map_id, batch_map_operation, classify_cidrs
WHITELIST_DIR = Path("/etc/xdp-blocker/whitelist")
# Preset URLs for trusted services
PRESETS = {
"cloudflare": {
"v4": "https://www.cloudflare.com/ips-v4",
"v6": "https://www.cloudflare.com/ips-v6",
"desc": "Cloudflare CDN/Proxy"
},
"aws": {
"v4": "https://ip-ranges.amazonaws.com/ip-ranges.json",
"desc": "Amazon Web Services (all regions)"
},
"google": {
"v4": "https://www.gstatic.com/ipranges/cloud.json",
"desc": "Google Cloud Platform"
},
"github": {
"v4": "https://api.github.com/meta",
"desc": "GitHub Services"
}
}
def download_cloudflare():
"""Download Cloudflare IP ranges (IPv4 + IPv6)"""
cidrs = []
try:
req = urllib.request.Request(
PRESETS["cloudflare"]["v4"],
headers={"User-Agent": "xdp-whitelist/1.0"}
)
with urllib.request.urlopen(req) as r:
v4 = r.read().decode().strip().split('\n')
cidrs.extend(v4)
print(f" Downloaded {len(v4)} IPv4 ranges")
except Exception as e:
print(f" [WARN] Failed to download IPv4: {e}")
try:
req = urllib.request.Request(
PRESETS["cloudflare"]["v6"],
headers={"User-Agent": "xdp-whitelist/1.0"}
)
with urllib.request.urlopen(req) as r:
v6 = r.read().decode().strip().split('\n')
cidrs.extend(v6)
print(f" Downloaded {len(v6)} IPv6 ranges")
except Exception as e:
print(f" [WARN] Failed to download IPv6: {e}")
return cidrs
def download_aws():
"""Download AWS IP ranges"""
cidrs = []
try:
with urllib.request.urlopen(PRESETS["aws"]["v4"]) as r:
data = json.loads(r.read().decode())
for prefix in data.get("prefixes", []):
cidrs.append(prefix["ip_prefix"])
print(f" Downloaded {len(cidrs)} IPv4 ranges")
except Exception as e:
print(f" [WARN] Failed to download: {e}")
return cidrs
def download_google():
"""Download Google Cloud IP ranges"""
cidrs = []
try:
with urllib.request.urlopen(PRESETS["google"]["v4"]) as r:
data = json.loads(r.read().decode())
for prefix in data.get("prefixes", []):
if "ipv4Prefix" in prefix:
cidrs.append(prefix["ipv4Prefix"])
print(f" Downloaded {len(cidrs)} IPv4 ranges")
except Exception as e:
print(f" [WARN] Failed to download: {e}")
return cidrs
def download_github():
"""Download GitHub IP ranges"""
cidrs = []
try:
req = urllib.request.Request(
PRESETS["github"]["v4"],
headers={"User-Agent": "xdp-whitelist"}
)
with urllib.request.urlopen(req) as r:
data = json.loads(r.read().decode())
for key in ["hooks", "web", "api", "git", "packages", "pages", "importer", "actions", "dependabot"]:
if key in data:
cidrs.extend(data[key])
cidrs = list(set(c for c in cidrs if ':' not in c))
print(f" Downloaded {len(cidrs)} IPv4 ranges")
except Exception as e:
print(f" [WARN] Failed to download: {e}")
return cidrs
def add_whitelist(name, cidrs=None):
"""Add IPs to whitelist"""
name = name.lower()
WHITELIST_DIR.mkdir(parents=True, exist_ok=True)
wl_file = WHITELIST_DIR / f"{name}.txt"
if cidrs is None and wl_file.exists():
with open(wl_file) as f:
cidrs = [line.strip() for line in f if line.strip()]
if cidrs:
print(f"[INFO] Using cached {name} ({len(cidrs)} CIDRs)")
if cidrs is None:
if name == "cloudflare":
print(f"[INFO] Downloading Cloudflare IP ranges...")
cidrs = download_cloudflare()
elif name == "aws":
print(f"[INFO] Downloading AWS IP ranges...")
cidrs = download_aws()
elif name == "google":
print(f"[INFO] Downloading Google Cloud IP ranges...")
cidrs = download_google()
elif name == "github":
print(f"[INFO] Downloading GitHub IP ranges...")
cidrs = download_github()
else:
print(f"[ERROR] Unknown preset: {name}")
print(f"Available presets: {', '.join(PRESETS.keys())}")
return False
if not cidrs:
print("[ERROR] No CIDRs to add")
return False
with open(wl_file, 'w') as f:
f.write('\n'.join(cidrs))
map_id = get_map_id("whitelist_v4")
if not map_id:
print("[ERROR] whitelist_v4 map not found. Is XDP loaded?")
return False
v4_cidrs, v6_cidrs = classify_cidrs(cidrs)
if v4_cidrs:
print(f"[INFO] Adding {len(v4_cidrs)} IPv4 CIDRs to whitelist...")
batch_map_operation(map_id, v4_cidrs, operation="update", batch_size=500)
if v6_cidrs:
map_id_v6 = get_map_id("whitelist_v6")
if map_id_v6:
print(f"[INFO] Adding {len(v6_cidrs)} IPv6 CIDRs to whitelist...")
batch_map_operation(map_id_v6, v6_cidrs, operation="update", batch_size=500, ipv6=True)
print(f"[OK] Whitelisted {name}: {len(v4_cidrs)} v4 + {len(v6_cidrs)} v6 CIDRs")
return True
def del_whitelist(name):
"""Remove IPs from whitelist"""
name = name.lower()
wl_file = WHITELIST_DIR / f"{name}.txt"
if not wl_file.exists():
print(f"[ERROR] {name} is not whitelisted")
return False
map_id = get_map_id("whitelist_v4")
if not map_id:
print("[ERROR] whitelist_v4 map not found")
return False
with open(wl_file) as f:
cidrs = [line.strip() for line in f if line.strip()]
v4_cidrs, v6_cidrs = classify_cidrs(cidrs)
if v4_cidrs:
print(f"[INFO] Removing {len(v4_cidrs)} IPv4 CIDRs from whitelist...")
batch_map_operation(map_id, v4_cidrs, operation="delete", batch_size=500)
if v6_cidrs:
map_id_v6 = get_map_id("whitelist_v6")
if map_id_v6:
print(f"[INFO] Removing {len(v6_cidrs)} IPv6 CIDRs from whitelist...")
batch_map_operation(map_id_v6, v6_cidrs, operation="delete", batch_size=500, ipv6=True)
wl_file.unlink()
print(f"[OK] Removed {name} from whitelist")
return True
def list_whitelist():
"""List whitelisted services"""
print("=== Whitelisted Services ===")
if not WHITELIST_DIR.exists():
print(" (none)")
return
files = list(WHITELIST_DIR.glob("*.txt"))
if not files:
print(" (none)")
return
for wl_file in sorted(files):
name = wl_file.stem
with open(wl_file) as f:
cidrs = [line.strip() for line in f if line.strip()]
v4_count = sum(1 for c in cidrs if ':' not in c)
v6_count = len(cidrs) - v4_count
desc = PRESETS.get(name, {}).get("desc", "Custom")
if v6_count > 0:
print(f" {name}: {v4_count} v4 + {v6_count} v6 CIDRs ({desc})")
else:
print(f" {name}: {v4_count} CIDRs ({desc})")
def show_presets():
"""Show available presets"""
print("=== Available Presets ===")
for name, info in PRESETS.items():
print(f" {name}: {info['desc']}")
def show_help():
print("""XDP Whitelist Manager - Fast batch IP whitelisting
Usage: xdp-whitelist <command> [args]
Commands:
add <preset|file> Whitelist a preset or custom file
del <name> Remove from whitelist
list List whitelisted services
presets Show available presets
Presets:
cloudflare Cloudflare CDN/Proxy IPs
aws Amazon Web Services
google Google Cloud Platform
github GitHub Services
Examples:
xdp-whitelist add cloudflare # Whitelist Cloudflare
xdp-whitelist add aws # Whitelist AWS
xdp-whitelist del cloudflare # Remove Cloudflare
xdp-whitelist list
""")
def main():
if len(sys.argv) < 2:
show_help()
return
cmd = sys.argv[1]
if cmd == "add" and len(sys.argv) >= 3:
add_whitelist(sys.argv[2])
elif cmd == "del" and len(sys.argv) >= 3:
del_whitelist(sys.argv[2])
elif cmd == "list":
list_whitelist()
elif cmd == "presets":
show_presets()
else:
show_help()
if __name__ == "__main__":
main()