Skip to content

Commit b5ea7e6

Browse files
authored
feat: introduce async memory add for TreeTextMemory using MemScheduler (#373)
* feat: define mem-read scheduler message & consumer; add async mem-reader mode in core; * feat: add fast/fine mode in mem-reader; * feat: add mem-reader in scheduler * feat: change async remove * feat: modify async-add in core.py * feat: add 'remove and refresh memory in scheduler' * feat: add naive fast mode in mem-reader * feat: finish fast mode in mem-reader * feat: add token-based window splitting and concurrency improvements * feat: add split chunker into mode in simple struct mem reader * feat: update async-mode add * chore: update gitignore * feat: improve database note write performance * feat: fix mem-read scheduler * fix: nebula group-by bug * fix: bug in adding mem scheduler * fix: nebula index; mem-reader chat-time; * format: searcher * fix: some bugs in scheduler and mem-reader * feat: add mem-organize in scheduler * feat: add tree.mode to config; modify scheduler config * fix: test bug
1 parent d9f863e commit b5ea7e6

File tree

22 files changed

+1241
-176
lines changed

22 files changed

+1241
-176
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ evaluation/.env
1515
!evaluation/configs-example/*.json
1616
evaluation/configs/*
1717
**tree_textual_memory_locomo**
18+
**script.py**
1819
.env
1920
evaluation/scripts/personamem
2021

examples/mem_reader/reader.py

Lines changed: 381 additions & 19 deletions
Large diffs are not rendered by default.

src/memos/chunkers/sentence_chunker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(self, config: SentenceChunkerConfig):
2828
)
2929
logger.info(f"Initialized SentenceChunker with config: {config}")
3030

31-
def chunk(self, text: str) -> list[Chunk]:
31+
def chunk(self, text: str) -> list[str] | list[Chunk]:
3232
"""Chunk the given text into smaller chunks based on sentences."""
3333
chonkie_chunks = self.chunker.chunk(text)
3434

src/memos/configs/mem_scheduler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ class BaseSchedulerConfig(BaseConfig):
2828
thread_pool_max_workers: int = Field(
2929
default=DEFAULT_THREAD_POOL_MAX_WORKERS,
3030
gt=1,
31-
lt=20,
3231
description=f"Maximum worker threads in pool (default: {DEFAULT_THREAD_POOL_MAX_WORKERS})",
3332
)
3433
consume_interval_seconds: float = Field(
3534
default=DEFAULT_CONSUME_INTERVAL_SECONDS,
3635
gt=0,
37-
le=60,
3836
description=f"Interval for consuming messages from queue in seconds (default: {DEFAULT_CONSUME_INTERVAL_SECONDS})",
3937
)
4038
auth_config_path: str | None = Field(

src/memos/configs/memory.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ class TreeTextMemoryConfig(BaseTextMemoryConfig):
179179
),
180180
)
181181

182+
mode: str | None = Field(
183+
default="sync",
184+
description=("whether use asynchronous mode in memory add"),
185+
)
186+
182187

183188
class SimpleTreeTextMemoryConfig(TreeTextMemoryConfig):
184189
"""Simple tree text memory configuration class."""

src/memos/graph_dbs/nebular.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -440,20 +440,22 @@ def remove_oldest_memory(
440440
memory_type (str): Memory type (e.g., 'WorkingMemory', 'LongTermMemory').
441441
keep_latest (int): Number of latest WorkingMemory entries to keep.
442442
"""
443-
optional_condition = ""
444-
445-
user_name = user_name if user_name else self.config.user_name
446-
447-
optional_condition = f"AND n.user_name = '{user_name}'"
448-
query = f"""
449-
MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */)
450-
WHERE n.memory_type = '{memory_type}'
451-
{optional_condition}
452-
ORDER BY n.updated_at DESC
453-
OFFSET {int(keep_latest)}
454-
DETACH DELETE n
455-
"""
456-
self.execute_query(query)
443+
try:
444+
user_name = user_name if user_name else self.config.user_name
445+
optional_condition = f"AND n.user_name = '{user_name}'"
446+
count = self.count_nodes(memory_type, user_name)
447+
if count > keep_latest:
448+
delete_query = f"""
449+
MATCH (n@Memory /*+ INDEX(idx_memory_user_name) */)
450+
WHERE n.memory_type = '{memory_type}'
451+
{optional_condition}
452+
ORDER BY n.updated_at DESC
453+
OFFSET {int(keep_latest)}
454+
DETACH DELETE n
455+
"""
456+
self.execute_query(delete_query)
457+
except Exception as e:
458+
logger.warning(f"Delete old mem error: {e}")
457459

458460
@timed
459461
def add_node(
@@ -1175,7 +1177,6 @@ def get_grouped_counts(
11751177
MATCH (n /*+ INDEX(idx_memory_user_name) */)
11761178
{where_clause}
11771179
RETURN {", ".join(return_fields)}, COUNT(n) AS count
1178-
GROUP BY {", ".join(group_by_fields)}
11791180
"""
11801181
result = self.execute_query(gql) # Pure GQL string execution
11811182

