Initial implementation of Telegram AI customer support bot

Cloudflare Workers + Hono + D1 + KV + R2 stack with 4 specialized AI agents
(onboarding, troubleshoot, asset, billing), OpenAI function calling with
7 tool definitions, human escalation, pending action approval workflow,
feedback collection, audit logging, i18n (ko/en), and Workers AI fallback.

43 source files, 45 tests passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
kappa
2026-02-11 13:21:38 +09:00
commit 1d6b64c9e4
58 changed files with 12857 additions and 0 deletions

174
src/routes/api.ts Normal file
View File

@@ -0,0 +1,174 @@
import { Hono } from 'hono';
import type { Env, User, Transaction } from '../types';
import { timingSafeEqual } from '../security';
import { getPendingActions } from '../services/pending-actions';
import { sendMessage } from '../telegram';
import { createLogger } from '../utils/logger';
const logger = createLogger('api');
const api = new Hono<{ Bindings: Env }>();
// Admin API auth middleware
api.use('*', async (c, next) => {
// Support both Bearer token and query param auth
const authHeader = c.req.header('Authorization');
const queryToken = c.req.query('token');
let token: string | undefined;
if (authHeader?.startsWith('Bearer ')) {
token = authHeader.slice(7);
} else if (queryToken) {
token = queryToken;
}
if (!token || !timingSafeEqual(token, c.env.WEBHOOK_SECRET)) {
return c.json({ error: 'Unauthorized' }, 401);
}
return next();
});
// GET /api/stats - Service statistics
api.get('/stats', async (c) => {
try {
const db = c.env.DB;
const [users, txPending, servers, feedback] = await Promise.all([
db.prepare('SELECT COUNT(*) as count FROM users').first<{ count: number }>(),
db.prepare("SELECT COUNT(*) as count FROM transactions WHERE status = 'pending'").first<{ count: number }>(),
db.prepare("SELECT COUNT(*) as count FROM servers WHERE status != 'terminated'").first<{ count: number }>(),
db.prepare('SELECT AVG(rating) as avg, COUNT(*) as count FROM feedback').first<{ avg: number | null; count: number }>(),
]);
return c.json({
users: users?.count ?? 0,
pendingTransactions: txPending?.count ?? 0,
activeServers: servers?.count ?? 0,
feedback: {
avgRating: feedback?.avg ? Number(feedback.avg.toFixed(2)) : 0,
count: feedback?.count ?? 0,
},
});
} catch (error) {
logger.error('Stats query failed', error as Error);
return c.json({ error: 'Internal error' }, 500);
}
});
// GET /api/users - List users (paginated)
api.get('/users', async (c) => {
try {
const limit = Math.min(parseInt(c.req.query('limit') ?? '20'), 100);
const offset = parseInt(c.req.query('offset') ?? '0');
const result = await c.env.DB.prepare(
`SELECT id, telegram_id, username, first_name, role, is_blocked, last_active_at, created_at
FROM users ORDER BY created_at DESC LIMIT ? OFFSET ?`
)
.bind(limit, offset)
.all<Pick<User, 'id' | 'telegram_id' | 'username' | 'first_name' | 'role' | 'is_blocked' | 'last_active_at' | 'created_at'>>();
return c.json({ users: result.results, count: result.results.length });
} catch (error) {
logger.error('Users query failed', error as Error);
return c.json({ error: 'Internal error' }, 500);
}
});
// GET /api/transactions/pending - Pending transactions
api.get('/transactions/pending', async (c) => {
try {
const result = await c.env.DB.prepare(
`SELECT t.id, t.user_id, t.amount, t.depositor_name, t.status, t.created_at,
u.username, u.telegram_id
FROM transactions t
JOIN users u ON t.user_id = u.id
WHERE t.status = 'pending' AND t.type = 'deposit'
ORDER BY t.created_at DESC LIMIT 50`
)
.all<Pick<Transaction, 'id' | 'user_id' | 'amount' | 'depositor_name' | 'status' | 'created_at'> & {
username: string | null; telegram_id: string;
}>();
return c.json({ transactions: result.results });
} catch (error) {
logger.error('Pending transactions query failed', error as Error);
return c.json({ error: 'Internal error' }, 500);
}
});
// GET /api/audit-logs - Audit logs
api.get('/audit-logs', async (c) => {
try {
const limit = Math.min(parseInt(c.req.query('limit') ?? '50'), 200);
const result = await c.env.DB.prepare(
`SELECT id, actor_id, action, resource_type, resource_id, result, created_at
FROM audit_logs ORDER BY created_at DESC LIMIT ?`
)
.bind(limit)
.all();
return c.json({ logs: result.results });
} catch (error) {
logger.error('Audit logs query failed', error as Error);
return c.json({ error: 'Internal error' }, 500);
}
});
// GET /api/pending-actions - Pending actions list
api.get('/pending-actions', async (c) => {
try {
const status = c.req.query('status');
const validStatuses = ['pending', 'approved', 'rejected', 'executed', 'failed'] as const;
const filterStatus = status && validStatuses.includes(status as typeof validStatuses[number])
? status as typeof validStatuses[number]
: undefined;
const actions = await getPendingActions(c.env.DB, filterStatus);
return c.json({ actions, count: actions.length });
} catch (error) {
logger.error('Pending actions query failed', error as Error);
return c.json({ error: 'Internal error' }, 500);
}
});
// POST /api/broadcast - Send message to all users
api.post('/broadcast', async (c) => {
try {
const body = await c.req.json<{ message?: string }>();
if (!body.message || body.message.trim().length === 0) {
return c.json({ error: 'Message is required' }, 400);
}
if (body.message.length > 4000) {
return c.json({ error: 'Message too long (max 4000 chars)' }, 400);
}
const users = await c.env.DB
.prepare('SELECT telegram_id FROM users WHERE is_blocked = 0')
.all<{ telegram_id: string }>();
const telegramIds = users.results ?? [];
let sent = 0;
let failed = 0;
for (const user of telegramIds) {
try {
await sendMessage(c.env.BOT_TOKEN, parseInt(user.telegram_id), body.message);
sent++;
} catch {
failed++;
}
}
logger.info('Broadcast completed', { sent, failed, total: telegramIds.length });
return c.json({ sent, failed, total: telegramIds.length });
} catch (error) {
logger.error('Broadcast failed', error as Error);
return c.json({ error: 'Internal error' }, 500);
}
});
export { api as apiRouter };

