commit 2335f7ff16d61f89c927cd63f354f86e5387900c Author: Buzzbyte Date: Mon Apr 25 00:23:37 2022 -0400 Add files diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e2c0b35 --- /dev/null +++ b/.gitignore @@ -0,0 +1,118 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# dotenv +.env + +# virtualenv +.venv +venv/ +ENV/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +### VisualStudioCode ### +.vscode/* +# Not needed for this project +#!.vscode/settings.json +#!.vscode/tasks.json +#!.vscode/launch.json +#!.vscode/extensions.json + +### VisualStudioCode Patch ### +# Ignore all local history of files +.history + +# i have no idea what this file is or what it +# was doing here, so imma just ignore it and +# hope it doesn't blow up everying... 
+engines.sqlite
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..bacb0c4
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,129 @@
+import re
+import traceback
+from aiohttp import ClientSession
+import discord
+import dyphanbot.utils as utils
+from dyphanbot import Plugin
+
+from .engines import EngineError, EngineRateLimitError
+from .engines.saucenao import SauceNao
+
+from .utils import parse_image_url
+
+
+class SaucePlz(Plugin):
+    """
+    DyphanBot plugin for reverse image searching
+    """
+
+    async def help(self, message, args):
+        prefix = self.get_local_prefix(message)
+        return {
+            "helptext": ("__*Experimental feature, expect bugs!*__\n"
+                         "Finds the source of an image using reverse image search engines.\n"
+                         "Currently, only searches [SauceNAO](https://saucenao.com) databases, "
+                         "but will hopefully support more engines in the future."),
+            "shorthelp": "Finds the source of an image using SauceNAO.",
+            "color": discord.Colour(0x1),
+            "sections": [{
+                "name": "Usage",
+                "value": (
+                    "> {0}saucepls\n> {0}saucepls `URL`\n"
+                    "Post an image or video with the above command "
+                    "or call it with a URL.\n"
+                    "Also works with replies!\n"
+                    "Alias: *sauceplz*"
+                ).format(prefix),
+                "inline": False
+            }]
+        }
+
+    def start(self):
+        self._config_fn = "config.json"
+        self.config = self.load_json(self._config_fn, initial_data={
+            "saucenao": {
+                "api_key": "__SAUCENAO_API_KEY__"
+            }
+        }, save_json=self._save_config)
+
+    def _save_config(self, filename, data):
+        return self.dyphanbot.data.save_json(filename, data, indent=4)
+
+    async def lookup_sauce(self, message, url):
+        try:
+            sn_engine = SauceNao(config=self.config, loop=self.dyphanbot.loop)
+            result = await sn_engine.best_match(url, hide_nsfw=not message.channel.is_nsfw())
+            if not result:
+                return {"content": "Unable to find a match."}
+
+            embed = result.generate_embed(requester=message.author)
+            return {"content": "Sauce Found?", "embed": embed}
+        except EngineRateLimitError as err:
+            traceback.print_exc()
+            return {"content": f"Ratelimited: {err}"}
+        except EngineError as err:
+            traceback.print_exc()
+            return {"content": f"Error: {err}"}
+
+    @Plugin.command
+    async def sauceplz(self, client, message, args):
+        url = None
+        if len(args) > 0:
+            url = " ".join(args).strip("<>")
+
+        pre_text = ""
+        target_message = message
+        if len(message.attachments) <= 0 and not url:
+            # check if message is a reply
+            if message.reference:
+                msg_ref = message.reference
+                if not msg_ref.resolved:
+                    try:
+                        target_message = await message.channel.fetch_message(msg_ref.message_id)
+                    except Exception:
+                        return await message.reply("Unable to retrieve referenced message.")
+                elif isinstance(msg_ref.resolved, discord.DeletedReferencedMessage):
+                    return await message.reply("Referenced message was deleted.")
+                else:
+                    target_message = msg_ref.resolved
+                urls = re.findall(r'(https?://\S+)', target_message.content)
+                if urls:
+                    if len(urls) > 1:
+                        pre_text += "Multiple URLs found in referenced message. Using the first one.\n"
+                    url = urls[0]
+                if len(target_message.attachments) <= 0 and not url:
+                    return await message.reply("No attachment or URL found in referenced message.")
+            else:
+                return await message.reply("No attachment or URL provided.")
+
+        if len(target_message.attachments) >= 1 and url is not None:
+            pre_text += "Both attachment and URL provided. Using URL.\n"
+        elif len(target_message.attachments) > 1:
+            pre_text += "Multiple attachments found. 
Using the first one.\n" + + response_msg = None + + if not url: + response_msg = await message.reply(f"{pre_text}*Getting attachment...*", mention_author=False) + image = target_message.attachments[0] + url = image.url + + if response_msg: + await response_msg.edit( + allowed_mentions=discord.AllowedMentions(replied_user=False), + content=f"{pre_text}*Looking for the sauce...*") + else: + response_msg = await message.reply(f"{pre_text}*Looking for the sauce...*", mention_author=False) + + async with ClientSession(loop=self.dyphanbot.loop) as session: + url = await parse_image_url(session, url) + + results = await self.lookup_sauce(message, url=url) + await response_msg.edit( + allowed_mentions=discord.AllowedMentions(replied_user=True), + **results) + + @Plugin.command + async def saucepls(self, client, message, args): + # alias to sauceplz + return await self.sauceplz(client, message, args) \ No newline at end of file diff --git a/engines/__init__.py b/engines/__init__.py new file mode 100644 index 0000000..cc4492e --- /dev/null +++ b/engines/__init__.py @@ -0,0 +1,99 @@ +import posixpath as path +import logging +import asyncio +from urllib.parse import quote_plus + +# pyright: reportWildcardImportFromLibrary=false +from typing import * + +# pyright: reportPrivateImportUsage=false +from aiohttp_client_cache import CachedSession, SQLiteBackend + +class EngineError(Exception): + pass + +class EngineResponseError(EngineError): + pass + +class EngineRateLimitError(EngineError): + pass + +class ReverseImageSearchEngine(object): + """Base class for reverse image search engines + + Different reverse image search engines can inherit from this class + + Args: + url_base (str): Base url of the engine + url_path (str): Path and query to the reverse image search function. + Should contain `{image_url}` in which the input url will be placed + to perform the search needed. + name (str, optional): Name of the search engine + """ + logger = logging.getLogger(__name__) + + def __init__(self, url_base, url_path, name=None, config={}, loop=None, **request_args) -> None: + self.url_base = url_base + self.url_path = url_path + self.name = name + self.request_args = request_args + self.engine_config = config + + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + + # session used to make async cached requests + self._session = CachedSession( + cache=SQLiteBackend( + __name__.lower(), + expire_after=604800, # 1 week + allowable_methods=('GET', 'HEAD', 'POST') + ), + loop=loop + ) + + def _build_search_url(self, url) -> str: + search_path = self.url_path.format(image_url=quote_plus(url)) + return path.join(self.url_base, search_path) + + async def _request(self, method, url, **kwargs) -> str: + async with self._session as session: + resp = await session.request(method, url, **kwargs) + return await resp.text() + + async def _request_get(self, url, **kwargs) -> str: + return await self._request('GET', url, **kwargs) + + async def _request_post(self, url, **kwargs) -> str: + return await self._request('POST', url, **kwargs) + + async def search(self, url, post=False, **request_args): + search_url = self._build_search_url(url) + req_func = self._request_post if post else self._request_get + result = await req_func(search_url, **request_args, **self.request_args) + return result + + def parse_html(self, html): + """Parses the search engine's html to a useable dictionary + + Override this method when inheriting from this class with the + site-specific implementation. 
+ """ + raise NotImplementedError + + async def top_matches(self, url, limit=3): + """Get a list of the top matched results from the engine + + Override this method when inheriting from this class with the + site-specific implementation. + """ + raise NotImplementedError + + async def best_match(self, url): + """Get the best matched result from the reverse image search engine + + Override this method when inheriting from this class with the + site-specific implementation. + """ + raise NotImplementedError \ No newline at end of file diff --git a/engines/saucenao.py b/engines/saucenao.py new file mode 100644 index 0000000..33faebb --- /dev/null +++ b/engines/saucenao.py @@ -0,0 +1,533 @@ +import json +import logging +import asyncio +from urllib.parse import quote as url_encode + +import aiohttp +import discord + +import dyphanbot.utils as utils + +from . import EngineRateLimitError, EngineResponseError, ReverseImageSearchEngine + +SAUCE_INDEX = { + "0" : "H-Magazines", + "2" : "H-Game CG", + "3" : "DoujinshiDB", + "5" : "Pixiv", + "6" : "Pixiv (Historical)", + "8" : "Nico Nico Seiga", + "9" : "Danbooru", + "10": "drawr Images", + "11": "Nijie Images", + "12": "Yande.re", + "15": "Shutterstock", + "16": "FAKKU", + "18": "H-Misc (nhentai)", + "19": "2D-Market", + "20": "MediBang", + "21": "Anime", + "22": "H-Anime", + "23": "Movies", + "24": "Shows", + "25": "Gelbooru", + "26": "Konachan", + "27": "Sankaku Channel", + "28": "Anime-Pictures.net", + "29": "e621.net", + "30": "Idol Complex", + "31": "bcy.net Illust", + "32": "bcy.net Cosplay", + "33": "PortalGraphics.net (Hist)", + "34": "deviantArt", + "35": "Pawoo.net", + "36": "Madokami (Manga)", + "37": "MangaDex", + "38": "E-Hentai", + "39": "ArtStation", + "40": "FurAffinity", + "41": "Twitter", + "42": "Furry Network", + "43": "Kemono", + + # fucking unlisted indexes... x_x + # these probably wont show up tho since they're sub-indexes, + # but they're added just in case... + "51": "Pixiv", + "52": "Pixiv", + "53": "Pixiv", + "211": "Anime", + + # these, however, WILL show up as an index_id, + # but for some reason they weren't documented anywhere. smh + "341": "deviantArt", + "371": "MangaDex" +} + +SAUCE_TYPES = { + "booru": [9, 12, 25, 26, 27, 28, 29, 30], + "manga": [0, 3, 16, 18, 36, 37, 38, 371], + "pixiv": [5, 6, 51, 52, 53], + "anime": [21, 22, 211], + "video": [23, 24], + "twitter": [41] +} + +NL = "\n" # cause fuck f-strings. 
+
+class SauceNao(ReverseImageSearchEngine):
+    """
+    SauceNAO engine
+    """
+
+    url_base = "https://saucenao.com"
+    url_path = "search.php"
+
+    def __init__(self, config={}, loop=None, **request_args) -> None:
+        super().__init__(self.url_base, self.url_path, name="SauceNAO", config=config, loop=loop, **request_args)
+
+        self.config = self.engine_config.get("saucenao", {})
+        self.api_key = self.config.get("api_key")
+
+    async def top_matches(self, url, limit=3, hide_nsfw=True):
+        try:
+            api_req = await self.search(url, post=True, data={
+                "output_type": 2,
+                "api_key": self.api_key,
+                "db": 999,
+                "numres": limit if limit <= 10 else 10,
+                "url": url,
+                "hide": 2
+            })
+
+            api_data = json.loads(api_req)
+
+            meta = api_data.get("header", {})
+            results = api_data.get("results", [])
+            min_similarity = float(meta.get("minimum_similarity", 50))
+
+            returned_results = []
+            low_similarity_count = 0
+            hidden_nsfw_count = 0
+
+            for result in results:
+                header = result["header"]
+                data = result["data"]
+
+                similarity = float(header["similarity"])
+                if similarity < min_similarity:
+                    low_similarity_count += 1
+                    continue
+
+                if header.get("hidden", 0) > 0 and hide_nsfw:
+                    hidden_nsfw_count += 1
+                    continue
+
+                sanitized_result = {}
+
+                index_id = header["index_id"]
+                sanitized_result["type"] = "generic"
+                for sauce_type, indexes in SAUCE_TYPES.items():
+                    if index_id in indexes:
+                        sanitized_result["type"] = sauce_type
+                        break
+
+                sanitized_result.update({
+                    "input_url": url,
+                    "similarity": similarity,
+                    "min_similarity": min_similarity,
+                    "nsfw": header.get("hidden", 0) > 0,
+                    "thumbnail": header.get("thumbnail"),
+                    "index_name": header.get("index_name"),
+                    "index_id": index_id,
+                    "index": SAUCE_INDEX.get(str(index_id)),
+                    "data": data
+                })
+
+                # GenericSauce.from_dict() is the factory that instantiates the
+                # proper subclass, if applicable, from the sanitized_result; the
+                # resulting object gets appended to returned_results below.
+                parsed_result = GenericSauce.from_dict(sanitized_result)
+                await parsed_result._async_tasks()
+
+                self.logger.debug(parsed_result)
+                returned_results.append(parsed_result)
+
+            return returned_results
+
+        except aiohttp.ClientResponseError as err:
+            if err.status == 429:
+                raise EngineRateLimitError("Daily limit reached (100)")
+            raise EngineResponseError(f"{err.status} {err.message}")
+        except json.JSONDecodeError:
+            raise EngineResponseError("Could not interpret result.")
+
+    async def best_match(self, url, hide_nsfw=True):
+        # Call self.top_matches() with the url and return the first result.
+ top_three = await self.top_matches(url, hide_nsfw=hide_nsfw) + if not top_three: + return None + + return top_three[0] + +# Parts of the following classes were referenced from: +# https://github.com/MakotoAme/pysaucenao/blob/master/pysaucenao/containers.py + +class GenericSauce(object): + """ Generic attributes that are applicable for any source, but not always """ + _type = "generic" + + def __init__(self, result: dict): + self.result = result + + self.input_url = self.result["input_url"] + + # header attribs + self.similarity = self.result["similarity"] + self.min_similarity = self.result["min_similarity"] + self.nsfw = self.result["nsfw"] + self.thumbnail = self.result["thumbnail"] + self.index_name = self.result["index_name"] + self.index_id = self.result["index_id"] + self.index = self.result["index"] + + # data attribs (will be parsed later) + self.author_name = None + self.author_url = None + self.authors = None + self.title = None + self.url = None + self.urls = None + + self._data = self.result["data"] + self._parse_data(self._data) + + @classmethod + def from_dict(cls, result): + """ Instantiate a sauce object from dict """ + + def all_subclasses(cls): + """ Make sure we get all the inherited classes """ + return set(cls.__subclasses__()).union( + [s for c in cls.__subclasses__() for s in all_subclasses(c)]) + + res_type = result.get("type") + if res_type: + for subcls in all_subclasses(cls): + cls_type = subcls._type + if cls_type == res_type: + return subcls(result) + + return cls(result) + + @property + def sauce_url(self): + """ + Returns the standard source url of the result + """ + return self.url + + async def _async_tasks(self): + """ Called after initialization to complete async tasks needed by the source """ + return + + def _parse_data(self, data: dict): + """ + Parse the data from the dict into the appropriate attributes; called at initialization + """ + + # messy api... smh + # "source" can sometimes be a url instead... 
-_-
+        for title_field in ["title", "material", "eng_name", "source"]:
+            if title_field not in data:
+                continue
+
+            self.title = data[title_field]
+            break
+
+        for author_field in ["member_name", "creator", "author_name", "author",
+                             "pawoo_user_acct", "pawoo_user_username", "pawoo_user_display_name"]:
+            if author_field not in data:
+                continue
+
+            if isinstance(data[author_field], list):
+                # it can sometimes be a list of authors, so parse accordingly
+                self.author_name = data[author_field][0]
+                self.authors = data[author_field]
+                break
+
+            self.author_name = data[author_field]
+            self.authors = [data[author_field]]
+            break
+
+        if "author_url" in data:
+            self.author_url = data["author_url"]
+        elif "pawoo_id" in data and "ext_urls" in data:
+            self.author_url = data['ext_urls'][0]
+
+        if "ext_urls" in data:
+            self.url = data["ext_urls"][0]
+            self.urls = data["ext_urls"]
+
+    def generate_embed(self, requester=None, additional_desc="", show_links=True):
+        """ Returns a discord embed to display the resulting information """
+        nsfw_tag = '**NSFW**\n' if self.nsfw else ''
+        description = f"{nsfw_tag}Similarity: {self.similarity}%"
+
+        if self.index:
+            description += f"{NL}Matched in: {self.index}"
+
+        if self.authors:
+            author_str = ', '.join(self.authors)
+            author_text = f"[{author_str}]({self.author_url})" if self.author_url else author_str
+            description += f"{NL}**Author:** {author_text}"
+
+        description += f"{NL}{additional_desc}" if additional_desc else ""
+
+        if self.urls and show_links:
+            url_list_str = '\n'.join(self.urls)
+            description += f"{NL}{NL}**Links:**{NL}{url_list_str}"
+
+        embed = discord.Embed(title=self.title, url=self.sauce_url, description=description)
+
+        embed.set_author(
+            name="SauceNAO",
+            url=f"https://saucenao.com/search.php?url={url_encode(self.input_url)}",
+            icon_url="https://i.imgur.com/Ynoqpam.png"
+        )
+
+        if self.thumbnail:
+            embed.set_thumbnail(url=self.thumbnail)
+
+        if requester:
+            embed.set_footer(
+                icon_url=utils.get_user_avatar_url(requester),
+                text=f"Requested by {requester}"
+            )
+
+        return embed
+
+class PixivSauce(GenericSauce):
+    """ Pixiv source type """
+    _type = "pixiv"
+
+    def __init__(self, data: dict):
+        super().__init__(data)
+
+    def _parse_data(self, data: dict):
+        super()._parse_data(data)
+        self.author_url = f"https://pixiv.net/member.php?id={data['member_id']}"
+
+class BooruSauce(GenericSauce):
+    """ Booru source type """
+    _type = "booru"
+
+    def __init__(self, data: dict):
+        super().__init__(data)
+
+    @property
+    def sauce_url(self):
+        """ Returns the linked source if available """
+        return self._data.get("source", self.url)
+
+    def _parse_data(self, data):
+        super()._parse_data(data)
+
+        self.booru_type = None
+        self.post_id = None
+        for booru in ["gelbooru", "danbooru", "yandere",
+                      "konachan", "sankaku", "anime-pictures",
+                      "e621", "idol"]:
+            id_field = f"{booru}_id"
+            if id_field not in data:
+                continue
+
+            self.booru_type = booru
+            self.post_id = data.get(id_field)
+            break
+
+        self.characters = data.get("characters")
+        self.material = data.get("material")
+
+        if self.characters:
+            self.characters = [x.strip() for x in self.characters.split(',')]
+
+        if self.material:
+            self.material = [x.strip() for x in self.material.split(',')]
+
+        if not self.title and self.post_id:
+            self.title = f"Post #{self.post_id}"
+
+    def generate_embed(self, requester=None):
+        additional_desc = ""
+        if self.characters:
+            characters_str = ', '.join([f'`{x}`' for x in self.characters]) if isinstance(self.characters, list) else str(self.characters)
+            additional_desc += f"**Characters:** {characters_str}"
+
+        if self.material:
+
material_str = ', '.join([f'`{x}`' for x in self.material]) if isinstance(self.material, list) else str(self.material) + additional_desc += f"{NL}**Material:** {material_str}" + + return super().generate_embed(requester=requester, additional_desc=additional_desc) + +class TwitterSauce(GenericSauce): + """ Twitter sauce type """ + _type = "twitter" + + def __init__(self, data: dict): + super().__init__(data) + + def _parse_data(self, data: dict): + super()._parse_data(data) + + self.tweet_id = data["tweet_id"] + self.twitter_user_id = data["twitter_user_id"] + self.twitter_user_handle = data["twitter_user_handle"] + + self.author_name = self.twitter_user_handle + self.author_url = f"https://twitter.com/i/user/{self.twitter_user_id}" + self.authors = [self.author_name] + + if not self.title: + self.title = f"Tweet by @{self.twitter_user_handle}" + +class VideoSauce(GenericSauce): + """ Movies and Shows source """ + _type = "video" + + def __init__(self, data: dict): + self.episode = None + self.timestamp = None + self.year = None + + super().__init__(data) + + def _parse_data(self, data): + super()._parse_data(data) + + self.episode = data.get("part") + self.timestamp = data.get("est_time") + self.year = data.get("year") + + def generate_embed(self, requester=None, additional_desc="", show_links=True): + desc = "" + if self.year: + desc = f"**Year:** {self.year}" + + if self.episode: + desc += f"{NL}**Episode:** {self.episode}" + + if self.timestamp: + desc += f"{NL}**Timestamp:** {self.timestamp}" + + desc += f"{NL}{additional_desc}" if additional_desc else "" + + return super().generate_embed(requester, additional_desc=desc, show_links=show_links) + +class AnimeSauce(VideoSauce): + """ Anime source """ + _type = "anime" + + def __init__(self, data): + self._logger = logging.getLogger(__name__) + self._async_done = False + + self.anidb_aid = None + self.anilist_id = None + self.mal_id = None + + super().__init__(data) + + async def _async_tasks(self, loop=None): + if self._async_done: + return + + if not self.anidb_aid: + return + + if self.anilist_id and self.mal_id: + return + + async with aiohttp.ClientSession(loop=loop, raise_for_status=True) as session: + try: + resp = await session.get(f"https://relations.yuna.moe/api/ids?source=anidb&id={self.anidb_aid}") + ids = await resp.json() or {} + + if not self.anilist_id: + self.anilist_id = ids.get('anilist') + + if not self.mal_id: + self.mal_id = ids.get("myanimelist") + except json.JSONDecodeError: + self._logger.info(f"relations.yuna.moe lookup failed for aid: {self.anidb_aid}") + except aiohttp.ClientResponseError as err: + self._logger.info(f"relations.yuna.moe returned a {err.status} error.") + except aiohttp.ClientError as err: + self._logger.info(f"unable to connect to relations.yuna.moe api") + + def _parse_data(self, data): + super()._parse_data(data) + + self.anidb_aid = data.get("anidb_aid") + self.anilist_id = data.get("anilist_id") + self.mal_id = data.get("mal_id") + + @property + def anidb_url(self): + if not self.anidb_aid: + return None + + return f"https://anidb.net/anime/{self.anidb_aid}" + + @property + def anilist_url(self): + if not self.anilist_id: + return None + + return f"https://anilist.co/anime/{self.anilist_id}" + + @property + def mal_url(self): + if not self.mal_id: + return None + + return f"https://myanimelist.net/anime/{self.mal_id}" + + def generate_embed(self, requester=None): + link_strs = [] + if self.anidb_url: + link_strs.append(f"[AniDB]({self.anidb_url})") + + if self.anilist_url: + 
link_strs.append(f"[AniList]({self.anilist_url})") + + if self.mal_url: + link_strs.append(f"[MyAnimeList]({self.mal_url})") + + desc = f"{NL}{' | '.join(link_strs)}" + return super().generate_embed(requester, additional_desc=desc, show_links=False) + +class MangaSauce(GenericSauce): + """ Manga source type """ + _type = "manga" + + def __init__(self, data: dict): + self.chapter = None + self.artist = None + super().__init__(data) + + def _parse_data(self, data): + super()._parse_data(data) + + self.chapter = data.get("part") + self.artist = data.get("artist") + + def generate_embed(self, requester=None): + desc = "" + + if self.artist: + desc += f"**Artist:** {self.artist}" + + if self.chapter: + desc += f"{NL}**Chapter:** {self.chapter}" + + return super().generate_embed(requester, additional_desc=desc) \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..e9c7e51 --- /dev/null +++ b/utils.py @@ -0,0 +1,84 @@ +import re +import json +import traceback + +REGEXES = { + # group 1 = single image id, or "gallery", or "a" + # group 2 = empty if single image, or album/gallery id + "imgur" : re.compile(r"https?://(?:www.)?imgur.com/((?:(?!\/).)+)/?((?:(?![\./]).)+)?"), + + # group 1 = gfycat/redgifs id + "gfycat" : re.compile(r"https?://(?:www.|giant.)?gfycat.com/(?:gifs/detail/)?((?:(?![\./-]).)+)"), + "redgifs": re.compile(r"https?://(?:www.)?redgifs.com/watch/((?:(?![\./-]).)+)"), + + # group 1 = giphy id + "giphy" : re.compile(r'https?://(?:www.)?giphy.com/gifs/(?:.*\-)?((?:(?![\./-]).)+)'), + + # group 1 = tenor id + "tenor" : re.compile(r'https?://(?:www.)?tenor.com?/view/(?:.*\-)?((?:(?![\./-]).)+)') +} + +GIPHY_DIRECT_URL = "https://media.giphy.com/media/{0}/giphy.gif" + +def _match_url(url): + for site, regex in REGEXES.items(): + match = regex.match(url) + if match: + return (site, match) + return (None, None) + +async def parse_image_url(session, url): + site, match = _match_url(url) + if not match: + return url + + image_id = match.group(1) + if site == "imgur": + request_url = f"https://imgur.com/{image_id}" + if image_id == "gallery" or image_id == "a": + album_type = image_id + album_id = match.group(2) + request_url = f"https://imgur.com/{album_type}/{album_id}" + + try: + r = await session.get(request_url) + r.raise_for_status() + image_page_html = await r.text() + image_url_match = re.search(r"", image_page_html) + image_url = image_url_match.group(1).strip() if image_url_match else None + if not image_url: + return url + return image_url + except Exception: + traceback.print_exc() + return url + elif site == "gfycat" or site == "redgifs": + try: + r = await session.get(f"https://api.{site}.com/v1/gfycats/{image_id}") + r.raise_for_status() + gif_info = await r.json() + poster_url = gif_info.get('gfyItem', {}).get('posterUrl') + if not poster_url: + return url + return poster_url + except Exception: + traceback.print_exc() + return url + elif site == "giphy": + return GIPHY_DIRECT_URL.format(image_id) + elif site == "tenor": + try: + r = await session.get(f"https://tenor.com/view/{image_id}") + r.raise_for_status() + gif_page_html = await r.text() + gif_info_match = re.search(r"", gif_page_html) + gif_info_raw = gif_info_match.group(1).strip() if gif_info_match else "" + if not gif_info_raw: + return url + gif_info = json.loads(gif_info_raw) + return gif_info['image']['contentUrl'] + except Exception: + traceback.print_exc() + return url + else: + return url