#! /usr/bin/env python # from .event import Event # from .scraping import get_soup, process_soup, get_events_today import asyncio import os from textwrap import dedent from markdown2 import Markdown from nio import AsyncClient from .sql_connector import Event, fetch_events MATRIX_URL = os.environ.get("MATRIX_URL", "https://matrix.pvv.ntnu.no").strip() MATRIX_USER = os.environ.get("MATRIX_USER", "@bot_calendar:pvv.ntnu.no").strip() MATRIX_TOKEN = os.environ.get("MATRIX_TOKEN").strip() ANNOUNCEMENT_CHANNEL = os.environ.get( "ANNOUNCEMENT_CHANNEL", "!announcements:pvv.ntnu.no", ).strip() client = None def create_announcement(event: Event, atEveryone: bool) -> str: url = f"https://www.pvv.ntnu.no/hendelser/info.php?id={event.id}" msgText = dedent("""\ ## Dagens arrangement / Event of the Day: "{name}" - 🕒 **{start}** - 📍 **{location}** {description} [Les mer / Read More]({url}) """).format( name=event.name, start=event.start.strftime("%H:%M"), location=event.location, description=event.description, url=url, ) if atEveryone: msgText = msgText + "\n@room" return msgText async def sendMatrixAnnouncement( event: Event, channel: str = ANNOUNCEMENT_CHANNEL, atEveryone: bool = False, ) -> None: msgText = create_announcement(event, atEveryone) return await client.room_send( room_id=channel, message_type="m.room.message", content={ "msgtype": "m.text", "body": msgText, "format": "org.matrix.custom.html", "formatted_body": Markdown().convert(msgText), }, ) async def sendCalendarEvents() -> None: global client client = AsyncClient(MATRIX_URL, MATRIX_USER) client.access_token = MATRIX_TOKEN eventsToday = fetch_events() for event in eventsToday: await sendMatrixAnnouncement(event, ANNOUNCEMENT_CHANNEL, False) await client.close() def main() -> None: asyncio.run(sendCalendarEvents()) if __name__ == "__main__": main()