HBase Real-time Analytics & Rollbacks via Append-based Updates (Part 2)

This is the second part of a 3-part post series in which we describe how we use HBase at Sematext for real-time analytics with an append-only updates approach.

In our previous post we explained the problem in detail with the help of example and touched on the suggested solution idea. In this post we will go through solution details as well as briefly introduce the open-sourced implementation of the described approach.

Suggested Solution

Suggested solution can be described as follows:

  1. replace update (Get+Put) operations at write time with simple append-only writes
  2. defer processing updates to periodic compaction jobs (not to be confused with minor/major HBase compaction)
  3. perform on the fly updates processing only if user asks for data earlier than updates compacted

Before (standard Get+Put updates approach):

The picture below shows an example of updating search query metrics that can be collected by Search Analytics system (something we do at Sematext).

  • each new piece of data (blue box) is processed individually
  • to apply update based on the new piece of data:
    • existing data (green box) is first read
    • data is changed and
    • written back

After (append-only updates approach):

1. Writing updates:

2. Periodic updates processing (compacting):

3. Performing on the fly updates processing (only if user asks for data earlier than updates compacted):

Note: the result of the processing updates on the fly can be optionally stored back right away, so that next time same data is requested no compaction is needed.

The idea is simple and not a new one, but given the specific qualities of HBase like fast range scans and high write throughput it works especially well with HBase. So, what we gain here is:

  • high update throughput
  • real-time updates visibility: despite deferring the actual updates processing, user always sees the latest data changes
  • efficient updates processing by replacing random Get+Put operations with processing sets of records at a time (during the fast scan) and eliminating redundant Get+Put attempts when writing the very first data item
  • better handling of update load peaks
  • ability to roll back any range of updates
  • avoid data inconsistency problems caused by tasks that fail after only partially updating data in HBase without doing rollback (when using with MapReduce, for example)
Let’s take a closer look at each of the above points.

High Update Throughput

Higher update throughput is achieved by not doing Get operation for every record update. Thus, Get+Put operations are replaced with Puts (which can be further optimized by using client-side buffer), which are really fast in HBase. The processing of updates (compaction) is still needed to perform the actual data merge, but it is done much more efficiently than doing Get+Put for each update operation (see below more details).

Real-time Updates Visibility

Users always see latest data changes. Even if updates have not yet been processed and still stored as a series of records, they will be merged on the fly.

By doing periodic merges of appended records we ensure data that has to be processed on the fly is small enough and does fast.

Efficient Updates Processing

  • N Get+Put operations at write time are replaced with N Puts + 1 Scan (shared) + 1 Put operations
  • Processing N changes at once is usually more effective than applying N individual changes

Let’s say we got 360 update requests for the same record: e.g. record keeps track of some sensor value for 1 hour interval and we collect data points every 10 seconds. These measurements needs to be merged in a single record that represents the whole 1-hour interval. Initially we would perform 360 Get+Put operations (while we can use some client-side buffer to perform partial aggregation and reduce the number of actual Get+Put operations, we want data to be sent immediately as it arrives instead of asking user to wait 10*N seconds). With append-only approach, we will perform 360 Put operations, 1 scan (which is actually meant to process not only these updates) that goes through 360 records (stored in sequence), it calculates resulting record, and performs 1 Put operation to store the result back. Fewer operations means using less resources and this leads to more efficient processing. Moreover, if the value which needs to be updated is a complex one (needs time to load in memory, etc.) it is much more efficient to apply all updates at once than one by one individually.

Deferring processing of updates is especially effective when large portion of operations is in essence insertion of new data, but not an update of stored data. In this case a lot of Get operations (checking if there’s something to update) are redundant.

Better Handling of Update Load Peaks

Deferring processing of updates helps handle load peaks without major performance degradation. The actual (periodic) processing of updates can be scheduled for off-peak time (e.g. nights or weekends).

Ability to Rollback Updates

Since updates don’t change the existing data (until they are processed) rolling back is easy.

Preserving rollback ability even after updates were compacted is also not hard. Updates can be grouped and compacted within time periods of given length as shown in the picture below. That means the client that reads data will still have to merge updates on-the-fly even right after compaction is finished. However using the proper configuration this isn’t going to be a problem as the number of records to be merged on the fly will be small enough.

Consider the following example where the goal is to keep all-time avg value for particular sensor. Let’s say data is collected every 10 seconds for 30 days, which gives 259200 separately written data points. While compacting on-the-fly this amount of values might be quite fast for a medium-large HBase cluster, performing periodic compaction will improve reading speed a lot. Let’s say we perform updates processing every 4 hours and use 1 hour interval as compacting base (as shown in the picture above). This gives us, at any point in time, less than 24*30 + 4*60*6 = 2,160 non-compacted records that need to be processed on-the-fly when fetching resulting record for 30 days. This is a small number of records and can be processed very fast. At the same time it is possible to perform rollback to any point in time with 1 hour granularity.

In case system should store more historical data, but we don’t care about rolling it back (if nothing wrong was found during 30 days the data is likely to be OK) then compaction can be configured to process all data older than 30 days as one group (i.e. merge into one record).

Automatic Handling of Task Failures which Write Data to HBase

Typical scenario: task updating HBase data fails in the middle of writing – some data was written, some not. Ideally we should be able to simply restart the same task (on the same input data) so that new one performs needed updates without corrupting data because some write operations were duplicate.

In the suggested solution every update is written as a new record. In order to make sure that performing the same (literally the same, not just similar) update operation multiple times does not result in multiple separate update operations, in which case data will be corrupted, every update operation should write to the record with the same row key from any task attempts. This results in overriding the same single record (if it was created by failed task) and avoiding doing the same update multiple times which in turn means that data is not corrupted.

