133 lines
5.5 KiB
Python
133 lines
5.5 KiB
Python
"""
|
|
youtube - A maubot plugin to display YouTube video information
|
|
Copyright (C) 2025 L. Bradley LaBoon
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License version 3 as
|
|
published by the Free Software Foundation.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
"""
|
|
import re
|
|
from typing import Type
|
|
from yarl import URL
|
|
|
|
from maubot import Plugin, MessageEvent
|
|
from maubot.handlers import event
|
|
from mautrix.types import TextMessageEventContent, MessageType, Format, EventType
|
|
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
|
|
|
|
class Config(BaseProxyConfig):
|
|
def do_update(self, helper: ConfigUpdateHelper) -> None:
|
|
helper.copy("api_key")
|
|
|
|
class YouTube(Plugin):
|
|
async def start(self) -> None:
|
|
self.config.load_and_update()
|
|
|
|
@classmethod
|
|
def get_config_class(cls) -> Type[BaseProxyConfig]:
|
|
return Config
|
|
|
|
# Fetch an image URL, upload it to the matrix media store, and return the mcx:// address
|
|
async def upload_img(self, link: str) -> str:
|
|
image_req = await self.http.get(link)
|
|
if (image_req.status < 400):
|
|
image = await image_req.read()
|
|
mxc = await self.client.upload_media(image, filename=image_req.url.name)
|
|
return mxc
|
|
return ""
|
|
|
|
# Listen to all messages passively instead of requiring a command
|
|
@event.on(EventType.ROOM_MESSAGE)
|
|
async def handle_message(self, evt: MessageEvent) -> None:
|
|
# Ignore messages from the bot itself to prevent infinite loops
|
|
if evt.sender == self.client.mxid or not evt.content.body:
|
|
return
|
|
|
|
# Only process standard text messages or notices
|
|
if evt.content.msgtype not in (MessageType.TEXT, MessageType.NOTICE):
|
|
return
|
|
|
|
# Find all youtube URLs in the message body using regex
|
|
# The regex avoids matching trailing markdown brackets/parentheses
|
|
urls = re.findall(r'(https?://(?:www\.)?(?:youtube\.com|youtu\.be)[^\s\]\)\>]+)', evt.content.body)
|
|
|
|
if not urls:
|
|
return
|
|
|
|
for video_url_str in urls:
|
|
video_url = URL(video_url_str)
|
|
video_params = video_url.query
|
|
video_id = None
|
|
offset = ""
|
|
|
|
# Handle URLs
|
|
if video_url.host in ("youtube.com", "www.youtube.com"):
|
|
if "shorts" in video_url.parts:
|
|
video_id = video_url.name
|
|
else:
|
|
if "v" in video_params:
|
|
video_id = video_params["v"]
|
|
if "t" in video_params:
|
|
offset = f"&t={video_params['t']}"
|
|
elif video_url.host == "youtu.be":
|
|
video_id = video_url.name
|
|
if "t" in video_params:
|
|
offset = f"&t={video_params['t']}"
|
|
|
|
# If we couldn't extract a video_id (e.g., they linked to a channel page), skip silently
|
|
if not video_id:
|
|
continue
|
|
|
|
# Fetch video info
|
|
params = {
|
|
"key": self.config["api_key"],
|
|
"part": "snippet",
|
|
"id": video_id
|
|
}
|
|
async with self.http.get("https://www.googleapis.com/youtube/v3/videos", params=params) as response:
|
|
videos = await response.json()
|
|
if "error" in videos:
|
|
self.log.error(f"GET {str(response.url)}: {response.status} {videos['error']['message']}")
|
|
continue
|
|
elif response.status >= 400:
|
|
self.log.error(f"GET {str(response.url)}: {response.status}")
|
|
continue
|
|
|
|
# If video is private, deleted, or wrong ID, skip silently
|
|
if len(videos.get("items", [])) < 1:
|
|
continue
|
|
|
|
video = videos["items"][0]
|
|
|
|
# Fix KeyError: 'maxres' by gracefully falling back to available resolutions
|
|
thumbnails = video["snippet"]["thumbnails"]
|
|
thumb_url = ""
|
|
for res in ["maxres", "high", "standard", "medium", "default"]:
|
|
if res in thumbnails:
|
|
thumb_url = thumbnails[res]["url"]
|
|
break
|
|
|
|
# Upload thumbnail
|
|
img_html = ""
|
|
if thumb_url:
|
|
mxc = await self.upload_img(thumb_url)
|
|
if mxc:
|
|
img_html = f"<img width=400 src='{mxc}' />"
|
|
|
|
# Construct and send response
|
|
content = TextMessageEventContent(
|
|
msgtype=MessageType.NOTICE,
|
|
format=Format.HTML,
|
|
body=f"> YouTube\n> [{video['snippet']['channelTitle']}](https://www.youtube.com/channel/{video['snippet']['channelId']})\n> **[{video['snippet']['title']}](https://www.youtube.com/watch?v={video['id']}{offset})**",
|
|
formatted_body=f"<blockquote><h5>YouTube<br><br><a href='https://www.youtube.com/channel/{video['snippet']['channelId']}'><span data-mx-color='#FFFFFF'>{video['snippet']['channelTitle']}</span></a></h5><h4><a href='https://www.youtube.com/watch?v={video['id']}{offset}'>{video['snippet']['title']}</a></h4>{img_html}</blockquote>"
|
|
)
|
|
await evt.respond(content)
|