Mem0 源码阅读

Posted on 7月 23, 2024

初始化记忆

Mem0 使用MemoryConfig初始化了一个Memory对象。

MemoryConfig对象包含

  • 向量数据库 qdrant 目前只支持 qdrant
  • LLM
  • 向量模型
  • 历史数据 db Sqlite
  • 默认集合名字 “mem0”
  • 向量数据库大小

⚠️ capture_event("mem0.init", self) 会捕获事件上传到云端进行分析,记得更新key 以免造成隐私泄露

class MemoryConfig(BaseModel):
    vector_store: VectorStoreConfig = Field(
        description="Configuration for the vector store",
        default_factory=VectorStoreConfig,
    )
    llm: LlmConfig = Field(
        description="Configuration for the language model",
        default_factory=LlmConfig,
    )
    embedder: EmbedderConfig = Field(
        description="Configuration for the embedding model",
        default_factory=EmbedderConfig,
    )
    history_db_path: str = Field(
        description="Path to the history database",
        default=os.path.join(mem0_dir, "history.db"),
    )
    collection_name: str = Field(default="mem0", description="Name of the collection")
    embedding_model_dims: int = Field(
        default=1536, description="Dimensions of the embedding model"
    )

class Memory(MemoryBase):
    def __init__(self, config: MemoryConfig = MemoryConfig()):
        self.config = config
        self.embedding_model = EmbedderFactory.create(self.config.embedder.provider)
        # Initialize the appropriate vector store based on the configuration
        vector_store_config = self.config.vector_store.config
        if self.config.vector_store.provider == "qdrant":
            self.vector_store = Qdrant(
                host=vector_store_config.host,
                port=vector_store_config.port,
                path=vector_store_config.path,
                url=vector_store_config.url,
                api_key=vector_store_config.api_key,
            )
        else:
            raise ValueError(
                f"Unsupported vector store type: {self.config.vector_store_type}"
            )

        self.llm = LlmFactory.create(self.config.llm.provider, self.config.llm.config)
        self.db = SQLiteManager(self.config.history_db_path)
        self.collection_name = self.config.collection_name
        self.vector_store.create_col(
            name=self.collection_name, vector_size=self.embedding_model.dims
        )
        self.vector_store.create_col(
            name=self.collection_name, vector_size=self.embedding_model.dims
        )
        capture_event("mem0.init", self)

添加记忆

首先会使用向量模型对记忆内容进行向量化。

然后会对记忆进行提取

这里使用了两段Prompt

SYSTEM Prompt:

You are an expert at deducing facts, preferences and memories from unstructured text.
您是从一个非结构化的文本中推断事实、偏好和记忆的专家。

User Prompt:

Deduce the facts, preferences, and memories from the provided text.
Just return the facts, preferences, and memories in bullet points:
Natural language text: {user_input}
User/Agent details: {metadata}

Constraint for deducing facts, preferences, and memories:
- The facts, preferences, and memories should be concise and informative.
- Don't start by "The person likes Pizza". Instead, start with "Likes Pizza".
- Don't remember the user/agent details provided. Only remember the facts, preferences, and memories.

Deduced facts, preferences, and memories:

从提供的文本中推断出事实、偏好和记忆。
只以项目符号的形式返回事实、偏好和记忆:
自然语言文本:{用户输入}
用户/代理详情:{元数据}
推断事实、偏好和记忆的约束:
- 事实、偏好和记忆应简洁且富有信息。
- 不要以“这个人喜欢比萨”开始。相反,以“喜欢比萨”开始。
- 不要记住提供的用户/代理详情。只记住事实、偏好和记忆。
推断出的事实、偏好和记忆:
def add(
        self,
        data,
        user_id=None,
        agent_id=None,
        run_id=None,
        metadata=None,
        filters=None,
        prompt=None,
    ):
        """
        Create a new memory.

        Args:
            data (str): Data to store in the memory.
            user_id (str, optional): ID of the user creating the memory. Defaults to None.
            agent_id (str, optional): ID of the agent creating the memory. Defaults to None.
            run_id (str, optional): ID of the run creating the memory. Defaults to None.
            metadata (dict, optional): Metadata to store with the memory. Defaults to None.
            filters (dict, optional): Filters to apply to the search. Defaults to None.

        Returns:
            str: ID of the created memory.
        """
        if metadata is None:
            metadata = {}
        embeddings = self.embedding_model.embed(data)

        filters = filters or {}
        if user_id:
            filters["user_id"] = metadata["user_id"] = user_id
        if agent_id:
            filters["agent_id"] = metadata["agent_id"] = agent_id
        if run_id:
            filters["run_id"] = metadata["run_id"] = run_id

        if not prompt:
            prompt = MEMORY_DEDUCTION_PROMPT.format(user_input=data, metadata=metadata)
        extracted_memories = self.llm.generate_response(
            messages=[
                {
                    "role": "system",
                    "content": "You are an expert at deducing facts, preferences and memories from unstructured text.",
                },
                {"role": "user", "content": prompt},
            ]
        )

