Skip to content

Commit 68b5b5f

Browse files
committed
Add support of custom sharding func for crud methods
1 parent 7fa492c commit 68b5b5f

17 files changed

+794
-30
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
2121
key specified with DDL schema or in `_ddl_sharding_key` space.
2222
NOTE: CRUD methods delete(), get() and update() require that sharding key
2323
must be a part of primary key.
24+
* Support bucket id calculation using sharding func specified in
25+
DDL schema or in `_ddl_sharding_func` space.
2426

2527
### Fixed
2628

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in
7777
automatically. Note that CRUD methods `delete()`, `get()` and `update()`
7878
require that sharding key must be a part of primary key.
7979

80+
You can specify sharding function to calculate bucket_id with
81+
sharding func definition as a part of [DDL
82+
schema](https://github.com/tarantool/ddl#input-data-format)
83+
or insert it manually into the space `_ddl_sharding_func`.
84+
85+
CRUD uses `strcrc32` as sharding function by default.
86+
The reason why using `strcrc32` is undesirable is that
87+
this sharding function is not consistent for cdata numbers.
88+
In particular, it returns 3 different values for normal Lua
89+
numbers like 123, for `unsigned long long` cdata
90+
(like `123ULL`, or `ffi.cast('unsigned long long',
91+
123)`), and for `signed long long` cdata (like `123LL`, or
92+
`ffi.cast('long long', 123)`).
93+
94+
We cannot change default sharding function `strcrc32`
95+
due to backward compatibility concerns, but please consider
96+
using better alternatives for sharding function.
97+
`mpcrc32` is one of them.
98+
8099
The table below describes which operations support a custom sharding key:
81100

82101
| CRUD method | Sharding key support |

crud/common/sharding/sharding.lua

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,27 @@ local errors = require('errors')
44
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
55

66
local utils = require('crud.common.utils')
7+
local dev_checks = require('crud.common.dev_checks')
78
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
89

910
local sharding = {}
1011

11-
function sharding.key_get_bucket_id(key, specified_bucket_id)
12+
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
13+
dev_checks('string', '?', '?number|cdata')
14+
1215
if specified_bucket_id ~= nil then
1316
return specified_bucket_id
1417
end
1518

19+
local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
20+
if err ~= nil then
21+
return nil, err
22+
end
23+
24+
if sharding_func ~= nil then
25+
return sharding_func(key)
26+
end
27+
1628
return vshard.router.bucket_id_strcrc32(key)
1729
end
1830

@@ -31,7 +43,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
3143
end
3244
local key = utils.extract_key(tuple, sharding_index_parts)
3345

34-
return sharding.key_get_bucket_id(key)
46+
return sharding.key_get_bucket_id(space.name, key)
3547
end
3648

3749
function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)

crud/common/sharding/sharding_func.lua

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,31 @@ function sharding_func_module.extract_function_def(tuple)
9393
return nil
9494
end
9595

96+
function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name)
97+
dev_checks('table', 'string')
98+
99+
local result_err
100+
101+
cache.sharding_func_map = {}
102+
for space_name, metadata in pairs(metadata_map) do
103+
if metadata.sharding_func_def ~= nil then
104+
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
105+
space_name)
106+
if err ~= nil then
107+
if specified_space_name == space_name then
108+
result_err = err
109+
else
110+
log.warn(err)
111+
end
112+
end
113+
114+
cache.sharding_func_map[space_name] = sharding_func
115+
end
116+
end
117+
118+
return result_err
119+
end
120+
96121
sharding_func_module.internal = {
97122
as_callable_object = as_callable_object,
98123
is_callable = is_callable,

crud/common/sharding/sharding_metadata.lua

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ local call = require('crud.common.call')
55
local const = require('crud.common.const')
66
local dev_checks = require('crud.common.dev_checks')
77
local cache = require('crud.common.sharding.sharding_metadata_cache')
8+
local sharding_func = require('crud.common.sharding.sharding_func')
89
local sharding_key = require('crud.common.sharding.sharding_key')
910

1011
local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false})
@@ -38,25 +39,39 @@ local function locked(f)
3839
end
3940
end
4041

41-
-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
42-
-- not available on storage.
42+
-- Return a map with metadata, or nil when neither box.space._ddl_sharding_key
43+
-- nor box.space._ddl_sharding_func is available on storage.
4344
function sharding_metadata_module.fetch_on_storage()
4445
local sharding_key_space = box.space._ddl_sharding_key
45-
if sharding_key_space == nil then
46+
local sharding_func_space = box.space._ddl_sharding_func
47+
48+
if sharding_key_space == nil and sharding_func_space == nil then
4649
return nil
4750
end
4851

