diff --git a/src/main/java/org/apache/sysds/hops/ReorgOp.java b/src/main/java/org/apache/sysds/hops/ReorgOp.java index 5fc73e2bd3f..f43d1cc2baf 100644 --- a/src/main/java/org/apache/sysds/hops/ReorgOp.java +++ b/src/main/java/org/apache/sysds/hops/ReorgOp.java @@ -173,7 +173,9 @@ _op, getDataType(), getValueType(), et, for (int i = 0; i < 2; i++) linputs[i] = getInput().get(i).constructLops(); - Transform transform1 = new Transform(linputs, _op, getDataType(), getValueType(), et, 1); + Transform transform1 = new Transform( + linputs, _op, getDataType(), getValueType(), et, + OptimizerUtils.getConstrainedNumThreads(_maxNumThreads)); setOutputDimensions(transform1); setLineNumbers(transform1); diff --git a/src/main/java/org/apache/sysds/lops/Transform.java b/src/main/java/org/apache/sysds/lops/Transform.java index 0d2e79f83a8..d9537dcca6c 100644 --- a/src/main/java/org/apache/sysds/lops/Transform.java +++ b/src/main/java/org/apache/sysds/lops/Transform.java @@ -180,7 +180,7 @@ private String getInstructions(String input1, int numInputs, String output) { sb.append( this.prepOutputOperand(output)); if( (getExecType()==ExecType.CP || getExecType()==ExecType.FED || getExecType()==ExecType.OOC) - && (_operation == ReOrgOp.TRANS || _operation == ReOrgOp.REV || _operation == ReOrgOp.SORT) ) { + && (_operation == ReOrgOp.TRANS || _operation == ReOrgOp.REV || _operation == ReOrgOp.SORT || _operation == ReOrgOp.ROLL) ) { sb.append( OPERAND_DELIMITOR ); sb.append( _numThreads ); if ( getExecType()==ExecType.FED ) { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java index a1788c0e251..be77f4eb4eb 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ReorgCPInstruction.java @@ -118,11 +118,13 @@ else if ( opcode.equalsIgnoreCase(Opcodes.REV.toString()) ) { return new ReorgCPInstruction(new ReorgOperator(RevIndex.getRevIndexFnObject(), k), in, out, opcode, str); } else if (opcode.equalsIgnoreCase(Opcodes.ROLL.toString())) { - InstructionUtils.checkNumFields(str, 3); + InstructionUtils.checkNumFields(str, 3, 4); in.split(parts[1]); out.split(parts[3]); CPOperand shift = new CPOperand(parts[2]); - return new ReorgCPInstruction(new ReorgOperator(new RollIndex(0)), in, out, shift, opcode, str); + int k = (parts.length > 4) ? Integer.parseInt(parts[4]) : 1; + + return new ReorgCPInstruction(new ReorgOperator(new RollIndex(0), k), in, out, shift, opcode, str); } else if ( opcode.equalsIgnoreCase(Opcodes.DIAG.toString()) ) { parseUnaryInstruction(str, in, out); //max 2 operands diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java index 90ea445be8d..4ead0013ca7 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixReorg.java @@ -134,6 +134,8 @@ public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator return rev(in, out); case ROLL: RollIndex rix = (RollIndex) op.fn; + if(op.getNumThreads() > 1) + return roll(in, out, rix.getShift(), op.getNumThreads()); return roll(in, out, rix.getShift()); case DIAG: return diag(in, out); @@ -514,6 +516,119 @@ public static MatrixBlock roll(MatrixBlock in, MatrixBlock out, int shift) { return out; } + public static MatrixBlock roll(MatrixBlock input, MatrixBlock output, int shift, int numThreads) { + + final int numRows = input.rlen; + final int numCols = input.clen; + final boolean isSparse = input.sparse; + + // sparse-safe operation + if(input.isEmptyBlock(false)) + return output; + + // special case: row vector + if(numRows == 1) { + output.copy(input); + return output; + } + + if(numThreads <= 1 || input.getLength() < PAR_NUMCELL_THRESHOLD) { + return roll(input, output, shift); // fallback to single-threaded + } + + final int normalizedShift = getNormalizedShiftForRoll(shift, numRows); + + output.reset(numRows, numCols, isSparse); + output.nonZeros = input.nonZeros; + + if(isSparse) { + output.allocateSparseRowsBlock(false); + } + else { + output.allocateDenseBlock(false); + } + + ExecutorService threadPool = CommonThreadPool.get(numThreads); + try { + final int rowsPerThread = (int) Math.ceil((double) numRows / numThreads); + List> tasks = new ArrayList<>(); + + for(int threadIndex = 0; threadIndex < numThreads; threadIndex++) { + + final int startRow = threadIndex * rowsPerThread; + final int endRow = Math.min((threadIndex + 1) * rowsPerThread, numRows); + + tasks.add(threadPool.submit(() -> { + if(isSparse) + rollSparseBlock(input, output, normalizedShift, startRow, endRow); + else + rollDenseBlock(input, output, normalizedShift, startRow, endRow); + })); + } + + for(Future task : tasks) + task.get(); + + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + finally { + threadPool.shutdown(); + } + + return output; + } + + private static int getNormalizedShiftForRoll(int shift, int numRows) { + shift = shift % numRows; + if(shift < 0) + shift += numRows; + + return shift; + } + + private static void rollDenseBlock(MatrixBlock input, MatrixBlock output, int shift, int startRow, int endRow) { + + DenseBlock inputBlock = input.getDenseBlock(); + DenseBlock outputBlock = output.getDenseBlock(); + final int numRows = input.rlen; + final int numCols = input.clen; + + for(int targetRow = startRow; targetRow < endRow; targetRow++) { + int sourceRow = targetRow - shift; + if(sourceRow < 0) + sourceRow += numRows; + + System.arraycopy(inputBlock.values(sourceRow), inputBlock.pos(sourceRow), outputBlock.values(targetRow), + outputBlock.pos(targetRow), numCols); + } + } + + private static void rollSparseBlock(MatrixBlock input, MatrixBlock output, int shift, int startRow, int endRow) { + + SparseBlock inputBlock = input.getSparseBlock(); + SparseBlock outputBlock = output.getSparseBlock(); + final int numRows = input.rlen; + + for(int targetRow = startRow; targetRow < endRow; targetRow++) { + int sourceRow = targetRow - shift; + if(sourceRow < 0) + sourceRow += numRows; + + if(!inputBlock.isEmpty(sourceRow)) { + int rowStart = inputBlock.pos(sourceRow); + int rowEnd = rowStart + inputBlock.size(sourceRow); + int[] colIndexes = inputBlock.indexes(sourceRow); + double[] values = inputBlock.values(sourceRow); + + for(int k = rowStart; k < rowEnd; k++) { + outputBlock.set(targetRow, colIndexes[k], values[k]); + } + } + } + } + public static void roll(IndexedMatrixValue in, long rlen, int blen, int shift, ArrayList out) { MatrixIndexes inMtxIdx = in.getIndexes(); MatrixBlock inMtxBlk = (MatrixBlock) in.getValue(); @@ -2554,7 +2669,7 @@ private static void reverseSparse(MatrixBlock in, MatrixBlock out, int rl, int r private static void rollDense(MatrixBlock in, MatrixBlock out, int shift) { final int m = in.rlen; - shift %= (m != 0 ? m : 1); // roll matrix with axis=none + shift = getNormalizedShiftForRoll(shift, m); // roll matrix with axis=none copyDenseMtx(in, out, 0, shift, m - shift, false, true); copyDenseMtx(in, out, m - shift, 0, shift, true, true); @@ -2562,7 +2677,7 @@ private static void rollDense(MatrixBlock in, MatrixBlock out, int shift) { private static void rollSparse(MatrixBlock in, MatrixBlock out, int shift) { final int m = in.rlen; - shift %= (m != 0 ? m : 1); // roll matrix with axis=0 + shift = getNormalizedShiftForRoll(shift, m); // roll matrix with axis=0 copySparseMtx(in, out, 0, shift, m - shift, false, true); copySparseMtx(in, out, m-shift, 0, shift, false, true); diff --git a/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java b/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java new file mode 100644 index 00000000000..05f291b0ab0 --- /dev/null +++ b/src/test/java/org/apache/sysds/performance/matrix/MatrixRollPerf.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.performance.matrix; + +import org.apache.sysds.performance.compression.APerfTest; +import org.apache.sysds.performance.generators.ConstMatrix; +import org.apache.sysds.performance.generators.IGenerate; +import org.apache.sysds.runtime.functionobjects.IndexFunction; +import org.apache.sysds.runtime.functionobjects.RollIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.test.TestUtils; +import org.apache.sysds.utils.stats.InfrastructureAnalyzer; + +import java.util.Random; + +public class MatrixRollPerf extends APerfTest { + + private final int rows; + private final int cols; + private final int shift; + private final int k; + + private final ReorgOperator reorg; + private MatrixBlock out; + + public MatrixRollPerf(int N, int W, IGenerate gen, int rows, int cols, int shift, int k) { + super(N, W, gen); + this.rows = rows; + this.cols = cols; + this.shift = shift; + this.k = k; + + IndexFunction op = new RollIndex(shift); + this.reorg = new ReorgOperator(op, k); + } + + public void run() throws Exception { + MatrixBlock mb = gen.take(); + logInfos(rows, cols, shift, mb.getSparsity(), k); + + + String info = String.format("rows: %5d cols: %5d sp: %.4f shift: %4d k: %2d", + rows, cols, mb.getSparsity(), shift, k); + + + warmup(this::rollOnce, W); + + execute(this::rollOnce, info); + } + + private void logInfos(int rows, int cols, int shift, double sparsity, int k) { + String matrixType = sparsity == 1 ? "Dense" : "Sparse"; + if (k == 1) { + System.out.println("---------------------------------------------------------------------------------------------------------"); + System.out.printf("%s Experiment for rows %d columns %d and shift %d \n", matrixType, rows, cols, shift); + System.out.println("---------------------------------------------------------------------------------------------------------"); + } + } + + private void rollOnce() { + MatrixBlock in = gen.take(); + + if (out == null) + out = new MatrixBlock(rows, cols, in.isInSparseFormat()); + + out.reset(rows, cols, in.isInSparseFormat()); + + in.reorgOperations(reorg, out, 0, 0, 0); + + ret.add(null); + } + + @Override + protected String makeResString() { + return ""; + } + + public static void main(String[] args) throws Exception { + int kMulti = InfrastructureAnalyzer.getLocalParallelism(); + int reps = 2000; + int warmup = 200; + + int minRows = 2017; + int minCols = 1001; + double spSparse = 0.01; + int minShift = -50; + int maxShift = 1022; + int iterations = 10; + + Random rand = new Random(42); + + for (int i = 0; i < iterations; i++) { + int rows = 10_000_000; + int cols = 10; + int shift = rand.nextInt((maxShift - minShift) + 1) + minShift; + + MatrixBlock denseIn = TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, 1.0, 42); + MatrixBlock sparseIn = TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, spSparse, 42); + + // Run Dense Case (Single vs Multi-threaded) + new MatrixRollPerf(reps, warmup, new ConstMatrix(denseIn, -1), rows, cols, shift, 1).run(); + new MatrixRollPerf(reps, warmup, new ConstMatrix(denseIn, -1), rows, cols, shift, kMulti).run(); + + // Run Sparse Case (Single vs Multi-threaded) + new MatrixRollPerf(reps, warmup, new ConstMatrix(sparseIn, -1), rows, cols, shift, 1).run(); + new MatrixRollPerf(reps, warmup, new ConstMatrix(sparseIn, -1), rows, cols, shift, kMulti).run(); + } + } +} diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java new file mode 100644 index 00000000000..157e411cf68 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/DenseMatrixRollOperationCorrectnessTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.matrix.libMatrixReorg; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.sysds.runtime.functionobjects.IndexFunction; +import org.apache.sysds.runtime.functionobjects.RollIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class DenseMatrixRollOperationCorrectnessTest { + + private final double[][] input; + private final double[][] expected; + private final int shift; + + public DenseMatrixRollOperationCorrectnessTest(double[][] input, double[][] expected, int shift) { + this.input = input; + this.expected = expected; + this.shift = shift; + } + + @Parameterized.Parameters(name = "Shift={2}, Size={0}x{1}") + public static Collection data() { + return Arrays.asList(new Object[][] { + { + new double[][] {{1, 2, 3, 4, 5}}, + new double[][] {{1, 2, 3, 4, 5}}, + 0 + }, + { + new double[][] {{1, 2, 3, 4, 5}}, + new double[][] {{1, 2, 3, 4, 5}}, + 1 + }, + { + new double[][] {{1, 2, 3, 4, 5}}, + new double[][] {{1, 2, 3, 4, 5}}, + -3 + }, + { + new double[][] {{1, 2, 3, 4, 5}}, + new double[][] {{1, 2, 3, 4, 5}}, + 999 + }, + { + new double[][] {{1}, {2}, {3}, {4}, {5}}, + new double[][] {{4}, {5}, {1}, {2}, {3}}, + 2 + }, + { + new double[][] {{1}, {2}, {3}, {4}, {5}}, + new double[][] {{2}, {3}, {4}, {5}, {1}}, + -1 + }, + { + new double[][] {{1}, {2}, {3}, {4}, {5}}, + new double[][] {{1}, {2}, {3}, {4}, {5}}, + 5 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}}, + new double[][] {{4, 5, 6}, {1, 2, 3}}, + 1 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}}, + new double[][] {{4, 5, 6}, {1, 2, 3}}, + 7 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}}, + new double[][] {{1, 2, 3}, {4, 5, 6}}, + 2 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, + new double[][] {{7, 8, 9}, {1, 2, 3}, {4, 5, 6}}, + 1 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, + new double[][] {{4, 5, 6}, {7, 8, 9}, {1, 2, 3}}, + -1 + }, + { + new double[][] {{9, 8, 7}, {6, 5, 4}, {3, 2, 1}}, + new double[][] {{3, 2, 1}, {9, 8, 7}, {6, 5, 4}}, + 1 + }, + { + new double[][] {{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}}, + new double[][] {{9, 10, 11, 12}, {1, 2, 3, 4}, {5, 6, 7, 8}}, + 1 + }, + { + new double[][] {{1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12}}, + new double[][] {{5, 6, 7, 8}, {9, 10, 11, 12}, {1, 2, 3, 4}}, + -1 + }, + { + new double[][] {{1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}, {21, 22, 23, 24, 25}}, + new double[][] {{21, 22, 23, 24, 25}, {1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}}, + 1 + }, + { + new double[][] {{1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}, {11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}, {21, 22, 23, 24, 25}}, + new double[][] {{11, 12, 13, 14, 15}, {16, 17, 18, 19, 20}, {21, 22, 23, 24, 25}, {1, 2, 3, 4, 5}, {6, 7, 8, 9, 10}}, + -2 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}, {13, 14, 15}, {16, 17, 18}, {19, 20, 21}, + {22, 23, 24}, {25, 26, 27}, {28, 29, 30}}, + new double[][] {{22, 23, 24}, {25, 26, 27}, {28, 29, 30}, {1, 2, 3}, {4, 5, 6}, {7, 8, 9}, {10, 11, 12}, + {13, 14, 15}, {16, 17, 18}, {19, 20, 21}}, + 3 + }, + { + new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}}, + new double[][] {{5, 6}, {7, 8}, {1, 2}, {3, 4}}, + 1002 + }, + { + new double[][] {{1}, {2}, {3}, {4}, {5}}, + new double[][] {{3}, {4}, {5}, {1}, {2}}, + -12 + }, + { + new double[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, + new double[][] {{4, 5, 6}, {7, 8, 9}, {1, 2, 3}}, + -10 + }, + { + new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}}, + new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}}, + -4 + }, + { + new double[][] {{1, 2}, {3, 4}, {5, 6}, {7, 8}}, + new double[][] {{3, 4}, {5, 6}, {7, 8}, {1, 2}}, + -5 + } + }); + } + + @Test + public void testRollOperationProducesExpectedOutput() { + MatrixBlock inBlock = new MatrixBlock(input.length, input[0].length, false); + inBlock.init(input, input.length, input[0].length); + + IndexFunction op = new RollIndex(shift); + MatrixBlock outBlock = inBlock.reorgOperations(new ReorgOperator(op), new MatrixBlock(), 0, 0, 5); + + MatrixBlock expectedBlock = new MatrixBlock(expected.length, expected[0].length, false); + expectedBlock.init(expected, expected.length, expected[0].length); + + TestUtils.compareMatrices(outBlock, expectedBlock, 1e-12, "Dense Roll operation does not match expected output"); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java new file mode 100644 index 00000000000..b6b5053ca1c --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollOperationThreadSafetyTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.matrix.libMatrixReorg; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Random; + +import org.apache.sysds.runtime.functionobjects.IndexFunction; +import org.apache.sysds.runtime.functionobjects.RollIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class RollOperationThreadSafetyTest { + + private static final int MIN_ROWS = 2017; + private static final int MIN_COLS = 1001; + private static final int MIN_SHIFT = -50; + private static final int MAX_SHIFT = 1022; + private static final int NUM_TESTS = 100; + private static final double TEST_SPARSITY = 0.01; + private final int rows; + private final int cols; + private final int shift; + private final MatrixBlock inputDense; + private final MatrixBlock inputSparse; + + public RollOperationThreadSafetyTest(int rows, int cols, int shift) { + this.rows = rows; + this.cols = cols; + this.shift = shift; + + MatrixBlock tempInput = TestUtils.generateTestMatrixBlock(rows, cols, -100, 100, TEST_SPARSITY, 42); + + this.inputSparse = tempInput; + + this.inputDense = new MatrixBlock(rows, cols, false); + this.inputDense.copy(tempInput, false); + this.inputDense.recomputeNonZeros(); + } + + /** + * Defines the parameters for the test cases (Random Rows, Random Cols, Random Shift). + * + * @return Collection of test parameters. + */ + @Parameters(name = "Case {index}: Size={0}x{1}, Shift={2}") + public static Collection data() { + ArrayList tests = new ArrayList<>(); + Random rand = new Random(42); + + for(int i = 0; i < NUM_TESTS; i++) { + // Generate random dimensions (adding random buffer to the minimums) + int r = MIN_ROWS + rand.nextInt(500); + int c = MIN_COLS + rand.nextInt(500); + + int s = rand.nextInt((MAX_SHIFT - MIN_SHIFT) + 1) + MIN_SHIFT; + + tests.add(new Object[] {r, c, s}); + } + return tests; + } + + @Test + public void denseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { + int numThreads = getNumThreads(); + + MatrixBlock outSingle = rollOperation(inputDense, 1); + + MatrixBlock outMulti = rollOperation(inputDense, numThreads); + + TestUtils.compareMatrices(outSingle, outMulti, 1e-12, + "Dense Mismatch (numThreads=1 vs numThreads>1) for Size=" + rows + "x" + cols + " Shift=" + shift); + } + + @Test + public void sparseRollOperationSingleAndMultiThreadedShouldReturnSameResult() { + int numThreads = getNumThreads(); + + MatrixBlock outSingle = rollOperation(inputSparse, 1); + + MatrixBlock outMulti = rollOperation(inputSparse, numThreads); + + TestUtils.compareMatrices(outSingle, outMulti, 1e-12, + "Sparse Mismatch (numThreads=1 vs numThreads>1) for Size=" + rows + "x" + cols + " Shift=" + shift); + } + + private MatrixBlock rollOperation(MatrixBlock inBlock, int numThreads) { + IndexFunction op = new RollIndex(shift); + ReorgOperator reorgOperator = new ReorgOperator(op, numThreads); + + MatrixBlock outBlock = new MatrixBlock(rows, cols, inBlock.isInSparseFormat()); + + return inBlock.reorgOperations(reorgOperator, outBlock, 0, 0, 0); + } + + private static int getNumThreads() { + // number of threads should be at least two to invoke multithreaded operation + int cores = Runtime.getRuntime().availableProcessors(); + return Math.max(2, cores); + } +} diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java index d2ad83597bc..dc37990c331 100644 --- a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/RollTest.java @@ -100,15 +100,36 @@ public static Collection data() { /** * The actual test method that performs the roll operation on both * sparse and dense matrices and compares the results. + * This test will execute the single threaded operation */ @Test - public void test() { + public void testSingleThreadedOperation() { + int numThreads = 1; + compareDenseAndSparseRepresentation(numThreads); + } + + + /** + * The actual test method that performs the roll operation on both + * sparse and dense matrices and compares the results. + * This test will execute the multithreaded operation + */ + @Test + public void testMultiThreadedOperation() { + // number of threads should be at least two to invoke multithreaded operation + int cores = Runtime.getRuntime().availableProcessors(); + int numThreads = Math.max(2, cores); + + compareDenseAndSparseRepresentation(numThreads); + } + + private void compareDenseAndSparseRepresentation(int numThreads) { try { IndexFunction op = new RollIndex(shift); MatrixBlock outputDense = inputDense.reorgOperations( - new ReorgOperator(op), new MatrixBlock(), 0, 0, 0); + new ReorgOperator(op, numThreads), new MatrixBlock(), 0, 0, 0); MatrixBlock outputSparse = inputSparse.reorgOperations( - new ReorgOperator(op), new MatrixBlock(), 0, 0, 0); + new ReorgOperator(op, numThreads), new MatrixBlock(), 0, 0, 0); outputSparse.sparseToDense(); // Compare the dense representations of both outputs diff --git a/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java new file mode 100644 index 00000000000..e72b29072c1 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/matrix/libMatrixReorg/SparseMatrixRollOperationCorrectnessTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.matrix.libMatrixReorg; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.sysds.runtime.functionobjects.IndexFunction; +import org.apache.sysds.runtime.functionobjects.RollIndex; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.test.TestUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class SparseMatrixRollOperationCorrectnessTest { + + private final double[][] input; + private final double[][] expected; + private final int shift; + + public SparseMatrixRollOperationCorrectnessTest(double[][] input, double[][] expected, int shift) { + this.input = input; + this.expected = expected; + this.shift = shift; + } + + @Parameterized.Parameters(name = "Shift={2}, Size={0}x{1} (Sparse)") + public static Collection data() { + return Arrays.asList(new Object[][] { + { + new double[][] {{1, 0, 0}, {0, 2, 0}, {0, 0, 3}}, + new double[][] {{0, 0, 3}, {1, 0, 0}, {0, 2, 0}}, + 1 + }, + { + new double[][] {{1, 0, 0}, {0, 2, 0}, {0, 0, 3}}, + new double[][] {{0, 2, 0}, {0, 0, 3}, {1, 0, 0}}, + -1 + }, + { + new double[][] {{0}, {10}, {0}, {20}, {0}}, + new double[][] {{20}, {0}, {0}, {10}, {0}}, + 2 + }, + { + new double[][] {{1, 2}, {0, 0}, {3, 4}, {0, 0}}, + new double[][] {{0, 0}, {1, 2}, {0, 0}, {3, 4}}, + 1 + }, + { + new double[][] {{0, 0, 0}, {0, 0, 0}, {0, 5, 0}, {0, 0, 0}}, + new double[][] {{0, 5, 0}, {0, 0, 0}, {0, 0, 0}, {0, 0, 0}}, + 2 + }, + { + new double[][] {{1, 0}, {0, 2}, {3, 0}}, + new double[][] {{3, 0}, {1, 0}, {0, 2}}, + 4 + }, + { + new double[][] {{0, 1}, {0, 0}, {2, 0}}, + new double[][] {{0, 0}, {2, 0}, {0, 1}}, + -1 + }, + { + new double[][] {{0, 0}, {0, 0}}, + new double[][] {{0, 0}, {0, 0}}, + 1 + }, + { + new double[][] {{1, 0, 1}, {0, 1, 0}, {1, 0, 1}}, + new double[][] {{1, 0, 1}, {1, 0, 1}, {0, 1, 0}}, + 1 + }, + { + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + 0 + }, + { + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + 3 + }, + { + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + new double[][] {{0, 5}, {0, 0}, {2, 0}}, + -3 + }, + { + new double[][] {{0, 0, 1, 0}, {0, 2, 0, 0}}, + new double[][] {{0, 2, 0, 0}, {0, 0, 1, 0}}, + 1 + }, + { + new double[][] {{0, 0, 1, 0}, {0, 2, 0, 0}}, + new double[][] {{0, 2, 0, 0}, {0, 0, 1, 0}}, + -1 + }, + { + new double[][] {{1, 1}, {0, 0}, {2, 2}, {0, 0}}, + new double[][] {{0, 0}, {1, 1}, {0, 0}, {2, 2}}, + 1 + }, + { + new double[][] {{0, 0}, {0, 0}, {1, 2}, {3, 4}}, + new double[][] {{1, 2}, {3, 4}, {0, 0}, {0, 0}}, + 2 + }, + { + new double[][] {{1, 0}, {0, 0}, {0, 2}}, + new double[][] {{0, 2}, {1, 0}, {0, 0}}, + 10 + }, + { + new double[][] {{1, 0}, {0, 0}, {0, 2}}, + new double[][] {{0, 0}, {0, 2}, {1, 0}}, + -10 + }, + { + new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}}, + new double[][] {{0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}, {5, 0, 0, 0}}, + 3 + }, + { + new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}}, + new double[][] {{5, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}, {0, 0, 0, 0}}, + 4 + }, + { + new double[][] {{0, 1}, {0, 2}, {0, 3}, {0, 4}, {0, 5}}, + new double[][] {{0, 3}, {0, 4}, {0, 5}, {0, 1}, {0, 2}}, + 3 + }, + { + new double[][] {{-1, 0}, {0, 0}, {0, 5}}, + new double[][] {{0, 5}, {-1, 0}, {0, 0}}, + 1 + } + }); + } + + @Test + public void testRollOperationProducesExpectedOutputSparse() { + MatrixBlock inBlock = new MatrixBlock(input.length, input[0].length, false); + inBlock.init(input, input.length, input[0].length); + + inBlock.denseToSparse(true); + + Assert.assertTrue("Input block must be in sparse format", inBlock.isInSparseFormat()); + + IndexFunction op = new RollIndex(shift); + ReorgOperator reorgOperator = new ReorgOperator(op); + MatrixBlock matrixBlock = new MatrixBlock(); + + MatrixBlock outBlock = inBlock.reorgOperations(reorgOperator, matrixBlock, 0, 0, 0); + + MatrixBlock expectedBlock = new MatrixBlock(expected.length, expected[0].length, false); + expectedBlock.init(expected, expected.length, expected[0].length); + + TestUtils.compareMatrices(outBlock, expectedBlock, 1e-12, + "Sparse Roll operation does not match expected output"); + } +}