- Change auth from X-API-Key header to Authorization: Bearer format
- Add /v2 prefix to all endpoints to match Vultr API URL structure
- Fix router paths (dns, firewall) to avoid duplicate path segments
- Split VPC 2.0 into separate router at /v2/vpc2

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
125 lines
4.1 KiB
Python
125 lines
4.1 KiB
Python
"""Firewall router"""
|
|
from fastapi import APIRouter, Depends, Query
|
|
from typing import Optional
|
|
from pydantic import BaseModel
|
|
|
|
from vultr_api import VultrClient
|
|
from deps import get_client
|
|
|
|
# Router that collects the firewall endpoints declared in this module. Route
# paths here are relative; any URL prefix is applied wherever this router is
# included (not shown in this file).
router = APIRouter()
|
|
|
|
|
|
class CreateGroupRequest(BaseModel):
    # Request body accepted by the "create firewall group" endpoint.
    # `description` is optional; when omitted it is None.
    description: Optional[str] = None
|
|
|
|
|
|
class CreateRuleRequest(BaseModel):
    # Request body accepted by the "create firewall rule" endpoint.
    # The first four fields are required; the remaining ones default to None.

    # IP version of the rule: v4 or v6.
    ip_type: str
    # Protocol name: tcp, udp, icmp, gre, esp, ah.
    protocol: str
    subnet: str
    subnet_size: int

    # Optional fields.
    port: Optional[str] = None
    source: Optional[str] = None
    notes: Optional[str] = None
|
|
|
|
|
|
@router.get("")
async def list_groups(
    # Page size, validated to the range 1..500 before the client is called.
    # The lower bound rejects zero/negative values at the API boundary instead
    # of forwarding them upstream.
    per_page: int = Query(25, ge=1, le=500),
    # Pagination cursor, forwarded to the client unchanged (None when omitted).
    cursor: Optional[str] = None,
    # Vultr client resolved by the `get_client` dependency.
    client: VultrClient = Depends(get_client),
):
    """List all firewall groups"""
    # The client's response for the requested page is returned unchanged.
    return client.firewall.list_groups(per_page=per_page, cursor=cursor)
|
|
|
|
|
|
@router.get("/all")
async def list_all_groups(
    client: VultrClient = Depends(get_client),
):
    """List all firewall groups (auto-paginated)"""
    # NOTE(review): the client call is not awaited, so it presumably blocks
    # while it runs; confirm whether that is acceptable inside an async handler.
    groups = client.firewall.list_all_groups()
    # Wrap the client's result under a single top-level key.
    return {"firewall_groups": groups}
|
|
|
|
|
|
@router.get("/{group_id}")
async def get_group(
    group_id: str,
    client: VultrClient = Depends(get_client),
):
    """Get firewall group details"""
    # `group_id` is taken from the URL path; the client's response is returned
    # unchanged.
    firewall_api = client.firewall
    return firewall_api.get_group(group_id)
|
|
|
|
|
|
@router.post("")
async def create_group(
    req: CreateGroupRequest,
    client: VultrClient = Depends(get_client),
):
    """Create a firewall group"""
    # Only the description is taken from the request body; it may be None.
    group_description = req.description
    return client.firewall.create_group(description=group_description)
|
|
|
|
|
|
@router.patch("/{group_id}")
async def update_group(
    group_id: str,
    description: str,
    client: VultrClient = Depends(get_client),
):
    """Update a firewall group"""
    # NOTE(review): `description` is a plain `str` parameter that is not part
    # of the path, so it is a required query parameter here, whereas
    # create_group reads its description from a request body. Confirm the
    # asymmetry is intended.
    firewall_api = client.firewall
    return firewall_api.update_group(group_id, description=description)
|
|
|
|
|
|
@router.delete("/{group_id}")
async def delete_group(
    group_id: str,
    client: VultrClient = Depends(get_client),
):
    """Delete a firewall group"""
    # Whatever the client returns for the deletion is passed back unchanged.
    firewall_api = client.firewall
    return firewall_api.delete_group(group_id)
