Files
xdp-defense/lib/xdp_country.py
kaffa 1bcaddce25 Unify xdp-blocker and xdp-ddos into single xdp-defense project
Chain two XDP programs via libxdp dispatcher on the same interface:
xdp_blocker (priority 10) handles CIDR/country/whitelist blocking,
xdp_ddos (priority 20) handles rate limiting, EWMA analysis, and AI
anomaly detection. Whitelist maps are shared via BPF map pinning so
whitelisted IPs bypass both blocklist checks and DDoS rate limiting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 08:39:21 +09:00

176 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
XDP Country Blocker - Fast batch IP blocking
Uses bpftool batch for high-speed map updates
"""
import sys
import time
import urllib.request
from pathlib import Path
from xdp_common import get_map_id, batch_map_operation, classify_cidrs
# Directory holding one cached CIDR list per blocked country, named <cc>.txt.
# NOTE(review): the path still uses the "xdp-blocker" name — confirm this is
# intended now that the tool lives in the xdp-defense project.
COUNTRY_DIR = Path("/etc/xdp-blocker/countries")
# ipdeny zone-file URL templates; "{}" is filled with the lowercase country code.
IPDENY_V4_URL = "https://www.ipdeny.com/ipblocks/data/countries/{}.zone"
IPDENY_V6_URL = "https://www.ipdeny.com/ipblocks/data/ipv6/ipv6-country-blocks/{}.zone"
def _fetch_cidr_list(url, timeout=30):
    """Fetch a zone file and return its non-empty lines, stripped."""
    # Read into memory so a failed or truncated download never leaves a
    # partial file on disk; the timeout keeps a dead server from hanging us.
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        text = resp.read().decode("utf-8", errors="replace")
    return [line.strip() for line in text.splitlines() if line.strip()]


def download_country(cc):
    """Download the IPv4 and IPv6 CIDR lists for a country and cache them.

    Both ipdeny zone files are fetched, merged (IPv4 first) and written to
    ``COUNTRY_DIR/<cc>.txt``, one CIDR per line. A failure of either address
    family is reported and tolerated as long as the other yields ranges.

    Args:
        cc: Two-letter country code, case-insensitive (e.g. ``"br"``).

    Returns:
        Path of the written list file, or ``None`` when the country code is
        invalid or no ranges could be downloaded at all.
    """
    cc = cc.lower()
    # cc is interpolated into a URL and a file path; accept only two ASCII
    # letters so it can never escape COUNTRY_DIR or alter the request URL.
    if not (len(cc) == 2 and cc.isascii() and cc.isalpha()):
        print(f"[ERROR] Invalid country code: {cc!r}")
        return None

    COUNTRY_DIR.mkdir(parents=True, exist_ok=True)
    cc_file = COUNTRY_DIR / f"{cc}.txt"
    cidrs = []

    # Download IPv4
    print(f"[INFO] Downloading {cc.upper()} IPv4 ranges...")
    try:
        cidrs.extend(_fetch_cidr_list(IPDENY_V4_URL.format(cc)))
        print(f" IPv4: {len(cidrs)} CIDRs")
    except Exception as e:  # best-effort: IPv6 may still succeed
        print(f" [WARN] IPv4 download failed: {e}")

    # Download IPv6
    try:
        v6_cidrs = _fetch_cidr_list(IPDENY_V6_URL.format(cc))
        cidrs.extend(v6_cidrs)
        print(f" IPv6: {len(v6_cidrs)} CIDRs")
    except Exception as e:  # best-effort: not every country has a v6 list
        print(f" [WARN] IPv6 download failed: {e}")

    if not cidrs:
        print(f"[ERROR] No IP ranges found for {cc.upper()}")
        return None

    # Only write the cache file once we know we have something to put in it.
    with open(cc_file, 'w') as f:
        f.write('\n'.join(cidrs) + '\n')
    return cc_file
def add_country(cc):
    """Add a country's CIDRs to the XDP blocklist maps (IPv4 + IPv6).

    Uses the cached list in ``COUNTRY_DIR/<cc>.txt`` when present, otherwise
    downloads it first. IPv4 entries go to the ``blocklist_v4`` map and IPv6
    entries to ``blocklist_v6``; a missing v6 map only skips the IPv6 part.

    Args:
        cc: Two-letter country code, case-insensitive.

    Returns:
        ``True`` when the operation completed, ``False`` when the country
        code is invalid, the list could not be obtained, or the IPv4 map is
        missing.
    """
    cc = cc.lower()
    # cc becomes part of a file path; accept only two ASCII letters so a
    # value such as "../../x" cannot point outside COUNTRY_DIR.
    if not (len(cc) == 2 and cc.isascii() and cc.isalpha()):
        print(f"[ERROR] Invalid country code: {cc!r}")
        return False

    cc_file = COUNTRY_DIR / f"{cc}.txt"
    if not cc_file.exists():
        cc_file = download_country(cc)
        if not cc_file:
            return False

    with open(cc_file) as f:
        cidrs = [line.strip() for line in f if line.strip()]
    v4_cidrs, v6_cidrs = classify_cidrs(cidrs)

    if v4_cidrs:
        map_id = get_map_id("blocklist_v4")
        if not map_id:
            print("[ERROR] blocklist_v4 map not found. Is XDP loaded?")
            return False
        print(f"[INFO] Adding {len(v4_cidrs)} IPv4 CIDRs for {cc.upper()}...")
        batch_map_operation(map_id, v4_cidrs, operation="update")

    # Track what was really submitted so the summary does not claim IPv6
    # entries that were skipped because the map is absent.
    v6_added = 0
    if v6_cidrs:
        map_id_v6 = get_map_id("blocklist_v6")
        if map_id_v6:
            print(f"[INFO] Adding {len(v6_cidrs)} IPv6 CIDRs for {cc.upper()}...")
            batch_map_operation(map_id_v6, v6_cidrs, operation="update", ipv6=True)
            v6_added = len(v6_cidrs)
        else:
            print("[WARN] blocklist_v6 map not found, skipping IPv6")

    print(f"[OK] Added {cc.upper()}: {len(v4_cidrs)} v4 + {v6_added} v6 CIDRs")
    return True
def del_country(cc):
    """Remove a country's CIDRs from the XDP blocklist maps.

    Reads the cached list ``COUNTRY_DIR/<cc>.txt``, deletes its entries from
    whichever of the ``blocklist_v4`` / ``blocklist_v6`` maps exist, then
    removes the cache file so the country is no longer listed as blocked.

    Args:
        cc: Two-letter country code, case-insensitive.

    Returns:
        ``True`` when the country was unblocked, ``False`` when the code is
        invalid or the country has no cached list (i.e. is not blocked).
    """
    cc = cc.lower()
    # cc becomes part of a path that is unlinked below; accept only two
    # ASCII letters so "../../x" can never delete a file outside COUNTRY_DIR.
    if not (len(cc) == 2 and cc.isascii() and cc.isalpha()):
        print(f"[ERROR] Invalid country code: {cc!r}")
        return False

    cc_file = COUNTRY_DIR / f"{cc}.txt"
    if not cc_file.exists():
        print(f"[ERROR] Country {cc.upper()} is not blocked")
        return False

    with open(cc_file) as f:
        cidrs = [line.strip() for line in f if line.strip()]
    v4_cidrs, v6_cidrs = classify_cidrs(cidrs)

    # Count only what was actually submitted for deletion, so the summary
    # stays truthful when a map is not present.
    v4_removed = 0
    if v4_cidrs:
        map_id = get_map_id("blocklist_v4")
        if map_id:
            print(f"[INFO] Removing {len(v4_cidrs)} IPv4 CIDRs for {cc.upper()}...")
            batch_map_operation(map_id, v4_cidrs, operation="delete")
            v4_removed = len(v4_cidrs)

    v6_removed = 0
    if v6_cidrs:
        map_id_v6 = get_map_id("blocklist_v6")
        if map_id_v6:
            print(f"[INFO] Removing {len(v6_cidrs)} IPv6 CIDRs for {cc.upper()}...")
            batch_map_operation(map_id_v6, v6_cidrs, operation="delete", ipv6=True)
            v6_removed = len(v6_cidrs)

    # missing_ok: tolerate the file disappearing between the check and here.
    cc_file.unlink(missing_ok=True)
    print(f"[OK] Removed {cc.upper()}: {v4_removed} v4 + {v6_removed} v6 CIDRs")
    return True
def list_countries():
    """Print every blocked country with its CIDR count and list age.

    A country counts as blocked when a ``<cc>.txt`` file exists in
    ``COUNTRY_DIR``. The age is the whole number of days since the file's
    modification time. Prints ``(none)`` when nothing is blocked.
    """
    print("=== Blocked Countries ===")
    if not COUNTRY_DIR.exists():
        print(" (none)")
        return

    files = sorted(COUNTRY_DIR.glob("*.txt"))
    if not files:
        print(" (none)")
        return

    now = time.time()
    for cc_file in files:
        cc = cc_file.stem.upper()
        # Use a context manager so the handle is closed promptly, and skip
        # blank lines so the count matches what add/del actually load.
        with open(cc_file) as f:
            count = sum(1 for line in f if line.strip())
        age = int((now - cc_file.stat().st_mtime) / 86400)
        print(f" {cc}: {count} CIDRs (updated {age}d ago)")
def show_help():
    """Print the command-line usage text to stdout."""
    usage_lines = (
        "XDP Country Blocker - Fast batch IP blocking",
        "Usage: xdp-country <command> [args]",
        "Commands:",
        "add <cc> Block a country (e.g., br, cn, ru, kp)",
        "del <cc> Unblock a country",
        "list List blocked countries",
        "Examples:",
        "xdp-country add br # Block Brazil (~13K CIDRs in seconds)",
        "xdp-country add cn # Block China",
        "xdp-country del br # Unblock Brazil",
        "xdp-country list",
        "",  # the help text itself ends with a newline before print's own
    )
    print("\n".join(usage_lines))
def main():
    """Command-line entry point: dispatch ``add``, ``del`` and ``list``.

    Reads the command and its argument from ``sys.argv``.

    Returns:
        Process exit status: 0 on success (or when help is shown because no
        command was given), 1 when an add/del operation reports failure, and
        2 for an unknown command or a missing country argument.
    """
    args = sys.argv[1:]
    if not args:
        show_help()
        return 0

    cmd = args[0]
    if cmd in ("add", "del") and len(args) >= 2:
        handler = add_country if cmd == "add" else del_country
        # Propagate failure so shell scripts and service units can detect it.
        return 0 if handler(args[1]) else 1
    if cmd == "list":
        list_countries()
        return 0

    show_help()
    return 2


if __name__ == "__main__":
    sys.exit(main())