diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 60b952b285e13..1883042f1c18a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -441,9 +441,24 @@ object UnsupportedOperationChecker extends Logging { } case j @ Join(left, right, joinType, condition, _) => - if (left.isStreaming && right.isStreaming && outputMode != InternalOutputModes.Append) { - throwError("Join between two streaming DataFrames/Datasets is not supported" + - s" in ${outputMode} output mode, only in Append output mode") + if (left.isStreaming && right.isStreaming) { + joinType match { + // The behavior for unmatched rows in outer joins with update mode + // hasn't been defined yet. + case LeftOuter | RightOuter | FullOuter => + if (outputMode != InternalOutputModes.Append) { + throwError(s"$joinType join between two streaming DataFrames/Datasets" + + s" is not supported in ${outputMode} output mode, only in Append output mode") + } + case _: InnerLike | LeftSemi => + if (outputMode != InternalOutputModes.Append && + outputMode != InternalOutputModes.Update) { + throwError(s"$joinType join between two streaming DataFrames/Datasets" + + s" is not supported in ${outputMode} output mode, only in Append and Update " + + "output modes") + } + case _ => // we will throw an error in the next pattern match + } } joinType match { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 425df0856a58a..dc429c87346db 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -370,9 +370,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { testBinaryOperationInStreamingPlan( "inner join in update mode", _.join(_, joinType = Inner), - outputMode = Update, - streamStreamSupported = false, - expectedMsg = "is not supported in Update output mode") + outputMode = Update) // Full outer joins: stream-batch/batch-stream join are not allowed, // and stream-stream join is allowed 'conditionally' - see below check @@ -403,16 +401,25 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { streamStreamSupported = false, expectedMsg = "RightOuter join") - // Left outer, right outer, full outer, left semi joins - Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType => - // Update mode not allowed + // Left outer, right outer, full outer joins: Update mode not allowed + Seq(LeftOuter, RightOuter, FullOuter).foreach { joinType => assertNotSupportedInStreamingPlan( s"$joinType join with stream-stream relations and update mode", streamRelation.join(streamRelation, joinType = joinType, condition = Some(attribute === attribute)), OutputMode.Update(), Seq("is not supported in Update output mode")) + } + // LeftSemi join: Update mode allowed (equivalent to Append mode for non-outer joins) + assertSupportedInStreamingPlan( + s"LeftSemi join with stream-stream relations and update mode", + streamRelation.join(streamRelation, joinType = LeftSemi, + condition = Some(attributeWithWatermark === attribute)), + OutputMode.Update()) + + // Left outer, right outer, full outer, left semi joins + Seq(LeftOuter, RightOuter, FullOuter, LeftSemi).foreach { joinType => // Complete mode not allowed assertNotSupportedInStreamingPlan( s"$joinType join with stream-stream relations and complete mode", @@ -671,6 +678,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { outputMode = Append) } + assertPassOnGlobalWatermarkLimit( + "streaming aggregation after stream-stream inner join in Update mode", + streamRelation.join(streamRelation, joinType = Inner, + condition = Some(attributeWithWatermark === attribute)) + .groupBy("a")(count("*")), + outputMode = Update) + + assertFailOnGlobalWatermarkLimit( + "streaming aggregation on both sides followed by stream-stream inner join in Update mode", + streamRelation.groupBy("a")(count("*")).join( + streamRelation.groupBy("a")(count("*")), + joinType = Inner, + condition = Some(attributeWithWatermark === attribute)), + outputMode = Update) + // Cogroup: only batch-batch is allowed testBinaryOperationInStreamingPlan( "cogroup", @@ -851,6 +873,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, Deduplicate(Seq(attribute), streamRelation)), outputMode = Append) + + Seq(Append, Update).foreach { outputMode => + assertPassOnGlobalWatermarkLimit( + s"stream-stream inner join with deduplicate on both sides " + + s"(with event-time) in ${outputMode} mode", + Deduplicate(Seq(attributeWithWatermark), streamRelation).join( + Deduplicate(Seq(attributeWithWatermark), streamRelation), + joinType = Inner, + condition = Some(attributeWithWatermark === attribute)), + outputMode = outputMode) + + assertPassOnGlobalWatermarkLimit( + s"stream-stream inner join with deduplicate on both sides " + + s"(without event-time) in ${outputMode} mode", + Deduplicate(Seq(attribute), streamRelation).join( + Deduplicate(Seq(attribute), streamRelation), + joinType = Inner, + condition = Some(attributeWithWatermark === attribute)), + outputMode = outputMode) + } } /* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala index 1c50e6802c323..4346f1096a15f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.StreamingJoinHelper import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, GenericInternalRow, JoinedRow, Literal, Predicate, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetric @@ -35,6 +36,7 @@ import org.apache.spark.sql.execution.streaming.operators.stateful.join.Streamin import org.apache.spark.sql.execution.streaming.operators.stateful.join.SymmetricHashJoinStateManager.KeyToValuePair import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} @@ -142,7 +144,9 @@ case class StreamingSymmetricHashJoinExec( stateWatermarkPredicates: JoinStateWatermarkPredicates, stateFormatVersion: Int, left: SparkPlan, - right: SparkPlan) extends BinaryExecNode with StateStoreWriter with SchemaValidationUtils { + right: SparkPlan, + outputMode: Option[OutputMode]) + extends BinaryExecNode with StateStoreWriter with SchemaValidationUtils { def this( leftKeys: Seq[Expression], @@ -157,7 +161,8 @@ case class StreamingSymmetricHashJoinExec( leftKeys, rightKeys, joinType, JoinConditionSplitPredicates(condition, left, right), stateInfo = None, eventTimeWatermarkForLateEvents = None, eventTimeWatermarkForEviction = None, - stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right) + stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right, + None) } if (stateFormatVersion < 2 && joinType != Inner) { @@ -184,6 +189,13 @@ case class StreamingSymmetricHashJoinExec( joinType == LeftSemi, errorMessageForJoinType) + outputMode.foreach { mode => + if (mode == InternalOutputModes.Update) { + require(joinType == Inner || joinType == LeftSemi, + s"Update output mode is not supported for stream-stream $joinType join") + } + } + // The assertion against join keys is same as hash join for batch query. require(leftKeys.length == rightKeys.length && leftKeys.map(_.dataType) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala index 169ab6f606dae..8165117a028d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala @@ -411,7 +411,8 @@ class IncrementalExecution( j.copy( stateInfo = Some(nextStatefulOperationStateInfo()), eventTimeWatermarkForLateEvents = None, - eventTimeWatermarkForEviction = None + eventTimeWatermarkForEviction = None, + outputMode = Some(outputMode) ) case l: StreamingGlobalLimitExec => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala index 21bf370f82a5f..cd901deae8e14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala @@ -46,6 +46,13 @@ class MultiStatefulOperatorsSuite StateStore.stop() } + private def testWithAppendAndUpdate(testName: String)( + testBody: OutputMode => Any): Unit = { + Seq(OutputMode.Append(), OutputMode.Update()).foreach { outputMode => + test(s"$testName - $outputMode")(testBody(outputMode)) + } + } + test("window agg -> window agg, append mode") { val inputData = MemoryStream[Int] @@ -934,6 +941,68 @@ class MultiStatefulOperatorsSuite ) } + testWithAppendAndUpdate("dedup on both sides -> stream-stream inner join") { outputMode => + val input1 = MemoryStream[Int] + val inputDF1 = input1.toDF() + .select($"value".as("value1"), timestamp_seconds($"value").as("eventTime1")) + .withWatermark("eventTime1", "10 seconds") + .dropDuplicates("value1", "eventTime1") + + val input2 = MemoryStream[Int] + val inputDF2 = input2.toDF() + .select($"value".as("value2"), timestamp_seconds($"value").as("eventTime2")) + .withWatermark("eventTime2", "10 seconds") + .dropDuplicates("value2", "eventTime2") + + val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner") + .select($"value1", $"value2") + + testStream(stream, outputMode)( + MultiAddData(input1, 1, 2, 3, 1)(input2, 1, 2, 3, 2), + CheckNewAnswer((1, 1), (2, 2), (3, 3)), + + MultiAddData(input1, 1, 2, 4)(input2, 2, 3, 4), + CheckNewAnswer((4, 4)) + ) + } + + test("stream-stream inner join -> window agg, update mode") { + val input1 = MemoryStream[Int] + val inputDF1 = input1.toDF() + .select($"value".as("value1"), timestamp_seconds($"value").as("eventTime1")) + .withWatermark("eventTime1", "0 seconds") + + val input2 = MemoryStream[Int] + val inputDF2 = input2.toDF() + .select($"value".as("value2"), timestamp_seconds($"value").as("eventTime2")) + .withWatermark("eventTime2", "0 seconds") + + val stream = inputDF1.join(inputDF2, expr("eventTime1 = eventTime2"), "inner") + .groupBy(window($"eventTime1", "5 seconds").as("window")) + .agg(count("*").as("count")) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(stream, OutputMode.Update())( + MultiAddData(input1, 1, 2)(input2, 1, 2), + // join output: (1, 1), (2, 2) + // agg: [0, 5) count = 2 + CheckNewAnswer((0, 2)), + + // Add more data to the same window [0, 5) + MultiAddData(input1, 3, 4)(input2, 3, 4), + // join output: (3, 3), (4, 4) + // agg: [0, 5) count = 2 + 2 = 4 + // Update mode re-emits the window with updated count + CheckNewAnswer((0, 4)), + + MultiAddData(input1, 5 to 8: _*)(input2, 5 to 8: _*), + // join output: (5, 5), (6, 6), (7, 7), (8, 8) + // agg: [5, 10) count = 4 + // Only the new/updated window is emitted + CheckNewAnswer((5, 4)) + ) + } + private def assertNumStateRows(numTotalRows: Seq[Long]): AssertOnQuery = AssertOnQuery { q => q.processAllAvailable() val progressWithData = q.recentProgress.lastOption.get diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 7dc228feaff81..d6cbcff7430cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -30,7 +30,7 @@ import org.scalatest.{BeforeAndAfter, Tag} import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.scheduler.ExecutorCacheTaskLocation -import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions @@ -117,6 +117,13 @@ abstract class StreamingJoinSuite } } + protected def testWithAppendAndUpdate(testName: String, testTags: Tag*)( + testBody: OutputMode => Any): Unit = { + Seq(OutputMode.Append(), OutputMode.Update()).foreach { outputMode => + test(s"$testName - $outputMode", testTags: _*)(testBody(outputMode)) + } + } + import testImplicits._ before { @@ -323,7 +330,7 @@ abstract class StreamingJoinSuite abstract class StreamingInnerJoinBase extends StreamingJoinSuite { import testImplicits._ - test("stream stream inner join on non-time column") { + testWithAppendAndUpdate("stream stream inner join on non-time column") { outputMode => val input1 = MemoryStream[Int] val input2 = MemoryStream[Int] @@ -331,7 +338,7 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue") val joined = df1.join(df2, "key") - testStream(joined)( + testStream(joined, outputMode)( AddData(input1, 1), CheckAnswer(), AddData(input2, 1, 10), // 1 arrived on input1 first, then input2, should join @@ -352,7 +359,7 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { ) } - test("stream stream inner join on windows - without watermark") { + testWithAppendAndUpdate("stream stream inner join on windows - without watermark") { outputMode => val input1 = MemoryStream[Int] val input2 = MemoryStream[Int] @@ -369,7 +376,7 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { val joined = df1.join(df2, Seq("key", "window")) .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue") - testStream(joined)( + testStream(joined, outputMode)( AddData(input1, 1), CheckNewAnswer(), AddData(input2, 1), @@ -393,7 +400,8 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { ) } - test("stream stream inner join with time range - with watermark - one side condition") { + testWithAppendAndUpdate("stream stream inner join with time range - with watermark" + + " - one side condition") { outputMode => import org.apache.spark.sql.functions._ val leftInput = MemoryStream[(Int, Int)] @@ -413,7 +421,7 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { df1.join(df2, expr("leftKey = rightKey AND leftTime < rightTime - interval 5 seconds")) .select($"leftKey", $"leftTime".cast("int"), $"rightTime".cast("int")) - testStream(joined)( + testStream(joined, outputMode)( AddData(leftInput, (1, 5)), CheckAnswer(), AddData(rightInput, (1, 11)), @@ -453,7 +461,8 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { ) } - test("stream stream inner join with time range - with watermark - two side conditions") { + testWithAppendAndUpdate("stream stream inner join with time range - with watermark" + + " - two side conditions") { outputMode => import org.apache.spark.sql.functions._ val leftInput = MemoryStream[(Int, Int)] @@ -498,7 +507,7 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { df1.join(df2, condition).select($"leftKey", $"leftTime".cast("int"), $"rightTime".cast("int")) - testStream(joined)( + testStream(joined, outputMode)( // If leftTime = 20, then it match only with rightTime = [15, 30] AddData(leftInput, (1, 20)), CheckAnswer(), @@ -556,14 +565,14 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { assert(e.toString.contains("Stream-stream join without equality predicate is not supported")) } - test("stream stream self join") { + testWithAppendAndUpdate("stream stream self join") { outputMode => val input = MemoryStream[Int] val df = input.toDF() val join = df.select($"value" % 5 as "key", $"value").join( df.select($"value" % 5 as "key", $"value"), "key") - testStream(join)( + testStream(join, outputMode)( AddData(input, 1, 2), CheckAnswer((1, 1, 1), (2, 2, 2)), StopStream, @@ -669,7 +678,7 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { assert(query.lastExecution.executedPlan.collect { case j @ StreamingSymmetricHashJoinExec(_, _, _, _, _, _, _, _, _, ShuffleExchangeExec(opA: HashPartitioning, _, _, _), - ShuffleExchangeExec(opB: HashPartitioning, _, _, _)) + ShuffleExchangeExec(opB: HashPartitioning, _, _, _), _) if partitionExpressionsColumns(opA.expressions) === Seq("a", "b") && partitionExpressionsColumns(opB.expressions) === Seq("a", "b") && opA.numPartitions == numPartitions && opB.numPartitions == numPartitions => j @@ -875,36 +884,39 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { ) } - test("joining non-nullable left join key with nullable right join key") { + testWithAppendAndUpdate("joining non-nullable left join key with nullable right join key") { + outputMode => val input1 = MemoryStream[Int] val input2 = MemoryStream[JInteger] val joined = testForJoinKeyNullability(input1.toDF(), input2.toDF()) - testStream(joined)( + testStream(joined, outputMode)( AddData(input1, 1, 5), AddData(input2, JInteger.valueOf(1), JInteger.valueOf(5), JInteger.valueOf(10), null), CheckNewAnswer(Row(1, 1, 2, 3), Row(5, 5, 10, 15)) ) } - test("joining nullable left join key with non-nullable right join key") { + testWithAppendAndUpdate("joining nullable left join key with non-nullable right join key") { + outputMode => val input1 = MemoryStream[JInteger] val input2 = MemoryStream[Int] val joined = testForJoinKeyNullability(input1.toDF(), input2.toDF()) - testStream(joined)( + testStream(joined, outputMode)( AddData(input1, JInteger.valueOf(1), JInteger.valueOf(5), JInteger.valueOf(10), null), AddData(input2, 1, 5), CheckNewAnswer(Row(1, 1, 2, 3), Row(5, 5, 10, 15)) ) } - test("joining nullable left join key with nullable right join key") { + testWithAppendAndUpdate("joining nullable left join key with nullable right join key") { + outputMode => val input1 = MemoryStream[JInteger] val input2 = MemoryStream[JInteger] val joined = testForJoinKeyNullability(input1.toDF(), input2.toDF()) - testStream(joined)( + testStream(joined, outputMode)( AddData(input1, JInteger.valueOf(1), JInteger.valueOf(5), JInteger.valueOf(10), null), AddData(input2, JInteger.valueOf(1), JInteger.valueOf(5), null), CheckNewAnswer( @@ -1048,7 +1060,7 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite { abstract class StreamingInnerJoinSuite extends StreamingInnerJoinBase { import testImplicits._ - test("stream stream inner join on windows - with watermark") { + testWithAppendAndUpdate("stream stream inner join on windows - with watermark") { outputMode => val input1 = MemoryStream[Int] val input2 = MemoryStream[Int] @@ -1066,7 +1078,7 @@ abstract class StreamingInnerJoinSuite extends StreamingInnerJoinBase { val joined = df1.join(df2, Seq("key", "window")) .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue") - testStream(joined)( + testStream(joined, outputMode)( AddData(input1, 1), CheckAnswer(), assertNumStateRows(total = 1, updated = 1), @@ -1242,6 +1254,25 @@ abstract class StreamingOuterJoinBase extends StreamingJoinSuite { import testImplicits._ import org.apache.spark.sql.functions._ + Seq("left_outer", "right_outer").foreach { joinType => + test(s"stream-stream $joinType join does not support Update mode") { + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, Seq("key"), joinType) + + val e = intercept[AnalysisException] { + testStream(joined, OutputMode.Update())( + AddData(input1, 1), + CheckAnswer() + ) + } + assert(e.getMessage.contains("is not supported in Update output mode")) + } + } + test("left outer early state exclusion on left") { withTempDir { checkpointDir => val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_outer") @@ -1954,6 +1985,25 @@ abstract class StreamingOuterJoinSuite extends StreamingOuterJoinBase { @SlowSQLTest abstract class StreamingFullOuterJoinBase extends StreamingJoinSuite { + import testImplicits._ + + test("stream-stream full outer join does not support Update mode") { + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, Seq("key"), "full_outer") + + val e = intercept[AnalysisException] { + testStream(joined, OutputMode.Update())( + AddData(input1, 1), + CheckAnswer() + ) + } + assert(e.getMessage.contains("is not supported in Update output mode")) + } + test("windowed full outer join") { withTempDir { checkpointDir => val (leftInput, rightInput, joined) = setupWindowedJoin("full_outer") @@ -2176,11 +2226,11 @@ abstract class StreamingLeftSemiJoinBase extends StreamingJoinSuite { import testImplicits._ - test("windowed left semi join") { + testWithAppendAndUpdate("windowed left semi join") { outputMode => withTempDir { checkpointDir => val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi") - testStream(joined)( + testStream(joined, outputMode)( StartStream(checkpointLocation = checkpointDir.getCanonicalPath), MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7), CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)), @@ -2242,10 +2292,10 @@ abstract class StreamingLeftSemiJoinBase extends StreamingJoinSuite { } } - test("left semi early state exclusion on left") { + testWithAppendAndUpdate("left semi early state exclusion on left") { outputMode => val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi") - testStream(joined)( + testStream(joined, outputMode)( MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5), // The left rows with leftValue <= 4 should not generate their semi join rows and // not get added to the state. @@ -2279,10 +2329,10 @@ abstract class StreamingLeftSemiJoinBase extends StreamingJoinSuite { ) } - test("left semi early state exclusion on right") { + testWithAppendAndUpdate("left semi early state exclusion on right") { outputMode => val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi") - testStream(joined)( + testStream(joined, outputMode)( MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3), // The right rows with rightValue <= 7 should never be added to the state. // The right row with rightValue = 9 > 7, hence joined and added to state. @@ -2317,10 +2367,10 @@ abstract class StreamingLeftSemiJoinBase extends StreamingJoinSuite { ) } - test("left semi join with watermark range condition") { + testWithAppendAndUpdate("left semi join with watermark range condition") { outputMode => val (leftInput, rightInput, joined) = setupJoinWithRangeCondition("left_semi") - testStream(joined)( + testStream(joined, outputMode)( AddData(leftInput, (1, 5), (3, 5)), CheckNewAnswer(), // states @@ -2373,10 +2423,10 @@ abstract class StreamingLeftSemiJoinBase extends StreamingJoinSuite { ) } - test("self left semi join") { + testWithAppendAndUpdate("self left semi join") { outputMode => val (inputStream, query) = setupSelfJoin("left_semi") - testStream(query)( + testStream(query, outputMode)( AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)), CheckNewAnswer((2, 2), (4, 4)), // batch 1 - global watermark = 0