Files
xdp-defense/lib/xdp_country.py
kaffa 667c6eac81 Fix 12 code review issues (4 MEDIUM + 8 LOW)
MEDIUM:
- M1: Whitelist direct IP/CIDR additions now persist to direct.txt
- M2: get_map_id() uses 5s TTL cache (single bpftool call for all maps)
- M3: IPv6 extension header parsing in xdp_ddos.c (hop-by-hop/routing/frag/dst)
- M4: Shell injection prevention - sanitize_input() + sys.argv[] for all Python calls

LOW:
- L1: Remove redundant self.running (uses _stop_event only)
- L2: Remove unused config values (rate_limit_after, cooldown_multiplier, retrain_interval)
- L3: Thread poll intervals reloaded on SIGHUP
- L4: batch_map_operation counts only successfully written entries
- L5: Clarify unique_ips_approx comment (per-packet counter)
- L6: Document LRU_HASH multi-CPU race condition as acceptable
- L7: Download Cloudflare IPv6 ranges in whitelist preset
- L8: Fix file handle leak in xdp_country.py list_countries()

Also: SIGHUP now preserves EWMA/violation state, daemon skips whitelisted
IPs in EWMA/AI escalation, deep copy for default config, IHL validation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 09:23:41 +09:00

177 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
XDP Country Blocker - Fast batch IP blocking
Uses bpftool batch for high-speed map updates
"""
import sys
import time
import urllib.request
from pathlib import Path
from xdp_common import get_map_id, batch_map_operation, classify_cidrs
COUNTRY_DIR = Path("/etc/xdp-blocker/countries")
IPDENY_V4_URL = "https://www.ipdeny.com/ipblocks/data/countries/{}.zone"
IPDENY_V6_URL = "https://www.ipdeny.com/ipblocks/data/ipv6/ipv6-country-blocks/{}.zone"
def download_country(cc):
"""Download country IPv4 + IPv6 IP lists"""
COUNTRY_DIR.mkdir(parents=True, exist_ok=True)
cc_file = COUNTRY_DIR / f"{cc.lower()}.txt"
cidrs = []
# Download IPv4
print(f"[INFO] Downloading {cc.upper()} IPv4 ranges...")
try:
urllib.request.urlretrieve(IPDENY_V4_URL.format(cc.lower()), cc_file)
with open(cc_file) as f:
cidrs.extend(line.strip() for line in f if line.strip())
print(f" IPv4: {len(cidrs)} CIDRs")
except Exception as e:
print(f" [WARN] IPv4 download failed: {e}")
# Download IPv6
v6_count = 0
try:
v6_tmp = COUNTRY_DIR / f"{cc.lower()}_v6.tmp"
urllib.request.urlretrieve(IPDENY_V6_URL.format(cc.lower()), v6_tmp)
with open(v6_tmp) as f:
v6_cidrs = [line.strip() for line in f if line.strip()]
cidrs.extend(v6_cidrs)
v6_count = len(v6_cidrs)
v6_tmp.unlink(missing_ok=True)
print(f" IPv6: {v6_count} CIDRs")
except Exception:
pass
if not cidrs:
print(f"[ERROR] No IP ranges found for {cc.upper()}")
return None
with open(cc_file, 'w') as f:
f.write('\n'.join(cidrs) + '\n')
return cc_file
def add_country(cc):
"""Add country IPs to XDP blocklist using batch (IPv4 + IPv6)"""
cc = cc.lower()
cc_file = COUNTRY_DIR / f"{cc}.txt"
if not cc_file.exists():
cc_file = download_country(cc)
if not cc_file:
return False
with open(cc_file) as f:
cidrs = [line.strip() for line in f if line.strip()]
v4_cidrs, v6_cidrs = classify_cidrs(cidrs)
if v4_cidrs:
map_id = get_map_id("blocklist_v4")
if not map_id:
print("[ERROR] blocklist_v4 map not found. Is XDP loaded?")
return False
print(f"[INFO] Adding {len(v4_cidrs)} IPv4 CIDRs for {cc.upper()}...")
batch_map_operation(map_id, v4_cidrs, operation="update")
if v6_cidrs:
map_id_v6 = get_map_id("blocklist_v6")
if map_id_v6:
print(f"[INFO] Adding {len(v6_cidrs)} IPv6 CIDRs for {cc.upper()}...")
batch_map_operation(map_id_v6, v6_cidrs, operation="update", ipv6=True)
else:
print("[WARN] blocklist_v6 map not found, skipping IPv6")
print(f"[OK] Added {cc.upper()}: {len(v4_cidrs)} v4 + {len(v6_cidrs)} v6 CIDRs")
return True
def del_country(cc):
"""Remove country IPs from XDP blocklist"""
cc = cc.lower()
cc_file = COUNTRY_DIR / f"{cc}.txt"
if not cc_file.exists():
print(f"[ERROR] Country {cc.upper()} is not blocked")
return False
with open(cc_file) as f:
cidrs = [line.strip() for line in f if line.strip()]
v4_cidrs, v6_cidrs = classify_cidrs(cidrs)
if v4_cidrs:
map_id = get_map_id("blocklist_v4")
if map_id:
print(f"[INFO] Removing {len(v4_cidrs)} IPv4 CIDRs for {cc.upper()}...")
batch_map_operation(map_id, v4_cidrs, operation="delete")
if v6_cidrs:
map_id_v6 = get_map_id("blocklist_v6")
if map_id_v6:
print(f"[INFO] Removing {len(v6_cidrs)} IPv6 CIDRs for {cc.upper()}...")
batch_map_operation(map_id_v6, v6_cidrs, operation="delete", ipv6=True)
cc_file.unlink()
print(f"[OK] Removed {cc.upper()}: {len(v4_cidrs)} v4 + {len(v6_cidrs)} v6 CIDRs")
return True
def list_countries():
"""List blocked countries"""
print("=== Blocked Countries ===")
if not COUNTRY_DIR.exists():
print(" (none)")
return
files = list(COUNTRY_DIR.glob("*.txt"))
if not files:
print(" (none)")
return
for cc_file in sorted(files):
cc = cc_file.stem.upper()
with open(cc_file) as f:
count = sum(1 for _ in f)
mtime = cc_file.stat().st_mtime
age = int((time.time() - mtime) / 86400)
print(f" {cc}: {count} CIDRs (updated {age}d ago)")
def show_help():
print("""XDP Country Blocker - Fast batch IP blocking
Usage: xdp-country <command> [args]
Commands:
add <cc> Block a country (e.g., br, cn, ru, kp)
del <cc> Unblock a country
list List blocked countries
Examples:
xdp-country add br # Block Brazil (~13K CIDRs in seconds)
xdp-country add cn # Block China
xdp-country del br # Unblock Brazil
xdp-country list
""")
def main():
if len(sys.argv) < 2:
show_help()
return
cmd = sys.argv[1]
if cmd == "add" and len(sys.argv) >= 3:
add_country(sys.argv[2])
elif cmd == "del" and len(sys.argv) >= 3:
del_country(sys.argv[2])
elif cmd == "list":
list_countries()
else:
show_help()
if __name__ == "__main__":
main()