added rss, autojoin module, ping and echo modules are combined, added docker support, added logger

This commit is contained in:
D3M0N 2025-03-22 17:39:23 +05:00
parent a5385a90fe
commit 7e5e1e34c5
27 changed files with 708 additions and 174 deletions

11
Dockerfile Normal file
View file

@ -0,0 +1,11 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]

View file

@ -1,6 +1,65 @@
# Antichrist Bot
## Description
**Antichrist Bot** is a flexible and extensible tool designed to automate tasks, interact with users, and integrate with various services in the Matrix ecosystem. The bot is built on a modular architecture that allows you to easily add, remove or customize functionality depending on your needs. Each module is responsible for a specific task, making the bot versatile and easy to use.
# Antichrist Bot
**Antichrist Bot** is a flexible and extensible tool designed to automate tasks, interact with users, and integrate with various services in the Matrix ecosystem. The bot is built on a modular architecture that allows you to easily add, remove, or customize functionality depending on your needs. Each module is responsible for a specific task, making the bot versatile and easy to use.
## Modules
[modules](modules/README.md)
### RSS Module
The **RSS Module** allows the bot to subscribe to RSS feeds and automatically post updates to specified Matrix rooms. Users can manage subscriptions, set custom templates for feed updates, and control the frequency of updates.
**Features:**
- **Add RSS Feeds:** Subscribe to new RSS feeds using the `rss add <feed_url>` command.
- **Remove RSS Feeds:** Unsubscribe from RSS feeds using the `rss remove <index>` command.
- **List Subscriptions:** View all active RSS subscriptions in a room with the `rss list` command.
- **Custom Templates:** Define custom templates for formatting RSS updates using the `template` command.
- **Automatic Updates:** The bot periodically checks subscribed feeds for new content and posts updates to the room.
**Commands:**
- `rss add <feed_url>`: Add a new RSS feed subscription.
- `rss remove <index>`: Remove an RSS feed subscription by index.
- `rss list`: List all RSS feed subscriptions in the room.
- `template add <name> <content>`: Add a new template for formatting RSS updates.
- `template set <index> <template_name>`: Set a template for a specific RSS feed subscription.
- `template list`: List all templates available in the room.
### Autojoin Module
The **Autojoin Module** automatically joins the bot to any rooms it is invited to. This module is useful for ensuring the bot is always available in the rooms where it is needed without manual intervention.
### Ping Module
The **Ping Module** provides basic utility commands for testing and debugging. It includes commands to check the bot's responsiveness and to echo messages back to the user.
**Features:**
- **Ping Command:** Responds with "Pong!" and the latency in milliseconds.
- **Echo Command:** Repeats the provided message back to the user.
**Commands:**
- `ping`: Check the bot's responsiveness.
- `echo <message>`: Repeat the provided message.
## Getting Started
To get started with **Antichrist Bot**, follow these steps:
1. **Installation:** Clone the repository.
- ```commandline
git clone https://codeberg.org/D3M0N/antichrist_bot.git
cd antichrist_bot
```
2. **Configuration:** Modify the configuration files to set up the bot's credentials and desired modules.
- change the values in the config `configs/example.config.toml`
- rename the config `mv configs/example.config.toml configs/config.toml`
3. **Run the Bot:** Start the bot using the provided startup script.
- Directly with Python:
```commandline
python -m venv .venv
source .venv/bin/activate
python main.py
```
- Using Docker: Build and run the bot with Docker Compose:
```commandline
docker-compose up -d
```
4. **Invite the Bot:** Invite the bot to your Matrix rooms and start using its features.
## License
**Antichrist Bot** is licensed under the [GPLv3](LICENSE).

View file

@ -5,4 +5,5 @@ password = ""
[bot]
command_prefix = "!"
admins = ["@d3v1l-h4cker:4d2.org"]
module_folder = "modules/"
whitelisted_users = []

View file

View file