View File

@@ -0,0 +1,242 @@
import {
answerCallbackQuery,
editMessageText,
sendMessage,
} from '../../telegram';
import { approvePendingAction, rejectPendingAction } from '../../services/pending-actions';
import { createFeedback } from '../../services/feedback';
import { createAuditLog } from '../../services/audit';
import { isAdmin } from '../../security';
import { createLogger } from '../../utils/logger';
import type { Env, CallbackQuery } from '../../types';
const logger = createLogger('callback-handler');
export async function handleCallbackQuery(
env: Env,
callbackQuery: CallbackQuery
): Promise<void> {
const { id: queryId, from, message, data } = callbackQuery;
if (!data || !message) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '잘못된 요청입니다.' });
return;
}
const chatId = message.chat.id;
const messageId = message.message_id;
const telegramUserId = from.id.toString();
try {
// Feedback: fb:{session_type}:{rating}
if (data.startsWith('fb:')) {
await handleFeedback(env, queryId, chatId, messageId, telegramUserId, data);
return;
}
// Action approval: act:{action_id}:{approve|reject}
if (data.startsWith('act:')) {
await handleActionApproval(env, queryId, chatId, messageId, telegramUserId, data);
return;
}
// Escalation: esc:{session_id}:{accept|reject}
if (data.startsWith('esc:')) {
await handleEscalation(env, queryId, chatId, messageId, telegramUserId, data);
return;
}
await answerCallbackQuery(env.BOT_TOKEN, queryId);
} catch (error) {
logger.error('Callback handling error', error as Error, { data, telegramUserId });
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '처리 중 오류가 발생했습니다.' });
}
}
async function handleFeedback(
env: Env,
queryId: string,
chatId: number,
messageId: number,
telegramUserId: string,
data: string
): Promise<void> {
const parts = data.split(':');
if (parts.length !== 3) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '잘못된 데이터입니다.' });
return;
}
const sessionType = parts[1];
const rating = parseInt(parts[2], 10);
if (isNaN(rating) || rating < 1 || rating > 5) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '잘못된 평점입니다.' });
return;
}
const user = await env.DB
.prepare('SELECT id FROM users WHERE telegram_id = ?')
.bind(telegramUserId)
.first<{ id: number }>();
if (!user) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '사용자를 찾을 수 없습니다.' });
return;
}
await createFeedback(env.DB, {
userId: user.id,
sessionType,
rating,
});
const stars = '⭐'.repeat(rating);
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '피드백 감사합니다!' });
await editMessageText(env.BOT_TOKEN, chatId, messageId, `피드백이 등록되었습니다. ${stars}\n감사합니다!`);
}
async function handleActionApproval(
env: Env,
queryId: string,
chatId: number,
messageId: number,
telegramUserId: string,
data: string
): Promise<void> {
// Only admins can approve/reject actions
if (!isAdmin(telegramUserId, env.ADMIN_TELEGRAM_IDS)) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '관리자만 사용할 수 있습니다.' });
return;
}
const parts = data.split(':');
if (parts.length !== 3) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '잘못된 데이터입니다.' });
return;
}
const actionId = parseInt(parts[1], 10);
const approve = parts[2] === 'approve';
if (isNaN(actionId)) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '잘못된 작업 ID입니다.' });
return;
}
const admin = await env.DB
.prepare('SELECT id FROM users WHERE telegram_id = ?')
.bind(telegramUserId)
.first<{ id: number }>();
if (!admin) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '관리자 정보를 찾을 수 없습니다.' });
return;
}
if (approve) {
const result = await approvePendingAction(env.DB, actionId, admin.id);
if (!result) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '이미 처리된 요청입니다.' });
return;
}
await createAuditLog(env.DB, {
actorId: admin.id,
action: 'approve_action',
resourceType: 'pending_action',
resourceId: String(actionId),
result: 'success',
});
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '승인되었습니다.' });
await editMessageText(
env.BOT_TOKEN, chatId, messageId,
`✅ 작업 #${actionId} 승인 완료\n유형: ${result.action_type}\n대상: ${result.target}`
);
// Notify user about the approval
if (result.user_id) {
const actionUser = await env.DB
.prepare('SELECT telegram_id FROM users WHERE id = ?')
.bind(result.user_id)
.first<{ telegram_id: string }>();
if (actionUser) {
await sendMessage(env.BOT_TOKEN, parseInt(actionUser.telegram_id), '요청이 승인되어 실행되었습니다.').catch(() => {});
}
}
} else {
const result = await rejectPendingAction(env.DB, actionId, admin.id);
if (!result) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '이미 처리된 요청입니다.' });
return;
}
await createAuditLog(env.DB, {
actorId: admin.id,
action: 'reject_action',
resourceType: 'pending_action',
resourceId: String(actionId),
result: 'success',
});
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '거부되었습니다.' });
await editMessageText(
env.BOT_TOKEN, chatId, messageId,
`❌ 작업 #${actionId} 거부\n유형: ${result.action_type}\n대상: ${result.target}`
);
// Notify user about the rejection
if (result.user_id) {
const actionUser = await env.DB
.prepare('SELECT telegram_id FROM users WHERE id = ?')
.bind(result.user_id)
.first<{ telegram_id: string }>();
if (actionUser) {
await sendMessage(env.BOT_TOKEN, parseInt(actionUser.telegram_id), '요청이 거부되었습니다.').catch(() => {});
}
}
}
}
async function handleEscalation(
env: Env,
queryId: string,
chatId: number,
messageId: number,
telegramUserId: string,
data: string
): Promise<void> {
if (!isAdmin(telegramUserId, env.ADMIN_TELEGRAM_IDS)) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '관리자만 사용할 수 있습니다.' });
return;
}
const parts = data.split(':');
if (parts.length !== 3) {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '잘못된 데이터입니다.' });
return;
}
const sessionId = parts[1];
const action = parts[2];
if (action === 'accept') {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '에스컬레이션 수락' });
await editMessageText(
env.BOT_TOKEN, chatId, messageId,
`✅ 에스컬레이션 수락됨\n세션: ${sessionId}\n담당: 관리자`
);
// Notify the user their escalation was accepted
await sendMessage(
env.BOT_TOKEN,
parseInt(sessionId),
'관리자가 문의를 확인했습니다. 잠시만 기다려주세요.'
).catch(() => {});
} else {
await answerCallbackQuery(env.BOT_TOKEN, queryId, { text: '에스컬레이션 거부' });
await editMessageText(
env.BOT_TOKEN, chatId, messageId,
`❌ 에스컬레이션 거부됨\n세션: ${sessionId}`
);
}
}

