Writing a Custom Map-State for Storm Trident

One of Storm’s common uses is in doing real-time ETL – where a topology performs some processing on a stream and persists state to an external data store. Often, the state being stored is a map-state, where we perform a grouping and aggregation on the stream and keep track of the results in a map-like data structure. An example of this would be consuming a stream of page visits, splitting the stream by user ID and performing a count operation – maintaining a map containing a real-time count of the number of pages each user has visited.

When using Storm, there is a choice to make around whether to use core Storm or Storm Trident. Trident is an abstraction on top of Storm which adds some additional capabilities (and complexity). The additional complexity means that for performance-critical applications, Trident is usually not a suitable choice – however there are a few good reasons to choose Trident:

  • Trident takes a micro-batching approach to stream processing, meaning we can make use of optimisations involving bulk reads and writes to external data stores.
  • Trident has built-in abstractions for maintaining state in a fault-tolerant way. We could implement our own way of maintaining state on top of core Storm, but Trident provides this out-of-the-box.
  • The state abstractions support maintaining different types of state, such as transactional or opaque (discussed later). This is useful when you need exactly-once processing guarantees, beyond core Storm’s at-least-once guarantee.
  • Trident provides a nice API on top of Storm, for partitioning streams, performing aggregations and groupings, and persisting state.

Here’s what our earlier example might look like, as a Storm Trident topology:
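
(The sketch below is illustrative – a FixedBatchSpout stands in for a real page-visit spout – and the package names assume a recent Storm release; older versions use the storm.trident and backtype.storm packages.)

// Sketch: counting page visits per user with Trident (spout and class names are illustrative).
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class PageVisitCountTopology {
    public static TridentTopology build() {
        // Stand-in spout emitting tuples with a single "userId" field
        FixedBatchSpout pageVisits = new FixedBatchSpout(new Fields("userId"), 3,
                new Values("user-1"), new Values("user-2"), new Values("user-1"));
        pageVisits.setCycle(true);

        TridentTopology topology = new TridentTopology();
        TridentState visitCounts = topology
                .newStream("page-visits", pageVisits)
                .groupBy(new Fields("userId"))                      // split the stream by user
                .persistentAggregate(new MemoryMapState.Factory(),  // map-state factory (in-memory here)
                                     new Count(),                   // aggregator: count tuples per user
                                     new Fields("count"));          // output field of the derived stream
        return topology;
    }
}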

An interesting part of the Trident API is how state is persisted, via the call to persistentAggregate. In the example above, we instruct Trident to perform an aggregation on the stream – counting the number of tuples for each userId and storing the results.

There are three arguments to persistentAggregate:

  • A state implementation – a class that acts as a factory for building Trident states. In the example, the call to persistentAggregate comes immediately after a groupBy operation. Trident recognises that we are operating on a grouped stream and therefore expects a state implementation that implements the MapState interface.
  • An aggregator – in this case Count. The aggregator simply defines a state to assign to every observed tuple (in this case, 1) and a function for combining two states (in this case, performing addition).
  • Output fields – if a new stream is derived from the aggregation, the values of the state are output as a tuple with the fields defined here. In our example, the count aggregator produces a count, so we output this as a single field, count. Since we are operating on a grouped stream, the output tuples in the derived stream will contain two fields: userId and count.

Trident is often demonstrated with the MemoryMapState – a map-state implementation which stores all state in memory. In practice, you usually want to persist state to a non-volatile, fault-tolerant store. If your choice of data store doesn’t already have a Trident state implementation, you may have to write one yourself. The rest of this post will show what’s required to write your own Trident map-state, with examples from a map-state implementation that I wrote for Aerospike, a distributed key-value store.

Most map-state implementations in Trident provide three ways to store and update state, each of which yields different processing guarantees:

  • Non-transactional – No processing guarantee. In this case the map-state will store only the latest result for each key, for example a count.
  • Transactional – Exactly once processing guarantee, as long as the spout is also transactional. This means that the spout guarantees that every tuple appears in exactly one batch, and two batches with the same ID will always contain the same tuples. In this case the map-state will store the latest result for each key, along with a batch ID corresponding to the batch of the most recent update.
  • Opaque – Exactly once processing guarantee, as long as the spout is also opaque. This means that the spout guarantees that every tuple is processed at least once, although a tuple may fail in one batch and then succeed in a later batch. In this case the map-state will store the latest result for each key, a batch ID corresponding to the batch of the most recent update and finally the previous value for the key.
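
To make the opaque case concrete, the update rule Trident applies for, say, a count looks roughly like the simplified sketch below (the real logic lives in Trident’s OpaqueMap wrapper):

import org.apache.storm.trident.state.OpaqueValue;

// Simplified sketch of the opaque update rule for a count.
// Stored per key: the current value, the txid of the batch that last wrote it, and the previous value.
class OpaqueUpdateRule {
    OpaqueValue<Long> update(OpaqueValue<Long> stored, long currentTxid, long countFromThisBatch) {
        if (stored == null) {
            return new OpaqueValue<Long>(currentTxid, countFromThisBatch, null);
        }
        if (stored.getCurrTxid() == currentTxid) {
            // This batch was already (partially) applied: re-apply it on top of the previous value
            long base = stored.getPrev() == null ? 0L : stored.getPrev();
            return new OpaqueValue<Long>(currentTxid, base + countFromThisBatch, stored.getPrev());
        }
        // A new batch: the old current value becomes the previous value
        return new OpaqueValue<Long>(currentTxid, stored.getVal() + countFromThisBatch, stored.getVal());
    }
}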

Of course, transactional or opaque states are usually desirable, as they can provide exactly-once processing guarantees. They store additional information for each key, so the tradeoff is that they use more space. The fault-tolerance logic for these three types of map-state is the same for any implementation, and Trident conveniently provides wrapper classes for each, which will wrap an underlying backing map with the fault-tolerance mechanics. We just need to implement the IBackingMap interface, and provide the logic for doing batch reads and writes to our underlying data store. Here’s how this might look:
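
(A sketch along these lines is shown below; the class names – AerospikeOpaqueStateFactory, AerospikeBackingMap, OpaqueSerializerHelper – and the configuration values are illustrative rather than the actual implementation.)

// Sketch: a state factory that wraps the custom backing map with Trident's fault-tolerance wrappers.
import java.util.Map;
import com.aerospike.client.AerospikeClient;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.OpaqueValue;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.map.CachedMap;
import org.apache.storm.trident.state.map.IBackingMap;
import org.apache.storm.trident.state.map.OpaqueMap;

public class AerospikeOpaqueStateFactory implements StateFactory {
    private final String host;       // Aerospike seed host, e.g. "localhost"
    private final String namespace;  // e.g. "test"
    private final String set;        // e.g. "visit_counts"

    public AerospikeOpaqueStateFactory(String host, String namespace, String set) {
        this.host = host;
        this.namespace = namespace;
        this.set = set;
    }

    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        AerospikeClient client = new AerospikeClient(host, 3000);
        // The backing map performs the actual reads/writes against Aerospike (sketched further below)
        IBackingMap<OpaqueValue> backingMap =
                new AerospikeBackingMap<OpaqueValue>(client, namespace, set, new OpaqueSerializerHelper());
        // CachedMap caches recently used keys in memory; OpaqueMap adds the opaque fault-tolerance logic
        return OpaqueMap.build(new CachedMap<OpaqueValue>(backingMap, 1000));
    }
}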

In this example I’ve wrapped the backing map with Trident’s CachedMap implementation, which caches recently read and written values in memory. This improves read performance and reduces load on our underlying data store, especially if it is likely that the same key will be updated multiple times within a short time period.

Implementing the multiGet and multiPut methods of IBackingMap is where the bulk of our logic goes. This is where we write our custom logic to interact with our underlying data store and perform batch reads and writes. Typically, multiPut would iterate over the keys in the batch, construct a batch write request, and finally perform a batch write. In my case, Aerospike does not support batch writes, so I had to perform a write for every key:
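
(The sketch below uses the Aerospike Java client; the key layout, bin name and SerializerHelper are illustrative, and error handling is omitted.)

// Sketch: batch reads via Aerospike's batch get, single-key writes (Aerospike has no batch write).
import java.util.ArrayList;
import java.util.List;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import org.apache.storm.trident.state.map.IBackingMap;

public class AerospikeBackingMap<T> implements IBackingMap<T> {
    private final AerospikeClient client;
    private final String namespace;
    private final String set;
    private final SerializerHelper<T> serializerHelper;

    public AerospikeBackingMap(AerospikeClient client, String namespace, String set,
                               SerializerHelper<T> serializerHelper) {
        this.client = client;
        this.namespace = namespace;
        this.set = set;
        this.serializerHelper = serializerHelper;
    }

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        // One batch read covering every key in the Trident batch
        Key[] recordKeys = new Key[keys.size()];
        for (int i = 0; i < keys.size(); i++) {
            recordKeys[i] = toKey(keys.get(i));
        }
        Record[] records = client.get(null, recordKeys);
        List<T> values = new ArrayList<T>(records.length);
        for (Record record : records) {
            values.add(record == null ? null : serializerHelper.deserialize(record.getString("value")));
        }
        return values;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> values) {
        // No batch write support, so write each key individually
        for (int i = 0; i < keys.size(); i++) {
            Bin valueBin = new Bin("value", serializerHelper.serialize(values.get(i)));
            client.put(null, toKey(keys.get(i)), valueBin);
        }
    }

    private Key toKey(List<Object> tridentKey) {
        // We group on a single field (userId), so the Trident key has one component
        return new Key(namespace, set, tridentKey.get(0).toString());
    }
}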

The last piece is to define a way to serialize the values to and from your underlying data store. Note the reference to serializerHelper above – this helper contains functions to map between Trident values and values that the Aerospike client expects. I wrote a serializer for non-transactional, transactional and opaque states, which define mappings for Object, Trident’s TransactionalValue, and Trident’s OpaqueValue respectively:
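
(For brevity, only the opaque case is sketched here; the SerializerHelper interface and the string encoding are illustrative – the real helper maps to whatever representation suits your data store.)

import org.apache.storm.trident.state.OpaqueValue;

// Sketch: serializer helper for opaque values. Raw OpaqueValue is used to match
// Trident's OpaqueMap.build signature; the wrapped values here are counts (longs).
public interface SerializerHelper<T> {
    String serialize(T value);
    T deserialize(String stored);
}

class OpaqueSerializerHelper implements SerializerHelper<OpaqueValue> {
    @Override
    public String serialize(OpaqueValue value) {
        // Store the batch txid, current count and previous count together
        return value.getCurrTxid() + ":" + value.getVal() + ":" + value.getPrev();
    }

    @Override
    public OpaqueValue deserialize(String stored) {
        String[] parts = stored.split(":");
        Long prev = "null".equals(parts[2]) ? null : Long.valueOf(parts[2]);
        return new OpaqueValue(Long.valueOf(parts[0]), Long.valueOf(parts[1]), prev);
    }
}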

That’s it – we are now ready to try out our new state implementation. In summary, the steps we followed were:

  1. Implement IBackingMap to do multiGets and multiPuts to the underlying data store
  2. Write serializers for each of the three state value types
  3. Wrap our IBackingMap implementation with one of Trident’s map implementations, providing the appropriate serializer

Trident will handle the tricky fault-tolerance logic, and we can sit back and enjoy the processing guarantees 🙂

Cross Datacenter Mirroring with Kafka

Having recently begun working for one of Europe’s leading online ad-tech providers, I have been involved in the configuration of a new high-load Kafka cluster with the aim of handling up to 300K messages/second. It really is a high-load requirement – even LinkedIn, the original developers of Kafka, process a lower volume, at around 100K messages/second.

One of the choices we faced in setting up the new cluster was centered around how to publish messages from various datacenters around the world. We could either publish to a single Kafka cluster, located in our main datacenter, or we could have a separate Kafka cluster in each region. The network connection between datacenters is slow and potentially unreliable, so it didn’t make sense to publish to a single Kafka cluster from anywhere in the world as there would be a considerable risk of data-loss. For this reason, we wanted to have a separate, local Kafka cluster within each region, but ultimately would need all data to be pushed to our main cluster. In fact, Kafka ships with a tool designed specifically for this purpose – to replicate Kafka topics from one cluster to another. Note that this is not the usual subject of replication in relation to Kafka, which refers to maintaining replicas of Kafka topics within a single cluster for fault-tolerance.

The MirrorMaker tool consists of a consumer and producer. For fault tolerance, multiple MirrorMaker processes can be fired up for the same mirroring task, within the same consumer group. If one fails, consumer re-balancing will take place as usual. The diagram below shows the full picture.

Mirroring data between two Kafka clusters

Starting a MirrorMaker process is simple; run the following command from the Kafka installation directory:

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config config/consumer_source_cluster.properties --producer.config config/producer_target_cluster.properties --whitelist mytopic --num.producers 2 --num.streams 4

Here, the number of consumer and producer threads are specified by the num.streams and num.producers parameters respectively. One thing worth mentioning here is that the topic list parameter accepts a string in the format of a Java regex, so it’s possible to specify topic names with wildcards.

Confirming that the MirrorMaker is keeping up with the volume of messages in the source cluster’s topics can be done by running the built-in offset checker and checking that the offset lag for each partition is not growing indefinitely:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect zookeeperhost:2181 --group mirrormaker

There is a possibility of data loss when using the MirrorMaker – in the case where the producer is unable to produce messages and the consumer is still able to consume. This could occur if there were a network outage which only affected the producer. In this scenario, the consumer will continue to consume messages and commit offsets. This will likely be for a short period of time as the MirrorMaker exits upon an exception in either the consumer or producer threads, but those messages consumed and not produced are lost. There is a plan to address this issue which has been accepted for the next Kafka release.

There are a few settings that can be tweaked, such as the number of consumer and producer threads, the number of MirrorMaker processes and the properties for both the consumer and producer. In the producer properties, queue.enqueue.timeout.ms should be set to -1 to cause the producer to block indefinitely if its queue is full – this can help to prevent data loss in the scenario described above. I ran some tests to investigate how the configuration affects the mirroring throughput. In these tests, the MirrorMaker processes ran in the target datacenter – which seems a sensible choice given the data-loss scenario: failure between consuming and producing should be less likely if the cross-datacenter network transfer has already taken place before the messages are produced. Here were the results:

Processes | Consumer Threads | Producer Threads | Maximum Throughput
1         | 6                | 2                | 6 MB/s
1         | 12               | 2                | 10 MB/s
1         | 18               | 2                | 10 MB/s
2         | 12               | 2                | 14 MB/s
2         | 12               | 4                | 19 MB/s
3         | 12               | 6                | 24 MB/s
3         | 12               | 8                | 29 MB/s

In this investigation, the source topic had 12 partitions and we found that for maximum parallelism, the total number of consumer threads across all mirroring processes should add up to the total number of partitions. Since partitions are the unit of parallelism in Kafka, this rule applies to any type of Kafka consumer. We found that if the number of producer threads is too low, it can become a bottleneck – this is shown by the difference in throughput between the fourth and fifth tests. We can also see that throughput scales fairly linearly with the number of MirrorMaker processes and consumer threads.

In conclusion, the MirrorMaker tool is a convenient solution to dealing with multiple Kafka clusters in different physical locations, when you want all data to be mirrored to a ‘main’ Kafka cluster. The number of MirrorMaker processes should be high enough to handle failure and the number of consumer and producer threads within these processes can be tweaked to achieve a target throughput. There is a data-loss scenario in the current version of MirrorMaker, but the probability of data-loss can be minimised by running MirrorMaker in the target datacenter.

Building a Search Engine for E-Commerce with Elasticsearch

This is a continuation of my previous post on search engines. Having been involved in using Elasticsearch to build a search engine for e-commerce, I have taken away some interesting ideas from the experience. I will go through some of the design decisions made and problems encountered along the way.

Tweaking the Search Query
Elasticsearch provides a huge variety of different query types, each of which has a different approach to retrieving search results. For example, the term query will find documents containing a certain term, the fuzzy query will match documents containing terms which are approximately equal to a given term, and the geo-shape query enables you to perform useful queries over documents containing longitude and latitude coordinates – and there are many more. Any of these query types can be composed using compositional query types, such as the bool query.

When I talk about tweaking the search query, I mean choosing the query types to use and structuring our query in a way that will enable us to achieve optimal results for any kind of search over the product base. The two main properties of the search query which we are trying to optimise are:

  1. How to best determine which products match
  2. How to best determine the order (i.e. relative importance) of the matching products

One of the first options that we considered using was the query-string query: a query type that parses your query and decides what query types to use, and often does the sensible thing. For instance, jeans will match any document containing the term jeans, blue jeans will match any document containing both ‘blue’ and ‘jeans’ and "blue jeans" will match documents containing both terms together (an exact match). You can even make more complex searches, like blue jeans -(levi OR diesel) which will match documents containing ‘blue’ and ‘jeans’ but not the terms ‘levi’ or ‘diesel’. This all seems quite nice at first, but this query type can give very unexpected results if used incorrectly – for example t-shirt will match documents containing ‘t’, but will exclude all documents containing ‘shirt’. Of course, customers can’t be expected to understand why this happens or how to fix their query.
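
For completeness, this is roughly how such a search would be expressed with the query-string query (the fields listed are illustrative):

{
    "query_string" : {
        "query" : "blue jeans -(levi OR diesel)",
        "fields" : [ "title", "description" ]
    }
}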

Another option is the multi-match query, which takes a list of fields to query over and builds a sequence of match queries. Similar to the query-string query, each field can be given a different boost factor, which is used in determining the relative importance of terms matching a given field. You can also tweak the type of multi_match – e.g. whether the query terms must all be found in a single field, or can be found in different fields (but not necessarily in the same field). Here is an example of a multi_match query:

{
  "multi_match" : {
    "query":      "gladiator russell crowe",
    "type":       "best_fields",
    "fields":     [ "title^10", "actors^5", "description^1" ]
  }
}

Here, we have used the boost operator (^) to indicate that the title field should be given the most importance. The query will still be matched against the description field, but matches in the title and actors fields will be given a higher score and will appear first in our result set. The best_fields type gives precedence when the query terms appear in the same field, but they don’t have to.

When matching across multiple fields, it can be tricky to figure out which documents are most relevant to the query. If we just consider which fields matched the query, then we won’t get optimal results. For example, if the query is ps4 controller and a document contains ‘ps4’ in the title field and ‘controller’ in the description field, then should it be given a higher score than a document which contains both terms in the title field but neither in the description field? The first document has more matching fields, but intuitively, the second document should surely be considered a more relevant result. Elasticsearch provides a solution to this: the disjunction-max (dis-max) query. This enables us to perform sub-queries over multiple fields and take the score of the best matching field (i.e. the maximum scoring sub-query), instead of summing the scores from each matching sub-query. In practice, this often yields better results and is in fact the default behaviour for the multi-match query.

We used the multi-match query successfully in production for quite some time, but ultimately decided to switch to using the common terms query. This is an interesting query type which provides not only a way to determine stop words dynamically, but also a way to not completely disregard stop words at search time. When using the multi-match query, we assigned a stop word filter to each field in our mapping, which uses a pre-defined list of stop words to remove the most common and semantically useless words from the index. This is usually good because those words don’t add much meaning and make searching the index slower. But is this always a good thing? Consider the video game ‘The Last Of Us’. A search for the last of us would result in all products containing the term ‘last’ and those products would not be ordered very sensibly, since the other three terms (all stop words) would have been thrown away. In this scenario, the common terms query is much more effective. Instead of removing stop words, it uses term frequencies across the whole index to determine which terms are important and which occur frequently enough to be considered stop words. In this example, ‘last’ would likely be deemed an important term, while ‘the’, ‘of’ and ‘us’ would be deemed less important. The common terms query then splits searching into two steps:

  1. Use the important terms to determine the result set
  2. Use the less important terms to order the result set

This way, stop words are not completely thrown away, but they are not considered until after the product result set has been determined. This fits very nicely with the two search query properties we are trying to optimise towards. To keep the cross-field search benefits of the multi-match, the common terms query can be wrapped inside a dis-max and have a boost factor applied to each field:

{
    "dis_max" : {
        "tie_breaker" : 0.3,
        "queries" : [
            {
                "common" : {
                    "title" : {
                        "query" : "the last of us",
                        "boost" : 10,
                        "cutoff_frequency" : 0.001
                    }
                }
            },
            {
                "common" : {
                    "studio" : {
                        "query" : "the last of us",
                        "boost" : 3,
                        "cutoff_frequency" : 0.001
                    }
                }
            },
            {
                "common" : {
                    "description" : {
                        "query" : "the last of us",
                        "boost" : 1,
                        "cutoff_frequency" : 0.001
                    }
                }
            }
        ]
    }
}

With this query, we saw an uplift in product page visits from search and more customers were clicking products returned in the first two rows of their search results. This demonstrates that our use of the common terms query was enabling customers to find the most relevant products more easily.

Customising the Score Function
As we’ve seen, Elasticsearch provides us with many clever ways to score search results such that the most relevant products appear first. But in e-commerce, there are some factors that are worth considering outside of how relevant the product itself is. This includes things like:

  • How popular is the product?
  • Is the product in stock?
  • Was the product released recently?
  • How profitable are sales of the product?

For example, if a product is extremely popular at the moment, then perhaps it should be boosted above other search results. Or, if a product is out of stock, we probably don’t want to show it in the first few search results.

To factor in this information, we can design our own scoring function which will adjust the scores computed by our Elasticsearch query. This is done by wrapping the main search query in a custom_score query. We can then provide a script which modifies the original score (denoted by _score) by using fields from the index and a set of parameters. This way, we could index a field such as ‘product_popularity’ into our product documents, and then boost the _score for more popular products. We would make it possible to assign different levels of importance to each factor with an adjustable weighting for each parameter. Normalisation is also important to ensure we operate on the same scale for each factor. Here’s an example of this with just the product popularity factor:

"custom_score": {
    "params": {
        "scoreWeighting": 2,
        "popularityWeighting": 5,
        "maxPopularity": x
    },
    "query": {...},
    "script": "scoreWeighting * _score + (popularityWeighting * (doc['popularity'].value / maxPopularity))"
}

In practice, our score function considers a lot more than the product popularity and is dynamically generated by the search service using a set of configurable parameters which can be changed at any time without a redeployment.

Achieving Faceted Search
Faceted search is a way of enhancing the search experience by enabling the user to navigate their search results by applying a set of filters. Faceted navigation is now seen on the majority of online retail sites and probably looks very familiar to you:

An example of faceted search

A facet is a set of filters. In the above example, there are three facets: category, sub-category and price. Sometimes, more complex faceting may be desirable – for instance, you might want it so that when you apply the ‘DVD’ category filter, you are then given a choice of movie genres to filter by. This is called a nested facet, as it is a facet within a facet.

With Elasticsearch, it is fairly painless to set up faceted search. First, you will need to have in your mapping a non-analysed version of each field you want to facet on:

{
    "category": {
        "type":     "string",
        "index":    "not_analyzed"
    }
}

We use the not_analyzed setting because at index time we want the field to be mapped as an exact field, so that later, the filter options (in this case categories) will appear exactly as they were indexed.

Now, at query time, we can append a terms aggregation to our query:

{
    "aggs" : {
        "categories" : {
            "terms" : { "field" : "category" }
        }
    }
}

Our response will now contain all the information we need about categories for the given query. Elasticsearch will give us a breakdown of counts for each type of category within the result set for our query:

 "aggregations" : {
        "categories" : {
            "doc_count_error_upper_bound": 0, 
            "sum_other_doc_count": 0, 
            "buckets" : [ 
                {
                    "key" : "Merchandise",
                    "doc_count" : 856
                },
                {
                    "key" : "Clothing",
                    "doc_count" : 455
                },
                ... etc ...
            ]
        }
    }
}
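
The nested faceting mentioned earlier (for example, offering a genre filter once the ‘DVD’ category is selected) can be expressed by adding a sub-aggregation – the genre field here is illustrative:

{
    "aggs" : {
        "categories" : {
            "terms" : { "field" : "category" },
            "aggs" : {
                "genres" : {
                    "terms" : { "field" : "genre" }
                }
            }
        }
    }
}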

Now, when a user clicks on a category, such as Clothing, we usually want our search results to be filtered to display only clothing, however the facet counts for categories should remain unchanged – the Merchandise facet count should still be 856. To achieve this, we can use Elasticsearch filters instead of extending the query. In this example, we would append a terms filter on the category field, with the term ‘Clothing’. This will achieve the behaviour we want because filters are not considered when computing the facet counts – the search results will be filtered, but the facet counts will remain unchanged.
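
As a sketch of the full request (assuming Elasticsearch 1.x, where the top-level post_filter – simply called filter in earlier versions – is applied to the hits but not to the aggregations):

{
    "query" : { ... },
    "aggs" : {
        "categories" : {
            "terms" : { "field" : "category" }
        }
    },
    "post_filter" : {
        "term" : { "category" : "Clothing" }
    }
}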

Implementing Instant Search
Instant search is where the search engine assists you with your search as you type. There are several variants of this:

  1. Displaying products relevant to what the customer has typed so far
  2. Displaying search suggestions – predicting what the customer is going to type next (AKA auto-complete)
  3. Detecting spelling mistakes and suggesting corrections

It is actually possible to achieve all of the above with a single Elasticsearch query! We achieved this by using an n-grams analyzer for auto-complete, a shingle analyzer for search suggestions and Elasticsearch’s in-built term and phrase suggester for spelling correction. Check out my colleague’s post here for a complete example of how to achieve this.

Handling Distributed Search
Elasticsearch is an excellent example of a sophisticated distributed system which hides much of the inherent complexity from the user. Behind the scenes, problems like partitioning documents into shards, balancing shards across the cluster, replicating data to maintain fault-tolerance and efficiently routing requests between nodes are handled. All you have to do is configure a couple of settings in your elasticsearch.yml – such as the number of shards to split each index into and the number of replicas to keep of each shard.
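
For example (values illustrative – these defaults can also be set per index at creation time):

index.number_of_shards: 5
index.number_of_replicas: 1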

While configuring distributed search is pretty easy, there are some more complex issues which should be addressed. One of these issues is: how can we make a change to our mapping (i.e. the index) without causing any downtime to our search engine? For an e-commerce company, any form of downtime translates directly to a loss in revenue, and with our large product base, re-populating the search indices is a lengthy process which takes several hours. This problem can be solved using index aliases – an Elasticsearch feature which enables us to set up something similar to a symbolic link: an alias which always points to a live and fully prepared index. For example, we can set up an alias, products, which points to a specific version of our products index:

PUT /products_v1/_alias/products

Now, we can make a change to our mapping and populate a new index, products_v2, wait until we are satisfied that all data has been indexed and shards balanced, before finally switching our alias to point to the new index.
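
The switch itself can be made atomically with a single call to the _aliases endpoint:

POST /_aliases
{
    "actions" : [
        { "remove" : { "index" : "products_v1", "alias" : "products" } },
        { "add" :    { "index" : "products_v2", "alias" : "products" } }
    ]
}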

There are some problems that come with the distributed nature of Elasticsearch. Imagine performing a search, getting 10 search results in the response, then refreshing the page and seeing 50 search results. How can search be non-deterministic?! Well, this is a problem that we encountered. This problem comes about as a result of Elasticsearch making optimisations and using approximate term frequencies to determine results. Each shard has a subset of documents in the index, and by default, Elasticsearch will use the shard’s term frequencies as an approximation for the actual term frequencies. When using the common terms query as described earlier, a term may fall under the threshold for being considered a stop word on one shard, but may be over the threshold on another shard. So, depending on which node a query gets routed to, we can end up with different results. Most of the time, this isn’t a problem as term frequencies should be very similar across all shards, providing there is enough data and the data is evenly distributed. But, if it does become a problem, full accuracy can be achieved by changing the default query type to dfs_query_then_fetch, by appending &search_type=dfs_query_then_fetch to the search URI. This query type performs an additional round-trip, collecting term frequencies from all nodes and calculating global term frequencies, before sending the query to all shards and computing results using the global frequencies. This ensures results are always accurate, but comes at the cost of some additional latency.

A similar problem can be seen in faceting. Facet counts are computed on each shard and then aggregated on the node designated as coordinator. If, say, our request is for the top 10 terms within a facet, then each shard will return its locally computed top 10 facet elements. In cases where there are more than 10 terms, accuracy can be lost. To address this problem, a recent version of Elasticsearch introduced a shard_size attribute which can be set on the facet query, and specifies the number of elements each shard should return. This is separate from the size attribute – i.e. the number of elements we actually want. Asking each shard to return more elements is of course more expensive, but will give higher accuracy when it is needed.
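
For example, asking each shard for its top 50 category terms while still only returning 10 (numbers illustrative):

"terms" : { "field" : "category", "size" : 10, "shard_size" : 50 }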

Conclusions

  • It is hard to find a query which works well for every search. If there is a particular search found to yield bad results, it can be easy to optimise towards improving and fixing that search, but then other searches end up suffering as a result. When making changes to the search query, always think: will this work well for both general searches and specific searches?
  • Use filters for faceting, to filter search results without affecting facet counts. Also, Elasticsearch filters are (by default) cached, so can boost performance.
  • The three types of instant search: product suggestions, search suggestions and spelling corrections can be achieved with a single Elasticsearch query – providing the title field is configured with both a shingles and n-grams analyzer.
  • You should always A/B test whenever you make a change to the search experience. It can be invaluable to have good reporting on things like ‘searches which yield no results’ to easily catch problems with changes to the query.
  • Use index aliases to make large changes while maintaining zero-downtime.
  • There can be non-deterministic results with a distributed search engine, but with Elasticsearch these problems can be resolved at the cost of additional latency.
  • The search experience makes a big difference. It not only enables customers to discover the products they are looking for, but a well-tuned search experience can also help them discover things they weren’t explicitly searching for. We saw significant boosts in revenue from search every time we made improvements to the search engine.