import json
import logging
import asyncio
from urllib.parse import quote as url_encode

import aiohttp
import discord

import dyphanbot.utils as utils

from . import EngineRateLimitError, EngineResponseError, ReverseImageSearchEngine

SAUCE_INDEX = {
    "0" : "H-Magazines",
    "2" : "H-Game CG",
    "3" : "DoujinshiDB",
    "5" : "Pixiv",
    "6" : "Pixiv (Historical)",
    "8" : "Nico Nico Seiga",
    "9" : "Danbooru",
    "10": "drawr Images",
    "11": "Nijie Images",
    "12": "Yande.re",
    "15": "Shutterstock",
    "16": "FAKKU",
    "18": "H-Misc (nhentai)",
    "19": "2D-Market",
    "20": "MediBang",
    "21": "Anime",
    "22": "H-Anime",
    "23": "Movies",
    "24": "Shows",
    "25": "Gelbooru",
    "26": "Konachan",
    "27": "Sankaku Channel",
    "28": "Anime-Pictures.net",
    "29": "e621.net",
    "30": "Idol Complex",
    "31": "bcy.net Illust",
    "32": "bcy.net Cosplay",
    "33": "PortalGraphics.net (Hist)",
    "34": "deviantArt",
    "35": "Pawoo.net",
    "36": "Madokami (Manga)",
    "37": "MangaDex",
    "38": "E-Hentai",
    "39": "ArtStation",
    "40": "FurAffinity",
    "41": "Twitter",
    "42": "Furry Network",
    "43": "Kemono",
    # unlisted indexes... x_x
    # these probably won't show up since they're sub-indexes,
    # but they're added just in case...
    "51": "Pixiv",
    "52": "Pixiv",
    "53": "Pixiv",
    "211": "Anime",
    # these, however, WILL show up as an index_id,
    # but for some reason they weren't documented anywhere. smh
    "341": "deviantArt",
    "371": "MangaDex"
}

SAUCE_TYPES = {
    "booru": [9, 12, 25, 26, 27, 28, 29, 30],
    "manga": [0, 3, 16, 18, 36, 37, 38, 371],
    "pixiv": [5, 6, 51, 52, 53],
    "anime": [21, 22, 211],
    "video": [23, 24],
    "twitter": [41]
}

NL = "\n"  # backslashes aren't allowed inside f-string expressions


class SauceNao(ReverseImageSearchEngine):
    """ SauceNAO engine """

    url_base = "https://saucenao.com"
    url_path = "search.php"

    def __init__(self, config=None, loop=None, **request_args) -> None:
        super().__init__(self.url_base, self.url_path, name="SauceNAO",
                         config=config or {}, loop=loop, **request_args)
        self.config = self.engine_config.get("saucenao", {})
        self.api_key = self.config.get("api_key")

    async def top_matches(self, url, limit=3, hide_nsfw=True):
        try:
            api_req = await self.search(url, post=True, data={
                "output_type": 2,
                "api_key": self.api_key,
                "db": 999,
                "numres": min(limit, 10),  # cap at 10 results per request
                "url": url,
                "hide": 2
            })
            api_data = json.loads(api_req)
            meta = api_data.get("header", {})
            results = api_data.get("results", [])
            min_similarity = float(meta.get("minimum_similarity", 50))

            returned_results = []
            low_similarity_count = 0
            hidden_nsfw_count = 0
            for result in results:
                header = result["header"]
                data = result["data"]

                similarity = float(header["similarity"])
                if similarity < min_similarity:
                    low_similarity_count += 1
                    continue

                if header.get("hidden", 0) > 0 and hide_nsfw:
                    hidden_nsfw_count += 1
                    continue

                sanitized_result = {}
                index_id = header["index_id"]
                sanitized_result["type"] = "generic"
                for sauce_type, indexes in SAUCE_TYPES.items():
                    if index_id in indexes:
                        sanitized_result["type"] = sauce_type
                        break

                sanitized_result.update({
                    "input_url": url,
                    "similarity": similarity,
                    "min_similarity": min_similarity,
                    "nsfw": header.get("hidden", 0) > 0,
                    "thumbnail": header.get("thumbnail"),
                    "index_name": header.get("index_name"),
                    "index_id": index_id,
                    "index": SAUCE_INDEX.get(str(index_id)),
                    "data": data
                })

                # Instantiate the proper GenericSauce subclass (if any) for
                # this result type via the factory, finish its async tasks,
                # and collect it into the returned results.
                parsed_result = GenericSauce.from_dict(sanitized_result)
                await parsed_result._async_tasks()
                returned_results.append(parsed_result)
            return returned_results
        except aiohttp.ClientResponseError as err:
            if err.status == 429:
                raise EngineRateLimitError("Daily limit reached (100)")
            raise EngineResponseError(f"{err.status} {err.message}")
        except json.JSONDecodeError:
            raise EngineResponseError("Could not interpret result.")

    async def best_match(self, url, hide_nsfw=True):
        # Call self.top_matches() with the url and return the first result.
        top_three = await self.top_matches(url, hide_nsfw=hide_nsfw)
        if not top_three:
            return None
        return top_three[0]
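
# Usage sketch (illustrative, not part of the module): this assumes the base
# ReverseImageSearchEngine exposes the passed config as ``engine_config`` and
# performs the HTTP request via ``search()``. From an async context:
#
#     engine = SauceNao(config={"saucenao": {"api_key": "YOUR_KEY"}})
#     match = await engine.best_match("https://example.com/image.png")
#     if match:
#         embed = match.generate_embed(requester=message.author)
#         await message.channel.send(embed=embed)
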
# Parts of the following classes were referenced from:
# https://github.com/MakotoAme/pysaucenao/blob/master/pysaucenao/containers.py

class GenericSauce(object):
    """ Generic attributes that are applicable for any source, but not always """
    _type = "generic"

    def __init__(self, result: dict):
        self.result = result
        self.input_url = self.result["input_url"]

        # header attribs
        self.similarity = self.result["similarity"]
        self.min_similarity = self.result["min_similarity"]
        self.nsfw = self.result["nsfw"]
        self.thumbnail = self.result["thumbnail"]
        self.index_name = self.result["index_name"]
        self.index_id = self.result["index_id"]
        self.index = self.result["index"]

        # data attribs (will be parsed later)
        self.author_name = None
        self.author_url = None
        self.authors = None
        self.title = None
        self.url = None
        self.urls = None

        self._data = self.result["data"]
        self._parse_data(self._data)

    @classmethod
    def from_dict(cls, result):
        """ Instantiate a sauce object from dict """
        def all_subclasses(cls):
            """ Make sure we get all the inherited classes """
            return set(cls.__subclasses__()).union(
                [s for c in cls.__subclasses__() for s in all_subclasses(c)])

        res_type = result.get("type")
        if res_type:
            for subcls in all_subclasses(cls):
                cls_type = subcls._type
                if cls_type == res_type:
                    return subcls(result)
        return cls(result)

    @property
    def sauce_url(self):
        """ Returns the standard source url of the result """
        return self.url

    async def _async_tasks(self):
        """ Called after initialization to complete async tasks needed by
        the source """
        return

    def _parse_data(self, data: dict):
        """ Parse the data from the dict into the appropriate attributes;
        called at initialization """
        # messy api... smh
        # "source" can sometimes be a url instead... -_-
        for title_field in ["title", "material", "eng_name", "source"]:
            if title_field not in data:
                continue
            self.title = data[title_field]
            break

        for author_field in ["member_name", "creator", "author_name", "author",
                             "pawoo_user_acct", "pawoo_user_username",
                             "pawoo_user_display_name"]:
            if author_field not in data:
                continue
            if isinstance(data[author_field], list):
                # it can sometimes be a list of authors, so parse accordingly
                self.author_name = data[author_field][0]
                self.authors = data[author_field]
                break
            self.author_name = data[author_field]
            self.authors = [data[author_field]]
            break  # first matching field wins, like the title loop above

        if "author_url" in data:
            self.author_url = data["author_url"]
        elif "pawoo_id" in data and "ext_urls" in data:
            self.author_url = data['ext_urls'][0]

        if "ext_urls" in data:
            self.url = data["ext_urls"][0]
            self.urls = data["ext_urls"]

    def generate_embed(self, requester=None, additional_desc="", show_links=True):
        """ Returns a discord embed to display the resulting information """
        nsfw_tag = '**NSFW**\n' if self.nsfw else ''
        description = f"{nsfw_tag}Similarity: {self.similarity}%"
        if self.index:
            description += f"{NL}Matched in: {self.index}"
        if self.authors:
            author_str = ', '.join(self.authors)
            author_text = f"[{author_str}]({self.author_url})" if self.author_url else author_str
            description += f"{NL}**Author:** {author_text}"
        description += f"{NL}{additional_desc}" if additional_desc else ""
        if self.urls and show_links:
            url_list_str = '\n'.join(self.urls)
            description += f"{NL}{NL}**Links:**{NL}{url_list_str}"

        embed = discord.Embed(title=self.title, url=self.sauce_url,
                              description=description)
        embed.set_author(
            name="SauceNAO",
            url=f"https://saucenao.com/search.php?url={url_encode(self.input_url)}",
            icon_url="https://i.imgur.com/Ynoqpam.png"
        )
        if self.thumbnail:
            embed.set_thumbnail(url=self.thumbnail)
        if requester:
            embed.set_footer(
                icon_url=utils.get_user_avatar_url(requester),
                text=f"Requested by {requester}"
            )
        return embed
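
# Extension sketch: ``from_dict`` walks all (recursive) subclasses and picks
# the one whose ``_type`` matches the sanitized result, so a new source type
# only needs a subclass plus a SAUCE_TYPES entry. Hypothetical example:
#
#     class DeviantArtSauce(GenericSauce):
#         """ deviantArt source type """
#         _type = "deviantart"  # would also need "deviantart": [34, 341] in SAUCE_TYPES
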
class PixivSauce(GenericSauce):
    """ Pixiv source type """
    _type = "pixiv"

    def __init__(self, data: dict):
        super().__init__(data)

    def _parse_data(self, data: dict):
        super()._parse_data(data)
        if "member_id" in data:  # guard against results that omit member_id
            self.author_url = f"https://pixiv.net/member.php?id={data['member_id']}"


class BooruSauce(GenericSauce):
    """ Booru source type """
    _type = "booru"

    def __init__(self, data: dict):
        # initialized before _parse_data runs so they always exist
        self.booru_type = None
        self.post_id = None
        self.characters = None
        self.material = None
        super().__init__(data)

    @property
    def sauce_url(self):
        """ Returns the linked source if available """
        return self._data.get("source", self.url)

    def _parse_data(self, data):
        super()._parse_data(data)
        for booru in ["gelbooru", "danbooru", "yandere", "konachan", "sankaku",
                      "anime-pictures", "e621", "idol"]:
            id_field = f"{booru}_id"
            if id_field not in data:
                continue
            self.booru_type = booru
            self.post_id = data.get(id_field)
            break

        self.characters = data.get("characters")
        self.material = data.get("material")
        if self.characters:
            self.characters = [x.strip() for x in self.characters.split(',')]
        if self.material:
            self.material = [x.strip() for x in self.material.split(',')]

        if not self.title and self.post_id:
            self.title = f"Post #{self.post_id}"

    def generate_embed(self, requester=None):
        additional_desc = ""
        if self.characters:
            characters_str = ', '.join([f'`{x}`' for x in self.characters]) if isinstance(self.characters, list) else str(self.characters)
            additional_desc += f"**Characters:** {characters_str}"
        if self.material:
            material_str = ', '.join([f'`{x}`' for x in self.material]) if isinstance(self.material, list) else str(self.material)
            additional_desc += f"{NL}**Material:** {material_str}"
        return super().generate_embed(requester=requester,
                                      additional_desc=additional_desc)
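
# Data sketch: the booru fields parsed above come from API payloads shaped
# roughly like this (abridged and illustrative; real responses vary by index):
#
#     {"danbooru_id": 1234,
#      "source": "https://www.pixiv.net/artworks/5678",
#      "characters": "alice, bob",
#      "material": "some series",
#      "ext_urls": ["https://danbooru.donmai.us/post/show/1234"]}
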
class TwitterSauce(GenericSauce):
    """ Twitter sauce type """
    _type = "twitter"

    def __init__(self, data: dict):
        super().__init__(data)

    def _parse_data(self, data: dict):
        super()._parse_data(data)
        self.tweet_id = data["tweet_id"]
        self.twitter_user_id = data["twitter_user_id"]
        self.twitter_user_handle = data["twitter_user_handle"]

        self.author_name = self.twitter_user_handle
        self.author_url = f"https://twitter.com/i/user/{self.twitter_user_id}"
        self.authors = [self.author_name]

        if not self.title:
            self.title = f"Tweet by @{self.twitter_user_handle}"


class VideoSauce(GenericSauce):
    """ Movies and Shows source """
    _type = "video"

    def __init__(self, data: dict):
        self.episode = None
        self.timestamp = None
        self.year = None
        super().__init__(data)

    def _parse_data(self, data):
        super()._parse_data(data)
        self.episode = data.get("part")
        self.timestamp = data.get("est_time")
        self.year = data.get("year")

    def generate_embed(self, requester=None, additional_desc="", show_links=True):
        desc = ""
        if self.year:
            desc = f"**Year:** {self.year}"
        if self.episode:
            desc += f"{NL}**Episode:** {self.episode}"
        if self.timestamp:
            desc += f"{NL}**Timestamp:** {self.timestamp}"
        desc += f"{NL}{additional_desc}" if additional_desc else ""
        return super().generate_embed(requester, additional_desc=desc,
                                      show_links=show_links)


class AnimeSauce(VideoSauce):
    """ Anime source """
    _type = "anime"

    def __init__(self, data):
        self._logger = logging.getLogger(__name__)
        self._async_done = False
        self.anidb_aid = None
        self.anilist_id = None
        self.mal_id = None
        super().__init__(data)

    async def _async_tasks(self, loop=None):
        if self._async_done:
            return
        if not self.anidb_aid:
            return
        if self.anilist_id and self.mal_id:
            return
        async with aiohttp.ClientSession(loop=loop, raise_for_status=True) as session:
            try:
                resp = await session.get(f"https://relations.yuna.moe/api/ids?source=anidb&id={self.anidb_aid}")
                ids = await resp.json() or {}
                if not self.anilist_id:
                    self.anilist_id = ids.get('anilist')
                if not self.mal_id:
                    self.mal_id = ids.get("myanimelist")
                # mark done on success so repeated calls don't re-query
                self._async_done = True
            except json.JSONDecodeError:
                self._logger.info(f"relations.yuna.moe lookup failed for aid: {self.anidb_aid}")
            except aiohttp.ClientResponseError as err:
                self._logger.info(f"relations.yuna.moe returned a {err.status} error.")
            except aiohttp.ClientError:
                self._logger.info("unable to connect to relations.yuna.moe api")

    def _parse_data(self, data):
        super()._parse_data(data)
        self.anidb_aid = data.get("anidb_aid")
        self.anilist_id = data.get("anilist_id")
        self.mal_id = data.get("mal_id")

    @property
    def anidb_url(self):
        if not self.anidb_aid:
            return None
        return f"https://anidb.net/anime/{self.anidb_aid}"

    @property
    def anilist_url(self):
        if not self.anilist_id:
            return None
        return f"https://anilist.co/anime/{self.anilist_id}"

    @property
    def mal_url(self):
        if not self.mal_id:
            return None
        return f"https://myanimelist.net/anime/{self.mal_id}"

    def generate_embed(self, requester=None):
        link_strs = []
        if self.anidb_url:
            link_strs.append(f"[AniDB]({self.anidb_url})")
        if self.anilist_url:
            link_strs.append(f"[AniList]({self.anilist_url})")
        if self.mal_url:
            link_strs.append(f"[MyAnimeList]({self.mal_url})")
        desc = f"{NL}{' | '.join(link_strs)}"
        return super().generate_embed(requester, additional_desc=desc,
                                      show_links=False)
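
# Response sketch: the relations.yuna.moe lookup above returns a flat JSON
# object keyed by site name; only "anilist" and "myanimelist" are read here.
# Illustrative shape (IDs made up):
#
#     {"anidb": 11111, "anilist": 22222, "myanimelist": 33333}
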
class MangaSauce(GenericSauce):
    """ Manga source type """
    _type = "manga"

    def __init__(self, data: dict):
        self.chapter = None
        self.artist = None
        super().__init__(data)

    def _parse_data(self, data):
        super()._parse_data(data)
        self.chapter = data.get("part")
        self.artist = data.get("artist")

    def generate_embed(self, requester=None):
        desc = ""
        if self.artist:
            desc += f"**Artist:** {self.artist}"
        if self.chapter:
            desc += f"{NL}**Chapter:** {self.chapter}"
        return super().generate_embed(requester, additional_desc=desc)
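
# Minimal manual test -- a sketch, assuming the base engine needs nothing
# beyond the API key to perform a search. Run as:
#     python -m <module> <api_key> <image_url>
if __name__ == "__main__":
    import sys

    async def _demo(api_key, image_url):
        engine = SauceNao(config={"saucenao": {"api_key": api_key}})
        match = await engine.best_match(image_url)
        print(match.title if match else "No match found.")

    asyncio.run(_demo(sys.argv[1], sys.argv[2]))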