@@ -6,14 +6,42 @@ use ext_php_rs::convert::IntoZval;
66use ext_php_rs:: error:: Error ;
77use ext_php_rs:: prelude:: * ;
88use ext_php_rs:: types:: { ZendHashTable , Zval } ;
9- use rust_rocksdb:: { Options , DB } ;
9+ use json_patch:: Patch ;
10+ use rust_rocksdb:: {
11+ ColumnFamilyDescriptor , DBWithThreadMode , MergeOperands , Options , SingleThreaded , DB ,
12+ } ;
13+ use serde_json:: { from_value, Value } ;
1014use std:: collections:: HashMap ;
1115use std:: time:: Duration ;
1216
1317use crate :: backup:: RocksDBBackup ;
1418use crate :: transaction:: RocksDBTransaction ;
1519use crate :: write_batch:: RocksDBWriteBatch ;
1620
21+ fn json_merge (
22+ _new_key : & [ u8 ] ,
23+ existing_val : Option < & [ u8 ] > ,
24+ operands : & MergeOperands ,
25+ ) -> Option < Vec < u8 > > {
26+ // Decode the existing value
27+ let mut doc: Value = if let Some ( val) = existing_val {
28+ serde_json:: from_slice ( val) . unwrap_or ( Value :: Array ( vec ! [ ] ) )
29+ } else {
30+ Value :: Array ( vec ! [ ] )
31+ } ;
32+
33+ // Process each operand
34+ for op in operands {
35+ if let Ok ( patch) = serde_json:: from_slice :: < Value > ( op) {
36+ let p: Patch = from_value ( patch) . unwrap ( ) ;
37+ json_patch:: patch ( & mut doc, & p) . unwrap ( ) ;
38+ }
39+ }
40+
41+ // Serialize the updated JSON object back to bytes
42+ Some ( serde_json:: to_vec ( & doc) . unwrap ( ) )
43+ }
44+
1745#[ derive( Debug ) ]
1846pub struct KeyValueResult {
1947 pub key : Option < String > ,
@@ -34,7 +62,7 @@ impl IntoZval for KeyValueResult {
3462
3563#[ php_class]
3664pub struct RocksDB {
37- pub db : DB ,
65+ pub db : DBWithThreadMode < SingleThreaded > ,
3866 position : Option < Vec < u8 > > ,
3967}
4068
@@ -46,13 +74,29 @@ impl RocksDB {
4674 opts. create_if_missing ( true ) ;
4775 opts. set_max_open_files ( 1000 ) ;
4876 opts. set_log_level ( rust_rocksdb:: LogLevel :: Warn ) ;
77+ opts. set_merge_operator_associative ( "json_merge" , json_merge) ;
78+
79+ let cf_names = DB :: list_cf ( & opts, & path) . unwrap_or ( vec ! [ "default" . to_string( ) ] ) ;
80+ let cf_descriptors: Vec < ColumnFamilyDescriptor > = cf_names
81+ . iter ( )
82+ . map ( |name| {
83+ let mut cf_opts = Options :: default ( ) ;
84+ cf_opts. set_merge_operator_associative ( "json_merge" , json_merge) ;
85+ ColumnFamilyDescriptor :: new ( name, cf_opts)
86+ } )
87+ . collect ( ) ;
4988
5089 let db = match ttl_secs {
5190 Some ( ttl) => {
5291 let duration = Duration :: from_secs ( ttl) ;
53- DB :: open_with_ttl ( & opts, & path, duration)
92+ DBWithThreadMode :: open_cf_descriptors_with_ttl (
93+ & opts,
94+ & path,
95+ cf_descriptors,
96+ duration,
97+ )
5498 }
55- None => DB :: open ( & opts, & path) ,
99+ None => DBWithThreadMode :: open_cf_descriptors ( & opts, & path, cf_descriptors ) ,
56100 } ;
57101
58102 match db {
@@ -146,14 +190,15 @@ impl RocksDB {
146190 }
147191 }
148192
149- pub fn create_column_family ( & self , cf_name : String ) -> PhpResult < ( ) > {
150- let opts = Options :: default ( ) ;
193+ pub fn create_column_family ( & mut self , cf_name : String ) -> PhpResult < ( ) > {
194+ let mut opts = Options :: default ( ) ;
195+ opts. set_merge_operator_associative ( "json_merge" , json_merge) ;
151196 self . db
152197 . create_cf ( & cf_name, & opts)
153198 . map_err ( |e| e. to_string ( ) . into ( ) )
154199 }
155200
156- pub fn drop_column_family ( & self , cf_name : String ) -> PhpResult < ( ) > {
201+ pub fn drop_column_family ( & mut self , cf_name : String ) -> PhpResult < ( ) > {
157202 self . db . drop_cf ( & cf_name) . map_err ( |e| e. to_string ( ) . into ( ) )
158203 }
159204
0 commit comments