Files
xdp-defense/lib/xdp_whitelist.py
kaffa 4ae4440504 Unify legacy data path /etc/xdp-blocker → /etc/xdp-defense
All config/data paths now use /etc/xdp-defense/ consistently,
eliminating the legacy xdp-blocker directory reference.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 16:40:46 +09:00

276 lines
8.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
XDP Whitelist Manager - Fast batch IP whitelisting
Supports presets like Cloudflare, AWS, Google, etc.
"""
import sys
import json
import urllib.request
from pathlib import Path
from xdp_common import get_map_id, batch_map_operation, classify_cidrs
# Directory holding one "<name>.txt" CIDR list per whitelisted service;
# add_whitelist() writes these files and del_whitelist() removes them.
WHITELIST_DIR = Path("/etc/xdp-defense/whitelist")
# Preset URLs for trusted services.
# Each preset maps to its download URL(s) plus a short "desc" shown by the
# list/presets commands. The Cloudflare URLs are read as newline-separated
# text by download_cloudflare(); the aws/google/github "v4" URLs are fetched
# and parsed as JSON by their respective download_* functions.
PRESETS = {
    "cloudflare": {
        "v4": "https://www.cloudflare.com/ips-v4",
        "v6": "https://www.cloudflare.com/ips-v6",
        "desc": "Cloudflare CDN/Proxy"
    },
    "aws": {
        "v4": "https://ip-ranges.amazonaws.com/ip-ranges.json",
        "desc": "Amazon Web Services (all regions)"
    },
    "google": {
        "v4": "https://www.gstatic.com/ipranges/cloud.json",
        "desc": "Google Cloud Platform"
    },
    "github": {
        "v4": "https://api.github.com/meta",
        "desc": "GitHub Services"
    }
}
def download_cloudflare(timeout=30):
    """Download Cloudflare IP ranges (IPv4 + IPv6).

    Each address family is fetched independently and on a best-effort
    basis: a failure for one family prints a warning and the other family
    is still attempted.

    Args:
        timeout: Seconds to wait on each HTTP request before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        A list of CIDR strings (IPv4 entries first, then IPv6). Empty if
        both downloads failed.
    """
    cidrs = []
    for label, key in (("IPv4", "v4"), ("IPv6", "v6")):
        try:
            req = urllib.request.Request(
                PRESETS["cloudflare"][key],
                headers={"User-Agent": "xdp-whitelist/1.0"},
            )
            with urllib.request.urlopen(req, timeout=timeout) as r:
                body = r.read().decode()
            # splitlines() copes with CRLF endings, and blank lines are
            # dropped so an empty response never yields a bogus "" entry.
            ranges = [line.strip() for line in body.splitlines() if line.strip()]
            cidrs.extend(ranges)
            print(f" Downloaded {len(ranges)} {label} ranges")
        except Exception as e:
            # Deliberately broad: any failure degrades to a warning.
            print(f" [WARN] Failed to download {label}: {e}")
    return cidrs
def download_aws(timeout=30):
    """Download AWS IPv4 ranges from the published JSON feed.

    The result is all-or-nothing: if the download or parsing fails at any
    point a warning is printed and an empty list is returned, so a
    half-parsed feed is never handed back to the caller.

    Args:
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        A list of unique IPv4 CIDR strings in feed order, or an empty list
        on failure.
    """
    try:
        with urllib.request.urlopen(PRESETS["aws"]["v4"], timeout=timeout) as r:
            data = json.loads(r.read().decode())
        # The feed can list the same prefix more than once (one entry per
        # service); dict.fromkeys de-duplicates while preserving order.
        cidrs = list(dict.fromkeys(
            prefix["ip_prefix"] for prefix in data.get("prefixes", [])
        ))
    except Exception as e:
        # Deliberately broad: any failure degrades to a warning.
        print(f" [WARN] Failed to download: {e}")
        return []
    print(f" Downloaded {len(cidrs)} IPv4 ranges")
    return cidrs
def download_google(timeout=30):
    """Download Google Cloud IPv4 ranges from the published JSON feed.

    Only entries carrying an "ipv4Prefix" key are collected. The result is
    all-or-nothing: on any download or parsing failure a warning is
    printed and an empty list is returned.

    Args:
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        A list of IPv4 CIDR strings, or an empty list on failure.
    """
    try:
        with urllib.request.urlopen(PRESETS["google"]["v4"], timeout=timeout) as r:
            data = json.loads(r.read().decode())
        cidrs = [
            prefix["ipv4Prefix"]
            for prefix in data.get("prefixes", [])
            if "ipv4Prefix" in prefix
        ]
    except Exception as e:
        # Deliberately broad: any failure degrades to a warning.
        print(f" [WARN] Failed to download: {e}")
        return []
    print(f" Downloaded {len(cidrs)} IPv4 ranges")
    return cidrs
def download_github(timeout=30):
    """Download GitHub IPv4 ranges from the meta API endpoint.

    Ranges from the service sections listed below are merged, IPv6 entries
    (anything containing ':') are dropped, and duplicates are removed. The
    result is all-or-nothing: on any failure a warning is printed and an
    empty list is returned.

    Args:
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        A list of unique IPv4 CIDR strings in first-seen order, or an
        empty list on failure.
    """
    sections = ["hooks", "web", "api", "git", "packages", "pages",
                "importer", "actions", "dependabot"]
    try:
        req = urllib.request.Request(
            PRESETS["github"]["v4"],
            headers={"User-Agent": "xdp-whitelist"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as r:
            data = json.loads(r.read().decode())
        merged = []
        for key in sections:
            if key in data:
                merged.extend(data[key])
        # dict.fromkeys de-duplicates with a stable order, so the cached
        # list is identical from run to run (a plain set() is not ordered).
        cidrs = list(dict.fromkeys(c for c in merged if ':' not in c))
    except Exception as e:
        # Deliberately broad: any failure degrades to a warning.
        print(f" [WARN] Failed to download: {e}")
        return []
    print(f" Downloaded {len(cidrs)} IPv4 ranges")
    return cidrs
def add_whitelist(name, cidrs=None):
    """Add a service's CIDRs to the XDP whitelist maps.

    The CIDR list is taken, in order of preference, from the ``cidrs``
    argument, from the cached ``<name>.txt`` file in WHITELIST_DIR, or
    from a fresh download when ``name`` is a known preset. The list is
    then written back to the cache file and pushed into the
    ``whitelist_v4`` / ``whitelist_v6`` maps.

    Args:
        name: Preset or custom list name (case-insensitive).
        cidrs: Optional explicit list of CIDR strings; bypasses both the
            cache and the download.

    Returns:
        True when the CIDRs were handed to the map update, False when the
        preset is unknown, there is nothing to add, or the whitelist_v4
        map cannot be found.
    """
    name = name.lower()
    WHITELIST_DIR.mkdir(parents=True, exist_ok=True)
    wl_file = WHITELIST_DIR / f"{name}.txt"

    if cidrs is None and wl_file.exists():
        with open(wl_file) as f:
            cached = [line.strip() for line in f if line.strip()]
        if cached:
            cidrs = cached
            print(f"[INFO] Using cached {name} ({len(cidrs)} CIDRs)")
        elif name not in PRESETS:
            # An empty custom list has nothing to fall back on.
            cidrs = cached
        # An empty cache for a preset leaves cidrs as None so the preset
        # is downloaded again instead of failing forever on a stale file.

    if cidrs is None:
        downloaders = {
            "cloudflare": ("Cloudflare", download_cloudflare),
            "aws": ("AWS", download_aws),
            "google": ("Google Cloud", download_google),
            "github": ("GitHub", download_github),
        }
        entry = downloaders.get(name)
        if entry is None:
            print(f"[ERROR] Unknown preset: {name}")
            print(f"Available presets: {', '.join(PRESETS.keys())}")
            return False
        label, fetch = entry
        print(f"[INFO] Downloading {label} IP ranges...")
        cidrs = fetch()

    if not cidrs:
        print("[ERROR] No CIDRs to add")
        return False

    # Persist the list first so it stays cached even if the maps are absent.
    with open(wl_file, 'w') as f:
        f.write('\n'.join(cidrs))

    map_id = get_map_id("whitelist_v4")
    if not map_id:
        print("[ERROR] whitelist_v4 map not found. Is XDP loaded?")
        return False

    # classify_cidrs comes from xdp_common; its two results are used here
    # as the IPv4 and IPv6 subsets respectively.
    v4_cidrs, v6_cidrs = classify_cidrs(cidrs)
    if v4_cidrs:
        print(f"[INFO] Adding {len(v4_cidrs)} IPv4 CIDRs to whitelist...")
        batch_map_operation(map_id, v4_cidrs, operation="update", batch_size=500)

    v6_applied = 0
    if v6_cidrs:
        map_id_v6 = get_map_id("whitelist_v6")
        if map_id_v6:
            print(f"[INFO] Adding {len(v6_cidrs)} IPv6 CIDRs to whitelist...")
            batch_map_operation(map_id_v6, v6_cidrs, operation="update", batch_size=500, ipv6=True)
            v6_applied = len(v6_cidrs)
        else:
            # Say so explicitly rather than reporting IPv6 ranges as
            # whitelisted when they were never pushed to a map.
            print(f"[WARN] whitelist_v6 map not found; skipped {len(v6_cidrs)} IPv6 CIDRs")

    print(f"[OK] Whitelisted {name}: {len(v4_cidrs)} v4 + {v6_applied} v6 CIDRs")
    return True
def del_whitelist(name):
    """Remove a service's CIDRs from the whitelist maps.

    Reads the cached ``<name>.txt`` list from WHITELIST_DIR, issues delete
    operations for its entries against the whitelist maps, and then
    removes the cache file.

    Args:
        name: Name of the whitelisted service (case-insensitive).

    Returns:
        True on completion; False when no cache file exists for ``name``
        or the whitelist_v4 map cannot be found.
    """
    name = name.lower()
    cache_path = WHITELIST_DIR / f"{name}.txt"
    if not cache_path.exists():
        print(f"[ERROR] {name} is not whitelisted")
        return False

    v4_map = get_map_id("whitelist_v4")
    if not v4_map:
        print("[ERROR] whitelist_v4 map not found")
        return False

    with open(cache_path) as fh:
        entries = [text for text in (raw.strip() for raw in fh) if text]
    ipv4_entries, ipv6_entries = classify_cidrs(entries)

    if ipv4_entries:
        print(f"[INFO] Removing {len(ipv4_entries)} IPv4 CIDRs from whitelist...")
        batch_map_operation(v4_map, ipv4_entries, operation="delete", batch_size=500)

    # The IPv6 map is only looked up when there is something to delete.
    v6_map = get_map_id("whitelist_v6") if ipv6_entries else None
    if v6_map:
        print(f"[INFO] Removing {len(ipv6_entries)} IPv6 CIDRs from whitelist...")
        batch_map_operation(v6_map, ipv6_entries, operation="delete", batch_size=500, ipv6=True)

    cache_path.unlink()
    print(f"[OK] Removed {name} from whitelist")
    return True
def list_whitelist():
    """Print a summary line for every cached whitelist file.

    Each ``*.txt`` file in WHITELIST_DIR is reported with its IPv4/IPv6
    entry counts (an entry containing ':' is counted as IPv6) and the
    preset description, or "Custom" when the name is not a preset.
    """
    print("=== Whitelisted Services ===")
    found = sorted(WHITELIST_DIR.glob("*.txt")) if WHITELIST_DIR.exists() else []
    if not found:
        print(" (none)")
        return

    for path in found:
        service = path.stem
        with open(path) as fh:
            ranges = [text for text in map(str.strip, fh) if text]
        ipv6_total = sum(1 for r in ranges if ':' in r)
        ipv4_total = len(ranges) - ipv6_total
        label = PRESETS.get(service, {}).get("desc", "Custom")
        if ipv6_total <= 0:
            print(f" {service}: {ipv4_total} CIDRs ({label})")
        else:
            print(f" {service}: {ipv4_total} v4 + {ipv6_total} v6 CIDRs ({label})")
def show_presets():
    """Print each preset name together with its description."""
    print("=== Available Presets ===")
    for preset in PRESETS:
        print(f" {preset}: {PRESETS[preset]['desc']}")
def show_help():
    """Print the command-line usage summary to standard output."""
    usage = """XDP Whitelist Manager - Fast batch IP whitelisting
