feat: implement archive service for old conversations

- Add archive-service.ts with 3 main functions:
  1. generateArchiveSummary(): AI-powered conversation summarization
     - Uses OpenAI for intelligent summaries
     - Fallback to keyword-based simple summaries
     - Format: [YYYY-MM-DD ~ YYYY-MM-DD archive] summary
  2. archiveUserConversations(): Archive conversations per user
     - Query messages older than 180 days (user messages only)
     - Create summaries in batches of 100 messages
     - Save to summaries table with incremented generation
     - Delete original messages and update meta table
  3. archiveOldConversations(): Archive all users (Cron job)
     - Iterate through conversation_tables
     - Call archiveUserConversations for each user
     - Return ArchiveResult with stats

- Constants: ARCHIVE_DAYS=180, BATCH_SIZE=100
- Error handling: Comprehensive logging with structured errors
- Type safety: All functions use strict TypeScript types
- No breaking changes: Works with existing conversation storage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
kappa
2026-02-05 12:23:49 +09:00
parent 2a84f12d46
commit a14e640db9

View File

@@ -0,0 +1,324 @@
import type { Env, ArchiveResult } from '../types';
import { createLogger } from '../utils/logger';
import { generateProfileWithOpenAI } from '../openai-service';
const logger = createLogger('archive-service');
// 상수
const ARCHIVE_DAYS = 180; // 6개월
const BATCH_SIZE = 100; // 요약당 메시지 수
/**
* 사용자 메시지 타입
*/
interface UserMessage {
content: string;
created_at: string;
}
/**
* AI 요약 생성 (OpenAI 사용, 폴백 시 단순 요약)
* @param env 환경 변수
* @param messages 사용자 메시지 배열
* @param startDate 시작일 (YYYY-MM-DD)
* @param endDate 종료일 (YYYY-MM-DD)
* @returns 아카이브 요약 문자열
*/
export async function generateArchiveSummary(
env: Env,
messages: UserMessage[],
startDate: string,
endDate: string
): Promise<string> {
try {
if (messages.length === 0) {
return `[${startDate} ~ ${endDate} 아카이브] 메시지 없음`;
}
// 사용자 발언 목록 생성
const userStatements = messages
.map((msg, idx) => `${idx + 1}. ${msg.content}`)
.join('\n');
const prompt = `당신은 대화 아카이브 전문가입니다.
아래 기간의 사용자 발언을 분석하여 핵심 정보를 요약하세요.
## 기간
${startDate} ~ ${endDate}
## 사용자 발언 (${messages.length}개)
${userStatements}
## 요구사항
1. 주요 관심사, 요청사항, 질문 주제를 파악
2. 중요한 맥락 정보 (직업, 프로젝트, 목표 등) 보존
3. 무의미한 내용 제외 (인사, 감사 표현 등)
4. 300-400자 이내로 간결하게
5. 한국어로 작성
요약:`;
// OpenAI 사용 (설정된 경우)
if (env.OPENAI_API_KEY) {
const aiSummary = await generateProfileWithOpenAI(env, prompt);
return `[${startDate} ~ ${endDate} 아카이브] ${aiSummary}`;
}
// 폴백: 단순 요약
const topKeywords = extractTopKeywords(messages, 5);
const simpleSummary = `주요 키워드: ${topKeywords.join(', ')} (총 ${messages.length}개 메시지)`;
return `[${startDate} ~ ${endDate} 아카이브] ${simpleSummary}`;
} catch (error) {
logger.error('아카이브 요약 생성 실패', error as Error, {
startDate,
endDate,
messageCount: messages.length,
});
// 에러 시 단순 요약 반환
return `[${startDate} ~ ${endDate} 아카이브] ${messages.length}개 메시지 보존`;
}
}
/**
* 메시지에서 상위 키워드 추출 (단순 빈도 기반)
* @param messages 메시지 배열
* @param topN 상위 N개 반환
* @returns 키워드 배열
*/
function extractTopKeywords(messages: UserMessage[], topN: number): string[] {
const wordFreq = new Map<string, number>();
messages.forEach((msg) => {
const words = msg.content
.replace(/[^\w\s가-힣]/g, ' ')
.split(/\s+/)
.filter((word) => word.length >= 2);
words.forEach((word) => {
wordFreq.set(word, (wordFreq.get(word) || 0) + 1);
});
});
return Array.from(wordFreq.entries())
.sort((a, b) => b[1] - a[1])
.slice(0, topN)
.map(([word]) => word);
}
/**
* 사용자별 대화 아카이브
* - 180일 이상 된 메시지 조회
* - 100개 단위로 요약 생성
* - summaries 테이블에 저장 (generation 증가)
* - 원본 메시지 삭제
* - 메타 테이블 message_count 업데이트
*
* @param env 환경 변수
* @param telegramId 텔레그램 사용자 ID
* @param tableName 대화 테이블명 (예: conv_123456)
* @param olderThanDays 기준 일수 (기본값: 180일)
* @returns { archived: 삭제된 메시지 수, summaries: 생성된 요약 수 }
*/
export async function archiveUserConversations(
env: Env,
telegramId: string,
tableName: string,
olderThanDays: number = ARCHIVE_DAYS
): Promise<{ archived: number; summaries: number }> {
try {
logger.info('사용자 아카이브 시작', { telegramId, tableName, olderThanDays });
// 1. 기준 날짜 계산 (N일 이전)
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);
const cutoffDateStr = cutoffDate.toISOString();
// 2. 오래된 메시지 조회 (사용자 발언만, role='user')
const { results: oldMessages } = await env.DB.prepare(
`SELECT content, created_at
FROM ${tableName}
WHERE role = 'user' AND created_at < ?
ORDER BY created_at ASC`
)
.bind(cutoffDateStr)
.all<UserMessage>();
if (!oldMessages || oldMessages.length === 0) {
logger.info('아카이브할 메시지 없음', { telegramId, tableName });
return { archived: 0, summaries: 0 };
}
logger.info('아카이브 대상 메시지 조회 완료', {
telegramId,
messageCount: oldMessages.length,
cutoffDate: cutoffDateStr,
});
// 3. 사용자 ID 조회
const user = await env.DB.prepare('SELECT id FROM users WHERE telegram_id = ?')
.bind(telegramId)
.first<{ id: number }>();
if (!user) {
logger.error('사용자 ID 조회 실패', undefined, { telegramId });
return { archived: 0, summaries: 0 };
}
const userId = user.id;
// 4. 최신 요약의 generation 조회
const latestSummary = await env.DB.prepare(
'SELECT generation FROM summaries WHERE user_id = ? ORDER BY generation DESC LIMIT 1'
)
.bind(userId)
.first<{ generation: number }>();
let currentGeneration = (latestSummary?.generation || 0) + 1;
// 5. 100개 단위로 배치 처리
const batches: UserMessage[][] = [];
for (let i = 0; i < oldMessages.length; i += BATCH_SIZE) {
batches.push(oldMessages.slice(i, i + BATCH_SIZE));
}
logger.info('배치 생성 완료', {
telegramId,
totalMessages: oldMessages.length,
batchCount: batches.length,
batchSize: BATCH_SIZE,
});
// 6. 각 배치별 요약 생성 및 저장
let summariesCreated = 0;
for (const batch of batches) {
// 시작/종료 날짜 추출
const startDate = batch[0].created_at.split('T')[0]; // YYYY-MM-DD
const endDate = batch[batch.length - 1].created_at.split('T')[0]; // YYYY-MM-DD
// AI 요약 생성
const summary = await generateArchiveSummary(env, batch, startDate, endDate);
// summaries 테이블에 저장
await env.DB.prepare(
`INSERT INTO summaries (user_id, chat_id, generation, summary, message_count)
VALUES (?, ?, ?, ?, ?)`
)
.bind(userId, telegramId, currentGeneration, summary, batch.length)
.run();
logger.info('아카이브 요약 저장 완료', {
telegramId,
generation: currentGeneration,
messageCount: batch.length,
startDate,
endDate,
});
currentGeneration++;
summariesCreated++;
}
// 7. 원본 메시지 삭제 (role과 created_at 조건 모두 사용)
const deleteResult = await env.DB.prepare(
`DELETE FROM ${tableName}
WHERE created_at < ?`
)
.bind(cutoffDateStr)
.run();
const deletedCount = deleteResult.meta?.changes || 0;
// 8. 메타 테이블 message_count 업데이트
await env.DB.prepare(
`UPDATE conversation_tables
SET message_count = message_count - ?
WHERE telegram_id = ?`
)
.bind(deletedCount, telegramId)
.run();
logger.info('사용자 아카이브 완료', {
telegramId,
archived: deletedCount,
summaries: summariesCreated,
});
return { archived: deletedCount, summaries: summariesCreated };
} catch (error) {
logger.error('사용자 아카이브 실패', error as Error, { telegramId, tableName });
throw error;
}
}
/**
* 전체 사용자 대화 아카이브 (Cron용)
* - conversation_tables 순회
* - 각 사용자별 archiveUserConversations 호출
*
* @param env 환경 변수
* @returns ArchiveResult (처리된 사용자 수, 아카이브된 메시지 수, 생성된 요약 수, 에러 목록)
*/
export async function archiveOldConversations(env: Env): Promise<ArchiveResult> {
const startTime = Date.now();
logger.info('전체 아카이브 시작');
const result: ArchiveResult = {
processed_users: 0,
archived_messages: 0,
created_summaries: 0,
errors: [],
};
try {
// 1. conversation_tables에서 모든 사용자 조회
const { results: users } = await env.DB.prepare(
'SELECT telegram_id, table_name FROM conversation_tables ORDER BY telegram_id'
).all<{ telegram_id: string; table_name: string }>();
if (!users || users.length === 0) {
logger.info('아카이브할 사용자 없음');
return result;
}
logger.info('아카이브 대상 사용자 조회 완료', { userCount: users.length });
// 2. 각 사용자별 아카이브 실행
for (const user of users) {
try {
const { archived, summaries } = await archiveUserConversations(
env,
user.telegram_id,
user.table_name
);
if (archived > 0 || summaries > 0) {
result.processed_users++;
result.archived_messages += archived;
result.created_summaries += summaries;
}
} catch (error) {
const errorMsg = `${user.telegram_id}: ${(error as Error).message}`;
result.errors.push(errorMsg);
logger.error('사용자 아카이브 실패', error as Error, {
telegramId: user.telegram_id,
});
}
}
const duration = Date.now() - startTime;
logger.info('전체 아카이브 완료', {
duration: `${duration}ms`,
processedUsers: result.processed_users,
archivedMessages: result.archived_messages,
createdSummaries: result.created_summaries,
errorCount: result.errors.length,
});
return result;
} catch (error) {
logger.error('전체 아카이브 실패', error as Error);
result.errors.push(`전체 실패: ${(error as Error).message}`);
return result;
}
}