mirror of
https://codeberg.org/D3M0N/thread-bot.git
synced 2025-04-11 20:58:47 +02:00
130 lines
4.5 KiB
Python
130 lines
4.5 KiB
Python
import asyncio
|
|
import logging
|
|
from typing import Optional
|
|
from time import time
|
|
|
|
from nio import (
|
|
AsyncClient,
|
|
MatrixRoom,
|
|
RoomMessage,
|
|
InviteMemberEvent,
|
|
JoinError
|
|
)
|
|
|
|
from config import load_config, Config
|
|
|
|
class ThreadBot:
    """Matrix bot that redacts non-threaded messages from unprivileged users.

    The bot logs in with the credentials from ``config``, joins rooms it is
    invited to, and redacts every message that is not part of a thread unless
    the sender has at least ``config.required_power_level``.
    """

    def __init__(self, config: "Config"):
        """Store the configuration and set up logging.

        Args:
            config: Settings object; this class reads ``log_level``,
                ``homeserver``, ``user_id``, ``password``,
                ``required_power_level`` and ``help_message`` from it.
        """
        self.config = config
        # Created in start(); None while the bot is not running.
        self.client: Optional[AsyncClient] = None
        # Messages with an older server timestamp are ignored (see
        # handle_message), so history replayed on first sync is left alone.
        self.start_time = time()
        self.setup_logging()

    def setup_logging(self) -> None:
        """Configure root logging and create the bot's own logger."""
        logging.basicConfig(
            level=self.config.log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger("ThreadBot")

    async def logout(self) -> None:
        """Log out if logged in and always close the client connection.

        Safe to call repeatedly: once the client has been closed the
        reference is dropped and later calls return immediately.
        """
        client = self.client
        if client is None:
            return

        try:
            if client.logged_in:
                await client.logout()
                self.logger.info("Successfully logged out")
        except Exception as e:
            self.logger.error(f"Error during logout: {e}")
        finally:
            # Close even when login never succeeded, otherwise the
            # underlying HTTP session is leaked.
            self.client = None
            await client.close()

    async def handle_invite(self, room: "MatrixRoom", event: "InviteMemberEvent") -> None:
        """Join a room the bot was invited to and announce what it needs.

        Args:
            room: Room the invite event belongs to.
            event: Invite-state member event; ignored unless its
                ``state_key`` is the bot's own user id.
        """
        if event.state_key != self.client.user_id:
            return

        self.logger.info(f"Received invite to room {room.room_id}")
        try:
            response = await self.client.join(room.room_id)
            # nio reports a failed join by *returning* a JoinError response;
            # it is not an exception and cannot be caught with ``except``.
            if isinstance(response, JoinError):
                self.logger.error(f"Error joining room {room.room_id}: {response}")
                return

            await self.client.room_send(
                room_id=room.room_id,
                message_type="m.room.message",
                content={
                    "msgtype": "m.text",
                    "body": "To work properly, I need moderator rights (power level 50+) to delete posts"
                }
            )
        except Exception as e:
            # Callback boundary: a transport error must not stop the sync loop.
            self.logger.error(f"Error joining room {room.room_id}: {e}")

    async def _get_user_power_level(self, room_id: str, user_id: str) -> int:
        """Return ``user_id``'s power level in ``room_id``.

        Falls back to the room's ``users_default`` (0 if absent) for users
        that are not listed explicitly.

        Raises:
            RuntimeError: If the power-level state could not be read.
        """
        response = await self.client.room_get_state_event(
            room_id,
            "m.room.power_levels"
        )
        content = getattr(response, "content", None)
        if not isinstance(content, dict):
            # Error responses carry no content; refuse to guess a level so a
            # transient failure never gets a privileged user's message redacted.
            raise RuntimeError(f"Could not read power levels: {response}")

        users = content.get("users")
        if not isinstance(users, dict):
            users = {}
        level = users.get(user_id, content.get("users_default", 0))
        try:
            # Older room versions permit integer-like strings here.
            return int(level)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _is_threaded(event: "RoomMessage") -> bool:
        """Return True if the event's content relates to a thread.

        Malformed content (non-dict ``content`` or ``m.relates_to``) counts
        as not threaded instead of raising.
        """
        source = getattr(event, "source", None)
        content = source.get("content") if isinstance(source, dict) else None
        relates_to = content.get("m.relates_to") if isinstance(content, dict) else None
        return isinstance(relates_to, dict) and relates_to.get("rel_type") == "m.thread"

    async def handle_message(self, room: "MatrixRoom", event: "RoomMessage") -> None:
        """Redact non-threaded messages and answer the help command.

        Args:
            room: Room the message was sent in.
            event: The received message event.
        """
        # server_timestamp is in milliseconds; skip anything sent before the
        # bot was constructed.
        if event.server_timestamp / 1000 < self.start_time:
            return

        if event.sender == self.client.user_id:
            return

        try:
            user_level = await self._get_user_power_level(room.room_id, event.sender)
            is_threaded = self._is_threaded(event)

            if not is_threaded and user_level < self.config.required_power_level:
                self.logger.info(f"Removing non-threaded message from {event.sender} in room {room.room_id}")
                await self.client.room_redact(
                    room_id=room.room_id,
                    event_id=event.event_id,
                    reason="Messages must be in threads unless sent by admin"
                )
                return

            # Not every RoomMessage subtype has a ``body`` attribute.
            if getattr(event, "body", None) == "!thread_bot help":
                # NOTE(review): help is only sent to users *below* the
                # required level (i.e. asking inside a thread) — confirm
                # this is intended and not an inverted comparison.
                if user_level < self.config.required_power_level:
                    await self.client.room_send(
                        room_id=room.room_id,
                        message_type="m.room.message",
                        content={
                            "msgtype": "m.text",
                            "body": self.config.help_message
                        }
                    )
                return
        except Exception as e:
            # Callback boundary: log and keep the sync loop alive.
            self.logger.error(f"Error processing message in room {room.room_id}: {e}")

    async def start(self) -> None:
        """Log in, register the event callbacks and sync until stopped.

        Errors are logged rather than raised; the client is always logged
        out and closed before this coroutine returns.
        """
        self.client = AsyncClient(self.config.homeserver, self.config.user_id)

        try:
            response = await self.client.login(self.config.password)
            # login() returns an error response instead of raising, so
            # success has to be checked explicitly.
            if not self.client.logged_in:
                self.logger.error(f"Login failed: {response}")
                return
            self.logger.info("Successfully logged in")

            self.client.add_event_callback(self.handle_message, RoomMessage)
            self.client.add_event_callback(self.handle_invite, InviteMemberEvent)

            await self.client.sync_forever()
        except Exception as e:
            self.logger.error(f"Error during bot execution: {e}")
        finally:
            await self.logout()
|
|
|
|
async def main() -> None:
    """Build the bot from the loaded configuration and run it.

    Whatever happens while the bot runs, it is logged out before this
    coroutine returns.
    """
    thread_bot = ThreadBot(load_config())
    try:
        await thread_bot.start()
    except KeyboardInterrupt:
        logging.info("Bot stopped by user")
    except Exception as exc:
        logging.error(f"Unexpected error: {exc}")
    finally:
        # Runs on success, on error and on interruption alike.
        await thread_bot.logout()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive main() on a fresh event loop and turn any
    # escape (Ctrl-C or an unexpected exception) into a log line.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Bot stopped by user")
    except Exception as exc:
        logging.error(f"Unexpected error: {exc}")
|