Initial commit: Vultr API v2 Python wrapper with FastAPI server

- vultr_api/: Python library wrapping Vultr API v2
  - 17 resource modules (instances, dns, firewall, vpc, etc.)
  - Pagination support, error handling

- server/: FastAPI REST server
  - All API endpoints exposed via HTTP
  - X-API-Key header authentication
  - Swagger docs at /docs

- Podman quadlet config for systemd deployment

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
HWANG BYUNGHA
2026-01-22 01:08:17 +09:00
commit 184054c6c1
48 changed files with 6058 additions and 0 deletions

8
vultr_api/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
"""
Vultr API v2 Python Wrapper
"""
from .client import VultrClient
__version__ = "1.0.0"
__all__ = ["VultrClient"]

214
vultr_api/client.py Normal file
View File

@@ -0,0 +1,214 @@
"""
Vultr API v2 Client
"""
import requests
from typing import Optional, Dict, Any, List
from urllib.parse import urljoin
from .resources.account import AccountResource
from .resources.instances import InstancesResource
from .resources.dns import DNSResource
from .resources.firewall import FirewallResource
from .resources.ssh_keys import SSHKeysResource
from .resources.startup_scripts import StartupScriptsResource
from .resources.snapshots import SnapshotsResource
from .resources.block_storage import BlockStorageResource
from .resources.reserved_ips import ReservedIPsResource
from .resources.vpc import VPCResource
from .resources.load_balancers import LoadBalancersResource
from .resources.bare_metal import BareMetalResource
from .resources.plans import PlansResource
from .resources.regions import RegionsResource
from .resources.os import OSResource
from .resources.backups import BackupsResource
class VultrAPIError(Exception):
"""Vultr API Error"""
def __init__(self, message: str, status_code: int = None, response: dict = None):
self.message = message
self.status_code = status_code
self.response = response
super().__init__(self.message)
class VultrClient:
"""
Vultr API v2 Client
Usage:
client = VultrClient(api_key="your-api-key")
# Get account info
account = client.account.get()
# List instances
instances = client.instances.list()
# Create instance
instance = client.instances.create(
region="ewr",
plan="vc2-1c-1gb",
os_id=215
)
"""
BASE_URL = "https://api.vultr.com/v2/"
def __init__(self, api_key: str, timeout: int = 30):
"""
Initialize Vultr API client
Args:
api_key: Vultr API key (get from https://my.vultr.com/settings/#settingsapi)
timeout: Request timeout in seconds
"""
self.api_key = api_key
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
})
# Initialize resources
self.account = AccountResource(self)
self.instances = InstancesResource(self)
self.dns = DNSResource(self)
self.firewall = FirewallResource(self)
self.ssh_keys = SSHKeysResource(self)
self.startup_scripts = StartupScriptsResource(self)
self.snapshots = SnapshotsResource(self)
self.block_storage = BlockStorageResource(self)
self.reserved_ips = ReservedIPsResource(self)
self.vpc = VPCResource(self)
self.load_balancers = LoadBalancersResource(self)
self.bare_metal = BareMetalResource(self)
self.plans = PlansResource(self)
self.regions = RegionsResource(self)
self.os = OSResource(self)
self.backups = BackupsResource(self)
def _request(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
data: Optional[Dict] = None,
) -> Optional[Dict[str, Any]]:
"""
Make API request
Args:
method: HTTP method (GET, POST, PATCH, DELETE)
endpoint: API endpoint (e.g., "instances")
params: Query parameters
data: Request body data
Returns:
Response JSON or None for 204 responses
"""
url = urljoin(self.BASE_URL, endpoint)
try:
response = self.session.request(
method=method,
url=url,
params=params,
json=data,
timeout=self.timeout,
)
if response.status_code == 204:
return None
if response.status_code >= 400:
error_data = None
try:
error_data = response.json()
except:
pass
error_msg = f"API Error: {response.status_code}"
if error_data and "error" in error_data:
error_msg = error_data["error"]
raise VultrAPIError(
message=error_msg,
status_code=response.status_code,
response=error_data,
)
if response.text:
return response.json()
return None
except requests.RequestException as e:
raise VultrAPIError(f"Request failed: {str(e)}")
def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
"""GET request"""
return self._request("GET", endpoint, params=params)
def post(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict]:
"""POST request"""
return self._request("POST", endpoint, data=data)
def patch(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict]:
"""PATCH request"""
return self._request("PATCH", endpoint, data=data)
def put(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict]:
"""PUT request"""
return self._request("PUT", endpoint, data=data)
def delete(self, endpoint: str) -> None:
"""DELETE request"""
self._request("DELETE", endpoint)
def paginate(
self,
endpoint: str,
key: str,
params: Optional[Dict] = None,
per_page: int = 100,
) -> List[Dict]:
"""
Paginate through all results
Args:
endpoint: API endpoint
key: Response key containing items (e.g., "instances")
params: Additional query parameters
per_page: Items per page (max 500)
Returns:
List of all items
"""
params = params or {}
params["per_page"] = per_page
all_items = []
cursor = None
while True:
if cursor:
params["cursor"] = cursor
response = self.get(endpoint, params=params)
if not response:
break
items = response.get(key, [])
all_items.extend(items)
meta = response.get("meta", {})
links = meta.get("links", {})
cursor = links.get("next", "")
if not cursor:
break
return all_items

View File

@@ -0,0 +1,41 @@
"""
Vultr API Resources
"""
from .base import BaseResource
from .account import AccountResource
from .instances import InstancesResource
from .dns import DNSResource
from .firewall import FirewallResource
from .ssh_keys import SSHKeysResource
from .startup_scripts import StartupScriptsResource
from .snapshots import SnapshotsResource
from .block_storage import BlockStorageResource
from .reserved_ips import ReservedIPsResource
from .vpc import VPCResource
from .load_balancers import LoadBalancersResource
from .bare_metal import BareMetalResource
from .plans import PlansResource
from .regions import RegionsResource
from .os import OSResource
from .backups import BackupsResource
__all__ = [
"BaseResource",
"AccountResource",
"InstancesResource",
"DNSResource",
"FirewallResource",
"SSHKeysResource",
"StartupScriptsResource",
"SnapshotsResource",
"BlockStorageResource",
"ReservedIPsResource",
"VPCResource",
"LoadBalancersResource",
"BareMetalResource",
"PlansResource",
"RegionsResource",
"OSResource",
"BackupsResource",
]

View File

@@ -0,0 +1,115 @@
"""
Account Resource
Includes account info and ACL (Access Control List) management
"""
from typing import Dict, List, Optional
from .base import BaseResource
class AccountResource(BaseResource):
"""
Account and ACL management
Usage:
# Get account info
account = client.account.get()
# Get bandwidth
bandwidth = client.account.get_bandwidth()
# ACL (IP access control)
acl = client.account.get_acl()
client.account.set_acl(acls=["192.168.1.1/32", "10.0.0.0/8"])
"""
def get(self) -> Dict:
"""
Get account information
Returns:
Account info including name, email, balance, etc.
"""
response = self.client.get("account")
return response.get("account", {})
def get_bandwidth(self) -> Dict:
"""
Get account bandwidth usage
Returns:
Bandwidth usage info
"""
return self.client.get("account/bandwidth")
# ACL (Access Control List) methods
def get_acl(self) -> Dict:
"""
Get API access control list
Returns:
ACL info including enabled status and list of allowed IPs/CIDRs
"""
return self.client.get("account/acl")
def set_acl(self, acls: List[str]) -> Dict:
"""
Update API access control list
Args:
acls: List of IP addresses or CIDR ranges to allow.
Pass empty list to disable ACL.
Returns:
Updated ACL info
Example:
# Allow specific IPs
client.account.set_acl(["192.168.1.100/32", "10.0.0.0/8"])
# Disable ACL (allow all)
client.account.set_acl([])
"""
return self.client.post("account/acl", {"acls": acls})
def add_acl(self, ip: str) -> Dict:
"""
Add an IP/CIDR to the ACL
Args:
ip: IP address or CIDR to add (e.g., "192.168.1.1/32")
Returns:
Updated ACL info
"""
current = self.get_acl()
acls = current.get("acls", [])
if ip not in acls:
acls.append(ip)
return self.set_acl(acls)
def remove_acl(self, ip: str) -> Dict:
"""
Remove an IP/CIDR from the ACL
Args:
ip: IP address or CIDR to remove
Returns:
Updated ACL info
"""
current = self.get_acl()
acls = current.get("acls", [])
if ip in acls:
acls.remove(ip)
return self.set_acl(acls)
def clear_acl(self) -> Dict:
"""
Clear all ACL entries (disable ACL)
Returns:
Updated ACL info
"""
return self.set_acl([])

View File