然后通过向量数据库检索要添加记忆的相关记忆,并进行相应的更新。

        # 检索需要添加的记忆
        existing_memories = self.vector_store.search(
            name=self.collection_name,
            query=embeddings,
            limit=5,
            filters=filters,
        )
        existing_memories = [
            MemoryItem(
                id=mem.id,
                score=mem.score,
                metadata=mem.payload,
                text=mem.payload["data"],
            )
            for mem in existing_memories
        ]
        serialized_existing_memories = [
            item.model_dump(include={"id", "text", "score"})
            for item in existing_memories
        ]
        logging.info(f"Total existing memories: {len(existing_memories)}")
        messages = get_update_memory_messages(
            serialized_existing_memories, extracted_memories
        )

get_update_memory_messages 这里使用了一段 User Prompt 将新的记忆和已有记忆的消息组合到更新记忆的prompt中

You are an expert at merging, updating, and organizing memories. When provided with existing memories and new information, your task is to merge and update the memory list to reflect the most accurate and current information. You are also provided with the matching score for each existing memory to the new information. Make sure to leverage this information to make informed decisions about which memories to update or merge.

Guidelines:
- Eliminate duplicate memories and merge related memories to ensure a concise and updated list.
- If a memory is directly contradicted by new information, critically evaluate both pieces of information:
    - If the new memory provides a more recent or accurate update, replace the old memory with new one.
    - If the new memory seems inaccurate or less detailed, retain the original and discard the old one.
- Maintain a consistent and clear style throughout all memories, ensuring each entry is concise yet informative.
- If the new memory is a variation or extension of an existing memory, update the existing memory to reflect the new information.

Here are the details of the task:
- Existing Memories:
{existing_memories}

- New Memory: {memory}

你是合并、更新和组织记忆的专家。当提供现有记忆和新信息时,你的任务是合并和更新记忆列表,以反映最准确和最新的信息。你还被提供了每个现有记忆与新信息的匹配分数。确保利用这些信息做出明智的决策,决定哪些记忆需要更新或合并。
指导方针:
- 消除重复的记忆,合并相关的记忆,确保列表简洁且更新。
- 如果一个记忆被新信息直接矛盾,仔细评估这两条信息:
- 如果新记忆提供了更近期或更准确的更新,用新记忆替换旧记忆。
- 如果新记忆看起来不准确或细节较少,保留原始记忆并丢弃旧记忆。
- 确保所有记忆保持一致且清晰的风格,确保每个条目简洁但信息丰富。
- 如果新记忆是现有记忆的变化或扩展,更新现有记忆以反映新信息。
以下是任务的详细信息:
- 现有记忆:
{existing_memories}
- 新记忆:{memory}

然后添加了三个工具供LLM调用分别是 ADD_MEMORY_TOOL, UPDATE_MEMORY_TOOL, DELETE_MEMORY_TOOL 添加 更新 删除 记忆 由LLM决策需要的动作

        # Add tools for noop, add, update, delete memory.
        tools = [ADD_MEMORY_TOOL, UPDATE_MEMORY_TOOL, DELETE_MEMORY_TOOL]
        response = self.llm.generate_response(messages=messages, tools=tools)
        tool_calls = response["tool_calls"]

        response = []
        if tool_calls:
            # Create a new memory
            available_functions = {
                "add_memory": self._create_memory_tool,
                "update_memory": self._update_memory_tool,
                "delete_memory": self._delete_memory_tool,
            }
            for tool_call in tool_calls:
                function_name = tool_call["name"]
                function_to_call = available_functions[function_name]
                function_args = tool_call["arguments"]
                logging.info(
                    f"[openai_func] func: {function_name}, args: {function_args}"
                )

                # Pass metadata to the function if it requires it
                if function_name in ["add_memory", "update_memory"]:
                    function_args["metadata"] = metadata

                function_result = function_to_call(**function_args)
                # Fetch the memory_id from the response
                response.append(
                    {
                        "id": function_result,
                        "event": function_name.replace("_memory", ""),
                        "data": function_args.get("data"),
                    }
                )
                capture_event(
                    "mem0.add.function_call",
                    self,
                    {"memory_id": function_result, "function_name": function_name},
                )
        capture_event("mem0.add", self)
        return response