View File

@@ -0,0 +1,371 @@
/**
* Message Handler - 텔레그램 메시지 처리의 핵심 모듈
*
* 메시지 수신 -> 사용자 등록/확인 -> 에이전트 라우팅 -> AI 응답 -> 피드백 수집
*/
import { sendMessage, sendMessageWithKeyboard, sendChatAction } from '../../telegram';
import { checkRateLimit } from '../../security';
import { registerAgent, routeToActiveAgent } from '../../agents/agent-registry';
import { OnboardingAgent } from '../../agents/onboarding-agent';
import { TroubleshootAgent } from '../../agents/troubleshoot-agent';
import { AssetAgent } from '../../agents/asset-agent';
import { BillingAgent } from '../../agents/billing-agent';
import { getMessage } from '../../i18n';
import {
ONBOARDING_PATTERNS,
TROUBLESHOOT_PATTERNS,
ASSET_PATTERNS,
BILLING_PATTERNS,
} from '../../utils/patterns';
import { selectToolsForMessage, executeTool } from '../../tools';
import { createLogger } from '../../utils/logger';
import { getOpenAIUrl } from '../../utils/api-urls';
import { AI_CONFIG } from '../../constants/agent-config';
import { escalateToAdmin, shouldEscalate } from '../../services/human-handoff';
import type { Env, TelegramUpdate, User, OpenAIAPIResponse, OpenAIMessage } from '../../types';
const logger = createLogger('message-handler');
// Register agent singletons with priorities (lower = checked first)
const onboardingAgent = new OnboardingAgent();
const troubleshootAgent = new TroubleshootAgent();
const assetAgent = new AssetAgent();
const billingAgent = new BillingAgent();
registerAgent('onboarding', onboardingAgent, 10);
registerAgent('troubleshoot', troubleshootAgent, 20);
registerAgent('asset', assetAgent, 30);
registerAgent('billing', billingAgent, 40);
export async function handleMessage(
env: Env,
update: TelegramUpdate
): Promise<void> {
if (!update.message?.text) return;
const message = update.message;
const chatId = message.chat.id;
const text = message.text!;
const telegramUserId = message.from.id.toString();
const lang = message.from.language_code ?? 'ko';
const requestId = crypto.randomUUID();
// 1. Check if user is blocked & register/update user
const user = await getOrCreateUser(env.DB, telegramUserId, message.from.first_name ?? '', message.from.username);
if (!user) {
await sendMessage(env.BOT_TOKEN, chatId, getMessage(lang, 'error_general'));
return;
}
if (user.is_blocked) {
await sendMessage(env.BOT_TOKEN, chatId, getMessage(lang, 'error_blocked'));
return;
}
// 2. Rate limiting
if (!(await checkRateLimit(env.RATE_LIMIT_KV, telegramUserId))) {
await sendMessage(env.BOT_TOKEN, chatId, getMessage(lang, 'error_rate_limit'));
return;
}
// 3. Send typing action
await sendChatAction(env.BOT_TOKEN, chatId).catch(() => {});
try {
// 4. Route to active agent session first
const agentResponse = await routeToActiveAgent(env.DB, telegramUserId, text, env);
if (agentResponse) {
const { cleanText, sessionEnded } = cleanSessionMarkers(agentResponse);
// Handle escalation marker
if (agentResponse.includes('__ESCALATE__')) {
const cleaned = cleanText.replace('__ESCALATE__', '').trim();
await storeConversation(env.DB, user.id, text, cleaned, requestId);
await sendMessage(env.BOT_TOKEN, chatId, cleaned);
await escalateToAdmin(env, telegramUserId, text, 'agent');
return;
}
await storeConversation(env.DB, user.id, text, cleanText, requestId);
await sendMessage(env.BOT_TOKEN, chatId, cleanText);
// Prompt feedback after session end
if (sessionEnded) {
await promptFeedback(env, chatId, lang, 'agent');
}
return;
}
// 5. Detect intent for new session creation
let response: string | null = null;
if (ONBOARDING_PATTERNS.test(text)) {
response = await onboardingAgent.processConsultation(env.DB, telegramUserId, text, env);
} else if (TROUBLESHOOT_PATTERNS.test(text)) {
response = await troubleshootAgent.processConsultation(env.DB, telegramUserId, text, env);
} else if (BILLING_PATTERNS.test(text)) {
response = await billingAgent.processConsultation(env.DB, telegramUserId, text, env);
} else if (ASSET_PATTERNS.test(text)) {
response = await assetAgent.processConsultation(env.DB, telegramUserId, text, env);
}
if (response) {
const { cleanText, sessionEnded } = cleanSessionMarkers(response);
await storeConversation(env.DB, user.id, text, cleanText, requestId);
await sendMessage(env.BOT_TOKEN, chatId, cleanText);
if (sessionEnded) {
await promptFeedback(env, chatId, lang, 'agent');
}
return;
}
// 6. General AI fallback (no matching agent)
const aiResponse = await handleGeneralAI(env, user, text, telegramUserId);
await storeConversation(env.DB, user.id, text, aiResponse, requestId);
await sendMessage(env.BOT_TOKEN, chatId, aiResponse);
// 7. Check frustration for potential escalation
if (shouldEscalate(text, 0)) {
logger.warn('Frustration detected in general AI flow', { telegramUserId });
}
} catch (error) {
logger.error('Message handling error', error as Error, { telegramUserId, requestId });
await sendMessage(env.BOT_TOKEN, chatId, getMessage(lang, 'error_general'));
}
}
function cleanSessionMarkers(text: string): { cleanText: string; sessionEnded: boolean } {
const sessionEnded = text.includes('[세션 종료]') || text.includes('__SESSION_END__');
const cleanText = text
.replace('[세션 종료]', '')
.replace('__SESSION_END__', '')
.trim();
return { cleanText, sessionEnded };
}
async function getOrCreateUser(
db: D1Database,
telegramId: string,
firstName: string,
username?: string
): Promise<(User & { id: number }) | null> {
try {
await db
.prepare(
`INSERT INTO users (telegram_id, username, first_name, last_active_at)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT (telegram_id) DO UPDATE SET
username = COALESCE(excluded.username, users.username),
first_name = COALESCE(excluded.first_name, users.first_name),
last_active_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP`
)
.bind(telegramId, username ?? null, firstName)
.run();
const user = await db
.prepare('SELECT * FROM users WHERE telegram_id = ?')
.bind(telegramId)
.first<User>();
return user ?? null;
} catch (error) {
logger.error('User upsert failed', error as Error, { telegramId });
return null;
}
}
async function handleGeneralAI(
env: Env,
user: User,
userMessage: string,
telegramUserId: string
): Promise<string> {
if (!env.OPENAI_API_KEY) {
return await handleWorkersAIFallback(env, userMessage);
}
try {
// Load recent conversation context
const history = await env.DB
.prepare(
`SELECT role, content FROM conversations
WHERE user_id = ? ORDER BY created_at DESC LIMIT ?`
)
.bind(user.id, user.context_limit)
.all<{ role: string; content: string }>();
const conversationHistory = history.results.reverse();
const tools = selectToolsForMessage(userMessage);
const messages: OpenAIMessage[] = [
{
role: 'system',
content: `당신은 클라우드 호스팅/도메인/서버 관리 서비스의 AI 고객 지원 상담사입니다.
한국어로 친절하고 전문적으로 답변하세요.
기술 용어는 쉽게 풀어 설명하세요.
답변을 모르면 솔직히 모른다고 하고, 관리자 연결을 제안하세요.`,
},
...conversationHistory.map((m) => ({
role: m.role as 'user' | 'assistant',
content: m.content,
})),
{ role: 'user', content: userMessage },
];
const body: Record<string, unknown> = {
model: AI_CONFIG.model,
messages,
max_tokens: 1024,
temperature: AI_CONFIG.defaultTemperature,
};
if (tools.length > 0) {
body.tools = tools;
body.tool_choice = 'auto';
}
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 25000);
try {
const response = await fetch(getOpenAIUrl(env), {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${env.OPENAI_API_KEY}`,
},
signal: controller.signal,
body: JSON.stringify(body),
});
if (!response.ok) {
logger.error('OpenAI API error', new Error(`HTTP ${response.status}`));
return await handleWorkersAIFallback(env, userMessage);
}
const data = (await response.json()) as OpenAIAPIResponse;
const assistantMessage = data.choices[0].message;
// Handle tool calls (single round for general AI)
if (assistantMessage.tool_calls && assistantMessage.tool_calls.length > 0) {
// Add assistant message with tool calls
messages.push({
role: 'assistant',
content: assistantMessage.content,
tool_calls: assistantMessage.tool_calls,
});
for (const tc of assistantMessage.tool_calls) {
let args: Record<string, unknown>;
try {
args = JSON.parse(tc.function.arguments) as Record<string, unknown>;
} catch {
continue;
}
const result = await executeTool(tc.function.name, args, env, telegramUserId, env.DB);
messages.push({
role: 'tool',
content: result,
tool_call_id: tc.id,
name: tc.function.name,
});
}
// Second call to synthesize tool results
const followUp = await fetch(getOpenAIUrl(env), {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: AI_CONFIG.model,
messages,
max_tokens: 1024,
temperature: AI_CONFIG.defaultTemperature,
}),
});
if (followUp.ok) {
const followUpData = (await followUp.json()) as OpenAIAPIResponse;
return followUpData.choices[0].message.content ?? getMessage('ko', 'error_ai_unavailable');
}
// Fallback: return AI text
return assistantMessage.content ?? getMessage('ko', 'error_ai_unavailable');
}
return assistantMessage.content ?? getMessage('ko', 'error_ai_unavailable');
} finally {
clearTimeout(timeoutId);
}
} catch (error) {
logger.error('General AI error', error as Error);
return await handleWorkersAIFallback(env, userMessage);
}
}
async function handleWorkersAIFallback(env: Env, userMessage: string): Promise<string> {
try {
const result = await env.AI.run('@cf/meta/llama-3.1-8b-instruct-fp8', {
messages: [
{
role: 'system',
content: '당신은 클라우드 호스팅 서비스의 AI 고객 지원 상담사입니다. 한국어로 친절하게 답변하세요.',
},
{ role: 'user', content: userMessage },
],
});
if (result && typeof result === 'object' && 'response' in result) {
return (result as { response: string }).response;
}
return getMessage('ko', 'error_ai_unavailable');
} catch (error) {
logger.error('Workers AI fallback error', error as Error);
return getMessage('ko', 'error_ai_unavailable');
}
}
async function storeConversation(
db: D1Database,
userId: number,
userMessage: string,
assistantResponse: string,
requestId: string
): Promise<void> {
try {
await db.batch([
db.prepare(
`INSERT INTO conversations (user_id, role, content, request_id) VALUES (?, 'user', ?, ?)`
).bind(userId, userMessage, requestId),
db.prepare(
`INSERT INTO conversations (user_id, role, content, request_id) VALUES (?, 'assistant', ?, ?)`
).bind(userId, assistantResponse, requestId),
]);
} catch (error) {
logger.error('Conversation storage failed', error as Error, { userId, requestId });
}
}
async function promptFeedback(
env: Env,
chatId: number,
lang: string,
sessionType: string
): Promise<void> {
try {
const text = getMessage(lang, 'feedback_prompt');
const keyboard = [
[1, 2, 3, 4, 5].map((rating) => ({
text: '⭐'.repeat(rating),
callback_data: `fb:${sessionType}:${rating}`,
})),
];
await sendMessageWithKeyboard(env.BOT_TOKEN, chatId, text, keyboard);
} catch (error) {
logger.error('Failed to send feedback prompt', error as Error);
}
}

13
src/routes/health.ts Normal file
View File

@@ -0,0 +1,13 @@
import { Hono } from 'hono';
const health = new Hono();
health.get('/', (c) => {
return c.json({
status: 'ok',
service: 'telegram-ai-support',
timestamp: new Date().toISOString(),
});
});
export { health as healthRouter };

96
src/routes/webhook.ts Normal file
View File

@@ -0,0 +1,96 @@
import { Hono } from 'hono';
import { createMiddleware } from 'hono/factory';
import type { Env, TelegramUpdate } from '../types';
import { timingSafeEqual } from '../security';
import { handleCallbackQuery } from './handlers/callback-handler';
import { handleMessage } from './handlers/message-handler';
import { createLogger } from '../utils/logger';
const logger = createLogger('webhook');
const webhook = new Hono<{ Bindings: Env }>();
// Telegram webhook authentication middleware
const telegramAuth = createMiddleware<{ Bindings: Env }>(async (c, next) => {
if (c.req.method !== 'POST') {
logger.warn('Invalid HTTP method', { method: c.req.method });
return c.text('Method not allowed', 405);
}
const contentType = c.req.header('Content-Type');
if (!contentType?.includes('application/json')) {
logger.warn('Invalid content type', { contentType });
return c.text('Invalid content type', 400);
}
const secretToken = c.req.header('X-Telegram-Bot-Api-Secret-Token');
if (!c.env.WEBHOOK_SECRET) {
logger.error('WEBHOOK_SECRET not configured', new Error('Missing WEBHOOK_SECRET'));
return c.text('Server configuration error', 500);
}
if (!timingSafeEqual(secretToken, c.env.WEBHOOK_SECRET)) {
logger.warn('Invalid webhook secret token');
return c.text('Unauthorized', 401);
}
const clientIP = c.req.header('CF-Connecting-IP');
if (clientIP) {
logger.debug('Request from IP', { clientIP });
}
return next();
});
webhook.post('/', telegramAuth, async (c) => {
let update: TelegramUpdate;
try {
update = await c.req.json<TelegramUpdate>();
} catch (error) {
logger.error('JSON parsing error', error as Error);
return c.json({ ok: true });
}
if (!update || typeof update.update_id !== 'number') {
logger.warn('Invalid update structure', { updateKeys: update ? Object.keys(update) : [] });
return c.json({ ok: true });
}
// Timestamp validation (5 minutes) - replay attack prevention
if (update.message?.date) {
const messageTime = update.message.date * 1000;
const now = Date.now();
const MAX_AGE_MS = 5 * 60 * 1000;
if (now - messageTime > MAX_AGE_MS) {
logger.warn('Message too old', { messageAge: Math.floor((now - messageTime) / 1000) });
return c.json({ ok: true });
}
}
try {
if (update.callback_query) {
await handleCallbackQuery(c.env, update.callback_query);
return c.json({ ok: true });
}
if (update.message) {
await handleMessage(c.env, update);
return c.json({ ok: true });
}
logger.debug('Unknown update type', { updateKeys: Object.keys(update) });
return c.json({ ok: true });
} catch (error) {
// Always return 200 to Telegram to prevent retries
logger.error('Webhook processing error', error as Error, {
updateId: update.update_id,
hasMessage: !!update.message,
hasCallback: !!update.callback_query,
});
return c.json({ ok: true });
}
});
export { webhook as webhookRouter };