From 745cfd2eedce63ab1d545898d2993e92552f3335 Mon Sep 17 00:00:00 2001 From: muraliktalluri Date: Thu, 16 Apr 2026 15:41:39 -0500 Subject: [PATCH] Add gaming sessionization RTM vs MBM demo assets --- .../.DS_Store | Bin 0 -> 6148 bytes .../README.md | 501 +++++++++++ .../RTM-Sessionization/RTM-TWS-Lakebase.scala | 829 ++++++++++++++++++ .../RTM-Sessionization/RTM-TWS.scala | 452 ++++++++++ .../Write_RTM_session_results_to_delta.py | 65 ++ .../debug.sql | 58 ++ .../Kafka-console-sessions-stream-ingest.py | 66 ++ .../Kafka-pc-sessions-stream-ingest.py | 66 ++ .../create-delete-topic-scala.scala | 64 ++ .../ingest-source-data/create-delete-topic.py | 80 ++ .../generate-fake-session-data.py | 306 +++++++ 11 files changed, 2487 insertions(+) create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/.DS_Store create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/README.md create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS-Lakebase.scala create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS.scala create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/Write_RTM_session_results_to_delta.py create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/debug.sql create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-console-sessions-stream-ingest.py create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-pc-sessions-stream-ingest.py create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic-scala.scala create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic.py create mode 100644 2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/generate-fake-session-data.py diff --git 
a/2026-04-rtm-transformWithState-GamingSessionization/.DS_Store b/2026-04-rtm-transformWithState-GamingSessionization/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..89110af764f133ed1020d8ee95809a0c515f5101 GIT binary patch literal 6148 zcmeHKy-LJj5S-O34ivo7@_PXb-yj@kafJwa4sB0VCdK z{0AKN!}!t + // Check if the incoming record is for session start + if (row.eventId == "ApplicationSessionStart") { + // Generate sessionStart record, and emit that + val outputRecord = generateSessionStart(key, row, timerValues) + outputRows.append(outputRecord) + } // checking if the incoming record is for session end + else if (row.eventId == "ApplicationSessionEndBi") { + val outputRecord = generateSessionEnd(key, row, timerValues) + outputRows.append(outputRecord) + } + } + outputRows.iterator + } + + def generateSessionStart(key: String, inputRow: InputRow, timerValues: TimerValues): OutputRow = { + + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // create a new timer for next 30 sec, current processing time + 30 sec + val timerMillis = currentProcessingTimeMillis + TIMER_THRESHOLD_IN_MS + val timerTime = new Timestamp(timerMillis) + + // Generate sessionStart record + val outputRecord = OutputRow( + key, + inputRow.appSessionId, + inputRow.psnAccountId, + "SessionStart", + inputRow.session_timestamp, + 0, + inputRow.kafka_timestamp, + currentProcessingTime, + timerTime, + DEBUG_INFO_1 + ) + + // Create a key-value entry into the map state + val mapKey = inputRow.appSessionId + val mapValue = MeasuredSessionValue( + inputRow.psnAccountId, + "SessionStart", + inputRow.session_timestamp, + 0, + inputRow.kafka_timestamp, + currentProcessingTime, + timerTime + ) + sessionStatusState.updateValue(mapKey, mapValue) + + // Set the timer for 
current batch processingTime + 30 seconds + getHandle.registerTimer(timerMillis) + + outputRecord + } + + def generateSessionEnd(key: String, inputRow: InputRow, timerValues: TimerValues): OutputRow = { + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // Generate a sessionEnd record, and emit that + var outputRecord = OutputRow( + key, + inputRow.appSessionId, + inputRow.psnAccountId, + "SessionEnd", + inputRow.session_timestamp, + inputRow.totalFgTime, + inputRow.kafka_timestamp, + currentProcessingTime, + null, + DEBUG_INFO_3 + ) + + // remove the session from state + sessionStatusState.removeKey(inputRow.appSessionId) + + // Delete all the timers for the deviceId + val timerIter = getHandle.listTimers() + for (timer <- timerIter) getHandle.deleteTimer(timer) + + outputRecord + } + + // Define the logic for handling expired timers + override def handleExpiredTimer( + key: String, + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputRow] = { + + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // Get the expired Timer info + val expiredTimerMillis = expiredTimerInfo.getExpiryTimeInMs() + val expiredTimerTime = new Timestamp(expiredTimerMillis) + + // create a new timer for next 30 sec, current timer expiry time + 30 sec + val nextTimerMillis = expiredTimerMillis + TIMER_THRESHOLD_IN_MS + val nextTimerTime = new Timestamp(nextTimerMillis) + + var outputRows = ListBuffer[OutputRow]() + + // Expecting an entry for the expired timer for a given sessionId. 
+ if (sessionStatusState.exists()) { + + // pull the existing sessionId, generate sessionHeartbeat record, and emit that + val sessionIdKeyIter = sessionStatusState.keys() + + for (sessionId <- sessionIdKeyIter) { + var sessionValue = sessionStatusState.getValue(sessionId) + + // Caluate the sessionDuration for heartbeat records, existing duration + 30 seconds + val currentSessionDuration: Long = sessionValue.sessionDuration + (currentProcessingTime.getTime - sessionValue.processing_timestamp.getTime)/1000 + val sessionEvent = "SessionHeartbeat" + + // Generate sessionHeartbeat record, and emit that + val outputRecord = OutputRow( + key, + sessionId, + sessionValue.psnAccountId, + sessionEvent, + sessionValue.session_timestamp, + currentSessionDuration, + sessionValue.upstream_timestamp, + currentProcessingTime, + nextTimerTime, + "Timer expired at: " + expiredTimerTime + ) + outputRows.append(outputRecord) + + // Update the existing key-value entry into the map state + val updatedSessionValue = sessionValue.copy( + sessionStatus = sessionEvent, + sessionDuration = currentSessionDuration, + processing_timestamp = currentProcessingTime, + timer_info = nextTimerTime + ) + sessionStatusState.updateValue(sessionId, updatedSessionValue) + + // create a new timer for next 30 sec, current processing time + 30 sec + getHandle.registerTimer(nextTimerMillis) + } + } + outputRows.iterator + } + +} + +// COMMAND ---------- + +val processed_stream_df = input_stream_df.as[InputRow] + .groupByKey(_.deviceId) + .transformWithState( + new Sessionization(), + TimeMode.ProcessingTime, + OutputMode.Update() + ) + .toDF() + .select( + col("deviceId"), + col("appSessionId"), + col("psnAccountId"), + col("sessionStatus"), + col("session_timestamp"), + col("sessionDuration"), + col("upstream_timestamp"), + col("processing_timestamp"), + col("timer_info"), + col("debug_info") + ) + +// COMMAND ---------- + +import java.sql.{Connection, DriverManager, PreparedStatement, Types} +import 
java.util.concurrent.{LinkedBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{ForeachWriter, Row} +import org.apache.spark.sql.types._ +import org.slf4j.LoggerFactory + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +/** + * @param username Database username + * @param password Database password + * @param table Target table name (e.g., "public.my_table") + * @param host Lakebase host address + * @param mode Write mode: "insert", "upsert", or "bulk-insert" + * @param primaryKeys Primary key columns (required for upsert mode) + * @param batchSize Number of rows per batch (default: 1000) + * @param batchIntervalMs Max time between flushes in milliseconds (default: 100) + */ +case class LakebaseConfig( + username: String, + password: String, + table: String, + host: String, + mode: String = "insert", + primaryKeys: Seq[String] = Seq.empty, + batchSize: Int = 5000, + batchIntervalMs: Int = 100, + queueSize: Int = 50000 +) extends Serializable + +// --------------------------------------------------------------------------- +// ForeachWriter +// --------------------------------------------------------------------------- + +class LakebaseForeachWriter(config: LakebaseConfig, schema: StructType) + extends ForeachWriter[Row] with Serializable { + + @transient private lazy val logger = LoggerFactory.getLogger(getClass) + + private val columns: Array[String] = schema.fieldNames + private val columnTypes: Array[DataType] = schema.fields.map(_.dataType) + private val sql: Option[String] = buildSql() + + // Validate schema at construction time + locally { + val unsupported = findUnsupportedFields(schema) + if (unsupported.nonEmpty) { + throw new IllegalArgumentException( + s"Unsupported field types: ${unsupported.mkString(", ")}. 
" + + "Convert complex types to supported formats first." + ) + } + + val jdbcUrl = s"jdbc:postgresql://${config.host}:5432/databricks_postgres?sslmode=require" + val checkConn = DriverManager.getConnection(jdbcUrl, config.username, config.password) + try { + val columnDefs = schema.fields.map { field => + val pgType = sparkTypeToPgType(field.dataType) + val nullable = if (field.nullable) "" else " NOT NULL" + s"${field.name} $pgType$nullable" + }.mkString(", ") + + val pkClause = if (config.primaryKeys.nonEmpty) { + s", PRIMARY KEY (${config.primaryKeys.mkString(", ")})" + } else "" + + val createSql = + s"CREATE TABLE IF NOT EXISTS ${config.table} ($columnDefs$pkClause)" + + val stmt = checkConn.createStatement() + try { + stmt.execute(createSql) + } finally { + stmt.close() + } + logger.info(s"Ensured table ${config.table} exists") + } finally { + checkConn.close() + } + } + + // Runtime state (initialized in open(), not serialized) + @transient private var conn: Connection = _ + @transient private var queue: LinkedBlockingQueue[Array[Any]] = _ + @transient private var stopEvent: AtomicBoolean = _ + @transient private var workerThread: Thread = _ + @transient private var batch: ArrayBuffer[Array[Any]] = _ + @transient private var lastFlush: Long = _ + @transient private var workerError: String = _ + @transient private var partitionId: Long = _ + @transient private var epochId: Long = _ + + // ------------------------------------------------------------------------- + // ForeachWriter Interface + // ------------------------------------------------------------------------- + + override def open(partitionId: Long, epochId: Long): Boolean = { + try { + this.partitionId = partitionId + this.epochId = epochId + + val jdbcUrl = + s"jdbc:postgresql://${config.host}:5432/databricks_postgres?sslmode=require" + conn = DriverManager.getConnection(jdbcUrl, config.username, config.password) + conn.setAutoCommit(false) + + queue = new 
LinkedBlockingQueue[Array[Any]](config.queueSize) + stopEvent = new AtomicBoolean(false) + batch = ArrayBuffer.empty[Array[Any]] + lastFlush = System.currentTimeMillis() + workerError = null + + workerThread = new Thread(() => worker()) + workerThread.setDaemon(true) + workerThread.start() + + logger.info(s"[$partitionId|$epochId] Opening writer for table ${config.table}") + true + } catch { + case e: Exception => + logger.error(s"Failed to open writer: ${e.getMessage}", e) + false + } + } + + override def process(row: Row): Unit = { + if (workerError != null) { + throw new RuntimeException(s"Worker failed: $workerError") + } + + val rowData = new Array[Any](columns.length) + for (i <- columns.indices) { + rowData(i) = + if (row.isNullAt(i)) null + else convertValue(row.get(i), columnTypes(i)) + } + queue.put(rowData) + } + + override def close(error: Throwable): Unit = { + try { + if (stopEvent != null) stopEvent.set(true) + if (workerThread != null && workerThread.isAlive) { + val maxWaitMs = if (config.mode.toLowerCase == "upsert") 30000 else 5000 + val start = System.currentTimeMillis() + while (workerThread.isAlive && (System.currentTimeMillis() - start) < maxWaitMs) { + workerThread.join(1000) + } + val waited = System.currentTimeMillis() - start + if (workerThread.isAlive) { + logger.warn(s"[$partitionId|$epochId] Worker still alive after ${waited}ms (max: ${maxWaitMs}ms)") + } else { + logger.info(s"[$partitionId|$epochId] Worker finished after ${waited}ms") + } + } + if (queue != null && queue.size() > config.batchSize * 5) { + logger.warn( + s"[$partitionId|$epochId] Large queue remaining: ${queue.size()}" + ) + } + flushRemaining() + } finally { + if (conn != null) { + try conn.close() + catch { case _: Exception => } + } + } + logger.info(s"[$partitionId|$epochId] Writer closed") + } + + // ------------------------------------------------------------------------- + // Internal Methods + // 
------------------------------------------------------------------------- + + private def worker(): Unit = { + while (!stopEvent.get()) { + try { + var collecting = true + while (collecting && batch.size < config.batchSize) { + val item = queue.poll(10, TimeUnit.MILLISECONDS) + if (item != null) batch += item + else collecting = false + } + + if (batch.size >= config.batchSize || (batch.nonEmpty && timeToFlush())) { + flushBatch() + } + + Thread.sleep(0, 100000) // 0.1ms + } catch { + case e: Exception => + logger.error(s"Worker error: ${e.getMessage}", e) + workerError = e.getMessage + return + } + } + } + + private def flushBatch(): Unit = { + if (batch.isEmpty) return + + val perfStart = System.currentTimeMillis() + try { + config.mode.toLowerCase match { + case "bulk-insert" => flushWithCopy() + case _ => flushWithExecuteBatch() + } + conn.commit() + + val flushedCount = batch.size + batch.clear() + lastFlush = System.currentTimeMillis() + val perfTime = System.currentTimeMillis() - perfStart + logger.info( + s"[$partitionId|$epochId] Flushed $flushedCount rows in ${perfTime}ms" + ) + } catch { + case e: Exception => + try conn.rollback() + catch { case _: Exception => } + throw e + } + } + + private def flushWithCopy(): Unit = { + val cm = new org.postgresql.copy.CopyManager( + conn.unwrap(classOf[org.postgresql.core.BaseConnection]) + ) + val cols = columns.mkString(", ") + val copyIn = + cm.copyIn(s"COPY ${config.table} ($cols) FROM STDIN WITH (FORMAT text)") + + try { + for (row <- batch) { + val line = row.map { + case null => "\\N" + case v => + v.toString + .replace("\\", "\\\\") + .replace("\t", "\\t") + .replace("\n", "\\n") + .replace("\r", "\\r") + }.mkString("\t") + "\n" + val bytes = line.getBytes("UTF-8") + copyIn.writeToCopy(bytes, 0, bytes.length) + } + copyIn.endCopy() + } finally { + if (copyIn.isActive) { + copyIn.cancelCopy() + } + } + } + + private def flushWithExecuteBatch(): Unit = { + val stmt = conn.prepareStatement(sql.get) + try { + 
for (row <- batch) { + for (i <- row.indices) { + if (row(i) == null) stmt.setNull(i + 1, Types.NULL) + else setParameter(stmt, i + 1, row(i), columnTypes(i)) + } + stmt.addBatch() + } + stmt.executeBatch() + } finally { + stmt.close() + } + } + + private def flushRemaining(): Unit = { + if (queue != null) { + val remaining = new java.util.ArrayList[Array[Any]]() + queue.drainTo(remaining) + remaining.forEach(item => batch += item) + } + if (batch != null && batch.nonEmpty) flushBatch() + } + + private def timeToFlush(): Boolean = + System.currentTimeMillis() - lastFlush >= config.batchIntervalMs + + // ------------------------------------------------------------------------- + // SQL Builder + // ------------------------------------------------------------------------- + + private def buildSql(): Option[String] = { + val cols = columns.mkString(", ") + val placeholders = columns.map(_ => "?").mkString(", ") + + config.mode.toLowerCase match { + case "insert" => + Some(s"INSERT INTO ${config.table} ($cols) VALUES ($placeholders)") + + case "upsert" => + if (config.primaryKeys.isEmpty) { + throw new IllegalArgumentException("primaryKeys required for upsert mode") + } + val pkCols = config.primaryKeys.mkString(", ") + val pkSet = config.primaryKeys.map(_.toLowerCase).toSet + val updateCols = columns + .filterNot(c => pkSet.contains(c.toLowerCase)) + .map(c => s"$c = EXCLUDED.$c") + .mkString(", ") + + if (updateCols.isEmpty) { + Some( + s"""INSERT INTO ${config.table} ($cols) VALUES ($placeholders) + |ON CONFLICT ($pkCols) DO NOTHING""".stripMargin + ) + } else { + Some( + s"""INSERT INTO ${config.table} ($cols) VALUES ($placeholders) + |ON CONFLICT ($pkCols) DO UPDATE SET $updateCols""".stripMargin + ) + } + + case "bulk-insert" => + None + + case other => + throw new IllegalArgumentException( + s"Invalid mode: $other. Use 'insert', 'upsert', or 'bulk-insert'." 
+ ) + } + } + + // ------------------------------------------------------------------------- + // Type Conversion + // ------------------------------------------------------------------------- + + private def convertValue(value: Any, dataType: DataType): Any = { + if (value == null) return null + dataType match { + case BooleanType => value.asInstanceOf[Boolean] + case IntegerType => value match { case i: Int => i; case o => o.toString.toInt } + case LongType => value match { case l: Long => l; case o => o.toString.toLong } + case FloatType => value match { case f: Float => f; case o => o.toString.toFloat } + case DoubleType => value match { case d: Double => d; case o => o.toString.toDouble } + case ShortType => value match { case s: Short => s; case o => o.toString.toShort } + case ByteType => value match { case b: Byte => b; case o => o.toString.toByte } + case _: DecimalType => + value match { + case d: java.math.BigDecimal => d + case d: scala.math.BigDecimal => d.bigDecimal + case o => new java.math.BigDecimal(o.toString) + } + case StringType => value.toString + case DateType => value + case TimestampType => value + case BinaryType => value + case _ => value + } + } + + private def setParameter( + stmt: PreparedStatement, + index: Int, + value: Any, + dataType: DataType + ): Unit = { + dataType match { + case BooleanType => stmt.setBoolean(index, value.asInstanceOf[Boolean]) + case IntegerType => stmt.setInt(index, value.asInstanceOf[Int]) + case LongType => stmt.setLong(index, value.asInstanceOf[Long]) + case FloatType => stmt.setFloat(index, value.asInstanceOf[Float]) + case DoubleType => stmt.setDouble(index, value.asInstanceOf[Double]) + case ShortType => stmt.setShort(index, value.asInstanceOf[Short]) + case ByteType => stmt.setByte(index, value.asInstanceOf[Byte]) + case _: DecimalType => stmt.setBigDecimal(index, value.asInstanceOf[java.math.BigDecimal]) + case StringType => stmt.setString(index, value.asInstanceOf[String]) + case DateType => 
stmt.setDate(index, value.asInstanceOf[java.sql.Date]) + case TimestampType => stmt.setTimestamp(index, value.asInstanceOf[java.sql.Timestamp]) + case BinaryType => stmt.setBytes(index, value.asInstanceOf[Array[Byte]]) + case _ => stmt.setObject(index, value) + } + } + + // ------------------------------------------------------------------------- + // Spark to PostgreSQL Type Mapping + // ------------------------------------------------------------------------- + + private def sparkTypeToPgType(dataType: DataType): String = { + dataType match { + case BooleanType => "BOOLEAN" + case ByteType => "SMALLINT" + case ShortType => "SMALLINT" + case IntegerType => "INTEGER" + case LongType => "BIGINT" + case FloatType => "REAL" + case DoubleType => "DOUBLE PRECISION" + case d: DecimalType => s"DECIMAL(${d.precision}, ${d.scale})" + case StringType => "TEXT" + case DateType => "DATE" + case TimestampType => "TIMESTAMP" + case BinaryType => "BYTEA" + case _ => "TEXT" + } + } + + // ------------------------------------------------------------------------- + // Schema Validation + // ------------------------------------------------------------------------- + + private def findUnsupportedFields(schema: StructType): Seq[String] = { + schema.fields.toSeq.flatMap { field => + field.dataType match { + case _: StructType => Some(field.name) + case _: MapType => Some(field.name) + case arr: ArrayType + if arr.elementType.isInstanceOf[StructType] || + arr.elementType.isInstanceOf[MapType] => + Some(field.name) + case _ => None + } + } + } +} + + +// COMMAND ---------- + +val username = dbutils.secrets.get("gaming-sessionization-rtm-demo", "lakebase-jdbc-username") +val password = dbutils.secrets.get("gaming-sessionization-rtm-demo", "lakebase-jdbc-password") +val table = "session_results_rtm_insert" +// Replace with your Lakebase hostname (never commit a real instance host in public forks). 
+val host = "YOUR_LAKEBASE_DATABASE_HOST" + +val lakebaseConfig = LakebaseConfig( + username = username, + password = password, + table = table, + host = host, + mode = "insert", // insert or upsert or bulk-insert + // primaryKeys = Seq("deviceid", "appsessionid", "psnaccountid", "sessionstatus") +) + +processed_stream_df + .writeStream + .queryName("sessionization") + .option("checkpointLocation", checkpoint_path) + .outputMode("update") + .trigger(RealTimeTrigger.apply("5 minutes")) + .foreach(new LakebaseForeachWriter(lakebaseConfig, processed_stream_df.schema)) + .start() + +// COMMAND ---------- + + + +// COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS.scala b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS.scala new file mode 100644 index 0000000..7957dfa --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS.scala @@ -0,0 +1,452 @@ +// Databricks notebook source +// import org.apache.spark.sql.streaming.{TTLConfig, ValueState, OutputMode, StatefulProcessor, TimeMode, TimerValues} +import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + +import org.apache.spark.sql.streaming.Trigger +import org.apache.log4j.Logger +import java.sql.Timestamp +import java.time._ +import scala.collection.mutable.ListBuffer +import java.time.Duration +import org.apache.spark.sql.execution.streaming.RealTimeTrigger +import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress} + +// COMMAND ---------- + +dbutils.widgets.dropdown("mode", "RTM", Seq("RTM", "MBM")) +val mode = dbutils.widgets.get("mode") + +// COMMAND ---------- + +spark.conf.set( + "spark.sql.streaming.stateStore.providerClass", + "com.databricks.sql.streaming.state.RocksDBStateStoreProvider" +) +// spark.conf.set( +// 
"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled", +// "true" +// ) + +if (mode == "RTM") { + spark.conf.set("spark.sql.shuffle.partitions", "112") +} else { + spark.conf.set("spark.sql.shuffle.partitions", "128") +} + +// COMMAND ---------- + +val stream_name = "RTM-tws-poc" +val volume_path = "/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka" +val checkpoint_path = s"$volume_path/$stream_name" + +dbutils.widgets.text("clean_checkpoint", "yes") +val clean_checkpoint = dbutils.widgets.get("clean_checkpoint") +if (clean_checkpoint == "yes") { + dbutils.fs.rm(checkpoint_path, true) +} + +// COMMAND ---------- + +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +class CustomStreamingQueryListener extends StreamingQueryListener { + implicit val formats: Formats = DefaultFormats + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { + println(s"Query started: id=${event.id}, name=${event.name}") + } + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { + val progress: StreamingQueryProgress = event.progress + println(s"batchId = ${progress.batchId} " + s"timestamp = ${progress.timestamp} " + s"numInputRows=${progress.numInputRows} " + s"batchDuration=${progress.batchDuration} ") + val stateOperators = progress.stateOperators(0) + val stateMetrics = stateOperators.customMetrics + // print(stateOperators) + println( + s"numRowsTotal = ${stateOperators.numRowsTotal} " + s"numRowsUpdated = ${stateOperators.numRowsUpdated} " + + s"numExpiredTimers=${stateMetrics.get("numExpiredTimers")} " + s"numRegisteredTimers=${stateMetrics.get("numRegisteredTimers")} " + + s"rocksdbPutLatency=${stateMetrics.get("rocksdbPutLatency")} " + s"timerProcessingTimeMs=${stateMetrics.get("timerProcessingTimeMs")} " + ) + if (mode == "RTM") { + val progress_json = parse(progress.json) + val latenciesJson: JValue = progress_json \ "latencies" + 
println(pretty(render(latenciesJson))) + } + } + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + println(s"Query terminated: id=${event.id}, runId=${event.runId}") + } +} + +val listener = new CustomStreamingQueryListener + +// Remove existing instance if present +spark.streams.removeListener(listener) + +// Add the listener +spark.streams.addListener(listener) + +// COMMAND ---------- + +val kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") +val pc_sessions_topic = "pc_sessions" +val console_sessions_topic = "console_sessions" +val output_topic = "output_sessions" + +// COMMAND ---------- + +val kafka_schema = new StructType() + .add("appSessionId", LongType) + .add("eventId", StringType) + .add("psnAccountId", StringType) + .add("hostPcId", StringType) // for pc_sessions, may be null for console_sessions + .add("openPsid", StringType) // for console_sessions, may be null for pc_sessions + .add("timestamp", TimestampType) + .add("totalFgTime", LongType) + +// COMMAND ---------- + +package org.databricks.TransformWithStateStructs +import java.sql.Timestamp + +object SessionStructs { + case class InputRow( + topic: String, + partition: Int, + offset: Long, + kafka_timestamp: Timestamp, + deviceId: String, + psnAccountId: String, + appSessionId: Long, + eventId: String, + session_timestamp: Timestamp, + totalFgTime: Long, + ) + + case class OutputRow( + deviceId: String, + appSessionId: Long, + psnAccountId: String, + sessionStatus: String, + session_timestamp: Timestamp, + sessionDuration: Long, + upstream_timestamp: Timestamp, + processing_timestamp: Timestamp, + timer_info: Timestamp, + debug_info: String + ) + + case class MeasuredSessionValue( + psnAccountId: String, + sessionStatus: String, + session_timestamp: Timestamp, + sessionDuration: Long, + upstream_timestamp: Timestamp, + processing_timestamp: Timestamp, + timer_info: Timestamp, + ) +} + 
+// COMMAND ---------- + +implicit class PipeOps[A](val a: A) extends AnyVal { + def pipeIf(cond: Boolean)(f: A => A): A = if (cond) f(a) else a +} + +val input_stream_df = spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("subscribe", s"$pc_sessions_topic,$console_sessions_topic") + .option("startingOffsets", "earliest") + .pipeIf(mode == "RTM")(_.option("maxPartitions", 16)) + .load() + .withColumn("key", col("key").cast("string")) + .withColumn("value", col("value").cast("string")) + .withColumnRenamed("timestamp", "kafka_timestamp") + .withColumn("session", from_json(col("value"), kafka_schema)) + .withColumn( + "deviceId", + when(col("topic") === pc_sessions_topic, col("session.hostPcId")) + .when(col("topic") === console_sessions_topic, col("session.openPsid")) + .otherwise(lit(null)) + ) + .selectExpr( + "topic", "partition", "offset", "kafka_timestamp", + "deviceId", "session.psnAccountId", "session.appSessionId","session.eventId","session.timestamp as session_timestamp", "session.totalFgTime" + ) + +// COMMAND ---------- + +// DBTITLE 1,Untitled +// import SessionStructs._ +import org.databricks.TransformWithStateStructs.SessionStructs._ + +class Sessionization() extends StatefulProcessor[String, InputRow, OutputRow] { + + @transient protected var sessionStatusState: MapState[Long, MeasuredSessionValue] = _ + + val TIMER_THRESHOLD_IN_MS = 30000 + val SESSION_THRESHOLD_IN_SECONDS = 1800 + val DEBUG_INFO_1 = "SessionStart" + val DEBUG_INFO_2 = "SessionEnd for existing Session" + val DEBUG_INFO_3 = "SessionEnd due to THRESHOLD" + val DEBUG_INFO_4 = "SessionEnd due to another SessionStart" + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { + + sessionStatusState = getHandle.getMapState[Long, MeasuredSessionValue]("sessionStatusState", Encoders.scalaLong, Encoders.product[MeasuredSessionValue], TTLConfig.NONE) + } + + override def handleInputRows( + key: String, + 
inputRows: Iterator[InputRow], + timerValues: TimerValues): Iterator[OutputRow] = { + + var outputRows = ListBuffer[OutputRow]() + + inputRows.foreach { row => + // Check if the incoming record is for session start + if (row.eventId == "ApplicationSessionStart") { + // Check if there is already an active Session for the Device + if(sessionStatusState.exists()) { + // pull the existing sessionId from statestore, generate sessionEnd record, and emit that + val sessionKeyIter = sessionStatusState.keys() + for (sessionId <- sessionKeyIter) { + val outputRecord = generateSessionEnd(key, null, sessionId, timerValues) + outputRows.append(outputRecord) + } + } + // Generate sessionStart record, and emit that + val outputRecord = generateSessionStart(key, row, timerValues) + outputRows.append(outputRecord) + } // checking if the incoming record is for session end + else if (row.eventId == "ApplicationSessionEndBi") { + // There should already an active Session for the Device + if(sessionStatusState.exists()) { + // Generate sessionEnd record, and emit that + val outputRecord = generateSessionEnd(key, row, -1L, timerValues) + outputRows.append(outputRecord) + } else { + // This can happen only of the sessionEnd event comes after the session has already been removed due to SESSION_THRESHOLD_IN_SECONDS. 
+ } + } + } + + outputRows.iterator + } + + def generateSessionStart(key: String, inputRow: InputRow, timerValues: TimerValues): OutputRow = { + + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // create a new timer for next 30 sec, current processing time + 30 sec + val timerMillis = currentProcessingTimeMillis + TIMER_THRESHOLD_IN_MS + val timerTime = new Timestamp(timerMillis) + + // Generate sessionStart record + val outputRecord = OutputRow( + key, + inputRow.appSessionId, + inputRow.psnAccountId, + "SessionStart", + inputRow.session_timestamp, + 0, + inputRow.kafka_timestamp, + currentProcessingTime, + timerTime, + DEBUG_INFO_1 + ) + + // Create a key-value entry into the map state + val mapKey = inputRow.appSessionId + val mapValue = MeasuredSessionValue( + inputRow.psnAccountId, + "SessionStart", + inputRow.session_timestamp, + 0, + inputRow.kafka_timestamp, + currentProcessingTime, + timerTime + ) + sessionStatusState.updateValue(mapKey, mapValue) + + // Set the timer for current batch processingTime + 30 seconds + getHandle.registerTimer(timerMillis) + + outputRecord + } + + def generateSessionEnd(key: String, inputRow: InputRow, sessionId: Long, timerValues: TimerValues): OutputRow = { + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // Generate a sessionEnd record, and emit that + var outputRecord:OutputRow = null + if (inputRow != null) { + outputRecord = OutputRow( + key, + inputRow.appSessionId, + inputRow.psnAccountId, + "SessionEnd", + inputRow.session_timestamp, + inputRow.totalFgTime, + inputRow.kafka_timestamp, + 
currentProcessingTime, + null, + DEBUG_INFO_2 + ) + // remove the session from state + sessionStatusState.removeKey(inputRow.appSessionId) + } else { + val sessionValue = sessionStatusState.getValue(sessionId) + val currentSessionDuration: Long = (currentProcessingTimeMillis - sessionValue.processing_timestamp.getTime)/1000 + outputRecord = OutputRow( + key, + sessionId, + sessionValue.psnAccountId, + "SessionEnd", + sessionValue.session_timestamp, + currentSessionDuration, + sessionValue.upstream_timestamp, + currentProcessingTime, + null, + DEBUG_INFO_4 + ) + // remove the session from state + sessionStatusState.removeKey(sessionId) + } + + // Delete all the timers for the deviceId + val timerIter = getHandle.listTimers() + for (timer <- timerIter) getHandle.deleteTimer(timer) + + outputRecord + } + + // Define the logic for handling expired timers + override def handleExpiredTimer( + key: String, + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputRow] = { + + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // Get the expired Timer info + val expiredTimerMillis = expiredTimerInfo.getExpiryTimeInMs() + val expiredTimerTime = new Timestamp(expiredTimerMillis) + + // create a new timer for next 30 sec, current timer expiry time + 30 sec + val nextTimerMillis = expiredTimerMillis + TIMER_THRESHOLD_IN_MS + val nextTimerTime = new Timestamp(nextTimerMillis) + + var outputRows = ListBuffer[OutputRow]() + + // Expecting an entry for the expired timer for a given sessionId. 
+ if (sessionStatusState.exists()) { + + // pull the existing sessionId, generate sessionHeartbeat record, and emit that + val sessionIdKeyIter = sessionStatusState.keys() + + for (sessionId <- sessionIdKeyIter) { + var sessionValue = sessionStatusState.getValue(sessionId) + + // Caluate the sessionDuration for heartbeat records, Current processing time - SessionStart event processing time + val currentSessionDuration: Long = (currentProcessingTimeMillis - sessionValue.processing_timestamp.getTime)/1000 + val sessionEvent = "SessionHeartbeat" + var outputRecord:OutputRow = null + + if (currentSessionDuration < SESSION_THRESHOLD_IN_SECONDS) { + // Generate sessionHeartbeat record, and emit that + outputRecord = OutputRow( + key, + sessionId, + sessionValue.psnAccountId, + sessionEvent, + sessionValue.session_timestamp, + currentSessionDuration, + sessionValue.upstream_timestamp, + currentProcessingTime, + nextTimerTime, + "Timer expired at: " + expiredTimerTime + ) + // create a new timer for next 30 sec, current processing time + 30 sec + getHandle.registerTimer(nextTimerMillis) + } else { + outputRecord = OutputRow( + key, + sessionId, + sessionValue.psnAccountId, + "SessionEnd", + sessionValue.session_timestamp, + currentSessionDuration, + sessionValue.upstream_timestamp, + currentProcessingTime, + null, + DEBUG_INFO_3 + ) + // remove the session from state + sessionStatusState.removeKey(sessionId) + } + outputRows.append(outputRecord) + } + } + outputRows.iterator + } + +} + +// COMMAND ---------- + +val processed_stream_df = input_stream_df.as[InputRow] + .groupByKey(_.deviceId) + .transformWithState( + new Sessionization(), + TimeMode.ProcessingTime, + OutputMode.Update() + ) + .toDF() + .withColumn("value", struct( + col("deviceId"), + col("appSessionId"), + col("psnAccountId"), + col("sessionStatus"), + col("session_timestamp"), + col("sessionDuration"), + col("upstream_timestamp"), + col("processing_timestamp"), + col("timer_info"), + col("debug_info") + )) 
+ .select(to_json(col("value")).alias("value")) + +// COMMAND ---------- + +processed_stream_df + .writeStream + .queryName("sessionization") + .format("kafka") + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("topic", output_topic) + .option("checkpointLocation", checkpoint_path) + .trigger( + if (mode == "RTM") RealTimeTrigger.apply("5 minutes") + else Trigger.ProcessingTime("0.5 seconds") + ) + .outputMode("update") + .start() + +// COMMAND ---------- + + + +// COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/Write_RTM_session_results_to_delta.py b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/Write_RTM_session_results_to_delta.py new file mode 100644 index 0000000..22a8b0b --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/Write_RTM_session_results_to_delta.py @@ -0,0 +1,65 @@ +# Databricks notebook source +from pyspark.sql.functions import col, from_json +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType + +# COMMAND ---------- + +kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") +output_topic = "output_sessions" +volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka' +checkpoint_path = f'{volume_path}/{output_topic}' + +dbutils.widgets.text('clean_checkpoint', 'yes') +clean_checkpoint = dbutils.widgets.get('clean_checkpoint') +if clean_checkpoint == 'yes': + dbutils.fs.rm(checkpoint_path, True) + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC drop table if exists gaming_sessionization_demo.rtm_workload.session_results_rtm; + +# COMMAND ---------- + +kafka_schema = StructType([ + StructField("deviceId", StringType(), True), + StructField("appSessionId", LongType(), True), + StructField("psnAccountId", StringType(), True), + StructField("sessionStatus", StringType(), 
True), + StructField("session_timestamp", TimestampType(), True), + StructField("sessionDuration", LongType(), True), + StructField("upstream_timestamp", TimestampType(), True), + StructField("processing_timestamp", TimestampType(), True), + StructField("timer_info", TimestampType(), True), + StructField("debug_info", StringType(), True) +]) + +# COMMAND ---------- + +stream_df = (spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("subscribe", output_topic) + .option("startingOffsets", "earliest") + .load() + .withColumn("value", col("value").cast('string')) + .withColumn("value_struct", from_json(col("value"), kafka_schema)) + .selectExpr( + 'timestamp as output_timestamp', + 'value_struct.*' + ) + ) + +# COMMAND ---------- + +( + stream_df + .writeStream + .queryName('write_RTM_sessions') + .outputMode("append") + .trigger(processingTime = '1 seconds') + .option("checkpointLocation", checkpoint_path) + .toTable("gaming_sessionization_demo.rtm_workload.session_results_rtm") +) + +# COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/debug.sql b/2026-04-rtm-transformWithState-GamingSessionization/debug.sql new file mode 100644 index 0000000..fe2c18e --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/debug.sql @@ -0,0 +1,58 @@ +--get Counts of input stream by minute +with sessions_stream as ( + select appSessionId, eventId, hostPcId as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.pc_sessions_stream + union all + select appSessionId, eventId, openPsid as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.console_sessions_stream +) +select date_trunc('MINUTE', `timestamp`), count(1) +from sessions_stream +group by date_trunc('MINUTE', `timestamp`) +order by date_trunc('MINUTE', `timestamp`) + +--SessionStart vs SessionHeartbeat ratio from output +select 
+date_trunc('MINUTE', `session_timestamp`), +sum(case when sessionStatus = 'SessionStart' then 1 else null end) as SessionStart, +sum(case when sessionStatus = 'SessionHeartbeat' then 1 else null end) as SessionHeartbeat, +sum(case when sessionStatus = 'SessionEnd' then 1 else null end) as SessionEnd +from gaming_sessionization_demo.rtm_workload.session_results_rtm +group by date_trunc('MINUTE', `session_timestamp`) +order by date_trunc('MINUTE', `session_timestamp`) + + +-- Debugging Timer heartbeats for the session scope - get some example deviceIds +with sessions_stream as ( + select appSessionId, eventId, hostPcId as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.pc_sessions_stream + union all + select appSessionId, eventId, openPsid as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.console_sessions_stream +), +minute_1 as (select deviceid, appSessionId, totalFgTime, `timestamp` as session_start from sessions_stream where minute(`timestamp`) = 1 ), +minute_5 as (select deviceid, appSessionId, totalFgTime, `timestamp` as session_end from sessions_stream where minute(`timestamp`) = 6 ) + +select * from minute_1 join minute_5 on minute_1.deviceid = minute_5.deviceid and minute_1.appSessionId = minute_5.appSessionId order by session_start, session_end + +-- Debugging Timer heartbeats for the session scope - pick a device id and +select * except(deviceId, psnAccountId) +from gaming_sessionization_demo.rtm_workload.session_results_rtm where deviceId = 'REPLACE_WITH_DEVICE_ID' +order by output_timestamp asc + +-- Getting latency metrics +with tab1 as ( + select deviceId, sessionStatus, appSessionId, psnAccountId, processing_timestamp, upstream_timestamp, output_timestamp, timestampdiff(MILLISECOND, upstream_timestamp, output_timestamp) as latency_ms +from gaming_sessionization_demo.rtm_workload.session_results_rtm where sessionStatus in ('SessionStart', 'SessionEnd') +order by 
timestampdiff(SECOND, upstream_timestamp, output_timestamp) asc +) + select + count(1), + min(latency_ms) as min, + percentile(latency_ms, 0.10) as p10, + percentile(latency_ms, 0.25) as p25, + percentile(latency_ms, 0.5) as median, + percentile(latency_ms, 0.75) as p75, + percentile(latency_ms, 0.90) as p90, + percentile(latency_ms, 0.95) as p95, + percentile(latency_ms, 0.99) as p99, + max(latency_ms) as max + from tab1 + + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-console-sessions-stream-ingest.py b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-console-sessions-stream-ingest.py new file mode 100644 index 0000000..0ee6ab7 --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-console-sessions-stream-ingest.py @@ -0,0 +1,66 @@ +# Databricks notebook source +from pyspark.sql import functions as F +from pyspark.sql.streaming import StreamingQueryListener + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC CREATE EXTERNAL VOLUME IF NOT EXISTS gaming_sessionization_demo.rtm_workload.write_to_kafka +# MAGIC LOCATION 's3://YOUR_EXTERNAL_LOCATION/gaming-sessionization-rtm/write_to_kafka/' + +# COMMAND ---------- + +kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") +console_sessions_topic = 'console_sessions' +volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka' +checkpoint_path = f'{volume_path}/{console_sessions_topic}' + +dbutils.widgets.text('clean_checkpoint', 'yes') +clean_checkpoint = dbutils.widgets.get('clean_checkpoint') +if clean_checkpoint == 'yes': + dbutils.fs.rm(checkpoint_path, True) + +# COMMAND ---------- + +class MyStreamingListener(StreamingQueryListener): + def onQueryStarted(self, event): + print(f"'{event.name}' [{event.id}] got started!") + def onQueryProgress(self, event): + row = event.progress + 
print(f"****************************************** batchId ***********************")
+        print(f"batchId = {row.batchId} timestamp = {row.timestamp} numInputRows = {row.numInputRows} batchDuration = {row.batchDuration}")
+    def onQueryTerminated(self, event):
+        print(f"{event.id} got terminated!")
+# Re-running this cell must replace the listener, not stack another one: remove the instance registered last time.
+if "session_listener" in globals():
+    spark.streams.removeListener(session_listener)
+session_listener = MyStreamingListener()
+spark.streams.addListener(session_listener)
+
+# COMMAND ----------
+
+console_stream_df = (
+    spark.readStream
+    .format("delta")
+    .option("maxFilesPerTrigger", 1)
+    .table("gaming_sessionization_demo.rtm_workload.console_sessions_stream")
+    .withColumn("all_columns", F.to_json(F.struct('appSessionId', 'eventId', 'openPsid', 'psnAccountId', 'timestamp', 'totalFgTime')))
+    .selectExpr('CAST(all_columns AS BINARY) AS value')
+)
+
+# COMMAND ----------
+
+(
+    console_stream_df
+    .writeStream
+    .queryName('console_sessions_stream')
+    .format('kafka')
+    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
+    .option("topic", console_sessions_topic)
+    .option("checkpointLocation", checkpoint_path)
+    .trigger(processingTime = '1 seconds')
+    .start()
+)
+
+# COMMAND ----------
+
diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-pc-sessions-stream-ingest.py b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-pc-sessions-stream-ingest.py
new file mode 100644
index 0000000..6620778
--- /dev/null
+++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-pc-sessions-stream-ingest.py
@@ -0,0 +1,66 @@
+# Databricks notebook source
+from pyspark.sql import functions as F
+from pyspark.sql.streaming import StreamingQueryListener
+
+# COMMAND ----------
+
+# MAGIC %sql
+# MAGIC CREATE EXTERNAL VOLUME IF NOT EXISTS gaming_sessionization_demo.rtm_workload.write_to_kafka
+# MAGIC LOCATION 's3://YOUR_EXTERNAL_LOCATION/gaming-sessionization-rtm/write_to_kafka/'
+
+# COMMAND ----------
+
+kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers")
+pc_sessions_topic = 'pc_sessions'
+volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka'
+checkpoint_path = f'{volume_path}/{pc_sessions_topic}'
+
+dbutils.widgets.text('clean_checkpoint', 'yes')
+clean_checkpoint = dbutils.widgets.get('clean_checkpoint')
+if clean_checkpoint == 'yes':
+    dbutils.fs.rm(checkpoint_path, True)
+
+# COMMAND ----------
+
+class MyStreamingListener(StreamingQueryListener):
+    def onQueryStarted(self, event):
+        print(f"'{event.name}' [{event.id}] got started!")
+    def onQueryProgress(self, event):
+        row = event.progress
+        print(f"****************************************** batchId ***********************")
+        print(f"batchId = {row.batchId} timestamp = {row.timestamp} numInputRows = {row.numInputRows} batchDuration = {row.batchDuration}")
+    def onQueryTerminated(self, event):
+        print(f"{event.id} got terminated!")
+# Re-running this cell must replace the listener, not stack another one: remove the instance registered last time.
+if "session_listener" in globals():
+    spark.streams.removeListener(session_listener)
+session_listener = MyStreamingListener()
+spark.streams.addListener(session_listener)
+
+# COMMAND ----------
+
+pc_stream_df = (
+    spark.readStream
+    .format("delta")
+    .option("maxFilesPerTrigger", 1)
+    .table("gaming_sessionization_demo.rtm_workload.pc_sessions_stream")
+    .withColumn("all_columns", F.to_json(F.struct('appSessionId', 'eventId', 'hostPcId', 'psnAccountId', 'timestamp', 'totalFgTime')))
+    .selectExpr('CAST(all_columns AS BINARY) AS value')
+)
+
+# COMMAND ----------
+
+(
+    pc_stream_df
+    .writeStream
+    .queryName('pc_sessions_stream')
+    .format('kafka')
+    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
+    .option("topic", pc_sessions_topic)
+    .option("checkpointLocation", checkpoint_path)
+    .trigger(processingTime = '1 seconds')
+    .start()
+)
+
+# COMMAND ----------
+
diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic-scala.scala
b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic-scala.scala new file mode 100644 index 0000000..7c5de3f --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic-scala.scala @@ -0,0 +1,64 @@ +// Databricks notebook source +// Maven dependency: org.apache.kafka:kafka-clients:3.5.1 + +// COMMAND ---------- + +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic} +import java.util.{Collections, Properties} +import scala.jdk.CollectionConverters._ + +// COMMAND ---------- + +val kafkaBootstrapServers = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") + +val pcSessionsTopic = "pc_sessions" +val consoleSessionsTopic = "console_sessions" +val outputTopic = "output_sessions" + +// COMMAND ---------- + +val props = new Properties() +props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers) +val adminClient = AdminClient.create(props) + +// COMMAND ---------- + +def deleteTopic(topicName: String): Unit = { + try { + adminClient.deleteTopics(Collections.singletonList(topicName)).all().get() + println(s"Topic '$topicName' deleted.") + } catch { + case e: Exception => println(s"Failed to delete topic '$topicName': ${e.getMessage}") + } +} + +deleteTopic(pcSessionsTopic) +deleteTopic(consoleSessionsTopic) +deleteTopic(outputTopic) + +// COMMAND ---------- + +// val retentionMs = 7 * 24 * 60 * 60 * 1000 // 7 days +val retentionMs = -1 + +def createTopic(topicName: String, numPartitions: Int, replicationFactor: Short = 3): Unit = { + val topic = new NewTopic(topicName, numPartitions, replicationFactor) + .configs(Map("retention.ms" -> retentionMs.toString).asJava) + try { + adminClient.createTopics(Collections.singletonList(topic)).all().get() + println(s"Topic '$topicName' created successfully.") + } catch { + case e: Exception => println(s"Error creating topic '$topicName': ${e.getMessage}") + } +} + 
+createTopic(pcSessionsTopic, 16)
+createTopic(consoleSessionsTopic, 16)
+createTopic(outputTopic, 64)
+
+// COMMAND ----------
+
+adminClient.close()
+
+// COMMAND ----------
+
diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic.py b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic.py
new file mode 100644
index 0000000..2bc9fad
--- /dev/null
+++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic.py
@@ -0,0 +1,80 @@
+# Databricks notebook source
+# MAGIC %pip install --index-url https://pypi-proxy.dev.databricks.com/simple kafka-python
+
+# COMMAND ----------
+
+
+from kafka.admin import KafkaAdminClient, NewTopic
+from kafka.errors import TopicAlreadyExistsError
+
+# COMMAND ----------
+
+kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers")
+
+# Kafka topics
+pc_sessions_topic = 'pc_sessions'
+console_sessions_topic = 'console_sessions'
+output_topic = "output_sessions"
+
+# COMMAND ----------
+
+def delete_topic(topic_name):
+    """Request deletion of `topic_name`; a failure is printed, not raised."""
+    admin_client = KafkaAdminClient(
+        bootstrap_servers=kafka_bootstrap_servers_plaintext.split(','),
+        client_id="delete-topic-client"
+    )
+    try:
+        admin_client.delete_topics(topics=[topic_name])
+        print(f"Topic '{topic_name}' marked for deletion.")
+    except Exception as e:
+        print(f"Failed to delete topic '{topic_name}': {e}")
+    finally:
+        admin_client.close()  # previously leaked: one open admin client per call
+
+delete_topic(pc_sessions_topic)
+delete_topic(console_sessions_topic)
+delete_topic(output_topic)
+
+# COMMAND ----------
+
+# retention_ms = 7 * 24 * 60 * 60 * 1000 # 7 days in milliseconds
+retention_ms = -1
+
+def create_topic(topic_name, num_partitions):
+
+    # Configure Kafka admin client
+    create_client = KafkaAdminClient(
+        bootstrap_servers=kafka_bootstrap_servers_plaintext.split(','), # Replace with your Kafka broker(s)
+        client_id="create-topic-client"
+    )
+
+    #
Define a new topic + topic = NewTopic( + name=topic_name, + num_partitions=num_partitions, + replication_factor=3, + topic_configs={ + 'retention.ms': str(retention_ms) + } + ) + + # Create the topic + try: + create_client.create_topics(new_topics=[topic], validate_only=False) + print(f"Topic '{topic_name}' created successfully.") + except Exception as e: + print(f"Error creating topic: {e}") + finally: + create_client.close() + +create_topic(pc_sessions_topic, 16) +create_topic(console_sessions_topic, 16) +create_topic(output_topic, 64) + +# COMMAND ---------- + + + +# COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/generate-fake-session-data.py b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/generate-fake-session-data.py new file mode 100644 index 0000000..bb09bf1 --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/generate-fake-session-data.py @@ -0,0 +1,306 @@ +# Databricks notebook source +from datetime import datetime, timedelta +import uuid +import random +from pyspark.sql import functions as F + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC use catalog gaming_sessionization_demo; +# MAGIC create schema if not exists rtm_workload; +# MAGIC use schema rtm_workload; +# MAGIC +# MAGIC ALTER SCHEMA rtm_workload DISABLE PREDICTIVE OPTIMIZATION; +# MAGIC -- drop table if exists gaming_sessionization_demo.rtm_workload.pc_sessions; +# MAGIC -- drop table if exists gaming_sessionization_demo.rtm_workload.console_sessions; + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC set spark.databricks.delta.autoCompact.enabled = false; +# MAGIC set spark.databricks.delta.optimizeWrite.enabled = false; + +# COMMAND ---------- + +# def get_device_id(): +# return str(uuid.uuid4()) + +# def get_session_id(): +# return random.randint(1000000000, 9999999999) + +# def get_sink(): +# return random.choice(['PC','Console']) + +# def get_account_id(): +# return 
str(uuid.uuid4())[:9] + str(random.randint(999999000000000, 999999999999999)) + +# def get_session_start_time(start_time, end_time): +# total_seconds = int((end_time - start_time).total_seconds()) +# return start_time + timedelta(seconds=random.randint(0, total_seconds)) + +# def get_session_end_time(session_start_time): +# windows = [ +# (2*60, 4*60, 0.25), +# (4*60, 6*60, 0.25), +# (6*60, 8*60, 0.25), +# (8*60, 10*60, 0.25) +# ] +# window = random.choices( +# population=windows, +# weights=[w[2] for w in windows], +# k=1 +# )[0] +# offset_seconds = random.randint(window[0], window[1]) +# session_end_time = session_start_time + timedelta(seconds=offset_seconds) +# return session_end_time, offset_seconds + + + +# COMMAND ---------- + +# # Example start and end times (ISO format) +# start_time = datetime.fromisoformat("2025-11-01T00:00:00") +# end_time = datetime.fromisoformat("2025-11-01T00:59:59") +# eventIds = ['ApplicationSessionStart', 'ApplicationSessionEndBi'] +# minute_counter = 0 + +# minute_start = start_time +# while minute_start < end_time: +# minute_end = minute_start + timedelta(seconds=59) +# print(f"Minute: {minute_start} — {minute_end}") + +# pc_session_data = [] +# console_session_data = [] + +# if minute_counter < 2: +# num_sessions = 500000 +# elif 2 <= minute_counter < 3: +# num_sessions = 475000 +# elif 3 <= minute_counter < 4: +# num_sessions = 425000 +# elif 4 <= minute_counter < 5: +# num_sessions = 400000 +# elif 5 <= minute_counter < 6: +# num_sessions = 350000 +# elif 6 <= minute_counter < 8: +# num_sessions = 275000 +# elif 8 <= minute_counter < 12: +# num_sessions = 200000 +# elif 12 <= minute_counter < 15: +# num_sessions = 225000 + +# elif 15 <= minute_counter < 19: +# num_sessions = 300000 +# elif 19 <= minute_counter < 23: +# num_sessions = 275000 +# elif 23 <= minute_counter < 27: +# num_sessions = 250000 +# elif 27 <= minute_counter < 30: +# num_sessions = 225000 + +# elif 30 <= minute_counter < 34: +# num_sessions = 300000 +# 
elif 34 <= minute_counter < 38: +# num_sessions = 275000 +# elif 38 <= minute_counter < 42: +# num_sessions = 250000 +# elif 42 <= minute_counter < 45: +# num_sessions = 225000 + +# elif 45 <= minute_counter < 49: +# num_sessions = 300000 +# elif 49 <= minute_counter < 53: +# num_sessions = 275000 +# elif 53 <= minute_counter < 57: +# num_sessions = 250000 +# elif 57 <= minute_counter < 60: +# num_sessions = 225000 + + +# for _ in range(num_sessions): +# sink = get_sink() +# deviceId = get_device_id() +# sessionId = get_session_id() +# accountId = get_account_id() #psnAccountId +# sessionIdStartTime = get_session_start_time(minute_start, minute_end) +# sessionIdEndTime, sessionDuration = get_session_end_time(sessionIdStartTime) + +# if sink == 'PC': +# session_start = { +# "hostPcId": deviceId, +# "appSessionId": sessionId, +# "psnAccountId": accountId, +# "eventId": "ApplicationSessionStart", +# "timestamp": sessionIdStartTime, +# "totalFgTime": 0 +# } +# session_end = { +# "hostPcId": deviceId, +# "appSessionId": sessionId, +# "psnAccountId": accountId, +# "eventId": "ApplicationSessionEndBi", +# "timestamp": sessionIdEndTime, +# "totalFgTime": sessionDuration +# } +# pc_session_data.append(session_start) +# pc_session_data.append(session_end) +# elif sink == 'Console': +# session_start = { +# "openPsid": deviceId, +# "appSessionId": sessionId, +# "psnAccountId": accountId, +# "eventId": "ApplicationSessionStart", +# "timestamp": sessionIdStartTime, +# "totalFgTime": 0 +# } +# session_end = { +# "openPsid": deviceId, +# "appSessionId": sessionId, +# "psnAccountId": accountId, +# "eventId": "ApplicationSessionEndBi", +# "timestamp": sessionIdEndTime, +# "totalFgTime": sessionDuration +# } +# console_session_data.append(session_start) +# console_session_data.append(session_end) + +# pc_df = spark.createDataFrame(pc_session_data) +# console_df = spark.createDataFrame(console_session_data) + +# 
pc_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.pc_sessions') +# console_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.console_sessions') +# minute_counter +=1 +# minute_start += timedelta(minutes=1) + + + +# COMMAND ---------- + +# %sql +# create table gaming_sessionization_demo.rtm_workload.pc_sessions_bkp as select * from gaming_sessionization_demo.rtm_workload.pc_sessions@v59; +# create table gaming_sessionization_demo.rtm_workload.console_sessions_bkp as select * from gaming_sessionization_demo.rtm_workload.console_sessions@v59; + +# COMMAND ---------- + +# Testing Delayed SessionEnd +# %sql +# insert into gaming_sessionization_demo.rtm_workload.pc_sessions values (9402035216, 'ApplicationSessionStart', '6fb86ebc-3d88-4e31-b913-ea939acacf56', 'd03fae97-999999872866342', '2025-11-01T00:06:45.000+00:00', 0); +# insert into gaming_sessionization_demo.rtm_workload.pc_sessions values (9402035216, 'ApplicationSessionEndBi', '6fb86ebc-3d88-4e31-b913-ea939acacf56', 'd03fae97-999999872866342', '2025-11-01T00:10:45.000+00:00', 240); +# select * from gaming_sessionization_demo.rtm_workload.pc_sessions where hostpcid = '6fb86ebc-3d88-4e31-b913-ea939acacf56' order by timestamp + +# COMMAND ---------- + +# Testing another SessionStart while existing session is still active +# %sql +# insert into gaming_sessionization_demo.rtm_workload.console_sessions values (3432308279, 'ApplicationSessionStart', 'e77de5a1-05fb-46d1-be9c-aff02fc4fede', '087aa3fe-999999286732815', '2025-11-01T00:08:17.000+00:00', 0); +# insert into gaming_sessionization_demo.rtm_workload.console_sessions values (3432308279, 'ApplicationSessionEndBi', 'e77de5a1-05fb-46d1-be9c-aff02fc4fede', '087aa3fe-999999286732815', '2025-11-01T00:13:20.000+00:00', 303) +# select * from gaming_sessionization_demo.rtm_workload.console_sessions_stream where openPsid = 
'e77de5a1-05fb-46d1-be9c-aff02fc4fede' order by timestamp
+
+# COMMAND ----------
+
+# MAGIC %sql
+# MAGIC use catalog gaming_sessionization_demo;
+# MAGIC create schema if not exists rtm_workload;
+# MAGIC use schema rtm_workload;
+# MAGIC
+# MAGIC drop table if exists gaming_sessionization_demo.rtm_workload.pc_sessions_stream;
+# MAGIC drop table if exists gaming_sessionization_demo.rtm_workload.console_sessions_stream;
+
+# COMMAND ----------
+
+from datetime import timedelta
+
+pc_sessions_df = spark.table('gaming_sessionization_demo.rtm_workload.pc_sessions_bkp')
+min_max_timestamp = pc_sessions_df.agg(F.min('timestamp').alias("min_timestamp"),F.max('timestamp').alias("max_timestamp")).collect()[0]
+start_time = min_max_timestamp.min_timestamp
+end_time = min_max_timestamp.max_timestamp
+
+interval = timedelta(seconds=1)
+
+current = start_time
+while current <= end_time:  # <= so rows stamped exactly max(timestamp) get their own final window
+    interval_start = current
+    interval_end = current + interval - timedelta(microseconds=1) # inclusive end
+    print(f"Interval: {interval_start} — {interval_end}")
+
+    filter_df = pc_sessions_df.filter(F.col('timestamp').between(F.lit(interval_start), F.lit(interval_end)))
+    filter_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.pc_sessions_stream')
+
+    current += interval
+
+
+# COMMAND ----------
+
+from datetime import timedelta
+
+console_sessions_df = spark.table('gaming_sessionization_demo.rtm_workload.console_sessions_bkp')
+min_max_timestamp = console_sessions_df.agg(F.min('timestamp').alias("min_timestamp"),F.max('timestamp').alias("max_timestamp")).collect()[0]
+start_time = min_max_timestamp.min_timestamp
+end_time = min_max_timestamp.max_timestamp
+
+interval = timedelta(seconds=1)
+
+current = start_time
+while current <= end_time:  # <= so rows stamped exactly max(timestamp) get their own final window
+    interval_start = current
+    interval_end = current + interval - timedelta(microseconds=1) # inclusive end
+    print(f"Interval: {interval_start} — {interval_end}")
+
+    filter_df =
console_sessions_df.filter(F.col('timestamp').between(F.lit(interval_start), F.lit(interval_end))) + filter_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.console_sessions_stream') + + current += interval + + +# COMMAND ---------- + +# pc_sessions_df = spark.table('gaming_sessionization_demo.rtm_workload.pc_sessions') +# min_max_timestamp = pc_sessions_df.agg(F.min('timestamp').alias("min_timestamp"),F.max('timestamp').alias("max_timestamp")).collect()[0] +# start_time = min_max_timestamp.min_timestamp +# end_time = min_max_timestamp.max_timestamp + +# minute_start = start_time +# while minute_start < end_time: +# for i in range(12): +# sec_start = i * 5 +# sec_end = 5 * (i + 1) - 1 +# # Ensure last interval ends at 59 +# if sec_end > 59: +# sec_end = 59 +# interval_start = minute_start.replace(second=sec_start) +# interval_end = minute_start.replace(second=sec_end) +# print(f"Interval: {interval_start} — {interval_end}") +# filter_df = pc_sessions_df.filter(F.col('timestamp').between(F.lit(interval_start), F.lit(interval_end))) +# filter_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.pc_sessions_stream') + +# minute_start += timedelta(minutes=1) + +# COMMAND ---------- + +# console_sessions_df = spark.table('gaming_sessionization_demo.rtm_workload.console_sessions') +# min_max_timestamp = console_sessions_df.agg(F.min('timestamp').alias("min_timestamp"),F.max('timestamp').alias("max_timestamp")).collect()[0] +# start_time = min_max_timestamp.min_timestamp +# end_time = min_max_timestamp.max_timestamp + +# minute_start = start_time +# while minute_start < end_time: +# for i in range(12): +# sec_start = i * 5 +# sec_end = 5 * (i + 1) - 1 +# # Ensure last interval ends at 59 +# if sec_end > 59: +# sec_end = 59 +# interval_start = minute_start.replace(second=sec_start) +# interval_end = minute_start.replace(second=sec_end) +# print(f"Interval: 
{interval_start} — {interval_end}") +# filter_df = console_sessions_df.filter(F.col('timestamp').between(F.lit(interval_start), F.lit(interval_end))) +# filter_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.console_sessions_stream') + +# minute_start += timedelta(minutes=1) + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC ALTER TABLE gaming_sessionization_demo.rtm_workload.pc_sessions_stream DISABLE PREDICTIVE OPTIMIZATION; +# MAGIC ALTER TABLE gaming_sessionization_demo.rtm_workload.console_sessions_stream DISABLE PREDICTIVE OPTIMIZATION; \ No newline at end of file