diff --git a/2026-04-rtm-transformWithState-GamingSessionization/.DS_Store b/2026-04-rtm-transformWithState-GamingSessionization/.DS_Store new file mode 100644 index 0000000..89110af Binary files /dev/null and b/2026-04-rtm-transformWithState-GamingSessionization/.DS_Store differ diff --git a/2026-04-rtm-transformWithState-GamingSessionization/README.md b/2026-04-rtm-transformWithState-GamingSessionization/README.md new file mode 100644 index 0000000..854bcc4 --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/README.md @@ -0,0 +1,501 @@ +# RealTimeMode vs MicroBatch: GamingSessionizationDemo + +**Self-serve sample** you can import into **Databricks** and run end-to-end: a full **gaming sessionization** pipeline on **Apache Spark™ Structured Streaming**, then **switch execution mode** to see how **Real-Time Mode (RTM)** and **micro-batch mode (MBM)** behave on **the same workload**—especially **end-to-end latency** for both **incoming Kafka events** and **timer-driven heartbeats** (`transformWithState` + processing-time timers). + +You bring **Kafka**, **Unity Catalog**, and a **Databricks Runtime** that supports this workload; we provide the **notebooks** and **data generator / replay** path so a customer team can reproduce the comparison in their own workspace without a separate streaming engine. **Sessionization:** **latest DBR 18.x** for the newest **RTM** features. **PC + console ingest:** a **recent DBR LTS** is enough (see Prerequisites). + +### Companion blog post + +**TODO:** When the companion blog is published, paste the **full URL** below. + +**Blog post:** *`[add full https://… link when published]`*. + +--- + +## Try it yourself + +This package matches the **“Try it yourself”** path from the Databricks **gaming sessionization** story: + +1. **Data** — [`ingest-source-data/generate-fake-session-data.py`](ingest-source-data/generate-fake-session-data.py): build **~1 hour** of synthetic/replay data in Delta, tuned toward **~500K session events per minute** for the reference test (see notebook for schema and volume knobs). +2. **Topics** — [`create-delete-topic.py`](ingest-source-data/create-delete-topic.py) or [`create-delete-topic-scala.scala`](ingest-source-data/create-delete-topic-scala.scala): create **`pc_sessions`**, **`console_sessions`**, **`output_sessions`**. Run on **any** **`m5d.4xlarge` × 2 workers** cluster you already use for **PC ingest**, **console ingest**, or (if you run it) **`Write_RTM_session_results_to_delta.py`**—**no** separate cluster just for topic admin. +3. **Sessionization first** — [`RTM-Sessionization/RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala) on the **sessionization** cluster: choose **`RTM`** or **`MBM`** from the **`mode`** widget; start the stream so it is **already reading** Kafka **before** you flood topics from ingest. +4. **Ingest on separate clusters** — [`Kafka-pc-sessions-stream-ingest.py`](ingest-source-data/Kafka-pc-sessions-stream-ingest.py) and [`Kafka-console-sessions-stream-ingest.py`](ingest-source-data/Kafka-console-sessions-stream-ingest.py) on **two smaller** clusters push Delta stream tables into **`pc_sessions`** / **`console_sessions`**; **`RTM-TWS.scala`** writes to **`output_sessions`** for **both** RTM and MBM (**`mode`** switches trigger, Kafka **`maxPartitions`**, and shuffle tuning—see Step 3 below). +5. **Compare MBM vs RTM** — repeat the run with the other **`mode`**, **same** ingest layout, **`clean_checkpoint`** = `yes` when switching; use **`StreamingQueryListener`** logs ( **`latencies`** JSON in **RTM** ) for latency contrast. + +**What you are proving:** Same **`StatefulProcessor`**, same business logic, **same cluster**—you only change **`mode`** (which switches the **trigger** and the small **Kafka** / **shuffle** settings tied to that branch)—then you compare **latency** and **timer responsiveness** between **Real-Time Mode** and **micro-batch** without rewriting the session rules. + +--- + +## What you'll learn + +In this demo, you will: + +1. **Generate or replay** session-shaped data into **Delta**, then **feed Kafka** at a controlled rate (time-sliced append from the data notebook). +2. Run a **stateful** pipeline with **`transformWithState`** and a **`StatefulProcessor`** keyed by **`deviceId`**. +3. Use **`handleExpiredTimer()`** to emit **heartbeats on a schedule**, not only when new records arrive. +4. **Measure the gap** between **RTM** and **MBM** on the same pipeline using **`StreamingQueryListener`** (and your own Kafka timestamp analysis if desired). +5. Optionally **[`Write_RTM_session_results_to_delta.py`](RTM-Sessionization/Write_RTM_session_results_to_delta.py)** + **[`debug.sql`](debug.sql)** — land **`output_sessions`** in **Delta**, then query **latency** in SQL (skip if **`StreamingQueryListener`** is enough). Optional **Lakebase**: **Optional: Writing to Lakebase**. + +--- + +## The use case: per-device gaming sessions + +### The business problem + +Platforms need a **single live view per device**: when a session **starts**, how long it has been **active**, and when it **ends**—including **heartbeat-style** updates on a fixed cadence for millions of devices. + +| Event (upstream name) | Role | +|----------------------|------| +| **`ApplicationSessionStart`** | Opens a session for a **device** + **appSessionId**; must arm timers and emit an “active session” signal. | +| **`ApplicationSessionEndBi`** | Closes the session with final **foreground time** (`totalFgTime`). | + +Events arrive on **separate Kafka topics** for **PC** vs **console**; the pipeline **unifies** them and derives a common **`deviceId`** (`hostPcId` vs `openPsid`). + +### Why this is challenging + +``` +Timeline (one device): +────────────────────────────────────────────────────────────▶ time + │ │ │ │ + ▼ ▼ ▼ ▼ + SessionStart Heartbeat Heartbeat SessionEnd + (Kafka) (timer-driven) (timer-driven) (Kafka) +``` + +**Challenges** + +- You must **hold state** per device until the session ends or times out. +- You must **emit on a clock** (e.g. every **30 s**) even when **no** new Kafka rows arrive. +- A **new start** while a session is still active must **resolve overlap** (end or supersede the prior session—see processor logic). +- You want **low end-to-end latency** for both **input rows** and **timer firings**—that is what you **A/B** in the main notebook (**MBM** then **RTM**, or the reverse, with consistent load). + +### How this demo solves it + +The **`Sessionization`** processor (in [`RTM-Sessionization/RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala)) uses **`transformWithState`** to: + +1. **Group by `deviceId`** so one logical processor owns all events for a device. +2. **`MapState[appSessionId → session metadata]`** for the active session. +3. **`registerTimer()`** on **processing time** for heartbeat cadence (**30 s** in code) and **`handleExpiredTimer()`** for proactive output. +4. **Emit JSON** to the **`output_sessions`** Kafka topic (main demo). To inspect **sessions** and **E2E latency** in SQL, optionally land that topic into **Delta** with **`Write_RTM_session_results_to_delta.py`** and run **`debug.sql`**. Optional **Lakebase** JDBC sink: see **Optional: Writing to Lakebase**. + +--- + +## Project structure + +``` +blog-RTM-IOT/ +├── debug.sql # SQL on Delta: ingest counts, session mix, E2E latency (needs Write_* Delta table) +├── ingest-source-data/ +│ ├── generate-fake-session-data.py # UC/Delta + time-sliced replay → *_sessions_stream +│ ├── Kafka-pc-sessions-stream-ingest.py # Delta streaming → Kafka topic pc_sessions +│ ├── Kafka-console-sessions-stream-ingest.py +│ ├── create-delete-topic.py # Topic admin (kafka-python) +│ └── create-delete-topic-scala.scala # Topic admin (Kafka AdminClient) +└── RTM-Sessionization/ + ├── RTM-TWS.scala # Main: Kafka → transformWithState → Kafka (RTM/MBM widget) + ├── RTM-TWS-Lakebase.scala # Optional: RTM → Lakebase (check RTM latencies / JDBC sink) — see "Optional: Writing to Lakebase" + └── Write_RTM_session_results_to_delta.py +``` + +**Fast path (matches the reference latency test):** + +1. [`generate-fake-session-data.py`](ingest-source-data/generate-fake-session-data.py) → Delta **`pc_sessions_stream`** / **`console_sessions_stream`** +2. [`create-delete-topic`](ingest-source-data/create-delete-topic.py) **(Python)** or [`create-delete-topic-scala.scala`](ingest-source-data/create-delete-topic-scala.scala) — attach to **any** **`m5d.4xlarge` × 2 workers** cluster used for **ingest** or **`Write_RTM_session_results_to_delta.py`** (no extra cluster for topic admin) +3. [`RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala) on **sessionization cluster** (**`m6gd.4xlarge` × 8 workers** in reference tests) — **`mode`** = **RTM** or **MBM** +4. [`Kafka-pc-sessions-stream-ingest.py`](ingest-source-data/Kafka-pc-sessions-stream-ingest.py) + [`Kafka-console-sessions-stream-ingest.py`](ingest-source-data/Kafka-console-sessions-stream-ingest.py) on **two** **`m5d.4xlarge` × 2 workers** (+ driver) clusters **after** the stream is up +5. (Optional) [`Write_RTM_session_results_to_delta.py`](RTM-Sessionization/Write_RTM_session_results_to_delta.py) — **`output_sessions`** (Kafka) → **`session_results_rtm`** (Delta); own **`m5d.4xlarge` × 2 workers** + driver +6. (Optional) [`debug.sql`](debug.sql) — SQL over that Delta table (+ stream tables) for **latency** and session checks + +--- + +## Prerequisites + +### 1. Databricks workspace + +- **Sessionization (`RTM-TWS.scala`, optional Lakebase):** **latest DBR 18.x**—that line has the **latest Real-Time Mode** support; see [Real-time mode reference](https://docs.databricks.com/en/structured-streaming/real-time/reference.html). +- **PC + console Delta → Kafka ingest** ([`Kafka-pc-sessions-stream-ingest.py`](ingest-source-data/Kafka-pc-sessions-stream-ingest.py), [`Kafka-console-sessions-stream-ingest.py`](ingest-source-data/Kafka-console-sessions-stream-ingest.py)): use a **recent DBR LTS** your workspace offers (or **latest 18.x** if you want one image everywhere). + +### 2. Apache Kafka + +- Bootstrap servers reachable from the cluster. +- Topics used by the notebooks (default names—change to match yours): + +| Topic | Purpose | +|-------|---------| +| **`pc_sessions`** | PC session JSON events | +| **`console_sessions`** | Console session JSON events | +| **`output_sessions`** | Sessionization **sink** topic (`RTM-TWS.scala`; **RTM** and **MBM** both use this topic) | + +Run [`ingest-source-data/create-delete-topic.py`](ingest-source-data/create-delete-topic.py) or [`create-delete-topic-scala.scala`](ingest-source-data/create-delete-topic-scala.scala) to create/delete topics with the expected partition counts, or align your own topic layout. **Cluster:** use **any** **`m5d.4xlarge` × 2 workers** cluster already used for **PC ingest**, **console ingest**, or (if you run it) **`Write_RTM_session_results_to_delta.py`**—**no** dedicated topic-admin cluster. + +### 3. Databricks secrets + +Store **Kafka** bootstrap servers (comma-separated **`host:port`** list) under the names the notebooks expect (and **Lakebase** JDBC secrets **only if** you run the optional **`RTM-TWS-Lakebase.scala`** showcase): + +| Item | Value used in code | +|------|---------------------| +| **Secret scope** | `gaming-sessionization-rtm-demo` | +| **Secret key** (Kafka bootstrap) | `kafka-bootstrap-servers` | +| **Secret key** (Lakebase JDBC user, optional showcase) | `lakebase-jdbc-username` | +| **Secret key** (Lakebase JDBC password, optional showcase) | `lakebase-jdbc-password` | + +```bash +databricks secrets create-scope --scope gaming-sessionization-rtm-demo + +databricks secrets put --scope gaming-sessionization-rtm-demo --key kafka-bootstrap-servers +# paste comma-separated host:port list, same value all notebooks read + +# only if you run RTM-TWS-Lakebase.scala: +databricks secrets put --scope gaming-sessionization-rtm-demo --key lakebase-jdbc-username +databricks secrets put --scope gaming-sessionization-rtm-demo --key lakebase-jdbc-password +``` + +All Kafka notebooks call **`dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers")`**. **`RTM-TWS-Lakebase.scala`** reads **`lakebase-jdbc-username`** and **`lakebase-jdbc-password`** from the same scope. To use different names, update every notebook (or use a single `%run` / widget-driven config). + +### 4. Unity Catalog volume + +Notebooks checkpoint under a **Unity Catalog volume**. **Create an external volume**, then **update `volume_path`** in the notebooks to your **`/Volumes/...`** path. + +--- + +## Setup instructions + +### Step 1: Libraries and Spark configuration + +When you create clusters: **sessionization** → **latest DBR 18.x**. **PC** and **console** ingest clusters → **recent DBR LTS** (or **latest 18.x** to match). + +**Maven (topic admin Scala notebook example):** `org.apache.kafka:kafka-clients:3.5.1` + +**Python topic admin:** `%pip install kafka-python` (see [`create-delete-topic.py`](ingest-source-data/create-delete-topic.py)). + +**Spark settings used in the main sessionization notebook** ([`RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala)): + +``` +spark.sql.streaming.stateStore.providerClass com.databricks.sql.streaming.state.RocksDBStateStoreProvider +# spark.sql.shuffle.partitions is set in code: 112 (RTM) vs 128 (MBM)—see Step 3 of the reference test +``` + +RTM vs MBM also differ by **Kafka `maxPartitions`** (RTM-only) and **`writeStream` trigger** (`RealTimeTrigger` vs `Trigger.ProcessingTime`)—see **Running the reference latency test → Step 3**. For **cluster-level** Real-Time Mode requirements (shuffle manager, scheduler, feature flags), follow the current **Databricks** docs for your DBR—do not assume the same JVM flags as unrelated demos. + +### Step 2: Import notebooks + +1. Clone or download this repository. +2. Import **`.py`** and **`.scala`** sources into your Databricks workspace (Repos or Workspace). +3. Keep paths consistent so `%run` / relative imports match how you organize files (this repo is flat per folder; adjust if you nest folders). + +### Step 3: Catalog, schema, volume, and external storage + +Notebooks assume this **Unity Catalog** layout (change only if your workspace uses different names): + +| Object | Default name in this repo | +|--------|---------------------------| +| **Catalog** | `gaming_sessionization_demo` | +| **Schema** | `rtm_workload` | +| **External volume** (checkpoints / Kafka sidecar) | `gaming_sessionization_demo.rtm_workload.write_to_kafka` | +| **Volume mount path** | `/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka` | + +In the Delta → Kafka ingest notebooks: **create the external volume** (fix the storage path in the **`CREATE EXTERNAL VOLUME`** SQL for your cloud), then **update `volume_path`** to match your volume’s **`/Volumes/...`** URI. + +### Step 4: Update Kafka topic names (optional) + +If you rename topics, update variables in: + +- [`Kafka-pc-sessions-stream-ingest.py`](ingest-source-data/Kafka-pc-sessions-stream-ingest.py) / [`Kafka-console-sessions-stream-ingest.py`](ingest-source-data/Kafka-console-sessions-stream-ingest.py) +- [`RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala) (`pc_sessions_topic`, `console_sessions_topic`, `output_topic`) + +--- + +## Running the reference latency test + +Run steps **in this order** so the sessionization query is **already live** when Kafka ingest ramps; that matches how the **MBM vs RTM** comparison was exercised (ingest on **isolated** clusters so CPU for **Delta → Kafka** does not steal from **`transformWithState`**). + +For a **fair A/B**, keep **ingest cluster sizes**, **data rate**, and **Kafka** layout identical; only change **`mode`** (and use **`clean_checkpoint`** when switching modes—see widgets in **`RTM-TWS.scala`**). + +--- + +### Step 1 — Generate data: `generate-fake-session-data.py` + +**What it does:** Creates or uses **Unity Catalog / Delta** session tables, then builds **`pc_sessions_stream`** and **`console_sessions_stream`** (for example **per-second** replay from **`pc_sessions_bkp`** / **`console_sessions_bkp`** in the sample notebook). + +**Reference test goal:** About **one hour** of event timeline in the backing data, with volume tuned so the **combined** PC + console path targets on the order of **~500K session events per minute** for the latency run. The notebook includes **commented** generators with explicit per-minute session counts you can reuse or adjust. + +**Delta columns (same logical schema as JSON in Kafka `value`):** + +| Column | Type (logical) | Notes | +|--------|----------------|--------| +| `appSessionId` | long | Gaming session id | +| `eventId` | string | `ApplicationSessionStart` or `ApplicationSessionEndBi` | +| `psnAccountId` | string | Account | +| `hostPcId` | string | PC device id (**PC** path; null on console-only rows) | +| `openPsid` | string | Console device id (**console** path; null on PC-only rows) | +| `timestamp` | timestamp | Event time | +| `totalFgTime` | long | Foreground seconds (**0** on start; set on end) | + +Ingest notebooks turn each row into a **JSON string** in the Kafka **`value`**; [`RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala) parses that JSON and sets **`deviceId`** from **`hostPcId`** (topic **`pc_sessions`**) or **`openPsid`** (topic **`console_sessions`**). + +--- + +### Step 2 — Create Kafka topics + +Run **one** of: + +- [`ingest-source-data/create-delete-topic.py`](ingest-source-data/create-delete-topic.py) (install **`kafka-python`** via `%pip` in the notebook), or +- [`ingest-source-data/create-delete-topic-scala.scala`](ingest-source-data/create-delete-topic-scala.scala) (**Maven:** `org.apache.kafka:kafka-clients:3.5.1`). + +**Cluster:** attach either notebook to **any** reference **`m5d.4xlarge`**, **2 workers** + **1 driver** cluster you use for **PC ingest**, **console ingest**, or (if you run it) **`Write_RTM_session_results_to_delta.py`**. Topic admin is lightweight—**no** dedicated topic-admin cluster. + +**Default topic names in code** (update every notebook if you change them): + +| Kafka topic | Used by | +|-------------|---------| +| **`pc_sessions`** | `Kafka-pc-sessions-stream-ingest.py` (writes), `RTM-TWS.scala` (reads) | +| **`console_sessions`** | `Kafka-console-sessions-stream-ingest.py` (writes), `RTM-TWS.scala` (reads) | +| **`output_sessions`** | **Sink** for `RTM-TWS.scala` (**RTM** and **MBM**); **`Write_RTM_session_results_to_delta.py`** reads this topic | + +`RTM-TWS.scala` uses **`val output_topic = "output_sessions"`** for both **`mode`** values—the widget swaps **trigger**, **Kafka `maxPartitions`**, and **shuffle partitions** as in the snippets above. For RTM vs MBM A/B, **stop**, **`clean_checkpoint`**, switch **`mode`**, restart; consumers always read **`output_sessions`**. + +The topic scripts **delete** then **recreate** these topics—use only where that is safe. + +--- + +### Step 3 — Start sessionization first: `RTM-TWS.scala` (sessionization cluster) + +**Notebook:** [`RTM-Sessionization/RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala) + +**What it does:** Structured Streaming **Kafka** source on **`pc_sessions`** + **`console_sessions`** → **`transformWithState`** → **Kafka** sink. **Default sink topic:** **`output_sessions`** (constant `output_topic` in the notebook—**both modes** write there unless you change it). + +**Widgets:** **`mode`** = **`RTM`** or **`MBM`** (switches **trigger**, **Kafka `maxPartitions`**, and **`spark.sql.shuffle.partitions`**—see code blocks above). **`clean_checkpoint`** = **`yes`** for a cold start (recommended when switching **RTM** vs **MBM** for comparison). + +**Reference cluster (published test):** **8 × `m6gd.4xlarge` workers** (~**512 GB** aggregate worker memory, **128** vCPU across workers, plus driver)—**dedicated** to this query only. Run on **latest DBR 18.x**. + +Start the streaming query and leave it **running**. Until ingest begins, the query may idle; once **`pc_sessions`** / **`console_sessions`** receive data, the **sink topic** (default **`output_sessions`**) fills with **SessionStart**, **SessionHeartbeat**, **SessionEnd**, etc. + +**Triggers (from the notebook):** + +```scala +.trigger( + if (mode == "RTM") RealTimeTrigger.apply("5 minutes") + else Trigger.ProcessingTime("0.5 seconds") +) +``` + +**Shuffle partitions (`spark.sql.shuffle.partitions`)** — set before the Kafka read in [`RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala): + +```scala +if (mode == "RTM") { + spark.conf.set("spark.sql.shuffle.partitions", "112") +} else { + spark.conf.set("spark.sql.shuffle.partitions", "128") +} +``` + +**Kafka reader — `maxPartitions` (RTM vs MBM):** in **RTM** mode only, the stream adds **`.option("maxPartitions", 16)`** on the Kafka source (via **`pipeIf`**). In **MBM** mode that option is **omitted**, so the Kafka micro-batch reader uses **default** `maxPartitions` behavior. + +```scala +spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("subscribe", s"$pc_sessions_topic,$console_sessions_topic") + .option("startingOffsets", "earliest") + .pipeIf(mode == "RTM")(_.option("maxPartitions", 16)) + .load() +``` + +--- + +### Step 4 — Start Delta → Kafka ingest (two separate clusters) + +After **`RTM-TWS.scala`** is up, start **each** ingest notebook on its **own** cluster so load is isolated. + +| Notebook | Delta source table (default in code) | Kafka topic | Reference cluster | +|----------|----------------------------------------|-------------|---------------------| +| [`Kafka-pc-sessions-stream-ingest.py`](ingest-source-data/Kafka-pc-sessions-stream-ingest.py) | `gaming_sessionization_demo.rtm_workload.pc_sessions_stream` | **`pc_sessions`** | **`m5d.4xlarge`**, **2 workers** + **1 driver**, **recent DBR LTS** (or **latest 18.x**) | +| [`Kafka-console-sessions-stream-ingest.py`](ingest-source-data/Kafka-console-sessions-stream-ingest.py) | `gaming_sessionization_demo.rtm_workload.console_sessions_stream` | **`console_sessions`** | **`m5d.4xlarge`**, **2 workers** + **1 driver**, **recent DBR LTS** (or **latest 18.x**), second cluster | + +- **`maxFilesPerTrigger`** and the **Delta** replay layout are aligned with the **~500K session events / minute** reference scenario when paired with the data notebook’s volume settings. +- If you use other UC names, search-replace **`gaming_sessionization_demo`** / **`rtm_workload`** across the repo. + +--- + +### Step 5 — Optional: Kafka **`output_sessions`** → Delta + **`debug.sql`** + +Run **[`Write_RTM_session_results_to_delta.py`](RTM-Sessionization/Write_RTM_session_results_to_delta.py)** when you want **sessionization results** from the **`output_sessions`** Kafka topic in a **Delta** table so you can query them easily—including **end-to-end latency**—with **[`debug.sql`](debug.sql)** in **Databricks SQL** (or **`%sql`**). + +That notebook is a small **Kafka → Delta** stream: subscribe to **`output_sessions`**, parse JSON into columns, write **`gaming_sessionization_demo.rtm_workload.session_results_rtm`** (rename in the notebook if you use different UC paths). You can skip it if **`StreamingQueryListener`** alone is enough. + +**Cluster (reference):** its **own** **`m5d.4xlarge`**, **2 workers** + **1 driver** (same footprint as each ingest cluster). If UC names differ from the defaults, search-replace in **`debug.sql`** too. + +--- + +### Step 6 — Compare **RTM** vs **MBM** + +Run the **same** ingest and cluster plan twice: once with **`mode = MBM`**, once with **`mode = RTM`** (or reverse). Reset checkpoints when switching. Compare **`CustomStreamingQueryListener`** (and **`latencies`** in **RTM**), or use **Step 5** + **[`debug.sql`](debug.sql)** for **SQL** on **`session_results_rtm`**. + +### Latency comparison (reference cluster size) + +**RTM** vs **MBM** **end-to-end** latency (**Kafka input topic timestamp → `output_sessions` Kafka timestamp**) measured when the pipeline was tested on the **reference cluster size** in this README: **8 × `m6gd.4xlarge` workers**, **latest DBR 18.x**, **~500K session events/minute** combined ingest, same **`RTM-TWS.scala`** shape as this repo. + +| Mode | **Median (p50)** | **p99** | +|------|------------------|--------| +| **RTM** | **145 ms** | **432 ms** | +| **MBM** | **1839 ms** | **8461 ms** | + +At this cluster size and load, **MBM** was about **~13×** slower than **RTM** on **median** latency and about **~20×** slower on **p99** (ratio of the numbers above). + +Your cluster will differ—treat this as a **baseline**, then reproduce with **`StreamingQueryListener`**, **`debug.sql`**, or your own **Kafka** timestamp analysis. + +--- + +## Optional: Writing to Lakebase + +[`RTM-TWS-Lakebase.scala`](RTM-Sessionization/RTM-TWS-Lakebase.scala) is **not** part of the **RTM vs MBM** **Kafka** path in this repo. It is an extra notebook that runs **Real-Time Mode** sessionization and sinks rows into **Lakebase** (Postgres) with **`ForeachWriter`**—**useful if you want to validate RTM behavior and latencies when the sink is Lakebase** (instead of only **`output_sessions`** + **Delta**/`debug.sql`), or to show **RTM** into a **managed Postgres** sink alongside the main demo. + +Set **`YOUR_LAKEBASE_DATABASE_HOST`** in the notebook to your **Lakebase** hostname (or read it from **secrets**). Use **Databricks secrets** for JDBC user and password (**`lakebase-jdbc-username`** / **`lakebase-jdbc-password`** in the same scope as the Kafka bootstrap secret—see Prerequisites). Do **not** hard-code credentials or real infrastructure hostnames in shared source. + +--- + +## Understanding the results + +- **Throughput:** With heavy timer-driven output, **output row count ≫ input event count**—most rows can come from **`handleExpiredTimer()`**, not from new Kafka messages. Expect **amplification** when comparing to raw ingest counts. +- **Latency (customer takeaway):** Compare **MBM** vs **RTM** with **`StreamingQueryProgress`** / **`CustomStreamingQueryListener`** (and **RTM** `latencies` JSON when enabled). For **SQL percentiles**, **`output_sessions`** must be in **Delta** via **[`Write_RTM_session_results_to_delta.py`](RTM-Sessionization/Write_RTM_session_results_to_delta.py)**; then **[`debug.sql`](debug.sql)** reads that table and computes **E2E latency** (`upstream_timestamp` → **`output_timestamp`**, ms) for **SessionStart** / **SessionEnd**. For **Kafka-only** timestamps, compare input vs **`output_sessions`** offline. Keep **data rate** and **cluster** constant between runs. +- **Operational:** Watch **`numExpiredTimers`**, **`numRegisteredTimers`**, and RocksDB custom metrics printed from **`stateOperators(0)`**. + +--- + +## Data model + +### Kafka message `value` (JSON) — input topics + +Same logical fields; **PC** uses **`hostPcId`**, **console** uses **`openPsid`** (the streaming read maps either to **`deviceId`**). + +| Field | Type (conceptual) | Description | +|-------|-------------------|-------------| +| `appSessionId` | long | Session identifier | +| `eventId` | string | `ApplicationSessionStart` / `ApplicationSessionEndBi` | +| `psnAccountId` | string | Account id | +| `hostPcId` | string | PC device id (null on console topic) | +| `openPsid` | string | Console device id (null on PC topic) | +| `timestamp` | timestamp | Event time | +| `totalFgTime` | long | Foreground seconds (meaningful on end event) | + +**Example (PC session start):** + +```json +{ + "hostPcId": "6fb86ebc-3d88-4e31-b913-ea939acacf56", + "appSessionId": 9402035216, + "psnAccountId": "d03fae97-999999872866342", + "eventId": "ApplicationSessionStart", + "timestamp": "2025-11-01T00:06:45.000+00:00", + "totalFgTime": 0 +} +``` + +**Example (PC session end):** same device, session, and account as the start event; **`eventId`** is **`ApplicationSessionEndBi`**; **`timestamp`** is the end time; **`totalFgTime`** is foreground duration in **seconds** (here **240** = 4 minutes after the start example). + +```json +{ + "hostPcId": "6fb86ebc-3d88-4e31-b913-ea939acacf56", + "appSessionId": 9402035216, + "psnAccountId": "d03fae97-999999872866342", + "eventId": "ApplicationSessionEndBi", + "timestamp": "2025-11-01T00:10:45.000+00:00", + "totalFgTime": 240 +} +``` + +### Kafka message `value` (JSON) — output topic + +Structured as a single JSON object per row (see `struct` → `to_json` in `RTM-TWS.scala`). Downstream **Delta** schema in [`Write_RTM_session_results_to_delta.py`](RTM-Sessionization/Write_RTM_session_results_to_delta.py): + +| Field | Description | +|-------|-------------| +| `deviceId` | Grouping key | +| `appSessionId` | Session id | +| `psnAccountId` | Account | +| `sessionStatus` | e.g. `SessionStart`, `SessionHeartbeat`, `SessionEnd` | +| `session_timestamp` | Original session event timestamp | +| `sessionDuration` | Duration used for heartbeats / end | +| `upstream_timestamp` | Kafka record timestamp path preserved in processor | +| `processing_timestamp` | Processing-time snapshot | +| `timer_info` | Next timer boundary when applicable | +| `debug_info` | Processor branch / diagnostics string | + +--- + +## Architecture + +``` +┌──────────────────┐ ┌─────────────────────┐ ┌─────────────────────────┐ +│ Delta replay │────▶│ Delta → Kafka │────▶│ pc_sessions / │ +│ (*_sessions_stream) │ (ingest notebooks) │ │ console_sessions topics │ +└──────────────────┘ └─────────────────────┘ └────────────┬────────────┘ + │ + ▼ +┌──────────────────┐ ┌─────────────────────┐ ┌─────────────────────────┐ +│ Optional Delta │◀────│ output_sessions │◀────│ transformWithState │ +│ sink notebook │ │ (Kafka) │ │ (StatefulProcessor) │ +└──────────────────┘ └─────────────────────┘ └─────────────────────────┘ +``` + +**Optional Lakebase path:** same *class* of sessionization over **JDBC** into **Lakebase** instead of the **Kafka** sink—see **Optional: Writing to Lakebase** (not used for the default RTM vs MBM latency testbed). + +--- + +## Key concepts + +### RTM vs MBM in this repo + +| Aspect | MBM (`ProcessingTime` trigger) | RTM (`RealTimeTrigger` in sample) | +|--------|--------------------------------|-----------------------------------| +| **Goal** | Baseline micro-batch latency | Lower end-to-end latency for operational-style workloads | +| **Tuning** | `spark.sql.shuffle.partitions` = **`128`**; Kafka source has **no** explicit **`maxPartitions`** | **`112`** + Kafka **`.option("maxPartitions", 16)`** (RTM only, via `pipeIf`) | +| **When to pick** | Higher batch interval tolerance, simpler ops | Strict latency for input + timer output (validate on your DBR) | + +**DBR** lines evolve—stay on **latest 18.x** for current **RTM** behavior; otherwise cross-check [Real-time mode in Structured Streaming](https://docs.databricks.com/en/structured-streaming/real-time/index.html). + +### Why processing time (not event time)? + +This demo uses **`TimeMode.ProcessingTime`** in **`transformWithState`** because: + +- **Simpler implementation** — heartbeats and session caps do not need **event-time watermarks** or late-data policy on the timer path; you still carry **`upstream_timestamp`** from Kafka on each row for **latency** measurement. +- **Predictable cleanup** — **`registerTimer()`** fires on **wall-clock** processing time, so heartbeat cadence and max-session behavior stay stable under load. +- **Fits live operational signals** — for **“is this device still in-session right now?”**, a **processing-time** heartbeat matches **wall-clock** expectations; the workload is not optimized for **historical replay** correctness under skewed event times. + +In code ([`RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala)): + +```scala +.transformWithState( + new Sessionization(), + TimeMode.ProcessingTime, // processing time, not event time + OutputMode.Update() +) +``` + +### Processing-time timers + +Heartbeats use **processing time**, not event time watermarks—appropriate when you want **wall-clock** cadence and predictable timer behavior for this demo pattern: + +```scala +val timerMillis = timerValues.getCurrentProcessingTimeInMs() + TIMER_THRESHOLD_IN_MS +getHandle.registerTimer(timerMillis) +``` + +**Constants in [`RTM-TWS.scala`](RTM-Sessionization/RTM-TWS.scala):** `TIMER_THRESHOLD_IN_MS = 30000`, `SESSION_THRESHOLD_IN_SECONDS = 1800` (max session length before timer path emits **SessionEnd**). + +### State shape + +- **`MapState[Long, MeasuredSessionValue]`** keyed by **`appSessionId`** inside each **`deviceId`** group. +- **Timers** are listed and deleted when ending sessions so stale timers do not accumulate (`listTimers` / `deleteTimer` in **`generateSessionEnd`**). + +--- + +## Additional resources + +- [Real-time mode in Structured Streaming (Databricks)](https://docs.databricks.com/en/structured-streaming/real-time/index.html) +- [Real-time mode reference](https://docs.databricks.com/en/structured-streaming/real-time/reference.html) +- [**transformWithState** — Stateful applications (Databricks)](https://docs.databricks.com/aws/en/stateful-applications/) + +--- + +Happy streaming. diff --git a/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS-Lakebase.scala b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS-Lakebase.scala new file mode 100644 index 0000000..6265552 --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS-Lakebase.scala @@ -0,0 +1,829 @@ +// Databricks notebook source +// import org.apache.spark.sql.streaming.{TTLConfig, ValueState, OutputMode, StatefulProcessor, TimeMode, TimerValues} +import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + +import org.apache.spark.sql.streaming.Trigger +import org.apache.log4j.Logger +import java.sql.Timestamp +import java.time._ +import scala.collection.mutable.ListBuffer +import java.time.Duration +import org.apache.spark.sql.execution.streaming.RealTimeTrigger +import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress} + +// COMMAND ---------- + +spark.conf.set( + "spark.sql.streaming.stateStore.providerClass", + "com.databricks.sql.streaming.state.RocksDBStateStoreProvider" +) +// spark.conf.set( +// "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled", +// "true" +// ) +spark.conf.set("spark.sql.shuffle.partitions", "64") + +// COMMAND ---------- + +val stream_name = "RTM-tws-poc" +val volume_path = "/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka" +val checkpoint_path = s"$volume_path/$stream_name" + +dbutils.widgets.text("clean_checkpoint", "yes") +val clean_checkpoint = dbutils.widgets.get("clean_checkpoint") +if (clean_checkpoint == "yes") { + dbutils.fs.rm(checkpoint_path, true) +} + +// COMMAND ---------- + +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +class CustomStreamingQueryListener extends StreamingQueryListener { + implicit val formats: Formats = DefaultFormats + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { + println(s"Query started: id=${event.id}, name=${event.name}") + } + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { + val progress: StreamingQueryProgress = event.progress + println(s"batchId = ${progress.batchId} " + s"timestamp = ${progress.timestamp} " + s"numInputRows=${progress.numInputRows} " + s"batchDuration=${progress.batchDuration} ") + val stateOperators = progress.stateOperators(0) + val stateMetrics = stateOperators.customMetrics + // print(stateOperators) + println( + s"numRowsTotal = ${stateOperators.numRowsTotal} " + s"numRowsUpdated = ${stateOperators.numRowsUpdated} " + + s"numExpiredTimers=${stateMetrics.get("numExpiredTimers")} " + s"numRegisteredTimers=${stateMetrics.get("numRegisteredTimers")} " + + s"rocksdbPutLatency=${stateMetrics.get("rocksdbPutLatency")} " + s"timerProcessingTimeMs=${stateMetrics.get("timerProcessingTimeMs")} " + ) + val progress_json = parse(progress.json) + val latenciesJson: JValue = progress_json \ "latencies" + println(pretty(render(latenciesJson))) + } + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + println(s"Query terminated: id=${event.id}, runId=${event.runId}") + } +} + +val listener = new CustomStreamingQueryListener + +// Remove existing instance if present +spark.streams.removeListener(listener) + +// Add the listener +spark.streams.addListener(listener) + +// COMMAND ---------- + +val kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") +val pc_sessions_topic = "pc_sessions" +val console_sessions_topic = "console_sessions" +val output_topic = "output_sessions" + +// COMMAND ---------- + +val kafka_schema = new StructType() + .add("appSessionId", LongType) + .add("eventId", StringType) + .add("psnAccountId", StringType) + .add("hostPcId", StringType) // for pc_sessions, may be null for console_sessions + .add("openPsid", StringType) // for console_sessions, may be null for pc_sessions + .add("timestamp", TimestampType) + .add("totalFgTime", LongType) + +// COMMAND ---------- + +package org.databricks.TransformWithStateStructs +import java.sql.Timestamp + +object SessionStructs { + case class InputRow( + topic: String, + partition: Int, + offset: Long, + kafka_timestamp: Timestamp, + deviceId: String, + psnAccountId: String, + appSessionId: Long, + eventId: String, + session_timestamp: Timestamp, + totalFgTime: Long, + ) + + case class OutputRow( + deviceId: String, + appSessionId: Long, + psnAccountId: String, + sessionStatus: String, + session_timestamp: Timestamp, + sessionDuration: Long, + upstream_timestamp: Timestamp, + processing_timestamp: Timestamp, + timer_info: Timestamp, + debug_info: String + ) + + case class MeasuredSessionValue( + psnAccountId: String, + sessionStatus: String, + session_timestamp: Timestamp, + sessionDuration: Long, + upstream_timestamp: Timestamp, + processing_timestamp: Timestamp, + timer_info: Timestamp, + ) +} + +// COMMAND ---------- + +val input_stream_df = spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("subscribe", s"$pc_sessions_topic,$console_sessions_topic") + .option("startingOffsets", "earliest") + .option("maxPartitions", 16) + .load() + .withColumn("key", col("key").cast("string")) + .withColumn("value", col("value").cast("string")) + .withColumnRenamed("timestamp", "kafka_timestamp") + .withColumn("session", from_json(col("value"), kafka_schema)) + .withColumn( + "deviceId", + when(col("topic") === pc_sessions_topic, col("session.hostPcId")) + .when(col("topic") === console_sessions_topic, col("session.openPsid")) + .otherwise(lit(null)) + ) + .selectExpr( + "topic", "partition", "offset", "kafka_timestamp", + "deviceId", "session.psnAccountId", "session.appSessionId","session.eventId","session.timestamp as session_timestamp", "session.totalFgTime" + ) + +// COMMAND ---------- + +// DBTITLE 1,Untitled +// import SessionStructs._ +import org.databricks.TransformWithStateStructs.SessionStructs._ + +class Sessionization() extends StatefulProcessor[String, InputRow, OutputRow] { + + @transient protected var sessionStatusState: MapState[Long, MeasuredSessionValue] = _ + + val TIMER_THRESHOLD_IN_MS = 30000 + val DEBUG_INFO_1 = "No Existing DeviceID-SessionStart" + val DEBUG_INFO_2 = "Existing DeviceID-Another SessionStart" + val DEBUG_INFO_3 = "Existing DeviceID-SessionEnd for existing Session" + val DEBUG_INFO_4 = "Existing DeviceID-NoSessionEnd but another SessionStart" + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { + + sessionStatusState = getHandle.getMapState[Long, MeasuredSessionValue]("sessionStatusState", Encoders.scalaLong, Encoders.product[MeasuredSessionValue], TTLConfig.NONE) + } + + override def handleInputRows( + key: String, + inputRows: Iterator[InputRow], + timerValues: TimerValues): Iterator[OutputRow] = { + + var outputRows = ListBuffer[OutputRow]() + + inputRows.foreach { row => + // Check if the incoming record is for session start + if (row.eventId == "ApplicationSessionStart") { + // Generate sessionStart record, and emit that + val outputRecord = generateSessionStart(key, row, timerValues) + outputRows.append(outputRecord) + } // checking if the incoming record is for session end + else if (row.eventId == "ApplicationSessionEndBi") { + val outputRecord = generateSessionEnd(key, row, timerValues) + outputRows.append(outputRecord) + } + } + outputRows.iterator + } + + def generateSessionStart(key: String, inputRow: InputRow, timerValues: TimerValues): OutputRow = { + + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // create a new timer for next 30 sec, current processing time + 30 sec + val timerMillis = currentProcessingTimeMillis + TIMER_THRESHOLD_IN_MS + val timerTime = new Timestamp(timerMillis) + + // Generate sessionStart record + val outputRecord = OutputRow( + key, + inputRow.appSessionId, + inputRow.psnAccountId, + "SessionStart", + inputRow.session_timestamp, + 0, + inputRow.kafka_timestamp, + currentProcessingTime, + timerTime, + DEBUG_INFO_1 + ) + + // Create a key-value entry into the map state + val mapKey = inputRow.appSessionId + val mapValue = MeasuredSessionValue( + inputRow.psnAccountId, + "SessionStart", + inputRow.session_timestamp, + 0, + inputRow.kafka_timestamp, + currentProcessingTime, + timerTime + ) + sessionStatusState.updateValue(mapKey, mapValue) + + // Set the timer for current batch processingTime + 30 seconds + getHandle.registerTimer(timerMillis) + + outputRecord + } + + def generateSessionEnd(key: String, inputRow: InputRow, timerValues: TimerValues): OutputRow = { + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // Generate a sessionEnd record, and emit that + var outputRecord = OutputRow( + key, + inputRow.appSessionId, + inputRow.psnAccountId, + "SessionEnd", + inputRow.session_timestamp, + inputRow.totalFgTime, + inputRow.kafka_timestamp, + currentProcessingTime, + null, + DEBUG_INFO_3 + ) + + // remove the session from state + sessionStatusState.removeKey(inputRow.appSessionId) + + // Delete all the timers for the deviceId + val timerIter = getHandle.listTimers() + for (timer <- timerIter) getHandle.deleteTimer(timer) + + outputRecord + } + + // Define the logic for handling expired timers + override def handleExpiredTimer( + key: String, + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputRow] = { + + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // Get the expired Timer info + val expiredTimerMillis = expiredTimerInfo.getExpiryTimeInMs() + val expiredTimerTime = new Timestamp(expiredTimerMillis) + + // create a new timer for next 30 sec, current timer expiry time + 30 sec + val nextTimerMillis = expiredTimerMillis + TIMER_THRESHOLD_IN_MS + val nextTimerTime = new Timestamp(nextTimerMillis) + + var outputRows = ListBuffer[OutputRow]() + + // Expecting an entry for the expired timer for a given sessionId. + if (sessionStatusState.exists()) { + + // pull the existing sessionId, generate sessionHeartbeat record, and emit that + val sessionIdKeyIter = sessionStatusState.keys() + + for (sessionId <- sessionIdKeyIter) { + var sessionValue = sessionStatusState.getValue(sessionId) + + // Caluate the sessionDuration for heartbeat records, existing duration + 30 seconds + val currentSessionDuration: Long = sessionValue.sessionDuration + (currentProcessingTime.getTime - sessionValue.processing_timestamp.getTime)/1000 + val sessionEvent = "SessionHeartbeat" + + // Generate sessionHeartbeat record, and emit that + val outputRecord = OutputRow( + key, + sessionId, + sessionValue.psnAccountId, + sessionEvent, + sessionValue.session_timestamp, + currentSessionDuration, + sessionValue.upstream_timestamp, + currentProcessingTime, + nextTimerTime, + "Timer expired at: " + expiredTimerTime + ) + outputRows.append(outputRecord) + + // Update the existing key-value entry into the map state + val updatedSessionValue = sessionValue.copy( + sessionStatus = sessionEvent, + sessionDuration = currentSessionDuration, + processing_timestamp = currentProcessingTime, + timer_info = nextTimerTime + ) + sessionStatusState.updateValue(sessionId, updatedSessionValue) + + // create a new timer for next 30 sec, current processing time + 30 sec + getHandle.registerTimer(nextTimerMillis) + } + } + outputRows.iterator + } + +} + +// COMMAND ---------- + +val processed_stream_df = input_stream_df.as[InputRow] + .groupByKey(_.deviceId) + .transformWithState( + new Sessionization(), + TimeMode.ProcessingTime, + OutputMode.Update() + ) + .toDF() + .select( + col("deviceId"), + col("appSessionId"), + col("psnAccountId"), + col("sessionStatus"), + col("session_timestamp"), + col("sessionDuration"), + col("upstream_timestamp"), + col("processing_timestamp"), + col("timer_info"), + col("debug_info") + ) + +// COMMAND ---------- + +import java.sql.{Connection, DriverManager, PreparedStatement, Types} +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{ForeachWriter, Row} +import org.apache.spark.sql.types._ +import org.slf4j.LoggerFactory + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +/** + * @param username Database username + * @param password Database password + * @param table Target table name (e.g., "public.my_table") + * @param host Lakebase host address + * @param mode Write mode: "insert", "upsert", or "bulk-insert" + * @param primaryKeys Primary key columns (required for upsert mode) + * @param batchSize Number of rows per batch (default: 1000) + * @param batchIntervalMs Max time between flushes in milliseconds (default: 100) + */ +case class LakebaseConfig( + username: String, + password: String, + table: String, + host: String, + mode: String = "insert", + primaryKeys: Seq[String] = Seq.empty, + batchSize: Int = 5000, + batchIntervalMs: Int = 100, + queueSize: Int = 50000 +) extends Serializable + +// --------------------------------------------------------------------------- +// ForeachWriter +// --------------------------------------------------------------------------- + +class LakebaseForeachWriter(config: LakebaseConfig, schema: StructType) + extends ForeachWriter[Row] with Serializable { + + @transient private lazy val logger = LoggerFactory.getLogger(getClass) + + private val columns: Array[String] = schema.fieldNames + private val columnTypes: Array[DataType] = schema.fields.map(_.dataType) + private val sql: Option[String] = buildSql() + + // Validate schema at construction time + locally { + val unsupported = findUnsupportedFields(schema) + if (unsupported.nonEmpty) { + throw new IllegalArgumentException( + s"Unsupported field types: ${unsupported.mkString(", ")}. " + + "Convert complex types to supported formats first." + ) + } + + val jdbcUrl = s"jdbc:postgresql://${config.host}:5432/databricks_postgres?sslmode=require" + val checkConn = DriverManager.getConnection(jdbcUrl, config.username, config.password) + try { + val columnDefs = schema.fields.map { field => + val pgType = sparkTypeToPgType(field.dataType) + val nullable = if (field.nullable) "" else " NOT NULL" + s"${field.name} $pgType$nullable" + }.mkString(", ") + + val pkClause = if (config.primaryKeys.nonEmpty) { + s", PRIMARY KEY (${config.primaryKeys.mkString(", ")})" + } else "" + + val createSql = + s"CREATE TABLE IF NOT EXISTS ${config.table} ($columnDefs$pkClause)" + + val stmt = checkConn.createStatement() + try { + stmt.execute(createSql) + } finally { + stmt.close() + } + logger.info(s"Ensured table ${config.table} exists") + } finally { + checkConn.close() + } + } + + // Runtime state (initialized in open(), not serialized) + @transient private var conn: Connection = _ + @transient private var queue: LinkedBlockingQueue[Array[Any]] = _ + @transient private var stopEvent: AtomicBoolean = _ + @transient private var workerThread: Thread = _ + @transient private var batch: ArrayBuffer[Array[Any]] = _ + @transient private var lastFlush: Long = _ + @transient private var workerError: String = _ + @transient private var partitionId: Long = _ + @transient private var epochId: Long = _ + + // ------------------------------------------------------------------------- + // ForeachWriter Interface + // ------------------------------------------------------------------------- + + override def open(partitionId: Long, epochId: Long): Boolean = { + try { + this.partitionId = partitionId + this.epochId = epochId + + val jdbcUrl = + s"jdbc:postgresql://${config.host}:5432/databricks_postgres?sslmode=require" + conn = DriverManager.getConnection(jdbcUrl, config.username, config.password) + conn.setAutoCommit(false) + + queue = new LinkedBlockingQueue[Array[Any]](config.queueSize) + stopEvent = new AtomicBoolean(false) + batch = ArrayBuffer.empty[Array[Any]] + lastFlush = System.currentTimeMillis() + workerError = null + + workerThread = new Thread(() => worker()) + workerThread.setDaemon(true) + workerThread.start() + + logger.info(s"[$partitionId|$epochId] Opening writer for table ${config.table}") + true + } catch { + case e: Exception => + logger.error(s"Failed to open writer: ${e.getMessage}", e) + false + } + } + + override def process(row: Row): Unit = { + if (workerError != null) { + throw new RuntimeException(s"Worker failed: $workerError") + } + + val rowData = new Array[Any](columns.length) + for (i <- columns.indices) { + rowData(i) = + if (row.isNullAt(i)) null + else convertValue(row.get(i), columnTypes(i)) + } + queue.put(rowData) + } + + override def close(error: Throwable): Unit = { + try { + if (stopEvent != null) stopEvent.set(true) + if (workerThread != null && workerThread.isAlive) { + val maxWaitMs = if (config.mode.toLowerCase == "upsert") 30000 else 5000 + val start = System.currentTimeMillis() + while (workerThread.isAlive && (System.currentTimeMillis() - start) < maxWaitMs) { + workerThread.join(1000) + } + val waited = System.currentTimeMillis() - start + if (workerThread.isAlive) { + logger.warn(s"[$partitionId|$epochId] Worker still alive after ${waited}ms (max: ${maxWaitMs}ms)") + } else { + logger.info(s"[$partitionId|$epochId] Worker finished after ${waited}ms") + } + } + if (queue != null && queue.size() > config.batchSize * 5) { + logger.warn( + s"[$partitionId|$epochId] Large queue remaining: ${queue.size()}" + ) + } + flushRemaining() + } finally { + if (conn != null) { + try conn.close() + catch { case _: Exception => } + } + } + logger.info(s"[$partitionId|$epochId] Writer closed") + } + + // ------------------------------------------------------------------------- + // Internal Methods + // ------------------------------------------------------------------------- + + private def worker(): Unit = { + while (!stopEvent.get()) { + try { + var collecting = true + while (collecting && batch.size < config.batchSize) { + val item = queue.poll(10, TimeUnit.MILLISECONDS) + if (item != null) batch += item + else collecting = false + } + + if (batch.size >= config.batchSize || (batch.nonEmpty && timeToFlush())) { + flushBatch() + } + + Thread.sleep(0, 100000) // 0.1ms + } catch { + case e: Exception => + logger.error(s"Worker error: ${e.getMessage}", e) + workerError = e.getMessage + return + } + } + } + + private def flushBatch(): Unit = { + if (batch.isEmpty) return + + val perfStart = System.currentTimeMillis() + try { + config.mode.toLowerCase match { + case "bulk-insert" => flushWithCopy() + case _ => flushWithExecuteBatch() + } + conn.commit() + + val flushedCount = batch.size + batch.clear() + lastFlush = System.currentTimeMillis() + val perfTime = System.currentTimeMillis() - perfStart + logger.info( + s"[$partitionId|$epochId] Flushed $flushedCount rows in ${perfTime}ms" + ) + } catch { + case e: Exception => + try conn.rollback() + catch { case _: Exception => } + throw e + } + } + + private def flushWithCopy(): Unit = { + val cm = new org.postgresql.copy.CopyManager( + conn.unwrap(classOf[org.postgresql.core.BaseConnection]) + ) + val cols = columns.mkString(", ") + val copyIn = + cm.copyIn(s"COPY ${config.table} ($cols) FROM STDIN WITH (FORMAT text)") + + try { + for (row <- batch) { + val line = row.map { + case null => "\\N" + case v => + v.toString + .replace("\\", "\\\\") + .replace("\t", "\\t") + .replace("\n", "\\n") + .replace("\r", "\\r") + }.mkString("\t") + "\n" + val bytes = line.getBytes("UTF-8") + copyIn.writeToCopy(bytes, 0, bytes.length) + } + copyIn.endCopy() + } finally { + if (copyIn.isActive) { + copyIn.cancelCopy() + } + } + } + + private def flushWithExecuteBatch(): Unit = { + val stmt = conn.prepareStatement(sql.get) + try { + for (row <- batch) { + for (i <- row.indices) { + if (row(i) == null) stmt.setNull(i + 1, Types.NULL) + else setParameter(stmt, i + 1, row(i), columnTypes(i)) + } + stmt.addBatch() + } + stmt.executeBatch() + } finally { + stmt.close() + } + } + + private def flushRemaining(): Unit = { + if (queue != null) { + val remaining = new java.util.ArrayList[Array[Any]]() + queue.drainTo(remaining) + remaining.forEach(item => batch += item) + } + if (batch != null && batch.nonEmpty) flushBatch() + } + + private def timeToFlush(): Boolean = + System.currentTimeMillis() - lastFlush >= config.batchIntervalMs + + // ------------------------------------------------------------------------- + // SQL Builder + // ------------------------------------------------------------------------- + + private def buildSql(): Option[String] = { + val cols = columns.mkString(", ") + val placeholders = columns.map(_ => "?").mkString(", ") + + config.mode.toLowerCase match { + case "insert" => + Some(s"INSERT INTO ${config.table} ($cols) VALUES ($placeholders)") + + case "upsert" => + if (config.primaryKeys.isEmpty) { + throw new IllegalArgumentException("primaryKeys required for upsert mode") + } + val pkCols = config.primaryKeys.mkString(", ") + val pkSet = config.primaryKeys.map(_.toLowerCase).toSet + val updateCols = columns + .filterNot(c => pkSet.contains(c.toLowerCase)) + .map(c => s"$c = EXCLUDED.$c") + .mkString(", ") + + if (updateCols.isEmpty) { + Some( + s"""INSERT INTO ${config.table} ($cols) VALUES ($placeholders) + |ON CONFLICT ($pkCols) DO NOTHING""".stripMargin + ) + } else { + Some( + s"""INSERT INTO ${config.table} ($cols) VALUES ($placeholders) + |ON CONFLICT ($pkCols) DO UPDATE SET $updateCols""".stripMargin + ) + } + + case "bulk-insert" => + None + + case other => + throw new IllegalArgumentException( + s"Invalid mode: $other. Use 'insert', 'upsert', or 'bulk-insert'." + ) + } + } + + // ------------------------------------------------------------------------- + // Type Conversion + // ------------------------------------------------------------------------- + + private def convertValue(value: Any, dataType: DataType): Any = { + if (value == null) return null + dataType match { + case BooleanType => value.asInstanceOf[Boolean] + case IntegerType => value match { case i: Int => i; case o => o.toString.toInt } + case LongType => value match { case l: Long => l; case o => o.toString.toLong } + case FloatType => value match { case f: Float => f; case o => o.toString.toFloat } + case DoubleType => value match { case d: Double => d; case o => o.toString.toDouble } + case ShortType => value match { case s: Short => s; case o => o.toString.toShort } + case ByteType => value match { case b: Byte => b; case o => o.toString.toByte } + case _: DecimalType => + value match { + case d: java.math.BigDecimal => d + case d: scala.math.BigDecimal => d.bigDecimal + case o => new java.math.BigDecimal(o.toString) + } + case StringType => value.toString + case DateType => value + case TimestampType => value + case BinaryType => value + case _ => value + } + } + + private def setParameter( + stmt: PreparedStatement, + index: Int, + value: Any, + dataType: DataType + ): Unit = { + dataType match { + case BooleanType => stmt.setBoolean(index, value.asInstanceOf[Boolean]) + case IntegerType => stmt.setInt(index, value.asInstanceOf[Int]) + case LongType => stmt.setLong(index, value.asInstanceOf[Long]) + case FloatType => stmt.setFloat(index, value.asInstanceOf[Float]) + case DoubleType => stmt.setDouble(index, value.asInstanceOf[Double]) + case ShortType => stmt.setShort(index, value.asInstanceOf[Short]) + case ByteType => stmt.setByte(index, value.asInstanceOf[Byte]) + case _: DecimalType => stmt.setBigDecimal(index, value.asInstanceOf[java.math.BigDecimal]) + case StringType => stmt.setString(index, value.asInstanceOf[String]) + case DateType => stmt.setDate(index, value.asInstanceOf[java.sql.Date]) + case TimestampType => stmt.setTimestamp(index, value.asInstanceOf[java.sql.Timestamp]) + case BinaryType => stmt.setBytes(index, value.asInstanceOf[Array[Byte]]) + case _ => stmt.setObject(index, value) + } + } + + // ------------------------------------------------------------------------- + // Spark to PostgreSQL Type Mapping + // ------------------------------------------------------------------------- + + private def sparkTypeToPgType(dataType: DataType): String = { + dataType match { + case BooleanType => "BOOLEAN" + case ByteType => "SMALLINT" + case ShortType => "SMALLINT" + case IntegerType => "INTEGER" + case LongType => "BIGINT" + case FloatType => "REAL" + case DoubleType => "DOUBLE PRECISION" + case d: DecimalType => s"DECIMAL(${d.precision}, ${d.scale})" + case StringType => "TEXT" + case DateType => "DATE" + case TimestampType => "TIMESTAMP" + case BinaryType => "BYTEA" + case _ => "TEXT" + } + } + + // ------------------------------------------------------------------------- + // Schema Validation + // ------------------------------------------------------------------------- + + private def findUnsupportedFields(schema: StructType): Seq[String] = { + schema.fields.toSeq.flatMap { field => + field.dataType match { + case _: StructType => Some(field.name) + case _: MapType => Some(field.name) + case arr: ArrayType + if arr.elementType.isInstanceOf[StructType] || + arr.elementType.isInstanceOf[MapType] => + Some(field.name) + case _ => None + } + } + } +} + + +// COMMAND ---------- + +val username = dbutils.secrets.get("gaming-sessionization-rtm-demo", "lakebase-jdbc-username") +val password = dbutils.secrets.get("gaming-sessionization-rtm-demo", "lakebase-jdbc-password") +val table = "session_results_rtm_insert" +// Replace with your Lakebase hostname (never commit a real instance host in public forks). +val host = "YOUR_LAKEBASE_DATABASE_HOST" + +val lakebaseConfig = LakebaseConfig( + username = username, + password = password, + table = table, + host = host, + mode = "insert", // insert or upsert or bulk-insert + // primaryKeys = Seq("deviceid", "appsessionid", "psnaccountid", "sessionstatus") +) + +processed_stream_df + .writeStream + .queryName("sessionization") + .option("checkpointLocation", checkpoint_path) + .outputMode("update") + .trigger(RealTimeTrigger.apply("5 minutes")) + .foreach(new LakebaseForeachWriter(lakebaseConfig, processed_stream_df.schema)) + .start() + +// COMMAND ---------- + + + +// COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS.scala b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS.scala new file mode 100644 index 0000000..7957dfa --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/RTM-TWS.scala @@ -0,0 +1,452 @@ +// Databricks notebook source +// import org.apache.spark.sql.streaming.{TTLConfig, ValueState, OutputMode, StatefulProcessor, TimeMode, TimerValues} +import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + +import org.apache.spark.sql.streaming.Trigger +import org.apache.log4j.Logger +import java.sql.Timestamp +import java.time._ +import scala.collection.mutable.ListBuffer +import java.time.Duration +import org.apache.spark.sql.execution.streaming.RealTimeTrigger +import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress} + +// COMMAND ---------- + +dbutils.widgets.dropdown("mode", "RTM", Seq("RTM", "MBM")) +val mode = dbutils.widgets.get("mode") + +// COMMAND ---------- + +spark.conf.set( + "spark.sql.streaming.stateStore.providerClass", + "com.databricks.sql.streaming.state.RocksDBStateStoreProvider" +) +// spark.conf.set( +// "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled", +// "true" +// ) + +if (mode == "RTM") { + spark.conf.set("spark.sql.shuffle.partitions", "112") +} else { + spark.conf.set("spark.sql.shuffle.partitions", "128") +} + +// COMMAND ---------- + +val stream_name = "RTM-tws-poc" +val volume_path = "/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka" +val checkpoint_path = s"$volume_path/$stream_name" + +dbutils.widgets.text("clean_checkpoint", "yes") +val clean_checkpoint = dbutils.widgets.get("clean_checkpoint") +if (clean_checkpoint == "yes") { + dbutils.fs.rm(checkpoint_path, true) +} + +// COMMAND ---------- + +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +class CustomStreamingQueryListener extends StreamingQueryListener { + implicit val formats: Formats = DefaultFormats + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { + println(s"Query started: id=${event.id}, name=${event.name}") + } + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { + val progress: StreamingQueryProgress = event.progress + println(s"batchId = ${progress.batchId} " + s"timestamp = ${progress.timestamp} " + s"numInputRows=${progress.numInputRows} " + s"batchDuration=${progress.batchDuration} ") + val stateOperators = progress.stateOperators(0) + val stateMetrics = stateOperators.customMetrics + // print(stateOperators) + println( + s"numRowsTotal = ${stateOperators.numRowsTotal} " + s"numRowsUpdated = ${stateOperators.numRowsUpdated} " + + s"numExpiredTimers=${stateMetrics.get("numExpiredTimers")} " + s"numRegisteredTimers=${stateMetrics.get("numRegisteredTimers")} " + + s"rocksdbPutLatency=${stateMetrics.get("rocksdbPutLatency")} " + s"timerProcessingTimeMs=${stateMetrics.get("timerProcessingTimeMs")} " + ) + if (mode == "RTM") { + val progress_json = parse(progress.json) + val latenciesJson: JValue = progress_json \ "latencies" + println(pretty(render(latenciesJson))) + } + } + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + println(s"Query terminated: id=${event.id}, runId=${event.runId}") + } +} + +val listener = new CustomStreamingQueryListener + +// Remove existing instance if present +spark.streams.removeListener(listener) + +// Add the listener +spark.streams.addListener(listener) + +// COMMAND ---------- + +val kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") +val pc_sessions_topic = "pc_sessions" +val console_sessions_topic = "console_sessions" +val output_topic = "output_sessions" + +// COMMAND ---------- + +val kafka_schema = new StructType() + .add("appSessionId", LongType) + .add("eventId", StringType) + .add("psnAccountId", StringType) + .add("hostPcId", StringType) // for pc_sessions, may be null for console_sessions + .add("openPsid", StringType) // for console_sessions, may be null for pc_sessions + .add("timestamp", TimestampType) + .add("totalFgTime", LongType) + +// COMMAND ---------- + +package org.databricks.TransformWithStateStructs +import java.sql.Timestamp + +object SessionStructs { + case class InputRow( + topic: String, + partition: Int, + offset: Long, + kafka_timestamp: Timestamp, + deviceId: String, + psnAccountId: String, + appSessionId: Long, + eventId: String, + session_timestamp: Timestamp, + totalFgTime: Long, + ) + + case class OutputRow( + deviceId: String, + appSessionId: Long, + psnAccountId: String, + sessionStatus: String, + session_timestamp: Timestamp, + sessionDuration: Long, + upstream_timestamp: Timestamp, + processing_timestamp: Timestamp, + timer_info: Timestamp, + debug_info: String + ) + + case class MeasuredSessionValue( + psnAccountId: String, + sessionStatus: String, + session_timestamp: Timestamp, + sessionDuration: Long, + upstream_timestamp: Timestamp, + processing_timestamp: Timestamp, + timer_info: Timestamp, + ) +} + +// COMMAND ---------- + +implicit class PipeOps[A](val a: A) extends AnyVal { + def pipeIf(cond: Boolean)(f: A => A): A = if (cond) f(a) else a +} + +val input_stream_df = spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("subscribe", s"$pc_sessions_topic,$console_sessions_topic") + .option("startingOffsets", "earliest") + .pipeIf(mode == "RTM")(_.option("maxPartitions", 16)) + .load() + .withColumn("key", col("key").cast("string")) + .withColumn("value", col("value").cast("string")) + .withColumnRenamed("timestamp", "kafka_timestamp") + .withColumn("session", from_json(col("value"), kafka_schema)) + .withColumn( + "deviceId", + when(col("topic") === pc_sessions_topic, col("session.hostPcId")) + .when(col("topic") === console_sessions_topic, col("session.openPsid")) + .otherwise(lit(null)) + ) + .selectExpr( + "topic", "partition", "offset", "kafka_timestamp", + "deviceId", "session.psnAccountId", "session.appSessionId","session.eventId","session.timestamp as session_timestamp", "session.totalFgTime" + ) + +// COMMAND ---------- + +// DBTITLE 1,Untitled +// import SessionStructs._ +import org.databricks.TransformWithStateStructs.SessionStructs._ + +class Sessionization() extends StatefulProcessor[String, InputRow, OutputRow] { + + @transient protected var sessionStatusState: MapState[Long, MeasuredSessionValue] = _ + + val TIMER_THRESHOLD_IN_MS = 30000 + val SESSION_THRESHOLD_IN_SECONDS = 1800 + val DEBUG_INFO_1 = "SessionStart" + val DEBUG_INFO_2 = "SessionEnd for existing Session" + val DEBUG_INFO_3 = "SessionEnd due to THRESHOLD" + val DEBUG_INFO_4 = "SessionEnd due to another SessionStart" + + override def init( + outputMode: OutputMode, + timeMode: TimeMode): Unit = { + + sessionStatusState = getHandle.getMapState[Long, MeasuredSessionValue]("sessionStatusState", Encoders.scalaLong, Encoders.product[MeasuredSessionValue], TTLConfig.NONE) + } + + override def handleInputRows( + key: String, + inputRows: Iterator[InputRow], + timerValues: TimerValues): Iterator[OutputRow] = { + + var outputRows = ListBuffer[OutputRow]() + + inputRows.foreach { row => + // Check if the incoming record is for session start + if (row.eventId == "ApplicationSessionStart") { + // Check if there is already an active Session for the Device + if(sessionStatusState.exists()) { + // pull the existing sessionId from statestore, generate sessionEnd record, and emit that + val sessionKeyIter = sessionStatusState.keys() + for (sessionId <- sessionKeyIter) { + val outputRecord = generateSessionEnd(key, null, sessionId, timerValues) + outputRows.append(outputRecord) + } + } + // Generate sessionStart record, and emit that + val outputRecord = generateSessionStart(key, row, timerValues) + outputRows.append(outputRecord) + } // checking if the incoming record is for session end + else if (row.eventId == "ApplicationSessionEndBi") { + // There should already an active Session for the Device + if(sessionStatusState.exists()) { + // Generate sessionEnd record, and emit that + val outputRecord = generateSessionEnd(key, row, -1L, timerValues) + outputRows.append(outputRecord) + } else { + // This can happen only of the sessionEnd event comes after the session has already been removed due to SESSION_THRESHOLD_IN_SECONDS. + } + } + } + + outputRows.iterator + } + + def generateSessionStart(key: String, inputRow: InputRow, timerValues: TimerValues): OutputRow = { + + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // create a new timer for next 30 sec, current processing time + 30 sec + val timerMillis = currentProcessingTimeMillis + TIMER_THRESHOLD_IN_MS + val timerTime = new Timestamp(timerMillis) + + // Generate sessionStart record + val outputRecord = OutputRow( + key, + inputRow.appSessionId, + inputRow.psnAccountId, + "SessionStart", + inputRow.session_timestamp, + 0, + inputRow.kafka_timestamp, + currentProcessingTime, + timerTime, + DEBUG_INFO_1 + ) + + // Create a key-value entry into the map state + val mapKey = inputRow.appSessionId + val mapValue = MeasuredSessionValue( + inputRow.psnAccountId, + "SessionStart", + inputRow.session_timestamp, + 0, + inputRow.kafka_timestamp, + currentProcessingTime, + timerTime + ) + sessionStatusState.updateValue(mapKey, mapValue) + + // Set the timer for current batch processingTime + 30 seconds + getHandle.registerTimer(timerMillis) + + outputRecord + } + + def generateSessionEnd(key: String, inputRow: InputRow, sessionId: Long, timerValues: TimerValues): OutputRow = { + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // Generate a sessionEnd record, and emit that + var outputRecord:OutputRow = null + if (inputRow != null) { + outputRecord = OutputRow( + key, + inputRow.appSessionId, + inputRow.psnAccountId, + "SessionEnd", + inputRow.session_timestamp, + inputRow.totalFgTime, + inputRow.kafka_timestamp, + currentProcessingTime, + null, + DEBUG_INFO_2 + ) + // remove the session from state + sessionStatusState.removeKey(inputRow.appSessionId) + } else { + val sessionValue = sessionStatusState.getValue(sessionId) + val currentSessionDuration: Long = (currentProcessingTimeMillis - sessionValue.processing_timestamp.getTime)/1000 + outputRecord = OutputRow( + key, + sessionId, + sessionValue.psnAccountId, + "SessionEnd", + sessionValue.session_timestamp, + currentSessionDuration, + sessionValue.upstream_timestamp, + currentProcessingTime, + null, + DEBUG_INFO_4 + ) + // remove the session from state + sessionStatusState.removeKey(sessionId) + } + + // Delete all the timers for the deviceId + val timerIter = getHandle.listTimers() + for (timer <- timerIter) getHandle.deleteTimer(timer) + + outputRecord + } + + // Define the logic for handling expired timers + override def handleExpiredTimer( + key: String, + timerValues: TimerValues, + expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputRow] = { + + // Get the processing time of the current micro-batch, remains same for every record in the same micro-batch + val currentProcessingTimeMillis = timerValues.getCurrentProcessingTimeInMs() + val currentProcessingTime = new Timestamp(currentProcessingTimeMillis) + + // Get the expired Timer info + val expiredTimerMillis = expiredTimerInfo.getExpiryTimeInMs() + val expiredTimerTime = new Timestamp(expiredTimerMillis) + + // create a new timer for next 30 sec, current timer expiry time + 30 sec + val nextTimerMillis = expiredTimerMillis + TIMER_THRESHOLD_IN_MS + val nextTimerTime = new Timestamp(nextTimerMillis) + + var outputRows = ListBuffer[OutputRow]() + + // Expecting an entry for the expired timer for a given sessionId. + if (sessionStatusState.exists()) { + + // pull the existing sessionId, generate sessionHeartbeat record, and emit that + val sessionIdKeyIter = sessionStatusState.keys() + + for (sessionId <- sessionIdKeyIter) { + var sessionValue = sessionStatusState.getValue(sessionId) + + // Caluate the sessionDuration for heartbeat records, Current processing time - SessionStart event processing time + val currentSessionDuration: Long = (currentProcessingTimeMillis - sessionValue.processing_timestamp.getTime)/1000 + val sessionEvent = "SessionHeartbeat" + var outputRecord:OutputRow = null + + if (currentSessionDuration < SESSION_THRESHOLD_IN_SECONDS) { + // Generate sessionHeartbeat record, and emit that + outputRecord = OutputRow( + key, + sessionId, + sessionValue.psnAccountId, + sessionEvent, + sessionValue.session_timestamp, + currentSessionDuration, + sessionValue.upstream_timestamp, + currentProcessingTime, + nextTimerTime, + "Timer expired at: " + expiredTimerTime + ) + // create a new timer for next 30 sec, current processing time + 30 sec + getHandle.registerTimer(nextTimerMillis) + } else { + outputRecord = OutputRow( + key, + sessionId, + sessionValue.psnAccountId, + "SessionEnd", + sessionValue.session_timestamp, + currentSessionDuration, + sessionValue.upstream_timestamp, + currentProcessingTime, + null, + DEBUG_INFO_3 + ) + // remove the session from state + sessionStatusState.removeKey(sessionId) + } + outputRows.append(outputRecord) + } + } + outputRows.iterator + } + +} + +// COMMAND ---------- + +val processed_stream_df = input_stream_df.as[InputRow] + .groupByKey(_.deviceId) + .transformWithState( + new Sessionization(), + TimeMode.ProcessingTime, + OutputMode.Update() + ) + .toDF() + .withColumn("value", struct( + col("deviceId"), + col("appSessionId"), + col("psnAccountId"), + col("sessionStatus"), + col("session_timestamp"), + col("sessionDuration"), + col("upstream_timestamp"), + col("processing_timestamp"), + col("timer_info"), + col("debug_info") + )) + .select(to_json(col("value")).alias("value")) + +// COMMAND ---------- + +processed_stream_df + .writeStream + .queryName("sessionization") + .format("kafka") + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("topic", output_topic) + .option("checkpointLocation", checkpoint_path) + .trigger( + if (mode == "RTM") RealTimeTrigger.apply("5 minutes") + else Trigger.ProcessingTime("0.5 seconds") + ) + .outputMode("update") + .start() + +// COMMAND ---------- + + + +// COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/Write_RTM_session_results_to_delta.py b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/Write_RTM_session_results_to_delta.py new file mode 100644 index 0000000..22a8b0b --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/RTM-Sessionization/Write_RTM_session_results_to_delta.py @@ -0,0 +1,65 @@ +# Databricks notebook source +from pyspark.sql.functions import col, from_json +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType + +# COMMAND ---------- + +kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") +output_topic = "output_sessions" +volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka' +checkpoint_path = f'{volume_path}/{output_topic}' + +dbutils.widgets.text('clean_checkpoint', 'yes') +clean_checkpoint = dbutils.widgets.get('clean_checkpoint') +if clean_checkpoint == 'yes': + dbutils.fs.rm(checkpoint_path, True) + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC drop table if exists gaming_sessionization_demo.rtm_workload.session_results_rtm; + +# COMMAND ---------- + +kafka_schema = StructType([ + StructField("deviceId", StringType(), True), + StructField("appSessionId", LongType(), True), + StructField("psnAccountId", StringType(), True), + StructField("sessionStatus", StringType(), True), + StructField("session_timestamp", TimestampType(), True), + StructField("sessionDuration", LongType(), True), + StructField("upstream_timestamp", TimestampType(), True), + StructField("processing_timestamp", TimestampType(), True), + StructField("timer_info", TimestampType(), True), + StructField("debug_info", StringType(), True) +]) + +# COMMAND ---------- + +stream_df = (spark.readStream.format("kafka") + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("subscribe", output_topic) + .option("startingOffsets", "earliest") + .load() + .withColumn("value", col("value").cast('string')) + .withColumn("value_struct", from_json(col("value"), kafka_schema)) + .selectExpr( + 'timestamp as output_timestamp', + 'value_struct.*' + ) + ) + +# COMMAND ---------- + +( + stream_df + .writeStream + .queryName('write_RTM_sessions') + .outputMode("append") + .trigger(processingTime = '1 seconds') + .option("checkpointLocation", checkpoint_path) + .toTable("gaming_sessionization_demo.rtm_workload.session_results_rtm") +) + +# COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/debug.sql b/2026-04-rtm-transformWithState-GamingSessionization/debug.sql new file mode 100644 index 0000000..fe2c18e --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/debug.sql @@ -0,0 +1,58 @@ +--get Counts of input stream by minute +with sessions_stream as ( + select appSessionId, eventId, hostPcId as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.pc_sessions_stream + union all + select appSessionId, eventId, openPsid as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.console_sessions_stream +) +select date_trunc('MINUTE', `timestamp`), count(1) +from sessions_stream +group by date_trunc('MINUTE', `timestamp`) +order by date_trunc('MINUTE', `timestamp`) + +--SessionStart vs SessionHeartbeat ratio from output +select +date_trunc('MINUTE', `session_timestamp`), +sum(case when sessionStatus = 'SessionStart' then 1 else null end) as SessionStart, +sum(case when sessionStatus = 'SessionHeartbeat' then 1 else null end) as SessionHeartbeat, +sum(case when sessionStatus = 'SessionEnd' then 1 else null end) as SessionEnd +from gaming_sessionization_demo.rtm_workload.session_results_rtm +group by date_trunc('MINUTE', `session_timestamp`) +order by date_trunc('MINUTE', `session_timestamp`) + + +-- Debugging Timer heartbeats for the session scope - get some example deviceIds +with sessions_stream as ( + select appSessionId, eventId, hostPcId as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.pc_sessions_stream + union all + select appSessionId, eventId, openPsid as deviceId, psnAccountId, timestamp, totalFgTime from gaming_sessionization_demo.rtm_workload.console_sessions_stream +), +minute_1 as (select deviceid, appSessionId, totalFgTime, `timestamp` as session_start from sessions_stream where minute(`timestamp`) = 1 ), +minute_5 as (select deviceid, appSessionId, totalFgTime, `timestamp` as session_end from sessions_stream where minute(`timestamp`) = 6 ) + +select * from minute_1 join minute_5 on minute_1.deviceid = minute_5.deviceid and minute_1.appSessionId = minute_5.appSessionId order by session_start, session_end + +-- Debugging Timer heartbeats for the session scope - pick a device id and +select * except(deviceId, psnAccountId) +from gaming_sessionization_demo.rtm_workload.session_results_rtm where deviceId = 'REPLACE_WITH_DEVICE_ID' +order by output_timestamp asc + +-- Getting latency metrics +with tab1 as ( + select deviceId, sessionStatus, appSessionId, psnAccountId, processing_timestamp, upstream_timestamp, output_timestamp, timestampdiff(MILLISECOND, upstream_timestamp, output_timestamp) as latency_ms +from gaming_sessionization_demo.rtm_workload.session_results_rtm where sessionStatus in ('SessionStart', 'SessionEnd') +order by timestampdiff(SECOND, upstream_timestamp, output_timestamp) asc +) + select + count(1), + min(latency_ms) as min, + percentile(latency_ms, 0.10) as p10, + percentile(latency_ms, 0.25) as p25, + percentile(latency_ms, 0.5) as median, + percentile(latency_ms, 0.75) as p75, + percentile(latency_ms, 0.90) as p90, + percentile(latency_ms, 0.95) as p95, + percentile(latency_ms, 0.99) as p99, + max(latency_ms) as max + from tab1 + + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-console-sessions-stream-ingest.py b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-console-sessions-stream-ingest.py new file mode 100644 index 0000000..0ee6ab7 --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-console-sessions-stream-ingest.py @@ -0,0 +1,66 @@ +# Databricks notebook source +from pyspark.sql import functions as F +from pyspark.sql.streaming import StreamingQueryListener + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC CREATE EXTERNAL VOLUME IF NOT EXISTS gaming_sessionization_demo.rtm_workload.write_to_kafka +# MAGIC LOCATION 's3://YOUR_EXTERNAL_LOCATION/gaming-sessionization-rtm/write_to_kafka/' + +# COMMAND ---------- + +kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") +console_sessions_topic = 'console_sessions' +volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka' +checkpoint_path = f'{volume_path}/{console_sessions_topic}' + +dbutils.widgets.text('clean_checkpoint', 'yes') +clean_checkpoint = dbutils.widgets.get('clean_checkpoint') +if clean_checkpoint == 'yes': + dbutils.fs.rm(checkpoint_path, True) + +# COMMAND ---------- + +class MyStreamingListener(StreamingQueryListener): + def onQueryStarted(self, event): + print(f"'{event.name}' [{event.id}] got started!") + def onQueryProgress(self, event): + row = event.progress + print(f"****************************************** batchId ***********************") + print(f"batchId = {row.batchId} timestamp = {row.timestamp} numInputRows = {row.numInputRows} batchDuration = {row.batchDuration}") + def onQueryTerminated(self, event): + print(f"{event.id} got terminated!") +try: + spark.streams.removeListener(MyStreamingListener()) +except: + pass +spark.streams.addListener(MyStreamingListener()) + +# COMMAND ---------- + +console_stream_df = ( + spark.readStream + .format("delta") + .option("maxFilesPerTrigger", 1) + .table("gaming_sessionization_demo.rtm_workload.console_sessions_stream") + .withColumn("all_columns", F.to_json(F.struct('appSessionId', 'eventId', 'openPsid', 'psnAccountId', 'timestamp', 'totalFgTime'))) + .selectExpr('CAST(all_columns AS BINARY) AS value') +) + +# COMMAND ---------- + +( + console_stream_df + .writeStream + .queryName('console_sessions_stream') + .format('kafka') + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("topic", console_sessions_topic) + .option("checkpointLocation", checkpoint_path) + .trigger(processingTime = '1 seconds') + .start() +) + +# COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-pc-sessions-stream-ingest.py b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-pc-sessions-stream-ingest.py new file mode 100644 index 0000000..6620778 --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/Kafka-pc-sessions-stream-ingest.py @@ -0,0 +1,66 @@ +# Databricks notebook source +from pyspark.sql import functions as F +from pyspark.sql.streaming import StreamingQueryListener + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC CREATE EXTERNAL VOLUME IF NOT EXISTS gaming_sessionization_demo.rtm_workload.write_to_kafka +# MAGIC LOCATION 's3://YOUR_EXTERNAL_LOCATION/gaming-sessionization-rtm/write_to_kafka/' + +# COMMAND ---------- + +kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") +pc_sessions_topic = 'pc_sessions' +volume_path = '/Volumes/gaming_sessionization_demo/rtm_workload/write_to_kafka' +checkpoint_path = f'{volume_path}/{pc_sessions_topic}' + +dbutils.widgets.text('clean_checkpoint', 'yes') +clean_checkpoint = dbutils.widgets.get('clean_checkpoint') +if clean_checkpoint == 'yes': + dbutils.fs.rm(checkpoint_path, True) + +# COMMAND ---------- + +class MyStreamingListener(StreamingQueryListener): + def onQueryStarted(self, event): + print(f"'{event.name}' [{event.id}] got started!") + def onQueryProgress(self, event): + row = event.progress + print(f"****************************************** batchId ***********************") + print(f"batchId = {row.batchId} timestamp = {row.timestamp} numInputRows = {row.numInputRows} batchDuration = {row.batchDuration}") + def onQueryTerminated(self, event): + print(f"{event.id} got terminated!") +try: + spark.streams.removeListener(MyStreamingListener()) +except: + pass +spark.streams.addListener(MyStreamingListener()) + +# COMMAND ---------- + +pc_stream_df = ( + spark.readStream + .format("delta") + .option("maxFilesPerTrigger", 1) + .table("gaming_sessionization_demo.rtm_workload.pc_sessions_stream") + .withColumn("all_columns", F.to_json(F.struct('appSessionId', 'eventId', 'hostPcId', 'psnAccountId', 'timestamp', 'totalFgTime'))) + .selectExpr('CAST(all_columns AS BINARY) AS value') +) + +# COMMAND ---------- + +( + pc_stream_df + .writeStream + .queryName('pc_sessions_stream') + .format('kafka') + .option("kafka.bootstrap.servers", kafka_bootstrap_servers_plaintext) + .option("topic", pc_sessions_topic) + .option("checkpointLocation", checkpoint_path) + .trigger(processingTime = '1 seconds') + .start() +) + +# COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic-scala.scala b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic-scala.scala new file mode 100644 index 0000000..7c5de3f --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic-scala.scala @@ -0,0 +1,64 @@ +// Databricks notebook source +// Maven dependency: org.apache.kafka:kafka-clients:3.5.1 + +// COMMAND ---------- + +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic} +import java.util.{Collections, Properties} +import scala.jdk.CollectionConverters._ + +// COMMAND ---------- + +val kafkaBootstrapServers = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") + +val pcSessionsTopic = "pc_sessions" +val consoleSessionsTopic = "console_sessions" +val outputTopic = "output_sessions" + +// COMMAND ---------- + +val props = new Properties() +props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers) +val adminClient = AdminClient.create(props) + +// COMMAND ---------- + +def deleteTopic(topicName: String): Unit = { + try { + adminClient.deleteTopics(Collections.singletonList(topicName)).all().get() + println(s"Topic '$topicName' deleted.") + } catch { + case e: Exception => println(s"Failed to delete topic '$topicName': ${e.getMessage}") + } +} + +deleteTopic(pcSessionsTopic) +deleteTopic(consoleSessionsTopic) +deleteTopic(outputTopic) + +// COMMAND ---------- + +// val retentionMs = 7 * 24 * 60 * 60 * 1000 // 7 days +val retentionMs = -1 + +def createTopic(topicName: String, numPartitions: Int, replicationFactor: Short = 3): Unit = { + val topic = new NewTopic(topicName, numPartitions, replicationFactor) + .configs(Map("retention.ms" -> retentionMs.toString).asJava) + try { + adminClient.createTopics(Collections.singletonList(topic)).all().get() + println(s"Topic '$topicName' created successfully.") + } catch { + case e: Exception => println(s"Error creating topic '$topicName': ${e.getMessage}") + } +} + +createTopic(pcSessionsTopic, 16) +createTopic(consoleSessionsTopic, 16) +createTopic(outputTopic, 64) + +// COMMAND ---------- + +adminClient.close() + +// COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic.py b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic.py new file mode 100644 index 0000000..2bc9fad --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/create-delete-topic.py @@ -0,0 +1,80 @@ +# Databricks notebook source +# MAGIC %pip install --index-url https://pypi-proxy.dev.databricks.com/simple kafka-python + +# COMMAND ---------- + + +from kafka.admin import KafkaAdminClient, NewTopic +from kafka.errors import TopicAlreadyExistsError + +# COMMAND ---------- + +kafka_bootstrap_servers_plaintext = dbutils.secrets.get("gaming-sessionization-rtm-demo", "kafka-bootstrap-servers") + +# Kafka topics +pc_sessions_topic = 'pc_sessions' +console_sessions_topic = 'console_sessions' +output_topic = "output_sessions" + +# COMMAND ---------- + +def delete_topic(topic_name): + # Configure Admin client + admin_client = KafkaAdminClient( + bootstrap_servers=kafka_bootstrap_servers_plaintext.split(','), + client_id="delete-topic-client" + ) + + # Delete the topic + try: + admin_client.delete_topics(topics=[topic_name]) + print(f"Topic '{topic_name}' marked for deletion.") + except Exception as e: + print(f"Failed to delete topic '{topic_name}': {e}") + +delete_topic(pc_sessions_topic) +delete_topic(console_sessions_topic) +delete_topic(output_topic) + +# COMMAND ---------- + +# retention_ms = 7 * 24 * 60 * 60 * 1000 # 7 days in milliseconds +retention_ms = -1 + +def create_topic(topic_name, num_partitions): + + # Configure Kafka admin client + create_client = KafkaAdminClient( + bootstrap_servers=kafka_bootstrap_servers_plaintext.split(','), # Replace with your Kafka broker(s) + client_id="create-topic-client" + ) + + # Define a new topic + topic = NewTopic( + name=topic_name, + num_partitions=num_partitions, + replication_factor=3, + topic_configs={ + 'retention.ms': str(retention_ms) + } + ) + + # Create the topic + try: + create_client.create_topics(new_topics=[topic], validate_only=False) + print(f"Topic '{topic_name}' created successfully.") + except Exception as e: + print(f"Error creating topic: {e}") + finally: + create_client.close() + +create_topic(pc_sessions_topic, 16) +create_topic(console_sessions_topic, 16) +create_topic(output_topic, 64) + +# COMMAND ---------- + + + +# COMMAND ---------- + diff --git a/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/generate-fake-session-data.py b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/generate-fake-session-data.py new file mode 100644 index 0000000..bb09bf1 --- /dev/null +++ b/2026-04-rtm-transformWithState-GamingSessionization/ingest-source-data/generate-fake-session-data.py @@ -0,0 +1,306 @@ +# Databricks notebook source +from datetime import datetime, timedelta +import uuid +import random +from pyspark.sql import functions as F + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC use catalog gaming_sessionization_demo; +# MAGIC create schema if not exists rtm_workload; +# MAGIC use schema rtm_workload; +# MAGIC +# MAGIC ALTER SCHEMA rtm_workload DISABLE PREDICTIVE OPTIMIZATION; +# MAGIC -- drop table if exists gaming_sessionization_demo.rtm_workload.pc_sessions; +# MAGIC -- drop table if exists gaming_sessionization_demo.rtm_workload.console_sessions; + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC set spark.databricks.delta.autoCompact.enabled = false; +# MAGIC set spark.databricks.delta.optimizeWrite.enabled = false; + +# COMMAND ---------- + +# def get_device_id(): +# return str(uuid.uuid4()) + +# def get_session_id(): +# return random.randint(1000000000, 9999999999) + +# def get_sink(): +# return random.choice(['PC','Console']) + +# def get_account_id(): +# return str(uuid.uuid4())[:9] + str(random.randint(999999000000000, 999999999999999)) + +# def get_session_start_time(start_time, end_time): +# total_seconds = int((end_time - start_time).total_seconds()) +# return start_time + timedelta(seconds=random.randint(0, total_seconds)) + +# def get_session_end_time(session_start_time): +# windows = [ +# (2*60, 4*60, 0.25), +# (4*60, 6*60, 0.25), +# (6*60, 8*60, 0.25), +# (8*60, 10*60, 0.25) +# ] +# window = random.choices( +# population=windows, +# weights=[w[2] for w in windows], +# k=1 +# )[0] +# offset_seconds = random.randint(window[0], window[1]) +# session_end_time = session_start_time + timedelta(seconds=offset_seconds) +# return session_end_time, offset_seconds + + + +# COMMAND ---------- + +# # Example start and end times (ISO format) +# start_time = datetime.fromisoformat("2025-11-01T00:00:00") +# end_time = datetime.fromisoformat("2025-11-01T00:59:59") +# eventIds = ['ApplicationSessionStart', 'ApplicationSessionEndBi'] +# minute_counter = 0 + +# minute_start = start_time +# while minute_start < end_time: +# minute_end = minute_start + timedelta(seconds=59) +# print(f"Minute: {minute_start} — {minute_end}") + +# pc_session_data = [] +# console_session_data = [] + +# if minute_counter < 2: +# num_sessions = 500000 +# elif 2 <= minute_counter < 3: +# num_sessions = 475000 +# elif 3 <= minute_counter < 4: +# num_sessions = 425000 +# elif 4 <= minute_counter < 5: +# num_sessions = 400000 +# elif 5 <= minute_counter < 6: +# num_sessions = 350000 +# elif 6 <= minute_counter < 8: +# num_sessions = 275000 +# elif 8 <= minute_counter < 12: +# num_sessions = 200000 +# elif 12 <= minute_counter < 15: +# num_sessions = 225000 + +# elif 15 <= minute_counter < 19: +# num_sessions = 300000 +# elif 19 <= minute_counter < 23: +# num_sessions = 275000 +# elif 23 <= minute_counter < 27: +# num_sessions = 250000 +# elif 27 <= minute_counter < 30: +# num_sessions = 225000 + +# elif 30 <= minute_counter < 34: +# num_sessions = 300000 +# elif 34 <= minute_counter < 38: +# num_sessions = 275000 +# elif 38 <= minute_counter < 42: +# num_sessions = 250000 +# elif 42 <= minute_counter < 45: +# num_sessions = 225000 + +# elif 45 <= minute_counter < 49: +# num_sessions = 300000 +# elif 49 <= minute_counter < 53: +# num_sessions = 275000 +# elif 53 <= minute_counter < 57: +# num_sessions = 250000 +# elif 57 <= minute_counter < 60: +# num_sessions = 225000 + + +# for _ in range(num_sessions): +# sink = get_sink() +# deviceId = get_device_id() +# sessionId = get_session_id() +# accountId = get_account_id() #psnAccountId +# sessionIdStartTime = get_session_start_time(minute_start, minute_end) +# sessionIdEndTime, sessionDuration = get_session_end_time(sessionIdStartTime) + +# if sink == 'PC': +# session_start = { +# "hostPcId": deviceId, +# "appSessionId": sessionId, +# "psnAccountId": accountId, +# "eventId": "ApplicationSessionStart", +# "timestamp": sessionIdStartTime, +# "totalFgTime": 0 +# } +# session_end = { +# "hostPcId": deviceId, +# "appSessionId": sessionId, +# "psnAccountId": accountId, +# "eventId": "ApplicationSessionEndBi", +# "timestamp": sessionIdEndTime, +# "totalFgTime": sessionDuration +# } +# pc_session_data.append(session_start) +# pc_session_data.append(session_end) +# elif sink == 'Console': +# session_start = { +# "openPsid": deviceId, +# "appSessionId": sessionId, +# "psnAccountId": accountId, +# "eventId": "ApplicationSessionStart", +# "timestamp": sessionIdStartTime, +# "totalFgTime": 0 +# } +# session_end = { +# "openPsid": deviceId, +# "appSessionId": sessionId, +# "psnAccountId": accountId, +# "eventId": "ApplicationSessionEndBi", +# "timestamp": sessionIdEndTime, +# "totalFgTime": sessionDuration +# } +# console_session_data.append(session_start) +# console_session_data.append(session_end) + +# pc_df = spark.createDataFrame(pc_session_data) +# console_df = spark.createDataFrame(console_session_data) + +# pc_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.pc_sessions') +# console_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.console_sessions') +# minute_counter +=1 +# minute_start += timedelta(minutes=1) + + + +# COMMAND ---------- + +# %sql +# create table gaming_sessionization_demo.rtm_workload.pc_sessions_bkp as select * from gaming_sessionization_demo.rtm_workload.pc_sessions@v59; +# create table gaming_sessionization_demo.rtm_workload.console_sessions_bkp as select * from gaming_sessionization_demo.rtm_workload.console_sessions@v59; + +# COMMAND ---------- + +# Testing Delayed SessionEnd +# %sql +# insert into gaming_sessionization_demo.rtm_workload.pc_sessions values (9402035216, 'ApplicationSessionStart', '6fb86ebc-3d88-4e31-b913-ea939acacf56', 'd03fae97-999999872866342', '2025-11-01T00:06:45.000+00:00', 0); +# insert into gaming_sessionization_demo.rtm_workload.pc_sessions values (9402035216, 'ApplicationSessionEndBi', '6fb86ebc-3d88-4e31-b913-ea939acacf56', 'd03fae97-999999872866342', '2025-11-01T00:10:45.000+00:00', 240); +# select * from gaming_sessionization_demo.rtm_workload.pc_sessions where hostpcid = '6fb86ebc-3d88-4e31-b913-ea939acacf56' order by timestamp + +# COMMAND ---------- + +# Testing another SessionStart while existing session is still active +# %sql +# insert into gaming_sessionization_demo.rtm_workload.console_sessions values (3432308279, 'ApplicationSessionStart', 'e77de5a1-05fb-46d1-be9c-aff02fc4fede', '087aa3fe-999999286732815', '2025-11-01T00:08:17.000+00:00', 0); +# insert into gaming_sessionization_demo.rtm_workload.console_sessions values (3432308279, 'ApplicationSessionEndBi', 'e77de5a1-05fb-46d1-be9c-aff02fc4fede', '087aa3fe-999999286732815', '2025-11-01T00:13:20.000+00:00', 303) +# select * from gaming_sessionization_demo.rtm_workload.console_sessions_stream where openPsid = 'e77de5a1-05fb-46d1-be9c-aff02fc4fede' order by timestamp + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC use catalog gaming_sessionization_demo; +# MAGIC create schema if not exists rtm_workload; +# MAGIC use schema rtm_workload; +# MAGIC +# MAGIC drop table if exists gaming_sessionization_demo.rtm_workload.pc_sessions_stream; +# MAGIC drop table if exists gaming_sessionization_demo.rtm_workload.console_sessions_stream; + +# COMMAND ---------- + +from datetime import timedelta + +pc_sessions_df = spark.table('gaming_sessionization_demo.rtm_workload.pc_sessions_bkp') +min_max_timestamp = pc_sessions_df.agg(F.min('timestamp').alias("min_timestamp"),F.max('timestamp').alias("max_timestamp")).collect()[0] +start_time = min_max_timestamp.min_timestamp +end_time = min_max_timestamp.max_timestamp + +interval = timedelta(seconds=1) + +current = start_time +while current < end_time: + interval_start = current + interval_end = current + interval - timedelta(microseconds=1) # inclusive end + print(f"Interval: {interval_start} — {interval_end}") + + filter_df = pc_sessions_df.filter(F.col('timestamp').between(F.lit(interval_start), F.lit(interval_end))) + filter_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.pc_sessions_stream') + + current += interval + + +# COMMAND ---------- + +from datetime import timedelta + +console_sessions_df = spark.table('gaming_sessionization_demo.rtm_workload.console_sessions_bkp') +min_max_timestamp = console_sessions_df.agg(F.min('timestamp').alias("min_timestamp"),F.max('timestamp').alias("max_timestamp")).collect()[0] +start_time = min_max_timestamp.min_timestamp +end_time = min_max_timestamp.max_timestamp + +interval = timedelta(seconds=1) + +current = start_time +while current < end_time: + interval_start = current + interval_end = current + interval - timedelta(microseconds=1) # inclusive end + print(f"Interval: {interval_start} — {interval_end}") + + filter_df = console_sessions_df.filter(F.col('timestamp').between(F.lit(interval_start), F.lit(interval_end))) + filter_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.console_sessions_stream') + + current += interval + + +# COMMAND ---------- + +# pc_sessions_df = spark.table('gaming_sessionization_demo.rtm_workload.pc_sessions') +# min_max_timestamp = pc_sessions_df.agg(F.min('timestamp').alias("min_timestamp"),F.max('timestamp').alias("max_timestamp")).collect()[0] +# start_time = min_max_timestamp.min_timestamp +# end_time = min_max_timestamp.max_timestamp + +# minute_start = start_time +# while minute_start < end_time: +# for i in range(12): +# sec_start = i * 5 +# sec_end = 5 * (i + 1) - 1 +# # Ensure last interval ends at 59 +# if sec_end > 59: +# sec_end = 59 +# interval_start = minute_start.replace(second=sec_start) +# interval_end = minute_start.replace(second=sec_end) +# print(f"Interval: {interval_start} — {interval_end}") +# filter_df = pc_sessions_df.filter(F.col('timestamp').between(F.lit(interval_start), F.lit(interval_end))) +# filter_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.pc_sessions_stream') + +# minute_start += timedelta(minutes=1) + +# COMMAND ---------- + +# console_sessions_df = spark.table('gaming_sessionization_demo.rtm_workload.console_sessions') +# min_max_timestamp = console_sessions_df.agg(F.min('timestamp').alias("min_timestamp"),F.max('timestamp').alias("max_timestamp")).collect()[0] +# start_time = min_max_timestamp.min_timestamp +# end_time = min_max_timestamp.max_timestamp + +# minute_start = start_time +# while minute_start < end_time: +# for i in range(12): +# sec_start = i * 5 +# sec_end = 5 * (i + 1) - 1 +# # Ensure last interval ends at 59 +# if sec_end > 59: +# sec_end = 59 +# interval_start = minute_start.replace(second=sec_start) +# interval_end = minute_start.replace(second=sec_end) +# print(f"Interval: {interval_start} — {interval_end}") +# filter_df = console_sessions_df.filter(F.col('timestamp').between(F.lit(interval_start), F.lit(interval_end))) +# filter_df.repartition(1).write.format('delta').mode('append').saveAsTable('gaming_sessionization_demo.rtm_workload.console_sessions_stream') + +# minute_start += timedelta(minutes=1) + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC ALTER TABLE gaming_sessionization_demo.rtm_workload.pc_sessions_stream DISABLE PREDICTIVE OPTIMIZATION; +# MAGIC ALTER TABLE gaming_sessionization_demo.rtm_workload.console_sessions_stream DISABLE PREDICTIVE OPTIMIZATION; \ No newline at end of file