Files
vultr-api/vultr_api/resources/firewall.py
HWANG BYUNGHA 184054c6c1 Initial commit: Vultr API v2 Python wrapper with FastAPI server
- vultr_api/: Python library wrapping Vultr API v2
  - 17 resource modules (instances, dns, firewall, vpc, etc.)
  - Pagination support, error handling

- server/: FastAPI REST server
  - All API endpoints exposed via HTTP
  - X-API-Key header authentication
  - Swagger docs at /docs

- Podman quadlet config for systemd deployment

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 01:08:17 +09:00

307 lines
8.1 KiB
Python

"""
Firewall Resource
Firewall group and rule management
"""
from typing import Dict, List, Optional
from .base import BaseResource
class FirewallResource(BaseResource):
    """
    Firewall group and rule management

    All requests go through ``self.client`` (``get`` / ``post`` / ``put`` /
    ``delete`` / ``paginate``) using paths under ``firewalls``.

    Usage:
        # List firewall groups
        groups = client.firewall.list_groups()

        # Create firewall group
        group = client.firewall.create_group(description="Web servers")

        # Add rule
        rule = client.firewall.create_rule(
            firewall_group_id="group-id",
            ip_type="v4",
            protocol="tcp",
            port="80",
            subnet="0.0.0.0",
            subnet_size=0
        )
    """

    # Internal helpers

    @staticmethod
    def _pagination_params(per_page: int, cursor: Optional[str]) -> Dict:
        """Build the query parameters shared by the paginated list calls."""
        params = {"per_page": per_page}
        # The cursor is only sent when the caller supplied a non-empty one.
        if cursor:
            params["cursor"] = cursor
        return params

    @staticmethod
    def _default_subnet_for(ip_type: str, subnet: str, subnet_size: int) -> str:
        """
        Pick the subnet to send for a convenience "allow" rule

        The convenience methods default ``subnet`` to the IPv4 wildcard
        ("0.0.0.0" with size 0). When that untouched default is combined
        with ``ip_type="v6"`` the IPv6 wildcard "::" is substituted, the
        same value ``allow_ping`` uses for non-v4 rules. Any other subnet
        is passed through unchanged.
        """
        if ip_type == "v6" and subnet == "0.0.0.0" and subnet_size == 0:
            return "::"
        return subnet

    def _allow_tcp_port(
        self,
        firewall_group_id: str,
        ip_type: str,
        port: str,
        subnet: str,
        subnet_size: int,
        notes: str
    ) -> Dict:
        """Create a TCP allow rule for a single port via ``create_rule``."""
        return self.create_rule(
            firewall_group_id=firewall_group_id,
            ip_type=ip_type,
            protocol="tcp",
            port=port,
            subnet=self._default_subnet_for(ip_type, subnet, subnet_size),
            subnet_size=subnet_size,
            notes=notes
        )

    # Firewall groups

    def list_groups(self, per_page: int = 100, cursor: Optional[str] = None) -> Dict:
        """
        List firewall groups

        Args:
            per_page: Number of items requested per page
            cursor: Pagination cursor; omitted from the request when empty

        Returns:
            Dict with 'firewall_groups' list and 'meta' pagination
        """
        params = self._pagination_params(per_page, cursor)
        return self.client.get("firewalls", params=params)

    def list_all_groups(self) -> List[Dict]:
        """List all firewall groups (auto-paginate)"""
        return self.client.paginate("firewalls", "firewall_groups")

    def get_group(self, firewall_group_id: str) -> Dict:
        """
        Get firewall group details

        Args:
            firewall_group_id: Firewall group ID

        Returns:
            Firewall group details (empty dict if the response has no
            'firewall_group' key)
        """
        response = self.client.get(f"firewalls/{firewall_group_id}")
        return response.get("firewall_group", {})

    def create_group(self, description: Optional[str] = None) -> Dict:
        """
        Create a firewall group

        Args:
            description: Group description; omitted from the request
                when empty or None

        Returns:
            Created firewall group (empty dict if the response has no
            'firewall_group' key)
        """
        data = {}
        if description:
            data["description"] = description
        response = self.client.post("firewalls", data)
        return response.get("firewall_group", {})

    def update_group(self, firewall_group_id: str, description: str) -> None:
        """
        Update firewall group description

        Args:
            firewall_group_id: Group ID
            description: New description
        """
        self.client.put(
            f"firewalls/{firewall_group_id}",
            {"description": description}
        )

    def delete_group(self, firewall_group_id: str) -> None:
        """
        Delete a firewall group

        Args:
            firewall_group_id: Group ID to delete
        """
        self.client.delete(f"firewalls/{firewall_group_id}")

    # Firewall rules

    def list_rules(
        self,
        firewall_group_id: str,
        per_page: int = 100,
        cursor: Optional[str] = None
    ) -> Dict:
        """
        List rules in a firewall group

        Args:
            firewall_group_id: Group ID
            per_page: Number of items requested per page
            cursor: Pagination cursor; omitted from the request when empty

        Returns:
            Dict with 'firewall_rules' list and 'meta' pagination
        """
        params = self._pagination_params(per_page, cursor)
        return self.client.get(
            f"firewalls/{firewall_group_id}/rules",
            params=params
        )

    def list_all_rules(self, firewall_group_id: str) -> List[Dict]:
        """List all rules in a firewall group (auto-paginate)"""
        return self.client.paginate(
            f"firewalls/{firewall_group_id}/rules",
            "firewall_rules"
        )

    def get_rule(self, firewall_group_id: str, rule_id: str) -> Dict:
        """
        Get a firewall rule

        Args:
            firewall_group_id: Group ID
            rule_id: Rule ID

        Returns:
            Rule details (empty dict if the response has no
            'firewall_rule' key)
        """
        response = self.client.get(
            f"firewalls/{firewall_group_id}/rules/{rule_id}"
        )
        return response.get("firewall_rule", {})

    def create_rule(
        self,
        firewall_group_id: str,
        ip_type: str,
        protocol: str,
        port: str,
        subnet: str,
        subnet_size: int,
        source: Optional[str] = None,
        notes: Optional[str] = None
    ) -> Dict:
        """
        Create a firewall rule

        Args:
            firewall_group_id: Group ID
            ip_type: IP type ("v4" or "v6")
            protocol: Protocol ("icmp", "tcp", "udp", "gre", "esp", "ah")
            port: Port number, range (e.g., "8000:9000"), or empty for all
            subnet: IP address or subnet
            subnet_size: CIDR size (0-32 for v4, 0-128 for v6)
            source: Source type ("", "cloudflare") - empty for custom subnet
            notes: Rule notes

        Returns:
            Created rule details (empty dict if the response has no
            'firewall_rule' key)

        Example:
            # Allow SSH from anywhere
            client.firewall.create_rule(
                firewall_group_id="group-id",
                ip_type="v4",
                protocol="tcp",
                port="22",
                subnet="0.0.0.0",
                subnet_size=0,
                notes="SSH"
            )

            # Allow HTTP from Cloudflare
            client.firewall.create_rule(
                firewall_group_id="group-id",
                ip_type="v4",
                protocol="tcp",
                port="80",
                subnet="",
                subnet_size=0,
                source="cloudflare",
                notes="HTTP via Cloudflare"
            )
        """
        data = {
            "ip_type": ip_type,
            "protocol": protocol,
            "port": port,
            "subnet": subnet,
            "subnet_size": subnet_size,
        }
        # Optional fields are only sent when they carry a non-empty value.
        if source:
            data["source"] = source
        if notes:
            data["notes"] = notes
        response = self.client.post(
            f"firewalls/{firewall_group_id}/rules",
            data
        )
        return response.get("firewall_rule", {})

    def delete_rule(self, firewall_group_id: str, rule_id: str) -> None:
        """
        Delete a firewall rule

        Args:
            firewall_group_id: Group ID
            rule_id: Rule ID to delete
        """
        self.client.delete(f"firewalls/{firewall_group_id}/rules/{rule_id}")

    # Convenience methods

    def allow_ssh(
        self,
        firewall_group_id: str,
        ip_type: str = "v4",
        subnet: str = "0.0.0.0",
        subnet_size: int = 0
    ) -> Dict:
        """
        Create SSH allow rule (TCP port 22, notes "SSH")

        Args:
            firewall_group_id: Group ID
            ip_type: "v4" or "v6"
            subnet: Source subnet (default: anywhere). With ip_type="v6"
                the untouched default is sent as "::" instead of "0.0.0.0"
            subnet_size: CIDR size (default: 0 for anywhere)

        Returns:
            Created rule
        """
        return self._allow_tcp_port(
            firewall_group_id, ip_type, "22", subnet, subnet_size, "SSH"
        )

    def allow_http(
        self,
        firewall_group_id: str,
        ip_type: str = "v4",
        subnet: str = "0.0.0.0",
        subnet_size: int = 0
    ) -> Dict:
        """
        Create HTTP allow rule (TCP port 80, notes "HTTP")

        Args:
            firewall_group_id: Group ID
            ip_type: "v4" or "v6"
            subnet: Source subnet (default: anywhere). With ip_type="v6"
                the untouched default is sent as "::" instead of "0.0.0.0"
            subnet_size: CIDR size (default: 0 for anywhere)

        Returns:
            Created rule
        """
        return self._allow_tcp_port(
            firewall_group_id, ip_type, "80", subnet, subnet_size, "HTTP"
        )

    def allow_https(
        self,
        firewall_group_id: str,
        ip_type: str = "v4",
        subnet: str = "0.0.0.0",
        subnet_size: int = 0
    ) -> Dict:
        """
        Create HTTPS allow rule (TCP port 443, notes "HTTPS")

        Args:
            firewall_group_id: Group ID
            ip_type: "v4" or "v6"
            subnet: Source subnet (default: anywhere). With ip_type="v6"
                the untouched default is sent as "::" instead of "0.0.0.0"
            subnet_size: CIDR size (default: 0 for anywhere)

        Returns:
            Created rule
        """
        return self._allow_tcp_port(
            firewall_group_id, ip_type, "443", subnet, subnet_size, "HTTPS"
        )

    def allow_ping(
        self,
        firewall_group_id: str,
        ip_type: str = "v4"
    ) -> Dict:
        """
        Create ICMP (ping) allow rule

        The rule uses an empty port, subnet size 0, notes "ICMP/Ping",
        and subnet "0.0.0.0" for ip_type "v4" or "::" otherwise.

        Args:
            firewall_group_id: Group ID
            ip_type: "v4" or "v6"

        Returns:
            Created rule
        """
        return self.create_rule(
            firewall_group_id=firewall_group_id,
            ip_type=ip_type,
            protocol="icmp",
            port="",
            subnet="0.0.0.0" if ip_type == "v4" else "::",
            subnet_size=0,
            notes="ICMP/Ping"
        )