"""
youtube - A maubot plugin to display YouTube video information
Copyright (C) 2025 L. Bradley LaBoon
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License version 3 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import html
import re
from typing import Type

from yarl import URL
from maubot import Plugin, MessageEvent
from maubot.handlers import event
from mautrix.types import TextMessageEventContent, MessageType, Format, EventType
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
class Config(BaseProxyConfig):
    """Configuration for the YouTube plugin."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every setting this plugin uses through the update helper.

        Args:
            helper: Update helper supplied by the configuration framework.
        """
        # "api_key" is currently the only setting the plugin reads.
        for setting in ("api_key",):
            helper.copy(setting)
class YouTube(Plugin):
    """Maubot plugin that replies to YouTube links with video information.

    Every text or notice message is scanned for ``youtube.com`` and
    ``youtu.be`` links. For each video found, its snippet is fetched from the
    YouTube Data API v3 and a notice with the channel, the title and (when it
    could be uploaded) a thumbnail is sent to the room.
    """

    # Matches YouTube links without swallowing trailing markdown/HTML
    # delimiters such as "]", ")" and ">".
    _URL_PATTERN = re.compile(
        r'(https?://(?:www\.)?(?:youtube\.com|youtu\.be)[^\s\]\)\>]+)'
    )
    _API_ENDPOINT = "https://www.googleapis.com/youtube/v3/videos"
    # First path segments of youtube.com URLs that are followed by a video ID.
    _ID_PATH_PREFIXES = ("shorts", "live", "embed")
    # Thumbnail keys in the order they are tried.
    _THUMBNAIL_PREFERENCE = ("maxres", "high", "standard", "medium", "default")

    async def start(self) -> None:
        """Load the plugin configuration when the plugin starts."""
        self.config.load_and_update()

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    async def upload_img(self, link: str) -> str:
        """Fetch an image and upload it to the Matrix media store.

        Args:
            link: URL of the image to download.

        Returns:
            The ``mxc://`` address of the uploaded image, or an empty string
            when the download answered with an HTTP error status.
        """
        # ``async with`` releases the connection on every path, including the
        # error path where the body is never read.
        async with self.http.get(link) as image_req:
            if image_req.status >= 400:
                return ""
            image = await image_req.read()
            filename = image_req.url.name
        return await self.client.upload_media(image, filename=filename)

    @classmethod
    def _parse_video_url(cls, link: str) -> tuple:
        """Extract the video ID and start time from a YouTube link.

        Args:
            link: A URL matched by ``_URL_PATTERN``.

        Returns:
            A ``(video_id, start)`` tuple of strings. ``video_id`` is empty
            when the link does not point at a video (for example a channel
            page) or cannot be parsed; ``start`` is the raw ``t`` query value
            or an empty string.
        """
        try:
            url = URL(link)
        except ValueError:
            # A malformed link must not stop the other links in the message
            # from being handled.
            return "", ""

        query = url.query
        if url.host in ("youtube.com", "www.youtube.com"):
            parts = url.parts
            if len(parts) >= 3 and parts[1] in cls._ID_PATH_PREFIXES:
                # /shorts/<id>, /live/<id>, /embed/<id>; reading the segment
                # after the prefix also copes with a trailing slash.
                return parts[2], ""
            return query.get("v", ""), query.get("t", "")
        if url.host == "youtu.be":
            return url.name, query.get("t", "")
        return "", ""

    async def _fetch_video(self, video_id: str):
        """Fetch the snippet of one video from the YouTube Data API.

        Args:
            video_id: ID of the video to look up.

        Returns:
            The first entry of the response's ``items`` list, or ``None`` when
            the request failed or no video was returned (private, deleted or
            unknown ID).
        """
        params = {
            "key": self.config["api_key"],
            "part": "snippet",
            "id": video_id,
        }
        async with self.http.get(self._API_ENDPOINT, params=params) as response:
            status = response.status
            try:
                # content_type=None skips the MIME type check, so a non-JSON
                # error page ends up as a ValueError from the JSON decoder.
                payload = await response.json(content_type=None)
            except ValueError:
                payload = None

        # The request URL contains the API key, so only the endpoint and the
        # video ID are written to the log.
        if not isinstance(payload, dict):
            self.log.error(
                "GET %s (id=%s): %s (response was not a JSON object)",
                self._API_ENDPOINT, video_id, status,
            )
            return None
        if "error" in payload:
            error = payload["error"]
            message = error.get("message", error) if isinstance(error, dict) else error
            self.log.error(
                "GET %s (id=%s): %s %s",
                self._API_ENDPOINT, video_id, status, message,
            )
            return None
        if status >= 400:
            self.log.error("GET %s (id=%s): %s", self._API_ENDPOINT, video_id, status)
            return None

        items = payload.get("items") or []
        return items[0] if items else None

    @classmethod
    def _pick_thumbnail_url(cls, snippet: dict) -> str:
        """Return the URL of the most preferred available thumbnail, or ""."""
        thumbnails = snippet.get("thumbnails") or {}
        for size in cls._THUMBNAIL_PREFERENCE:
            thumb_url = (thumbnails.get(size) or {}).get("url")
            if thumb_url:
                return thumb_url
        return ""

    @staticmethod
    def _build_content(video: dict, start: str, mxc: str) -> TextMessageEventContent:
        """Build the notice describing a video.

        Args:
            video: One entry of the API response's ``items`` list.
            start: Raw ``t`` value taken from the posted link, or "".
            mxc: ``mxc://`` address of the uploaded thumbnail, or "".

        Returns:
            A notice with a markdown-style plain body and an HTML body.
        """
        snippet = video["snippet"]
        channel_title = snippet["channelTitle"]
        title = snippet["title"]
        channel_url = f"https://www.youtube.com/channel/{snippet['channelId']}"

        # Let yarl encode the query so a user-supplied ``t`` value cannot
        # break out of the link.
        watch_query = {"v": video["id"]}
        if start:
            watch_query["t"] = start
        watch_url = str(URL("https://www.youtube.com/watch").with_query(watch_query))

        # Titles and channel names are arbitrary text; escape everything that
        # is placed into the HTML body.
        safe_channel = html.escape(channel_title)
        safe_title = html.escape(title)
        safe_channel_url = html.escape(channel_url, quote=True)
        safe_watch_url = html.escape(watch_url, quote=True)

        img_html = ""
        if mxc:
            safe_mxc = html.escape(str(mxc), quote=True)
            safe_alt = html.escape(title, quote=True)
            img_html = f'<p><img src="{safe_mxc}" alt="{safe_alt}"></p>'

        # The HTML body mirrors the plain-text body, followed by the thumbnail.
        formatted_body = (
            "<blockquote>"
            "<p>YouTube<br>"
            f'<a href="{safe_channel_url}">{safe_channel}</a></p>'
            f'<p><strong><a href="{safe_watch_url}">{safe_title}</a></strong></p>'
            f"{img_html}"
            "</blockquote>"
        )
        body = (
            f"> YouTube\n> [{channel_title}]({channel_url})\n"
            f"> **[{title}]({watch_url})**"
        )
        return TextMessageEventContent(
            msgtype=MessageType.NOTICE,
            format=Format.HTML,
            body=body,
            formatted_body=formatted_body,
        )

    # Listen to all messages passively instead of requiring a command
    @event.on(EventType.ROOM_MESSAGE)
    async def handle_message(self, evt: MessageEvent) -> None:
        """Reply with video information for each YouTube link in a message.

        Args:
            evt: The incoming room message event.
        """
        # Ignore messages from the bot itself to prevent infinite loops
        if evt.sender == self.client.mxid or not evt.content.body:
            return
        # Only process standard text messages or notices
        if evt.content.msgtype not in (MessageType.TEXT, MessageType.NOTICE):
            return

        # A link repeated within one message is answered only once.
        seen = set()
        for link in self._URL_PATTERN.findall(evt.content.body):
            video_id, start = self._parse_video_url(link)
            # Links that are not videos (e.g. channel pages) are skipped silently
            if not video_id or (video_id, start) in seen:
                continue
            seen.add((video_id, start))

            video = await self._fetch_video(video_id)
            if video is None:
                continue

            thumb_url = self._pick_thumbnail_url(video["snippet"])
            mxc = await self.upload_img(thumb_url) if thumb_url else ""
            await evt.respond(self._build_content(video, start, mxc))