From 11c1ab0134413ecb2575f48e8b873389bde5fc57 Mon Sep 17 00:00:00 2001 From: kaffa Date: Sat, 7 Feb 2026 10:14:07 +0900 Subject: [PATCH] Add time-aware traffic logger and auto-retrain system MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Log traffic features with timestamps to CSV every 5s - Add hour_sin/hour_cos time features (15 → 17 feature vector) - Auto-retrain from traffic log at configurable interval (default 24h) - Detect old 15-feature models and switch to learning mode - SIGUSR1 now retrains from traffic log first, falls back to collect mode - Add CLI: `ai traffic` (time-bucketed summary), `ai log` (recent entries) - Add config keys: traffic_log_file, retention_days, retrain_window Co-Authored-By: Claude Opus 4.6 --- bin/xdp-defense | 155 ++++++++++++++++++++++++++++++++ config/config.yaml | 7 +- lib/xdp_defense_daemon.py | 185 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 337 insertions(+), 10 deletions(-) diff --git a/bin/xdp-defense b/bin/xdp-defense index 7785ea5..d04b6e2 100755 --- a/bin/xdp-defense +++ b/bin/xdp-defense @@ -801,6 +801,157 @@ cmd_ai_retrain() { fi } +cmd_ai_traffic() { + local log_file + log_file=$(python3 -c " +import yaml +with open('$CONFIG_FILE') as f: + cfg = yaml.safe_load(f) +print(cfg.get('ai',{}).get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv')) +" 2>/dev/null || echo "/var/lib/xdp-defense/traffic_log.csv") + + [ ! -f "$log_file" ] && { log_err "Traffic log not found: $log_file"; exit 1; } + + python3 -c " +import csv, sys +from datetime import datetime, timedelta + +log_file = sys.argv[1] +cutoff = datetime.now() - timedelta(hours=24) + +# Buckets: 0-6, 6-12, 12-18, 18-24 +buckets = {0: [], 1: [], 2: [], 3: []} +total_samples = 0 + +with open(log_file, 'r') as f: + reader = csv.reader(f) + header = next(reader, None) + if header is None: + print('Traffic log is empty') + sys.exit(0) + for row in reader: + try: + ts = datetime.fromisoformat(row[0]) + if ts < cutoff: + continue + hour = float(row[1]) + bucket = min(int(hour // 6), 3) + # features: row[2]=hour_sin, row[3]=hour_cos, row[4]=total_packets, row[5]=total_bytes, ... + pps = float(row[4]) + bps = float(row[5]) + buckets[bucket].append((pps, bps)) + total_samples += 1 + except (ValueError, IndexError): + continue + +labels = ['00:00-06:00', '06:00-12:00', '12:00-18:00', '18:00-24:00'] +print() +print('\033[1m=== Traffic Summary (last 24h) ===\033[0m') +print(f'{\"Period\":>15} {\"Avg PPS\":>10} {\"Peak PPS\":>10} {\"Avg BPS\":>12} {\"Samples\":>8}') +print(f'{\"-\"*15} {\"-\"*10} {\"-\"*10} {\"-\"*12} {\"-\"*8}') + +for i, label in enumerate(labels): + data = buckets[i] + if not data: + print(f'{label:>15} {\"--\":>10} {\"--\":>10} {\"--\":>12} {0:>8}') + continue + pps_list = [d[0] for d in data] + bps_list = [d[1] for d in data] + avg_pps = sum(pps_list) / len(pps_list) + peak_pps = max(pps_list) + avg_bps = sum(bps_list) / len(bps_list) + + def fmt_bytes(b): + if b >= 1024*1024: + return f'{b/1024/1024:.1f}MB' + elif b >= 1024: + return f'{b/1024:.1f}KB' + return f'{b:.0f}B' + + print(f'{label:>15} {avg_pps:>10.0f} {peak_pps:>10.0f} {fmt_bytes(avg_bps):>12} {len(data):>8}') + +hours = total_samples * 5 / 3600 # 5s intervals +print(f'Total: {total_samples} samples ({hours:.1f}h)') + +# Show next retrain time +import yaml, os, time +try: + with open('$CONFIG_FILE') as f: + cfg = yaml.safe_load(f) + retrain_interval = cfg.get('ai',{}).get('retrain_interval', 86400) + model_file = cfg.get('ai',{}).get('model_file', '/var/lib/xdp-defense/ai_model.pkl') + if os.path.exists(model_file): + mtime = os.path.getmtime(model_file) + next_retrain = mtime + retrain_interval - time.time() + if next_retrain > 0: + h = int(next_retrain // 3600) + m = int((next_retrain % 3600) // 60) + print(f'Next retrain: {h}h {m}m') + else: + print('Next retrain: imminent') + else: + print('Next retrain: model not yet trained') +except: + pass +print() +" "$log_file" +} + +cmd_ai_log() { + local n=${1:-20} + [[ "$n" =~ ^[0-9]+$ ]] || n=20 + + local log_file + log_file=$(python3 -c " +import yaml +with open('$CONFIG_FILE') as f: + cfg = yaml.safe_load(f) +print(cfg.get('ai',{}).get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv')) +" 2>/dev/null || echo "/var/lib/xdp-defense/traffic_log.csv") + + [ ! -f "$log_file" ] && { log_err "Traffic log not found: $log_file"; exit 1; } + + python3 -c " +import csv, sys + +log_file = sys.argv[1] +n = int(sys.argv[2]) + +rows = [] +with open(log_file, 'r') as f: + reader = csv.reader(f) + header = next(reader, None) + if header is None: + print('Traffic log is empty') + sys.exit(0) + for row in reader: + rows.append(row) + +# Show last N rows +display = rows[-n:] +print() +print('\033[1m=== Recent Traffic Log ===\033[0m') +print(f'{\"Timestamp\":>22} {\"Hour\":>6} {\"PPS\":>10} {\"Bytes\":>12} {\"SYN%\":>6} {\"UDP%\":>6} {\"ICMP%\":>6}') +print(f'{\"-\"*22} {\"-\"*6} {\"-\"*10} {\"-\"*12} {\"-\"*6} {\"-\"*6} {\"-\"*6}') + +for row in display: + try: + ts = row[0][:19] # trim microseconds + hour = float(row[1]) + pkts = float(row[4]) + bts = float(row[5]) + syn_r = float(row[14]) * 100 if len(row) > 14 else 0 + udp_r = float(row[15]) * 100 if len(row) > 15 else 0 + icmp_r = float(row[16]) * 100 if len(row) > 16 else 0 + print(f'{ts:>22} {hour:>6.1f} {pkts:>10.0f} {bts:>12.0f} {syn_r:>5.1f}% {udp_r:>5.1f}% {icmp_r:>5.1f}%') + except (ValueError, IndexError): + continue + +print(f'Showing {len(display)} of {len(rows)} entries') +print() +" "$log_file" "$n" +} + # ==================== GeoIP ==================== cmd_geoip() { @@ -923,6 +1074,8 @@ DDoS: AI: ai status Show AI model status ai retrain Trigger AI model retrain + ai traffic Show time-of-day traffic summary (last 24h) + ai log [N] Show recent N traffic log entries (default 20) Daemon: daemon start Start defense daemon (background) @@ -1026,6 +1179,8 @@ case "${1:-help}" in case "${2:-status}" in status) cmd_ai_status ;; retrain) cmd_ai_retrain ;; + traffic) cmd_ai_traffic ;; + log) cmd_ai_log "$3" ;; *) cmd_ai_status ;; esac ;; diff --git a/config/config.yaml b/config/config.yaml index 509d907..a80890c 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -65,6 +65,11 @@ ai: anomaly_threshold: -0.16 # sklearn decision_function threshold # Retraining - retrain_interval: 604800 # 7 days in seconds + retrain_interval: 86400 # auto-retrain interval (seconds, default 24h) + retrain_window: 86400 # data range for retrain (most recent N seconds) model_file: /var/lib/xdp-defense/ai_model.pkl training_data_file: /var/lib/xdp-defense/training_data.csv + + # Traffic logging + traffic_log_file: /var/lib/xdp-defense/traffic_log.csv + traffic_log_retention_days: 7 # days to keep traffic log data diff --git a/lib/xdp_defense_daemon.py b/lib/xdp_defense_daemon.py index 2015c53..53d010c 100755 --- a/lib/xdp_defense_daemon.py +++ b/lib/xdp_defense_daemon.py @@ -12,6 +12,7 @@ time-profile switching, and automatic escalation. """ import copy +import math import os import sys import time @@ -22,7 +23,7 @@ import logging.handlers import csv import pickle from collections import defaultdict -from datetime import datetime +from datetime import datetime, timedelta import yaml @@ -80,6 +81,10 @@ DEFAULT_CONFIG = { 'min_packets_for_sample': 20, 'model_file': '/var/lib/xdp-defense/ai_model.pkl', 'training_data_file': '/var/lib/xdp-defense/training_data.csv', + 'traffic_log_file': '/var/lib/xdp-defense/traffic_log.csv', + 'traffic_log_retention_days': 7, + 'retrain_interval': 86400, + 'retrain_window': 86400, }, } @@ -267,6 +272,7 @@ class AIDetector: with open(data_file, 'w', newline='') as f: writer = csv.writer(f) writer.writerow([ + 'hour_sin', 'hour_cos', 'total_packets', 'total_bytes', 'tcp_syn_count', 'tcp_other_count', 'udp_count', 'icmp_count', 'other_proto_count', 'unique_ips_approx', 'small_pkt_count', 'large_pkt_count', @@ -280,17 +286,30 @@ class AIDetector: log.error("AI training failed: %s", e) def load_model(self): - """Load a previously trained model.""" + """Load a previously trained model. Check feature dimension compatibility.""" model_file = self.cfg.get('model_file', '/var/lib/xdp-defense/ai_model.pkl') if not os.path.exists(model_file): return False try: with open(model_file, 'rb') as f: data = pickle.load(f) - self.model = data['model'] - self.scaler = data['scaler'] + model = data['model'] + scaler = data['scaler'] + + expected_features = 17 + if hasattr(scaler, 'n_features_in_') and scaler.n_features_in_ != expected_features: + log.warning( + "Model has %d features, expected %d. Switching to learning mode.", + scaler.n_features_in_, expected_features + ) + self.is_learning = True + return False + + self.model = model + self.scaler = scaler self.is_learning = False - log.info("AI model loaded from %s", model_file) + log.info("AI model loaded from %s (%d features)", + model_file, getattr(scaler, 'n_features_in_', '?')) return True except Exception as e: log.error("Failed to load AI model: %s", e) @@ -312,6 +331,51 @@ class AIDetector: log.error("AI prediction error: %s", e) return False, 0.0 + def retrain_from_log(self): + """Retrain the model from traffic_log.csv data.""" + log_file = self.cfg.get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv') + if not os.path.exists(log_file): + log.warning("Traffic log not found: %s", log_file) + return False + + retrain_window = self.cfg.get('retrain_window', 86400) + cutoff = datetime.now() - timedelta(seconds=retrain_window) + + try: + samples = [] + with open(log_file, 'r', newline='') as f: + reader = csv.reader(f) + header = next(reader, None) + if header is None: + log.warning("Traffic log is empty") + return False + + # Feature columns: skip timestamp and hour (first 2), take remaining 17 + for row in reader: + try: + ts = datetime.fromisoformat(row[0]) + if ts < cutoff: + continue + features = [float(v) for v in row[2:]] # skip timestamp, hour + if len(features) == 17: + samples.append(features) + except (ValueError, IndexError): + continue + + if len(samples) < 10: + log.warning("Not enough recent samples for retrain (%d)", len(samples)) + return False + + log.info("Auto-retrain: loading %d samples from traffic log (window=%ds)", + len(samples), retrain_window) + self.training_data = samples + self._train() + return True + + except Exception as e: + log.error("retrain_from_log failed: %s", e) + return False + # ==================== ProfileManager ==================== @@ -421,6 +485,9 @@ class DDoSDaemon: if self.ai_detector.enabled: self.ai_detector.load_model() + self._last_retrain_time = self._get_model_mtime() + self._last_log_cleanup = time.time() + level = self.cfg['general'].get('log_level', 'info').upper() log.setLevel(getattr(logging, level, logging.INFO)) @@ -482,8 +549,80 @@ class DDoSDaemon: self._stop_event.set() def _handle_sigusr1(self, signum, frame): - log.info("SIGUSR1 received, requesting AI retrain...") - self.ai_detector.request_retrain() + log.info("SIGUSR1 received, triggering retrain from traffic log...") + if self.ai_detector.retrain_from_log(): + self._last_retrain_time = time.time() + log.info("SIGUSR1 retrain completed successfully") + else: + log.warning("SIGUSR1 retrain failed (falling back to collect mode)") + self.ai_detector.request_retrain() + + # ---- Traffic Logging ---- + + TRAFFIC_CSV_HEADER = [ + 'timestamp', 'hour', + 'hour_sin', 'hour_cos', + 'total_packets', 'total_bytes', 'tcp_syn_count', 'tcp_other_count', + 'udp_count', 'icmp_count', 'other_proto_count', 'unique_ips_approx', + 'small_pkt_count', 'large_pkt_count', + 'syn_ratio', 'udp_ratio', 'icmp_ratio', 'small_pkt_ratio', 'avg_pkt_size' + ] + + def _log_traffic(self, now, hour, features): + """Append one row to traffic_log.csv.""" + log_file = self.cfg['ai'].get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv') + try: + write_header = not os.path.exists(log_file) or os.path.getsize(log_file) == 0 + os.makedirs(os.path.dirname(log_file), exist_ok=True) + with open(log_file, 'a', newline='') as f: + writer = csv.writer(f) + if write_header: + writer.writerow(self.TRAFFIC_CSV_HEADER) + row = [now.isoformat(), f'{hour:.4f}'] + [f'{v:.6f}' for v in features] + writer.writerow(row) + except Exception as e: + log.error("Failed to write traffic log: %s", e) + + def _cleanup_traffic_log(self): + """Remove entries older than retention_days from traffic_log.csv.""" + log_file = self.cfg['ai'].get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv') + retention_days = self.cfg['ai'].get('traffic_log_retention_days', 7) + cutoff = datetime.now() - timedelta(days=retention_days) + + if not os.path.exists(log_file): + return + + try: + kept = [] + header = None + with open(log_file, 'r', newline='') as f: + reader = csv.reader(f) + header = next(reader, None) + for row in reader: + try: + ts = datetime.fromisoformat(row[0]) + if ts >= cutoff: + kept.append(row) + except (ValueError, IndexError): + kept.append(row) # keep unparseable rows + + with open(log_file, 'w', newline='') as f: + writer = csv.writer(f) + if header: + writer.writerow(header) + writer.writerows(kept) + + log.info("Traffic log cleanup: kept %d rows (retention=%dd)", len(kept), retention_days) + except Exception as e: + log.error("Traffic log cleanup failed: %s", e) + + def _get_model_mtime(self): + """Get model file modification time, or current time if not found.""" + model_file = self.cfg['ai'].get('model_file', '/var/lib/xdp-defense/ai_model.pkl') + try: + return os.path.getmtime(model_file) + except OSError: + return time.time() # ---- Worker Threads ---- @@ -552,6 +691,8 @@ class DDoSDaemon: from xdp_common import read_percpu_features, dump_rate_counters, block_ip, is_whitelisted prev_features = None + self._last_retrain_time = self._get_model_mtime() + self._last_log_cleanup = time.time() while not self._stop_event.is_set(): interval = self._ai_interval @@ -592,13 +733,39 @@ class DDoSDaemon: avg_pkt_size = deltas[1] / total deltas.extend([syn_ratio, udp_ratio, icmp_ratio, small_pkt_ratio, avg_pkt_size]) + # Add time features (hour_sin, hour_cos) at the front + now = datetime.now() + hour = now.hour + now.minute / 60.0 + hour_sin = math.sin(2 * math.pi * hour / 24) + hour_cos = math.cos(2 * math.pi * hour / 24) + deltas_with_time = [hour_sin, hour_cos] + deltas # 17 features + + # Log to traffic CSV + self._log_traffic(now, hour, deltas_with_time) + + # Periodic log file cleanup (once per day) + if time.time() - self._last_log_cleanup > 86400: + self._cleanup_traffic_log() + self._last_log_cleanup = time.time() + if self.ai_detector.is_learning: - self.ai_detector.collect_sample(deltas) + self.ai_detector.collect_sample(deltas_with_time) if len(self.ai_detector.training_data) % 100 == 0: log.debug("AI learning: %d samples collected", len(self.ai_detector.training_data)) else: - is_anomaly, score = self.ai_detector.predict(deltas) + # Auto-retrain check + retrain_interval = self.cfg['ai'].get('retrain_interval', 86400) + if time.time() - self._last_retrain_time >= retrain_interval: + log.info("Auto-retrain triggered (interval=%ds)", retrain_interval) + if self.ai_detector.retrain_from_log(): + self._last_retrain_time = time.time() + log.info("Auto-retrain completed successfully") + else: + log.warning("Auto-retrain failed, will retry next interval") + self._last_retrain_time = time.time() + + is_anomaly, score = self.ai_detector.predict(deltas_with_time) if is_anomaly: log.warning( "AI ANOMALY detected: score=%.4f deltas=%s",