Files
vultr-api/server/routers/block_storage.py
HWANG BYUNGHA 184054c6c1 Initial commit: Vultr API v2 Python wrapper with FastAPI server
- vultr_api/: Python library wrapping Vultr API v2
  - 17 resource modules (instances, dns, firewall, vpc, etc.)
  - Pagination support, error handling

- server/: FastAPI REST server
  - All API endpoints exposed via HTTP
  - X-API-Key header authentication
  - Swagger docs at /docs

- Podman quadlet config for systemd deployment

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 01:08:17 +09:00

79 lines
2.5 KiB
Python

"""Block Storage router"""
from typing import Optional

from fastapi import APIRouter, Depends, Query
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

from deps import get_client
from vultr_api import VultrClient
# Router that collects every block storage endpoint declared in this module.
# No prefix is set here; where it gets mounted is decided by the code that
# includes this router (not visible in this file).
router = APIRouter()
class CreateBlockStorageRequest(BaseModel):
    # Request body for creating a block storage volume.
    #
    # Documented with comments rather than a class docstring on purpose: a
    # model docstring is picked up as the schema description, which would
    # change the generated OpenAPI output.

    # Required fields.
    region: str
    size_gb: int

    # Optional fields; when left as None they are dropped by the handler
    # (it dumps the model with exclude_none=True) and never reach the client.
    label: Optional[str] = None
    block_type: Optional[str] = None
class UpdateBlockStorageRequest(BaseModel):
    # Request body for a partial update of a block storage volume.
    #
    # Documented with comments rather than a class docstring on purpose: a
    # model docstring is picked up as the schema description, which would
    # change the generated OpenAPI output.

    # Both fields are optional; only the ones that are not None are
    # forwarded by the handler (it dumps the model with exclude_none=True).
    size_gb: Optional[int] = None
    label: Optional[str] = None
class AttachBlockStorageRequest(BaseModel):
    # Request body for attaching a block storage volume to an instance.
    #
    # Documented with comments rather than a class docstring on purpose: a
    # model docstring is picked up as the schema description, which would
    # change the generated OpenAPI output.

    # Identifier of the target instance (required).
    instance_id: str

    # Forwarded as-is to the client's attach call; defaults to True.
    # NOTE(review): presumably selects a live attach without a restart -
    # confirm against the client library.
    live: Optional[bool] = True
@router.get("")
async def list_block_storage(
    per_page: int = Query(25, ge=1, le=500),
    cursor: Optional[str] = None,
    client: VultrClient = Depends(get_client),
):
    """List block storage volumes, one page at a time.

    - **per_page**: number of items per page (1-500, default 25).
    - **cursor**: pagination cursor; omit it to get the first page.

    The client's response is returned unmodified.
    """
    # The client call is synchronous (its result is returned without being
    # awaited). Running it directly inside this coroutine would stall the
    # event loop for the duration of the call, so it is handed to the worker
    # threadpool instead.
    # NOTE(review): assumes the client performs blocking I/O - confirm
    # against the vultr_api implementation.
    return await run_in_threadpool(
        client.block_storage.list,
        per_page=per_page,
        cursor=cursor,
    )
@router.get("/all")
async def list_all_block_storage(client: VultrClient = Depends(get_client)):
    """List all block storage volumes (auto-paginated)

    The volumes are returned under the `blocks` key.
    """
    # The client call is synchronous (its result is used without being
    # awaited) and, being auto-paginated, may take a while. Running it
    # directly inside this coroutine would stall the event loop, so it is
    # handed to the worker threadpool instead.
    # NOTE(review): assumes the client performs blocking I/O - confirm
    # against the vultr_api implementation.
    volumes = await run_in_threadpool(client.block_storage.list_all)
    return {"blocks": volumes}
