diff --git a/src/memos/api/config.py b/src/memos/api/config.py index 92df1ecf8..7ac882d6c 100644 --- a/src/memos/api/config.py +++ b/src/memos/api/config.py @@ -1,8 +1,16 @@ +import base64 +import hashlib +import hmac import json +import logging import os +import re +import time from typing import Any +import requests + from dotenv import load_dotenv from memos.configs.mem_cube import GeneralMemCubeConfig @@ -13,6 +21,238 @@ # Load environment variables load_dotenv() +logger = logging.getLogger(__name__) + + +def _update_env_from_dict(data: dict[str, Any]) -> None: + """Apply a dict to environment variables, with change logging.""" + + def _is_sensitive(name: str) -> bool: + n = name.upper() + return any(s in n for s in ["PASSWORD", "SECRET", "AK", "SK", "TOKEN", "KEY"]) + + for k, v in data.items(): + if isinstance(v, dict): + new_val = json.dumps(v, ensure_ascii=False) + elif isinstance(v, bool): + new_val = "true" if v else "false" + elif v is None: + new_val = "" + else: + new_val = str(v) + + old_val = os.environ.get(k) + os.environ[k] = new_val + + try: + log_old = "***" if _is_sensitive(k) else (old_val if old_val is not None else "") + log_new = "***" if _is_sensitive(k) else new_val + if old_val != new_val: + logger.info(f"Nacos config update: {k}={log_new} (was {log_old})") + except Exception as e: + # Avoid logging failures blocking config updates + logger.debug(f"Skip logging change for {k}: {e}") + + +def get_config_json(name: str, default: Any | None = None) -> Any: + """Read JSON object/array from env and parse. Returns default on missing/invalid.""" + raw = os.getenv(name) + if not raw: + return default + try: + return json.loads(raw) + except Exception: + logger.warning(f"Invalid JSON in env '{name}', returning default.") + return default + + +def get_config_value(path: str, default: Any | None = None) -> Any: + """Read value from env with optional dot-path for structured configs. + + Examples: + - get_config_value("MONGODB_CONFIG.base_uri") + - get_config_value("MONGODB_BASE_URI") + """ + if "." not in path: + val = os.getenv(path) + return val if val is not None else default + root, *subkeys = path.split(".") + data = get_config_json(root, default=None) + if not isinstance(data, dict): + return default + cur: Any = data + for key in subkeys: + if isinstance(cur, dict) and key in cur: + cur = cur[key] + else: + return default + return cur + + +class NacosConfigManager: + _client = None + _data_id = None + _group = None + _enabled = False + + # Pre-compile regex patterns for better performance + _KEY_VALUE_PATTERN = re.compile(r"^([^=]+)=(.*)$") + _INTEGER_PATTERN = re.compile(r"^[+-]?\d+$") + _FLOAT_PATTERN = re.compile(r"^[+-]?(\d+\.?\d*|\.\d+)([eE][+-]?\d+)?$") + + @classmethod + def _sign(cls, secret_key: str, data: str) -> str: + """HMAC-SHA1 sgin""" + signature = hmac.new(secret_key.encode("utf-8"), data.encode("utf-8"), hashlib.sha1) + return base64.b64encode(signature.digest()).decode() + + @staticmethod + def _parse_value(value: str) -> Any: + """Parse string value to appropriate Python type. + + Supports: bool, int, float, and string. + """ + if not value: + return value + + val_lower = value.lower() + + # Boolean + if val_lower in ("true", "false"): + return val_lower == "true" + + # Integer + if NacosConfigManager._INTEGER_PATTERN.match(value): + try: + return int(value) + except (ValueError, OverflowError): + return value + + # Float + if NacosConfigManager._FLOAT_PATTERN.match(value): + try: + return float(value) + except (ValueError, OverflowError): + return value + + # Default to string + return value + + @staticmethod + def parse_properties(content: str) -> dict[str, Any]: + """Parse properties file content to dictionary with type inference. + + Supports: + - Comments (lines starting with #) + - Key-value pairs (KEY=VALUE) + - Type inference (bool, int, float, string) + """ + data: dict[str, Any] = {} + + for line in content.splitlines(): + line = line.strip() + + # Skip empty lines and comments + if not line or line.startswith("#"): + continue + + # Parse key-value pair + match = NacosConfigManager._KEY_VALUE_PATTERN.match(line) + if match: + key = match.group(1).strip() + value = match.group(2).strip() + data[key] = NacosConfigManager._parse_value(value) + + return data + + @classmethod + def start_config_watch(cls): + while True: + cls.init() + time.sleep(60) + + @classmethod + def start_watch_if_enabled(cls) -> None: + enable = os.getenv("NACOS_ENABLE_WATCH", "false").lower() == "true" + print("enable:", enable) + if not enable: + return + interval = int(os.getenv("NACOS_WATCH_INTERVAL", "60")) + import threading + + def _loop() -> None: + while True: + try: + cls.init() + except Exception as e: + logger.error(f"❌ Nacos watch loop error: {e}") + time.sleep(interval) + + threading.Thread(target=_loop, daemon=True).start() + logger.info(f"Nacos watch thread started (interval={interval}s).") + + @classmethod + def init(cls) -> None: + server_addr = os.getenv("NACOS_SERVER_ADDR") + data_id = os.getenv("NACOS_DATA_ID") + group = os.getenv("NACOS_GROUP", "DEFAULT_GROUP") + namespace = os.getenv("NACOS_NAMESPACE", "") + ak = os.getenv("AK") + sk = os.getenv("SK") + + if not (server_addr and data_id and ak and sk): + logger.warning("❌ missing NACOS_SERVER_ADDR / AK / SK / DATA_ID") + return + + base_url = f"http://{server_addr}/nacos/v1/cs/configs" + + def _auth_headers(): + ts = str(int(time.time() * 1000)) + + sign_data = namespace + "+" + group + "+" + ts if namespace else group + "+" + ts + signature = cls._sign(sk, sign_data) + return { + "Spas-AccessKey": ak, + "Spas-Signature": signature, + "timeStamp": ts, + } + + try: + params = { + "dataId": data_id, + "group": group, + "tenant": namespace, + } + + headers = _auth_headers() + resp = requests.get(base_url, headers=headers, params=params, timeout=10) + + if resp.status_code != 200: + logger.error(f"Nacos AK/SK fail: {resp.status_code} {resp.text}") + return + + content = resp.text.strip() + if not content: + logger.warning("⚠️ Nacos is empty") + return + try: + data_props = cls.parse_properties(content) + logger.info("nacos config:", data_props) + _update_env_from_dict(data_props) + logger.info("✅ parse Nacos setting is Properties ") + except Exception as e: + logger.error(f"⚠️ Nacos parse fail(not JSON/YAML/Properties): {e}") + raise Exception(f"Nacos configuration parsing failed: {e}") from e + + except Exception as e: + logger.error(f"❌ Nacos AK/SK init fail: {e}") + raise Exception(f"❌ Nacos AK/SK init fail: {e}") from e + + +# init Nacos +NacosConfigManager.init() +NacosConfigManager.start_watch_if_enabled() + class APIConfig: """Centralized configuration management for MemOS APIs.""" diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py index 3f059e8ad..971a56e04 100644 --- a/src/memos/graph_dbs/polardb.py +++ b/src/memos/graph_dbs/polardb.py @@ -833,7 +833,6 @@ def get_nodes( # Parse embedding from JSONB if it exists if embedding_json is not None: try: - print("embedding_json:", embedding_json) # remove embedding """ embedding = json.loads(embedding_json) if isinstance(embedding_json, str) else embedding_json