diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java index dbdfdc8fe06e..b03a6cdf7a2f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java @@ -85,7 +85,7 @@ public static boolean buildIndex( PartitionPredicate partitionPredicate, Options userOptions) throws Exception { - DataStream allCommitMessages = null; + List> allStreams = new ArrayList<>(); for (String indexColumn : indexColumns) { BTreeGlobalIndexBuilder indexBuilder = indexBuilderSupplier.get().withIndexField(indexColumn); @@ -160,15 +160,15 @@ public static boolean buildIndex( recordsPerRange, maxParallelism); - allCommitMessages = - allCommitMessages == null - ? commitMessages - : allCommitMessages.union(commitMessages); + allStreams.add(commitMessages); } } } - if (allCommitMessages != null) { - commit(table, allCommitMessages); + if (!allStreams.isEmpty()) { + @SuppressWarnings("unchecked") + DataStream[] rest = + allStreams.subList(1, allStreams.size()).toArray(new DataStream[0]); + commit(table, allStreams.get(0).union(rest)); } return true; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java index d6368da23f01..9aaa39e446f2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java @@ -23,11 +23,14 @@ import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.table.FileStoreTable; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -140,4 +143,91 @@ private void buildBTreeIndexForTable(String tableName, String indexColumn) { "CALL sys.create_global_index(`table` => 'default.%s', index_column => '%s', index_type => 'btree')", tableName, indexColumn); } + + @Test + void testBTreeIndexWithManyPartitions() throws Catalog.TableNotExistException { + int numPartitions = 50; + sql( + "CREATE TABLE T_MANY_PT (pt INT, id INT, name STRING) PARTITIONED BY (pt) WITH (" + + "'global-index.enabled' = 'true', " + + "'row-tracking.enabled' = 'true', " + + "'data-evolution.enabled' = 'true'" + + ")"); + + for (int p = 0; p < numPartitions; p++) { + insertPartitionRows("T_MANY_PT", p, p * 2, 2, "r_"); + } + + buildBTreeIndexForTable("T_MANY_PT", "id"); + + FileStoreTable table = paimonTable("T_MANY_PT"); + long totalRowCount = + table.store().newIndexFileHandler().scanEntries().stream() + .filter(e -> "btree".equals(e.indexFile().indexType())) + .map(IndexManifestEntry::indexFile) + .mapToLong(IndexFileMeta::rowCount) + .sum(); + assertThat(totalRowCount).isEqualTo((long) numPartitions * 2); + } + + @Test + void testUnionDoesNotStackOverflow() throws InterruptedException { + int totalUnions = 1000; + long stackSize = 512 * 1024; // Flink JM default + + // Chained union: result = result.union(new) — causes StackOverflowError + AtomicReference chainedError = new AtomicReference<>(); + Thread chainedThread = + new Thread( + null, + () -> { + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream all = null; + for (int i = 0; i < totalUnions; i++) { + DataStream s = env.fromElements("item-" + i); + all = all == null ? s : all.union(s); + } + all.print(); + env.getExecutionPlan(); + } catch (Throwable t) { + chainedError.set(t); + } + }, + "chained-union-test", + stackSize); + chainedThread.start(); + chainedThread.join(); + assertThat(chainedError.get()).isInstanceOf(StackOverflowError.class); + + // Flat union: first.union(rest...) — no overflow at same stack size + AtomicReference flatError = new AtomicReference<>(); + Thread flatThread = + new Thread( + null, + () -> { + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + @SuppressWarnings("unchecked") + DataStream[] streams = new DataStream[totalUnions]; + for (int i = 0; i < totalUnions; i++) { + streams[i] = env.fromElements("item-" + i); + } + @SuppressWarnings("unchecked") + DataStream[] rest = new DataStream[totalUnions - 1]; + System.arraycopy(streams, 1, rest, 0, totalUnions - 1); + streams[0].union(rest).print(); + env.getExecutionPlan(); + } catch (Throwable t) { + flatError.set(t); + } + }, + "flat-union-test", + stackSize); + flatThread.start(); + flatThread.join(); + assertThat(flatError.get()).isNull(); + } }