From 3d1e353b1a17a3dde59e7246095c95f7a52aa59c Mon Sep 17 00:00:00 2001 From: kaffa Date: Sat, 7 Feb 2026 10:30:10 +0900 Subject: [PATCH] Replace CSV traffic log with SQLite for better performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - traffic_log.csv → traffic_log.db (SQLite with indexed timestamp) - INSERT instead of CSV append, DELETE instead of file rewrite - CLI queries use SQL (WHERE timestamp filter for traffic, with bucketing still done in Python; LIMIT for log) - retrain_from_log() uses read-only connection with time range query - Config key: traffic_log_file → traffic_log_db Co-Authored-By: Claude Opus 4.6 --- bin/xdp-defense | 115 +++++++++++++-------------- config/config.yaml | 2 +- lib/xdp_defense_daemon.py | 162 +++++++++++++++++++++----------------- 3 files changed, 147 insertions(+), 132 deletions(-) diff --git a/bin/xdp-defense b/bin/xdp-defense index d04b6e2..6e97594 100755 --- a/bin/xdp-defense +++ b/bin/xdp-defense @@ -802,47 +802,41 @@ cmd_ai_retrain() { } cmd_ai_traffic() { - local log_file - log_file=$(python3 -c " + local db_file + db_file=$(python3 -c " import yaml with open('$CONFIG_FILE') as f: cfg = yaml.safe_load(f) -print(cfg.get('ai',{}).get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv')) -" 2>/dev/null || echo "/var/lib/xdp-defense/traffic_log.csv") +print(cfg.get('ai',{}).get('traffic_log_db', '/var/lib/xdp-defense/traffic_log.db')) +" 2>/dev/null || echo "/var/lib/xdp-defense/traffic_log.db") - [ ! -f "$log_file" ] && { log_err "Traffic log not found: $log_file"; exit 1; } + [ ! 
-f "$db_file" ] && { log_err "Traffic log not found: $db_file"; exit 1; } python3 -c " -import csv, sys +import sqlite3, sys from datetime import datetime, timedelta -log_file = sys.argv[1] -cutoff = datetime.now() - timedelta(hours=24) +db_file = sys.argv[1] +cutoff = (datetime.now() - timedelta(hours=24)).isoformat() + +conn = sqlite3.connect(db_file) +cur = conn.cursor() # Buckets: 0-6, 6-12, 12-18, 18-24 buckets = {0: [], 1: [], 2: [], 3: []} total_samples = 0 -with open(log_file, 'r') as f: - reader = csv.reader(f) - header = next(reader, None) - if header is None: - print('Traffic log is empty') - sys.exit(0) - for row in reader: - try: - ts = datetime.fromisoformat(row[0]) - if ts < cutoff: - continue - hour = float(row[1]) - bucket = min(int(hour // 6), 3) - # features: row[2]=hour_sin, row[3]=hour_cos, row[4]=total_packets, row[5]=total_bytes, ... - pps = float(row[4]) - bps = float(row[5]) - buckets[bucket].append((pps, bps)) - total_samples += 1 - except (ValueError, IndexError): - continue +cur.execute('SELECT hour, total_packets, total_bytes FROM traffic_samples WHERE timestamp >= ?', (cutoff,)) +for row in cur.fetchall(): + try: + hour = float(row[0]) + bucket = min(int(hour // 6), 3) + pps = float(row[1]) + bps = float(row[2]) + buckets[bucket].append((pps, bps)) + total_samples += 1 + except (ValueError, TypeError): + continue labels = ['00:00-06:00', '06:00-12:00', '12:00-18:00', '18:00-24:00'] print() @@ -873,6 +867,8 @@ for i, label in enumerate(labels): hours = total_samples * 5 / 3600 # 5s intervals print(f'Total: {total_samples} samples ({hours:.1f}h)') +conn.close() + # Show next retrain time import yaml, os, time try: @@ -894,62 +890,67 @@ try: except: pass print() -" "$log_file" +" "$db_file" } cmd_ai_log() { local n=${1:-20} [[ "$n" =~ ^[0-9]+$ ]] || n=20 - local log_file - log_file=$(python3 -c " + local db_file + db_file=$(python3 -c " import yaml with open('$CONFIG_FILE') as f: cfg = yaml.safe_load(f) 
-print(cfg.get('ai',{}).get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv')) -" 2>/dev/null || echo "/var/lib/xdp-defense/traffic_log.csv") +print(cfg.get('ai',{}).get('traffic_log_db', '/var/lib/xdp-defense/traffic_log.db')) +" 2>/dev/null || echo "/var/lib/xdp-defense/traffic_log.db") - [ ! -f "$log_file" ] && { log_err "Traffic log not found: $log_file"; exit 1; } + [ ! -f "$db_file" ] && { log_err "Traffic log not found: $db_file"; exit 1; } python3 -c " -import csv, sys +import sqlite3, sys -log_file = sys.argv[1] +db_file = sys.argv[1] n = int(sys.argv[2]) -rows = [] -with open(log_file, 'r') as f: - reader = csv.reader(f) - header = next(reader, None) - if header is None: - print('Traffic log is empty') - sys.exit(0) - for row in reader: - rows.append(row) +conn = sqlite3.connect(db_file) +cur = conn.cursor() + +cur.execute('SELECT COUNT(*) FROM traffic_samples') +total_count = cur.fetchone()[0] + +if total_count == 0: + print('Traffic log is empty') + conn.close() + sys.exit(0) + +cur.execute('SELECT timestamp, hour, total_packets, total_bytes, syn_ratio, udp_ratio, icmp_ratio FROM traffic_samples ORDER BY id DESC LIMIT ?', (n,)) +rows = cur.fetchall() +rows.reverse() + +conn.close() -# Show last N rows -display = rows[-n:] print() print('\033[1m=== Recent Traffic Log ===\033[0m') print(f'{\"Timestamp\":>22} {\"Hour\":>6} {\"PPS\":>10} {\"Bytes\":>12} {\"SYN%\":>6} {\"UDP%\":>6} {\"ICMP%\":>6}') print(f'{\"-\"*22} {\"-\"*6} {\"-\"*10} {\"-\"*12} {\"-\"*6} {\"-\"*6} {\"-\"*6}') -for row in display: +for row in rows: try: - ts = row[0][:19] # trim microseconds + ts = str(row[0])[:19] # trim microseconds hour = float(row[1]) - pkts = float(row[4]) - bts = float(row[5]) - syn_r = float(row[14]) * 100 if len(row) > 14 else 0 - udp_r = float(row[15]) * 100 if len(row) > 15 else 0 - icmp_r = float(row[16]) * 100 if len(row) > 16 else 0 + pkts = float(row[2]) + bts = float(row[3]) + syn_r = float(row[4]) * 100 if row[4] is not None else 0 + udp_r = 
float(row[5]) * 100 if row[5] is not None else 0 + icmp_r = float(row[6]) * 100 if row[6] is not None else 0 print(f'{ts:>22} {hour:>6.1f} {pkts:>10.0f} {bts:>12.0f} {syn_r:>5.1f}% {udp_r:>5.1f}% {icmp_r:>5.1f}%') - except (ValueError, IndexError): + except (ValueError, TypeError): continue -print(f'Showing {len(display)} of {len(rows)} entries') +print(f'Showing {len(rows)} of {total_count} entries') print() -" "$log_file" "$n" +" "$db_file" "$n" } # ==================== GeoIP ==================== diff --git a/config/config.yaml b/config/config.yaml index a80890c..5280a26 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -71,5 +71,5 @@ ai: training_data_file: /var/lib/xdp-defense/training_data.csv # Traffic logging - traffic_log_file: /var/lib/xdp-defense/traffic_log.csv + traffic_log_db: /var/lib/xdp-defense/traffic_log.db traffic_log_retention_days: 7 # days to keep traffic log data diff --git a/lib/xdp_defense_daemon.py b/lib/xdp_defense_daemon.py index 53d010c..e12d6dc 100755 --- a/lib/xdp_defense_daemon.py +++ b/lib/xdp_defense_daemon.py @@ -22,6 +22,7 @@ import logging import logging.handlers import csv import pickle +import sqlite3 from collections import defaultdict from datetime import datetime, timedelta @@ -81,7 +82,7 @@ DEFAULT_CONFIG = { 'min_packets_for_sample': 20, 'model_file': '/var/lib/xdp-defense/ai_model.pkl', 'training_data_file': '/var/lib/xdp-defense/training_data.csv', - 'traffic_log_file': '/var/lib/xdp-defense/traffic_log.csv', + 'traffic_log_db': '/var/lib/xdp-defense/traffic_log.db', 'traffic_log_retention_days': 7, 'retrain_interval': 86400, 'retrain_window': 86400, @@ -331,36 +332,30 @@ class AIDetector: log.error("AI prediction error: %s", e) return False, 0.0 - def retrain_from_log(self): - """Retrain the model from traffic_log.csv data.""" - log_file = self.cfg.get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv') - if not os.path.exists(log_file): - log.warning("Traffic log not found: %s", log_file) + def 
retrain_from_log(self, db_path=None): + """Retrain the model from traffic_log.db data.""" + if db_path is None: + db_path = self.cfg.get('traffic_log_db', '/var/lib/xdp-defense/traffic_log.db') + if not os.path.exists(db_path): + log.warning("Traffic log DB not found: %s", db_path) return False retrain_window = self.cfg.get('retrain_window', 86400) - cutoff = datetime.now() - timedelta(seconds=retrain_window) + cutoff = (datetime.now() - timedelta(seconds=retrain_window)).isoformat() + conn = None try: - samples = [] - with open(log_file, 'r', newline='') as f: - reader = csv.reader(f) - header = next(reader, None) - if header is None: - log.warning("Traffic log is empty") - return False - - # Feature columns: skip timestamp and hour (first 2), take remaining 17 - for row in reader: - try: - ts = datetime.fromisoformat(row[0]) - if ts < cutoff: - continue - features = [float(v) for v in row[2:]] # skip timestamp, hour - if len(features) == 17: - samples.append(features) - except (ValueError, IndexError): - continue + conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True) + cur = conn.execute( + 'SELECT hour_sin, hour_cos, total_packets, total_bytes, ' + 'tcp_syn_count, tcp_other_count, udp_count, icmp_count, ' + 'other_proto_count, unique_ips_approx, small_pkt_count, ' + 'large_pkt_count, syn_ratio, udp_ratio, icmp_ratio, ' + 'small_pkt_ratio, avg_pkt_size ' + 'FROM traffic_samples WHERE timestamp >= ? 
ORDER BY timestamp', + (cutoff,) + ) + samples = [list(row) for row in cur.fetchall()] if len(samples) < 10: log.warning("Not enough recent samples for retrain (%d)", len(samples)) @@ -375,6 +370,9 @@ class AIDetector: except Exception as e: log.error("retrain_from_log failed: %s", e) return False + finally: + if conn: + conn.close() # ==================== ProfileManager ==================== @@ -488,6 +486,8 @@ class DDoSDaemon: self._last_retrain_time = self._get_model_mtime() self._last_log_cleanup = time.time() + self._init_traffic_db() + level = self.cfg['general'].get('log_level', 'info').upper() log.setLevel(getattr(logging, level, logging.INFO)) @@ -550,69 +550,82 @@ class DDoSDaemon: def _handle_sigusr1(self, signum, frame): log.info("SIGUSR1 received, triggering retrain from traffic log...") - if self.ai_detector.retrain_from_log(): + db_path = self.cfg['ai'].get('traffic_log_db', '/var/lib/xdp-defense/traffic_log.db') + if self.ai_detector.retrain_from_log(db_path): self._last_retrain_time = time.time() log.info("SIGUSR1 retrain completed successfully") else: log.warning("SIGUSR1 retrain failed (falling back to collect mode)") self.ai_detector.request_retrain() - # ---- Traffic Logging ---- + # ---- Traffic Logging (SQLite) ---- - TRAFFIC_CSV_HEADER = [ - 'timestamp', 'hour', - 'hour_sin', 'hour_cos', - 'total_packets', 'total_bytes', 'tcp_syn_count', 'tcp_other_count', - 'udp_count', 'icmp_count', 'other_proto_count', 'unique_ips_approx', - 'small_pkt_count', 'large_pkt_count', - 'syn_ratio', 'udp_ratio', 'icmp_ratio', 'small_pkt_ratio', 'avg_pkt_size' - ] + def _init_traffic_db(self): + """Initialize SQLite database for traffic logging.""" + db_path = self.cfg['ai'].get('traffic_log_db', '/var/lib/xdp-defense/traffic_log.db') + os.makedirs(os.path.dirname(db_path), exist_ok=True) + self._traffic_db = sqlite3.connect(db_path, check_same_thread=False) + self._traffic_db.execute( + 'CREATE TABLE IF NOT EXISTS traffic_samples (' + ' id INTEGER PRIMARY KEY 
AUTOINCREMENT,' + ' timestamp TEXT NOT NULL,' + ' hour REAL NOT NULL,' + ' hour_sin REAL NOT NULL,' + ' hour_cos REAL NOT NULL,' + ' total_packets REAL NOT NULL,' + ' total_bytes REAL NOT NULL,' + ' tcp_syn_count REAL NOT NULL,' + ' tcp_other_count REAL NOT NULL,' + ' udp_count REAL NOT NULL,' + ' icmp_count REAL NOT NULL,' + ' other_proto_count REAL NOT NULL,' + ' unique_ips_approx REAL NOT NULL,' + ' small_pkt_count REAL NOT NULL,' + ' large_pkt_count REAL NOT NULL,' + ' syn_ratio REAL NOT NULL,' + ' udp_ratio REAL NOT NULL,' + ' icmp_ratio REAL NOT NULL,' + ' small_pkt_ratio REAL NOT NULL,' + ' avg_pkt_size REAL NOT NULL' + ')' + ) + self._traffic_db.execute( + 'CREATE INDEX IF NOT EXISTS idx_timestamp ON traffic_samples(timestamp)' + ) + self._traffic_db.commit() + log.info("Traffic log DB initialized: %s", db_path) def _log_traffic(self, now, hour, features): - """Append one row to traffic_log.csv.""" - log_file = self.cfg['ai'].get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv') + """Insert one row into traffic_samples table.""" try: - write_header = not os.path.exists(log_file) or os.path.getsize(log_file) == 0 - os.makedirs(os.path.dirname(log_file), exist_ok=True) - with open(log_file, 'a', newline='') as f: - writer = csv.writer(f) - if write_header: - writer.writerow(self.TRAFFIC_CSV_HEADER) - row = [now.isoformat(), f'{hour:.4f}'] + [f'{v:.6f}' for v in features] - writer.writerow(row) + self._traffic_db.execute( + 'INSERT INTO traffic_samples (' + ' timestamp, hour, hour_sin, hour_cos,' + ' total_packets, total_bytes, tcp_syn_count, tcp_other_count,' + ' udp_count, icmp_count, other_proto_count, unique_ips_approx,' + ' small_pkt_count, large_pkt_count,' + ' syn_ratio, udp_ratio, icmp_ratio, small_pkt_ratio, avg_pkt_size' + ') VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', + (now.isoformat(), hour, *features) + ) + self._traffic_db.commit() except Exception as e: log.error("Failed to write traffic log: %s", e) def 
_cleanup_traffic_log(self): - """Remove entries older than retention_days from traffic_log.csv.""" - log_file = self.cfg['ai'].get('traffic_log_file', '/var/lib/xdp-defense/traffic_log.csv') + """Remove entries older than retention_days from traffic_samples.""" retention_days = self.cfg['ai'].get('traffic_log_retention_days', 7) - cutoff = datetime.now() - timedelta(days=retention_days) - - if not os.path.exists(log_file): - return + cutoff = (datetime.now() - timedelta(days=retention_days)).isoformat() try: - kept = [] - header = None - with open(log_file, 'r', newline='') as f: - reader = csv.reader(f) - header = next(reader, None) - for row in reader: - try: - ts = datetime.fromisoformat(row[0]) - if ts >= cutoff: - kept.append(row) - except (ValueError, IndexError): - kept.append(row) # keep unparseable rows - - with open(log_file, 'w', newline='') as f: - writer = csv.writer(f) - if header: - writer.writerow(header) - writer.writerows(kept) - - log.info("Traffic log cleanup: kept %d rows (retention=%dd)", len(kept), retention_days) + cur = self._traffic_db.execute( + 'DELETE FROM traffic_samples WHERE timestamp < ?', (cutoff,) + ) + deleted = cur.rowcount + self._traffic_db.commit() + if deleted > 1000: + self._traffic_db.execute('VACUUM') + log.info("Traffic log cleanup: deleted %d rows (retention=%dd)", deleted, retention_days) except Exception as e: log.error("Traffic log cleanup failed: %s", e) @@ -740,7 +753,7 @@ class DDoSDaemon: hour_cos = math.cos(2 * math.pi * hour / 24) deltas_with_time = [hour_sin, hour_cos] + deltas # 17 features - # Log to traffic CSV + # Log to traffic DB self._log_traffic(now, hour, deltas_with_time) # Periodic log file cleanup (once per day) @@ -758,7 +771,8 @@ class DDoSDaemon: retrain_interval = self.cfg['ai'].get('retrain_interval', 86400) if time.time() - self._last_retrain_time >= retrain_interval: log.info("Auto-retrain triggered (interval=%ds)", retrain_interval) - if self.ai_detector.retrain_from_log(): + db_path = 
self.cfg['ai'].get('traffic_log_db', '/var/lib/xdp-defense/traffic_log.db') + if self.ai_detector.retrain_from_log(db_path): self._last_retrain_time = time.time() log.info("Auto-retrain completed successfully") else: