Hadoop 1.0.0 – Extra Notes

The big Hadoop 1.0.0 release has arrived. The general notes about the release from the dev team include:

  • Security (strong authentication via Kerberos)
  • Better support for HBase (append/hsynch/hflush, and security)
  • WebHDFS (with full support for security)
  • Performance-enhanced access to local files for HBase
  • Other performance enhancements, bug fixes, and features

You can also find the complete release notes here and see all fixes, improvements and new features included in the release. To save you time, below is additional information about some of the items in the Hadoop 1.0.0 release that attracted our attention.

Cluster Management Optimizations

HADOOP-7728 - hadoop-setup-conf.sh should be modified to enable task memory manager
Adds additional options for managing memory usage by MapReduce tasks. In particular, it allows setting the maximum memory usage for map and reduce tasks separately.
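For illustration only, here is a minimal sketch of how a job might set such limits using the Hadoop 1.x per-job properties mapred.job.map.memory.mb and mapred.job.reduce.memory.mb (double-check the property names against your distribution; the values are made up):

import org.apache.hadoop.mapred.JobConf;

public class MemoryLimitsExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Per-job memory limits enforced by the TaskTracker's task memory manager.
    conf.setLong("mapred.job.map.memory.mb", 1536L);    // max memory per map task, in MB (example value)
    conf.setLong("mapred.job.reduce.memory.mb", 3072L); // max memory per reduce task, in MB (example value)
    System.out.println(conf.get("mapred.job.map.memory.mb"));
  }
}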

Performance Improvements

HDFS-2246 - Shortcut a local client reads to a Datanode's files directly
This is a short-term solution for HDFS-347 “DFS read performance suboptimal when client co-located on nodes with data”, which is quite a hot topic in the Hadoop dev community nowadays. NOTE: by default this optimization is switched off (or is it? Update: it is not, see the comments), so some config adjustments are required to benefit from it. And you will definitely want to benefit from it: some users reported a 2x I/O performance improvement. It is also highly recommended for HBase users.
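As a rough sketch, assuming the property names introduced by the HDFS-2246 patch (verify against your distribution), enabling short-circuit reads for the user running HBase could look like this, whether done programmatically or in hdfs-site.xml:

import org.apache.hadoop.conf.Configuration;

public class ShortCircuitReadsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Let the DFS client read block files directly from the local file system.
    conf.setBoolean("dfs.client.read.shortcircuit", true);
    // Only this user may bypass the DataNode; "hbase" is just an example value.
    conf.set("dfs.block.local-path-access.user", "hbase");
    System.out.println("short-circuit reads: " + conf.get("dfs.client.read.shortcircuit"));
  }
}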

HDFS-895 - Allow hflush/sync to occur in parallel with new writes to the file
Previously, if an hflush/sync was in progress, an application could not write data to the HDFS client buffer. We again stress this improvement for HBase users, as it increases the write throughput of the HBase transaction log.

MAPREDUCE-2494 - Make the distributed cache delete entries using LRU priority
When a certain threshold was reached and the distributed cache was being purged, the previous implementation deleted all entries that were not currently in use. With the new code, more hot data can be left in the cache (the percentage is configurable), thus decreasing cache misses.

New Features

HDFS-2316 - [umbrella] WebHDFS: a complete FileSystem implementation for accessing HDFS over HTTP
Allows accessing HDFS over HTTP (read & write)

MAPREDUCE-3169 - Create a new MiniMRCluster equivalent which only provides client APIs cross MR1 and MR2
A cleaner, MR1- and MR2-compatible API for a mini MR cluster, to be used in unit tests.

HADOOP-7710 - Create a script to set up applications, i.e. to create root directories for applications such as hbase, hcat, hive, etc.
Similar to the hadoop-setup-user script, a hadoop-setup-applications script was added to set up root directories for apps to write to (/hbase, /hive, etc.).

Enjoy Hadoop 1.0.0 and we hope you found this quick summary useful!

@sematext

Flume and HBase Integration

This quick How-To post will teach you how to use HBase sink(s) in Flume. There are currently two generic HBase sinks available: hbase() and attr2hbase(). The former requires one to be more explicit when providing mapping from event data to HBase record, while the latter allows adding record cells dynamically based on event data. Despite this difference, these two sinks have a lot in common.

Configuring Sink(s)

Both sinks write a single record (or none) into an HBase table per Flume event. Both sinks have at least three attributes in common: table, writeBufferSize and writeToWal. These attributes control the HBase client behavior.

The attribute table is the name of the output HBase table. Yes, that means that currently one sink can be configured to output into just one table. If you desperately need to write to multiple tables you can use Flume’s native features to configure several sinks and direct each to the desired HBase table.

The writeBufferSize corresponds to the HTable attribute of the same name and defines the client-side buffer size (in bytes). If writeBufferSize has a non-zero value, the HTable’s autoFlush is set to false. In this case the sink buffers data and sends it to the server in chunks of writeBufferSize bytes. This decreases the number of remote invocations and helps improve performance. By default the sink sends data on every received event. While it seems obvious that one would want to specify a bigger buffer to improve performance, the best buffer size depends on the use-case: no data will be sent to the server before the buffer is full, which may break some “near real-time” models. The default writeBufferSize for HTable is 2 MB. Another important factor to consider is that the bigger the client-side buffer, the more data you might lose in case of an HBase failure [11]. Please find more details about this in the Reliability Issues part of this post.
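For reference, the equivalent client-side behavior with the plain HBase API looks roughly like this (a sketch assuming an HBase 0.90-era client; the table name and buffer size are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

public class WriteBufferExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "mytable");
    table.setAutoFlush(false);                 // buffer Puts on the client side
    table.setWriteBufferSize(2 * 1024 * 1024); // 2 MB, same as the HTable default
    // ... issue many Puts here; nothing is sent until the buffer fills up ...
    table.flushCommits();                      // push whatever is still buffered
    table.close();
  }
}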

The writeToWal corresponds to the attribute of the HBase Put with the same name. If set to false, the Put operations will not be written into HBase’s edit log (WAL). While this means fewer operations to perform and hence much better performance, this approach isn’t reliable: every non-persisted change (HBase likes to keep as much data as it can in memory to keep things fast) will be lost in case of an HBase failure [1]. One would typically skip writing edits to the WAL when losing some portion of data is acceptable (e.g. when doing non-mission-critical log analysis) or during a bulk import (it goes much faster without writing to the WAL) which can be restarted in case of failure.
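Here is a small sketch of what skipping the WAL looks like in client code (column family, qualifier and value are placeholders):

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SkipWalExample {
  public static Put buildPut(byte[] rowKey, byte[] value) {
    Put put = new Put(rowKey);
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), value);
    // Faster writes, but this edit is gone if the region server dies
    // before its memstore is flushed to disk.
    put.setWriteToWAL(false);
    return put;
  }
}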

HBase Sink: hbase()

As of this writing the hbase() sink has the following semantics:

hbase("table", "rowkey", "cf1", "c1", "val1"[,"cf2", "c2", "val2", ....]
 {, writeBufferSize=int, writeToWal=true|false})

As you can see, one is asked to provide the values to be placed into the HBase record explicitly. To make this usable, one has to use expressions in the values of “rowkey”, “cf1”, “c1”, “val1”, etc. Expressions available with any event type are “%{hostname}” (or “%{host}”), “%{nanos}”, “%{priority}” and “%{body}” (and soon “%{timestamp}” [8]), which resolve to event properties. You can also fetch the value of any of the event’s custom attributes with the help of “%{attribute_name}”. For example, with the following configuration an HBase table would be populated with records such that one record corresponds to one event, with the event’s nanos (event.getNanos()) as the row key and two cells in the event_colfam column family: body and host, holding the values of the event’s body and hostname respectively.

hbase("table", "%{nanos}", "event_colfam", "body", "%{body}",
      "event_colfam", "host", "%{host}")

HBase Sink: attr2hbase()

This sink was originally contributed to Flume by Sematext, as we needed it for our products that make use of Flume and HBase.

As of this writing, attr2hbase() has the following semantics:

attr2hbase("table"[,"sysFamily"[,"writeBody"[,"attrPrefix"[,"writeBufferSize"
 [,"writeToWal"]]]]])

The attr2hbase() sink is used to write event attributes that correspond to a particular format (name starts with a specified prefix) into an HBase record. The names of columns (qualifiers) and column families are determined dynamically based on event’s attribute names. Thus, compared to the hbase() sink, you don’t have to list all possible event attributes you want to store in HBase along with their destination column families and qualifiers (columns). Your source and/or decorators can produce any (reasonable) number of attributes, with dynamic names (e.g. depending on the values) and they will be written into HBase if attr2hbase() sink is configured correctly. In other words, with attr2hbase() one can a) define column names at run-time and b) add whatever number of columns the business logic requires (also at run-time, based on the data being processed). E.g. in case email messages are processed we can store each recipient in a separate cell in an HBase record. You may want to implement your own decorator for that (keep reading to learn how to do this) due to some limitations [9].

sysFamily holds the name of the column family that is used to store “system” data (event timestamp, host, priority). If this parameter is absent or equals “”, the sink doesn’t write “system” data. E.g. with a configuration like this:

attr2hbase( "mytable", "sysColfam")

each record will have the following cells:

sysColfam:timestamp=<event’s timestamp>, sysColfam:host=<event’s host>, sysColfam:priority=<event’s priority>:

hbase(main):002:0> scan 'mytable'         
ROW    COLUMN+CELL                                                                                                              
 123   column=sysColfam:host, timestamp=1309275728725, value=testhost                                        
 123   column=sysColfam:priority, timestamp=1309275728725, value=INFO                                                           
 123   column=sysColfam:timestamp, timestamp=1309275728725, value=\x00\x00\x010\xD6\xEA+T

writeBody indicates whether the event body should be written along with the other “system” data. By default (when this parameter is absent or equals “”), the body is not written. This parameter should have the “column-family:qualifier” format in order for the sink to write the body to that specific column-family:qualifier.

attrPrefix defines which attributes will be written to HBase: every attribute whose name starts with the attrPrefix parameter’s value is written. The attribute key should be in the following format to be properly written into HBase:

“<attrPrefix><colfam>:<qual>”

The default value of attrPrefix is “2hb_”. This means that all attributes with names of the form “2hb_<colfam>:<qual>” will be written to HBase. The attribute whose key is just “<attrPrefix>” must contain the row key for the Put; otherwise, if no row key can be extracted, the event is skipped and no record is written to the HBase table. The table below shows how event attributes are handled: each cell shows where the event attribute (first column) will be written, based on the sink parameters (second and following columns in the table header).

Event's attr ("name"->"value") | attrPrefix="2hb_", sysFamily=null | attrPrefix="2hb_", sysFamily="sysfam" | attrPrefix="", sysFamily="sysfam" | attrPrefix="", sysFamily=null
"any"->"foo"                   | -                                 | -                                     | sysfam:any->foo                   | -
"colfam:col"->"foo"            | -                                 | -                                     | colfam:col->foo                   | colfam:col->foo
"2hb_any"->"foo"               | -                                 | sysfam:any->foo                       | sysfam:2hb_any->foo               | -
"2hb_colfam:col"->"foo"        | colfam:col->foo                   | colfam:col->foo                       | 2hb_colfam:col->foo               | 2hb_colfam:col->foo

Example

This sink is usually used with decorators that perform a light transformation of event data into attributes with specific names. We say “light transformation” here to avoid getting into the discussion about whether Flume is meant for ETL or just for reliable data delivery. You can use various standard Flume decorators [2], like “value()”, “select()”, “regex()”, “split()” and others. For example, the following configuration will make Flume write a new record into an HBase table for every line in a tailed file:

'tail("/tmp/some.log")'
'split( ":", 0 , "2hb_colfam:ts")
 split( ":", 1 ,"2hb_colfam:value")
 format("%{nanos}:") split(":", 0, "2hb_")
  attr2hbase( "mytable", "", "", "2hb_", "1000", "false" )'

NOTE: we have to use the format() + split() decorators as a workaround for FLUME-676 [3]: value() doesn’t support EL values. So with format() we put %{nanos} into the body, and from there we put it into the attribute named “2hb_”. The written records will have a row key holding the nanos of the event timestamp (the “2hb_” attribute value) and two values parsed from the log line. Here’s the example output:

$ echo "2238947398:56" >> /tmp/some.log
$ hbase shell
hbase(main):025:0> scan 'mytable'
ROW COLUMN+CELL
9656555664717551 column=colfam:ts, timestamp=1308748686334, value=2238947398
9656555664717551 column=colfam:value, timestamp=1308748686334, value=56

Custom Decorator

The attr2hbase sink is often used with a custom decorator that “prepares” events so that they contain attributes ready to be written into an HBase table. Implementing a custom decorator is very simple. Example code:

// Package names below are from Flume 0.9.x (CDH3); adjust if your version differs.
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;

public class CustomDecorator<S extends EventSink> extends EventSinkDecorator<S> {
  public CustomDecorator(String param) {
    super(null);
    // TODO: do some initialization
  }

  @Override
  public void append(Event e) throws IOException {
    // TODO: transform event e here
    super.append(e);
  }

  public static SinkFactory.SinkDecoBuilder builder() {
    return new SinkFactory.SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        if (argv.length != 1) {
          throw new IllegalArgumentException("usage: CustomDecorator(\"param\")");
        }
        return new CustomDecorator<EventSink>(argv[0]);
      }
    };
  }

  public static List<Pair<String, SinkFactory.SinkDecoBuilder>> getDecoratorBuilders() {
    return Arrays.asList(new Pair<String, SinkFactory.SinkDecoBuilder>("CustomDec", builder()));
  }
}

To make your custom decorator visible to Flume you need to register it in flume-site.xml:

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>your.own.CustomDecorator</value>
</property>
...
</configuration>

Reliability Issues

There are certain reliability details you will want to think about when using HBase sinks.

Temporary HBase Connection Loss Requires Sink Restart

If the default HBase configuration is used and the connection from the sink node to the HBase cluster breaks for several minutes (e.g. network problems or cluster maintenance downtime), the sink stops working. Unfortunately, it does not recover after the cluster becomes accessible again. To fix this behavior you can adjust the configuration properties “hbase.client.pause” and “hbase.client.retries.number” in hbase-site.xml, which should be on Flume’s classpath. The default behavior (i.e. the issue) is going to be fixed in a future release [10].
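A minimal sketch of the workaround, shown here as the programmatic equivalent of those hbase-site.xml properties (the concrete values are assumptions and should be tuned to the downtime you expect to survive):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PatientHBaseClientConfig {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    conf.setLong("hbase.client.pause", 2000L);      // pause between retries, in ms (example value)
    conf.setInt("hbase.client.retries.number", 30); // keep retrying through longer outages (example value)
    return conf;
  }
}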

Loss of Data Buffered on Client-side

Despite using reliable data delivery approaches (like agentE2ESink), data loss is possible if the client-side buffer is used (i.e. if writeBufferSize > 0) and a failure happens during buffer flushing [11]. This occurs because the ACK about data being received is sent before the actual write to the HBase table is done – writes are performed only on buffer flush, not on a per-event basis. Thus, in case of a write failure during flushing, data from the client-side buffer will be lost, as the ACKs will have fooled Flume into thinking it had already been persisted in HBase.

Other Considerations

Row Key

As usual, one of the most important design decisions when using HBase concerns the row key format/values. In the initial versions of the HBase sinks the default row key was %{nanos}, as it seems to be unique and records seem to be ordered by arrival time. However, we suggest you consider using a different row key value/format, and here’s why. Usually when users import data into HBase they use either some kind of UUID or a (possibly inverted) timestamp-based approach. The former helps achieve better performance by distributing the write load between multiple regions. The latter is good when one needs fast scans of imported data based on time ranges and when the import load is bearable. However, when we use nanos, which are currently obtained from the System.nanoTime() Java call, we get neither of these advantages: keys from each event producer go in sequence, so all write load is distributed over just a few regions and region servers at a time [12], and System.nanoTime() is not really suitable for use in scans for fetching data from a time interval. Moreover, we cannot even rely on records written from different Flume nodes being ordered by time of arrival, as nanos are not necessarily in sync across JVMs. Thus, it may make more sense to use pure random row keys (although Flume has no %{random} or anything similar, as far as we know) or a (possibly inverted) timestamp [8] (along with [12] if that is possible). There might be more work needed with the timestamp-based approach: during high load it is possible to get events with the exact same timestamp, so adding an extra hash would help here (or even using “%{timestamp}%{nanos}”).
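To make the point about spreading the write load more concrete, here is a hypothetical helper (not part of Flume or of HBaseWD [12]) that prefixes a key with a small hash-based bucket so that sequential keys land in different regions:

import org.apache.hadoop.hbase.util.Bytes;

public class SaltedRowKey {
  // Prepend a one-byte bucket derived from the key itself so that sequential keys
  // land in different regions; scans then have to fan out over all buckets.
  public static byte[] salt(byte[] originalKey, int numBuckets) {
    int bucket = (Bytes.hashCode(originalKey) & Integer.MAX_VALUE) % numBuckets;
    return Bytes.add(new byte[] { (byte) bucket }, originalKey);
  }

  public static void main(String[] args) {
    byte[] key = Bytes.toBytes(String.valueOf(System.nanoTime()));
    System.out.println(Bytes.toStringBinary(salt(key, 16)));
  }
}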

Setup Instructions

The easiest way to install Flume is to use CDH3 [4]. Then you need to add the flume-plugin-hbasesink jar into Flume’s lib dir. You can compile it from the Flume sources [6] or download a compiled jar [5]. You also need to add the HBase jar into Flume’s lib dir. It can be downloaded from Maven repositories [7] or copied from your HBase setup dir (so that the jar used on the client side by the Flume sink is the same as the one on the server side, which is important). The last step is to add the plugin(s) to Flume’s configuration file (flume-site.xml):

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>com.cloudera.flume.hbase.HBaseSink,com.cloudera.flume.hbase.Attr2HBaseEventSink</value>
</property>
...
</configuration>

As both HBase and Flume use ZooKeeper, it makes sense to share the ZooKeeper quorum, too.

If you read this far, you may want to know that we are hiring and are happy users of and contributors to Flume, HBase, and a few other projects.

[1] http://www.larsgeorge.com/2010/01/hbase-architecture-101-write-ahead-log.html

[2] http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_flume_sink_decorator_catalog

[3] https://issues.cloudera.org/browse/FLUME-676

[4] https://ccp.cloudera.com/display/CDHDOC/Flume+Installation

[5] https://github.com/abaranau/flume-plugin-hbasesink-compiled

[6] https://github.com/cloudera/flume

[7] e.g. https://repository.cloudera.com/content/groups/public/org/apache/hbase/hbase/

[8] https://issues.cloudera.org/browse/FLUME-688

[9] https://issues.cloudera.org/browse/FLUME-689

[10] https://issues.cloudera.org/browse/FLUME-685

[11] https://issues.cloudera.org/browse/FLUME-390

[12] https://github.com/sematext/HBaseWD

Deferring Processing Updates to Increase HBase Write Performance

In this post we’ll examine, via an example, how deferring the processing of updates can increase the write throughput of a data store. At the end we’ll introduce our recently open-sourced HBaseHUT, a tool that automates deferred updates in HBase.

Please note that “deferred updates” here does not mean that updates are not “visible” immediately after a write operation. The system still serves add/update/delete and read operations in real-time.

Plug: if this sort of stuff interests you, we are hiring people who know about Hadoop, HBase, MapReduce…

Constraints & Assumptions

The decisions and ideas in the post are based on facts about HBase, but can be applied to many other data stores with similar characteristics:

  • Fetching a set of records as a range (scan operation) is much cheaper than fetching the same records one-by-one
  • Adding new records (simple write operation) is very fast
  • Storage space is cheap
  • “In-place” updates of records are not generally feasible

Well, the last assumption is a bit tricky: HBase provides an incrementColumnValue() operation, but it is very limited: one can only increment a specified cell that contains a long value by some delta. Future versions of HBase will allow incrementing multiple cells at once (this is already in HBase trunk). However, not all update operations are that simple, as we’ll see in the example. Moreover, incrementing cells in a record is slower than writing a new record (we’ll use this fact later in the post).
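For completeness, this is roughly what the (limited) built-in increment looks like; the table and column names are placeholders and the API shown is the HBase 0.90-era client:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class IncrementExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "slide_stats"); // hypothetical table name
    // Atomically adds a delta to a single cell holding an 8-byte long.
    // Min/max style updates cannot be expressed this way.
    table.incrementColumnValue(Bytes.toBytes("presentation1_slide1"),
        Bytes.toBytes("slide"), Bytes.toBytes("views"), 1L);
    table.close();
  }
}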

Idea Explained

The idea behind deferred updates processing is to postpone updating the existing record and to store incoming deltas as new records. Record update operations thus become simple write operations with the corresponding performance. The deferred updates technique elaborated here fits well when a system handles a lot of updates of stored data and write performance is the main concern, while reading speed requirements are not as strict. The following cases (each of them separately, or any combination of them) may indicate that one can benefit from using the technique (the list is not complete):

  • updates are well spread over the whole, large dataset
  • lower rate (among write operations) of “true updates” (i.e. a low percentage of writes are for completely new data, not really updates of existing data)
  • a good portion of the data stored/updated may never be accessed
  • the system should be able to handle high write peaks without major performance degradation

Next, let’s look at an example use-case to illustrate the idea better.

Example Use-Case

Imagine a simple presentation slides sharing system that lets one browse presentations on-line (not as downloadable PDF or other files, since that would not be interesting for us here). Suppose we want to provide the presentation’s author with details on user behaviour as users navigate through the slides, so that the author can use this data to infer the quality of the presentation (hey, slide sharing system maintainers: free hint!). Let’s limit the statistics we want to share to simple metrics for the sake of keeping the example simple. Here’s what we can provide:

  • number of times a particular slide was viewed
  • avg/min/max time spent by user on a particular slide

Using these stats the presentation author can find out e.g.:

  • which slides are the most difficult to understand (they probably need to be split into multiple slides, or better prepared for by giving more/better info on the previous slides)
  • which slides are the most trivial (they can be merged/removed)
  • what the typical users are like (e.g., if users spend a lot of time thinking on the slides, the author can add more details to the presentation; on the other hand, if users usually walk through it quickly, then some very deep details can be removed)

Such a system could provide more stats to authors so that they can work on other sorts of presentation improvements.  For example, the system could provide information about user navigation actions (e.g., if users often go back and forth between two slides then probably the schemes/diagrams on them are better understandable if they are on the same slide and can be easily compared), but that’s not the focus of this post.

To track the stats mentioned above we need to capture how much time a user spends on a particular slide and send this data to our data store. Let’s assume our records store the following info: {(key) presentationId_slideId, number of views, total viewing time, max viewing time, min viewing time}. Using this data we can provide useful information to the author as needed.

Every time a user leaves a slide, the tracking logic sends the following data to the data store: {(key) presentationId_slideId, time spent viewing}.

Direct Solution

The straightforward approach would be to update the presentationId_slideId record (fetch, modify and store the updated data) as new data comes in. Here are some drawbacks of this naive approach:

  • for each update operation we have to perform a fetch and a write operation (Get and Put in the case of HBase)
  • most authors won’t really care about all these stats and the majority of them will never look at them, so many processing operations can simply be omitted (until there’s interest in the stats)
  • for the first ever viewing of a slide a redundant fetch operation is performed (the data is not there yet), which is OK if slides are viewed many times, but in a situation where many presentations are viewed just once (e.g. by their creators) this may result in a bigger portion of redundant operations (this assumption is sort of made up, yes)

The very first point is a major one and is the obvious write performance killer. This point is really not made up – we have first-hand experience with that particular situation.
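To make the cost explicit, here is a sketch of the direct approach under the same (assumed) schema as the HBaseHUT example later in this post: one Get plus one Put per tracked event.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class DirectUpdateTracker {
  static final byte[] SLIDE_CF = Bytes.toBytes("slide");
  static final byte[] VIEWS_NUM_C = Bytes.toBytes("views");
  static final byte[] TOTAL_VTIME_C = Bytes.toBytes("total_time");

  // Naive update: fetch the current record, modify it in memory, write it back.
  static void trackSlideView(HTable table, byte[] rowKey, long timeSpent) throws IOException {
    Result existing = table.get(new Get(rowKey)); // extra round trip on every event
    int views = existing.isEmpty() ? 0 : Bytes.toInt(existing.getValue(SLIDE_CF, VIEWS_NUM_C));
    long total = existing.isEmpty() ? 0L : Bytes.toLong(existing.getValue(SLIDE_CF, TOTAL_VTIME_C));
    Put put = new Put(rowKey);
    put.add(SLIDE_CF, VIEWS_NUM_C, Bytes.toBytes(views + 1));
    put.add(SLIDE_CF, TOTAL_VTIME_C, Bytes.toBytes(total + timeSpent));
    table.put(put);
  }
}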

Deferring Updates Processing

One alternative solution is to defer updates processing to the time when the data is requested by the presentation author, or to the time when scheduled/periodic updates processing is performed (ideally when the data store write load is minimal). In this deferred updates approach, all new deltas (new user action data) are written as new records.

When an author requests the stats, updates processing takes place. This time it is done in a much more efficient way than in the direct approach: by using range scan operations, not individual record reads/writes. Given that a presentation usually has tens of slides and is viewed by hundreds or thousands of people, such range scans are so fast that the user will barely notice them. Also, it should be quite OK to ask a person to wait several seconds during the first view of the stats page (processed updates are written back into the data store, so the next time the user fetches the data it is available right away).

To prevent overly long updates processing at request time, the data can be processed periodically instead. Given that (updated) statistics data is available to the user immediately, the processing can be postponed for a long time without affecting the “real-time” visibility aspect of the system. The processing of updates can be scheduled to take place during off-peak hours, when it won’t compromise write performance. By doing this we add a “cushion”: from this point on, the maximum amount of data that needs to be processed “on-the-fly” is one day’s worth of updates (assuming daily updates processing), which is sufficient for the majority of cases (e.g. an estimated maximum of tens of thousands of presentation views per day translates into a very quick scan operation).

The remainder of this post provides details of our implementation of this approach.

HBaseHUT

HBaseHUT is a tool that automates deferred processing of updates for HBase. It hides many details of updates processing from the client code, thus making it an easy-to-use solution. There are Put and ResultScanner wrappers that encapsulate all the needed logic.

To write new data you need to use the HutPut implementation of Put. Since it is a drop-in replacement for the standard HBase Put, only the instance creation code needs to be changed:

public class SlideStatsTracker {
  byte[] SLIDE_CF = Bytes.toBytes("slide");
  byte[] VIEWS_NUM_C = Bytes.toBytes("views");
  byte[] TOTAL_VTIME_C = Bytes.toBytes("total_time");
  byte[] MIN_VTIME_C = Bytes.toBytes("min_time");
  byte[] MAX_VTIME_C = Bytes.toBytes("max_time");
  ...
  void trackSlideView(byte[] presentationId, byte[] slideId, long timeSpent)
                               throws InterruptedException, IOException {
    Put put = new HutPut(Bytes.add(presentationId, slideId));
    put.add(SLIDE_CF, VIEWS_NUM_C, Bytes.toBytes(1));
    put.add(SLIDE_CF, TOTAL_VTIME_C, Bytes.toBytes(timeSpent));
    // For a single view the min and max viewing times both equal timeSpent;
    // the update processor below merges them across records.
    put.add(SLIDE_CF, MIN_VTIME_C, Bytes.toBytes(timeSpent));
    put.add(SLIDE_CF, MAX_VTIME_C, Bytes.toBytes(timeSpent));
    hTable.put(put);
  }
  ...
}

Fetching stats is performed using HutResultScanner, which implements ResultScanner and hence can also be very easily integrated into code that uses the normal HBase API:

  Scan scan = new Scan(presentationId);
  ResultScanner resultScanner =
      new HutResultScanner(hTable.getScanner(scan), updateProcessor);
  Result result = resultScanner.next();
  while (result != null) {
    // fetch data from result object
    result = resultScanner.next();
  }

The updateProcessor passed as a parameter encapsulates the update operation logic:

  class SlideStatsUpdateProcessor implements UpdateProcessor {
    @Override
    public void process(Iterable<Result> records, UpdateProcessingResult processingResult) {
      int viewsNumber = 0;
      long totalViewTime = 0;
      long minViewTime = Long.MAX_VALUE;
      long maxViewTime = Long.MIN_VALUE;
      // Merge all deferred records written for the same presentationId_slideId key
      for (Result record : records) {
        int views = Bytes.toInt(record.getValue(SLIDE_CF, VIEWS_NUM_C));
        long totalTime = Bytes.toLong(record.getValue(SLIDE_CF, TOTAL_VTIME_C));
        long minTime = Bytes.toLong(record.getValue(SLIDE_CF, MIN_VTIME_C));
        long maxTime = Bytes.toLong(record.getValue(SLIDE_CF, MAX_VTIME_C));

        viewsNumber += views;
        totalViewTime += totalTime;
        minViewTime = minTime < minViewTime ? minTime : minViewTime;
        maxViewTime = maxTime > maxViewTime ? maxTime : maxViewTime;
      }

      processingResult.add(SLIDE_CF, VIEWS_NUM_C, Bytes.toBytes(viewsNumber));
      processingResult.add(SLIDE_CF, TOTAL_VTIME_C, Bytes.toBytes(totalViewTime));
      processingResult.add(SLIDE_CF, MIN_VTIME_C, Bytes.toBytes(minViewTime));
      processingResult.add(SLIDE_CF, MAX_VTIME_C, Bytes.toBytes(maxViewTime));
    }
  }

As one can see, the HBaseHUT abstraction doesn’t change the HBase API, which makes it very easy to use in new code or to integrate into existing code.

You can find more details about HBaseHUT features on the project’s wiki. HBaseHUT is a new, recently emerged project – your feedback/questions/ideas/feature requests/forks/pull requests, etc. are all very welcome. Please start new discussions on HBaseHUT’s mailing list.

If you read this far, we’d like to talk to you – we are hiring smart people who know about Hadoop, HBase, MapReduce, want to do large scale data (stream) processing, and build tools like HBaseHUT.

Hadoop Digest, August 2010

The biggest announcement of the year: Apache Hadoop 0.21.0 has been released and is available for download here. Over 1300 issues have been addressed since 0.20.2; you can find details for Common, HDFS and MapReduce. A note from Tom White, who did an excellent job as the release manager: “Please note that this release has not undergone testing at scale and should not be considered stable or suitable for production. It is being classified as a minor release, which means that it should be API compatible with 0.20.2.” Please find a detailed description of what’s new in the 0.21.0 release here.

Community trends & news:

  • New branch hadoop-0.20-security is being created. Apart from the security features, which are in high demand, it will include improvements and fixes from over 12 months of work by Yahoo!. The new security features are going to be a very valuable and welcome contribution (also discussed before).
  • A thorough discussion about approaches of backing up HDFS data in this thread.
  • Hive voted to become Top Level Apache Project (TLP) (also here).  Note that we’ll keep Hive under Search-Hadoop.com even after Hive goes TLP.
  • Pig voted to become TLP too (also here).  Note that we’ll keep Pig under Search-Hadoop.com even after Pig goes TLP.
  • Tip: if you define a Hadoop object (e.g. a Partitioner) as implementing Configurable, then its setConf() method will be called once, right after it gets instantiated.
  • For those new to ZooKeeper and pressed for time, here you can find the shortest ZooKeeper description — only 4 sentences short!
  • Good read “Avoiding Common Hadoop Administration Issues” article.

Notable efforts:

  • Howl: Common metadata layer for Hadoop’s Map Reduce, Pig, and Hive (yet another contribution from Yahoo!)
  • PHP library for Avro, includes schema parsing, Avro data file and string IO.
  • avro-scala-compiler-plugin: aimed to auto-generate Avro serializable classes based on some simple case class definitions

FAQ:

  • How to programmatically determine the names of the files in a particular Hadoop/HDFS directory?
    Use the FileSystem & FileStatus API. Detailed examples are in this thread.
  • How to restrict HDFS space usage?
    Please, refer to HDFS Quotas Guide.
  • How to pass parameters determined at run-time (i.e. not hard-coded) to Hadoop objects (like Partitioner, Writable, etc.)?
    One option is to define the Hadoop object as implementing Configurable. In this case its setConf() method will be called once, right after it gets instantiated, and you can use “native” Hadoop configuration for passing the parameters you need; see the sketch below.
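Here is a minimal sketch of that pattern; the property name my.partitioner.offset and the key/value types are made up for the example:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: "my.partitioner.offset" is a made-up property that the job
// driver would set via job.getConfiguration().setInt("my.partitioner.offset", ...).
public class OffsetPartitioner extends Partitioner<Text, IntWritable> implements Configurable {
  private Configuration conf;
  private int offset;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // Called by the framework right after instantiation, before getPartition() is used.
    this.offset = conf.getInt("my.partitioner.offset", 0);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    return ((key.hashCode() + offset) & Integer.MAX_VALUE) % numPartitions;
  }
}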

HBase Digest, August 2010

The second “developer release”, hbase-0.89.201007d, is now available for download. To remind everyone, there are currently two active branches of HBase:

  • 0.20 – the current stable release series, being maintained with patches for bug fixes only.
  • 0.89 – a development release series with active feature and stability development, not currently recommended for production use.

The first one doesn’t support HDFS durability (edits may be lost in the case of node failure), whereas the second one does. You can find more information on this wiki page. The HBase 0.90 release may happen in October! See the info from the developers.

Community trends & news:

  • New HBase AMIs are available for dev release and 0.20.6.
  • Looking for some GUI that could be used for browsing through tables in HBase? Check out Toad for Cloud, watch for HBase-Explorer and HBase-GUI-Admin.
  • How many regions can a RegionServer support, and what are the consequences of having lots of regions on a RegionServer? Check the info in this thread.
  • Some more complaints to be aware of regarding HBase performing on EC2 in this thread. For those who missed it, more on Hadoop & HBase reliability with regard to EC2 in our March digest post.
  • Need guidance in sizing your first Hadoop/HBase cluster? This article will be helpful.

FAQ:

  • Where can I find information about data model design with regard to HBase?
    Take a look at http://wiki.apache.org/hadoop/HBase/HBasePresentations.
  • How can I perform SQL-like query “SELECT … FROM …” on HBase?
    First, consider that HBase is a key-value store which should be treated accordingly. But if you are still up for writing ad-hoc queries in your particular situation take a look at Hive & HBase integration.
  • How can I access Hadoop & HBase metrics?
    Refer to HBase Metrics documentation.
  • How to connect to HBase from a Java app running on a machine remote to the cluster?
    Check out the client package documentation and the sketch below. Alternatively, one can use the REST interface: Stargate.
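For the client package route, a bare-bones sketch looks like this (ZooKeeper host names, the table and the row key are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class RemoteClientExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Point the client at the cluster's ZooKeeper quorum; host names are placeholders.
    conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    HTable table = new HTable(conf, "mytable");
    Result result = table.get(new Get(Bytes.toBytes("some-row-key")));
    System.out.println(result);
    table.close();
  }
}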

HBase Case-Study: Using HBaseTestingUtility for Local Testing & Development

Motivation

As HBase becomes more mature there is a growing demand for tools and methods that make the development process easier – here at Sematext (@sematext) we’ve gone through our own per aspera ad astra learning process, in addition to Cloudera’s Hadoop trainings and certifications. In this post we share what we’ve learned and show how one can use HBaseTestingUtility for this.

Suppose there is a system that processes data stored in HBase and displays the stored data via a reporting application. Data processing is done using Hadoop MapReduce jobs. During development, it would be desirable to be able to:

  • debug MapReduce jobs in an IDE
  • run reporting application locally (on developer’s machine, without setting up a cluster) with possibility of debugging in IDE
  • easily access data stored in HBase for debugging purposes (easily means “naturally”, as if all rows were in a text file)

Disclaimer

The described use-case and solution are just one option, an option that makes use of HBaseTestingUtility and the underlying “mini” clusters. Depending on the context, this solution might not be the most optimal one, but it is a good fit for presenting the ideas. This solution and this post should encourage developers to look at HBase’s unit-test sources when constructing their own tests and/or when looking for ways to ease debugging & development.

Problem Details

In our example there are two tables in HBase: one with raw data and another with processed data. Let’s call them RawDataTable and ProcessedDataTable. We import data into RawDataTable via a simple importing MapReduce job which initially takes data from a log file. Subsequently, another MapReduce job processes the data in that table and stores the outcome in ProcessedDataTable. We use HBase Scan and Get operations to access the processed data from the client.

Solution

As stated in its javadoc, HBaseTestingUtility is a “facility for testing HBase”. Its description comes with a bit more explanation: “Create an instance and keep it around doing HBase testing. This class is meant to be your one-stop shop for anything you might need for testing. Manages one cluster at a time only.” In this post we describe one possible way to use it to achieve the goals described above.

Processing Data

Step 1: Init cluster.

The following code starts a “local” cluster and creates two tables:

private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
private HTable rawDataTable;
private HTable processedDataTable;
…
void initCluster() throws Exception {
  testUtil.getConfiguration().addResource("hbase-site-local.xml");
  testUtil.getConfiguration().reloadConfiguration();
  // start mini hbase cluster
  testUtil.startMiniCluster(1);
  // create tables
  rawDataTable = testUtil.createTable(RAW_TABLE_NAME, RAW_TABLE_COLUMN_FAMILIES);
  processedDataTable = testUtil.createTable(PROCESSED_TABLE_NAME, PROCESSED_TABLE_COLUMN_FAMILIES);
  // start MR cluster
  testUtil.startMiniMapReduceCluster();
}

testUtil.startMiniCluster(1) means starting a cluster with 1 DataNode and 1 region server. You can start a cluster with a greater number of servers for test purposes.

Step 2: Import Data

We use a simple map-only job to import the data. Please refer to the org.apache.hadoop.hbase.mapreduce.ImportTsv class for an example of such a job. The following code runs the job that uses locally stored files (e.g. a part of the log file of reasonable size) on the just-created cluster:

String[] importJobArgs = new String[] {RAW_TABLE_NAME, "file://" + inputFile};
if (!MyImportJob.createSubmittableJob(testUtil.getConfiguration(), importJobArgs).waitForCompletion(true)) {
  System.exit(1);
}

Step 3: Process Data

To process data in RawDataTable we run an appropriate MapReduce job in the same way as during the import:

if (!ProcessLogsJob.createSubmittableJob(testUtil.getConfiguration(), processLogsJobArgs).waitForCompletion(true)) {
  System.exit(1);
}

Step 4: Persist Processed Data

Since we need the processed data during reporting application development and debugging, we persist it in a local file. In order to have “easy” access to this data during debugging it makes sense to store the table data in a text file in a readable form (so that we can perform “grep” and other handy commands). So we actually write to two files at once. The Result class implements the Writable interface, so there is a natural way to serialize its data.

BufferedWriter bw = ...;
DataOutputStream dos = ...;
ResultScanner rs = processedDataTable.getScanner(new Scan());
Result next = rs.next();
while (next != null) {
  next.write(dos);
  bw.write(getHumanReadableString(next));
  bw.newLine();
  next = rs.next();
}

After this step, the processed data is stored on the local disk and can be used for running the reporting application. Importing and processing of data is performed locally and is thus easier to debug.
In order to add extra processed data incrementally to the already stored data, instead of rewriting it from scratch, we need to load it from the file after cluster initialization as described in the following section.

Fetching Data

In order to make the reporting application run on the “local” cluster instead of the “true” one, we create an alternative HTable factory. The reporting application code uses a single HTable object instantiated by the factory during its whole lifecycle – this is the best practice for minimizing creation of HTable objects.
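A hypothetical factory for this (not part of HBase) can be as simple as the following; production code passes in the real cluster Configuration, while tests pass testUtil.getConfiguration() from the mini cluster:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;

// A small seam that lets the reporting application get its single HTable
// without knowing whether it talks to a real cluster or to a mini cluster.
public class HTableFactory {
  private final Configuration conf;

  public HTableFactory(Configuration conf) {
    this.conf = conf;
  }

  public HTable create(String tableName) throws IOException {
    return new HTable(conf, tableName);
  }
}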

Step 1: Init cluster.

This step is exactly the same as described previously.

Step 2: Load processed data.

We use a file created during the data processing stage to load the data back into the just-initialized cluster:

DataInputStream dis = ...;
Result next = new Result();
next.readFields(dis);
while (next.getRow() != null) {
  Put put = new Put(next.getRow());
  for (KeyValue kv : next.raw()) {
    put.add(kv);
  }
  processedDataTable.put(put);
  next = new Result();
  try {
    next.readFields(dis);
  } catch (EOFException e) {
    // file went to an end.
    break;
  }
}

After data is all loaded, the constructed processedDataTable can be used by the reporting application code. The app can now also be started and debugged easily from an IDE.

Next Steps

Internally, HBaseTestingUtility makes use of a whole bunch of “mini” clusters: MiniZooKeeperCluster, MiniDFSCluster, MiniHBaseCluster and MiniMRCluster. Refer to the unit-test implementations in the source code of the respective projects for more examples of how to use them.

Thank you for reading, we hope you found this useful.  Follow @sematext on Twitter to be notified of new posts on Hadoop, HBase, Lucene, Solr, Mahout, and other related topics.

HBase Digest, July 2010

Big news first: HBase 0.20.6 is out and available for download. It fixes only 8 issues (including 2 blockers), but some of them might be significant in particular cases (like scan recovery in case of region server failure). You can find the release notes here. Message from the HBase dev team: “we recommend that all users, particularly those running 0.20.4, upgrade”.

A very sweet piece of functionality is under active development right now (and looks like it’s nearly complete). This new functionality makes it possible to take HBase table snapshots: HBASE-50. This might be extremely useful in production. The design plan and implementation are looking so good that “committers should read it as they might learn something”.

Community news & trends:

  • Summary notes of HBase meetup (#11) at Facebook give a comprehensive overview of development activities and what’s in coming releases. Slides are available here.
  • Welcome the official HBase Blog.
  • It is strongly recommended to use HDFS patched with HDFS-630 with HBase. To save time, you can use Cloudera’s distribution: HDFS-630 is included in 0.20.1+169.89 (the latest CDH2, i.e. CDH2u1) as well as both betas of CDH3.
  • From an operations standpoint, is setting up an HBase cluster and maintaining it a fairly complex task? Can a single person manage it? People share their experiences in this thread.
  • It makes sense to disable the WAL (e.g. via Put.setWriteToWAL(false)) during a one-time large import into HBase (of course this makes things unreliable, but it can be OK when doing the import once, given the resulting speedup).

Notable efforts:

  • HBase RowLog Library: a component to build WALs and queues backed by HBase.
  • Lily is out: meet the Proof of Architecture release. Lily is a cloud-scalable NoSQL-based content store and search repository, built on top of Apache HBase and Solr.

FAQ:

  • Why are recently added/modified records not present in the results of scan and get operations? How can one make them available?
    Check the autoFlush option: autoFlush=false causes the client to accumulate puts without sending them to the server. Gets and scans only talk to the server and thus ignore the client-side write cache. You can either set autoFlush to true or call HTable.flushCommits() before reading the data.

Are you on Twitter?  You can follow @sematext on Twitter.
