Files
crowdsec-bunny-bouncer/monitor.py
2026-02-13 14:33:38 +09:00

290 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""Bunny CDN 모니터링: Edge Script 사용량, 에러 통계, WAF 상태 조회."""
import json
import os
import sys
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests
from dotenv import load_dotenv
# Load settings from the .env file that sits next to this script.
load_dotenv(Path(__file__).parent / ".env")
# Base URL that every API path passed to api() is appended to.
BUNNY_API_BASE = "https://api.bunny.net"
# API key sent in the AccessKey header; main() exits when it is empty.
BUNNY_API_KEY = os.environ.get("BUNNY_API_KEY", "")
# Pull zone and edge script to report on; both can be overridden via environment.
PULL_ZONE_ID = os.environ.get("BUNNY_PULL_ZONE_ID", "5316471")
SCRIPT_ID = os.environ.get("BUNNY_SCRIPT_ID", "64811")
# Request count that edge_script_usage() treats as the free allowance.
FREE_REQUEST_LIMIT = 25_000_000
def api(path: str) -> dict:
    """Send an authenticated GET to the Bunny API and return the decoded JSON.

    Args:
        path: Path (including any query string) appended to BUNNY_API_BASE.

    Returns:
        The JSON-decoded response body.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    url = f"{BUNNY_API_BASE}{path}"
    auth_headers = {"AccessKey": BUNNY_API_KEY}
    response = requests.get(url, headers=auth_headers, timeout=30)
    response.raise_for_status()
    return response.json()
def fmt_num(n: int) -> str:
    """Format a count compactly using K (thousands) or M (millions) suffixes.

    Values whose magnitude is below 1,000 are returned unchanged via str().
    Negative values are abbreviated by magnitude and keep their sign.

    Args:
        n: The number to format.

    Returns:
        A string such as "999", "1.5K" or "25.00M".
    """
    magnitude = abs(n)
    if magnitude >= 1_000_000:
        return f"{n / 1_000_000:.2f}M"
    if magnitude >= 1_000:
        thousands = f"{n / 1_000:.1f}"
        # Values just under one million round up to "1000.0"; promote those
        # to the M form instead of printing "1000.0K".
        if abs(float(thousands)) >= 1_000:
            return f"{n / 1_000_000:.2f}M"
        return f"{thousands}K"
    return str(n)
def edge_script_usage():
    """Print the Edge Script's monthly request count, CPU time and cost.

    Fetches the script record for SCRIPT_ID, reports usage as a percentage of
    FREE_REQUEST_LIMIT, and ends with a verdict line chosen by that
    percentage (above 80, above 50, or otherwise).
    """
    banner = "=" * 50
    print(banner)
    print(" Edge Script 월간 사용량")
    print(banner)

    script = api(f"/compute/script/{SCRIPT_ID}")
    request_count = script.get("MonthlyRequestCount", 0)
    cpu_time = script.get("MonthlyCpuTime", 0)
    monthly_cost = script.get("MonthlyCost", 0)
    used_pct = request_count / FREE_REQUEST_LIMIT * 100

    print(f" 요청 수: {fmt_num(request_count):>10} / {fmt_num(FREE_REQUEST_LIMIT)} ({used_pct:.4f}%)")
    print(f" CPU 시간: {cpu_time:>10} ms")
    print(f" 비용: ${monthly_cost:.2f}")
    # The per-request average is only meaningful once there has been traffic.
    if request_count > 0:
        print(f" 평균 CPU: {cpu_time / request_count:.1f} ms/req")

    if used_pct > 80:
        verdict = f"\n ⚠ 무료 한도의 {used_pct:.1f}% 사용 중 — 주의 필요!"
    elif used_pct > 50:
        verdict = f"\n △ 무료 한도의 {used_pct:.1f}% 사용 중"
    else:
        verdict = "\n ✓ 여유 충분"
    print(verdict)
    print()