@ -1,13 +1,18 @@
import inspect
from typing import Callable
import markdown
from typing import Callable, List, Type, Union
from nio import Event, InviteEvent
from core.bot_interface import BotInterface
from core.command import Command
from core.logger import logger
class BaseModule:
def __init__(self, bot: BotInterface):
self.bot = bot
self.client = bot.client
self._pending_subcommands: List[Callable] = []
self.register_handlers()
def on_start(self):
@ -18,40 +23,64 @@ class BaseModule:
def register_handlers(self):
for name, method in inspect.getmembers(self, predicate=inspect.iscoroutinefunction):
if hasattr(method, "is_command_handler"):
command = Command(method.command_name, method.command_description, method)
self.bot.command_processor.register_command(command)
if hasattr(method, "is_subcommand"):
parent_command_name = method.parent_command_name
parent_command = next((cmd for cmd in self.bot.command_processor.commands.values() if cmd.name == parent_command_name), None)
if parent_command is None:
print(f"Warning: Parent command '{parent_command_name}' not found for subcommand '{method.subcommand_name}'")
else:
subcommand = Command(method.subcommand_name, method.subcommand_description, method)
parent_command.add_subcommand(subcommand)
if hasattr(method, "_command_info"):
self._register_command(method)
elif hasattr(method, "_subcommand_info"):
self._pending_subcommands.append(method)
elif hasattr(method, "_event_type"):
self._register_event_handler(method)
if hasattr(method, "is_event_handler"):
self.bot.event_dispatcher.subscribe(method.event_type, method)
for method in self._pending_subcommands:
self._register_subcommand(method)
def _register_command(self, method: Callable):
command_info = getattr(method, "_command_info")
required_level = getattr(method, "_required_level", 0) # Уровень прав по умолчанию
command = Command(
name=command_info["name"],
description=command_info["description"],
handler=method,
required_level=required_level
)
self.bot.command_processor.register_command(command)
def _register_subcommand(self, method: Callable):
subcommand_info = getattr(method, "_subcommand_info")
parent_command = self.bot.command_processor.commands.get(subcommand_info["parent_name"])
if not parent_command:
logger.warning(f"Parent command '{subcommand_info['parent_name']}' not found for subcommand '{subcommand_info['name']}'")
return
required_level = getattr(method, "_required_level", 0) # Уровень прав по умолчанию
subcommand = Command(
name=subcommand_info["name"],
description=subcommand_info["description"],
handler=method,
required_level=required_level
)
parent_command.add_subcommand(subcommand)
def _register_event_handler(self, method: Callable):
event_type = getattr(method, "_event_type")
self.bot.event_dispatcher.subscribe(event_type, method)
@classmethod
def command_handler_decorator(cls, name: str, description: str):
def command_handler(cls, name: str, description: str, required_level: int = 0):
def decorator(func: Callable):
func.is_command_handler = True
func.command_name = name
func.command_description = description
func.subcommand = cls.create_subcommand_decorator(func.__name__)
func._command_info = {"name": name, "description": description}
func._required_level = required_level
func.subcommand = cls._create_subcommand_decorator(name)
return func
return decorator
@classmethod
def create_subcommand_decorator(cls, parent_command_name: str):
def subcommand_decorator(name: str, description: str):
def _create_subcommand_decorator(cls, parent_name: str):
def subcommand_decorator(name: str, description: str, required_level: int = 0):
def decorator(func: Callable):
func.is_subcommand = True
func.subcommand_name = name
func.subcommand_description = description
func.parent_command_name = parent_command_name
func._subcommand_info = {"parent_name": parent_name, "name": name, "description": description}
func._required_level = required_level
return func
return decorator
@ -59,8 +88,29 @@ class BaseModule:
return subcommand_decorator
@classmethod
def on_event_decorator(cls, event_type: type):
def on_event(cls, event_type: Type[Union[Event, InviteEvent]]):
def decorator(func: Callable):
func.is_event_handler = True
func.event_type = event_type
func._event_type = event_type
return func
return decorator
async def send_message(self, room_id: str, message: str, send_formatted: bool = True):
if send_formatted:
formatted_body = {
"msgtype": "m.text",
"body": message,
"format": "org.matrix.custom.html",
"formatted_body": markdown.markdown(message)
}
else:
formatted_body = {
"msgtype": "m.text",
"body": message
}
await self.bot.client.room_send(
room_id,
"m.room.message",
formatted_body
)

View file

