diff --git a/src/consumers/queue-dispatcher.ts b/src/consumers/queue-dispatcher.ts new file mode 100644 index 0000000..a39f3db --- /dev/null +++ b/src/consumers/queue-dispatcher.ts @@ -0,0 +1,45 @@ +/** + * Queue Dispatcher - 태그 기반 범용 큐 메시지 라우팅 + * + * WORK_QUEUE로 들어오는 메시지를 tag 필드에 따라 적절한 핸들러로 분배합니다. + * 새 기능 추가 시 TAG_HANDLERS에 태그 → 핸들러를 등록하면 됩니다. + */ + +import { createLogger } from '../utils/logger'; +import type { Env } from '../types'; + +const logger = createLogger('queue-dispatcher'); + +export interface QueueMessage { + tag: string; + [key: string]: unknown; +} + +type TagHandler = (body: unknown, env: Env) => Promise; + +const TAG_HANDLERS: Record = { + // 태그 → 핸들러 등록 + // example: (body, env) => handleExample(body as ExampleMessage, env), +}; + +export async function dispatchQueueMessage( + body: unknown, + env: Env +): Promise { + const message = body as QueueMessage; + const tag = message?.tag; + + if (!tag) { + logger.warn('Queue message missing tag, skipping', { body: JSON.stringify(body).slice(0, 200) }); + return; + } + + const handler = TAG_HANDLERS[tag]; + if (!handler) { + logger.warn('Unknown queue tag, skipping', { tag }); + return; + } + + logger.info('Dispatching queue message', { tag }); + await handler(body, env); +} diff --git a/src/index.ts b/src/index.ts index 9663634..08cf06b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -114,6 +114,24 @@ app.notFound((c) => c.text('Not Found', 404)); export default { fetch: app.fetch, + async queue(batch: MessageBatch, env: Env): Promise { + const { dispatchQueueMessage } = await import('./consumers/queue-dispatcher'); + const queueLogger = createLogger('queue'); + + for (const msg of batch.messages) { + try { + await dispatchQueueMessage(msg.body, env); + msg.ack(); + } catch (error) { + queueLogger.error('Queue message processing failed', error as Error, { + queue: batch.queue, + messageId: msg.id, + }); + msg.retry(); + } + } + }, + async scheduled(event: ScheduledEvent, env: Env, _ctx: ExecutionContext): Promise { const cronSchedule = event.cron; logger.info('Cron job started', { schedule: cronSchedule });