Merged
Changes from all commits
24 commits
11b63e6
debug an error function name
tangg555 Oct 20, 2025
72e8f39
feat: Add DynamicCache compatibility for different transformers versions
tangg555 Oct 20, 2025
5702870
feat: implement APIAnalyzerForScheduler for memory operations
tangg555 Oct 21, 2025
4655b41
feat: Add search_ws API endpoint and enhance API analyzer functionality
tangg555 Oct 21, 2025
c20736c
fix: resolve test failures and warnings in test suite
tangg555 Oct 21, 2025
da72e7e
feat: add a test_robustness execution to test thread pool execution
tangg555 Oct 21, 2025
5b9b1e4
feat: optimize scheduler configuration and API search functionality
tangg555 Oct 22, 2025
6dac11e
feat: Add Redis auto-initialization with fallback strategies
tangg555 Oct 22, 2025
a207bf4
feat: add database connection management to ORM module
tangg555 Oct 24, 2025
8c1cc04
remove part of test
tangg555 Oct 24, 2025
f2b0da4
feat: add Redis-based ORM with multiprocess synchronization
tangg555 Oct 24, 2025
f0e8aab
fix: resolve scheduler module import and Redis integration issues
tangg555 Oct 24, 2025
731f00d
revise naive memcube creation in server router
tangg555 Oct 25, 2025
6d442fb
remove long-time tests in test_scheduler
tangg555 Oct 25, 2025
157f858
remove redis test which needs .env
tangg555 Oct 25, 2025
c483011
refactor all codes about mixture search with scheduler
tangg555 Oct 25, 2025
b81b82e
fix: resolve Redis API synchronization issues and implement search AP…
tangg555 Oct 26, 2025
90d1a0b
remove a test for api module
tangg555 Oct 26, 2025
1de72cf
revise to pass the test suite
tangg555 Oct 26, 2025
c72858e
addressed all conflicts
tangg555 Oct 27, 2025
3245376
address some bugs to make mix_search normally running
tangg555 Oct 27, 2025
57482cf
modify codes according to evaluation logs
tangg555 Oct 27, 2025
e4b8313
Merge remote-tracking branch 'upstream/dev' into dev
tangg555 Oct 27, 2025
c5db08a
Merge branch 'dev' into dev
CaralHsi Oct 28, 2025
4 changes: 3 additions & 1 deletion evaluation/scripts/utils/client.py
@@ -183,6 +183,7 @@ def search(self, query, user_id, top_k):
                 "mem_cube_id": user_id,
                 "conversation_id": "",
                 "top_k": top_k,
+                "mode": "mixture",
             },
             ensure_ascii=False,
         )
@@ -230,6 +231,7 @@ def search(self, query, user_id, top_k):
                 "query": query,
                 "user_id": user_id,
                 "memory_limit_number": top_k,
+                "mode": "mixture",
             }
         )

@@ -311,7 +313,7 @@ def add(self, messages, user_id, iso_date):
                 agent_name=self.agent_id,
                 session_date=iso_date,
             )
-            self.wait_for_completion(response.task_id)
+            self.wait_for_completion(response.item_id)
         except Exception as error:
             print("❌ Error saving conversation:", error)

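For orientation, a minimal sketch of the request this client now sends. Only the payload fields come from the diff above; the endpoint path, the requests-based transport, and the function shape are illustrative assumptions.

import json

import requests


def search(base_url: str, query: str, user_id: str, top_k: int) -> dict:
    # Payload fields mirror the diff; "mode": "mixture" is the new flag
    # that selects the server's mixed fast + fine search path.
    payload = json.dumps(
        {
            "query": query,
            "user_id": user_id,
            "mem_cube_id": user_id,
            "conversation_id": "",
            "top_k": top_k,
            "mode": "mixture",
        },
        ensure_ascii=False,
    )
    # The /search path and headers are assumptions, not taken from the diff.
    response = requests.post(
        f"{base_url}/search",
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()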
4 changes: 2 additions & 2 deletions src/memos/api/config.py
@@ -361,8 +361,8 @@ def get_scheduler_config() -> dict[str, Any]:
         "thread_pool_max_workers": int(
             os.getenv("MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS", "10")
         ),
-        "consume_interval_seconds": int(
-            os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "3")
+        "consume_interval_seconds": float(
+            os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "0.01")
         ),
         "enable_parallel_dispatch": os.getenv(
             "MOS_SCHEDULER_ENABLE_PARALLEL_DISPATCH", "true"
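The cast change above is load-bearing, not cosmetic: the new 10 ms default cannot be parsed by int(). A short self-contained illustration (the variable name is mine):

import os

# int("3") parses, but int("0.01") raises ValueError, so a sub-second
# default requires the switch from int() to float().
interval = float(os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "0.01"))
print(interval)  # 0.01 unless the environment variable overrides it

# Operators can still restore the old 3-second cadence, e.g.:
#   export MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS=3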
138 changes: 27 additions & 111 deletions src/memos/api/routers/server_router.py
@@ -1,9 +1,8 @@
-import json
 import os
 import traceback
 
 from concurrent.futures import ThreadPoolExecutor
-from typing import Any
+from typing import TYPE_CHECKING, Any
 
 from fastapi import APIRouter, HTTPException
 
@@ -33,11 +32,8 @@
 from memos.mem_scheduler.orm_modules.base_model import BaseDBManager
 from memos.mem_scheduler.scheduler_factory import SchedulerFactory
 from memos.mem_scheduler.schemas.general_schemas import (
-    API_MIX_SEARCH_LABEL,
     SearchMode,
 )
-from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
-from memos.mem_scheduler.utils.db_utils import get_utc_now
 from memos.memories.textual.prefer_text_memory.config import (
     AdderConfigFactory,
     ExtractorConfigFactory,
@@ -54,6 +50,10 @@
 )
 from memos.reranker.factory import RerankerFactory
 from memos.templates.instruction_completion import instruct_completion
+
+
+if TYPE_CHECKING:
+    from memos.mem_scheduler.optimized_scheduler import OptimizedScheduler
 from memos.types import MOSSearchResult, UserContext
 from memos.vec_dbs.factory import VecDBFactory
 
@@ -154,7 +154,6 @@ def init_server():
 
     # Build component configurations
     graph_db_config = _build_graph_db_config()
-    print(graph_db_config)
     llm_config = _build_llm_config()
     embedder_config = _build_embedder_config()
     mem_reader_config = _build_mem_reader_config()
@@ -209,22 +208,6 @@ def init_server():
         online_bot=False,
     )
 
-    # Initialize Scheduler
-    scheduler_config_dict = APIConfig.get_scheduler_config()
-    scheduler_config = SchedulerConfigFactory(
-        backend="optimized_scheduler", config=scheduler_config_dict
-    )
-    mem_scheduler = SchedulerFactory.from_config(scheduler_config)
-    mem_scheduler.initialize_modules(
-        chat_llm=llm,
-        process_llm=mem_reader.llm,
-        db_engine=BaseDBManager.create_default_sqlite_engine(),
-    )
-    mem_scheduler.start()
-
-    # Initialize SchedulerAPIModule
-    api_module = mem_scheduler.api_module
-
     naive_mem_cube = NaiveMemCube(
         llm=llm,
         embedder=embedder,
@@ -240,6 +223,23 @@ def init_server():
         pref_retriever=pref_retriever,
     )
 
+    # Initialize Scheduler
+    scheduler_config_dict = APIConfig.get_scheduler_config()
+    scheduler_config = SchedulerConfigFactory(
+        backend="optimized_scheduler", config=scheduler_config_dict
+    )
+    mem_scheduler: OptimizedScheduler = SchedulerFactory.from_config(scheduler_config)
+    mem_scheduler.initialize_modules(
+        chat_llm=llm,
+        process_llm=mem_reader.llm,
+        db_engine=BaseDBManager.create_default_sqlite_engine(),
+    )
+    mem_scheduler.current_mem_cube = naive_mem_cube
+    mem_scheduler.start()
+
+    # Initialize SchedulerAPIModule
+    api_module = mem_scheduler.api_module
+
     return (
         graph_db,
         mem_reader,
@@ -400,96 +400,12 @@ def mix_search_memories(
     """
     Mix search memories: fast search + async fine search
     """
-    # Get fast memories first
-    fast_memories = fast_search_memories(search_req, user_context)
-
-    # Check if scheduler and dispatcher are available for async execution
-    if mem_scheduler and hasattr(mem_scheduler, "dispatcher") and mem_scheduler.dispatcher:
-        try:
-            # Create message for async fine search
-            message_content = {
-                "search_req": {
-                    "query": search_req.query,
-                    "user_id": search_req.user_id,
-                    "session_id": search_req.session_id,
-                    "top_k": search_req.top_k,
-                    "internet_search": search_req.internet_search,
-                    "moscube": search_req.moscube,
-                    "chat_history": search_req.chat_history,
-                },
-                "user_context": {"mem_cube_id": user_context.mem_cube_id},
-            }
-
-            message = ScheduleMessageItem(
-                item_id=f"mix_search_{search_req.user_id}_{get_utc_now().timestamp()}",
-                user_id=search_req.user_id,
-                mem_cube_id=user_context.mem_cube_id,
-                label=API_MIX_SEARCH_LABEL,
-                mem_cube=naive_mem_cube,
-                content=json.dumps(message_content),
-                timestamp=get_utc_now(),
-            )
-
-            # Submit async task
-            mem_scheduler.dispatcher.submit_message(message)
-            logger.info(f"Submitted async fine search task for user {search_req.user_id}")
-
-            # Try to get pre-computed fine memories if available
-            try:
-                pre_fine_memories = api_module.get_pre_fine_memories(
-                    user_id=search_req.user_id, mem_cube_id=user_context.mem_cube_id
-                )
-                if pre_fine_memories:
-                    # Merge fast and pre-computed fine memories
-                    all_memories = fast_memories + pre_fine_memories
-                    # Remove duplicates based on content
-                    seen_contents = set()
-                    unique_memories = []
-                    for memory in all_memories:
-                        content_key = memory.get("content", "")
-                        if content_key not in seen_contents:
-                            seen_contents.add(content_key)
-                            unique_memories.append(memory)
-                    return unique_memories
-            except Exception as e:
-                logger.warning(f"Failed to get pre-computed fine memories: {e}")
-
-        except Exception as e:
-            logger.error(f"Failed to submit async fine search task: {e}")
-            # Fall back to synchronous execution
-
-    # Fallback: synchronous fine search
-    try:
-        fine_memories = fine_search_memories(search_req, user_context)
-
-        # Merge fast and fine memories
-        all_memories = fast_memories + fine_memories
-
-        # Remove duplicates based on content
-        seen_contents = set()
-        unique_memories = []
-        for memory in all_memories:
-            content_key = memory.get("content", "")
-            if content_key not in seen_contents:
-                seen_contents.add(content_key)
-                unique_memories.append(memory)
-
-        # Sync search data to Redis
-        try:
-            api_module.sync_search_data(
-                user_id=search_req.user_id,
-                mem_cube_id=user_context.mem_cube_id,
-                query=search_req.query,
-                formatted_memories=unique_memories,
-            )
-        except Exception as e:
-            logger.error(f"Failed to sync search data: {e}")
-
-        return unique_memories
-
-    except Exception as e:
-        logger.error(f"Fine search failed: {e}")
-        return fast_memories
+    formatted_memories = mem_scheduler.mix_search_memories(
+        search_req=search_req,
+        user_context=user_context,
+    )
+    return formatted_memories
 
 
 def fine_search_memories(
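The router's inline merge-and-deduplicate logic (visible in the removed lines above) now lives behind OptimizedScheduler.mix_search_memories. For reference, the deduplication it performed was a first-seen-wins merge keyed on content; a standalone sketch of that merge, with merge_unique as an illustrative name rather than an identifier from the codebase:

def merge_unique(fast_memories: list[dict], fine_memories: list[dict]) -> list[dict]:
    # Keep the first occurrence of each "content" value; fast results win ties.
    seen_contents: set[str] = set()
    unique_memories: list[dict] = []
    for memory in fast_memories + fine_memories:
        content_key = memory.get("content", "")
        if content_key not in seen_contents:
            seen_contents.add(content_key)
            unique_memories.append(memory)
    return unique_memories


fast = [{"content": "Paris is the capital of France"}]
fine = [{"content": "Paris is the capital of France"}, {"content": "User likes hiking"}]
print(merge_unique(fast, fine))  # the duplicate fine result is dropped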
5 changes: 5 additions & 0 deletions src/memos/configs/mem_scheduler.py
@@ -15,6 +15,7 @@
     DEFAULT_CONSUME_INTERVAL_SECONDS,
     DEFAULT_CONTEXT_WINDOW_SIZE,
     DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE,
+    DEFAULT_MULTI_TASK_RUNNING_TIMEOUT,
     DEFAULT_THREAD_POOL_MAX_WORKERS,
     DEFAULT_TOP_K,
     DEFAULT_USE_REDIS_QUEUE,
@@ -59,6 +60,10 @@ class BaseSchedulerConfig(BaseConfig):
         default=DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE,
         description="Maximum size of internal message queue when not using Redis",
     )
+    multi_task_running_timeout: int = Field(
+        default=DEFAULT_MULTI_TASK_RUNNING_TIMEOUT,
+        description="Default timeout for multi-task running operations in seconds",
+    )
 
 
 class GeneralSchedulerConfig(BaseSchedulerConfig):
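A standalone sketch of the new config field, assuming BaseSchedulerConfig is pydantic-based (the Field usage suggests it). The real value of DEFAULT_MULTI_TASK_RUNNING_TIMEOUT is defined in general_schemas and not shown in this diff, so the 30 below is a placeholder:

from pydantic import BaseModel, Field

DEFAULT_MULTI_TASK_RUNNING_TIMEOUT = 30  # placeholder; real constant lives in general_schemas


class BaseSchedulerConfig(BaseModel):
    multi_task_running_timeout: int = Field(
        default=DEFAULT_MULTI_TASK_RUNNING_TIMEOUT,
        description="Default timeout for multi-task running operations in seconds",
    )


print(BaseSchedulerConfig().multi_task_running_timeout)  # -> 30
print(BaseSchedulerConfig(multi_task_running_timeout=120).multi_task_running_timeout)  # -> 120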