"""Firewall router"""
from typing import Optional

from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel

from vultr_api import VultrClient
from deps import get_client

# NOTE: FastAPI publishes handler docstrings (and pydantic model docstrings)
# in the generated OpenAPI schema, so the docstrings below are kept short and
# unchanged; implementation notes live in `#` comments instead.
#
# NOTE(review): every handler is `async def` yet calls the client without
# `await`. If VultrClient performs blocking HTTP calls, each request holds up
# the event loop for the duration of the call -- confirm, and consider plain
# `def` handlers (run in FastAPI's threadpool) if the client is thread-safe.

router = APIRouter()


# Request body for creating a firewall group.
class CreateGroupRequest(BaseModel):
    description: Optional[str] = None


# Request body for creating a firewall rule. Optional fields left as None are
# dropped before the payload is forwarded (see create_rule).
class CreateRuleRequest(BaseModel):
    ip_type: str  # v4 or v6
    protocol: str  # tcp, udp, icmp, gre, esp, ah
    subnet: str
    subnet_size: int
    port: Optional[str] = None
    source: Optional[str] = None
    notes: Optional[str] = None


# ---------------------------------------------------------------------------
# Groups
# ---------------------------------------------------------------------------

@router.get("")
async def list_groups(
    # ge=1 rejects zero/negative page sizes with a 422 instead of forwarding
    # a meaningless value upstream; le=500 is the pre-existing upper bound.
    per_page: int = Query(25, ge=1, le=500),
    cursor: Optional[str] = None,
    client: VultrClient = Depends(get_client),
):
    """List all firewall groups"""
    return client.firewall.list_groups(per_page=per_page, cursor=cursor)


# Registered before "/{group_id}" on purpose: routes are matched in
# declaration order, so "all" would otherwise be captured as a group id.
@router.get("/all")
async def list_all_groups(client: VultrClient = Depends(get_client)):
    """List all firewall groups (auto-paginated)"""
    return {"firewall_groups": client.firewall.list_all_groups()}


@router.get("/{group_id}")
async def get_group(group_id: str, client: VultrClient = Depends(get_client)):
    """Get firewall group details"""
    return client.firewall.get_group(group_id)


@router.post("")
async def create_group(
    req: CreateGroupRequest,
    client: VultrClient = Depends(get_client),
):
    """Create a firewall group"""
    return client.firewall.create_group(description=req.description)


# `description` is a plain `str` parameter that is not part of the path, so
# FastAPI reads it from the query string (unlike create_group, which takes a
# JSON body). Left as-is to stay compatible with existing callers.
@router.patch("/{group_id}")
async def update_group(
    group_id: str,
    description: str,
    client: VultrClient = Depends(get_client),
):
    """Update a firewall group"""
    return client.firewall.update_group(group_id, description=description)


@router.delete("/{group_id}")
async def delete_group(group_id: str, client: VultrClient = Depends(get_client)):
    """Delete a firewall group"""
    return client.firewall.delete_group(group_id)


# ---------------------------------------------------------------------------
# Rules
# ---------------------------------------------------------------------------

@router.get("/{group_id}/rules")
async def list_rules(
    group_id: str,
    # Same bounds as list_groups: at least 1, at most 500 items per page.
    per_page: int = Query(25, ge=1, le=500),
    cursor: Optional[str] = None,
    client: VultrClient = Depends(get_client),
):
    """List firewall rules for a group"""
    return client.firewall.list_rules(group_id, per_page=per_page, cursor=cursor)


# Must stay ahead of "/{group_id}/rules/{rule_id}" so that "all" is not
# matched as a rule id (and then rejected by the `int` conversion).
@router.get("/{group_id}/rules/all")
async def list_all_rules(group_id: str, client: VultrClient = Depends(get_client)):
    """List all firewall rules (auto-paginated)"""
    return {"firewall_rules": client.firewall.list_all_rules(group_id)}


@router.get("/{group_id}/rules/{rule_id}")
async def get_rule(
    group_id: str,
    rule_id: int,
    client: VultrClient = Depends(get_client),
):
    """Get a firewall rule"""
    return client.firewall.get_rule(group_id, rule_id)


@router.post("/{group_id}/rules")
async def create_rule(
    group_id: str,
    req: CreateRuleRequest,
    client: VultrClient = Depends(get_client),
):
    """Create a firewall rule"""
    # exclude_none drops optional fields the caller did not set, so they are
    # not passed to the client as explicit None keyword arguments.
    return client.firewall.create_rule(group_id, **req.model_dump(exclude_none=True))


@router.delete("/{group_id}/rules/{rule_id}")
async def delete_rule(
    group_id: str,
    rule_id: int,
    client: VultrClient = Depends(get_client),
):
    """Delete a firewall rule"""
    return client.firewall.delete_rule(group_id, rule_id)


# ---------------------------------------------------------------------------
# Convenience endpoints
#
# Each takes `source` as a query parameter defaulting to "0.0.0.0/0" and
# delegates to the matching helper on client.firewall.
# ---------------------------------------------------------------------------

@router.post("/{group_id}/rules/allow-ssh")
async def allow_ssh(
    group_id: str,
    source: str = "0.0.0.0/0",
    client: VultrClient = Depends(get_client),
):
    """Allow SSH (port 22) from source"""
    return client.firewall.allow_ssh(group_id, source=source)


@router.post("/{group_id}/rules/allow-http")
async def allow_http(
    group_id: str,
    source: str = "0.0.0.0/0",
    client: VultrClient = Depends(get_client),
):
    """Allow HTTP (port 80) from source"""
    return client.firewall.allow_http(group_id, source=source)


@router.post("/{group_id}/rules/allow-https")
async def allow_https(
    group_id: str,
    source: str = "0.0.0.0/0",
    client: VultrClient = Depends(get_client),
):
    """Allow HTTPS (port 443) from source"""
    return client.firewall.allow_https(group_id, source=source)


@router.post("/{group_id}/rules/allow-ping")
async def allow_ping(
    group_id: str,
    source: str = "0.0.0.0/0",
    client: VultrClient = Depends(get_client),
):
    """Allow ICMP ping from source"""
    return client.firewall.allow_ping(group_id, source=source)