MEDIUM: - M1: Whitelist direct IP/CIDR additions now persist to direct.txt - M2: get_map_id() uses 5s TTL cache (single bpftool call for all maps) - M3: IPv6 extension header parsing in xdp_ddos.c (hop-by-hop/routing/frag/dst) - M4: Shell injection prevention - sanitize_input() + sys.argv[] for all Python calls LOW: - L1: Remove redundant self.running (uses _stop_event only) - L2: Remove unused config values (rate_limit_after, cooldown_multiplier, retrain_interval) - L3: Thread poll intervals reloaded on SIGHUP - L4: batch_map_operation counts only successfully written entries - L5: Clarify unique_ips_approx comment (per-packet counter) - L6: Document LRU_HASH multi-CPU race condition as acceptable - L7: Download Cloudflare IPv6 ranges in whitelist preset - L8: Fix file handle leak in xdp_country.py list_countries() Also: SIGHUP now preserves EWMA/violation state, daemon skips whitelisted IPs in EWMA/AI escalation, deep copy for default config, IHL validation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
276 lines
8.5 KiB
Python
Executable File
276 lines
8.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
XDP Whitelist Manager - Fast batch IP whitelisting
|
|
Supports presets like Cloudflare, AWS, Google, etc.
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
from xdp_common import get_map_id, batch_map_operation, classify_cidrs
|
|
|
|
# Directory holding one "<name>.txt" file per whitelisted service
# (one CIDR per line, as written by add_whitelist).
WHITELIST_DIR = Path("/etc/xdp-blocker/whitelist")

# Preset URLs for trusted services.
# "v4" / "v6" are the download endpoints; "desc" is the human-readable
# label printed by the list/presets commands.
PRESETS = {
    "cloudflare": {
        "v4": "https://www.cloudflare.com/ips-v4",
        "v6": "https://www.cloudflare.com/ips-v6",
        "desc": "Cloudflare CDN/Proxy",
    },
    "aws": {
        "v4": "https://ip-ranges.amazonaws.com/ip-ranges.json",
        "desc": "Amazon Web Services (all regions)",
    },
    "google": {
        "v4": "https://www.gstatic.com/ipranges/cloud.json",
        "desc": "Google Cloud Platform",
    },
    "github": {
        "v4": "https://api.github.com/meta",
        "desc": "GitHub Services",
    },
}
|
|
|
|
def download_cloudflare(timeout=30):
    """Download Cloudflare IP ranges (IPv4 + IPv6).

    Each address family is fetched on its own, so a failure of one
    endpoint still returns the ranges of the other (best effort).

    Args:
        timeout: Socket timeout in seconds applied to each HTTP request,
            so an unresponsive server cannot hang the tool forever.

    Returns:
        List of CIDR strings, IPv4 entries first and IPv6 entries after
        them. Empty if both downloads failed.
    """
    cidrs = []
    for key, label in (("v4", "IPv4"), ("v6", "IPv6")):
        try:
            req = urllib.request.Request(
                PRESETS["cloudflare"][key],
                headers={"User-Agent": "xdp-whitelist/1.0"},
            )
            with urllib.request.urlopen(req, timeout=timeout) as r:
                body = r.read().decode()
            # splitlines() plus the blank filter: an empty response must
            # yield no entries (split('\n') would give ['']), and stray
            # '\r' or padding around an entry is removed.
            ranges = [line.strip() for line in body.splitlines() if line.strip()]
            cidrs.extend(ranges)
            print(f" Downloaded {len(ranges)} {label} ranges")
        except Exception as e:
            # Deliberately broad: any network/decoding problem downgrades
            # to a warning so the other address family is still attempted.
            print(f" [WARN] Failed to download {label}: {e}")
    return cidrs
