@@ -540,7 +540,7 @@ protected void canDeriveSourceInternal() {
* both doc values and stored field
*/
@Override
protected DerivedFieldGenerator derivedFieldGenerator() {
public DerivedFieldGenerator derivedFieldGenerator() {
return new DerivedFieldGenerator(
mappedFieldType,
new SortedNumericDocValuesFetcher(mappedFieldType, simpleName()),
266 changes: 262 additions & 4 deletions plugins/engine-datafusion/jni/src/lib.rs
@@ -5,23 +5,25 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::ptr::addr_of_mut;
use jni::objects::{JByteArray, JClass, JObject};
use jni::objects::{JByteArray, JClass, JIntArray, JObject};
use jni::sys::{jbyteArray, jlong, jstring};
use jni::JNIEnv;
use std::sync::Arc;
use arrow_array::{Array, StructArray};
use arrow_array::ffi::FFI_ArrowArray;
use arrow_schema::DataType;
use arrow_schema::ffi::FFI_ArrowSchema;
use datafusion::common::DataFusionError;

mod util;
mod row_id_optimizer;
mod listing_table;

use datafusion::execution::context::SessionContext;

use crate::util::{create_object_meta_from_filenames, parse_string_arr, set_object_result_error, set_object_result_ok};
use crate::util::{create_object_meta_from_filenames, parse_string_arr, set_object_result, set_object_result_error, set_object_result_ok};
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingTableUrl};
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
@@ -31,12 +33,22 @@ use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion::prelude::SessionConfig;
use datafusion::DATAFUSION_VERSION;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess};
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::execution::TaskContext;
use datafusion::parquet::arrow::arrow_reader::RowSelector;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::source::DataSourceExec;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::substrait::proto::Plan;
use futures::TryStreamExt;
use jni::objects::{JObjectArray, JString};
use object_store::ObjectMeta;
use parquet::file::reader::{FileReader, SerializedFileReader};
use prost::Message;
use tokio::runtime::Runtime;
use crate::listing_table::{ListingOptions, ListingTable, ListingTableConfig};
@@ -430,3 +442,249 @@ pub extern "system" fn Java_org_opensearch_datafusion_RecordBatchStream_getSchem
}
}
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_executeFetchPhase(
mut env: JNIEnv,
_class: JClass,
shard_view_ptr: jlong,
values: JIntArray, // TODO : Add projections
tokio_runtime_env_ptr: jlong,
callback: JObject,
) {
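    // Flow: map the requested global row ids to per-file ParquetAccessPlans, scan only
    // those rows through a DataFusion DataSourceExec, and hand the resulting record
    // batch stream pointer back to Java via the callback object.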


    // Validate the raw pointers before dereferencing them
    if shard_view_ptr == 0 || tokio_runtime_env_ptr == 0 {
        let error = DataFusionError::Execution("Null shard view or runtime pointer".to_string());
        set_object_result_error(&mut env, callback, &error);
        return;
    }
    let shard_view = unsafe { &*(shard_view_ptr as *const ShardView) };
    let runtime_ptr = unsafe { &*(tokio_runtime_env_ptr as *const Runtime) };
    let table_path = shard_view.table_path();
    let files_meta = shard_view.files_meta();


    // Validate the row id array before copying it
if values.is_null() {
let _ = env.throw_new("java/lang/NullPointerException", "values array is null");
return;
}

// Get array length
let array_length = match env.get_array_length(&values) {
Ok(len) => len,
Err(e) => {
let _ = env.throw_new("java/lang/RuntimeException",
format!("Failed to get array length: {:?}", e));
return;
}
};

// Create a buffer to hold the array data
let mut row_ids = vec![0; array_length as usize];

// Copy the array data into the buffer
match env.get_int_array_region(&values, 0, &mut row_ids) {
Ok(_) => {
// Now buffer contains the array data
// println!("Received array of length {}: {:?}", array_length, row_ids);
},
Err(e) => {
let _ = env.throw_new("java/lang/RuntimeException",
format!("Failed to get array data: {:?}", e));
return;
}
}



let access_plans = create_access_plans(row_ids, files_meta.clone());


let runtime_env = RuntimeEnvBuilder::new()
.with_cache_manager(CacheManagerConfig::default()
//.with_list_files_cache(Some(list_file_cache)) TODO: //Fix this
).build().unwrap();
let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime_env));
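    // Note: a fresh RuntimeEnv and SessionContext are built on every call; the list-files
    // cache wiring is still pending (see the TODO above).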

// Create default parquet options
let file_format = ParquetFormat::new();
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(".parquet"); // TODO: take this as parameter
// .with_table_partition_cols(vec![("row_base".to_string(), DataType::Int32)]); // TODO: enable only for query phase

// Ideally the executor will give this



runtime_ptr.block_on(async {

let parquet_schema = listing_options
.infer_schema(&ctx.state(), &table_path.clone())
.await.unwrap();

let partitioned_files: Vec<PartitionedFile> = files_meta
.iter()
            .zip(access_plans.await.unwrap().iter())
.map(|(meta, access_plan)| {
PartitionedFile::new(
meta.location.clone(),
meta.size
).with_extensions(Arc::new(access_plan.clone()))
})
.collect();

let file_group = FileGroup::new(partitioned_files);

let file_source = Arc::new(
ParquetSource::default()
// provide the factory to create parquet reader without re-reading metadata
//.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);

let file_scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), parquet_schema.clone(), file_source)
//.with_limit(limit)
// .with_projection(projection)
.with_file_group(file_group)
.build();

let parquet_exec = DataSourceExec::from_data_source(file_scan_config);
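        // The physical plan is just this DataSourceExec; no logical planning or optimizer
        // passes are involved in the fetch path.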

// IMPORTANT: Only get one reference to each pointer
// let liquid_ctx = unsafe { &mut *(context_ptr as *mut SessionContext) };
// let session_ctx = unsafe { Box::from_raw(context_ptr as *mut SessionContext) };
        let optimized_plan: Arc<dyn ExecutionPlan> = parquet_exec.clone();



let task_ctx = Arc::new(TaskContext::default());

// let runtime = unsafe { &mut *(runtime as *mut Runtime) };
// runtime.block_on(async {
match optimized_plan.execute(0, task_ctx) {
Ok(stream) => {
let boxed_stream = Box::new(stream);
let stream_ptr = Box::into_raw(boxed_stream);
set_object_result_ok(
&mut env,
callback,
stream_ptr
);
}
Err(e) => {
set_object_result_error(
&mut env,
callback,
&e
);
}
}
});
}

async fn create_access_plans(
row_ids: Vec<i32>,
files_meta: Arc<Vec<ObjectMeta>>
) -> Result<Vec<ParquetAccessPlan>, DataFusionError> {
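    // Row ids are global offsets across all files of the shard (in files_meta order).
    // For each file, translate the matching ids into row-group level selections and emit
    // one ParquetAccessPlan so the scan decodes only the requested rows.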
let mut parquet_metadata = Vec::new();

// First get ParquetMetaData for all files
for meta in files_meta.iter() {
let path = meta.location.as_ref();
let file = File::open(path)?;
let reader = SerializedFileReader::new(file).unwrap();
parquet_metadata.push(reader.metadata().clone());
}

let mut access_plans = Vec::new();
let mut cumulative_rows = 0;

// Process each file
for (file_idx, metadata) in parquet_metadata.iter().enumerate() {
let total_row_groups = metadata.num_row_groups();
let mut access_plan = ParquetAccessPlan::new_all(total_row_groups);

// Calculate total rows in current file
let mut file_total_rows = 0;
let mut row_group_map: HashMap<usize, i32> = HashMap::new();
for group_id in 0..total_row_groups {
let rows_in_group = metadata.row_group(group_id).num_rows() as i32;
row_group_map.insert(group_id, rows_in_group);
file_total_rows += rows_in_group;
}

// Filter row IDs that belong to this file
let file_row_ids: Vec<i32> = row_ids.iter()
.filter(|&&id| id >= cumulative_rows && id < cumulative_rows + file_total_rows)
.map(|&id| id - cumulative_rows) // Convert global row IDs to local
.collect();

if file_row_ids.is_empty() {
// If no rows belong to this file, skip all row groups
for group_id in 0..total_row_groups {
access_plan.skip(group_id);
}
} else {
            // Group local row IDs by row group, walking cumulative row-group offsets
            // so that variable row-group sizes are handled correctly
            let mut group_map: HashMap<usize, BTreeSet<i32>> = HashMap::new();
            for row_id in file_row_ids {
                let mut group_start = 0;
                for group_id in 0..total_row_groups {
                    let rows_in_group = *row_group_map.get(&group_id).unwrap();
                    if row_id < group_start + rows_in_group {
                        group_map.entry(group_id).or_default().insert(row_id - group_start);
                        break;
                    }
                    group_start += rows_in_group;
                }
            }

// Process each row group
for group_id in 0..total_row_groups {
let row_group_size = *row_group_map.get(&group_id).unwrap() as usize;
if let Some(group_row_ids) = group_map.get(&group_id) {
let mut relative_row_ids: Vec<usize> = group_row_ids.iter()
.map(|&x| x as usize)
.collect();

if relative_row_ids.is_empty() {
access_plan.skip(group_id);
} else if relative_row_ids.len() == row_group_size {
access_plan.scan(group_id);
} else {
                        // Build skip/select RowSelectors by run-length encoding runs of consecutive relative row ids
let mut selectors = Vec::new();
let mut current_pos = 0;
let mut i = 0;
while i < relative_row_ids.len() {
let mut target_pos = relative_row_ids[i];
if target_pos > current_pos {
selectors.push(RowSelector::skip(target_pos - current_pos));
}
let mut select_count = 1;
while i + 1 < relative_row_ids.len() &&
relative_row_ids[i + 1] == relative_row_ids[i] + 1 {
select_count += 1;
i += 1;
target_pos = relative_row_ids[i];
}
selectors.push(RowSelector::select(select_count));
current_pos = relative_row_ids[i] + 1;
i += 1;
}
if current_pos < row_group_size {
selectors.push(RowSelector::skip(row_group_size - current_pos));
}
access_plan.set(group_id, RowGroupAccess::Selection(selectors.into()));
}
} else {
access_plan.skip(group_id);
}
}
}

access_plans.push(access_plan);
cumulative_rows += file_total_rows;
}

Ok(access_plans)
}
@@ -115,6 +115,17 @@ private static synchronized void loadNativeLibrary() {
*/
public static native long executeSubstraitQuery(long cachePtr, byte[] substraitPlan, long runtimePtr);

/**
     * Execute the fetch phase, fetching records for the given row ids
     * @param cachePtr the session context ID
     * @param rowIds row ids for which records need to be fetched
* @param runtimePtr runtime pointer
* @return stream pointer for result iteration
*/

// TODO: tie this to actual FetchPhase
public static native long executeFetchPhase(long cachePtr, long[] rowIds, long runtimePtr);
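    // Illustrative call sequence only (hypothetical variable names; actual wiring depends on
    // the FetchPhase integration noted in the TODO above):
    //   long streamPtr = DataFusionQueryJNI.executeFetchPhase(cachePtr, rowIds, runtimePtr);
    //   // iterate the resulting RecordBatchStream through the existing stream bindings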

public static native long createDatafusionReader(String path, String[] files);

public static native void closeDatafusionReader(long ptr);