diff --git a/.github/workflows/test_on_push.yaml b/.github/workflows/test_on_push.yaml index fa02f2ea..bcf6d348 100644 --- a/.github/workflows/test_on_push.yaml +++ b/.github/workflows/test_on_push.yaml @@ -18,7 +18,7 @@ jobs: include: - tarantool-version: "2.7" remove-merger: true - - tarantool-version: "2.7" + - tarantool-version: "2.8" coveralls: true fail-fast: false runs-on: [ubuntu-latest] diff --git a/CHANGELOG.md b/CHANGELOG.md index f781e309..eb9cad0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +* CRUD operations calculate the bucket id automatically using the sharding + key specified with the DDL schema or in the `_ddl_sharding_key` space. + NOTE: CRUD methods delete(), get() and update() require the sharding key + to be a part of the primary key. + ### Changed ### Fixed diff --git a/README.md b/README.md index 29ae1145..1e5e5bf6 100644 --- a/README.md +++ b/README.md @@ -53,11 +53,60 @@ crud.unflatten_rows(res.rows, res.metadata) **Notes:** * A space should have a format. -* By default, `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`, - where `key` is the primary key value. - Custom bucket ID can be specified as `opts.bucket_id` for each operation. - For operations that accepts tuple/object bucket ID can be specified as - tuple/object field as well as `opts.bucket_id` value. + +**Sharding key and bucket id calculation** + +*Sharding key* is a set of tuple field values used for *bucket ID* calculation. +*Sharding key definition* is a set of tuple field names that describes which +tuple fields should be a part of the sharding key. *Bucket ID* determines which +replicaset stores certain data. The function used for bucket ID calculation is +called the *sharding function*. + +By default CRUD calculates the bucket ID using the primary key and the function +`vshard.router.bucket_id_strcrc32(key)`; this happens automatically and doesn't +require any actions on the user's side. However, for operations that accept a +tuple/object, the bucket ID can be specified as a tuple/object field as well as +via the `opts.bucket_id` value. + +Starting from 0.10.0, users who don't want to use the primary key as a sharding key +may set a custom sharding key definition as a part of the [DDL +schema](https://github.com/tarantool/ddl#input-data-format) or insert it manually +into the `_ddl_sharding_key` space (in both cases, consult the DDL module +documentation). As soon as the sharding key for a certain space is available in +the `_ddl_sharding_key` space, CRUD uses it for bucket ID calculation +automatically. Note that the CRUD methods `delete()`, `get()` and `update()` +require the sharding key to be a part of the primary key.
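For illustration, here is a minimal sketch of a DDL schema that declares `name` as the sharding key. The layout mirrors the `customers_name_key` space from the test entrypoint added in this patch; the space and field names are examples only, not a required layout. Note that the sharding key field is also part of the primary index, so `delete()`, `get()` and `update()` keep working without an explicit `bucket_id`.

```lua
-- A sketch only: space/field names are illustrative and mirror the test
-- entrypoint from this patch; adapt them to your own schema.
local ddl = require('ddl')

local schema = {
    spaces = {
        customers_name_key = {
            engine = 'memtx',
            is_local = true,
            temporary = false,
            format = {
                {name = 'id', is_nullable = false, type = 'unsigned'},
                {name = 'bucket_id', is_nullable = false, type = 'unsigned'},
                {name = 'name', is_nullable = false, type = 'string'},
                {name = 'age', is_nullable = false, type = 'number'},
            },
            indexes = {
                -- The sharding key field ('name') is a part of the primary
                -- index, as required by delete()/get()/update().
                {name = 'id', type = 'TREE', unique = true, parts = {
                    {path = 'id', is_nullable = false, type = 'unsigned'},
                    {path = 'name', is_nullable = false, type = 'string'},
                }},
                {name = 'bucket_id', type = 'TREE', unique = false, parts = {
                    {path = 'bucket_id', is_nullable = false, type = 'unsigned'},
                }},
            },
            -- Fields listed here are used for bucket ID calculation
            -- instead of the primary key.
            sharding_key = {'name'},
        },
    },
}

-- Apply the schema; in the test entrypoint this is done from the storage
-- role's init() on a writable instance.
local ok, err = ddl.set_schema(schema)
if not ok then
    error(err)
end
```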
+ +The table below describes which operations support a custom sharding key: + +| CRUD method | Sharding key support | +| -------------------------------- | -------------------------- | +| `get()` | Yes | +| `insert()` / `insert_object()` | Yes | +| `delete()` | Yes | +| `replace()` / `replace_object()` | Yes | +| `upsert()` / `upsert_object()` | Yes | +| `select()` / `pairs()` | Yes | +| `update()` | Yes | +| `min()` / `max()` | No (not required) | +| `cut_rows()` / `cut_objects()` | No (not required) | +| `truncate()` | No (not required) | +| `len()` | No (not required) | + +Current limitations for using a custom sharding key: + +- It's not possible to update sharding keys automatically when the schema is + updated on storages, see + [#212](https://github.com/tarantool/crud/issues/212). However, it is possible + to do it manually with `require('crud.common.sharding_key').update_cache()`. +- CRUD select may lead to map-reduce in some cases, see + [#213](https://github.com/tarantool/crud/issues/213). +- No support for JSON paths in a sharding key, see + [#219](https://github.com/tarantool/crud/issues/219). +- `primary_index_fieldno_map` is not cached, see + [#243](https://github.com/tarantool/crud/issues/243). ### Insert diff --git a/crud.lua b/crud.lua index 751ddd59..2777013e 100644 --- a/crud.lua +++ b/crud.lua @@ -12,6 +12,7 @@ local select = require('crud.select') local truncate = require('crud.truncate') local len = require('crud.len') local borders = require('crud.borders') +local sharding_key = require('crud.common.sharding_key') local utils = require('crud.common.utils') local crud = {} @@ -113,6 +114,7 @@ function crud.init_storage() truncate.init() len.init() borders.init() + sharding_key.init() end function crud.init_router() diff --git a/crud/common/call.lua b/crud/common/call.lua index c5b94c98..65fc5e98 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -145,4 +145,31 @@ function call.single(bucket_id, func_name, func_args, opts) return res end +function call.any(func_name, func_args, opts) + dev_checks('string', '?table', { + timeout = '?number', + }) + + local timeout = opts.timeout or call.DEFAULT_VSHARD_CALL_TIMEOUT + + local replicasets, err = vshard.router.routeall() + if replicasets == nil then + return nil, CallError:new("Failed to get all replicasets: %s", err.err) + end + local replicaset = select(2, next(replicasets)) + + local res, err = replicaset:call(func_name, func_args, { + timeout = timeout, + }) + if err ~= nil then + return nil, wrap_vshard_err(err, func_name, replicaset.uuid) + end + + if res == box.NULL then + return nil + end + + return res +end + return call diff --git a/crud/common/const.lua b/crud/common/const.lua index 269525fe..e546dddc 100644 --- a/crud/common/const.lua +++ b/crud/common/const.lua @@ -2,5 +2,6 @@ local const = {} const.RELOAD_RETRIES_NUM = 1 const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds +const.FETCH_SHARDING_KEY_TIMEOUT = 3 -- 3 seconds return const diff --git a/crud/common/sharding.lua b/crud/common/sharding.lua index 3cb76d6f..c7802d2e 100644 --- a/crud/common/sharding.lua +++ b/crud/common/sharding.lua @@ -4,6 +4,7 @@ local errors = require('errors') local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false}) local utils = require('crud.common.utils') +local sharding_key_module = require('crud.common.sharding_key') local sharding = {} @@ -20,7 +21,16 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) return
specified_bucket_id end - local key = utils.extract_key(tuple, space.index[0].parts) + local sharding_index_parts = space.index[0].parts + local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name) + if err ~= nil then + return nil, err + end + if sharding_key_as_index_obj ~= nil then + sharding_index_parts = sharding_key_as_index_obj.parts + end + local key = utils.extract_key(tuple, sharding_index_parts) + return sharding.key_get_bucket_id(key) end @@ -43,11 +53,15 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_ end end - if tuple[bucket_id_fieldno] == nil then - tuple[bucket_id_fieldno] = sharding.tuple_get_bucket_id(tuple, space) + local bucket_id = tuple[bucket_id_fieldno] + if bucket_id == nil then + bucket_id, err = sharding.tuple_get_bucket_id(tuple, space) + if err ~= nil then + return nil, err + end + tuple[bucket_id_fieldno] = bucket_id end - local bucket_id = tuple[bucket_id_fieldno] return bucket_id end diff --git a/crud/common/sharding_key.lua b/crud/common/sharding_key.lua new file mode 100644 index 00000000..8a1cef62 --- /dev/null +++ b/crud/common/sharding_key.lua @@ -0,0 +1,242 @@ +local fiber = require('fiber') +local errors = require('errors') + +local call = require('crud.common.call') +local const = require('crud.common.const') +local dev_checks = require('crud.common.dev_checks') +local cache = require('crud.common.sharding_key_cache') +local utils = require('crud.common.utils') + +local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false}) +local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false}) +local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false}) + +local FETCH_FUNC_NAME = '_crud.fetch_on_storage' + +local sharding_key_module = {} + +-- Function decorator that is used to prevent _fetch_on_router() from being +-- called concurrently by different fibers. +local function locked(f) + dev_checks('function') + + return function(timeout, ...) + local timeout_deadline = fiber.clock() + timeout + local ok = cache.fetch_lock:put(true, timeout) + -- channel:put() returns false in two cases: when timeout is exceeded + -- or channel has been closed. However error message describes only + -- first reason, I'm not sure we need to disclose to users such details + -- like problems with synchronization objects. + if not ok then + return FetchShardingKeyError:new( + "Timeout for fetching sharding key is exceeded") + end + local timeout = timeout_deadline - fiber.clock() + local status, err = pcall(f, timeout, ...) + cache.fetch_lock:get() + if not status or err ~= nil then + return err + end + end +end + +-- Build a structure similar to index, but it is not a real index object, +-- it contains only parts key with fieldno's. +local function as_index_object(space_name, space_format, sharding_key_def) + dev_checks('string', 'table', 'table') + + local fieldnos = {} + local fieldno_map = utils.get_format_fieldno_map(space_format) + for _, field_name in ipairs(sharding_key_def) do + local fieldno = fieldno_map[field_name] + if fieldno == nil then + return nil, WrongShardingConfigurationError:new( + "No such field (%s) in a space format (%s)", field_name, space_name) + end + table.insert(fieldnos, {fieldno = fieldno}) + end + + return {parts = fieldnos} +end + +-- Return a map with metadata or nil when space box.space._ddl_sharding_key is +-- not available on storage. 
+function sharding_key_module.fetch_on_storage() + local sharding_key_space = box.space._ddl_sharding_key + if sharding_key_space == nil then + return nil + end + + local SPACE_NAME_FIELDNO = 1 + local SPACE_SHARDING_KEY_FIELDNO = 2 + local metadata_map = {} + for _, tuple in sharding_key_space:pairs() do + local space_name = tuple[SPACE_NAME_FIELDNO] + local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO] + local space_format = box.space[space_name]:format() + metadata_map[space_name] = { + sharding_key_def = sharding_key_def, + space_format = space_format, + } + end + + return metadata_map +end + +-- Under high load more than one fiber may fetch metadata from storages at +-- once, which is bad from a performance point of view. locked() wraps +-- _fetch_on_router() so that only a single fiber fetches the sharding +-- metadata at a time; other fibers wait until cache.fetch_lock becomes +-- unlocked within the timeout passed to +-- _fetch_on_router(). +local _fetch_on_router = locked(function(timeout) + dev_checks('number') + + if cache.sharding_key_as_index_obj_map ~= nil then + return + end + + local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, { + timeout = timeout + }) + if err ~= nil then + return err + end + if metadata_map == nil then + cache.sharding_key_as_index_obj_map = {} + return + end + + cache.sharding_key_as_index_obj_map = {} + for space_name, metadata in pairs(metadata_map) do + local sharding_key_as_index_obj, err = as_index_object(space_name, + metadata.space_format, + metadata.sharding_key_def) + if err ~= nil then + return err + end + cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj + end +end) + +-- Get the sharding index for a certain space. +-- +-- Return: +-- - sharding key as an index object, when a sharding key definition was found +-- on storage. +-- - nil, when no sharding key definition was found on storage. Pay attention +-- that nil without an error is a successful return value. +-- - nil and error, when something went wrong on the fetching attempt. +-- +function sharding_key_module.fetch_on_router(space_name, timeout) + dev_checks('string', '?number') + + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + + local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT + local err = _fetch_on_router(timeout) + if err ~= nil then + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + return nil, err + end + + if cache.sharding_key_as_index_obj_map ~= nil then + return cache.sharding_key_as_index_obj_map[space_name] + end + + return nil, FetchShardingKeyError:new( + "Fetching sharding key for space '%s' has failed", space_name) +end + +function sharding_key_module.update_cache(space_name) + cache.drop_caches() + return sharding_key_module.fetch_on_router(space_name) +end + +-- Make sure the sharding key definition is a part of the primary key.
+local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) + dev_checks('string', 'table', 'table') + + if cache.is_part_of_pk[space_name] ~= nil then + return cache.is_part_of_pk[space_name] + end + + local is_part_of_pk = true + local pk_fieldno_map = utils.get_index_fieldno_map(primary_index_parts) + for _, part in ipairs(sharding_key_as_index_obj.parts) do + if pk_fieldno_map[part.fieldno] == nil then + is_part_of_pk = false + break + end + end + cache.is_part_of_pk[space_name] = is_part_of_pk + + return is_part_of_pk +end + +-- Build an array with sharding key values. Function extracts those values from +-- primary key that are part of sharding key (passed as index object). +local function extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) + dev_checks('table', 'table', 'table') + + -- TODO: extract_from_index() calculates primary_index_parts on each + -- request. It is better to cache it's value. + -- https://github.com/tarantool/crud/issues/243 + local primary_index_fieldno_map = utils.get_index_fieldno_map(primary_index_parts) + + local sharding_key = {} + for _, part in ipairs(sharding_key_as_index_obj.parts) do + -- part_number cannot be nil because earlier we checked that tuple + -- field names defined in sharding key definition are part of primary + -- key. + local part_number = primary_index_fieldno_map[part.fieldno] + assert(part_number ~= nil) + local field_value = primary_key[part_number] + table.insert(sharding_key, field_value) + end + + return sharding_key +end + +-- Extract sharding key from pk. +-- Returns a table with sharding key or pair of nil and error. +function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout) + dev_checks('string', 'table', '?', '?number') + + local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name, timeout) + if err ~= nil then + return nil, err + end + if sharding_key_as_index_obj == nil then + return primary_key + end + + local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) + if res == false then + return nil, ShardingKeyError:new( + "Sharding key for space %q is missed in primary index, specify bucket_id", + space_name + ) + end + if type(primary_key) ~= 'table' then + primary_key = {primary_key} + end + + return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) +end + +function sharding_key_module.init() + _G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage +end + +sharding_key_module.internal = { + as_index_object = as_index_object, + extract_from_index = extract_from_index, + is_part_of_pk = is_part_of_pk, +} + +return sharding_key_module diff --git a/crud/common/sharding_key_cache.lua b/crud/common/sharding_key_cache.lua new file mode 100644 index 00000000..a1ab3965 --- /dev/null +++ b/crud/common/sharding_key_cache.lua @@ -0,0 +1,18 @@ +local fiber = require('fiber') + +local sharding_key_cache = {} + +sharding_key_cache.sharding_key_as_index_obj_map = nil +sharding_key_cache.fetch_lock = fiber.channel(1) +sharding_key_cache.is_part_of_pk = {} + +function sharding_key_cache.drop_caches() + sharding_key_cache.sharding_key_as_index_obj_map = nil + if sharding_key_cache.fetch_lock ~= nil then + sharding_key_cache.fetch_lock:close() + end + sharding_key_cache.fetch_lock = fiber.channel(1) + sharding_key_cache.is_part_of_pk = {} +end + +return sharding_key_cache diff --git a/crud/common/utils.lua b/crud/common/utils.lua index 
ac3688aa..d7a62941 100644 --- a/crud/common/utils.lua +++ b/crud/common/utils.lua @@ -421,6 +421,33 @@ function utils.get_bucket_id_fieldno(space, shard_index_name) return bucket_id_index.parts[1].fieldno end +-- Build a map with field number as a keys and part number +-- as a values using index parts as a source. +function utils.get_index_fieldno_map(index_parts) + dev_checks('table') + + local fieldno_map = {} + for i, part in ipairs(index_parts) do + local fieldno = part.fieldno + fieldno_map[fieldno] = i + end + + return fieldno_map +end + +-- Build a map with field names as a keys and fieldno's +-- as a values using space format as a source. +function utils.get_format_fieldno_map(space_format) + dev_checks('table') + + local fieldno_map = {} + for fieldno, field_format in ipairs(space_format) do + fieldno_map[field_format.name] = fieldno + end + + return fieldno_map +end + local uuid_t = ffi.typeof('struct tt_uuid') function utils.is_uuid(value) return ffi.istype(uuid_t, value) diff --git a/crud/compare/conditions.lua b/crud/compare/conditions.lua index 3d963650..25faa6d8 100644 --- a/crud/compare/conditions.lua +++ b/crud/compare/conditions.lua @@ -113,7 +113,7 @@ function conditions.parse(user_conditions) local cond_func = funcs_by_symbols[operator_symbol] if cond_func == nil then return nil, ParseConditionError:new( - "condition[1] %q isn't a valid condition oprator, (condition %s)", operator_symbol, i + "condition[1] %q isn't a valid condition operator, (condition %s)", operator_symbol, i ) end diff --git a/crud/delete.lua b/crud/delete.lua index 9f0497fe..deca7318 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -5,6 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') +local sharding_key_module = require('crud.common.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -55,7 +56,17 @@ local function call_delete_on_router(space_name, key, opts) key = key:totable() end - local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id) + local sharding_key = key + if opts.bucket_id == nil then + local err + local primary_index_parts = space.index[0].parts + sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + if err ~= nil then + return nil, err + end + end + + local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) local call_opts = { mode = 'write', timeout = opts.timeout, diff --git a/crud/get.lua b/crud/get.lua index f00957e5..7f474e85 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -5,6 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') +local sharding_key_module = require('crud.common.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -58,7 +59,17 @@ local function call_get_on_router(space_name, key, opts) key = key:totable() end - local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id) + local sharding_key = key + if opts.bucket_id == nil then + local err + local primary_index_parts = space.index[0].parts + sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + if err ~= nil then + return nil, err + end + end + + local bucket_id = 
sharding.key_get_bucket_id(sharding_key, opts.bucket_id) local call_opts = { mode = opts.mode or 'read', prefer_replica = opts.prefer_replica, diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 8df9655c..c703615c 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -7,6 +7,7 @@ local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local common = require('crud.select.compat.common') local schema = require('crud.common.schema') +local sharding_key_module = require('crud.common.sharding_key') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.select.plan') @@ -50,12 +51,17 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() + local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name) + if err ~= nil then + return nil, err + end -- plan select local plan, err = select_plan.new(space, conditions, { first = opts.first, after_tuple = opts.after, field_names = opts.field_names, + sharding_key_as_index_obj = sharding_key_as_index_obj, }) if err ~= nil then diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 84ae06a4..33406edf 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -8,6 +8,7 @@ local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') +local sharding_key_module = require('crud.common.sharding_key') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.select.plan') @@ -102,6 +103,10 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() + local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name) + if err ~= nil then + return nil, err + end -- plan select local plan, err = select_plan.new(space, conditions, { @@ -109,6 +114,7 @@ local function build_select_iterator(space_name, user_conditions, opts) after_tuple = opts.after, field_names = opts.field_names, force_map_call = opts.force_map_call, + sharding_key_as_index_obj = sharding_key_as_index_obj, }) if err ~= nil then diff --git a/crud/select/plan.lua b/crud/select/plan.lua index cc46f3b8..260ed207 100644 --- a/crud/select/plan.lua +++ b/crud/select/plan.lua @@ -126,6 +126,7 @@ function select_plan.new(space, conditions, opts) after_tuple = '?table|cdata', field_names = '?table', force_map_call = '?boolean', + sharding_key_as_index_obj = '?table', }) conditions = conditions ~= nil and conditions or {} @@ -226,7 +227,7 @@ function select_plan.new(space, conditions, opts) end end - local sharding_index = primary_index -- XXX: only sharding by primary key is supported + local sharding_index = opts.sharding_key_as_index_obj or primary_index -- get sharding key value local sharding_key @@ -234,11 +235,6 @@ function select_plan.new(space, conditions, opts) sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index) end - if sharding_key ~= nil and opts.force_map_call ~= true then - total_tuples_count = 1 - scan_iter = box.index.REQ - end - local plan = { conditions = conditions, space_name = 
space_name, diff --git a/crud/update.lua b/crud/update.lua index 3c551310..8e7b1aa0 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -5,6 +5,7 @@ local vshard = require('vshard') local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') +local sharding_key_module = require('crud.common.sharding_key') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -83,6 +84,16 @@ local function call_update_on_router(space_name, key, user_operations, opts) key = key:totable() end + local sharding_key = key + if opts.bucket_id == nil then + local err + local primary_index_parts = space.index[0].parts + sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + if err ~= nil then + return nil, err + end + end + local operations = user_operations if not utils.tarantool_supports_fieldpaths() then operations, err = utils.convert_operations(user_operations, space_format) @@ -91,7 +102,7 @@ local function call_update_on_router(space_name, key, user_operations, opts) end end - local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id) + local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id) local call_opts = { mode = 'write', timeout = opts.timeout, diff --git a/test/entrypoint/srv_ddl.lua b/test/entrypoint/srv_ddl.lua new file mode 100755 index 00000000..61c35da0 --- /dev/null +++ b/test/entrypoint/srv_ddl.lua @@ -0,0 +1,144 @@ +#!/usr/bin/env tarantool + +require('strict').on() +_G.is_initialized = function() return false end + +local log = require('log') +local errors = require('errors') +local cartridge = require('cartridge') +local ddl = require('ddl') + +package.preload['customers-storage'] = function() + return { + role_name = 'customers-storage', + init = function() + local engine = os.getenv('ENGINE') or 'memtx' + local customers_schema = { + engine = engine, + is_local = true, + temporary = false, + format = { + {name = 'id', is_nullable = false, type = 'unsigned'}, + {name = 'bucket_id', is_nullable = false, type = 'unsigned'}, + {name = 'name', is_nullable = false, type = 'string'}, + {name = 'age', is_nullable = false, type = 'number'}, + }, + indexes = { + -- This table is intentionally blank. 
+ }, + } + + local primary_index = { + name = 'id', + type = 'TREE', + unique = true, + parts = { + {path = 'id', is_nullable = false, type = 'unsigned'}, + {path = 'name', is_nullable = false, type = 'string'}, + }, + } + local primary_index_id = { + name = 'id', + type = 'TREE', + unique = true, + parts = { + {path = 'id', is_nullable = false, type = 'unsigned'}, + }, + } + local bucket_id_index = { + name = 'bucket_id', + type = 'TREE', + unique = false, + parts = { + {path = 'bucket_id', is_nullable = false, type = 'unsigned'}, + } + } + local name_index = { + name = 'name', + type = 'TREE', + unique = true, + parts = { + {path = 'name', is_nullable = false, type = 'string'}, + }, + } + local secondary_index = { + name = 'secondary', + type = 'TREE', + unique = false, + parts = { + {path = 'id', is_nullable = false, type = 'unsigned'}, + {path = 'name', is_nullable = false, type = 'string'}, + }, + } + + local customers_name_key_schema = table.deepcopy(customers_schema) + customers_name_key_schema.sharding_key = {'name'} + table.insert(customers_name_key_schema.indexes, primary_index) + table.insert(customers_name_key_schema.indexes, bucket_id_index) + + local customers_name_key_uniq_index_schema = table.deepcopy(customers_schema) + customers_name_key_uniq_index_schema.sharding_key = {'name'} + table.insert(customers_name_key_uniq_index_schema.indexes, primary_index) + table.insert(customers_name_key_uniq_index_schema.indexes, bucket_id_index) + table.insert(customers_name_key_uniq_index_schema.indexes, name_index) + + local customers_name_key_non_uniq_index_schema = table.deepcopy(customers_schema) + customers_name_key_non_uniq_index_schema.sharding_key = {'name'} + name_index.unique = false + table.insert(customers_name_key_non_uniq_index_schema.indexes, primary_index) + table.insert(customers_name_key_non_uniq_index_schema.indexes, bucket_id_index) + table.insert(customers_name_key_non_uniq_index_schema.indexes, name_index) + + local customers_secondary_idx_name_key_schema = table.deepcopy(customers_schema) + customers_secondary_idx_name_key_schema.sharding_key = {'name'} + table.insert(customers_secondary_idx_name_key_schema.indexes, primary_index_id) + table.insert(customers_secondary_idx_name_key_schema.indexes, secondary_index) + table.insert(customers_secondary_idx_name_key_schema.indexes, bucket_id_index) + + local customers_age_key_schema = table.deepcopy(customers_schema) + customers_age_key_schema.sharding_key = {'age'} + table.insert(customers_age_key_schema.indexes, primary_index) + table.insert(customers_age_key_schema.indexes, bucket_id_index) + + local schema = { + spaces = { + customers_name_key = customers_name_key_schema, + customers_name_key_uniq_index = customers_name_key_uniq_index_schema, + customers_name_key_non_uniq_index = customers_name_key_non_uniq_index_schema, + customers_secondary_idx_name_key = customers_secondary_idx_name_key_schema, + customers_age_key = customers_age_key_schema, + } + } + + if not box.info.ro then + local ok, err = ddl.set_schema(schema) + if not ok then + error(err) + end + end + + rawset(_G, 'set_sharding_key', function(space_name, sharding_key_def) + local fieldno_sharding_key = 2 + box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}}) + end) + end, + } +end + +local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, { + advertise_uri = 'localhost:3301', + http_port = 8081, + bucket_count = 3000, + roles = { + 'customers-storage', + 'cartridge.roles.crud-router', + 
'cartridge.roles.crud-storage', + }, +}) + +if not ok then + log.error('%s', err) + os.exit(1) +end + +_G.is_initialized = cartridge.is_healthy diff --git a/test/helper.lua b/test/helper.lua index 64f6b48f..913aa56c 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -323,4 +323,13 @@ function helpers.tarantool_version_at_least(wanted_major, wanted_minor, return true end +function helpers.update_cache(cluster, space_name) + return cluster.main_server.net_box:eval([[ + local sharding_key = require('crud.common.sharding_key') + + local space_name = ... + return sharding_key.update_cache(space_name) + ]], {space_name}) +end + return helpers diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua new file mode 100644 index 00000000..3bdaf67b --- /dev/null +++ b/test/integration/ddl_sharding_key_test.lua @@ -0,0 +1,612 @@ +local fio = require('fio') +local crud = require('crud') +local t = require('luatest') + +local helpers = require('test.helper') +local storage_stat = require('test.helpers.storage_stat') + +local ok = pcall(require, 'ddl') +if not ok then + t.skip('Lua module ddl is required to run test') +end + +local pgroup = t.group('ddl_sharding_key', { + {engine = 'memtx'}, + {engine = 'vinyl'}, +}) + +pgroup.before_all(function(g) + g.cluster = helpers.Cluster:new({ + datadir = fio.tempdir(), + server_command = helpers.entrypoint('srv_ddl'), + use_vshard = true, + replicasets = helpers.get_test_replicasets(), + env = { + ['ENGINE'] = g.params.engine, + }, + }) + g.cluster:start() + local result, err = g.cluster.main_server.net_box:eval([[ + local ddl = require('ddl') + + local ok, err = ddl.get_schema() + return ok, err + ]]) + t.assert_equals(type(result), 'table') + t.assert_equals(err, nil) + + helpers.call_on_storages(g.cluster, function(server) + server.net_box:eval([[ + local storage_stat = require('test.helpers.storage_stat') + storage_stat.init_on_storage() + ]]) + end) +end) + +pgroup.after_all(function(g) helpers.stop_cluster(g.cluster) end) + +pgroup.before_each(function(g) + helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key') + helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_uniq_index') + helpers.truncate_space_on_cluster(g.cluster, 'customers_name_key_non_uniq_index') + helpers.truncate_space_on_cluster(g.cluster, 'customers_secondary_idx_name_key') + helpers.truncate_space_on_cluster(g.cluster, 'customers_age_key') +end) + +pgroup.test_insert_object = function(g) + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert_object', {'customers_name_key', {id = 1, name = 'Augustus', age = 48}}) + t.assert_equals(err, nil) + + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_equals(objects, {{id = 1, bucket_id = 782, name = 'Augustus', age = 48}}) + + local conn_s1 = g.cluster:server('s1-master').net_box + -- There is no tuple on s1 that we inserted before using crud.insert_object(). + local result = conn_s1.space['customers_name_key']:get({1, 'Augustus'}) + t.assert_equals(result, nil) + + local conn_s2 = g.cluster:server('s2-master').net_box + -- There is a tuple on s2 that we inserted before using crud.insert_object(). 
+ local result = conn_s2.space['customers_name_key']:get({1, 'Augustus'}) + t.assert_equals(result, {1, 782, 'Augustus', 48}) + +end + +pgroup.test_insert = function(g) + -- Insert a tuple. + local result, err = g.cluster.main_server.net_box:call( + 'crud.insert', {'customers_name_key', {2, box.NULL, 'Ivan', 20}}) + t.assert_equals(err, nil) + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {2, 1366, 'Ivan', 20}) + + -- There is a tuple on s2 that we inserted before using crud.insert(). + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:get({2, 'Ivan'}) + t.assert_equals(result, {2, 1366, 'Ivan', 20}) + + -- There is no tuple on s1 that we inserted before using crud.insert(). + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_name_key']:get({2, 'Ivan'}) + t.assert_equals(result, nil) +end + +pgroup.test_replace = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {7, 596, 'Dimitrion', 20} + + -- Put tuple to s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + local tuple = {7, box.NULL, 'Augustus', 21} + + -- Replace a tuple. + local result, err = g.cluster.main_server.net_box:call('crud.replace', { + 'customers_name_key', tuple + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {7, 782, 'Augustus', 21}) + + -- There is no replaced tuple on s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_name_key']:get({7, 'Augustus'}) + t.assert_equals(result, nil) + + -- There is replaced tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:get({7, 'Augustus'}) + t.assert_equals(result, {7, 782, 'Augustus', 21}) +end + +pgroup.test_replace_object = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {8, 596, 'Dimitrion', 20} + + -- Put tuple to s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Replace an object. + local result, err = g.cluster.main_server.net_box:call( + 'crud.replace_object', {'customers_name_key', {id = 8, name = 'John Doe', age = 25}}) + t.assert_equals(err, nil) + local objects = crud.unflatten_rows(result.rows, result.metadata) + t.assert_equals(objects, {{id = 8, bucket_id = 1035, name = 'John Doe', age = 25}}) + + -- There is no replaced tuple on s1 replicaset. 
+ local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_name_key']:get({8, 'John Doe'}) + t.assert_equals(result, nil) + + -- There is replaced tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:get({8, 'John Doe'}) + t.assert_equals(result, {8, 1035, 'John Doe', 25}) +end + +pgroup.test_upsert_object = function(g) + -- Upsert an object first time. + local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', {'customers_name_key', {id = 66, name = 'Jack Sparrow', age = 25}, { + {'+', 'age', 25}, + }}) + t.assert_equals(#result.rows, 0) + t.assert_equals(result.metadata, { + {is_nullable = false, name = 'id', type = 'unsigned'}, + {is_nullable = false, name = 'bucket_id', type = 'unsigned'}, + {is_nullable = false, name = 'name', type = 'string'}, + {is_nullable = false, name = 'age', type = 'number'}, + }) + t.assert_equals(err, nil) + + -- There is a tuple on s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_name_key']:get({66, 'Jack Sparrow'}) + t.assert_equals(result, {66, 2719, 'Jack Sparrow', 25}) + + -- There is no tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:get({66, 'Jack Sparrow'}) + t.assert_equals(result, nil) + + -- Upsert the same query second time when tuple exists. + local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', {'customers_name_key', {id = 66, name = 'Jack Sparrow', age = 25}, { + {'+', 'age', 25}, + }}) + t.assert_equals(#result.rows, 0) + t.assert_equals(err, nil) + + -- There is an updated tuple on s1 replicaset. + local result = conn_s1.space['customers_name_key']:get({66, 'Jack Sparrow'}) + t.assert_equals(result, {66, 2719, 'Jack Sparrow', 50}) + + -- There is no tuple on s2 replicaset. + local result = conn_s2.space['customers_name_key']:get({66, 'Jack Sparrow'}) + t.assert_equals(result, nil) +end + +pgroup.test_upsert = function(g) + local tuple = {1, box.NULL, 'John', 25} + + -- Upsert an object first time. + local result, err = g.cluster.main_server.net_box:call('crud.upsert', { + 'customers_name_key', tuple, {} + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 0) + + -- There is a tuple on s1 replicaset. + local conn_s1 = g.cluster:server('s1-master').net_box + local result = conn_s1.space['customers_name_key']:get({1, 'John'}) + t.assert_equals(result, {1, 2699, 'John', 25}) + + -- There is no tuple on s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:get({1, 'John'}) + t.assert_equals(result, nil) + + -- Upsert the same query second time when tuple exists. + local result, err = g.cluster.main_server.net_box:call( + 'crud.upsert_object', {'customers_name_key', {id = 1, name = 'John', age = 25}, { + {'+', 'age', 25}, + }}) + t.assert_equals(#result.rows, 0) + t.assert_equals(err, nil) + + -- There is an updated tuple on s1 replicaset. + local result = conn_s1.space['customers_name_key']:get({1, 'John'}) + t.assert_equals(result, {1, 2699, 'John', 50}) + + -- There is no tuple on s2 replicaset. 
+ local result = conn_s2.space['customers_name_key']:get({1, 'John'}) + t.assert_equals(result, nil) +end + +-- The main purpose of testcase is to verify that CRUD will calculate bucket_id +-- using secondary sharding key (name) correctly and will get tuple on storage +-- in replicaset s2. +-- bucket_id was calculated using function below: +-- function(key) +-- return require('vshard.hash').strcrc32(key) % 3000 + 1 +-- end +-- where 3000 is a default number of buckets used in vshard. +pgroup.test_select = function(g) + -- bucket_id is 234, storage is s-2 + local tuple = {8, 234, 'Ptolemy', 20} + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + local conditions = {{'==', 'name', 'Ptolemy'}} + local result, err = g.cluster.main_server.net_box:call('crud.select', { + 'customers_name_key', conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], tuple) +end + +-- TODO: After enabling support of sharding keys that are not equal to primary +-- keys, we should handle it differently: it is not enough to look just on scan +-- value, we should traverse all conditions. Now missed cases lead to +-- map-reduce. Will be resolved in #213. +pgroup.test_select_wont_lead_map_reduce = function(g) + local space_name = 'customers_name_key_uniq_index' + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- bucket_id is 477, storage is s-2 + local result = conn_s2.space[space_name]:insert({1, 477, 'Viktor Pelevin', 58}) + t.assert_not_equals(result, nil) + -- bucket_id is 401, storage is s-1 + local result = conn_s1.space[space_name]:insert({2, 401, 'Isaac Asimov', 72}) + t.assert_not_equals(result, nil) + -- bucket_id is 2804, storage is s-2 + local result = conn_s2.space[space_name]:insert({3, 2804, 'Aleksandr Solzhenitsyn', 89}) + t.assert_not_equals(result, nil) + -- bucket_id is 1161, storage is s-2 + local result = conn_s2.space[space_name]:insert({4, 1161, 'James Joyce', 59}) + t.assert_not_equals(result, nil) + + local stat_a = storage_stat.collect(g.cluster) + + -- Select a tuple with name 'Viktor Pelevin'. + local result, err = g.cluster.main_server.net_box:call('crud.select', { + space_name, {{'==', 'name', 'Viktor Pelevin'}} + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local stat_b = storage_stat.collect(g.cluster) + + -- Check a number of select() requests made by CRUD on cluster's storages + -- after calling select() on a router. Make sure only a single storage has + -- a single select() request. Otherwise we lead map-reduce. 
+ t.assert_equals(storage_stat.diff(stat_b, stat_a), { + ['s-1'] = { + select_requests = 0, + }, + ['s-2'] = { + select_requests = 1, + }, + }) +end + +pgroup.test_select_secondary_idx = function(g) + local tuple = {2, box.NULL, 'Ivan', 20} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_secondary_idx_name_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local conditions = {{'==', 'name', 'Ivan'}} + + local result, err = g.cluster.main_server.net_box:call('crud.select', { + 'customers_secondary_idx_name_key', conditions, + }) + + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows[1], {2, 1366, 'Ivan', 20}) +end + +pgroup.test_select_non_unique_index = function(g) + local space_name = 'customers_name_key_non_uniq_index' + local customers = helpers.insert_objects(g, space_name, { + {id = 1, name = 'Viktor Pelevin', age = 58}, + {id = 2, name = 'Isaac Asimov', age = 72}, + {id = 3, name = 'Aleksandr Solzhenitsyn', age = 89}, + {id = 4, name = 'James Joyce', age = 59}, + {id = 5, name = 'Oscar Wilde', age = 46}, + -- First tuple with name 'Ivan Bunin'. + {id = 6, name = 'Ivan Bunin', age = 83}, + {id = 7, name = 'Ivan Turgenev', age = 64}, + {id = 8, name = 'Alexander Ostrovsky', age = 63}, + {id = 9, name = 'Anton Chekhov', age = 44}, + -- Second tuple with name 'Ivan Bunin'. + {id = 10, name = 'Ivan Bunin', age = 83}, + }) + t.assert_equals(#customers, 10) + + local result, err = g.cluster.main_server.net_box:call('crud.select', { + space_name, {{'==', 'name', 'Ivan Bunin'}} + }) + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 2) +end + +pgroup.test_update = function(g) + -- bucket_id is 1366, storage is s-2 + local tuple = {2, 1366, 'Ivan', 10} + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- Put tuple with to s1 replicaset. + local result = conn_s1.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple with to s2 replicaset. + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Update a tuple. + local update_operations = { + {'+', 'age', 10}, + } + local result, err = g.cluster.main_server.net_box:call('crud.update', { + 'customers_name_key', {2, 'Ivan'}, update_operations, + }) + t.assert_equals(err, nil) + t.assert_equals(#result.rows, 1) + t.assert_equals(result.rows, {{2, 1366, 'Ivan', 20}}) + + -- Tuple on s1 replicaset was not updated. + local result = conn_s1.space['customers_name_key']:get({2, 'Ivan'}) + t.assert_equals(result, {2, 1366, 'Ivan', 10}) + + -- Tuple on s2 replicaset was updated. + local result = conn_s2.space['customers_name_key']:get({2, 'Ivan'}) + t.assert_equals(result, {2, 1366, 'Ivan', 20}) +end + +pgroup.test_get = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {7, 596, 'Dimitrion', 20} + + -- Put tuple to s2 replicaset. + local conn_s2 = g.cluster:server('s2-master').net_box + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Get a tuple. 
+ local result, err = g.cluster.main_server.net_box:call('crud.get', { + 'customers_name_key', {7, 'Dimitrion'}, + }) + t.assert_equals(err, nil) + t.assert_equals(result.rows, {{7, 596, 'Dimitrion', 20}}) +end + +pgroup.test_delete = function(g) + -- bucket_id is 596, storage is s-2 + local tuple = {7, 596, 'Dimitrion', 20} + + local conn_s1 = g.cluster:server('s1-master').net_box + local conn_s2 = g.cluster:server('s2-master').net_box + + -- Put tuple to s1 replicaset. + local result = conn_s1.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Put tuple to s2 replicaset. + local result = conn_s2.space['customers_name_key']:insert(tuple) + t.assert_not_equals(result, nil) + + -- Delete tuple. + local _, err = g.cluster.main_server.net_box:call('crud.delete', { + 'customers_name_key', {7, 'Dimitrion'}, + }) + t.assert_equals(err, nil) + + -- There is a tuple on s1 replicaset. + local result = conn_s1.space['customers_name_key']:get({7, 'Dimitrion'}) + t.assert_equals(result, {7, 596, 'Dimitrion', 20}) + + -- There is no tuple on s2 replicaset. + local result = conn_s2.space['customers_name_key']:get({7, 'Dimitrion'}) + t.assert_equals(result, nil) +end + +pgroup.test_delete_incomplete_sharding_key = function(g) + local tuple = {2, box.NULL, 'Viktor Pelevin', 58} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_age_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local result, err = g.cluster.main_server.net_box:call('crud.delete', { + 'customers_age_key', {58, 'Viktor Pelevin'} + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_age_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_get_incomplete_sharding_key = function(g) + local tuple = {2, box.NULL, 'Viktor Pelevin', 58} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_age_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local result, err = g.cluster.main_server.net_box:call('crud.get', { + 'customers_age_key', {58, 'Viktor Pelevin'} + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_age_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_update_incomplete_sharding_key = function(g) + local tuple = {2, box.NULL, 'Viktor Pelevin', 58} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_age_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local update_operations = { + {'=', 'age', 60}, + } + + local result, err = g.cluster.main_server.net_box:call('crud.update', { + 'customers_age_key', {2, 'Viktor Pelevin'}, update_operations, + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_age_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_get_secondary_idx = function(g) + local tuple = {4, box.NULL, 'Leo', 44} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_secondary_idx_name_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + -- get + local result, err = 
g.cluster.main_server.net_box:call('crud.get', + {'customers_secondary_idx_name_key', {4, 'Leo'}}) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_secondary_idx_name_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_update_secondary_idx = function(g) + local tuple = {6, box.NULL, 'Victor', 58} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_secondary_idx_name_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local update_operations = { + {'=', 'age', 58}, + } + + local result, err = g.cluster.main_server.net_box:call('crud.update', { + 'customers_secondary_idx_name_key', {6, 'Victor'}, update_operations, + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_secondary_idx_name_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_delete_secondary_idx = function(g) + local tuple = {8, box.NULL, 'Alexander', 37} + + -- insert tuple + local result, err = g.cluster.main_server.net_box:call('crud.insert', { + 'customers_secondary_idx_name_key', tuple + }) + + t.assert_equals(err, nil) + t.assert_not_equals(result, nil) + t.assert_equals(#result.rows, 1) + + local result, err = g.cluster.main_server.net_box:call('crud.delete', { + 'customers_secondary_idx_name_key', {8, 'Alexander'} + }) + + t.assert_str_contains(err.err, + "Sharding key for space \"customers_secondary_idx_name_key\" is missed in primary index, specify bucket_id") + t.assert_equals(result, nil) +end + +pgroup.test_update_cache = function(g) + local space_name = 'customers_name_key' + local sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) + + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + server.net_box:call('set_sharding_key', {space_name, {'age'}}) + end) + sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 4}}}) + + -- Recover sharding key. + helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) + server.net_box:call('set_sharding_key', {space_name, {'name'}}) + end) + sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) +end diff --git a/test/unit/call_test.lua b/test/unit/call_test.lua index 52024495..b189ed08 100644 --- a/test/unit/call_test.lua +++ b/test/unit/call_test.lua @@ -225,3 +225,32 @@ g.test_map_vshard_calls = function() mode = 'read', prefer_replica = true, balance = true, }) end + +g.test_any_vshard_call = function() + g.clear_vshard_calls() + local results, err = g.cluster.main_server.net_box:eval([[ + local call = require('crud.common.call') + return call.any('say_hi_politely', {'dude'}, {}) + ]]) + + t.assert_equals(results, 'HI, dude! I am s2-master') + t.assert_equals(err, nil) +end + +g.test_any_vshard_call_timeout = function() + local timeout = 0.2 + + local results, err = g.cluster.main_server.net_box:eval([[ + local call = require('crud.common.call') + + local say_hi_timeout, call_timeout = ... 
+ + return call.any('say_hi_sleepily', {say_hi_timeout}, { + timeout = call_timeout, + }) + ]], {timeout + 0.1, timeout}) + + t.assert_equals(results, nil) + t.assert_str_contains(err.err, "Failed for %w+%-0000%-0000%-0000%-000000000000", true) + t.assert_str_contains(err.err, "Timeout exceeded") +end diff --git a/test/unit/parse_conditions_test.lua b/test/unit/parse_conditions_test.lua index 137246a6..e4a34212 100644 --- a/test/unit/parse_conditions_test.lua +++ b/test/unit/parse_conditions_test.lua @@ -86,7 +86,7 @@ g.test_parse_errors = function() local _, err = compare_conditions.parse(user_conditions) t.assert_str_contains( err.err, - 'condition[1] "===" isn\'t a valid condition oprator, (condition 2)' + 'condition[1] "===" isn\'t a valid condition operator, (condition 2)' ) -- bad operand diff --git a/test/unit/select_plan_test.lua b/test/unit/select_plan_test.lua index 026c3e7a..cc0dee3f 100644 --- a/test/unit/select_plan_test.lua +++ b/test/unit/select_plan_test.lua @@ -159,7 +159,6 @@ g.test_is_scan_by_full_sharding_key_eq = function() t.assert_equals(err, nil) - t.assert_equals(plan.total_tuples_count, 1) t.assert_equals(plan.sharding_key, {15}) -- id is a part of scan index @@ -173,7 +172,6 @@ g.test_is_scan_by_full_sharding_key_eq = function() t.assert_equals(plan.index_id, 3) -- index name_id is used t.assert_equals(plan.scan_value, {'Ivan', 11}) - t.assert_equals(plan.total_tuples_count, 1) t.assert_equals(plan.sharding_key, {11}) -- other index is first @@ -221,7 +219,6 @@ g.test_is_scan_by_full_sharding_key_eq = function() t.assert_equals(err, nil) - t.assert_equals(plan.total_tuples_count, 1) t.assert_equals(plan.sharding_key, {1, 0}) end diff --git a/test/unit/sharding_key_test.lua b/test/unit/sharding_key_test.lua new file mode 100644 index 00000000..110eab1c --- /dev/null +++ b/test/unit/sharding_key_test.lua @@ -0,0 +1,233 @@ +local t = require('luatest') +local sharding_key_module = require('crud.common.sharding_key') +local cache = require('crud.common.sharding_key_cache') +local utils = require('crud.common.utils') + +local helpers = require('test.helper') + +local g = t.group('sharding_key') + +g.before_each(function() + local sharding_key_format = { + {name = 'space_name', type = 'string', is_nullable = false}, + {name = 'sharding_key', type = 'array', is_nullable = false} + } + -- Create a space _ddl_sharding_key with a tuple that + -- contains a space name and it's sharding key. + if type(box.cfg) ~= 'table' then + helpers.box_cfg() + end + box.schema.space.create('_ddl_sharding_key', { + format = sharding_key_format, + }) + box.space._ddl_sharding_key:create_index('pk') + box.schema.space.create('fetch_on_storage') +end) + +g.after_each(function() + -- Cleanup. 
+ if box.space._ddl_sharding_key ~= nil then + box.space._ddl_sharding_key:drop() + end + box.space.fetch_on_storage:drop() + cache.drop_caches() +end) + +g.test_as_index_object_positive = function() + local space_name = 'as_index_object' + local space_format = { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'unsigned'}, + } + local sharding_key_def = {'name', 'age'} + + local index_obj, err = sharding_key_module.internal.as_index_object(space_name, + space_format, + sharding_key_def) + t.assert_equals(err, nil) + t.assert_equals(index_obj, { + parts = { + {fieldno = 2}, + {fieldno = 3}, + } + }) +end + +g.test_as_index_object_negative = function() + local space_name = 'as_index_object' + local space_format = { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'unsigned'}, + } + local sharding_key_def = {'dude', 'age'} + + local index_obj, err = sharding_key_module.internal.as_index_object(space_name, + space_format, + sharding_key_def) + t.assert_str_contains(err.err, + 'No such field (dude) in a space format (as_index_object)') + t.assert_equals(index_obj, nil) +end + +g.test_get_format_fieldno_map = function() + local space_format = { + {name = 'id', type = 'unsigned'}, + {name = 'name', type = 'string'}, + {name = 'age', type = 'unsigned'}, + } + + local fieldno_map = utils.get_format_fieldno_map(space_format) + t.assert_equals(fieldno_map, {age = 3, id = 1, name = 2}) +end + +g.test_fetch_on_storage_positive = function() + local space_name = 'fetch_on_storage' + local sharding_key_def = {'name', 'age'} + box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) + + local metadata_map = sharding_key_module.fetch_on_storage() + + t.assert_equals(metadata_map, { + [space_name] = { + sharding_key_def = sharding_key_def, + space_format = {} + }, + }) +end + +g.test_fetch_on_storage_negative = function() + -- Test checks return value when _ddl_sharding_key is absent. 
+ box.space._ddl_sharding_key:drop() + + local metadata_map = sharding_key_module.fetch_on_storage() + t.assert_equals(metadata_map, nil) +end + +g.test_extract_from_index_sharding_key_direct_order = function() + local primary_index_parts = { + {fieldno = 1}, + {fieldno = 2}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 1}, + {fieldno = 2}, + } + } + local primary_key = {'name', 'age'} + + local extract_from_index = sharding_key_module.internal.extract_from_index + local sharding_key = extract_from_index(primary_key, + primary_index_parts, + sharding_key_as_index_obj) + t.assert_equals(sharding_key, {'name', 'age'}) +end + +g.test_extract_from_index_sharding_key_reverse_order = function() + local primary_index_parts = { + {fieldno = 1}, + {fieldno = 2}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + {fieldno = 1}, + } + } + local primary_key = {'name', 'age'} + + local extract_from_index = sharding_key_module.internal.extract_from_index + local sharding_key = extract_from_index(primary_key, + primary_index_parts, + sharding_key_as_index_obj) + t.assert_equals(sharding_key, {'age', 'name'}) +end + +g.test_extract_from_index_sharding_key_single_field = function() + local primary_index_parts = { + {fieldno = 1}, + {fieldno = 2}, + {fieldno = 3}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + } + } + local primary_key = {'name', 'age', 'location'} + + local extract_from_index = sharding_key_module.internal.extract_from_index + local sharding_key = extract_from_index(primary_key, + primary_index_parts, + sharding_key_as_index_obj) + t.assert_equals(sharding_key, {'age'}) +end + +g.test_extract_from_index_sharding_key_none_fields = function() + local primary_index_parts = { + {fieldno = 1}, + {fieldno = 3}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + } + } + local primary_key = {'name', 'age', 'location'} + + local extract_from_index = sharding_key_module.internal.extract_from_index + local ok, err = pcall(extract_from_index, primary_key, + primary_index_parts, + sharding_key_as_index_obj) + t.assert_equals(ok, false) + t.assert_str_contains(err, 'assertion failed') +end + +g.test_get_index_fieldno_map = function() + local index_parts = { + {fieldno = 2}, + {fieldno = 3}, + } + + local fieldno_map = utils.get_index_fieldno_map(index_parts) + t.assert_equals(fieldno_map, { + [2] = 1, + [3] = 2 + }) +end + +g.test_is_part_of_pk_positive = function() + local space_name = 'is_part_of_pk' + local index_parts = { + {fieldno = 2}, + {fieldno = 3}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + } + } + + local is_part_of_pk = sharding_key_module.internal.is_part_of_pk + local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) + t.assert_equals(res, true) +end + +g.test_is_part_of_pk_negative = function() + local space_name = 'is_part_of_pk' + local index_parts = { + {fieldno = 1}, + {fieldno = 3}, + } + local sharding_key_as_index_obj = { + parts = { + {fieldno = 2}, + } + } + + local is_part_of_pk = sharding_key_module.internal.is_part_of_pk + local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) + t.assert_equals(res, false) +end
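As a complement to the cache-update limitation mentioned in the README changes above, here is a minimal sketch of refreshing the sharding-key cache manually on a router after the `_ddl_sharding_key` space has been changed on storages. The space name is taken from the tests above and is only an example; this mirrors what `helpers.update_cache()` does in `test/helper.lua`.

```lua
-- Run on a router instance where the crud role is initialized.
local sharding_key = require('crud.common.sharding_key')

-- Drops the cached sharding metadata and fetches it again from a storage.
-- On success it returns the sharding key in the internal index-like form,
-- e.g. {parts = {{fieldno = 3}}}; on failure it returns nil and an error.
local sharding_key_as_index_obj, err = sharding_key.update_cache('customers_name_key')
if err ~= nil then
    error(err)
end
```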