|
|
|
|
def download_aws(timeout=30):
    """Download AWS IP ranges (IPv4 prefixes only).

    Args:
        timeout: Socket timeout in seconds for the HTTP request.

    Returns:
        List of unique IPv4 CIDR strings in the order they first appear
        in the feed. Empty if the download or parsing failed.
    """
    cidrs = []
    try:
        with urllib.request.urlopen(PRESETS["aws"]["v4"], timeout=timeout) as r:
            data = json.loads(r.read().decode())
        # A prefix can be listed more than once in the feed (for example
        # under several services); dict.fromkeys drops the repeats while
        # keeping first-seen order, so the map is not updated redundantly.
        cidrs = list(dict.fromkeys(
            prefix["ip_prefix"]
            for prefix in data.get("prefixes", [])
            if "ip_prefix" in prefix
        ))
        print(f" Downloaded {len(cidrs)} IPv4 ranges")
    except Exception as e:
        # Best effort: report the problem and return what we have.
        print(f" [WARN] Failed to download: {e}")
    return cidrs
|
|
|
|
def download_google(timeout=30):
    """Download Google Cloud IP ranges (IPv4 prefixes only).

    Args:
        timeout: Socket timeout in seconds for the HTTP request, so an
            unresponsive server cannot block the tool indefinitely.

    Returns:
        List of IPv4 CIDR strings taken from the "ipv4Prefix" field of
        each entry under "prefixes". Empty if the download or parsing
        failed.
    """
    cidrs = []
    try:
        with urllib.request.urlopen(PRESETS["google"]["v4"], timeout=timeout) as r:
            data = json.loads(r.read().decode())
        # Entries without "ipv4Prefix" (e.g. IPv6-only ones) are skipped.
        cidrs = [
            prefix["ipv4Prefix"]
            for prefix in data.get("prefixes", [])
            if "ipv4Prefix" in prefix
        ]
        print(f" Downloaded {len(cidrs)} IPv4 ranges")
    except Exception as e:
        # Best effort: report the problem and return an empty result.
        print(f" [WARN] Failed to download: {e}")
    return cidrs
