Skip to content

Paimon implement read interfaces #1505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
import com.alibaba.fluss.lake.paimon.source.PaimonLakeSource;
import com.alibaba.fluss.lake.paimon.source.PaimonSplit;
import com.alibaba.fluss.lake.paimon.tiering.PaimonCommittable;
import com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory;
import com.alibaba.fluss.lake.paimon.tiering.PaimonWriteResult;
Expand Down Expand Up @@ -46,7 +48,7 @@ public PaimonLakeCatalog createLakeCatalog() {
}

@Override
public LakeSource<?> createLakeSource(TablePath tablePath) {
throw new UnsupportedOperationException("Not implemented");
public LakeSource<PaimonSplit> createLakeSource(TablePath tablePath) {
return new PaimonLakeSource(paimonConfig, tablePath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.paimon.source;

import com.alibaba.fluss.row.TimestampLtz;
import com.alibaba.fluss.row.TimestampNtz;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

/** Adapter class for converting Fluss row to Paimon row. */
/**
 * Adapter that exposes a Fluss {@link com.alibaba.fluss.row.InternalRow} as a Paimon {@link
 * InternalRow}, converting field values on access.
 *
 * <p>The Paimon {@link RowType} of the target table is required to disambiguate timestamp
 * accessors: Paimon uses a single {@link Timestamp} class for both {@code
 * TIMESTAMP_WITHOUT_TIME_ZONE} and {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}, while Fluss models them
 * as {@link TimestampNtz} and {@link TimestampLtz} respectively.
 *
 * <p>NOTE(review): the single-argument constructor leaves {@link #internalRow} unset; callers or
 * subclasses are expected to assign the wrapped row before any accessor is invoked — confirm
 * against usages.
 */
public class FlussRowAsPaimonRow implements InternalRow {

    /** The wrapped Fluss row; field accessors delegate to it position by position. */
    protected com.alibaba.fluss.row.InternalRow internalRow;

    /** Paimon row type of the table, used to resolve timestamp kind in {@link #getTimestamp}. */
    protected final RowType tableRowType;

    public FlussRowAsPaimonRow(RowType tableRowType) {
        this.tableRowType = tableRowType;
    }

    public FlussRowAsPaimonRow(
            com.alibaba.fluss.row.InternalRow internalRow, RowType tableRowType) {
        this.internalRow = internalRow;
        this.tableRowType = tableRowType;
    }

    @Override
    public int getFieldCount() {
        return internalRow.getFieldCount();
    }

    @Override
    public RowKind getRowKind() {
        // Fluss rows surfaced through this adapter are always treated as inserts.
        return RowKind.INSERT;
    }

    @Override
    public void setRowKind(RowKind rowKind) {
        // do nothing: the row kind is fixed to INSERT for this adapter
    }

    @Override
    public boolean isNullAt(int pos) {
        return internalRow.isNullAt(pos);
    }

    @Override
    public boolean getBoolean(int pos) {
        return internalRow.getBoolean(pos);
    }

    @Override
    public byte getByte(int pos) {
        return internalRow.getByte(pos);
    }

    @Override
    public short getShort(int pos) {
        return internalRow.getShort(pos);
    }

    @Override
    public int getInt(int pos) {
        return internalRow.getInt(pos);
    }

    @Override
    public long getLong(int pos) {
        return internalRow.getLong(pos);
    }

    @Override
    public float getFloat(int pos) {
        return internalRow.getFloat(pos);
    }

    @Override
    public double getDouble(int pos) {
        return internalRow.getDouble(pos);
    }

    @Override
    public BinaryString getString(int pos) {
        // Re-wrap the underlying UTF-8 bytes; no copy semantics are changed here.
        return BinaryString.fromBytes(internalRow.getString(pos).toBytes());
    }

    @Override
    public Decimal getDecimal(int pos, int precision, int scale) {
        com.alibaba.fluss.row.Decimal flussDecimal = internalRow.getDecimal(pos, precision, scale);
        // Compact decimals fit in a long; avoid the BigDecimal round-trip in that case.
        if (flussDecimal.isCompact()) {
            return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale);
        } else {
            return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale);
        }
    }

    @Override
    public Timestamp getTimestamp(int pos, int precision) {
        // Paimon has a single Timestamp class, so consult the table row type to decide which
        // Fluss accessor (NTZ vs LTZ) to delegate to.
        DataType paimonTimestampType = tableRowType.getTypeAt(pos);
        switch (paimonTimestampType.getTypeRoot()) {
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                if (TimestampNtz.isCompact(precision)) {
                    // Compact precision carries millisecond resolution only.
                    return Timestamp.fromEpochMillis(
                            internalRow.getTimestampNtz(pos, precision).getMillisecond());
                } else {
                    TimestampNtz timestampNtz = internalRow.getTimestampNtz(pos, precision);
                    return Timestamp.fromEpochMillis(
                            timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
                }

            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                if (TimestampLtz.isCompact(precision)) {
                    return Timestamp.fromEpochMillis(
                            internalRow.getTimestampLtz(pos, precision).getEpochMillisecond());
                } else {
                    TimestampLtz timestampLtz = internalRow.getTimestampLtz(pos, precision);
                    return Timestamp.fromEpochMillis(
                            timestampLtz.getEpochMillisecond(),
                            timestampLtz.getNanoOfMillisecond());
                }
            default:
                throw new UnsupportedOperationException(
                        "Unsupported data type to get timestamp: " + paimonTimestampType);
        }
    }

    @Override
    public byte[] getBinary(int pos) {
        return internalRow.getBytes(pos);
    }

    @Override
    public Variant getVariant(int pos) {
        throw new UnsupportedOperationException(
                "getVariant is not supported for Fluss record currently.");
    }

    @Override
    public InternalArray getArray(int pos) {
        throw new UnsupportedOperationException(
                "getArray is not supported for Fluss record currently.");
    }

    @Override
    public InternalMap getMap(int pos) {
        throw new UnsupportedOperationException(
                "getMap is not supported for Fluss record currently.");
    }

    @Override
    public InternalRow getRow(int pos, int numFields) {
        throw new UnsupportedOperationException(
                "getRow is not supported for Fluss record currently.");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.lake.paimon.source;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.source.Planner;
import com.alibaba.fluss.lake.source.RecordReader;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.predicate.Predicate;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;

/**
* Paimon Lake format implementation of {@link com.alibaba.fluss.lake.source.LakeSource} for reading
* paimon table.
*/
/**
 * Paimon Lake format implementation of {@link com.alibaba.fluss.lake.source.LakeSource} for reading
 * paimon table.
 *
 * <p>Projection pushdown is supported via {@link #withProject(int[][])}; filter pushdown currently
 * rejects all predicates, and limit pushdown is not implemented.
 */
public class PaimonLakeSource implements LakeSource<PaimonSplit> {

    private final Configuration paimonConfig;
    private final TablePath tablePath;

    private @Nullable int[][] project;
    private @Nullable org.apache.paimon.predicate.Predicate predicate;

    public PaimonLakeSource(Configuration paimonConfig, TablePath tablePath) {
        this.paimonConfig = paimonConfig;
        this.tablePath = tablePath;
    }

    @Override
    public void withProject(int[][] project) {
        this.project = project;
    }

    @Override
    public void withLimit(int limit) {
        throw new UnsupportedOperationException("Not impl.");
    }

    @Override
    public FilterPushDownResult withFilters(List<Predicate> predicates) {
        // No predicate is pushed down yet: report every filter back as unsupported.
        return FilterPushDownResult.of(Collections.emptyList(), predicates);
    }

    @Override
    public Planner<PaimonSplit> createPlanner(PlannerContext plannerContext) {
        long snapshotId = plannerContext.snapshotId();
        return new PaimonSplitPlanner(paimonConfig, tablePath, predicate, snapshotId);
    }

    @Override
    public RecordReader createRecordReader(ReaderContext<PaimonSplit> context) throws IOException {
        try (Catalog catalog = getCatalog()) {
            FileStoreTable table = getTable(catalog, tablePath);
            // Primary-key tables need merge-on-read semantics, hence the sorted reader.
            boolean hasPrimaryKey = !table.primaryKeys().isEmpty();
            return hasPrimaryKey
                    ? new PaimonSortedRecordReader(table, context.lakeSplit(), project, predicate)
                    : new PaimonRecordReader(table, context.lakeSplit(), project, predicate);
        } catch (Exception e) {
            throw new IOException("Fail to create record reader.", e);
        }
    }

    @Override
    public SimpleVersionedSerializer<PaimonSplit> getSplitSerializer() {
        return new PaimonSplitSerializer();
    }

    /** Builds a fresh Paimon catalog from the configured options. */
    private Catalog getCatalog() {
        Options options = Options.fromMap(paimonConfig.toMap());
        return CatalogFactory.createCatalog(CatalogContext.create(options));
    }

    /** Resolves the Fluss table path to its Paimon {@link FileStoreTable}. */
    private FileStoreTable getTable(Catalog catalog, TablePath tablePath) throws Exception {
        return (FileStoreTable) catalog.getTable(toPaimon(tablePath));
    }
}
Loading