4952
local SPACE_NAME_FIELDNO = 1
5053
local SPACE_SHARDING_KEY_FIELDNO = 2
5154
local metadata_map = {}
52-
for _, tuple in sharding_key_space:pairs() do
53-
local space_name = tuple[SPACE_NAME_FIELDNO]
54-
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
55-
local space_format = box.space[space_name]:format()
56-
metadata_map[space_name] = {
57-
sharding_key_def = sharding_key_def,
58-
space_format = space_format,
59-
}
55+
56+
if sharding_key_space ~= nil then
57+
for _, tuple in sharding_key_space:pairs() do
58+
local space_name = tuple[SPACE_NAME_FIELDNO]
59+
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
60+
local space_format = box.space[space_name]:format()
61+
metadata_map[space_name] = {
62+
sharding_key_def = sharding_key_def,
63+
space_format = space_format,
64+
}
65+
end
66+
end
67+
68+
if sharding_func_space ~= nil then
69+
for _, tuple in sharding_func_space:pairs() do
70+
local space_name = tuple[SPACE_NAME_FIELDNO]
71+
local sharding_func_def = sharding_func.extract_function_def(tuple)
72+
metadata_map[space_name] = metadata_map[space_name] or {}
73+
metadata_map[space_name].sharding_func_def = sharding_func_def
74+
end
6075
end
6176

6277
return metadata_map
@@ -83,24 +98,21 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
8398
end
8499
if metadata_map == nil then
85100
cache[cache.SHARDING_KEY_MAP_NAME] = {}
101+
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
86102
return
87103
end
88104

89105
local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name)
90106
if err ~= nil then
91107
return err
92108
end
109+
110+
local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name)
111+
if err ~= nil then
112+
return err
113+
end
93114
end)
94115

95-
-- Get sharding index for a certain space.
96-
--
97-
-- Return:
98-
-- - sharding key as index object, when sharding key definition found on
99-
-- storage.
100-
-- - nil, when sharding key definition was not found on storage. Pay attention
101-
-- that nil without error is a successfull return value.
102-
-- - nil and error, when something goes wrong on fetching attempt.
103-
--
104116
local function fetch_on_router(space_name, metadata_map_name, timeout)
105117
if cache[metadata_map_name] ~= nil then
106118
return cache[metadata_map_name][space_name]
@@ -120,17 +132,46 @@ local function fetch_on_router(space_name, metadata_map_name, timeout)
120132
"Fetching sharding key for space '%s' is failed", space_name)
121133
end
122134

135+
-- Get sharding index for a certain space.
136+
--
137+
-- Return:
138+
-- - sharding key as index object, when sharding key definition found on
139+
-- storage.
140+
-- - nil, when sharding key definition was not found on storage. Pay attention
141+
-- that nil without error is a successful return value.
142+
-- - nil and error, when something goes wrong on fetching attempt.
143+
--
123144
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
124145
dev_checks('string', '?number')
125146

126147
return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
127148
end
128149

150+
-- Get sharding func for a certain space.
151+
--
152+
-- Return:
153+
-- - sharding func as callable object, when sharding func definition found on
154+
-- storage.
155+
-- - nil, when sharding func definition was not found on storage. Pay attention
156+
-- that nil without error is a successful return value.
157+
-- - nil and error, when something goes wrong on fetching attempt.
158+
--
159+
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
160+
dev_checks('string', '?number')
161+
162+
return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
163+
end
164+
129165
function sharding_metadata_module.update_sharding_key_cache(space_name)
130166
cache.drop_caches()
131167
return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
132168
end
133169

170+
function sharding_metadata_module.update_sharding_func_cache(space_name)
171+
cache.drop_caches()
172+
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
173+
end
174+
134175
function sharding_metadata_module.init()
135176
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
136177
end

crud/common/sharding/sharding_metadata_cache.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ local fiber = require('fiber')
33
local sharding_metadata_cache = {}
44

55
sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
6+
sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
67
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
8+
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
79
sharding_metadata_cache.fetch_lock = fiber.channel(1)
810
sharding_metadata_cache.is_part_of_pk = {}
911

1012
function sharding_metadata_cache.drop_caches()
1113
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
14+
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
1215
if sharding_metadata_cache.fetch_lock ~= nil then
1316
sharding_metadata_cache.fetch_lock:close()
1417
end

crud/delete.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts)
7474
end
7575
end
7676

77-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
77+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
78+
if err ~= nil then
79+
return nil, err
80+
end
81+
7882
local call_opts = {
7983
mode = 'write',
8084
timeout = opts.timeout,

crud/get.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts)
7777
end
7878
end
7979

80-
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
80+
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
81+
if err ~= nil then
82+
return nil, err
83+
end
84+
8185
local call_opts = {
8286
mode = opts.mode or 'read',
8387
prefer_replica = opts.prefer_replica,

crud/select/compat/select.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
103103
local perform_map_reduce = opts.force_map_call == true or
104104
(opts.bucket_id == nil and plan.sharding_key == nil)
105105
if not perform_map_reduce then
106-
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
106+
-- Bind the second return value too: key_get_bucket_id returns `nil, err` when
-- fetching the sharding func fails, and the `err` check that follows must see it.
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
107+
if err ~= nil then
108+
return nil, err
109+
end
110+
107111
assert(bucket_id ~= nil)
108112

109113
local err

crud/select/compat/select_old.lua

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
129129
local perform_map_reduce = opts.force_map_call == true or
130130
(opts.bucket_id == nil and plan.sharding_key == nil)
131131
if not perform_map_reduce then
132-
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
132+
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
133+
if err ~= nil then
134+
return nil, err
135+
end
136+
133137
assert(bucket_id ~= nil)
134138

135139
local err

0 commit comments

Comments
 (0)