This especially convenient in situations when we write to HBase table from MapReduce tasks, as MapReduce framework restarts failed tasks for you. With the given approach we can say that handling task failures happens automatically – no extra effort to manually roll back previous task changes and starting a new task is needed.

Cons

Below are the major drawbacks of the suggested solution. Usually there are ways to reduce their effect on your system depending on the specific case (e.g. by tuning HBase appropriately or by adjusting parameters involved in data compaction logic).

  • merging on the fly takes time. Properly configuring periodic updates processing is a key to keeping data fetching fast.
  • when performing compaction, scanning of many records that don’t need to be compacted can happen (already compacted or “alone-standing” records). Compaction can usually be performed only on data written after the previous compaction which allows to use efficient time-based filters to reduce the impact here.

Solving these issues may be implementation-specific. We’ll bring them up again when talking about our implementation in the follow up post.

Implemenation: Meet HBaseHUT

Suggested solution was implemented and open-sourced as HBaseHUT project. HBaseHUT will be covered in the follow up post shortly.

 

If you like this sort of stuff, we’re looking for Data Engineers!

HBase Real-time Analytics & Rollbacks via Append-based Updates

In this part 1 of a 3-part post series we’ll describe how we use HBase at Sematext for real-time analytics and how we can perform data rollbacks by using an append-only updates approach.

Some bits of this topic were already covered in Deferring Processing Updates to Increase HBase Write Performance and some were briefly presented at BerlinBuzzwords 2011 (video). We will also talk about some of the ideas below during HBaseCon-2012 in late May (see Real-time Analytics with HBase). The approach described in this post is used in our production systems (SPM & SA) and the implementation was open-sourced as HBaseHUT project.

Problem we are Solving

While HDFS & MapReduce are designed for massive batch processing and with the idea of data being immutable (write once, read many times), HBase includes support for additional operations such as real-time and random read/write/delete access to data records. HBase performs its basic job very well, but there are times when developers have to think at a higher level about how to utilize HBase capabilities for specific use-cases.  HBase is a great tool with good core functionality and implementation, but it does require one to do some thinking to ensure this core functionality is used properly and optimally. The use-case we’ll be working with in this post is a typical data analytics system where:

  • new data are continuously streaming in
  • data are processed and stored in HBase, usually as time-series data
  • processed data are served to users who can navigate through most recent data as well as dig deep into historical data

Although the above points frame the use-case relatively narrowly, the approach and its implementation that we’ll describe here are really more general and applicable to a number of other systems, too. The basic issues we want to solve are the following:

  • increase record update throughput. Ideally, despite high volume of incoming data changes can be applied in real-time . Usually. due to the limitations of the “normal  HBase update”, which requires Get+Put operations, updates are applied using batch-processing approach (e.g. as MapReduce jobs).  This, of course, is anything but real-time: incoming data is not immediately seen.  It is seen only after it has been processed.
  • ability to roll back changes in the served data. Human errors or any other issues should not permanently corrupt data that system serves.
  • ability to fetch data interactively (i.e. fast enough for inpatient humans).  When one  navigates through a small amount of recent data, as well as when selected time interval spans years, the retrieval should be fast.

Here is what we consider an “update”:

  • addition of a new record if no records with same key exists
  • update of an existing record with a particular key

Let’s take a look at the following example to better understand the problem we are solving.

Example Description

Briefly, here are the details of an example system:

  • System collects metrics from a large number of sensors (N) very frequently (each second) and displays them on chart(s) over time
  • User needs to be able to select small time intervals to display on a chart (e.g. several minutes) as well as very large spans (e.g. several years)
  • Ideally, data shown to user should be updated in real-time (i.e. user can see the most recent state of the sensors)

Note that even if some of the above points are not applicable to your system the ideas that follow may still be relevant and applicable.

Possible “direct” Implementation Steps

The following steps are by no means the only possible approach.

Step 1: Write every data point as a new record or new column(s) in some record in HBase. In other words, use a simple append-only approach. While this works well for displaying charts with data from short time intervals, showing a year (there are about 31,536,000 seconds in one year) worth of data may be too slow to call the experience “interactive”.

Step 2: Store extra records with aggregated data for larger time intervals (say 1 hour, so that 1 year = 8,760 data points). As new data comes in continuously and we want data to be seen in real-time, plus we cannot rely on data coming in a strict order, say because one sensor had network connectivity issues or we want to have ability to import historical data from a new data source, we have to use update operations on those records that hold data for longer intervals. This requires a lot of Get+Put operations to update aggregated records and this means degradation in performance — writing to HBase in this fashion will be significantly slower compared to using the append-only approach described in Step 1. This may slow writes so much that a system like this may not actually be able to keep up with the volume of the incoming data.  Not good.

Step 3: Compromise real-time data analysis and process data in small batches (near real-time). This will decrease the load on HBase as we can process (aggregate) data more efficiently in batches and can reduce the number of update (Get+Put) operations. But do we really want to compromise real-time analytics? No, of course not.  While it may seem OK in this specific example to show data for bigger intervals with some delay (near real-time), in real-world systems this usually affects other charts/reports, such as reports that need to show total, up to date figures. So no, we really don’t want to compromise real-time analytics if we don’t have to. In addition, imagine what happens if something goes wrong (e.g. wrong data was fed as input, or application aggregates data incorrectly due to a bug or human error).  If that happens we will not be able to easily roll back recently written data. Utilizing native HBase column versions may help in some cases, but in general, when we want greater control over rollback operation a better solution is needed.

Use Versions for Rolling Back?

