/**
 * Message Handler - core module for Telegram message processing.
 *
 * Flow: receive message -> register/verify user -> agent routing -> AI response -> feedback collection
 */
import { sendMessage, sendMessageWithKeyboard, sendChatAction } from '../../telegram';
import { checkRateLimit } from '../../security';
import { registerAgent, routeToActiveAgent, type RegisterableAgent } from '../../agents/agent-registry';
import { OnboardingAgent } from '../../agents/onboarding-agent';
import { TroubleshootAgent } from '../../agents/troubleshoot-agent';
import { AssetAgent } from '../../agents/asset-agent';
import { BillingAgent } from '../../agents/billing-agent';
import { getMessage } from '../../i18n';
import { selectToolsForMessage, executeTool } from '../../tools';
import { createLogger } from '../../utils/logger';
import { getOpenAIUrl } from '../../utils/api-urls';
import { AI_CONFIG } from '../../constants/agent-config';
import { escalateToAdmin, shouldEscalate } from '../../services/human-handoff';
import type { Env, TelegramUpdate, User, OpenAIAPIResponse, OpenAIMessage } from '../../types';

const logger = createLogger('message-handler');

// Register agent singletons with priorities (lower = checked first)
const onboardingAgent = new OnboardingAgent();
const troubleshootAgent = new TroubleshootAgent();
const assetAgent = new AssetAgent();
const billingAgent = new BillingAgent();

registerAgent('onboarding', onboardingAgent, 10);
registerAgent('troubleshoot', troubleshootAgent, 20);
registerAgent('asset', assetAgent, 30);
registerAgent('billing', billingAgent, 40);

/** Classified intent -> agent that should start a consultation, plus its session type. */
const INTENT_AGENTS: Record<string, { agent: RegisterableAgent; type: string }> = {
  onboarding: { agent: onboardingAgent, type: 'onboarding' },
  troubleshoot: { agent: troubleshootAgent, type: 'troubleshoot' },
  billing: { agent: billingAgent, type: 'billing' },
  asset: { agent: assetAgent, type: 'asset' },
};

/** Control markers an agent may embed in its reply to signal that the session is over. */
const SESSION_END_MARKERS = ['[세션 종료]', '__SESSION_END__'] as const;

/** Control marker an agent embeds in its reply to request a hand-off to a human admin. */
const ESCALATE_MARKER = '__ESCALATE__';

/** System prompt for the intent classifier (single-space joined; the model must answer with one word). */
const INTENT_SYSTEM_PROMPT = [
  '사용자 메시지의 의도를 분류하세요.',
  '반드시 아래 중 하나만 응답하세요:',
  '- troubleshoot: 기술 문제, 접속 불가, 오류, 장애, 느림, 차단, 네트워크 문제, 도메인/서버/서비스 문제 해결',
  '- onboarding: 신규 가입, 서비스 소개, 요금/플랜 문의, 처음 이용',
  '- billing: 입금, 충전, 잔액, 결제, 환불, 요금 관련',
  '- asset: 자산 현황, 내 서버/도메인 목록, 보유 서비스 조회',
  '- general: 위 어느 것에도 해당하지 않는 일반 질문이나 인사',
  '한 단어만 응답하세요.',
].join(' ');

/** System prompt for the general (non-agent) OpenAI conversation. */
const GENERAL_SYSTEM_PROMPT = [
  '당신은 클라우드 호스팅/도메인/서버 관리 서비스의 AI 고객 지원 상담사입니다.',
  '한국어로 친절하고 전문적으로 답변하세요.',
  '기술 용어는 쉽게 풀어 설명하세요.',
  '답변을 모르면 솔직히 모른다고 하고, 관리자 연결을 제안하세요.',
].join(' ');

/** System prompt for the Workers AI fallback model. */
const FALLBACK_SYSTEM_PROMPT =
  '당신은 클라우드 호스팅 서비스의 AI 고객 지원 상담사입니다. 한국어로 친절하게 답변하세요.';

