Commit b16b494

fix: resolve scheduler module import and Redis integration issues
1 parent f54612e commit b16b494

File tree

4 files changed: +357 −46 lines changed
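At a glance: _create_naive_mem_cube() is hoisted above init_server() so one shared NaiveMemCube is built at startup instead of per request. fast_search_memories and fine_search_memories now pin SearchMode.FAST and SearchMode.FINE explicitly rather than trusting the request's mode field. mix_search_memories is rewritten to return fast results immediately, schedule an async fine search through the scheduler's dispatcher (label API_MIX_SEARCH_LABEL), serve cached fine memories from Redis when available, and fall back to a synchronous fine search otherwise. A new SchedulerAPIModule keeps a five-entry search history per user and memory cube in Redis.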

src/memos/api/routers/server_router.py

Lines changed: 125 additions & 44 deletions
@@ -1,3 +1,4 @@
+import json
 import os
 import traceback
 
@@ -29,7 +30,12 @@
 from memos.mem_reader.factory import MemReaderFactory
 from memos.mem_scheduler.orm_modules.base_model import BaseDBManager
 from memos.mem_scheduler.scheduler_factory import SchedulerFactory
-from memos.mem_scheduler.schemas.general_schemas import SearchMode
+from memos.mem_scheduler.schemas.general_schemas import (
+    API_MIX_SEARCH_LABEL,
+    SearchMode,
+)
+from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+from memos.mem_scheduler.utils.db_utils import get_utc_now
 from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
 from memos.memories.textual.tree_text_memory.retrieve.internet_retriever_factory import (
     InternetRetrieverFactory,
@@ -101,6 +107,21 @@ def _get_default_memory_size(cube_config) -> dict[str, int]:
     }
 
 
+def _create_naive_mem_cube() -> NaiveMemCube:
+    """Create a NaiveMemCube instance with initialized components."""
+    naive_mem_cube = NaiveMemCube(
+        llm=llm,
+        embedder=embedder,
+        mem_reader=mem_reader,
+        graph_db=graph_db,
+        reranker=reranker,
+        internet_retriever=internet_retriever,
+        memory_manager=memory_manager,
+        default_cube_config=default_cube_config,
+    )
+    return naive_mem_cube
+
+
 def init_server():
     """Initialize server components and configurations."""
     # Get default cube configuration
@@ -152,6 +173,10 @@ def init_server():
     )
     mem_scheduler.start()
 
+    # Initialize SchedulerAPIModule
+    api_module = mem_scheduler.api_module
+
+    naive_mem_cube = _create_naive_mem_cube()
     return (
         graph_db,
         mem_reader,
@@ -163,6 +188,8 @@ def init_server():
         default_cube_config,
         mos_server,
         mem_scheduler,
+        naive_mem_cube,
+        api_module,
     )
 
 
@@ -178,24 +205,11 @@ def init_server():
     default_cube_config,
     mos_server,
     mem_scheduler,
+    naive_mem_cube,
+    api_module,
 ) = init_server()
 
 
-def _create_naive_mem_cube() -> NaiveMemCube:
-    """Create a NaiveMemCube instance with initialized components."""
-    naive_mem_cube = NaiveMemCube(
-        llm=llm,
-        embedder=embedder,
-        mem_reader=mem_reader,
-        graph_db=graph_db,
-        reranker=reranker,
-        internet_retriever=internet_retriever,
-        memory_manager=memory_manager,
-        default_cube_config=default_cube_config,
-    )
-    return naive_mem_cube
-
-
 def _format_memory_item(memory_data: Any) -> dict[str, Any]:
     """Format a single memory item for API response."""
     memory = memory_data.model_dump()
@@ -257,30 +271,99 @@ def mix_search_memories(
     search_req: APISearchRequest,
     user_context: UserContext,
 ):
-    target_session_id = search_req.session_id
-    if not target_session_id:
-        target_session_id = "default_session"
-    search_filter = {"session_id": search_req.session_id} if search_req.session_id else None
-
-    # Create MemCube and perform search
-    naive_mem_cube = _create_naive_mem_cube()
-    search_results = naive_mem_cube.text_mem.search(
-        query=search_req.query,
-        user_name=user_context.mem_cube_id,
-        top_k=search_req.top_k,
-        mode=search_req.mode,
-        manual_close_internet=not search_req.internet_search,
-        moscube=search_req.moscube,
-        search_filter=search_filter,
-        info={
-            "user_id": search_req.user_id,
-            "session_id": target_session_id,
-            "chat_history": search_req.chat_history,
-        },
-    )
-    formatted_memories = [_format_memory_item(data) for data in search_results]
-
-    return formatted_memories
+    """
+    Mix search memories: fast search + async fine search
+    """
+    # Get fast memories first
+    fast_memories = fast_search_memories(search_req, user_context)
+
+    # Check if scheduler and dispatcher are available for async execution
+    if mem_scheduler and hasattr(mem_scheduler, "dispatcher") and mem_scheduler.dispatcher:
+        try:
+            # Create message for async fine search
+            message_content = {
+                "search_req": {
+                    "query": search_req.query,
+                    "user_id": search_req.user_id,
+                    "session_id": search_req.session_id,
+                    "top_k": search_req.top_k,
+                    "internet_search": search_req.internet_search,
+                    "moscube": search_req.moscube,
+                    "chat_history": search_req.chat_history,
+                },
+                "user_context": {"mem_cube_id": user_context.mem_cube_id},
+            }
+
+            message = ScheduleMessageItem(
+                item_id=f"mix_search_{search_req.user_id}_{get_utc_now().timestamp()}",
+                user_id=search_req.user_id,
+                mem_cube_id=user_context.mem_cube_id,
+                label=API_MIX_SEARCH_LABEL,
+                mem_cube=naive_mem_cube,
+                content=json.dumps(message_content),
+                timestamp=get_utc_now(),
+            )
+
+            # Submit async task
+            mem_scheduler.dispatcher.submit_message(message)
+            logger.info(f"Submitted async fine search task for user {search_req.user_id}")
+
+            # Try to get pre-computed fine memories if available
+            try:
+                pre_fine_memories = api_module.get_pre_fine_memories(
+                    user_id=search_req.user_id, mem_cube_id=user_context.mem_cube_id
+                )
+                if pre_fine_memories:
+                    # Merge fast and pre-computed fine memories
+                    all_memories = fast_memories + pre_fine_memories
+                    # Remove duplicates based on content
+                    seen_contents = set()
+                    unique_memories = []
+                    for memory in all_memories:
+                        content_key = memory.get("content", "")
+                        if content_key not in seen_contents:
+                            seen_contents.add(content_key)
+                            unique_memories.append(memory)
+                    return unique_memories
+            except Exception as e:
+                logger.warning(f"Failed to get pre-computed fine memories: {e}")
+
+        except Exception as e:
+            logger.error(f"Failed to submit async fine search task: {e}")
+            # Fall back to synchronous execution
+
+    # Fallback: synchronous fine search
+    try:
+        fine_memories = fine_search_memories(search_req, user_context)
+
+        # Merge fast and fine memories
+        all_memories = fast_memories + fine_memories
+
+        # Remove duplicates based on content
+        seen_contents = set()
+        unique_memories = []
+        for memory in all_memories:
+            content_key = memory.get("content", "")
+            if content_key not in seen_contents:
+                seen_contents.add(content_key)
+                unique_memories.append(memory)
+
+        # Sync search data to Redis
+        try:
+            api_module.sync_search_data(
+                user_id=search_req.user_id,
+                mem_cube_id=user_context.mem_cube_id,
+                query=search_req.query,
+                formatted_memories=unique_memories,
+            )
+        except Exception as e:
+            logger.error(f"Failed to sync search data: {e}")
+
+        return unique_memories
+
+    except Exception as e:
+        logger.error(f"Fine search failed: {e}")
+        return fast_memories
 
 
 def fine_search_memories(
@@ -293,12 +376,11 @@ def fine_search_memories(
     search_filter = {"session_id": search_req.session_id} if search_req.session_id else None
 
     # Create MemCube and perform search
-    naive_mem_cube = _create_naive_mem_cube()
     search_results = naive_mem_cube.text_mem.search(
         query=search_req.query,
         user_name=user_context.mem_cube_id,
         top_k=search_req.top_k,
-        mode=search_req.mode,
+        mode=SearchMode.FINE,
         manual_close_internet=not search_req.internet_search,
         moscube=search_req.moscube,
         search_filter=search_filter,
@@ -323,12 +405,11 @@ def fast_search_memories(
     search_filter = {"session_id": search_req.session_id} if search_req.session_id else None
 
    # Create MemCube and perform search
-    naive_mem_cube = _create_naive_mem_cube()
     search_results = naive_mem_cube.text_mem.search(
         query=search_req.query,
         user_name=user_context.mem_cube_id,
         top_k=search_req.top_k,
-        mode=search_req.mode,
+        mode=SearchMode.FAST,
         manual_close_internet=not search_req.internet_search,
         moscube=search_req.moscube,
         search_filter=search_filter,
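
The fast and fine entry points now hard-code their search modes, and the merge-and-dedupe step appears twice inside the new mix_search_memories (once when cached fine memories arrive, once in the synchronous fallback). A minimal standalone sketch of that dedupe logic, under the hypothetical name merge_unique (not part of the commit):

def merge_unique(fast_memories: list[dict], fine_memories: list[dict]) -> list[dict]:
    """Keep the first occurrence of each memory, keyed by its "content" field."""
    seen_contents: set[str] = set()
    unique_memories: list[dict] = []
    for memory in fast_memories + fine_memories:
        content_key = memory.get("content", "")
        if content_key not in seen_contents:
            seen_contents.add(content_key)
            unique_memories.append(memory)
    return unique_memories

# Fast results win ties, mirroring the merge order in mix_search_memories:
print(merge_unique([{"content": "a"}, {"content": "b"}],
                   [{"content": "b"}, {"content": "c"}]))
# -> [{'content': 'a'}, {'content': 'b'}, {'content': 'c'}]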
Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
+import threading
+
+from typing import Any
+
+from memos.log import get_logger
+from memos.mem_scheduler.general_modules.base import BaseSchedulerModule
+from memos.mem_scheduler.orm_modules.redis_model import RedisDBManager, SimpleListManager
+
+
+logger = get_logger(__name__)
+
+
+class SchedulerAPIModule(BaseSchedulerModule):
+    def __init__(self):
+        super().__init__()
+
+        self.search_history_managers: dict[str, RedisDBManager] = {}
+
+    def get_search_history_manager(self, user_id: str, mem_cube_id: str) -> RedisDBManager:
+        """Get or create a Redis manager for search history."""
+        key = f"search_history:{user_id}:{mem_cube_id}"
+        if key not in self.search_history_managers:
+            self.search_history_managers[key] = RedisDBManager(
+                user_id=user_id, mem_cube_id=mem_cube_id
+            )
+        return self.search_history_managers[key]
+
+    def sync_search_data(
+        self, user_id: str, mem_cube_id: str, query: str, formatted_memories: Any
+    ) -> None:
+        """
+        Sync search data to Redis, maintaining a list of size 5.
+
+        Args:
+            user_id: User identifier
+            mem_cube_id: Memory cube identifier
+            query: Search query string
+            formatted_memories: Formatted search results
+        """
+        try:
+            # Get the search history manager
+            manager = self.get_search_history_manager(user_id, mem_cube_id)
+
+            # Create search data entry
+            search_entry = {
+                "query": query,
+                "formatted_memories": formatted_memories,
+                "timestamp": threading.current_thread().ident,  # Use thread ID as simple timestamp
+            }
+
+            # Load existing search history
+            existing_data = manager.load_from_db()
+
+            if existing_data is None:
+                search_history = SimpleListManager([])
+            else:
+                # If existing data is a SimpleListManager, use it; otherwise create new one
+                if isinstance(existing_data, SimpleListManager):
+                    search_history = existing_data
+                else:
+                    search_history = SimpleListManager([])
+
+            # Add new entry and keep only latest 5
+            search_history.add_item(str(search_entry))
+            if len(search_history) > 5:
+                # Keep only the latest 5 items
+                search_history.items = search_history.items[-5:]
+
+            # Save back to Redis
+            manager.save_to_db(search_history)
+
+            logger.info(
+                f"Synced search data for user {user_id}, mem_cube {mem_cube_id}. History size: {len(search_history)}"
+            )
+
+        except Exception as e:
+            logger.error(f"Failed to sync search data: {e}", exc_info=True)
+
+    def get_pre_fine_memories(self, user_id: str, mem_cube_id: str) -> list:
+        """
+        Get the most recent pre-computed fine memories from search history.
+
+        Args:
+            user_id: User identifier
+            mem_cube_id: Memory cube identifier
+
+        Returns:
+            List of formatted memories from the most recent search, or empty list if none found
+        """
+        try:
+            manager = self.get_search_history_manager(user_id, mem_cube_id)
+            search_history_key = "search_history_list"
+            existing_data = manager.load_from_db(search_history_key)
+
+            if existing_data is None:
+                return []
+
+            search_history = (
+                existing_data.obj_instance
+                if hasattr(existing_data, "obj_instance")
+                else existing_data
+            )
+
+            if not search_history or len(search_history) == 0:
+                return []
+
+            # Return the formatted_memories from the most recent search
+            latest_entry = search_history[-1]
+            return (
+                latest_entry.get("formatted_memories", []) if isinstance(latest_entry, dict) else []
+            )
+
+        except Exception as e:
+            logger.error(f"Failed to get pre-computed fine memories: {e}", exc_info=True)
+            return []
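
A hedged usage sketch of the new module's round trip, assuming an initialized mem_scheduler (as wired up in init_server above) and a reachable Redis instance behind RedisDBManager; the user and cube identifiers are illustrative:

api_module = mem_scheduler.api_module

# Persist the results of a completed fine search; the history is capped at 5 entries.
api_module.sync_search_data(
    user_id="u1",
    mem_cube_id="cube1",
    query="project deadlines",
    formatted_memories=[{"content": "Deadline is Friday"}],
)

# On the next mix search, previously computed fine memories can be served immediately.
cached = api_module.get_pre_fine_memories(user_id="u1", mem_cube_id="cube1")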
