#!/usr/bin/env python3
"""
XDP Country Blocker - Fast batch IP blocking
Uses bpftool batch for high-speed map updates
"""

import http.client
import re
import sys
import time
import urllib.request
from pathlib import Path
from typing import List, Optional

from xdp_common import get_map_id, batch_map_operation, classify_cidrs

COUNTRY_DIR = Path("/etc/xdp-blocker/countries")
IPDENY_V4_URL = "https://www.ipdeny.com/ipblocks/data/countries/{}.zone"
IPDENY_V6_URL = "https://www.ipdeny.com/ipblocks/data/ipv6/ipv6-country-blocks/{}.zone"

# Seconds to wait on the network before giving up on a download.
DOWNLOAD_TIMEOUT = 30

# Country codes are interpolated into file paths and URLs, so only plain
# two-letter ASCII codes (the form used in the help examples) are accepted.
_CC_RE = re.compile(r"[a-z]{2}")

# Failures treated as "download did not work" rather than as a crash.
_DOWNLOAD_ERRORS = (OSError, ValueError, http.client.HTTPException)


def _normalize_cc(cc: str) -> Optional[str]:
    """Return *cc* lower-cased if it is a two-letter code, else print an error and return None."""
    code = cc.strip().lower()
    if _CC_RE.fullmatch(code):
        return code
    print(f"[ERROR] Invalid country code: {cc!r} (expected two letters, e.g. 'br')")
    return None


def _read_cidrs(path: Path) -> List[str]:
    """Return the non-empty, stripped lines of *path*."""
    with open(path, encoding="utf-8") as fh:
        return [entry for entry in (line.strip() for line in fh) if entry]


def _fetch_cidrs(url: str) -> List[str]:
    """Download *url* and return its non-empty, stripped lines."""
    with urllib.request.urlopen(url, timeout=DOWNLOAD_TIMEOUT) as resp:
        text = resp.read().decode("utf-8")
    return [entry for entry in (line.strip() for line in text.splitlines()) if entry]


def download_country(cc):
    """Download country IPv4 + IPv6 IP lists.

    Both lists are fetched into memory and written to
    ``COUNTRY_DIR/<cc>.txt`` only once at the end, so a failed or truncated
    download never leaves a partial cache file behind.

    Args:
        cc: Two-letter country code (case-insensitive).

    Returns:
        Path of the written list file, or None if the code is invalid or
        no ranges could be downloaded.
    """
    code = _normalize_cc(cc)
    if code is None:
        return None

    COUNTRY_DIR.mkdir(parents=True, exist_ok=True)
    cc_file = COUNTRY_DIR / f"{code}.txt"
    cidrs = []

    # Download IPv4
    print(f"[INFO] Downloading {code.upper()} IPv4 ranges...")
    try:
        v4_cidrs = _fetch_cidrs(IPDENY_V4_URL.format(code))
    except _DOWNLOAD_ERRORS as e:
        print(f" [WARN] IPv4 download failed: {e}")
    else:
        cidrs.extend(v4_cidrs)
        print(f" IPv4: {len(v4_cidrs)} CIDRs")

    # Download IPv6 (best effort: a failure is reported but not fatal)
    try:
        v6_cidrs = _fetch_cidrs(IPDENY_V6_URL.format(code))
    except _DOWNLOAD_ERRORS as e:
        print(f" [WARN] IPv6 download failed: {e}")
    else:
        cidrs.extend(v6_cidrs)
        print(f" IPv6: {len(v6_cidrs)} CIDRs")

    if not cidrs:
        print(f"[ERROR] No IP ranges found for {code.upper()}")
        return None

    with open(cc_file, "w", encoding="utf-8") as f:
        f.write("\n".join(cidrs) + "\n")
    return cc_file


