Files
HWANG BYUNGHA b807b9d267 Make API endpoints compatible with Vultr API v2 format
- Change auth from X-API-Key header to Authorization: Bearer format
- Add /v2 prefix to all endpoints to match Vultr API URL structure
- Fix router paths (dns, firewall) to avoid duplicate path segments
- Split VPC 2.0 into separate router at /v2/vpc2

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 17:03:19 +09:00

156 lines
4.9 KiB
Python

"""DNS router"""
from fastapi import APIRouter, Depends, Query
from typing import Optional
from pydantic import BaseModel
from vultr_api import VultrClient
from deps import get_client
# Router that carries every DNS endpoint defined in this module. It is created
# without a prefix, so all route paths below are relative to wherever the
# including application mounts it.
# NOTE(review): every handler below is `async def` but calls `client.dns.*`
# without `await`. If VultrClient performs blocking network I/O (TODO confirm),
# those calls stall the event loop for their whole duration; plain `def`
# handlers would be run in FastAPI's threadpool instead.
router = APIRouter()
# JSON body accepted by the create_domain endpoint (POST "").
class CreateDomainRequest(BaseModel):
    # Name of the domain to create; required.
    domain: str

    # Optional IP, forwarded to client.dns.create_domain as `ip=`.
    # Stays None when the body omits it.
    ip: Optional[str] = None
# JSON body accepted by the create_record endpoint (POST "/{domain}/records").
# Every field is forwarded by keyword to client.dns.create_record.
class CreateRecordRequest(BaseModel):
    # Record type. The field name is the JSON key clients send, so it stays
    # `type` even though it shadows the builtin inside this class body.
    type: str

    # Record name and payload; both required.
    name: str
    data: str

    # TTL applied when the body omits it is 300 (presumably seconds - confirm
    # against the Vultr client). An explicit null is also accepted.
    ttl: Optional[int] = 300

    # Optional priority; None unless the caller supplies one.
    priority: Optional[int] = None
# JSON body accepted by the update_record endpoint
# (PATCH "/{domain}/records/{record_id}"). Every field is optional; the
# handler drops fields that are None before forwarding, so only values the
# caller actually supplied reach the client.
class UpdateRecordRequest(BaseModel):
    name: Optional[str] = None
    data: Optional[str] = None
    ttl: Optional[int] = None
    priority: Optional[int] = None
@router.get("")
async def list_domains(
    # ge=1 added: the original only capped the upper bound, so 0 or negative
    # page sizes passed validation and were forwarded to the upstream API.
    per_page: int = Query(25, ge=1, le=500),
    cursor: Optional[str] = None,
    client: VultrClient = Depends(get_client),
):
    """List all DNS domains

    - **per_page**: number of items per page, 1-500 (default 25)
    - **cursor**: pagination cursor, forwarded to the client unchanged
    """
    # Both paging arguments are handed to the client by keyword; whatever the
    # client returns becomes the response body.
    return client.dns.list_domains(per_page=per_page, cursor=cursor)
# Registered before the "/{domain}" route on purpose: routes are matched in
# registration order, so the literal "/all" has to come first or it would be
# captured as a domain name.
@router.get("/all")
async def list_all_domains(client: VultrClient = Depends(get_client)):
    """List all DNS domains (auto-paginated)"""
    domains = client.dns.list_all_domains()
    # Wrap the client's result under a single "domains" key.
    return {"domains": domains}
@router.get("/{domain}")
async def get_domain(
    domain: str,
    client: VultrClient = Depends(get_client),
):
    """Get DNS domain details"""
    # `domain` comes from the URL path and is handed to the client untouched.
    details = client.dns.get_domain(domain)
    return details
@router.post("")
async def create_domain(
    req: CreateDomainRequest,
    client: VultrClient = Depends(get_client),
):
    """Create a DNS domain"""
    dns = client.dns
    # req.ip is None when the body omits it; it is forwarded either way.
    return dns.create_domain(req.domain, ip=req.ip)
@router.delete("/{domain}")
async def delete_domain(
    domain: str,
    client: VultrClient = Depends(get_client),
):
    """Delete a DNS domain"""
    # The client's return value is passed back as the response body.
    outcome = client.dns.delete_domain(domain)
    return outcome
@router.get("/{domain}/soa")
async def get_soa(
    domain: str,
    client: VultrClient = Depends(get_client),
):
    """Get SOA record for a domain"""
    # Thin pass-through to the client's get_soa for the domain in the path.
    soa = client.dns.get_soa(domain)
    return soa
@router.get("/{domain}/dnssec")
async def get_dnssec(
    domain: str,
    client: VultrClient = Depends(get_client),
):
    """Get DNSSEC info for a domain"""
    # Thin pass-through to the client's get_dnssec for the domain in the path.
    dnssec = client.dns.get_dnssec(domain)
    return dnssec