Recent additions in managing cell versions make cell versioning even more powerful than before. Things like HBASE-4071 make it easy to store historical data without big data overhead by cleaning old data efficiently. While it seems obvious to use versions (native HBase feature) for allowing rolling back data changes, we cannot (and do not want to) rely heavily on cell versions here. The main reason for that is that it is just not very effective when dealing with lots of versions for a given cell. When update history for a record/cell becomes very long this requires many versions for a given cell. Versions are managed and navigated as a simple list in HBase (as opposed to using a Map-like structure that is used for records and columns) so managing long lists of versions is less efficient than having a bunch of separate records/columns. Besides, using versions will not help us with Get+Put situation and we are aiming to kill these two birds with one rock with the solution we are about to describe. One could try to use append-only updates approach described below and use cells versions as update log, but this would again bring us to managing long lists in a non-efficient way.

Suggested Solution

Given the example above, our suggested solution can be described as follows:

  • replace update (Get+Put) operations at write time with simple append-only writes and defer processing of updates to periodic jobs or perform aggregations on the fly if user asks for data earlier than individual additions are processed.

The idea is simple and not necessarily novel, but given the specific qualities of HBase, namely fast range scans and high write throughput, this approach works very well.  So well, in fact, that we’ve implemented it in HBaseHUT and have been using it with success in our production systems (SPM & SA).

So, what we gain here is:

  • high update throughput
  • real-time updates visibility: despite deferring the actual updates processing, user always sees the latest data changes
  • efficient updates processing by replacing random Get+Put operations with processing whole sets of records at a time (during the fast scan) and eliminating redundant Get+Put attempts when writing first data item
  • ability to roll back any range of updates
  • avoid data inconsistency problems caused by tasks that fail after only partially updating data in HBase without doing rollback (when using with MapReduce, for example)

In part 2 post we’ll dig into the details around each of the above points and we’ll talk more about HBaseHUT, which makes all of the above possible. If you like this sort of stuff, we’re looking for Data Engineers!

Data Engineer Position at Sematext International

If you’ve always wanted to work with Hadoop, HBase, Flume, and friends and build massively scalable, high-throughput distributed systems (like our Search Analytics and SPM), we have a Data Engineer position that is all about that!  If you are interested, please send your resume to jobs@sematext.com.

Responsibilities:

  • Versatile architect and developer – design and build large, high performance,scalable data processing systems using Hadoop, HBase, and other big data technologies
  • DevOps fan –  run and tune large data processing production clusters
  • Tool maker – develop ops and management tools 
  • Open source participant – keep up with development in areas of cloud and distributed computing, NoSQL, Big Data, Analytics, etc.

Pluses:

  • solid Math, Statistics, Machine Learning, or Data Mining is not required but is a big plus
  • experience with Analytics, OLAP, Data Warehouse or related technologies is a big plus
  • ability and desire to expand and lead a data engineering team
  • ability to think both business and engineering
  • ability to build products and services based on observed client needs
  • ability to present in public, at meetups, conferences, etc.
  • ability to contribute to blog.sematext.com
  • active participation in open-source communities
  • desire to share knowledge and teach
  • positive attitude, humor, agility, high integrity, and low ego, attention to detail

Location:

  • New York

We’re small and growing.  Our HQ is in Brooklyn, but our team is spread over 4 continents.  If you follow this blog you know we have deep expertise in search and big data analytics and that our team members are conference speakers, book authors, Apache members, open-source contributors, etc.

Relevant pointers:

Berlin Buzzwords 2012 – Three Talks from Sematext

Last year was our first time at Berlin Buzzwords.  We gave 1 full talk about Search Analytics (video) and 2 lightning talks (video, video).  We saw a number of good talks, too.  We also took part in a HBase Hackathon organized by Lars George in Groupon’s Berlin offices and even found time to go clubbing.  So in hopes of paying Berlin another visit this year, a few of us at Sematext (@sematext) submitted talk proposals.  Last week we all got acceptance emails, so this year there will be 3 talks from 3 Sematextans at Berlin Buzzwords!  Here is what we’ll be talking about:

RafałScaling Massive ElasticSearch Clusters

This talk describes how we’ve used ElasticSearch to build massive search clusters capable of indexing several thousand documents per second while at the same time serving a few hundred QPS over billions of documents in well under a second.  We’ll talk about building clusters that continuously grow in terms of both indexing and search rates. You will learn about finding cluster nodes that can handle more documents, about managing shard and replica allocation and prevention of unwanted shard rebalancing, about avoiding expensive distributed queries, etc.  We’ll also describe our experience doing performance testing of several ElasticSearch clusters and will share our observations about what settings affect search performance and how much.  In this talk you’ll also learn how to monitor large ElasticSearch clusters, what various metrics mean, and which ones to pay extra attention to.

AlexReal-time Analytics with HBase

HBase can store massive amounts of data and allow random access to it – great. MapReduce jobs can be used to perform data analytics on a large scale – great. MapReduce jobs are batch jobs – not so great if you are after Real-time Analytics. Meet append-only writes approach that allows going real-time where it wasn’t possible before.

In this talk we’ll explain how we implemented “update-less updates” (not a typo!) for HBase using append-only approach. This approach shines in situations where high data volume and velocity make random updates (aka Get+Put) prohibitively expensive.  Apart from making Real-time Analytics possible, we’ll show how the append-only approach to updates makes it possible to perform rollbacks of data changes, and avoid data inconsistency problems caused by tasks in MapReduce jobs that fail after only partially updating data in HBase.  The talk is based on Sematext’s success story of building a highly scalable, general purpose data aggregation framework which was used to build Search Analytics and Performance Monitoring services. Most of the generic code needed for append-only approach described in this talk is implemented in our HBaseHUT open-source project.

