Streaming Data
This chapter so far has explored data preparation methods for batch-driven ETL. You have learned the steps and techniques to get raw data from a source system, transform it into a historical archive, create an analytics layer, and finally do feature engineering and data splitting. We'll now make a switch to streaming data. Many of the concepts you have learned for batch processing are also relevant for stream processing; however, things (data) move a bit more quickly and timing becomes important.
When preparing streaming event data for analytics, for example, to be used in a model, some specific mechanisms come into play. Essentially, a data stream goes through the same steps as raw batch data: it has to be loaded, modeled, cleaned, and filtered. However, a data stream has no beginning and ending, and time is always important; therefore, the following patterns and practices need to be applied:
- Windows
- Event time
- Watermarks
We'll explain these topics in the next sections.
Windows
In many use cases, it's not possible to process all the data of a never-ending stream. In those cases, it is required to split up the data stream into chunks of time. We call these chunks windows of time. To window a data stream means to split it up into manageable pieces, for example, one chunk per hour.
There are several types of windows:
- A tumbling window has a fixed length and does not overlap with other windows. Events (the individual records) in a stream with tumbling windows belong to exactly one window.
- A sliding window has a fixed length but overlaps other sliding windows for a certain duration. Events in a stream with sliding windows can belong to multiple windows, since one window is not yet complete when the next one begins. The window size is the length of the window; the window slide determines how often a new window starts.
- A session window is a window that is defined by the events themselves. The start and end of a session window are determined by data fields, for example, the login and logout actions on a website. It's also possible to define a session window as a series of consecutive events, separated from other sessions by a gap of inactivity. Session windows can overlap, but events in a stream with session windows can only belong to one session.
The following figure displays these kinds of windows in an overview. Each rectangle depicts a window that is evaluated as time progresses. Each circle represents an event that occurred at a certain moment in time. For example, the first tumbling window contains two events, labeled 1 and 2. It's easy to see that sliding windows and session windows are more complex to manage than tumbling windows: events can belong to multiple windows, and it's not always clear to which window an event belongs.
Modern stream processing frameworks such as Apache Spark Structured Streaming and Apache Flink provide built-in mechanisms for windowing, as well as hooks for custom code that let you define your own time windows.
Windowing the data in your stream is not mandatory, but it is often the most practical way to work with an unbounded stream of data.
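To make this concrete, here is a minimal sketch of how a tumbling and a sliding window could be declared in Spark Structured Streaming. The events DataFrame and its event_time column are hypothetical placeholders, not part of the exercise code later in this chapter:
from pyspark.sql.functions import window

# 'events' is assumed to be a streaming DataFrame with an 'event_time' timestamp column
# tumbling window: fixed, non-overlapping chunks of one hour
tumbling_counts = events.groupBy(window('event_time', '1 hour')).count()
# sliding window: one-hour windows that start every 15 minutes, so they overlap
sliding_counts = events.groupBy(window('event_time', '1 hour', '15 minutes')).count()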
Event Time
It's important to realize that events contain multiple timestamps. We can identify three important ones:
- The event time: The timestamp at which the event occurs. For example, the moment a person clicks a button on a web page (say, at 291 milliseconds after 16:45).
- The ingestion time: The timestamp at which the event enters the stream processing software engine. This is slightly later than the event time since the event has to travel across the network and perhaps a few data stores (for example, a message bus such as Kafka). An example of the ingestion time is the moment that the mouse-click of the customer enters the stream processing engine (say, at 302 milliseconds after 16:45).
- The processing time: The timestamp at which the event is evaluated by the software that runs as a job within the stream processing engine. For example, the moment that the mouse-click is compared to the previous mouse-click in a window, to determine whether a customer is moving to another section of a website (say, at 312 milliseconds after 16:45).
In most event data, the event time is included as a data field. For example, money transactions contain the actual date and time at which the transfer was made. This should be fairly accurate and precise, for example, in the order of milliseconds. The processing time is also available in the data stream processor; it's simply the server time, defined by the clock of the infrastructure where the software runs. The ingestion time is rarely recorded and hardly used; it might be included as an extra field in events if you know that the processing of events takes a long time and you want to run performance tests.
When analyzing events, the event time is the most useful timestamp to work with. It's the most accurate indication of when the event actually happened. When the order of events is important, the event time is the only timestamp that guarantees the right order, since network and processing latency can cause events to arrive out of order, as shown in the following figure:
In Figure 3.17 the dark blue circles represent timestamps. The blue arrows indicate the differences in time between the timestamps. For each event, event time always occurs first, followed by the ingestion time, and finally the processing time. Since these latencies can differ per event, as indicated by the highlighted circles, the out-of-order effect can occur.
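To make the gaps between these timestamps concrete, the following small snippet (purely illustrative, not part of the exercise) compares a hypothetical event's event time to the processing time of the machine that handles it:
from datetime import datetime, timezone

# a hypothetical event as it might arrive from a message bus;
# the event time travels inside the payload itself
event = {'user': 'alice', 'action': 'click',
         'event_time': '2021-06-01T16:45:00.291+00:00'}

event_time = datetime.fromisoformat(event['event_time'])
processing_time = datetime.now(timezone.utc)  # the server clock of the processor
latency = processing_time - event_time        # in practice, often just milliseconds
print('processed {:.3f} seconds after the event occurred'
      .format(latency.total_seconds()))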
Late Events and Watermarks
At the end of each window, the events in that window are evaluated. They can be processed as a batch in a similar way as normal ETL processes: filtering, enriching, modeling, aggregating, and so on. But the end of a window is an arbitrary thing. There should be a timed trigger that tells the data stream processor to start evaluating the window. For that trigger, the software could look at the processing time (its server clock). Or, it could look at the event data that comes in and trigger the window evaluation once an event comes in with a timestamp that belongs to the next window. All these methods are slightly flawed; after all, we want to look at the event time, which might be very different from the processing time due to network latency. So, we can never be sure that a trigger is timed well; there might always be events arriving in the data stream that belong to the previous window. They just happened to arrive a bit later. To cater to this, the concept of watermarking was introduced.
A watermark is an event that triggers the calculation of a window. It sets the evaluation time of the window slightly later than the actual end time of the window, to allow late events to arrive. This bit of slack is usually enough to ensure that most events are evaluated in the window where they belong, rather than ending up in the next window or being ignored altogether (see Figure 3.18).
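In Spark Structured Streaming, for example, this slack is declared with withWatermark on the event-time column before the window is defined. The following is a minimal sketch with a hypothetical events DataFrame; you'll apply the same construct to real tweets in the exercise below:
from pyspark.sql.functions import window

# tolerate events that arrive up to 30 seconds late before a window is finalized
counts = events \
    .withWatermark('event_time', '30 seconds') \
    .groupBy(window('event_time', '1 minute')) \
    .count()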
In the next exercise, you'll build a stream processing job with Spark where the concepts of windows, event time, and watermarks are used.
Exercise 3.03: Streaming Data Processing with Spark
In this exercise, we are going to connect to a data stream and process the events. You'll connect to a real Twitter feed and aggregate the incoming data by specifying a window and counting the number of tweets. We'll use the Spark Structured Streaming library, which is rich in functionality and easy to use.
Since we are going to connect to Twitter, you'll need a Twitter developer account:
- Go to https://developer.twitter.com/ and create an account (or log in with your regular Twitter account if you already have one).
- Apply for API access by clicking Apply and selecting Exploring the API as the purpose.
- Fill in the form in which you have to state how you will use the Twitter data.
- After approval, you'll see an app in the Apps menu. Select the new app and navigate to the Keys and tokens tab. Click Generate to generate your access token and secret. Copy these and store them in a safe location, for example, a local file.
- Click Close and also copy the API key and API secret key to the local file. Never store these on the internet or in source code.
- Make sure that Oracle Java 8, 9, or 10 is installed on your system. Java 11 is not supported by Spark yet. Also, make sure that your JAVA_HOME environment variable is set; if not, do so by pointing it to your local Java folder, for example, export JAVA_HOME=/usr/lib/jvm/java-8-oracle.
- We'll use the tweepy library to connect to Twitter. Get it by following the installation instructions in the Preface.
There are two parts to this exercise. First, we will connect to Twitter and get a stream of tweets. We'll write them to a local socket. This is not suitable for production, but it works fine in a local development scenario. In production, it's common to write the raw event data to a message bus such as Kafka.
Perform the following steps to complete the exercise:
- Create a new Python file in your favorite editor (for example, PyCharm or VS Code) or a Jupyter Notebook and name it get_tweets.py or get_tweets.ipynb.
- If you're using Jupyter Notebook, install the tweepy library by entering the following lines in a cell and running it:
import sys
!conda install --yes --prefix {sys.prefix} -c conda-forge tweepy
- If you're using an IDE, make sure that tweepy is installed by typing the following command in your Anaconda console or shell:
pip install tweepy
- Connect to the Twitter API by entering the following code:
import socket
import tweepy
from tweepy import OAuthHandler
# TODO: replace the tokens and secrets with your own Twitter API values
ACCESS_TOKEN = ''
ACCESS_SECRET = ''
CONSUMER_KEY = ''
CONSUMER_SECRET = ''
Replace the values in the strings with the keys and secrets that you wrote down in the preparation part of this exercise.
- To connect to Twitter, the tweepy library first requires us to log in with the Oauth credentials that are in the keys and secrets. Let's create a dedicated method for this:
def connect_to_twitter():
    auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    api = tweepy.API(auth)
- Now let's handle the events by adding some lines to the connect_to_twitter function. Put the following lines in the cell with the connect_to_twitter function:
    my_stream_listener = MyStreamListener()
    my_stream = tweepy.Stream(auth=api.auth, listener=my_stream_listener)
    # select a (limited) tweet stream
    my_stream.filter(track=['#AI'])
The connect_to_twitter function now authenticates us with the Twitter API, hooks up the MyStreamListener event handler (which sends the data to the socket), and starts receiving a stream of tweets filtered on the #AI search keyword. This will produce a limited stream of events, since the Twitter API only passes through a small subset of the actual tweets.
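For reference, the assembled function, pieced together from the two snippets above, looks like this:
def connect_to_twitter():
    # authenticate at the Twitter API with the OAuth credentials defined above
    auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    api = tweepy.API(auth)
    # hook up the listener that forwards tweets to the local socket
    my_stream_listener = MyStreamListener()
    my_stream = tweepy.Stream(auth=api.auth, listener=my_stream_listener)
    # select a (limited) tweet stream
    my_stream.filter(track=['#AI'])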
- Next, the tweepy library needs us to inherit from the StreamListener class. We have to specify what to do when a tweet comes in by overriding the on_data() function. In our case, we send all data (the entire tweet in JSON format) to the local socket:
class MyStreamListener(tweepy.StreamListener):
    def on_error(self, status_code):
        if status_code == 420:
            return False

    def on_data(self, data):
        print(data)
        # send the entire tweet to the socket on localhost where pySpark is listening
        client_socket.sendall(bytes(data, encoding='utf-8'))
        return True
We will now set up the local socket onto which the tweets will be written. Python has a built-in library called socket for this, which we already imported at the top of our file.
- Write the following lines to set up a socket at port 1234:
s = socket.socket()
s.bind(("localhost", 1234))
print("Waiting for connection...")
s.listen(1)
client_socket, address = s.accept()
print("Received request from: " + str(address))
- The final step is to call the connect_to_twitter method at the end of the file. We are still editing the same file in the IDE; add the following lines at the end:
# now that we have a connection to pySpark, connect to Twitter
connect_to_twitter()
- Run the get_tweets.py file from a Terminal (macOS or Linux), Command Prompt (Windows), or Anaconda Prompt using the following command:
python get_tweets.py
You should get the following output:
The first part is now done. With this file running, the socket is set up and waiting for a caller; the connection to Twitter will be made as soon as a client connects. If all is well, the Python job shows no output other than Waiting for connection…. Since the socket has to respond to a caller, we now have to create a socket client that connects to the same port.
The next part is to connect a Spark job to the socket on localhost (or a Kafka topic in production), get the tweets as a data stream, and perform a window operation on them.
- Create a new Python file or Jupyter Notebook and name it spark_twitter.py or spark_twitter.ipynb.
Note
The spark_twitter Jupyter Notebook can be found here:
- If you have done Exercise 3.02, Building an ETL Job Using Spark, PySpark is already installed on your local machine. If not, install PySpark with the following lines:
import sys
!conda install --yes --prefix {sys.prefix} \
-c conda-forge pyspark
- We first have to import the required PySpark functions and types; after that, we'll connect to a Spark cluster or a local instance. Enter the following lines in the file, notebook, or Python shell:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, \
window, to_timestamp
from pyspark.sql.types import StructType, \
StructField, StringType
- Enter the following line to create a Spark session:
spark = SparkSession.builder.appName('Packt').getOrCreate()
- To connect to the socket on localhost, enter the following line:
raw_stream = spark.readStream.format('socket')\
.option('host', 'localhost')\
.option('port', 1234).load()
Now, we have a stream of raw strings. These should be converted from JSON to a useful format, namely, a text field with the tweet text and a timestamp field that contains the event time. Let's do that in the next step.
- We'll define the JSON schema and add the string format that Twitter uses for its timestamps:
tweet_datetime_format = 'EEE MMM dd HH:mm:ss ZZZZ yyyy'
schema = StructType([StructField('created_at', StringType(), True),
                     StructField('text', StringType(), True)])
- We can now convert the JSON strings with the from_json PySpark function:
tweet_stream = raw_stream.select(from_json('value', schema).alias('tweet'))
- The created_at field is still a string, so we have to convert it to a timestamp with the to_timestamp function:
timed_stream = tweet_stream\
.select(to_timestamp('tweet.created_at', \
tweet_datetime_format)\
.alias('timestamp'),\
'tweet.text')
- At this moment, you might want to check whether you're receiving the tweets and parsing them correctly. Add the following code:
query = timed_stream.writeStream.outputMode('append')\
.format('console').start()
query.awaitTermination()
Note
The script is not complete yet; we'll extend it in the next steps.
- Run the file from a second Terminal (macOS or Linux), Command Prompt (Windows), or Anaconda Prompt, while get_tweets.py is still running, using the following command:
python spark_twitter.py
You should get output similar to this:
- If all is fine, let's remove the temporary output code we just added (the writeStream query and the awaitTermination() call) and continue with the windowing function. We can create a sliding window of 1 minute with a slide of 10 seconds with the following statement:
windowed = timed_stream \
.withWatermark('timestamp', '2 seconds') \
.groupBy(window('timestamp', '1 minute', '10 seconds'))
As you can see, the code also contains a watermark that ensures that we have a slack of 2 seconds before the window evaluates. Now that we have a windowed stream, we have to specify the evaluation function of the window. In our case, this is a simple count of all the tweets in the window.
- Enter the following code to count all the tweets in the window:
counts_per_window = windowed.count().orderBy('window')
There are two more lines needed to get our stream running. First, we have to specify the output mode and the sink for the stream. In many cases, the sink will be a Kafka topic again, or a database table; in this exercise, we'll just output the stream to the console. The start() call begins executing the stream, and awaitTermination() keeps the job running until it is stopped:
query = counts_per_window.writeStream.outputMode('complete')\
.format('console').option("truncate", False).start()
query.awaitTermination()
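If you were writing to a Kafka topic instead of the console, as mentioned above, the sink could be swapped out roughly as follows. This is only a sketch: the broker address, topic name, and checkpoint path are placeholders, and it requires the spark-sql-kafka connector package to be available to Spark:
# hypothetical Kafka sink instead of the console sink
query = counts_per_window \
    .selectExpr('to_json(struct(*)) AS value') \
    .writeStream.outputMode('complete') \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('topic', 'tweet_counts') \
    .option('checkpointLocation', '/tmp/tweet_counts_checkpoint') \
    .start()
query.awaitTermination()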
Now we have created a windowed stream of tweets, where for each window of 1 minute we count the total number of tweets that come in with the hashtag #AI. The final output will be similar to this:
As you are experiencing now, your code runs in an infinite loop. The stream never ends. But, of course, the processing of the stream on your local machine can be stopped. To stop the stream processor, simply press Ctrl + C.
Note
To access the source code for this specific section, please refer to https://packt.live/2BYZEXE.
In this exercise, you have used Spark Structured Streaming to analyze a live Twitter feed. In the next activity, you'll continue to work with this framework to analyze the tweets and count the words in a certain time window.
Activity 3.02: Counting the Words in a Twitter Data Stream to Determine the Trending Topics
In this activity, you'll connect to Twitter in the same way as in Exercise 3.03, Streaming Data Processing with Spark. You can reuse the get_tweets.py or get_tweets.ipynb file that we created earlier. This time, however, your goal is to group and count the words within the specified time window, rather than counting the total number of tweets. In this way, you can create an overview of the trending topics per time window.
Note
The code can be found here: https://packt.live/3iX0ODx.
Perform the following steps to complete the activity:
- Create a new Python file and name it spark_twitter.py.
- Write the Python code for the required imports and connect to a Spark cluster or a local instance with the SparkSession.builder object.
- Connect to the socket on the localhost and create the raw data stream by calling the readStream function of a SparkSession object.
- Convert the raw event strings from JSON to a useful format, namely a text field with the tweet text and a timestamp field that contains the event time. Do this by specifying the date-time format and the JSON schema and using the from_json PySpark function.
- Convert the field that contains the event time to a timestamp with the to_timestamp function.
- Split the text of the tweets into words by using the explode and split functions.
Hint: the Spark Structured Streaming tutorial contains an example of how to do this; a minimal sketch is also shown after this list of steps.
- Create a tumbling window of 10 minutes with groupBy(window(…)). Make sure to group by two fields: the window and the word.
- Add a watermark that ensures that we have a slack of 1 minute before the window evaluates.
- Specify the evaluation function of the window: a count of all the words in the window.
- Send the output of the stream to the console and start executing the stream with the awaitTermination function.
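As a starting point for the word-splitting step, a minimal sketch (not the full solution, which is on page 592) could look like this; it reuses the timed_stream DataFrame from the exercise:
from pyspark.sql.functions import explode, split

# one output row per word, keeping the event-time column for windowing
words = timed_stream.select('timestamp',
                            explode(split('text', ' ')).alias('word'))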
You should get the following output:
Note
The solution to this activity can be found on page 592.