
A Year of Blink at Alibaba: Apache Flink in Large Scale Production


Click to learn more about author Xiaowei Jiang.

It has been a great year for Blink, our fork of Apache Flink®, at Alibaba. We went into production with Blink about a year ago, and since then we have used it to make real-time updates to listings in search products such as Taobao, Tmall, and AliExpress. We’ve also used it to update features and personalize search results in real time. Singles’ Day (Nov 11th) is by far the largest shopping holiday in China, with lots of products on sale for 50% off. It recorded $17.8 billion in gross merchandise volume (GMV) in one day. User behavior on that day is quite different from usual and changes frequently, so real-time training becomes important. But the massive real-time data processing required presents a daunting engineering challenge. Fortunately, we were able to use Blink to train and update our ranking models in real time. This alone increased our purchase conversion rate in search by around 30%!

Our team is responsible for all the data processing for search products at Alibaba, and we started looking into Flink almost two years ago. A search product like ours usually includes a daily batch job that produces all the search documents at once, as well as a streaming job that delivers incremental updates to the search engine. Previously, we used Map/Reduce for the daily snapshots and an internally developed streaming framework for the incremental updates. There were a few challenges with this approach. We needed to maintain two codebases, one for the daily job and one for the incremental update job, which meant more work and reduced productivity. Keeping the two jobs logically consistent was also a non-trivial problem. In addition, it was difficult to make sure that we processed each update exactly once. And of course, scalability and performance are always huge challenges at Taobao’s scale.


While looking for ways to solve these problems, we realized that we needed a compute engine that is capable of unified batch and stream processing, provides exactly-once semantics, and is able to scale up to thousands of machines. There are two common ways to unify batch and stream. We can treat batch as a first-class citizen and break a stream down into many small batches. This is the micro-batching approach. Alternatively, we can consider a stream to be fundamental and treat batch as a special case of streaming (a finite stream).

The problem with the first (micro-batching) approach is that it’s hard to achieve very low latency and maintain high throughput. Flink uses the second, true-streaming approach, which unifies batch and streaming without sacrificing either latency or throughput. This was exciting to us, and we wanted to use Flink as our compute engine.
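To make the "batch is a finite stream" idea concrete, here is a minimal sketch in plain Python. It is not Flink or Blink code; the event shape and the function names `running_totals` and `batch_totals` are invented for illustration. The point is only that one record-at-a-time operator can serve both modes: the streaming caller reads every intermediate update, while the batch caller feeds in a finite input and keeps the final values.

```python
from typing import Dict, Iterable, Iterator, Tuple

Event = Tuple[str, int]  # (key, amount); a made-up event shape for illustration


def running_totals(events: Iterable[Event]) -> Iterator[Event]:
    """Streaming operator: consume one event at a time and emit the updated total."""
    totals: Dict[str, int] = {}
    for key, amount in events:
        totals[key] = totals.get(key, 0) + amount
        yield key, totals[key]


def batch_totals(events: Iterable[Event]) -> Dict[str, int]:
    """Batch mode: run the same operator over a finite stream and keep the final values."""
    final: Dict[str, int] = {}
    for key, total in running_totals(events):
        final[key] = total
    return final


sales = [("shoes", 2), ("hats", 1), ("shoes", 3)]
print(list(running_totals(iter(sales))))  # every incremental update, in arrival order
print(batch_totals(sales))                # only the end result per key
```

Because the operator never waits for a batch boundary, each update is available as soon as its event arrives, which is where the latency advantage over micro-batching comes from.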

Early on, we discovered a few issues running our use cases in production, so we started our Blink project with the aim of making Flink work more effectively for us at very large scale. I’ll go over a few major improvements below.

One issue was how Flink interacted with our resource management system. We use YARN to manage resources in our Hadoop cluster. Flink pre-allocated resources from YARN when a Flink cluster started, and it was difficult to dynamically resize the Flink cluster after that. This can lead to inefficient resource usage. To solve this, we made various architectural changes to integrate with YARN natively. As a result, we can dynamically request and release resources. As a bonus, we also removed the single-point bottleneck of the JobManager, which allows us to run thousands of jobs in a cluster.

We also had an issue with Flink’s checkpoints. Flink uses the Chandy-Lamport snapshot algorithm to arrive at a globally consistent view of the system. But every time Flink makes a checkpoint, it needs to go over all state data. In complex jobs, the state can be huge and may also grow over time. It quickly becomes impractical to do this efficiently in production. We implemented an incremental checkpoint mechanism that allows us to go over new state data only.
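The general idea behind an incremental checkpoint can be sketched as follows. This is a simplified illustration in plain Python, not Blink's mechanism: the class `IncrementalState` and the helper `restore` are hypothetical names, deletions are ignored, and a real system would also have to coordinate checkpoints across operators. The state tracks which keys changed since the last checkpoint, so each checkpoint writes only that delta instead of scanning everything.

```python
from typing import Any, Dict, Iterable, Set


class IncrementalState:
    """Keyed state that remembers which keys changed since the last checkpoint."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}
        self._dirty: Set[str] = set()  # keys written since the previous checkpoint

    def put(self, key: str, value: Any) -> None:
        self._data[key] = value
        self._dirty.add(key)

    def checkpoint(self) -> Dict[str, Any]:
        """Return only the entries changed since the previous checkpoint."""
        delta = {key: self._data[key] for key in self._dirty}
        self._dirty.clear()
        return delta


def restore(deltas: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Rebuild the full state by replaying checkpoint deltas in order."""
    state: Dict[str, Any] = {}
    for delta in deltas:
        state.update(delta)
    return state


state = IncrementalState()
state.put("user:1", 10)
state.put("user:2", 20)
first = state.checkpoint()   # contains both keys
state.put("user:1", 11)
second = state.checkpoint()  # contains only the key that changed
print(second)
print(restore([first, second]))
```

The cost of a checkpoint then scales with the amount of change between checkpoints rather than with the total state size, at the price of a recovery step that has to combine several deltas.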

When we deploy a job, we have to choose the degree of parallelism for it. But we might need to change this later, for example, when a product becomes more popular and we have more data to process. This can be complex because Flink is a stateful compute engine, and state needs to be redistributed when we rescale our jobs. We solved this problem by using over-sharding. This allows us to rescale jobs efficiently without losing state.
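A minimal sketch of over-sharding, assuming a fixed shard count and a simple range-based assignment (the constant `NUM_SHARDS` and the function names are illustrative, not taken from Blink): keys are hashed into many more shards than there are workers, and state is stored per shard. Rescaling then only changes which worker owns which shard; a key never moves to a different shard, so state can be handed over in whole-shard units.

```python
import zlib
from typing import List

NUM_SHARDS = 128  # assumed value: fixed for the life of the job, larger than any planned parallelism


def shard_of(key: str) -> int:
    """Map a key to a shard. This never depends on the current parallelism."""
    return zlib.crc32(key.encode("utf-8")) % NUM_SHARDS


def worker_of(shard: int, parallelism: int) -> int:
    """Assign contiguous ranges of shards to workers."""
    return shard * parallelism // NUM_SHARDS


def shards_for_worker(worker: int, parallelism: int) -> List[int]:
    """List the shards a worker owns, i.e. the state it must load after a rescale."""
    return [s for s in range(NUM_SHARDS) if worker_of(s, parallelism) == worker]


key = "item-42"
shard = shard_of(key)
print(shard, worker_of(shard, 4), worker_of(shard, 8))  # same shard, possibly different owner
print(len(shards_for_worker(0, 4)), len(shards_for_worker(0, 8)))
```

The trade-off in this sketch is that the shard count caps the maximum parallelism, so it has to be chosen generously up front.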

In a distributed system, machines fail from time to time. It’s critical to recover from such failures as quickly as possible. We also improved the failover strategy so that we only do the minimal work required for a proper recovery.

Our system was running well after these updates, so we started porting our A/B Testing framework to Blink, but we encountered another obstacle. Our A/B Testing framework has lots of jobs, and it was going to be quite some work to rewrite them all in Flink’s DataStream API, so we looked around and discovered the Table API support in Flink. The Table API is basically a Scala version of SQL. Unfortunately, its support for streaming was very limited (only SELECT and WHERE clauses were supported at the time). So we spent another month or so adding missing functionality such as stream-stream join, UDF, UDTF, UDAGG, windowing, retraction, etc. With this work, our A/B Testing jobs became very succinct in the Table API.

But this is a gross understatement of the power of Table API/SQL. Allowing much simpler user code is just one of its great benefits. It’s also essential in unifying batch and stream processing. Flink’s batch and stream share the same underlying runtime. However, there are two distinct APIs (DataSet for batch and DataStream for stream), and this is at odds with our initial goal of unification. Table API/SQL addresses this problem by providing a unified interface to the users.

Let me explain how this works semantically. A stream can be considered a changelog for an imaginary table. This table’s content changes over time as we apply more entries from the changelog. We refer to such time-varying tables as dynamic tables. Conversely, if we have a dynamic table, we can derive its changelog as a stream. So in some sense, the stream (i.e., the changelog) and the dynamic table contain the same information. This is the stream-table duality. With this duality, it’s possible to define unified semantics for batch and stream processing in Table API/SQL. A job written in Table API/SQL can run in either batch or stream mode, and the results of the two modes are compatible with each other. We can even mix the two modes, i.e., run such a job in batch mode first to produce an initial snapshot, then run it in stream mode to continuously update the snapshot.
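The duality can be illustrated with a small sketch in plain Python. It does not use the Table API; the changelog entry format (`"upsert"` / `"delete"`) and the function names are assumptions made for this example. Applying a changelog produces a table, diffing two table versions produces a changelog, and a batch-produced snapshot can be kept up to date by applying later changelog entries to it.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Change = Tuple[str, str, Any]  # (operation, key, value); an invented changelog format


def apply_changelog(changelog: Iterable[Change],
                    initial: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Stream -> table: materialize a table by applying changelog entries in order."""
    table = dict(initial or {})
    for op, key, value in changelog:
        if op == "upsert":
            table[key] = value
        elif op == "delete":
            table.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return table


def derive_changelog(old: Dict[str, Any], new: Dict[str, Any]) -> List[Change]:
    """Table -> stream: the entries that turn the `old` version into the `new` one."""
    log: List[Change] = []
    for key in old:
        if key not in new:
            log.append(("delete", key, None))
    for key, value in new.items():
        if key not in old or old[key] != value:
            log.append(("upsert", key, value))
    return log


snapshot = {"item-1": 10, "item-2": 20}  # e.g. produced by a batch run
latest = {"item-1": 15, "item-3": 30}    # the table's content some time later
log = derive_changelog(snapshot, latest)
print(log)
print(apply_changelog(log, initial=snapshot) == latest)
```

In this sketch, starting from an empty table and starting from a snapshot lead to the same final content, which is the property that lets a batch run and a streaming run of one job be combined.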

There is another important reason why we bet on Table API/SQL. SQL is widely known as a successful declarative language for data processing. It is powerful in the sense that almost any data processing can be described this way. It’s also sufficiently high level, which allows the framework to do essential optimizations. This relieves most users from worrying about hand-tuning their jobs. There is still a long way to go in this direction, but we are off to a good start.

We were having lots of fun on our own, and then a very important transition happened when we engaged with the open source community shortly after we went into production with our A/B Testing framework. We exchanged our work and our plans. There was lots of excitement in our discussions, and we found that we were working on exactly the same issues as members of the community. We quickly came to the conclusion that it would be beneficial for both the Flink community and Alibaba to work together, so we started our journey to integrate the changes in Blink back to Flink.

By the time we started the Blink-Flink integration effort, a year had passed since we created our branch. Flink had progressed a great deal in that time, so we knew the transition would be a difficult one. But we truly believe in the power of the community and are willing to spend the effort on this process.

The first thing we tackled was the resource management integration (FLIP-6). We made various architectural changes to the runtime so that Flink can work with any resource management system. The work was a bit slow in the beginning as we were learning the best way to work with the community, but the effort certainly paid off. After a few months of hard work together with the Flink community, FLIP-6 is mostly in Flink’s master branch. We have also made lots of progress in integrating incremental checkpoints, fast failover, and various other changes back into Flink’s master. We expect the work to move much more quickly from now on. There’s also lots of collaboration happening on Table API/SQL. We worked with the community to define the semantics of a unified batch/stream Table API/SQL, and we have integrated most of our Table API/SQL improvements back into Flink’s master.

Looking back, it was no doubt a huge year for Blink and Flink at Alibaba. No one thought that we would make this much progress in a year, and we are very grateful to all the people in the community who helped us. Flink has proven that it works at very large scale. We are more committed than ever to continuing our work with the community to move Flink forward!
