Files
telegram-ai-support/src/index.ts
kappa 3d97190186 Add queue handler and tag-based queue dispatcher
- Add queue() handler in index.ts for WORK_QUEUE processing
- Add consumers/queue-dispatcher.ts with tag-based routing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 09:33:11 +09:00

182 lines
5.1 KiB
TypeScript

import { Hono } from 'hono';
import type { Env } from './types';
import { webhookRouter } from './routes/webhook';
import { apiRouter } from './routes/api';
import { healthRouter } from './routes/health';
import { setWebhook, getWebhookInfo } from './telegram';
import { timingSafeEqual } from './security';
import { validateEnv } from './utils/env-validation';
import { createLogger } from './utils/logger';
const logger = createLogger('worker');
let envValidated = false;
const app = new Hono<{ Bindings: Env }>();
// Environment validation middleware (runs once per worker instance)
app.use('*', async (c, next) => {
if (!envValidated) {
const result = validateEnv(c.env as unknown as Record<string, unknown>);
if (!result.success) {
logger.error('Environment validation failed', new Error('Invalid configuration'), {
errors: result.errors,
});
return c.json({
error: 'Configuration error',
message: 'The worker is not properly configured.',
}, 500);
}
if (result.warnings.length > 0) {
logger.warn('Environment configuration warnings', { warnings: result.warnings });
}
logger.info('Environment validation passed', {
environment: c.env.ENVIRONMENT || 'production',
warnings: result.warnings.length,
});
envValidated = true;
}
return await next();
});
// Health check
app.route('/health', healthRouter);
// Setup webhook
app.get('/setup-webhook', async (c) => {
const env = c.env;
if (!env.BOT_TOKEN || !env.WEBHOOK_SECRET) {
return c.json({ error: 'Server configuration error' }, 500);
}
const token = c.req.query('token');
const secret = c.req.query('secret');
if (!token || !timingSafeEqual(token, env.BOT_TOKEN)) {
return c.text('Unauthorized', 401);
}
if (!secret || !timingSafeEqual(secret, env.WEBHOOK_SECRET)) {
return c.text('Unauthorized', 401);
}
const webhookUrl = `${new URL(c.req.url).origin}/webhook`;
const result = await setWebhook(env.BOT_TOKEN, webhookUrl, env.WEBHOOK_SECRET);
return c.json(result);
});
// Webhook info
app.get('/webhook-info', async (c) => {
const env = c.env;
if (!env.BOT_TOKEN || !env.WEBHOOK_SECRET) {
return c.json({ error: 'Server configuration error' }, 500);
}
const token = c.req.query('token');
const secret = c.req.query('secret');
if (!token || !timingSafeEqual(token, env.BOT_TOKEN)) {
return c.text('Unauthorized', 401);
}
if (!secret || !timingSafeEqual(secret, env.WEBHOOK_SECRET)) {
return c.text('Unauthorized', 401);
}
const result = await getWebhookInfo(env.BOT_TOKEN);
return c.json(result);
});
// API routes
app.route('/api', apiRouter);
// Telegram Webhook
app.route('/webhook', webhookRouter);
// Root
app.get('/', (c) => {
return c.text(
`Telegram AI Support Bot
Endpoints:
GET /health - Health check
GET /webhook-info - Webhook status
GET /setup-webhook - Configure webhook
POST /webhook - Telegram webhook (authenticated)
GET /api/* - Admin API (authenticated)`,
200
);
});
// 404
app.notFound((c) => c.text('Not Found', 404));
export default {
fetch: app.fetch,
async queue(batch: MessageBatch, env: Env): Promise<void> {
const { dispatchQueueMessage } = await import('./consumers/queue-dispatcher');
const queueLogger = createLogger('queue');
for (const msg of batch.messages) {
try {
await dispatchQueueMessage(msg.body, env);
msg.ack();
} catch (error) {
queueLogger.error('Queue message processing failed', error as Error, {
queue: batch.queue,
messageId: msg.id,
});
msg.retry();
}
}
},
async scheduled(event: ScheduledEvent, env: Env, _ctx: ExecutionContext): Promise<void> {
const cronSchedule = event.cron;
logger.info('Cron job started', { schedule: cronSchedule });
const {
cleanupExpiredSessions,
sendExpiryNotifications,
archiveOldConversations,
cleanupStaleOrders,
monitoringCheck,
notifyServerStatusChanges,
notifyTransactionStatusChanges,
sendPaymentReminders,
notifyBankMatches,
} = await import('./services/cron-jobs');
try {
switch (cronSchedule) {
// Midnight KST (15:00 UTC): expiry notifications, archiving, session cleanup
case '0 15 * * *':
await sendExpiryNotifications(env);
await archiveOldConversations(env);
await cleanupExpiredSessions(env);
break;
// Every 5 minutes: stale cleanup + proactive notifications
case '*/5 * * * *':
await cleanupStaleOrders(env);
await notifyServerStatusChanges(env);
await notifyTransactionStatusChanges(env);
await notifyBankMatches(env);
break;
// Every hour: monitoring checks + payment reminders
case '0 * * * *':
await monitoringCheck(env);
await sendPaymentReminders(env);
break;
default:
logger.warn('Unknown cron schedule', { schedule: cronSchedule });
}
} catch (error) {
logger.error('Cron job failed', error as Error, { schedule: cronSchedule });
}
},
};