Files
bunnycdn-mcp/bunnycdn_mcp/tools/shield.py
2026-04-13 10:20:10 +09:00

366 lines
15 KiB
Python

"""Bunny Shield tools - Shield Zone API.
API structure:
- Shield Zone: /shield/shield-zone, /shield/shield-zones
- WAF: /shield/waf/{shieldZoneId}, /shield/waf/custom-rules/{shieldZoneId}
- Rate Limiting: /shield/rate-limits/{shieldZoneId}
- Event Logs: /shield/event-logs/{shieldZoneId}
- Metrics: /shield/metrics/overview/{shieldZoneId}
- DDoS Enums: /shield/ddos/enums
- WAF Profiles: /shield/waf/profiles
NOTE: Pull Zone fields (ShieldDDosProtectionEnabled) are read-only legacy. Do not use.
"""
import json
from typing import Annotated
from pydantic import Field
from ..client import client
from ..config import logger
def register_shield_tools(mcp):
# -- Shield Zone CRUD --
@mcp.tool()
async def bunny_shield_list() -> str:
"""List all Shield Zones with WAF/DDoS configuration."""
try:
data = await client.get("/shield/shield-zones", params={"page": 1, "perPage": 50})
zones = data.get("data", []) if isinstance(data, dict) else data
result = []
for z in zones:
result.append({
"shieldZoneId": z.get("shieldZoneId"),
"pullZoneId": z.get("pullZoneId"),
"wafEnabled": z.get("wafEnabled"),
"dDoSShieldSensitivity": z.get("dDoSShieldSensitivity"),
"dDoSExecutionMode": z.get("dDoSExecutionMode"),
"learningMode": z.get("learningMode"),
"learningModeUntil": z.get("learningModeUntil"),
"planType": z.get("planType"),
})
return json.dumps(result, indent=2)
except Exception as e:
logger.error("bunny_shield_list failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_shield_status(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID (not Pull Zone ID). Use bunny_shield_list to find it.")],
) -> str:
"""Get Shield Zone configuration by Shield Zone ID."""
try:
data = await client.get(f"/shield/shield-zone/{shield_zone_id}")
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_shield_status failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_shield_create(
pullzone_id: Annotated[int, Field(description="Pull Zone ID to enable Shield on")],
waf_enabled: Annotated[bool, Field(default=True, description="Enable WAF")] = True,
ddos_sensitivity: Annotated[int, Field(default=2, description="DDoS sensitivity: 0=off, 1=low, 2=medium, 3=high, 4=extreme")] = 2,
) -> str:
"""Create a Shield Zone for a Pull Zone (enables Shield protection). Returns 409 if already exists."""
try:
body = {
"pullZoneId": pullzone_id,
"wafEnabled": waf_enabled,
"dDoSShieldSensitivity": ddos_sensitivity,
}
data = await client.post("/shield/shield-zone", json=body)
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_shield_create failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_shield_update(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID (not Pull Zone ID)")],
settings: Annotated[str, Field(description="JSON string of shield settings (e.g. '{\"wafEnabled\": true, \"dDoSShieldSensitivity\": 2}')")],
) -> str:
"""Update Shield Zone configuration by Shield Zone ID."""
try:
parsed = json.loads(settings)
except json.JSONDecodeError as e:
return f"Error: Invalid JSON: {e}"
try:
data = await client.put(f"/shield/shield-zone/{shield_zone_id}", json=parsed)
return json.dumps(data, indent=2) if isinstance(data, dict) else f"Shield zone {shield_zone_id} updated"
except Exception as e:
logger.error("bunny_shield_update failed: %s", e)
return f"Error: {e}"
# -- WAF --
@mcp.tool()
async def bunny_waf_rules(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
) -> str:
"""List all managed WAF rules for a Shield Zone."""
try:
data = await client.get(f"/shield/waf/{shield_zone_id}")
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_waf_rules failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_waf_custom_rules(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
) -> str:
"""List custom WAF rules for a Shield Zone."""
try:
data = await client.get(f"/shield/waf/custom-rules/{shield_zone_id}")
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_waf_custom_rules failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_waf_custom_rule_create(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
rule: Annotated[str, Field(description="JSON string of custom WAF rule definition")],
) -> str:
"""Create a custom WAF rule for a Shield Zone."""
try:
parsed = json.loads(rule)
parsed["shieldZoneId"] = shield_zone_id
except json.JSONDecodeError as e:
return f"Error: Invalid JSON: {e}"
try:
data = await client.post(f"/shield/waf/custom-rules/{shield_zone_id}", json=parsed)
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_waf_custom_rule_create failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_waf_profiles() -> str:
"""List available WAF profiles."""
try:
data = await client.get("/shield/waf/profiles")
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_waf_profiles failed: %s", e)
return f"Error: {e}"
# -- Rate Limiting --
@mcp.tool()
async def bunny_rate_limits(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
) -> str:
"""List rate limit rules for a Shield Zone."""
try:
data = await client.get(f"/shield/rate-limits/{shield_zone_id}")
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_rate_limits failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_rate_limit_create(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
rule: Annotated[str, Field(description="JSON string of rate limit rule")],
) -> str:
"""Create a rate limit rule for a Shield Zone."""
try:
parsed = json.loads(rule)
parsed["shieldZoneId"] = shield_zone_id
except json.JSONDecodeError as e:
return f"Error: Invalid JSON: {e}"
try:
data = await client.post(f"/shield/rate-limits/{shield_zone_id}", json=parsed)
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_rate_limit_create failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_rate_limit_update(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
rule_id: Annotated[int, Field(description="Rate limit rule ID")],
rule: Annotated[str, Field(description="JSON string of updated rule")],
) -> str:
"""Update a rate limit rule."""
try:
parsed = json.loads(rule)
except json.JSONDecodeError as e:
return f"Error: Invalid JSON: {e}"
try:
data = await client.put(f"/shield/rate-limits/{shield_zone_id}/{rule_id}", json=parsed)
return json.dumps(data, indent=2) if isinstance(data, dict) else f"Rate limit {rule_id} updated"
except Exception as e:
logger.error("bunny_rate_limit_update failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_rate_limit_delete(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
rule_id: Annotated[int, Field(description="Rate limit rule ID to delete")],
) -> str:
"""Delete a rate limit rule."""
try:
await client.delete(f"/shield/rate-limits/{shield_zone_id}/{rule_id}")
return f"Rate limit rule {rule_id} deleted"
except Exception as e:
logger.error("bunny_rate_limit_delete failed: %s", e)
return f"Error: {e}"
# -- Event Logs --
@mcp.tool()
async def bunny_waf_logs(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
page: Annotated[int, Field(default=1, description="Page number")] = 1,
per_page: Annotated[int, Field(default=25, description="Results per page (max 100)")] = 25,
) -> str:
"""Get WAF event logs for a Shield Zone."""
try:
params = {"page": page, "perPage": min(per_page, 100)}
data = await client.get(f"/shield/event-logs/{shield_zone_id}", params=params)
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_waf_logs failed: %s", e)
return f"Error: {e}"
# -- Metrics --
@mcp.tool()
async def bunny_shield_metrics(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
) -> str:
"""Get Shield metrics overview: WAF, DDoS, Rate Limit, Bot Detection, Access List (28-day summary)."""
try:
data = await client.get(f"/shield/metrics/overview/{shield_zone_id}")
if isinstance(data, dict) and "data" in data:
d = data["data"]
summary = {
"overview": d.get("overview"),
"waf": d.get("waf"),
"dDoS": d.get("dDoS"),
"ratelimit": d.get("ratelimit"),
"botDetection": d.get("botDetection"),
"accessList": d.get("accessList"),
"uploadScanning": d.get("uploadScanning"),
"totalBillableRequests": d.get("totalBillableRequests"),
"totalCleanRequestsLimit": d.get("totalCleanRequestsLimit"),
}
return json.dumps(summary, indent=2)
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_shield_metrics failed: %s", e)
return f"Error: {e}"
# -- Bot Detection (Advanced plan required) --
@mcp.tool()
async def bunny_bot_detection_get(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
) -> str:
"""Get Bot Detection configuration for a Shield Zone. Requires Advanced plan ($9.50/month)."""
try:
data = await client.get(f"/shield/bot-detection/{shield_zone_id}/configuration")
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_bot_detection_get failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_bot_detection_update(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
settings: Annotated[str, Field(description="JSON string of bot detection settings")],
) -> str:
"""Update Bot Detection configuration. Requires Advanced plan ($9.50/month)."""
try:
parsed = json.loads(settings)
except json.JSONDecodeError as e:
return f"Error: Invalid JSON: {e}"
try:
data = await client.put(f"/shield/bot-detection/{shield_zone_id}/configuration", json=parsed)
return json.dumps(data, indent=2) if isinstance(data, dict) else f"Bot detection updated for zone {shield_zone_id}"
except Exception as e:
logger.error("bunny_bot_detection_update failed: %s", e)
return f"Error: {e}"
# -- Access Lists (Advanced plan required) --
@mcp.tool()
async def bunny_access_lists(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
) -> str:
"""List Access Lists for a Shield Zone. Requires Advanced plan ($9.50/month)."""
try:
data = await client.get(f"/shield/access-lists/{shield_zone_id}")
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_access_lists failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_access_list_create(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
rule: Annotated[str, Field(description="JSON string of access list rule (IPs, CIDRs, ASNs, or countries with allow/block/challenge/log action)")],
) -> str:
"""Create an Access List rule. Requires Advanced plan ($9.50/month)."""
try:
parsed = json.loads(rule)
parsed["shieldZoneId"] = shield_zone_id
except json.JSONDecodeError as e:
return f"Error: Invalid JSON: {e}"
try:
data = await client.post(f"/shield/access-lists/{shield_zone_id}", json=parsed)
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_access_list_create failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_access_list_update(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
rule_id: Annotated[int, Field(description="Access list rule ID")],
rule: Annotated[str, Field(description="JSON string of updated access list rule")],
) -> str:
"""Update an Access List rule. Requires Advanced plan ($9.50/month)."""
try:
parsed = json.loads(rule)
except json.JSONDecodeError as e:
return f"Error: Invalid JSON: {e}"
try:
data = await client.put(f"/shield/access-lists/{shield_zone_id}/{rule_id}", json=parsed)
return json.dumps(data, indent=2) if isinstance(data, dict) else f"Access list {rule_id} updated"
except Exception as e:
logger.error("bunny_access_list_update failed: %s", e)
return f"Error: {e}"
@mcp.tool()
async def bunny_access_list_delete(
shield_zone_id: Annotated[int, Field(description="Shield Zone ID")],
rule_id: Annotated[int, Field(description="Access list rule ID to delete")],
) -> str:
"""Delete an Access List rule. Requires Advanced plan ($9.50/month)."""
try:
await client.delete(f"/shield/access-lists/{shield_zone_id}/{rule_id}")
return f"Access list rule {rule_id} deleted"
except Exception as e:
logger.error("bunny_access_list_delete failed: %s", e)
return f"Error: {e}"
# -- DDoS Enums --
@mcp.tool()
async def bunny_ddos_enums() -> str:
"""List DDoS sensitivity and execution mode enum values."""
try:
data = await client.get("/shield/ddos/enums")
return json.dumps(data, indent=2)
except Exception as e:
logger.error("bunny_ddos_enums failed: %s", e)
return f"Error: {e}"