添加记忆工具

会将新的记忆添加到现有向量数据库中。并在sqlite中添加记忆更新事件

    def _create_memory_tool(self, data, metadata=None):
        logging.info(f"Creating memory with {data=}")
        embeddings = self.embedding_model.embed(data)
        memory_id = str(uuid.uuid4())
        metadata = metadata or {}
        metadata["data"] = data
        metadata["created_at"] = int(time.time())

        self.vector_store.insert(
            name=self.collection_name,
            vectors=[embeddings],
            ids=[memory_id],
            payloads=[metadata],
        )
        self.db.add_history(memory_id, None, data, "add")
        return memory_id

更新记忆工具

更新记忆工具首先会通过记忆id取出原有的记忆,然后将新的记忆更新到原记忆id的向量数据库的记录中。 并在sqlite中添加记忆更新事件

    def _update_memory_tool(self, memory_id, data, metadata=None):
        existing_memory = self.vector_store.get(
            name=self.collection_name, vector_id=memory_id
        )
        prev_value = existing_memory.payload.get("data")

        new_metadata = metadata or {}
        new_metadata["data"] = data
        new_metadata["updated_at"] = int(time.time())
        embeddings = self.embedding_model.embed(data)
        self.vector_store.update(
            name=self.collection_name,
            vector_id=memory_id,
            vector=embeddings,
            payload=new_metadata,
        )
        logging.info(f"Updating memory with ID {memory_id=} with {data=}")
        self.db.add_history(memory_id, prev_value, data, "update")

删除记忆工具

删除记忆工具首先会通过记忆id取出原有的记忆,然后将原记忆从向量数据库中删除。 并在sqlite中添加记忆更新事件

    def _delete_memory_tool(self, memory_id):
        logging.info(f"Deleting memory with {memory_id=}")
        existing_memory = self.vector_store.get(
            name=self.collection_name, vector_id=memory_id
        )
        prev_value = existing_memory.payload["data"]
        self.vector_store.delete(name=self.collection_name, vector_id=memory_id)
        self.db.add_history(memory_id, prev_value, None, "delete", is_deleted=1)

列出所有记忆

从向量数据库中取出所有记忆 可以通过 用户id agentid 执行id 进行过滤

    def get_all(self, user_id=None, agent_id=None, run_id=None, limit=100):
        """
        List all memories.

        Returns:
            list: List of all memories.
        """
        filters = {}
        if user_id:
            filters["user_id"] = user_id
        if agent_id:
            filters["agent_id"] = agent_id
        if run_id:
            filters["run_id"] = run_id

        capture_event("mem0.get_all", self, {"filters": len(filters), "limit": limit})
        memories = self.vector_store.list(
            name=self.collection_name, filters=filters, limit=limit
        )
        return [
            MemoryItem(
                id=mem.id,
                metadata=mem.payload,
                text=mem.payload["data"],
            ).model_dump(exclude={"score"})
            for mem in memories[0]
        ]

搜索记忆

会根据检索的内容在向量数据库中搜索相关记忆

    def search(
        self, query, user_id=None, agent_id=None, run_id=None, limit=100, filters=None
    ):
        """
        Search for memories.

        Args:
            query (str): Query to search for.
            user_id (str, optional): ID of the user to search for. Defaults to None.
            agent_id (str, optional): ID of the agent to search for. Defaults to None.
            run_id (str, optional): ID of the run to search for. Defaults to None.
            limit (int, optional): Limit the number of results. Defaults to 100.
            filters (dict, optional): Filters to apply to the search. Defaults to None.

        Returns:
            list: List of search results.
        """
        filters = filters or {}
        if user_id:
            filters["user_id"] = user_id
        if agent_id:
            filters["agent_id"] = agent_id
        if run_id:
            filters["run_id"] = run_id

        capture_event("mem0.search", self, {"filters": len(filters), "limit": limit})
        embeddings = self.embedding_model.embed(query)
        memories = self.vector_store.search(
            name=self.collection_name, query=embeddings, limit=limit, filters=filters
        )
        return [
            MemoryItem(
                id=mem.id,
                metadata=mem.payload,
                score=mem.score,
                text=mem.payload["data"],
            ).model_dump()
            for mem in memories
        ]