/** Upsert that registers a user or refreshes profile fields and activity timestamps. */
const UPSERT_USER_SQL = [
  'INSERT INTO users (telegram_id, username, first_name, last_active_at)',
  'VALUES (?, ?, ?, CURRENT_TIMESTAMP)',
  'ON CONFLICT (telegram_id) DO UPDATE SET',
  'username = COALESCE(excluded.username, users.username),',
  'first_name = COALESCE(excluded.first_name, users.first_name),',
  'last_active_at = CURRENT_TIMESTAMP,',
  'updated_at = CURRENT_TIMESTAMP',
].join(' ');

/** Newest-first slice of a user's stored conversation rows. */
const RECENT_HISTORY_SQL = [
  'SELECT role, content FROM conversations',
  'WHERE user_id = ?',
  'ORDER BY created_at DESC LIMIT ?',
].join(' ');

/**
 * Handles one incoming Telegram update that carries a text message.
 *
 * Steps: upsert the user and reject blocked users, apply rate limiting, show a
 * typing indicator, then answer through (in order) an active agent session, an
 * intent-selected agent, or the general AI fallback. Every answered exchange is
 * stored, and a feedback prompt is sent when a multi-round agent ends its session.
 *
 * Updates without message text are ignored. Errors raised while producing the
 * answer are logged and reported to the user with a generic error message.
 *
 * @param env - Worker bindings (DB, KV, bot token, AI credentials).
 * @param update - The Telegram update to process.
 */
export async function handleMessage(env: Env, update: TelegramUpdate): Promise<void> {
  const message = update.message;
  if (!message?.text) return;

  const chatId = message.chat.id;
  const text = message.text;
  const telegramUserId = message.from.id.toString();
  const lang = message.from.language_code ?? 'ko';
  const requestId = crypto.randomUUID();

  // 1. Register/update the user, then check whether the user is blocked
  const user = await getOrCreateUser(
    env.DB,
    telegramUserId,
    message.from.first_name ?? '',
    message.from.username
  );
  if (!user) {
    await sendMessage(env.BOT_TOKEN, chatId, getMessage(lang, 'error_general'));
    return;
  }
  if (user.is_blocked) {
    await sendMessage(env.BOT_TOKEN, chatId, getMessage(lang, 'error_blocked'));
    return;
  }

  // 2. Rate limiting
  if (!(await checkRateLimit(env.RATE_LIMIT_KV, telegramUserId))) {
    await sendMessage(env.BOT_TOKEN, chatId, getMessage(lang, 'error_rate_limit'));
    return;
  }

  // 3. Send typing action (best effort: a failed indicator must not block the reply)
  await sendChatAction(env.BOT_TOKEN, chatId).catch(() => {});

  try {
    // 4. Route to active agent session first
    const meta = { chatId, messageId: message.message_id };
    const agentResult = await routeToActiveAgent(env.DB, telegramUserId, text, env, meta);

    if (agentResult) {
      const { cleanText, sessionEnded } = cleanSessionMarkers(agentResult.response);

      // Handle escalation marker: answer the user, then hand off to an admin
      if (agentResult.response.includes(ESCALATE_MARKER)) {
        const cleaned = stripMarker(cleanText, ESCALATE_MARKER).trim();
        await storeConversation(env.DB, user.id, text, cleaned, requestId);
        await sendMessage(env.BOT_TOKEN, chatId, cleaned);
        await escalateToAdmin(env, telegramUserId, text, 'agent');
        return;
      }

      await storeConversation(env.DB, user.id, text, cleanText, requestId);
      await sendMessage(env.BOT_TOKEN, chatId, cleanText);

      // Prompt feedback only after multi-round agents
      if (sessionEnded && isMultiRoundAgent(agentResult.agentName)) {
        await promptFeedback(env, chatId, lang, agentResult.agentName);
      }
      return;
    }

    // 5. AI-based intent classification -> agent routing
    const intent = await classifyIntent(env, text);
    logger.info('의도 분류 결과', { intent, telegramUserId });

    const target = intent ? INTENT_AGENTS[intent] : undefined;
    if (target) {
      const { agent, type: sessionType } = target;
      const response = await agent.processConsultation(env.DB, telegramUserId, text, env, meta);
      const { cleanText, sessionEnded } = cleanSessionMarkers(response);

      await storeConversation(env.DB, user.id, text, cleanText, requestId);
      await sendMessage(env.BOT_TOKEN, chatId, cleanText);

      if (sessionEnded && isMultiRoundAgent(sessionType)) {
        await promptFeedback(env, chatId, lang, sessionType);
      }
      return;
    }

    // 6. General AI fallback (intent = 'general' or classification failed)
    const aiResponse = await handleGeneralAI(env, user, text, telegramUserId);
    await storeConversation(env.DB, user.id, text, aiResponse, requestId);
    await sendMessage(env.BOT_TOKEN, chatId, aiResponse);

    // 7. Check frustration for potential escalation (currently only logged)
    if (shouldEscalate(text, 0)) {
      logger.warn('Frustration detected in general AI flow', { telegramUserId });
    }
  } catch (error) {
    logger.error('Message handling error', error as Error, { telegramUserId, requestId });
    await sendMessage(env.BOT_TOKEN, chatId, getMessage(lang, 'error_general'));
  }
}

