Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions src/memos/api/config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import base64
import hashlib
import hmac
import json
import logging
import os
import re
import time

from typing import Any

import requests

from dotenv import load_dotenv

from memos.configs.mem_cube import GeneralMemCubeConfig
Expand All @@ -13,6 +21,238 @@
# Load environment variables
load_dotenv()

logger = logging.getLogger(__name__)


def _update_env_from_dict(data: dict[str, Any]) -> None:
"""Apply a dict to environment variables, with change logging."""

def _is_sensitive(name: str) -> bool:
n = name.upper()
return any(s in n for s in ["PASSWORD", "SECRET", "AK", "SK", "TOKEN", "KEY"])

for k, v in data.items():
if isinstance(v, dict):
new_val = json.dumps(v, ensure_ascii=False)
elif isinstance(v, bool):
new_val = "true" if v else "false"
elif v is None:
new_val = ""
else:
new_val = str(v)

old_val = os.environ.get(k)
os.environ[k] = new_val

try:
log_old = "***" if _is_sensitive(k) else (old_val if old_val is not None else "<unset>")
log_new = "***" if _is_sensitive(k) else new_val
if old_val != new_val:
logger.info(f"Nacos config update: {k}={log_new} (was {log_old})")
except Exception as e:
# Avoid logging failures blocking config updates
logger.debug(f"Skip logging change for {k}: {e}")


def get_config_json(name: str, default: Any | None = None) -> Any:
"""Read JSON object/array from env and parse. Returns default on missing/invalid."""
raw = os.getenv(name)
if not raw:
return default
try:
return json.loads(raw)
except Exception:
logger.warning(f"Invalid JSON in env '{name}', returning default.")
return default


def get_config_value(path: str, default: Any | None = None) -> Any:
"""Read value from env with optional dot-path for structured configs.

Examples:
- get_config_value("MONGODB_CONFIG.base_uri")
- get_config_value("MONGODB_BASE_URI")
"""
if "." not in path:
val = os.getenv(path)
return val if val is not None else default
root, *subkeys = path.split(".")
data = get_config_json(root, default=None)
if not isinstance(data, dict):
return default
cur: Any = data
for key in subkeys:
if isinstance(cur, dict) and key in cur:
cur = cur[key]
else:
return default
return cur


class NacosConfigManager:
_client = None
_data_id = None
_group = None
_enabled = False

# Pre-compile regex patterns for better performance
_KEY_VALUE_PATTERN = re.compile(r"^([^=]+)=(.*)$")
_INTEGER_PATTERN = re.compile(r"^[+-]?\d+$")
_FLOAT_PATTERN = re.compile(r"^[+-]?(\d+\.?\d*|\.\d+)([eE][+-]?\d+)?$")

@classmethod
def _sign(cls, secret_key: str, data: str) -> str:
"""HMAC-SHA1 sgin"""
signature = hmac.new(secret_key.encode("utf-8"), data.encode("utf-8"), hashlib.sha1)
return base64.b64encode(signature.digest()).decode()

@staticmethod
def _parse_value(value: str) -> Any:
"""Parse string value to appropriate Python type.

Supports: bool, int, float, and string.
"""
if not value:
return value

val_lower = value.lower()

# Boolean
if val_lower in ("true", "false"):
return val_lower == "true"

# Integer
if NacosConfigManager._INTEGER_PATTERN.match(value):
try:
return int(value)
except (ValueError, OverflowError):
return value

# Float
if NacosConfigManager._FLOAT_PATTERN.match(value):
try:
return float(value)
except (ValueError, OverflowError):
return value

# Default to string
return value

@staticmethod
def parse_properties(content: str) -> dict[str, Any]:
"""Parse properties file content to dictionary with type inference.

Supports:
- Comments (lines starting with #)
- Key-value pairs (KEY=VALUE)
- Type inference (bool, int, float, string)
"""
data: dict[str, Any] = {}

for line in content.splitlines():
line = line.strip()

# Skip empty lines and comments
if not line or line.startswith("#"):
continue

# Parse key-value pair
match = NacosConfigManager._KEY_VALUE_PATTERN.match(line)
if match:
key = match.group(1).strip()
value = match.group(2).strip()
data[key] = NacosConfigManager._parse_value(value)

return data

@classmethod
def start_config_watch(cls):
while True:
cls.init()
time.sleep(60)

@classmethod
def start_watch_if_enabled(cls) -> None:
enable = os.getenv("NACOS_ENABLE_WATCH", "false").lower() == "true"
print("enable:", enable)
if not enable:
return
interval = int(os.getenv("NACOS_WATCH_INTERVAL", "60"))
import threading

def _loop() -> None:
while True:
try:
cls.init()
except Exception as e:
logger.error(f"❌ Nacos watch loop error: {e}")
time.sleep(interval)

threading.Thread(target=_loop, daemon=True).start()
logger.info(f"Nacos watch thread started (interval={interval}s).")

@classmethod
def init(cls) -> None:
server_addr = os.getenv("NACOS_SERVER_ADDR")
data_id = os.getenv("NACOS_DATA_ID")
group = os.getenv("NACOS_GROUP", "DEFAULT_GROUP")
namespace = os.getenv("NACOS_NAMESPACE", "")
ak = os.getenv("AK")
sk = os.getenv("SK")

if not (server_addr and data_id and ak and sk):
logger.warning("❌ missing NACOS_SERVER_ADDR / AK / SK / DATA_ID")
return

base_url = f"http://{server_addr}/nacos/v1/cs/configs"

def _auth_headers():
ts = str(int(time.time() * 1000))

sign_data = namespace + "+" + group + "+" + ts if namespace else group + "+" + ts
signature = cls._sign(sk, sign_data)
return {
"Spas-AccessKey": ak,
"Spas-Signature": signature,
"timeStamp": ts,
}

try:
params = {
"dataId": data_id,
"group": group,
"tenant": namespace,
}

headers = _auth_headers()
resp = requests.get(base_url, headers=headers, params=params, timeout=10)

if resp.status_code != 200:
logger.error(f"Nacos AK/SK fail: {resp.status_code} {resp.text}")
return

content = resp.text.strip()
if not content:
logger.warning("⚠️ Nacos is empty")
return
try:
data_props = cls.parse_properties(content)
logger.info("nacos config:", data_props)
_update_env_from_dict(data_props)
logger.info("✅ parse Nacos setting is Properties ")
except Exception as e:
logger.error(f"⚠️ Nacos parse fail(not JSON/YAML/Properties): {e}")
raise Exception(f"Nacos configuration parsing failed: {e}") from e

except Exception as e:
logger.error(f"❌ Nacos AK/SK init fail: {e}")
raise Exception(f"❌ Nacos AK/SK init fail: {e}") from e


# init Nacos
NacosConfigManager.init()
NacosConfigManager.start_watch_if_enabled()


class APIConfig:
"""Centralized configuration management for MemOS APIs."""
Expand Down
1 change: 0 additions & 1 deletion src/memos/graph_dbs/polardb.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,6 @@ def get_nodes(
# Parse embedding from JSONB if it exists
if embedding_json is not None:
try:
print("embedding_json:", embedding_json)
# remove embedding
"""
embedding = json.loads(embedding_json) if isinstance(embedding_json, str) else embedding_json
Expand Down