HBase Real-time Analytics & Rollbacks via Append-based Updates (Part 2)

This is the second part of a 3-part post series in which we describe how we use HBase at Sematext for real-time analytics with an append-only updates approach.

In our previous post we explained the problem in detail with the help of an example and touched on the idea behind the suggested solution. In this post we will go through the solution details and briefly introduce the open-sourced implementation of the described approach.

Suggested Solution

The suggested solution can be described as follows:

  1. replace update (Get+Put) operations at write time with simple append-only writes
  2. defer the processing of updates to periodic compaction jobs (not to be confused with HBase’s own minor/major compactions)
  3. process updates on the fly only if the user asks for the data before its updates have been compacted

Before (standard Get+Put updates approach):

The picture below shows an example of updating search query metrics that can be collected by a Search Analytics system (something we do at Sematext).

  • each new piece of data (blue box) is processed individually
  • to apply update based on the new piece of data:
    • existing data (green box) is first read
    • data is changed and
    • written back

After (append-only updates approach):

1. Writing updates:

2. Periodic updates processing (compacting):

3. Performing on-the-fly updates processing (only if the user asks for data before its updates have been compacted):

Note: the result of processing updates on the fly can optionally be stored back right away, so that the next time the same data is requested no on-the-fly merging is needed.
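The three steps above can be sketched with an in-memory TreeMap standing in for an HBase table (sorted keys, fast range scans). All names and the key format here are illustrative assumptions, not HBaseHUT’s actual API:

```java
import java.util.*;

// Illustrative sketch only: a TreeMap models an HBase table (sorted row
// keys, fast range scans). Not HBaseHUT's actual API.
class AppendOnlySketch {
    // "table": row key -> numeric value (e.g. a query count)
    static final TreeMap<String, Long> table = new TreeMap<>();
    static long seq = 0;

    // Step 1: write an update as a plain append -- no Get, no read-modify-write.
    // A sequence suffix gives every write a unique row key that still sorts
    // right after the base key.
    static void appendUpdate(String baseKey, long delta) {
        table.put(baseKey + "#" + String.format("%010d", seq++), delta);
    }

    // Step 2 (periodic compaction job): one range scan merges all appended
    // records for a base key into a single record.
    static void compact(String baseKey) {
        long sum = 0;
        Iterator<Map.Entry<String, Long>> it =
            table.subMap(baseKey + "#", baseKey + "#\uffff").entrySet().iterator();
        while (it.hasNext()) { sum += it.next().getValue(); it.remove(); }
        table.merge(baseKey, sum, Long::sum);
    }

    // Step 3: a read before compaction merges the same records on the fly.
    static long read(String baseKey) {
        long sum = table.getOrDefault(baseKey, 0L);
        for (long v : table.subMap(baseKey + "#", baseKey + "#\uffff").values()) sum += v;
        return sum;
    }
}
```

Note how the reader sees the same value before and after compaction; compaction only shrinks the number of stored records.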

The idea is simple and not a new one, but given specific qualities of HBase, such as fast range scans and high write throughput, it works especially well with HBase. So, what we gain here is:

  • high update throughput
  • real-time updates visibility: despite deferring the actual updates processing, the user always sees the latest data changes
  • efficient updates processing: random Get+Put operations are replaced with processing sets of records at a time (during a fast scan), and redundant Get attempts when writing the very first data item are eliminated
  • better handling of update load peaks
  • ability to roll back any range of updates
  • no data inconsistency problems caused by tasks that fail after only partially updating data in HBase without rolling back (e.g. when writing from MapReduce)

Let’s take a closer look at each of the above points.

High Update Throughput

Higher update throughput is achieved by not performing a Get operation for every record update. Get+Put operations are thus replaced with plain Puts (which can be further optimized with a client-side buffer), and Puts are really fast in HBase. Processing of updates (compaction) is still needed to perform the actual data merge, but it is done much more efficiently than doing a Get+Put for each update operation (see more details below).
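One way to make every update a plain Put is to give each one a unique row key that sorts next to the base record. The key format below is a hypothetical illustration, not HBaseHUT’s actual scheme:

```java
// Hypothetical row-key scheme for append-only updates: each update gets a
// unique key that sorts immediately after the base record's key, so writes
// stay plain Puts and a later range scan picks them up in sequence.
class UpdateKeys {
    static String updateKey(String baseKey, long writeTimeMs, int writerId) {
        // timestamp keeps updates roughly ordered; the writer id keeps
        // concurrent writers from colliding on the same millisecond
        return String.format("%s#%013d#%04d", baseKey, writeTimeMs, writerId);
    }
}
```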

Real-time Updates Visibility

Users always see the latest data changes. Even if updates have not yet been processed and are still stored as a series of records, they are merged on the fly at read time.

By periodically merging the appended records we ensure that the amount of data to be processed on the fly stays small enough to merge quickly.

Efficient Updates Processing

  • N Get+Put operations at write time are replaced with N Puts + 1 (shared) Scan + 1 Put
  • Processing N changes at once is usually more efficient than applying N individual changes

Let’s say we got 360 update requests for the same record: e.g. the record keeps track of some sensor value over a 1-hour interval and we collect a data point every 10 seconds. These measurements need to be merged into a single record that represents the whole 1-hour interval. With the standard approach we would perform 360 Get+Put operations (we could use a client-side buffer to perform partial aggregation and reduce the number of actual Get+Put operations, but we want data sent immediately as it arrives instead of asking the user to wait 10*N seconds). With the append-only approach we perform 360 Put operations plus 1 scan (which is actually meant to process more than just these updates) that goes through the 360 records (stored in sequence), computes the resulting record, and performs 1 Put operation to store the result back. Fewer operations means using fewer resources, which leads to more efficient processing. Moreover, if the value being updated is a complex one (takes time to load into memory, etc.) it is much more efficient to apply all updates at once than one by one.
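The merge step of this example can be modeled with a hypothetical helper: the compaction scan folds the sorted batch of appended readings into one resulting record in a single sequential pass, with no Gets:

```java
// Illustrative only: folds N appended sensor readings into a single hourly
// record, the way the shared compaction scan would -- one sequential pass
// over records stored next to each other, no Gets.
class SensorCompaction {
    static double mergeHour(double[] readings) {
        double sum = 0;
        for (double r : readings) sum += r;  // one pass over the sorted batch
        return sum / readings.length;        // resulting record: hourly average
    }
}
```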

Deferring the processing of updates is especially effective when a large portion of the operations are, in essence, insertions of new data rather than updates of stored data. In that case a lot of Get operations (checking whether there is something to update) are redundant.

Better Handling of Update Load Peaks

Deferring processing of updates helps handle load peaks without major performance degradation. The actual (periodic) processing of updates can be scheduled for off-peak time (e.g. nights or weekends).

Ability to Rollback Updates

Since updates don’t change the existing data (until they are processed), rolling back is easy.

Preserving the ability to roll back even after updates have been compacted is also not hard. Updates can be grouped and compacted within time periods of a given length, as shown in the picture below. This means a client reading the data will still have to merge some updates on the fly even right after a compaction finishes. However, with proper configuration this is not a problem, as the number of records to be merged on the fly stays small.

Consider the following example where the goal is to keep an all-time average value for a particular sensor. Let’s say data is collected every 10 seconds for 30 days, which gives 259,200 separately written data points. While merging this many values on the fly might still be quite fast on a medium-to-large HBase cluster, periodic compaction improves reading speed a lot. Let’s say we process updates every 4 hours and use a 1-hour interval as the compaction base (as shown in the picture above). Then, at any point in time, fewer than 24*30 + 4*60*6 = 2,160 non-compacted records need to be processed on the fly when fetching the resulting record for the 30 days: at most 24*30 = 720 hourly compacted records, plus up to 4 hours’ worth of raw points (4*360 = 1,440). This is a small number of records and can be processed very fast. At the same time it is still possible to roll back to any point in time with 1-hour granularity.
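With hour-grouped compaction, rollback amounts to dropping buckets. A sketch with assumed names, where buckets map an hour number to that hour’s compacted value:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of 1-hour-granularity rollback: since compaction groups updates
// per hour bucket and never rewrites older buckets, rolling back to hour H
// is just discarding every bucket newer than H.
class HourlyRollback {
    static NavigableMap<Long, Double> rollbackTo(NavigableMap<Long, Double> buckets,
                                                 long lastGoodHour) {
        // headMap(..., true) keeps buckets up to and including lastGoodHour
        return new TreeMap<>(buckets.headMap(lastGoodHour, true));
    }
}
```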

If the system should store more historical data, but we don’t care about rolling it back (if nothing wrong was found during 30 days, the data is most likely OK), compaction can be configured to process all data older than 30 days as one group (i.e. merge it into one record).

Automatic Handling of Task Failures which Write Data to HBase

A typical scenario: a task updating HBase data fails in the middle of writing – some data was written, some was not. Ideally we should be able to simply restart the same task (on the same input data) so that the new attempt performs the needed updates without corrupting data through duplicate write operations.

In the suggested solution every update is written as a new record. To make sure that performing the same (literally the same, not just similar) update operation multiple times does not result in multiple separate updates – which would corrupt the data – every attempt of a given update operation should write to a record with the same row key. A retried attempt then simply overwrites the single record created by the failed attempt, so the update is never applied twice and the data stays consistent.
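This can be sketched as follows (the key format and the in-memory "table" are illustrative assumptions): deriving the row key purely from the update’s input means a retried attempt overwrites its own earlier rows instead of appending duplicates:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: the row key is derived only from the update's input
// (here, a source record id), so a restarted task re-writes the *same* rows,
// overwriting the failed attempt's partial output instead of duplicating it.
class IdempotentWrites {
    static final Map<String, Long> table = new HashMap<>(); // stands in for HBase

    static void writeUpdate(String sourceRecordId, long value) {
        table.put("upd#" + sourceRecordId, value); // deterministic key: retries overwrite
    }
}
```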

This is especially convenient when writing to an HBase table from MapReduce tasks, as the MapReduce framework restarts failed tasks for you. With this approach, handling task failures happens automatically – no extra effort is needed to manually roll back a previous task’s changes before starting a new one.

Cons

Below are the major drawbacks of the suggested solution. Usually there are ways to reduce their effect on your system depending on the specific case (e.g. by tuning HBase appropriately or by adjusting parameters involved in data compaction logic).

  • merging on the fly takes time. Properly configuring periodic updates processing is key to keeping data fetching fast.
  • during compaction, many records that don’t need compacting (already compacted or “stand-alone” records) may be scanned. Compaction can usually be limited to data written after the previous compaction, which allows using efficient time-based filters to reduce the impact here.

Solving these issues may be implementation-specific. We’ll bring them up again when talking about our implementation in the follow up post.

Implementation: Meet HBaseHUT

The suggested solution was implemented and open-sourced as the HBaseHUT project. HBaseHUT will be covered in the follow-up post shortly.

 

If you like this sort of stuff, we’re looking for Data Engineers!


6 Responses to HBase Real-time Analytics & Rollbacks via Append-based Updates (Part 2)

  1. Nice articles.
    Would be interested in learning the differences, if any, between this approach and the one used by StumbleUpon in their OpenTSDB system (which also uses HBase to store time-series data in realtime).

    • sematext says:

@Jeyendran – I’m sure Alex will reply here, but if you are going to HBaseCon in San Francisco this month, OpenTSDB will be covered in Alex’s talk.

      • Alex Baranau says:

        Jeyendran,

        Thank you for the comment.

        I should have added this question to a FAQ.. Right, there will be one slide about how HBaseHUT compares to OpenTSDB in the presentation. In short:

        * In general, it is somewhat wrong to compare HBaseHUT and OpenTSDB:
        – OpenTSDB is a time-series database
        – HBaseHUT is a java library, to be used in your project
        * OpenTSDB uses “serve raw data” approach. Every value is stored “as is” and there’s no aggregated values (say, for bigger time periods). So, basically, there are *no updates*, just inserts.
        * HBaseHUT lib is used to *optimize update* operations. It is meant for “serve aggregated data” approach, but not limited to it. The lib can be used in any project to replace Get+Put like update operations.

        Please, let me know if this answers your Q.

        • Hi Alex,

Thanks a lot for your reply. I totally agree that an HBase library which supports many use cases is more useful to developers than an approach that is more like a purpose-built system on top of HBase. Also agree that OpenTSDB is an insert-only system, with no optimizations for aggregate-level time periods like hours and days. They seem to do that at query time, which is less efficient.

          They do carry out background compactions though, which is where I thought the approaches share some similarity. The goal of compaction in their case is a bit different – to optimize HBase storage space and a modest query execution time improvement. I am referring to this section at http://opentsdb.net/schema.html:
          ——–
          Reducing HBase overhead

          The problem with HBase’s implementation is that every single cell also stores the row key and a bunch of other redundant information. … This leads to several scalability problems, especially due to memory pressure inside Region Servers and the increased number of objects that HBase has to handle.

          The idea here is simply to re-compact rows in background so that all the values in a row would end up in one big cell, instead of being in many individual cells.
          ——–

          My congratulations to you on the project and for releasing it into open source.

          • Alex Baranau says:

            Right, OpenTSDB compactions are about optimizing storage, but not calculating aggregated values.

            While HBaseHUT is quite flexible when it comes to integrating it into your project to increase update throughput, I should note that OpenTSDB is perfect for its own use-case. It is highly optimized and extremely efficient.

            Currently storing data (and all optimizations related to that) in HBaseHUT is completely delegated to client code. This allows using arbitrary data (whereas e.g. OpenTSDB is meant to work with numeric data only), computing complex aggregates (e.g. bitmaps, etc.).

            There’s a plan to add basic types & aggregates that can be used out of the box (like min/max/percentile for numeric values, distinct counts for arbitrary values, time-based aggregation) and implementation of which can be optimized with regard to HBaseHUT (storage-wise, etc.). But this is likely to be implemented as separate “wrapper” project, and HBaseHUT will be left to serving its main purpose: implement “deferring updates processing” approach logic.

            Btw, feel free to ask any specific Qs on ML: https://groups.google.com/group/hbasehut/. Will be glad to assist you with integration of the HBaseHUT into your project. And, of course, feel free to participate in development & tests, pull requests are very welcome ;).