OtisLarge Scale ElasticSearch, Solr & HBase Performance Monitoring 

This talk has all the buzzwords covered: big data, search, analytics, realtime, large scale, multi-tenant, SaaS, cloud, performance… and here is why:

In this talk we’ll share the “behind the scenes” details about SPM for HBase, ElasticSearch, and Solr, a large scale, multi-tenant performance monitoring SaaS built on top of Hadoop and HBase running in the cloud.  We will describe all its backend components, from the agent used for performance metrics gathering, to how metrics get sent to SPM in the cloud, how they get aggregated and stored in HBase, how alerting is implemented and how it’s triggered, how we graph performance data, etc.  We’ll also point out the key metrics to watch for each system type.  We’ll go over various pain-points we’ve encountered while building and running SPM, how we’ve dealt with them, and we’ll discuss our plans for SPM in the future.

We hope to see some of you in Berlin.  If these topics are of interest to you, but you won’t be coming to Berlin, feel free to get in touch, leave comments, or ping @sematext.  And if you love working with things our talks are about, we are hiring world-wide!

HBaseWD: Avoid RegionServer Hotspotting Despite Sequential Keys

In HBase world, RegionServer hotspotting is a common problem.  We can describe this problem with a single sentence: while writing records with sequential row keys allows the most efficient reading of data range given the start and stop keys, it causes undesirable RegionServer hotspotting at write time. In this 2-part post series we’ll discuss the problem and show you how to avoid this infamous problem.

Problem Description

Records in HBase are sorted lexicographically by the row key. This allows fast access to an individual record by its key and fast fetching of a range of data given start and stop keys. There are common cases where you would think row keys forming a natural sequence at write time would be a good choice because of  types of queries that will fetch the data later. For example, we may want to associate each record with a timestamp so that later we can fetch records from a particular time range.  Examples of such keys are:

  • time-based format: Long.MAX_VALUE – new Date().getTime()
  • increasing/decreasing sequence: ”001”, ”002”, ”003”,… or ”499”, ”498”, ”497”, …

But writing records with such naive keys will cause hotspotting because of how HBase writes data to its Regions.

RegionServer Hotspotting

When records with sequential keys are being written to HBase all writes hit one Region.  This would not be a problem if a Region was served by multiple RegionServers, but that is not the case – each Region lives on just one RegionServer.  Each Region has a pre-defined maximal size, so after a Region reaches that size it is split in two smaller Regions.  Following that, one of these new Regions takes all new records and then this Region and the RegionServer that serves it becomes the new hotspot victim.  Obviously, this uneven write load distribution is highly undesirable because it limits the write throughput to the capacity of a single server instead of making use of multiple/all nodes in the HBase cluster. The uneven load distribution can be seen in Figure 1. (chart courtesy of SPM for HBase):

HBase RegionServer Hotspotting

Figure 1. HBase RegionServer hotspotting

We can see that while one server was sweating trying to keep up with writes, others were “resting”. You can find some more information about this problem in HBase Reference Guide.

Solution Approach

So how do we solve this problem?  The cases discussed here assume that we don’t have all data we want to write to HBase all at once, but rather that the data are arriving continuously. In case of bulk import of data into HBase the best solutions, including those that avoid hotspotting, are described in bulk load section of HBase documentation.  However, if you are like us at Sematext, and many organizations nowadays are, the data keeps streaming in and needs processing and storing. The simplest way to avoid single RegionServer hotspotting in case of continuously arriving data would be to simply distribute writes over multiple Regions by using random row keys. Unfortunately, this would compromise ability to do fast range scans using start and stop keys. But that is not the only solution.  The following simple approach solves the hotspotting issue while at the same time preserving the ability to fetch data by start and stop key.  This solution, mentioned multiple times on HBase mailing lists and elsewhere is to salt row keys with a prefix.  For example, consider constructing the row key using this:

new_row_key = (++index % BUCKETS_NUMBER) + original_key

For the visual types among us, that may result in keys looking as shown in Figure 2.

HBase Row Key Prefix Salting

Figure 2. HBase row key prefix salting

Here we have:

  • index is the numeric (or any sequential) part of the specific record/row ID that we later want to use for record fetching (e.g. 1, 2, 3 ….)
  • BUCKETS_NUMBER is the number of “buckets” we want our new row keys to be spread across. As records are written, each bucket preserves the sequential notion of original records’ IDs
  • original_key is the original key of the record we want to write
  • new_row_key is the actual key that will be used when writing a new record (i.e. “distributed key” or “prefixed key”). Later in the post the “distributed records” term is used for records which were written with this “distributed key”.

Thus, new records will be split into multiple buckets, each (hopefully) ending up in a different Region in the HBase cluster. New row keys of bucketed records will no longer be in one sequence, but records in each bucket will preserve their original sequence. Of course, if you start writing into an empty HTable, you’ll have to wait some time (depending on the volume and velocity of incoming data, compression, and maximal Region size) before you have several Regions for a table. Hint: use pre-splitting feature for the new table to avoid the wait time.  Once writes using the above approach kick in and start writing to multiple Regions your “slaves load” chart should look better.

HBase RegionServer evenly distributed write load

Figure 3. HBase RegionServer evenly distributed write load

Scan

Since data is placed in multiple buckets during writes, we have to read from all of those buckets when doing scans based on “original” start and stop keys and merge data so that it preserves the “sorted” attribute. That means BUCKETS_NUMBER more Scans and this can affect performance. Luckily, these scans can be run in parallel and performance should not degrade or might even improve — compare the situation when you read 100K sequential records from one Region (and thus one RegionServer) with reading 10K records from 10 Regions and 10 RegionServers in parallel!