/** Returns true for the agents whose finished sessions are followed by a feedback prompt. */
function isMultiRoundAgent(agentName: string): boolean {
  return agentName === 'troubleshoot' || agentName === 'onboarding';
}

/** Removes every occurrence of `marker` from `text`. */
function stripMarker(text: string, marker: string): string {
  return text.split(marker).join('');
}

/**
 * Detects and strips session-end control markers from an agent reply.
 *
 * All occurrences of each marker are removed so that a repeated marker can
 * never leak into the text shown to the user.
 *
 * @param text - Raw agent reply.
 * @returns The trimmed reply without markers, and whether any marker was present.
 */
function cleanSessionMarkers(text: string): { cleanText: string; sessionEnded: boolean } {
  let sessionEnded = false;
  let cleanText = text;

  for (const marker of SESSION_END_MARKERS) {
    if (cleanText.includes(marker)) {
      sessionEnded = true;
      cleanText = stripMarker(cleanText, marker);
    }
  }

  return { cleanText: cleanText.trim(), sessionEnded };
}

/**
 * Inserts the user or refreshes the stored profile, then returns the stored row.
 *
 * On conflict the existing username / first name is kept when the incoming
 * value is NULL (COALESCE), and the activity timestamps are refreshed.
 *
 * @param db - D1 database binding.
 * @param telegramId - Telegram user id as a string.
 * @param firstName - First name reported by Telegram.
 * @param username - Optional Telegram username; bound as NULL when absent.
 * @returns The user row, or `null` when the row is missing or a DB error occurred
 *          (the error is logged, not thrown).
 */
async function getOrCreateUser(
  db: D1Database,
  telegramId: string,
  firstName: string,
  username?: string
): Promise<(User & { id: number }) | null> {
  try {
    await db
      .prepare(UPSERT_USER_SQL)
      .bind(telegramId, username ?? null, firstName)
      .run();

    const user = await db
      .prepare('SELECT * FROM users WHERE telegram_id = ?')
      .bind(telegramId)
      .first<User & { id: number }>();

    return user ?? null;
  } catch (error) {
    logger.error('User upsert failed', error as Error, { telegramId });
    return null;
  }
}

const VALID_INTENTS = ['troubleshoot', 'onboarding', 'billing', 'asset', 'general'] as const;

/**
 * Sends a JSON chat-completion request to the configured OpenAI endpoint.
 *
 * @param env - Worker bindings providing the endpoint and API key.
 * @param body - Request payload; serialized with `JSON.stringify`.
 * @param signal - Abort signal that bounds the request.
 */
