You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

156 lines
5.4 KiB

import traceback
import aiohttp
import re
import discord
from discord.ext import pages
from models.deepl_model import TranslationModel
from models.search_model import Search
from services.environment_service import EnvService
from services.text_service import TextService
ALLOWED_GUILDS = EnvService.get_allowed_guilds()
USER_INPUT_API_KEYS = EnvService.get_user_input_api_keys()
USER_KEY_DB = EnvService.get_api_db()
class RedoSearchUser:
def __init__(self, ctx, query, search_scope, nodes):
self.ctx = ctx
self.query = query
self.search_scope = search_scope
self.nodes = nodes
class SearchService(discord.Cog, name="SearchService"):
"""Cog containing translation commands and retrieval of translation services"""
def __init__(
self,
bot,
gpt_model,
usage_service,
):
super().__init__()
self.bot = bot
self.usage_service = usage_service
self.model = Search(gpt_model, usage_service)
self.EMBED_CUTOFF = 2000
self.redo_users = {}
# Make a mapping of all the country codes and their full country names:
async def paginate_embed(self, response_text, user: discord.Member):
"""Given a response text make embed pages and return a list of the pages. Codex makes it a codeblock in the embed"""
response_text = [
response_text[i : i + self.EMBED_CUTOFF]
for i in range(0, len(response_text), self.EMBED_CUTOFF)
]
pages = []
first = False
# Send each chunk as a message
for count, chunk in enumerate(response_text, start=1):
if not first:
page = discord.Embed(
title=f"Search Results",
description=chunk,
)
first = True
else:
page = discord.Embed(
title=f"Page {count}",
description=chunk,
)
page.set_footer(text=f"Requested by {user.name}", icon_url=user.avatar.url)
pages.append(page)
return pages
async def search_command(
self, ctx: discord.ApplicationContext, query, search_scope, nodes, redo=None
):
"""Command handler for the translation command"""
user_api_key = None
if USER_INPUT_API_KEYS:
user_api_key = await TextService.get_user_api_key(
ctx.user.id, ctx, USER_KEY_DB
)
if not user_api_key:
return
if (
not EnvService.get_google_search_api_key()
or not EnvService.get_google_search_engine_id()
):
await ctx.respond("The search service is not enabled.")
return
await ctx.defer() if not redo else None
try:
response, refined_text = await self.model.search(
ctx, query, user_api_key, search_scope, nodes
)
except ValueError:
await ctx.respond(
"The Google Search API returned an error. Check the console for more details.",
ephemeral=True,
)
return
except Exception:
await ctx.respond(
"An error occurred. Check the console for more details.", ephemeral=True
)
traceback.print_exc()
return
url_extract_pattern = "https?:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)"
urls = re.findall(
url_extract_pattern,
str(response.get_formatted_sources(length=200)),
flags=re.IGNORECASE,
)
urls = "\n".join(f"<{url}>" for url in urls)
query_response_message = f"**Question:**\n\n`{query.strip()}`\n\n**Google Search Query**\n\n`{refined_text.strip()}`\n\n**Final Answer:**\n\n{response.response.strip()}\n\n**Sources:**\n{urls}"
query_response_message = query_response_message.replace(
"<|endofstatement|>", ""
)
# If the response is too long, lets paginate using the discord pagination
# helper
embed_pages = await self.paginate_embed(query_response_message, ctx.user)
paginator = pages.Paginator(
pages=embed_pages,
timeout=None,
author_check=False,
custom_view=RedoButton(ctx, self),
)
self.redo_users[ctx.user.id] = RedoSearchUser(ctx, query, search_scope, nodes)
await paginator.respond(ctx.interaction)
# A view for a redo button
class RedoButton(discord.ui.View):
def __init__(self, ctx: discord.ApplicationContext, search_cog):
super().__init__()
self.ctx = ctx
self.search_cog = search_cog
@discord.ui.button(label="Redo", style=discord.ButtonStyle.danger)
async def redo(self, button: discord.ui.Button, interaction: discord.Interaction):
"""Redo the translation"""
await interaction.response.send_message(
"Redoing search...", ephemeral=True, delete_after=15
)
await self.search_cog.search_command(
self.search_cog.redo_users[self.ctx.user.id].ctx,
self.search_cog.redo_users[self.ctx.user.id].query,
self.search_cog.redo_users[self.ctx.user.id].search_scope,
self.search_cog.redo_users[self.ctx.user.id].nodes,
redo=True,
)