One of Storm’s common uses is in doing real-time ETL – where a topology performs some processing on a stream and persists state to an external data store. Often, the state being stored is a map-state, where we perform a grouping and aggregation on the stream and keep track of the results in a map-like data structure. An example of this would be consuming a stream of page visits, splitting the stream by user ID and performing a count operation – maintaining a map containing a real-time count of the number of pages each user has visited.
When using Storm, there is a choice between core Storm and Storm Trident. Trident is an abstraction on top of Storm which adds some additional capabilities (and complexity). That additional complexity means Trident is usually not a suitable choice for performance-critical applications – however, there are a few good reasons to choose it:
- Trident takes a micro-batching approach to stream processing, meaning we can make use of optimisations involving bulk reads and writes to external data stores.
- Trident has built-in abstractions for maintaining state in a fault-tolerant way. We could implement our own way of maintaining state on top of core Storm, but Trident provides this out of the box.
- The state abstractions support maintaining different types of state, such as transactional or opaque (discussed later). This matters when you need exactly-once processing semantics, rather than core Storm's at-least-once guarantee.
- Trident provides a nice API on top of Storm, for partitioning streams, performing aggregations and groupings, and persisting state.
Here’s what our earlier example might look like, as a Storm Trident topology:
```java
TridentTopology topology = new TridentTopology();
TridentState userCounts = topology.newStream("page_visits", spout)
    .groupBy(new Fields("userId"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
Stream countUpdates = userCounts.newValuesStream();
```
An interesting part of the Trident API is how state is persisted, via the call to `persistentAggregate`. In the example above, we instruct Trident to perform an aggregation on the stream – counting the number of tuples for each `userId` – and store the results. There are three arguments to `persistentAggregate`:
- A state implementation – a class that acts as a factory for building Trident states. In the example, the call to `persistentAggregate` comes immediately after a `groupBy` operation. Trident recognises that we are operating on a grouped stream and therefore expects a state implementation that implements the `MapState` interface.
- An aggregator – in this case `Count`. The aggregator simply defines a state to assign to every observed tuple (in this case, 1) and a function for combining two states (in this case, addition).
- Output fields – if a new stream is derived from the aggregation, the values of the state are output as a tuple with the fields defined here. In our example, the `Count` aggregator produces a count, so we output this as a single field, `count`. Since we are operating on a grouped stream, the output tuples in the derived stream will contain two fields: `userId` and `count`.
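To make the aggregator's role concrete, here is a minimal sketch of the logic behind `Count`. The real class implements Trident's `CombinerAggregator<Long>` interface and receives tuples; the simplified `Combiner` interface below is a stand-in so the example is self-contained:

```java
public class CountSketch {

    // Simplified stand-in for Trident's CombinerAggregator<T> interface.
    interface Combiner<T> {
        T init();            // state assigned to every observed tuple
        T combine(T a, T b); // how two partial states are merged
        T zero();            // identity value for an empty batch
    }

    // The logic behind Trident's Count aggregator: each tuple contributes 1,
    // and partial states are merged by addition.
    static class Count implements Combiner<Long> {
        public Long init() { return 1L; }
        public Long combine(Long a, Long b) { return a + b; }
        public Long zero() { return 0L; }
    }

    // Fold a batch of per-tuple states into a single aggregate.
    static <T> T aggregate(Combiner<T> c, int tupleCount) {
        T acc = c.zero();
        for (int i = 0; i < tupleCount; i++) {
            acc = c.combine(acc, c.init());
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(new Count(), 5)); // a batch of 5 tuples -> count of 5
    }
}
```

Because `combine` is associative, Trident is free to compute partial counts on each partition before the `groupBy` repartitioning, which is what makes combiner aggregators efficient.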
Trident is often demonstrated with the `MemoryMapState` – a map-state implementation which stores all state in memory. In practice, you usually want to persist state to a non-volatile, fault-tolerant store. If your choice of data store doesn’t already have a Trident state implementation, you may have to write one yourself. The rest of this post will show what’s required to write your own Trident map-state, with examples from a map-state implementation that I wrote for Aerospike, a distributed key-value store.
Most map-state implementations in Trident provide three ways to store and update state, each of which yields different processing guarantees:
- Non-transactional – No processing guarantee. In this case the map-state will store only the latest result for each key, for example a count.
- Transactional – Exactly once processing guarantee, as long as the spout is also transactional. This means that the spout guarantees that every tuple appears in exactly one batch, and two batches with the same ID will always contain the same tuples. In this case the map-state will store the latest result for each key, along with a batch ID corresponding to the batch of the most recent update.
- Opaque – Exactly once processing guarantee, as long as the spout is also opaque. This means that the spout guarantees that every tuple is processed at least once, although a tuple may fail in one batch and then succeed in a later batch. In this case the map-state will store the latest result for each key, a batch ID corresponding to the batch of the most recent update and finally the previous value for the key.
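The extra fields stored by the opaque state are what make its exactly-once semantics work. As a sketch of the idea (not Trident's actual code), the update rule an opaque map applies per key looks roughly like this, using a count as the value:

```java
public class OpaqueUpdateSketch {

    // The three pieces an opaque map-state stores per key.
    static class OpaqueEntry {
        long curr;  // latest value
        long prev;  // value before the current batch was applied
        long txid;  // batch ID of the most recent update

        OpaqueEntry(long curr, long prev, long txid) {
            this.curr = curr; this.prev = prev; this.txid = txid;
        }
    }

    // Apply a batch's delta to a stored entry. If the same batch is replayed
    // (same txid), recompute from prev so the delta is not applied twice.
    static OpaqueEntry update(OpaqueEntry stored, long batchTxid, long delta) {
        if (stored == null) {
            // First time we see this key.
            return new OpaqueEntry(delta, 0L, batchTxid);
        }
        if (stored.txid == batchTxid) {
            // Replay of the current batch: start again from prev. This is safe
            // even if the replayed batch contains different tuples.
            return new OpaqueEntry(stored.prev + delta, stored.prev, batchTxid);
        }
        // A new batch: roll curr into prev, then apply the delta.
        return new OpaqueEntry(stored.curr + delta, stored.curr, batchTxid);
    }
}
```

A transactional state can skip storing `prev` because its spout guarantees a replayed batch contains exactly the same tuples, so a matching batch ID means the update can simply be skipped.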
Of course, transactional or opaque states are usually desirable, as they can provide exactly-once processing guarantees. They store additional information for each key, so the tradeoff comes with using more space. The fault-tolerance logic for these three types of map-state is the same for any implementation, and Trident conveniently provides wrapper classes for each, which wrap an underlying backing map with the fault-tolerance mechanics. We just need to implement the `IBackingMap` interface, and provide the logic for doing batch reads and writes to our underlying data store. Here’s how this might look:
```java
public State makeState(StateType stateType, Long maxCacheSize) {
    IBackingMap state = new SomeCustomStateImplementation(...);
    CachedMap cachedMap = new CachedMap(state, maxCacheSize);
    if (stateType == StateType.NON_TRANSACTIONAL) {
        return NonTransactionalMap.build(cachedMap);
    } else if (stateType == StateType.OPAQUE) {
        return OpaqueMap.build(cachedMap);
    } else if (stateType == StateType.TRANSACTIONAL) {
        return TransactionalMap.build(cachedMap);
    } else {
        throw new IllegalArgumentException("Unknown state type: " + stateType);
    }
}
```
In this example I’ve wrapped the backing map with Trident’s `CachedMap` implementation, which caches writes in memory. This improves read performance and reduces load on our underlying data store, especially if it is likely that the same key will be updated multiple times within a short time period.
Implementing the `multiGet` and `multiPut` methods of `IBackingMap` is where the bulk of our logic goes. This is where we write our custom logic to interact with our underlying data store and perform batch reads and writes. Typically, `multiPut` would iterate over the keys in the batch, construct a batch write request, and finally perform a batch write. In my case, Aerospike does not support batch writes, so I had to perform a write for every key:
```java
@Override
public void multiPut(List<List<Object>> keys, List<T> vals) {
    for (int i = 0; i < keys.size(); i++) {
        ExtractedKey extractedKey = keyExtractor.build(keys.get(i));
        Key key;
        if (options.keyType == AerospikeOptions.AerospikeKeyType.LONG) {
            key = new Key(options.namespace, options.set, (Long) extractedKey.recordKey);
        } else {
            key = new Key(options.namespace, options.set, (String) extractedKey.recordKey);
        }
        Bin bin = new Bin(binName, serializerHelper.toSimpleType(vals.get(i)));
        client.put(null, key, bin);
    }
}
```
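The corresponding `multiGet` performs the batch read. Here is a minimal sketch of the shape such an implementation takes, using a plain in-memory `Map` in place of the Aerospike client so the example is self-contained; a real version would issue a batch read (or per-key reads) against the data store and deserialize each result with the `serializerHelper`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiGetSketch {

    // Stand-in for the external data store; a real implementation would
    // hold a client handle (e.g. an Aerospike client) instead.
    private final Map<String, Long> store = new HashMap<>();

    // multiGet must return one value per requested key, in the same order,
    // with null for keys that have no stored state yet.
    public List<Long> multiGet(List<List<Object>> keys) {
        List<Long> results = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            // Trident keys are tuples; here we flatten a single-field key.
            results.add(store.get(String.valueOf(key.get(0))));
        }
        return results;
    }

    public void put(String key, long value) {
        store.put(key, value);
    }
}
```

The ordering and null-for-missing contract matters: Trident's map wrappers line up the returned list with the key list positionally, and use null to decide whether to initialise fresh state for a key.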
The last piece is to define a way to serialize the values to and from your underlying data store. Note the reference to `serializerHelper` above – this helper contains functions to map between Trident values and values that the Aerospike client expects. I wrote a serializer for each of the non-transactional, transactional and opaque states, which define mappings for `Object`, Trident’s `TransactionalValue`, and Trident’s `OpaqueValue` respectively:
```java
import com.aerospike.client.Value;
import storm.trident.state.OpaqueValue;
import storm.trident.state.TransactionalValue;

import java.io.Serializable;
import java.util.List;

public class SerializationUtils {

    private SerializationUtils() {}

    public interface SerializerHelper<T> extends Serializable {
        Value toSimpleType(T value);
        T toComplexType(Object object);
    }

    public static class NonTransactionalSerializerHelper implements SerializerHelper<Object> {

        @Override
        public Value toSimpleType(Object value) {
            return Value.get(value);
        }

        @Override
        public Object toComplexType(Object value) {
            return value;
        }
    }

    public static class TransactionalSerializerHelper implements SerializerHelper<TransactionalValue> {

        @Override
        public Value toSimpleType(TransactionalValue value) {
            Value[] result = new Value[2];
            result[0] = Value.get(value.getVal());
            result[1] = Value.get(value.getTxid());
            return Value.get(result);
        }

        @Override
        @SuppressWarnings("unchecked")
        public TransactionalValue toComplexType(Object object) {
            if (object != null) {
                List<Object> values = (List<Object>) object;
                return new TransactionalValue((long) values.get(1), values.get(0));
            }
            return null;
        }
    }

    public static class OpaqueTransactionalSerializerHelper implements SerializerHelper<OpaqueValue> {

        @Override
        public Value toSimpleType(OpaqueValue value) {
            Value[] result = new Value[3];
            result[0] = Value.get(value.getCurr());
            result[1] = Value.get(value.getPrev());
            result[2] = Value.get(value.getCurrTxid());
            return Value.get(result);
        }

        @Override
        @SuppressWarnings("unchecked")
        public OpaqueValue toComplexType(Object object) {
            if (object != null) {
                List<Object> values = (List<Object>) object;
                return new OpaqueValue((long) values.get(2), values.get(0), values.get(1));
            }
            return null;
        }
    }
}
```
That’s it – we are now ready to try out our new state implementation. In summary, the steps we followed were:

- Implement `IBackingMap` to do `multiGet`s and `multiPut`s to the underlying data store
- Write serializers for each of the three state value types
- Wrap our `IBackingMap` implementation with one of Trident’s map implementations, providing the appropriate serializer
Trident will handle the tricky fault-tolerance logic, and we can sit back and enjoy the processing guarantees 🙂