From e820c9f091c1d1e2da1036e24c47e166d9c0d4c7 Mon Sep 17 00:00:00 2001 From: AnaNek Date: Wed, 22 Dec 2021 10:52:16 +0300 Subject: [PATCH 1/6] Replace sharding methods in new directory `sharding` Since this patch presents the implementation of support of DDL sharding function, the number of files associated with sharding will increase in the `common` folder. Therefore, a separate directory was created for files containing methods for working with sharding. Part of #237 --- crud.lua | 2 +- crud/common/{sharding.lua => sharding/init.lua} | 2 +- crud/common/{ => sharding}/sharding_key.lua | 2 +- crud/common/{ => sharding}/sharding_key_cache.lua | 0 crud/count.lua | 2 +- crud/delete.lua | 2 +- crud/get.lua | 2 +- crud/select/compat/select.lua | 2 +- crud/select/compat/select_old.lua | 2 +- crud/update.lua | 2 +- test/helper.lua | 2 +- test/unit/sharding_key_test.lua | 4 ++-- 12 files changed, 12 insertions(+), 12 deletions(-) rename crud/common/{sharding.lua => sharding/init.lua} (97%) rename crud/common/{ => sharding}/sharding_key.lua (99%) rename crud/common/{ => sharding}/sharding_key_cache.lua (100%) diff --git a/crud.lua b/crud.lua index c7e6fa6f..9d070ef0 100644 --- a/crud.lua +++ b/crud.lua @@ -13,7 +13,7 @@ local truncate = require('crud.truncate') local len = require('crud.len') local count = require('crud.count') local borders = require('crud.borders') -local sharding_key = require('crud.common.sharding_key') +local sharding_key = require('crud.common.sharding.sharding_key') local utils = require('crud.common.utils') local crud = {} diff --git a/crud/common/sharding.lua b/crud/common/sharding/init.lua similarity index 97% rename from crud/common/sharding.lua rename to crud/common/sharding/init.lua index 66586fb3..d409f150 100644 --- a/crud/common/sharding.lua +++ b/crud/common/sharding/init.lua @@ -5,7 +5,7 @@ local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false}) local utils = require('crud.common.utils') -local sharding_key_module = require('crud.common.sharding_key') +local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding = {} diff --git a/crud/common/sharding_key.lua b/crud/common/sharding/sharding_key.lua similarity index 99% rename from crud/common/sharding_key.lua rename to crud/common/sharding/sharding_key.lua index 8a1cef62..4385e0d8 100644 --- a/crud/common/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -4,7 +4,7 @@ local errors = require('errors') local call = require('crud.common.call') local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding_key_cache') +local cache = require('crud.common.sharding.sharding_key_cache') local utils = require('crud.common.utils') local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false}) diff --git a/crud/common/sharding_key_cache.lua b/crud/common/sharding/sharding_key_cache.lua similarity index 100% rename from crud/common/sharding_key_cache.lua rename to crud/common/sharding/sharding_key_cache.lua diff --git a/crud/count.lua b/crud/count.lua index c42d8069..e62ff5fb 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -10,7 +10,7 @@ local filters = require('crud.compare.filters') local count_plan = require('crud.compare.plan') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') -local sharding_key_module = 
require('crud.common.sharding_key') +local sharding_key_module = require('crud.common.sharding.sharding_key') local compare_conditions = require('crud.compare.conditions') diff --git a/crud/delete.lua b/crud/delete.lua index deca7318..623682fa 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -5,7 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') -local sharding_key_module = require('crud.common.sharding_key') +local sharding_key_module = require('crud.common.sharding.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') diff --git a/crud/get.lua b/crud/get.lua index 7f474e85..9d0f7b37 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -5,7 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') -local sharding_key_module = require('crud.common.sharding_key') +local sharding_key_module = require('crud.common.sharding.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 2f18a889..1eab0d84 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -7,7 +7,7 @@ local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local common = require('crud.select.compat.common') local schema = require('crud.common.schema') -local sharding_key_module = require('crud.common.sharding_key') +local sharding_key_module = require('crud.common.sharding.sharding_key') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index ece7eb3f..5e952f6b 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -8,7 +8,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') -local sharding_key_module = require('crud.common.sharding_key') +local sharding_key_module = require('crud.common.sharding.sharding_key') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') diff --git a/crud/update.lua b/crud/update.lua index 8e7b1aa0..0708233c 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -5,7 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') -local sharding_key_module = require('crud.common.sharding_key') +local sharding_key_module = require('crud.common.sharding.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') diff --git a/test/helper.lua b/test/helper.lua index 913aa56c..9a4779cf 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -325,7 +325,7 @@ end function helpers.update_cache(cluster, space_name) return cluster.main_server.net_box:eval([[ - local sharding_key = require('crud.common.sharding_key') + local sharding_key = require('crud.common.sharding.sharding_key') local space_name = ... 
return sharding_key.update_cache(space_name) diff --git a/test/unit/sharding_key_test.lua b/test/unit/sharding_key_test.lua index 110eab1c..fecdac83 100644 --- a/test/unit/sharding_key_test.lua +++ b/test/unit/sharding_key_test.lua @@ -1,6 +1,6 @@ local t = require('luatest') -local sharding_key_module = require('crud.common.sharding_key') -local cache = require('crud.common.sharding_key_cache') +local sharding_key_module = require('crud.common.sharding.sharding_key') +local cache = require('crud.common.sharding.sharding_key_cache') local utils = require('crud.common.utils') local helpers = require('test.helper') From cc69113886789cdd4e5e3c6e1bccf9d79ad907c3 Mon Sep 17 00:00:00 2001 From: AnaNek Date: Tue, 11 Jan 2022 20:28:52 +0300 Subject: [PATCH 2/6] Copy the content of `sharding_key.lua` file to `sharding_metadata.lua` file PR #181 introduced support of DDL sharding keys. Implementation of sharding keys support contains methods that are common to support sharding keys and sharding functions. That's why a separate file `sharding_metadata.lua` was created to contain common methods. This commit relocates functions for fetching sharding key from `sharding_key.lua` file to `sharding_metadata.lua` file to simplify a reviewer's life and display the history of changes relative to PR #181 in the following commits. Part of #237 --- crud.lua | 4 +- crud/common/sharding/init.lua | 4 +- crud/common/sharding/sharding_key.lua | 139 +---------------- crud/common/sharding/sharding_metadata.lua | 144 ++++++++++++++++++ ..._cache.lua => sharding_metadata_cache.lua} | 0 crud/count.lua | 4 +- crud/select/compat/select.lua | 4 +- crud/select/compat/select_old.lua | 4 +- test/helper.lua | 4 +- ...ey_test.lua => sharding_metadata_test.lua} | 9 +- 10 files changed, 165 insertions(+), 151 deletions(-) create mode 100644 crud/common/sharding/sharding_metadata.lua rename crud/common/sharding/{sharding_key_cache.lua => sharding_metadata_cache.lua} (100%) rename test/unit/{sharding_key_test.lua => sharding_metadata_test.lua} (95%) diff --git a/crud.lua b/crud.lua index 9d070ef0..b045e7fe 100644 --- a/crud.lua +++ b/crud.lua @@ -13,7 +13,7 @@ local truncate = require('crud.truncate') local len = require('crud.len') local count = require('crud.count') local borders = require('crud.borders') -local sharding_key = require('crud.common.sharding.sharding_key') +local sharding_metadata = require('crud.common.sharding.sharding_metadata') local utils = require('crud.common.utils') local crud = {} @@ -120,7 +120,7 @@ function crud.init_storage() len.init() count.init() borders.init() - sharding_key.init() + sharding_metadata.init() end function crud.init_router() diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index d409f150..51687d6c 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -5,7 +5,7 @@ local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false}) local utils = require('crud.common.utils') -local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding = {} @@ -34,7 +34,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local sharding_index_parts = space.index[0].parts - local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name) + local sharding_key_as_index_obj, err = 
sharding_metadata_module.fetch_on_router(space.name) if err ~= nil then return nil, err end diff --git a/crud/common/sharding/sharding_key.lua b/crud/common/sharding/sharding_key.lua index 4385e0d8..9a9134ca 100644 --- a/crud/common/sharding/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -1,45 +1,14 @@ -local fiber = require('fiber') local errors = require('errors') -local call = require('crud.common.call') -local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.sharding_key_cache') +local cache = require('crud.common.sharding.sharding_metadata_cache') local utils = require('crud.common.utils') local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false}) -local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false}) local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false}) -local FETCH_FUNC_NAME = '_crud.fetch_on_storage' - local sharding_key_module = {} --- Function decorator that is used to prevent _fetch_on_router() from being --- called concurrently by different fibers. -local function locked(f) - dev_checks('function') - - return function(timeout, ...) - local timeout_deadline = fiber.clock() + timeout - local ok = cache.fetch_lock:put(true, timeout) - -- channel:put() returns false in two cases: when timeout is exceeded - -- or channel has been closed. However error message describes only - -- first reason, I'm not sure we need to disclose to users such details - -- like problems with synchronization objects. - if not ok then - return FetchShardingKeyError:new( - "Timeout for fetching sharding key is exceeded") - end - local timeout = timeout_deadline - fiber.clock() - local status, err = pcall(f, timeout, ...) - cache.fetch_lock:get() - if not status or err ~= nil then - return err - end - end -end - -- Build a structure similar to index, but it is not a real index object, -- it contains only parts key with fieldno's. local function as_index_object(space_name, space_format, sharding_key_def) @@ -59,104 +28,6 @@ local function as_index_object(space_name, space_format, sharding_key_def) return {parts = fieldnos} end --- Return a map with metadata or nil when space box.space._ddl_sharding_key is --- not available on storage. -function sharding_key_module.fetch_on_storage() - local sharding_key_space = box.space._ddl_sharding_key - if sharding_key_space == nil then - return nil - end - - local SPACE_NAME_FIELDNO = 1 - local SPACE_SHARDING_KEY_FIELDNO = 2 - local metadata_map = {} - for _, tuple in sharding_key_space:pairs() do - local space_name = tuple[SPACE_NAME_FIELDNO] - local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] - local space_format = box.space[space_name]:format() - metadata_map[space_name] = { - sharding_key_def = sharding_key_def, - space_format = space_format, - } - end - - return metadata_map -end - --- Under high load we may get a case when more than one fiber will fetch --- metadata from storages. It is not good from performance point of view. --- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches --- a sharding metadata by a single one, other fibers will wait while --- cache.fetch_lock become unlocked during timeout passed to --- _fetch_on_router(). 
-local _fetch_on_router = locked(function(timeout) - dev_checks('number') - - if cache.sharding_key_as_index_obj_map ~= nil then - return - end - - local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, { - timeout = timeout - }) - if err ~= nil then - return err - end - if metadata_map == nil then - cache.sharding_key_as_index_obj_map = {} - return - end - - cache.sharding_key_as_index_obj_map = {} - for space_name, metadata in pairs(metadata_map) do - local sharding_key_as_index_obj, err = as_index_object(space_name, - metadata.space_format, - metadata.sharding_key_def) - if err ~= nil then - return err - end - cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj - end -end) - --- Get sharding index for a certain space. --- --- Return: --- - sharding key as index object, when sharding key definition found on --- storage. --- - nil, when sharding key definition was not found on storage. Pay attention --- that nil without error is a successfull return value. --- - nil and error, when something goes wrong on fetching attempt. --- -function sharding_key_module.fetch_on_router(space_name, timeout) - dev_checks('string', '?number') - - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] - end - - local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT - local err = _fetch_on_router(timeout) - if err ~= nil then - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] - end - return nil, err - end - - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] - end - - return nil, FetchShardingKeyError:new( - "Fetching sharding key for space '%s' is failed", space_name) -end - -function sharding_key_module.update_cache(space_name) - cache.drop_caches() - return sharding_key_module.fetch_on_router(space_name) -end - -- Make sure sharding key definition is a part of primary key. 
local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) dev_checks('string', 'table', 'table') @@ -207,7 +78,9 @@ end function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout) dev_checks('string', 'table', '?', '?number') - local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name, timeout) + local sharding_key_as_index_obj, err = require( + 'crud.common.sharding.sharding_metadata' + ).fetch_on_router(space_name, timeout) if err ~= nil then return nil, err end @@ -229,10 +102,6 @@ function sharding_key_module.extract_from_pk(space_name, primary_index_parts, pr return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) end -function sharding_key_module.init() - _G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage -end - sharding_key_module.internal = { as_index_object = as_index_object, extract_from_index = extract_from_index, diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua new file mode 100644 index 00000000..f19b06e9 --- /dev/null +++ b/crud/common/sharding/sharding_metadata.lua @@ -0,0 +1,144 @@ +local fiber = require('fiber') +local errors = require('errors') + +local call = require('crud.common.call') +local const = require('crud.common.const') +local dev_checks = require('crud.common.dev_checks') +local cache = require('crud.common.sharding.sharding_metadata_cache') + +local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false}) + +local FETCH_FUNC_NAME = '_crud.fetch_on_storage' + +local sharding_key_module = {} + +-- Function decorator that is used to prevent _fetch_on_router() from being +-- called concurrently by different fibers. +local function locked(f) + dev_checks('function') + + return function(timeout, ...) + local timeout_deadline = fiber.clock() + timeout + local ok = cache.fetch_lock:put(true, timeout) + -- channel:put() returns false in two cases: when timeout is exceeded + -- or channel has been closed. However error message describes only + -- first reason, I'm not sure we need to disclose to users such details + -- like problems with synchronization objects. + if not ok then + return FetchShardingKeyError:new( + "Timeout for fetching sharding key is exceeded") + end + local timeout = timeout_deadline - fiber.clock() + local status, err = pcall(f, timeout, ...) + cache.fetch_lock:get() + if not status or err ~= nil then + return err + end + end +end + +-- Return a map with metadata or nil when space box.space._ddl_sharding_key is +-- not available on storage. +function sharding_key_module.fetch_on_storage() + local sharding_key_space = box.space._ddl_sharding_key + if sharding_key_space == nil then + return nil + end + + local SPACE_NAME_FIELDNO = 1 + local SPACE_SHARDING_KEY_FIELDNO = 2 + local metadata_map = {} + for _, tuple in sharding_key_space:pairs() do + local space_name = tuple[SPACE_NAME_FIELDNO] + local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] + local space_format = box.space[space_name]:format() + metadata_map[space_name] = { + sharding_key_def = sharding_key_def, + space_format = space_format, + } + end + + return metadata_map +end + +-- Under high load we may get a case when more than one fiber will fetch +-- metadata from storages. It is not good from performance point of view. 
+-- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches +-- a sharding metadata by a single one, other fibers will wait while +-- cache.fetch_lock become unlocked during timeout passed to +-- _fetch_on_router(). +local _fetch_on_router = locked(function(timeout) + dev_checks('number') + + if cache.sharding_key_as_index_obj_map ~= nil then + return + end + + local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, { + timeout = timeout + }) + if err ~= nil then + return err + end + if metadata_map == nil then + cache.sharding_key_as_index_obj_map = {} + return + end + + cache.sharding_key_as_index_obj_map = {} + for space_name, metadata in pairs(metadata_map) do + local sharding_key_as_index_obj, err = require( + 'crud.common.sharding.sharding_key' + ).internal.as_index_object(space_name, + metadata.space_format, + metadata.sharding_key_def) + if err ~= nil then + return err + end + cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj + end +end) + +-- Get sharding index for a certain space. +-- +-- Return: +-- - sharding key as index object, when sharding key definition found on +-- storage. +-- - nil, when sharding key definition was not found on storage. Pay attention +-- that nil without error is a successfull return value. +-- - nil and error, when something goes wrong on fetching attempt. +-- +function sharding_key_module.fetch_on_router(space_name, timeout) + dev_checks('string', '?number') + + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + + local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT + local err = _fetch_on_router(timeout) + if err ~= nil then + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + return nil, err + end + + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + + return nil, FetchShardingKeyError:new( + "Fetching sharding key for space '%s' is failed", space_name) +end + +function sharding_key_module.update_cache(space_name) + cache.drop_caches() + return sharding_key_module.fetch_on_router(space_name) +end + +function sharding_key_module.init() + _G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage +end + +return sharding_key_module diff --git a/crud/common/sharding/sharding_key_cache.lua b/crud/common/sharding/sharding_metadata_cache.lua similarity index 100% rename from crud/common/sharding/sharding_key_cache.lua rename to crud/common/sharding/sharding_metadata_cache.lua diff --git a/crud/count.lua b/crud/count.lua index e62ff5fb..0a3cad2b 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -10,7 +10,7 @@ local filters = require('crud.compare.filters') local count_plan = require('crud.compare.plan') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') -local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local compare_conditions = require('crud.compare.conditions') @@ -114,7 +114,7 @@ local function call_count_on_router(space_name, user_conditions, opts) return nil, CountError:new("Space %q doesn't exist", space_name), true end - local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) if err ~= nil then return nil, err end diff 
--git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 1eab0d84..3588aa6a 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -7,7 +7,7 @@ local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local common = require('crud.select.compat.common') local schema = require('crud.common.schema') -local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') @@ -51,7 +51,7 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() - local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 5e952f6b..1cb2ea40 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -8,7 +8,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') -local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') @@ -103,7 +103,7 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() - local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) if err ~= nil then return nil, err end diff --git a/test/helper.lua b/test/helper.lua index 9a4779cf..6661a37a 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -325,10 +325,10 @@ end function helpers.update_cache(cluster, space_name) return cluster.main_server.net_box:eval([[ - local sharding_key = require('crud.common.sharding.sharding_key') + local sharding_metadata = require('crud.common.sharding.sharding_metadata') local space_name = ... 
- return sharding_key.update_cache(space_name) + return sharding_metadata.update_cache(space_name) ]], {space_name}) end diff --git a/test/unit/sharding_key_test.lua b/test/unit/sharding_metadata_test.lua similarity index 95% rename from test/unit/sharding_key_test.lua rename to test/unit/sharding_metadata_test.lua index fecdac83..1fa4ed80 100644 --- a/test/unit/sharding_key_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -1,11 +1,12 @@ local t = require('luatest') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding_key_module = require('crud.common.sharding.sharding_key') -local cache = require('crud.common.sharding.sharding_key_cache') +local cache = require('crud.common.sharding.sharding_metadata_cache') local utils = require('crud.common.utils') local helpers = require('test.helper') -local g = t.group('sharding_key') +local g = t.group('sharding_metadata') g.before_each(function() local sharding_key_format = { @@ -87,7 +88,7 @@ g.test_fetch_on_storage_positive = function() local sharding_key_def = {'name', 'age'} box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) - local metadata_map = sharding_key_module.fetch_on_storage() + local metadata_map = sharding_metadata_module.fetch_on_storage() t.assert_equals(metadata_map, { [space_name] = { @@ -101,7 +102,7 @@ g.test_fetch_on_storage_negative = function() -- Test checks return value when _ddl_sharding_key is absent. box.space._ddl_sharding_key:drop() - local metadata_map = sharding_key_module.fetch_on_storage() + local metadata_map = sharding_metadata_module.fetch_on_storage() t.assert_equals(metadata_map, nil) end From 02e6fb1279053a67a6ddbf51c6d0e120706625a0 Mon Sep 17 00:00:00 2001 From: AnaNek Date: Wed, 22 Dec 2021 12:59:18 +0300 Subject: [PATCH 3/6] Separate common sharding metadata methods from sharding key methods It is more efficient to get sharding keys and sharding functions from storage in a single `fetch_on_storage` call, so this function, as well as the functions for fetching on the router, is common to sharding key and sharding function support. These common methods are introduced in the `sharding_metadata` module; methods for working with the sharding key structure are introduced in the `sharding_key` module.
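As a rough illustration of the split this commit describes (this sketch is not part of the patch), a router-side caller is now expected to fetch the sharding metadata first and only then hand it to the sharding key helpers, exactly as the delete.lua, get.lua and update.lua hunks below do. The wrapper name `resolve_sharding_key` and the `space`/`key` arguments are assumptions for the example, not names from the patch.

```lua
-- Minimal sketch: combine the two modules the way the router code below does.
local sharding_key_module = require('crud.common.sharding.sharding_key')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local function resolve_sharding_key(space, key)
    -- Fetch the DDL sharding key for the space (cached after the first call).
    local sharding_key_as_index_obj, err =
        sharding_metadata_module.fetch_sharding_key_on_router(space.name)
    if err ~= nil then
        return nil, err
    end

    -- Cut the sharding key fields out of the primary key.
    return sharding_key_module.extract_from_pk(space.name,
        sharding_key_as_index_obj, space.index[0].parts, key)
end
```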
Part of #237 --- crud/common/const.lua | 2 +- crud/common/sharding/init.lua | 2 +- crud/common/sharding/sharding_key.lua | 30 +++++--- crud/common/sharding/sharding_metadata.lua | 71 +++++++++---------- .../sharding/sharding_metadata_cache.lua | 23 +++--- crud/common/sharding_key.lua | 13 ++++ crud/count.lua | 2 +- crud/delete.lua | 12 +++- crud/get.lua | 12 +++- crud/select/compat/select.lua | 2 +- crud/select/compat/select_old.lua | 2 +- crud/update.lua | 12 +++- test/helper.lua | 6 +- test/integration/ddl_sharding_key_test.lua | 9 ++- test/unit/sharding_metadata_test.lua | 20 +++++- 15 files changed, 142 insertions(+), 76 deletions(-) create mode 100644 crud/common/sharding_key.lua diff --git a/crud/common/const.lua b/crud/common/const.lua index e546dddc..fc261269 100644 --- a/crud/common/const.lua +++ b/crud/common/const.lua @@ -2,6 +2,6 @@ local const = {} const.RELOAD_RETRIES_NUM = 1 const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds -const.FETCH_SHARDING_KEY_TIMEOUT = 3 -- 3 seconds +const.FETCH_SHARDING_METADATA_TIMEOUT = 3 -- 3 seconds return const diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index 51687d6c..73435426 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -34,7 +34,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local sharding_index_parts = space.index[0].parts - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space.name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name) if err ~= nil then return nil, err end diff --git a/crud/common/sharding/sharding_key.lua b/crud/common/sharding/sharding_key.lua index 9a9134ca..dc5af1a8 100644 --- a/crud/common/sharding/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -75,15 +75,9 @@ end -- Extract sharding key from pk. -- Returns a table with sharding key or pair of nil and error. 
-function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout) - dev_checks('string', 'table', '?', '?number') - - local sharding_key_as_index_obj, err = require( - 'crud.common.sharding.sharding_metadata' - ).fetch_on_router(space_name, timeout) - if err ~= nil then - return nil, err - end +function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_obj, primary_index_parts, primary_key) + dev_checks('string', '?table', 'table', '?') + if sharding_key_as_index_obj == nil then return primary_key end @@ -102,6 +96,24 @@ function sharding_key_module.extract_from_pk(space_name, primary_index_parts, pr return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) end +function sharding_key_module.construct_as_index_obj_cache(metadata_map) + dev_checks('table') + + cache.sharding_key_as_index_obj_map = {} + for space_name, metadata in pairs(metadata_map) do + if metadata.sharding_key_def ~= nil then + local sharding_key_as_index_obj, err = as_index_object(space_name, + metadata.space_format, + metadata.sharding_key_def) + if err ~= nil then + return err + end + + cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj + end + end +end + sharding_key_module.internal = { as_index_object = as_index_object, extract_from_index = extract_from_index, diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index f19b06e9..2a740848 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ b/crud/common/sharding/sharding_metadata.lua @@ -5,12 +5,13 @@ local call = require('crud.common.call') local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') local cache = require('crud.common.sharding.sharding_metadata_cache') +local sharding_key = require('crud.common.sharding.sharding_key') -local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false}) +local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false}) local FETCH_FUNC_NAME = '_crud.fetch_on_storage' -local sharding_key_module = {} +local sharding_metadata_module = {} -- Function decorator that is used to prevent _fetch_on_router() from being -- called concurrently by different fibers. @@ -25,8 +26,8 @@ local function locked(f) -- first reason, I'm not sure we need to disclose to users such details -- like problems with synchronization objects. if not ok then - return FetchShardingKeyError:new( - "Timeout for fetching sharding key is exceeded") + return FetchShardingMetadataError:new( + "Timeout for fetching sharding metadata is exceeded") end local timeout = timeout_deadline - fiber.clock() local status, err = pcall(f, timeout, ...) @@ -39,7 +40,7 @@ end -- Return a map with metadata or nil when space box.space._ddl_sharding_key is -- not available on storage. -function sharding_key_module.fetch_on_storage() +function sharding_metadata_module.fetch_on_storage() local sharding_key_space = box.space._ddl_sharding_key if sharding_key_space == nil then return nil @@ -67,10 +68,10 @@ end -- a sharding metadata by a single one, other fibers will wait while -- cache.fetch_lock become unlocked during timeout passed to -- _fetch_on_router(). 
-local _fetch_on_router = locked(function(timeout) - dev_checks('number') +local _fetch_on_router = locked(function(timeout, metadata_map_name) + dev_checks('number', 'string') - if cache.sharding_key_as_index_obj_map ~= nil then + if cache[metadata_map_name] ~= nil then return end @@ -81,21 +82,13 @@ local _fetch_on_router = locked(function(timeout) return err end if metadata_map == nil then - cache.sharding_key_as_index_obj_map = {} + cache[cache.SHARDING_KEY_MAP_NAME] = {} return end - cache.sharding_key_as_index_obj_map = {} - for space_name, metadata in pairs(metadata_map) do - local sharding_key_as_index_obj, err = require( - 'crud.common.sharding.sharding_key' - ).internal.as_index_object(space_name, - metadata.space_format, - metadata.sharding_key_def) - if err ~= nil then - return err - end - cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj + local err = sharding_key.construct_as_index_obj_cache(metadata_map) + if err ~= nil then + return err end end) @@ -108,37 +101,41 @@ end) -- that nil without error is a successfull return value. -- - nil and error, when something goes wrong on fetching attempt. -- -function sharding_key_module.fetch_on_router(space_name, timeout) - dev_checks('string', '?number') - - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] +local function fetch_on_router(space_name, metadata_map_name, timeout) + if cache[metadata_map_name] ~= nil then + return cache[metadata_map_name][space_name] end - local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT - local err = _fetch_on_router(timeout) + local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT + local err = _fetch_on_router(timeout, metadata_map_name) if err ~= nil then - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] + if cache[metadata_map_name] ~= nil then + return cache[metadata_map_name][space_name] end return nil, err end - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] + if cache[metadata_map_name] ~= nil then + return cache[metadata_map_name][space_name] end - return nil, FetchShardingKeyError:new( + return nil, FetchShardingMetadataError:new( "Fetching sharding key for space '%s' is failed", space_name) end -function sharding_key_module.update_cache(space_name) +function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout) + dev_checks('string', '?number') + + return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout) +end + +function sharding_metadata_module.update_sharding_key_cache(space_name) cache.drop_caches() - return sharding_key_module.fetch_on_router(space_name) + return sharding_metadata_module.fetch_sharding_key_on_router(space_name) end -function sharding_key_module.init() - _G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage +function sharding_metadata_module.init() + _G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage end -return sharding_key_module +return sharding_metadata_module diff --git a/crud/common/sharding/sharding_metadata_cache.lua b/crud/common/sharding/sharding_metadata_cache.lua index a1ab3965..77c77594 100644 --- a/crud/common/sharding/sharding_metadata_cache.lua +++ b/crud/common/sharding/sharding_metadata_cache.lua @@ -1,18 +1,19 @@ local fiber = require('fiber') -local sharding_key_cache = {} +local sharding_metadata_cache = {} -sharding_key_cache.sharding_key_as_index_obj_map = nil 
-sharding_key_cache.fetch_lock = fiber.channel(1) -sharding_key_cache.is_part_of_pk = {} +sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map" +sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil +sharding_metadata_cache.fetch_lock = fiber.channel(1) +sharding_metadata_cache.is_part_of_pk = {} -function sharding_key_cache.drop_caches() - sharding_key_cache.sharding_key_as_index_obj_map = nil - if sharding_key_cache.fetch_lock ~= nil then - sharding_key_cache.fetch_lock:close() +function sharding_metadata_cache.drop_caches() + sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil + if sharding_metadata_cache.fetch_lock ~= nil then + sharding_metadata_cache.fetch_lock:close() end - sharding_key_cache.fetch_lock = fiber.channel(1) - sharding_key_cache.is_part_of_pk = {} + sharding_metadata_cache.fetch_lock = fiber.channel(1) + sharding_metadata_cache.is_part_of_pk = {} end -return sharding_key_cache +return sharding_metadata_cache diff --git a/crud/common/sharding_key.lua b/crud/common/sharding_key.lua new file mode 100644 index 00000000..07c76f5d --- /dev/null +++ b/crud/common/sharding_key.lua @@ -0,0 +1,13 @@ +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') + +local sharding_key_cache = {} + +-- This method is exported here because +-- we already have customers using old API +-- for updating sharding key cache in their +-- projects like `require('crud.common.sharding_key').update_cache()` +function sharding_key_cache.update_cache(space_name) + return sharding_metadata_module.update_sharding_key_cache(space_name) +end + +return sharding_key_cache diff --git a/crud/count.lua b/crud/count.lua index 0a3cad2b..4eef84f2 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -114,7 +114,7 @@ local function call_count_on_router(space_name, user_conditions, opts) return nil, CountError:new("Space %q doesn't exist", space_name), true end - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end diff --git a/crud/delete.lua b/crud/delete.lua index 623682fa..5fad9ea1 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -6,6 +6,7 @@ local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -58,9 +59,16 @@ local function call_delete_on_router(space_name, key, opts) local sharding_key = key if opts.bucket_id == nil then - local err local primary_index_parts = space.index[0].parts - sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + if err ~= nil then + return nil, err + end + + sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key_as_index_obj, + primary_index_parts, key) if err ~= nil then return nil, err end diff --git a/crud/get.lua b/crud/get.lua index 9d0f7b37..e2853882 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -6,6 +6,7 @@ local call = require('crud.common.call') local utils = 
require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -61,9 +62,16 @@ local function call_get_on_router(space_name, key, opts) local sharding_key = key if opts.bucket_id == nil then - local err local primary_index_parts = space.index[0].parts - sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + if err ~= nil then + return nil, err + end + + sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key_as_index_obj, + primary_index_parts, key) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 3588aa6a..7e8af44f 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -51,7 +51,7 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 1cb2ea40..17696d55 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -103,7 +103,7 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end diff --git a/crud/update.lua b/crud/update.lua index 0708233c..12487707 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -6,6 +6,7 @@ local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -86,9 +87,16 @@ local function call_update_on_router(space_name, key, user_operations, opts) local sharding_key = key if opts.bucket_id == nil then - local err local primary_index_parts = space.index[0].parts - sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + if err ~= nil then + return nil, err + end + + sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key_as_index_obj, + primary_index_parts, key) if err ~= nil then return nil, err end diff --git a/test/helper.lua b/test/helper.lua index 6661a37a..f46bf701 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -323,12 +323,12 @@ function 
helpers.tarantool_version_at_least(wanted_major, wanted_minor, return true end -function helpers.update_cache(cluster, space_name) +function helpers.update_sharding_key_cache(cluster, space_name) return cluster.main_server.net_box:eval([[ - local sharding_metadata = require('crud.common.sharding.sharding_metadata') + local sharding_key = require('crud.common.sharding_key') local space_name = ... - return sharding_metadata.update_cache(space_name) + return sharding_key.update_cache(space_name) ]], {space_name}) end diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index feec4fb5..687414f9 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -675,19 +675,22 @@ end pgroup.test_update_cache = function(g) local space_name = 'customers_name_key' - local sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(err, nil) t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) server.net_box:call('set_sharding_key', {space_name, {'age'}}) end) - sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(err, nil) t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 4}}}) -- Recover sharding key. helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) server.net_box:call('set_sharding_key', {space_name, {'name'}}) end) - sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(err, nil) t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) end diff --git a/test/unit/sharding_metadata_test.lua b/test/unit/sharding_metadata_test.lua index 1fa4ed80..dad79e7f 100644 --- a/test/unit/sharding_metadata_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -83,7 +83,23 @@ g.test_get_format_fieldno_map = function() t.assert_equals(fieldno_map, {age = 3, id = 1, name = 2}) end -g.test_fetch_on_storage_positive = function() +g.test_fetch_sharding_metadata_on_storage_positive = function() + local space_name = 'fetch_on_storage' + local sharding_key_def = {'name', 'age'} + + box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) + + local metadata_map = sharding_metadata_module.fetch_on_storage() + + t.assert_equals(metadata_map, { + [space_name] = { + sharding_key_def = sharding_key_def, + space_format = {} + }, + }) +end + +g.test_fetch_sharding_key_on_storage_positive = function() local space_name = 'fetch_on_storage' local sharding_key_def = {'name', 'age'} box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) @@ -98,7 +114,7 @@ g.test_fetch_on_storage_positive = function() }) end -g.test_fetch_on_storage_negative = function() +g.test_fetch_sharding_metadata_on_storage_negative = function() -- Test checks return value when _ddl_sharding_key is absent. 
box.space._ddl_sharding_key:drop() From 2ed4841ecd0bd31a5b79788788add55eaf9d5bd7 Mon Sep 17 00:00:00 2001 From: AnaNek Date: Wed, 22 Dec 2021 14:29:29 +0300 Subject: [PATCH 4/6] Fix filling sharding key cache when sharding key data is incorrect Sharding key cache contains sharding key structures separated by space name. Before this fix if sharding key was incorrect for some space an error was returned and sharding keys that were after the incorrect one did not get into the cache. The solution is to create a variable for an error and write a message to it if an error occurred for the space we are working with, output a warning for other spaces. Part of #237 --- crud/common/sharding/sharding_key.lua | 16 +++++- crud/common/sharding/sharding_metadata.lua | 11 ++-- test/helper.lua | 8 +++ test/integration/ddl_sharding_key_test.lua | 67 ++++++++++++++++++++++ 4 files changed, 92 insertions(+), 10 deletions(-) diff --git a/crud/common/sharding/sharding_key.lua b/crud/common/sharding/sharding_key.lua index dc5af1a8..d38f561f 100644 --- a/crud/common/sharding/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -1,4 +1,5 @@ local errors = require('errors') +local log = require('log') local dev_checks = require('crud.common.dev_checks') local cache = require('crud.common.sharding.sharding_metadata_cache') @@ -96,8 +97,10 @@ function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_o return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) end -function sharding_key_module.construct_as_index_obj_cache(metadata_map) - dev_checks('table') +function sharding_key_module.construct_as_index_obj_cache(metadata_map, specified_space_name) + dev_checks('table', 'string') + + local result_err cache.sharding_key_as_index_obj_map = {} for space_name, metadata in pairs(metadata_map) do @@ -106,12 +109,19 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map) metadata.space_format, metadata.sharding_key_def) if err ~= nil then - return err + if specified_space_name == space_name then + result_err = err + log.error(err) + else + log.warn(err) + end end cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj end end + + return result_err end sharding_key_module.internal = { diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index 2a740848..86dc0a96 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ b/crud/common/sharding/sharding_metadata.lua @@ -68,8 +68,8 @@ end -- a sharding metadata by a single one, other fibers will wait while -- cache.fetch_lock become unlocked during timeout passed to -- _fetch_on_router(). 
-local _fetch_on_router = locked(function(timeout, metadata_map_name) - dev_checks('number', 'string') +local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) + dev_checks('number', 'string', 'string') if cache[metadata_map_name] ~= nil then return @@ -86,7 +86,7 @@ local _fetch_on_router = locked(function(timeout, metadata_map_name) return end - local err = sharding_key.construct_as_index_obj_cache(metadata_map) + local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name) if err ~= nil then return err end @@ -107,11 +107,8 @@ local function fetch_on_router(space_name, metadata_map_name, timeout) end local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT - local err = _fetch_on_router(timeout, metadata_map_name) + local err = _fetch_on_router(timeout, space_name, metadata_map_name) if err ~= nil then - if cache[metadata_map_name] ~= nil then - return cache[metadata_map_name][space_name] - end return nil, err end diff --git a/test/helper.lua b/test/helper.lua index f46bf701..5d0795a5 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -332,4 +332,12 @@ function helpers.update_sharding_key_cache(cluster, space_name) ]], {space_name}) end +function helpers.get_sharding_key_cache(cluster) + return cluster.main_server.net_box:eval([[ + local sharding_metadata_cache = require('crud.common.sharding.sharding_metadata_cache') + + return sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] + ]]) +end + return helpers diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 687414f9..14f4ba59 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -694,3 +694,70 @@ pgroup.test_update_cache = function(g) t.assert_equals(err, nil) t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) end + +pgroup.test_update_cache_with_incorrect_key = function(g) + -- get data from cache for space with correct sharding key + local space_name = 'customers_name_key' + + local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(err, nil) + t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) + + -- records for all spaces exist + sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) + t.assert_equals(sharding_key_as_index_obj, { + customers_age_key = {parts = {{fieldno = 4}}}, + customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, + customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, + customers_name_key = {parts = {{fieldno = 3}}}, + customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}}, + customers_name_key_uniq_index = {parts = {{fieldno = 3}}}, + customers_secondary_idx_name_key = {parts = {{fieldno = 3}}}, + }) + + -- no error just warning + local space_name = 'customers_name_key' + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + server.net_box:call('set_sharding_key', {space_name, {'non_existent_field'}}) + end) + + -- we get no error because we sent request for correct space + local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, 'customers_age_key') + t.assert_equals(err, nil) + t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 4}}}) + + -- cache['customers_name_key'] == nil (space with incorrect key) + -- other records for correct spaces exist in cache + sharding_key_as_index_obj = 
helpers.get_sharding_key_cache(g.cluster) + t.assert_equals(sharding_key_as_index_obj, { + customers_age_key = {parts = {{fieldno = 4}}}, + customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, + customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, + customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}}, + customers_name_key_uniq_index = {parts = {{fieldno = 3}}}, + customers_secondary_idx_name_key = {parts = {{fieldno = 3}}}, + }) + + -- get data from cache for space with incorrect sharding key + local space_name = 'customers_name_key' + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + server.net_box:call('set_sharding_key', {space_name, {'non_existent_field'}}) + end) + + -- we get an error because we sent a request for an incorrect space + local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(sharding_key_as_index_obj, nil) + t.assert_str_contains(err.err, "No such field (non_existent_field) in a space format (customers_name_key)") + + -- cache['customers_name_key'] == nil (space with incorrect key) + -- other records for correct spaces exist in cache + sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) + t.assert_equals(sharding_key_as_index_obj, { + customers_age_key = {parts = {{fieldno = 4}}}, + customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, + customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, + customers_name_key_non_uniq_index = {parts = {{fieldno = 3}}}, + customers_name_key_uniq_index = {parts = {{fieldno = 3}}}, + customers_secondary_idx_name_key = {parts = {{fieldno = 3}}}, + }) +end From 6b7151ce51720a5179965d6c5738623d9222155a Mon Sep 17 00:00:00 2001 From: AnaNek Date: Wed, 22 Dec 2021 15:03:51 +0300 Subject: [PATCH 5/6] Add functions from `ddl` for sharding func module In DDL PR https://github.com/tarantool/ddl/pull/72, methods for checking and extracting a sharding function were introduced. These methods are needed for supporting sharding functions in CRUD as well. In this commit these methods were copied from DDL and covered by unit tests.
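A hypothetical usage sketch (not taken from the patch) of the helpers added below: `as_callable_object()` accepts either a dot-separated name of a callable reachable from `_G` or a table with a `body` string that evaluates to a function. The global `my_sharding` and the space name `'customers'` are made-up example values, and the calls go through the `internal` table the new module exports for tests.

```lua
local sharding_func = require('crud.common.sharding.sharding_func')

-- Shape 1: a dot-separated name resolved against _G.
rawset(_G, 'my_sharding', {func = function(key) return key end})
local f1 = sharding_func.internal.as_callable_object('my_sharding.func', 'customers')
assert(type(f1) == 'function')

-- Shape 2: a table whose `body` string evaluates to a function.
local f2, err = sharding_func.internal.as_callable_object(
    {body = 'function(key) return key end'}, 'customers')
assert(err == nil and type(f2) == 'function')
```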
Part of #237 --- crud/common/sharding/sharding_func.lua | 82 +++++++++++++ crud/common/utils.lua | 85 ++++++++++++++ test/unit/sharding_metadata_test.lua | 154 +++++++++++++++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 crud/common/sharding/sharding_func.lua diff --git a/crud/common/sharding/sharding_func.lua b/crud/common/sharding/sharding_func.lua new file mode 100644 index 00000000..401190f5 --- /dev/null +++ b/crud/common/sharding/sharding_func.lua @@ -0,0 +1,82 @@ +local errors = require('errors') + +local utils = require('crud.common.utils') + +local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false}) + +local sharding_func_module = {} + +local function is_callable(object) + if type(object) == 'function' then + return true + end + + -- all objects with type `cdata` are allowed + -- because there is no easy way to get + -- metatable.__call of object with type `cdata` + if type(object) == 'cdata' then + return true + end + + local object_metatable = getmetatable(object) + if (type(object) == 'table' or type(object) == 'userdata') then + -- if metatable type is not `table` -> metatable is protected -> + -- cannot detect metamethod `__call` exists + if object_metatable and type(object_metatable) ~= 'table' then + return true + end + + -- `__call` metamethod can be only the `function` + -- and cannot be a `table` | `userdata` | `cdata` + -- with `__call` methamethod on its own + if object_metatable and object_metatable.__call then + return type(object_metatable.__call) == 'function' + end + end + + return false +end + +local function get_function_from_G(func_name) + local chunks = string.split(func_name, '.') + local sharding_func = _G + + -- check is the each chunk an identifier + for _, chunk in pairs(chunks) do + if not utils.check_name_isident(chunk) or sharding_func == nil then + return nil + end + sharding_func = rawget(sharding_func, chunk) + end + + return sharding_func +end + +local function as_callable_object(sharding_func_def, space_name) + if type(sharding_func_def) == 'string' then + local sharding_func = get_function_from_G(sharding_func_def) + if sharding_func ~= nil and is_callable(sharding_func) == true then + return sharding_func + end + end + + if type(sharding_func_def) == 'table' then + local sharding_func, err = loadstring('return ' .. 
sharding_func_def.body) + if sharding_func == nil then + return nil, ShardingFuncError:new( + "Body is incorrect in sharding_func for space (%s): %s", space_name, err) + end + return sharding_func() + end + + return nil, ShardingFuncError:new( + "Wrong sharding function specified in _ddl_sharding_func space for (%s) space", space_name + ) +end + +sharding_func_module.internal = { + as_callable_object = as_callable_object, + is_callable = is_callable, +} + +return sharding_func_module diff --git a/crud/common/utils.lua b/crud/common/utils.lua index d7a62941..56c9e693 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -2,6 +2,7 @@ local errors = require('errors') local ffi = require('ffi') local vshard = require('vshard') local fun = require('fun') +local bit = require('bit') local schema = require('crud.common.schema') local dev_checks = require('crud.common.dev_checks') @@ -17,6 +18,54 @@ local utils = {} local space_format_cache = setmetatable({}, {__mode = 'k'}) +-- copy from LuaJIT lj_char.c +local lj_char_bits = { + 0, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 2, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, + 152,152,152,152,152,152,152,152,152,152, 4, 4, 4, 4, 4, 4, + 4,176,176,176,176,176,176,160,160,160,160,160,160,160,160,160, + 160,160,160,160,160,160,160,160,160,160,160, 4, 4, 4, 4,132, + 4,208,208,208,208,208,208,192,192,192,192,192,192,192,192,192, + 192,192,192,192,192,192,192,192,192,192,192, 4, 4, 4, 4, 1, + 128,128,128,128,128,128,128,128,128,128,128,128,128,128,128,128, + 128,128,128,128,128,128,128,128,128,128,128,128,128,128,128,128, + 128,128,128,128,128,128,128,128,128,128,128,128,128,128,128,128, + 128,128,128,128,128,128,128,128,128,128,128,128,128,128,128,128, + 128,128,128,128,128,128,128,128,128,128,128,128,128,128,128,128, + 128,128,128,128,128,128,128,128,128,128,128,128,128,128,128,128, + 128,128,128,128,128,128,128,128,128,128,128,128,128,128,128,128, + 128,128,128,128,128,128,128,128,128,128,128,128,128,128,128,128 +} + +local LJ_CHAR_IDENT = 0x80 +local LJ_CHAR_DIGIT = 0x08 + +local LUA_KEYWORDS = { + ['and'] = true, + ['end'] = true, + ['in'] = true, + ['repeat'] = true, + ['break'] = true, + ['false'] = true, + ['local'] = true, + ['return'] = true, + ['do'] = true, + ['for'] = true, + ['nil'] = true, + ['then'] = true, + ['else'] = true, + ['function'] = true, + ['not'] = true, + ['true'] = true, + ['elseif'] = true, + ['if'] = true, + ['or'] = true, + ['until'] = true, + ['while'] = true, +} + function utils.table_count(table) dev_checks("table") @@ -606,4 +655,40 @@ function utils.merge_options(opts_a, opts_b) return fun.chain(opts_a or {}, opts_b or {}):tomap() end +local function lj_char_isident(n) + return bit.band(lj_char_bits[n + 2], LJ_CHAR_IDENT) == LJ_CHAR_IDENT +end + +local function lj_char_isdigit(n) + return bit.band(lj_char_bits[n + 2], LJ_CHAR_DIGIT) == LJ_CHAR_DIGIT +end + +function utils.check_name_isident(name) + dev_checks('string') + + -- sharding function name cannot + -- be equal to lua keyword + if LUA_KEYWORDS[name] then + return false + end + + -- sharding function name cannot + -- begin with a digit + local char_number = string.byte(name:sub(1,1)) + if lj_char_isdigit(char_number) then + return false + end + + -- sharding func name must be sequence + -- of letters, digits, or underscore symbols + for i = 1, #name do + local char_number = string.byte(name:sub(i,i)) + if not lj_char_isident(char_number) then + return false + end + end + + return true +end + 
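A hedged usage sketch of the identifier check above (the sample names mirror the sharding-function tests elsewhere in this series; expected results are shown as assertions):

```lua
local utils = require('crud.common.utils')

assert(utils.check_name_isident('sharding_func') == true)    -- letters and underscore
assert(utils.check_name_isident('some_module') == true)
assert(utils.check_name_isident('5incorrect_name') == false) -- begins with a digit
assert(utils.check_name_isident('and') == false)             -- Lua keyword
assert(utils.check_name_isident('incorrect-name') == false)  -- '-' is not an identifier character
```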
return utils diff --git a/test/unit/sharding_metadata_test.lua b/test/unit/sharding_metadata_test.lua index dad79e7f..b76af630 100644 --- a/test/unit/sharding_metadata_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -1,6 +1,8 @@ local t = require('luatest') +local ffi = require('ffi') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_func_module = require('crud.common.sharding.sharding_func') local cache = require('crud.common.sharding.sharding_metadata_cache') local utils = require('crud.common.utils') @@ -248,3 +250,155 @@ g.test_is_part_of_pk_negative = function() local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) t.assert_equals(res, false) end + +g.test_as_callable_object_func_body = function() + local sharding_func_def = {body = 'function(key) return key end'} + + local callable_obj, err = sharding_func_module.internal.as_callable_object(sharding_func_def, + 'space_name') + t.assert_equals(err, nil) + t.assert_equals(type(callable_obj), 'function') + t.assert_equals(callable_obj(5), 5) +end + +g.test_as_callable_object_G_func = function() + local some_module = { + sharding_func = function(key) return key % 10 end + } + local module_name = 'some_module' + local sharding_func_def = 'some_module.sharding_func' + rawset(_G, module_name, some_module) + + local callable_obj, err = sharding_func_module.internal.as_callable_object(sharding_func_def, + 'space_name') + t.assert_equals(err, nil) + t.assert_equals(callable_obj, some_module.sharding_func) + + rawset(_G, module_name, nil) +end + +g.test_as_callable_object_func_body_negative = function() + local sharding_func_def = {body = 'function(key) return key'} + + local callable_obj, err = sharding_func_module.internal.as_callable_object(sharding_func_def, + 'space_name') + t.assert_equals(callable_obj, nil) + t.assert_str_contains(err.err, + 'Body is incorrect in sharding_func for space (space_name)') +end + +g.test_as_callable_object_G_func_not_exist = function() + local sharding_func_def = 'some_module.sharding_func' + + local callable_obj, err = sharding_func_module.internal.as_callable_object(sharding_func_def, + 'space_name') + t.assert_equals(callable_obj, nil) + t.assert_str_contains(err.err, + 'Wrong sharding function specified in _ddl_sharding_func space for (space_name) space') +end + +g.test_as_callable_object_G_func_keyword = function() + local sharding_func_def = 'and' + rawset(_G, sharding_func_def, function(key) return key % 10 end) + + local callable_obj, err = sharding_func_module.internal.as_callable_object(sharding_func_def, + 'space_name') + t.assert_equals(callable_obj, nil) + t.assert_str_contains(err.err, + 'Wrong sharding function specified in _ddl_sharding_func space for (space_name) space') + + rawset(_G, sharding_func_def, nil) +end + +g.test_as_callable_object_G_func_begin_with_digit = function() + local sharding_func_def = '5incorrect_name' + rawset(_G, sharding_func_def, function(key) return key % 10 end) + + local callable_obj, err = sharding_func_module.internal.as_callable_object(sharding_func_def, + 'space_name') + t.assert_equals(callable_obj, nil) + t.assert_str_contains(err.err, + 'Wrong sharding function specified in _ddl_sharding_func space for (space_name) space') + + rawset(_G, sharding_func_def, nil) +end + +g.test_as_callable_object_G_func_incorrect_symbol = function() + local sharding_func_def = 'incorrect-name' + rawset(_G, sharding_func_def, 
function(key) return key % 10 end) + + local callable_obj, err = sharding_func_module.internal.as_callable_object(sharding_func_def, + 'space_name') + t.assert_equals(callable_obj, nil) + t.assert_str_contains(err.err, + 'Wrong sharding function specified in _ddl_sharding_func space for (space_name) space') + + rawset(_G, sharding_func_def, nil) +end + +g.test_as_callable_object_invalid_type = function() + local sharding_func_def = 5 + + local callable_obj, err = sharding_func_module.internal.as_callable_object(sharding_func_def, + 'space_name') + t.assert_equals(callable_obj, nil) + t.assert_str_contains(err.err, + 'Wrong sharding function specified in _ddl_sharding_func space for (space_name) space') +end + +g.test_is_callable_func = function() + local sharding_func_obj = function(key) return key end + + local ok = sharding_func_module.internal.is_callable(sharding_func_obj) + t.assert_equals(ok, true) +end + +g.test_is_callable_table_positive = function() + local sharding_func_table = setmetatable({}, { + __call = function(_, key) return key end + }) + + local ok = sharding_func_module.internal.is_callable(sharding_func_table) + t.assert_equals(ok, true) +end + +g.test_is_callable_table_negative = function() + local sharding_func_table = setmetatable({}, {}) + + local ok = sharding_func_module.internal.is_callable(sharding_func_table) + t.assert_equals(ok, false) +end + +g.test_is_callable_userdata_positive = function() + local sharding_func_userdata = newproxy(true) + local mt = getmetatable(sharding_func_userdata) + mt.__call = function(_, key) return key end + + local ok = sharding_func_module.internal.is_callable(sharding_func_userdata) + t.assert_equals(ok, true) +end + +g.test_is_callable_userdata_negative = function() + local sharding_func_userdata = newproxy(true) + local mt = getmetatable(sharding_func_userdata) + mt.__call = {} + + local ok = sharding_func_module.internal.is_callable(sharding_func_userdata) + t.assert_equals(ok, false) +end + +g.test_is_callable_cdata = function() + ffi.cdef[[ + typedef struct + { + int data; + } test_check_struct_t; + ]] + ffi.metatype('test_check_struct_t', { + __call = function(_, key) return key end + }) + local sharding_func_cdata = ffi.new('test_check_struct_t') + + local ok = sharding_func_module.internal.is_callable(sharding_func_cdata) + t.assert_equals(ok, true) +end From 2887e18c4d8908a3fd1cd43ce7e27050c64fb467 Mon Sep 17 00:00:00 2001 From: AnaNek Date: Wed, 22 Dec 2021 15:17:22 +0300 Subject: [PATCH 6/6] Add support of custom sharding func for crud methods This commit introduces modifications in functions for fetching sharding metadata on storage and router to get sharding function. Function `sharding.key_get_bucket_id` calculates bucket_id using DDL sharding function if sharding function exist for specified space. Description in documentation, integration and unit tests are added as well. 
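In practice this means that, for a space whose DDL sharding function is `function(key) return key[1] % 10 end` (as in the integration tests below), the router derives bucket_id from the key instead of using `strcrc32`. A hedged sketch against the test schema:

```lua
local crud = require('crud')

-- 'customers_body_func' is declared in test/entrypoint/srv_ddl.lua with
-- sharding_func = { body = 'function(key) return key[1] % 10 end' }.
local result, err = crud.insert('customers_body_func', {27, box.NULL, 'Ivan', 25})
assert(err == nil)
-- bucket_id is 27 % 10 = 7, not vshard.router.bucket_id_strcrc32({27}).
assert(result.rows[1][2] == 7)
```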
Closes #237 --- CHANGELOG.md | 2 + README.md | 32 +- crud/common/sharding/init.lua | 16 +- crud/common/sharding/sharding_func.lua | 29 + crud/common/sharding/sharding_metadata.lua | 102 +++- .../sharding/sharding_metadata_cache.lua | 3 + crud/common/sharding_func.lua | 15 + crud/count.lua | 6 +- crud/delete.lua | 6 +- crud/get.lua | 6 +- crud/select/compat/select.lua | 6 +- crud/select/compat/select_old.lua | 6 +- crud/update.lua | 6 +- deps.sh | 2 + test/entrypoint/srv_ddl.lua | 29 + test/helper.lua | 38 ++ test/integration/ddl_sharding_func_test.lua | 542 ++++++++++++++++++ test/integration/ddl_sharding_key_test.lua | 6 + test/unit/sharding_metadata_test.lua | 66 ++- 19 files changed, 886 insertions(+), 32 deletions(-) create mode 100644 crud/common/sharding_func.lua create mode 100644 test/integration/ddl_sharding_func_test.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 77368329..3093ddef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. must be a part of primary key. * `crud.count()` function to calculate the number of tuples in the space according to conditions. +* Support bucket id calculation using the sharding func specified in + DDL schema or in `_ddl_sharding_func` space. ### Fixed diff --git a/README.md b/README.md index 878ffed3..7ce5f820 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in automatically. Note that CRUD methods `delete()`, `get()` and `update()` requires that sharding key must be a part of primary key. +You can specify a sharding function to calculate bucket_id with +a sharding func definition as a part of the [DDL +schema](https://github.com/tarantool/ddl#input-data-format) +or insert it manually into the space `_ddl_sharding_func`. + +CRUD uses `strcrc32` as the sharding function by default. +The reason why using `strcrc32` is undesirable is that +this sharding function is not consistent for cdata numbers. +In particular, it returns 3 different values for normal Lua +numbers like 123, for `unsigned long long` cdata +(like `123ULL`, or `ffi.cast('unsigned long long', +123)`), and for `signed long long` cdata (like `123LL`, or +`ffi.cast('long long', 123)`). + +We cannot change the default sharding function `strcrc32` +due to backward compatibility concerns, but please consider +using a better alternative sharding function; +`mpcrc32` is one of them. + Table below describe what operations supports custom sharding key: | CRUD method | Sharding key support | @@ -101,12 +120,23 @@ Current limitations for using custom sharding key: - It's not possible to update sharding keys automatically when schema is updated on storages, see [#212](https://github.com/tarantool/crud/issues/212). However it is possible - to do it manually with `require('crud.common.sharding_key').update_cache()`. + to do it manually with `require('crud.common.sharding_key').update_cache()` + (this function updates both caches: the sharding key cache and the sharding function + cache, but the returned value is the sharding key from the cache). - No support of JSON path for sharding key, see [#219](https://github.com/tarantool/crud/issues/219). - `primary_index_fieldno_map` is not cached, see [#243](https://github.com/tarantool/crud/issues/243).
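For reference, the two declaration styles mentioned above look roughly as follows. This is a hedged schema fragment: space and function names are taken from `test/entrypoint/srv_ddl.lua` further below and are purely illustrative, and the elided parts (format, indexes, sharding_key) are marked with comments:

```lua
-- 1. As part of a DDL schema:
local schema = {
    spaces = {
        customers_body_func = {
            -- format, indexes, sharding_key, ...
            sharding_func = { body = 'function(key) return key[1] % 10 end' },
        },
        customers_G_func = {
            -- format, indexes, sharding_key, ...
            sharding_func = 'some_module.sharding_func', -- dot-separated name resolved in _G
        },
    },
}

-- 2. Manually on storages, into the _ddl_sharding_func space
-- (fields: space_name, sharding_func_name, sharding_func_body):
box.space._ddl_sharding_func:replace({'customers_G_func', 'some_module.sharding_func'})
```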
+Current limitations for using custom sharding functions: + +- It's not possible to update sharding functions automatically when schema is + updated on storages, see + [#212](https://github.com/tarantool/crud/issues/212). However it is possible + to do it manually with `require('crud.common.sharding_func').update_cache()` + (this function updates both caches: sharding key cache and sharding function + cache, but returned value is sharding function from cache). + ### Insert ```lua diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index 73435426..de669a9a 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -5,6 +5,7 @@ local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false}) local utils = require('crud.common.utils') +local dev_checks = require('crud.common.dev_checks') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local sharding = {} @@ -20,11 +21,22 @@ function sharding.get_replicasets_by_bucket_id(bucket_id) } end -function sharding.key_get_bucket_id(key, specified_bucket_id) +function sharding.key_get_bucket_id(space_name, key, specified_bucket_id) + dev_checks('string', '?', '?number|cdata') + if specified_bucket_id ~= nil then return specified_bucket_id end + local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name) + if err ~= nil then + return nil, err + end + + if sharding_func ~= nil then + return sharding_func(key) + end + return vshard.router.bucket_id_strcrc32(key) end @@ -43,7 +55,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local key = utils.extract_key(tuple, sharding_index_parts) - return sharding.key_get_bucket_id(key) + return sharding.key_get_bucket_id(space.name, key) end function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id) diff --git a/crud/common/sharding/sharding_func.lua b/crud/common/sharding/sharding_func.lua index 401190f5..8307d6d4 100644 --- a/crud/common/sharding/sharding_func.lua +++ b/crud/common/sharding/sharding_func.lua @@ -1,5 +1,8 @@ local errors = require('errors') +local log = require('log') +local dev_checks = require('crud.common.dev_checks') +local cache = require('crud.common.sharding.sharding_metadata_cache') local utils = require('crud.common.utils') local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false}) @@ -74,6 +77,32 @@ local function as_callable_object(sharding_func_def, space_name) ) end +function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name) + dev_checks('table', 'string') + + local result_err + + cache.sharding_func_map = {} + for space_name, metadata in pairs(metadata_map) do + if metadata.sharding_func_def ~= nil then + local sharding_func, err = as_callable_object(metadata.sharding_func_def, + space_name) + if err ~= nil then + if specified_space_name == space_name then + result_err = err + log.error(err) + else + log.warn(err) + end + end + + cache.sharding_func_map[space_name] = sharding_func + end + end + + return result_err +end + sharding_func_module.internal = { as_callable_object = as_callable_object, is_callable = is_callable, diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index 86dc0a96..3c31be7c 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ 
b/crud/common/sharding/sharding_metadata.lua @@ -5,6 +5,7 @@ local call = require('crud.common.call') local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') local cache = require('crud.common.sharding.sharding_metadata_cache') +local sharding_func = require('crud.common.sharding.sharding_func') local sharding_key = require('crud.common.sharding.sharding_key') local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false}) @@ -38,25 +39,58 @@ local function locked(f) end end --- Return a map with metadata or nil when space box.space._ddl_sharding_key is --- not available on storage. +local function extract_sharding_func_def(tuple) + if not tuple then + return nil + end + + local SPACE_SHARDING_FUNC_NAME_FIELDNO = 2 + local SPACE_SHARDING_FUNC_BODY_FIELDNO = 3 + + if tuple[SPACE_SHARDING_FUNC_BODY_FIELDNO] ~= nil then + return {body = tuple[SPACE_SHARDING_FUNC_BODY_FIELDNO]} + end + + if tuple[SPACE_SHARDING_FUNC_NAME_FIELDNO] ~= nil then + return tuple[SPACE_SHARDING_FUNC_NAME_FIELDNO] + end + + return nil +end + +-- Return a map with metadata or nil when spaces box.space._ddl_sharding_key and +-- box.space._ddl_sharding_func are not available on storage. function sharding_metadata_module.fetch_on_storage() local sharding_key_space = box.space._ddl_sharding_key - if sharding_key_space == nil then + local sharding_func_space = box.space._ddl_sharding_func + + if sharding_key_space == nil and sharding_func_space == nil then return nil end local SPACE_NAME_FIELDNO = 1 local SPACE_SHARDING_KEY_FIELDNO = 2 local metadata_map = {} - for _, tuple in sharding_key_space:pairs() do - local space_name = tuple[SPACE_NAME_FIELDNO] - local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] - local space_format = box.space[space_name]:format() - metadata_map[space_name] = { - sharding_key_def = sharding_key_def, - space_format = space_format, - } + + if sharding_key_space ~= nil then + for _, tuple in sharding_key_space:pairs() do + local space_name = tuple[SPACE_NAME_FIELDNO] + local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] + local space_format = box.space[space_name]:format() + metadata_map[space_name] = { + sharding_key_def = sharding_key_def, + space_format = space_format, + } + end + end + + if sharding_func_space ~= nil then + for _, tuple in sharding_func_space:pairs() do + local space_name = tuple[SPACE_NAME_FIELDNO] + local sharding_func_def = extract_sharding_func_def(tuple) + metadata_map[space_name] = metadata_map[space_name] or {} + metadata_map[space_name].sharding_func_def = sharding_func_def + end end return metadata_map @@ -83,6 +117,7 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) end if metadata_map == nil then cache[cache.SHARDING_KEY_MAP_NAME] = {} + cache[cache.SHARDING_FUNC_MAP_NAME] = {} return end @@ -90,17 +125,13 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) if err ~= nil then return err end + + local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name) + if err ~= nil then + return err + end end) --- Get sharding index for a certain space. --- --- Return: --- - sharding key as index object, when sharding key definition found on --- storage. --- - nil, when sharding key definition was not found on storage. Pay attention --- that nil without error is a successfull return value. --- - nil and error, when something goes wrong on fetching attempt. 
--- local function fetch_on_router(space_name, metadata_map_name, timeout) if cache[metadata_map_name] ~= nil then return cache[metadata_map_name][space_name] @@ -120,17 +151,48 @@ local function fetch_on_router(space_name, metadata_map_name, timeout) "Fetching sharding key for space '%s' is failed", space_name) end +-- Get sharding index for a certain space. +-- +-- Return: +-- - sharding key as index object, when sharding key definition found on +-- storage. +-- - nil, when sharding key definition was not found on storage. Pay attention +-- that nil without error is a successfull return value. +-- - nil and error, when something goes wrong on fetching attempt. +-- function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout) dev_checks('string', '?number') return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout) end +-- Get sharding func for a certain space. +-- +-- Return: +-- - sharding func as callable object, when sharding func definition found on +-- storage. +-- - nil, when sharding func definition was not found on storage. Pay attention +-- that nil without error is a successfull return value. +-- - nil and error, when something goes wrong on fetching attempt. +-- +function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout) + dev_checks('string', '?number') + + return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout) +end + function sharding_metadata_module.update_sharding_key_cache(space_name) cache.drop_caches() + return sharding_metadata_module.fetch_sharding_key_on_router(space_name) end +function sharding_metadata_module.update_sharding_func_cache(space_name) + cache.drop_caches() + + return sharding_metadata_module.fetch_sharding_func_on_router(space_name) +end + function sharding_metadata_module.init() _G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage end diff --git a/crud/common/sharding/sharding_metadata_cache.lua b/crud/common/sharding/sharding_metadata_cache.lua index 77c77594..5b2fdcd6 100644 --- a/crud/common/sharding/sharding_metadata_cache.lua +++ b/crud/common/sharding/sharding_metadata_cache.lua @@ -3,12 +3,15 @@ local fiber = require('fiber') local sharding_metadata_cache = {} sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map" +sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map" sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil +sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil sharding_metadata_cache.fetch_lock = fiber.channel(1) sharding_metadata_cache.is_part_of_pk = {} function sharding_metadata_cache.drop_caches() sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil + sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil if sharding_metadata_cache.fetch_lock ~= nil then sharding_metadata_cache.fetch_lock:close() end diff --git a/crud/common/sharding_func.lua b/crud/common/sharding_func.lua new file mode 100644 index 00000000..a3aca4c1 --- /dev/null +++ b/crud/common/sharding_func.lua @@ -0,0 +1,15 @@ +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') + +local sharding_func_cache = {} + +-- This method is exported here because +-- we already have customers using old API +-- for updating sharding key cache in their +-- projects like `require('crud.common.sharding_key').update_cache()` +-- This method provides similar behavior for +-- sharding function cache. 
+function sharding_func_cache.update_cache(space_name) + return sharding_metadata_module.update_sharding_func_cache(space_name) +end + +return sharding_func_cache diff --git a/crud/count.lua b/crud/count.lua index 4eef84f2..09401a4d 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -162,7 +162,11 @@ local function call_count_on_router(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + assert(bucket_id ~= nil) local err diff --git a/crud/delete.lua b/crud/delete.lua index 5fad9ea1..01c061bf 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts) end end - local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + local call_opts = { mode = 'write', timeout = opts.timeout, diff --git a/crud/get.lua b/crud/get.lua index e2853882..27e20193 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts) end end - local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + local call_opts = { mode = opts.mode or 'read', prefer_replica = opts.prefer_replica, diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 7e8af44f..1984a87a 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + assert(bucket_id ~= nil) local err diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 17696d55..1cf88744 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts) local perform_map_reduce = opts.force_map_call == true or (opts.bucket_id == nil and plan.sharding_key == nil) if not perform_map_reduce then - local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id) + if err ~= nil then + return nil, err + end + assert(bucket_id ~= nil) local err diff --git a/crud/update.lua b/crud/update.lua index 12487707..f78e0035 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -110,7 +110,11 @@ local function call_update_on_router(space_name, key, user_operations, opts) end end - local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) + local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id) + if err ~= nil 
then + return nil, err + end + local call_opts = { mode = 'write', timeout = opts.timeout, diff --git a/deps.sh b/deps.sh index 48ccdd0a..87ce6b92 100755 --- a/deps.sh +++ b/deps.sh @@ -16,4 +16,6 @@ tarantoolctl rocks install https://raw.githubusercontent.com/moteus/lua-path/mas tarantoolctl rocks install https://raw.githubusercontent.com/moteus/luacov-coveralls/master/rockspecs/luacov-coveralls-scm-0.rockspec tarantoolctl rocks install cartridge 2.7.3 +tarantoolctl rocks install ddl 1.6.0 + tarantoolctl rocks make diff --git a/test/entrypoint/srv_ddl.lua b/test/entrypoint/srv_ddl.lua index f240c743..30f432b8 100755 --- a/test/entrypoint/srv_ddl.lua +++ b/test/entrypoint/srv_ddl.lua @@ -9,6 +9,18 @@ local cartridge = require('cartridge') local ddl = require('ddl') package.preload['customers-storage'] = function() + -- set sharding func in dot.notation + -- in _G for sharding func tests + local some_module = { + sharding_func = + function(key) + if key ~= nil and key[1] ~= nil then + return key[1] % 10 + end + end + } + rawset(_G, 'some_module', some_module) + return { role_name = 'customers-storage', init = function() @@ -131,6 +143,18 @@ package.preload['customers-storage'] = function() table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index) table.insert(customers_name_age_key_three_fields_index_schema.indexes, three_fields_index) + local customers_id_key_schema = table.deepcopy(customers_schema) + customers_id_key_schema.sharding_key = {'id'} + table.insert(customers_id_key_schema.indexes, primary_index) + table.insert(customers_id_key_schema.indexes, bucket_id_index) + table.insert(customers_id_key_schema.indexes, name_index) + + local customers_body_func_schema = table.deepcopy(customers_id_key_schema) + customers_body_func_schema.sharding_func = { body = 'function(key) return key[1] % 10 end' } + + local customers_G_func_schema = table.deepcopy(customers_id_key_schema) + customers_G_func_schema.sharding_func = 'some_module.sharding_func' + local schema = { spaces = { customers_name_key = customers_name_key_schema, @@ -140,6 +164,8 @@ package.preload['customers-storage'] = function() customers_age_key = customers_age_key_schema, customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema, customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema, + customers_G_func = customers_G_func_schema, + customers_body_func = customers_body_func_schema, } } @@ -154,6 +180,9 @@ package.preload['customers-storage'] = function() local fieldno_sharding_key = 2 box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}}) end) + rawset(_G, 'set_sharding_func', function(space_name, fieldno_sharding_func, sharding_func_def) + box.space['_ddl_sharding_func']:update(space_name, {{'=', fieldno_sharding_func, sharding_func_def}}) + end) end, } end diff --git a/test/helper.lua b/test/helper.lua index 5d0795a5..f2cdb6ab 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -340,4 +340,42 @@ function helpers.get_sharding_key_cache(cluster) ]]) end +-- it is not possible to get function or table with function +-- object through net.box that's why we get a sign of record +-- existence of cache but not the cache itself +function helpers.update_sharding_func_cache(cluster, space_name) + return cluster.main_server.net_box:eval([[ + local sharding_func = require('crud.common.sharding_func') + + local space_name = ... 
+ local sharding_func, err = sharding_func.update_cache(space_name) + if sharding_func == nil then + return false, err + end + + return true, err + ]], {space_name}) +end + +-- it is not possible to get function or table with function +-- object through net.box that's why we get size of cache +-- but not the cache itself +function helpers.get_sharding_func_cache_size(cluster) + return cluster.main_server.net_box:eval([[ + local sharding_metadata_cache = require('crud.common.sharding.sharding_metadata_cache') + + local cache, err = sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] + if cache == nil then + return nil, err + end + + local cnt = 0 + for _, _ in pairs(cache) do + cnt = cnt + 1 + end + + return cnt, err + ]]) +end + return helpers diff --git a/test/integration/ddl_sharding_func_test.lua b/test/integration/ddl_sharding_func_test.lua new file mode 100644 index 00000000..2ae055cf --- /dev/null +++ b/test/integration/ddl_sharding_func_test.lua @@ -0,0 +1,542 @@ +local fio = require('fio') +local crud = require('crud') +local t = require('luatest') + +local helpers = require('test.helper') + +local ok = pcall(require, 'ddl') +if not ok then + t.skip('Lua module ddl is required to run test') +end + +local pgroup = t.group('ddl_sharding_func', { + {engine = 'memtx', space_name = 'customers_G_func'}, + {engine = 'memtx', space_name = 'customers_body_func'}, + {engine = 'vinyl', space_name = 'customers_G_func'}, + {engine = 'vinyl', space_name = 'customers_body_func'}, +}) + +local cache_group = t.group('ddl_sharding_func_cache', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +pgroup.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_ddl'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() + local result, err = g.cluster.main_server.net_box:eval([[ + local ddl = require('ddl') + + local ok, err = ddl.get_schema() + return ok, err + ]]) + t.assert_equals(type(result), 'table') + t.assert_equals(err, nil) +end) + +pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +pgroup.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers_G_func') + helpers.truncate_space_on_cluster(g.cluster, 'customers_body_func') +end) + +cache_group.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_ddl'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() + local result, err = g.cluster.main_server.net_box:eval([[ + local ddl = require('ddl') + + local ok, err = ddl.get_schema() + return ok, err + ]]) + t.assert_equals(type(result), 'table') + t.assert_equals(err, nil) +end) + +cache_group.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +cache_group.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers_G_func') + helpers.truncate_space_on_cluster(g.cluster, 'customers_body_func') +end) + +pgroup.test_insert_object = function(g) + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert_object', {g.params.space_name, {id = 158, name = 'Augustus', age = 48}}) + t.assert_equals(err, nil) + + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = 
false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_equals(objects, {{id = 158, bucket_id = 8, name = 'Augustus', age = 48}}) + + local conn_s1 = g.cluster:server('s1-master').net_box + -- There is no tuple on s1 that we inserted before using crud.insert_object(). + local result = conn_s1.space[g.params.space_name]:get({158, 'Augustus'}) + t.assert_equals(result, nil) + + local conn_s2 = g.cluster:server('s2-master').net_box + -- There is a tuple on s2 that we inserted before using crud.insert_object(). + local result = conn_s2.space[g.params.space_name]:get({158, 'Augustus'}) + t.assert_equals(result, {158, 8, 'Augustus', 48}) +end + +pgroup.test_insert = function(g) + -- Insert a tuple. + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert', {g.params.space_name, {27, box.NULL, 'Ivan', 25}}) + t.assert_equals(err, nil) + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {27, 7, 'Ivan', 25}) + + -- There is a tuple on s2 that we inserted before using crud.insert(). + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({27, 'Ivan'}) + t.assert_equals(result, {27, 7, 'Ivan', 25}) + + -- There is no tuple on s1 that we inserted before using crud.insert(). + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({27, 'Ivan'}) + t.assert_equals(result, nil) +end + +pgroup.test_replace_object = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {8, 596, 'Dimitrion', 20} + + -- Put tuple to s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Replace an object. + local result, err = g.cluster.main_server.net_box:call( + 'crud.replace_object', {g.params.space_name, {id = 8, name = 'John Doe', age = 25}}) + t.assert_equals(err, nil) + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_equals(objects, {{id = 8, bucket_id = 8, name = 'John Doe', age = 25}}) + + -- There is no replaced tuple on s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({8, 'John Doe'}) + t.assert_equals(result, nil) + + -- There is replaced tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({8, 'John Doe'}) + t.assert_equals(result, {8, 8, 'John Doe', 25}) +end + +pgroup.test_replace = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {71, 596, 'Dimitrion', 20} + + -- Put tuple to s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. 
+ local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + local tuple = {71, box.NULL, 'Augustus', 21} + + -- Replace a tuple. + local result, err = g.cluster.main_server.net_box:call('crud.replace', { + g.params.space_name, tuple + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {71, 1, 'Augustus', 21}) + + -- There is no replaced tuple on s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({71, 'Augustus'}) + t.assert_equals(result, nil) + + -- There is replaced tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({71, 'Augustus'}) + t.assert_equals(result, {71, 1, 'Augustus', 21}) +end + +pgroup.test_upsert_object = function(g) + -- Upsert an object first time. + local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', + {g.params.space_name, {id = 66, name = 'Jack Sparrow', age = 25}, {{'+', 'age', 26}}} + ) + t.assert_equals(#result.rows, 0) + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + t.assert_equals(err, nil) + + -- There is no tuple on s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({66, 'Jack Sparrow'}) + t.assert_equals(result, nil) + + -- There is a tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({66, 'Jack Sparrow'}) + t.assert_equals(result, {66, 6, 'Jack Sparrow', 25}) + + -- Upsert the same query second time when tuple exists. + local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', + {g.params.space_name, {id = 66, name = 'Jack Sparrow', age = 25}, {{'+', 'age', 26}}} + ) + t.assert_equals(#result.rows, 0) + t.assert_equals(err, nil) + + -- There is no tuple on s2 replicaset. + local result = conn_s1.space[g.params.space_name]:get({66, 'Jack Sparrow'}) + t.assert_equals(result, nil) + + -- There is an updated tuple on s1 replicaset. + local result = conn_s2.space[g.params.space_name]:get({66, 'Jack Sparrow'}) + t.assert_equals(result, {66, 6, 'Jack Sparrow', 51}) +end + +pgroup.test_upsert = function(g) + local tuple = {14, box.NULL, 'John', 25} + + -- Upsert an object first time. + local result, err = g.cluster.main_server.net_box:call('crud.upsert', { + g.params.space_name, tuple, {} + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 0) + + -- There is no tuple on s2 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space[g.params.space_name]:get({14, 'John'}) + t.assert_equals(result, nil) + + -- There is a tuple on s1 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:get({14, 'John'}) + t.assert_equals(result, {14, 4, 'John', 25}) + + -- Upsert the same query second time when tuple exists. 
+ local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', + {g.params.space_name, {id = 14, name = 'John', age = 25}, {{'+', 'age', 26}}} + ) + t.assert_equals(#result.rows, 0) + t.assert_equals(err, nil) + + -- There is no tuple on s2 replicaset. + local result = conn_s1.space[g.params.space_name]:get({14, 'John'}) + t.assert_equals(result, nil) + + -- There is an updated tuple on s1 replicaset. + local result = conn_s2.space[g.params.space_name]:get({14, 'John'}) + t.assert_equals(result, {14, 4, 'John', 51}) +end + +pgroup.test_select = function(g) + -- bucket_id is id % 10 = 8 + local tuple = {18, 8, 'Ptolemy', 25} + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + local conditions = {{'==', 'id', 18}} + local result, err = g.cluster.main_server.net_box:call('crud.select', { + g.params.space_name, conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], tuple) + + -- bucket_id is 2719, storage is s-1 + local tuple = {19, 2719, 'Ptolemy', 25} + + -- Put tuple to s1 replicaset. + local conn_s2 = g.cluster:server('s1-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- calculated bucket_id will be id % 10 = 19 % 10 = 9 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty + local conditions = {{'==', 'id', 19}} + local result, err = g.cluster.main_server.net_box:call('crud.select', { + g.params.space_name, conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(result.rows, {}) +end + +pgroup.test_update = function(g) + -- bucket_id is id % 10 = 2 + local tuple = {12, 2, 'Ivan', 10} + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- Put tuple with to s1 replicaset. + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple with to s2 replicaset. + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Update a tuple. + local update_operations = { + {'+', 'age', 10}, + } + local result, err = g.cluster.main_server.net_box:call('crud.update', { + g.params.space_name, {12, 'Ivan'}, update_operations, + }) + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows, {{12, 2, 'Ivan', 20}}) + + -- Tuple on s1 replicaset was not updated. + local result = conn_s1.space[g.params.space_name]:get({12, 'Ivan'}) + t.assert_equals(result, {12, 2, 'Ivan', 10}) + + -- Tuple on s2 replicaset was updated. + local result = conn_s2.space[g.params.space_name]:get({12, 'Ivan'}) + t.assert_equals(result, {12, 2, 'Ivan', 20}) + + -- bucket_id is 2719, storage is s-1 + local tuple = {18, 2719, 'Ivan', 10} + + -- Put tuple with to s1 replicaset. + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Update a tuple. 
+ local update_operations = { + {'+', 'age', 10}, + } + -- calculated bucket_id will be id % 10 = 18 % 10 = 8 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty + local result, err = g.cluster.main_server.net_box:call('crud.update', { + g.params.space_name, {18, 'Ivan'}, update_operations, + }) + t.assert_equals(err, nil) + t.assert_equals(result.rows, {}) + + -- Tuple on s1 replicaset was not updated. + local result = conn_s1.space[g.params.space_name]:get({18, 'Ivan'}) + t.assert_equals(result, {18, 2719, 'Ivan', 10}) +end + +pgroup.test_get = function(g) + -- bucket_id is id % 10 = 2 + local tuple = {12, 2, 'Ivan', 20} + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Get a tuple. + local result, err = g.cluster.main_server.net_box:call('crud.get', { + g.params.space_name, {12, 'Ivan'}, + }) + t.assert_equals(err, nil) + t.assert_equals(result.rows, {{12, 2, 'Ivan', 20}}) + + -- bucket_id is 2719, storage is s-1 + local tuple = {18, 2719, 'Ivan', 10} + + -- Put tuple to s1 replicaset. + local conn_s2 = g.cluster:server('s1-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- calculated bucket_id will be id % 10 = 18 % 10 = 8 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty + local result, err = g.cluster.main_server.net_box:call('crud.get', { + g.params.space_name, {18, 'Ivan'}, + }) + t.assert_equals(err, nil) + t.assert_equals(result.rows, {}) +end + +pgroup.test_delete = function(g) + -- bucket_id is id % 10 = 2 + local tuple = {12, 2, 'Ivan', 20} + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- Put tuple to s1 replicaset. + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- Delete tuple. + local _, err = g.cluster.main_server.net_box:call('crud.delete', { + g.params.space_name, {12, 'Ivan'}, + }) + t.assert_equals(err, nil) + + -- There is a tuple on s1 replicaset. + local result = conn_s1.space[g.params.space_name]:get({12, 'Ivan'}) + t.assert_equals(result, {12, 2, 'Ivan', 20}) + + -- There is no tuple on s2 replicaset. + local result = conn_s2.space[g.params.space_name]:get({12, 'Ivan'}) + t.assert_equals(result, nil) + + -- bucket_id is 2719, storage is s-1 + local tuple = {18, 2719, 'Ivan', 20} + + -- Put tuple with to s1 replicaset. + local result = conn_s1.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- calculated bucket_id will be id % 10 = 18 % 10 = 8 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty + local _, err = g.cluster.main_server.net_box:call('crud.delete', { + g.params.space_name, {18, 'Ivan'} + }) + t.assert_equals(err, nil) + + -- Tuple on s1 replicaset was not deleted. + local result = conn_s1.space[g.params.space_name]:get({18, 'Ivan'}) + t.assert_equals(result, {18, 2719, 'Ivan', 20}) +end + +pgroup.test_count = function(g) + -- bucket_id is id % 10 = 8 + local tuple = {18, 8, 'Ptolemy', 25} + + -- Put tuple to s2 replicaset. 
+ local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + local conditions = {{'==', 'id', 18}} + local result, err = g.cluster.main_server.net_box:call('crud.count', { + g.params.space_name, conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(result, 1) + + -- bucket_id is 2719, storage is s-1 + local tuple = {19, 2719, 'Ptolemy', 25} + + -- Put tuple to s1 replicaset. + local conn_s2 = g.cluster:server('s1-master').net_box + local result = conn_s2.space[g.params.space_name]:insert(tuple) + t.assert_not_equals(result, nil) + + -- calculated bucket_id will be id % 10 = 19 % 10 = 9 -> + -- select will be performed on s2 replicaset + -- but tuple is on s1 replicaset -> result will be empty -> + -- count = 0 + local conditions = {{'==', 'id', 19}} + local result, err = g.cluster.main_server.net_box:call('crud.count', { + g.params.space_name, conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(result, 0) +end + +cache_group.test_update_cache_with_incorrect_func = function(g) + local fieldno_sharding_func_name = 2 + + -- get data from cache for space with correct sharding func + local space_name = 'customers_G_func' + + local record_exist, err = helpers.update_sharding_func_cache(g.cluster, space_name) + t.assert_equals(err, nil) + t.assert_equals(record_exist, true) + + -- records for all spaces exist + local cache_size = helpers.get_sharding_func_cache_size(g.cluster) + t.assert_equals(cache_size, 2) + + -- no error just warning + local space_name = 'customers_G_func' + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + server.net_box:call('set_sharding_func', {space_name, fieldno_sharding_func_name, 'non_existent_func'}) + end) + + -- we get no error because we sent request for correct space + local record_exist, err = helpers.update_sharding_func_cache(g.cluster, 'customers_body_func') + t.assert_equals(err, nil) + t.assert_equals(record_exist, true) + + -- cache['customers_G_func'] == nil (space with incorrect func) + -- other records for correct spaces exist in cache + cache_size = helpers.get_sharding_func_cache_size(g.cluster) + t.assert_equals(cache_size, 1) + + -- get data from cache for space with incorrect sharding func + local space_name = 'customers_G_func' + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + server.net_box:call('set_sharding_func', {space_name, fieldno_sharding_func_name, 'non_existent_func'}) + end) + + -- we get an error because we sent request for incorrect space + local record_exist, err = helpers.update_sharding_func_cache(g.cluster, space_name) + t.assert_equals(record_exist, false) + t.assert_str_contains(err.err, + "Wrong sharding function specified in _ddl_sharding_func space for (customers_G_func) space") + + -- cache['customers_G_func'] == nil (space with incorrect func) + -- other records for correct spaces exist in cache + cache_size = helpers.get_sharding_func_cache_size(g.cluster) + t.assert_equals(cache_size, 1) +end diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 14f4ba59..8a874ad0 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -706,6 +706,8 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- records for all spaces exist sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) 
t.assert_equals(sharding_key_as_index_obj, { + customers_G_func = {parts = {{fieldno = 1}}}, + customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, @@ -730,6 +732,8 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- other records for correct spaces exist in cache sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { + customers_G_func = {parts = {{fieldno = 1}}}, + customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, @@ -753,6 +757,8 @@ pgroup.test_update_cache_with_incorrect_key = function(g) -- other records for correct spaces exist in cache sharding_key_as_index_obj = helpers.get_sharding_key_cache(g.cluster) t.assert_equals(sharding_key_as_index_obj, { + customers_G_func = {parts = {{fieldno = 1}}}, + customers_body_func = {parts = {{fieldno = 1}}}, customers_age_key = {parts = {{fieldno = 4}}}, customers_name_age_key_different_indexes = {parts = {{fieldno = 3}, {fieldno = 4}}}, customers_name_age_key_three_fields_index = {parts = {{fieldno = 3}, {fieldno = 4}}}, diff --git a/test/unit/sharding_metadata_test.lua b/test/unit/sharding_metadata_test.lua index b76af630..9c5bfda8 100644 --- a/test/unit/sharding_metadata_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -15,15 +15,31 @@ g.before_each(function() {name = 'space_name', type = 'string', is_nullable = false}, {name = 'sharding_key', type = 'array', is_nullable = false} } - -- Create a space _ddl_sharding_key with a tuple that - -- contains a space name and it's sharding key. + + local sharding_func_format = { + {name = 'space_name', type = 'string', is_nullable = false}, + {name = 'sharding_func_name', type = 'string', is_nullable = true}, + {name = 'sharding_func_body', type = 'string', is_nullable = true}, + } + if type(box.cfg) ~= 'table' then helpers.box_cfg() end + + -- Create a space _ddl_sharding_key with a tuple that + -- contains a space name and it's sharding key. box.schema.space.create('_ddl_sharding_key', { format = sharding_key_format, }) box.space._ddl_sharding_key:create_index('pk') + + -- Create a space _ddl_sharding_func with a tuple that + -- contains a space name and it's sharding func name/body. 
+ box.schema.space.create('_ddl_sharding_func', { + format = sharding_func_format, + }) + box.space._ddl_sharding_func:create_index('pk') + box.schema.space.create('fetch_on_storage') end) @@ -32,6 +48,11 @@ g.after_each(function() if box.space._ddl_sharding_key ~= nil then box.space._ddl_sharding_key:drop() end + + if box.space._ddl_sharding_func ~= nil then + box.space._ddl_sharding_func:drop() + end + box.space.fetch_on_storage:drop() cache.drop_caches() end) @@ -88,20 +109,25 @@ end g.test_fetch_sharding_metadata_on_storage_positive = function() local space_name = 'fetch_on_storage' local sharding_key_def = {'name', 'age'} + local sharding_func_def = 'sharding_func_name' box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) + box.space._ddl_sharding_func:insert({space_name, sharding_func_def}) local metadata_map = sharding_metadata_module.fetch_on_storage() t.assert_equals(metadata_map, { [space_name] = { sharding_key_def = sharding_key_def, + sharding_func_def = sharding_func_def, space_format = {} }, }) end g.test_fetch_sharding_key_on_storage_positive = function() + box.space._ddl_sharding_func:drop() + local space_name = 'fetch_on_storage' local sharding_key_def = {'name', 'age'} box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) @@ -116,9 +142,43 @@ g.test_fetch_sharding_key_on_storage_positive = function() }) end +g.test_fetch_sharding_func_name_on_storage_positive = function() + box.space._ddl_sharding_key:drop() + + local space_name = 'fetch_on_storage' + local sharding_func_def = 'sharding_func_name' + box.space._ddl_sharding_func:insert({space_name, sharding_func_def}) + + local metadata_map = sharding_metadata_module.fetch_on_storage() + + t.assert_equals(metadata_map, { + [space_name] = { + sharding_func_def = sharding_func_def, + }, + }) +end + +g.test_fetch_sharding_func_body_on_storage_positive = function() + box.space._ddl_sharding_key:drop() + + local space_name = 'fetch_on_storage' + local sharding_func_def = 'function(key) return key end' + box.space._ddl_sharding_func:insert({space_name, nil, sharding_func_def}) + + local metadata_map = sharding_metadata_module.fetch_on_storage() + + t.assert_equals(metadata_map, { + [space_name] = { + sharding_func_def = {body = sharding_func_def}, + }, + }) +end + g.test_fetch_sharding_metadata_on_storage_negative = function() - -- Test checks return value when _ddl_sharding_key is absent. + -- Test checks return value when _ddl_sharding_key + -- and _ddl_sharding_func are absent. box.space._ddl_sharding_key:drop() + box.space._ddl_sharding_func:drop() local metadata_map = sharding_metadata_module.fetch_on_storage() t.assert_equals(metadata_map, nil)
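To summarize the two positive tests above: `extract_sharding_func_def` (see the sharding_metadata diff earlier in this patch) prefers the function body over the function name when both are present. A condensed, illustrative sketch of the mapping, using the `_ddl_sharding_func` format created in `before_each`:

```lua
-- _ddl_sharding_func tuple {space_name, sharding_func_name, sharding_func_body}
-- is turned into the following sharding_func_def values (illustrative):
--
--   {'fetch_on_storage', 'sharding_func_name', nil}           -> 'sharding_func_name'
--   {'fetch_on_storage', nil, 'function(key) return key end'} -> {body = 'function(key) return key end'}
--   {'fetch_on_storage', nil, nil}                            -> nil (no sharding function for the space)
```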