Unify xdp-blocker and xdp-ddos into single xdp-defense project

Chain two XDP programs via libxdp dispatcher on the same interface:
xdp_blocker (priority 10) handles CIDR/country/whitelist blocking,
xdp_ddos (priority 20) handles rate limiting, EWMA analysis, and AI
anomaly detection. Whitelist maps are shared via BPF map pinning so
whitelisted IPs bypass both blocklist checks and DDoS rate limiting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
kaffa
2026-02-07 08:39:21 +09:00
commit 1bcaddce25
12 changed files with 3523 additions and 0 deletions

256
lib/xdp_whitelist.py Executable file
View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python3
"""
XDP Whitelist Manager - Fast batch IP whitelisting
Supports presets like Cloudflare, AWS, Google, etc.
"""
import sys
import json
import urllib.request
from pathlib import Path
from xdp_common import get_map_id, batch_map_operation, classify_cidrs
WHITELIST_DIR = Path("/etc/xdp-blocker/whitelist")
# Preset URLs for trusted services
PRESETS = {
"cloudflare": {
"v4": "https://www.cloudflare.com/ips-v4",
"v6": "https://www.cloudflare.com/ips-v6",
"desc": "Cloudflare CDN/Proxy"
},
"aws": {
"v4": "https://ip-ranges.amazonaws.com/ip-ranges.json",
"desc": "Amazon Web Services (all regions)"
},
"google": {
"v4": "https://www.gstatic.com/ipranges/cloud.json",
"desc": "Google Cloud Platform"
},
"github": {
"v4": "https://api.github.com/meta",
"desc": "GitHub Services"
}
}
def download_cloudflare():
"""Download Cloudflare IP ranges"""
cidrs = []
try:
req = urllib.request.Request(
PRESETS["cloudflare"]["v4"],
headers={"User-Agent": "xdp-whitelist/1.0"}
)
with urllib.request.urlopen(req) as r:
cidrs.extend(r.read().decode().strip().split('\n'))
print(f" Downloaded {len(cidrs)} IPv4 ranges")
except Exception as e:
print(f" [WARN] Failed to download IPv4: {e}")
return cidrs
def download_aws():
"""Download AWS IP ranges"""
cidrs = []
try:
with urllib.request.urlopen(PRESETS["aws"]["v4"]) as r:
data = json.loads(r.read().decode())
for prefix in data.get("prefixes", []):
cidrs.append(prefix["ip_prefix"])
print(f" Downloaded {len(cidrs)} IPv4 ranges")
except Exception as e:
print(f" [WARN] Failed to download: {e}")
return cidrs
def download_google():
"""Download Google Cloud IP ranges"""
cidrs = []
try:
with urllib.request.urlopen(PRESETS["google"]["v4"]) as r:
data = json.loads(r.read().decode())
for prefix in data.get("prefixes", []):
if "ipv4Prefix" in prefix:
cidrs.append(prefix["ipv4Prefix"])
print(f" Downloaded {len(cidrs)} IPv4 ranges")
except Exception as e:
print(f" [WARN] Failed to download: {e}")
return cidrs
def download_github():
"""Download GitHub IP ranges"""
cidrs = []
try:
req = urllib.request.Request(
PRESETS["github"]["v4"],
headers={"User-Agent": "xdp-whitelist"}
)
with urllib.request.urlopen(req) as r:
data = json.loads(r.read().decode())
for key in ["hooks", "web", "api", "git", "packages", "pages", "importer", "actions", "dependabot"]:
if key in data:
cidrs.extend(data[key])
cidrs = list(set(c for c in cidrs if ':' not in c))
print(f" Downloaded {len(cidrs)} IPv4 ranges")
except Exception as e:
print(f" [WARN] Failed to download: {e}")
return cidrs
def add_whitelist(name, cidrs=None):
"""Add IPs to whitelist"""
name = name.lower()
WHITELIST_DIR.mkdir(parents=True, exist_ok=True)
wl_file = WHITELIST_DIR / f"{name}.txt"
if cidrs is None and wl_file.exists():
with open(wl_file) as f:
cidrs = [line.strip() for line in f if line.strip() and ':' not in line]
if cidrs:
print(f"[INFO] Using cached {name} ({len(cidrs)} CIDRs)")
if cidrs is None:
if name == "cloudflare":
print(f"[INFO] Downloading Cloudflare IP ranges...")
cidrs = download_cloudflare()
elif name == "aws":
print(f"[INFO] Downloading AWS IP ranges...")
cidrs = download_aws()
elif name == "google":
print(f"[INFO] Downloading Google Cloud IP ranges...")
cidrs = download_google()
elif name == "github":
print(f"[INFO] Downloading GitHub IP ranges...")
cidrs = download_github()
else:
print(f"[ERROR] Unknown preset: {name}")
print(f"Available presets: {', '.join(PRESETS.keys())}")
return False
if not cidrs:
print("[ERROR] No CIDRs to add")
return False
with open(wl_file, 'w') as f:
f.write('\n'.join(cidrs))
map_id = get_map_id("whitelist_v4")
if not map_id:
print("[ERROR] whitelist_v4 map not found. Is XDP loaded?")
return False
v4_cidrs, v6_cidrs = classify_cidrs(cidrs)
if v4_cidrs:
print(f"[INFO] Adding {len(v4_cidrs)} IPv4 CIDRs to whitelist...")
batch_map_operation(map_id, v4_cidrs, operation="update", batch_size=500)
if v6_cidrs:
map_id_v6 = get_map_id("whitelist_v6")
if map_id_v6:
print(f"[INFO] Adding {len(v6_cidrs)} IPv6 CIDRs to whitelist...")
batch_map_operation(map_id_v6, v6_cidrs, operation="update", batch_size=500, ipv6=True)
print(f"[OK] Whitelisted {name}: {len(v4_cidrs)} v4 + {len(v6_cidrs)} v6 CIDRs")
return True
def del_whitelist(name):
"""Remove IPs from whitelist"""
name = name.lower()
wl_file = WHITELIST_DIR / f"{name}.txt"
if not wl_file.exists():
print(f"[ERROR] {name} is not whitelisted")
return False
map_id = get_map_id("whitelist_v4")
if not map_id:
print("[ERROR] whitelist_v4 map not found")
return False
with open(wl_file) as f:
cidrs = [line.strip() for line in f if line.strip()]
v4_cidrs, v6_cidrs = classify_cidrs(cidrs)
if v4_cidrs:
print(f"[INFO] Removing {len(v4_cidrs)} IPv4 CIDRs from whitelist...")
batch_map_operation(map_id, v4_cidrs, operation="delete", batch_size=500)
if v6_cidrs:
map_id_v6 = get_map_id("whitelist_v6")
if map_id_v6:
print(f"[INFO] Removing {len(v6_cidrs)} IPv6 CIDRs from whitelist...")
batch_map_operation(map_id_v6, v6_cidrs, operation="delete", batch_size=500, ipv6=True)
wl_file.unlink()
print(f"[OK] Removed {name} from whitelist")
return True
def list_whitelist():
"""List whitelisted services"""
print("=== Whitelisted Services ===")
if not WHITELIST_DIR.exists():
print(" (none)")
return
files = list(WHITELIST_DIR.glob("*.txt"))
if not files:
print(" (none)")
return
for wl_file in sorted(files):
name = wl_file.stem
count = sum(1 for line in open(wl_file) if line.strip() and ':' not in line)
desc = PRESETS.get(name, {}).get("desc", "Custom")
print(f" {name}: {count} CIDRs ({desc})")
def show_presets():
"""Show available presets"""
print("=== Available Presets ===")
for name, info in PRESETS.items():
print(f" {name}: {info['desc']}")
def show_help():
print("""XDP Whitelist Manager - Fast batch IP whitelisting
Usage: xdp-whitelist <command> [args]
Commands:
add <preset|file> Whitelist a preset or custom file
del <name> Remove from whitelist
list List whitelisted services
presets Show available presets
Presets:
cloudflare Cloudflare CDN/Proxy IPs
aws Amazon Web Services
google Google Cloud Platform
github GitHub Services
Examples:
xdp-whitelist add cloudflare # Whitelist Cloudflare
xdp-whitelist add aws # Whitelist AWS
xdp-whitelist del cloudflare # Remove Cloudflare
xdp-whitelist list
""")
def main():
if len(sys.argv) < 2:
show_help()
return
cmd = sys.argv[1]
if cmd == "add" and len(sys.argv) >= 3:
add_whitelist(sys.argv[2])
elif cmd == "del" and len(sys.argv) >= 3:
del_whitelist(sys.argv[2])
elif cmd == "list":
list_whitelist()
elif cmd == "presets":
show_presets()
else:
show_help()
if __name__ == "__main__":
main()