def cdn_statistics(days: int = 30):
    """Print CDN traffic totals and response-code statistics for the pull zone.

    Queries the statistics endpoint for PULL_ZONE_ID over a window that ends
    today (UTC) and starts *days* days earlier, then prints request and
    bandwidth totals, 3xx/4xx/5xx counts, a one-line health verdict, and a
    per-day list of the days that recorded 4xx or 5xx responses.

    Args:
        days: Length of the reporting window in days, counted back from now.

    Raises:
        requests.HTTPError: Propagated from api() when the request fails.
    """
    print("=" * 50)
    print(f" CDN 통계 (최근 {days}일)")
    print("=" * 50)

    # Take a single timestamp so both ends of the window derive from the same
    # instant, even if the call happens right at a UTC date change.
    now = datetime.now(timezone.utc)
    date_from = (now - timedelta(days=days)).strftime("%Y-%m-%d")
    date_to = now.strftime("%Y-%m-%d")
    data = api(f"/statistics?pullZone={PULL_ZONE_ID}&dateFrom={date_from}&dateTo={date_to}")

    total = data.get("TotalRequestsServed", 0)
    cache_rate = data.get("CacheHitRate", 0)
    bw = data.get("TotalBandwidthUsed", 0)
    origin_bw = data.get("TotalOriginTraffic", 0)
    avg_origin_rt = data.get("AverageOriginResponseTime", 0)
    print(f" 총 요청: {fmt_num(total)}")
    print(f" 캐시 히트율: {cache_rate:.1f}%")
    print(f" 총 대역폭: {bw / 1024 / 1024:.1f} MB")
    print(f" 오리진 트래픽: {origin_bw / 1024 / 1024:.1f} MB")
    print(f" 오리진 응답시간: {avg_origin_rt:.0f} ms (평균)")
    print()

    # Error charts map a timestamp string to a count. `or {}` also covers a
    # key that is present but null.
    e3_chart = data.get("Error3xxChart") or {}
    e4_chart = data.get("Error4xxChart") or {}
    e5_chart = data.get("Error5xxChart") or {}
    e3 = sum(e3_chart.values())
    e4 = sum(e4_chart.values())
    e5 = sum(e5_chart.values())

    print(" --- 응답 코드 ---")
    for label, count in (("3xx", e3), ("4xx", e4), ("5xx", e5)):
        # The percentage is omitted when there were no requests at all.
        share = f" ({count / total * 100:.2f}%)" if total else ""
        print(f" {label}: {fmt_num(count):>8}{share}")

    # Report 4xx and 5xx independently; the "all clear" line is only printed
    # when neither class of error occurred.
    warnings = []
    if e4 > 0:
        warnings.append(" ⚠ 4xx 에러 발생 중 (429 Rate Limit 포함 가능)")
    if e5 > 0:
        warnings.append(" ⚠ 5xx 에러 발생 중 (오리진 상태 확인 필요)")
    print()
    if warnings:
        for line in warnings:
            print(line)
    else:
        print(" ✓ 4xx/5xx 에러 없음 — Rate Limit 정상")

    # Per-day error detail: keys are truncated to their first 10 characters
    # (the date part) and only days with a non-zero count are kept.
    error_days = {k[:10]: v for k, v in e4_chart.items() if v > 0}
    error_days_5 = {k[:10]: v for k, v in e5_chart.items() if v > 0}
    if error_days or error_days_5:
        print("\n --- 일별 에러 상세 ---")
        for d in sorted(error_days.keys() | error_days_5.keys()):
            parts = []
            if d in error_days:
                parts.append(f"4xx={error_days[d]}")
            if d in error_days_5:
                parts.append(f"5xx={error_days_5[d]}")
            print(f" {d}: {', '.join(parts)}")
    print()
def waf_status():
    """Print Shield/WAF status for the pull zone, a WAF rule summary, and events.

    Steps, each of which tolerates an HTTP error from its own request:
      1. Read the pull zone and print its Shield DDoS / Zone Security flags.
      2. Look up the Shield Zone for the pull zone and print plan, WAF and
         learning-mode state when one exists.
      3. Fetch the WAF rule catalogue and print group/rule totals plus the
         ten largest rule groups. If this request fails the function returns
         early and no event log is fetched.
      4. Fetch today's event log when a Shield Zone exists, otherwise print a
         dashboard link.
    """
    print("=" * 50)
    print(" WAF / Bunny Shield 상태")
    print("=" * 50)

    # Shield flags stored on the pull zone itself.
    try:
        pz = api(f"/pullzone/{PULL_ZONE_ID}")
    except requests.HTTPError:
        # Best effort: say so and carry on with the remaining sections.
        print(" Pull Zone 조회 불가")
    else:
        shield_ddos = pz.get("ShieldDDosProtectionEnabled", False)
        zone_sec = pz.get("ZoneSecurityEnabled", False)
        print(f" Shield DDoS: {'✓ 활성' if shield_ddos else '✗ 비활성'}")
        print(f" Zone Security: {'✓ 활성' if zone_sec else '✗ 비활성'}")

    # Shield Zone lookup. A failed request is treated like "no Shield Zone"
    # and reported by the message further down.
    try:
        sz = api(f"/shield/shield-zone/get-by-pullzone/{PULL_ZONE_ID}")
    except requests.HTTPError:
        sz = None
    shield_data = sz.get("data") if isinstance(sz, dict) else None
    if not isinstance(shield_data, dict):
        # Covers a missing or null "data" field as well as a failed request.
        shield_data = {}
    shield_zone_id = shield_data.get("shieldZoneId")

    if shield_zone_id:
        plan_names = {0: "Free", 1: "Premium", 2: "Business", 3: "Enterprise"}
        plan = plan_names.get(shield_data.get("planType", -1), "?")
        waf_on = shield_data.get("wafEnabled", False)
        learning = shield_data.get("learningMode", False)
        print(f" Shield Zone: #{shield_zone_id} ({plan})")
        print(f" WAF: {'✓ 활성' if waf_on else '✗ 비활성'}")
        if learning:
            # Only the first 10 characters (the date part) are shown; a null
            # value falls back to "?" instead of raising on the slice.
            until = str(shield_data.get("learningModeUntil") or "?")[:10]
            print(f" Learning Mode: ✓ ({until}까지)")
    else:
        print("\n Shield Zone 미생성 — WAF 이벤트 로그 조회 불가")
        print(f" 활성화: https://dash.bunny.net/cdn/{PULL_ZONE_ID}/security")
    print()

    # WAF rule catalogue.
    try:
        data = api(f"/shield/waf/rules?pullZoneId={PULL_ZONE_ID}")
    except requests.HTTPError:
        print(" WAF 룰 조회 불가")
        print()
        return
    # NOTE(review): the loop below expects a list of categories. If the
    # endpoint wraps it as {"data": [...]} like the shield-zone endpoint does,
    # unwrap it here — confirm against the API reference.
    if isinstance(data, dict):
        data = data.get("data") or []

    total_groups = 0
    total_rules = 0
    active_categories = []
    for category in data:
        for group in category.get("ruleGroups") or []:
            total_groups += 1
            rule_count = len(group.get("rules") or [])
            total_rules += rule_count
            if rule_count > 0:
                active_categories.append((group.get("name", "?"), rule_count))

    print(f" WAF 룰 그룹: {total_groups}")
    print(f" WAF 활성 룰: {total_rules}")
    print()

    if active_categories:
        print(" --- 활성 룰 그룹 (상위 10) ---")
        # Largest groups first; the sort is stable so ties keep API order.
        for name, count in sorted(active_categories, key=lambda x: -x[1])[:10]:
            print(f" {name:<45} {count:>3}")
        print()

    # The event log can only be queried through a Shield Zone.
    if shield_zone_id:
        _fetch_waf_logs(shield_zone_id)
    else:
        print(f" WAF 이벤트 로그: https://dash.bunny.net/cdn/{PULL_ZONE_ID}/security/waf-log")
        print()