|
|
|
|
def download_github(timeout=30):
    """Download GitHub IP ranges (IPv4 only).

    Collects the CIDR lists of a fixed set of service sections from the
    GitHub meta document, drops IPv6 entries and duplicates.

    Args:
        timeout: Socket timeout in seconds for the HTTP request.

    Returns:
        Sorted list of unique IPv4 CIDR strings. Empty if the download
        or parsing failed.
    """
    sections = (
        "hooks", "web", "api", "git", "packages",
        "pages", "importer", "actions", "dependabot",
    )
    cidrs = []
    try:
        req = urllib.request.Request(
            PRESETS["github"]["v4"],
            headers={"User-Agent": "xdp-whitelist"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as r:
            data = json.loads(r.read().decode())
        unique_v4 = set()
        for key in sections:
            # A ':' marks an IPv6 range; only IPv4 is kept here.
            unique_v4.update(c for c in data.get(key, []) if ':' not in c)
        # Sorted so that repeated downloads produce the same list (and
        # therefore the same cache file) instead of set-iteration order.
        cidrs = sorted(unique_v4)
        print(f" Downloaded {len(cidrs)} IPv4 ranges")
    except Exception as e:
        # Best effort: report the problem and return an empty result.
        print(f" [WARN] Failed to download: {e}")
    return cidrs
|
|
|
|
def add_whitelist(name, cidrs=None):
    """Add IPs to whitelist.

    The CIDR list comes from, in order of preference: the ``cidrs``
    argument, the cached ``<name>.txt`` file in WHITELIST_DIR, or a fresh
    download when ``name`` is a known preset. The list is then written to
    ``<name>.txt`` and pushed into the whitelist maps.

    Args:
        name: Preset or list name (case-insensitive); also the stem of
            the file the CIDRs are stored in.
        cidrs: Optional explicit list of CIDR strings. When given, no
            cache lookup or download takes place.

    Returns:
        True when the entries were handed to the map update, False when
        the preset is unknown, no CIDRs are available, or the
        whitelist_v4 map cannot be found.
    """
    name = name.lower()
    WHITELIST_DIR.mkdir(parents=True, exist_ok=True)
    wl_file = WHITELIST_DIR / f"{name}.txt"

    if cidrs is None and wl_file.exists():
        with open(wl_file) as f:
            cached = [line.strip() for line in f if line.strip()]
        # An empty cache file for a preset must not block the download:
        # leave cidrs as None so the preset is fetched again. For a
        # non-preset name there is nothing to download, so keep the
        # empty list and report "No CIDRs to add" below.
        if cached or name not in PRESETS:
            cidrs = cached
        if cached:
            print(f"[INFO] Using cached {name} ({len(cidrs)} CIDRs)")

    if cidrs is None:
        # Preset name -> (label used in the progress message, downloader).
        downloaders = {
            "cloudflare": ("Cloudflare", download_cloudflare),
            "aws": ("AWS", download_aws),
            "google": ("Google Cloud", download_google),
            "github": ("GitHub", download_github),
        }
        if name not in downloaders:
            print(f"[ERROR] Unknown preset: {name}")
            print(f"Available presets: {', '.join(PRESETS.keys())}")
            return False
        label, fetch = downloaders[name]
        print(f"[INFO] Downloading {label} IP ranges...")
        cidrs = fetch()

    if not cidrs:
        print("[ERROR] No CIDRs to add")
        return False

    # Persist before touching the maps so the list is available as a
    # cache on the next run.
    with open(wl_file, 'w') as f:
        f.write('\n'.join(cidrs))

    map_id = get_map_id("whitelist_v4")
    if not map_id:
        print("[ERROR] whitelist_v4 map not found. Is XDP loaded?")
        return False

    v4_cidrs, v6_cidrs = classify_cidrs(cidrs)

    if v4_cidrs:
        print(f"[INFO] Adding {len(v4_cidrs)} IPv4 CIDRs to whitelist...")
        batch_map_operation(map_id, v4_cidrs, operation="update", batch_size=500)

    v6_added = 0
    if v6_cidrs:
        map_id_v6 = get_map_id("whitelist_v6")
        if map_id_v6:
            print(f"[INFO] Adding {len(v6_cidrs)} IPv6 CIDRs to whitelist...")
            batch_map_operation(map_id_v6, v6_cidrs, operation="update", batch_size=500, ipv6=True)
            v6_added = len(v6_cidrs)
        else:
            # Previously skipped silently while the summary still claimed
            # the IPv6 entries had been whitelisted.
            print(f"[WARN] whitelist_v6 map not found; skipped {len(v6_cidrs)} IPv6 CIDRs")

    print(f"[OK] Whitelisted {name}: {len(v4_cidrs)} v4 + {v6_added} v6 CIDRs")
    return True
|
|
|
|
def del_whitelist(name):
    """Remove IPs from whitelist.

    Reads the CIDRs stored in ``<name>.txt`` under WHITELIST_DIR, issues
    delete operations for them on the whitelist maps and finally removes
    the file.

    Args:
        name: Name of the whitelisted list (case-insensitive).

    Returns:
        True after the file was removed; False when the list is not
        whitelisted or the whitelist_v4 map cannot be found.
    """
    name = name.lower()
    wl_file = WHITELIST_DIR / f"{name}.txt"
    if not wl_file.exists():
        print(f"[ERROR] {name} is not whitelisted")
        return False

    v4_map = get_map_id("whitelist_v4")
    if not v4_map:
        print("[ERROR] whitelist_v4 map not found")
        return False

    # Collect the non-blank lines of the stored list.
    entries = []
    with open(wl_file) as handle:
        for raw in handle:
            entry = raw.strip()
            if entry:
                entries.append(entry)

    v4_cidrs, v6_cidrs = classify_cidrs(entries)

    if v4_cidrs:
        print(f"[INFO] Removing {len(v4_cidrs)} IPv4 CIDRs from whitelist...")
        batch_map_operation(v4_map, v4_cidrs, operation="delete", batch_size=500)

    # The IPv6 map is only looked up when there is something to remove.
    v6_map = get_map_id("whitelist_v6") if v6_cidrs else None
    if v6_map:
        print(f"[INFO] Removing {len(v6_cidrs)} IPv6 CIDRs from whitelist...")
        batch_map_operation(v6_map, v6_cidrs, operation="delete", batch_size=500, ipv6=True)

    wl_file.unlink()
    print(f"[OK] Removed {name} from whitelist")
    return True
|
|
|
|
def list_whitelist():
    """List whitelisted services.

    Prints one line per ``*.txt`` file in WHITELIST_DIR (sorted by path)
    with the number of IPv4 and, if any, IPv6 CIDRs it contains, plus the
    preset description or "Custom" for names that are not presets.
    """
    print("=== Whitelisted Services ===")

    stored = sorted(WHITELIST_DIR.glob("*.txt")) if WHITELIST_DIR.exists() else []
    if not stored:
        print(" (none)")
        return

    for path in stored:
        name = path.stem
        with open(path) as handle:
            entries = [text for text in (raw.strip() for raw in handle) if text]

        # An entry containing ':' is counted as IPv6, everything else as IPv4.
        v6_count = sum(1 for entry in entries if ':' in entry)
        v4_count = len(entries) - v6_count
        desc = PRESETS.get(name, {}).get("desc", "Custom")

        if v6_count > 0:
            print(f" {name}: {v4_count} v4 + {v6_count} v6 CIDRs ({desc})")
        else:
            print(f" {name}: {v4_count} CIDRs ({desc})")
|
|
|
|
def show_presets():
    """Show available presets.

    Prints a header followed by one "name: description" line for every
    entry of PRESETS, in definition order.
    """
    print("=== Available Presets ===")
    for name in PRESETS:
        description = PRESETS[name]['desc']
        print(f" {name}: {description}")
|
|
|
def show_help():
    """Print the command-line usage summary to stdout."""
    help_lines = (
        "XDP Whitelist Manager - Fast batch IP whitelisting",
        "",
        "Usage: xdp-whitelist <command> [args]",
        "",
        "Commands:",
        "add <preset|file> Whitelist a preset or custom file",
        "del <name> Remove from whitelist",
        "list List whitelisted services",
        "presets Show available presets",
        "",
        "Presets:",
        "cloudflare Cloudflare CDN/Proxy IPs",
        "aws Amazon Web Services",
        "google Google Cloud Platform",
        "github GitHub Services",
        "",
        "Examples:",
        "xdp-whitelist add cloudflare # Whitelist Cloudflare",
        "xdp-whitelist add aws # Whitelist AWS",
        "xdp-whitelist del cloudflare # Remove Cloudflare",
        "xdp-whitelist list",
        "",  # the help text itself ends with a newline
    )
    print("\n".join(help_lines))
|
|
|
|
def main():
    """Command-line entry point.

    Dispatches on ``sys.argv[1]`` to the add/del/list/presets commands
    and shows the help text for anything else (including a missing
    argument to ``add``/``del``).

    Returns:
        Process exit status: 1 when an ``add`` or ``del`` command reports
        failure, 0 otherwise.
    """
    args = sys.argv[1:]
    if not args:
        show_help()
        return 0

    cmd = args[0]
    has_target = len(args) >= 2

    # add/del return a success flag; turn it into an exit status so
    # calling scripts can detect failures instead of always seeing 0.
    if cmd == "add" and has_target:
        return 0 if add_whitelist(args[1]) else 1
    if cmd == "del" and has_target:
        return 0 if del_whitelist(args[1]) else 1

    if cmd == "list":
        list_whitelist()
    elif cmd == "presets":
        show_presets()
    else:
        show_help()
    return 0


if __name__ == "__main__":
    sys.exit(main())
|