import datetime
import re
import traceback
from collections import defaultdict
import aiohttp
import discord
from discord . ext import pages
import unidecode
from models . embed_statics_model import EmbedStatics
from services . deletion_service import Deletion
from models . openai_model import Model , Override
from models . user_model import EmbeddedConversationItem , RedoUser
from services . environment_service import EnvService
from services . moderations_service import Moderation
BOT_NAME = EnvService . get_custom_bot_name ( )
PRE_MODERATE = EnvService . get_premoderate ( )
class TextService :
def __init__ ( self ) :
pass
@staticmethod
async def encapsulated_send (
converser_cog ,
id ,
prompt ,
ctx ,
response_message = None ,
overrides = None ,
instruction = None ,
from_ask_command = False ,
from_edit_command = False ,
codex = False ,
model = None ,
user = None ,
custom_api_key = None ,
edited_request = False ,
redo_request = False ,
from_ask_action = False ,
from_other_action = None ,
from_message_context = None ,
) :
""" General service function for sending and receiving gpt generations
Args :
converser_cog ( Cog ) : The conversation cog with our gpt commands
id ( user or thread id ) : A user or thread id for keeping track of conversations
prompt ( str ) : The prompt to use for generation
ctx ( ApplicationContext ) : The interaction which called this
response_message ( discord . Message , optional ) : For when we ' re doing redos. Defaults to None.
temp_override ( float , optional ) : Sets the temperature for the generation . Defaults to None .
top_p_override ( float , optional ) : Sets the top p for the generation . Defaults to None .
frequency_penalty_override ( float , optional ) : Sets the frequency penalty for the generation . Defaults to None .
presence_penalty_override ( float , optional ) : Sets the presence penalty for the generation . Defaults to None .
instruction ( str , optional ) : Instruction for use with the edit endpoint . Defaults to None .
from_ask_command ( bool , optional ) : Called from the ask command . Defaults to False .
from_edit_command ( bool , optional ) : Called from the edit command . Defaults to False .
codex ( bool , optional ) : Pass along that we want to use a codex model . Defaults to False .
model ( str , optional ) : Which model to generate output with . Defaults to None .
user ( discord . User , optional ) : An user object that can be used to set the stop . Defaults to None .
custom_api_key ( str , optional ) : per - user api key . Defaults to None .
edited_request ( bool , optional ) : If we ' re doing an edited message. Defaults to False.
redo_request ( bool , optional ) : If we ' re redoing a previous prompt. Defaults to False.
from_action ( bool , optional ) : If the function is being called from a message action . Defaults to False .
"""
new_prompt , _new_prompt_clean = (
prompt # + "\n" + BOT_NAME
if not from_ask_command and not from_edit_command and not redo_request
else prompt
) , prompt
# Determine if we're sending a ChatGPT model request
chatgpt = False
if model and " chatgpt " in model . lower ( ) :
chatgpt = True
stop = f " { ctx . author . display_name if user is None else user . display_name } : "
from_context = isinstance ( ctx , discord . ApplicationContext )
if not instruction :
tokens = converser_cog . usage_service . count_tokens ( new_prompt )
else :
tokens = converser_cog . usage_service . count_tokens (
new_prompt
) + converser_cog . usage_service . count_tokens ( instruction )
try :
user_displayname = (
ctx . author . display_name if not user else user . display_name
)
# Pinecone is enabled, we will create embeddings for this conversation.
if (
converser_cog . pinecone_service
and ctx . channel . id in converser_cog . conversation_threads
) :
for item in converser_cog . conversation_threads [ ctx . channel . id ] . history :
if item . text . strip ( ) == BOT_NAME + " <|endofstatement|> " :
converser_cog . conversation_threads [
ctx . channel . id
] . history . remove ( item )
# The conversation_id is the id of the thread
conversation_id = ctx . channel . id
# Create an embedding and timestamp for the prompt
# new_prompt = prompt.encode("ascii", "ignore").decode()
new_prompt = unidecode . unidecode ( new_prompt )
prompt_less_author = f " { new_prompt } <|endofstatement|> \n "
new_prompt = f " \n { user_displayname } : { new_prompt } <|endofstatement|> \n "
# new_prompt = new_prompt.encode("ascii", "ignore").decode()
new_prompt = unidecode . unidecode ( new_prompt )
timestamp = int (
str ( datetime . datetime . now ( ) . timestamp ( ) ) . replace ( " . " , " " )
)
new_prompt_item = EmbeddedConversationItem ( new_prompt , timestamp )
if not redo_request :
converser_cog . conversation_threads [ conversation_id ] . history . append (
new_prompt_item
)
if edited_request :
new_prompt = " " . join (
[
item . text
for item in converser_cog . conversation_threads [
ctx . channel . id
] . history
]
)
converser_cog . redo_users [ ctx . author . id ] . prompt = new_prompt
else :
# Create and upsert the embedding for the conversation id, prompt, timestamp
await converser_cog . pinecone_service . upsert_conversation_embedding (
converser_cog . model ,
conversation_id ,
new_prompt ,
timestamp ,
custom_api_key = custom_api_key ,
)
# Print all phrases
embedding_prompt_less_author = await converser_cog . model . send_embedding_request (
prompt_less_author , custom_api_key = custom_api_key
) # Use the version of the prompt without the author's name for better clarity on retrieval.
# Now, build the new prompt by getting the X most similar with pinecone
similar_prompts = converser_cog . pinecone_service . get_n_similar (
conversation_id ,
embedding_prompt_less_author ,
n = converser_cog . model . num_conversation_lookback ,
)
# When we are in embeddings mode, only the pre-text is contained in converser_cog.conversation_threads[message.channel.id].history, so we
# can use that as a base to build our new prompt
_prompt_with_history = [
converser_cog . conversation_threads [ ctx . channel . id ] . history [ 0 ]
]
# Append the similar prompts to the prompt with history
_prompt_with_history + = [
EmbeddedConversationItem ( prompt , timestamp )
for prompt , timestamp in similar_prompts
]
# iterate UP TO the last X prompts in the history
for i in range (
1 ,
min (
len (
converser_cog . conversation_threads [
ctx . channel . id
] . history
) ,
converser_cog . model . num_static_conversation_items ,
) ,
) :
_prompt_with_history . append (
converser_cog . conversation_threads [ ctx . channel . id ] . history [
- i
]
)
# remove duplicates from prompt_with_history and set the conversation history
_prompt_with_history = list ( dict . fromkeys ( _prompt_with_history ) )
# Sort the prompt_with_history by increasing timestamp if pinecone is enabled
if converser_cog . pinecone_service :
_prompt_with_history . sort ( key = lambda x : x . timestamp )
# Remove the last two entries after sort, this is from the end of the list as prompt(redo), answer, prompt(original), leaving only prompt(original) and further history
if redo_request :
_prompt_with_history = _prompt_with_history [ : - 2 ]
converser_cog . conversation_threads [
ctx . channel . id
] . history = _prompt_with_history
# Ensure that the last prompt in this list is the prompt we just sent (new_prompt_item)
if _prompt_with_history [ - 1 ] . text != new_prompt_item . text :
try :
_prompt_with_history . remove ( new_prompt_item )
except ValueError :
pass
_prompt_with_history . append ( new_prompt_item )
prompt_with_history = " " . join (
[ item . text for item in _prompt_with_history ]
)
new_prompt = prompt_with_history + " \n " + BOT_NAME
tokens = converser_cog . usage_service . count_tokens ( new_prompt )
# No pinecone, we do conversation summarization for long term memory instead
elif (
id in converser_cog . conversation_threads
and tokens > converser_cog . model . summarize_threshold
and not from_ask_command
and not from_edit_command
and not converser_cog . pinecone_service # This should only happen if we are not doing summarizations.
) :
# We don't need to worry about the differences between interactions and messages in this block,
# because if we are in this block, we can only be using a message object for ctx
if converser_cog . model . summarize_conversations :
await ctx . reply (
" I ' m currently summarizing our current conversation so we can keep chatting, "
" give me one moment! "
)
await converser_cog . summarize_conversation ( ctx , new_prompt )
# Check again if the prompt is about to go past the token limit
new_prompt = (
" " . join (
[
item . text
for item in converser_cog . conversation_threads [
id
] . history
]
)
+ " \n "
+ BOT_NAME
)
tokens = converser_cog . usage_service . count_tokens ( new_prompt )
if (
tokens > converser_cog . model . summarize_threshold - 150
) : # 150 is a buffer for the second stage
await ctx . reply (
" I tried to summarize our current conversation so we could keep chatting, "
" but it still went over the token "
" limit. Please try again later. "
)
await converser_cog . end_conversation ( ctx )
converser_cog . remove_awaiting ( ctx . author . id , ctx . channel . id )
return
else :
await ctx . reply ( " The conversation context limit has been reached. " )
await converser_cog . end_conversation ( ctx )
return
if not converser_cog . pinecone_service :
_prompt_with_history = converser_cog . conversation_threads [ ctx . channel . id ] . history
print ( " The prompt with history is " , _prompt_with_history )
# Send the request to the model
if chatgpt :
response = await converser_cog . model . send_chatgpt_request (
_prompt_with_history ,
bot_name = BOT_NAME ,
user_displayname = user_displayname ,
temp_override = overrides . temperature ,
top_p_override = overrides . top_p ,
frequency_penalty_override = overrides . frequency_penalty ,
presence_penalty_override = overrides . presence_penalty ,
stop = stop if not from_ask_command else None ,
custom_api_key = custom_api_key ,
)
elif from_edit_command :
response = await converser_cog . model . send_edit_request (
text = new_prompt ,
instruction = instruction ,
temp_override = overrides . temperature ,
top_p_override = overrides . top_p ,
codex = codex ,
custom_api_key = custom_api_key ,
)
else :
response = await converser_cog . model . send_request (
new_prompt ,
tokens = tokens ,
temp_override = overrides . temperature ,
top_p_override = overrides . top_p ,
frequency_penalty_override = overrides . frequency_penalty ,
presence_penalty_override = overrides . presence_penalty ,
model = model ,
stop = stop if not from_ask_command else None ,
custom_api_key = custom_api_key ,
)
# Clean the request response
response_text = converser_cog . cleanse_response (
str ( response [ " choices " ] [ 0 ] [ " text " ] )
) if not chatgpt else converser_cog . cleanse_response ( str ( response [ " choices " ] [ 0 ] [ " message " ] [ " content " ] ) )
if from_message_context :
response_text = f " { response_text } "
elif from_other_action :
response_text = f " *** { from_other_action } *** { response_text } "
elif from_ask_command or from_ask_action :
response_text = f " *** { prompt } *** { response_text } "
elif from_edit_command :
if codex :
response_text = response_text . strip ( )
response_text = f " ***Prompt: \n ` { prompt } `*** \n ***Instruction: \n ` { instruction } `*** \n \n ``` \n { response_text } \n ``` "
else :
response_text = response_text . strip ( )
response_text = f " ***Prompt: \n ` { prompt } `*** \n ***Instruction: \n ` { instruction } `*** \n \n { response_text } \n "
# If gpt3 tries writing a user mention try to replace it with their name
response_text = await converser_cog . mention_to_username ( ctx , response_text )
# If the user is conversing, add the GPT response to their conversation history.
if (
ctx . channel . id in converser_cog . conversation_threads
and not from_ask_command
and not converser_cog . pinecone_service
) :
if not redo_request :
converser_cog . conversation_threads [ ctx . channel . id ] . history . append (
EmbeddedConversationItem (
" \n "
+ BOT_NAME
+ str ( response_text )
+ " <|endofstatement|> \n " ,
0 ,
)
)
# Embeddings case!
elif (
ctx . channel . id in converser_cog . conversation_threads
and not from_ask_command
and not from_edit_command
and converser_cog . pinecone_service
) :
conversation_id = ctx . channel . id
# A cleaner version for the convo history
response_text_clean = str ( response_text )
# Create an embedding and timestamp for the prompt
response_text = (
" \n " + BOT_NAME + str ( response_text ) + " <|endofstatement|> \n "
)
# response_text = response_text.encode("ascii", "ignore").decode()
response_text = unidecode . unidecode ( response_text )
# Print the current timestamp
timestamp = int (
str ( datetime . datetime . now ( ) . timestamp ( ) ) . replace ( " . " , " " )
)
converser_cog . conversation_threads [ conversation_id ] . history . append (
EmbeddedConversationItem ( response_text , timestamp )
)
# Create and upsert the embedding for the conversation id, prompt, timestamp
embedding = (
await converser_cog . pinecone_service . upsert_conversation_embedding (
converser_cog . model ,
conversation_id ,
response_text ,
timestamp ,
custom_api_key = custom_api_key ,
)
)
# Cleanse again
response_text = converser_cog . cleanse_response ( response_text )
converser_cog . full_conversation_history [ ctx . channel . id ] . append (
response_text
)
# escape any other mentions like @here or @everyone
response_text = discord . utils . escape_mentions ( response_text )
# If we don't have a response message, we are not doing a redo, send as a new message(s)
if not response_message :
if len ( response_text ) > converser_cog . TEXT_CUTOFF :
if not from_context :
paginator = None
await converser_cog . paginate_and_send ( response_text , ctx )
else :
embed_pages = await converser_cog . paginate_embed (
response_text , codex , prompt , instruction
)
view = ConversationView (
ctx ,
converser_cog ,
ctx . channel . id ,
model ,
from_ask_command ,
from_edit_command ,
custom_api_key = custom_api_key ,
)
paginator = pages . Paginator (
pages = embed_pages ,
timeout = None ,
custom_view = view ,
author_check = True ,
)
response_message = await paginator . respond ( ctx . interaction )
else :
paginator = None
if not from_context :
response_message = await ctx . reply (
response_text ,
view = ConversationView (
ctx ,
converser_cog ,
ctx . channel . id ,
model ,
custom_api_key = custom_api_key ,
) ,
)
elif from_edit_command :
response_message = await ctx . respond (
embed = EmbedStatics . get_edit_command_output_embed (
response_text
) ,
view = ConversationView (
ctx ,
converser_cog ,
ctx . channel . id ,
model ,
from_edit_command = from_edit_command ,
custom_api_key = custom_api_key ,
) ,
)
else :
response_message = await ctx . respond (
response_text ,
view = ConversationView (
ctx ,
converser_cog ,
ctx . channel . id ,
model ,
from_ask_command = from_ask_command ,
custom_api_key = custom_api_key ,
) ,
)
converser_cog . redo_users [ ctx . author . id ] = RedoUser (
prompt = new_prompt if not converser_cog . pinecone_service else prompt ,
instruction = instruction ,
ctx = ctx ,
message = ctx ,
response = response_message ,
codex = codex ,
paginator = paginator ,
)
converser_cog . redo_users [ ctx . author . id ] . add_interaction (
response_message . id
)
# We are doing a redo, edit the message.
else :
paginator = converser_cog . redo_users . get ( ctx . author . id ) . paginator
if isinstance ( paginator , pages . Paginator ) :
embed_pages = await converser_cog . paginate_embed (
response_text , codex , prompt , instruction
)
view = ConversationView (
ctx ,
converser_cog ,
ctx . channel . id ,
model ,
from_ask_command ,
from_edit_command ,
custom_api_key = custom_api_key ,
)
await paginator . update ( pages = embed_pages , custom_view = view )
elif len ( response_text ) > converser_cog . TEXT_CUTOFF :
if not from_context :
await response_message . channel . send (
" Over 2000 characters " , delete_after = 5
)
else :
if not from_edit_command :
await response_message . edit ( content = response_text )
else :
await response_message . edit (
embed = EmbedStatics . get_edit_command_output_embed (
response_text
)
)
await converser_cog . send_debug_message (
converser_cog . generate_debug_message ( prompt , response ) ,
converser_cog . debug_channel ,
)
converser_cog . remove_awaiting (
ctx . author . id , ctx . channel . id , from_ask_command , from_edit_command
)
# Error catching for AIOHTTP Errors
except aiohttp . ClientResponseError as e :
embed = EmbedStatics . get_invalid_api_response_embed ( e )
if from_context :
await ctx . send_followup ( embed = embed )
else :
await ctx . reply ( embed = embed )
converser_cog . remove_awaiting (
ctx . author . id , ctx . channel . id , from_ask_command , from_edit_command
)
# Error catching for OpenAI model value errors
except ValueError as e :
embed = EmbedStatics . get_invalid_value_embed ( e )
if from_ask_action :
await ctx . respond ( embed = embed , ephemeral = True )
elif from_context :
await ctx . send_followup ( embed = embed , ephemeral = True )
else :
await ctx . reply ( e )
converser_cog . remove_awaiting (
ctx . author . id , ctx . channel . id , from_ask_command , from_edit_command
)
# General catch case for everything
except Exception as e :
embed = EmbedStatics . get_general_error_embed ( e )
if not from_context :
await ctx . send_followup ( embed = embed )
elif from_ask_command :
await ctx . respond ( embed = embed )
else :
await ctx . reply ( embed = embed )
converser_cog . remove_awaiting (
ctx . author . id , ctx . channel . id , from_ask_command , from_edit_command
)
traceback . print_exc ( )
try :
await converser_cog . end_conversation ( ctx )
except Exception :
pass
return
@staticmethod
async def process_conversation_message (
converser_cog , message , USER_INPUT_API_KEYS , USER_KEY_DB
) :
content = message . content . strip ( )
conversing = converser_cog . check_conversing ( message . channel . id , content )
# If the user is conversing and they want to end it, end it immediately before we continue any further.
if conversing and message . content . lower ( ) in converser_cog . END_PROMPTS :
await converser_cog . end_conversation ( message )
return
if conversing :
# Pre-moderation check
if PRE_MODERATE :
if await Moderation . simple_moderate_and_respond (
message . content , message
) :
await message . delete ( )
return
user_api_key = None
if USER_INPUT_API_KEYS :
user_api_key = await TextService . get_user_api_key (
message . author . id , message , USER_KEY_DB
)
if not user_api_key :
return
prompt = await converser_cog . mention_to_username ( message , content )
if await converser_cog . check_conversation_limit ( message ) :
return
# If the user is in a conversation thread
if message . channel . id in converser_cog . conversation_threads :
# Since this is async, we don't want to allow the user to send another prompt while a conversation
# prompt is processing, that'll mess up the conversation history!
if message . author . id in converser_cog . awaiting_responses :
resp_message = await message . reply (
embed = discord . Embed (
title = f " You are already waiting for a response, please wait and speak afterwards. " ,
color = 0x808080 ,
)
)
try :
await resp_message . channel . trigger_typing ( )
except :
pass
# get the current date, add 10 seconds to it, and then turn it into a timestamp.
# we need to use our deletion service because this isn't an interaction, it's a regular message.
deletion_time = datetime . datetime . now ( ) + datetime . timedelta (
seconds = 5
)
deletion_time = deletion_time . timestamp ( )
deletion_message = Deletion ( resp_message , deletion_time )
deletion_original_message = Deletion ( message , deletion_time )
await converser_cog . deletion_queue . put ( deletion_message )
await converser_cog . deletion_queue . put ( deletion_original_message )
return
if message . channel . id in converser_cog . awaiting_thread_responses :
resp_message = await message . reply (
embed = discord . Embed (
title = f " This thread is already waiting for a response, please wait and speak afterwards. " ,
color = 0x808080 ,
)
)
try :
await resp_message . channel . trigger_typing ( )
except :
pass
# get the current date, add 10 seconds to it, and then turn it into a timestamp.
# we need to use our deletion service because this isn't an interaction, it's a regular message.
deletion_time = datetime . datetime . now ( ) + datetime . timedelta (
seconds = 5
)
deletion_time = deletion_time . timestamp ( )
deletion_message = Deletion ( resp_message , deletion_time )
deletion_original_message = Deletion ( message , deletion_time )
await converser_cog . deletion_queue . put ( deletion_message )
await converser_cog . deletion_queue . put ( deletion_original_message )
return
converser_cog . awaiting_responses . append ( message . author . id )
converser_cog . awaiting_thread_responses . append ( message . channel . id )
if not converser_cog . pinecone_service :
converser_cog . conversation_threads [
message . channel . id
] . history . append (
EmbeddedConversationItem (
f " \n { message . author . display_name } : { prompt } <|endofstatement|> \n " ,
0 ,
)
)
# increment the conversation counter for the user
converser_cog . conversation_threads [ message . channel . id ] . count + = 1
# Send the request to the model
# If conversing, the prompt to send is the history, otherwise, it's just the prompt
if (
converser_cog . pinecone_service
or message . channel . id not in converser_cog . conversation_threads
) :
primary_prompt = prompt
else :
primary_prompt = " " . join (
[
item . text
for item in converser_cog . conversation_threads [
message . channel . id
] . history
]
)
# set conversation overrides
conversation_overrides = converser_cog . conversation_threads [
message . channel . id
] . get_overrides ( )
overrides = Override (
conversation_overrides [ " temperature " ] ,
conversation_overrides [ " top_p " ] ,
conversation_overrides [ " frequency_penalty " ] ,
conversation_overrides [ " presence_penalty " ] ,
)
# Send an embed that tells the user that the bot is thinking
thinking_embed = discord . Embed (
title = f " 🤖💬 Thinking... " ,
color = 0x808080 ,
)
thinking_embed . set_footer ( text = " This may take a few seconds. " )
thinking_message = await message . reply ( embed = thinking_embed )
try :
await message . channel . trigger_typing ( )
except Exception :
pass
converser_cog . full_conversation_history [ message . channel . id ] . append ( prompt )
if not converser_cog . pinecone_service :
primary_prompt + = BOT_NAME
await TextService . encapsulated_send (
converser_cog ,
message . channel . id ,
primary_prompt ,
message ,
overrides = overrides ,
model = converser_cog . conversation_threads [ message . channel . id ] . model ,
custom_api_key = user_api_key ,
)
# Delete the thinking embed
await thinking_message . delete ( )
return True
@staticmethod
async def get_user_api_key ( user_id , ctx , USER_KEY_DB ) :
user_api_key = None if user_id not in USER_KEY_DB else USER_KEY_DB [ user_id ]
if user_api_key is None or user_api_key == " " :
modal = SetupModal ( user_key_db = USER_KEY_DB )
if isinstance ( ctx , discord . ApplicationContext ) :
await ctx . send_modal ( modal )
await ctx . send_followup (
" You must set up your API key before using this command. "
)
else :
await ctx . reply (
" You must set up your API key before typing in a GPT3 powered channel, type `/setup` to enter your API key. "
)
return user_api_key
@staticmethod
async def process_conversation_edit ( converser_cog , after , original_message ) :
if after . author . id in converser_cog . redo_users :
if after . id == original_message [ after . author . id ] :
response_message = converser_cog . redo_users [ after . author . id ] . response
ctx = converser_cog . redo_users [ after . author . id ] . ctx
await response_message . edit ( content = " Redoing prompt 🔄... " )
edited_content = await converser_cog . mention_to_username (
after , after . content
)
if after . channel . id in converser_cog . conversation_threads :
# Remove the last two elements from the history array and add the new <username>: prompt
converser_cog . conversation_threads [
after . channel . id
] . history = converser_cog . conversation_threads [
after . channel . id
] . history [
: - 2
]
pinecone_dont_reinsert = None
if not converser_cog . pinecone_service :
converser_cog . conversation_threads [
after . channel . id
] . history . append (
EmbeddedConversationItem (
f " \n { after . author . display_name } : { after . content } <|endofstatement|> \n " ,
0 ,
)
)
converser_cog . conversation_threads [ after . channel . id ] . count + = 1
overrides = converser_cog . conversation_threads [
after . channel . id
] . get_overrides ( )
await TextService . encapsulated_send (
converser_cog ,
id = after . channel . id ,
prompt = edited_content ,
ctx = ctx ,
response_message = response_message ,
temp_override = overrides [ " temperature " ] ,
top_p_override = overrides [ " top_p " ] ,
frequency_penalty_override = overrides [ " frequency_penalty " ] ,
presence_penalty_override = overrides [ " presence_penalty " ] ,
model = converser_cog . conversation_threads [ after . channel . id ] . model ,
edited_request = True ,
)
if not converser_cog . pinecone_service :
converser_cog . redo_users [ after . author . id ] . prompt = edited_content
#
# Conversation interaction buttons
#
class ConversationView ( discord . ui . View ) :
def __init__ (
self ,
ctx ,
converser_cog ,
id ,
model ,
from_ask_command = False ,
from_edit_command = False ,
custom_api_key = None ,
) :
super ( ) . __init__ ( timeout = 3600 ) # 1 hour interval to redo.
self . converser_cog = converser_cog
self . ctx = ctx
self . model = model
self . from_ask_command = from_ask_command
self . from_edit_command = from_edit_command
self . custom_api_key = custom_api_key
self . add_item (
RedoButton (
self . converser_cog ,
model = model ,
from_ask_command = from_ask_command ,
from_edit_command = from_edit_command ,
custom_api_key = self . custom_api_key ,
)
)
if id in self . converser_cog . conversation_threads :
self . add_item ( EndConvoButton ( self . converser_cog ) )
async def on_timeout ( self ) :
# Remove the button from the view/message
self . clear_items ( )
# Send a message to the user saying the view has timed out
if self . message :
# check if the timeout happens in a thread and if it's locked
if isinstance ( self . message . channel , discord . Thread ) :
if self . message . channel . locked :
return
await self . message . edit (
view = None ,
)
else :
await self . ctx . edit (
view = None ,
)
class EndConvoButton ( discord . ui . Button [ " ConversationView " ] ) :
def __init__ ( self , converser_cog ) :
super ( ) . __init__ (
style = discord . ButtonStyle . danger ,
label = " End Conversation " ,
custom_id = " conversation_end " ,
)
self . converser_cog = converser_cog
async def callback ( self , interaction : discord . Interaction ) :
# Get the user
user_id = interaction . user . id
if (
user_id in self . converser_cog . conversation_thread_owners
and interaction . channel . id
in self . converser_cog . conversation_thread_owners [ user_id ]
) :
try :
await self . converser_cog . end_conversation (
interaction , opener_user_id = interaction . user . id
)
except Exception as e :
print ( e )
traceback . print_exc ( )
await interaction . response . send_message (
e , ephemeral = True , delete_after = 30
)
else :
await interaction . response . send_message (
" This is not your conversation to end! " , ephemeral = True , delete_after = 10
)
class RedoButton ( discord . ui . Button [ " ConversationView " ] ) :
def __init__ (
self , converser_cog , model , from_ask_command , from_edit_command , custom_api_key
) :
super ( ) . __init__ (
style = discord . ButtonStyle . danger ,
label = " Retry " ,
custom_id = " conversation_redo " ,
)
self . converser_cog = converser_cog
self . model = model
self . from_ask_command = from_ask_command
self . from_edit_command = from_edit_command
self . custom_api_key = custom_api_key
async def callback ( self , interaction : discord . Interaction ) :
# Get the user
user_id = interaction . user . id
if user_id in self . converser_cog . redo_users and self . converser_cog . redo_users [
user_id
] . in_interaction ( interaction . message . id ) :
# Get the message and the prompt and call encapsulated_send
prompt = self . converser_cog . redo_users [ user_id ] . prompt
instruction = self . converser_cog . redo_users [ user_id ] . instruction
ctx = self . converser_cog . redo_users [ user_id ] . ctx
response_message = self . converser_cog . redo_users [ user_id ] . response
codex = self . converser_cog . redo_users [ user_id ] . codex
await interaction . response . send_message (
" Retrying your original request... " , ephemeral = True , delete_after = 15
)
await TextService . encapsulated_send (
self . converser_cog ,
overrides = Override ( None , None , None , None ) ,
id = user_id ,
prompt = prompt ,
instruction = instruction ,
ctx = ctx ,
model = self . model ,
response_message = response_message ,
codex = codex ,
custom_api_key = self . custom_api_key ,
redo_request = True ,
from_ask_command = self . from_ask_command ,
from_edit_command = self . from_edit_command ,
)
else :
await interaction . response . send_message (
" You can only redo the most recent prompt that you sent yourself. " ,
ephemeral = True ,
delete_after = 10 ,
)
#
# The setup modal when using user input API keys
#
class SetupModal ( discord . ui . Modal ) :
def __init__ ( self , user_key_db ) - > None :
super ( ) . __init__ ( title = " API Key Setup " )
# Get the argument named "user_key_db" and save it as USER_KEY_DB
self . USER_KEY_DB = user_key_db
self . add_item (
discord . ui . InputText (
label = " OpenAI API Key " ,
placeholder = " sk--...... " ,
)
)
async def callback ( self , interaction : discord . Interaction ) :
user = interaction . user
api_key = self . children [ 0 ] . value
# Validate that api_key is indeed in this format
if not re . match ( r " sk-[a-zA-Z0-9] {32} " , api_key ) :
await interaction . response . send_message (
" Your API key looks invalid, please check that it is correct before proceeding. Please run the /setup command to set your key. " ,
ephemeral = True ,
delete_after = 100 ,
)
else :
# We can save the key for the user to the database.
# Make a test request using the api key to ensure that it is valid.
try :
await Model . send_test_request ( api_key )
await interaction . response . send_message (
" Your API key was successfully validated. " ,
ephemeral = True ,
delete_after = 10 ,
)
except aiohttp . ClientResponseError as e :
await interaction . response . send_message (
embed = EmbedStatics . get_invalid_api_response_embed ( e ) ,
ephemeral = True ,
delete_after = 30 ,
)
return
except Exception as e :
await interaction . response . send_message (
f " Your API key looks invalid, the API returned: { e } . Please check that your API key is correct before proceeding " ,
ephemeral = True ,
delete_after = 30 ,
)
return
# Save the key to the database
try :
self . USER_KEY_DB [ user . id ] = api_key
self . USER_KEY_DB . commit ( )
await interaction . followup . send (
" Your API key was successfully saved. " ,
ephemeral = True ,
delete_after = 10 ,
)
except Exception :
traceback . print_exc ( )
await interaction . followup . send (
" There was an error saving your API key. " ,
ephemeral = True ,
delete_after = 30 ,
)
return