Fault Tolerant Message Processing in Storm
Storm is fault tolerant and lets you choose the level of guarantee with which messages are processed:
at-most-once: In this mode, messages may be dropped if processing fails or times out. This mode requires no special handling, and messages are processed in the order the spouts produce them.
at-least-once: This mode tracks whether each spout tuple is “fully” processed within a configured timeout. Any spout tuple not fully processed within the timeout is re-emitted. This means the same tuple could be processed more than once, and messages can be processed out of order. This mode requires user code to follow some “rules”, which are briefly described below.
exactly-once: With Trident, Storm can provide an “exactly-once” guarantee. This will not be discussed further in this post (maybe in a future one).
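To make the difference between the first two modes concrete, here is a minimal sketch in plain Python. It is not Storm code: the run function, its parameters, and the single queue standing in for a spout are all made up for illustration. A message listed in fails_once fails on its first attempt; with replay it is queued again behind newer messages, without replay it is lost.

```python
from collections import deque

def run(messages, fails_once, replay):
    """Toy model of one bolt reading from one spout (hypothetical, not Storm).

    fails_once: messages whose first processing attempt fails.
    replay:     True models at-least-once, False models at-most-once.
    Returns (seen, completed): every processing attempt, and every success.
    """
    queue = deque(messages)
    attempts = {}
    seen, completed = [], []
    while queue:
        msg = queue.popleft()
        seen.append(msg)
        attempts[msg] = attempts.get(msg, 0) + 1
        if msg in fails_once and attempts[msg] == 1:
            if replay:
                queue.append(msg)  # re-emitted behind newer messages
            continue               # without replay the message is lost
        completed.append(msg)
    return seen, completed

print(run(["a", "b", "c"], {"b"}, replay=False))
# (['a', 'b', 'c'], ['a', 'c'])            "b" is dropped
print(run(["a", "b", "c"], {"b"}, replay=True))
# (['a', 'b', 'c', 'b'], ['a', 'c', 'b'])  "b" is seen twice and completes after "c"
```

With replay, “b” is processed twice and finishes out of order, which is why at-least-once processing usually calls for idempotent bolts.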
If your application requires the “at-least-once” guarantee, your topology code needs to do the following three things:
When spouts emit tuples, specify a unique message ID. If you use an existing spout implementation, such as storm-kafka or storm-kestrel, it takes care of this, so you don’t need to worry about it.
When bolts emit tuples, anchor them to the input tuples.
When bolts are done processing the input tuple, ack or fail the input tuple.
That’s it! If anything goes wrong, Storm re-emits the failed spout tuples. (Strictly speaking, it is the spout’s responsibility to re-emit tuples when its fail method is called. Luckily, in most cases we don’t have to write our own spout implementation.)
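For the curious, the spout side of that contract can be sketched as follows. This is a toy model in Python, not a real Storm spout: the ToySpout class and its fields are hypothetical, and only the three method names mirror the nextTuple, ack, and fail callbacks that Storm spouts implement. The idea is to remember every emitted payload under its message ID until it is acked, and to queue it again when it fails.

```python
from collections import deque

class ToySpout:
    """Hypothetical replaying spout; illustrates the idea only."""

    def __init__(self, source):
        self.queue = deque(source)  # payloads waiting to be emitted
        self.pending = {}           # message ID -> payload, kept until acked
        self.next_id = 0

    def next_tuple(self):
        """Emit the next payload with a unique message ID, or None if idle."""
        if not self.queue:
            return None
        payload = self.queue.popleft()
        self.next_id += 1
        self.pending[self.next_id] = payload
        return self.next_id, payload

    def ack(self, msg_id):
        """The tuple tree completed: forget the payload."""
        self.pending.pop(msg_id, None)

    def fail(self, msg_id):
        """The tuple tree failed or timed out: queue the payload for replay."""
        self.queue.append(self.pending.pop(msg_id))

spout = ToySpout(["x", "y"])
spout.next_tuple()         # (1, "x")
spout.next_tuple()         # (2, "y")
spout.ack(1)               # "x" is done
spout.fail(2)              # "y" goes back on the queue
print(spout.next_tuple())  # (3, 'y')
```

This toy assigns a fresh message ID on replay. That is a simplification chosen for the sketch; a real spout might well reuse an ID derived from the source, such as an offset.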
So how does Storm implement this? Storm’s implementation is actually quite ingenious. Besides, I believe a little understanding of the internals is always a good thing; it helps in grasping the concepts.
I’ll use the following topology as the example, which is composed of 2 spouts and 2 bolts. Also, there are 2 ackers configured. Note that “sid1” and “sid2” are the IDs of the 2 spout tasks.
When a spout emits a tuple, it notifies a specific acker task about the new tuple’s ID and its own task ID. Because there could be multiple acker tasks, a simple mod hash function is used to determine which acker task to notify. In this case, spout task “sid1” emits tuple “stid1” and notifies Acker1 (mod-hash(stid1)=Acker1) with “[stid1, sid1]”. Upon receiving this info, Acker1 creates an entry for “stid1”. The bookkeeping entry contains two pieces of information: the originating spout task ID “sid1” and a so-called “ack val”. The “ack val” is initially set to the spout tuple’s ID “stid1”, which is “1010” in binary form (we use a 4-bit value for simplicity in this example; in reality, Storm uses a 64-bit value). The same happens for the other spout’s generated tuple “stid2” (“1011” in this example), which is handled by Acker2.
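The routing and the initial bookkeeping can be sketched like this. It is a toy model in Python: the Acker class, the acker_for helper, and the use of a plain modulo on the ID are assumptions made for illustration, not Storm’s actual code. The 4-bit IDs are the ones from this example, with “stid2” taken to be 1011.

```python
class Acker:
    """Hypothetical acker task holding one entry per spout tuple."""

    def __init__(self, name):
        self.name = name
        self.entries = {}  # spout tuple ID -> [spout task ID, ack val]

    def init(self, spout_tuple_id, spout_task_id):
        # The ack val starts out as the spout tuple's own ID.
        self.entries[spout_tuple_id] = [spout_task_id, spout_tuple_id]

def acker_for(spout_tuple_id, ackers):
    """Pick the acker with a simple mod hash over the spout tuple ID."""
    return ackers[spout_tuple_id % len(ackers)]

ackers = [Acker("Acker1"), Acker("Acker2")]
STID1, STID2 = 0b1010, 0b1011

acker_for(STID1, ackers).init(STID1, "sid1")  # 10 % 2 == 0, so Acker1
acker_for(STID2, ackers).init(STID2, "sid2")  # 11 % 2 == 1, so Acker2

for acker in ackers:
    print(acker.name, acker.entries)
# Acker1 {10: ['sid1', 10]}
# Acker2 {11: ['sid2', 11]}
```

Because the acker is chosen from the spout tuple ID alone, any task that knows that ID can find the right acker later without extra coordination.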
Bolt1 receives tuples “stid1” and “stid2” and then emits tuple “tid1” anchored to both (multi-anchoring). Since Bolt1 knows the IDs of both spout tuples received, it can easily determine the correct acker tasks to notify (using the mod-hash function). There are two input tuples, so Bolt1 needs to notify twice, once for each. For “stid1”, it notifies Acker1 with “[stid1, tid1]”. For “stid2”, it notifies Acker2 with “[stid2, tid1]”. Acker1 first looks up the entry for “stid1” to find its current “ack val”. It then XORs the current “ack val” with the new tuple’s ID “tid1” and stores the result as the new “ack val”. In this case, 1010 XOR 1100 = 0110. The same process applies to “stid2”: Acker2 updates its entry to 1011 XOR 1100 = 0111.
The use of XOR on all tuple IDs of a DAG (tuple tree) is ingenious. There could be thousands, if not tens of thousands, of tuples in a DAG, and tracking each one individually is neither efficient nor scalable. This method requires only a fixed amount of memory (about 20 bytes per DAG) and is also extremely fast. In addition, because XOR is commutative and associative, the result does not depend on the order in which ackers receive the messages.
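The order-independence is easy to check with a small experiment. The sketch below is plain Python, not Storm code; the final_ack_val helper and the event list are made up for illustration. It assumes each tuple ID reaches the acker exactly twice, once when the tuple is created and once when it is acked, and folds everything into a single integer.

```python
import random
from functools import reduce
from operator import xor

def final_ack_val(events):
    """Fold a stream of tuple IDs into one ack val, as a toy acker would."""
    return reduce(xor, events, 0)

rng = random.Random(42)  # fixed seed so the run is repeatable
tuple_ids = [rng.getrandbits(64) for _ in range(10_000)]

# Each ID is reported twice: once on emit, once on ack.
events = tuple_ids * 2
in_order = final_ack_val(events)

rng.shuffle(events)  # deliver the same notifications in a random order
shuffled = final_ack_val(events)

print(in_order, shuffled)  # 0 0
```

Every ID cancels itself out, so the ack val returns to zero no matter how the 20,000 notifications are interleaved, and the acker never holds more than one integer for the whole tree.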
After Bolt1 completes the processing of both input tuples, it acks both, which notifies Acker1 and Acker2 of the acked tuples. Acker1 updates the “ack val” of “stid1” to 0110 XOR 1010 = 1100, and Acker2 updates the “ack val” of “stid2” to 0111 XOR 1011 = 1100.
Finally, Bolt2, the last bolt in this topology, receives tuple “tid1”. When it acks “tid1”, it also needs to notify the acker tasks with this input tuple ID “tid1” along with the originating spout tuples’ IDs. Storm always copies the originating spout tuple ID(s) into the emitted tuples, so they are always available downstream. In this case, the received tuple “tid1” contains both “stid1” and “stid2”. With the spout tuple IDs, Bolt2 can notify both Acker1 and Acker2 of the final ack of tuple “tid1”. Acker1 and Acker2 then update the “ack val” of “stid1” and “stid2”, respectively, to 1100 XOR 1100 = 0000.
At this point, the “ack val” of both “stid1” and “stid2” is “0000”. When an acker sees a zero “ack val”, it marks the originating spout tuple as completed and calls the ack method of the originating spout task.
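Putting the whole walkthrough together, the following sketch replays the exact sequence of notifications from this example and prints the “ack val” after each step. It is a toy model in Python, not Storm code: the Acker class, its completed list (standing in for the call to the spout’s ack method), and the choice to delete a finished entry are all assumptions made for illustration.

```python
class Acker:
    """Hypothetical acker that XORs every reported tuple ID into its entry."""

    def __init__(self):
        self.entries = {}    # spout tuple ID -> [spout task ID, ack val]
        self.completed = []  # (spout task ID, spout tuple ID) pairs acked back

    def init(self, spout_tuple_id, spout_task_id):
        self.entries[spout_tuple_id] = [spout_task_id, spout_tuple_id]

    def update(self, spout_tuple_id, tuple_id):
        """XOR tuple_id into the ack val and return it as a 4-bit string."""
        entry = self.entries[spout_tuple_id]
        entry[1] ^= tuple_id
        ack_val = entry[1]
        if ack_val == 0:
            # Tuple tree fully processed: tell the spout task and drop the entry.
            self.completed.append((entry[0], spout_tuple_id))
            del self.entries[spout_tuple_id]
        return format(ack_val, "04b")

STID1, STID2, TID1 = 0b1010, 0b1011, 0b1100
acker1, acker2 = Acker(), Acker()
acker1.init(STID1, "sid1")
acker2.init(STID2, "sid2")

# Bolt1 emits tid1, Bolt1 acks the spout tuple, Bolt2 acks tid1.
trace1 = [acker1.update(STID1, t) for t in (TID1, STID1, TID1)]
trace2 = [acker2.update(STID2, t) for t in (TID1, STID2, TID1)]

print(trace1)  # ['0110', '1100', '0000']
print(trace2)  # ['0111', '1100', '0000']
print(acker1.completed, acker2.completed)  # [('sid1', 10)] [('sid2', 11)]
```

The printed values match the ones worked out by hand above, and each acker reports completion to the right spout task only when its entry reaches zero.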