更新记忆

可以直接通过记忆id更新记忆

    def update(self, memory_id, data):
        """
        Update a memory by ID.

        Args:
            memory_id (str): ID of the memory to update.
            data (dict): Data to update the memory with.

        Returns:
            dict: Updated memory.
        """
        capture_event("mem0.get_all", self, {"memory_id": memory_id})
        self._update_memory_tool(memory_id, data)

获取记忆历史

通过记忆id在sqlite中获取记忆的变更历史

    def history(self, memory_id):
        """
        Get the history of changes for a memory by ID.

        Args:
            memory_id (str): ID of the memory to get history for.

        Returns:
            list: List of changes for the memory.
        """
        capture_event("mem0.history", self, {"memory_id": memory_id})
        return self.db.get_history(memory_id)

使用例子

  • 初始化:PersonalTravelAssistant使用 OpenAI 客户端和 Mem0 内存设置初始化该类。
  • 提问:该ask_question方法向人工智能发送问题,结合以前的记忆,并存储新信息。
  • 内存管理:get_memories和search_memories方法处理存储的记忆的检索和搜索。

在对话过程中会先检索问题相关的记忆,如果存在相关记忆则组合Prompt向人工智能提出问题​

组合Prompt

User input: {question}\n Previous memories: {previous_memories}
用户输入:{问题} 之前的记忆:{之前的记忆}

再将问题添加向量数据库中 这里只是添加了User 问题的部分 ​

import os
from openai import OpenAI
from mem0 import Memory

# Set the OpenAI API key
os.environ['OPENAI_API_KEY'] = 'sk-xxx'

class PersonalTravelAssistant:
    def __init__(self):
        self.client = OpenAI()
        self.memory = Memory()
        self.messages = [{"role": "system", "content": "You are a personal AI Assistant."}]

    def ask_question(self, question, user_id):
        # Fetch previous related memories
        previous_memories = self.search_memories(question, user_id=user_id)
        prompt = question
        if previous_memories:
            prompt = f"User input: {question}\n Previous memories: {previous_memories}"
        self.messages.append({"role": "user", "content": prompt})

        # Generate response using GPT-4o
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=self.messages
        )
        answer = response.choices[0].message.content
        self.messages.append({"role": "assistant", "content": answer})

        # Store the question in memory
        self.memory.add(question, user_id=user_id)
        return answer

    def get_memories(self, user_id):
        memories = self.memory.get_all(user_id=user_id)
        return [m['text'] for m in memories]

    def search_memories(self, query, user_id):
        memories = self.memory.search(query, user_id=user_id)
        return [m['text'] for m in memories]

# Usage example
user_id = "traveler_123"
ai_assistant = PersonalTravelAssistant()

def main():
    while True:
        question = input("Question: ")
        if question.lower() in ['q', 'exit']:
            print("Exiting...")
            break

        answer = ai_assistant.ask_question(question, user_id=user_id)
        print(f"Answer: {answer}")
        memories = ai_assistant.get_memories(user_id=user_id)
        print("Memories:")
        for memory in memories:
            print(f"- {memory}")
        print("-----")

if __name__ == "__main__":
    main()

总体来说还是比较简单的算是压缩上下文的一种方式吧,可以预见的弊端有 :

如果想通过响应的内容中的数据 目前是不支持的 比如llm任务下发后获得的异步任务id 我想通过任务id查询任务结果怎么办?

这个可以通过添加工具返回来解决

这样有引发了新的问题 应该使用哪一次的任务id来做呢? 这个可以通过过滤同类型的记忆选择最新的来完成 但是我如果想查询历史的任务呢 ?

当然可以通过 在记录中添加时间维度 让llm在没有时间的要求下默认选择最新的 有时间要求的话再通过要求选择。

当然我也不能如此渴求一个新的项目解决我所有的问题 毕竟现在还算是MVP版本 不过已经赢得了 16K 的Start 看来LLM的记忆管理还是有一定的挑战

这个东西让我想起langchina中有一个类似的记忆实现 Entity Memory 会通过提取实体的相关信息作为记忆 之前看过部分源码有空的话重新写篇笔记吧

附录

参考

https://github.com/mem0ai/mem0

https://docs.mem0.ai/examples/personal-ai-tutor

版权信息

本文原载于 not only security,复制请保留原文出处。

comments powered by Disqus