501 changes: 501 additions & 0 deletions 2026-04-rtm-transformWithState-GamingSessionization/README.md

Large diffs are not rendered by default.

@@ -0,0 +1,65 @@
# Databricks notebook source
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType

# COMMAND ----------

kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers")
output_topic = "output_sessions"
volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka'
checkpoint_path = f'{volume_path}/{output_topic}'

dbutils.widgets.text('clean_checkpoint', 'yes')
clean_checkpoint = dbutils.widgets.get('clean_checkpoint')
if clean_checkpoint == 'yes':
    dbutils.fs.rm(checkpoint_path, True)

# COMMAND ----------

# MAGIC %sql
# MAGIC drop table if exists gaming_sessionization_demo.rtm_workload.session_results_rtm;

# COMMAND ----------

kafka_schema = StructType([
    StructField("deviceId", StringType(), True),
    StructField("appSessionId", LongType(), True),
    StructField("psnAccountId", StringType(), True),
    StructField("sessionStatus", StringType(), True),
    StructField("session_timestamp", TimestampType(), True),
    StructField("sessionDuration", LongType(), True),
    StructField("upstream_timestamp", TimestampType(), True),
    StructField("processing_timestamp", TimestampType(), True),
    StructField("timer_info", TimestampType(), True),
    StructField("debug_info", StringType(), True)
])

# COMMAND ----------

stream_df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
    .option("subscribe", output_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("value", col("value").cast('string'))
    .withColumn("value_struct", from_json(col("value"), kafka_schema))
    .selectExpr(
        'timestamp as output_timestamp',
        'value_struct.*'
    )
)

# COMMAND ----------

(
    stream_df
    .writeStream
    .queryName('write_RTM_sessions')
    .outputMode("append")
    .trigger(processingTime='1 seconds')
    .option("checkpointLocation", checkpoint_path)
    .toTable("gaming_sessionization_demo.rtm_workload.session_results_rtm")
)

# COMMAND ----------
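
# Optional sanity check, not part of the original pipeline: a minimal sketch that, once the
# write_RTM_sessions query has produced a few batches, counts the rows landed in the output
# Delta table per sessionStatus.
display(
    spark.table("gaming_sessionization_demo.rtm_workload.session_results_rtm")
    .groupBy("sessionStatus")
    .count()
)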

58 changes: 58 additions & 0 deletions 2026-04-rtm-transformWithState-GamingSessionization/debug.sql
@@ -0,0 +1,58 @@
-- Get counts of the input stream by minute
with sessions_stream as (
  select appSessionId, eventId, hostPcId as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.pc_sessions_stream
  union all
  select appSessionId, eventId, openPsid as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.console_sessions_stream
)
select date_trunc('MINUTE', `timestamp`) as minute, count(1) as events
from sessions_stream
group by date_trunc('MINUTE', `timestamp`)
order by date_trunc('MINUTE', `timestamp`)

-- SessionStart vs SessionHeartbeat vs SessionEnd counts from the output
select
  date_trunc('MINUTE', `session_timestamp`) as minute,
  sum(case when sessionStatus = 'SessionStart' then 1 else null end) as SessionStart,
  sum(case when sessionStatus = 'SessionHeartbeat' then 1 else null end) as SessionHeartbeat,
  sum(case when sessionStatus = 'SessionEnd' then 1 else null end) as SessionEnd
from gaming_sessionization_demo.rtm_workload.session_results_rtm
group by date_trunc('MINUTE', `session_timestamp`)
order by date_trunc('MINUTE', `session_timestamp`)


-- Debugging Timer heartbeats for the session scope - get some example deviceIds
with sessions_stream as (
select appSessionId, eventId, hostPcId as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.pc_sessions_stream
union all
select appSessionId, eventId, openPsid as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.console_sessions_stream
),
minute_1 as (select deviceid, appSessionId, totalFgTime, `timestamp` as session_start from sessions_stream where minute(`timestamp`) = 1 ),
minute_5 as (select deviceid, appSessionId, totalFgTime, `timestamp` as session_end from sessions_stream where minute(`timestamp`) = 6 ) -- 5 minutes after minute_1

select * from minute_1 join minute_5 on minute_1.deviceid = minute_5.deviceid and minute_1.appSessionId = minute_5.appSessionId order by session_start, session_end

-- Debugging Timer heartbeats for the session scope - pick a device id from the query above and inspect its output rows
select * except(deviceId, psnAccountId)
from gaming_sessionization_demo.rtm_workload.session_results_rtm where deviceId = 'REPLACE_WITH_DEVICE_ID'
order by output_timestamp asc

-- Getting latency metrics
with tab1 as (
select deviceId, sessionStatus, appSessionId, psnAccountId, processing_timestamp, upstream_timestamp, output_timestamp, timestampdiff(MILLISECOND, upstream_timestamp, output_timestamp) as latency_ms
from gaming_sessionization_demo.rtm_workload.session_results_rtm where sessionStatus in ('SessionStart', 'SessionEnd')
order by timestampdiff(SECOND, upstream_timestamp, output_timestamp) asc
)
select
count(1),
min(latency_ms) as min,
percentile(latency_ms, 0.10) as p10,
percentile(latency_ms, 0.25) as p25,
percentile(latency_ms, 0.5) as median,
percentile(latency_ms, 0.75) as p75,
percentile(latency_ms, 0.90) as p90,
percentile(latency_ms, 0.95) as p95,
percentile(latency_ms, 0.99) as p99,
max(latency_ms) as max
from tab1
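
-- A possible follow-up (a sketch, using the same table and columns as above): split the latency
-- percentiles by sessionStatus to see whether SessionStart and SessionEnd records behave differently.
with latencies as (
  select sessionStatus, timestampdiff(MILLISECOND, upstream_timestamp, output_timestamp) as latency_ms
  from gaming_sessionization_demo.rtm_workload.session_results_rtm
  where sessionStatus in ('SessionStart', 'SessionEnd')
)
select
  sessionStatus,
  count(1) as events,
  percentile(latency_ms, 0.5) as median_ms,
  percentile(latency_ms, 0.95) as p95_ms,
  max(latency_ms) as max_ms
from latencies
group by sessionStatus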


@@ -0,0 +1,66 @@
# Databricks notebook source
from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQueryListener

# COMMAND ----------

# MAGIC %sql
# MAGIC CREATE EXTERNAL VOLUME IF NOT EXISTS gaming_sessionization_demo.rtm_workload.write_to_kafka
# MAGIC LOCATION 's3://YOUR_EXTERNAL_LOCATION/gaming-sessionization-rtm/write_to_kafka/'

# COMMAND ----------

kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers")
console_sessions_topic = 'console_sessions'
volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka'
checkpoint_path = f'{volume_path}/{console_sessions_topic}'

dbutils.widgets.text('clean_checkpoint', 'yes')
clean_checkpoint = dbutils.widgets.get('clean_checkpoint')
if clean_checkpoint == 'yes':
    dbutils.fs.rm(checkpoint_path, True)

# COMMAND ----------

# Simple listener that prints per-batch progress for the producer query defined below.
class MyStreamingListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        row = event.progress
        print("****************************************** batchId ***********************")
        print(f"batchId = {row.batchId} timestamp = {row.timestamp} numInputRows = {row.numInputRows} batchDuration = {row.batchDuration}")

    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

try:
    # removeListener expects the exact instance that was added, so this is only a best-effort cleanup
    spark.streams.removeListener(MyStreamingListener())
except:
    pass
spark.streams.addListener(MyStreamingListener())

# COMMAND ----------

console_stream_df = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 1)
    .table("gaming_sessionization_demo.rtm_workload.console_sessions_stream")
    .withColumn("all_columns", F.to_json(F.struct('appSessionId', 'eventId', 'openPsid', 'psnAccountId', 'timestamp', 'totalFgTime')))
    .selectExpr('CAST(all_columns AS BINARY) AS value')
)

# COMMAND ----------

(
    console_stream_df
    .writeStream
    .queryName('console_sessions_stream')
    .format('kafka')
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
    .option("topic", console_sessions_topic)
    .option("checkpointLocation", checkpoint_path)
    .trigger(processingTime='1 seconds')
    .start()
)

# COMMAND ----------
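
# A small optional helper (a sketch, not in the original notebook): list the active streaming
# queries and their current status, complementing the per-batch prints from the listener above.
for q in spark.streams.active:
    print(q.name, q.id, q.status)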

@@ -0,0 +1,66 @@
# Databricks notebook source
from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQueryListener

# COMMAND ----------

# MAGIC %sql
# MAGIC CREATE EXTERNAL VOLUME IF NOT EXISTS gaming_sessionization_demo.rtm_workload.write_to_kafka
# MAGIC LOCATION 's3://YOUR_EXTERNAL_LOCATION/gaming-sessionization-rtm/write_to_kafka/'

# COMMAND ----------

kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers")
pc_sessions_topic = 'pc_sessions'
volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka'
checkpoint_path = f'{volume_path}/{pc_sessions_topic}'

dbutils.widgets.text('clean_checkpoint', 'yes')
clean_checkpoint = dbutils.widgets.get('clean_checkpoint')
if clean_checkpoint == 'yes':
    dbutils.fs.rm(checkpoint_path, True)

# COMMAND ----------

# Simple listener that prints per-batch progress for the producer query defined below.
class MyStreamingListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")

    def onQueryProgress(self, event):
        row = event.progress
        print("****************************************** batchId ***********************")
        print(f"batchId = {row.batchId} timestamp = {row.timestamp} numInputRows = {row.numInputRows} batchDuration = {row.batchDuration}")

    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

try:
    # removeListener expects the exact instance that was added, so this is only a best-effort cleanup
    spark.streams.removeListener(MyStreamingListener())
except:
    pass
spark.streams.addListener(MyStreamingListener())

# COMMAND ----------

pc_stream_df = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 1)
    .table("gaming_sessionization_demo.rtm_workload.pc_sessions_stream")
    .withColumn("all_columns", F.to_json(F.struct('appSessionId', 'eventId', 'hostPcId', 'psnAccountId', 'timestamp', 'totalFgTime')))
    .selectExpr('CAST(all_columns AS BINARY) AS value')
)

# COMMAND ----------

(
    pc_stream_df
    .writeStream
    .queryName('pc_sessions_stream')
    .format('kafka')
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext)
    .option("topic", pc_sessions_topic)
    .option("checkpointLocation", checkpoint_path)
    .trigger(processingTime='1 seconds')
    .start()
)

# COMMAND ----------

@@ -0,0 +1,64 @@
// Databricks notebook source
// Maven dependency: org.apache.kafka:kafka-clients:3.5.1

// COMMAND ----------

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._

// COMMAND ----------

val kafkaBootstrapServers = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers")

val pcSessionsTopic = "pc_sessions"
val consoleSessionsTopic = "console_sessions"
val outputTopic = "output_sessions"

// COMMAND ----------

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
val adminClient = AdminClient.create(props)

// COMMAND ----------

def deleteTopic(topicName: String): Unit = {
  try {
    adminClient.deleteTopics(Collections.singletonList(topicName)).all().get()
    println(s"Topic '$topicName' deleted.")
  } catch {
    case e: Exception => println(s"Failed to delete topic '$topicName': ${e.getMessage}")
  }
}

deleteTopic(pcSessionsTopic)
deleteTopic(consoleSessionsTopic)
deleteTopic(outputTopic)

// COMMAND ----------

// val retentionMs = 7 * 24 * 60 * 60 * 1000 // 7 days
val retentionMs = -1 // -1 disables time-based retention, so messages are kept indefinitely

def createTopic(topicName: String, numPartitions: Int, replicationFactor: Short = 3): Unit = {
  val topic = new NewTopic(topicName, numPartitions, replicationFactor)
    .configs(Map("retention.ms" -> retentionMs.toString).asJava)
  try {
    adminClient.createTopics(Collections.singletonList(topic)).all().get()
    println(s"Topic '$topicName' created successfully.")
  } catch {
    case e: Exception => println(s"Error creating topic '$topicName': ${e.getMessage}")
  }
}

createTopic(pcSessionsTopic, 16)
createTopic(consoleSessionsTopic, 16)
createTopic(outputTopic, 64)

// COMMAND ----------
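
// Optional verification (a sketch using the AdminClient created above): list the topics now on the
// cluster to confirm that the three topics were created, before closing the client.
val existingTopics = adminClient.listTopics().names().get().asScala
println(s"Topics on the cluster: ${existingTopics.toSeq.sorted.mkString(", ")}")

// COMMAND ----------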

adminClient.close()

// COMMAND ----------
