Files
telegram-web-client/backend/telegram/bot_chat.py
kappa e610a45fcf Initial commit: Telegram Web Client with bot chat sync
- Backend: FastAPI + Telethon v2 WebSocket server
- Frontend: React + TypeScript + Vite + Zustand
- Features: Phone auth, 2FA, real-time bot chat
- Fix: Use chats= instead of from_users= to sync messages from all devices
- Config: BOT_USERNAME=AnvilForgeBot

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 13:55:22 +09:00

78 lines
2.7 KiB
Python

from typing import Optional, Callable, List
from telethon import events
from telethon.tl.types import Message
from config import BOT_USERNAME
from telegram.client import TelegramClientWrapper
from models.schemas import MessageResponse
class BotChatManager:
def __init__(self, client_wrapper: TelegramClientWrapper):
self.client_wrapper = client_wrapper
self.bot_entity = None
self._message_callback: Optional[Callable] = None
async def init_bot_chat(self):
"""Initialize chat with the bot"""
client = self.client_wrapper.client
if not client:
raise RuntimeError("Client not initialized")
# Resolve bot entity
self.bot_entity = await client.get_entity(BOT_USERNAME)
# Set up event handler for all messages in bot chat (incoming and outgoing)
@client.on(events.NewMessage(chats=self.bot_entity))
async def handle_bot_message(event: events.NewMessage):
if self._message_callback:
msg = event.message
response = MessageResponse(
id=msg.id,
text=msg.text,
date=msg.date,
is_outgoing=msg.out,
sender_name="You" if msg.out else BOT_USERNAME
)
await self._message_callback(response)
return self.bot_entity
def set_message_callback(self, callback: Callable):
"""Set callback for incoming messages"""
self._message_callback = callback
async def send_message(self, text: str) -> MessageResponse:
"""Send message to the bot"""
client = self.client_wrapper.client
if not client or not self.bot_entity:
raise RuntimeError("Bot chat not initialized")
msg = await client.send_message(self.bot_entity, text)
return MessageResponse(
id=msg.id,
text=msg.text,
date=msg.date,
is_outgoing=True,
sender_name="You"
)
async def get_history(self, limit: int = 50) -> List[MessageResponse]:
"""Get chat history with the bot"""
client = self.client_wrapper.client
if not client or not self.bot_entity:
raise RuntimeError("Bot chat not initialized")
messages = []
async for msg in client.iter_messages(self.bot_entity, limit=limit):
messages.append(MessageResponse(
id=msg.id,
text=msg.text,
date=msg.date,
is_outgoing=msg.out,
sender_name="You" if msg.out else BOT_USERNAME
))
# Return in chronological order
return list(reversed(messages))