@ -1,57 +1,69 @@
from nio import AsyncClient, Event, RoomMessageText, MatrixRoom
from datetime import datetime
from typing import Union
from nio import AsyncClient, Event, InviteEvent, RoomMessageText, MatrixRoom
from core.bot_interface import BotInterface
from core.module_manager import ModuleManager
from core.command import PermissionManager, CommandProcessor
from core.event_dispatcher import EventDispatcher
from core.command import CommandProcessor
from core.config import ConfigManager
from core.logger import logger
class Bot(BotInterface):
def __init__(self, config: ConfigManager):
self._config = config
self._credentials = self.config.get("credentials", None)
self._homeserver = self._credentials.get("homeserver")
self._user_id = self._credentials.get("user_id")
self._password = self._credentials.get("password")
self._token = self._credentials.get("token")
self._homeserver = self._config.get("credentials.homeserver")
self._user_id = self._config.get("credentials.user_id")
self._password = self._config.get("credentials.password")
self._token = self._config.get("credentials.token")
self._bot_config = config.get("bot", None)
self._module_folder = self._bot_config.get("module_folder")
self._command_prefix = self._bot_config.get("command_prefix")
self._module_folder = self._config.get("bot.module_folder")
self._command_prefix = self._config.get("bot.command_prefix")
self._whitelisted_users = self._config.get("bot.whitelisted_users")
self._client = AsyncClient(self._homeserver, self._user_id)
self._event_dispatcher = EventDispatcher()
self._command_processor = CommandProcessor()
self._permission_manager = PermissionManager(self._client, self._whitelisted_users)
self._command_processor = CommandProcessor(self._permission_manager)
self.module_manager = ModuleManager(self)
self._start_time = None # Время запуска бота
async def start(self):
logger.info("Bot started!")
self._start_time = datetime.now() # Записываем время запуска
if self._token:
self.client.access_token = self._token
else:
await self.client.login(self._password)
self.module_manager.load_modules_from_folder(self._module_folder)
self.client.add_event_callback(self._handle_event, Event)
self.client.add_event_callback(self._handle_event, Union[Event, InviteEvent])
await self.client.sync_forever(timeout=30000)
def stop(self):
async def stop(self):
for module in self.module_manager.modules:
self.module_manager.unregister_module(module)
await self._client.logout()
logger.info("Bot stopped!")
async def _handle_event(self, room: MatrixRoom, event: Event):
if isinstance(event, RoomMessageText) and event.body.startswith(self._command_prefix):
await self.command_processor.execute_command(room, event.body, event)
else:
await self.event_dispatcher.dispatch(room, event)
async def _handle_event(self, room: MatrixRoom, event: Union[Event, InviteEvent]):
if isinstance(event, RoomMessageText):
event_time = datetime.fromtimestamp(event.server_timestamp / 1000)
if self._start_time and event_time < self._start_time:
return
if event.body.startswith(self._command_prefix):
await self.command_processor.execute_command(room, event.body, event)
await self.event_dispatcher.dispatch(room, event)
@property
def client(self) -> AsyncClient:
return self._client
@property
def config(self) -> ConfigManager:
return self._config
@property
def command_processor(self) -> CommandProcessor:
return self._command_processor
@ -59,3 +71,8 @@ class Bot(BotInterface):
@property
def event_dispatcher(self) -> EventDispatcher:
return self._event_dispatcher
@property
def config(self) -> ConfigManager:
return self._config

View file

@ -9,10 +9,6 @@ class BotInterface(Protocol):
def client(self) -> AsyncClient:
...
@property
def config(self) -> ConfigManager:
...
@property
def command_processor(self) -> CommandProcessor:
...
@ -20,3 +16,7 @@ class BotInterface(Protocol):
@property
def event_dispatcher(self) -> EventDispatcher:
...
@property
def config(self) -> ConfigManager:
...

View file

