#!/usr/bin/env python3
"""Bunny CDN monitoring: Edge Script usage, error statistics, and WAF status."""

import json
import os
import sys
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Mapping, Optional

import requests
from dotenv import load_dotenv

load_dotenv(Path(__file__).parent / ".env")

BUNNY_API_BASE = "https://api.bunny.net"
BUNNY_API_KEY = os.environ.get("BUNNY_API_KEY", "")
PULL_ZONE_ID = os.environ.get("BUNNY_PULL_ZONE_ID", "5316471")
SCRIPT_ID = os.environ.get("BUNNY_SCRIPT_ID", "64811")
FREE_REQUEST_LIMIT = 25_000_000


def api(path: str, params: Optional[Mapping[str, Any]] = None) -> Any:
    """GET ``path`` from the Bunny API and return the decoded JSON body.

    Args:
        path: URL path appended to ``BUNNY_API_BASE`` (must start with "/").
        params: Optional query parameters; ``requests`` URL-encodes them.

    Returns:
        Whatever JSON value the endpoint returned (callers in this file
        receive both objects and arrays).

    Raises:
        requests.HTTPError: On a 4xx/5xx response.
        requests.RequestException: On connection failures or timeouts.
    """
    resp = requests.get(
        f"{BUNNY_API_BASE}{path}",
        headers={"AccessKey": BUNNY_API_KEY},
        params=params,
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()


def fmt_num(n: int) -> str:
    """Format a count compactly: ``1234567`` -> ``1.23M``, ``1500`` -> ``1.5K``."""
    if n >= 1_000_000:
        return f"{n / 1_000_000:.2f}M"
    if n >= 1_000:
        return f"{n / 1_000:.1f}K"
    return str(n)


def _banner(title: str) -> None:
    """Print a section title framed by two 50-character rules."""
    print("=" * 50)
    print(title)
    print("=" * 50)


def edge_script_usage():
    """Print the Edge Script's monthly request count, CPU time and cost."""
    _banner(" Edge Script 월간 사용량")

    data = api(f"/compute/script/{SCRIPT_ID}")
    # ``or 0`` also covers fields that are present but null in the response.
    reqs = data.get("MonthlyRequestCount") or 0
    cpu = data.get("MonthlyCpuTime") or 0
    cost = data.get("MonthlyCost") or 0
    pct = reqs / FREE_REQUEST_LIMIT * 100

    print(f" 요청 수: {fmt_num(reqs):>10} / {fmt_num(FREE_REQUEST_LIMIT)} ({pct:.4f}%)")
    print(f" CPU 시간: {cpu:>10} ms")
    print(f" 비용: ${cost:.2f}")
    if reqs > 0:
        print(f" 평균 CPU: {cpu / reqs:.1f} ms/req")

    if pct > 80:
        print(f"\n ⚠ 무료 한도의 {pct:.1f}% 사용 중 — 주의 필요!")
    elif pct > 50:
        print(f"\n △ 무료 한도의 {pct:.1f}% 사용 중")
    else:
        print("\n ✓ 여유 충분")
    print()


def _daily_totals(chart: Mapping[str, Any]) -> Counter:
    """Sum positive chart values per calendar day (first 10 chars of the key)."""
    totals: Counter = Counter()
    for stamp, count in chart.items():
        if count and count > 0:
            # Summing (rather than assigning) keeps every sample when
            # several chart keys fall on the same day.
            totals[stamp[:10]] += count
    return totals


def cdn_statistics(days: int = 30):
    """Print traffic and error statistics for the last ``days`` days.

    Args:
        days: Size of the reporting window, counted back from today (UTC).
    """
    _banner(f" CDN 통계 (최근 {days}일)")

    # Take a single timestamp so both ends of the range share one "today".
    now = datetime.now(timezone.utc)
    data = api(
        "/statistics",
        params={
            "pullZone": PULL_ZONE_ID,
            "dateFrom": (now - timedelta(days=days)).strftime("%Y-%m-%d"),
            "dateTo": now.strftime("%Y-%m-%d"),
        },
    )

    total = data.get("TotalRequestsServed") or 0
    cache_rate = data.get("CacheHitRate") or 0
    bw = data.get("TotalBandwidthUsed") or 0
    origin_bw = data.get("TotalOriginTraffic") or 0
    avg_origin_rt = data.get("AverageOriginResponseTime") or 0

    print(f" 총 요청: {fmt_num(total)}")
    print(f" 캐시 히트율: {cache_rate:.1f}%")
    print(f" 총 대역폭: {bw / 1024 / 1024:.1f} MB")
    print(f" 오리진 트래픽: {origin_bw / 1024 / 1024:.1f} MB")
    print(f" 오리진 응답시간: {avg_origin_rt:.0f} ms (평균)")
    print()

    # Error statistics: one chart per status class, keyed by timestamp.
    charts = {
        label: data.get(f"Error{label}Chart") or {}
        for label in ("3xx", "4xx", "5xx")
    }
    counts = {label: sum(chart.values()) for label, chart in charts.items()}

    print(" --- 응답 코드 ---")
    for label, count in counts.items():
        share = f" ({count / total * 100:.2f}%)" if total else ""
        print(f" {label}: {fmt_num(count):>8}{share}")

    # Report 4xx and 5xx independently; the all-clear line is printed only
    # when both classes are empty.
    warnings = []
    if counts["4xx"] > 0:
        warnings.append(" ⚠ 4xx 에러 발생 중 (429 Rate Limit 포함 가능)")
    if counts["5xx"] > 0:
        warnings.append(" ⚠ 5xx 에러 발생 중")
    print()
    if warnings:
        for line in warnings:
            print(line)
    else:
        print(" ✓ 4xx/5xx 에러 없음 — Rate Limit 정상")

    # Per-day error breakdown.
    daily_4xx = _daily_totals(charts["4xx"])
    daily_5xx = _daily_totals(charts["5xx"])
    if daily_4xx or daily_5xx:
        print("\n --- 일별 에러 상세 ---")
        for day in sorted(daily_4xx.keys() | daily_5xx.keys()):
            parts = []
            if day in daily_4xx:
                parts.append(f"4xx={daily_4xx[day]}")
            if day in daily_5xx:
                parts.append(f"5xx={daily_5xx[day]}")
            print(f" {day}: {', '.join(parts)}")
    print()


def _print_pull_zone_security() -> None:
    """Print the pull zone's Shield DDoS and Zone Security flags."""
    try:
        pz = api(f"/pullzone/{PULL_ZONE_ID}")
    except requests.HTTPError as exc:
        # Best-effort lookup: report the failure instead of hiding it, then
        # let the remaining WAF checks run.
        print(f" Pull Zone 조회 실패: {exc}")
        return
    shield_ddos = pz.get("ShieldDDosProtectionEnabled", False)
    zone_sec = pz.get("ZoneSecurityEnabled", False)
    print(f" Shield DDoS: {'✓ 활성' if shield_ddos else '✗ 비활성'}")
    print(f" Zone Security: {'✓ 활성' if zone_sec else '✗ 비활성'}")


def _print_shield_zone() -> Any:
    """Print Shield Zone details and return its ID (``None`` if unavailable)."""
    try:
        sz = api(f"/shield/shield-zone/get-by-pullzone/{PULL_ZONE_ID}")
    except requests.HTTPError:
        # Treated the same as "no Shield Zone"; the caller prints the hint.
        return None

    shield_data = (sz.get("data") or {}) if isinstance(sz, dict) else {}
    shield_zone_id = shield_data.get("shieldZoneId")
    if not shield_zone_id:
        return None

    plan_names = {0: "Free", 1: "Premium", 2: "Business", 3: "Enterprise"}
    plan = plan_names.get(shield_data.get("planType", -1), "?")
    waf_on = shield_data.get("wafEnabled", False)
    print(f" Shield Zone: #{shield_zone_id} ({plan})")
    print(f" WAF: {'✓ 활성' if waf_on else '✗ 비활성'}")
    if shield_data.get("learningMode", False):
        # The field may be present but null; str() keeps the slice safe.
        until = str(shield_data.get("learningModeUntil") or "?")[:10]
        print(f" Learning Mode: ✓ ({until}까지)")
    return shield_zone_id


def _print_waf_rules() -> None:
    """Print the size of the WAF rule catalogue and its ten largest groups."""
    try:
        data = api("/shield/waf/rules", params={"pullZoneId": PULL_ZONE_ID})
    except requests.HTTPError:
        print(" WAF 룰 조회 불가")
        print()
        return

    # NOTE(review): the response shape is not visible from this file. Accept
    # both a bare list of categories and a {"data": [...]} wrapper — confirm
    # against the Shield API reference.
    categories = (data.get("data") or []) if isinstance(data, dict) else (data or [])

    group_sizes = []
    for category in categories:
        if not isinstance(category, dict):
            continue
        for group in category.get("ruleGroups") or []:
            group_sizes.append((group.get("name") or "?", len(group.get("rules") or [])))

    print(f" WAF 룰 그룹: {len(group_sizes)}개")
    print(f" WAF 활성 룰: {sum(size for _, size in group_sizes)}개")
    print()

    non_empty = [(name, size) for name, size in group_sizes if size > 0]
    if non_empty:
        print(" --- 활성 룰 그룹 (상위 10) ---")
        for name, size in sorted(non_empty, key=lambda item: -item[1])[:10]:
            print(f" {name:<45} {size:>3}개")
        print()


def waf_status():
    """Print WAF / Shield status, the rule catalogue, and today's event log."""
    _banner(" WAF / Bunny Shield 상태")

    # Shield flags stored on the pull zone itself.
    _print_pull_zone_security()

    # Shield Zone lookup.
    shield_zone_id = _print_shield_zone()
    if not shield_zone_id:
        print("\n Shield Zone 미생성 — WAF 이벤트 로그 조회 불가")
        print(f" 활성화: https://dash.bunny.net/cdn/{PULL_ZONE_ID}/security")
    print()

    # WAF rule catalogue. A failure here no longer skips the event log,
    # which comes from an independent endpoint.
    _print_waf_rules()

    # Event logs exist only when a Shield Zone is active.
    if shield_zone_id:
        _fetch_waf_logs(shield_zone_id)
    else:
        print(f" WAF 이벤트 로그: https://dash.bunny.net/cdn/{PULL_ZONE_ID}/security/waf-log")
        print()


def _parse_log_payload(raw: Any) -> dict:
    """Decode one event's JSON ``log`` field; return ``{}`` if unusable."""
    if not raw:
        return {}
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-string
        # input. One bad entry must not abort the whole summary.
        return {}
    return payload if isinstance(payload, dict) else {}


def _fetch_waf_logs(shield_zone_id: int):
    """Fetch today's Shield event log and print a summary.

    Args:
        shield_zone_id: ID of the Shield Zone whose events are listed.
    """
    # NOTE(review): the endpoint is called with an MM-DD-YYYY date; confirm
    # against the Shield API reference.
    today = datetime.now(timezone.utc).strftime("%m-%d-%Y")
    print(" --- WAF 이벤트 로그 (오늘) ---")

    try:
        data = api(f"/shield/event-logs/{shield_zone_id}/{today}/")
    except requests.HTTPError as e:
        print(f" 이벤트 로그 조회 실패: {e}")
        print()
        return

    logs = data.get("logs") or []
    if not logs:
        print(" ✓ 트리거된 WAF 이벤트 없음")
        print()
        return

    statuses = Counter()
    messages = Counter()
    countries = Counter()
    ips = Counter()
    for entry in logs:
        labels = entry.get("labels") or {}
        log_data = _parse_log_payload(entry.get("log"))
        # Missing or null values become "?" so the keys stay strings and the
        # width format specs below cannot fail.
        statuses[str(labels.get("status") or "?")] += 1
        countries[str(labels.get("country") or "?")] += 1
        ips[str(log_data.get("RemoteIp") or "?")] += 1
        messages[str(log_data.get("Message") or "?")] += 1

    more = " (추가 데이터 있음)" if data.get("hasMoreData") else ""
    print(f" 총 {len(logs)}건{more}")

    blocked = statuses.get("Blocked", 0)
    logged = statuses.get("Logged", 0)
    print(f" 차단: {blocked}건 | 로그만: {logged}건")
    print()

    print(" 공격 유형:")
    for msg, cnt in messages.most_common(5):
        print(f" {msg:<50} {cnt:>3}건")
    print()

    print(" IP (상위 5):")
    for ip, cnt in ips.most_common(5):
        print(f" {ip:<20} {cnt:>3}건")
    print()

    print(" 국가:")
    for cc, cnt in countries.most_common(5):
        print(f" {cc:<5} {cnt:>3}건")
    print()


def main():
    """CLI entry point: ``script.py [days]`` (default 30).

    Exits with status 1 when the API key is missing, the argument is not a
    positive integer, or any report section fails with a network/HTTP error.
    """
    if not BUNNY_API_KEY:
        print("ERROR: BUNNY_API_KEY 환경변수가 필요합니다.", file=sys.stderr)
        sys.exit(1)

    days = 30
    if len(sys.argv) > 1:
        try:
            days = int(sys.argv[1])
            if days < 1:
                # A zero or negative window would put dateFrom after dateTo.
                raise ValueError(sys.argv[1])
        except ValueError:
            print(f"사용법: {sys.argv[0]} [일수] (기본: 30)", file=sys.stderr)
            sys.exit(1)

    print()
    print(f" Bunny CDN 모니터링 — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f" Pull Zone: {PULL_ZONE_ID} | Script: {SCRIPT_ID}")
    print()

    # Run every section even if an earlier one fails, so one unreachable
    # endpoint does not hide the rest of the report.
    failed = False
    for section in (edge_script_usage, lambda: cdn_statistics(days), waf_status):
        try:
            section()
        except requests.RequestException as exc:
            print(f" 조회 실패: {exc}", file=sys.stderr)
            print()
            failed = True
    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()