|
1 | 1 | import os |
2 | 2 | import threading |
3 | 3 | import typing |
| 4 | +import base64 |
4 | 5 | from sqlalchemy import create_engine, select, inspect as inspect_database |
5 | 6 | from sqlalchemy.ext import asyncio as asyncio_ext |
6 | 7 | from sqlalchemy.orm import sessionmaker |
7 | 8 | from sqlalchemy import Integer, String, JSON, LargeBinary |
8 | 9 | from sqlalchemy.orm import Mapped, mapped_column, DeclarativeBase, MappedAsDataclass |
9 | 10 | from sqlite3 import DatabaseError |
| 11 | +from pymongo import MongoClient, errors as mongo_errors |
| 12 | +from ..param import Parameterized |
| 13 | +from ..core.property import Property |
10 | 14 | from dataclasses import dataclass |
11 | 15 |
|
12 | 16 | from ..param import Parameterized |
@@ -458,5 +462,124 @@ def __exit__(self, exc_type, exc_value, exc_tb) -> None: |
458 | 462 | except Exception as ex: |
459 | 463 | pass |
460 | 464 |
|
| 465 | +class MongoThingDB: |
| 466 | + """ |
| 467 | + MongoDB-backed database engine for Thing properties and info. |
| 468 | + |
| 469 | + This class provides persistence for Thing properties using MongoDB. |
| 470 | + Properties are stored in the 'properties' collection, with fields: |
| 471 | + - id: Thing instance identifier |
| 472 | + - name: property name |
| 473 | + - serialized_value: serialized property value |
| 474 | + |
| 475 | + Methods mirror the interface of ThingDB for compatibility. |
| 476 | + """ |
| 477 | + def __init__(self, instance: Parameterized, config_file: typing.Union[str, None] = None) -> None: |
| 478 | + """ |
| 479 | + Initialize MongoThingDB for a Thing instance. |
| 480 | + Connects to MongoDB and sets up collections. |
| 481 | + """ |
| 482 | + self.thing_instance = instance |
| 483 | + self.id = instance.id |
| 484 | + self.config = self.load_conf(config_file) |
| 485 | + self.client = MongoClient(self.config.get("mongo_uri", "mongodb://localhost:27017")) |
| 486 | + self.db = self.client[self.config.get("database", "hololinked")] |
| 487 | + self.properties = self.db["properties"] |
| 488 | + self.things = self.db["things"] |
| 489 | + |
| 490 | + @classmethod |
| 491 | + def load_conf(cls, config_file: str) -> typing.Dict[str, typing.Any]: |
| 492 | + """ |
| 493 | + Load configuration from JSON file if provided. |
| 494 | + """ |
| 495 | + if not config_file: |
| 496 | + return {} |
| 497 | + elif config_file.endswith(".json"): |
| 498 | + with open(config_file, "r") as file: |
| 499 | + return JSONSerializer.load(file) |
| 500 | + else: |
| 501 | + raise ValueError(f"config files of extension - ['json'] expected, given file name {config_file}") |
| 502 | + |
| 503 | + def fetch_own_info(self): |
| 504 | + """ |
| 505 | + Fetch Thing instance metadata from the 'things' collection. |
| 506 | + """ |
| 507 | + doc = self.things.find_one({"id": self.id}) |
| 508 | + return doc |
| 509 | + |
| 510 | + def get_property(self, property: typing.Union[str, Property], deserialized: bool = True) -> typing.Any: |
| 511 | + """ |
| 512 | + Get a property value from MongoDB for this Thing. |
| 513 | + If deserialized=True, returns the Python value. |
| 514 | + """ |
| 515 | + name = property if isinstance(property, str) else property.name |
| 516 | + doc = self.properties.find_one({"id": self.id, "name": name}) |
| 517 | + if not doc: |
| 518 | + raise mongo_errors.PyMongoError(f"property {name} not found in database") |
| 519 | + if not deserialized: |
| 520 | + return doc |
| 521 | + serializer = Serializers.for_object(self.id, self.thing_instance.__class__.__name__, name) |
| 522 | + return serializer.loads(base64.b64decode(doc["serialized_value"])) |
| 523 | + |
| 524 | + def set_property(self, property: typing.Union[str, Property], value: typing.Any) -> None: |
| 525 | + """ |
| 526 | + Set a property value in MongoDB for this Thing. |
| 527 | + Value is serialized before storage. |
| 528 | + """ |
| 529 | + name = property if isinstance(property, str) else property.name |
| 530 | + serializer = Serializers.for_object(self.id, self.thing_instance.__class__.__name__, name) |
| 531 | + serialized_value = base64.b64encode(serializer.dumps(value)).decode("utf-8") |
| 532 | + self.properties.update_one( |
| 533 | + {"id": self.id, "name": name}, |
| 534 | + {"$set": {"serialized_value": serialized_value}}, |
| 535 | + upsert=True |
| 536 | + ) |
| 537 | + |
| 538 | + def get_properties(self, properties: typing.Dict[typing.Union[str, Property], typing.Any], deserialized: bool = True) -> typing.Dict[str, typing.Any]: |
| 539 | + """ |
| 540 | + Get multiple property values from MongoDB for this Thing. |
| 541 | + Returns a dict of property names to values. |
| 542 | + """ |
| 543 | + names = [obj if isinstance(obj, str) else obj.name for obj in properties.keys()] |
| 544 | + cursor = self.properties.find({"id": self.id, "name": {"$in": names}}) |
| 545 | + result = {} |
| 546 | + for doc in cursor: |
| 547 | + serializer = Serializers.for_object(self.id, self.thing_instance.__class__.__name__, doc["name"]) |
| 548 | + result[doc["name"]] = doc["serialized_value"] if not deserialized else serializer.loads(base64.b64decode(doc["serialized_value"])) |
| 549 | + return result |
| 550 | + |
| 551 | + def set_properties(self, properties: typing.Dict[typing.Union[str, Property], typing.Any]) -> None: |
| 552 | + """ |
| 553 | + Set multiple property values in MongoDB for this Thing. |
| 554 | + """ |
| 555 | + for obj, value in properties.items(): |
| 556 | + name = obj if isinstance(obj, str) else obj.name |
| 557 | + serializer = Serializers.for_object(self.id, self.thing_instance.__class__.__name__, name) |
| 558 | + serialized_value = base64.b64encode(serializer.dumps(value)).decode("utf-8") |
| 559 | + self.properties.update_one( |
| 560 | + {"id": self.id, "name": name}, |
| 561 | + {"$set": {"serialized_value": serialized_value}}, |
| 562 | + upsert=True |
| 563 | + ) |
461 | 564 |
|
| 565 | + def get_all_properties(self, deserialized: bool = True) -> typing.Dict[str, typing.Any]: |
| 566 | + cursor = self.properties.find({"id": self.id}) |
| 567 | + result = {} |
| 568 | + for doc in cursor: |
| 569 | + serializer = Serializers.for_object(self.id, self.thing_instance.__class__.__name__, doc["name"]) |
| 570 | + result[doc["name"]] = doc["serialized_value"] if not deserialized else serializer.loads(base64.b64decode(doc["serialized_value"])) |
| 571 | + return result |
| 572 | + |
| 573 | + def create_missing_properties(self, properties: typing.Dict[str, Property], get_missing_property_names: bool = False) -> typing.Any: |
| 574 | + missing_props = [] |
| 575 | + existing_props = self.get_all_properties() |
| 576 | + for name, new_prop in properties.items(): |
| 577 | + if name not in existing_props: |
| 578 | + serializer = Serializers.for_object(self.id, self.thing_instance.__class__.__name__, new_prop.name) |
| 579 | + serialized_value = base64.b64encode(serializer.dumps(getattr(self.thing_instance, new_prop.name))).decode("utf-8") |
| 580 | + self.properties.insert_one({"id": self.id, "name": new_prop.name, "serialized_value": serialized_value}) |
| 581 | + missing_props.append(name) |
| 582 | + if get_missing_property_names: |
| 583 | + return missing_props |
| 584 | + |
462 | 585 | __all__ = [BaseAsyncDB.__name__, BaseSyncDB.__name__, ThingDB.__name__, batch_db_commit.__name__] |
0 commit comments