Get/Delete

To get or delete a single record by original key may need to perform 1 or up to BUCKETS_NUMBER Get operations depending on the logic we used for prefix generation. E.g. when using “static” hash as prefix, given the original key we may precisely identify the prefixed key. In case we used random prefix we will have to perform Get for each of the possible buckets. The same goes for Delete operations.

MapReduce Input

Since we still want to benefit from data locality, the implementation of feeding “distributed” data to a MapReduce job will likely break the order in which data comes to mappers. This is at least true for the current HBaseWD implementation (see below). Each map task will process data for a particular bucket. Of course, records will be in same order based on original keys within a bucket. However, since two records which were meant to be “near each other” based on their original key may have fallen into different buckets, the will be fed to different map tasks. Thus, if the mapper assumes records come in the strict/original sequence, we will be hurt, since the order will be preserved only within each bucket, but not globally.

Increased Number of Map Tasks

When using data (written using the suggested approach) as a MapReduce input (with start and/or stop key provided) the splits number will likely to be increased (depends on the implementation). For current HBaseWD implementation you’ll get BUCKETS_NUMBER times more splits compared to “regular” MapReduce with same parameters.  This is due to the same logic for data selection as with simple Scan operation, as described above. As the result, MapReduce jobs will have BUCKETS_NUMBER times more map tasks. This should not decrease performance if BUCKETS_NUMBER is reasonably not too high (when MR job initialization & cleanup work takes more time than processing itself). Moreover, in many use-cases having many more mappers helps improve performance. Many users reported more mappers having a positive impact given that standard HTable input based MapReduce job usually has too few map tasks (one per Region) which cannot be changed without extra coding.

Another strong signal the suggested approach and its implementation could help is if in your application, in addition to writing records with sequential keys, the application also continuously processes newly written data delta using MapReduce . In such use-cases when data is written sequentially (not using any artificial distribution) and is being processed relatively frequently, the delta to be processed resides in just a few Regions (or perhaps in even just one Region if the write load is not high, if maximal Region size is high, and processing batches are very frequent).

Solution Implementation: HBaseWD

We implemented the solution described above and open-sourced it as a small HBaseWD project. We say small because HBaseWD is really self-contained and really simple to integrate into an existing code due to support for native HBase client API (see examples below). HBaseWD project was first presented at BerlinBuzzwords 2011(video) and is currently used in a number of production systems.

Configuring Distribution

Simple Even Distribution

Distributing records with sequential keys to be distributed in up to Byte.MAX_VALUE buckets (single byte is added in front of a key):

byte bucketsCount = (byte) 32; // distributing into 32 buckets
RowKeyDistributor keyDistributor =  new RowKeyDistributorByOneBytePrefix(bucketsCount);
Put put = new Put(keyDistributor.getDistributedKey(originalKey));
... // add values
hTable.put(put);

Hash-Based Distribution

Another useful RowKeyDistributor is RowKeyDistributorByHashPrefix. Please see example below. It creates the “distributed key” based on original key value so that later when you have original key and want to update the record you can calculate distributed key without having to call HBase (too see what bucket it is in). Or, you can perform a single Get operation when original key is known (instead of reading from all buckets).

AbstractRowKeyDistributor keyDistributor =
     new RowKeyDistributorByHashPrefix(
            new RowKeyDistributorByHashPrefix.OneByteSimpleHash(15));

You can use your own hashing logic here by implementing this simple interface:

public static interface Hasher extends Parametrizable {
  byte[] getHashPrefix(byte[] originalKey);
  byte[][] getAllPossiblePrefixes();
}

Custom Distribution Logic

HBaseWD is designed to be flexible especially when it comes to supporting custom row key distribution approaches. In addition to the above mentioned ability to implement custom hashing logic to be used with RowKeyDistributorByHashPrefix, one can define custom row key distribution logic by extending AbstractRowKeyDistributor abstract class whose interface is super simple:

public abstract class AbstractRowKeyDistributor implements Parametrizable {
  public abstract byte[] getDistributedKey(byte[] originalKey);
  public abstract byte[] getOriginalKey(byte[] adjustedKey);
  public abstract byte[][] getAllDistributedKeys(byte[] originalKey);
  ... // some utility methods
}

Common Operations

Scan

Performing a range scan over data:

Scan scan = new Scan(startKey, stopKey);
ResultScanner rs = DistributedScanner.create(hTable, scan, keyDistributor);
for (Result current : rs) {
  ...
}

Configuring MapReduce Job

Performing MapReduce job over the data chunk specified by Scan:

Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "testMapreduceJob");
Scan scan = new Scan(startKey, stopKey);
TableMapReduceUtil.initTableMapperJob("table", scan,
RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
// Substituting standard TableInputFormat which was set in
// TableMapReduceUtil.initTableMapperJob(...)
job.setInputFormatClass(WdTableInputFormat.class);
keyDistributor.addInfo(job.getConfiguration());

What’s Next?

In the next post we’ll cover:

  • Integration into already running production systems
  • Changing distribution logic in running systems
  • Other “advanced topics”
If you’ve read this far you must be interested in HBase.  And since we (@sematext) are interested in people with interest in HBase, here are some open positions at Sematext, some nice advantages we offer and “problems” we are into that you may want to check out.
Oh, and all HBase performance charts you see in this post are from our SPM service which uses HBase and, you guessed it, HBaseWD too! ;)

Sematext Presenting Real-time Analytics at HBaseCon 2012