@@ -0,0 +1,65 @@
"""
Backups Resource
Backup management
"""
from typing import Dict, List
from .base import BaseResource
class BackupsResource(BaseResource):
"""
Backup management
Usage:
# List backups
backups = client.backups.list()
# Get backup details
backup = client.backups.get("backup-id")
"""
def list(
self,
instance_id: str = None,
per_page: int = 100,
cursor: str = None
) -> Dict:
"""
List backups
Args:
instance_id: Filter by instance ID
per_page: Items per page
cursor: Pagination cursor
Returns:
Dict with 'backups' list and 'meta' pagination
"""
params = {"per_page": per_page}
if instance_id:
params["instance_id"] = instance_id
if cursor:
params["cursor"] = cursor
return self.client.get("backups", params=params)
def list_all(self, instance_id: str = None) -> List[Dict]:
"""List all backups (auto-paginate)"""
params = {}
if instance_id:
params["instance_id"] = instance_id
return self.client.paginate("backups", "backups", params=params)
def get(self, backup_id: str) -> Dict:
"""
Get backup details
Args:
backup_id: Backup ID
Returns:
Backup details
"""
response = self.client.get(f"backups/{backup_id}")
return response.get("backup", {})

View File

@@ -0,0 +1,291 @@
"""
Bare Metal Resource
Bare metal server management
"""
from typing import Dict, List
from .base import BaseResource
class BareMetalResource(BaseResource):
"""
Bare metal server management
Usage:
# List bare metal servers
servers = client.bare_metal.list()
# Create server
server = client.bare_metal.create(
region="ewr",
plan="vbm-4c-32gb",
os_id=215
)
"""
def list(
self,
per_page: int = 100,
cursor: str = None,
tag: str = None,
label: str = None,
main_ip: str = None,
region: str = None,
) -> Dict:
"""
List bare metal servers
Returns:
Dict with 'bare_metals' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
if tag:
params["tag"] = tag
if label:
params["label"] = label
if main_ip:
params["main_ip"] = main_ip
if region:
params["region"] = region
return self.client.get("bare-metals", params=params)
def list_all(self, **kwargs) -> List[Dict]:
"""List all bare metal servers (auto-paginate)"""
return self.client.paginate("bare-metals", "bare_metals", params=kwargs)
def get(self, baremetal_id: str) -> Dict:
"""
Get bare metal server details
Args:
baremetal_id: Bare metal ID
Returns:
Server details
"""
response = self.client.get(f"bare-metals/{baremetal_id}")
return response.get("bare_metal", {})
def create(
self,
region: str,
plan: str,
os_id: int = None,
script_id: str = None,
snapshot_id: str = None,
enable_ipv6: bool = False,
label: str = None,
sshkey_id: List[str] = None,
app_id: int = None,
image_id: str = None,
user_data: str = None,
activation_email: bool = False,
hostname: str = None,
tag: str = None,
tags: List[str] = None,
reserved_ipv4: str = None,
persistent_pxe: bool = False,
attach_vpc2: List[str] = None,
) -> Dict:
"""
Create a bare metal server
Args:
region: Region ID
plan: Plan ID
os_id: Operating System ID
script_id: Startup script ID
snapshot_id: Snapshot ID
enable_ipv6: Enable IPv6
label: Server label
sshkey_id: SSH key IDs
app_id: Application ID
image_id: Custom image ID
user_data: Cloud-init user data
activation_email: Send activation email
hostname: Server hostname
tag: Deprecated, use tags
tags: Tags
reserved_ipv4: Reserved IPv4 to assign
persistent_pxe: Enable persistent PXE
attach_vpc2: VPC 2.0 IDs to attach
Returns:
Created server details
"""
data = {"region": region, "plan": plan}
if os_id is not None:
data["os_id"] = os_id
if script_id:
data["script_id"] = script_id
if snapshot_id:
data["snapshot_id"] = snapshot_id
if enable_ipv6:
data["enable_ipv6"] = enable_ipv6
if label:
data["label"] = label
if sshkey_id:
data["sshkey_id"] = sshkey_id
if app_id is not None:
data["app_id"] = app_id
if image_id:
data["image_id"] = image_id
if user_data:
data["user_data"] = user_data
if activation_email:
data["activation_email"] = activation_email
if hostname:
data["hostname"] = hostname
if tag:
data["tag"] = tag
if tags:
data["tags"] = tags
if reserved_ipv4:
data["reserved_ipv4"] = reserved_ipv4
if persistent_pxe:
data["persistent_pxe"] = persistent_pxe
if attach_vpc2:
data["attach_vpc2"] = attach_vpc2
response = self.client.post("bare-metals", data)
return response.get("bare_metal", {})
def update(
self,
baremetal_id: str,
user_data: str = None,
label: str = None,
tag: str = None,
tags: List[str] = None,
os_id: int = None,
app_id: int = None,
image_id: str = None,
enable_ipv6: bool = None,
) -> Dict:
"""
Update a bare metal server
Args:
baremetal_id: Bare metal ID
(other args same as create)
Returns:
Updated server details
"""
data = {}
if user_data is not None:
data["user_data"] = user_data
if label is not None:
data["label"] = label
if tag is not None:
data["tag"] = tag
if tags is not None:
data["tags"] = tags
if os_id is not None:
data["os_id"] = os_id
if app_id is not None:
data["app_id"] = app_id
if image_id is not None:
data["image_id"] = image_id
if enable_ipv6 is not None:
data["enable_ipv6"] = enable_ipv6
response = self.client.patch(f"bare-metals/{baremetal_id}", data)
return response.get("bare_metal", {})
def delete(self, baremetal_id: str) -> None:
"""
Delete a bare metal server
Args:
baremetal_id: Bare metal ID to delete
"""
self.client.delete(f"bare-metals/{baremetal_id}")
def start(self, baremetal_id: str) -> None:
"""Start a bare metal server"""
self.client.post(f"bare-metals/{baremetal_id}/start")
def halt(self, baremetal_id: str) -> None:
"""Halt a bare metal server"""
self.client.post(f"bare-metals/{baremetal_id}/halt")
def reboot(self, baremetal_id: str) -> None:
"""Reboot a bare metal server"""
self.client.post(f"bare-metals/{baremetal_id}/reboot")
def reinstall(self, baremetal_id: str, hostname: str = None) -> Dict:
"""
Reinstall a bare metal server
Args:
baremetal_id: Bare metal ID
hostname: New hostname (optional)
Returns:
Server details
"""
data = {}
if hostname:
data["hostname"] = hostname
response = self.client.post(f"bare-metals/{baremetal_id}/reinstall", data)
return response.get("bare_metal", {})
def get_bandwidth(self, baremetal_id: str) -> Dict:
"""Get bandwidth usage"""
return self.client.get(f"bare-metals/{baremetal_id}/bandwidth")
def get_user_data(self, baremetal_id: str) -> Dict:
"""Get user data"""
return self.client.get(f"bare-metals/{baremetal_id}/user-data")
def get_upgrades(self, baremetal_id: str, upgrade_type: str = None) -> Dict:
"""Get available upgrades"""
params = {}
if upgrade_type:
params["type"] = upgrade_type
return self.client.get(f"bare-metals/{baremetal_id}/upgrades", params=params)
def get_vnc(self, baremetal_id: str) -> Dict:
"""Get VNC URL"""
return self.client.get(f"bare-metals/{baremetal_id}/vnc")
# IPv4 management
def list_ipv4(self, baremetal_id: str) -> List[Dict]:
"""List IPv4 addresses"""
response = self.client.get(f"bare-metals/{baremetal_id}/ipv4")
return response.get("ipv4s", [])
def list_ipv6(self, baremetal_id: str) -> List[Dict]:
"""List IPv6 addresses"""
response = self.client.get(f"bare-metals/{baremetal_id}/ipv6")
return response.get("ipv6s", [])
# VPC 2.0
def list_vpc2(self, baremetal_id: str) -> List[Dict]:
"""List attached VPC 2.0 networks"""
response = self.client.get(f"bare-metals/{baremetal_id}/vpc2")
return response.get("vpcs", [])
def attach_vpc2(
self,
baremetal_id: str,
vpc_id: str,
ip_address: str = None
) -> None:
"""Attach VPC 2.0 to bare metal"""
data = {"vpc_id": vpc_id}
if ip_address:
data["ip_address"] = ip_address
self.client.post(f"bare-metals/{baremetal_id}/vpc2/attach", data)
def detach_vpc2(self, baremetal_id: str, vpc_id: str) -> None:
"""Detach VPC 2.0 from bare metal"""
self.client.post(f"bare-metals/{baremetal_id}/vpc2/detach", {"vpc_id": vpc_id})

View File

@@ -0,0 +1,15 @@
"""
Base Resource class
"""
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..client import VultrClient
class BaseResource:
"""Base class for API resources"""
def __init__(self, client: "VultrClient"):
self.client = client

View File

@@ -0,0 +1,135 @@
"""
Block Storage Resource
Block storage management
"""
from typing import Dict, List
from .base import BaseResource
class BlockStorageResource(BaseResource):
"""
Block storage management
Usage:
# List block storage
blocks = client.block_storage.list()
# Create block storage
block = client.block_storage.create(
region="ewr",
size_gb=50,
label="my-storage"
)
# Attach to instance
client.block_storage.attach(block_id="block-id", instance_id="instance-id")
"""
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List block storage volumes
Returns:
Dict with 'blocks' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("blocks", params=params)
def list_all(self) -> List[Dict]:
"""List all block storage volumes (auto-paginate)"""
return self.client.paginate("blocks", "blocks")
def get(self, block_id: str) -> Dict:
"""
Get block storage details
Args:
block_id: Block storage ID
Returns:
Block storage details
"""
response = self.client.get(f"blocks/{block_id}")
return response.get("block", {})
def create(
self,
region: str,
size_gb: int,
label: str = None,
block_type: str = None
) -> Dict:
"""
Create block storage
Args:
region: Region ID
size_gb: Size in GB (10-40000)
label: Label
block_type: Storage type ("high_perf" or "storage_opt")
Returns:
Created block storage details
"""
data = {"region": region, "size_gb": size_gb}
if label:
data["label"] = label
if block_type:
data["block_type"] = block_type
response = self.client.post("blocks", data)
return response.get("block", {})
def update(self, block_id: str, label: str = None, size_gb: int = None) -> None:
"""
Update block storage
Args:
block_id: Block ID
label: New label
size_gb: New size (can only increase)
"""
data = {}
if label:
data["label"] = label
if size_gb:
data["size_gb"] = size_gb
self.client.patch(f"blocks/{block_id}", data)
def delete(self, block_id: str) -> None:
"""
Delete block storage
Args:
block_id: Block ID to delete
"""
self.client.delete(f"blocks/{block_id}")
def attach(self, block_id: str, instance_id: str, live: bool = False) -> None:
"""
Attach block storage to instance
Args:
block_id: Block storage ID
instance_id: Instance ID to attach to
live: Live attach (no reboot)
"""
self.client.post(f"blocks/{block_id}/attach", {
"instance_id": instance_id,
"live": live
})
def detach(self, block_id: str, live: bool = False) -> None:
"""
Detach block storage from instance
Args:
block_id: Block storage ID
live: Live detach (no reboot)
"""
self.client.post(f"blocks/{block_id}/detach", {"live": live})

348
vultr_api/resources/dns.py Normal file
View File

@@ -0,0 +1,348 @@
"""
DNS Resource
DNS domain and record management
"""
from typing import Dict, List, Optional
from .base import BaseResource
class DNSResource(BaseResource):
"""
DNS domain and record management
Usage:
# List domains
domains = client.dns.list_domains()
# Create domain
domain = client.dns.create_domain("example.com")
# List records
records = client.dns.list_records("example.com")
# Create A record
record = client.dns.create_record(
domain="example.com",
name="www",
record_type="A",
data="192.168.1.1",
ttl=300
)
"""
# Domain management
def list_domains(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List all DNS domains
Args:
per_page: Number of items per page
cursor: Pagination cursor
Returns:
Dict with 'domains' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("domains", params=params)
def list_all_domains(self) -> List[Dict]:
"""List all domains (auto-paginate)"""
return self.client.paginate("domains", "domains")
def get_domain(self, domain: str) -> Dict:
"""
Get domain details
Args:
domain: Domain name (e.g., "example.com")
Returns:
Domain details
"""
response = self.client.get(f"domains/{domain}")
return response.get("domain", {})
def create_domain(
self,
domain: str,
ip: str = None,
dns_sec: str = "disabled"
) -> Dict:
"""
Create a DNS domain
Args:
domain: Domain name
ip: Default IP address for A record (optional)
dns_sec: DNSSEC status ("enabled" or "disabled")
Returns:
Created domain details
"""
data = {"domain": domain, "dns_sec": dns_sec}
if ip:
data["ip"] = ip
response = self.client.post("domains", data)
return response.get("domain", {})
def update_domain(self, domain: str, dns_sec: str) -> None:
"""
Update domain DNSSEC setting
Args:
domain: Domain name
dns_sec: DNSSEC status ("enabled" or "disabled")
"""
self.client.put(f"domains/{domain}", {"dns_sec": dns_sec})
def delete_domain(self, domain: str) -> None:
"""
Delete a DNS domain
Args:
domain: Domain name to delete
"""
self.client.delete(f"domains/{domain}")
def get_soa(self, domain: str) -> Dict:
"""
Get SOA record info
Args:
domain: Domain name
Returns:
SOA record details
"""
return self.client.get(f"domains/{domain}/soa")
def update_soa(
self,
domain: str,
nsprimary: str = None,
email: str = None
) -> None:
"""
Update SOA record
Args:
domain: Domain name
nsprimary: Primary nameserver
email: Admin email
"""
data = {}
if nsprimary:
data["nsprimary"] = nsprimary
if email:
data["email"] = email
self.client.patch(f"domains/{domain}/soa", data)
def get_dnssec(self, domain: str) -> Dict:
"""
Get DNSSEC info
Args:
domain: Domain name
Returns:
DNSSEC details
"""
return self.client.get(f"domains/{domain}/dnssec")
# Record management
def list_records(
self,
domain: str,
per_page: int = 100,
cursor: str = None
) -> Dict:
"""
List DNS records for a domain
Args:
domain: Domain name
per_page: Number of items per page
cursor: Pagination cursor
Returns:
Dict with 'records' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get(f"domains/{domain}/records", params=params)
def list_all_records(self, domain: str) -> List[Dict]:
"""List all records for domain (auto-paginate)"""
return self.client.paginate(f"domains/{domain}/records", "records")
def get_record(self, domain: str, record_id: str) -> Dict:
"""
Get a specific DNS record
Args:
domain: Domain name
record_id: Record ID
Returns:
Record details
"""
response = self.client.get(f"domains/{domain}/records/{record_id}")
return response.get("record", {})
def create_record(
self,
domain: str,
name: str,
record_type: str,
data: str,
ttl: int = 300,
priority: int = None
) -> Dict:
"""
Create a DNS record
Args:
domain: Domain name
name: Record name (subdomain or "@" for root)
record_type: Record type (A, AAAA, CNAME, MX, TXT, NS, SRV, CAA, SSHFP)
data: Record data/value
ttl: Time to live in seconds (default 300)
priority: Priority (required for MX and SRV)
Returns:
Created record details
Example:
# A record
client.dns.create_record("example.com", "www", "A", "192.168.1.1")
# MX record
client.dns.create_record("example.com", "@", "MX", "mail.example.com", priority=10)
# TXT record (SPF)
client.dns.create_record("example.com", "@", "TXT", "v=spf1 include:_spf.example.com ~all")
"""
record_data = {
"name": name,
"type": record_type,
"data": data,
"ttl": ttl
}
if priority is not None:
record_data["priority"] = priority
response = self.client.post(f"domains/{domain}/records", record_data)
return response.get("record", {})
def update_record(
self,
domain: str,
record_id: str,
name: str = None,
data: str = None,
ttl: int = None,
priority: int = None
) -> None:
"""
Update a DNS record
Args:
domain: Domain name
record_id: Record ID to update
name: New record name
data: New record data
ttl: New TTL
priority: New priority
"""
record_data = {}
if name is not None:
record_data["name"] = name
if data is not None:
record_data["data"] = data
if ttl is not None:
record_data["ttl"] = ttl
if priority is not None:
record_data["priority"] = priority
self.client.patch(f"domains/{domain}/records/{record_id}", record_data)
def delete_record(self, domain: str, record_id: str) -> None:
"""
Delete a DNS record
Args:
domain: Domain name
record_id: Record ID to delete
"""
self.client.delete(f"domains/{domain}/records/{record_id}")
# Convenience methods
def create_a_record(
self,
domain: str,
name: str,
ip: str,
ttl: int = 300
) -> Dict:
"""Create an A record"""
return self.create_record(domain, name, "A", ip, ttl)
def create_aaaa_record(
self,
domain: str,
name: str,
ip: str,
ttl: int = 300
) -> Dict:
"""Create an AAAA record"""
return self.create_record(domain, name, "AAAA", ip, ttl)
def create_cname_record(
self,
domain: str,
name: str,
target: str,
ttl: int = 300
) -> Dict:
"""Create a CNAME record"""
return self.create_record(domain, name, "CNAME", target, ttl)
def create_mx_record(
self,
domain: str,
name: str,
mail_server: str,
priority: int = 10,
ttl: int = 300
) -> Dict:
"""Create an MX record"""
return self.create_record(domain, name, "MX", mail_server, ttl, priority)
def create_txt_record(
self,
domain: str,
name: str,
text: str,
ttl: int = 300
) -> Dict:
"""Create a TXT record"""
return self.create_record(domain, name, "TXT", text, ttl)
def create_ns_record(
self,
domain: str,
name: str,
nameserver: str,
ttl: int = 300
) -> Dict:
"""Create an NS record"""
return self.create_record(domain, name, "NS", nameserver, ttl)

