import json import asyncio from typing import Dict, Optional from fastapi import WebSocket, WebSocketDisconnect import logging from telegram.client import TelegramClientWrapper from telegram.bot_chat import BotChatManager from models.schemas import MessageResponse logger = logging.getLogger(__name__) class ConnectionManager: def __init__(self): self.active_connections: Dict[str, WebSocket] = {} self.clients: Dict[str, TelegramClientWrapper] = {} self.bot_managers: Dict[str, BotChatManager] = {} async def connect(self, websocket: WebSocket, session_id: str): """Accept WebSocket connection""" await websocket.accept() self.active_connections[session_id] = websocket # Initialize Telegram client client_wrapper = TelegramClientWrapper(session_id) await client_wrapper.init_client() self.clients[session_id] = client_wrapper logger.info(f"WebSocket connected: {session_id}") async def disconnect(self, session_id: str): """Handle WebSocket disconnection""" if session_id in self.active_connections: del self.active_connections[session_id] if session_id in self.clients: await self.clients[session_id].disconnect() del self.clients[session_id] if session_id in self.bot_managers: del self.bot_managers[session_id] logger.info(f"WebSocket disconnected: {session_id}") async def send_message(self, session_id: str, message: dict): """Send message to specific connection""" if session_id in self.active_connections: await self.active_connections[session_id].send_json(message) async def handle_message(self, session_id: str, data: dict): """Handle incoming WebSocket message""" msg_type = data.get("type") payload = data.get("data", {}) client = self.clients.get(session_id) if not client: await self.send_message(session_id, { "type": "error", "data": {"message": "Client not initialized"} }) return try: if msg_type == "check_auth": await self._handle_check_auth(session_id, client) elif msg_type == "send_code": await self._handle_send_code(session_id, client, payload) elif msg_type == "verify_code": await self._handle_verify_code(session_id, client, payload) elif msg_type == "verify_password": await self._handle_verify_password(session_id, client, payload) elif msg_type == "send_message": await self._handle_send_message(session_id, payload) elif msg_type == "get_history": await self._handle_get_history(session_id) else: await self.send_message(session_id, { "type": "error", "data": {"message": f"Unknown message type: {msg_type}"} }) except Exception as e: logger.error(f"Error handling message: {e}") await self.send_message(session_id, { "type": "error", "data": {"message": str(e)} }) async def _handle_check_auth(self, session_id: str, client: TelegramClientWrapper): """Check if user is authenticated""" is_auth = await client.is_authorized() if is_auth: me = await client.get_me() # Initialize bot chat bot_manager = BotChatManager(client) await bot_manager.init_bot_chat() # Set up callback for incoming messages async def on_bot_message(msg: MessageResponse): await self.send_message(session_id, { "type": "new_message", "data": msg.model_dump(mode='json') }) bot_manager.set_message_callback(on_bot_message) self.bot_managers[session_id] = bot_manager await self.send_message(session_id, { "type": "auth_status", "data": { "authenticated": True, "user_id": me.id, "username": me.username, "first_name": me.first_name } }) else: await self.send_message(session_id, { "type": "auth_status", "data": {"authenticated": False} }) async def _handle_send_code(self, session_id: str, client: TelegramClientWrapper, payload: dict): """Send verification code""" phone = payload.get("phone") if not phone: await self.send_message(session_id, { "type": "error", "data": {"message": "Phone number required"} }) return phone_code_hash = await client.send_code(phone) await self.send_message(session_id, { "type": "code_sent", "data": { "phone": phone, "phone_code_hash": phone_code_hash } }) async def _handle_verify_code(self, session_id: str, client: TelegramClientWrapper, payload: dict): """Verify code and sign in""" phone = payload.get("phone") code = payload.get("code") phone_code_hash = payload.get("phone_code_hash") result = await client.sign_in_with_code(phone, code, phone_code_hash) if result.get("success"): # Initialize bot chat after successful auth bot_manager = BotChatManager(client) await bot_manager.init_bot_chat() async def on_bot_message(msg: MessageResponse): await self.send_message(session_id, { "type": "new_message", "data": msg.model_dump(mode='json') }) bot_manager.set_message_callback(on_bot_message) self.bot_managers[session_id] = bot_manager await self.send_message(session_id, { "type": "auth_success", "data": result }) elif result.get("needs_password"): await self.send_message(session_id, { "type": "need_password", "data": {"message": result.get("message")} }) else: await self.send_message(session_id, { "type": "auth_error", "data": {"message": result.get("message")} }) async def _handle_verify_password(self, session_id: str, client: TelegramClientWrapper, payload: dict): """Verify 2FA password""" password = payload.get("password") result = await client.sign_in_with_password(password) if result.get("success"): # Initialize bot chat after successful auth bot_manager = BotChatManager(client) await bot_manager.init_bot_chat() async def on_bot_message(msg: MessageResponse): await self.send_message(session_id, { "type": "new_message", "data": msg.model_dump(mode='json') }) bot_manager.set_message_callback(on_bot_message) self.bot_managers[session_id] = bot_manager await self.send_message(session_id, { "type": "auth_success", "data": result }) else: await self.send_message(session_id, { "type": "auth_error", "data": {"message": result.get("message")} }) async def _handle_send_message(self, session_id: str, payload: dict): """Send message to bot""" bot_manager = self.bot_managers.get(session_id) if not bot_manager: await self.send_message(session_id, { "type": "error", "data": {"message": "Not authenticated or bot chat not initialized"} }) return text = payload.get("text") if not text: return msg = await bot_manager.send_message(text) await self.send_message(session_id, { "type": "message_sent", "data": msg.model_dump(mode='json') }) async def _handle_get_history(self, session_id: str): """Get chat history""" bot_manager = self.bot_managers.get(session_id) if not bot_manager: await self.send_message(session_id, { "type": "error", "data": {"message": "Not authenticated"} }) return messages = await bot_manager.get_history() await self.send_message(session_id, { "type": "history", "data": {"messages": [m.model_dump(mode='json') for m in messages]} }) manager = ConnectionManager()