Usage: xdp-whitelist <command> [args]
Commands:
add <preset|file> Whitelist a preset or custom file
del <name> Remove from whitelist
list List whitelisted services
presets Show available presets
Presets:
cloudflare Cloudflare CDN/Proxy IPs
aws Amazon Web Services
google Google Cloud Platform
github GitHub Services
Examples:
xdp-whitelist add cloudflare # Whitelist Cloudflare
xdp-whitelist add aws # Whitelist AWS
xdp-whitelist del cloudflare # Remove Cloudflare
xdp-whitelist list
"""
    print(usage)
def main():
    """Dispatch the command given on the command line.

    Recognised commands are ``add <name>``, ``del <name>``, ``list`` and
    ``presets``; anything else (including a missing argument) prints the
    help text.

    Returns:
        Process exit status: 1 when ``add`` or ``del`` reports failure,
        otherwise 0. Previously the result of those commands was dropped,
        so the script exited 0 even when nothing was whitelisted.
    """
    args = sys.argv[1:]
    if not args:
        show_help()
        return 0

    cmd = args[0]
    if cmd == "add" and len(args) >= 2:
        return 0 if add_whitelist(args[1]) else 1
    if cmd == "del" and len(args) >= 2:
        return 0 if del_whitelist(args[1]) else 1

    if cmd == "list":
        list_whitelist()
    elif cmd == "presets":
        show_presets()
    else:
        show_help()
    return 0


if __name__ == "__main__":
    # Propagate the command result to the shell as the exit status.
    sys.exit(main())