From f27b515780d22afc3d74e644ab66794f0e52e7e2 Mon Sep 17 00:00:00 2001 From: Biranavan Parameswaran Date: Sat, 6 Dec 2025 17:24:37 +0100 Subject: [PATCH 1/4] [SYSTEMDS-3730] Multithreaded roll operation and improved tests This patch introduces multi-threading support for the roll operation to improve performance. The RollTest.java has been updated to cover both single and multithreaded execution modes. Furthermore, this update adds comprehensive consistency checks to ensure mathematical correctness. New tests were created to validate both dense and sparse matrix inputs. Additionally, cross-verification tests were added to confirm that sparse and dense rolling for single and multithreaded executions produce identical results. --- .../java/org/apache/sysds/hops/ReorgOp.java | 4 +- .../java/org/apache/sysds/lops/Transform.java | 2 +- .../instructions/cp/ReorgCPInstruction.java | 6 +- .../runtime/matrix/data/LibMatrixReorg.java | 119 ++++++++++- ...nseMatrixRollOperationCorrectnessTest.java | 183 +++++++++++++++++ .../RollOperationThreadSafetyTest.java | 123 ++++++++++++ .../matrix/libMatrixReorg/RollTest.java | 27 ++- ...rseMatrixRollOperationCorrectnessTest.java | 185 ++++++++++++++++++ 8 files changed, 640 insertions(+), 9 deletions(-) create mode 100644 src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java create mode 100644 src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java create mode 100644 src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java b/src/main/java/org/apache/sysds/hops/ReorgOp.java index 5fc73e2bd3f..f43d1cc2baf 100644 --- a/src/main/java/org/apache/sysds/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java @@ -173,7 +173,9 @@ _op, getDataType(), getValueType(), et, for (int i = 0; i < 2; i++) linputs[i] = getInput().get(i).constructLops(); - Transform transform1 = new Transform(linputs, _op, getDataType(), getValueType(), et, 1); + Transform transform1 = new Transform( + linputs, _op, getDataType(), getValueType(), et, + OptimizerUtils.getConstrainedNumThreads(_maxNumThreads)); setOutputDimensions(transform1); setLineNumbers(transform1); diff --git a/src/main/java/org/apache/sysds/lops/Transform.java b/src/main/java/org/apache/sysds/lops/Transform.java index 0d2e79f83a8..d9537dcca6c 100644 --- a/src/main/java/org/apache/sysds/lops/Transform.java +++ b/src/main/java/org/apache/sysds/lops/Transform.java @@ -180,7 +180,7 @@ private String getInstructions(String input1, int numInputs, String output) { sb.append( this.prepOutputOperand(output)); if( (getExecType()==ExecType.CP || getExecType()==ExecType.FED || getExecType()==ExecType.OOC) - && (_operation == ReOrgOp.TRANS || _operation == ReOrgOp.REV || _operation == ReOrgOp.SORT) ) { + && (_operation == ReOrgOp.TRANS || _operation == ReOrgOp.REV || _operation == ReOrgOp.SORT || _operation == ReOrgOp.ROLL) ) { sb.append( OPERAND_DELIMITOR ); sb.append( _numThreads ); if ( getExecType()==ExecType.FED ) { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java index a1788c0e251..be77f4eb4eb 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java @@ -118,11 +118,13 @@ else if ( opcode.equalsIgnoreCase(Opcodes.REV.toString()) ) { return new ReorgCPInstruction(new ReorgOperator(RevIndex.getRevIndexFnObject(), k), in, out, opcode, str); } else if (opcode.equalsIgnoreCase(Opcodes.ROLL.toString())) { - InstructionUtils.checkNumFields(str, 3); + InstructionUtils.checkNumFields(str, 3, 4); in.split(parts[1]); out.split(parts[3]); CPOperand shift = new CPOperand(parts[2]); - return new ReorgCPInstruction(new ReorgOperator(new RollIndex(0)), in, out, shift, opcode, str); + int k = (parts.length > 4) ? Integer.parseInt(parts[4]) : 1; + + return new ReorgCPInstruction(new ReorgOperator(new RollIndex(0), k), in, out, shift, opcode, str); } else if ( opcode.equalsIgnoreCase(Opcodes.DIAG.toString()) ) { parseUnaryInstruction(str, in, out); //max 2 operands diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index 90ea445be8d..98b0177cb82 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -134,6 +134,8 @@ public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator return rev(in, out); case ROLL: RollIndex rix = (RollIndex) op.fn; + if(op.getNumThreads() > 1) + return roll(in, out, rix.getShift(), op.getNumThreads()); return roll(in, out, rix.getShift()); case DIAG: return diag(in, out); @@ -514,6 +516,119 @@ public static MatrixBlock roll(MatrixBlock in, MatrixBlock out, int shift) { return out; } + public static MatrixBlock roll(MatrixBlock input, MatrixBlock output, int shift, int numThreads) { + + final int numRows = input.rlen; + final int numCols = input.clen; + final boolean isSparse = input.sparse; + + // sparse-safe operation + if(input.isEmptyBlock(false)) + return output; + + // special case: row vector + if(numRows == 1) { + output.copy(input); + return output; + } + + if(numThreads <= 1 || input.isEmptyBlock(false) || input.getLength() < PAR_NUMCELL_THRESHOLD) { + return roll(input, output, shift); // fallback to single-threaded + } + + final int normalizedShift = getNormalizedShiftForRoll(shift, numRows); + + output.reset(numRows, numCols, isSparse); + output.nonZeros = input.nonZeros; + + if(isSparse) { + output.allocateSparseRowsBlock(false); + } + else { + output.allocateDenseBlock(false); + } + + ExecutorService threadPool = CommonThreadPool.get(numThreads); + try { + final int rowsPerThread = (int) Math.ceil((double) numRows / numThreads); + List> tasks = new ArrayList<>(); + + for(int threadIndex = 0; threadIndex < numThreads; threadIndex++) { + + final int startRow = threadIndex * rowsPerThread; + final int endRow = Math.min((threadIndex + 1) * rowsPerThread, numRows); + + tasks.add(threadPool.submit(() -> { + if(isSparse) + rollSparseBlock(input, output, normalizedShift, startRow, endRow); + else + rollDenseBlock(input, output, normalizedShift, startRow, endRow); + })); + } + + for(Future task : tasks) + task.get(); + + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + finally { + threadPool.shutdown(); + } + + return output; + } + + private static int getNormalizedShiftForRoll(int shift, int numRows) { + shift = shift % numRows; + if(shift < 0) + shift += numRows; + + return shift; + } + + private static void rollDenseBlock(MatrixBlock input, MatrixBlock output, int shift, int startRow, int endRow) { + + DenseBlock inputBlock = input.getDenseBlock(); + DenseBlock outputBlock = output.getDenseBlock(); + final int numRows = input.rlen; + final int numCols = input.clen; + + for(int targetRow = startRow; targetRow < endRow; targetRow++) { + int sourceRow = targetRow - shift; + if(sourceRow < 0) + sourceRow += numRows; + + System.arraycopy(inputBlock.values(sourceRow), inputBlock.pos(sourceRow), outputBlock.values(targetRow), + outputBlock.pos(targetRow), numCols); + } + } + + private static void rollSparseBlock(MatrixBlock input, MatrixBlock output, int shift, int startRow, int endRow) { + + SparseBlock inputBlock = input.getSparseBlock(); + SparseBlock outputBlock = output.getSparseBlock(); + final int numRows = input.rlen; + + for(int targetRow = startRow; targetRow < endRow; targetRow++) { + int sourceRow = targetRow - shift; + if(sourceRow < 0) + sourceRow += numRows; + + if(!inputBlock.isEmpty(sourceRow)) { + int rowStart = inputBlock.pos(sourceRow); + int rowEnd = rowStart + inputBlock.size(sourceRow); + int[] colIndexes = inputBlock.indexes(sourceRow); + double[] values = inputBlock.values(sourceRow); + + for(int k = rowStart; k < rowEnd; k++) { + outputBlock.set(targetRow, colIndexes[k], values[k]); + } + } + } + } + public static void roll(IndexedMatrixValue in, long rlen, int blen, int shift, ArrayList out) { MatrixIndexes inMtxIdx = in.getIndexes(); MatrixBlock inMtxBlk = (MatrixBlock) in.getValue(); @@ -2554,7 +2669,7 @@ private static void reverseSparse(MatrixBlock in, MatrixBlock out, int rl, int r private static void rollDense(MatrixBlock in, MatrixBlock out, int shift) { final int m = in.rlen; - shift %= (m != 0 ? m : 1); // roll matrix with axis=none + shift = getNormalizedShiftForRoll(shift, m); // roll matrix with axis=none copyDenseMtx(in, out, 0, shift, m - shift, false, true); copyDenseMtx(in, out, m - shift, 0, shift, true, true); @@ -2562,7 +2677,7 @@ private static void rollDense(MatrixBlock in, MatrixBlock out, int shift) { private static void rollSparse(MatrixBlock in, MatrixBlock out, int shift) { final int m = in.rlen; - shift %= (m != 0 ? m : 1); // roll matrix with axis=0 + shift = getNormalizedShiftForRoll(shift, m); // roll matrix with axis=0 copySparseMtx(in, out, 0, shift, m - shift, false, true); copySparseMtx(in, out, m-shift, 0, shift, false, true); diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java new file mode 100644 index 00000000000..157e411cf68 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.matrix.libMatrixReorg; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.sysds.runtime.functionobjects.IndexFunction; +import org.apache.sysds.runtime.functionobjects.RollIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class DenseMatrixRollOperationCorrectnessTest { + + private final double[][] input; + private final double[][] expected; + private final int shift; + + public DenseMatrixRollOperationCorrectnessTest(double[][] input, double[][] expected, int shift) { + this.input = input; + this.expected = expected; + this.shift = shift; + } + + @Parameterized.Parameters(name = "Shift={2}, Size={0}x{1}") + public static Collection data() { + return Arrays.asList(new Object[][] { + { + new double[][] {{1, 2, 3, 4, 5}}, + new double[][] {{1, 2, 3, 4, 5}}, + 0 + }, + { + new double[][] {{1, 2, 3, 4, 5}}, + new double[][] {{1, 2, 3, 4, 5}}, + 1 + }, + { + new double[][] {{1, 2, 3, 4, 5}}, + new double[][] {{1, 2, 3, 4, 5}}, + -3 + }, + { + new double[][] {{1, 2, 3, 4, 5}}, + new double[][] {{1, 2, 3, 4, 5}}, + 999 + }, + { + new double[][] {{1}, {2}, {3}, {4}, {5}}, + new double[][] {{4}, {5}, {1}, {2}, {3}}, + 2 + }, + { + new double[][] {{1}, {2}, {3}, {4}, {5}}, + new double[][] {{2}, {3}, {4}, {5}, {1}}, + -1 + }, + { + new double[][] {{1}, {2}, {3}, {4}, {5}}, + new double[][] {{1}, {2}, {3}, {4}, {5}}, + 5 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}}, + new double[][] {{4, 5, 6}, {1, 2, 3}}, + 1 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}}, + new double[][] {{4, 5, 6}, {1, 2, 3}}, + 7 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}}, + new double[][] {{1, 2, 3}, {4, 5, 6}}, + 2 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, + new double[][] {{7, 8, 9}, {1, 2, 3}, {4, 5, 6}}, + 1 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, + new double[][] {{4, 5, 6}, {7, 8, 9}, {1, 2, 3}}, + -1 + }, + { + new double[][] {{9, 8, 7}, {6, 5, 4}, {3, 2, 1}}, + new double[][] {{3, 2, 1}, {9, 8, 7}, {6, 5, 4}}, + 1 + }, + { + new double[][] {{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}}, + new double[][] {{9, 10, 11, 12}, {1, 2, 3, 4}, {5, 6, 7, 8}}, + 1 + }, + { + new double[][] {{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}}, + new double[][] {{5, 6, 7, 8}, {9, 10, 11, 12}, {1, 2, 3, 4}}, + -1 + }, + { + new double[][] {{1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}, {21, 22, 23, 24, 25}}, + new double[][] {{21, 22, 23, 24, 25}, {1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}}, + 1 + }, + { + new double[][] {{1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}, {21, 22, 23, 24, 25}}, + new double[][] {{11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}, {21, 22, 23, 24, 25}, {1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}}, + -2 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}, {13, 14, 15}, {16, 17, 18}, {19, 20, 21}, + {22, 23, 24}, {25, 26, 27}, {28, 29, 30}}, + new double[][] {{22, 23, 24}, {25, 26, 27}, {28, 29, 30}, {1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}, + {13, 14, 15}, {16, 17, 18}, {19, 20, 21}}, + 3 + }, + { + new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}}, + new double[][] {{5, 6}, {7, 8}, {1, 2}, {3, 4}}, + 1002 + }, + { + new double[][] {{1}, {2}, {3}, {4}, {5}}, + new double[][] {{3}, {4}, {5}, {1}, {2}}, + -12 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, + new double[][] {{4, 5, 6}, {7, 8, 9}, {1, 2, 3}}, + -10 + }, + { + new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}}, + new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}}, + -4 + }, + { + new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}}, + new double[][] {{3, 4}, {5, 6}, {7, 8}, {1, 2}}, + -5 + } + }); + } + + @Test + public void testRollOperationProducesExpectedOutput() { + MatrixBlock inBlock = new MatrixBlock(input.length, input[0].length, false); + inBlock.init(input, input.length, input[0].length); + + IndexFunction op = new RollIndex(shift); + MatrixBlock outBlock = inBlock.reorgOperations(new ReorgOperator(op), new MatrixBlock(), 0, 0, 5); + + MatrixBlock expectedBlock = new MatrixBlock(expected.length, expected[0].length, false); + expectedBlock.init(expected, expected.length, expected[0].length); + + TestUtils.compareMatrices(outBlock, expectedBlock, 1e-12, "Dense Roll operation does not match expected output"); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java new file mode 100644 index 00000000000..56562deb308 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.matrix.libMatrixReorg; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Random; + +import org.apache.sysds.runtime.functionobjects.IndexFunction; +import org.apache.sysds.runtime.functionobjects.RollIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class RollOperationThreadSafetyTest { + + private static final int MIN_ROWS = 2017; + private static final int MIN_COLS = 1001; + private static final int MIN_SHIFT = -50; + private static final int MAX_SHIFT = 1022; + private static final int NUM_TESTS = 100; + private static final double TEST_SPARSITY = 0.01; + private final int rows; + private final int cols; + private final int shift; + private final MatrixBlock inputDense; + private final MatrixBlock inputSparse; + + public RollOperationThreadSafetyTest(int rows, int cols, int shift) { + this.rows = rows; + this.cols = cols; + this.shift = shift; + + MatrixBlock tempInput = TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, TEST_SPARSITY, 42); + + this.inputSparse = tempInput; + + this.inputDense = new MatrixBlock(rows, cols, false); + this.inputDense.copy(tempInput, false); + this.inputDense.recomputeNonZeros(); + } + + /** + * Defines the parameters for the test cases (Random Rows, Random Cols, Random Shift). + * + * @return Collection of test parameters. + */ + @Parameters(name = "Case {index}: Size={0}x{1}, Shift={2}") + public static Collection data() { + ArrayList tests = new ArrayList<>(); + Random rand = new Random(42); + + for(int i = 0; i < NUM_TESTS; i++) { + // Generate random dimensions (adding random buffer to the minimums) + int r = MIN_ROWS + rand.nextInt(500); + int c = MIN_COLS + rand.nextInt(500); + + int s = rand.nextInt((MAX_SHIFT - MIN_SHIFT) + 1) + MIN_SHIFT; + + tests.add(new Object[] {r, c, s}); + } + return tests; + } + + @Test + public void denseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { + int numThreads = getNumThreads(); + + MatrixBlock outSingle = rollOperation(inputDense, 1); + MatrixBlock outMulti = rollOperation(inputDense, numThreads); + + TestUtils.compareMatrices(outSingle, outMulti, 1e-12, + "Dense Mismatch (numThreads=1 vs numThreads>1) for Size=" + rows + "x" + cols + " Shift=" + shift); + } + + @Test + public void sparseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { + int numThreads = getNumThreads(); + + MatrixBlock outSingle = rollOperation(inputSparse, 1); + MatrixBlock outMulti = rollOperation(inputSparse, numThreads); + + TestUtils.compareMatrices(outSingle, outMulti, 1e-12, + "Sparse Mismatch (numThreads=1 vs numThreads>1) for Size=" + rows + "x" + cols + " Shift=" + shift); + } + + private MatrixBlock rollOperation(MatrixBlock inBlock, int numThreads) { + IndexFunction op = new RollIndex(shift); + ReorgOperator reorgOperator = new ReorgOperator(op, numThreads); + + MatrixBlock outBlock = new MatrixBlock(rows, cols, inBlock.isInSparseFormat()); + + return inBlock.reorgOperations(reorgOperator, outBlock, 0, 0, 0); + } + + private static int getNumThreads() { + // number of threads should be at least two to invoke multithreaded operation + int cores = Runtime.getRuntime().availableProcessors(); + return Math.max(2, cores); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java index d2ad83597bc..dc37990c331 100644 --- a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java @@ -100,15 +100,36 @@ public static Collection data() { /** * The actual test method that performs the roll operation on both * sparse and dense matrices and compares the results. + * This test will execute the single threaded operation */ @Test - public void test() { + public void testSingleThreadedOperation() { + int numThreads = 1; + compareDenseAndSparseRepresentation(numThreads); + } + + + /** + * The actual test method that performs the roll operation on both + * sparse and dense matrices and compares the results. + * This test will execute the multithreaded operation + */ + @Test + public void testMultiThreadedOperation() { + // number of threads should be at least two to invoke multithreaded operation + int cores = Runtime.getRuntime().availableProcessors(); + int numThreads = Math.max(2, cores); + + compareDenseAndSparseRepresentation(numThreads); + } + + private void compareDenseAndSparseRepresentation(int numThreads) { try { IndexFunction op = new RollIndex(shift); MatrixBlock outputDense = inputDense.reorgOperations( - new ReorgOperator(op), new MatrixBlock(), 0, 0, 0); + new ReorgOperator(op, numThreads), new MatrixBlock(), 0, 0, 0); MatrixBlock outputSparse = inputSparse.reorgOperations( - new ReorgOperator(op), new MatrixBlock(), 0, 0, 0); + new ReorgOperator(op, numThreads), new MatrixBlock(), 0, 0, 0); outputSparse.sparseToDense(); // Compare the dense representations of both outputs diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java new file mode 100644 index 00000000000..e72b29072c1 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.matrix.libMatrixReorg; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.sysds.runtime.functionobjects.IndexFunction; +import org.apache.sysds.runtime.functionobjects.RollIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.test.TestUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class SparseMatrixRollOperationCorrectnessTest { + + private final double[][] input; + private final double[][] expected; + private final int shift; + + public SparseMatrixRollOperationCorrectnessTest(double[][] input, double[][] expected, int shift) { + this.input = input; + this.expected = expected; + this.shift = shift; + } + + @Parameterized.Parameters(name = "Shift={2}, Size={0}x{1} (Sparse)") + public static Collection data() { + return Arrays.asList(new Object[][] { + { + new double[][] {{1, 0, 0}, {0, 2, 0}, {0, 0, 3}}, + new double[][] {{0, 0, 3}, {1, 0, 0}, {0, 2, 0}}, + 1 + }, + { + new double[][] {{1, 0, 0}, {0, 2, 0}, {0, 0, 3}}, + new double[][] {{0, 2, 0}, {0, 0, 3}, {1, 0, 0}}, + -1 + }, + { + new double[][] {{0}, {10}, {0}, {20}, {0}}, + new double[][] {{20}, {0}, {0}, {10}, {0}}, + 2 + }, + { + new double[][] {{1, 2}, {0, 0}, {3, 4}, {0, 0}}, + new double[][] {{0, 0}, {1, 2}, {0, 0}, {3, 4}}, + 1 + }, + { + new double[][] {{0, 0, 0}, {0, 0, 0}, {0, 5, 0}, {0, 0, 0}}, + new double[][] {{0, 5, 0}, {0, 0, 0}, {0, 0, 0}, {0, 0, 0}}, + 2 + }, + { + new double[][] {{1, 0}, {0, 2}, {3, 0}}, + new double[][] {{3, 0}, {1, 0}, {0, 2}}, + 4 + }, + { + new double[][] {{0, 1}, {0, 0}, {2, 0}}, + new double[][] {{0, 0}, {2, 0}, {0, 1}}, + -1 + }, + { + new double[][] {{0, 0}, {0, 0}}, + new double[][] {{0, 0}, {0, 0}}, + 1 + }, + { + new double[][] {{1, 0, 1}, {0, 1, 0}, {1, 0, 1}}, + new double[][] {{1, 0, 1}, {1, 0, 1}, {0, 1, 0}}, + 1 + }, + { + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + 0 + }, + { + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + 3 + }, + { + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + -3 + }, + { + new double[][] {{0, 0, 1, 0}, {0, 2, 0, 0}}, + new double[][] {{0, 2, 0, 0}, {0, 0, 1, 0}}, + 1 + }, + { + new double[][] {{0, 0, 1, 0}, {0, 2, 0, 0}}, + new double[][] {{0, 2, 0, 0}, {0, 0, 1, 0}}, + -1 + }, + { + new double[][] {{1, 1}, {0, 0}, {2, 2}, {0, 0}}, + new double[][] {{0, 0}, {1, 1}, {0, 0}, {2, 2}}, + 1 + }, + { + new double[][] {{0, 0}, {0, 0}, {1, 2}, {3, 4}}, + new double[][] {{1, 2}, {3, 4}, {0, 0}, {0, 0}}, + 2 + }, + { + new double[][] {{1, 0}, {0, 0}, {0, 2}}, + new double[][] {{0, 2}, {1, 0}, {0, 0}}, + 10 + }, + { + new double[][] {{1, 0}, {0, 0}, {0, 2}}, + new double[][] {{0, 0}, {0, 2}, {1, 0}}, + -10 + }, + { + new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}}, + new double[][] {{0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}, {5, 0, 0, 0}}, + 3 + }, + { + new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}}, + new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}}, + 4 + }, + { + new double[][] {{0, 1}, {0, 2}, {0, 3}, {0, 4}, {0, 5}}, + new double[][] {{0, 3}, {0, 4}, {0, 5}, {0, 1}, {0, 2}}, + 3 + }, + { + new double[][] {{-1, 0}, {0, 0}, {0, 5}}, + new double[][] {{0, 5}, {-1, 0}, {0, 0}}, + 1 + } + }); + } + + @Test + public void testRollOperationProducesExpectedOutputSparse() { + MatrixBlock inBlock = new MatrixBlock(input.length, input[0].length, false); + inBlock.init(input, input.length, input[0].length); + + inBlock.denseToSparse(true); + + Assert.assertTrue("Input block must be in sparse format", inBlock.isInSparseFormat()); + + IndexFunction op = new RollIndex(shift); + ReorgOperator reorgOperator = new ReorgOperator(op); + MatrixBlock matrixBlock = new MatrixBlock(); + + MatrixBlock outBlock = inBlock.reorgOperations(reorgOperator, matrixBlock, 0, 0, 0); + + MatrixBlock expectedBlock = new MatrixBlock(expected.length, expected[0].length, false); + expectedBlock.init(expected, expected.length, expected[0].length); + + TestUtils.compareMatrices(outBlock, expectedBlock, 1e-12, + "Sparse Roll operation does not match expected output"); + } +} From 1d411755d7b1d8831ed04a78f8681f3add8b1568 Mon Sep 17 00:00:00 2001 From: Biranavan Parameswaran Date: Mon, 8 Dec 2025 21:26:23 +0100 Subject: [PATCH 2/4] [SYSTEMDS-3730] Add time measurement This commit adds logs to measure the speedup between the single threaded and multithreaded roll operation (on dense and sparse matrices). --- .../RollOperationThreadSafetyTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java index 56562deb308..719b970d413 100644 --- a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java @@ -28,6 +28,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.ReorgOperator; import org.apache.sysds.test.TestUtils; +import org.apache.sysds.utils.stats.Timing; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -88,8 +89,17 @@ public static Collection data() { public void denseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { int numThreads = getNumThreads(); + // Single-threaded timing + Timing tSingle = new Timing(true); MatrixBlock outSingle = rollOperation(inputDense, 1); + double timeSingle = tSingle.stop(); + + // Multithreaded timing + Timing tMulti = new Timing(true); MatrixBlock outMulti = rollOperation(inputDense, numThreads); + double timeMulti = tMulti.stop(); + + logTiming("Dense", numThreads, timeSingle, timeMulti); TestUtils.compareMatrices(outSingle, outMulti, 1e-12, "Dense Mismatch (numThreads=1 vs numThreads>1) for Size=" + rows + "x" + cols + " Shift=" + shift); @@ -99,8 +109,17 @@ public void denseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { public void sparseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { int numThreads = getNumThreads(); + // Single-threaded timing + Timing tSingle = new Timing(true); MatrixBlock outSingle = rollOperation(inputSparse, 1); + double timeSingle = tSingle.stop(); + + // Multithreaded timing + Timing tMulti = new Timing(true); MatrixBlock outMulti = rollOperation(inputSparse, numThreads); + double timeMulti = tMulti.stop(); + + logTiming("Sparse", numThreads, timeSingle, timeMulti); TestUtils.compareMatrices(outSingle, outMulti, 1e-12, "Sparse Mismatch (numThreads=1 vs numThreads>1) for Size=" + rows + "x" + cols + " Shift=" + shift); @@ -120,4 +139,14 @@ private static int getNumThreads() { int cores = Runtime.getRuntime().availableProcessors(); return Math.max(2, cores); } + + private void logTiming(String type, int numThreads, double timeSingle, double timeMulti) { + double speedup = timeSingle / timeMulti; + + System.out.println("\n--- " + type + " Roll Operation Timing for " + rows + "x" + cols + ", Shift=" + shift + " ---"); + System.out.printf("Single-threaded (1 core) took: %.3f ms\n", timeSingle); + System.out.printf("Multithreaded (%d cores) took: %.3f ms\n", numThreads, timeMulti); + System.out.printf("Speedup: %.2f\n", speedup); + System.out.println("--------------------------------------------------------------------------------"); + } } From c69ba3c4ca5b5501a5bd0a834ee9f70755c3fee4 Mon Sep 17 00:00:00 2001 From: Biranavan Parameswaran Date: Sat, 13 Dec 2025 15:59:11 +0100 Subject: [PATCH 3/4] [SYSTEMDS-3730] Add MatrixRollPerf.java Introduced a new test class to properly measure the performance of single-threaded and multithreaded rolling and remove unnecessary if condition --- .../runtime/matrix/data/LibMatrixReorg.java | 2 +- .../performance/matrix/MatrixRollPerf.java | 108 ++++++++++++++++++ .../RollOperationThreadSafetyTest.java | 27 ----- 3 files changed, 109 insertions(+), 28 deletions(-) create mode 100644 src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index 98b0177cb82..4ead0013ca7 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -532,7 +532,7 @@ public static MatrixBlock roll(MatrixBlock input, MatrixBlock output, int shift, return output; } - if(numThreads <= 1 || input.isEmptyBlock(false) || input.getLength() < PAR_NUMCELL_THRESHOLD) { + if(numThreads <= 1 || input.getLength() < PAR_NUMCELL_THRESHOLD) { return roll(input, output, shift); // fallback to single-threaded } diff --git a/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java b/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java new file mode 100644 index 00000000000..97c06dbf149 --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java @@ -0,0 +1,108 @@ +package org.apache.sysds.performance.matrix; + +import org.apache.sysds.performance.compression.APerfTest; +import org.apache.sysds.performance.generators.ConstMatrix; +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.functionobjects.IndexFunction; +import org.apache.sysds.runtime.functionobjects.RollIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.test.TestUtils; +import org.apache.sysds.utils.stats.InfrastructureAnalyzer; + +import java.util.Random; + +public class MatrixRollPerf extends APerfTest { + + private final int rows; + private final int cols; + private final int shift; + private final int k; + + private final ReorgOperator reorg; + private MatrixBlock out; + + public MatrixRollPerf(int N, int W, IGenerate gen, int rows, int cols, int shift, int k) { + super(N, W, gen); + this.rows = rows; + this.cols = cols; + this.shift = shift; + this.k = k; + + IndexFunction op = new RollIndex(shift); + this.reorg = new ReorgOperator(op, k); + } + + public void run() throws Exception { + MatrixBlock mb = gen.take(); + logInfos(rows, cols, shift, mb.getSparsity(), k); + + + String info = String.format("rows: %5d cols: %5d sp: %.4f shift: %4d k: %2d", + rows, cols, mb.getSparsity(), shift, k); + + + warmup(this::rollOnce, W); + + execute(this::rollOnce, info); + } + + private void logInfos(int rows, int cols, int shift, double sparsity, int k) { + String matrixType = sparsity == 1 ? "Dense" : "Sparse"; + if (k == 1) { + System.out.println("---------------------------------------------------------------------------------------------------------"); + System.out.printf("%s Experiment for rows %d columns %d and shift %d \n", matrixType, rows, cols, shift); + System.out.println("---------------------------------------------------------------------------------------------------------"); + } + } + + private void rollOnce() { + MatrixBlock in = gen.take(); + + if (out == null) + out = new MatrixBlock(rows, cols, in.isInSparseFormat()); + + out.reset(rows, cols, in.isInSparseFormat()); + + in.reorgOperations(reorg, out, 0, 0, 0); + + ret.add(null); + } + + @Override + protected String makeResString() { + return ""; + } + + public static void main(String[] args) throws Exception { + int kMulti = InfrastructureAnalyzer.getLocalParallelism(); + int reps = 2000; + int warmup = 200; + + int minRows = 2017; + int minCols = 1001; + double spSparse = 0.01; + int minShift = -50; + int maxShift = 1022; + int iterations = 10; + + Random rand = new Random(42); + + for (int i = 0; i < iterations; i++) { + int rows = minRows + rand.nextInt(500); + int cols = minCols + rand.nextInt(500); + int shift = rand.nextInt((maxShift - minShift) + 1) + minShift; + + MatrixBlock denseIn = TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, 1.0, 42); + MatrixBlock sparseIn = TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, spSparse, 42); + + // Run Dense Case (Single vs Multi-threaded) + new MatrixRollPerf(reps, warmup, new ConstMatrix(denseIn, -1), rows, cols, shift, 1).run(); + new MatrixRollPerf(reps, warmup, new ConstMatrix(denseIn, -1), rows, cols, shift, kMulti).run(); + + // Run Sparse Case (Single vs Multi-threaded) + new MatrixRollPerf(reps, warmup, new ConstMatrix(sparseIn, -1), rows, cols, shift, 1).run(); + new MatrixRollPerf(reps, warmup, new ConstMatrix(sparseIn, -1), rows, cols, shift, kMulti).run(); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java index 719b970d413..b6b5053ca1c 100644 --- a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java @@ -28,7 +28,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.ReorgOperator; import org.apache.sysds.test.TestUtils; -import org.apache.sysds.utils.stats.Timing; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -89,17 +88,9 @@ public static Collection data() { public void denseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { int numThreads = getNumThreads(); - // Single-threaded timing - Timing tSingle = new Timing(true); MatrixBlock outSingle = rollOperation(inputDense, 1); - double timeSingle = tSingle.stop(); - // Multithreaded timing - Timing tMulti = new Timing(true); MatrixBlock outMulti = rollOperation(inputDense, numThreads); - double timeMulti = tMulti.stop(); - - logTiming("Dense", numThreads, timeSingle, timeMulti); TestUtils.compareMatrices(outSingle, outMulti, 1e-12, "Dense Mismatch (numThreads=1 vs numThreads>1) for Size=" + rows + "x" + cols + " Shift=" + shift); @@ -109,17 +100,9 @@ public void denseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { public void sparseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { int numThreads = getNumThreads(); - // Single-threaded timing - Timing tSingle = new Timing(true); MatrixBlock outSingle = rollOperation(inputSparse, 1); - double timeSingle = tSingle.stop(); - // Multithreaded timing - Timing tMulti = new Timing(true); MatrixBlock outMulti = rollOperation(inputSparse, numThreads); - double timeMulti = tMulti.stop(); - - logTiming("Sparse", numThreads, timeSingle, timeMulti); TestUtils.compareMatrices(outSingle, outMulti, 1e-12, "Sparse Mismatch (numThreads=1 vs numThreads>1) for Size=" + rows + "x" + cols + " Shift=" + shift); @@ -139,14 +122,4 @@ private static int getNumThreads() { int cores = Runtime.getRuntime().availableProcessors(); return Math.max(2, cores); } - - private void logTiming(String type, int numThreads, double timeSingle, double timeMulti) { - double speedup = timeSingle / timeMulti; - - System.out.println("\n--- " + type + " Roll Operation Timing for " + rows + "x" + cols + ", Shift=" + shift + " ---"); - System.out.printf("Single-threaded (1 core) took: %.3f ms\n", timeSingle); - System.out.printf("Multithreaded (%d cores) took: %.3f ms\n", numThreads, timeMulti); - System.out.printf("Speedup: %.2f\n", speedup); - System.out.println("--------------------------------------------------------------------------------"); - } } From 2978d006becdfffbb4e8c752fedaa7f6f13e1459 Mon Sep 17 00:00:00 2001 From: Biranavan Parameswaran Date: Thu, 18 Dec 2025 01:01:55 +0100 Subject: [PATCH 4/4] [SYSTEMDS-3730] Add missing license --- .../performance/matrix/MatrixRollPerf.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java b/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java index 97c06dbf149..05f291b0ab0 100644 --- a/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java +++ b/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.sysds.performance.matrix; import org.apache.sysds.performance.compression.APerfTest; @@ -89,8 +108,8 @@ public static void main(String[] args) throws Exception { Random rand = new Random(42); for (int i = 0; i < iterations; i++) { - int rows = minRows + rand.nextInt(500); - int cols = minCols + rand.nextInt(500); + int rows = 10_000_000; + int cols = 10; int shift = rand.nextInt((maxShift - minShift) + 1) + minShift; MatrixBlock denseIn = TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, 1.0, 42);