|
|
|
|
|
|
# Rules
|
|
@router.get("/{group_id}/rules")
async def list_rules(
    # Firewall group identifier taken from the URL path.
    group_id: str,
    # Page size, validated to the range 1..500 before the client is called.
    # The lower bound rejects zero/negative values at the API boundary instead
    # of forwarding them upstream.
    per_page: int = Query(25, ge=1, le=500),
    # Pagination cursor, forwarded to the client unchanged (None when omitted).
    cursor: Optional[str] = None,
    # Vultr client resolved by the `get_client` dependency.
    client: VultrClient = Depends(get_client),
):
    """List firewall rules for a group"""
    # The client's response for the requested page is returned unchanged.
    return client.firewall.list_rules(group_id, per_page=per_page, cursor=cursor)
|
|
|
|
|
|
@router.get("/{group_id}/rules/all")
async def list_all_rules(
    group_id: str,
    client: VultrClient = Depends(get_client),
):
    """List all firewall rules (auto-paginated)"""
    # NOTE(review): the client call is not awaited, so it presumably blocks
    # while it runs; confirm whether that is acceptable inside an async handler.
    rules = client.firewall.list_all_rules(group_id)
    # Wrap the client's result under a single top-level key.
    return {"firewall_rules": rules}
|
|
|
|
|
|
@router.get("/{group_id}/rules/{rule_id}")
async def get_rule(
    group_id: str,
    rule_id: int,
    client: VultrClient = Depends(get_client),
):
    """Get a firewall rule"""
    # Both identifiers come from the URL path; `rule_id` is declared as an int.
    firewall_api = client.firewall
    return firewall_api.get_rule(group_id, rule_id)
|
|
|
|
|
|
@router.post("/{group_id}/rules")
async def create_rule(
    group_id: str,
    req: CreateRuleRequest,
    client: VultrClient = Depends(get_client),
):
    """Create a firewall rule"""
    # Fields left as None are dropped so that only supplied values are passed
    # to the client as keyword arguments.
    rule_fields = req.model_dump(exclude_none=True)
    return client.firewall.create_rule(group_id, **rule_fields)
|
|
|
|
|
|
@router.delete("/{group_id}/rules/{rule_id}")
async def delete_rule(
    group_id: str,
    rule_id: int,
    client: VultrClient = Depends(get_client),
):
    """Delete a firewall rule"""
    # Both identifiers come from the URL path; the client's response is
    # returned unchanged.
    firewall_api = client.firewall
    return firewall_api.delete_rule(group_id, rule_id)
|
|
|
|
|
|
# Convenience endpoints
|
|
@router.post("/{group_id}/rules/allow-ssh")
async def allow_ssh(
    group_id: str,
    source: str = "0.0.0.0/0",
    client: VultrClient = Depends(get_client),
):
    """Allow SSH (port 22) from source"""
    # When the caller omits `source`, "0.0.0.0/0" is passed to the client.
    firewall_api = client.firewall
    return firewall_api.allow_ssh(group_id, source=source)
|
|
|
|
|
|
@router.post("/{group_id}/rules/allow-http")
async def allow_http(
    group_id: str,
    source: str = "0.0.0.0/0",
    client: VultrClient = Depends(get_client),
):
    """Allow HTTP (port 80) from source"""
    # When the caller omits `source`, "0.0.0.0/0" is passed to the client.
    firewall_api = client.firewall
    return firewall_api.allow_http(group_id, source=source)
|
|
|
|
|
|
@router.post("/{group_id}/rules/allow-https")
async def allow_https(
    group_id: str,
    source: str = "0.0.0.0/0",
    client: VultrClient = Depends(get_client),
):
    """Allow HTTPS (port 443) from source"""
    # When the caller omits `source`, "0.0.0.0/0" is passed to the client.
    firewall_api = client.firewall
    return firewall_api.allow_https(group_id, source=source)
|
|
|
|
|
|
@router.post("/{group_id}/rules/allow-ping")
async def allow_ping(
    group_id: str,
    source: str = "0.0.0.0/0",
    client: VultrClient = Depends(get_client),
):
    """Allow ICMP ping from source"""
    # When the caller omits `source`, "0.0.0.0/0" is passed to the client.
    firewall_api = client.firewall
    return firewall_api.allow_ping(group_id, source=source)
|