Author

皇家理工学院

Paris Carbone1 Gyula Fora ´
2 Stephan Ewen3 Seif Haridi1,2 Kostas Tzoumas3
1KTH Royal Institute of Technology - {parisc,haridi}@kth.se
2Swedish Institute of Computer Science - {gyfora, seif}@sics.se
3Data Artisans GmbH - {stephan, kostas}@data-artisans.com

Abstract

Distributed stateful stream processing enables the deployment and execution of large scale continuous omputations in the cloud, targeting both low latency and high throughput. One of the most fundamental challenges of this paradigm is providing processing guarantees under potential failures. Existing approaches rely on periodic global state snapshots that can be used for failure recovery. Those approaches suffer from two main drawbacks. First, they often stall the overall computation which impacts ingestion. Second, they eagerly persist all records in transit along with the operation states which results in larger snapshots than required. In this work we propose Asynchronous Barrier Snapshotting (ABS), a lightweight algorithm suited for modern dataflow execution engines that minimises space requirements. ABS persists only operator states on acyclic execution topologies while keeping a minimal record log on cyclic dataflows. We implemented ABS on Apache Flink, a distributed analytics engine that supports stateful stream processing. Our evaluation shows that our algorithm does not have a heavy impact on the execution, maintaining linear scalability and performing well with frequent snapshots.

Keywords fault tolerance, distributed computing, stream processing, dataflow, cloud computing, state management

1. Introduction

Distributed dataflow processing is an emerging paradigm for data intensive computing which allows continuous computations on data in high volumes, targeting low end-to-end latency while guaranteeing high throughput. Several time-critical applications could benefit from dataflow processing systems such as Apache Flink [1] and Naiad [11], especially in the domains of real-time analysis (e.g. predictive analytics and complex event processing). Fault tolerance is of paramount importance in such systems, as failures cannot be afforded in most real-world use cases. Currently known approaches that guarantee exactly-once semantics on stateful processing systems rely on global, consistent snapshots of the execution state. However, there are two main drawbacks that make their application inefficient for real-time stream processing. Synchronous snapshotting techniques stop the overall execution of a distributed computation in order to obtain a consistent view of the overall state. Furthermore, to our knowledge all of the existing algorithms for distributed snapshots include records that are in transit in channels or unprocessed messages throughout the execution graph as part of the snapshotted state. Most often this includes state that is larger than required.
In this work, we focus on providing lightweight snapshotting, specifically targeted at distributed stateful dataflow systems, with low impact on performance. Our solution provides asynchronous state snapshots with low space costs that contain only operator states in acyclic execution topologies. Additionally, we cover the case of cyclic execution graphs by applying downstream backup on selected parts of the topology while keeping the snapshot state to minimum. Our technique does not halt the streaming operation and it only introduces a small runtime overhead. The contributions of this paper can be summarised as follows:

  • We propose and implement an asynchronous snapshotting algorithm that achieves minimal snapshots on acyclic execution graphs.
  • We describe and implement a generalisation of our algorithm that works on cyclic execution graphs.
  • We show the benefits of our approach compared to the state-of-the-art using Apache Flink Streaming as a base system for comparisons.

The rest of the paper is organised as follows: Section 2 gives an overview of existing approaches for distributed global snapshots in stateful dataflow systems. Section 3 provides an overview of the Apache Flink processing and execution model followed by Section 4 where we describe our main approach to global snapshotting in detail. Our recovery scheme is described briefly in Section 5. Finally, Section 6 summarises our implementation followed by our evaluation in Section 7 and future work and conclusion in Section 8.

Several recovery mechanisms have been proposed during the last decade for systems that do continuous processing [4, 11]. Systems that emulate continuous processing into stateless distributed batch computations such as Discretized Streams and Comet [6, 15] rely on state recomputation. On the other hand, stateful dataflow systems such as Naiad, SDGs, Piccolo and SEEP [3, 5, 11, 12] , which are our main focus in this work, use checkpointing to obtain consistent snapshots of the global execution for failure recovery.

