diff --git a/examples/insurance/implementations/models/dnn.py b/examples/insurance/implementations/models/dnn.py index 5f1db542ad..3c5a2ccec0 100644 --- a/examples/insurance/implementations/models/dnn.py +++ b/examples/insurance/implementations/models/dnn.py @@ -2,27 +2,33 @@ def create_estimator(run_config, model_config): + aggregates = model_config["input"]["aggregates"] + feature_columns = [ tf.feature_column.indicator_column( - tf.feature_column.categorical_column_with_vocabulary_list("sex", ["female", "male"]) + tf.feature_column.categorical_column_with_vocabulary_list( + "sex", aggregates["sex_vocab"] + ) ), tf.feature_column.indicator_column( - tf.feature_column.categorical_column_with_vocabulary_list("smoker", ["yes", "no"]) + tf.feature_column.categorical_column_with_vocabulary_list( + "smoker", aggregates["smoker_vocab"] + ) ), tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_vocabulary_list( - "region", ["northwest", "northeast", "southwest", "southeast"] + "region", aggregates["region_vocab"] ) ), tf.feature_column.bucketized_column( - tf.feature_column.numeric_column("age"), [15, 20, 25, 35, 40, 45, 50, 55, 60, 65] + tf.feature_column.numeric_column("age"), aggregates["age_buckets"] ), tf.feature_column.bucketized_column( - tf.feature_column.numeric_column("bmi"), [15, 20, 25, 35, 40, 45, 50, 55] + tf.feature_column.numeric_column("bmi"), aggregates["bmi_buckets"] ), tf.feature_column.indicator_column( tf.feature_column.categorical_column_with_vocabulary_list( - "children", model_config["aggregates"]["children_set"] + "children", aggregates["children_set"] ) ), ] diff --git a/examples/insurance/resources/apis.yaml b/examples/insurance/resources/apis.yaml index 4507607637..fb1611e74e 100644 --- a/examples/insurance/resources/apis.yaml +++ b/examples/insurance/resources/apis.yaml @@ -1,5 +1,5 @@ - kind: api name: cost - model_name: dnn + model: @dnn compute: replicas: 1 diff --git a/examples/insurance/resources/environments.yaml b/examples/insurance/resources/environments.yaml index f1d0b44ad3..fa1444d1c2 100644 --- a/examples/insurance/resources/environments.yaml +++ b/examples/insurance/resources/environments.yaml @@ -3,11 +3,4 @@ data: type: csv path: s3a://cortex-examples/insurance.csv - schema: - - age - - sex - - bmi - - children - - smoker - - region - - charges + schema: [@age, @sex, @bmi, @children, @smoker, @region, @charges] diff --git a/examples/insurance/resources/features.yaml b/examples/insurance/resources/features.yaml index 0465fec6a2..4a299c36a9 100644 --- a/examples/insurance/resources/features.yaml +++ b/examples/insurance/resources/features.yaml @@ -47,30 +47,22 @@ - kind: aggregate name: charges_mean aggregator: cortex.mean - inputs: - columns: - col: charges + input: @charges - kind: aggregate name: charges_stddev aggregator: cortex.stddev - inputs: - columns: - col: charges + input: @charges - kind: aggregate name: children_set aggregator: cortex.collect_set_int - inputs: - columns: - col: children + input: @children - kind: transformed_column name: charges_normalized transformer: cortex.normalize - inputs: - columns: - num: charges - args: - mean: charges_mean - stddev: charges_stddev + input: + col: @charges + mean: @charges_mean + stddev: @charges_stddev diff --git a/examples/insurance/resources/models.yaml b/examples/insurance/resources/models.yaml index 4b68106722..76ad5d1119 100644 --- a/examples/insurance/resources/models.yaml +++ b/examples/insurance/resources/models.yaml @@ -1,14 +1,16 @@ - kind: model name: dnn - type: regression - target_column: charges_normalized - feature_columns: - - age - - sex - - bmi - - children - - smoker - - region + estimator_path: implementations/models/dnn.py + target_column: @charges_normalized + input: + features: [@age, @sex, @bmi, @children, @smoker, @region, @charges] + aggregates: + children_set: @children_set + region_vocab: ["northwest", "northeast", "southwest", "southeast"] + age_buckets: [15, 20, 25, 35, 40, 45, 50, 55, 60, 65] + bmi_buckets: [15, 20, 25, 35, 40, 45, 50, 55] + smoker_vocab: ["yes", "no"] + sex_vocab: ["female", "male"] hparams: hidden_units: [100, 100, 100] data_partition_ratio: @@ -16,5 +18,3 @@ evaluation: 0.2 training: num_steps: 10000 - aggregates: - - children_set diff --git a/pkg/workloads/lib/context.py b/pkg/workloads/lib/context.py index dcb8556c9d..9135147771 100644 --- a/pkg/workloads/lib/context.py +++ b/pkg/workloads/lib/context.py @@ -721,8 +721,8 @@ def _deserialize_raw_ctx(raw_ctx): # input should already have non-column arguments replaced, and all types validated def create_transformer_inputs_from_map(input, col_value_map): if util.is_str(input): - res_name = util.get_resource_ref(input) - if res_name is not None and res_name in col_value_map: + if util.is_resource_ref(input): + res_name = util.get_resource_ref(input) return col_value_map[res_name] return input diff --git a/pkg/workloads/lib/util.py b/pkg/workloads/lib/util.py index 3d60b80938..4a35aec5b1 100644 --- a/pkg/workloads/lib/util.py +++ b/pkg/workloads/lib/util.py @@ -879,19 +879,18 @@ def is_resource_ref(obj): def get_resource_ref(obj): if not is_str(obj): - return None + raise ValueError("expected input of type string but received " + str(type(obj))) if obj.startswith(resource_escape_seq): return obj[len(resource_escape_seq) :] elif obj.startswith(resource_escape_seq_raw): return obj[len(resource_escape_seq_raw) :] - return None + raise ValueError("expected a resource reference but got " + obj) def extract_resource_refs(input): if is_str(input): - res = get_resource_ref(input) - if res is not None: - return {res} + if is_resource_ref(input): + return {get_resource_ref(input)} return set() if is_list(input): diff --git a/pkg/workloads/spark_job/test/integration/insurance_context.py b/pkg/workloads/spark_job/test/integration/insurance_context.py new file mode 100644 index 0000000000..65d30365c7 --- /dev/null +++ b/pkg/workloads/spark_job/test/integration/insurance_context.py @@ -0,0 +1,631 @@ +# Copyright 2019 Cortex Labs, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import consts + +""" +HOW TO GENERATE CONTEXT + +1. cx deploy +2. get a path to a context +3. ssh into a docker container (spark or tf_train) +docker run -it --entrypoint "/bin/bash" cortexlabs/spark +4. run the following in python3 shell + +from lib import util +from lib.storage import S3 + +S3(bucket, client_config={}).get_msgpack(key) + +bucket, key = S3.deconstruct_s3_path('s3:///apps//contexts/.msgpack') + +5. udpate path of any implementations being used to // e.g. /transformers/normalize.py +6. delete images in cortex_config +""" + + +def get(input_data_path): + raw_ctx["environment_data"]["csv_data"]["path"] = input_data_path + raw_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION + + return raw_ctx + + +raw_ctx = { + "constants": {}, + "root": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011", + "key": "apps/insurance/contexts/58feed5c3620ce81e3cb18ba0268078b28f102a70aa55cb8d30fa29a0d0f323.msgpack", + "estimators": { + "c5e53f81d6d57bd1b46ed04020509401b139864d483c35e781338f11b2cc301": { + "target_column": None, + "id": "be000f367fb9cd7167d4eb9bebc6469d7fd839fc55adb3cb6ff7e34e33335e6", + "index": 0, + "resource_type": "estimator", + "namespace": None, + "input": None, + "embed": None, + "impl_key": "estimators/c5e53f81d6d57bd1b46ed04020509401b139864d483c35e781338f11b2cc301.py", + "path": "implementations/models/dnn.py", + "hparams": None, + "prediction_key": "", + "file_path": "", + "name": "c5e53f81d6d57bd1b46ed04020509401b139864d483c35e781338f11b2cc301", + "training_input": None, + } + }, + "transformed_columns": { + "charges_normalized": { + "index": 10, + "type": "FLOAT_COLUMN", + "tags": {}, + "resource_type": "transformed_column", + "transformer_path": None, + "input": { + "col": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dcharges", + "stddev": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dcharges_stddev", + "mean": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dcharges_mean", + }, + "embed": None, + "workload_id": "cs7eodcfv22u5ztm2l8g", + "name": "charges_normalized", + "transformer": "cortex.normalize", + "file_path": "resources/features.yaml", + "id": "a91d01f7a9a260d094fa90d3f6a1e77372f5c271d633ac129637f2eb6196d7e", + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + } + }, + "id": "58feed5c3620ce81e3cb18ba0268078b28f102a70aa55cb8d30fa29a0d0f323", + "environment_data": { + "csv_data": { + "schema": [ + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dage", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dsex", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dbmi", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dchildren", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dsmoker", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dregion", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dcharges", + ], + "csv_config": { + "ignore_trailing_white_space": None, + "null_value": None, + "positive_inf": None, + "escape": None, + "ignore_leading_white_space": None, + "header": None, + "negative_inf": None, + "max_chars_per_column": None, + "char_to_escape_quote_escaping": None, + "nan_value": None, + "encoding": None, + "multiline": None, + "quote": None, + "comment": None, + "sep": None, + "max_columns": None, + "empty_value": None, + }, + "type": "csv", + "path": "s3a://cortex-examples/insurance.csv", + "drop_null": False, + }, + "parquet_data": None, + }, + "apis": { + "cost": { + "model_name": "dnn", + "index": 0, + "tags": {}, + "resource_type": "api", + "embed": None, + "workload_id": "zixo1slsqroa3dheq2mg", + "path": "/insurance/cost", + "name": "cost", + "model": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31ddnn", + "file_path": "resources/apis.yaml", + "id": "a5341babd551a85d82b7b49e488983113e97eb6a0c4c8643986a92a3c44ec32", + "compute": {"gpu": 0, "cpu": None, "replicas": 1, "mem": None}, + } + }, + "cortex_config": { + "enable_telemetry": False, + "id": "da5e65b994ba4ebb069bdc19cf73da64aee79e5d83f466038dc75b3ef04fa63", + "operator_in_cluster": False, + "log_group": "cortex", + "namespace": "cortex", + "region": "us-west-2", + "api_version": "master", + }, + "metadata_root": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011/metadata", + "raw_columns": { + "raw_string_columns": { + "smoker": { + "index": 4, + "required": True, + "tags": {}, + "type": "STRING_COLUMN", + "embed": None, + "workload_id": "cs7eodcfv22u5ztm2l8g", + "name": "smoker", + "file_path": "resources/features.yaml", + "id": "3625e8890ecad8d2f64382dac7a06b7546b21eeba32de894265fbb95e8c2140", + "values": ["yes", "no"], + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "resource_type": "raw_column", + }, + "sex": { + "index": 1, + "required": True, + "tags": {}, + "type": "STRING_COLUMN", + "embed": None, + "workload_id": "cs7eodcfv22u5ztm2l8g", + "name": "sex", + "file_path": "resources/features.yaml", + "id": "fe94653b7cc3ea3d6cdee9e42b0feab7ad8f1a5b7d2bd4f9f6860c1b3ce5071", + "values": ["female", "male"], + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "resource_type": "raw_column", + }, + "region": { + "index": 5, + "required": True, + "tags": {}, + "type": "STRING_COLUMN", + "embed": None, + "workload_id": "cs7eodcfv22u5ztm2l8g", + "name": "region", + "file_path": "resources/features.yaml", + "id": "a51c74f814e6a28891b6677698fc5129794c8bf60d90741ac2e51d5a112e024", + "values": ["northwest", "northeast", "southwest", "southeast"], + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "resource_type": "raw_column", + }, + }, + "raw_float_columns": { + "bmi": { + "index": 2, + "resource_type": "raw_column", + "tags": {}, + "required": True, + "embed": None, + "workload_id": "cs7eodcfv22u5ztm2l8g", + "max": 60.0, + "name": "bmi", + "min": 0.0, + "file_path": "resources/features.yaml", + "id": "05734a3de0cc2a28050b811241eb27c59634c9a175382fc0c0d8ceeb0840036", + "values": None, + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "type": "FLOAT_COLUMN", + }, + "charges": { + "index": 6, + "resource_type": "raw_column", + "tags": {}, + "required": True, + "embed": None, + "workload_id": "cs7eodcfv22u5ztm2l8g", + "max": 100000.0, + "name": "charges", + "min": 0.0, + "file_path": "resources/features.yaml", + "id": "9e8a5917e4a8c803af17ad0792b89201b1784415bfeb7801ca0308c1a8f6090", + "values": None, + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "type": "FLOAT_COLUMN", + }, + }, + "raw_int_columns": { + "age": { + "index": 0, + "resource_type": "raw_column", + "tags": {}, + "required": True, + "embed": None, + "workload_id": "cs7eodcfv22u5ztm2l8g", + "max": 100, + "name": "age", + "min": 0, + "file_path": "resources/features.yaml", + "id": "2957b75e2f53e5c74f6036022ef1681d1c6444e1c8a7ca424813f642463f503", + "values": None, + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "type": "INT_COLUMN", + }, + "children": { + "index": 3, + "resource_type": "raw_column", + "tags": {}, + "required": True, + "embed": None, + "workload_id": "cs7eodcfv22u5ztm2l8g", + "max": 10, + "name": "children", + "min": 0, + "file_path": "resources/features.yaml", + "id": "a782d9ba0db596ad7c3e1a46be4dcd5a0c24e626e8ee308e941e33de04149bc", + "values": None, + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "type": "INT_COLUMN", + }, + }, + "raw_inferred_columns": {}, + }, + "python_packages": {}, + "status_prefix": "apps/insurance/resource_statuses", + "aggregators": { + "cortex.mean": { + "input": { + "_type": "FLOAT_COLUMN|INT_COLUMN", + "_min_count": None, + "_optional": False, + "_allow_null": False, + "_default": None, + "_max_count": None, + }, + "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/aggregators/aggregators.yaml", + "impl_key": "aggregators/71c8aa1ce07d9d7059e305ed2b180504c36a41452e73fb251ef532bf679f851.py", + "path": "spark/mean.py", + "name": "mean", + "index": 13, + "embed": None, + "output_type": "FLOAT", + "id": "945acd1e82ed2178b7937215f6f82c814abfa967ec7da545e0a8c776759a37f", + "resource_type": "aggregator", + "namespace": "cortex", + }, + "cortex.stddev": { + "input": { + "_type": "FLOAT_COLUMN|INT_COLUMN", + "_min_count": None, + "_optional": False, + "_allow_null": False, + "_default": None, + "_max_count": None, + }, + "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/aggregators/aggregators.yaml", + "impl_key": "aggregators/b8fa468e54c55083bf350f8b482c5323bd4bc12dd5fa0d859908ab2829aea7f.py", + "path": "spark/stddev.py", + "name": "stddev", + "index": 18, + "embed": None, + "output_type": "FLOAT", + "id": "5b076c542b7e8ad7ebd57c1bedad32875ccddccf66bb11c4d87a6058ec765c9", + "resource_type": "aggregator", + "namespace": "cortex", + }, + "cortex.collect_set_int": { + "input": { + "_type": "INT_COLUMN", + "_min_count": None, + "_optional": False, + "_allow_null": False, + "_default": None, + "_max_count": None, + }, + "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/aggregators/aggregators.yaml", + "impl_key": "aggregators/4a26fe7551ea175ae68b998ea12766a8e4ffc6a6763816b141bc84d42275e90.py", + "path": "spark/collect_set_int.py", + "name": "collect_set_int", + "index": 2, + "embed": None, + "output_type": ["INT"], + "id": "d35f3139f033053d7cc923869d15501cab7078c31ca023a32e307edaac35ae3", + "resource_type": "aggregator", + "namespace": "cortex", + }, + }, + "environment": { + "file_path": "resources/environments.yaml", + "name": "dev", + "index": 0, + "embed": None, + "limit": { + "randomize": None, + "random_seed": None, + "fraction_of_rows": None, + "num_rows": None, + }, + "id": "483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011", + "log_level": {"spark": "WARN", "tensorflow": "DEBUG"}, + }, + "aggregates": { + "charges_stddev": { + "tags": {}, + "id": "ddacaf489cf1311c12eb898068bbb9f214be66ac789f3f705d6e1c57cdc943a", + "type": "FLOAT", + "aggregator": "cortex.stddev", + "resource_type": "aggregate", + "aggregator_path": None, + "input": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dcharges", + "embed": None, + "name": "charges_stddev", + "file_path": "resources/features.yaml", + "workload_id": "cs7eodcfv22u5ztm2l8g", + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "key": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011/aggregates/ddacaf489cf1311c12eb898068bbb9f214be66ac789f3f705d6e1c57cdc943a.msgpack", + "index": 8, + }, + "charges_mean": { + "tags": {}, + "id": "c08af633f53a95ace3e71152b00d595bbbbbd282037ee5fd5e708adf1d96b38", + "type": "FLOAT", + "aggregator": "cortex.mean", + "resource_type": "aggregate", + "aggregator_path": None, + "input": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dcharges", + "embed": None, + "name": "charges_mean", + "file_path": "resources/features.yaml", + "workload_id": "cs7eodcfv22u5ztm2l8g", + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "key": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011/aggregates/c08af633f53a95ace3e71152b00d595bbbbbd282037ee5fd5e708adf1d96b38.msgpack", + "index": 7, + }, + "children_set": { + "tags": {}, + "id": "800ae506a9af394af56ae5fd7727801eed39a79f0a7bb3bcb58d5722953c748", + "type": ["INT"], + "aggregator": "cortex.collect_set_int", + "resource_type": "aggregate", + "aggregator_path": None, + "input": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dchildren", + "embed": None, + "name": "children_set", + "file_path": "resources/features.yaml", + "workload_id": "cs7eodcfv22u5ztm2l8g", + "compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "key": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011/aggregates/800ae506a9af394af56ae5fd7727801eed39a79f0a7bb3bcb58d5722953c748.msgpack", + "index": 9, + }, + }, + "app": { + "name": "insurance", + "id": "64e8937abc6d71cf2d2f0fe05e52c33666883443ae4e8af7924c71198caa1f9", + }, + "raw_dataset": { + "key": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011/data_raw/raw.parquet" + }, + "dataset_version": "2019-06-12-19-22-55-936375", + "transformers": { + "cortex.normalize": { + "input": { + "_type": { + "col": { + "_type": "FLOAT_COLUMN|INT_COLUMN", + "_min_count": None, + "_optional": False, + "_allow_null": False, + "_default": None, + "_max_count": None, + }, + "stddev": { + "_type": "INT|FLOAT", + "_min_count": None, + "_optional": False, + "_allow_null": False, + "_default": None, + "_max_count": None, + }, + "mean": { + "_type": "INT|FLOAT", + "_min_count": None, + "_optional": False, + "_allow_null": False, + "_default": None, + "_max_count": None, + }, + }, + "_min_count": None, + "_optional": False, + "_allow_null": False, + "_default": None, + "_max_count": None, + }, + "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/transformers/transformers.yaml", + "impl_key": "/transformers/normalize.py", + "path": "normalize.py", + "name": "normalize", + "index": 1, + "embed": None, + "output_type": "FLOAT_COLUMN", + "id": "0dcf14b7b6208633b8805b29eb564f860736a07691176095fd4f19aa5ef75ab", + "resource_type": "transformer", + "namespace": "cortex", + } + }, + "models": { + "dnn": { + "training_input": None, + "data_partition_ratio": {"evaluation": 0.2, "training": 0.8}, + "dataset_compute": { + "driver_mem_overhead": None, + "executors": 1, + "executor_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "driver_cpu": '{"Quantity":"1","UserString":"1"}', + "mem_overhead_factor": None, + "executor_mem_overhead": None, + "driver_mem": '{"Quantity":"500Mi","UserString":"500Mi"}', + "executor_cpu": '{"Quantity":"1","UserString":"1"}', + }, + "training": { + "tf_random_seed": 1788, + "keep_checkpoint_every_n_hours": 10000, + "num_steps": 10000, + "keep_checkpoint_max": 3, + "log_step_count_steps": 100, + "save_checkpoints_steps": None, + "num_epochs": None, + "save_summary_steps": 100, + "batch_size": 40, + "save_checkpoints_secs": 600, + "tf_randomize_seed": False, + "shuffle": True, + }, + "input": { + "features": [ + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dage", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dsex", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dbmi", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dchildren", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dsmoker", + "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dregion", + ], + "aggregates": { + "region_vocab": ["northwest", "northeast", "southwest", "southeast"], + "smoker_vocab": ["yes", "no"], + "children_set": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dchildren_set", + "sex_vocab": ["female", "male"], + "age_buckets": [15, 20, 25, 35, 40, 45, 50, 55, 60, 65], + "bmi_buckets": [15, 20, 25, 35, 40, 45, 50, 55], + }, + }, + "target_column": "\U0001f31d\U0001f31d\U0001f31d\U0001f31d\U0001f31dcharges_normalized", + "embed": None, + "hparams": {"hidden_units": [100, 100, 100]}, + "workload_id": "gj4bdao7ocrfa1xbls9y", + "file_path": "resources/models.yaml", + "evaluation": { + "throttle_secs": 600, + "start_delay_secs": 120, + "batch_size": 40, + "num_epochs": None, + "shuffle": False, + "num_steps": 100, + }, + "estimator_path": "implementations/models/dnn.py", + "dataset": { + "file_path": "resources/models.yaml", + "model_name": "dnn", + "workload_id": "cs7eodcfv22u5ztm2l8g", + "name": "dnn/training_dataset", + "index": 0, + "embed": None, + "train_key": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011/data_training/1020431bc033a0aa2cc427349486b7636f2b0bb65e75553811fea82b9fa12fd/train.tfrecord", + "id": "1020431bc033a0aa2cc427349486b7636f2b0bb65e75553811fea82b9fa12fd", + "resource_type": "training_dataset", + "eval_key": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011/data_training/1020431bc033a0aa2cc427349486b7636f2b0bb65e75553811fea82b9fa12fd/eval.tfrecord", + }, + "id": "4145945c21c9f9fd616e666fef2b61aa501e8db15e756e9dcd2f2e3ec47575e", + "index": 0, + "resource_type": "model", + "tags": {}, + "prediction_key": "", + "name": "dnn", + "estimator": "c5e53f81d6d57bd1b46ed04020509401b139864d483c35e781338f11b2cc301", + "compute": {"gpu": None, "cpu": None, "mem": None}, + "key": "apps/insurance/data/2019-06-12-19-22-55-936375/483b4537be2db30b81be4809ab0c787f65230540b9b6779af7420922d654011/models/4145945c21c9f9fd616e666fef2b61aa501e8db15e756e9dcd2f2e3ec47575e.zip", + } + }, +} diff --git a/pkg/workloads/spark_job/test/integration/iris_test.py b/pkg/workloads/spark_job/test/integration/insurance_test.py similarity index 71% rename from pkg/workloads/spark_job/test/integration/iris_test.py rename to pkg/workloads/spark_job/test/integration/insurance_test.py index 7fcff3d3c4..50bea0d0a3 100644 --- a/pkg/workloads/spark_job/test/integration/iris_test.py +++ b/pkg/workloads/spark_job/test/integration/insurance_test.py @@ -17,7 +17,7 @@ from spark_job import spark_job from lib.exceptions import UserException from lib import Context -from test.integration import iris_context +from test.integration import insurance_context import pytest from pyspark.sql.types import * @@ -31,22 +31,22 @@ pytestmark = pytest.mark.usefixtures("spark") -iris_data = [ - [5.1, 3.5, 1.4, 0.2, "Iris-setosa"], - [4.9, 3.0, 1.4, 0.2, "Iris-setosa"], - [4.7, 3.2, 1.3, 0.2, "Iris-setosa"], - [4.6, 3.1, 1.5, 0.2, "Iris-setosa"], - [5.0, 3.6, 1.4, 0.2, "Iris-setosa"], - [7.0, 3.2, 4.7, 1.4, "Iris-versicolor"], - [6.4, 3.2, 4.5, 1.5, "Iris-versicolor"], - [6.9, 3.1, 4.9, 1.5, "Iris-versicolor"], - [5.5, 2.3, 4.0, 1.3, "Iris-versicolor"], - [6.5, 2.8, 4.6, 1.5, "Iris-versicolor"], - [6.3, 3.3, 6.0, 2.5, "Iris-virginica"], - [5.8, 2.7, 5.1, 1.9, "Iris-virginica"], - [7.1, 3.0, 5.9, 2.1, "Iris-virginica"], - [6.3, 2.9, 5.6, 1.8, "Iris-virginica"], - [6.5, 3.0, 5.8, 2.2, "Iris-virginica"], +insurance_data = [ + [19, "female", 27.9, 0, "yes", "southwest", 16884.924], + [18, "male", 33.77, 1, "no", "southeast", 1725.5523], + [28, "male", 33, 3, "no", "southeast", 4449.462], + [33, "male", 22.705, 0, "no", "northwest", 21984.47061], + [32, "male", 28.88, 0, "no", "northwest", 3866.8552], + [31, "female", 25.74, 0, "no", "southeast", 3756.6216], + [46, "female", 33.44, 1, "no", "southeast", 8240.5896], + [37, "female", 27.74, 3, "no", "northwest", 7281.5056], + [37, "male", 29.83, 2, "no", "northeast", 6406.4107], + [60, "female", 25.84, 0, "no", "northwest", 28923.13692], + [25, "male", 26.22, 0, "no", "northeast", 2721.3208], + [62, "female", 26.29, 0, "yes", "southeast", 27808.7251], + [23, "male", 34.4, 0, "no", "southwest", 1826.843], + [56, "female", 39.82, 0, "no", "southeast", 11090.7178], + [27, "male", 42.13, 0, "yes", "southeast", 39611.7577], ] @@ -54,11 +54,11 @@ def test_simple_end_to_end(spark): local_storage_path = Path("/workspace/local_storage") local_storage_path.mkdir(parents=True, exist_ok=True) should_ingest = True - input_data_path = os.path.join(str(local_storage_path), "iris.csv") + input_data_path = os.path.join(str(local_storage_path), "insurance.csv") - raw_ctx = iris_context.get(input_data_path) + raw_ctx = insurance_context.get(input_data_path) - workload_id = raw_ctx["raw_columns"]["raw_float_columns"]["sepal_length"]["workload_id"] + workload_id = raw_ctx["raw_columns"]["raw_string_columns"]["smoker"]["workload_id"] cols_to_validate = [] @@ -66,8 +66,8 @@ def test_simple_end_to_end(spark): for raw_column in column_type.values(): cols_to_validate.append(raw_column["id"]) - iris_data_string = "\n".join(",".join(str(val) for val in line) for line in iris_data) - Path(os.path.join(str(local_storage_path), "iris.csv")).write_text(iris_data_string) + insurance_data_string = "\n".join(",".join(str(val) for val in line) for line in insurance_data) + Path(os.path.join(str(local_storage_path), "insurance.csv")).write_text(insurance_data_string) ctx = Context( raw_obj=raw_ctx, cache_dir="/workspace/cache", local_storage_path=str(local_storage_path) @@ -86,7 +86,7 @@ def test_simple_end_to_end(spark): cols_to_aggregate = [r["id"] for r in raw_ctx["aggregates"].values()] - spark_job.run_custom_aggregators(spark, ctx, cols_to_aggregate, raw_df) + spark_job.run_aggregators(spark, ctx, cols_to_aggregate, raw_df) for aggregate_id in cols_to_aggregate: for aggregate_resource in raw_ctx["aggregates"].values(): diff --git a/pkg/workloads/spark_job/test/integration/iris_context.py b/pkg/workloads/spark_job/test/integration/iris_context.py deleted file mode 100644 index d36a9e4dd1..0000000000 --- a/pkg/workloads/spark_job/test/integration/iris_context.py +++ /dev/null @@ -1,572 +0,0 @@ -# Copyright 2019 Cortex Labs, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import consts - -""" -HOW TO GENERATE CONTEXT - -1. cx deploy -2. get a path to a context -3. ssh into a docker container (spark or tf_train) -docker run -it --entrypoint "/bin/bash" cortexlabs/spark -4. run the following in python3 shell - -from lib import util -from lib.storage import S3 -bucket, key = S3.deconstruct_s3_path('s3:///apps//contexts/.msgpack') -S3(bucket, client_config={}).get_msgpack(key) -""" - - -def get(input_data_path): - raw_ctx["environment_data"]["csv_data"]["path"] = input_data_path - raw_ctx["cortex_config"]["api_version"] = consts.CORTEX_VERSION - - return raw_ctx - - -raw_ctx = { - "raw_dataset": { - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_raw/raw.parquet" - }, - "aggregates": { - "class_index": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/54ead5d565a57cad06972cc11d2f01f05c4e9e1dbfc525d1fa66b7999213722.msgpack", - "tags": {}, - "type": {"index": ["STRING"], "reversed_index": {"STRING": "INT"}}, - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "class_index", - "id": "54ead5d565a57cad06972cc11d2f01f05c4e9e1dbfc525d1fa66b7999213722", - "aggregator": "cortex.index_string", - "index": 8, - "id_with_tags": "2bd062924097b0add1143dab547387307cf68f40870f52443ce5902006e00d9", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "class"}, "args": {}}, - }, - "sepal_width_mean": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/38159191e6018b929b42c7e73e8bfd19f5778bba79e84d9909f5d448ac15fc9.msgpack", - "tags": {}, - "type": "FLOAT", - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "sepal_width_mean", - "id": "38159191e6018b929b42c7e73e8bfd19f5778bba79e84d9909f5d448ac15fc9", - "aggregator": "cortex.mean", - "index": 2, - "id_with_tags": "850aaab46427d39c331dd996e4a44af1bb326e45b47caaf699a5676863463f6", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "sepal_width"}, "args": {}}, - }, - "petal_width_stddev": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/986fd2cbc2b1d74aa06cf533b67d7dd7f54b5b7bf58689c58d0ec8c2568bae8.msgpack", - "tags": {}, - "type": "FLOAT", - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "petal_width_stddev", - "id": "986fd2cbc2b1d74aa06cf533b67d7dd7f54b5b7bf58689c58d0ec8c2568bae8", - "aggregator": "cortex.stddev", - "index": 7, - "id_with_tags": "b8f7440e0f71ec502cccbead6b52da80c119b833baaee1d00315542a6ab907c", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "petal_width"}, "args": {}}, - }, - "petal_width_mean": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/317856401885874d95fffd349fe0878595e8c04833ba63c4546233ffd899e4d.msgpack", - "tags": {}, - "type": "FLOAT", - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "petal_width_mean", - "id": "317856401885874d95fffd349fe0878595e8c04833ba63c4546233ffd899e4d", - "aggregator": "cortex.mean", - "index": 6, - "id_with_tags": "d8b09430972c97c3679fe3e90d67b69f19f1effcc2b587eb0876fb8f7d7dc55", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "petal_width"}, "args": {}}, - }, - "sepal_length_stddev": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/e7191b1effd1e4d351580f251aa35dc7c0b9825745b207fbb8cce904c94a937.msgpack", - "tags": {}, - "type": "FLOAT", - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "sepal_length_stddev", - "id": "e7191b1effd1e4d351580f251aa35dc7c0b9825745b207fbb8cce904c94a937", - "aggregator": "cortex.stddev", - "index": 1, - "id_with_tags": "a7da8887f18dfac4523dd1f2135913046001c1dc04701c9a1010e6b049db943", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "sepal_length"}, "args": {}}, - }, - "petal_length_stddev": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/6a9481dc91eb3f82458356f1f5f98da6f25a69b460679e09a67988543f79e3f.msgpack", - "tags": {}, - "type": "FLOAT", - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "petal_length_stddev", - "id": "6a9481dc91eb3f82458356f1f5f98da6f25a69b460679e09a67988543f79e3f", - "aggregator": "cortex.stddev", - "index": 5, - "id_with_tags": "325ae37b8684886d194ca37af19f4660c549c07880833a45ced4848895670a4", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "petal_length"}, "args": {}}, - }, - "sepal_width_stddev": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/64594f51d3cfb55a3776d013102d5fdab29bfe7332ce0c4f7c916d64d3ca29f.msgpack", - "tags": {}, - "type": "FLOAT", - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "sepal_width_stddev", - "id": "64594f51d3cfb55a3776d013102d5fdab29bfe7332ce0c4f7c916d64d3ca29f", - "aggregator": "cortex.stddev", - "index": 3, - "id_with_tags": "29e071e808b973aa21af717e9ee3d91f077b40baa5b890d4a58c9dc64a12d4b", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "sepal_width"}, "args": {}}, - }, - "sepal_length_mean": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/690f97171881c08770cac55137c672167a84324efba478cfd583ec98dd18844.msgpack", - "tags": {}, - "type": "FLOAT", - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "sepal_length_mean", - "id": "690f97171881c08770cac55137c672167a84324efba478cfd583ec98dd18844", - "aggregator": "cortex.mean", - "index": 0, - "id_with_tags": "1feedb4635d8955765dc82f58122e9756b1b797b3a7dcf3477ec99e655f05f2", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "sepal_length"}, "args": {}}, - }, - "petal_length_mean": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/aggregates/4deea2705f55fa8a38658546ea5c2d31e37d4aad43a874e091f1c1667b63a6e.msgpack", - "tags": {}, - "type": "FLOAT", - "embed": None, - "file_path": "resources/aggregates.yaml", - "name": "petal_length_mean", - "id": "4deea2705f55fa8a38658546ea5c2d31e37d4aad43a874e091f1c1667b63a6e", - "aggregator": "cortex.mean", - "index": 4, - "id_with_tags": "34a2792b8c2c8489ea3c8db81533a946d8005eb9e547a4863673e0eae011259", - "resource_type": "aggregate", - "inputs": {"columns": {"col": "petal_length"}, "args": {}}, - }, - }, - "transformers": { - "cortex.normalize": { - "id": "eab74305749aa9eaff514882156111fd49b8b740018da396693147cd4443a9e", - "impl_key": "/transformers/normalize.py", - "embed": None, - "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/transformers/transformers.yaml", - "name": "normalize", - "namespace": "cortex", - "path": "", - "output_type": "FLOAT_COLUMN", - "index": 1, - "id_with_tags": "eab74305749aa9eaff514882156111fd49b8b740018da396693147cd4443a9e", - "resource_type": "transformer", - "inputs": { - "columns": {"num": "FLOAT_COLUMN|INT_COLUMN"}, - "args": {"stddev": "INT|FLOAT", "mean": "INT|FLOAT"}, - }, - }, - "cortex.index_string": { - "id": "81bcee8795009e19f3378b2c3ea10fa6048741f2ad6ef841e5ed55c81319a0c", - "impl_key": "/transformers/index_string.py", - "embed": None, - "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/transformers/transformers.yaml", - "name": "index_string", - "namespace": "cortex", - "path": "", - "output_type": "INT_COLUMN", - "index": 2, - "id_with_tags": "81bcee8795009e19f3378b2c3ea10fa6048741f2ad6ef841e5ed55c81319a0c", - "resource_type": "transformer", - "inputs": { - "columns": {"text": "STRING_COLUMN"}, - "args": {"indexes": {"index": ["STRING"], "reversed_index": {"STRING": "INT"}}}, - }, - }, - }, - "python_packages": {}, - "key": "apps/iris/contexts/33d7d279749ec97d342614cd77c5e81314a74ae0c0407ff71a120e83736a658.msgpack", - "raw_columns": { - "raw_float_columns": { - "sepal_length": { - "tags": {}, - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "values": None, - "embed": None, - "type": "FLOAT_COLUMN", - "required": False, - "file_path": "resources/raw_columns.yaml", - "name": "sepal_length", - "id": "9479e84647a126fe5ce36e6eeac35aacb7156cd8c8e0773e572a91a7f9c1e92", - "min": 0.0, - "max": 10.0, - "index": 0, - "resource_type": "raw_column", - "id_with_tags": "b68cec533b973640329709bd4f7628bd8e8da5e3040bf227a1df5a7ce05807c", - }, - "sepal_width": { - "tags": {}, - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "values": None, - "embed": None, - "type": "FLOAT_COLUMN", - "required": False, - "file_path": "resources/raw_columns.yaml", - "name": "sepal_width", - "id": "690b9a1c2e717c7ec4304804d4d7fd54fba554d8ce4829062467a3dc4d5f0f8", - "min": 0.0, - "max": 10.0, - "index": 1, - "resource_type": "raw_column", - "id_with_tags": "01430cc2265647e61dd8d8f9bec1b3918468968bdd8b27d0c6088848501da44", - }, - "petal_length": { - "tags": {}, - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "values": None, - "embed": None, - "type": "FLOAT_COLUMN", - "required": False, - "file_path": "resources/raw_columns.yaml", - "name": "petal_length", - "id": "eb81ff65ce934e409ce18627cbb7d77c804289404fd62850fa5f915a1a9d87f", - "min": 0.0, - "max": 10.0, - "index": 2, - "resource_type": "raw_column", - "id_with_tags": "b89d5ef63dc22bfeb81684631d0f6e387e33cb26a52f7bb1b5de73ab49f40df", - }, - "petal_width": { - "tags": {}, - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "values": None, - "embed": None, - "type": "FLOAT_COLUMN", - "required": False, - "file_path": "resources/raw_columns.yaml", - "name": "petal_width", - "id": "98ee0c5e9935442ea77835297777f4ab916830db5cb1ec82590d8b03f53eb6c", - "min": 0.0, - "max": 10.0, - "index": 3, - "resource_type": "raw_column", - "id_with_tags": "0d148b5ccc0e9266e3fd349793efd12e6880c4f1577d311e6bbef792b939d85", - }, - }, - "raw_string_columns": { - "class": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "values": ["Iris-setosa", "Iris-versicolor", "Iris-virginica"], - "embed": None, - "required": False, - "type": "STRING_COLUMN", - "tags": {}, - "file_path": "resources/raw_columns.yaml", - "name": "class", - "id": "397a3c2785bcfdab244acdd11d65b415e3e4258b762deb8c17e600ce187c425", - "index": 4, - "resource_type": "raw_column", - "id_with_tags": "7fa09a7ca3544e1631bd60a792d23719a76bc9f77a350277a68efd3670a1f66", - } - }, - "raw_int_columns": {}, - }, - "environment_data": { - "csv_data": { - "drop_null": False, - "type": "csv", - "path": "/workspace/iris.csv", - "csv_config": { - "negative_inf": None, - "null_value": None, - "sep": None, - "ignore_leading_white_space": None, - "empty_value": None, - "max_columns": None, - "positive_inf": None, - "max_chars_per_column": None, - "nan_value": None, - "comment": None, - "ignore_trailing_white_space": None, - "multiline": None, - "char_to_escape_quote_escaping": None, - "encoding": None, - "escape": None, - "quote": None, - "header": None, - }, - "schema": ["sepal_length", "sepal_width", "petal_length", "petal_width", "class"], - }, - "parquet_data": None, - }, - "apis": { - "iris-type": { - "workload_id": "rvxejtv4uoy3jfuawokc", - "embed": None, - "tags": {}, - "file_path": "resources/apis.yaml", - "name": "iris-type", - "id": "07c60601edc687a5c3106bad0ad49fef497bb207487c5fd0a36226068a3166b", - "path": "/iris/iris-type", - "index": 0, - "id_with_tags": "e574b47b359b60f47132d3c919fc5c82fb5684c7784665a0e61bb42316b5b31", - "resource_type": "api", - "compute": {"mem": None, "replicas": 1, "cpu": None, "gpu": 0}, - "model_name": "dnn", - } - }, - "constants": {}, - "id": "33d7d279749ec97d342614cd77c5e81314a74ae0c0407ff71a120e83736a658", - "dataset_version": "2019-03-08-09-58-35-701834", - "environment": { - "log_level": {"spark": "WARN", "tensorflow": "INFO"}, - "index": 0, - "limit": { - "random_seed": None, - "randomize": None, - "fraction_of_rows": None, - "num_rows": None, - }, - "embed": None, - "file_path": "resources/environments.yaml", - "name": "dev", - "id": "3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554", - }, - "status_prefix": "apps/iris/resource_statuses", - "transformed_columns": { - "sepal_length_normalized": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "embed": None, - "type": "FLOAT_COLUMN", - "tags": {}, - "file_path": "resources/transformed_columns.yaml", - "name": "sepal_length_normalized", - "id": "a44a0acbb54123d03d67b47469cf83712df2045b90aa99036dab99f37583d46", - "transformer": "cortex.normalize", - "index": 0, - "id_with_tags": "d4f335e49dec681bd7a79766a79ab7682c8205e51a2ec46e40207785835f35a", - "resource_type": "transformed_column", - "inputs": { - "columns": {"num": "sepal_length"}, - "args": {"stddev": "sepal_length_stddev", "mean": "sepal_length_mean"}, - }, - }, - "petal_width_normalized": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "embed": None, - "type": "FLOAT_COLUMN", - "tags": {}, - "file_path": "resources/transformed_columns.yaml", - "name": "petal_width_normalized", - "id": "41221f15eea0328c2987c44171f323529bfa7a196a697b1a87ff4915c143531", - "transformer": "cortex.normalize", - "index": 3, - "id_with_tags": "a3ad56b29de6467931c40c992ac84b8a238a9f6d9611345bf4e338df314bf6d", - "resource_type": "transformed_column", - "inputs": { - "columns": {"num": "petal_width"}, - "args": {"stddev": "petal_width_stddev", "mean": "petal_width_mean"}, - }, - }, - "sepal_width_normalized": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "embed": None, - "type": "FLOAT_COLUMN", - "tags": {}, - "file_path": "resources/transformed_columns.yaml", - "name": "sepal_width_normalized", - "id": "360fe839dbc1ee1db2d0e0f0e8ca0d1a2cc54aed69e29843e0361d285ddb700", - "transformer": "cortex.normalize", - "index": 1, - "id_with_tags": "9aa22e1962c62aab2ea56f4cfc1369ed3e559bfaa70a5e8a2e17b82d1042f48", - "resource_type": "transformed_column", - "inputs": { - "columns": {"num": "sepal_width"}, - "args": {"stddev": "sepal_width_stddev", "mean": "sepal_width_mean"}, - }, - }, - "petal_length_normalized": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "embed": None, - "type": "FLOAT_COLUMN", - "tags": {}, - "file_path": "resources/transformed_columns.yaml", - "name": "petal_length_normalized", - "id": "7cbc111099c4bf38e27d6a05f9b2d37bdb9038f6f934be10298a718deae6db5", - "transformer": "cortex.normalize", - "index": 2, - "id_with_tags": "0c1923b1cc93679e8df3ec21f212656c050cb32d980604ffbd89fac0815ddcc", - "resource_type": "transformed_column", - "inputs": { - "columns": {"num": "petal_length"}, - "args": {"stddev": "petal_length_stddev", "mean": "petal_length_mean"}, - }, - }, - "class_indexed": { - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "embed": None, - "type": "INT_COLUMN", - "tags": {}, - "file_path": "resources/transformed_columns.yaml", - "name": "class_indexed", - "id": "6097e63c46b62b3cf70d86d9e1282bdd77d15d62bc4d132d9154bb5ddc1861d", - "transformer": "cortex.index_string", - "index": 4, - "id_with_tags": "f3b94376e20e64f67d0808c3589d8a4bb09196e38ff81ba775408be38148c1e", - "resource_type": "transformed_column", - "inputs": {"columns": {"text": "class"}, "args": {"indexes": "class_index"}}, - }, - }, - "models": { - "dnn": { - "aggregates": ["class_index"], - "impl_id": "2d7091a3fff24213d9e67cf2a846e5e31fd27f406fffbdb341140419f138f48", - "training_columns": [], - "key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/models/4989cb227eb56c2d3ccc1904cb3dbcab9a1ceb1ebf8cdb9f95a20b86a8df019.zip", - "embed": None, - "type": "classification", - "tags": {}, - "id": "4989cb227eb56c2d3ccc1904cb3dbcab9a1ceb1ebf8cdb9f95a20b86a8df019", - "name": "dnn", - "impl_key": "model_implementations/2d7091a3fff24213d9e67cf2a846e5e31fd27f406fffbdb341140419f138f48.py", - "feature_columns": [ - "sepal_length_normalized", - "sepal_width_normalized", - "petal_length_normalized", - "petal_width_normalized", - ], - "target_column": "class_indexed", - "resource_type": "model", - "hparams": {"hidden_units": [4, 2]}, - "prediction_key": "", - "workload_id": "aokhfrzyw6ju730nbwli", - "dataset": { - "train_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_training/5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76/train.tfrecord", - "workload_id": "jjd3l0fi4fhwqtgmpatg", - "eval_key": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/data_training/5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76/eval.tfrecord", - "embed": None, - "file_path": "resources/models.yaml", - "name": "dnn/training_dataset", - "id": "5bdaecf9c5a0094d4a18df15348f709be8acfd3c6faf72c3f243956c3896e76", - "index": 0, - "id_with_tags": "166a9c191c7d058a596fc2396ded7c39e27c8021bffd7b91ff2bbb07e26f729", - "resource_type": "training_dataset", - "model_name": "dnn", - }, - "data_partition_ratio": {"evaluation": 0.2, "training": 0.8}, - "file_path": "resources/models.yaml", - "path": "implementations/models/dnn.py", - "training": { - "keep_checkpoint_every_n_hours": 10000, - "shuffle": True, - "batch_size": 10, - "log_step_count_steps": 100, - "num_epochs": None, - "keep_checkpoint_max": 3, - "tf_random_seed": 1788, - "save_summary_steps": 100, - "save_checkpoints_steps": None, - "num_steps": 1000, - "save_checkpoints_secs": 600, - "tf_randomize_seed": False, - }, - "evaluation": { - "shuffle": False, - "batch_size": 40, - "num_epochs": None, - "throttle_secs": 600, - "num_steps": 100, - "start_delay_secs": 120, - }, - "index": 0, - "compute": {"mem": None, "cpu": None, "gpu": None}, - "id_with_tags": "4989cb227eb56c2d3ccc1904cb3dbcab9a1ceb1ebf8cdb9f95a20b86a8df019", - } - }, - "app": { - "id": "47612b3175fece07f6c3e91992412c5b16ca88a9068cb72fecbcf653eb5ffcd", - "name": "iris", - }, - "cortex_config": { - "region": "us-west-2", - "log_group": "cortex", - "api_version": "master", - "id": "da5e65b994ba4ebb069bdc19cf73da64aee79e5d83f466038dc75b3ef04fa63", - }, - "root": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554", - "metadata_root": "apps/iris/data/2019-03-08-09-58-35-701834/3976c5679bcf7cb550453802f4c3a9333c5f193f6097f1f5642de48d2397554/metadata", - "aggregators": { - "cortex.mean": { - "id": "a68b354ddadc2e14348698e03af74db72cba92d7acb162e3163629e3e343373", - "impl_key": "aggregators/71c8aa1ce07d9d7059e305ed2b180504c36a41452e73fb251ef532bf679f851.py", - "embed": None, - "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/aggregators/aggregators.yaml", - "name": "mean", - "namespace": "cortex", - "path": "", - "output_type": "FLOAT", - "index": 13, - "id_with_tags": "a68b354ddadc2e14348698e03af74db72cba92d7acb162e3163629e3e343373", - "resource_type": "aggregator", - "inputs": {"columns": {"col": "FLOAT_COLUMN|INT_COLUMN"}, "args": {}}, - }, - "cortex.stddev": { - "id": "51ca32fabf602a0c8fd7a9b4f5bb9a3d92bb6b3bbc356a727d7a25b19787353", - "impl_key": "aggregators/b8fa468e54c55083bf350f8b482c5323bd4bc12dd5fa0d859908ab2829aea7f.py", - "embed": None, - "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/aggregators/aggregators.yaml", - "name": "stddev", - "namespace": "cortex", - "path": "", - "output_type": "FLOAT", - "index": 18, - "id_with_tags": "51ca32fabf602a0c8fd7a9b4f5bb9a3d92bb6b3bbc356a727d7a25b19787353", - "resource_type": "aggregator", - "inputs": {"columns": {"col": "FLOAT_COLUMN|INT_COLUMN"}, "args": {}}, - }, - "cortex.index_string": { - "id": "c32f21159377d5dc3ddc664fe5cabbe7b275eadc82b5f6ed711faa1a988deb4", - "impl_key": "/aggregators/index_string.py", - "embed": None, - "file_path": "/home/ubuntu/src/github.com/cortexlabs/cortex/pkg/aggregators/aggregators.yaml", - "name": "index_string", - "namespace": "cortex", - "path": "", - "output_type": {"index": ["STRING"], "reversed_index": {"STRING": "INT"}}, - "index": 29, - "id_with_tags": "c32f21159377d5dc3ddc664fe5cabbe7b275eadc82b5f6ed711faa1a988deb4", - "resource_type": "aggregator", - "inputs": {"columns": {"col": "STRING_COLUMN"}, "args": {}}, - }, - }, -} diff --git a/pkg/workloads/spark_job/test/unit/spark_util_test.py b/pkg/workloads/spark_job/test/unit/spark_util_test.py index a95b95085a..b5e02ff950 100644 --- a/pkg/workloads/spark_job/test/unit/spark_util_test.py +++ b/pkg/workloads/spark_job/test/unit/spark_util_test.py @@ -16,7 +16,7 @@ import spark_util import consts from lib.exceptions import UserException - +from lib import util import pytest from pyspark.sql.types import * from pyspark.sql import Row @@ -28,13 +28,16 @@ pytestmark = pytest.mark.usefixtures("spark") +def add_res_ref(input): + return util.resource_escape_seq_raw + input + def test_read_csv_valid(spark, write_csv_file, ctx_obj, get_context): csv_str = "\n".join(["a,0.1,", "b,1,1", "c,1.1,4"]) path_to_file = write_csv_file(csv_str) ctx_obj["environment"] = { - "data": {"type": "csv", "path": path_to_file, "schema": ["a_str", "b_float", "c_long"]} + "data": {"type": "csv", "path": path_to_file, "schema": [add_res_ref("a_str"), add_res_ref("b_float"), add_res_ref("c_long")]} } ctx_obj["raw_columns"] = { @@ -52,7 +55,7 @@ def test_read_csv_invalid_type(spark, write_csv_file, ctx_obj, get_context): path_to_file = write_csv_file(csv_str) ctx_obj["environment"] = { - "data": {"type": "csv", "path": path_to_file, "schema": ["a_str", "b_long", "c_long"]} + "data": {"type": "csv", "path": path_to_file, "schema": [add_res_ref("a_str"), add_res_ref("b_long"), add_res_ref("c_long")]} } ctx_obj["raw_columns"] = { @@ -69,7 +72,7 @@ def test_read_csv_infer_type(spark, write_csv_file, ctx_obj, get_context): test_cases = [ { "csv": ["a,0.1,", "b,0.1,1", "c,1.1,4"], - "schema": ["a_str", "b_float", "c_long"], + "schema": [add_res_ref("a_str"), add_res_ref("b_float"), add_res_ref("c_long")], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "-"}, "b_float": { @@ -89,7 +92,7 @@ def test_read_csv_infer_type(spark, write_csv_file, ctx_obj, get_context): }, { "csv": ["1,4,4.5", "1,3,1.2", "1,5,4.7"], - "schema": ["a_str", "b_int", "c_float"], + "schema": [add_res_ref("a_str"), add_res_ref("b_int"), add_res_ref("c_float")], "raw_columns": { "a_str": {"name": "a_str", "type": "STRING_COLUMN", "required": True, "id": "-"}, "b_int": {"name": "b_int", "type": "INFERRED_COLUMN", "required": True, "id": "-"}, @@ -104,7 +107,7 @@ def test_read_csv_infer_type(spark, write_csv_file, ctx_obj, get_context): }, { "csv": ["1,4,2017-09-16", "1,3,2017-09-16", "1,5,2017-09-16"], - "schema": ["a_str", "b_int", "c_str"], + "schema": [add_res_ref("a_str"), add_res_ref("b_int"), add_res_ref("c_str")], "raw_columns": { "a_str": {"name": "a_str", "type": "STRING_COLUMN", "required": True, "id": "-"}, "b_int": {"name": "b_int", "type": "INFERRED_COLUMN", "required": True, "id": "-"}, @@ -114,7 +117,7 @@ def test_read_csv_infer_type(spark, write_csv_file, ctx_obj, get_context): }, { "csv": ["1,4,2017-09-16", "1,3,2017-09-16", "1,5,2017-09-16"], - "schema": ["a_float", "b_int", "c_str"], + "schema": [add_res_ref("a_float"), add_res_ref("b_int"), add_res_ref("c_str")], "raw_columns": { "a_float": {"name": "a_float", "type": "FLOAT_COLUMN", "required": True, "id": "-"}, "b_int": {"name": "b_int", "type": "INFERRED_COLUMN", "required": True, "id": "-"}, @@ -145,7 +148,7 @@ def test_read_csv_infer_invalid(spark, write_csv_file, ctx_obj, get_context): test_cases = [ { "csv": ["a,0.1,", "a,0.1,1", "a,1.1,4"], - "schema": ["a_int", "b_float", "c_long"], + "schema": [add_res_ref("a_int"), add_res_ref("b_float"), add_res_ref("c_long")], "raw_columns": { "a_int": {"name": "a_int", "type": "INT_COLUMN", "required": True, "id": "-"}, "b_float": { @@ -164,7 +167,7 @@ def test_read_csv_infer_invalid(spark, write_csv_file, ctx_obj, get_context): }, { "csv": ["a,1.1,", "a,1.1,1", "a,1.1,4"], - "schema": ["a_str", "b_int", "c_int"], + "schema": [add_res_ref("a_str"), add_res_ref("b_int"), add_res_ref("c_int")], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "-"}, "b_int": {"name": "b_int", "type": "INT_COLUMN", "required": True, "id": "-"}, @@ -193,7 +196,7 @@ def test_read_csv_missing_column(spark, write_csv_file, ctx_obj, get_context): path_to_file = write_csv_file(csv_str) ctx_obj["environment"] = { - "data": {"type": "csv", "path": path_to_file, "schema": ["a_str", "b_long", "c_long"]} + "data": {"type": "csv", "path": path_to_file, "schema": [add_res_ref("a_str"), add_res_ref("b_long"), add_res_ref("c_long")]} } ctx_obj["raw_columns"] = { @@ -221,7 +224,7 @@ def test_read_csv_valid_options(spark, write_csv_file, ctx_obj, get_context): "data": { "type": "csv", "path": path_to_file, - "schema": ["a_str", "b_float", "c_long"], + "schema": [add_res_ref("a_str"), add_res_ref("b_float"), add_res_ref("c_long")], "csv_config": { "header": True, "sep": "|", @@ -437,9 +440,9 @@ def test_ingest_parquet_valid(spark, write_parquet_file, ctx_obj, get_context): "type": "parquet", "path": path_to_file, "schema": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], } } @@ -473,9 +476,9 @@ def test_ingest_parquet_infer_valid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "1"}, @@ -508,9 +511,9 @@ def test_ingest_parquet_infer_valid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "1"}, @@ -547,9 +550,9 @@ def test_ingest_parquet_infer_valid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_str", "raw_column_name": "c_str"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_str", "raw_column": add_res_ref("c_str")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "1"}, @@ -581,9 +584,9 @@ def test_ingest_parquet_infer_valid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_str", "raw_column_name": "c_str"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_str", "raw_column": add_res_ref("c_str")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "1"}, @@ -637,9 +640,9 @@ def test_read_parquet_infer_invalid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "1"}, @@ -662,9 +665,9 @@ def test_read_parquet_infer_invalid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_str", "raw_column_name": "c_str"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_str", "raw_column": add_res_ref("c_str")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "1"}, @@ -687,9 +690,9 @@ def test_read_parquet_infer_invalid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INFERRED_COLUMN", "required": True, "id": "1"}, @@ -712,9 +715,9 @@ def test_read_parquet_infer_invalid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INT_COLUMN", "required": True, "id": "1"}, @@ -742,9 +745,9 @@ def test_read_parquet_infer_invalid(spark, write_parquet_file, ctx_obj, get_cont ] ), "env": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], "raw_columns": { "a_str": {"name": "a_str", "type": "INT_COLUMN", "required": True, "id": "1"}, @@ -799,10 +802,10 @@ def test_ingest_parquet_extra_cols(spark, write_parquet_file, ctx_obj, get_conte "type": "parquet", "path": path_to_file, "schema": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, - {"parquet_column_name": "d_long", "raw_column_name": "d_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, + {"parquet_column_name": "d_long", "raw_column": add_res_ref("d_long")}, ], } } @@ -834,9 +837,9 @@ def test_ingest_parquet_missing_cols(spark, write_parquet_file, ctx_obj, get_con "type": "parquet", "path": path_to_file, "schema": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], } } @@ -870,9 +873,9 @@ def test_ingest_parquet_type_mismatch(spark, write_parquet_file, ctx_obj, get_co "type": "parquet", "path": path_to_file, "schema": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], } } @@ -908,9 +911,9 @@ def test_ingest_parquet_failed_requirements( "type": "parquet", "path": path_to_file, "schema": [ - {"parquet_column_name": "a_str", "raw_column_name": "a_str"}, - {"parquet_column_name": "b_float", "raw_column_name": "b_float"}, - {"parquet_column_name": "c_long", "raw_column_name": "c_long"}, + {"parquet_column_name": "a_str", "raw_column": add_res_ref("a_str")}, + {"parquet_column_name": "b_float", "raw_column": add_res_ref("b_float")}, + {"parquet_column_name": "c_long", "raw_column": add_res_ref("c_long")}, ], } } @@ -928,43 +931,29 @@ def test_ingest_parquet_failed_requirements( assert validations == {"a_str": [("(a_str IN (a, b))", 1)]} -def test_column_names_to_index(): - sample_columns_input_config = {"b": "b_col", "a": "a_col"} - actual_list, actual_dict = spark_util.column_names_to_index(sample_columns_input_config) - assert (["a_col", "b_col"], {"b": 1, "a": 0}) == (actual_list, actual_dict) - - sample_columns_input_config = {"a": "a_col"} - - actual_list, actual_dict = spark_util.column_names_to_index(sample_columns_input_config) - assert (["a_col"], {"a": 0}) == (actual_list, actual_dict) - - sample_columns_input_config = {"nums": ["a_long", "a_col", "b_col", "b_col"], "a": "a_long"} - - expected_col_list = ["a_col", "a_long", "b_col"] - expected_columns_input_config = {"nums": [1, 0, 2, 2], "a": 1} - actual_list, actual_dict = spark_util.column_names_to_index(sample_columns_input_config) - - assert (expected_col_list, expected_columns_input_config) == (actual_list, actual_dict) - - def test_run_builtin_aggregators_success(spark, ctx_obj, get_context): + ctx_obj["raw_columns"] = { + "a": { + "id": "2", + "name": "a", + "type": "INT_COLUMN" + } + } ctx_obj["aggregators"] = { - "cortex.sum": {"name": "sum", "namespace": "cortex"}, - "cortex.first": {"name": "first", "namespace": "cortex"}, + "cortex.sum_int": { + "name": "sum_int", + "namespace": "cortex", + "input": {"_type": "INT_COLUMN"}, + "output_type": "INT_COLUMN", + } } ctx_obj["aggregates"] = { "sum_a": { "name": "sum_a", "id": "1", - "aggregator": "cortex.sum", - "inputs": {"columns": {"col": "a"}}, - }, - "first_a": { - "id": "2", - "name": "first_a", - "aggregator": "cortex.first", - "inputs": {"columns": {"col": "a"}, "args": {"ignorenulls": "some_constant"}}, - }, + "aggregator": "cortex.sum_int", + "input": add_res_ref("a"), + } } aggregate_list = [v for v in ctx_obj["aggregates"].values()] @@ -976,47 +965,15 @@ def test_run_builtin_aggregators_success(spark, ctx_obj, get_context): df = spark.createDataFrame(data, StructType([StructField("a", LongType())])) spark_util.run_builtin_aggregators(aggregate_list, df, ctx, spark) - calls = [call(6, ctx_obj["aggregates"]["sum_a"]), call(1, ctx_obj["aggregates"]["first_a"])] + calls = [call(6, ctx_obj["aggregates"]["sum_a"])] ctx.store_aggregate_result.assert_has_calls(calls, any_order=True) - ctx.populate_args.assert_called_once_with({"ignorenulls": "some_constant"}) - - -def test_run_builtin_aggregators_error(spark, ctx_obj, get_context): - ctx_obj["aggregators"] = {"cortex.first": {"name": "first", "namespace": "cortex"}} - ctx_obj["aggregates"] = { - "first_a": { - "name": "first_a", - "aggregator": "cortex.first", - "inputs": { - "columns": {"col": "a"}, - "args": {"ignoreNulls": "some_constant"}, # supposed to be ignorenulls - }, - "id": "1", - } - } - - aggregate_list = [v for v in ctx_obj["aggregates"].values()] - - ctx = get_context(ctx_obj) - ctx.store_aggregate_result = MagicMock() - ctx.populate_args = MagicMock(return_value={"ignoreNulls": True}) - - data = [Row(a=None), Row(a=1), Row(a=2), Row(a=3)] - df = spark.createDataFrame(data, StructType([StructField("a", LongType())])) - - with pytest.raises(Exception) as exec_info: - spark_util.run_builtin_aggregators(aggregate_list, df, ctx, spark) - - ctx.store_aggregate_result.assert_not_called() - ctx.populate_args.assert_called_once_with({"ignoreNulls": "some_constant"}) - -def test_infer_type(): - assert spark_util.infer_type(1) == consts.COLUMN_TYPE_INT - assert spark_util.infer_type(1.0) == consts.COLUMN_TYPE_FLOAT - assert spark_util.infer_type("cortex") == consts.COLUMN_TYPE_STRING +def test_infer_python_type(): + assert spark_util.infer_python_type(1) == consts.COLUMN_TYPE_INT + assert spark_util.infer_python_type(1.0) == consts.COLUMN_TYPE_FLOAT + assert spark_util.infer_python_type("cortex") == consts.COLUMN_TYPE_STRING - assert spark_util.infer_type([1]) == consts.COLUMN_TYPE_INT_LIST - assert spark_util.infer_type([1.0]) == consts.COLUMN_TYPE_FLOAT_LIST - assert spark_util.infer_type(["cortex"]) == consts.COLUMN_TYPE_STRING_LIST + assert spark_util.infer_python_type([1]) == consts.COLUMN_TYPE_INT_LIST + assert spark_util.infer_python_type([1.0]) == consts.COLUMN_TYPE_FLOAT_LIST + assert spark_util.infer_python_type(["cortex"]) == consts.COLUMN_TYPE_STRING_LIST