@ -1,28 +1,55 @@
from typing import List, Callable, Dict
from nio import MatrixRoom, Event
from nio import MatrixRoom, Event, AsyncClient
from core.logger import logger
class PermissionManager:
def __init__(self, client: AsyncClient, whitelisted_users: List[str]):
self.client = client
self.whitelisted_users = whitelisted_users
async def get_user_power_level(self, room_id: str, user_id: str) -> int:
try:
power_levels = await self.client.room_get_state_event(room_id, "m.room.power_levels")
users_power_levels = power_levels.content.get("users", {})
return users_power_levels.get(user_id, 0)
except Exception as e:
logger.error(f"Failed to get power levels for room {room_id}: {e}")
return 0
async def has_permission(self, room_id: str, user_id: str, required_level: int) -> bool:
if user_id in self.whitelisted_users:
return True
user_level = await self.get_user_power_level(room_id, user_id)
return user_level >= required_level
class Command:
def __init__(self, name: str, description: str, handler: Callable):
def __init__(self, name: str, description: str, handler: Callable, required_level: int = 0):
self.name = name
self.description = description
self.handler = handler
self.subcommands: Dict[str, Command] = {}
self.required_level = required_level
def add_subcommand(self, subcommand) -> None:
self.subcommands[subcommand.name] = subcommand
async def execute(self, room, args: List[str], event: Event) -> None:
async def execute(self, room, args: List[str], event: Event, permission_manager: PermissionManager) -> None:
user_id = event.sender
if not await permission_manager.has_permission(room.room_id, user_id, self.required_level):
# await self.bot.send_message(room.room_id, "You do not have permission to execute this command.")
return
if args and args[0] in self.subcommands:
subcommand = self.subcommands[args[0]]
await subcommand.execute(room, args[1:], event)
await subcommand.execute(room, args[1:], event, permission_manager)
else:
await self.handler(room, args, event)
class CommandProcessor:
def __init__(self):
def __init__(self, permission_manager: PermissionManager):
self.commands: Dict[str, Command] = {}
self.permission_manager = permission_manager
def register_command(self, command: Command):
self.commands[command.name] = command
@ -35,9 +62,9 @@ class CommandProcessor:
command_message = parts[0][1:]
if command_message in self.commands:
command = self.commands[command_message]
await command.execute(room, parts[1:], event)
await command.execute(room, parts[1:], event, self.permission_manager)
else:
pass # TODO
logger.warning(f"Command {command_message} not found")
def get_commands(self) -> List[Dict[str, str]]:
return [

View file

@ -2,56 +2,40 @@ import tomllib
from pathlib import Path
from typing import Any, Dict
class ConfigManager:
def __init__(self, config_path: str = "config.toml"):
self.config_path = Path(config_path)
self._config: Dict[str, Any] = {}
def __init__(self, file_path: str):
self.file_path = Path(file_path)
self.config: Dict[str, Any] = {}
self._load()
def _load(self):
if self.config_path.exists():
try:
with open(self.config_path, "rb") as f:
self._config = tomllib.load(f)
except Exception as e:
raise Exception(f"Error occurred while loading a config: {e}")
else:
self._config = {}
def _load(self) -> None:
if not self.file_path.exists():
raise FileNotFoundError(f"Config file not found: {self.file_path}")
def save(self) -> None:
try:
with open(self.config_path, "w") as f:
pass
except Exception as e:
raise Exception(f"Error occurred while saving a config: {e}")
def _dict_to_toml(self, d: Dict[str, Any], indent: str = "") -> str:
lines = []
for key, value in d.items():
if isinstance(value, dict):
lines.append(f"\n{indent}[{key}]")
lines.append(self._dict_to_toml(value, indent + " "))
else:
if isinstance(value, str):
value = f'"{value}"'
elif isinstance(value, bool):
value = str(value).lower()
lines.append(f"{indent}{key} = {value}")
return "\n".join(lines)
with open(self.file_path, "rb") as f:
self.config = tomllib.load(f)
def get(self, key: str, default: Any = None) -> Any:
return self._config.get(key, default)
keys = key.split(".")
value = self.config
def set(self, key: str, value: Any) -> None:
self._config[key] = value
self.save()
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
def delete(self, key: str) -> None:
if key in self._config:
del self._config[key]
self.save()
def __getitem__(self, key: str) -> Any:
return self.get(key)
@property
def config(self) -> Dict[str, Any]:
return self._config.copy()
def __contains__(self, key: str) -> bool:
keys = key.split(".")
value = self.config
try:
for k in keys:
value = value[k]
return True
except (KeyError, TypeError):
return False

View file

@ -1,25 +1,25 @@
from typing import Callable, List, Dict, Type
from nio import Event, MatrixRoom
from typing import Callable, List, Dict, Type, Union
from nio import Event, InviteEvent, MatrixRoom
from core.logger import logger
class EventDispatcher:
def __init__(self):
self.subscribers: Dict[type[Event], List[Callable]] = {}
self.subscribers: Dict[type[Union[Event, InviteEvent]], List[Callable]] = {}
def subscribe(self, event_type: Type[Event], handler: Callable):
def subscribe(self, event_type: Type[Union[InviteEvent, Event]], handler: Callable):
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
def unsubscribe(self, event_type: Type[Event], handler: Callable):
def unsubscribe(self, event_type: Type[Union[InviteEvent, Event]], handler: Callable):
if event_type in self.subscribers:
self.subscribers[event_type].remove(handler)
async def dispatch(self, room: MatrixRoom, event: Event):
async def dispatch(self, room: MatrixRoom, event: Union[Event, InviteEvent]):
event_type = type(event)
if event_type in self.subscribers:
for handler in self.subscribers[event_type]:
try:
await handler(room, event)
except Exception as e:
pass
logger.error(f"Error occurred while handling event: {e}")

20
core/logger.py Normal file
View file

@ -0,0 +1,20 @@
import logging
import sys
def setup_logger(name, log_file='app.log', level=logging.DEBUG):
logger = logging.getLogger(name)
logger.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
logger = setup_logger(__name__)

View file

@ -1,4 +1,3 @@
# core/module_manager.py
import importlib
import inspect
import os
@ -7,6 +6,7 @@ from typing import List, Type
from core.base_module import BaseModule
from core.bot_interface import BotInterface
from core.logger import logger
class ModuleManager:
@ -28,7 +28,7 @@ class ModuleManager:
try:
module = importlib.import_module(module_name_for_import)
except Exception as e:
print(f"Error importing module {module_name_for_import}: {e}")
logger.error(f"Error importing module {module_name_for_import}: {e}")
raise
for name, obj in inspect.getmembers(module):
@ -45,7 +45,7 @@ class ModuleManager:
module_class = self._load_module(module_name)
modules.append(module_class)
except Exception as e:
print(f"Failed to load module {module_name}: {e}") # TODO: Proper logging
logger.error(f"Failed to load module {module_name}: {e}") # TODO: Proper logging
return modules
def register_module(self, module_class: Type['BaseModule']):
@ -53,14 +53,14 @@ class ModuleManager:
module_instance = module_class(self.bot)
self.modules.append(module_instance)
module_instance.on_start()
print(f"Module {module_class.__name__} registered and started.")
logger.info(f"Module {module_class.__name__} registered and started.")
except Exception as e:
print(f"Failed to register module {module_class.__name__}: {e}")
logger.error(f"Failed to register module {module_class.__name__}: {e}")
def unregister_module(self, module: 'BaseModule'):
try:
module.on_stop()
self.modules.remove(module)
print(f"Module {module.__class__.__name__} unregistered and stopped.")
logger.info(f"Module {module.__class__.__name__} unregistered and stopped.")
except Exception as e:
print(f"Failed to unregister module {module.__class__.__name__}: {e}")
logger.error(f"Failed to unregister module {module.__class__.__name__}: {e}")

8
docker-compose.yml Normal file
View file

@ -0,0 +1,8 @@
services:
antichrist_bot:
build: .
container_name: antichrist_bot_instance
volumes:
- ./configs:/app/configs
- ./modules:/app/modules
restart: unless-stopped

28
main.py
View file

@ -1,11 +1,37 @@
import asyncio
import signal
from core.bot import Bot
from core.config import ConfigManager
from core.logger import logger
async def shutdown(signal, loop, bot):
logger.warning(f"Received exit signal {signal.name}...")
await bot.stop()
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
async def main():
config = ConfigManager("configs/config.toml")
bot = Bot(config)
await bot.start()
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig,
lambda s=sig: asyncio.create_task(shutdown(s, loop, bot))
)
try:
await bot.start()
except asyncio.CancelledError:
pass
if __name__ == '__main__':
asyncio.run(main())