function postChatCompletion(
  env: Env,
  body: Record<string, unknown>,
  signal: AbortSignal
): Promise<Response> {
  return fetch(getOpenAIUrl(env), {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${env.OPENAI_API_KEY}`,
    },
    signal,
    body: JSON.stringify(body),
  });
}

/**
 * Asks the model to classify the user's message into one of `VALID_INTENTS`.
 *
 * @param env - Worker bindings.
 * @param text - The user's message.
 * @returns The agent intent name, or `null` when no API key is configured, the
 *          intent is `general`, the answer is not a known intent, or the request
 *          fails or exceeds the 5 second timeout.
 */
async function classifyIntent(env: Env, text: string): Promise<string | null> {
  if (!env.OPENAI_API_KEY) return null;

  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), 5000);

  try {
    const response = await postChatCompletion(
      env,
      {
        model: AI_CONFIG.model,
        messages: [
          { role: 'system', content: INTENT_SYSTEM_PROMPT },
          { role: 'user', content: text },
        ],
        max_tokens: 10,
        temperature: 0,
      },
      controller.signal
    );

    if (!response.ok) return null;

    const data = (await response.json()) as OpenAIAPIResponse;
    const raw = data.choices[0]?.message?.content?.trim().toLowerCase();

    if (raw && VALID_INTENTS.includes(raw as typeof VALID_INTENTS[number])) {
      // 'general' means "no specialised agent": the caller falls through to the general AI
      return raw === 'general' ? null : raw;
    }
    return null;
  } catch (error) {
    logger.error('Intent classification failed', error as Error);
    return null;
  } finally {
    clearTimeout(timeoutId);
  }
}

/**
 * Answers a message with the general-purpose OpenAI assistant.
 *
 * Loads the user's recent conversation rows as context, optionally offers tools
 * selected for the message, executes a single round of tool calls, and asks the
 * model to synthesize the final answer. Falls back to Workers AI when no API key
 * is configured, the first request is not OK, or any step throws (including the
 * shared 25 second timeout).
 *
 * @param env - Worker bindings.
 * @param user - The stored user (its id and context limit select the history).
 * @param userMessage - The user's message.
 * @param telegramUserId - Telegram user id, forwarded to tool execution.
 * @returns The assistant's reply text.
 */
async function handleGeneralAI(
  env: Env,
  user: User,
  userMessage: string,
  telegramUserId: string
): Promise<string> {
  if (!env.OPENAI_API_KEY) {
    return await handleWorkersAIFallback(env, userMessage);
  }

  try {
    // Load recent conversation context (fetched newest-first, replayed oldest-first)
    const history = await env.DB
      .prepare(RECENT_HISTORY_SQL)
      .bind(user.id, user.context_limit)
      .all<{ role: string; content: string }>();
    const conversationHistory = history.results.reverse();

    const tools = selectToolsForMessage(userMessage);

    const messages: OpenAIMessage[] = [
      { role: 'system', content: GENERAL_SYSTEM_PROMPT },
      ...conversationHistory.map((m) => ({
        role: m.role as 'user' | 'assistant',
        content: m.content,
      })),
      { role: 'user', content: userMessage },
    ];

    const body: Record<string, unknown> = {
      model: AI_CONFIG.model,
      messages,
      max_tokens: 1024,
      temperature: AI_CONFIG.defaultTemperature,
    };
    if (tools.length > 0) {
      body.tools = tools;
      body.tool_choice = 'auto';
    }

    // One timer bounds the whole exchange, including the follow-up request below
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), 25000);

    try {
      const response = await postChatCompletion(env, body, controller.signal);

      if (!response.ok) {
        logger.error('OpenAI API error', new Error(`HTTP ${response.status}`));
        return await handleWorkersAIFallback(env, userMessage);
      }

      const data = (await response.json()) as OpenAIAPIResponse;
      const assistantMessage = data.choices[0].message;

      // Handle tool calls (single round for general AI)
      if (assistantMessage.tool_calls && assistantMessage.tool_calls.length > 0) {
        // Add assistant message with tool calls
        messages.push({
          role: 'assistant',
          content: assistantMessage.content,
          tool_calls: assistantMessage.tool_calls,
        });

        for (const tc of assistantMessage.tool_calls) {
          let args: Record<string, unknown> | undefined;
          try {
            args = JSON.parse(tc.function.arguments) as Record<string, unknown>;
          } catch {
            logger.warn('Tool call arguments are not valid JSON', { tool: tc.function.name });
          }

          // Every tool_call_id must receive a tool message, otherwise the follow-up
          // request is rejected; unparsable arguments are answered with an error.
          const result =
            args === undefined
              ? JSON.stringify({ error: 'Invalid tool arguments' })
              : await executeTool(tc.function.name, args, env, telegramUserId, env.DB);

          messages.push({
            role: 'tool',
            content: result,
            tool_call_id: tc.id,
            name: tc.function.name,
          });
        }

        // Second call to synthesize tool results
        const followUp = await postChatCompletion(
          env,
          {
            model: AI_CONFIG.model,
            messages,
            max_tokens: 1024,
            temperature: AI_CONFIG.defaultTemperature,
          },
          controller.signal
        );

        if (followUp.ok) {
          const followUpData = (await followUp.json()) as OpenAIAPIResponse;
          return followUpData.choices[0].message.content ?? getMessage('ko', 'error_ai_unavailable');
        }

        // Fallback: return AI text
        return assistantMessage.content ?? getMessage('ko', 'error_ai_unavailable');
      }

      return assistantMessage.content ?? getMessage('ko', 'error_ai_unavailable');
    } finally {
      clearTimeout(timeoutId);
    }
  } catch (error) {
    logger.error('General AI error', error as Error);
    return await handleWorkersAIFallback(env, userMessage);
  }
}

/**
 * Answers a message with the Workers AI model when OpenAI cannot be used.
 *
 * @param env - Worker bindings (uses `env.AI`).
 * @param userMessage - The user's message.
 * @returns The model's `response` text, or the localized "AI unavailable"
 *          message when the result has no `response` field or the call throws.
 */
async function handleWorkersAIFallback(env: Env, userMessage: string): Promise<string> {
  try {
    const result = await env.AI.run('@cf/meta/llama-3.1-8b-instruct-fp8', {
      messages: [
        { role: 'system', content: FALLBACK_SYSTEM_PROMPT },
        { role: 'user', content: userMessage },
      ],
    });

    if (result && typeof result === 'object' && 'response' in result) {
      return (result as { response: string }).response;
    }
    return getMessage('ko', 'error_ai_unavailable');
  } catch (error) {
    logger.error('Workers AI fallback error', error as Error);
    return getMessage('ko', 'error_ai_unavailable');
  }
}

/**
 * Stores one user/assistant exchange as two conversation rows in a single batch.
 *
 * Storage failures are logged and swallowed so that they never prevent the
 * reply from being delivered.
 *
 * @param db - D1 database binding.
 * @param userId - Internal user id.
 * @param userMessage - What the user wrote.
 * @param assistantResponse - What was answered.
 * @param requestId - Correlation id shared by both rows.
 */
async function storeConversation(
  db: D1Database,
  userId: number,
  userMessage: string,
  assistantResponse: string,
  requestId: string
): Promise<void> {
  try {
    await db.batch([
      db
        .prepare(
          `INSERT INTO conversations (user_id, role, content, request_id) VALUES (?, 'user', ?, ?)`
        )
        .bind(userId, userMessage, requestId),
      db
        .prepare(
          `INSERT INTO conversations (user_id, role, content, request_id) VALUES (?, 'assistant', ?, ?)`
        )
        .bind(userId, assistantResponse, requestId),
    ]);
  } catch (error) {
    logger.error('Conversation storage failed', error as Error, { userId, requestId });
  }
}

/**
 * Sends a 1-5 star inline keyboard asking the user to rate the finished session.
 *
 * Each button carries `fb:<sessionType>:<rating>` as callback data. Failures are
 * logged and swallowed.
 *
 * @param env - Worker bindings.
 * @param chatId - Chat to send the prompt to.
 * @param lang - Language code used for the prompt text.
 * @param sessionType - Agent/session type embedded in the callback data.
 */
async function promptFeedback(
  env: Env,
  chatId: number,
  lang: string,
  sessionType: string
): Promise<void> {
  try {
    const text = getMessage(lang, 'feedback_prompt');
    const keyboard = [
      [1, 2, 3, 4, 5].map((rating) => ({
        text: '⭐'.repeat(rating),
        callback_data: `fb:${sessionType}:${rating}`,
      })),
    ];
    await sendMessageWithKeyboard(env.BOT_TOKEN, chatId, text, keyboard);
  } catch (error) {
    logger.error('Failed to send feedback prompt', error as Error);
  }
}