@@ -1620,7 +1621,13 @@ def _create_basic_property_indexes(self) -> None:
16201621
Create standard B-tree indexes on user_name when use Shared Database
16211622
Multi-Tenant Mode.
16221623
"""
1623-
fields = ["status", "memory_type", "created_at", "updated_at", "user_name"]
1624+
fields = [
1625+
"status",
1626+
"memory_type",
1627+
"created_at",
1628+
"updated_at",
1629+
"user_name",
1630+
]
16241631

16251632
for field in fields:
16261633
index_name = f"idx_memory_{field}"

src/memos/graph_dbs/neo4j.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ def search_by_embedding(
669669
vector (list[float]): The embedding vector representing query semantics.
670670
top_k (int): Number of top similar nodes to retrieve.
671671
scope (str, optional): Memory type filter (e.g., 'WorkingMemory', 'LongTermMemory').
672-
status (str, optional): Node status filter (e.g., 'active', 'archived').
672+
status (str, optional): Node status filter (e.g., 'activated', 'archived').
673673
If provided, restricts results to nodes with matching status.
674674
threshold (float, optional): Minimum similarity score threshold (0 ~ 1).
675675
search_filter (dict, optional): Additional metadata filters for search results.

src/memos/mem_os/core.py

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from memos.mem_scheduler.schemas.general_schemas import (
1818
ADD_LABEL,
1919
ANSWER_LABEL,
20+
MEM_READ_LABEL,
2021
QUERY_LABEL,
2122
)
2223
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
@@ -70,6 +71,7 @@ def __init__(self, config: MOSConfig, user_manager: UserManager | None = None):
7071
if self.enable_mem_scheduler:
7172
self._mem_scheduler = self._initialize_mem_scheduler()
7273
self._mem_scheduler.mem_cubes = self.mem_cubes
74+
self._mem_scheduler.mem_reader = self.mem_reader
7375
else:
7476
self._mem_scheduler: GeneralScheduler = None
7577

@@ -681,6 +683,12 @@ def add(
681683
logger.info(
682684
f"time add: get mem_cube_id check in mem_cubes time user_id: {target_user_id} time is: {time.time() - time_start_0}"
683685
)
686+
sync_mode = self.mem_cubes[mem_cube_id].text_mem.mode
687+
if sync_mode == "async":
688+
assert self.mem_scheduler is not None, (
689+
"Mem-Scheduler must be working when use asynchronous memory adding."
690+
)
691+
logger.debug(f"Mem-reader mode is: {sync_mode}")
684692
time_start_1 = time.time()
685693
if (
686694
(messages is not None)
@@ -690,6 +698,7 @@ def add(
690698
logger.info(
691699
f"time add: messages is not None and enable_textual_memory and text_mem is not None time user_id: {target_user_id} time is: {time.time() - time_start_1}"
692700
)
701+
693702
if self.mem_cubes[mem_cube_id].config.text_mem.backend != "tree_text":
694703
add_memory = []
695704
metadata = TextualMemoryMetadata(
@@ -707,21 +716,30 @@ def add(
707716
messages_list,
708717
type="chat",
709718
info={"user_id": target_user_id, "session_id": target_session_id},
719+
mode="fast" if sync_mode == "async" else "fine",
710720
)
711721
logger.info(
712722
f"time add: get mem_reader time user_id: {target_user_id} time is: {time.time() - time_start_2}"
713723
)
714-
mem_ids = []
715-
for mem in memories:
716-
mem_id_list: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(mem)
717-
mem_ids.extend(mem_id_list)
718-
logger.info(
719-
f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_id_list}"
720-
)
721-
724+
memories_flatten = [m for m_list in memories for m in m_list]
725+
mem_ids: list[str] = self.mem_cubes[mem_cube_id].text_mem.add(memories_flatten)
726+
logger.info(
727+
f"Added memory user {target_user_id} to memcube {mem_cube_id}: {mem_ids}"
728+
)
722729
# submit messages for scheduler
723730
if self.enable_mem_scheduler and self.mem_scheduler is not None:
724731
mem_cube = self.mem_cubes[mem_cube_id]
732+
if sync_mode == "async":
733+
message_item = ScheduleMessageItem(
734+
user_id=target_user_id,
735+
mem_cube_id=mem_cube_id,
736+
mem_cube=mem_cube,
737+
label=MEM_READ_LABEL,
738+
content=json.dumps(mem_ids),
739+
timestamp=datetime.utcnow(),
740+
)
741+
self.mem_scheduler.submit_messages(messages=[message_item])
742+
725743
message_item = ScheduleMessageItem(
726744
user_id=target_user_id,
727745
mem_cube_id=mem_cube_id,
@@ -749,10 +767,12 @@ def add(
749767
messages_list = [
750768
[{"role": "user", "content": memory_content}]
751769
] # for only user-str input and convert message
770+
752771
memories = self.mem_reader.get_memory(
753772
messages_list,
754773
type="chat",
755774
info={"user_id": target_user_id, "session_id": target_session_id},
775+
mode="fast" if sync_mode == "async" else "fine",
756776
)
757777

758778
mem_ids = []
@@ -766,6 +786,16 @@ def add(
766786
# submit messages for scheduler
767787
if self.enable_mem_scheduler and self.mem_scheduler is not None:
768788
mem_cube = self.mem_cubes[mem_cube_id]
789+
if sync_mode == "async":
790+
message_item = ScheduleMessageItem(
791+
user_id=target_user_id,
792+
mem_cube_id=mem_cube_id,
793+
mem_cube=mem_cube,
794+
label=MEM_READ_LABEL,
795+
content=json.dumps(mem_ids),
796+
timestamp=datetime.utcnow(),
797+
)
798+
self.mem_scheduler.submit_messages(messages=[message_item])
769799
message_item = ScheduleMessageItem(
770800
user_id=target_user_id,
771801
mem_cube_id=mem_cube_id,

src/memos/mem_reader/base.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,17 @@ def get_scene_data_info(self, scene_data: list, type: str) -> list[str]:
1818

1919
@abstractmethod
2020
def get_memory(
21-
self, scene_data: list, type: str, info: dict[str, Any]
21+
self, scene_data: list, type: str, info: dict[str, Any], mode: str = "fast"
2222
) -> list[list[TextualMemoryItem]]:
2323
"""Various types of memories extracted from scene_data"""
2424

2525
@abstractmethod
2626
def transform_memreader(self, data: dict) -> list[TextualMemoryItem]:
2727
"""Transform the memory data into a list of TextualMemoryItem objects."""
28+
29+
@abstractmethod
30+
def fine_transfer_simple_mem(
31+
self, input_memories: list[list[TextualMemoryItem]], type: str
32+
) -> list[list[TextualMemoryItem]]:
33+
"""Fine Transform TextualMemoryItem List into another list of
34+
TextualMemoryItem objects via calling llm to better understand users."""

0 commit comments

Comments
 (0)