"""Bunny Shield tools - Shield Zone API. API structure: - Shield Zone: /shield/shield-zone, /shield/shield-zones - WAF: /shield/waf/{shieldZoneId}, /shield/waf/custom-rules/{shieldZoneId} - Rate Limiting: /shield/rate-limits/{shieldZoneId} - Event Logs: /shield/event-logs/{shieldZoneId} - Metrics: /shield/metrics/overview/{shieldZoneId} - DDoS Enums: /shield/ddos/enums - WAF Profiles: /shield/waf/profiles NOTE: Pull Zone fields (ShieldDDosProtectionEnabled) are read-only legacy. Do not use. """ import json from typing import Annotated from pydantic import Field from ..client import client from ..config import logger def register_shield_tools(mcp): # -- Shield Zone CRUD -- @mcp.tool() async def bunny_shield_list() -> str: """List all Shield Zones with WAF/DDoS configuration.""" try: data = await client.get("/shield/shield-zones", params={"page": 1, "perPage": 50}) zones = data.get("data", []) if isinstance(data, dict) else data result = [] for z in zones: result.append({ "shieldZoneId": z.get("shieldZoneId"), "pullZoneId": z.get("pullZoneId"), "wafEnabled": z.get("wafEnabled"), "dDoSShieldSensitivity": z.get("dDoSShieldSensitivity"), "dDoSExecutionMode": z.get("dDoSExecutionMode"), "learningMode": z.get("learningMode"), "learningModeUntil": z.get("learningModeUntil"), "planType": z.get("planType"), }) return json.dumps(result, indent=2) except Exception as e: logger.error("bunny_shield_list failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_shield_status( shield_zone_id: Annotated[int, Field(description="Shield Zone ID (not Pull Zone ID). Use bunny_shield_list to find it.")], ) -> str: """Get Shield Zone configuration by Shield Zone ID.""" try: data = await client.get(f"/shield/shield-zone/{shield_zone_id}") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_shield_status failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_shield_create( pullzone_id: Annotated[int, Field(description="Pull Zone ID to enable Shield on")], waf_enabled: Annotated[bool, Field(default=True, description="Enable WAF")] = True, ddos_sensitivity: Annotated[int, Field(default=2, description="DDoS sensitivity: 0=off, 1=low, 2=medium, 3=high, 4=extreme")] = 2, ) -> str: """Create a Shield Zone for a Pull Zone (enables Shield protection). Returns 409 if already exists.""" try: body = { "pullZoneId": pullzone_id, "wafEnabled": waf_enabled, "dDoSShieldSensitivity": ddos_sensitivity, } data = await client.post("/shield/shield-zone", json=body) return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_shield_create failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_shield_update( shield_zone_id: Annotated[int, Field(description="Shield Zone ID (not Pull Zone ID)")], settings: Annotated[str, Field(description="JSON string of shield settings (e.g. '{\"wafEnabled\": true, \"dDoSShieldSensitivity\": 2}')")], ) -> str: """Update Shield Zone configuration by Shield Zone ID.""" try: parsed = json.loads(settings) except json.JSONDecodeError as e: return f"Error: Invalid JSON: {e}" try: body = {"shieldZoneId": shield_zone_id, "shieldZone": parsed} data = await client.patch("/shield/shield-zone", json=body) return json.dumps(data, indent=2) if isinstance(data, dict) else f"Shield zone {shield_zone_id} updated" except Exception as e: logger.error("bunny_shield_update failed: %s", e) return f"Error: {e}" # -- WAF -- @mcp.tool() async def bunny_waf_rules( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], ) -> str: """List all managed WAF rules for a Shield Zone.""" try: data = await client.get(f"/shield/waf/{shield_zone_id}") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_waf_rules failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_waf_custom_rules( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], ) -> str: """List custom WAF rules for a Shield Zone.""" try: data = await client.get(f"/shield/waf/custom-rules/{shield_zone_id}") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_waf_custom_rules failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_waf_custom_rule_create( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], rule: Annotated[str, Field(description="JSON string of custom WAF rule definition")], ) -> str: """Create a custom WAF rule for a Shield Zone.""" try: parsed = json.loads(rule) parsed["shieldZoneId"] = shield_zone_id except json.JSONDecodeError as e: return f"Error: Invalid JSON: {e}" try: data = await client.post(f"/shield/waf/custom-rules/{shield_zone_id}", json=parsed) return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_waf_custom_rule_create failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_waf_profiles() -> str: """List available WAF profiles.""" try: data = await client.get("/shield/waf/profiles") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_waf_profiles failed: %s", e) return f"Error: {e}" # -- Rate Limiting -- @mcp.tool() async def bunny_rate_limits( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], ) -> str: """List rate limit rules for a Shield Zone.""" try: data = await client.get(f"/shield/rate-limits/{shield_zone_id}") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_rate_limits failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_rate_limit_create( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], rule: Annotated[str, Field(description="JSON string of rate limit rule")], ) -> str: """Create a rate limit rule for a Shield Zone.""" try: parsed = json.loads(rule) parsed["shieldZoneId"] = shield_zone_id except json.JSONDecodeError as e: return f"Error: Invalid JSON: {e}" try: data = await client.post(f"/shield/rate-limits/{shield_zone_id}", json=parsed) return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_rate_limit_create failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_rate_limit_update( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], rule_id: Annotated[int, Field(description="Rate limit rule ID")], rule: Annotated[str, Field(description="JSON string of updated rule")], ) -> str: """Update a rate limit rule.""" try: parsed = json.loads(rule) except json.JSONDecodeError as e: return f"Error: Invalid JSON: {e}" try: data = await client.put(f"/shield/rate-limits/{shield_zone_id}/{rule_id}", json=parsed) return json.dumps(data, indent=2) if isinstance(data, dict) else f"Rate limit {rule_id} updated" except Exception as e: logger.error("bunny_rate_limit_update failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_rate_limit_delete( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], rule_id: Annotated[int, Field(description="Rate limit rule ID to delete")], ) -> str: """Delete a rate limit rule.""" try: await client.delete(f"/shield/rate-limits/{shield_zone_id}/{rule_id}") return f"Rate limit rule {rule_id} deleted" except Exception as e: logger.error("bunny_rate_limit_delete failed: %s", e) return f"Error: {e}" # -- Event Logs -- @mcp.tool() async def bunny_waf_logs( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], page: Annotated[int, Field(default=1, description="Page number")] = 1, per_page: Annotated[int, Field(default=25, description="Results per page (max 100)")] = 25, ) -> str: """Get WAF event logs for a Shield Zone.""" try: params = {"page": page, "perPage": min(per_page, 100)} data = await client.get(f"/shield/event-logs/{shield_zone_id}", params=params) return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_waf_logs failed: %s", e) return f"Error: {e}" # -- Metrics -- @mcp.tool() async def bunny_shield_metrics( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], ) -> str: """Get Shield metrics overview: WAF, DDoS, Rate Limit, Bot Detection, Access List (28-day summary).""" try: data = await client.get(f"/shield/metrics/overview/{shield_zone_id}") if isinstance(data, dict) and "data" in data: d = data["data"] summary = { "overview": d.get("overview"), "waf": d.get("waf"), "dDoS": d.get("dDoS"), "ratelimit": d.get("ratelimit"), "botDetection": d.get("botDetection"), "accessList": d.get("accessList"), "uploadScanning": d.get("uploadScanning"), "totalBillableRequests": d.get("totalBillableRequests"), "totalCleanRequestsLimit": d.get("totalCleanRequestsLimit"), } return json.dumps(summary, indent=2) return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_shield_metrics failed: %s", e) return f"Error: {e}" # -- Bot Detection (Advanced plan required) -- @mcp.tool() async def bunny_bot_detection_get( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], ) -> str: """Get Bot Detection configuration for a Shield Zone. Requires Advanced plan ($9.50/month).""" try: data = await client.get(f"/shield/bot-detection/{shield_zone_id}/configuration") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_bot_detection_get failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_bot_detection_update( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], settings: Annotated[str, Field(description="JSON string of bot detection settings")], ) -> str: """Update Bot Detection configuration. Requires Advanced plan ($9.50/month).""" try: parsed = json.loads(settings) except json.JSONDecodeError as e: return f"Error: Invalid JSON: {e}" try: data = await client.put(f"/shield/bot-detection/{shield_zone_id}/configuration", json=parsed) return json.dumps(data, indent=2) if isinstance(data, dict) else f"Bot detection updated for zone {shield_zone_id}" except Exception as e: logger.error("bunny_bot_detection_update failed: %s", e) return f"Error: {e}" # -- Access Lists (Advanced plan required) -- @mcp.tool() async def bunny_access_lists( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], ) -> str: """List Access Lists for a Shield Zone. Requires Advanced plan ($9.50/month).""" try: data = await client.get(f"/shield/access-lists/{shield_zone_id}") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_access_lists failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_access_list_create( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], rule: Annotated[str, Field(description="JSON string of access list rule (IPs, CIDRs, ASNs, or countries with allow/block/challenge/log action)")], ) -> str: """Create an Access List rule. Requires Advanced plan ($9.50/month).""" try: parsed = json.loads(rule) parsed["shieldZoneId"] = shield_zone_id except json.JSONDecodeError as e: return f"Error: Invalid JSON: {e}" try: data = await client.post(f"/shield/access-lists/{shield_zone_id}", json=parsed) return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_access_list_create failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_access_list_update( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], rule_id: Annotated[int, Field(description="Access list rule ID")], rule: Annotated[str, Field(description="JSON string of updated access list rule")], ) -> str: """Update an Access List rule. Requires Advanced plan ($9.50/month).""" try: parsed = json.loads(rule) except json.JSONDecodeError as e: return f"Error: Invalid JSON: {e}" try: data = await client.put(f"/shield/access-lists/{shield_zone_id}/{rule_id}", json=parsed) return json.dumps(data, indent=2) if isinstance(data, dict) else f"Access list {rule_id} updated" except Exception as e: logger.error("bunny_access_list_update failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_access_list_delete( shield_zone_id: Annotated[int, Field(description="Shield Zone ID")], rule_id: Annotated[int, Field(description="Access list rule ID to delete")], ) -> str: """Delete an Access List rule. Requires Advanced plan ($9.50/month).""" try: await client.delete(f"/shield/access-lists/{shield_zone_id}/{rule_id}") return f"Access list rule {rule_id} deleted" except Exception as e: logger.error("bunny_access_list_delete failed: %s", e) return f"Error: {e}" # -- DDoS Enums -- @mcp.tool() async def bunny_ddos_enums() -> str: """List DDoS sensitivity and execution mode enum values.""" try: data = await client.get("/shield/ddos/enums") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_ddos_enums failed: %s", e) return f"Error: {e}"