Skip to content

Commit d751320

Browse files
committed
Fix failing tests
1 parent d64de7e commit d751320

File tree

5 files changed

+28
-12
lines changed

5 files changed

+28
-12
lines changed

datafusion/common/src/config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,7 +1566,7 @@ config_namespace! {
15661566
pub struct CsvOptions {
15671567
/// Specifies whether there is a CSV header (i.e. the first line
15681568
/// consists of is column names). If not specified, uses default from
1569-
/// the `CREATE TABLE` command, if any.
1569+
/// the session state, if any.
15701570
pub has_header: Option<bool>, default = None
15711571
pub delimiter: u8, default = b','
15721572
pub quote: u8, default = b'"'
@@ -1609,8 +1609,8 @@ impl CsvOptions {
16091609

16101610
/// Returns true if the first line is a header. If format options does not
16111611
/// specify whether there is a header, consults the configuration.
1612-
pub fn has_header(&self, config_opt: &ConfigOptions) -> bool {
1613-
self.has_header.unwrap_or(config_opt.catalog.has_header)
1612+
pub fn has_header(&self) -> Option<bool> {
1613+
self.has_header
16141614
}
16151615

16161616
/// The character separating values within a row.

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use arrow::array::RecordBatch;
4040
use arrow::csv::WriterBuilder;
4141
use arrow::datatypes::SchemaRef;
4242
use arrow::datatypes::{DataType, Field, Fields, Schema};
43-
use datafusion_common::config::{ConfigOptions, CsvOptions};
43+
use datafusion_common::config::CsvOptions;
4444
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
4545
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
4646
use datafusion_execution::TaskContext;
@@ -142,8 +142,8 @@ impl CsvFormat {
142142
}
143143

144144
/// True if the first line is a header.
145-
pub fn has_header(&self, config_opt: &ConfigOptions) -> bool {
146-
self.options.has_header(config_opt)
145+
pub fn has_header(&self) -> Option<bool> {
146+
self.options.has_header
147147
}
148148

149149
/// The character separating values within a row.
@@ -245,7 +245,9 @@ impl FileFormat for CsvFormat {
245245
conf,
246246
// If format options does not specify whether there is a header,
247247
// we consult configuration options.
248-
self.options.has_header(state.config_options()),
248+
self.options
249+
.has_header
250+
.unwrap_or(state.config_options().catalog.has_header),
249251
self.options.delimiter,
250252
self.options.quote,
251253
self.options.escape,
@@ -303,7 +305,10 @@ impl CsvFormat {
303305
while let Some(chunk) = stream.next().await.transpose()? {
304306
let format = arrow::csv::reader::Format::default()
305307
.with_header(
306-
self.options.has_header(state.config_options()) && first_chunk,
308+
self.options
309+
.has_header
310+
.unwrap_or(state.config_options().catalog.has_header)
311+
&& first_chunk,
307312
)
308313
.with_delimiter(self.options.delimiter);
309314

datafusion/core/src/datasource/stream.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use arrow_schema::SchemaRef;
3030
use async_trait::async_trait;
3131
use futures::StreamExt;
3232

33-
use datafusion_common::{plan_err, Constraints, DataFusionError, Result};
33+
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
3434
use datafusion_common_runtime::SpawnedTask;
3535
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3636
use datafusion_expr::{CreateExternalTable, Expr, TableType};
@@ -58,11 +58,22 @@ impl TableProviderFactory for StreamTableFactory {
5858
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
5959
let location = cmd.location.clone();
6060
let encoding = cmd.file_type.parse()?;
61+
let header = if let Ok(opt) = cmd
62+
.options
63+
.get("format.has_header")
64+
.map(|has_header| bool::from_str(has_header))
65+
.transpose()
66+
{
67+
opt.unwrap_or(false)
68+
} else {
69+
return config_err!("format.has_header can either be true or false");
70+
};
6171

6272
let config = StreamConfig::new_file(schema, location.into())
6373
.with_encoding(encoding)
6474
.with_order(cmd.order_exprs.clone())
6575
.with_batch_size(state.config().batch_size())
76+
.with_header(header)
6677
.with_constraints(cmd.constraints.clone());
6778

6879
Ok(Arc::new(StreamTable(Arc::new(config))))

datafusion/core/tests/sql/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
8585
c13 VARCHAR NOT NULL
8686
)
8787
STORED AS CSV
88-
LOCATION '{testdata}/csv/aggregate_test_100.csv
88+
LOCATION '{testdata}/csv/aggregate_test_100.csv'
8989
OPTIONS ('format.has_header' 'true')
9090
"
9191
))

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ async fn roundtrip_custom_listing_tables() -> Result<()> {
238238
STORED AS CSV
239239
WITH ORDER (a ASC, b ASC)
240240
WITH ORDER (c ASC)
241-
LOCATION '../core/tests/data/window_2.csv';
241+
LOCATION '../core/tests/data/window_2.csv'
242242
OPTIONS ('format.has_header' 'true')";
243243

244244
let plan = ctx.state().create_logical_plan(query).await?;
@@ -268,7 +268,7 @@ async fn roundtrip_logical_plan_aggregation_with_pk() -> Result<()> {
268268
STORED AS CSV
269269
WITH ORDER (a ASC, b ASC)
270270
WITH ORDER (c ASC)
271-
LOCATION '../core/tests/data/window_2.csv';
271+
LOCATION '../core/tests/data/window_2.csv'
272272
OPTIONS ('format.has_header' 'true')",
273273
)
274274
.await?;

0 commit comments

Comments
 (0)