View file

@ -1,12 +0,0 @@
# Modules
- **echo** - the echo module repeats (echoes) a message sent by a user back to the same channel or private chat. \
_Command_: `!echo <text>` \
_Example_: \
User: `!echo Hello, world!` \
Bot: `Hello, world!`
- **ping** - The ping module is used to check bot availability and measure response time. \
_Command_: `!ping` \
_Example_: \
User: `!ping` \
Bot: `Pong! (response time: 668 ms)`

View file

@ -0,0 +1,3 @@
from .module import AutoJoinModule
__all__=["AutoJoinModule"]

View file

@ -0,0 +1,24 @@
from nio import InviteMemberEvent, MatrixRoom
from core.base_module import BaseModule
from core.logger import logger
class AutoJoinModule(BaseModule):
def __init__(self, bot):
super().__init__(bot)
def on_start(self):
logger.info("AutoJoinModule started")
def on_stop(self):
logger.info("AutoJoinModule stopped")
@BaseModule.on_event(InviteMemberEvent)
async def on_invite(self, room: MatrixRoom, event: InviteMemberEvent):
logger.info(f"Received invite to room {room.room_id} from {event.sender}")
try:
await self.bot.client.join(room.room_id)
logger.info(f"Joined room {room.room_id}")
except Exception as e:
logger.error(f"Failed to join room {room.room_id}: {e}")

View file

@ -1,3 +0,0 @@
from .echo import EchoModule
__all__ = ["EchoModule"]

View file

