Files
kappa e610a45fcf Initial commit: Telegram Web Client with bot chat sync
- Backend: FastAPI + Telethon v2 WebSocket server
- Frontend: React + TypeScript + Vite + Zustand
- Features: Phone auth, 2FA, real-time bot chat
- Fix: Use chats= instead of from_users= to sync messages from all devices
- Config: BOT_USERNAME=AnvilForgeBot

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 13:55:22 +09:00

254 lines
8.8 KiB
Python

import json
import asyncio
from typing import Dict, Optional
from fastapi import WebSocket, WebSocketDisconnect
import logging
from telegram.client import TelegramClientWrapper
from telegram.bot_chat import BotChatManager
from models.schemas import MessageResponse
logger = logging.getLogger(__name__)
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.clients: Dict[str, TelegramClientWrapper] = {}
self.bot_managers: Dict[str, BotChatManager] = {}
async def connect(self, websocket: WebSocket, session_id: str):
"""Accept WebSocket connection"""
await websocket.accept()
self.active_connections[session_id] = websocket
# Initialize Telegram client
client_wrapper = TelegramClientWrapper(session_id)
await client_wrapper.init_client()
self.clients[session_id] = client_wrapper
logger.info(f"WebSocket connected: {session_id}")
async def disconnect(self, session_id: str):
"""Handle WebSocket disconnection"""
if session_id in self.active_connections:
del self.active_connections[session_id]
if session_id in self.clients:
await self.clients[session_id].disconnect()
del self.clients[session_id]
if session_id in self.bot_managers:
del self.bot_managers[session_id]
logger.info(f"WebSocket disconnected: {session_id}")
async def send_message(self, session_id: str, message: dict):
"""Send message to specific connection"""
if session_id in self.active_connections:
await self.active_connections[session_id].send_json(message)
async def handle_message(self, session_id: str, data: dict):
"""Handle incoming WebSocket message"""
msg_type = data.get("type")
payload = data.get("data", {})
client = self.clients.get(session_id)
if not client:
await self.send_message(session_id, {
"type": "error",
"data": {"message": "Client not initialized"}
})
return
try:
if msg_type == "check_auth":
await self._handle_check_auth(session_id, client)
elif msg_type == "send_code":
await self._handle_send_code(session_id, client, payload)
elif msg_type == "verify_code":
await self._handle_verify_code(session_id, client, payload)
elif msg_type == "verify_password":
await self._handle_verify_password(session_id, client, payload)
elif msg_type == "send_message":
await self._handle_send_message(session_id, payload)
elif msg_type == "get_history":
await self._handle_get_history(session_id)
else:
await self.send_message(session_id, {
"type": "error",
"data": {"message": f"Unknown message type: {msg_type}"}
})
except Exception as e:
logger.error(f"Error handling message: {e}")
await self.send_message(session_id, {
"type": "error",
"data": {"message": str(e)}
})
async def _handle_check_auth(self, session_id: str, client: TelegramClientWrapper):
"""Check if user is authenticated"""
is_auth = await client.is_authorized()
if is_auth:
me = await client.get_me()
# Initialize bot chat
bot_manager = BotChatManager(client)
await bot_manager.init_bot_chat()
# Set up callback for incoming messages
async def on_bot_message(msg: MessageResponse):
await self.send_message(session_id, {
"type": "new_message",
"data": msg.model_dump(mode='json')
})
bot_manager.set_message_callback(on_bot_message)
self.bot_managers[session_id] = bot_manager
await self.send_message(session_id, {
"type": "auth_status",
"data": {
"authenticated": True,
"user_id": me.id,
"username": me.username,
"first_name": me.first_name
}
})
else:
await self.send_message(session_id, {
"type": "auth_status",
"data": {"authenticated": False}
})
async def _handle_send_code(self, session_id: str, client: TelegramClientWrapper, payload: dict):
"""Send verification code"""
phone = payload.get("phone")
if not phone:
await self.send_message(session_id, {
"type": "error",
"data": {"message": "Phone number required"}
})
return
phone_code_hash = await client.send_code(phone)
await self.send_message(session_id, {
"type": "code_sent",
"data": {
"phone": phone,
"phone_code_hash": phone_code_hash
}
})
async def _handle_verify_code(self, session_id: str, client: TelegramClientWrapper, payload: dict):
"""Verify code and sign in"""
phone = payload.get("phone")
code = payload.get("code")
phone_code_hash = payload.get("phone_code_hash")
result = await client.sign_in_with_code(phone, code, phone_code_hash)
if result.get("success"):
# Initialize bot chat after successful auth
bot_manager = BotChatManager(client)
await bot_manager.init_bot_chat()
async def on_bot_message(msg: MessageResponse):
await self.send_message(session_id, {
"type": "new_message",
"data": msg.model_dump(mode='json')
})
bot_manager.set_message_callback(on_bot_message)
self.bot_managers[session_id] = bot_manager
await self.send_message(session_id, {
"type": "auth_success",
"data": result
})
elif result.get("needs_password"):
await self.send_message(session_id, {
"type": "need_password",
"data": {"message": result.get("message")}
})
else:
await self.send_message(session_id, {
"type": "auth_error",
"data": {"message": result.get("message")}
})
async def _handle_verify_password(self, session_id: str, client: TelegramClientWrapper, payload: dict):
"""Verify 2FA password"""
password = payload.get("password")
result = await client.sign_in_with_password(password)
if result.get("success"):
# Initialize bot chat after successful auth
bot_manager = BotChatManager(client)
await bot_manager.init_bot_chat()
async def on_bot_message(msg: MessageResponse):
await self.send_message(session_id, {
"type": "new_message",
"data": msg.model_dump(mode='json')
})
bot_manager.set_message_callback(on_bot_message)
self.bot_managers[session_id] = bot_manager
await self.send_message(session_id, {
"type": "auth_success",
"data": result
})
else:
await self.send_message(session_id, {
"type": "auth_error",
"data": {"message": result.get("message")}
})
async def _handle_send_message(self, session_id: str, payload: dict):
"""Send message to bot"""
bot_manager = self.bot_managers.get(session_id)
if not bot_manager:
await self.send_message(session_id, {
"type": "error",
"data": {"message": "Not authenticated or bot chat not initialized"}
})
return
text = payload.get("text")
if not text:
return
msg = await bot_manager.send_message(text)
await self.send_message(session_id, {
"type": "message_sent",
"data": msg.model_dump(mode='json')
})
async def _handle_get_history(self, session_id: str):
"""Get chat history"""
bot_manager = self.bot_managers.get(session_id)
if not bot_manager:
await self.send_message(session_id, {
"type": "error",
"data": {"message": "Not authenticated"}
})
return
messages = await bot_manager.get_history()
await self.send_message(session_id, {
"type": "history",
"data": {"messages": [m.model_dump(mode='json') for m in messages]}
})
manager = ConnectionManager()