The problem of consistent global snapshots in distributed environments, as introduced by Chandy and Lamport [4], has been researched extensively throughout the last decades [4, 7, 8]. A global snapshot theoretically reflects the overall state of an execution, or a possible state at a specific instance of its operation. A simple but costly approach employed by Naiad [11] is to perform a synchronous snapshot in three steps: first halting the overall computation of the execution graph, then performing the snapshot and finally instructing each task to continue its operation once the global snapshot is complete. This approach has a high impact on both throughput and space requirements due to the need to block the whole computation, while also relying on upstream backup that logs emitted records at the producer side. Another popular approach, originally proposed by Chandy and Lamport [4], that is deployed in many systems today is to perform snapshots asynchronously while eagerly doing upstream backup [4, 5, 10]. This is achieved by distributing markers throughout the execution graph that trigger the persistence of operator and channel state. This approach though still suffers from additional space requirements due to the need of an upstream backup and as a result higher recovery times caused by the reprocessing of backup records. Our approach extends the original asynchronous snapshotting idea of Chandy and Lamport, however, it considers no backup logging of records for acyclic graphs while also keeping very selective backup records on cyclic execution graphs.

Our current work is guided by the need for fault tolerance on Apache Flink Streaming, a distributed stream analytics system that is part of the Apache Flink Stack (former Stratosphere [2]). Apache Flink is architectured around a generic runtime engine uniformly processing both batch and streaming jobs composed of stateful interconnected tasks. Analytics jobs in Flink are compiled into directed graphs of tasks. Data elements are fetched from external sources and routed through the task graph in a pipelined fashion. Tasks are continuously manipulating their internal state based on the received inputs and are generating new outputs.

3.1 The Streaming Programming Model

The Apache Flink API for stream processing allows the composition of complex streaming analytics jobs by exposing unbounded partitioned data streams (partially ordered sequences of records) as its core data abstraction, called DataStreams. DataStreams can be created from external sources (e.g. message queues, socket streams, custom generators) or by invoking operations on other DataStreams. DataStreams support several operators such as map, filter and reduce in the form of higher order functions that are applied incrementally per record and generate new DataStreams. Every operator can be parallelised by placing parallel instances to run on different partitions of the respective stream, thus, allowing the distributed execution of stream transformations.

The code example in 1 shows how to implement a simple incremental word count in Apache Flink. In this program words are read from a text file and the current count for each word is printed to the standard output. This is a stateful streaming program as sources need to be aware of their current file offset and counters need to maintain the current count for each word as their internal state.

Figure 1: The execution graph for incremental word count

Figure 1: The execution graph for incremental word count

1
2
3
4
5
6
val env : StreamExecutionEnvironment = ...
env.setParallelism(2)

val wordStream = env.readTextFile(path)
val countStream = wordStream.groupBy(_).count
countStream.print

Example 1: Incremental Word Count

3.2 Distributed Dataflow Execution

When a user executes an application all DataStream operators compile into an execution graph that is in principle a directed graph $G=(T, E)$ , similarly to Naiad [11] where vertices $T$ represent tasks and edges $E$ represent data channels between tasks. An execution graph is depicted in Fig. 1 for the incremental word count example. As shown, every instance of an operator is encapsulated on a respective task. Tasks can be further classified as sources when they have no input channels and sinks when no output channels are set. Furthermore, $M$ denotes the set of all records transferred by tasks during their parallel execution. Each task $t\in T$ encapsulates the independent execution of an operator instance and is composed of the following: (1) a set of input and output channels: $I_t, O_t \subseteq E$; (2) an operator state $s_t$ and (3) a user defined function (UDF) $f_t$. Data ingestion is pull-based : during its execution each task consumes input records, updates its operator state and generates new records according to its user defined function.

More specifically, for each record $r\in M$ received by a task $t\in T$ a new state $s^{’}_t$

0
t
is produced along
with a set of output records D ⊆ M according to its
UDF ft
st
,r 7→ hs
0
t
,Di.

$$
fafas
$$

$\dot{a}$