Add queue handler and tag-based queue dispatcher
- Add queue() handler in index.ts for WORK_QUEUE processing - Add consumers/queue-dispatcher.ts with tag-based routing Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
45
src/consumers/queue-dispatcher.ts
Normal file
45
src/consumers/queue-dispatcher.ts
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
/**
|
||||||
|
* Queue Dispatcher - 태그 기반 범용 큐 메시지 라우팅
|
||||||
|
*
|
||||||
|
* WORK_QUEUE로 들어오는 메시지를 tag 필드에 따라 적절한 핸들러로 분배합니다.
|
||||||
|
* 새 기능 추가 시 TAG_HANDLERS에 태그 → 핸들러를 등록하면 됩니다.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { createLogger } from '../utils/logger';
|
||||||
|
import type { Env } from '../types';
|
||||||
|
|
||||||
|
const logger = createLogger('queue-dispatcher');
|
||||||
|
|
||||||
|
export interface QueueMessage {
|
||||||
|
tag: string;
|
||||||
|
[key: string]: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
type TagHandler = (body: unknown, env: Env) => Promise<void>;
|
||||||
|
|
||||||
|
const TAG_HANDLERS: Record<string, TagHandler> = {
|
||||||
|
// 태그 → 핸들러 등록
|
||||||
|
// example: (body, env) => handleExample(body as ExampleMessage, env),
|
||||||
|
};
|
||||||
|
|
||||||
|
export async function dispatchQueueMessage(
|
||||||
|
body: unknown,
|
||||||
|
env: Env
|
||||||
|
): Promise<void> {
|
||||||
|
const message = body as QueueMessage;
|
||||||
|
const tag = message?.tag;
|
||||||
|
|
||||||
|
if (!tag) {
|
||||||
|
logger.warn('Queue message missing tag, skipping', { body: JSON.stringify(body).slice(0, 200) });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const handler = TAG_HANDLERS[tag];
|
||||||
|
if (!handler) {
|
||||||
|
logger.warn('Unknown queue tag, skipping', { tag });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info('Dispatching queue message', { tag });
|
||||||
|
await handler(body, env);
|
||||||
|
}
|
||||||
18
src/index.ts
18
src/index.ts
@@ -114,6 +114,24 @@ app.notFound((c) => c.text('Not Found', 404));
|
|||||||
export default {
|
export default {
|
||||||
fetch: app.fetch,
|
fetch: app.fetch,
|
||||||
|
|
||||||
|
async queue(batch: MessageBatch, env: Env): Promise<void> {
|
||||||
|
const { dispatchQueueMessage } = await import('./consumers/queue-dispatcher');
|
||||||
|
const queueLogger = createLogger('queue');
|
||||||
|
|
||||||
|
for (const msg of batch.messages) {
|
||||||
|
try {
|
||||||
|
await dispatchQueueMessage(msg.body, env);
|
||||||
|
msg.ack();
|
||||||
|
} catch (error) {
|
||||||
|
queueLogger.error('Queue message processing failed', error as Error, {
|
||||||
|
queue: batch.queue,
|
||||||
|
messageId: msg.id,
|
||||||
|
});
|
||||||
|
msg.retry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
async scheduled(event: ScheduledEvent, env: Env, _ctx: ExecutionContext): Promise<void> {
|
async scheduled(event: ScheduledEvent, env: Env, _ctx: ExecutionContext): Promise<void> {
|
||||||
const cronSchedule = event.cron;
|
const cronSchedule = event.cron;
|
||||||
logger.info('Cron job started', { schedule: cronSchedule });
|
logger.info('Cron job started', { schedule: cronSchedule });
|
||||||
|
|||||||
Reference in New Issue
Block a user