@ -1,14 +0,0 @@
from nio import MatrixRoom, Event
from core.base_module import BaseModule
class EchoModule(BaseModule):
@BaseModule.command_handler_decorator(name="echo", description="echo command")
async def echo(self, room: MatrixRoom, args, event: Event):
await self.client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": " ".join(args)
}
)

View file

@ -1,3 +1,3 @@
from .ping import PingModule
from .module import PingModule
__all__ = ["PingModule"]

20
modules/ping/module.py Normal file
View file

@ -0,0 +1,20 @@
import time
from nio import MatrixRoom, Event
from core.logger import logger
from core.base_module import BaseModule
class PingModule(BaseModule):
def on_start(self):
logger.info("PingModule started")
def on_stop(self):
logger.info("PingModule stopped")
@BaseModule.command_handler(name="ping", description="echo command")
async def ping(self, room: MatrixRoom, args, event: Event):
ping_time = int(time.time() * 1000) - event.server_timestamp
await self.send_message(room.room_id, f"Pong! {ping_time}ms")
@BaseModule.command_handler(name="echo", description="echo command")
async def echo(self, room: MatrixRoom, args, event: Event):
await self.send_message(room.room_id, " ".join(args))

View file

@ -1,17 +0,0 @@
import time
from nio import MatrixRoom, Event
from core.base_module import BaseModule
class PingModule(BaseModule):
@BaseModule.command_handler_decorator(name="ping", description="echo command")
async def echo(self, room: MatrixRoom, args, event: Event):
ping_time = int(time.time() * 1000) - event.server_timestamp
await self.client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": f"Pong! {ping_time}ms"
}
)

3
modules/rss/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from .module import RSSModule
__all__ = ["RSSModule"]

3
modules/rss/config.toml Normal file
View file

@ -0,0 +1,3 @@
[module]
global_template = "New config on {feed_url}: [{title}]({link})"
update_interval = 60 # seconds

277
modules/rss/module.py Normal file
View file

