Allow users to ingest a subset of input columns #92

Merged 3 commits on May 1, 2019
4 changes: 0 additions & 4 deletions examples/movie-ratings/resources/environments.yaml
@@ -18,7 +18,3 @@
- kind: raw_column
name: rating
type: FLOAT_COLUMN

- kind: raw_column
name: timestamp
type: INT_COLUMN
97 changes: 46 additions & 51 deletions pkg/workloads/spark_job/spark_util.py
@@ -100,39 +100,6 @@ def write_training_data(model_name, df, ctx, spark):
return df


def expected_schema_from_context(ctx):
data_config = ctx.environment["data"]

if data_config["type"] == "csv":
expected_field_names = data_config["schema"]
else:
expected_field_names = [f["raw_column_name"] for f in data_config["schema"]]

schema_fields = [
StructField(
name=fname,
dataType=CORTEX_TYPE_TO_SPARK_TYPE[ctx.columns[fname]["type"]],
nullable=not ctx.columns[fname].get("required", False),
)
for fname in expected_field_names
]
return StructType(schema_fields)


def compare_column_schemas(expected_schema, actual_schema):
# Nullables are left out because Spark sets nullable to true when reading CSV files,
# regardless of whether the column actually has missing values. The null checks will be done elsewhere.
# This compares only the schemas
expected_sorted_fields = sorted(
[(f.name, f.dataType) for f in expected_schema], key=lambda f: f[0]
)

# Sorted for determinism when testing
actual_sorted_fields = sorted([(f.name, f.dataType) for f in actual_schema], key=lambda f: f[0])

return expected_sorted_fields == actual_sorted_fields


def min_check(input_col, min):
return input_col >= min, input_col < min

@@ -220,50 +187,78 @@ def value_check_data(ctx, df, raw_columns=None):


def ingest(ctx, spark):
expected_schema = expected_schema_from_context(ctx)

if ctx.environment["data"]["type"] == "csv":
df = read_csv(ctx, spark)
elif ctx.environment["data"]["type"] == "parquet":
df = read_parquet(ctx, spark)

if compare_column_schemas(expected_schema, df.schema) is not True:
logger.error("expected schema:")
log_df_schema(spark.createDataFrame([], expected_schema), logger.error)
logger.error("found schema:")
log_df_schema(df, logger.error)
input_type_map = {f.name: f.dataType for f in df.schema}

for raw_column_name in ctx.raw_columns.keys():
raw_column = ctx.raw_columns[raw_column_name]
expected_types = CORTEX_TYPE_TO_ACCEPTABLE_SPARK_TYPES[raw_column["type"]]
actual_type = input_type_map[raw_column_name]
if actual_type not in expected_types:
logger.error("found schema:")
log_df_schema(df, logger.error)

raise UserException(
"raw column " + raw_column_name,
"type mismatch",
"expected {} but found {}".format(
" or ".join(str(x) for x in expected_types), actual_type
),
)
target_type = CORTEX_TYPE_TO_SPARK_TYPE[raw_column["type"]]

raise UserException("raw data schema mismatch")
if target_type != actual_type:
df = df.withColumn(raw_column_name, F.col(raw_column_name).cast(target_type))

return df
return df.select(*sorted(df.columns))
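
The rewritten ingest() above checks each declared raw column against the set of acceptable Spark types, casts to the canonical type when needed, and returns the columns in sorted order. Below is a minimal, self-contained sketch of that check-and-cast loop, not the project's code; ACCEPTABLE, TARGET, and raw_columns are hypothetical stand-ins for CORTEX_TYPE_TO_ACCEPTABLE_SPARK_TYPES, CORTEX_TYPE_TO_SPARK_TYPE, and ctx.raw_columns.

```python
# Hypothetical stand-ins for CORTEX_TYPE_TO_ACCEPTABLE_SPARK_TYPES,
# CORTEX_TYPE_TO_SPARK_TYPE, and ctx.raw_columns.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import FloatType, IntegerType, LongType

ACCEPTABLE = {"FLOAT_COLUMN": [FloatType(), IntegerType(), LongType()]}
TARGET = {"FLOAT_COLUMN": FloatType()}
raw_columns = {"rating": {"type": "FLOAT_COLUMN"}}

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([(5,), (3,)], ["rating"])  # inferred as LongType

input_type_map = {f.name: f.dataType for f in df.schema}
for name, raw_column in raw_columns.items():
    actual_type = input_type_map[name]
    if actual_type not in ACCEPTABLE[raw_column["type"]]:
        raise ValueError("raw column {}: type mismatch ({})".format(name, actual_type))
    target_type = TARGET[raw_column["type"]]
    if target_type != actual_type:
        # acceptable but non-canonical type: cast in place
        df = df.withColumn(name, F.col(name).cast(target_type))

df.select(*sorted(df.columns)).printSchema()  # rating: float
```

Returning the columns sorted by name presumably keeps the ingested DataFrame's column order deterministic regardless of the input format.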


def read_csv(ctx, spark):
data_config = ctx.environment["data"]
schema = expected_schema_from_context(ctx)
expected_field_names = data_config["schema"]

schema_fields = []
for field_name in expected_field_names:
if field_name in ctx.raw_columns:
spark_type = CORTEX_TYPE_TO_SPARK_TYPE[ctx.raw_columns[field_name]["type"]]
else:
spark_type = StringType()

schema_fields.append(StructField(name=field_name, dataType=spark_type))

csv_config = {
util.snake_to_camel(param_name): val
for param_name, val in data_config.get("csv_config", {}).items()
if val is not None
}

return spark.read.csv(data_config["path"], schema=schema, mode="FAILFAST", **csv_config)
df = spark.read.csv(
data_config["path"], schema=StructType(schema_fields), mode="FAILFAST", **csv_config
)
return df.select(*ctx.raw_columns.keys())
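
read_csv() above still hands Spark the full CSV schema (columns the user did not declare as raw columns are read as plain strings) and then keeps only the declared columns. A small sketch under those assumptions, with a hypothetical file path and column names:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, StringType

spark = SparkSession.builder.master("local[*]").getOrCreate()

declared = {"rating": FloatType()}   # stands in for ctx.raw_columns
csv_fields = ["rating", "age"]       # stands in for data_config["schema"]; "age" is not ingested

schema = StructType(
    [StructField(name, declared.get(name, StringType())) for name in csv_fields]
)
df = spark.read.csv("path/to/ratings.csv", schema=schema, mode="FAILFAST")
df = df.select(*declared.keys())     # drop the undeclared columns
```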


def read_parquet(ctx, spark):
parquet_config = ctx.environment["data"]
df = spark.read.parquet(parquet_config["path"])

parquet_columns = [c["parquet_column_name"] for c in parquet_config["schema"]]
missing_cols = util.subtract_lists(parquet_columns, df.columns)
alias_map = {
c["parquet_column_name"]: c["raw_column_name"]
for c in parquet_config["schema"]
if c["parquet_column_name"] in ctx.raw_columns
}

missing_cols = set(alias_map.keys()) - set(df.columns)
if len(missing_cols) > 0:
raise UserException("parquet dataset", "missing columns: " + str(missing_cols))
logger.error("found schema:")
log_df_schema(df, logger.error)
raise UserException("missing column(s) in input dataset", str(missing_cols))

selectExprs = [
"{} as {}".format(c["parquet_column_name"], c["raw_column_name"])
for c in parquet_config["schema"]
]
selectExprs = ["{} as {}".format(alias_map[alias], alias) for alias in alias_map.keys()]

return df.selectExpr(*selectExprs)
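
read_parquet() above keeps only the parquet columns that map to a declared raw column and raises if any declared column is absent from the dataset. A sketch of that selection logic, with a hypothetical schema mapping and path:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Hypothetical stand-ins for ctx.environment["data"]["schema"] and ctx.raw_columns
schema = [
    {"parquet_column_name": "movie_rating", "raw_column_name": "rating"},
    {"parquet_column_name": "ts", "raw_column_name": "timestamp"},
]
declared_raw_columns = {"rating"}  # "timestamp" is intentionally not ingested

df = spark.read.parquet("path/to/ratings.parquet")

alias_map = {
    c["parquet_column_name"]: c["raw_column_name"]
    for c in schema
    if c["raw_column_name"] in declared_raw_columns
}

missing = set(alias_map.keys()) - set(df.columns)
if missing:
    raise ValueError("missing column(s) in input dataset: {}".format(missing))

# e.g. ["movie_rating as rating"]
df = df.selectExpr(*["{} as {}".format(p, r) for p, r in alias_map.items()])
```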