HBaseCon 2012 is the first-ever HBase-focused conference happening this May in San Francisco.  I’m happy to say that Sematext will be there as both a sponsor (likely) and a presenter (definitely) along Facebook, Cloudera, Adobe, Tumblr (nice to see our clients there!) and others.  Alex will be presenting our work on HBaseHUT, one of  Sematext open-sourced projects that were spun out of our work on Scalable Performance Monitoring (for HBase, Solr, ElasticSearch, Sensei, etc.) and Search Analytics.

HBaseHUT makes real-time analytics with HBase possible today — pull requests welcome!

Search Analytics: Business Value & NoSQL Backend Presentation

Last week involved a few late nights for some of us at Sematext – we were busy readying our Search Analytics and Scalable Performance Monitoring services, as well as putting the final touches on the our Search Analytics: Business Value & NoSQL Backend presentation for Lucene Eurocon in Barcelona.

In the past we’ve given a few other public talks about Search Analytics and you can check them all out via http://blog.sematext.com/tag/analytics/.

Extending Hadoop Metrics

Here at Sematext we really like performance metrics and we like HBase.  We like them so much we’ve created a service for HBase Performance Monitoring (and for Solr, too).  In the process we’ve done some experiments with Hadoop and HBase around performance monitoring and are sharing our experience and some relevant code in this post.

The Hadoop metrics framework is simple to extend and customise. For example, you can very easily write a custom MetricsContext which sends metrics to your own storage solution.

All you need to do is extend the AbstractMetricsContext class and implement

protected void emitRecord(String context, String record, OutputRecord outputrecord)
  throws IOException;

To demonstrate, I wrote HBaseMetricsContext which stores Hadoop metrics in HBase. Since HBase itself uses the Hadoop metrics framework, you can use it to store its own metrics inside itself. Useful? Maybe. This is just an example after all.

If you’d like to try it out, get the source from GitHub. Then build the project using:

mvn package

Put the resulting Jar file in the HBase lib directory.

You will need to create a table with the relevant column families. We assume the column families are a composite of:

columnFamily = contextName + "." + recordName

In the HBase shell create your table:

create 'metrics', 'hbase.master', 'hbase.regionserver'

Edit your hadoop-metrics.properties file to include:

hbase.class=com.sematext.hadoop.metrics.HBaseMetricsContext
hbase.tableName=metrics
hbase.period=10

Restart HBase and it will start inserting to the metrics table every 10 seconds.

The row key of each record is made up of the timestamp and the tags (for disambiguation) like so:

rowKey = bytes(maxlong - timestamp) + bytes(tagName) + bytes(tagValue) + …

Subtracting the timestamp from maxlong ensures the scans get the most recent record first.

Each tag and metric is stored in it’s own column. This gives us a table that looks something like this:

hbase.master hbase.regionserver
cluster_requests hostName hostName flushQueueSize regions
rowKey2 rs1.example.org 0 1
rowKey1 101 master.example.org

For clarity timestamps are not included in the above table, as each cell is timestamped. All cells for a record will have the same timestamp.

Flume and HBase Integration

This quick How-To post will teach you how to use HBase sink(s) in Flume. There are currently two generic HBase sinks available: hbase() and attr2hbase(). The former requires one to be more explicit when providing mapping from event data to HBase record, while the latter allows adding record cells dynamically based on event data. Despite this difference, these two sinks have a lot in common.

Configuring Sink(s)

Both sinks write a single or no records into an HBase table based on a single Flume event. Both sinks have three attributes (at least) in common: table, writeBufferSize and writeToWal. These attributes control the HBase client behavior.

The attribute table is the name of the output HBase table. Yes, that means that currently one sink can be configured to output into just one table. If you desperately need to write to multiple tables you can use Flume’s native features to configure several sinks and direct each to the desired HBase table.

The writeBufferSize corresponds to the attribute of HTable with the same name and defines client-side buffer (in bytes). If writeBufferSize has a non-zero value, the HTable’s autoFlush is set to false. In this case the sink buffers data and sends them in chunks of the writeBufferSize size to the server.  This decreases the number of remote invocations and helps improve performance. By default the sink sends data on every received event. While it seems obvious that one would want to specify a bigger buffer to improve performance, determining the best buffer size depends on the use-case: no data will be sent to server before the buffer is full, which may break some “near real-time” models. The default writeBufferSize for HTable is 2 MB. Another important factor to consider is that the greater client-side buffer you have the more data you might loose in case of HBase failure [11]. Please find more details about this in Reliability Issues part of this post.

The writeToWal corresponds to the attribute of the HBase Put with the same name. If set to false the Put operations will not be written into HBase’s edit log. While this means fewer operations to perform and hence much better performance, this approach isn’t reliable: every non-persistent change (HBase likes to keep as much data as it can in memory to keep things fast) will be lost in case of HBase failure [1]. One would typically want to skip writing edits to WAL in case losing some portion of data is acceptable (e.g. when doing non mission-critical log analysis) or during a bulk import (it goes  much faster without writing to WAL) which can be restarted in case of failure.

HBase Sink: hbase()

As of this writing the hbase() sink has the following semantics:

hbase("table", "rowkey", "cf1", "c1", "val1"[,"cf2", "c2", "val2", ....]
 {, writeBufferSize=int, writeToWal=true|false})

As you can see, one is asked to provide the values to be placed into the HBase record explicitly. One has to use expressions in values of “rowkey”, “cf1″, “c1″, “val1″, etc. to make it usable. Expressions available with any event type are “%{hostname}” (or “%{host}”), “%{nanos}”, “%{priority}”, “%{body}” (and soon “%{timestamp}” [8]) which resolve to event properties. You can also fetch value of any event’s custom attribute with the help of “%{attribute_name}”. For example, with the following configuration an HBase table would be populated with records so that one record corresponds to one event with event’s nanos (event.getNanos()) as row key and two cells in event_colfam column family: body and host with values of event’s body and hostname respectively.