@router.get("/{block_id}")
async def get_block_storage(block_id: str, client: VultrClient = Depends(get_client)):
    """Get block storage details

    - **block_id**: identifier of the volume, taken from the URL path.

    The client's response is returned unmodified.
    """
    # The client call is synchronous (its result is returned without being
    # awaited). Running it directly inside this coroutine would stall the
    # event loop, so it is handed to the worker threadpool instead.
    # NOTE(review): assumes the client performs blocking I/O - confirm
    # against the vultr_api implementation.
    return await run_in_threadpool(client.block_storage.get, block_id)
@router.post("")
async def create_block_storage(req: CreateBlockStorageRequest, client: VultrClient = Depends(get_client)):
    """Create a block storage volume

    Optional body fields that are omitted (or null) are not forwarded to
    the client. The client's response is returned unmodified.
    """
    # Drop unset optionals so the client only receives what the caller sent.
    payload = req.model_dump(exclude_none=True)
    # The client call is synchronous (its result is returned without being
    # awaited). Running it directly inside this coroutine would stall the
    # event loop, so it is handed to the worker threadpool instead.
    # NOTE(review): assumes the client performs blocking I/O - confirm
    # against the vultr_api implementation.
    return await run_in_threadpool(client.block_storage.create, **payload)
@router.patch("/{block_id}")
async def update_block_storage(block_id: str, req: UpdateBlockStorageRequest, client: VultrClient = Depends(get_client)):
    """Update a block storage volume

    - **block_id**: identifier of the volume, taken from the URL path.

    Only body fields that are provided (non-null) are forwarded to the
    client. The client's response is returned unmodified.
    """
    # Forward only the fields the caller actually set.
    changes = req.model_dump(exclude_none=True)
    # The client call is synchronous (its result is returned without being
    # awaited). Running it directly inside this coroutine would stall the
    # event loop, so it is handed to the worker threadpool instead.
    # NOTE(review): assumes the client performs blocking I/O - confirm
    # against the vultr_api implementation.
    return await run_in_threadpool(client.block_storage.update, block_id, **changes)
@router.delete("/{block_id}")
async def delete_block_storage(block_id: str, client: VultrClient = Depends(get_client)):
    """Delete a block storage volume

    - **block_id**: identifier of the volume, taken from the URL path.

    The client's response is returned unmodified.
    """
    # The client call is synchronous (its result is returned without being
    # awaited). Running it directly inside this coroutine would stall the
    # event loop, so it is handed to the worker threadpool instead.
    # NOTE(review): assumes the client performs blocking I/O - confirm
    # against the vultr_api implementation.
    return await run_in_threadpool(client.block_storage.delete, block_id)
@router.post("/{block_id}/attach")
async def attach_block_storage(block_id: str, req: AttachBlockStorageRequest, client: VultrClient = Depends(get_client)):
    """Attach block storage to an instance

    - **block_id**: identifier of the volume, taken from the URL path.

    The body carries the target `instance_id` and the `live` flag; both
    are forwarded to the client. The client's response is returned
    unmodified.
    """
    # The client call is synchronous (its result is returned without being
    # awaited). Running it directly inside this coroutine would stall the
    # event loop, so it is handed to the worker threadpool instead.
    # NOTE(review): assumes the client performs blocking I/O - confirm
    # against the vultr_api implementation.
    return await run_in_threadpool(
        client.block_storage.attach,
        block_id,
        instance_id=req.instance_id,
        live=req.live,
    )
@router.post("/{block_id}/detach")
async def detach_block_storage(block_id: str, live: bool = True, client: VultrClient = Depends(get_client)):
    """Detach block storage from an instance

    - **block_id**: identifier of the volume, taken from the URL path.
    - **live**: query parameter forwarded to the client (default true).

    The client's response is returned unmodified.
    """
    # The client call is synchronous (its result is returned without being
    # awaited). Running it directly inside this coroutine would stall the
    # event loop, so it is handed to the worker threadpool instead.
    # NOTE(review): assumes the client performs blocking I/O - confirm
    # against the vultr_api implementation.
    return await run_in_threadpool(client.block_storage.detach, block_id, live=live)