Files
bunnycdn-mcp/bunnycdn_mcp/tools/pullzone.py
kappa 7269686179 Initial commit: BunnyCDN MCP server with K8s deployment
- FastMCP server with 12 tools (pullzone, cache, statistics, shield)
- Dockerfile with multi-stage build (python:3.11-slim + uv)
- K8s manifests (Deployment + Service)
- Gitea Actions CI/CD pipeline (build → push → deploy)
2026-02-15 15:19:42 +09:00

92 lines
3.5 KiB
Python

"""Pull Zone management tools."""
import json
from typing import Annotated
from urllib.parse import urlencode

from pydantic import Field

from ..client import client
from ..config import logger
def register_pullzone_tools(mcp):
    """Register the Pull Zone management tools on ``mcp``.

    Every nested coroutine below is registered through the ``mcp.tool()``
    decorator and performs its request through the module-level ``client``.
    Failures raised by a client call are logged with ``logger.error`` and
    reported to the caller as an ``"Error: ..."`` string instead of being
    propagated.

    Args:
        mcp: Server object exposing a ``tool()`` decorator factory.

    NOTE(review): the nested docstrings are presumably surfaced to MCP
    clients as tool descriptions, so they are treated as user-facing text
    and kept short; implementation notes live in ``#`` comments instead.
    """

    @mcp.tool()
    async def bunny_list_pullzones() -> str:
        """List all Pull Zones in the account."""
        try:
            data = await client.get("/pullzone")
            # Emit a compact per-zone summary rather than the full payload.
            zones = [
                {
                    "Id": z.get("Id"),
                    "Name": z.get("Name"),
                    "OriginUrl": z.get("OriginUrl"),
                    "Enabled": z.get("Enabled"),
                    # "Hostnames" may be missing *or* null; ``or []`` covers
                    # both, whereas a ``.get`` default only covers "missing".
                    "Hostnames": [
                        h.get("Value") for h in z.get("Hostnames") or []
                    ],
                    "MonthlyBandwidthUsed": z.get("MonthlyBandwidthUsed"),
                    "CacheQuality": z.get("CacheQuality"),
                }
                for z in data
            ]
            return json.dumps(zones, indent=2)
        except Exception as e:
            logger.error("bunny_list_pullzones failed: %s", e)
            return f"Error: {e}"

    @mcp.tool()
    async def bunny_get_pullzone(
        pullzone_id: Annotated[int, Field(description="Pull Zone ID")],
    ) -> str:
        """Get detailed information for a specific Pull Zone."""
        try:
            data = await client.get(f"/pullzone/{pullzone_id}")
            return json.dumps(data, indent=2)
        except Exception as e:
            logger.error("bunny_get_pullzone failed: %s", e)
            return f"Error: {e}"

    @mcp.tool()
    async def bunny_update_pullzone(
        pullzone_id: Annotated[int, Field(description="Pull Zone ID")],
        settings: Annotated[
            str,
            Field(
                description="JSON string of settings to update (e.g. '{\"OriginUrl\": \"https://new-origin.com\", \"EnableGeoZoneUS\": true}')"
            ),
        ],
    ) -> str:
        """Update Pull Zone settings. Pass settings as a JSON string."""
        # Validate the payload locally so malformed input never reaches the API.
        try:
            parsed = json.loads(settings)
        except json.JSONDecodeError as e:
            return f"Error: Invalid JSON in settings: {e}"
        # Settings are name/value pairs: valid JSON that is not an object
        # (e.g. "[1]", "5", "null") cannot describe them, so reject it here.
        if not isinstance(parsed, dict):
            return "Error: settings must be a JSON object"

        try:
            data = await client.post(f"/pullzone/{pullzone_id}", json=parsed)
            # Non-dict responses are stringified as-is.
            return json.dumps(data, indent=2) if isinstance(data, dict) else str(data)
        except Exception as e:
            logger.error("bunny_update_pullzone failed: %s", e)
            return f"Error: {e}"

    @mcp.tool()
    async def bunny_add_hostname(
        pullzone_id: Annotated[int, Field(description="Pull Zone ID")],
        hostname: Annotated[str, Field(description="Custom hostname to add (e.g. cdn.example.com)")],
    ) -> str:
        """Add a custom hostname to a Pull Zone."""
        try:
            await client.post(
                "/pullzone/addHostname",
                json={
                    "PullZoneId": pullzone_id,
                    "Hostname": hostname,
                },
            )
            return f"Hostname '{hostname}' added to Pull Zone {pullzone_id}"
        except Exception as e:
            logger.error("bunny_add_hostname failed: %s", e)
            return f"Error: {e}"

    @mcp.tool()
    async def bunny_remove_hostname(
        pullzone_id: Annotated[int, Field(description="Pull Zone ID")],
        hostname: Annotated[str, Field(description="Hostname to remove")],
    ) -> str:
        """Remove a hostname from a Pull Zone."""
        # Percent-encode the query string: a raw hostname containing "&", "#",
        # "=" or whitespace would otherwise corrupt the request or smuggle in
        # extra query parameters.
        query = urlencode({"id": pullzone_id, "hostname": hostname})
        try:
            await client.delete(f"/pullzone/deleteHostname?{query}")
            return f"Hostname '{hostname}' removed from Pull Zone {pullzone_id}"
        except Exception as e:
            logger.error("bunny_remove_hostname failed: %s", e)
            return f"Error: {e}"