Files
telegram-bot-workers/src/index.ts
kappa f304c6a7d4 refactor: apply new utilities and constants across codebase
P0 fixes:
- KV Cache migration: security.ts now delegates to kv-cache.ts (74% code reduction)
- Environment validation: index.ts validates env on first request
- Type safety: optimistic-lock.ts removes `as any` with proper interface

P1 improvements:
- Constants applied to deposit-agent.ts (TRANSACTION_STATUS, TRANSACTION_TYPE)
- Constants applied to callback-handler.ts (CALLBACK_PREFIXES)
- Constants applied to domain-tool.ts (MESSAGE_MARKERS)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 10:49:31 +09:00

472 lines
16 KiB
TypeScript

import { Env, EmailMessage, ProvisionMessage, MessageBatch } from './types';
import { sendMessage, setWebhook, getWebhookInfo } from './telegram';
import { webhookRouter } from './routes/webhook';
import { apiRouter } from './routes/api';
import { handleHealthCheck } from './routes/health';
import { parseBankSMS } from './services/bank-sms-parser';
import { matchPendingDeposit } from './services/deposit-matcher';
import { reconcileDeposits, formatReconciliationReport } from './utils/reconciliation';
import { handleProvisionQueue, handleProvisionDLQ } from './server-provision';
import { timingSafeEqual } from './security';
import { createLogger } from './utils/logger';
import { notifyAdmin } from './services/notification';
import { validateEnv } from './utils/env-validation';
import { Hono } from 'hono';
const logger = createLogger('worker');
// Environment validation flag (checked once per instance)
let envValidated = false;
// Hono app with Env type
const app = new Hono<{ Bindings: Env }>();
// Environment validation middleware (runs once per worker instance)
app.use('*', async (c, next) => {
if (!envValidated) {
// Cast to Record<string, unknown> for validation
const result = validateEnv(c.env as unknown as Record<string, unknown>);
if (!result.success) {
logger.error('Environment validation failed on startup', new Error('Invalid configuration'), {
errors: result.errors,
});
return c.json({
error: 'Configuration error',
message: 'The worker is not properly configured. Please check environment variables.',
details: result.errors,
}, 500);
}
// Log warnings but continue
if (result.warnings.length > 0) {
logger.warn('Environment configuration warnings', { warnings: result.warnings });
}
logger.info('Environment validation passed', {
environment: c.env.ENVIRONMENT || 'production',
warnings: result.warnings.length,
});
envValidated = true;
}
return await next();
});
// Health check (public - minimal info only)
app.get('/health', () => handleHealthCheck());
// Setup webhook (with auth)
app.get('/setup-webhook', async (c) => {
const env = c.env;
if (!env.BOT_TOKEN) {
return c.json({ error: 'BOT_TOKEN not configured' }, 500);
}
if (!env.WEBHOOK_SECRET) {
return c.json({ error: 'WEBHOOK_SECRET not configured' }, 500);
}
// 인증: token + secret 검증
const token = c.req.query('token');
const secret = c.req.query('secret');
if (!token || !timingSafeEqual(token, env.BOT_TOKEN)) {
return c.text('Unauthorized: Invalid or missing token', 401);
}
if (!secret || !timingSafeEqual(secret, env.WEBHOOK_SECRET)) {
return c.text('Unauthorized: Invalid or missing secret', 401);
}
const webhookUrl = `${new URL(c.req.url).origin}/webhook`;
const result = await setWebhook(env.BOT_TOKEN, webhookUrl, env.WEBHOOK_SECRET);
return c.json(result);
});
// Webhook info
app.get('/webhook-info', async (c) => {
const env = c.env;
if (!env.BOT_TOKEN) {
return c.json({ error: 'BOT_TOKEN not configured' }, 500);
}
if (!env.WEBHOOK_SECRET) {
return c.json({ error: 'WEBHOOK_SECRET not configured' }, 500);
}
// 인증: token + secret 검증
const token = c.req.query('token');
const secret = c.req.query('secret');
if (!token || !timingSafeEqual(token, env.BOT_TOKEN)) {
return c.text('Unauthorized: Invalid or missing token', 401);
}
if (!secret || !timingSafeEqual(secret, env.WEBHOOK_SECRET)) {
return c.text('Unauthorized: Invalid or missing secret', 401);
}
const result = await getWebhookInfo(env.BOT_TOKEN);
return c.json(result);
});
// API routes - use Hono router
app.route('/api', apiRouter);
// Telegram Webhook - use Hono router with middleware
app.route('/webhook', webhookRouter);
// Root path
app.get('/', (c) => {
return c.text(
`Telegram Rolling Summary Bot
Endpoints:
GET /health - Health check
GET /webhook-info - Webhook status
GET /setup-webhook - Configure webhook
POST /webhook - Telegram webhook (authenticated)
Documentation: https://github.com/your-repo`,
200
);
});
// 404 handler
app.notFound((c) => c.text('Not Found', 404));
export default {
// HTTP 요청 핸들러 - Hono handles HTTP
fetch: app.fetch,
// Email 핸들러 (SMS → 메일 → 파싱)
async email(message: EmailMessage, env: Env): Promise<void> {
const emailLogger = createLogger('email-handler');
try {
// 이메일 본문 읽기
const rawEmail = await new Response(message.raw).text();
// 이메일 주소 마스킹
const maskedFrom = message.from.replace(/@.+/, '@****');
emailLogger.info('이메일 수신', { from: maskedFrom, size: message.rawSize });
// SMS 내용 파싱
const notification = await parseBankSMS(rawEmail, env);
if (!notification) {
// Structured logging with context
emailLogger.warn('SMS 파싱 실패', {
from: maskedFrom,
subject: message.headers.get('subject') || 'N/A',
preview: rawEmail.substring(0, 200).replace(/\s+/g, ' '),
size: message.rawSize
});
// Admin notification for manual review
await notifyAdmin(
'api_error',
{
service: 'Email Handler',
error: 'SMS parsing failed',
context: `From: ${maskedFrom}\nSubject: ${message.headers.get('subject') || 'N/A'}\nPreview: ${rawEmail.substring(0, 150).replace(/\s+/g, ' ')}...`
},
{
telegram: {
sendMessage: (chatId: number, text: string) =>
sendMessage(env.BOT_TOKEN, chatId, text)
},
adminId: env.DEPOSIT_ADMIN_ID || '',
env
}
);
return; // Don't throw - email routing expects success
}
// 파싱 결과 마스킹 로깅
emailLogger.info('SMS 파싱 결과', {
bankName: notification.bankName,
depositorName: notification.depositorName
? notification.depositorName.slice(0, 2) + '***'
: 'unknown',
amount: notification.amount ? '****원' : 'unknown',
transactionTime: notification.transactionTime
? 'masked'
: 'not parsed',
matched: !!notification.transactionTime,
});
// DB에 저장
const insertResult = await env.DB.prepare(
`INSERT INTO bank_notifications (bank_name, depositor_name, depositor_name_prefix, amount, balance_after, transaction_time, raw_message)
VALUES (?, ?, ?, ?, ?, ?, ?)`
).bind(
notification.bankName,
notification.depositorName,
notification.depositorName.slice(0, 7),
notification.amount,
notification.balanceAfter || null,
notification.transactionTime?.toISOString() || null,
notification.rawMessage
).run();
const notificationId = insertResult.meta.last_row_id;
emailLogger.info('알림 저장 완료', { notificationId });
// 자동 매칭 시도
const matched = await matchPendingDeposit(env.DB, notificationId, notification);
// 매칭 결과 로깅 (민감 정보 마스킹)
if (matched) {
emailLogger.info('자동 매칭 성공', { transactionId: matched.transactionId });
} else {
emailLogger.info('매칭되는 거래 없음');
}
// 매칭 성공 시 사용자에게 알림
if (matched && env.BOT_TOKEN) {
// 병렬화: JOIN으로 단일 쿼리 (1회 네트워크 왕복)
const result = await env.DB.prepare(
`SELECT u.telegram_id, COALESCE(d.balance, 0) as balance
FROM users u
LEFT JOIN user_deposits d ON u.id = d.user_id
WHERE u.id = ?`
).bind(matched.userId).first<{ telegram_id: string; balance: number }>();
if (result) {
await sendMessage(
env.BOT_TOKEN,
parseInt(result.telegram_id),
`✅ <b>입금 확인 완료!</b>\n\n` +
`입금액: ${matched.amount.toLocaleString()}\n` +
`현재 잔액: ${result.balance.toLocaleString()}\n\n` +
`감사합니다! 🎉`
);
}
}
// 관리자에게 알림
if (env.BOT_TOKEN && env.DEPOSIT_ADMIN_ID) {
const statusMsg = matched
? `✅ 자동 매칭 완료! (거래 #${matched.transactionId})`
: '⏳ 매칭 대기 중 (사용자 입금 신고 필요)';
await sendMessage(
env.BOT_TOKEN,
parseInt(env.DEPOSIT_ADMIN_ID),
`🏦 <b>입금 알림</b>\n\n` +
`은행: ${notification.bankName}\n` +
`입금자: ${notification.depositorName}\n` +
`금액: ${notification.amount.toLocaleString()}\n` +
`${notification.balanceAfter ? `잔액: ${notification.balanceAfter.toLocaleString()}\n` : ''}` +
`\n${statusMsg}`
);
}
} catch (error) {
emailLogger.error('이메일 처리 오류', error as Error, {
from: message.from.replace(/@.+/, '@****'),
size: message.rawSize
});
// Don't rethrow - email routing expects success response
}
},
// Cron Triggers: 입금 대기 자동 취소 (24시간) + 서버 주문 자동 삭제 (5분)
async scheduled(event: ScheduledEvent, env: Env, _ctx: ExecutionContext): Promise<void> {
const cronSchedule = event.cron;
logger.info('Cron 작업 시작', { schedule: cronSchedule });
// 매 5분: pending 서버 주문 자동 삭제
if (cronSchedule === '*/5 * * * *') {
await cleanupStalePendingServerOrders(env);
return;
}
// 매일 자정 (KST): 입금 만료 + 정합성 검증
if (cronSchedule === '0 15 * * *') {
await cleanupExpiredDepositTransactions(env);
await reconcileDepositBalances(env);
return;
}
logger.warn('알 수 없는 Cron 스케줄', { schedule: cronSchedule });
},
// Queue Consumer 핸들러
async queue(batch: MessageBatch<ProvisionMessage>, env: Env): Promise<void> {
// Queue 이름으로 구분
const queueName = batch.queue;
if (queueName === 'server-provision-queue') {
await handleProvisionQueue(batch, env);
} else if (queueName === 'provision-dlq') {
await handleProvisionDLQ(batch, env);
} else {
logger.warn('알 수 없는 Queue', { queue: queueName });
}
},
};
// ============================================================================
// Cron Job Helper Functions
// ============================================================================
/**
* 5분 이상 pending 상태인 서버 주문 자동 삭제
* 실행 주기: 매 5분 (every 5 minutes)
*/
async function cleanupStalePendingServerOrders(env: Env): Promise<void> {
logger.info('서버 주문 정리 시작 (5분 경과)');
try {
// 5분 이상 된 pending 서버 주문 조회
const staleOrders = await env.DB.prepare(
`SELECT so.id, so.label, so.price_paid, u.telegram_id
FROM server_orders so
JOIN users u ON so.user_id = u.id
WHERE so.status = 'pending'
AND datetime(so.created_at) < datetime('now', '-5 minutes')
LIMIT 50`
).all<{
id: number;
label: string | null;
price_paid: number;
telegram_id: string;
}>();
if (!staleOrders.results?.length) {
logger.info('삭제할 서버 주문 없음');
return;
}
logger.info('방치된 서버 주문 발견', { count: staleOrders.results.length });
// 서버 주문 삭제
const orderIds = staleOrders.results.map(order => order.id);
await env.DB.prepare(
`DELETE FROM server_orders WHERE id IN (${orderIds.map(() => '?').join(',')})`
).bind(...orderIds).run();
logger.info('서버 주문 삭제 완료', { count: orderIds.length });
// 사용자 알림 병렬 처리 (개별 실패 무시)
const notificationPromises = staleOrders.results.map(order =>
sendMessage(
env.BOT_TOKEN,
parseInt(order.telegram_id),
`❌ <b>서버 주문 자동 취소</b>\n\n` +
`주문 #${order.id}이 처리되지 않아 자동 취소되었습니다.\n` +
`• 서버명: ${order.label || '(미지정)'}\n` +
`• 결제 금액: ${order.price_paid.toLocaleString()}\n\n` +
`다시 시도해주세요.`
).catch(err => {
logger.error('알림 전송 실패', err as Error, {
orderId: order.id,
userId: order.telegram_id
});
return null;
})
);
await Promise.all(notificationPromises);
logger.info('서버 주문 정리 완료', { count: staleOrders.results.length });
} catch (error) {
logger.error('서버 주문 정리 오류', error as Error);
}
}
/**
* 24시간 이상 pending 상태인 입금 거래 자동 취소
* 실행 주기: 매일 자정 KST (0 15 * * *)
*/
async function cleanupExpiredDepositTransactions(env: Env): Promise<void> {
logger.info('만료된 입금 대기 정리 시작');
try {
// 24시간 이상 된 pending 거래 조회
const expiredTxs = await env.DB.prepare(
`SELECT dt.id, dt.amount, dt.depositor_name, u.telegram_id
FROM deposit_transactions dt
JOIN users u ON dt.user_id = u.id
WHERE dt.status = 'pending'
AND dt.type = 'deposit'
AND datetime(dt.created_at) < datetime('now', '-1 day')
LIMIT 100`
).all<{
id: number;
amount: number;
depositor_name: string;
telegram_id: string;
}>();
if (!expiredTxs.results?.length) {
logger.info('만료된 거래 없음');
return;
}
logger.info('만료된 거래 발견', { count: expiredTxs.results.length });
// 단일 UPDATE 쿼리로 일괄 처리
const ids = expiredTxs.results.map(tx => tx.id);
await env.DB.prepare(
`UPDATE deposit_transactions
SET status = 'cancelled', description = '입금 대기 만료 (24시간)'
WHERE id IN (${ids.map(() => '?').join(',')})`
).bind(...ids).run();
logger.info('UPDATE 완료', { count: ids.length });
// 알림 병렬 처리 (개별 실패가 전체를 중단시키지 않도록 .catch() 추가)
const notificationPromises = expiredTxs.results.map(tx =>
sendMessage(
env.BOT_TOKEN,
parseInt(tx.telegram_id),
`⏰ <b>입금 대기 자동 취소</b>\n\n` +
`거래 #${tx.id}이 24시간 내 확인되지 않아 자동 취소되었습니다.\n` +
`• 입금액: ${tx.amount.toLocaleString()}\n` +
`• 입금자: ${tx.depositor_name}\n\n` +
`실제 입금하셨다면 다시 신고해주세요.`
).catch(err => {
logger.error('알림 전송 실패', err as Error, {
transactionId: tx.id,
userId: tx.telegram_id
});
return null;
})
);
await Promise.all(notificationPromises);
logger.info('만료 처리 완료', { count: expiredTxs.results.length });
} catch (error) {
logger.error('Cron 작업 오류', error as Error);
}
}
/**
* 예치금 정합성 검증 (Reconciliation)
* 실행 주기: 매일 자정 KST (0 15 * * *)
*/
async function reconcileDepositBalances(env: Env): Promise<void> {
logger.info('예치금 정합성 검증 시작');
try {
const report = await reconcileDeposits(env.DB);
if (report.inconsistencies > 0) {
// 관리자 알림 전송
const adminId = env.DEPOSIT_ADMIN_ID;
if (adminId) {
const message = formatReconciliationReport(report);
await sendMessage(env.BOT_TOKEN, parseInt(adminId), message).catch(err => {
logger.error('정합성 검증 알림 전송 실패', err as Error);
});
} else {
logger.warn('DEPOSIT_ADMIN_ID 미설정 - 알림 전송 불가');
}
}
logger.info('정합성 검증 완료', {
totalUsers: report.totalUsers,
inconsistencies: report.inconsistencies
});
} catch (error) {
logger.error('정합성 검증 실패', error as Error);
// 정합성 검증 실패가 전체 Cron을 중단시키지 않도록 에러를 catch만 하고 계속 진행
}
}