hbase("table", "%{nanos}", "event_colfam", "body", "%{body}",
      "event_colfam", "host", "%{host}")

HBase Sink: attr2hbase()

This sink was originally contributed to Flume by Sematext, as we needed it for our products that make use of Flume and HBase.

As of this writing the attr2hbase() has the following semantic:

attr2hbase("table"[,"sysFamily"[,"writeBody"[,"attrPrefix"[,"writeBufferSize"
 [,"writeToWal"]]]]])

The attr2hbase() sink is used to write event attributes that correspond to a particular format (name starts with a specified prefix) into an HBase record. The names of columns (qualifiers) and column families are determined dynamically based on event’s attribute names. Thus, compared to the hbase() sink, you don’t have to list all possible event attributes you want to store in HBase along with their destination column families and qualifiers (columns). Your source and/or decorators can produce any (reasonable) number of attributes, with dynamic names (e.g. depending on the values) and they will be written into HBase if attr2hbase() sink is configured correctly. In other words, with attr2hbase() one can a) define column names at run-time and b) add whatever number of columns the business logic requires (also at run-time, based on the data being processed). E.g. in case email messages are processed we can store each recipient in a separate cell in an HBase record. You may want to implement your own decorator for that (keep reading to learn how to do this) due to some limitations [9].

sysFamily holds the name of the column family that  is used to store “system” data (event timestamp, host, priority). In case this parameter is absent or equals “”, the sink doesn’t write “system” data. E.g. with configuration like this:

attr2hbase( "mytable", "sysColfam")

each record will have the following cells:

sysColFam:timestamp=<event’s timestamp>, sysColFam:host=<event’s host>, sysColFam:priority=<event’s prioirty>:

hbase(main):002:0> scan 'mytable'         
ROW    COLUMN+CELL                                                                                                              
 123   column=sysColfam:host, timestamp=1309275728725, value=testhost                                        
 123   column=sysColfam:priority, timestamp=1309275728725, value=INFO                                                           
 123   column=sysColfam:timestamp, timestamp=1309275728725, value=\x00\x00\x010\xD6\xEA+T

writeBody indicates whether event body should be written with other “system” data. By default, (when this parameter is absent or equals ””) the attribute body is not written. This parameter should have the “column-family:qualifier” format in order for the sink to write the body to the specific column-family:qualifier.

attrPrefix defines which attributes will be written to HBase: every attribute with the name prefixed with attrPrefix parameter’s value is written.  The attribute key should be in the following format to be properly written into HBase:

“<attrPrefix><colfam>:<qual>”

The default value of attrPrefix is “2hb_”.  This means that all attributes with names “2hb_<colfam>:<qual>” should be written to HBase. Attribute with key “<attrPrefix>” must contain row key for Put, otherwise, if no row can be extracted, the event is skipped and no record is written to the HBase table. The table below shows how event attributes are handled: each cell shows where the event attribute  (first column) will be written based on the sink parameters (second and other cells in table header).

Event’s attr (“name”->”value”) attrPrefix=”2hb_”, sysFamily=null attrPrefix=”2hb_”, sysFamily=”sysfam” attrPrefix=””, sysFamily=”sysfam” attrPrefix=””, sysFamily=null
“any”->”foo” - - sysfam:any->foo -
“colfam:col”->”foo” - - colfam:col->foo colfam:col->foo
“2hb_any”->”foo” - sysfam:any->foo sysfam:2hb_any->foo -
“2hb_colfam:col”->”foo” colfam:col->foo colfam:col->foo 2hb_colfam:col->foo 2hb_colfam:col->foo

Example

This sink is usually used with the decorators that perform light transformation of event data into attributes with specific names. We say “light transformation” here to avoid getting into the discussion about whether Flume is meant for ETL or just for reliable data delivery. You can use different standard Flume decorators [2], like “value()”, “select()”, “regex()”, “split()” and others. For example, the following setting will make Flume write a new record into HBase table for every line in a tailed file:

'tail("/tmp/some.log")'
'split( ":", 0 , "2hb_colfam:ts")
 split( ":", 1 ,"2hb_colfam:value")
 format("%{nanos}:") split(":", 0, "2hb_")
  attr2hbase( "mytable", "", "", "2hb_", "1000", "false" )'

NOTE: we have to use format() + split() decorators as workaround for FLUME-676 [3]: value() sink doesn’t support EL values. So with format() we put %{nanos} into the body and then from there we put it into attribute with name “2hb_”. The written records will have row key whose value will hold the nanos of the event timestamp (“2hb_” attribute value) and two values pased from log line. Here’s the example output:

$ echo "2238947398:56" >> /tmp/some.log
$ hbase shell
hbase(main):025:0> scan 'mytable'
ROW COLUMN+CELL
9656555664717551 column=colfam:ts, timestamp=1308748686334, value=2238947398
9656555664717551 column=colfam:value, timestamp=1308748686334, value=56

Custom Decorator

The attr2hbase sink is often used with custom decorator that “prepares” events so that they contain attributes ready to be written into an HBase table. Implementing a custom decorator is very simple. Example code:

public class CustomDecorator<S extends EventSink> extends
EventSinkDecorator<S> {
  public CustomDecorator(String param) {
    super(null);
    // TODO: do some initialization
  }

  public void append(Event e) throws IOException {
        // TODO: transform event e here
    super.append(e);
  }

  public static SinkFactory.SinkDecoBuilder builder() {
    return new SinkFactory.SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context,
          String... argv) {
        if (argv.length != 1) {
          throw new IllegalArgumentException("
usage: CustomDecorator(\"param\")");
        }
                return new CustomDecorator<EventSink>(argv[0]);
      }
    };

  }

  public static List<Pair<String, SinkFactory.SinkDecoBuilder>>
