-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path3.0-Lab-Extract-&-Load-Data.sql
More file actions
159 lines (116 loc) · 6.79 KB
/
Copy path3.0-Lab-Extract-&-Load-Data.sql
File metadata and controls
159 lines (116 loc) · 6.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
-- Databricks notebook source
-- MAGIC %md
-- MAGIC ### Extract and Load Data Lab
-- MAGIC In this lab, you will extract and load raw data from JSON files into a Delta table.
-- MAGIC
-- MAGIC #### Objectives
-- MAGIC - Create an external table to extract data from JSON files
-- MAGIC - Create an empty Delta table with a provided schema
-- MAGIC - Insert records from an existing table into a Delta table
-- MAGIC - Use a CTAS statement to create a Delta table from files
-- MAGIC
-- MAGIC #### 1.0. Import Shared Utilities and Data Files
-- MAGIC This lab will demonstrate ingesting data from a sample of **Apache Kafka** data that's been written to JSON files. Apache Kafka is a distributed event store and stream-processing platform; therefore, this data represents what would be expected from a *Streaming* data source. Each file contains records consumed in a 5-minute interval, stored with the full Kafka schema as a multi-record JSON file.
-- MAGIC
-- MAGIC | field | type | description |
-- MAGIC | --- | --- | --- |
-- MAGIC | key | BINARY | The **`user_id`** field is used as the key; this is a unique alphanumeric field that corresponds to session/cookie information |
-- MAGIC | value | BINARY | This is the full data payload (to be discussed later), sent as JSON |
-- MAGIC | topic | STRING | While the Kafka service hosts multiple topics, only those records from the **`clickstream`** topic are included here |
-- MAGIC | partition | INTEGER | Our current Kafka implementation uses only 2 partitions (0 and 1) |
-- MAGIC | offset | LONG | This is a unique value, monotonically increasing for each partition |
-- MAGIC | timestamp | LONG | This timestamp is recorded as milliseconds since epoch, and represents the time at which the producer appends a record to a partition |
-- MAGIC
-- MAGIC First, run the following cell to import the data and make various utilities available for our experimentation.
-- COMMAND ----------
-- MAGIC %run ./includes/3.0-Lab-setup
-- COMMAND ----------
-- MAGIC %md
-- MAGIC #### 2.0. Extract Raw Events From JSON Files
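-- MAGIC To load this data into Delta properly, we first need to extract the JSON data using the correct schema. Create an external table against JSON files located at the filepath provided below. Name this table **`events_json`** and declare the schema from the table above, listing the columns in alphabetical order (**`key`**, **`offset`**, **`partition`**, **`timestamp`**, **`topic`**, **`value`**), because the validation cell below checks for that order.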
-- COMMAND ----------
-- External table over the raw Kafka JSON files; the schema is declared
-- explicitly, one column per line, in the order the validation cell expects.
CREATE TABLE IF NOT EXISTS events_json (
  key BINARY,
  offset BIGINT,
  partition INT,
  timestamp BIGINT,
  topic STRING,
  value BINARY
)
USING JSON
OPTIONS (path = "${da.paths.datasets}/raw/events-kafka")
-- COMMAND ----------
-- MAGIC %md
-- MAGIC **NOTE**: We'll use Python to run checks occasionally throughout the lab. The following cell will return an error with a message on what needs to change if you have not followed instructions. No output from cell execution means that you have completed this step.
-- COMMAND ----------
-- MAGIC %python
-- MAGIC # Validate events_json: existence, column order, column types, and row count.
-- MAGIC # Existence is checked through the catalog: asserting on spark.table(...) can never
-- MAGIC # show its message, because a DataFrame is always truthy and a missing table raises instead.
-- MAGIC assert "events_json" in [t.name for t in spark.catalog.listTables()], "Table named `events_json` does not exist"
-- MAGIC assert spark.table("events_json").columns == ['key', 'offset', 'partition', 'timestamp', 'topic', 'value'], "Please name the columns in the order provided above"
-- MAGIC assert spark.table("events_json").dtypes == [('key', 'binary'), ('offset', 'bigint'), ('partition', 'int'), ('timestamp', 'bigint'), ('topic', 'string'), ('value', 'binary')], "Please make sure the column types are identical to those provided above"
-- MAGIC
-- MAGIC total = spark.table("events_json").count()
-- MAGIC assert total == 2252, f"Expected 2252 records, found {total}"
-- COMMAND ----------
-- MAGIC %md #### 3.0. Insert Raw Events Into Delta Table
-- MAGIC ##### 3.1. Create an empty managed Delta table named **`events_raw`** using the same schema.
-- COMMAND ----------
-- Empty Delta table declared with the same six columns as events_json.
CREATE TABLE IF NOT EXISTS events_raw (
  key BINARY,
  offset BIGINT,
  partition INT,
  timestamp BIGINT,
  topic STRING,
  value BINARY
)
USING DELTA
-- COMMAND ----------
-- MAGIC %md Run the cell below to confirm the table was created correctly.
-- COMMAND ----------
-- MAGIC %python
-- MAGIC # Validate the freshly created events_raw table: existence, column order, column types, and that it is empty.
-- MAGIC # Existence is checked through the catalog: asserting on spark.table(...) can never
-- MAGIC # show its message, because a DataFrame is always truthy and a missing table raises instead.
-- MAGIC assert "events_raw" in [t.name for t in spark.catalog.listTables()], "Table named `events_raw` does not exist"
-- MAGIC assert spark.table("events_raw").columns == ['key', 'offset', 'partition', 'timestamp', 'topic', 'value'], "Please name the columns in the order provided above"
-- MAGIC assert spark.table("events_raw").dtypes == [('key', 'binary'), ('offset', 'bigint'), ('partition', 'int'), ('timestamp', 'bigint'), ('topic', 'string'), ('value', 'binary')], "Please make sure the column types are identical to those provided above"
-- MAGIC assert spark.table("events_raw").count() == 0, "The table should have 0 records"
-- COMMAND ----------
-- MAGIC %md
-- MAGIC ##### 3.2. Once the extracted data and Delta table are ready, **`insert`** the JSON records from the **`events_json`** table into the new **`events_raw`** Delta table.
-- COMMAND ----------
-- Append every record from events_json to events_raw. Both column lists are
-- explicit so the mapping does not depend on either table's column order.
-- INSERT INTO appends: re-running this cell adds the same rows again.
INSERT INTO events_raw (key, offset, partition, timestamp, topic, value)
SELECT
  src.key,
  src.offset,
  src.partition,
  src.timestamp,
  src.topic,
  src.value
FROM events_json AS src;
-- COMMAND ----------
-- MAGIC %md
-- MAGIC ##### 3.3. Manually review the table contents to ensure data was written as expected.
-- COMMAND ----------
-- Manual review of the loaded rows; columns are listed in the order declared for events_raw.
SELECT
  key,
  offset,
  partition,
  timestamp,
  topic,
  value
FROM events_raw;
-- COMMAND ----------
-- MAGIC %md
-- MAGIC Run the cell below to confirm the data has been loaded correctly.
-- COMMAND ----------
-- MAGIC %python
-- MAGIC # Confirm that events_raw holds 2252 rows and spot-check five timestamp values against a fixed expected set.
-- MAGIC # NOTE(review): limit(5) is applied without an explicit ordering, so the spot-check depends on
-- MAGIC # which five rows are returned first — confirm this is stable for the provided dataset.
-- MAGIC assert spark.table("events_raw").count() == 2252, "The table should have 2252 records"
-- MAGIC assert set(row['timestamp'] for row in spark.table("events_raw").select("timestamp").limit(5).collect()) == {1593880885085, 1593880892303, 1593880889174, 1593880886106, 1593880889725}, "Make sure you have not modified the data provided"
-- COMMAND ----------
-- MAGIC %md #### 4.0. Create Delta Table from a Query
-- MAGIC In addition to new events data, let's also load a small lookup table that provides product details that we'll use later in the course.
-- MAGIC Use a CTAS statement to create a managed Delta table named **`item_lookup`** that extracts data from the **`parquet`** directory provided below.
-- COMMAND ----------
-- CTAS: build the item_lookup Delta table directly from the Parquet directory.
-- CREATE OR REPLACE keeps the cell re-runnable; a plain CREATE TABLE fails on a
-- second run because the table already exists.
-- SELECT * is intentional: the table should mirror whatever columns the source files contain.
CREATE OR REPLACE TABLE item_lookup
USING DELTA
AS
SELECT *
FROM parquet.`${da.paths.datasets}/raw/item-lookup`
-- COMMAND ----------
-- MAGIC %md
-- MAGIC Run the cell below to confirm the lookup table has been loaded correctly.
-- COMMAND ----------
-- MAGIC %python
-- MAGIC # Confirm that item_lookup holds 12 rows and spot-check five item_id values against a fixed expected set.
-- MAGIC # NOTE(review): limit(5) is applied without an explicit ordering, so the spot-check depends on
-- MAGIC # which five rows are returned first — confirm this is stable for the provided dataset.
-- MAGIC assert spark.table("item_lookup").count() == 12, "The table should have 12 records"
-- MAGIC assert set(row['item_id'] for row in spark.table("item_lookup").select("item_id").limit(5).collect()) == {'M_PREM_F', 'M_PREM_K', 'M_PREM_Q', 'M_PREM_T', 'M_STAN_F'}, "Make sure you have not modified the data provided"
-- COMMAND ----------
-- MAGIC %md
-- MAGIC Run the following cell to delete the tables and files associated with this lesson.
-- COMMAND ----------
-- MAGIC %python
-- MAGIC # Remove this lesson's tables and files through the DA helper object.
-- MAGIC # Presumably DA is defined by the ./includes/3.0-Lab-setup notebook run earlier — verify.
-- MAGIC DA.cleanup()