Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion plugins/engine-datafusion/jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
let table_path = shard_view.table_path();
let files_meta = shard_view.files_meta();

println!("Table path: {}", table_path);
println!("Files: {:?}", files_meta);

let list_file_cache = Arc::new(DefaultListFilesCache::default());
list_file_cache.put(table_path.prefix(), files_meta);
Expand Down Expand Up @@ -252,7 +254,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute
// Create a new TableProvider
let provider = Arc::new(ListingTable::try_new(config).unwrap());
let shard_id = table_path.prefix().filename().expect("error in fetching Path");
ctx.register_table("logs", provider)
ctx.register_table("hits", provider)
.expect("Failed to attach the Table");

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class DatafusionEngine extends SearchExecEngine<DatafusionContext, Datafu
*/
public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService) throws IOException {
this.dataFormat = dataFormat;
this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot);
this.datafusionReaderManager = new DatafusionReaderManager("/Users/abandeji/Public/workplace/OpenSearch/build/testclusters/runTask-0/data/nodes/0/indices/7xU89OS-Tn2_nO7CboVqMg/0/parquet", formatCatalogSnapshot);
this.datafusionService = dataFusionService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ public class DatafusionReader implements Closeable {
public DatafusionReader(String directoryPath, Collection<FileMetadata> files) {
this.directoryPath = directoryPath;
this.files = files;
String[] fileNames = Objects.isNull(files) ? new String[]{} : files.stream().map(FileMetadata::fileName).toArray(String[]::new);
String[] fileNames = Objects.isNull(files) ? new String[]{"hits_data.parquet"} : files.stream().map(FileMetadata::fileName).toArray(String[]::new);
this.cachePtr = DataFusionQueryJNI.createDatafusionReader(directoryPath, fileNames);
incRef();
}

/**
* Gets the cache pointer.
*
*
* @return the cache pointer
*/
public long getCachePtr() {
Expand All @@ -68,7 +68,7 @@ public void incRef() {

/**
* Decrements the reference count.
*
*
* @throws IOException if an I/O error occurs
*/
public void decRef() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testQueryPhaseExecutor() throws IOException {
Map<String, Object[]> finalRes = new HashMap<>();
DatafusionSearcher datafusionSearcher = null;
try {
DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "generation-1-optimized.parquet")), service);
DatafusionEngine engine = new DatafusionEngine(DataFormat.CSV, List.of(new FileMetadata(new TextDF(), "hits_data.parquet")), service);
datafusionSearcher = engine.acquireSearcher("Search");


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public CompositeIndexingExecutionEngine(PluginsService pluginsService, Any dataf
public CompositeIndexingExecutionEngine(PluginsService pluginsService) {
try {
DataSourcePlugin plugin = pluginsService.filterPlugins(DataSourcePlugin.class).stream()
.findFirst()
.findAny()
.orElseThrow(() -> new IllegalArgumentException("dataformat [" + DataFormat.TEXT + "] is not registered."));
delegates.add(plugin.indexingEngine());
} catch (NullPointerException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public CompositeEngine(MapperService mapperService, PluginsService pluginsServic
for(org.opensearch.vectorized.execution.search.DataFormat dataFormat : searchEnginePlugin.getSupportedFormats()) {
SearchExecEngine<?,?,?,?> searchExecEngine = searchEnginePlugin.createEngine(dataFormat,
catalogSnapshot.getSearchableFiles(dataFormat.toString()));
readEngines.getOrDefault(dataFormat, new ArrayList<>()).add(searchExecEngine);
List<SearchExecEngine<?, ?, ?, ?>> readEngine = readEngines.getOrDefault(dataFormat, new ArrayList<>());
readEngine.add(searchExecEngine);
readEngines.put(dataFormat, readEngine);
// TODO : figure out how to do internal and external refresh listeners
// Maybe external refresh should be managed in opensearch core and plugins should always give
// internal refresh managers
Expand Down
Loading