getDecoratorBuilders() {
    return Arrays.asList(new Pair<String,
SinkFactory.SinkDecoBuilder>("CustomDec", builder()));
  }

}

To make your custom decorator visible to Flume you need to register it in flume-site.xml:

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>your.own.CustomDecorator</value>
</property>
...
</configuration>

Reliability Issues

There are certain reliability details you will want to think about when using HBase sinks.

Temporary HBase Connection Loss Requires Sink Restart

If default HBase configuration is used, when connection from sink node to HBase cluster breaks for several minutes (e.g. network problems or cluster maintenance downtime) the sink stops working. Unfortunately, it does not recover after cluster becomes accessible again. To fix this behavior you can adjust configuration properties “hbase.client.pause” and “hbase.client.retries.number” in hbase-site.xml which should be in Flume’s classpath. The default behavior (i.e. the issue) is going to be fixed in a future release [10].

Loss of Data Buffered on Client-side

Despite using reliable data delivery approaches (like agentE2ESink), there is a loss of data possible if client-side buffer is used (i.e. if writeBufferSize > 0) when failure happens during the buffer flushing [11]. This occurs because the ACK about data being received is sent before the actual write to an HBase table is done – writes are performed only on buffer flush, not on per-event basis. Thus, in case of a write failure during flushing data from the client-side will be lost as ACKs will fool Flume into thinking data have already been persisted in HBase.

Other Considerations

Row Key

As usually, one of the most important design decisions when using HBase is around row key format/values. In initial versions of HBase sinks the default row key was %{nanos}, as it seems to be unique and records seems to be ordered by arrival time. However, we suggest you consider using a different row key value/format, and here’s why. Usually when users import data into HBase they use either some kind of UUID or a timestamp (inverted) based approach. The former helps achieve better performance by distributing write load between multiple regions. The latter is good when one needs fast scans of imported data based on time ranges and when import load is bearable. However, when we use nanos, which are currently obtained from System.nanoTime() Java call, we don’t achieve any of these advantages: keys from each event-producer go in sequence, so all write load is distributed over just a few regions and region servers at a time [12], and System.nanoTime() is not really suitable for use in scans for fetching data from a time interval. Moreover, we cannot even rely on records being written from different Flume nodes being ordered by time of arrival, as nanos are not necessarily in sync across JVMs. Thus, it may make more sense to use pure random row keys (although Flume has no %{random} or anything similar, as far as we know) or a timestamp (inverted) [8] (along with [12] if that is possible). There might be more work needed with timestamp-based approach: there is a possibility to get events with the exact same timestamp during high load. Thus, adding an extra hash would help here (or even using “%{timstamp}%{nanos}”).

Setup Instructions

The easiest way to install Flume is to use CDH3 [4]. Then you need to add flume-plugin-hbasesink jar into flume lib dir. You can compile it from Flume sources [5] or download a compiled jar [6]. You also need to add HBase jar into Flume’s lib dir. It can be downloaded from Maven repositories [7] or copied from your HBase setup dir (so that the jar used on client-side by Flume sink is the same as the one on the server-side, which is important). The last step is to add plugin(s) to Flume’s configuration file (flume-site.xml):

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>com.cloudera.flume.hbase.HBaseSink,com.cloudera.flume.hbase.Attr2HBaseEventSink</value>
</property>
...
</configuration>

As both HBase and Flume use Zookeeper, it makes sense to share the Zookeeper quorum, too.

If you read this far, you may want to know that we are hiring and are happy users of and contributors to Flume, HBase, and a few other projects.

[1] http://www.larsgeorge.com/2010/01/hbase-architecture-101-write-ahead-log.html

[2] http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_flume_sink_decorator_catalog

[3] https://issues.cloudera.org/browse/FLUME-676

[4] https://ccp.cloudera.com/display/CDHDOC/Flume+Installation

[5] https://github.com/abaranau/flume-plugin-hbasesink-compiled

[6] https://github.com/cloudera/flume

[7] e.g. https://repository.cloudera.com/content/groups/public/org/apache/hbase/hbase/

[8] https://issues.cloudera.org/browse/FLUME-688

[9] https://issues.cloudera.org/browse/FLUME-689

[10] https://issues.cloudera.org/browse/FLUME-685

[11] https://issues.cloudera.org/browse/FLUME-390

[12] https://github.com/sematext/HBaseWD

Opening: HBase and Lucene / Solr / Elastic Search Developer

We are once again looking for smart people.  This time we are looking to hire a person who likes working with HBase and Lucene (or Solr or ElasticSearch).  This particular combination is important to us because the very first target for this person might be the integration of HBase and Lucene / Solr / ElasticSearch.  More specifically, we have our eyes on HBASE-3529, which we’ve closely examined during a recent HBase Hackathon that took place after BerlinBuzzwords.  Of course, we are also open to alternative approaches if the one takes in HBASE-3529 turns out to be problematic.  The work around the marriage of HBase and full-text search is to be done “in the open”, meaning in collaboration with HBase as well as Lucene, Solr, or Elastic Search developers, which makes this project that much more exciting.

Beyond HBase and search integration, we do other interesting stuff with HBase (and Flume and MapReduce and …), so this person would get to work on our Search Analytics and Scalable Performance Monitoring services.

Interested?  Please get in touch and see what else we like on our jobs page.

Follow

Get every new post delivered to your Inbox.

Join 1,637 other followers