Skip to content

Commit d1b0db0

Browse files
authored
refactor: simplify arrowtable with memtable (#134)
1 parent 26b2413 commit d1b0db0

File tree

1 file changed

+6
-17
lines changed

1 file changed

+6
-17
lines changed

datafusion-postgres/src/pg_catalog.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@ use datafusion::arrow::array::{
1010
use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
1111
use datafusion::arrow::ipc::reader::FileReader;
1212
use datafusion::catalog::streaming::StreamingTable;
13-
use datafusion::catalog::{CatalogProviderList, SchemaProvider};
13+
use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider};
1414
use datafusion::common::utils::SingleRowListArrayBuilder;
1515
use datafusion::datasource::{TableProvider, ViewTable};
1616
use datafusion::error::{DataFusionError, Result};
17-
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
1817
use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
19-
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
2018
use datafusion::physical_plan::streaming::PartitionStream;
2119
use datafusion::prelude::{create_udf, SessionContext};
2220
use postgres_types::Oid;
@@ -394,19 +392,10 @@ impl ArrowTable {
394392
data: batches,
395393
})
396394
}
397-
}
398-
399-
impl PartitionStream for ArrowTable {
400-
fn schema(&self) -> &SchemaRef {
401-
&self.schema
402-
}
403395

404-
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
405-
let data = self.data.clone();
406-
Box::pin(RecordBatchStreamAdapter::new(
407-
self.schema.clone(),
408-
futures::stream::iter(data.into_iter().map(Ok)),
409-
))
396+
/// Convert the arrow data into datafusion MemTable
397+
pub fn try_into_memtable(self) -> Result<MemTable> {
398+
MemTable::try_new(self.schema, vec![self.data])
410399
}
411400
}
412401

@@ -664,8 +653,8 @@ impl PgCatalogStaticTables {
664653
/// Create table from dumped arrow data
665654
fn create_arrow_table(data_bytes: Vec<u8>) -> Result<Arc<dyn TableProvider>> {
666655
let table = ArrowTable::from_ipc_data(data_bytes)?;
667-
let streaming_table = StreamingTable::try_new(table.schema.clone(), vec![Arc::new(table)])?;
668-
Ok(Arc::new(streaming_table))
656+
let mem_table = table.try_into_memtable()?;
657+
Ok(Arc::new(mem_table))
669658
}
670659
}
671660

0 commit comments

Comments
 (0)