diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java
new file mode 100644
index 000000000000..228abb50e667
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
+import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.ProjectingInternalRow;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+/**
+ * A {@link PartitionReader} that reads multiple sorted files and merges them into a single sorted
+ * stream using a k-way heap merge ({@link SortedMerge}).
+ *
+ *
This reader is used when {@code preserve-data-ordering} is enabled and the task group contains
+ * multiple files that all have the same sort order.
+ *
+ *
Sort key columns absent from the requested projection are temporarily added to the read schema
+ * so that {@link SortOrderComparators} can access them during the merge. The extra columns are
+ * stripped from each row before it is returned to Spark.
+ */
+class MergingSortedRowDataReader implements PartitionReader {
+ private static final Logger LOG = LoggerFactory.getLogger(MergingSortedRowDataReader.class);
+
+ private final CloseableGroup resources;
+ private final CloseableIterator mergedIterator;
+ private final List fileReaders;
+ // non-null only when sort key columns were added to the read schema beyond what Spark projected
+ private final ProjectingInternalRow projectingRow;
+ private InternalRow current;
+
+ MergingSortedRowDataReader(SparkInputPartition partition) {
+ Table table = partition.table();
+ ScanTaskGroup taskGroup = partition.taskGroup();
+ Schema projection = partition.projection();
+ SortOrder sortOrder = table.sortOrder();
+
+ int numFiles = taskGroup.tasks().size();
+
+ Preconditions.checkState(
+ sortOrder.isSorted(), "Cannot create merging reader for unsorted table %s", table.name());
+ Preconditions.checkState(
+ numFiles > 1, "Merging reader requires multiple files, got %s", numFiles);
+
+ LOG.info(
+ "Creating merging reader for {} files with sort order {} in table {}",
+ numFiles,
+ sortOrder.orderId(),
+ table.name());
+
+ // Augment the projected schema with any sort key columns Spark did not request so that
+ // SortOrderComparators can access every sort key field during the merge.
+ Schema mergeReadSchema = mergeReadSchema(projection, sortOrder, table);
+ this.projectingRow = buildProjectingRow(projection, mergeReadSchema);
+
+ this.resources = new CloseableGroup();
+ this.fileReaders =
+ taskGroup.tasks().stream()
+ .map(
+ task ->
+ new RowDataReader(
+ table,
+ partition.io(),
+ new BaseScanTaskGroup<>(ImmutableList.of(task)),
+ mergeReadSchema,
+ partition.isCaseSensitive(),
+ partition.cacheDeleteFilesOnExecutors()))
+ .toList();
+ fileReaders.forEach(resources::addCloseable);
+ // Wrap each reader as a CloseableIterable and feed into SortedMerge.
+ List> fileIterables =
+ fileReaders.stream().map(this::readerToIterable).toList();
+ SortedMerge sortedMerge =
+ new SortedMerge<>(buildComparator(mergeReadSchema, sortOrder), fileIterables);
+ resources.addCloseable(sortedMerge);
+ this.mergedIterator = sortedMerge.iterator();
+ }
+
+ /**
+ * Adapts a {@link RowDataReader} to a {@link CloseableIterable} for use with {@link SortedMerge}.
+ * Each row is copied before it enters the priority queue because Spark's Parquet/ORC readers
+ * reuse {@link InternalRow} instances for performance.
+ */
+ private CloseableIterable readerToIterable(RowDataReader reader) {
+ return CloseableIterable.withNoopClose(
+ () ->
+ new CloseableIterator<>() {
+ private boolean advanced = false;
+ private boolean hasNext = false;
+
+ @Override
+ public boolean hasNext() {
+ if (!advanced) {
+ try {
+ hasNext = reader.next();
+ advanced = true;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to advance reader", e);
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public InternalRow next() {
+ if (!advanced) {
+ hasNext();
+ }
+ advanced = false;
+ return reader.get().copy();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ });
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ if (!mergedIterator.hasNext()) {
+ return false;
+ }
+
+ InternalRow merged = mergedIterator.next();
+ if (projectingRow == null) {
+ this.current = merged;
+ } else {
+ projectingRow.project(merged);
+ this.current = projectingRow;
+ }
+
+ return true;
+ }
+
+ @Override
+ public InternalRow get() {
+ return current;
+ }
+
+ @Override
+ public void close() throws IOException {
+ resources.close();
+ }
+
+ @Override
+ public CustomTaskMetric[] currentMetricsValues() {
+ long totalDeletes =
+ fileReaders.stream()
+ .flatMap(reader -> Arrays.stream(reader.currentMetricsValues()))
+ .filter(metric -> metric instanceof TaskNumDeletes)
+ .mapToLong(CustomTaskMetric::value)
+ .sum();
+ return new CustomTaskMetric[] {
+ new TaskNumSplits(fileReaders.size()), new TaskNumDeletes(totalDeletes)
+ };
+ }
+
+ /**
+ * Builds a comparator for merging {@link InternalRow}s by the given sort order. Uses {@link
+ * SortOrderComparators} which handles all transform types (identity, bucket, truncate), ASC/DESC
+ * directions, and null ordering. The two {@link InternalRowWrapper} instances are allocated once
+ * and reused — {@code wrap()} just updates an internal reference.
+ */
+ private static Comparator buildComparator(
+ Schema mergeReadSchema, SortOrder sortOrder) {
+ StructType sparkSchema = SparkSchemaUtil.convert(mergeReadSchema);
+ Comparator keyComparator =
+ SortOrderComparators.forSchema(mergeReadSchema, sortOrder);
+ InternalRowWrapper left = new InternalRowWrapper(sparkSchema, mergeReadSchema.asStruct());
+ InternalRowWrapper right = new InternalRowWrapper(sparkSchema, mergeReadSchema.asStruct());
+ return (r1, r2) -> keyComparator.compare(left.wrap(r1), right.wrap(r2));
+ }
+
+ /**
+ * Returns a {@link ProjectingInternalRow} that remaps columns from the wider merge schema back to
+ * the requested projection, or {@code null} if no extra columns were added.
+ */
+ private static ProjectingInternalRow buildProjectingRow(Schema projection, Schema mergeSchema) {
+ if (projection.columns().size() == mergeSchema.columns().size()) {
+ return null;
+ }
+
+ List mergeColumns = mergeSchema.columns();
+ List