View File

@@ -0,0 +1,306 @@
"""
Firewall Resource
Firewall group and rule management
"""
from typing import Dict, List, Optional
from .base import BaseResource
class FirewallResource(BaseResource):
"""
Firewall group and rule management
Usage:
# List firewall groups
groups = client.firewall.list_groups()
# Create firewall group
group = client.firewall.create_group(description="Web servers")
# Add rule
rule = client.firewall.create_rule(
firewall_group_id="group-id",
ip_type="v4",
protocol="tcp",
port="80",
subnet="0.0.0.0",
subnet_size=0
)
"""
# Firewall groups
def list_groups(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List firewall groups
Returns:
Dict with 'firewall_groups' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("firewalls", params=params)
def list_all_groups(self) -> List[Dict]:
"""List all firewall groups (auto-paginate)"""
return self.client.paginate("firewalls", "firewall_groups")
def get_group(self, firewall_group_id: str) -> Dict:
"""
Get firewall group details
Args:
firewall_group_id: Firewall group ID
Returns:
Firewall group details
"""
response = self.client.get(f"firewalls/{firewall_group_id}")
return response.get("firewall_group", {})
def create_group(self, description: str = None) -> Dict:
"""
Create a firewall group
Args:
description: Group description
Returns:
Created firewall group
"""
data = {}
if description:
data["description"] = description
response = self.client.post("firewalls", data)
return response.get("firewall_group", {})
def update_group(self, firewall_group_id: str, description: str) -> None:
"""
Update firewall group description
Args:
firewall_group_id: Group ID
description: New description
"""
self.client.put(f"firewalls/{firewall_group_id}", {"description": description})
def delete_group(self, firewall_group_id: str) -> None:
"""
Delete a firewall group
Args:
firewall_group_id: Group ID to delete
"""
self.client.delete(f"firewalls/{firewall_group_id}")
# Firewall rules
def list_rules(
self,
firewall_group_id: str,
per_page: int = 100,
cursor: str = None
) -> Dict:
"""
List rules in a firewall group
Args:
firewall_group_id: Group ID
Returns:
Dict with 'firewall_rules' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get(
f"firewalls/{firewall_group_id}/rules",
params=params
)
def list_all_rules(self, firewall_group_id: str) -> List[Dict]:
"""List all rules in a firewall group (auto-paginate)"""
return self.client.paginate(
f"firewalls/{firewall_group_id}/rules",
"firewall_rules"
)
def get_rule(self, firewall_group_id: str, rule_id: str) -> Dict:
"""
Get a firewall rule
Args:
firewall_group_id: Group ID
rule_id: Rule ID
Returns:
Rule details
"""
response = self.client.get(
f"firewalls/{firewall_group_id}/rules/{rule_id}"
)
return response.get("firewall_rule", {})
def create_rule(
self,
firewall_group_id: str,
ip_type: str,
protocol: str,
port: str,
subnet: str,
subnet_size: int,
source: str = None,
notes: str = None
) -> Dict:
"""
Create a firewall rule
Args:
firewall_group_id: Group ID
ip_type: IP type ("v4" or "v6")
protocol: Protocol ("icmp", "tcp", "udp", "gre", "esp", "ah")
port: Port number, range (e.g., "8000:9000"), or empty for all
subnet: IP address or subnet
subnet_size: CIDR size (0-32 for v4, 0-128 for v6)
source: Source type ("", "cloudflare") - empty for custom subnet
notes: Rule notes
Returns:
Created rule details
Example:
# Allow SSH from anywhere
client.firewall.create_rule(
firewall_group_id="group-id",
ip_type="v4",
protocol="tcp",
port="22",
subnet="0.0.0.0",
subnet_size=0,
notes="SSH"
)
# Allow HTTP from Cloudflare
client.firewall.create_rule(
firewall_group_id="group-id",
ip_type="v4",
protocol="tcp",
port="80",
subnet="",
subnet_size=0,
source="cloudflare",
notes="HTTP via Cloudflare"
)
"""
data = {
"ip_type": ip_type,
"protocol": protocol,
"port": port,
"subnet": subnet,
"subnet_size": subnet_size,
}
if source:
data["source"] = source
if notes:
data["notes"] = notes
response = self.client.post(
f"firewalls/{firewall_group_id}/rules",
data
)
return response.get("firewall_rule", {})
def delete_rule(self, firewall_group_id: str, rule_id: str) -> None:
"""
Delete a firewall rule
Args:
firewall_group_id: Group ID
rule_id: Rule ID to delete
"""
self.client.delete(f"firewalls/{firewall_group_id}/rules/{rule_id}")
# Convenience methods
def allow_ssh(
self,
firewall_group_id: str,
ip_type: str = "v4",
subnet: str = "0.0.0.0",
subnet_size: int = 0
) -> Dict:
"""
Create SSH allow rule
Args:
firewall_group_id: Group ID
ip_type: "v4" or "v6"
subnet: Source subnet (default: anywhere)
subnet_size: CIDR size (default: 0 for anywhere)
Returns:
Created rule
"""
return self.create_rule(
firewall_group_id=firewall_group_id,
ip_type=ip_type,
protocol="tcp",
port="22",
subnet=subnet,
subnet_size=subnet_size,
notes="SSH"
)
def allow_http(
self,
firewall_group_id: str,
ip_type: str = "v4",
subnet: str = "0.0.0.0",
subnet_size: int = 0
) -> Dict:
"""Create HTTP allow rule"""
return self.create_rule(
firewall_group_id=firewall_group_id,
ip_type=ip_type,
protocol="tcp",
port="80",
subnet=subnet,
subnet_size=subnet_size,
notes="HTTP"
)
def allow_https(
self,
firewall_group_id: str,
ip_type: str = "v4",
subnet: str = "0.0.0.0",
subnet_size: int = 0
) -> Dict:
"""Create HTTPS allow rule"""
return self.create_rule(
firewall_group_id=firewall_group_id,
ip_type=ip_type,
protocol="tcp",
port="443",
subnet=subnet,
subnet_size=subnet_size,
notes="HTTPS"
)
def allow_ping(
self,
firewall_group_id: str,
ip_type: str = "v4"
) -> Dict:
"""Create ICMP (ping) allow rule"""
return self.create_rule(
firewall_group_id=firewall_group_id,
ip_type=ip_type,
protocol="icmp",
port="",
subnet="0.0.0.0" if ip_type == "v4" else "::",
subnet_size=0,
notes="ICMP/Ping"
)

View File

@@ -0,0 +1,536 @@
"""
Instances Resource
Cloud compute instance management
"""
from typing import Dict, List, Optional
from .base import BaseResource
class InstancesResource(BaseResource):
"""
Cloud compute instances management
Usage:
# List all instances
instances = client.instances.list()
# Get specific instance
instance = client.instances.get("instance-id")
# Create instance
instance = client.instances.create(
region="ewr",
plan="vc2-1c-1gb",
os_id=215,
label="my-server"
)
# Delete instance
client.instances.delete("instance-id")
"""
def list(
self,
per_page: int = 100,
cursor: str = None,
tag: str = None,
label: str = None,
main_ip: str = None,
region: str = None,
) -> Dict:
"""
List all instances
Args:
per_page: Number of items per page (max 500)
cursor: Pagination cursor
tag: Filter by tag
label: Filter by label
main_ip: Filter by main IP
region: Filter by region
Returns:
Dict with 'instances' list and 'meta' pagination info
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
if tag:
params["tag"] = tag
if label:
params["label"] = label
if main_ip:
params["main_ip"] = main_ip
if region:
params["region"] = region
return self.client.get("instances", params=params)
def list_all(self, **kwargs) -> List[Dict]:
"""
List all instances (auto-paginate)
Returns:
List of all instances
"""
return self.client.paginate("instances", "instances", params=kwargs)
def get(self, instance_id: str) -> Dict:
"""
Get instance details
Args:
instance_id: Instance ID
Returns:
Instance details
"""
response = self.client.get(f"instances/{instance_id}")
return response.get("instance", {})
def create(
self,
region: str,
plan: str,
os_id: int = None,
iso_id: str = None,
snapshot_id: str = None,
app_id: int = None,
image_id: str = None,
ipxe_chain_url: str = None,
script_id: str = None,
enable_ipv6: bool = False,
disable_public_ipv4: bool = False,
attach_private_network: List[str] = None,
attach_vpc: List[str] = None,
attach_vpc2: List[str] = None,
label: str = None,
sshkey_id: List[str] = None,
backups: str = None,
ddos_protection: bool = False,
activation_email: bool = False,
hostname: str = None,
tag: str = None,
tags: List[str] = None,
firewall_group_id: str = None,
reserved_ipv4: str = None,
user_data: str = None,
) -> Dict:
"""
Create a new instance
Args:
region: Region ID (e.g., "ewr", "lax", "nrt")
plan: Plan ID (e.g., "vc2-1c-1gb")
os_id: Operating System ID
iso_id: ISO ID for custom install
snapshot_id: Snapshot ID to restore from
app_id: Application ID
image_id: Custom image ID
ipxe_chain_url: iPXE chain URL
script_id: Startup script ID
enable_ipv6: Enable IPv6
disable_public_ipv4: Disable public IPv4
attach_private_network: Private network IDs to attach
attach_vpc: VPC IDs to attach
attach_vpc2: VPC 2.0 IDs to attach
label: Instance label
sshkey_id: SSH key IDs to add
backups: Backup schedule ("enabled" or "disabled")
ddos_protection: Enable DDoS protection
activation_email: Send activation email
hostname: Server hostname
tag: Deprecated, use tags
tags: Tags for the instance
firewall_group_id: Firewall group ID
reserved_ipv4: Reserved IPv4 to assign
user_data: Cloud-init user data (base64)
Returns:
Created instance details
"""
data = {"region": region, "plan": plan}
# Add optional parameters
if os_id is not None:
data["os_id"] = os_id
if iso_id:
data["iso_id"] = iso_id
if snapshot_id:
data["snapshot_id"] = snapshot_id
if app_id is not None:
data["app_id"] = app_id
if image_id:
data["image_id"] = image_id
if ipxe_chain_url:
data["ipxe_chain_url"] = ipxe_chain_url
if script_id:
data["script_id"] = script_id
if enable_ipv6:
data["enable_ipv6"] = enable_ipv6
if disable_public_ipv4:
data["disable_public_ipv4"] = disable_public_ipv4
if attach_private_network:
data["attach_private_network"] = attach_private_network
if attach_vpc:
data["attach_vpc"] = attach_vpc
if attach_vpc2:
data["attach_vpc2"] = attach_vpc2
if label:
data["label"] = label
if sshkey_id:
data["sshkey_id"] = sshkey_id
if backups:
data["backups"] = backups
if ddos_protection:
data["ddos_protection"] = ddos_protection
if activation_email:
data["activation_email"] = activation_email
if hostname:
data["hostname"] = hostname
if tag:
data["tag"] = tag
if tags:
data["tags"] = tags
if firewall_group_id:
data["firewall_group_id"] = firewall_group_id
if reserved_ipv4:
data["reserved_ipv4"] = reserved_ipv4
if user_data:
data["user_data"] = user_data
response = self.client.post("instances", data)
return response.get("instance", {})
def update(
self,
instance_id: str,
app_id: int = None,
image_id: str = None,
backups: str = None,
firewall_group_id: str = None,
enable_ipv6: bool = None,
os_id: int = None,
user_data: str = None,
tag: str = None,
tags: List[str] = None,
label: str = None,
ddos_protection: bool = None,
) -> Dict:
"""
Update an instance
Args:
instance_id: Instance ID to update
(other args same as create)
Returns:
Updated instance details
"""
data = {}
if app_id is not None:
data["app_id"] = app_id
if image_id is not None:
data["image_id"] = image_id
if backups is not None:
data["backups"] = backups
if firewall_group_id is not None:
data["firewall_group_id"] = firewall_group_id
if enable_ipv6 is not None:
data["enable_ipv6"] = enable_ipv6
if os_id is not None:
data["os_id"] = os_id
if user_data is not None:
data["user_data"] = user_data
if tag is not None:
data["tag"] = tag
if tags is not None:
data["tags"] = tags
if label is not None:
data["label"] = label
if ddos_protection is not None:
data["ddos_protection"] = ddos_protection
response = self.client.patch(f"instances/{instance_id}", data)
return response.get("instance", {})
def delete(self, instance_id: str) -> None:
"""
Delete an instance
Args:
instance_id: Instance ID to delete
"""
self.client.delete(f"instances/{instance_id}")
def start(self, instance_id: str) -> None:
"""Start an instance"""
self.client.post(f"instances/{instance_id}/start")
def stop(self, instance_id: str) -> None:
"""Stop an instance (alias for halt)"""
self.halt(instance_id)
def halt(self, instance_id: str) -> None:
"""Halt an instance"""
self.client.post(f"instances/{instance_id}/halt")
def reboot(self, instance_id: str) -> None:
"""Reboot an instance"""
self.client.post(f"instances/{instance_id}/reboot")
def reinstall(self, instance_id: str, hostname: str = None) -> Dict:
"""
Reinstall an instance
Args:
instance_id: Instance ID
hostname: New hostname (optional)
Returns:
Instance details
"""
data = {}
if hostname:
data["hostname"] = hostname
response = self.client.post(f"instances/{instance_id}/reinstall", data)
return response.get("instance", {})
def get_bandwidth(self, instance_id: str) -> Dict:
"""Get instance bandwidth usage"""
return self.client.get(f"instances/{instance_id}/bandwidth")
def get_neighbors(self, instance_id: str) -> Dict:
"""Get instance neighbors (shared host)"""
return self.client.get(f"instances/{instance_id}/neighbors")
def get_user_data(self, instance_id: str) -> Dict:
"""Get instance user data"""
return self.client.get(f"instances/{instance_id}/user-data")
def get_upgrades(self, instance_id: str, upgrade_type: str = None) -> Dict:
"""
Get available upgrades
Args:
instance_id: Instance ID
upgrade_type: Filter by type ("all", "applications", "os", "plans")
Returns:
Available upgrades
"""
params = {}
if upgrade_type:
params["type"] = upgrade_type
return self.client.get(f"instances/{instance_id}/upgrades", params=params)
# IPv4 management
def list_ipv4(self, instance_id: str) -> List[Dict]:
"""List IPv4 addresses"""
response = self.client.get(f"instances/{instance_id}/ipv4")
return response.get("ipv4s", [])
def create_ipv4(self, instance_id: str, reboot: bool = True) -> Dict:
"""
Create additional IPv4
Args:
instance_id: Instance ID
reboot: Reboot instance to apply (default True)
Returns:
New IPv4 info
"""
response = self.client.post(
f"instances/{instance_id}/ipv4",
{"reboot": reboot}
)
return response.get("ipv4", {})
def delete_ipv4(self, instance_id: str, ipv4: str) -> None:
"""Delete an IPv4 address"""
self.client.delete(f"instances/{instance_id}/ipv4/{ipv4}")
def create_ipv4_reverse(
self,
instance_id: str,
ip: str,
reverse: str
) -> None:
"""Set IPv4 reverse DNS"""
self.client.post(
f"instances/{instance_id}/ipv4/reverse",
{"ip": ip, "reverse": reverse}
)
def set_default_ipv4_reverse(self, instance_id: str, ip: str) -> None:
"""Set default reverse DNS for IPv4"""
self.client.post(
f"instances/{instance_id}/ipv4/reverse/default",
{"ip": ip}
)
# IPv6 management
def list_ipv6(self, instance_id: str) -> List[Dict]:
"""List IPv6 addresses"""
response = self.client.get(f"instances/{instance_id}/ipv6")
return response.get("ipv6s", [])
def create_ipv6_reverse(
self,
instance_id: str,
ip: str,
reverse: str
) -> None:
"""Set IPv6 reverse DNS"""
self.client.post(
f"instances/{instance_id}/ipv6/reverse",
{"ip": ip, "reverse": reverse}
)
def list_ipv6_reverse(self, instance_id: str) -> List[Dict]:
"""List IPv6 reverse DNS entries"""
response = self.client.get(f"instances/{instance_id}/ipv6/reverse")
return response.get("reverse_ipv6s", [])
def delete_ipv6_reverse(self, instance_id: str, ipv6: str) -> None:
"""Delete IPv6 reverse DNS"""
self.client.delete(f"instances/{instance_id}/ipv6/reverse/{ipv6}")
# Backup management
def get_backup_schedule(self, instance_id: str) -> Dict:
"""Get backup schedule"""
return self.client.get(f"instances/{instance_id}/backup-schedule")
def set_backup_schedule(
self,
instance_id: str,
backup_type: str,
hour: int = None,
dow: int = None,
dom: int = None,
) -> None:
"""
Set backup schedule
Args:
instance_id: Instance ID
backup_type: Schedule type ("daily", "weekly", "monthly", "daily_alt_even", "daily_alt_odd")
hour: Hour of day (0-23)
dow: Day of week (1-7, for weekly)
dom: Day of month (1-28, for monthly)
"""
data = {"type": backup_type}
if hour is not None:
data["hour"] = hour
if dow is not None:
data["dow"] = dow
if dom is not None:
data["dom"] = dom
self.client.post(f"instances/{instance_id}/backup-schedule", data)
def restore_backup(self, instance_id: str, backup_id: str) -> Dict:
"""
Restore instance from backup
Args:
instance_id: Instance ID
backup_id: Backup ID to restore
Returns:
Restore status
"""
return self.client.post(
f"instances/{instance_id}/restore",
{"backup_id": backup_id}
)
def restore_snapshot(self, instance_id: str, snapshot_id: str) -> Dict:
"""
Restore instance from snapshot
Args:
instance_id: Instance ID
snapshot_id: Snapshot ID to restore
Returns:
Restore status
"""
return self.client.post(
f"instances/{instance_id}/restore",
{"snapshot_id": snapshot_id}
)
# VPC management
def list_vpcs(self, instance_id: str) -> List[Dict]:
"""List VPCs attached to instance"""
response = self.client.get(f"instances/{instance_id}/vpcs")
return response.get("vpcs", [])
def attach_vpc(self, instance_id: str, vpc_id: str) -> None:
"""Attach VPC to instance"""
self.client.post(
f"instances/{instance_id}/vpcs/attach",
{"vpc_id": vpc_id}
)
def detach_vpc(self, instance_id: str, vpc_id: str) -> None:
"""Detach VPC from instance"""
self.client.post(
f"instances/{instance_id}/vpcs/detach",
{"vpc_id": vpc_id}
)
# VPC 2.0 management
def list_vpc2(self, instance_id: str) -> List[Dict]:
"""List VPC 2.0 networks attached to instance"""
response = self.client.get(f"instances/{instance_id}/vpc2")
return response.get("vpcs", [])
def attach_vpc2(
self,
instance_id: str,
vpc_id: str,
ip_address: str = None
) -> None:
"""
Attach VPC 2.0 to instance
Args:
instance_id: Instance ID
vpc_id: VPC 2.0 ID
ip_address: Specific IP to assign (optional)
"""
data = {"vpc_id": vpc_id}
if ip_address:
data["ip_address"] = ip_address
self.client.post(f"instances/{instance_id}/vpc2/attach", data)
def detach_vpc2(self, instance_id: str, vpc_id: str) -> None:
"""Detach VPC 2.0 from instance"""
self.client.post(
f"instances/{instance_id}/vpc2/detach",
{"vpc_id": vpc_id}
)
# ISO management
def attach_iso(self, instance_id: str, iso_id: str) -> Dict:
"""Attach ISO to instance"""
return self.client.post(
f"instances/{instance_id}/iso/attach",
{"iso_id": iso_id}
)
def detach_iso(self, instance_id: str) -> Dict:
"""Detach ISO from instance"""
return self.client.post(f"instances/{instance_id}/iso/detach")
def get_iso_status(self, instance_id: str) -> Dict:
"""Get ISO attach status"""
return self.client.get(f"instances/{instance_id}/iso")

View File

@@ -0,0 +1,266 @@
"""
Load Balancers Resource
Load balancer management
"""
from typing import Dict, List
from .base import BaseResource
class LoadBalancersResource(BaseResource):
"""
Load balancer management
Usage:
# List load balancers
lbs = client.load_balancers.list()
# Create load balancer
lb = client.load_balancers.create(
region="ewr",
label="my-lb",
forwarding_rules=[{
"frontend_protocol": "http",
"frontend_port": 80,
"backend_protocol": "http",
"backend_port": 80
}]
)
"""
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List load balancers
Returns:
Dict with 'load_balancers' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("load-balancers", params=params)
def list_all(self) -> List[Dict]:
"""List all load balancers (auto-paginate)"""
return self.client.paginate("load-balancers", "load_balancers")
def get(self, load_balancer_id: str) -> Dict:
"""
Get load balancer details
Args:
load_balancer_id: Load balancer ID
Returns:
Load balancer details
"""
response = self.client.get(f"load-balancers/{load_balancer_id}")
return response.get("load_balancer", {})
def create(
self,
region: str,
forwarding_rules: List[Dict],
label: str = None,
balancing_algorithm: str = "roundrobin",
ssl_redirect: bool = False,
proxy_protocol: bool = False,
health_check: Dict = None,
sticky_session: Dict = None,
ssl: Dict = None,
instances: List[str] = None,
firewall_rules: List[Dict] = None,
vpc: str = None,
) -> Dict:
"""
Create a load balancer
Args:
region: Region ID
forwarding_rules: List of forwarding rules
label: Load balancer label
balancing_algorithm: "roundrobin" or "leastconn"
ssl_redirect: Redirect HTTP to HTTPS
proxy_protocol: Enable proxy protocol
health_check: Health check config
sticky_session: Sticky session config
ssl: SSL config
instances: List of instance IDs to attach
firewall_rules: List of firewall rules
vpc: VPC ID
Returns:
Created load balancer details
Example forwarding_rules:
[{
"frontend_protocol": "http",
"frontend_port": 80,
"backend_protocol": "http",
"backend_port": 80
}]
Example health_check:
{
"protocol": "http",
"port": 80,
"path": "/health",
"check_interval": 15,
"response_timeout": 5,
"unhealthy_threshold": 5,
"healthy_threshold": 5
}
"""
data = {
"region": region,
"forwarding_rules": forwarding_rules,
"balancing_algorithm": balancing_algorithm,
"ssl_redirect": ssl_redirect,
"proxy_protocol": proxy_protocol,
}
if label:
data["label"] = label
if health_check:
data["health_check"] = health_check
if sticky_session:
data["sticky_session"] = sticky_session
if ssl:
data["ssl"] = ssl
if instances:
data["instances"] = instances
if firewall_rules:
data["firewall_rules"] = firewall_rules
if vpc:
data["vpc"] = vpc
response = self.client.post("load-balancers", data)
return response.get("load_balancer", {})
def update(
self,
load_balancer_id: str,
label: str = None,
balancing_algorithm: str = None,
ssl_redirect: bool = None,
proxy_protocol: bool = None,
health_check: Dict = None,
forwarding_rules: List[Dict] = None,
sticky_session: Dict = None,
ssl: Dict = None,
instances: List[str] = None,
firewall_rules: List[Dict] = None,
vpc: str = None,
) -> None:
"""
Update a load balancer
Args:
load_balancer_id: Load balancer ID
(other args same as create)
"""
data = {}
if label is not None:
data["label"] = label
if balancing_algorithm is not None:
data["balancing_algorithm"] = balancing_algorithm
if ssl_redirect is not None:
data["ssl_redirect"] = ssl_redirect
if proxy_protocol is not None:
data["proxy_protocol"] = proxy_protocol
if health_check is not None:
data["health_check"] = health_check
if forwarding_rules is not None:
data["forwarding_rules"] = forwarding_rules
if sticky_session is not None:
data["sticky_session"] = sticky_session
if ssl is not None:
data["ssl"] = ssl
if instances is not None:
data["instances"] = instances
if firewall_rules is not None:
data["firewall_rules"] = firewall_rules
if vpc is not None:
data["vpc"] = vpc
self.client.patch(f"load-balancers/{load_balancer_id}", data)
def delete(self, load_balancer_id: str) -> None:
"""
Delete a load balancer
Args:
load_balancer_id: Load balancer ID to delete
"""
self.client.delete(f"load-balancers/{load_balancer_id}")
# Forwarding rules
def list_forwarding_rules(self, load_balancer_id: str) -> List[Dict]:
"""List forwarding rules"""
response = self.client.get(
f"load-balancers/{load_balancer_id}/forwarding-rules"
)
return response.get("forwarding_rules", [])
def create_forwarding_rule(
self,
load_balancer_id: str,
frontend_protocol: str,
frontend_port: int,
backend_protocol: str,
backend_port: int
) -> Dict:
"""Create a forwarding rule"""
data = {
"frontend_protocol": frontend_protocol,
"frontend_port": frontend_port,
"backend_protocol": backend_protocol,
"backend_port": backend_port,
}
response = self.client.post(
f"load-balancers/{load_balancer_id}/forwarding-rules",
data
)
return response.get("forwarding_rule", {})
def get_forwarding_rule(
self,
load_balancer_id: str,
rule_id: str
) -> Dict:
"""Get a forwarding rule"""
response = self.client.get(
f"load-balancers/{load_balancer_id}/forwarding-rules/{rule_id}"
)
return response.get("forwarding_rule", {})
def delete_forwarding_rule(
self,
load_balancer_id: str,
rule_id: str
) -> None:
"""Delete a forwarding rule"""
self.client.delete(
f"load-balancers/{load_balancer_id}/forwarding-rules/{rule_id}"
)
# Firewall rules
def list_firewall_rules(self, load_balancer_id: str) -> List[Dict]:
"""List firewall rules"""
response = self.client.get(
f"load-balancers/{load_balancer_id}/firewall-rules"
)
return response.get("firewall_rules", [])
def get_firewall_rule(
self,
load_balancer_id: str,
rule_id: str
) -> Dict:
"""Get a firewall rule"""
response = self.client.get(
f"load-balancers/{load_balancer_id}/firewall-rules/{rule_id}"
)
return response.get("firewall_rule", {})

34
vultr_api/resources/os.py Normal file
View File

@@ -0,0 +1,34 @@
"""
OS Resource
Operating system listings
"""
from typing import Dict, List
from .base import BaseResource
class OSResource(BaseResource):
"""
Operating system listings
Usage:
# List all OS options
os_list = client.os.list()
"""
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List available operating systems
Returns:
Dict with 'os' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("os", params=params)
def list_all(self) -> List[Dict]:
"""List all operating systems (auto-paginate)"""
return self.client.paginate("os", "os")

View File

@@ -0,0 +1,75 @@
"""
Plans Resource
Available plan listings
"""
from typing import Dict, List
from .base import BaseResource
class PlansResource(BaseResource):
"""
Plan listings
Usage:
# List cloud compute plans
plans = client.plans.list()
# List bare metal plans
bm_plans = client.plans.list_bare_metal()
"""
def list(
self,
plan_type: str = None,
per_page: int = 100,
cursor: str = None,
os: str = None
) -> Dict:
"""
List cloud compute plans
Args:
plan_type: Filter by type ("all", "vc2", "vhf", "vdc", "voc", "voc-g", "voc-c", "voc-m", "voc-s", "vcg")
per_page: Items per page
cursor: Pagination cursor
os: Filter by supported OS ("windows")
Returns:
Dict with 'plans' list and 'meta' pagination
"""
params = {"per_page": per_page}
if plan_type:
params["type"] = plan_type
if cursor:
params["cursor"] = cursor
if os:
params["os"] = os
return self.client.get("plans", params=params)
def list_all(self, plan_type: str = None, os: str = None) -> List[Dict]:
"""List all plans (auto-paginate)"""
params = {}
if plan_type:
params["type"] = plan_type
if os:
params["os"] = os
return self.client.paginate("plans", "plans", params=params)
def list_bare_metal(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List bare metal plans
Returns:
Dict with 'plans' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("plans-metal", params=params)
def list_all_bare_metal(self) -> List[Dict]:
"""List all bare metal plans (auto-paginate)"""
return self.client.paginate("plans-metal", "plans")

View File

@@ -0,0 +1,53 @@
"""
Regions Resource
Available region listings
"""
from typing import Dict, List
from .base import BaseResource
class RegionsResource(BaseResource):
"""
Region listings
Usage:
# List all regions
regions = client.regions.list()
# List plans available in a region
plans = client.regions.list_availability("ewr")
"""
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List all regions
Returns:
Dict with 'regions' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("regions", params=params)
def list_all(self) -> List[Dict]:
"""List all regions (auto-paginate)"""
return self.client.paginate("regions", "regions")
def list_availability(self, region_id: str, plan_type: str = None) -> Dict:
"""
List available plans in a region
Args:
region_id: Region ID (e.g., "ewr", "lax", "nrt")
plan_type: Filter by type ("vc2", "vhf", "vdc")
Returns:
Dict with 'available_plans' list
"""
params = {}
if plan_type:
params["type"] = plan_type
return self.client.get(f"regions/{region_id}/availability", params=params)

View File

@@ -0,0 +1,135 @@
"""
Reserved IPs Resource
Reserved IP address management
"""
from typing import Dict, List
from .base import BaseResource
class ReservedIPsResource(BaseResource):
"""
Reserved IP address management
Usage:
# List reserved IPs
ips = client.reserved_ips.list()
# Create reserved IP
ip = client.reserved_ips.create(region="ewr", ip_type="v4")
# Attach to instance
client.reserved_ips.attach(reserved_ip="id", instance_id="instance-id")
"""
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List reserved IPs
Returns:
Dict with 'reserved_ips' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("reserved-ips", params=params)
def list_all(self) -> List[Dict]:
"""List all reserved IPs (auto-paginate)"""
return self.client.paginate("reserved-ips", "reserved_ips")
def get(self, reserved_ip: str) -> Dict:
"""
Get reserved IP details
Args:
reserved_ip: Reserved IP ID
Returns:
Reserved IP details
"""
response = self.client.get(f"reserved-ips/{reserved_ip}")
return response.get("reserved_ip", {})
def create(
self,
region: str,
ip_type: str,
label: str = None
) -> Dict:
"""
Create a reserved IP
Args:
region: Region ID
ip_type: IP type ("v4" or "v6")
label: Label
Returns:
Created reserved IP details
"""
data = {"region": region, "ip_type": ip_type}
if label:
data["label"] = label
response = self.client.post("reserved-ips", data)
return response.get("reserved_ip", {})
def update(self, reserved_ip: str, label: str) -> None:
"""
Update reserved IP label
Args:
reserved_ip: Reserved IP ID
label: New label
"""
self.client.patch(f"reserved-ips/{reserved_ip}", {"label": label})
def delete(self, reserved_ip: str) -> None:
"""
Delete a reserved IP
Args:
reserved_ip: Reserved IP ID to delete
"""
self.client.delete(f"reserved-ips/{reserved_ip}")
def attach(self, reserved_ip: str, instance_id: str) -> None:
"""
Attach reserved IP to instance
Args:
reserved_ip: Reserved IP ID
instance_id: Instance ID to attach to
"""
self.client.post(f"reserved-ips/{reserved_ip}/attach", {
"instance_id": instance_id
})
def detach(self, reserved_ip: str) -> None:
"""
Detach reserved IP from instance
Args:
reserved_ip: Reserved IP ID
"""
self.client.post(f"reserved-ips/{reserved_ip}/detach")
def convert(self, ip_address: str, label: str = None) -> Dict:
"""
Convert instance IP to reserved IP
Args:
ip_address: IP address to convert
label: Label for reserved IP
Returns:
Created reserved IP details
"""
data = {"ip_address": ip_address}
if label:
data["label"] = label
response = self.client.post("reserved-ips/convert", data)
return response.get("reserved_ip", {})

View File

@@ -0,0 +1,108 @@
"""
Snapshots Resource
Snapshot management
"""
from typing import Dict, List
from .base import BaseResource
class SnapshotsResource(BaseResource):
"""
Snapshot management
Usage:
# List snapshots
snapshots = client.snapshots.list()
# Create snapshot from instance
snapshot = client.snapshots.create(instance_id="instance-id", description="My snapshot")
# Create from URL
snapshot = client.snapshots.create_from_url(url="https://example.com/image.raw")
"""
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List snapshots
Returns:
Dict with 'snapshots' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("snapshots", params=params)
def list_all(self) -> List[Dict]:
"""List all snapshots (auto-paginate)"""
return self.client.paginate("snapshots", "snapshots")
def get(self, snapshot_id: str) -> Dict:
"""
Get snapshot details
Args:
snapshot_id: Snapshot ID
Returns:
Snapshot details
"""
response = self.client.get(f"snapshots/{snapshot_id}")
return response.get("snapshot", {})
def create(self, instance_id: str, description: str = None) -> Dict:
"""
Create a snapshot from an instance
Args:
instance_id: Instance ID to snapshot
description: Snapshot description
Returns:
Created snapshot details
"""
data = {"instance_id": instance_id}
if description:
data["description"] = description
response = self.client.post("snapshots", data)
return response.get("snapshot", {})
def create_from_url(self, url: str, description: str = None) -> Dict:
"""
Create a snapshot from a URL
Args:
url: URL to raw disk image
description: Snapshot description
Returns:
Created snapshot details
"""
data = {"url": url}
if description:
data["description"] = description
response = self.client.post("snapshots/create-from-url", data)
return response.get("snapshot", {})
def update(self, snapshot_id: str, description: str) -> None:
"""
Update snapshot description
Args:
snapshot_id: Snapshot ID
description: New description
"""
self.client.put(f"snapshots/{snapshot_id}", {"description": description})
def delete(self, snapshot_id: str) -> None:
"""
Delete a snapshot
Args:
snapshot_id: Snapshot ID to delete
"""
self.client.delete(f"snapshots/{snapshot_id}")

View File

@@ -0,0 +1,99 @@
"""
SSH Keys Resource
SSH key management
"""
from typing import Dict, List
from .base import BaseResource
class SSHKeysResource(BaseResource):
"""
SSH key management
Usage:
# List keys
keys = client.ssh_keys.list()
# Create key
key = client.ssh_keys.create(
name="my-key",
ssh_key="ssh-rsa AAAAB3..."
)
# Delete key
client.ssh_keys.delete("key-id")
"""
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List SSH keys
Returns:
Dict with 'ssh_keys' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("ssh-keys", params=params)
def list_all(self) -> List[Dict]:
"""List all SSH keys (auto-paginate)"""
return self.client.paginate("ssh-keys", "ssh_keys")
def get(self, ssh_key_id: str) -> Dict:
"""
Get SSH key details
Args:
ssh_key_id: SSH key ID
Returns:
SSH key details
"""
response = self.client.get(f"ssh-keys/{ssh_key_id}")
return response.get("ssh_key", {})
def create(self, name: str, ssh_key: str) -> Dict:
"""
Create an SSH key
Args:
name: Key name
ssh_key: Public key content
Returns:
Created SSH key details
"""
response = self.client.post("ssh-keys", {
"name": name,
"ssh_key": ssh_key
})
return response.get("ssh_key", {})
def update(self, ssh_key_id: str, name: str = None, ssh_key: str = None) -> None:
"""
Update an SSH key
Args:
ssh_key_id: Key ID
name: New name
ssh_key: New public key content
"""
data = {}
if name:
data["name"] = name
if ssh_key:
data["ssh_key"] = ssh_key
self.client.patch(f"ssh-keys/{ssh_key_id}", data)
def delete(self, ssh_key_id: str) -> None:
"""
Delete an SSH key
Args:
ssh_key_id: Key ID to delete
"""
self.client.delete(f"ssh-keys/{ssh_key_id}")

View File

@@ -0,0 +1,112 @@
"""
Startup Scripts Resource
Startup script management
"""
from typing import Dict, List
from .base import BaseResource
class StartupScriptsResource(BaseResource):
"""
Startup script management
Usage:
# List scripts
scripts = client.startup_scripts.list()
# Create script
script = client.startup_scripts.create(
name="my-script",
script="#!/bin/bash\\necho 'Hello'",
script_type="boot"
)
# Delete script
client.startup_scripts.delete("script-id")
"""
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List startup scripts
Returns:
Dict with 'startup_scripts' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("startup-scripts", params=params)
def list_all(self) -> List[Dict]:
"""List all startup scripts (auto-paginate)"""
return self.client.paginate("startup-scripts", "startup_scripts")
def get(self, startup_script_id: str) -> Dict:
"""
Get startup script details
Args:
startup_script_id: Script ID
Returns:
Script details
"""
response = self.client.get(f"startup-scripts/{startup_script_id}")
return response.get("startup_script", {})
def create(
self,
name: str,
script: str,
script_type: str = "boot"
) -> Dict:
"""
Create a startup script
Args:
name: Script name
script: Script content (base64 or plain text)
script_type: Type ("boot" or "pxe")
Returns:
Created script details
"""
response = self.client.post("startup-scripts", {
"name": name,
"script": script,
"type": script_type
})
return response.get("startup_script", {})
def update(
self,
startup_script_id: str,
name: str = None,
script: str = None
) -> None:
"""
Update a startup script
Args:
startup_script_id: Script ID
name: New name
script: New script content
"""
data = {}
if name:
data["name"] = name
if script:
data["script"] = script
self.client.patch(f"startup-scripts/{startup_script_id}", data)
def delete(self, startup_script_id: str) -> None:
"""
Delete a startup script
Args:
startup_script_id: Script ID to delete
"""
self.client.delete(f"startup-scripts/{startup_script_id}")

220
vultr_api/resources/vpc.py Normal file
View File

@@ -0,0 +1,220 @@
"""
VPC Resource
VPC (Virtual Private Cloud) management - both v1 and v2
"""
from typing import Dict, List
from .base import BaseResource
class VPCResource(BaseResource):
"""
VPC management (v1 and v2)
Usage:
# List VPCs
vpcs = client.vpc.list()
# Create VPC
vpc = client.vpc.create(
region="ewr",
description="My VPC",
v4_subnet="10.0.0.0",
v4_subnet_mask=24
)
# VPC 2.0
vpc2 = client.vpc.create_vpc2(region="ewr", description="My VPC 2.0")
"""
# VPC v1
def list(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List VPCs
Returns:
Dict with 'vpcs' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("vpcs", params=params)
def list_all(self) -> List[Dict]:
"""List all VPCs (auto-paginate)"""
return self.client.paginate("vpcs", "vpcs")
def get(self, vpc_id: str) -> Dict:
"""
Get VPC details
Args:
vpc_id: VPC ID
Returns:
VPC details
"""
response = self.client.get(f"vpcs/{vpc_id}")
return response.get("vpc", {})
def create(
self,
region: str,
description: str = None,
v4_subnet: str = None,
v4_subnet_mask: int = None
) -> Dict:
"""
Create a VPC
Args:
region: Region ID
description: VPC description
v4_subnet: IPv4 subnet (e.g., "10.0.0.0")
v4_subnet_mask: Subnet mask (e.g., 24)
Returns:
Created VPC details
"""
data = {"region": region}
if description:
data["description"] = description
if v4_subnet:
data["v4_subnet"] = v4_subnet
if v4_subnet_mask:
data["v4_subnet_mask"] = v4_subnet_mask
response = self.client.post("vpcs", data)
return response.get("vpc", {})
def update(self, vpc_id: str, description: str) -> None:
"""
Update VPC description
Args:
vpc_id: VPC ID
description: New description
"""
self.client.put(f"vpcs/{vpc_id}", {"description": description})
def delete(self, vpc_id: str) -> None:
"""
Delete a VPC
Args:
vpc_id: VPC ID to delete
"""
self.client.delete(f"vpcs/{vpc_id}")
# VPC 2.0
def list_vpc2(self, per_page: int = 100, cursor: str = None) -> Dict:
"""
List VPC 2.0 networks
Returns:
Dict with 'vpcs' list and 'meta' pagination
"""
params = {"per_page": per_page}
if cursor:
params["cursor"] = cursor
return self.client.get("vpc2", params=params)
def list_all_vpc2(self) -> List[Dict]:
"""List all VPC 2.0 networks (auto-paginate)"""
return self.client.paginate("vpc2", "vpcs")
def get_vpc2(self, vpc_id: str) -> Dict:
"""
Get VPC 2.0 details
Args:
vpc_id: VPC 2.0 ID
Returns:
VPC 2.0 details
"""
response = self.client.get(f"vpc2/{vpc_id}")
return response.get("vpc", {})
def create_vpc2(
self,
region: str,
description: str = None,
ip_block: str = None,
prefix_length: int = None
) -> Dict:
"""
Create a VPC 2.0 network
Args:
region: Region ID
description: VPC description
ip_block: IP block (e.g., "10.0.0.0")
prefix_length: Prefix length (e.g., 24)
Returns:
Created VPC 2.0 details
"""
data = {"region": region}
if description:
data["description"] = description
if ip_block:
data["ip_block"] = ip_block
if prefix_length:
data["prefix_length"] = prefix_length
response = self.client.post("vpc2", data)
return response.get("vpc", {})
def update_vpc2(self, vpc_id: str, description: str) -> None:
"""
Update VPC 2.0 description
Args:
vpc_id: VPC 2.0 ID
description: New description
"""
self.client.put(f"vpc2/{vpc_id}", {"description": description})
def delete_vpc2(self, vpc_id: str) -> None:
"""
Delete a VPC 2.0 network
Args:
vpc_id: VPC 2.0 ID to delete
"""
self.client.delete(f"vpc2/{vpc_id}")
def list_vpc2_nodes(self, vpc_id: str) -> List[Dict]:
"""
List nodes attached to VPC 2.0
Args:
vpc_id: VPC 2.0 ID
Returns:
List of attached nodes
"""
response = self.client.get(f"vpc2/{vpc_id}/nodes")
return response.get("nodes", [])
def attach_vpc2_nodes(self, vpc_id: str, nodes: List[Dict]) -> None:
"""
Attach nodes to VPC 2.0
Args:
vpc_id: VPC 2.0 ID
nodes: List of nodes [{"id": "instance-id", "ip_address": "10.0.0.5"}, ...]
"""
self.client.post(f"vpc2/{vpc_id}/nodes/attach", {"nodes": nodes})
def detach_vpc2_nodes(self, vpc_id: str, nodes: List[str]) -> None:
"""
Detach nodes from VPC 2.0
Args:
vpc_id: VPC 2.0 ID
nodes: List of node IDs
"""
self.client.post(f"vpc2/{vpc_id}/nodes/detach", {"nodes": nodes})