feat: add Queue-based async server provisioning
- Add Cloudflare Queue for async server provisioning workflow - Implement VPS provider abstraction (Linode, Vultr) - Add provisioning API endpoints with API key authentication - Fix race condition in balance deduction (atomic query) - Remove root_password from Queue for security (fetch from DB) - Add IP assignment wait logic after server creation - Add rollback/refund on all failure cases Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
392
src/services/provisioning-service.ts
Normal file
392
src/services/provisioning-service.ts
Normal file
@@ -0,0 +1,392 @@
|
||||
/**
|
||||
* Provisioning Service
|
||||
* Orchestrates the server provisioning workflow with Queue-based async processing
|
||||
*/
|
||||
|
||||
import type { Env, ProvisionRequest, ProvisionResponse, ProvisionQueueMessage, ServerOrder, VPSProvider } from '../types';
|
||||
import { ProvisioningRepository } from '../repositories/ProvisioningRepository';
|
||||
import { LinodeProvider } from './linode-provider';
|
||||
import { VultrProvider } from './vultr-provider';
|
||||
import { getExchangeRate } from '../utils/exchange-rate';
|
||||
|
||||
export class ProvisioningService {
|
||||
private repo: ProvisioningRepository;
|
||||
private linodeProvider: LinodeProvider | null;
|
||||
private vultrProvider: VultrProvider | null;
|
||||
private env: Env;
|
||||
|
||||
constructor(
|
||||
env: Env, // Full env for exchange rate API + cache + queue
|
||||
db: D1Database, // cloud-instances-db: pricing, providers
|
||||
userDb: D1Database, // telegram-conversations: users, deposits, orders
|
||||
linodeApiKey?: string,
|
||||
vultrApiKey?: string
|
||||
) {
|
||||
this.env = env;
|
||||
this.repo = new ProvisioningRepository(db, userDb);
|
||||
this.linodeProvider = linodeApiKey ? new LinodeProvider(linodeApiKey) : null;
|
||||
this.vultrProvider = vultrApiKey ? new VultrProvider(vultrApiKey) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Main provisioning workflow (async via Queue)
|
||||
* 1. Validate user by telegram_id
|
||||
* 2. Get pricing details
|
||||
* 3. Check and deduct balance (KRW)
|
||||
* 4. Create order with status 'queued'
|
||||
* 5. Send message to Queue
|
||||
* 6. Return immediately with order info
|
||||
*/
|
||||
async provisionServer(request: ProvisionRequest): Promise<ProvisionResponse> {
|
||||
const { telegram_id, pricing_id, label, image, dry_run } = request;
|
||||
const osImageKey = image || 'ubuntu_22_04';
|
||||
|
||||
// Step 1: Validate user by telegram_id
|
||||
const user = await this.repo.getUserByTelegramId(telegram_id);
|
||||
if (!user) {
|
||||
return {
|
||||
success: false,
|
||||
error: { code: 'USER_NOT_FOUND', message: 'User not found' },
|
||||
};
|
||||
}
|
||||
|
||||
// Step 2: Get pricing details with provider info
|
||||
const pricing = await this.repo.getPricingWithProvider(pricing_id);
|
||||
if (!pricing) {
|
||||
return {
|
||||
success: false,
|
||||
error: { code: 'PRICING_NOT_FOUND', message: 'Invalid pricing_id or unavailable' },
|
||||
};
|
||||
}
|
||||
|
||||
// Step 3: Calculate price in KRW using real-time exchange rate
|
||||
// Round to nearest 500 KRW for cleaner pricing
|
||||
const exchangeRate = await getExchangeRate(this.env);
|
||||
const priceKrw = Math.round((pricing.monthly_price * exchangeRate) / 500) * 500;
|
||||
|
||||
// Step 4: Check balance
|
||||
const currentBalance = await this.repo.getBalance(user.id);
|
||||
if (currentBalance < priceKrw) {
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
code: 'INSUFFICIENT_BALANCE',
|
||||
message: `Insufficient balance. Required: ₩${priceKrw.toLocaleString()}, Available: ₩${currentBalance.toLocaleString()}`,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
// Dry run mode: return validation result without creating order
|
||||
if (dry_run) {
|
||||
return {
|
||||
success: true,
|
||||
order: {
|
||||
id: 0,
|
||||
user_id: user.id,
|
||||
spec_id: pricing_id,
|
||||
status: 'pending',
|
||||
region: pricing.region_code,
|
||||
provider_instance_id: null,
|
||||
ip_address: null,
|
||||
root_password: null,
|
||||
price_paid: priceKrw,
|
||||
error_message: null,
|
||||
provisioned_at: null,
|
||||
terminated_at: null,
|
||||
created_at: new Date().toISOString(),
|
||||
updated_at: new Date().toISOString(),
|
||||
label: label || null,
|
||||
image: osImageKey,
|
||||
billing_type: 'monthly',
|
||||
},
|
||||
dry_run_info: {
|
||||
message: 'Dry run successful. No server created, no balance deducted.',
|
||||
user_id: user.id,
|
||||
telegram_id,
|
||||
current_balance_krw: currentBalance,
|
||||
price_krw: priceKrw,
|
||||
remaining_balance_krw: currentBalance - priceKrw,
|
||||
provider: pricing.provider_name,
|
||||
instance: pricing.instance_name,
|
||||
region: pricing.region_name,
|
||||
},
|
||||
} as ProvisionResponse;
|
||||
}
|
||||
|
||||
// Step 5: Generate root password before creating order
|
||||
const rootPassword = this.generateSecurePassword();
|
||||
|
||||
// Step 6: Create order with payment (atomic balance deduction + order creation)
|
||||
const orderResult = await this.repo.createOrderWithPayment(
|
||||
user.id,
|
||||
pricing_id,
|
||||
pricing.region_code,
|
||||
priceKrw,
|
||||
label || null,
|
||||
osImageKey
|
||||
);
|
||||
|
||||
if (!orderResult.orderId) {
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
code: orderResult.error || 'ORDER_CREATION_FAILED',
|
||||
message: orderResult.error === 'INSUFFICIENT_BALANCE'
|
||||
? `Insufficient balance. Required: ₩${priceKrw.toLocaleString()}`
|
||||
: 'Failed to create order',
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const orderId = orderResult.orderId;
|
||||
|
||||
// Step 7: Store root password in order (encrypted in production)
|
||||
await this.repo.updateOrderRootPassword(orderId, rootPassword);
|
||||
|
||||
// Step 8: Update order status to 'queued' and send to Queue
|
||||
await this.repo.updateOrderStatus(orderId, 'provisioning');
|
||||
|
||||
// Send message to provision queue (root_password stored in DB, not in queue for security)
|
||||
const queueMessage: ProvisionQueueMessage = {
|
||||
order_id: orderId,
|
||||
user_id: user.id,
|
||||
pricing_id,
|
||||
label: label || null,
|
||||
image: osImageKey,
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
|
||||
await this.env.PROVISION_QUEUE.send(queueMessage);
|
||||
console.log(`[ProvisioningService] Order ${orderId} queued for provisioning`);
|
||||
|
||||
// Step 9: Return immediately with order info
|
||||
const order = await this.repo.getOrderById(orderId);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
order: order!,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Process provision queue message (called by Queue consumer)
|
||||
* 1. Get order and pricing details
|
||||
* 2. Call provider API
|
||||
* 3. On success: update order to active
|
||||
* 4. On failure: refund balance, update order to failed
|
||||
*/
|
||||
async processQueueMessage(message: ProvisionQueueMessage): Promise<void> {
|
||||
const { order_id, user_id, pricing_id, label, image } = message;
|
||||
console.log(`[ProvisioningService] Processing order ${order_id}`);
|
||||
|
||||
// Fetch order to get root_password (stored in DB for security)
|
||||
const order = await this.repo.getOrderById(order_id);
|
||||
if (!order || !order.root_password) {
|
||||
console.error(`[ProvisioningService] Order or root_password not found for order ${order_id}`);
|
||||
await this.repo.updateOrderStatus(order_id, 'failed', 'Order not found');
|
||||
return;
|
||||
}
|
||||
const root_password = order.root_password;
|
||||
|
||||
// Get pricing details
|
||||
const pricing = await this.repo.getPricingWithProvider(pricing_id);
|
||||
if (!pricing) {
|
||||
console.error(`[ProvisioningService] Pricing not found for order ${order_id}`);
|
||||
await this.repo.rollbackOrder(order_id, user_id, order.price_paid, 'Pricing not found');
|
||||
return;
|
||||
}
|
||||
|
||||
// Get provider
|
||||
const provider = this.getProvider(pricing.provider_name.toLowerCase() as VPSProvider);
|
||||
if (!provider) {
|
||||
console.error(`[ProvisioningService] Provider not configured for order ${order_id}`);
|
||||
await this.repo.rollbackOrder(order_id, user_id, order.price_paid, 'Provider not configured');
|
||||
return;
|
||||
}
|
||||
|
||||
// Get OS image ID
|
||||
const osImageId = provider.getOsImageId(image);
|
||||
|
||||
// Call provider API
|
||||
const createResult = await provider.createServer({
|
||||
plan: pricing.instance_id,
|
||||
region: pricing.region_code,
|
||||
osImage: osImageId,
|
||||
label: label || `order-${order_id}`,
|
||||
rootPassword: root_password,
|
||||
tags: [`user:${user_id}`, `order:${order_id}`],
|
||||
});
|
||||
|
||||
if (!createResult.success) {
|
||||
console.error(`[ProvisioningService] Provider API failed for order ${order_id}:`, createResult.error);
|
||||
await this.repo.rollbackOrder(
|
||||
order_id,
|
||||
user_id,
|
||||
order.price_paid,
|
||||
createResult.error?.message || 'Provider API error'
|
||||
);
|
||||
// Throw error to trigger retry
|
||||
throw new Error(createResult.error?.message || 'Provider API error');
|
||||
}
|
||||
|
||||
// Wait for IP assignment if not immediately available
|
||||
let ipv4 = createResult.ipv4;
|
||||
if (!ipv4 && createResult.instanceId) {
|
||||
console.log(`[ProvisioningService] Waiting for IP assignment for order ${order_id}...`);
|
||||
const readyResult = await this.waitForServerReady(provider, createResult.instanceId);
|
||||
ipv4 = readyResult.ipv4;
|
||||
if (!ipv4) {
|
||||
console.warn(`[ProvisioningService] IP not assigned within timeout for order ${order_id}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Success - update order with provider info
|
||||
await this.repo.updateOrderProviderInfo(
|
||||
order_id,
|
||||
createResult.instanceId!,
|
||||
ipv4 || null,
|
||||
root_password
|
||||
);
|
||||
|
||||
console.log(`[ProvisioningService] Order ${order_id} provisioned successfully: ${createResult.instanceId}, IP: ${ipv4 || 'pending'}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get provider instance by name
|
||||
*/
|
||||
private getProvider(providerName: VPSProvider): LinodeProvider | VultrProvider | null {
|
||||
switch (providerName) {
|
||||
case 'linode':
|
||||
return this.linodeProvider;
|
||||
case 'vultr':
|
||||
return this.vultrProvider;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for server to be ready with IP assigned
|
||||
* Polls getServerStatus every 5 seconds for up to 2 minutes
|
||||
*/
|
||||
private async waitForServerReady(
|
||||
provider: LinodeProvider | VultrProvider,
|
||||
instanceId: string,
|
||||
maxWaitMs: number = 120000
|
||||
): Promise<{ ready: boolean; ipv4?: string; ipv6?: string }> {
|
||||
const startTime = Date.now();
|
||||
const pollInterval = 5000;
|
||||
|
||||
while (Date.now() - startTime < maxWaitMs) {
|
||||
const status = await provider.getServerStatus(instanceId);
|
||||
|
||||
if (status.ipv4) {
|
||||
return { ready: true, ipv4: status.ipv4, ipv6: status.ipv6 };
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, pollInterval));
|
||||
}
|
||||
|
||||
return { ready: false };
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate cryptographically secure password
|
||||
*/
|
||||
private generateSecurePassword(length: number = 32): string {
|
||||
const chars = 'ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnpqrstuvwxyz23456789!@#$%^&*';
|
||||
const array = new Uint8Array(length);
|
||||
crypto.getRandomValues(array);
|
||||
let password = '';
|
||||
for (let i = 0; i < length; i++) {
|
||||
password += chars[array[i] % chars.length];
|
||||
}
|
||||
return password;
|
||||
}
|
||||
|
||||
// ============================================
|
||||
// Additional Operations
|
||||
// ============================================
|
||||
|
||||
/**
|
||||
* Get user's server orders by telegram_id
|
||||
*/
|
||||
async getUserOrders(telegramId: string, limit: number = 20): Promise<ServerOrder[]> {
|
||||
const user = await this.repo.getUserByTelegramId(telegramId);
|
||||
if (!user) return [];
|
||||
return this.repo.getOrdersByUserId(user.id, limit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get order by ID
|
||||
*/
|
||||
async getOrder(orderId: string): Promise<ServerOrder | null> {
|
||||
const numericId = parseInt(orderId, 10);
|
||||
if (isNaN(numericId)) return null;
|
||||
return this.repo.getOrderById(numericId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get user balance by telegram_id (in KRW)
|
||||
*/
|
||||
async getUserBalance(telegramId: string): Promise<{ balance_krw: number; user_id: number } | null> {
|
||||
const user = await this.repo.getUserByTelegramId(telegramId);
|
||||
if (!user) return null;
|
||||
const balance = await this.repo.getBalance(user.id);
|
||||
return { balance_krw: balance, user_id: user.id };
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a server (terminate) - requires telegram_id for authorization
|
||||
*/
|
||||
async deleteServer(orderId: string, telegramId: string): Promise<{ success: boolean; error?: string }> {
|
||||
const user = await this.repo.getUserByTelegramId(telegramId);
|
||||
if (!user) {
|
||||
return { success: false, error: 'User not found' };
|
||||
}
|
||||
|
||||
const numericOrderId = parseInt(orderId, 10);
|
||||
if (isNaN(numericOrderId)) {
|
||||
return { success: false, error: 'Invalid order ID' };
|
||||
}
|
||||
|
||||
const order = await this.repo.getOrderById(numericOrderId);
|
||||
|
||||
if (!order) {
|
||||
return { success: false, error: 'Order not found' };
|
||||
}
|
||||
|
||||
if (order.user_id !== user.id) {
|
||||
return { success: false, error: 'Unauthorized' };
|
||||
}
|
||||
|
||||
if (order.status !== 'active') {
|
||||
return { success: false, error: 'Server is not active' };
|
||||
}
|
||||
|
||||
if (!order.provider_instance_id) {
|
||||
return { success: false, error: 'No provider instance ID' };
|
||||
}
|
||||
|
||||
// Get provider from pricing info
|
||||
const pricing = await this.repo.getPricingWithProvider(order.spec_id);
|
||||
if (!pricing) {
|
||||
return { success: false, error: 'Pricing info not found' };
|
||||
}
|
||||
|
||||
const provider = this.getProvider(pricing.provider_name.toLowerCase() as VPSProvider);
|
||||
if (!provider) {
|
||||
return { success: false, error: 'Provider not configured' };
|
||||
}
|
||||
|
||||
const deleteResult = await provider.deleteServer(order.provider_instance_id);
|
||||
|
||||
if (!deleteResult.success) {
|
||||
return { success: false, error: deleteResult.error };
|
||||
}
|
||||
|
||||
await this.repo.updateOrderStatus(numericOrderId, 'terminated');
|
||||
|
||||
return { success: true };
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user