persist conversations through restarts

Kaveen Kumarasinghe 2 years ago
parent d77f18cb2d
commit f439ef9ce9

@ -1,4 +1,6 @@
import asyncio
import datetime
import pickle
import re
import traceback
import sys
@ -21,6 +23,7 @@ from models.user_model import Thread, EmbeddedConversationItem
from collections import defaultdict
from sqlitedict import SqliteDict
from services.pickle_service import Pickler
from services.sharegpt_service import ShareGPTService
from services.text_service import SetupModal, TextService
@ -80,6 +83,7 @@ class GPT3ComCon(discord.Cog, name="GPT3ComCon"):
DEBUG_CHANNEL,
data_path: Path,
pinecone_service,
pickle_queue,
):
super().__init__()
self.GLOBAL_COOLDOWN_TIME = 0.25
@ -99,6 +103,9 @@ class GPT3ComCon(discord.Cog, name="GPT3ComCon"):
self.users_to_interactions = defaultdict(list)
self.redo_users = {}
# Pickle queue
self.pickle_queue = pickle_queue
# Conversations-specific data
self.END_PROMPTS = [
"end",
@ -113,6 +120,7 @@ class GPT3ComCon(discord.Cog, name="GPT3ComCon"):
self.full_conversation_history = defaultdict(list)
self.summarize = self.model.summarize_conversations
# Pinecone data
self.pinecone_service = pinecone_service
@ -221,6 +229,35 @@ class GPT3ComCon(discord.Cog, name="GPT3ComCon"):
)
print("The debug channel was acquired")
print("Attempting to load from pickles")
# Try to load self.full_conversation_history, self.conversation_threads, and self.conversation_thread_owners from the `pickles` folder
try:
with open(EnvService.save_path() / "pickles" / "full_conversation_history.pickle", "rb") as f:
self.full_conversation_history = pickle.load(f)
print("Loaded full_conversation_history")
with open(EnvService.save_path() / "pickles" / "conversation_threads.pickle", "rb") as f:
self.conversation_threads = pickle.load(f)
print("Loaded conversation_threads")
with open(EnvService.save_path() / "pickles" / "conversation_thread_owners.pickle", "rb") as f:
self.conversation_thread_owners = pickle.load(f)
print("Loaded conversation_thread_owners")
# Fail if all three weren't loaded
assert self.full_conversation_history is not {}
assert self.conversation_threads is not {}
assert self.conversation_thread_owners is not defaultdict(list)
except Exception:
print("Failed to load from pickles")
self.full_conversation_history = defaultdict(list)
self.conversation_threads = {}
self.conversation_thread_owners = defaultdict(list)
traceback.print_exc()
print("Syncing commands...")
await self.bot.sync_commands(
commands=None,
method="individual",
@ -232,6 +269,15 @@ class GPT3ComCon(discord.Cog, name="GPT3ComCon"):
)
print("Commands synced")
# Start an inline async loop that runs every 10 seconds to save the conversation history to a pickle file
print("Starting pickle loop")
while True:
await asyncio.sleep(15)
await self.pickle_queue.put(
Pickler(self.full_conversation_history, self.conversation_threads, self.conversation_thread_owners))
def check_conversing(self, channel_id, message_content):
'''given channel id and a message, return true if it's a conversation thread, false if not, or if the message starts with "~"'''
cond1 = channel_id in self.conversation_threads

@ -21,3 +21,5 @@ Then, name the index `conversation-embeddings`, set the dimensions to `1536`, an
<center><img src="https://i.imgur.com/zoeLsrw.png"/></center>
Permanent memory using pinecone is still in alpha, I will be working on cleaning up this work, adding auto-clearing, and optimizing for stability and reliability, any help and feedback is appreciated (**add me on Discord Kaveen#0001 for pinecone help**)! If at any time you're having too many issues with pinecone, simply remove the `PINECONE_TOKEN` line in your `.env` file and the bot will revert to using conversation summarizations.
Conversations persist even through bot restarts. Bot conversation data is stored locally in a folder called `pickles`. If you find your bot getting slow, delete this folder. A cleaner solution will be implemented in the future.

@ -22,6 +22,7 @@ from cogs.translation_service_cog import TranslationService
from cogs.index_service_cog import IndexService
from models.deepl_model import TranslationModel
from services.health_service import HealthService
from services.pickle_service import Pickler
from services.pinecone_service import PineconeService
from services.deletion_service import Deletion
@ -32,7 +33,7 @@ from services.environment_service import EnvService
from models.openai_model import Model
__version__ = "10.9.17"
__version__ = "11.0.0"
PID_FILE = Path("bot.pid")
@ -76,6 +77,15 @@ deletion_queue = asyncio.Queue()
asyncio.ensure_future(Message.process_message_queue(message_queue, 1.5, 5))
asyncio.ensure_future(Deletion.process_deletion_queue(deletion_queue, 1, 1))
# Pickling service for conversation persistence
try:
Path(EnvService.save_path()/"pickles").mkdir(exist_ok=True)
except Exception:
traceback.print_exc()
print("Could not start pickle service. Conversation history will not be persistent across restarts.")
pickle_queue = asyncio.Queue()
asyncio.ensure_future(Pickler.process_pickle_queue(pickle_queue, 5, 1))
#
# Settings for the bot
@ -131,6 +141,7 @@ async def main():
debug_channel,
data_path,
pinecone_service=pinecone_service,
pickle_queue=pickle_queue,
)
)

@ -823,7 +823,7 @@ class Model:
def cleanse_username(self, text):
text = text.strip()
text = text.replace(":", "_")
text = text.replace(":", "")
text = text.replace(" ", "")
# Replace any character that's not a letter or number with an underscore
text = re.sub(r"[^a-zA-Z0-9]", "_", text)

@ -0,0 +1,45 @@
import asyncio
import pickle
import traceback
from datetime import datetime
import aiofiles
import discord
from services.environment_service import EnvService
class Pickler:
def __init__(self, full_conversation_history, conversation_threads, conversation_thread_owners):
self.full_conversation_history = full_conversation_history
self.conversation_threads = conversation_threads
self.conversation_thread_owners = conversation_thread_owners
# This function will be called by the bot to process the message queue
@staticmethod
async def process_pickle_queue(
pickle_queue, PROCESS_WAIT_TIME, EMPTY_WAIT_TIME
):
while True:
try:
# If the queue is empty, sleep for a short time before checking again
if pickle_queue.empty():
await asyncio.sleep(EMPTY_WAIT_TIME)
continue
# Get the next object to pickle from the queue
to_pickle = await pickle_queue.get()
# Pickle all the objects inside to_pickle using aiofiles
async with aiofiles.open(EnvService.save_path() / "pickles" / "full_conversation_history.pickle", "wb") as f:
await f.write(pickle.dumps(to_pickle.full_conversation_history))
async with aiofiles.open(EnvService.save_path() / "pickles" / "conversation_threads.pickle", "wb") as f:
await f.write(pickle.dumps(to_pickle.conversation_threads))
async with aiofiles.open(EnvService.save_path() / "pickles" / "conversation_thread_owners.pickle", "wb") as f:
await f.write(pickle.dumps(to_pickle.conversation_thread_owners))
await asyncio.sleep(PROCESS_WAIT_TIME)
except Exception:
traceback.print_exc()
Loading…
Cancel
Save