# Records
@router.get("/{domain}/records")
async def list_records(
    domain: str,
    # ge=1 added: the original only capped the upper bound, so 0 or negative
    # page sizes passed validation and were forwarded to the upstream API.
    per_page: int = Query(25, ge=1, le=500),
    cursor: Optional[str] = None,
    client: VultrClient = Depends(get_client),
):
    """List DNS records for a domain

    - **domain**: domain whose records are listed (from the URL path)
    - **per_page**: number of items per page, 1-500 (default 25)
    - **cursor**: pagination cursor, forwarded to the client unchanged
    """
    # The domain goes positionally, the paging arguments by keyword; whatever
    # the client returns becomes the response body.
    return client.dns.list_records(domain, per_page=per_page, cursor=cursor)
# Registered before "/{domain}/records/{record_id}" on purpose: routes are
# matched in registration order, so the literal "all" has to come first or it
# would be captured as a record id.
@router.get("/{domain}/records/all")
async def list_all_records(
    domain: str,
    client: VultrClient = Depends(get_client),
):
    """List all DNS records (auto-paginated)"""
    records = client.dns.list_all_records(domain)
    # Wrap the client's result under a single "records" key.
    return {"records": records}
@router.get("/{domain}/records/{record_id}")
async def get_record(
    domain: str,
    record_id: str,
    client: VultrClient = Depends(get_client),
):
    """Get a DNS record"""
    # Both path parameters are forwarded positionally, domain first.
    record = client.dns.get_record(domain, record_id)
    return record
@router.post("/{domain}/records")
async def create_record(
    domain: str,
    req: CreateRecordRequest,
    client: VultrClient = Depends(get_client),
):
    """Create a DNS record"""
    create = client.dns.create_record
    # Every body field is forwarded by keyword, including ttl/priority when
    # they are None; only the domain is positional.
    fields = {
        "type": req.type,
        "name": req.name,
        "data": req.data,
        "ttl": req.ttl,
        "priority": req.priority,
    }
    return create(domain, **fields)
@router.patch("/{domain}/records/{record_id}")
async def update_record(
    domain: str,
    record_id: str,
    req: UpdateRecordRequest,
    client: VultrClient = Depends(get_client),
):
    """Update a DNS record"""
    update = client.dns.update_record
    # exclude_none drops every field that is None, so only values the caller
    # supplied are sent as keyword arguments. An empty body therefore results
    # in a call with no field arguments at all.
    changes = req.model_dump(exclude_none=True)
    return update(domain, record_id, **changes)
@router.delete("/{domain}/records/{record_id}")
async def delete_record(
    domain: str,
    record_id: str,
    client: VultrClient = Depends(get_client),
):
    """Delete a DNS record"""
    # Both path parameters are forwarded positionally, domain first.
    outcome = client.dns.delete_record(domain, record_id)
    return outcome
# Convenience endpoints
@router.post("/{domain}/records/a")
async def create_a_record(
    domain: str,
    name: str,
    ip: str,
    ttl: int = 300,
    client: VultrClient = Depends(get_client),
):
    """Create an A record"""
    # name, ip and ttl are scalar parameters that do not appear in the path,
    # so FastAPI reads them from the query string rather than a JSON body.
    dns = client.dns
    return dns.create_a_record(domain, name, ip, ttl)
@router.post("/{domain}/records/aaaa")
async def create_aaaa_record(
    domain: str,
    name: str,
    ip: str,
    ttl: int = 300,
    client: VultrClient = Depends(get_client),
):
    """Create an AAAA record"""
    # name, ip and ttl are scalar parameters that do not appear in the path,
    # so FastAPI reads them from the query string rather than a JSON body.
    dns = client.dns
    return dns.create_aaaa_record(domain, name, ip, ttl)
@router.post("/{domain}/records/cname")
async def create_cname_record(
    domain: str,
    name: str,
    target: str,
    ttl: int = 300,
    client: VultrClient = Depends(get_client),
):
    """Create a CNAME record"""
    # name, target and ttl are scalar parameters that do not appear in the
    # path, so FastAPI reads them from the query string rather than a JSON body.
    dns = client.dns
    return dns.create_cname_record(domain, name, target, ttl)
@router.post("/{domain}/records/txt")
async def create_txt_record(
    domain: str,
    name: str,
    data: str,
    ttl: int = 300,
    client: VultrClient = Depends(get_client),
):
    """Create a TXT record"""
    # name, data and ttl are scalar parameters that do not appear in the path,
    # so FastAPI reads them from the query string rather than a JSON body.
    dns = client.dns
    return dns.create_txt_record(domain, name, data, ttl)
@router.post("/{domain}/records/mx")
async def create_mx_record(
    domain: str,
    name: str,
    data: str,
    priority: int = 10,
    ttl: int = 300,
    client: VultrClient = Depends(get_client),
):
    """Create an MX record"""
    # name, data, priority and ttl are scalar parameters that do not appear in
    # the path, so FastAPI reads them from the query string.
    # Note the positional order handed to the client: priority precedes ttl.
    dns = client.dns
    return dns.create_mx_record(domain, name, data, priority, ttl)