diff --git a/base-config.yaml b/base-config.yaml new file mode 100644 index 0000000..8e42085 --- /dev/null +++ b/base-config.yaml @@ -0,0 +1,7 @@ +# Your youtube API key +# To obtain an API key: +# 1. Log into the Google Developer Console (https://console.developers.google.com) +# 2. Create a new project +# 3. Click "Enable APIs and Services", select the "YouTube Data API v3", and click "Enable" +# 4. Click "Create Credentials", select "Youtube Data API v3", and choose "Public data" and copy your key below +api_key: '' diff --git a/maubot.yaml b/maubot.yaml new file mode 100644 index 0000000..da29c60 --- /dev/null +++ b/maubot.yaml @@ -0,0 +1,12 @@ +maubot: 0.5.1 +id: io.laboon.maubot.youtube +version: 1.0.0 +license: AGPL-3.0-only +modules: + - youtube +main_class: YouTube +extra_files: + - base-config.yaml +config: true +webapp: false +database: false diff --git a/youtube.mbp b/youtube.mbp new file mode 100644 index 0000000..42711cd Binary files /dev/null and b/youtube.mbp differ diff --git a/youtube.py b/youtube.py new file mode 100644 index 0000000..a07f131 --- /dev/null +++ b/youtube.py @@ -0,0 +1,132 @@ +""" +youtube - A maubot plugin to display YouTube video information +Copyright (C) 2025 L. Bradley LaBoon + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +""" +import re +from typing import Type +from yarl import URL + +from maubot import Plugin, MessageEvent +from maubot.handlers import event +from mautrix.types import TextMessageEventContent, MessageType, Format, EventType +from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper + +class Config(BaseProxyConfig): + def do_update(self, helper: ConfigUpdateHelper) -> None: + helper.copy("api_key") + +class YouTube(Plugin): + async def start(self) -> None: + self.config.load_and_update() + + @classmethod + def get_config_class(cls) -> Type[BaseProxyConfig]: + return Config + + # Fetch an image URL, upload it to the matrix media store, and return the mcx:// address + async def upload_img(self, link: str) -> str: + image_req = await self.http.get(link) + if (image_req.status < 400): + image = await image_req.read() + mxc = await self.client.upload_media(image, filename=image_req.url.name) + return mxc + return "" + + # Listen to all messages passively instead of requiring a command + @event.on(EventType.ROOM_MESSAGE) + async def handle_message(self, evt: MessageEvent) -> None: + # Ignore messages from the bot itself to prevent infinite loops + if evt.sender == self.client.mxid or not evt.content.body: + return + + # Only process standard text messages or notices + if evt.content.msgtype not in (MessageType.TEXT, MessageType.NOTICE): + return + + # Find all youtube URLs in the message body using regex + # The regex avoids matching trailing markdown brackets/parentheses + urls = re.findall(r'(https?://(?:www\.)?(?:youtube\.com|youtu\.be)[^\s\]\)\>]+)', evt.content.body) + + if not urls: + return + + for video_url_str in urls: + video_url = URL(video_url_str) + video_params = video_url.query + video_id = None + offset = "" + + # Handle URLs + if video_url.host in ("youtube.com", "www.youtube.com"): + if "shorts" in video_url.parts: + video_id = video_url.name + else: + if "v" in video_params: + video_id = video_params["v"] + if "t" in video_params: + offset = f"&t={video_params['t']}" + elif video_url.host == "youtu.be": + video_id = video_url.name + if "t" in video_params: + offset = f"&t={video_params['t']}" + + # If we couldn't extract a video_id (e.g., they linked to a channel page), skip silently + if not video_id: + continue + + # Fetch video info + params = { + "key": self.config["api_key"], + "part": "snippet", + "id": video_id + } + async with self.http.get("https://www.googleapis.com/youtube/v3/videos", params=params) as response: + videos = await response.json() + if "error" in videos: + self.log.error(f"GET {str(response.url)}: {response.status} {videos['error']['message']}") + continue + elif response.status >= 400: + self.log.error(f"GET {str(response.url)}: {response.status}") + continue + + # If video is private, deleted, or wrong ID, skip silently + if len(videos.get("items", [])) < 1: + continue + + video = videos["items"][0] + + # Fix KeyError: 'maxres' by gracefully falling back to available resolutions + thumbnails = video["snippet"]["thumbnails"] + thumb_url = "" + for res in ["maxres", "high", "standard", "medium", "default"]: + if res in thumbnails: + thumb_url = thumbnails[res]["url"] + break + + # Upload thumbnail + img_html = "" + if thumb_url: + mxc = await self.upload_img(thumb_url) + if mxc: + img_html = f"" + + # Construct and send response + content = TextMessageEventContent( + msgtype=MessageType.NOTICE, + format=Format.HTML, + body=f"> YouTube\n> [{video['snippet']['channelTitle']}](https://www.youtube.com/channel/{video['snippet']['channelId']})\n> **[{video['snippet']['title']}](https://www.youtube.com/watch?v={video['id']}{offset})**", + formatted_body=f"
YouTube

{video['snippet']['channelTitle']}

{video['snippet']['title']}

{img_html}
" + ) + await evt.respond(content)