@ -0,0 +1,277 @@
import os
import asyncio
import json
import feedparser
from typing import Dict, List, Optional
from datetime import datetime
from nio import MatrixRoom, Event
from core.bot_interface import BotInterface
from core.base_module import BaseModule
from core.logger import logger
from core.config import ConfigManager
class RSSModule(BaseModule):
def __init__(self, bot: BotInterface):
super().__init__(bot)
self.config = ConfigManager(os.path.join(os.path.dirname(__file__), "config.toml"))
self.subscriptions_file = os.path.join(os.path.dirname(__file__), "rss_subscriptions.json")
self.subscriptions: Dict[str, List[Dict]] = {} # {room_id: [{feed_url: str, template_name: str, last_updated: str}]}
self.templates: Dict[str, Dict] = {} # {room_id: {"default_template": str, "templates": {name: str, content: str}}}
self.global_template = self.config.get("module.global_template")
self.rss_update_interval = self.config.get("module.update_interval") # 1 minute
self.load_subscriptions()
asyncio.create_task(self.rss_updater())
def on_start(self):
logger.info("RSS Module started")
def on_stop(self):
logger.info("RSS Module stopped")
def load_subscriptions(self):
try:
with open(self.subscriptions_file, "r") as f:
data = json.load(f)
self.subscriptions = data.get("subscriptions", {})
self.templates = data.get("templates", {})
except FileNotFoundError:
self.subscriptions = {}
self.templates = {}
except json.JSONDecodeError:
logger.warning("Error decoding RSS subscriptions file. Starting with empty data.")
self.subscriptions = {}
self.templates = {}
def save_subscriptions(self):
data = {"subscriptions": self.subscriptions, "templates": self.templates}
with open(self.subscriptions_file, "w") as f:
json.dump(data, f, indent=4)
async def rss_updater(self):
while True:
for room_id, subscriptions in self.subscriptions.items():
for sub in subscriptions:
feed_url = sub["feed_url"]
template_name = sub.get("template_name")
template = self.get_template(room_id, template_name)
try:
await self.check_feed(room_id, feed_url, template, sub)
except Exception as e:
logger.error(f"Error checking feed {feed_url}: {e}")
await asyncio.sleep(self.rss_update_interval)
def get_template(self, room_id: str, template_name: Optional[str] = None) -> Optional[str]:
if room_id not in self.templates:
return self.global_template
if template_name:
return self.templates[room_id]["templates"].get(template_name)
default_template_name = self.templates[room_id].get("default_template")
if default_template_name:
return self.templates[room_id]["templates"].get(default_template_name)
async def check_feed(self, room_id: str, feed_url: str, template: str, sub: Dict):
try:
feed = feedparser.parse(feed_url)
last_updated = sub.get("last_updated")
if last_updated:
last_updated = datetime.fromisoformat(last_updated)
for entry in feed.entries:
entry_updated = datetime(*entry.updated_parsed[:6]) if hasattr(entry, 'updated_parsed') else datetime.now()
if last_updated and entry_updated <= last_updated:
continue
message = template.format(
feed_url=feed_url,
title=entry.title,
link=entry.link,
summary=entry.get("summary", ""),
author=entry.get("author", ""),
content=entry.get("content", ""),
updated=entry_updated.isoformat()
)
await self.send_message(room_id, message)
await asyncio.sleep(10)
sub["last_updated"] = datetime.now().isoformat()
self.save_subscriptions()
except Exception as e:
logger.error(f"Error parsing feed {feed_url}: {e}")
@BaseModule.command_handler(name="rss", description="RSS feed management.")
async def rss_command(self, room: MatrixRoom, args: List[str], event: Event):
await self.send_message(room.room_id, "Use `rss add`, `rss remove`, `rss list`, `rss set` subcommands.")
@rss_command.subcommand("add", "Add a new RSS feed subscription.", required_level=50)
async def rss_add(self, room: MatrixRoom, args: List[str], event: Event):
if len(args) != 1:
await self.send_message(room.room_id, "Usage: `rss add <feed_url>`")
return
feed_url = args[0]
if room.room_id not in self.subscriptions:
self.subscriptions[room.room_id] = []
for sub in self.subscriptions[room.room_id]:
if sub["feed_url"] == feed_url:
await self.send_message(room.room_id, f"Feed {feed_url} is already subscribed in this room.")
return
self.subscriptions[room.room_id].append({"feed_url": feed_url, "last_updated": datetime.now().isoformat()})
self.save_subscriptions()
index = len(self.subscriptions[room.room_id]) - 1
await self.send_message(room.room_id, f"Subscribed to {feed_url} with index {index} using default template.")
@rss_command.subcommand("remove", "Remove an RSS feed subscription.", required_level=50)
async def rss_remove(self, room: MatrixRoom, args: List[str], event: Event):
if len(args) != 1:
await self.send_message(room.room_id, "Usage: `rss remove <index>`")
return
try:
index = int(args[0])
except ValueError:
await self.send_message(room.room_id, "Invalid index. Please provide a valid number.")
return
if room.room_id not in self.subscriptions or index >= len(self.subscriptions[room.room_id]):
await self.send_message(room.room_id, f"No subscription found with index {index} in this room.")
return
removed_sub = self.subscriptions[room.room_id].pop(index)
self.save_subscriptions()
await self.send_message(room.room_id, f"Unsubscribed from feed {removed_sub['feed_url']} with index {index}.")
@rss_command.subcommand("list", "List RSS feed subscriptions.")
async def rss_list(self, room: MatrixRoom, args: List[str], event: Event):
if room.room_id not in self.subscriptions or not self.subscriptions[room.room_id]:
await self.send_message(room.room_id, "No RSS subscriptions in this room.")
return
message = "<p>RSS subscriptions in this room:</p>\n"
for index, sub in enumerate(self.subscriptions[room.room_id]):
template_name = sub.get("template_name", "default")
message += f"- {index}: {sub['feed_url']} (Template: {template_name})\n"
await self.send_message(room.room_id, message)
@BaseModule.command_handler(name="template", description="Template management.")
async def template_command(self, room: MatrixRoom, args: List[str], event: Event):
await self.send_message(room.room_id, "Use `template add`, `template set`, `template del`, `template set_default`, `template list` subcommands.")
@template_command.subcommand("add", "Add a new template.", required_level=50)
async def template_add(self, room: MatrixRoom, args: List[str], event: Event):
if len(args) < 2:
await self.send_message(room.room_id, "Usage: `template add <name> <content>`")
return
name = args[0]
content = " ".join(args[1:])
if room.room_id not in self.templates:
self.templates[room.room_id] = {"default_template": None, "templates": {}}
if name in self.templates[room.room_id]["templates"]:
await self.send_message(room.room_id, f"Template with name {name} already exists.")
return
self.templates[room.room_id]["templates"][name] = content
self.save_subscriptions()
await self.send_message(room.room_id, f"Template {name} added.")
@template_command.subcommand("del", "Delete a template.", required_level=50)
async def template_del(self, room: MatrixRoom, args: List[str], event: Event):
if len(args) != 1:
await self.send_message(room.room_id, "Usage: `template del <name>`")
return
name = args[0]
if room.room_id not in self.templates or name not in self.templates[room.room_id]["templates"]:
await self.send_message(room.room_id, f"No template found with name {name} in this room.")
return
if self.templates[room.room_id].get("default_template") == name:
self.templates[room.room_id]["default_template"] = None
del self.templates[room.room_id]["templates"][name]
self.save_subscriptions()
await self.send_message(room.room_id, f"Template {name} deleted.")
@template_command.subcommand("set", "Set a template for an RSS feed subscription.", required_level=50)
async def template_set(self, room: MatrixRoom, args: List[str], event: Event):
if len(args) < 2:
await self.send_message(room.room_id, "Usage: `rss set <index> <template_name>`")
return
try:
index = int(args[0])
except ValueError:
await self.send_message(room.room_id, "Invalid index. Please provide a valid number.")
return
template_name = args[1]
if room.room_id not in self.subscriptions or index >= len(self.subscriptions[room.room_id]):
await self.send_message(room.room_id, f"No subscription found with index {index} in this room.")
return
if room.room_id not in self.templates or template_name not in self.templates[room.room_id]["templates"]:
await self.send_message(room.room_id, f"No template found with name {template_name} in this room.")
return
self.subscriptions[room.room_id][index]["template_name"] = template_name
self.save_subscriptions()
await self.send_message(room.room_id,
f"Template updated for subscription with index {index} to {template_name}.")
@template_command.subcommand("set_default", "Set the default template.", required_level=50)
async def template_set_default(self, room: MatrixRoom, args: List[str], event: Event):
if len(args) != 1:
await self.send_message(room.room_id, "Usage: `template set_default <name>`")
return
name = args[0]
if room.room_id not in self.templates or name not in self.templates[room.room_id]["templates"]:
await self.send_message(room.room_id, f"No template found with name {name} in this room.")
return
self.templates[room.room_id]["default_template"] = name
self.save_subscriptions()
await self.send_message(room.room_id, f"Default template set to {name}.")
@template_command.subcommand("reset_default", "Reset the default template to global default.", required_level=50)
async def template_reset_default(self, room: MatrixRoom, args: List[str], event: Event):
if room.room_id not in self.templates:
await self.send_message(room.room_id, "No templates in this room.")
return
if not self.templates[room.room_id].get("default_template"):
await self.send_message(room.room_id, "No default template is set for this room.")
return
self.templates[room.room_id]["default_template"] = None
self.save_subscriptions()
await self.send_message(room.room_id, "Default template reset to global default.")
@template_command.subcommand("list", "List all templates.")
async def template_list(self, room: MatrixRoom, args: List[str], event: Event):
if room.room_id not in self.templates or not self.templates[room.room_id]["templates"]:
await self.send_message(room.room_id, "No templates in this room.")
return
message = "<p>Templates in this room:</p>\n"
for name, content in self.templates[room.room_id]["templates"].items():
is_default = self.templates[room.room_id].get("default_template") == name
message += f"- {name}: {content} {'(default)' if is_default else ''}\n"
await self.send_message(room.room_id, message)

