diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java index 03da696a0..f3eac859a 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java @@ -19,6 +19,8 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.lake.lakestorage.LakeStorage; +import com.alibaba.fluss.lake.paimon.source.PaimonLakeSource; +import com.alibaba.fluss.lake.paimon.source.PaimonSplit; import com.alibaba.fluss.lake.paimon.tiering.PaimonCommittable; import com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory; import com.alibaba.fluss.lake.paimon.tiering.PaimonWriteResult; @@ -46,7 +48,7 @@ public PaimonLakeCatalog createLakeCatalog() { } @Override - public LakeSource createLakeSource(TablePath tablePath) { - throw new UnsupportedOperationException("Not implemented"); + public LakeSource createLakeSource(TablePath tablePath) { + return new PaimonLakeSource(paimonConfig, tablePath); } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java new file mode 100644 index 000000000..ddcc5b8c3 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +/** Adapter class for converting Fluss row to Paimon row. 
*/ +public class FlussRowAsPaimonRow implements InternalRow { + + protected com.alibaba.fluss.row.InternalRow internalRow; + protected final RowType tableRowType; + + public FlussRowAsPaimonRow(RowType tableTowType) { + this.tableRowType = tableTowType; + } + + public FlussRowAsPaimonRow( + com.alibaba.fluss.row.InternalRow internalRow, RowType tableTowType) { + this.internalRow = internalRow; + this.tableRowType = tableTowType; + } + + @Override + public int getFieldCount() { + return internalRow.getFieldCount(); + } + + @Override + public RowKind getRowKind() { + return RowKind.INSERT; + } + + @Override + public void setRowKind(RowKind rowKind) { + // do nothing + } + + @Override + public boolean isNullAt(int pos) { + return internalRow.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return internalRow.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return internalRow.getByte(pos); + } + + @Override + public short getShort(int pos) { + return internalRow.getShort(pos); + } + + @Override + public int getInt(int pos) { + return internalRow.getInt(pos); + } + + @Override + public long getLong(int pos) { + return internalRow.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return internalRow.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return internalRow.getDouble(pos); + } + + @Override + public BinaryString getString(int pos) { + return BinaryString.fromBytes(internalRow.getString(pos).toBytes()); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + com.alibaba.fluss.row.Decimal flussDecimal = internalRow.getDecimal(pos, precision, scale); + if (flussDecimal.isCompact()) { + return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale); + } else { + return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale); + } + } + + @Override + public Timestamp getTimestamp(int pos, int precision) { + DataType 
paimonTimestampType = tableRowType.getTypeAt(pos); + switch (paimonTimestampType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (TimestampNtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + internalRow.getTimestampNtz(pos, precision).getMillisecond()); + } else { + TimestampNtz timestampNtz = internalRow.getTimestampNtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond()); + } + + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (TimestampLtz.isCompact(precision)) { + return Timestamp.fromEpochMillis( + internalRow.getTimestampLtz(pos, precision).getEpochMillisecond()); + } else { + TimestampLtz timestampLtz = internalRow.getTimestampLtz(pos, precision); + return Timestamp.fromEpochMillis( + timestampLtz.getEpochMillisecond(), + timestampLtz.getNanoOfMillisecond()); + } + default: + throw new UnsupportedOperationException( + "Unsupported data type to get timestamp: " + paimonTimestampType); + } + } + + @Override + public byte[] getBinary(int pos) { + return internalRow.getBytes(pos); + } + + @Override + public Variant getVariant(int i) { + throw new UnsupportedOperationException( + "getVariant is not support for Fluss record currently."); + } + + @Override + public InternalArray getArray(int pos) { + throw new UnsupportedOperationException( + "getArray is not support for Fluss record currently."); + } + + @Override + public InternalMap getMap(int pos) { + throw new UnsupportedOperationException( + "getMap is not support for Fluss record currently."); + } + + @Override + public InternalRow getRow(int pos, int pos1) { + throw new UnsupportedOperationException( + "getRow is not support for Fluss record currently."); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java new file mode 100644 index 
000000000..a41f5a2fc --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.Planner; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.predicate.Predicate; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon; + +/** + * Paimon Lake format implementation of {@link com.alibaba.fluss.lake.source.LakeSource} for reading + * paimon table. 
+ */ +public class PaimonLakeSource implements LakeSource { + + private final Configuration paimonConfig; + private final TablePath tablePath; + + private @Nullable int[][] project; + private @Nullable org.apache.paimon.predicate.Predicate predicate; + + public PaimonLakeSource(Configuration paimonConfig, TablePath tablePath) { + this.paimonConfig = paimonConfig; + this.tablePath = tablePath; + } + + @Override + public void withProject(int[][] project) { + this.project = project; + } + + @Override + public void withLimit(int limit) { + throw new UnsupportedOperationException("Not impl."); + } + + @Override + public FilterPushDownResult withFilters(List predicates) { + return FilterPushDownResult.of(Collections.emptyList(), predicates); + } + + @Override + public Planner createPlanner(PlannerContext plannerContext) { + return new PaimonSplitPlanner( + paimonConfig, tablePath, predicate, plannerContext.snapshotId()); + } + + @Override + public RecordReader createRecordReader(ReaderContext context) throws IOException { + try (Catalog catalog = getCatalog()) { + FileStoreTable fileStoreTable = getTable(catalog, tablePath); + if (fileStoreTable.primaryKeys().isEmpty()) { + return new PaimonRecordReader( + fileStoreTable, context.lakeSplit(), project, predicate); + } else { + return new PaimonSortedRecordReader( + fileStoreTable, context.lakeSplit(), project, predicate); + } + } catch (Exception e) { + throw new IOException("Fail to create record reader.", e); + } + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new PaimonSplitSerializer(); + } + + private Catalog getCatalog() { + return CatalogFactory.createCatalog( + CatalogContext.create(Options.fromMap(paimonConfig.toMap()))); + } + + private FileStoreTable getTable(Catalog catalog, TablePath tablePath) throws Exception { + return (FileStoreTable) catalog.getTable(toPaimon(tablePath)); + } +} diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java new file mode 100644 index 000000000..1ace3dfcd --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.record.ChangeType; +import com.alibaba.fluss.record.GenericRecord; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.ProjectedRow; +import com.alibaba.fluss.utils.CloseableIterator; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.IntStream; + +import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toChangeType; +import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; +import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; + +/** Record reader for paimon table. 
*/ +public class PaimonRecordReader implements RecordReader { + + protected PaimonRowAsFlussRecordIterator iterator; + protected @Nullable int[][] project; + protected @Nullable Predicate predicate; + protected RowType paimonRowType; + + public PaimonRecordReader( + FileStoreTable fileStoreTable, + PaimonSplit split, + @Nullable int[][] project, + @Nullable Predicate predicate) + throws IOException { + ReadBuilder readBuilder = fileStoreTable.newReadBuilder(); + RowType paimonFullRowType = fileStoreTable.rowType(); + if (project != null) { + readBuilder = applyProject(readBuilder, project, paimonFullRowType); + } + + if (predicate != null) { + readBuilder.withFilter(predicate); + } + + TableRead tableRead = readBuilder.newRead(); + paimonRowType = readBuilder.readType(); + + org.apache.paimon.reader.RecordReader recordReader = + tableRead.createReader(split.dataSplit()); + iterator = + new PaimonRecordReader.PaimonRowAsFlussRecordIterator( + recordReader.toCloseableIterator(), paimonRowType); + } + + @Override + public CloseableIterator read() throws IOException { + return iterator; + } + + private ReadBuilder applyProject( + ReadBuilder readBuilder, int[][] projects, RowType paimonFullRowType) { + int[] projectIds = Arrays.stream(projects).mapToInt(project -> project[0]).toArray(); + + int offsetFieldPos = paimonFullRowType.getFieldIndex(OFFSET_COLUMN_NAME); + int timestampFieldPos = paimonFullRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME); + + int[] paimonProject = + IntStream.concat( + IntStream.of(projectIds), + IntStream.of(offsetFieldPos, timestampFieldPos)) + .toArray(); + + return readBuilder.withProjection(paimonProject); + } + + /** Iterator for paimon row as fluss record. 
*/ + public static class PaimonRowAsFlussRecordIterator implements CloseableIterator { + + private final org.apache.paimon.utils.CloseableIterator paimonRowIterator; + + private final ProjectedRow projectedRow; + private final PaimonRowAsFlussRow paimonRowAsFlussRow; + + private final int logOffsetColIndex; + private final int timestampColIndex; + + public PaimonRowAsFlussRecordIterator( + org.apache.paimon.utils.CloseableIterator paimonRowIterator, + RowType paimonRowType) { + this.paimonRowIterator = paimonRowIterator; + this.logOffsetColIndex = paimonRowType.getFieldIndex(OFFSET_COLUMN_NAME); + this.timestampColIndex = paimonRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME); + + int[] project = IntStream.range(0, paimonRowType.getFieldCount() - 2).toArray(); + projectedRow = ProjectedRow.from(project); + paimonRowAsFlussRow = new PaimonRowAsFlussRow(); + } + + @Override + public void close() { + try { + paimonRowIterator.close(); + } catch (Exception e) { + throw new RuntimeException("Fail to close iterator.", e); + } + } + + @Override + public boolean hasNext() { + return paimonRowIterator.hasNext(); + } + + @Override + public LogRecord next() { + InternalRow paimonRow = paimonRowIterator.next(); + ChangeType changeType = toChangeType(paimonRow.getRowKind()); + long offset = paimonRow.getLong(logOffsetColIndex); + long timestamp = paimonRow.getTimestamp(timestampColIndex, 6).getMillisecond(); + + return new GenericRecord( + offset, + timestamp, + changeType, + projectedRow.replaceRow(paimonRowAsFlussRow.replaceRow(paimonRow))); + } + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java new file mode 100644 index 000000000..281695105 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java @@ -0,0 +1,64 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.source.SortedRecordReader; +import com.alibaba.fluss.row.InternalRow; + +import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Comparator; + +/** Sorted record reader for primary key paimon table. 
*/ +public class PaimonSortedRecordReader extends PaimonRecordReader implements SortedRecordReader { + + Comparator comparator; + + public PaimonSortedRecordReader( + FileStoreTable fileStoreTable, + PaimonSplit split, + @Nullable int[][] project, + @Nullable Predicate predicate) + throws IOException { + super(fileStoreTable, split, project, predicate); + this.comparator = + toFlussRowComparator( + paimonRowType, + ((KeyValueFileStore) fileStoreTable.store()).newKeyComparator()); + } + + @Override + public Comparator order() { + return comparator; + } + + private Comparator toFlussRowComparator( + RowType rowType, Comparator paimonRowcomparator) { + return (row1, row2) -> + paimonRowcomparator.compare( + new FlussRowAsPaimonRow(row1, rowType), + new FlussRowAsPaimonRow(row2, rowType)); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java new file mode 100644 index 000000000..b28f8b61c --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.source.LakeSplit; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.table.source.DataSplit; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Split for paimon table. */ +public class PaimonSplit implements LakeSplit { + + private final DataSplit dataSplit; + + public PaimonSplit(DataSplit dataSplit) { + this.dataSplit = dataSplit; + } + + @Override + public int bucket() { + return dataSplit.bucket(); + } + + @Override + public List partition() { + BinaryRow partition = dataSplit.partition(); + if (partition.getFieldCount() == 0) { + return Collections.emptyList(); + } + + List partitions = new ArrayList<>(); + for (int i = 0; i < partition.getFieldCount(); i++) { + // TODO: Currently, partition columns must be of String type, so we can always + // treat them as strings. Revisit here when + // #489 is finished. + partitions.add(partition.getString(i).toString()); + } + return partitions; + } + + public DataSplit dataSplit() { + return dataSplit; + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java new file mode 100644 index 000000000..e481d8102 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.source.Planner; +import com.alibaba.fluss.metadata.TablePath; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.Split; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon; + +/** Split planner for paimon table. 
*/ +public class PaimonSplitPlanner implements Planner { + + private final Configuration paimonConfig; + private final TablePath tablePath; + private final @Nullable Predicate predicate; + private final long snapshotId; + + public PaimonSplitPlanner( + Configuration paimonConfig, + TablePath tablePath, + @Nullable Predicate predicate, + long snapshotId) { + this.paimonConfig = paimonConfig; + this.tablePath = tablePath; + this.predicate = predicate; + this.snapshotId = snapshotId; + } + + @Override + public List plan() { + try { + List splits = new ArrayList<>(); + try (Catalog catalog = getCatalog()) { + FileStoreTable fileStoreTable = getTable(catalog, tablePath, snapshotId); + // TODO: support filter .withFilter(predicate) + InnerTableScan tableScan = fileStoreTable.newScan(); + for (Split split : tableScan.plan().splits()) { + DataSplit dataSplit = (DataSplit) split; + splits.add(new PaimonSplit(dataSplit)); + } + } + return splits; + } catch (Exception e) { + throw new RuntimeException("Failed to plan splits for paimon.", e); + } + } + + private Catalog getCatalog() { + return CatalogFactory.createCatalog( + CatalogContext.create(Options.fromMap(paimonConfig.toMap()))); + } + + private FileStoreTable getTable(Catalog catalog, TablePath tablePath, long snapshotId) + throws Exception { + return (FileStoreTable) + catalog.getTable(toPaimon(tablePath)) + .copy( + Collections.singletonMap( + CoreOptions.SCAN_SNAPSHOT_ID.key(), + String.valueOf(snapshotId))); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java new file mode 100644 index 000000000..112a8a3f2 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; + +import org.apache.paimon.io.DataOutputViewStreamWrapper; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.utils.InstantiationUtil; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** Serializer for paimon split. 
*/ +public class PaimonSplitSerializer implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(PaimonSplit paimonSplit) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + DataSplit dataSplit = paimonSplit.dataSplit(); + InstantiationUtil.serializeObject(view, dataSplit); + return out.toByteArray(); + } + + @Override + public PaimonSplit deserialize(int version, byte[] serialized) throws IOException { + ByteArrayInputStream in = new ByteArrayInputStream(serialized); + DataSplit dataSplit; + try { + dataSplit = InstantiationUtil.deserializeObject(in, getClass().getClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + return new PaimonSplit(dataSplit); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java index 97d543148..5edeb482a 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java @@ -17,18 +17,11 @@ package com.alibaba.fluss.lake.paimon.tiering; +import com.alibaba.fluss.lake.paimon.source.FlussRowAsPaimonRow; import com.alibaba.fluss.record.LogRecord; -import com.alibaba.fluss.row.TimestampLtz; -import com.alibaba.fluss.row.TimestampNtz; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.Decimal; -import org.apache.paimon.data.InternalArray; -import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; -import org.apache.paimon.data.variant.Variant; -import org.apache.paimon.types.DataType; 
import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -36,19 +29,17 @@ import static com.alibaba.fluss.utils.Preconditions.checkState; /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */ -public class FlussRecordAsPaimonRow implements InternalRow { +public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow { // Lake table for paimon will append three system columns: __bucket, __offset,__timestamp private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3; - private final RowType tableTowType; private final int bucket; private LogRecord logRecord; private int originRowFieldCount; - private com.alibaba.fluss.row.InternalRow internalRow; public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) { + super(tableTowType); this.bucket = bucket; - this.tableTowType = tableTowType; } public void setFlussRecord(LogRecord logRecord) { @@ -56,7 +47,7 @@ public void setFlussRecord(LogRecord logRecord) { this.internalRow = logRecord.getRow(); this.originRowFieldCount = internalRow.getFieldCount(); checkState( - originRowFieldCount == tableTowType.getFieldCount() - LAKE_PAIMON_SYSTEM_COLUMNS, + originRowFieldCount == tableRowType.getFieldCount() - LAKE_PAIMON_SYSTEM_COLUMNS, "The paimon table fields count must equals to LogRecord's fields count."); } @@ -73,42 +64,22 @@ public RowKind getRowKind() { return toRowKind(logRecord.getChangeType()); } - @Override - public void setRowKind(RowKind rowKind) { - // do nothing - } - @Override public boolean isNullAt(int pos) { if (pos < originRowFieldCount) { - return internalRow.isNullAt(pos); + return super.isNullAt(pos); } // is the last three system fields: bucket, offset, timestamp which are never null return false; } - @Override - public boolean getBoolean(int pos) { - return internalRow.getBoolean(pos); - } - - @Override - public byte getByte(int pos) { - return internalRow.getByte(pos); - } - - @Override - public short getShort(int pos) { - return internalRow.getShort(pos); - } 
- @Override public int getInt(int pos) { if (pos == originRowFieldCount) { // bucket system column return bucket; } - return internalRow.getInt(pos); + return super.getInt(pos); } @Override @@ -121,32 +92,7 @@ public long getLong(int pos) { return logRecord.timestamp(); } // the origin RowData - return internalRow.getLong(pos); - } - - @Override - public float getFloat(int pos) { - return internalRow.getFloat(pos); - } - - @Override - public double getDouble(int pos) { - return internalRow.getDouble(pos); - } - - @Override - public BinaryString getString(int pos) { - return BinaryString.fromBytes(internalRow.getString(pos).toBytes()); - } - - @Override - public Decimal getDecimal(int pos, int precision, int scale) { - com.alibaba.fluss.row.Decimal flussDecimal = internalRow.getDecimal(pos, precision, scale); - if (flussDecimal.isCompact()) { - return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale); - } else { - return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale); - } + return super.getLong(pos); } @Override @@ -155,62 +101,6 @@ public Timestamp getTimestamp(int pos, int precision) { if (pos == originRowFieldCount + 2) { return Timestamp.fromEpochMillis(logRecord.timestamp()); } - - DataType paimonTimestampType = tableTowType.getTypeAt(pos); - - switch (paimonTimestampType.getTypeRoot()) { - case TIMESTAMP_WITHOUT_TIME_ZONE: - if (TimestampNtz.isCompact(precision)) { - return Timestamp.fromEpochMillis( - internalRow.getTimestampNtz(pos, precision).getMillisecond()); - } else { - TimestampNtz timestampNtz = internalRow.getTimestampNtz(pos, precision); - return Timestamp.fromEpochMillis( - timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond()); - } - - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - if (TimestampLtz.isCompact(precision)) { - return Timestamp.fromEpochMillis( - internalRow.getTimestampLtz(pos, precision).getEpochMillisecond()); - } else { - TimestampLtz timestampLtz = 
internalRow.getTimestampLtz(pos, precision); - return Timestamp.fromEpochMillis( - timestampLtz.getEpochMillisecond(), - timestampLtz.getNanoOfMillisecond()); - } - default: - throw new UnsupportedOperationException( - "Unsupported data type to get timestamp: " + paimonTimestampType); - } - } - - @Override - public byte[] getBinary(int pos) { - return internalRow.getBytes(pos); - } - - @Override - public Variant getVariant(int pos) { - throw new UnsupportedOperationException( - "getVariant is not support for Fluss record currently."); - } - - @Override - public InternalArray getArray(int pos) { - throw new UnsupportedOperationException( - "getArray is not support for Fluss record currently."); - } - - @Override - public InternalMap getMap(int pos) { - throw new UnsupportedOperationException( - "getMap is not support for Fluss record currently."); - } - - @Override - public InternalRow getRow(int pos, int pos1) { - throw new UnsupportedOperationException( - "getRow is not support for Fluss record currently."); + return super.getTimestamp(pos, precision); } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java index 437cf1913..9d51e787b 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java @@ -50,6 +50,21 @@ public static RowKind toRowKind(ChangeType changeType) { } } + public static ChangeType toChangeType(RowKind rowKind) { + switch (rowKind) { + case INSERT: + return ChangeType.INSERT; + case UPDATE_BEFORE: + return ChangeType.UPDATE_BEFORE; + case UPDATE_AFTER: + return ChangeType.UPDATE_AFTER; + case DELETE: + return ChangeType.DELETE; + default: + throw new IllegalArgumentException("Unsupported rowKind: " + rowKind); + } + } + 
public static Identifier toPaimon(TablePath tablePath) { return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java new file mode 100644 index 000000000..3fc625501 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.utils; + +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; + +import org.apache.paimon.data.Timestamp; + +/** Adapter for paimon row as fluss row. 
*/ +public class PaimonRowAsFlussRow implements InternalRow { + + private org.apache.paimon.data.InternalRow paimonRow; + + public PaimonRowAsFlussRow() {} + + public PaimonRowAsFlussRow(org.apache.paimon.data.InternalRow paimonRow) { + this.paimonRow = paimonRow; + } + + public PaimonRowAsFlussRow replaceRow(org.apache.paimon.data.InternalRow paimonRow) { + this.paimonRow = paimonRow; + return this; + } + + @Override + public int getFieldCount() { + return paimonRow.getFieldCount(); + } + + @Override + public boolean isNullAt(int pos) { + return paimonRow.isNullAt(pos); + } + + @Override + public boolean getBoolean(int pos) { + return paimonRow.getBoolean(pos); + } + + @Override + public byte getByte(int pos) { + return paimonRow.getByte(pos); + } + + @Override + public short getShort(int pos) { + return paimonRow.getShort(pos); + } + + @Override + public int getInt(int pos) { + return paimonRow.getInt(pos); + } + + @Override + public long getLong(int pos) { + return paimonRow.getLong(pos); + } + + @Override + public float getFloat(int pos) { + return paimonRow.getFloat(pos); + } + + @Override + public double getDouble(int pos) { + return paimonRow.getDouble(pos); + } + + @Override + public BinaryString getChar(int pos, int length) { + return BinaryString.fromBytes(paimonRow.getString(pos).toBytes()); + } + + @Override + public BinaryString getString(int pos) { + return BinaryString.fromBytes(paimonRow.getString(pos).toBytes()); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + org.apache.paimon.data.Decimal paimonDecimal = paimonRow.getDecimal(pos, precision, scale); + if (paimonDecimal.isCompact()) { + return Decimal.fromUnscaledLong(paimonDecimal.toUnscaledLong(), precision, scale); + } else { + return Decimal.fromBigDecimal(paimonDecimal.toBigDecimal(), precision, scale); + } + } + + @Override + public TimestampNtz getTimestampNtz(int pos, int precision) { + Timestamp timestamp = paimonRow.getTimestamp(pos, precision); + if 
(TimestampNtz.isCompact(precision)) { + return TimestampNtz.fromMillis(timestamp.getMillisecond()); + } else { + return TimestampNtz.fromMillis( + timestamp.getMillisecond(), timestamp.getNanoOfMillisecond()); + } + } + + @Override + public TimestampLtz getTimestampLtz(int pos, int precision) { + Timestamp timestamp = paimonRow.getTimestamp(pos, precision); + if (TimestampLtz.isCompact(precision)) { + return TimestampLtz.fromEpochMillis(timestamp.getMillisecond()); + } else { + return TimestampLtz.fromEpochMillis( + timestamp.getMillisecond(), timestamp.getNanoOfMillisecond()); + } + } + + @Override + public byte[] getBinary(int pos, int length) { + return paimonRow.getBinary(pos); + } + + @Override + public byte[] getBytes(int pos) { + return paimonRow.getBinary(pos); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java index 609bf68ac..3231fe644 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java @@ -27,7 +27,8 @@ import static com.alibaba.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; -class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase { +/** Base class for Flink union read test. 
*/ +public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase { protected static final int DEFAULT_BUCKET_NUM = 1; StreamTableEnvironment batchTEnv; diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java new file mode 100644 index 000000000..b20e1a5f2 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.record.GenericRecord; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.BinaryString; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.GenericRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; + +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY; +import static com.alibaba.fluss.record.ChangeType.DELETE; +import static com.alibaba.fluss.record.ChangeType.INSERT; +import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER; +import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test case for {@link FlussRowAsPaimonRow}. */ +class FlussRowAsPaimonRowTest { + @Test + void testLogTableRecordAllTypes() { + // Construct a FlussRowAsPaimonRow instance + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.BooleanType(), + new org.apache.paimon.types.TinyIntType(), + new org.apache.paimon.types.SmallIntType(), + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BigIntType(), + new org.apache.paimon.types.FloatType(), + new org.apache.paimon.types.DoubleType(), + new org.apache.paimon.types.VarCharType(), + new org.apache.paimon.types.DecimalType(5, 2), + new org.apache.paimon.types.DecimalType(20, 0), + new org.apache.paimon.types.LocalZonedTimestampType(6), + new org.apache.paimon.types.TimestampType(6), + new org.apache.paimon.types.BinaryType(), + new org.apache.paimon.types.VarCharType()); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(14); + genericRow.setField(0, true); + genericRow.setField(1, (byte) 1); + genericRow.setField(2, (short) 2); + 
genericRow.setField(3, 3); + genericRow.setField(4, 4L); + genericRow.setField(5, 5.1f); + genericRow.setField(6, 6.0d); + genericRow.setField(7, BinaryString.fromString("string")); + genericRow.setField(8, Decimal.fromUnscaledLong(9, 5, 2)); + genericRow.setField(9, Decimal.fromBigDecimal(new BigDecimal(10), 20, 0)); + genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L, 5678)); + genericRow.setField(11, TimestampNtz.fromMillis(1698235273182L, 5678)); + genericRow.setField(12, new byte[] {1, 2, 3, 4}); + genericRow.setField(13, null); + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + // verify FlussRowAsPaimonRow normal columns + assertThat(flussRowAsPaimonRow.getBoolean(0)).isTrue(); + assertThat(flussRowAsPaimonRow.getByte(1)).isEqualTo((byte) 1); + assertThat(flussRowAsPaimonRow.getShort(2)).isEqualTo((short) 2); + assertThat(flussRowAsPaimonRow.getInt(3)).isEqualTo(3); + assertThat(flussRowAsPaimonRow.getLong(4)).isEqualTo(4L); + assertThat(flussRowAsPaimonRow.getFloat(5)).isEqualTo(5.1f); + assertThat(flussRowAsPaimonRow.getDouble(6)).isEqualTo(6.0d); + assertThat(flussRowAsPaimonRow.getString(7).toString()).isEqualTo("string"); + assertThat(flussRowAsPaimonRow.getDecimal(8, 5, 2).toBigDecimal()) + .isEqualTo(new BigDecimal("0.09")); + assertThat(flussRowAsPaimonRow.getDecimal(9, 20, 0).toBigDecimal()) + .isEqualTo(new BigDecimal(10)); + assertThat(flussRowAsPaimonRow.getTimestamp(10, 6).getMillisecond()) + .isEqualTo(1698235273182L); + assertThat(flussRowAsPaimonRow.getTimestamp(10, 6).getNanoOfMillisecond()).isEqualTo(5678); + assertThat(flussRowAsPaimonRow.getTimestamp(11, 6).getMillisecond()) + .isEqualTo(1698235273182L); + assertThat(flussRowAsPaimonRow.getTimestamp(11, 6).getNanoOfMillisecond()).isEqualTo(5678); + assertThat(flussRowAsPaimonRow.getBinary(12)).isEqualTo(new byte[] {1, 2, 
3, 4}); + assertThat(flussRowAsPaimonRow.isNullAt(13)).isTrue(); + } + + @Test + void testPrimaryKeyTableRecord() { + RowType tableRowType = + RowType.of( + new org.apache.paimon.types.IntType(), + new org.apache.paimon.types.BooleanType()); + + long logOffset = 0; + long timeStamp = System.currentTimeMillis(); + GenericRow genericRow = new GenericRow(2); + genericRow.setField(0, 10); + genericRow.setField(1, true); + + LogRecord logRecord = new GenericRecord(logOffset, timeStamp, INSERT, genericRow); + FlussRowAsPaimonRow flussRowAsPaimonRow = + new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType); + + assertThat(flussRowAsPaimonRow.getInt(0)).isEqualTo(10); + // verify rowkind + assertThat(flussRowAsPaimonRow.getRowKind()).isEqualTo(RowKind.INSERT); + + logRecord = new GenericRecord(logOffset, timeStamp, UPDATE_BEFORE, genericRow); + assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType).getRowKind()) + .isEqualTo(RowKind.INSERT); + + logRecord = new GenericRecord(logOffset, timeStamp, UPDATE_AFTER, genericRow); + assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType).getRowKind()) + .isEqualTo(RowKind.INSERT); + + logRecord = new GenericRecord(logOffset, timeStamp, DELETE, genericRow); + assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType).getRowKind()) + .isEqualTo(RowKind.INSERT); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java new file mode 100644 index 000000000..eb9b7a1ca --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow; +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.BigIntType; +import com.alibaba.fluss.types.BinaryType; +import com.alibaba.fluss.types.BooleanType; +import com.alibaba.fluss.types.DecimalType; +import com.alibaba.fluss.types.DoubleType; +import com.alibaba.fluss.types.FloatType; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.SmallIntType; +import com.alibaba.fluss.types.StringType; +import com.alibaba.fluss.types.TimestampType; +import com.alibaba.fluss.types.TinyIntType; +import com.alibaba.fluss.utils.CloseableIterator; + +import org.apache.flink.types.Row; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.Schema; +import 
org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataTypes; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static com.alibaba.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test case for {@link PaimonRecordReader}. */ +class PaimonRecordReaderTest extends PaimonSourceTestBase { + @BeforeAll + protected static void beforeAll() { + PaimonSourceTestBase.beforeAll(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testReadLogTable(boolean isPartitioned) throws Exception { + // first of all, create table and prepare data + String tableName = "logTable_" + (isPartitioned ? 
"partitioned" : "non_partitioned"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + + List writtenRows = new ArrayList<>(); + prepareLogTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM, writtenRows); + + LakeSource lakeSource = lakeStorage.createLakeSource(tablePath); + Table table = getTable(tablePath); + Snapshot snapshot = table.latestSnapshot().get(); + List paimonSplits = lakeSource.createPlanner(snapshot::id).plan(); + + List splits = ((FileStoreTable) table).newScan().plan().splits(); + assertThat(splits).hasSize(paimonSplits.size()); + assertThat(splits) + .isEqualTo( + paimonSplits.stream() + .map(PaimonSplit::dataSplit) + .collect(Collectors.toList())); + + List actual = new ArrayList<>(); + + InternalRow.FieldGetter[] fieldGetters = + InternalRow.createFieldGetters(getFlussRowType(isPartitioned)); + for (PaimonSplit paimonSplit : paimonSplits) { + RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit); + CloseableIterator iterator = recordReader.read(); + actual.addAll( + convertToFlinkRow( + fieldGetters, + TransformingCloseableIterator.transform(iterator, LogRecord::getRow))); + iterator.close(); + } + List expectRows = + convertToFlinkRow(fieldGetters, CloseableIterator.wrap(writtenRows.iterator())); + assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows); + } + + @Test + void testReadLogTableWithProject() throws Exception { + // first of all, create table and prepare data + String tableName = "logTable_non_partitioned"; + + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + + List writtenRows = new ArrayList<>(); + prepareLogTable(tablePath, false, DEFAULT_BUCKET_NUM, writtenRows); + + LakeSource lakeSource = lakeStorage.createLakeSource(tablePath); + lakeSource.withProject(new int[][] {new int[] {5}, new int[] {1}, new int[] {3}}); + Table table = getTable(tablePath); + Snapshot snapshot = table.latestSnapshot().get(); + List paimonSplits = lakeSource.createPlanner(snapshot::id).plan(); 
+ + List actual = new ArrayList<>(); + + InternalRow.FieldGetter[] fieldGetters = + InternalRow.createFieldGetters( + RowType.of(new FloatType(), new TinyIntType(), new IntType())); + for (PaimonSplit paimonSplit : paimonSplits) { + RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit); + CloseableIterator iterator = recordReader.read(); + actual.addAll( + convertToFlinkRow( + fieldGetters, + TransformingCloseableIterator.transform(iterator, LogRecord::getRow))); + iterator.close(); + } + List expectRows = new ArrayList<>(); + ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {5, 1, 3}); + List splits = readBuilder.newScan().plan().splits(); + try (org.apache.paimon.reader.RecordReader + recordReader = readBuilder.newRead().createReader(splits)) { + org.apache.paimon.utils.CloseableIterator + closeableIterator = recordReader.toCloseableIterator(); + PaimonRowAsFlussRow paimonRowAsFlussRow = new PaimonRowAsFlussRow(); + expectRows.addAll( + convertToFlinkRow( + fieldGetters, + TransformingCloseableIterator.transform( + CloseableIterator.wrap(closeableIterator), + paimonRowAsFlussRow::replaceRow))); + } + assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows); + } + + private RowType getFlussRowType(boolean isPartitioned) { + return isPartitioned + ? 
RowType.of( + new BooleanType(), + new TinyIntType(), + new SmallIntType(), + new IntType(), + new BigIntType(), + new FloatType(), + new DoubleType(), + new StringType(), + new DecimalType(5, 2), + new DecimalType(20, 0), + new LocalZonedTimestampType(3), + new LocalZonedTimestampType(6), + new TimestampType(3), + new TimestampType(6), + new BinaryType(4), + new StringType()) + : RowType.of( + new BooleanType(), + new TinyIntType(), + new SmallIntType(), + new IntType(), + new BigIntType(), + new FloatType(), + new DoubleType(), + new StringType(), + new DecimalType(5, 2), + new DecimalType(20, 0), + new LocalZonedTimestampType(3), + new LocalZonedTimestampType(6), + new TimestampType(3), + new TimestampType(6), + new BinaryType(4)); + } + + private void prepareLogTable( + TablePath tablePath, boolean isPartitioned, int bucketNum, List rows) + throws Exception { + createFullTypeLogTable(tablePath, isPartitioned, bucketNum); + rows.addAll(writeFullTypeRows(tablePath, 10, isPartitioned ? 
"test" : null)); + } + + private void createFullTypeLogTable(TablePath tablePath, boolean isPartitioned, int bucketNum) + throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("f_boolean", DataTypes.BOOLEAN()) + .column("f_byte", DataTypes.TINYINT()) + .column("f_short", DataTypes.SMALLINT()) + .column("f_int", DataTypes.INT()) + .column("f_long", DataTypes.BIGINT()) + .column("f_float", DataTypes.FLOAT()) + .column("f_double", DataTypes.DOUBLE()) + .column("f_string", DataTypes.STRING()) + .column("f_decimal1", DataTypes.DECIMAL(5, 2)) + .column("f_decimal2", DataTypes.DECIMAL(20, 0)) + .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) + .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) + .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3)) + .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6)) + .column("f_binary", DataTypes.BINARY(4)); + + if (isPartitioned) { + schemaBuilder.column("p", DataTypes.STRING()); + schemaBuilder.partitionKeys("p"); + schemaBuilder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + schemaBuilder.option(CoreOptions.BUCKET_KEY.key(), "f_long"); + } + schemaBuilder + .column("__bucket", DataTypes.INT()) + .column("__offset", DataTypes.BIGINT()) + .column("__timestamp", DataTypes.TIMESTAMP(6)); + createTable(tablePath, schemaBuilder.build()); + } + + private List writeFullTypeRows( + TablePath tablePath, int rowCount, @Nullable String partition) throws Exception { + List rows = new ArrayList<>(); + List flussRows = new ArrayList<>(); + Table table = getTable(tablePath); + + for (int i = 0; i < rowCount; i++) { + if (partition == null) { + com.alibaba.fluss.row.GenericRow row = + row( + true, + (byte) 100, + (short) 200, + 30, + i + 400L, + 500.1f, + 600.0d, + com.alibaba.fluss.row.BinaryString.fromString( + "another_string_" + i), + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + 
TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + 0, + (long) i, + TimestampNtz.fromMillis(System.currentTimeMillis())); + rows.add(new FlussRowAsPaimonRow(row, table.rowType())); + flussRows.add(row); + } else { + com.alibaba.fluss.row.GenericRow row = + row( + true, + (byte) 100, + (short) 200, + 30, + i + 400L, + 500.1f, + 600.0d, + com.alibaba.fluss.row.BinaryString.fromString( + "another_string_" + i), + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + com.alibaba.fluss.row.BinaryString.fromString(partition), + 0, + (long) i, + TimestampNtz.fromMillis(System.currentTimeMillis())); + rows.add(new FlussRowAsPaimonRow(row, table.rowType())); + flussRows.add(row); + } + } + writeRecord(tablePath, rows); + return flussRows; + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java new file mode 100644 index 000000000..6efc8706c --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.lake.source.RecordReader; +import com.alibaba.fluss.lake.source.SortedRecordReader; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.record.LogRecord; +import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.utils.CloseableIterator; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataTypes; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import static com.alibaba.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test case for {@link PaimonSortedRecordReader}. 
*/ +class PaimonSortedRecordReaderTest extends PaimonSourceTestBase { + @BeforeAll + protected static void beforeAll() { + PaimonSourceTestBase.beforeAll(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testReadPkTable(boolean isPartitioned) throws Exception { + // first of all, create table and prepare data + String tableName = "pkTable_" + (isPartitioned ? "partitioned" : "non_partitioned"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + + List writtenRows = new ArrayList<>(); + preparePkTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM, writtenRows); + + LakeSource lakeSource = lakeStorage.createLakeSource(tablePath); + Table table = getTable(tablePath); + Snapshot snapshot = table.latestSnapshot().get(); + List paimonSplits = lakeSource.createPlanner(snapshot::id).plan(); + + for (PaimonSplit paimonSplit : paimonSplits) { + RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit); + assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class); + CloseableIterator iterator = recordReader.read(); + assertThat( + isSorted( + TransformingCloseableIterator.transform( + iterator, LogRecord::getRow), + ((SortedRecordReader) recordReader).order())) + .isTrue(); + iterator.close(); + } + } + + private static boolean isSorted(Iterator iterator, Comparator comparator) { + if (!iterator.hasNext()) { + return true; + } + + T previous = iterator.next(); + while (iterator.hasNext()) { + T current = iterator.next(); + if (comparator.compare(previous, current) > 0) { + return false; + } + previous = current; + } + return true; + } + + private void preparePkTable( + TablePath tablePath, boolean isPartitioned, int bucketNum, List rows) + throws Exception { + createFullTypePkTable(tablePath, isPartitioned, bucketNum); + rows.addAll(writeFullTypeRows(tablePath, 10, isPartitioned ? 
"test" : null)); + } + + private void createFullTypePkTable(TablePath tablePath, boolean isPartitioned, int bucketNum) + throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("f_int", DataTypes.INT()) + .column("f_boolean", DataTypes.BOOLEAN()) + .column("f_byte", DataTypes.TINYINT()) + .column("f_short", DataTypes.SMALLINT()) + .column("f_long", DataTypes.BIGINT()) + .column("f_float", DataTypes.FLOAT()) + .column("f_double", DataTypes.DOUBLE()) + .column("f_string", DataTypes.STRING()) + .column("f_decimal1", DataTypes.DECIMAL(5, 2)) + .column("f_decimal2", DataTypes.DECIMAL(20, 0)) + .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) + .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) + .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3)) + .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6)) + .column("f_binary", DataTypes.BINARY(4)); + + if (isPartitioned) { + schemaBuilder.column("p", DataTypes.STRING()); + schemaBuilder.partitionKeys("p"); + schemaBuilder.primaryKey("f_int", "p"); + schemaBuilder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + schemaBuilder.option(CoreOptions.BUCKET_KEY.key(), "f_int"); + } else { + schemaBuilder.primaryKey("f_int"); + schemaBuilder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + schemaBuilder.option(CoreOptions.BUCKET_KEY.key(), "f_int"); + } + schemaBuilder + .column("__bucket", DataTypes.INT()) + .column("__offset", DataTypes.BIGINT()) + .column("__timestamp", DataTypes.TIMESTAMP(6)); + createTable(tablePath, schemaBuilder.build()); + } + + private List writeFullTypeRows( + TablePath tablePath, int rowCount, @Nullable String partition) throws Exception { + List rows = new ArrayList<>(); + List flussRows = new ArrayList<>(); + Table table = getTable(tablePath); + + for (int i = 0; i < rowCount; i++) { + if (partition == null) { + com.alibaba.fluss.row.GenericRow row = + row( + i + 30, + true, + (byte) 100, + (short) 200, 
+ 400L, + 500.1f, + 600.0d, + com.alibaba.fluss.row.BinaryString.fromString( + "another_string_" + i), + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + 0, + (long) i, + TimestampNtz.fromMillis(System.currentTimeMillis())); + rows.add(new FlussRowAsPaimonRow(row, table.rowType())); + flussRows.add(row); + } else { + com.alibaba.fluss.row.GenericRow row = + row( + i + 30, + true, + (byte) 100, + (short) 200, + 400L, + 500.1f, + 600.0d, + com.alibaba.fluss.row.BinaryString.fromString( + "another_string_" + i), + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + com.alibaba.fluss.row.BinaryString.fromString(partition), + 0, + (long) i, + TimestampNtz.fromMillis(System.currentTimeMillis())); + rows.add(new FlussRowAsPaimonRow(row, table.rowType())); + flussRows.add(row); + } + } + writeRecord(tablePath, rows); + return flussRows; + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java new file mode 100644 index 000000000..93c73d329 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.lake.paimon.PaimonLakeStorage; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.utils.CloseableIterator; + +import org.apache.flink.types.Row; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon; + +/** Base class for paimon lakehouse test. 
*/ +class PaimonSourceTestBase { + /* Default database name, table name and bucket count available to tests built on this class. */ protected static final String DEFAULT_DB = "fluss_lakehouse"; + protected static final String DEFAULT_TABLE = "test_lakehouse_table"; + protected static final int DEFAULT_BUCKET_NUM = 1; + + /** Temporary directory annotated with TempDir; beforeAll passes its path as the warehouse option. */ private static @TempDir File tempWarehouseDir; + /** Lake storage and Paimon catalog, both assigned once in beforeAll. */ protected static PaimonLakeStorage lakeStorage; + protected static Catalog paimonCatalog; + + /** Builds a configuration with type paimon and the temporary warehouse path, then creates the lake storage and the Paimon catalog from that same configuration. */ @BeforeAll + protected static void beforeAll() { + Configuration configuration = new Configuration(); + configuration.setString("type", "paimon"); + configuration.setString("warehouse", tempWarehouseDir.toString()); + lakeStorage = new PaimonLakeStorage(configuration); + paimonCatalog = + CatalogFactory.createCatalog( + CatalogContext.create(Options.fromMap(configuration.toMap()))); + } + + /** Creates the database named in tablePath and then the table with the given schema. Both catalog calls pass true as their last argument; presumably this is the ignore-if-exists flag (confirm against the Paimon Catalog API). */ public void createTable(TablePath tablePath, Schema schema) throws Exception { + paimonCatalog.createDatabase(tablePath.getDatabaseName(), true); + paimonCatalog.createTable(toPaimon(tablePath), schema, true); + } + + /** Writes every record to the table at tablePath through a batch write builder obtained with withOverwrite(), then commits the messages returned by prepareCommit(). The writer and the commit are both closed by try-with-resources. */ public void writeRecord(TablePath tablePath, List records) throws Exception { + Table table = getTable(tablePath); + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite(); + try (BatchTableWrite writer = writeBuilder.newWrite()) { + for (InternalRow record : records) { + writer.write(record); + } + List messages = writer.prepareCommit(); + try (BatchTableCommit commit = writeBuilder.newCommit()) { + commit.commit(messages); + } + } + } + + /** Returns the table that the shared Paimon catalog resolves for the given path. */ public Table getTable(TablePath tablePath) throws Exception { + return paimonCatalog.getTable(toPaimon(tablePath)); + } + + /** Consumes the iterator and turns every Fluss row into a Flink Row by applying each field getter, in order, with getFieldOrNull. The iterator is not closed by this method. */ public static List convertToFlinkRow( + com.alibaba.fluss.row.InternalRow.FieldGetter[] fieldGetters, + CloseableIterator flussRowIterator) { + List rows = new ArrayList<>(); + while (flussRowIterator.hasNext()) { + com.alibaba.fluss.row.InternalRow row = flussRowIterator.next(); + Row flinkRow = new Row(fieldGetters.length); + for (int i = 0; i < fieldGetters.length; i++) { + flinkRow.setField(i, 
fieldGetters[i].getFieldOrNull(row)); + } + rows.add(flinkRow); + } + return rows; + } + + /** Adapter that applies a function to each element of a wrapped closeable iterator at the moment next() is called. NOTE(review): the type U is used below but no type parameters are declared in this text, so generic arguments were presumably lost in extraction; confirm against the real file. */ + public static class TransformingCloseableIterator implements CloseableIterator { + private final CloseableIterator source; + private final Function transformer; + + /** Stores the wrapped iterator and the function applied to each of its elements. */ public TransformingCloseableIterator( + CloseableIterator source, Function transformer) { + this.source = source; + this.transformer = transformer; + } + + /** Delegates to the wrapped iterator. */ @Override + public boolean hasNext() { + return source.hasNext(); + } + + /** Returns the transformer applied to the next element of the wrapped iterator. */ @Override + public U next() { + return transformer.apply(source.next()); + } + + /** Closes the wrapped iterator. */ @Override + public void close() { + source.close(); + } + + /** Static factory that simply calls the constructor with the same arguments. */ public static CloseableIterator transform( + CloseableIterator source, Function transformer) { + return new TransformingCloseableIterator<>(source, transformer); + } + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java new file mode 100644 index 000000000..0d9e509fe --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.metadata.TablePath; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataTypes; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test case for {@link PaimonSplitPlanner}. */ +class PaimonSplitPlannerTest extends PaimonSourceTestBase { + /** Creates a table partitioned by c3 with primary key (c1, c3), two buckets and bucket key c1, writes two records, then checks that the splits planned through the lake source for the latest snapshot id equal the splits planned directly by the table scan. */ @Test + void testPlan() throws Exception { + // prepare paimon table + int bucketNum = 2; + TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE); + Schema.Builder builder = + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("c3", DataTypes.STRING()); + builder.partitionKeys("c3"); + builder.primaryKey("c1", "c3"); + builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + builder.option(CoreOptions.BUCKET_KEY.key(), "c1"); + createTable(tablePath, builder.build()); + Table table = + paimonCatalog.getTable( + Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName())); + + GenericRow record1 = + GenericRow.of(12, BinaryString.fromString("a"), BinaryString.fromString("A")); + GenericRow record2 = + GenericRow.of(13, BinaryString.fromString("a"), BinaryString.fromString("A")); + writeRecord(tablePath, Arrays.asList(record1, record2)); + Snapshot snapshot = table.latestSnapshot().get(); + + LakeSource 
lakeSource = lakeStorage.createLakeSource(tablePath); + List paimonSplits = lakeSource.createPlanner(snapshot::id).plan(); + + /* Reference plan taken directly from the Paimon table scan. */ List actualSplits = ((FileStoreTable) table).newScan().plan().splits(); + + /* Same size first, then element-wise list equality against the data split wrapped by each planned split. */ assertThat(actualSplits).hasSize(paimonSplits.size()); + assertThat(actualSplits) + .isEqualTo( + paimonSplits.stream() + .map(PaimonSplit::dataSplit) + .collect(Collectors.toList())); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java new file mode 100644 index 000000000..9f07509be --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.metadata.TablePath; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataTypes; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test case for {@link PaimonSplitSerializer}. */ +class PaimonSplitSerializerTest extends PaimonSourceTestBase { + /** Serializer instance exercised by both tests. */ private final PaimonSplitSerializer serializer = new PaimonSplitSerializer(); + + /** Plans the splits of a single-record table, serializes the first split and deserializes it with the version reported by the serializer, then expects the data split to be equal to the original one. */ @Test + void testSerializeAndDeserialize() throws Exception { + // prepare paimon table + int bucketNum = 1; + TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE); + Schema.Builder builder = + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("c3", DataTypes.STRING()); + builder.partitionKeys("c3"); + builder.primaryKey("c1", "c3"); + builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + createTable(tablePath, builder.build()); + Table table = + paimonCatalog.getTable( + Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName())); + + GenericRow record1 = + GenericRow.of(12, BinaryString.fromString("a"), BinaryString.fromString("A")); + writeRecord(tablePath, Arrays.asList(record1)); + Snapshot snapshot = table.latestSnapshot().get(); + + LakeSource lakeSource = lakeStorage.createLakeSource(tablePath); + List plan = lakeSource.createPlanner(snapshot::id).plan(); + + PaimonSplit originalPaimonSplit = plan.get(0); + byte[] serialized = 
serializer.serialize(originalPaimonSplit); + PaimonSplit deserialized = serializer.deserialize(serializer.getVersion(), serialized); + + /* Only the wrapped data split is compared, not the PaimonSplit objects themselves. */ assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit()); + } + + /** Expects deserialize with version 1 to throw an IOException when given bytes that were not produced by the serializer. NOTE(review): getBytes() without a charset uses the platform default charset; harmless for this ASCII literal. */ @Test + void testDeserializeWithInvalidData() { + byte[] invalidData = "invalid".getBytes(); + assertThatThrownBy(() -> serializer.deserialize(1, invalidData)) + .isInstanceOf(IOException.class); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java new file mode 100644 index 000000000..b0139d1d8 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * + */ + +package com.alibaba.fluss.lake.paimon.source; + +import com.alibaba.fluss.lake.source.LakeSource; +import com.alibaba.fluss.metadata.TablePath; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataTypes; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test case for {@link PaimonSplit}. */ +class PaimonSplitTest extends PaimonSourceTestBase { + + /** Writes one record with partition value A into a single-bucket table partitioned by c3, plans splits through the lake source, and checks partition(), dataSplit() and bucket() of the first planned split against the first split of the table scan. */ @Test + void testPaimonSplit() throws Exception { + // prepare paimon table + int bucketNum = 1; + TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE); + Schema.Builder builder = + Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("c3", DataTypes.STRING()); + builder.partitionKeys("c3"); + builder.primaryKey("c1", "c3"); + builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum)); + createTable(tablePath, builder.build()); + Table table = + paimonCatalog.getTable( + Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName())); + + GenericRow record1 = + GenericRow.of(12, BinaryString.fromString("a"), BinaryString.fromString("A")); + writeRecord(tablePath, Collections.singletonList(record1)); + Snapshot snapshot = table.latestSnapshot().get(); + + LakeSource lakeSource = lakeStorage.createLakeSource(tablePath); + List paimonSplits = lakeSource.createPlanner(snapshot::id).plan(); + + // test bucket() and partition() method + PaimonSplit paimonSplit = paimonSplits.get(0); + 
assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("A")); + + /* Reference splits taken directly from the Paimon table scan. */ List actualSplits = ((FileStoreTable) table).newScan().plan().splits(); + assertThat(actualSplits.size()).isEqualTo(paimonSplits.size()); + Split actualSplit = actualSplits.get(0); + assertThat(actualSplit).isEqualTo(paimonSplit.dataSplit()); + /* The cast to DataSplit is needed because bucket() is called on the concrete split type here. */ assertThat(((DataSplit) actualSplit).bucket()).isEqualTo(paimonSplit.bucket()); + } +} diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java index d9c0bebd5..8abb04d35 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java @@ -91,7 +91,7 @@ public class FlinkPaimonTieringTestBase { protected static Connection conn; protected static Admin admin; protected static Configuration clientConf; - private static String warehousePath; + protected static String warehousePath; protected static Catalog paimonCatalog; private static Configuration initConfig() {