feat: add SessionManager generic class for agent sessions
- Create reusable SessionManager<T> with CRUD operations - Support for session expiry, message limits - Specialized managers for Domain and Server agents - Reduce code duplication across 4 agents Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
340
src/utils/session-manager.ts
Normal file
340
src/utils/session-manager.ts
Normal file
@@ -0,0 +1,340 @@
|
||||
/**
|
||||
* Session Manager - Generic session CRUD for agents
|
||||
*
|
||||
* Eliminates duplicated session management code across agents:
|
||||
* - Domain Agent
|
||||
* - Deposit Agent
|
||||
* - Server Agent
|
||||
* - Troubleshoot Agent
|
||||
*/
|
||||
|
||||
import { createLogger } from './logger';
|
||||
|
||||
const logger = createLogger('session-manager');
|
||||
|
||||
/**
|
||||
* Base interface for all agent sessions
|
||||
*/
|
||||
export interface BaseSession {
|
||||
user_id: string;
|
||||
status: string;
|
||||
collected_info: Record<string, unknown>;
|
||||
messages: Array<{ role: 'user' | 'assistant'; content: string }>;
|
||||
created_at: number;
|
||||
updated_at: number;
|
||||
expires_at: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configuration for SessionManager
|
||||
*/
|
||||
export interface SessionManagerConfig {
|
||||
tableName: string;
|
||||
ttlMs: number;
|
||||
maxMessages: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic session manager for all agents
|
||||
* Provides CRUD operations, expiry checking, and message management
|
||||
*
|
||||
* Usage:
|
||||
* ```typescript
|
||||
* const manager = new SessionManager<DomainSession>({
|
||||
* tableName: 'domain_sessions',
|
||||
* ttlMs: 60 * 60 * 1000, // 1 hour
|
||||
* maxMessages: 20
|
||||
* });
|
||||
*
|
||||
* const session = await manager.get(db, userId);
|
||||
* if (!session) {
|
||||
* const newSession = manager.create(userId, 'gathering');
|
||||
* await manager.save(db, newSession);
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
export class SessionManager<T extends BaseSession> {
|
||||
private readonly config: SessionManagerConfig;
|
||||
|
||||
constructor(config: SessionManagerConfig) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get session from D1 database
|
||||
*
|
||||
* @param db - D1 Database
|
||||
* @param userId - Telegram User ID
|
||||
* @returns Session or null (not found or expired)
|
||||
*/
|
||||
async get(db: D1Database, userId: string): Promise<T | null> {
|
||||
try {
|
||||
const now = Date.now();
|
||||
const result = await db.prepare(
|
||||
`SELECT * FROM ${this.config.tableName} WHERE user_id = ? AND expires_at > ?`
|
||||
).bind(userId, now).first();
|
||||
|
||||
if (!result) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Parse JSON fields
|
||||
const session: T = {
|
||||
user_id: result.user_id as string,
|
||||
status: result.status as string,
|
||||
collected_info: result.collected_info
|
||||
? JSON.parse(result.collected_info as string)
|
||||
: {},
|
||||
messages: result.messages
|
||||
? JSON.parse(result.messages as string)
|
||||
: [],
|
||||
created_at: result.created_at as number,
|
||||
updated_at: result.updated_at as number,
|
||||
expires_at: result.expires_at as number,
|
||||
// Spread any additional fields from result
|
||||
...this.parseAdditionalFields(result),
|
||||
} as T;
|
||||
|
||||
logger.info('세션 조회 성공', {
|
||||
userId,
|
||||
status: session.status,
|
||||
tableName: this.config.tableName
|
||||
});
|
||||
|
||||
return session;
|
||||
} catch (error) {
|
||||
logger.error('세션 조회 실패', error as Error, { userId, tableName: this.config.tableName });
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save session to D1 database (insert or replace)
|
||||
*
|
||||
* @param db - D1 Database
|
||||
* @param session - Session to save
|
||||
*/
|
||||
async save(db: D1Database, session: T): Promise<void> {
|
||||
try {
|
||||
const now = Date.now();
|
||||
session.updated_at = now;
|
||||
session.expires_at = now + this.config.ttlMs;
|
||||
|
||||
// Build additional columns from subclass
|
||||
const additionalColumns = this.getAdditionalColumns(session);
|
||||
const additionalColumnNames = Object.keys(additionalColumns);
|
||||
const additionalColumnPlaceholders = additionalColumnNames.map(() => '?').join(', ');
|
||||
const additionalColumnValues = Object.values(additionalColumns);
|
||||
|
||||
// Build SQL with optional additional columns
|
||||
const baseColumns = ['user_id', 'status', 'collected_info', 'messages', 'created_at', 'updated_at', 'expires_at'];
|
||||
const allColumns = [...baseColumns, ...additionalColumnNames];
|
||||
const allPlaceholders = [...Array(baseColumns.length).fill('?'), ...Array(additionalColumnNames.length).fill('?')];
|
||||
|
||||
const sql = `
|
||||
INSERT INTO ${this.config.tableName}
|
||||
(${allColumns.join(', ')})
|
||||
VALUES (${allPlaceholders.join(', ')})
|
||||
ON CONFLICT(user_id) DO UPDATE SET
|
||||
status = excluded.status,
|
||||
collected_info = excluded.collected_info,
|
||||
messages = excluded.messages,
|
||||
updated_at = excluded.updated_at,
|
||||
expires_at = excluded.expires_at
|
||||
${additionalColumnNames.length > 0 ? ', ' + additionalColumnNames.map(col => `${col} = excluded.${col}`).join(', ') : ''}
|
||||
`;
|
||||
|
||||
const baseValues = [
|
||||
session.user_id,
|
||||
session.status,
|
||||
JSON.stringify(session.collected_info || {}),
|
||||
JSON.stringify(session.messages || []),
|
||||
session.created_at || now,
|
||||
now,
|
||||
session.expires_at
|
||||
];
|
||||
|
||||
await db.prepare(sql).bind(...baseValues, ...additionalColumnValues).run();
|
||||
|
||||
logger.info('세션 저장 성공', {
|
||||
userId: session.user_id,
|
||||
status: session.status,
|
||||
tableName: this.config.tableName
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('세션 저장 실패', error as Error, { userId: session.user_id, tableName: this.config.tableName });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete session from D1 database
|
||||
*
|
||||
* @param db - D1 Database
|
||||
* @param userId - Telegram User ID
|
||||
*/
|
||||
async delete(db: D1Database, userId: string): Promise<void> {
|
||||
try {
|
||||
await db.prepare(
|
||||
`DELETE FROM ${this.config.tableName} WHERE user_id = ?`
|
||||
).bind(userId).run();
|
||||
|
||||
logger.info('세션 삭제 성공', { userId, tableName: this.config.tableName });
|
||||
} catch (error) {
|
||||
logger.error('세션 삭제 실패', error as Error, { userId, tableName: this.config.tableName });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if session exists (without full load)
|
||||
*
|
||||
* @param db - D1 Database
|
||||
* @param userId - Telegram User ID
|
||||
* @returns true if active session exists, false otherwise
|
||||
*/
|
||||
async has(db: D1Database, userId: string): Promise<boolean> {
|
||||
try {
|
||||
const now = Date.now();
|
||||
const result = await db.prepare(
|
||||
`SELECT expires_at FROM ${this.config.tableName} WHERE user_id = ? AND expires_at > ?`
|
||||
).bind(userId, now).first();
|
||||
|
||||
return result !== null;
|
||||
} catch (error) {
|
||||
logger.error('세션 존재 확인 실패', error as Error, { userId, tableName: this.config.tableName });
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new session object
|
||||
*
|
||||
* @param userId - Telegram User ID
|
||||
* @param status - Initial status
|
||||
* @param additionalFields - Additional fields for subclass
|
||||
* @returns New session object
|
||||
*/
|
||||
create(userId: string, status: string, additionalFields?: Partial<T>): T {
|
||||
const now = Date.now();
|
||||
return {
|
||||
user_id: userId,
|
||||
status,
|
||||
collected_info: {},
|
||||
messages: [],
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
expires_at: now + this.config.ttlMs,
|
||||
...additionalFields,
|
||||
} as T;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if session is expired
|
||||
*
|
||||
* @param session - Session to check
|
||||
* @returns true if expired, false otherwise
|
||||
*/
|
||||
isExpired(session: T): boolean {
|
||||
return session.expires_at < Date.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add message to session with max limit
|
||||
*
|
||||
* @param session - Session to modify
|
||||
* @param role - Message role ('user' | 'assistant')
|
||||
* @param content - Message content
|
||||
*/
|
||||
addMessage(session: T, role: 'user' | 'assistant', content: string): void {
|
||||
session.messages.push({ role, content });
|
||||
|
||||
// Trim old messages if over limit
|
||||
if (session.messages.length > this.config.maxMessages) {
|
||||
session.messages = session.messages.slice(-this.config.maxMessages);
|
||||
logger.warn('세션 메시지 최대 개수 초과, 오래된 메시지 제거', {
|
||||
userId: session.user_id,
|
||||
maxMessages: this.config.maxMessages,
|
||||
tableName: this.config.tableName
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create session (convenience method)
|
||||
*
|
||||
* @param db - D1 Database
|
||||
* @param userId - Telegram User ID
|
||||
* @param initialStatus - Status for new session
|
||||
* @returns Existing or new session
|
||||
*/
|
||||
async getOrCreate(db: D1Database, userId: string, initialStatus: string): Promise<T> {
|
||||
const existing = await this.get(db, userId);
|
||||
if (existing) return existing;
|
||||
return this.create(userId, initialStatus);
|
||||
}
|
||||
|
||||
/**
|
||||
* Override this method in subclasses to parse additional fields from DB result
|
||||
* (e.g., target_domain for DomainSession, last_recommendation for ServerSession)
|
||||
*
|
||||
* @param result - Raw DB result
|
||||
* @returns Additional fields to merge into session
|
||||
*/
|
||||
protected parseAdditionalFields(result: Record<string, unknown>): Partial<T> {
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Override this method in subclasses to provide additional columns for saving
|
||||
* (e.g., target_domain for DomainSession, last_recommendation for ServerSession)
|
||||
*
|
||||
* @param session - Session to save
|
||||
* @returns Additional column values
|
||||
*/
|
||||
protected getAdditionalColumns(session: T): Record<string, unknown> {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Specialized session manager for Domain Agent
|
||||
* Handles target_domain field
|
||||
*/
|
||||
export class DomainSessionManager extends SessionManager<DomainSession> {
|
||||
protected parseAdditionalFields(result: Record<string, unknown>): Partial<DomainSession> {
|
||||
return {
|
||||
target_domain: result.target_domain as string | undefined,
|
||||
};
|
||||
}
|
||||
|
||||
protected getAdditionalColumns(session: DomainSession): Record<string, unknown> {
|
||||
return {
|
||||
target_domain: session.target_domain || null,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Specialized session manager for Server Agent
|
||||
* Handles last_recommendation field
|
||||
*/
|
||||
export class ServerSessionManager extends SessionManager<ServerSession> {
|
||||
protected parseAdditionalFields(result: Record<string, unknown>): Partial<ServerSession> {
|
||||
return {
|
||||
last_recommendation: result.last_recommendation
|
||||
? JSON.parse(result.last_recommendation as string)
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
protected getAdditionalColumns(session: ServerSession): Record<string, unknown> {
|
||||
return {
|
||||
last_recommendation: session.last_recommendation
|
||||
? JSON.stringify(session.last_recommendation)
|
||||
: null,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Import types (avoid circular dependency by importing at end)
|
||||
import type { DomainSession, ServerSession } from '../types';
|
||||
Reference in New Issue
Block a user