def add_country(cc):
    """Add country IPs to XDP blocklist using batch (IPv4 + IPv6).

    Uses the cached list in COUNTRY_DIR when present, otherwise downloads it.

    Args:
        cc: Two-letter country code (case-insensitive).

    Returns:
        True once the batch updates were issued; False if the code is
        invalid, the download failed, or the IPv4 map is missing.
    """
    cc = _normalize_cc(cc)
    if cc is None:
        return False

    cc_file = COUNTRY_DIR / f"{cc}.txt"
    if not cc_file.exists():
        cc_file = download_country(cc)
        if not cc_file:
            return False

    v4_cidrs, v6_cidrs = classify_cidrs(_read_cidrs(cc_file))

    if v4_cidrs:
        map_id = get_map_id("blocklist_v4")
        if not map_id:
            print("[ERROR] blocklist_v4 map not found. Is XDP loaded?")
            return False
        print(f"[INFO] Adding {len(v4_cidrs)} IPv4 CIDRs for {cc.upper()}...")
        batch_map_operation(map_id, v4_cidrs, operation="update")

    if v6_cidrs:
        map_id_v6 = get_map_id("blocklist_v6")
        if map_id_v6:
            print(f"[INFO] Adding {len(v6_cidrs)} IPv6 CIDRs for {cc.upper()}...")
            batch_map_operation(map_id_v6, v6_cidrs, operation="update", ipv6=True)
        else:
            # A missing IPv6 map is tolerated; IPv4 blocking still applies.
            print("[WARN] blocklist_v6 map not found, skipping IPv6")

    print(f"[OK] Added {cc.upper()}: {len(v4_cidrs)} v4 + {len(v6_cidrs)} v6 CIDRs")
    return True


def del_country(cc):
    """Remove country IPs from XDP blocklist.

    Deletes every CIDR recorded in the country's cached list from the maps
    that are currently available, then removes the cached list itself.

    Args:
        cc: Two-letter country code (case-insensitive).

    Returns:
        True on success; False if the code is invalid or the country has
        no cached list (i.e. is not blocked).
    """
    cc = _normalize_cc(cc)
    if cc is None:
        return False

    cc_file = COUNTRY_DIR / f"{cc}.txt"
    if not cc_file.exists():
        print(f"[ERROR] Country {cc.upper()} is not blocked")
        return False

    v4_cidrs, v6_cidrs = classify_cidrs(_read_cidrs(cc_file))

    if v4_cidrs:
        map_id = get_map_id("blocklist_v4")
        if map_id:
            print(f"[INFO] Removing {len(v4_cidrs)} IPv4 CIDRs for {cc.upper()}...")
            batch_map_operation(map_id, v4_cidrs, operation="delete")

    if v6_cidrs:
        map_id_v6 = get_map_id("blocklist_v6")
        if map_id_v6:
            print(f"[INFO] Removing {len(v6_cidrs)} IPv6 CIDRs for {cc.upper()}...")
            batch_map_operation(map_id_v6, v6_cidrs, operation="delete", ipv6=True)

    cc_file.unlink()
    print(f"[OK] Removed {cc.upper()}: {len(v4_cidrs)} v4 + {len(v6_cidrs)} v6 CIDRs")
    return True


def list_countries():
    """List blocked countries.

    Prints one line per ``*.txt`` file in COUNTRY_DIR with its CIDR count
    and the age of the file in whole days.
    """
    print("=== Blocked Countries ===")
    if not COUNTRY_DIR.exists():
        print(" (none)")
        return

    files = sorted(COUNTRY_DIR.glob("*.txt"))
    if not files:
        print(" (none)")
        return

    now = time.time()
    for cc_file in files:
        cc = cc_file.stem.upper()
        # _read_cidrs closes the file and ignores blank lines.
        count = len(_read_cidrs(cc_file))
        age = int((now - cc_file.stat().st_mtime) / 86400)
        print(f" {cc}: {count} CIDRs (updated {age}d ago)")


def show_help():
    """Print command-line usage."""
    print("""XDP Country Blocker - Fast batch IP blocking

Usage: xdp-country <command> [args]

Commands:
  add <cc>    Block a country (e.g., br, cn, ru, kp)
  del <cc>    Unblock a country
  list        List blocked countries

Examples:
  xdp-country add br    # Block Brazil (~13K CIDRs in seconds)
  xdp-country add cn    # Block China
  xdp-country del br    # Unblock Brazil
  xdp-country list
""")


def main():
    """Dispatch the command line.

    Returns:
        Process exit status: 0 on success, 1 if the command failed or was
        not recognised.
    """
    args = sys.argv[1:]
    if not args:
        show_help()
        return 0

    cmd = args[0]
    if cmd == "add" and len(args) >= 2:
        return 0 if add_country(args[1]) else 1
    if cmd == "del" and len(args) >= 2:
        return 0 if del_country(args[1]) else 1
    if cmd == "list":
        list_countries()
        return 0

    show_help()
    return 1


if __name__ == "__main__":
    sys.exit(main())