- FastMCP server with 12 tools (pullzone, cache, statistics, shield) - Dockerfile with multi-stage build (python:3.11-slim + uv) - K8s manifests (Deployment + Service) - Gitea Actions CI/CD pipeline (build → push → deploy)
80 lines
3.2 KiB
Python
80 lines
3.2 KiB
Python
"""WAF / DDoS Shield tools."""
|
|
|
|
import json
|
|
from typing import Annotated
|
|
|
|
from pydantic import Field
|
|
|
|
from ..client import client
|
|
from ..config import logger
|
|
|
|
|
|
def register_shield_tools(mcp):
|
|
|
|
@mcp.tool()
|
|
async def bunny_shield_status(
|
|
pullzone_id: Annotated[int, Field(description="Pull Zone ID")],
|
|
) -> str:
|
|
"""Get the Shield/WAF/DDoS protection status for a Pull Zone."""
|
|
try:
|
|
zone = await client.get(f"/pullzone/{pullzone_id}")
|
|
status = {
|
|
"PullZoneId": zone.get("Id"),
|
|
"Name": zone.get("Name"),
|
|
"WAFEnabled": zone.get("WAFEnabled"),
|
|
"WAFEnabledRuleGroups": zone.get("WAFEnabledRuleGroups"),
|
|
"WAFDisabledRules": zone.get("WAFDisabledRules"),
|
|
"ShieldDDosProtectionEnabled": zone.get("ShieldDDosProtectionEnabled"),
|
|
"ShieldDDosProtectionType": zone.get("ShieldDDosProtectionType"),
|
|
"OriginShieldEnabled": zone.get("OriginShieldEnabled"),
|
|
"OriginShieldZoneCode": zone.get("OriginShieldZoneCode"),
|
|
}
|
|
return json.dumps(status, indent=2)
|
|
except Exception as e:
|
|
logger.error("bunny_shield_status failed: %s", e)
|
|
return f"Error: {e}"
|
|
|
|
@mcp.tool()
|
|
async def bunny_waf_rules(
|
|
pullzone_id: Annotated[int, Field(description="Pull Zone ID")],
|
|
) -> str:
|
|
"""List WAF rule groups and their configuration for a Pull Zone."""
|
|
try:
|
|
data = await client.get(f"/shield/waf/{pullzone_id}")
|
|
return json.dumps(data, indent=2)
|
|
except Exception as e:
|
|
logger.error("bunny_waf_rules failed: %s", e)
|
|
return f"Error: {e}"
|
|
|
|
@mcp.tool()
|
|
async def bunny_shield_update(
|
|
pullzone_id: Annotated[int, Field(description="Pull Zone ID")],
|
|
settings: Annotated[str, Field(description="JSON string of shield settings to update (e.g. '{\"ShieldDDosProtectionEnabled\": true, \"WAFEnabled\": true}')")],
|
|
) -> str:
|
|
"""Update DDoS/WAF/Shield settings for a Pull Zone."""
|
|
try:
|
|
parsed = json.loads(settings)
|
|
except json.JSONDecodeError as e:
|
|
return f"Error: Invalid JSON in settings: {e}"
|
|
try:
|
|
data = await client.post(f"/pullzone/{pullzone_id}", json=parsed)
|
|
return json.dumps(data, indent=2) if isinstance(data, dict) else f"Shield settings updated for Pull Zone {pullzone_id}"
|
|
except Exception as e:
|
|
logger.error("bunny_shield_update failed: %s", e)
|
|
return f"Error: {e}"
|
|
|
|
@mcp.tool()
|
|
async def bunny_waf_logs(
|
|
pullzone_id: Annotated[int, Field(description="Pull Zone ID")],
|
|
page: Annotated[int, Field(default=1, description="Page number")] = 1,
|
|
per_page: Annotated[int, Field(default=25, description="Results per page (max 100)")] = 25,
|
|
) -> str:
|
|
"""Get WAF event logs for a Pull Zone."""
|
|
try:
|
|
params = {"page": page, "perPage": min(per_page, 100)}
|
|
data = await client.get(f"/shield/waf/{pullzone_id}/logs", params=params)
|
|
return json.dumps(data, indent=2)
|
|
except Exception as e:
|
|
logger.error("bunny_waf_logs failed: %s", e)
|
|
return f"Error: {e}"
|