# SaucePlz/engines/saucenao.py
import json
import logging
import asyncio
from urllib.parse import quote as url_encode
import aiohttp
import discord
import dyphanbot.utils as utils
from . import EngineRateLimitError, EngineResponseError, ReverseImageSearchEngine

SAUCE_INDEX = {
"0" : "H-Magazines",
"2" : "H-Game CG",
"3" : "DoujinshiDB",
"5" : "Pixiv",
"6" : "Pixiv (Historical)",
"8" : "Nico Nico Seiga",
"9" : "Danbooru",
"10": "drawr Images",
"11": "Nijie Images",
"12": "Yande.re",
"15": "Shutterstock",
"16": "FAKKU",
"18": "H-Misc (nhentai)",
"19": "2D-Market",
"20": "MediBang",
"21": "Anime",
"22": "H-Anime",
"23": "Movies",
"24": "Shows",
"25": "Gelbooru",
"26": "Konachan",
"27": "Sankaku Channel",
"28": "Anime-Pictures.net",
"29": "e621.net",
"30": "Idol Complex",
"31": "bcy.net Illust",
"32": "bcy.net Cosplay",
"33": "PortalGraphics.net (Hist)",
"34": "deviantArt",
"35": "Pawoo.net",
"36": "Madokami (Manga)",
"37": "MangaDex",
"38": "E-Hentai",
"39": "ArtStation",
"40": "FurAffinity",
"41": "Twitter",
"42": "Furry Network",
"43": "Kemono",
    # unlisted sub-indexes... x_x
    # these probably won't show up since they're sub-indexes,
    # but they're added just in case...
"51": "Pixiv",
"52": "Pixiv",
"53": "Pixiv",
"211": "Anime",
# these, however, WILL show up as an index_id,
# but for some reason they weren't documented anywhere. smh
"341": "deviantArt",
"371": "MangaDex"
}

SAUCE_TYPES = {
"booru": [9, 12, 25, 26, 27, 28, 29, 30],
"manga": [0, 3, 16, 18, 36, 37, 38, 371],
"pixiv": [5, 6, 51, 52, 53],
"anime": [21, 22, 211],
"video": [23, 24],
"twitter": [41]
}
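
# For example, a result whose header carries index_id 5 maps to the "pixiv"
# sauce type above and to the display name "Pixiv" in SAUCE_INDEX;
# SauceNao.top_matches() below performs this lookup for every result.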
NL = "\n" # cause fuck f-strings.
class SauceNao(ReverseImageSearchEngine):
"""
SauceNAO engine
"""
url_base = "https://saucenao.com"
url_path = "search.php"
    def __init__(self, config=None, loop=None, **request_args) -> None:
        super().__init__(self.url_base, self.url_path, name="SauceNAO",
                         config=config or {}, loop=loop, **request_args)
self.config = self.engine_config.get("saucenao", {})
self.api_key = self.config.get("api_key")
    async def top_matches(self, url, limit=3, hide_nsfw=True):
        """ Search SauceNAO for `url` and return up to `limit` parsed results,
        skipping low-similarity matches and, optionally, NSFW ones. """
try:
api_req = await self.search(url, post=True, data={
"output_type": 2,
"api_key": self.api_key,
"db": 999,
"numres": limit if limit <= 10 else 10,
"url": url,
"hide": 2
})
api_data = json.loads(api_req)
meta = api_data.get("header", {})
results = api_data.get("results", [])
min_similarity = float(meta.get("minimum_similarity", 50))
returned_results = []
low_similarity_count = 0
hidden_nsfw_count = 0
for result in results:
header = result["header"]
data = result["data"]
similarity = float(header["similarity"])
if similarity < min_similarity:
low_similarity_count += 1
continue
if header.get("hidden", 0) > 0 and hide_nsfw:
hidden_nsfw_count += 1
continue
sanitized_result = {}
index_id = header["index_id"]
sanitized_result["type"] = "generic"
for sauce_type, indexes in SAUCE_TYPES.items():
if index_id in indexes:
sanitized_result["type"] = sauce_type
break
sanitized_result.update({
"input_url": url,
"similarity": similarity,
"min_similarity": min_similarity,
"nsfw": header.get("hidden", 0) > 0,
"thumbnail": header.get("thumbnail"),
"index_name": header.get("index_name"),
"index_id": index_id,
"index": SAUCE_INDEX.get(str(index_id)),
"data": data
})
                # GenericSauce.from_dict() is a factory: it instantiates the
                # subclass whose _type matches the result's "type" field,
                # falling back to the generic base class.
                parsed_result = GenericSauce.from_dict(sanitized_result)
                await parsed_result._async_tasks()
returned_results.append(parsed_result)
return returned_results
        except aiohttp.ClientResponseError as err:
            if err.status == 429:
                raise EngineRateLimitError("Daily limit reached (100)") from err
            raise EngineResponseError(f"{err.status} {err.message}") from err
        except json.JSONDecodeError as err:
            raise EngineResponseError("Could not interpret result.") from err
    async def best_match(self, url, hide_nsfw=True):
        """ Return the highest-similarity result for `url`, or None if nothing matched. """
        top_three = await self.top_matches(url, hide_nsfw=hide_nsfw)
if not top_three:
return None
return top_three[0]

# Parts of the following classes were referenced from:
# https://github.com/MakotoAme/pysaucenao/blob/master/pysaucenao/containers.py

class GenericSauce(object):
""" Generic attributes that are applicable for any source, but not always """
_type = "generic"
def __init__(self, result: dict):
self.result = result
self.input_url = self.result["input_url"]
# header attribs
self.similarity = self.result["similarity"]
self.min_similarity = self.result["min_similarity"]
self.nsfw = self.result["nsfw"]
self.thumbnail = self.result["thumbnail"]
self.index_name = self.result["index_name"]
self.index_id = self.result["index_id"]
self.index = self.result["index"]
# data attribs (will be parsed later)
self.author_name = None
self.author_url = None
self.authors = None
self.title = None
self.url = None
self.urls = None
self._data = self.result["data"]
self._parse_data(self._data)
@classmethod
def from_dict(cls, result):
""" Instantiate a sauce object from dict """
def all_subclasses(cls):
""" Make sure we get all the inherited classes """
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in all_subclasses(c)])
res_type = result.get("type")
if res_type:
for subcls in all_subclasses(cls):
cls_type = subcls._type
if cls_type == res_type:
return subcls(result)
return cls(result)
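    # from_dict() dispatch sketch (dict shapes as built in SauceNao.top_matches,
    # other keys elided):
    #   GenericSauce.from_dict({"type": "pixiv", ...})   -> PixivSauce
    #   GenericSauce.from_dict({"type": "unknown", ...}) -> GenericSauce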
@property
def sauce_url(self):
"""
Returns the standard source url of the result
"""
return self.url
async def _async_tasks(self):
""" Called after initialization to complete async tasks needed by the source """
return
def _parse_data(self, data: dict):
"""
Parse the data from the dict into the appropriate attributes; called at initialization
"""
# messy api... smh
# "source" can sometimes be a url instead... -_-
for title_field in ["title", "material", "eng_name", "source"]:
if title_field not in data:
continue
self.title = data[title_field]
break
for author_field in ["member_name", "creator", "author_name", "author",
"pawoo_user_acct", "pawoo_user_username", "pawoo_user_display_name"]:
if author_field not in data:
continue
if isinstance(data[author_field], list):
# it can sometimes be a list of authors, so parse accordingly
self.author_name = data[author_field][0]
self.authors = data[author_field]
break
            self.author_name = data[author_field]
            self.authors = [data[author_field]]
            break
if "author_url" in data:
self.author_url = data["author_url"]
elif "pawoo_id" in data and "ext_urls" in data:
self.author_url = data['ext_urls'][0]
if "ext_urls" in data:
self.url = data["ext_urls"][0]
self.urls = data["ext_urls"]
def generate_embed(self, requester=None, additional_desc="", show_links=True):
""" Returns a discord embed to display the resulting information """
nsfw_tag = '**NSFW**\n' if self.nsfw else ''
description = f"{nsfw_tag}Similarity: {self.similarity}%"
if self.index:
description += f"{NL}Matched in: {self.index}"
if self.authors:
author_str = ', '.join(self.authors)
author_text = f"[{author_str}]({self.author_url})" if self.author_url else author_str
description += f"{NL}**Author:** {author_text}"
description += f"{NL}{additional_desc}" if additional_desc else ""
if self.urls and show_links:
url_list_str = '\n'.join(self.urls)
description += f"{NL}{NL}**Links:**{NL}{url_list_str}"
embed = discord.Embed(title=self.title, url=self.sauce_url, description=description)
embed.set_author(
name="SauceNAO",
url=f"https://saucenao.com/search.php?url={url_encode(self.input_url)}",
icon_url="https://i.imgur.com/Ynoqpam.png"
)
if self.thumbnail:
embed.set_thumbnail(url=self.thumbnail)
if requester:
embed.set_footer(
icon_url=utils.get_user_avatar_url(requester),
text=f"Requested by {requester}"
)
return embed

class PixivSauce(GenericSauce):
""" Pixiv source type """
_type = "pixiv"
def __init__(self, data: dict):
super().__init__(data)
    def _parse_data(self, data: dict):
        super()._parse_data(data)
        member_id = data.get("member_id")
        if member_id:
            self.author_url = f"https://pixiv.net/member.php?id={member_id}"

class BooruSauce(GenericSauce):
    """ Booru source type """
    _type = "booru"
    def __init__(self, data: dict):
        # set defaults here since _parse_data() may not find a matching booru id
        self.booru_type = None
        self.post_id = None
        super().__init__(data)
@property
def sauce_url(self):
""" Returns the linked source if available """
return self._data.get("source", self.url)
def _parse_data(self, data):
super()._parse_data(data)
for booru in ["gelbooru", "danbooru", "yandere",
"konachan", "sankaku", "anime-pictures",
"e621", "idol"]:
id_field = f"{booru}_id"
if id_field not in data:
continue
self.booru_type = booru
self.post_id = data.get(id_field)
break
self.characters = data.get("characters")
self.material = data.get("material")
if self.characters:
self.characters = [x.strip() for x in self.characters.split(',')]
if self.material:
self.material = [x.strip() for x in self.material.split(',')]
        if not self.title and self.post_id:
            self.title = f"Post #{self.post_id}"
def generate_embed(self, requester=None):
additional_desc = ""
if self.characters:
characters_str = ', '.join([f'`{x}`' for x in self.characters]) if isinstance(self.characters, list) else str(self.characters)
additional_desc += f"**Characters:** {characters_str}"
if self.material:
material_str = ', '.join([f'`{x}`' for x in self.material]) if isinstance(self.material, list) else str(self.material)
additional_desc += f"{NL}**Material:** {material_str}"
return super().generate_embed(requester=requester, additional_desc=additional_desc)

class TwitterSauce(GenericSauce):
    """ Twitter source type """
_type = "twitter"
def __init__(self, data: dict):
super().__init__(data)
def _parse_data(self, data: dict):
super()._parse_data(data)
self.tweet_id = data["tweet_id"]
self.twitter_user_id = data["twitter_user_id"]
self.twitter_user_handle = data["twitter_user_handle"]
self.author_name = self.twitter_user_handle
self.author_url = f"https://twitter.com/i/user/{self.twitter_user_id}"
self.authors = [self.author_name]
if not self.title:
self.title = f"Tweet by @{self.twitter_user_handle}"

class VideoSauce(GenericSauce):
""" Movies and Shows source """
_type = "video"
def __init__(self, data: dict):
self.episode = None
self.timestamp = None
self.year = None
super().__init__(data)
def _parse_data(self, data):
super()._parse_data(data)
self.episode = data.get("part")
self.timestamp = data.get("est_time")
self.year = data.get("year")
def generate_embed(self, requester=None, additional_desc="", show_links=True):
desc = ""
if self.year:
desc = f"**Year:** {self.year}"
if self.episode:
desc += f"{NL}**Episode:** {self.episode}"
if self.timestamp:
desc += f"{NL}**Timestamp:** {self.timestamp}"
desc += f"{NL}{additional_desc}" if additional_desc else ""
return super().generate_embed(requester, additional_desc=desc, show_links=show_links)

class AnimeSauce(VideoSauce):
""" Anime source """
_type = "anime"
def __init__(self, data):
self._logger = logging.getLogger(__name__)
self._async_done = False
self.anidb_aid = None
self.anilist_id = None
self.mal_id = None
super().__init__(data)
    async def _async_tasks(self):
        """ Resolve AniList/MyAnimeList IDs from the AniDB ID via the
        relations.yuna.moe mapping API, if the result didn't include them. """
        if self._async_done:
            return
        if not self.anidb_aid:
            return
        if self.anilist_id and self.mal_id:
            return
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            try:
                resp = await session.get(f"https://relations.yuna.moe/api/ids?source=anidb&id={self.anidb_aid}")
                ids = await resp.json() or {}
                if not self.anilist_id:
                    self.anilist_id = ids.get("anilist")
                if not self.mal_id:
                    self.mal_id = ids.get("myanimelist")
                self._async_done = True
            except json.JSONDecodeError:
                self._logger.info(f"relations.yuna.moe lookup failed for aid: {self.anidb_aid}")
            except aiohttp.ClientResponseError as err:
                self._logger.info(f"relations.yuna.moe returned a {err.status} error.")
            except aiohttp.ClientError:
                self._logger.info("unable to connect to relations.yuna.moe api")
def _parse_data(self, data):
super()._parse_data(data)
self.anidb_aid = data.get("anidb_aid")
self.anilist_id = data.get("anilist_id")
self.mal_id = data.get("mal_id")
@property
def anidb_url(self):
if not self.anidb_aid:
return None
return f"https://anidb.net/anime/{self.anidb_aid}"
@property
def anilist_url(self):
if not self.anilist_id:
return None
return f"https://anilist.co/anime/{self.anilist_id}"
@property
def mal_url(self):
if not self.mal_id:
return None
return f"https://myanimelist.net/anime/{self.mal_id}"
def generate_embed(self, requester=None):
link_strs = []
if self.anidb_url:
link_strs.append(f"[AniDB]({self.anidb_url})")
if self.anilist_url:
link_strs.append(f"[AniList]({self.anilist_url})")
if self.mal_url:
link_strs.append(f"[MyAnimeList]({self.mal_url})")
desc = f"{NL}{' | '.join(link_strs)}"
return super().generate_embed(requester, additional_desc=desc, show_links=False)

class MangaSauce(GenericSauce):
""" Manga source type """
_type = "manga"
def __init__(self, data: dict):
self.chapter = None
self.artist = None
super().__init__(data)
def _parse_data(self, data):
super()._parse_data(data)
self.chapter = data.get("part")
self.artist = data.get("artist")
def generate_embed(self, requester=None):
desc = ""
if self.artist:
desc += f"**Artist:** {self.artist}"
if self.chapter:
desc += f"{NL}**Chapter:** {self.chapter}"
return super().generate_embed(requester, additional_desc=desc)
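
# A minimal usage sketch, kept out of the engine itself. It assumes an engine
# config shaped like {"saucenao": {"api_key": ...}} and a publicly reachable
# image URL; both values below are placeholders, and the base
# ReverseImageSearchEngine is assumed to need no further setup.
if __name__ == "__main__":
    async def _demo():
        engine = SauceNao(config={"saucenao": {"api_key": "YOUR_API_KEY"}})
        result = await engine.best_match("https://example.com/image.png")
        if result:
            print(f"{result.title} ({result.similarity}% match): {result.sauce_url}")
        else:
            print("No sauce found.")
    asyncio.run(_demo())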