View file

@ -0,0 +1,3 @@
{
}

44
requirements.txt Normal file
View file

@ -0,0 +1,44 @@
aiofiles==24.1.0
aiohappyeyeballs==2.4.6
aiohttp==3.11.12
aiohttp_socks==0.10.1
aiosignal==1.3.2
atomicwrites==1.4.1
attrs==25.1.0
cachetools==5.5.1
cffi==1.17.1
feedparser==6.0.11
frozenlist==1.5.0
h11==0.14.0
h2==4.2.0
hpack==4.1.0
hyperframe==6.1.0
idna==3.10
Jinja2==3.1.6
jsonschema==4.23.0
jsonschema-specifications==2024.10.1
linkify-it-py==2.0.3
Markdown==3.7
markdown-it-py==3.0.0
MarkupSafe==3.0.2
matrix-nio==0.25.2
mdit-py-plugins==0.4.2
mdurl==0.1.2
multidict==6.1.0
peewee==3.17.9
platformdirs==4.3.6
propcache==0.2.1
pycparser==2.22
pycryptodome==3.21.0
Pygments==2.19.1
python-olm==3.2.16
python-socks==2.7.1
referencing==0.36.2
rich==13.9.4
rpds-py==0.22.3
sgmllib3k==1.0.0
textual==2.1.2
typing_extensions==4.12.2
uc-micro-py==1.0.3
unpaddedbase64==2.1.0
yarl==1.18.3