def _fetch_waf_logs(shield_zone_id: int):
    """Fetch today's Shield event log and print a summary of it.

    The summary contains the number of entries, how many were blocked versus
    only logged, and the five most common messages, remote IPs and countries.
    An HTTP error from the request is reported on stdout rather than raised.
    Entries whose "log" payload cannot be decoded are still counted, with
    "?" as their IP and message.

    Args:
        shield_zone_id: Shield Zone whose event log is requested.
    """
    today = datetime.now(timezone.utc).strftime("%m-%d-%Y")
    print(" --- WAF 이벤트 로그 (오늘) ---")

    # Keep the try body to the one call that can raise HTTPError.
    try:
        data = api(f"/shield/event-logs/{shield_zone_id}/{today}/")
    except requests.HTTPError as e:
        print(f" 이벤트 로그 조회 실패: {e}")
        print()
        return

    logs = data.get("logs") or []
    if not logs:
        print(" ✓ 트리거된 WAF 이벤트 없음")
        print()
        return

    statuses = Counter()
    messages = Counter()
    countries = Counter()
    ips = Counter()
    for entry in logs:
        labels = entry.get("labels") or {}
        statuses[labels.get("status", "?")] += 1
        countries[labels.get("country", "?")] += 1

        # "log" holds a JSON document encoded as a string. One malformed or
        # null payload must not abort the whole report.
        try:
            log_data = json.loads(entry.get("log") or "{}")
        except (TypeError, ValueError):
            log_data = {}
        if not isinstance(log_data, dict):
            log_data = {}
        ips[log_data.get("RemoteIp", "?")] += 1
        messages[log_data.get("Message", "?")] += 1

    print(f"{len(logs)}", end="")
    if data.get("hasMoreData"):
        print(" (추가 데이터 있음)", end="")
    print()

    blocked = statuses.get("Blocked", 0)
    logged = statuses.get("Logged", 0)
    print(f" 차단: {blocked}건 | 로그만: {logged}")
    print()

    print(" 공격 유형:")
    for msg, cnt in messages.most_common(5):
        print(f" {msg:<50} {cnt:>3}")
    print()

    print(" IP (상위 5):")
    for ip, cnt in ips.most_common(5):
        print(f" {ip:<20} {cnt:>3}")
    print()

    print(" 국가:")
    for cc, cnt in countries.most_common(5):
        print(f" {cc:<5} {cnt:>3}")
    print()
def main():
    """Run the full monitoring report from the command line.

    Requires BUNNY_API_KEY to be set. Accepts one optional positional
    argument: the number of days for the CDN statistics window (default 30).
    Exits with status 1 and a message on stderr when the API key is missing
    or the argument is not a non-negative integer.
    """
    if not BUNNY_API_KEY:
        print("ERROR: BUNNY_API_KEY 환경변수가 필요합니다.", file=sys.stderr)
        sys.exit(1)

    days = 30
    if len(sys.argv) > 1:
        try:
            days = int(sys.argv[1])
        except ValueError:
            days = -1
        # A negative window would put the start date after the end date, so
        # it is rejected the same way as a non-numeric argument.
        if days < 0:
            print(f"사용법: {sys.argv[0]} [일수] (기본: 30)", file=sys.stderr)
            sys.exit(1)

    print()
    print(f" Bunny CDN 모니터링 — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f" Pull Zone: {PULL_ZONE_ID} | Script: {SCRIPT_ID}")
    print()

    edge_script_usage()
    cdn_statistics(days)
    waf_status()
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()