How to build stateful streaming applications with Apache Flink

Take advantage of Flink’s DataStream API, ProcessFunctions, and SQL support to build event-driven or streaming analytics applications

Apache Flink is a framework for implementing stateful stream processing applications and running them at scale on a compute cluster. In a previous article we examined what stateful stream processing is, what use cases it addresses, and why you should implement and run your streaming applications with Apache Flink.

In this article, I will present examples for two common use cases of stateful stream processing and discuss how they can be implemented with Flink. The first use case is event-driven applications, i.e., applications that ingest continuous streams of events and apply some business logic to these events. The second is the streaming analytics use case, where I will present two analytical queries implemented with Flink’s SQL API, which aggregate streaming data in real time. We at Data Artisans provide the source code of all of our examples in a public GitHub repository.

Before we dive into the details of the examples, I will introduce the event stream that is ingested by the example applications and explain how you can run the code that we provide.

A stream of taxi ride events

Our example applications are based on a public data set about taxi rides that happened in New York City in 2013. The organizers of the 2015 DEBS (ACM International Conference on Distributed Event-Based Systems) Grand Challenge rearranged the original data set and converted it into a single CSV file from which we are reading the following nine fields.

  • Medallion—an MD5 sum id of the taxi
  • Hack_license—an MD5 sum id of the taxi license
  • Pickup_datetime—the time when passengers were picked up
  • Dropoff_datetime—the time when passengers were dropped off
  • Pickup_longitude—the longitude of the pick-up location
  • Pickup_latitude—the latitude of the pick-up location
  • Dropoff_longitude—the longitude of the drop-off location
  • Dropoff_latitude—the latitude of the drop-off location
  • Total_amount—total paid in dollars

The CSV file stores the records in ascending order of their drop-off time attribute. Hence, the file can be treated as an ordered log of events that were published when a trip ended. In order to run the examples that we provide on GitHub, you need to download the data set of the DEBS challenge from Google Drive.

All example applications sequentially read the CSV file and ingest it as a stream of taxi ride events. From there on, the applications process the events just like any other stream, i.e., like a stream that is ingested from a log-based publish-subscribe system, such as Apache Kafka or Kinesis. In fact, reading a file (or any other type of persisted data) and treating it as a stream is a cornerstone of Flink’s approach to unifying batch and stream processing.
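To make the ingestion step concrete, here is a minimal plain-Java sketch of turning one CSV line into a ride event. It is not the parser from our repository. It assumes that a line contains exactly the nine fields listed above, comma-separated and in that order, that timestamps use the pattern yyyy-MM-dd HH:mm:ss, and that they can be read as UTC. The class name TaxiRideParser and its field names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical parser, not the one from the example repository.
// Assumes each line holds exactly the nine fields listed above, in that order.
final class TaxiRideParser {

  // Simplified stand-in for a taxi ride event.
  static final class TaxiRide {
    final String medallion;
    final String licenseId;
    final long pickUpTime;    // epoch milliseconds
    final long dropOffTime;   // epoch milliseconds
    final double pickUpLon;
    final double pickUpLat;
    final double dropOffLon;
    final double dropOffLat;
    final double totalAmount; // dollars

    TaxiRide(String medallion, String licenseId, long pickUpTime, long dropOffTime,
             double pickUpLon, double pickUpLat, double dropOffLon, double dropOffLat,
             double totalAmount) {
      this.medallion = medallion;
      this.licenseId = licenseId;
      this.pickUpTime = pickUpTime;
      this.dropOffTime = dropOffTime;
      this.pickUpLon = pickUpLon;
      this.pickUpLat = pickUpLat;
      this.dropOffLon = dropOffLon;
      this.dropOffLat = dropOffLat;
      this.totalAmount = totalAmount;
    }
  }

  // Assumed timestamp pattern of the input file.
  private static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

  // Assumption: timestamps are interpreted as UTC to keep the sketch simple.
  static long toEpochMillis(String text) {
    return LocalDateTime.parse(text, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
  }

  // Splits one line and converts each field to its typed representation.
  static TaxiRide parse(String line) {
    String[] f = line.split(",");
    if (f.length != 9) {
      throw new IllegalArgumentException("Expected 9 fields but found " + f.length);
    }
    return new TaxiRide(f[0], f[1], toEpochMillis(f[2]), toEpochMillis(f[3]),
        Double.parseDouble(f[4]), Double.parseDouble(f[5]),
        Double.parseDouble(f[6]), Double.parseDouble(f[7]),
        Double.parseDouble(f[8]));
  }
}
```

Because the file is ordered by drop-off time, applying such a parser line by line yields events in the order in which they would have been published.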

Running the Flink examples

As mentioned earlier, we published the source code of our example applications in a GitHub repository. We encourage you to fork and clone the repository. The examples can be easily executed from within your IDE of choice; you don’t need to set up and configure a Flink cluster to run them. First, import the source code of the examples as a Maven project. Then, execute the main class of an application and provide the storage location of the data file (see above for the link to download the data) as a program parameter.

Once you have launched an application, it will start a local, embedded Flink instance inside the application’s JVM process and submit the application to execute it. You will see a bunch of log statements while Flink is starting and the job’s tasks are being scheduled. Once the application is running, its output will be written to the standard output.

Building an event-driven application in Flink

Now, let’s discuss our first use case, which is an event-driven application. Event-driven applications ingest streams of events, perform computations as the events are received, and may emit new events or trigger external actions. Multiple event-driven applications can be composed by connecting them together via event log systems, similar to how large systems can be composed from microservices. Event-driven applications, event logs, and application state snapshots (known as savepoints in Flink) comprise a very powerful design pattern because you can reset their state and replay their input to recover from a failure, to fix a bug, or to migrate an application to a different cluster.

In this article we will examine an event-driven application backing a service that monitors the working hours of taxi drivers. In 2016, the NYC Taxi and Limousine Commission decided to restrict the working hours of taxi drivers to 12-hour shifts and require a break of at least eight hours before the next shift may be started. A shift starts with the beginning of the first ride. From then on, a driver may start new rides within a window of 12 hours. Our application tracks the rides of drivers, marks the end time of their 12-hour window (i.e., the time when they may start the last ride), and flags rides that violated the regulation. You can find the full source code of this example in our GitHub repository.

Our application is implemented with Flink’s DataStream API and a KeyedProcessFunction. The DataStream API is a functional API based on the concept of typed data streams. A DataStream<T> is the logical representation of a stream of events of type T. A stream is processed by applying a function to it that produces another data stream, possibly of a different type. Flink processes streams in parallel by distributing events to stream partitions and applying different instances of functions to each partition.

The following code snippet shows the high-level flow of our monitoring application.

// ingest stream of taxi rides.
DataStream<TaxiRide> rides = TaxiRides.getRides(env, inputPath);

DataStream<Tuple2<String, String>> notifications = rides
   // partition stream by the driver’s license id
   .keyBy(r -> r.licenseId)
   // monitor ride events and generate notifications
   .process(new MonitorWorkTime());

// print notifications
notifications.print();

The application starts ingesting a stream of taxi ride events. In our example, the events are read from a text file, parsed, and stored in TaxiRide POJO objects. A real-world application would typically ingest the events from a message queue or event log, such as Apache Kafka or Pravega. The next step is to key the TaxiRide events by the licenseId of the driver. The keyBy operation partitions the stream on the declared field, such that all events with the same key are processed by the same parallel instance of the following function. In our case, we partition on the licenseId field because we want to monitor the working time of each individual driver.
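The effect of keying can be illustrated with a small plain-Java sketch. It is a deliberately simplified model and not Flink’s actual assignment logic, which is more involved internally. The class and method names are hypothetical. The only point is that the target instance is a pure function of the key, so two events with the same licenseId always end up at the same parallel instance.

```java
// Simplified model of key-based partitioning. This is NOT how Flink
// computes the assignment internally; it only shows the core property.
final class KeyPartitioningSketch {

  // Picks a parallel instance from the key's hash. floorMod keeps the
  // result in [0, parallelism) even for negative hash codes.
  static int instanceFor(String key, int parallelism) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  public static void main(String[] args) {
    int parallelism = 4;
    // The same license ID is routed to the same instance every time.
    for (String licenseId : new String[] {"driver-A", "driver-B", "driver-A"}) {
      System.out.println(licenseId + " -> instance " + instanceFor(licenseId, parallelism));
    }
  }
}
```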

Next, we apply the MonitorWorkTime function on the partitioned TaxiRide events. The function tracks the rides per driver and monitors their shifts and break times. It emits events of type Tuple2<String, String>, where each tuple represents a notification consisting of the license ID of the driver and a message. Finally, our application emits the messages by printing them to the standard output. A real-world application would write the notifications to an external message or storage system, like Apache Kafka, HDFS, or a database system, or would trigger an external call to immediately push them out.

Now that we’ve discussed the overall flow of the application, let’s have a look at the MonitorWorkTime function, which contains most of the application’s actual business logic. The MonitorWorkTime function is a stateful KeyedProcessFunction that ingests TaxiRide events and emits Tuple2<String, String> records. The KeyedProcessFunction class features two methods to process data: processElement() and onTimer(). The processElement() method is called for each arriving event. The onTimer() method is called when a previously registered timer fires. The following snippet shows the skeleton of the MonitorWorkTime function and everything that is declared outside of the processing methods.

public static class MonitorWorkTime
    extends KeyedProcessFunction<String, TaxiRide, Tuple2<String, String>> {

  // time constants in milliseconds
  private static final long ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 hours
  private static final long REQ_BREAK_TIME = 8 * 60 * 60 * 1000;     // 8 hours
  private static final long CLEAN_UP_INTERVAL = 24 * 60 * 60 * 1000; // 24 hours

  private transient DateTimeFormatter formatter;

  // state handle to store the starting time of a shift
  ValueState<Long> shiftStart;

  @Override
  public void open(Configuration conf) {
    // register state handle
    shiftStart = getRuntimeContext().getState(
      new ValueStateDescriptor<>("shiftStart", Types.LONG));
    // initialize time formatter
    this.formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
  }

  // processElement() and onTimer() are discussed in detail below.
}

The function declares a few constants for time intervals in milliseconds, a time formatter, and a state handle for keyed state that is managed by Flink. Managed state is periodically checkpointed and automatically restored in case of a failure. Keyed state is organized per key, which means that a function will maintain one value per handle and key. In our case, the MonitorWorkTime function maintains a Long value for each key, i.e., for each licenseId. The shiftStart state stores the starting time of a driver’s shift. The state handle is initialized in the open() method, which is called once before the first event is processed.
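What “one value per handle and key” means can be sketched in plain Java. The following class is a hypothetical in-memory model, not Flink’s ValueState implementation, and it leaves out checkpointing and recovery entirely. It shows only the scoping: reads and writes go to whichever key is currently active.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, in-memory model of keyed value state. This is NOT Flink's
// ValueState implementation; all names here are hypothetical.
final class KeyedValueStateSketch<K, V> {
  private final Map<K, V> valuesByKey = new HashMap<>();
  private K currentKey;

  // In this model, the runtime would call this before invoking the function.
  void setCurrentKey(K key) {
    this.currentKey = key;
  }

  // Returns the value stored for the current key, or null if there is none.
  V value() {
    return valuesByKey.get(currentKey);
  }

  // Stores a value for the current key only.
  void update(V newValue) {
    valuesByKey.put(currentKey, newValue);
  }

  // Removes the value of the current key only.
  void clear() {
    valuesByKey.remove(currentKey);
  }
}
```

In this model, the function code never mentions the key. That mirrors how shiftStart.value() in MonitorWorkTime returns the shift start of the driver whose event is being processed.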

Now, let’s have a look at the processElement() method.

@Override
public void processElement(
    TaxiRide ride,
    Context ctx,
    Collector<Tuple2<String, String>> out) throws Exception {

  // look up start time of the last shift
  Long startTs = shiftStart.value();

  if (startTs == null ||
    startTs < ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // this is the first ride of a new shift.
    startTs = ride.pickUpTime;
    shiftStart.update(startTs);
    long endTs = startTs + ALLOWED_WORK_TIME;
    out.collect(Tuple2.of(ride.licenseId,
      "You are allowed to accept new passengers until " + formatter.print(endTs)));

    // register timer to clean up the state in 24h
    ctx.timerService().registerEventTimeTimer(startTs + CLEAN_UP_INTERVAL);
  } else if (startTs < ride.pickUpTime - ALLOWED_WORK_TIME) {
    // this ride started after the allowed work time ended.
    // it is a violation of the regulations!
    out.collect(Tuple2.of(ride.licenseId,
      "This ride violated the working time regulations."));
  }
}

The processElement() method is called for each TaxiRide event. First, the method fetches the start time of the driver’s shift from the state handle. If the state does not contain a start time (startTs == null) or if the last shift started more than 20 hours (ALLOWED_WORK_TIME + REQ_BREAK_TIME) earlier than the current ride, the current ride is the first ride of a new shift. In either case, the function starts a new shift by updating the start time of the shift to the start time of the current ride, emits a message to the driver with the end time of the new shift, and registers a timer to clean up the state in 24 hours.

If the current ride is not the first ride of a new shift, the function checks if it violates the working time regulation, i.e., whether it started more than 12 hours later than the start of the driver’s current shift. If that is the case, the function emits a message to inform the driver about the violation.
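The two comparisons are easy to check in isolation. The sketch below restates them as a pure function in plain Java, without any Flink dependency. The class ShiftRules and the Outcome enum are hypothetical names introduced for illustration; the conditions themselves are the ones used in processElement().

```java
// Plain-Java restatement of the decision logic in processElement().
// ShiftRules and Outcome are hypothetical names used for illustration.
final class ShiftRules {
  static final long HOUR = 60L * 60 * 1000;
  static final long ALLOWED_WORK_TIME = 12 * HOUR;
  static final long REQ_BREAK_TIME = 8 * HOUR;

  enum Outcome { NEW_SHIFT, WITHIN_SHIFT, VIOLATION }

  // shiftStart is null if no shift start is stored for the driver.
  static Outcome classify(Long shiftStart, long pickUpTime) {
    if (shiftStart == null
        || shiftStart < pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {
      // No known shift, or the last shift started more than 20 hours ago.
      return Outcome.NEW_SHIFT;
    } else if (shiftStart < pickUpTime - ALLOWED_WORK_TIME) {
      // The ride started more than 12 hours after the shift began.
      return Outcome.VIOLATION;
    }
    return Outcome.WITHIN_SHIFT;
  }
}
```

For a shift that started at time 0, a ride picked up after 11 hours is within the shift, a ride after 13 hours is a violation, and a ride after 21 hours starts a new shift. Both comparisons are strict, so a ride picked up exactly 12 hours after the shift start is still allowed.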

The processElement() method of the MonitorWorkTime function registers a timer to clean up the state 24 hours after the start of a shift. Removing state that is no longer needed is important to prevent growing state sizes due to leaking state. A timer fires when the time of the application passes the timer’s timestamp. At that point, the onTimer() method is called. Similar to state, timers are maintained per key, and the function is put into the context of the associated key before the onTimer() method is called. Hence, all state access is directed to the key that was active when the timer was registered.

Let’s have a look at the onTimer() method of MonitorWorkTime.

@Override
public void onTimer(
    long timerTs,
    OnTimerContext ctx,
    Collector<Tuple2<String, String>> out) throws Exception {

  // remove the shift state if no new shift was started already.
  Long startTs = shiftStart.value();
  if (startTs == timerTs - CLEAN_UP_INTERVAL) {
    shiftStart.clear();
  }
}

The processElement() method registers timers for 24 hours after a shift started to clean up state that is no longer needed. Cleaning up the state is the only logic that the onTimer() method implements. When a timer fires, we check if the driver started a new shift in the meantime, i.e., whether the shift starting time changed. If that is not the case, we clear the shift state for the driver.
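The interplay of shift starts and clean-up timers can be traced with a small simulation for a single driver. This is a plain-Java sketch under simplifying assumptions, not Flink code: time is advanced manually, a timer fires as soon as the simulated time reaches its timestamp, and the class CleanUpTimerSketch with its methods is hypothetical.

```java
import java.util.TreeSet;

// Simulates the shift state and clean-up timers of ONE driver.
// Hypothetical model; Flink manages state and timers per key for us.
final class CleanUpTimerSketch {
  static final long HOUR = 60L * 60 * 1000;
  static final long ALLOWED_WORK_TIME = 12 * HOUR;
  static final long REQ_BREAK_TIME = 8 * HOUR;
  static final long CLEAN_UP_INTERVAL = 24 * HOUR;

  private Long shiftStart;                          // null means "no state"
  private final TreeSet<Long> timers = new TreeSet<>();

  // Mirrors the new-shift branch of processElement().
  void onRide(long pickUpTime) {
    if (shiftStart == null
        || shiftStart < pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {
      shiftStart = pickUpTime;
      timers.add(pickUpTime + CLEAN_UP_INTERVAL);
    }
  }

  // Fires all timers up to the given time, in timestamp order.
  void advanceTimeTo(long time) {
    while (!timers.isEmpty() && timers.first() <= time) {
      long timerTs = timers.pollFirst();
      // Mirrors onTimer(): clear only if no newer shift replaced the state.
      if (shiftStart != null && shiftStart == timerTs - CLEAN_UP_INTERVAL) {
        shiftStart = null;
      }
    }
  }

  Long shiftStart() {
    return shiftStart;
  }
}
```

Suppose a driver starts shifts at hour 0 and at hour 21. The first timer fires at hour 24 and finds a shift start of hour 21, so it leaves the state alone. The second timer fires at hour 45, matches the stored shift start, and clears it.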
