Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
must be a part of primary key.
* `crud.count()` function to calculate the number of tuples
in the space according to conditions.
* Support bucket id calculating using sharding func specified in
DDL schema or in `_ddl_sharding_func` space.

### Fixed

Expand Down
32 changes: 31 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in
automatically. Note that CRUD methods `delete()`, `get()` and `update()`
requires that sharding key must be a part of primary key.

You can specify sharding function to calculate bucket_id with
sharding func definition as a part of [DDL
schema](https://github.com/tarantool/ddl#input-data-format)
or insert manually to the space `_ddl_sharding_func`.

CRUD uses `strcrc32` as sharding function by default.
The reason why using of `strcrc32` is undesirable is that
this sharding function is not consistent for cdata numbers.
In particular, it returns 3 different values for normal Lua
numbers like 123, for `unsigned long long` cdata
(like `123ULL`, or `ffi.cast('unsigned long long',
123)`), and for `signed long long` cdata (like `123LL`, or
`ffi.cast('long long', 123)`).

We cannot change default sharding function `strcrc32`
due to backward compatibility concerns, but please consider
using better alternatives for sharding function.
`mpcrc32` is one of them.

Table below describe what operations supports custom sharding key:

| CRUD method | Sharding key support |
Expand All @@ -101,12 +120,23 @@ Current limitations for using custom sharding key:
- It's not possible to update sharding keys automatically when schema is
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.common.sharding_key').update_cache()`.
to do it manually with `require('crud.common.sharding_key').update_cache()`
(this function updates both caches: sharding key cache and sharding function
cache, but returned value is sharding key from cache).
- No support of JSON path for sharding key, see
[#219](https://github.com/tarantool/crud/issues/219).
- `primary_index_fieldno_map` is not cached, see
[#243](https://github.com/tarantool/crud/issues/243).

Current limitations for using custom sharding functions:

- It's not possible to update sharding functions automatically when schema is
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.common.sharding_func').update_cache()`
(this function updates both caches: sharding key cache and sharding function
cache, but returned value is sharding function from cache).

### Insert

```lua
Expand Down
4 changes: 2 additions & 2 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ local truncate = require('crud.truncate')
local len = require('crud.len')
local count = require('crud.count')
local borders = require('crud.borders')
local sharding_key = require('crud.common.sharding_key')
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
local utils = require('crud.common.utils')

local crud = {}
Expand Down Expand Up @@ -120,7 +120,7 @@ function crud.init_storage()
len.init()
count.init()
borders.init()
sharding_key.init()
sharding_metadata.init()
end

function crud.init_router()
Expand Down
2 changes: 1 addition & 1 deletion crud/common/const.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ local const = {}

const.RELOAD_RETRIES_NUM = 1
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
const.FETCH_SHARDING_KEY_TIMEOUT = 3 -- 3 seconds
const.FETCH_SHARDING_METADATA_TIMEOUT = 3 -- 3 seconds

return const
20 changes: 16 additions & 4 deletions crud/common/sharding.lua → crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false})

local utils = require('crud.common.utils')
local sharding_key_module = require('crud.common.sharding_key')
local dev_checks = require('crud.common.dev_checks')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local sharding = {}

Expand All @@ -20,11 +21,22 @@ function sharding.get_replicasets_by_bucket_id(bucket_id)
}
end

function sharding.key_get_bucket_id(key, specified_bucket_id)
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
dev_checks('string', '?', '?number|cdata')

if specified_bucket_id ~= nil then
return specified_bucket_id
end

local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
if err ~= nil then
return nil, err
end

if sharding_func ~= nil then
return sharding_func(key)
end

return vshard.router.bucket_id_strcrc32(key)
end

Expand All @@ -34,7 +46,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end

local sharding_index_parts = space.index[0].parts
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name)
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name)
if err ~= nil then
return nil, err
end
Expand All @@ -43,7 +55,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end
local key = utils.extract_key(tuple, sharding_index_parts)

return sharding.key_get_bucket_id(key)
return sharding.key_get_bucket_id(space.name, key)
end

function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
Expand Down
111 changes: 111 additions & 0 deletions crud/common/sharding/sharding_func.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
local errors = require('errors')
local log = require('log')

local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local utils = require('crud.common.utils')

local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false})

local sharding_func_module = {}

local function is_callable(object)
if type(object) == 'function' then
return true
end

-- all objects with type `cdata` are allowed
-- because there is no easy way to get
-- metatable.__call of object with type `cdata`
if type(object) == 'cdata' then
return true
end

local object_metatable = getmetatable(object)
if (type(object) == 'table' or type(object) == 'userdata') then
-- if metatable type is not `table` -> metatable is protected ->
-- cannot detect metamethod `__call` exists
if object_metatable and type(object_metatable) ~= 'table' then
return true
end

-- `__call` metamethod can be only the `function`
-- and cannot be a `table` | `userdata` | `cdata`
-- with `__call` methamethod on its own
if object_metatable and object_metatable.__call then
return type(object_metatable.__call) == 'function'
end
end

return false
end

local function get_function_from_G(func_name)
local chunks = string.split(func_name, '.')
local sharding_func = _G

-- check is the each chunk an identifier
for _, chunk in pairs(chunks) do
if not utils.check_name_isident(chunk) or sharding_func == nil then
return nil
end
sharding_func = rawget(sharding_func, chunk)
end

return sharding_func
end

local function as_callable_object(sharding_func_def, space_name)
if type(sharding_func_def) == 'string' then
local sharding_func = get_function_from_G(sharding_func_def)
if sharding_func ~= nil and is_callable(sharding_func) == true then
return sharding_func
end
end

if type(sharding_func_def) == 'table' then
local sharding_func, err = loadstring('return ' .. sharding_func_def.body)
if sharding_func == nil then
return nil, ShardingFuncError:new(
"Body is incorrect in sharding_func for space (%s): %s", space_name, err)
end
return sharding_func()
end

return nil, ShardingFuncError:new(
"Wrong sharding function specified in _ddl_sharding_func space for (%s) space", space_name
)
end

function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name)
dev_checks('table', 'string')

local result_err

cache.sharding_func_map = {}
for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_func_def ~= nil then
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
space_name)
if err ~= nil then
if specified_space_name == space_name then
result_err = err
log.error(err)
else
log.warn(err)
end
end

cache.sharding_func_map[space_name] = sharding_func
end
end

return result_err
end

sharding_func_module.internal = {
as_callable_object = as_callable_object,
is_callable = is_callable,
}

return sharding_func_module
133 changes: 133 additions & 0 deletions crud/common/sharding/sharding_key.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
local errors = require('errors')
local log = require('log')

local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local utils = require('crud.common.utils')

local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false})

local sharding_key_module = {}

-- Build a structure similar to index, but it is not a real index object,
-- it contains only parts key with fieldno's.
local function as_index_object(space_name, space_format, sharding_key_def)
dev_checks('string', 'table', 'table')

local fieldnos = {}
local fieldno_map = utils.get_format_fieldno_map(space_format)
for _, field_name in ipairs(sharding_key_def) do
local fieldno = fieldno_map[field_name]
if fieldno == nil then
return nil, WrongShardingConfigurationError:new(
"No such field (%s) in a space format (%s)", field_name, space_name)
end
table.insert(fieldnos, {fieldno = fieldno})
end

return {parts = fieldnos}
end

-- Make sure sharding key definition is a part of primary key.
local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
dev_checks('string', 'table', 'table')

if cache.is_part_of_pk[space_name] ~= nil then
return cache.is_part_of_pk[space_name]
end

local is_part_of_pk = true
local pk_fieldno_map = utils.get_index_fieldno_map(primary_index_parts)
for _, part in ipairs(sharding_key_as_index_obj.parts) do
if pk_fieldno_map[part.fieldno] == nil then
is_part_of_pk = false
break
end
end
cache.is_part_of_pk[space_name] = is_part_of_pk

return is_part_of_pk
end

-- Build an array with sharding key values. Function extracts those values from
-- primary key that are part of sharding key (passed as index object).
local function extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
dev_checks('table', 'table', 'table')

-- TODO: extract_from_index() calculates primary_index_parts on each
-- request. It is better to cache it's value.
-- https://github.com/tarantool/crud/issues/243
local primary_index_fieldno_map = utils.get_index_fieldno_map(primary_index_parts)

local sharding_key = {}
for _, part in ipairs(sharding_key_as_index_obj.parts) do
-- part_number cannot be nil because earlier we checked that tuple
-- field names defined in sharding key definition are part of primary
-- key.
local part_number = primary_index_fieldno_map[part.fieldno]
assert(part_number ~= nil)
local field_value = primary_key[part_number]
table.insert(sharding_key, field_value)
end

return sharding_key
end

-- Extract sharding key from pk.
-- Returns a table with sharding key or pair of nil and error.
function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_obj, primary_index_parts, primary_key)
dev_checks('string', '?table', 'table', '?')

if sharding_key_as_index_obj == nil then
return primary_key
end

local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
if res == false then
return nil, ShardingKeyError:new(
"Sharding key for space %q is missed in primary index, specify bucket_id",
space_name
)
end
if type(primary_key) ~= 'table' then
primary_key = {primary_key}
end

return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
end

function sharding_key_module.construct_as_index_obj_cache(metadata_map, specified_space_name)
dev_checks('table', 'string')

local result_err

cache.sharding_key_as_index_obj_map = {}
for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_key_def ~= nil then
local sharding_key_as_index_obj, err = as_index_object(space_name,
metadata.space_format,
metadata.sharding_key_def)
if err ~= nil then
if specified_space_name == space_name then
result_err = err
log.error(err)
else
log.warn(err)
end
end

cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
end
end

return result_err
end

sharding_key_module.internal = {
as_index_object = as_index_object,
extract_from_index = extract_from_index,
is_part_of_pk = is_part_of_pk,
}

return sharding_key_module
Loading