"""WAF / DDoS Shield tools.""" import json from typing import Annotated from pydantic import Field from ..client import client from ..config import logger def register_shield_tools(mcp): @mcp.tool() async def bunny_shield_status( pullzone_id: Annotated[int, Field(description="Pull Zone ID")], ) -> str: """Get the Shield/WAF/DDoS protection status for a Pull Zone.""" try: zone = await client.get(f"/pullzone/{pullzone_id}") status = { "PullZoneId": zone.get("Id"), "Name": zone.get("Name"), "WAFEnabled": zone.get("WAFEnabled"), "WAFEnabledRuleGroups": zone.get("WAFEnabledRuleGroups"), "WAFDisabledRules": zone.get("WAFDisabledRules"), "ShieldDDosProtectionEnabled": zone.get("ShieldDDosProtectionEnabled"), "ShieldDDosProtectionType": zone.get("ShieldDDosProtectionType"), "OriginShieldEnabled": zone.get("OriginShieldEnabled"), "OriginShieldZoneCode": zone.get("OriginShieldZoneCode"), } return json.dumps(status, indent=2) except Exception as e: logger.error("bunny_shield_status failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_waf_rules( pullzone_id: Annotated[int, Field(description="Pull Zone ID")], ) -> str: """List WAF rule groups and their configuration for a Pull Zone.""" try: data = await client.get(f"/shield/waf/{pullzone_id}") return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_waf_rules failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_shield_update( pullzone_id: Annotated[int, Field(description="Pull Zone ID")], settings: Annotated[str, Field(description="JSON string of shield settings to update (e.g. '{\"ShieldDDosProtectionEnabled\": true, \"WAFEnabled\": true}')")], ) -> str: """Update DDoS/WAF/Shield settings for a Pull Zone.""" try: parsed = json.loads(settings) except json.JSONDecodeError as e: return f"Error: Invalid JSON in settings: {e}" try: data = await client.post(f"/pullzone/{pullzone_id}", json=parsed) return json.dumps(data, indent=2) if isinstance(data, dict) else f"Shield settings updated for Pull Zone {pullzone_id}" except Exception as e: logger.error("bunny_shield_update failed: %s", e) return f"Error: {e}" @mcp.tool() async def bunny_waf_logs( pullzone_id: Annotated[int, Field(description="Pull Zone ID")], page: Annotated[int, Field(default=1, description="Page number")] = 1, per_page: Annotated[int, Field(default=25, description="Results per page (max 100)")] = 25, ) -> str: """Get WAF event logs for a Pull Zone.""" try: params = {"page": page, "perPage": min(per_page, 100)} data = await client.get(f"/shield/waf/{pullzone_id}/logs", params=params) return json.dumps(data, indent=2) except Exception as e: logger.error("bunny_waf_logs failed: %s", e) return f"Error: {e}"