Search Analytics: Business Value & NoSQL Backend Presentation

Last week involved a few late nights for some of us at Sematext: we were busy readying our Search Analytics and Scalable Performance Monitoring services, as well as putting the final touches on our Search Analytics: Business Value & NoSQL Backend presentation for Lucene Eurocon in Barcelona.

In the past we’ve given a few other public talks about Search Analytics and you can check them all out via http://blog.sematext.com/tag/analytics/.

Extending Hadoop Metrics

Here at Sematext we really like performance metrics and we like HBase.  We like them so much we’ve created a service for HBase Performance Monitoring (and for Solr, too).  In the process we’ve done some experiments with Hadoop and HBase around performance monitoring and are sharing our experience and some relevant code in this post.

The Hadoop metrics framework is simple to extend and customise. For example, you can very easily write a custom MetricsContext which sends metrics to your own storage solution.

All you need to do is extend the AbstractMetricsContext class and implement

protected void emitRecord(String context, String record, OutputRecord outputrecord)
  throws IOException;

To demonstrate, I wrote HBaseMetricsContext which stores Hadoop metrics in HBase. Since HBase itself uses the Hadoop metrics framework, you can use it to store its own metrics inside itself. Useful? Maybe. This is just an example after all.

If you’d like to try it out, get the source from GitHub. Then build the project using:

mvn package

Put the resulting Jar file in the HBase lib directory.

You will need to create a table with the relevant column families. We assume the column families are a composite of:

columnFamily = contextName + "." + recordName

In the HBase shell create your table:

create 'metrics', 'hbase.master', 'hbase.regionserver'

Edit your hadoop-metrics.properties file to include:

hbase.class=com.sematext.hadoop.metrics.HBaseMetricsContext
hbase.tableName=metrics
hbase.period=10

Restart HBase and it will start inserting into the metrics table every 10 seconds.

The row key of each record is made up of the timestamp and the tags (for disambiguation) like so:

rowKey = bytes(maxlong - timestamp) + bytes(tagName) + bytes(tagValue) + …

Subtracting the timestamp from maxlong ensures the scans get the most recent record first.
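To make the ordering argument concrete, here is a minimal, self-contained sketch of such a row key. The class and method names are ours for illustration and are not taken from HBaseMetricsContext; it only assumes that row keys are compared as unsigned bytes, which is how HBase orders them, and that timestamps are non-negative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the row key layout described above.
final class InvertedRowKey {

    /** Builds bytes(maxlong - timestamp) + bytes(tagName) + bytes(tagValue) + ... */
    static byte[] build(long timestampMillis, String... tagNamesAndValues) {
        int size = Long.BYTES;
        byte[][] parts = new byte[tagNamesAndValues.length][];
        for (int i = 0; i < parts.length; i++) {
            parts[i] = tagNamesAndValues[i].getBytes(StandardCharsets.UTF_8);
            size += parts[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        // ByteBuffer writes big-endian, so byte order matches numeric order
        buf.putLong(Long.MAX_VALUE - timestampMillis);
        for (byte[] part : parts) {
            buf.put(part);
        }
        return buf.array();
    }

    /** Unsigned lexicographic comparison of two keys. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        byte[] older = build(1000L, "hostName", "rs1.example.org");
        byte[] newer = build(2000L, "hostName", "rs1.example.org");
        System.out.println("newer sorts first: " + (compare(newer, older) < 0));
    }
}
```

Because the inverted value shrinks as the timestamp grows, the key of the newer record compares lower and a scan reaches it first.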

Each tag and metric is stored in its own column. This gives us a table that looks something like this:

         | hbase.master                          | hbase.regionserver
         | cluster_requests | hostName           | hostName        | flushQueueSize | regions
rowKey2  |                  |                    | rs1.example.org | 0              | 1
rowKey1  | 101              | master.example.org |                 |                |

For clarity, timestamps are omitted from the table above; each cell is timestamped, and all cells of a record have the same timestamp.

Flume and HBase Integration

This quick How-To post will teach you how to use HBase sink(s) in Flume. There are currently two generic HBase sinks available: hbase() and attr2hbase(). The former requires one to be more explicit when providing mapping from event data to HBase record, while the latter allows adding record cells dynamically based on event data. Despite this difference, these two sinks have a lot in common.

Configuring Sink(s)

Each sink writes at most one record into an HBase table per Flume event. Both sinks have at least three attributes in common: table, writeBufferSize and writeToWal. These attributes control the HBase client behavior.

The attribute table is the name of the output HBase table. Yes, that means that currently one sink can be configured to output into just one table. If you desperately need to write to multiple tables you can use Flume’s native features to configure several sinks and direct each to the desired HBase table.

The writeBufferSize corresponds to the attribute of HTable with the same name and defines the size of the client-side buffer (in bytes). If writeBufferSize has a non-zero value, the HTable’s autoFlush is set to false. In this case the sink buffers data and sends it to the server in chunks of writeBufferSize bytes. This decreases the number of remote invocations and helps improve performance. By default the sink sends data on every received event. While it seems obvious that one would want to specify a bigger buffer to improve performance, the best buffer size depends on the use case: no data is sent to the server before the buffer is full, which may break some “near real-time” models. For reference, the default writeBufferSize for HTable is 2 MB. Another important factor to consider is that the larger the client-side buffer, the more data you might lose in case of HBase failure [11]. Please find more details about this in the Reliability Issues part of this post.

The writeToWal corresponds to the attribute of the HBase Put with the same name. If set to false the Put operations will not be written into HBase’s edit log. While this means fewer operations to perform and hence much better performance, this approach isn’t reliable: every non-persistent change (HBase likes to keep as much data as it can in memory to keep things fast) will be lost in case of HBase failure [1]. One would typically want to skip writing edits to WAL in case losing some portion of data is acceptable (e.g. when doing non mission-critical log analysis) or during a bulk import (it goes  much faster without writing to WAL) which can be restarted in case of failure.

HBase Sink: hbase()

As of this writing the hbase() sink has the following syntax:

hbase("table", "rowkey", "cf1", "c1", "val1"[,"cf2", "c2", "val2", ....]
 {, writeBufferSize=int, writeToWal=true|false})

As you can see, one is asked to provide the values to be placed into the HBase record explicitly. To make the sink usable, one has to use expressions in the values of “rowkey”, “cf1”, “c1”, “val1”, etc. Expressions available with any event type are “%{hostname}” (or “%{host}”), “%{nanos}”, “%{priority}”, “%{body}” (and soon “%{timestamp}” [8]), which resolve to event properties. You can also fetch the value of any custom event attribute with “%{attribute_name}”. For example, with the following configuration an HBase table would be populated so that one record corresponds to one event, with the event’s nanos (event.getNanos()) as the row key and two cells in the event_colfam column family: body and host, holding the event’s body and hostname respectively.

hbase("table", "%{nanos}", "event_colfam", "body", "%{body}",
      "event_colfam", "host", "%{host}")

HBase Sink: attr2hbase()

This sink was originally contributed to Flume by Sematext, as we needed it for our products that make use of Flume and HBase.

As of this writing the attr2hbase() sink has the following syntax:

attr2hbase("table"[,"sysFamily"[,"writeBody"[,"attrPrefix"[,"writeBufferSize"
 [,"writeToWal"]]]]])

The attr2hbase() sink is used to write event attributes that correspond to a particular format (name starts with a specified prefix) into an HBase record. The names of columns (qualifiers) and column families are determined dynamically based on event’s attribute names. Thus, compared to the hbase() sink, you don’t have to list all possible event attributes you want to store in HBase along with their destination column families and qualifiers (columns). Your source and/or decorators can produce any (reasonable) number of attributes, with dynamic names (e.g. depending on the values) and they will be written into HBase if attr2hbase() sink is configured correctly. In other words, with attr2hbase() one can a) define column names at run-time and b) add whatever number of columns the business logic requires (also at run-time, based on the data being processed). E.g. in case email messages are processed we can store each recipient in a separate cell in an HBase record. You may want to implement your own decorator for that (keep reading to learn how to do this) due to some limitations [9].

sysFamily holds the name of the column family that  is used to store “system” data (event timestamp, host, priority). In case this parameter is absent or equals “”, the sink doesn’t write “system” data. E.g. with configuration like this:

attr2hbase( "mytable", "sysColfam")

each record will have the following cells:

sysColfam:timestamp=<event’s timestamp>, sysColfam:host=<event’s host>, sysColfam:priority=<event’s priority>:

hbase(main):002:0> scan 'mytable'         
ROW    COLUMN+CELL                                                                                                              
 123   column=sysColfam:host, timestamp=1309275728725, value=testhost                                        
 123   column=sysColfam:priority, timestamp=1309275728725, value=INFO                                                           
 123   column=sysColfam:timestamp, timestamp=1309275728725, value=\x00\x00\x010\xD6\xEA+T

writeBody indicates whether the event body should be written along with the other “system” data. By default (when this parameter is absent or equals “”), the body is not written. To have the sink write the body, set this parameter to the destination in “column-family:qualifier” format.

attrPrefix defines which attributes will be written to HBase: every attribute with the name prefixed with attrPrefix parameter’s value is written.  The attribute key should be in the following format to be properly written into HBase:

“<attrPrefix><colfam>:<qual>”

The default value of attrPrefix is “2hb_”. This means that all attributes with names of the form “2hb_<colfam>:<qual>” are written to HBase. The attribute whose key is exactly “<attrPrefix>” must contain the row key for the Put; if no row key can be extracted, the event is skipped and no record is written to the HBase table. The table below shows how event attributes are handled: each cell shows where the event attribute (first column) will be written for the sink parameters given in the column header.

Event's attr ("name"->"value") | attrPrefix="2hb_", sysFamily=null | attrPrefix="2hb_", sysFamily="sysfam" | attrPrefix="", sysFamily="sysfam" | attrPrefix="", sysFamily=null
"any"->"foo"                   | -                                 | -                                     | sysfam:any->foo                   | -
"colfam:col"->"foo"            | -                                 | -                                     | colfam:col->foo                   | colfam:col->foo
"2hb_any"->"foo"               | -                                 | sysfam:any->foo                       | sysfam:2hb_any->foo               | -
"2hb_colfam:col"->"foo"        | colfam:col->foo                   | colfam:col->foo                       | 2hb_colfam:col->foo               | 2hb_colfam:col->foo
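The rules implied by the table can be written down as a small function. The following is only our reading of the table, not the sink's actual code: the class name, the method name and the treatment of malformed names (such as a trailing colon) are assumptions made for illustration.

```java
// Hypothetical helper: a reading of the attribute-handling table, not Flume's implementation.
final class AttrMapping {

    /**
     * Returns the "family:qualifier" an attribute would be written to,
     * or null when the attribute would not be written as a cell.
     */
    static String targetColumn(String attrName, String attrPrefix, String sysFamily) {
        if (!attrName.startsWith(attrPrefix)) {
            return null; // the prefix does not match, so the attribute is ignored
        }
        String rest = attrName.substring(attrPrefix.length());
        if (rest.isEmpty()) {
            return null; // the attribute named exactly <attrPrefix> carries the row key
        }
        int colon = rest.indexOf(':');
        if (colon > 0 && colon < rest.length() - 1) {
            return rest; // already in <colfam>:<qual> form
        }
        if (colon < 0 && sysFamily != null && !sysFamily.isEmpty()) {
            return sysFamily + ":" + rest; // no family given, fall back to sysFamily
        }
        return null; // assumption: malformed names and missing sysFamily mean "skip"
    }

    public static void main(String[] args) {
        System.out.println(targetColumn("2hb_colfam:col", "2hb_", null));
        System.out.println(targetColumn("2hb_any", "2hb_", "sysfam"));
    }
}
```

Checking each of the sixteen table cells against this function is a quick way to confirm that a prefix and sysFamily combination does what you expect before wiring it into a Flume configuration.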

Example

This sink is usually used with the decorators that perform light transformation of event data into attributes with specific names. We say “light transformation” here to avoid getting into the discussion about whether Flume is meant for ETL or just for reliable data delivery. You can use different standard Flume decorators [2], like “value()”, “select()”, “regex()”, “split()” and others. For example, the following setting will make Flume write a new record into HBase table for every line in a tailed file:

'tail("/tmp/some.log")'
'split( ":", 0 , "2hb_colfam:ts")
 split( ":", 1 ,"2hb_colfam:value")
 format("%{nanos}:") split(":", 0, "2hb_")
  attr2hbase( "mytable", "", "", "2hb_", "1000", "false" )'

NOTE: we have to use the format() + split() decorators as a workaround for FLUME-676 [3]: the value() decorator doesn’t support EL values. So with format() we put %{nanos} into the body, and from there we put it into the attribute named “2hb_”. The written records will have a row key holding the nanos of the event (the “2hb_” attribute value) and two values parsed from the log line. Here’s the example output:

$ echo "2238947398:56" >> /tmp/some.log
$ hbase shell
hbase(main):025:0> scan 'mytable'
ROW COLUMN+CELL
9656555664717551 column=colfam:ts, timestamp=1308748686334, value=2238947398
9656555664717551 column=colfam:value, timestamp=1308748686334, value=56

Custom Decorator

The attr2hbase sink is often used with a custom decorator that “prepares” events so that they contain attributes ready to be written into an HBase table. Implementing a custom decorator is very simple. Example code:

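import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Flume package names as in Flume 0.9.x; adjust them if your version differs
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;

public class CustomDecorator<S extends EventSink> extends EventSinkDecorator<S> {
  public CustomDecorator(String param) {
    super(null); // no wrapped sink is passed at construction time
    // TODO: do some initialization
  }

  @Override
  public void append(Event e) throws IOException {
    // TODO: transform event e here
    super.append(e); // pass the (transformed) event on to the wrapped sink
  }

  // Builder used by Flume to create the decorator from its configuration arguments
  public static SinkFactory.SinkDecoBuilder builder() {
    return new SinkFactory.SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context, String... argv) {
        if (argv.length != 1) {
          throw new IllegalArgumentException("usage: CustomDecorator(\"param\")");
        }
        return new CustomDecorator<EventSink>(argv[0]);
      }
    };
  }

  // Registers the decorator under the name "CustomDec"
  public static List<Pair<String, SinkFactory.SinkDecoBuilder>> getDecoratorBuilders() {
    return Arrays.asList(
        new Pair<String, SinkFactory.SinkDecoBuilder>("CustomDec", builder()));
  }
}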

To make your custom decorator visible to Flume you need to register it in flume-site.xml:

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>your.own.CustomDecorator</value>
</property>
...
</configuration>

Reliability Issues

There are certain reliability details you will want to think about when using HBase sinks.

Temporary HBase Connection Loss Requires Sink Restart

If default HBase configuration is used, when connection from sink node to HBase cluster breaks for several minutes (e.g. network problems or cluster maintenance downtime) the sink stops working. Unfortunately, it does not recover after cluster becomes accessible again. To fix this behavior you can adjust configuration properties “hbase.client.pause” and “hbase.client.retries.number” in hbase-site.xml which should be in Flume’s classpath. The default behavior (i.e. the issue) is going to be fixed in a future release [10].

Loss of Data Buffered on Client-side

Despite using reliable data delivery approaches (like agentE2ESink), data can still be lost if a client-side buffer is used (i.e. if writeBufferSize > 0) and a failure happens while the buffer is being flushed [11]. This occurs because the ACK about data being received is sent before the actual write to the HBase table is done: writes are performed only on buffer flush, not on a per-event basis. Thus, if a write fails during flushing, the data in the client-side buffer will be lost, as the ACKs will have fooled Flume into thinking the data had already been persisted in HBase.

Other Considerations

Row Key

As usual, one of the most important design decisions when using HBase is the row key format. In initial versions of the HBase sinks the default row key was %{nanos}, as it seems to be unique and records seem to be ordered by arrival time. However, we suggest you consider using a different row key format, and here’s why.

Usually when users import data into HBase they use either some kind of UUID or an (inverted) timestamp-based approach. The former helps achieve better performance by distributing the write load between multiple regions. The latter is good when one needs fast scans of imported data based on time ranges and when the import load is bearable. However, when we use nanos, which are currently obtained from the System.nanoTime() Java call, we get neither advantage: keys from each event producer go in sequence, so all write load hits just a few regions and region servers at a time [12], and System.nanoTime() is not really suitable for scans that fetch data from a time interval. Moreover, we cannot even rely on records written from different Flume nodes being ordered by time of arrival, as nanos are not necessarily in sync across JVMs.

Thus, it may make more sense to use purely random row keys (although Flume has no %{random} or anything similar, as far as we know) or an (inverted) timestamp [8] (along with [12] if that is possible). The timestamp-based approach might need more work: events can arrive with the exact same timestamp during high load. Adding an extra hash would help here (or even using “%{timestamp}%{nanos}”).
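As a rough illustration of combining these ideas, the sketch below builds a key from an inverted timestamp plus nanos and then prepends a one-byte bucket so that sequential keys spread over several regions. It shows the general key-salting idea only; the class and method names are hypothetical and this is not the API of HBaseWD [12] or of the Flume sinks.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of a salted, time-based row key.
final class SaltedRowKey {

    /** Inverted timestamp followed by nanos, so equal timestamps still yield distinct keys. */
    static byte[] timeKey(long timestampMillis, long nanos) {
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(Long.MAX_VALUE - timestampMillis)
                .putLong(nanos)
                .array();
    }

    /** Prepends one bucket byte computed from the key itself, so a reader can recompute it. */
    static byte[] salt(byte[] key, int buckets) {
        if (buckets < 1 || buckets > 256) {
            throw new IllegalArgumentException("buckets must be in 1..256");
        }
        int bucket = Math.floorMod(Arrays.hashCode(key), buckets);
        return ByteBuffer.allocate(1 + key.length)
                .put((byte) bucket)
                .put(key)
                .array();
    }

    public static void main(String[] args) {
        byte[] key = salt(timeKey(System.currentTimeMillis(), System.nanoTime()), 16);
        System.out.println(Arrays.toString(key));
    }
}
```

The trade-off is on the read side: with salted keys, fetching a time range takes one scan per bucket, with the results merged on the client.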

Setup Instructions

The easiest way to install Flume is to use CDH3 [4]. Then you need to add the flume-plugin-hbasesink jar into Flume’s lib dir. You can compile it from Flume sources [6] or download a compiled jar [5]. You also need to add the HBase jar into Flume’s lib dir. It can be downloaded from Maven repositories [7] or copied from your HBase setup dir (so that the jar used on the client side by the Flume sink is the same as the one on the server side, which is important). The last step is to add the plugin(s) to Flume’s configuration file (flume-site.xml):

<configuration>
<property>
<name>flume.plugin.classes</name>
 <value>com.cloudera.flume.hbase.HBaseSink,com.cloudera.flume.hbase.Attr2HBaseEventSink</value>
</property>
...
</configuration>

As both HBase and Flume use Zookeeper, it makes sense to share the Zookeeper quorum, too.

If you read this far, you may want to know that we are hiring and are happy users of and contributors to Flume, HBase, and a few other projects.

[1] http://www.larsgeorge.com/2010/01/hbase-architecture-101-write-ahead-log.html

[2] http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html#_flume_sink_decorator_catalog

[3] https://issues.cloudera.org/browse/FLUME-676

[4] https://ccp.cloudera.com/display/CDHDOC/Flume+Installation

[5] https://github.com/abaranau/flume-plugin-hbasesink-compiled

[6] https://github.com/cloudera/flume

[7] e.g. https://repository.cloudera.com/content/groups/public/org/apache/hbase/hbase/

[8] https://issues.cloudera.org/browse/FLUME-688

[9] https://issues.cloudera.org/browse/FLUME-689

[10] https://issues.cloudera.org/browse/FLUME-685

[11] https://issues.cloudera.org/browse/FLUME-390

[12] https://github.com/sematext/HBaseWD

Opening: HBase and Lucene / Solr / Elastic Search Developer

We are once again looking for smart people. This time we are looking to hire a person who likes working with HBase and Lucene (or Solr or ElasticSearch). This particular combination is important to us because the very first target for this person might be the integration of HBase and Lucene / Solr / ElasticSearch. More specifically, we have our eyes on HBASE-3529, which we’ve closely examined during a recent HBase Hackathon that took place after BerlinBuzzwords. Of course, we are also open to alternative approaches if the one taken in HBASE-3529 turns out to be problematic. The work around the marriage of HBase and full-text search is to be done “in the open”, meaning in collaboration with HBase as well as Lucene, Solr, or Elastic Search developers, which makes this project that much more exciting.

Beyond HBase and search integration, we do other interesting stuff with HBase (and Flume and MapReduce and …), so this person would get to work on our Search Analytics and Scalable Performance Monitoring services.

Interested?  Please get in touch and see what else we like on our jobs page.

Hiring: Data Mining, Analytics, Machine Learning Hackers

If you want to work with search, big data mining, analytics, and machine learning, and you are a positive, proactive, independent creature, please keep reading. We are looking for devops to hack on Sematext’s new products and services, as well as provide services to our growing list of clients. Working knowledge of Mahout or a statistics/machine learning/data mining background would be a major plus.
 

Skills & experience (the more of these you have under your belt the better):

  • Data mining and/or machine learning (Mahout or …)
  • Big data (HBase or Cassandra or Hive or …)
  • Search (Solr or Lucene or Elastic Search or …)

More about an ideal you:

  • You are well organized, disciplined, and efficient
  • You don’t wait to be told what to do and don’t need hand-holding
  • You are reliable, friendly, have a positive attitude, and don’t act like a prima donna
  • You have an eye for detail, don’t like sloppy code, poor spelelling and typous
  • You are able to communicate complex ideas in a clear fashion in English, clean and well designed code, or pretty diagrams

Optional bonus points:

  • You like to write or speak publicly about technologies relevant to what we do
  • You are an open-source software contributor

A few words about us:

We work with search and big data (Lucene, Solr, Nutch, Hadoop, MapReduce, HBase, etc.) on a daily basis and we present at conferences.  Our projects with external clients range from 1 week to several months.  Some clients are small startups, some are large international organizations.  Some are top secret.  New customers knock on our door regularly and this keeps us busy at pretty much all times.  When we are not busy with clients we work on our products.  We run search-lucene.com and search-hadoop.com.  We participate in open-source projects and publish monthly Digest posts that cover Lucene, Solr, Nutch, Mahout, Hadoop, Hive, and HBase.  We don’t write huge spec docs, we work in sprints, we multitask, and try our best to be agile. We send people to conferences, trainings (Hadoop, HBase, Cassandra), and certifications (2 of our team members are Cloudera Certified Hadoop Developers).

We are a small and mostly office-free, highly distributed team spanning 3 continents and 6 countries. We communicate via email, Skype voice/IM, and BaseCamp. Some of our developers are in Eastern Europe, so we are especially open to new team members being in that area, but we are also interested in good people world-wide, from South America to the Far East.

Interested? Please send your resume to jobs @ sematext.com and feel free to check out our other positions.

Google Summer of Code and Intern Sponsoring

Are you a student and looking to do some fun and rewarding coding this summer? Then join us for the 2011 Google Summer of Code!

The application deadline is in less than a month! Lucene has identified initial potential projects, but this doesn’t mean you can’t also pick your own. If you need additional ideas, look at our Lucene / Solr for Academia: PhD Thesis Ideas (or just the spreadsheet if you don’t want to read the what and the why); just be sure to discuss with the community first (send an email to dev@lucene.apache.org).

We should also add that, separately from GSoC, Sematext would be happy to sponsor good students and interns interested in work on projects involving search (Lucene, Solr), machine learning & analytics (Mahout), big data (Hadoop, HBase, Hive, Pig, Cassandra), and related areas. We are a virtual and geographically distributed organization whose members are spread over several countries and continents and we welcome students from all across the globe.  For more information please inquire within.

HBase Backup Options

If you are thinking about using HBase you will likely want to understand HBase backup options.  I know we did, so let us share what we found.  Please let us know what we missed and what you use for HBase backup!

Export

You could export your tables using the Export (org.apache.hadoop.hbase.mapreduce.Export) MapReduce job that will export the table data into a Sequence File on HDFS.  This was implemented in HBASE-1684 if you want to check out the patch or comments there.  This tool works on one table at a time, so if you need to backup multiple tables, run this on each table.  The exported data can then be imported back into HBase by the Import tool.

Copy Table

If you have another HBase cluster that you want to treat as a backup cluster, you can use the handy CopyTable tool to copy a table at a time.

Distcp

You could use Hadoop’s distcp command to copy the whole /hbase directory from one HDFS cluster to the other.  However, this can leave your data in an inconsistent state, so it should be avoided.  See http://search-hadoop.com/m/wkMgSjVLDb

At this point we should point out that all of the above backup methods are per-table. Moreover, they don’t work from or create a snapshot of the table. Export and CopyTable are atomic only at the row level. Furthermore, if you have multiple tables whose data depend on each other and they are being modified while you are exporting or copying them, you will end up with inconsistent data: the data in those tables will not be in sync. See http://search-hadoop.com/m/Q4bU81G116p.

Backup from Mozilla

Because of the above mentioned issues with distcp when running it over a cluster whose data is being modified while distcp is running, developers at Mozilla came up with their own Backup tool.  They’ve described the tool and its use in the popular Migrating HBase in the Trenches post.

Cluster Replication

HBase has a relatively new and not yet widely used whole cluster replication mechanism.  The backup cluster does not have to be identical to the master cluster, which means that the backup cluster could be much less powerful and thus cheaper, while still having enough storage to serve as backup.

Table Snapshot

Ah, the infamous HBASE-50!  This issue saw some great work during GSoC 2010, but it looks like it was never integrated into HBase.  It is unclear whether the contributor simply ran out of steam or time or whether it became apparent that table snapshots are too difficult to implement or simply not doable because of highly distributed nature of HBase.  The JIRA issue does contain patches you can look at, and the author has a now inactive hbase-snapshot repository up on Github.

HDFS Replication

You could also simply crank up the replication factor to the level that makes you feel safe and call that a backup.  This may not guard against data corruption, but it does guard against certain partial hardware failures.

Since so many people seem to be asking about HBase backup options, I hope this serves as a good point-in-time snapshot, a summary of all HBase backup options that are currently on the table.  With time, this will be added to the HBase Book.

Are there other HBase backup options we should have included?

What do you use for HBase backup?

Deferring Processing Updates to Increase HBase Write Performance

In this post we’ll examine, via an example, how deferring the processing of updates can increase the write throughput of a data store. At the end we’ll introduce our recently open-sourced HBaseHUT, a tool for automating deferred updates in HBase.

Please note that “deferred updates” here does not mean that updates are not “visible” immediately after a write operation. The system still serves add/update/delete and read operations in real-time.

Plug: if this sort of stuff interests you, we are hiring people who know about Hadoop, HBase, MapReduce…

Constraints & Assumptions

The decisions and ideas in the post are based on facts about HBase, but can be applied to many other data stores with similar characteristics:

  • Fetching a set of records as a range (scan operation) is much cheaper than fetching the same records one-by-one
  • Adding new records (simple write operation) is very fast
  • Storage space is cheap
  • “In-place” updates of records are not generally feasible

Well, the last assumption is a bit tricky: HBase provides incrementColumnValue() operation but it is very limited: one can only increment a specified cell that contains long value by some delta. In future versions HBase will allow incrementing of multiple cells at once (already in HBase trunk). However, not all update operations are that simple, as we’ll see in the example. Moreover, incrementing cells in a record is slower than writing a new record (we’ll use this fact later in the post).

Idea Explained

The idea behind deferred updates processing is to postpone updating the existing record and store incoming deltas as new records. Thus, record update operations become simple write operations with the corresponding performance. The deferred updates technique elaborated here fits well when the system handles a lot of updates of stored data and write performance is the main concern, while read speed requirements are not that strict. The following cases (each of them separately or in any combination) may indicate that one can benefit from using the technique (the list is not complete):

  • updates are well spread over the whole and large dataset
  • lower rate (among write operations) of “true updates” (i.e. a high percentage of writes are for completely new data rather than updates of existing data)
  • good portion of data stored/updated may never be accessed
  • system should be able to handle high write peaks without major performance degradation

Next, let’s look at an example use-case to illustrate the idea better.

Example Use-Case

Imagine a simple presentation slides sharing system that gives one the ability to browse presentations on-line (not as downloadable PDF or whatever format files, since that would not be interesting for us here). Suppose we want to provide the presentation’s author with details on user behaviour when a user navigates through slides so that the author can use this data to infer the quality of the presentation (hey, slide sharing systems maintainers: free hint!). Let’s limit statistics data we want to share to simple metrics for the sake of keeping the example simple. Here’s what we can provide:

  • number of times a particular slide was viewed
  • avg/min/max time spent by user on a particular slide

Using these stats the presentation author can find out e.g.:

  • what are the most difficult to understand slides (they probably need to be divided into multiple ones, or better preparation before showing them should be done by giving more/better info on previous slides)
  • what are the most trivial slides (they can be merged/removed)
  • what are the typical users (e.g., If users spend a lot of time thinking on the slides the author can add more details in the presentation.  On the other hand, if users usually quickly walk through it then some very deep details can be removed)

Such a system could provide more stats to authors so that they can work on other sorts of presentation improvements.  For example, the system could provide information about user navigation actions (e.g., if users often go back and forth between two slides then probably the schemes/diagrams on them are better understandable if they are on the same slide and can be easily compared), but that’s not the focus of this post.

To track previously mentioned stats data we need to capture how much time a user spent on a particular slide and send this data to our data store. Let’s assume our records store the following info: {(key)presentationId_slideId, number of views, total viewing time, max viewing time, min viewing time}. Using this data we can provide useful information to author as needed.

Every time a user leaves a slide, the tracking logic sends the following data to the data store: {(key)presentationId_slideId, time spent on viewing}.
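As a minimal sketch of that record, the class below keeps the four stored fields and folds one "time spent on viewing" value into them. The class and field names are ours, chosen for illustration; the average is derived from the stored total and count rather than stored itself.

```java
// Hypothetical in-memory form of the per-slide stats record described above.
final class SlideStats {
    long views;
    long totalMillis;
    long maxMillis = Long.MIN_VALUE; // no views seen yet
    long minMillis = Long.MAX_VALUE; // no views seen yet

    /** Folds one "time spent on viewing" value into the aggregate. */
    void addView(long spentMillis) {
        views++;
        totalMillis += spentMillis;
        maxMillis = Math.max(maxMillis, spentMillis);
        minMillis = Math.min(minMillis, spentMillis);
    }

    /** The average is derived from total and count, so it is not stored. */
    double averageMillis() {
        return views == 0 ? 0.0 : (double) totalMillis / views;
    }

    public static void main(String[] args) {
        SlideStats stats = new SlideStats();
        stats.addView(1000);
        stats.addView(3000);
        System.out.println(stats.views + " views, avg " + stats.averageMillis() + " ms");
    }
}
```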

Direct Solution

The straightforward approach would be to update the presentationId_slideId record (fetch, modify and store updated data) as new data comes in. Here are some drawbacks of this naive approach:

  • for each update operation we have to perform fetch and write operations (Get and Put in case of HBase)
  • Most authors won’t really care about all these stats and majority of them will never look at it, so many processing operations can simply be omitted (until there’s interest in stats)
  • for first ever viewing of a slide a redundant fetch operation is performed (data is not there yet), which is OK if slides are viewed many times but in situation when many presentations are viewed just one time (e.g. by their creators) this may result in a bigger portion of redundant operations (this assumption is sort of made up, yes)

The very first point is a major one and is the obvious write performance killer.  This point is really not made up – we have first hand experience with that particular situation.

Deferring Updates Processing

One alternative solution is to defer the processing of updates until the data is requested by the presentation author, or until scheduled/periodic processing runs (ideally when the data store write load is minimal). In this deferred updates approach all new deltas (new user action data) are written as new records.

When an author requests the stats, the accumulated updates are processed. This is much more efficient than the direct approach because it uses range scan operations rather than individual record reads/writes. Given that a presentation usually has tens of slides and is viewed by hundreds or thousands of people, such range scans are fast enough that the user will barely notice them. Also, it should be quite OK to ask a person to wait several seconds during their first view of the stats page (processed updates are written back into the data store, so the next time the user fetches the data it is available right away).

To keep on-request processing from taking too long, the data can also be processed periodically rather than only at request time. Since on-request processing always gives the user up-to-date statistics, the periodic processing can be postponed for a long time without affecting the “real-time” visibility aspect of the system. It can be scheduled for off-peak hours, when it won’t compromise write performance. This adds a “cushion”: from this point on, the maximum amount of data that needs to be processed “on-the-fly” is one day’s worth of updates (assuming daily processing), which is sufficient for the majority of cases (e.g. the estimated maximum of presentation views per day is in the tens of thousands, which makes for a very quick scan operation).
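The deferred approach can be sketched in the same spirit with a sorted in-memory map standing in for the data store, since sorted keys are what make a range scan over one slide’s records possible. The class name DeferredStatsStore, the sequence-number key suffix and the long[] value layout are assumptions for this illustration; a real store would need its own way of making delta keys unique.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a store with sorted row keys.
class DeferredStatsStore {
  // Value layout (an assumption): {views, totalTime, minTime, maxTime}
  private final TreeMap<String, long[]> rows = new TreeMap<>();
  private long seq = 0;  // assumed suffix that makes every delta key unique
  int puts = 0;

  // Write path: every view is written as a new record, with no fetch at all.
  void trackSlideView(String presentationId, String slideId, long timeSpent) {
    String key = String.format("%s_%s_%019d", presentationId, slideId, seq++);
    rows.put(key, new long[] {1, timeSpent, timeSpent, timeSpent});
    puts++;
  }

  // Read path: range-scan all records of the slide, merge them, and write
  // the merged record back so the next read finds a single record.
  long[] readStats(String presentationId, String slideId) {
    String prefix = presentationId + "_" + slideId + "_";
    SortedMap<String, long[]> range =
        rows.subMap(prefix, prefix + Character.MAX_VALUE);
    if (range.isEmpty()) {
      return null;
    }

    long[] merged = {0, 0, Long.MAX_VALUE, Long.MIN_VALUE};
    for (long[] r : range.values()) {
      merged[0] += r[0];
      merged[1] += r[1];
      merged[2] = Math.min(merged[2], r[2]);
      merged[3] = Math.max(merged[3], r[3]);
    }

    String firstKey = range.firstKey();
    range.clear();               // drop the processed records
    rows.put(firstKey, merged);  // store the merged result in their place
    puts++;
    return merged;
  }

  int recordCount() {
    return rows.size();
  }
}
```

Because a merged record has the same layout as a delta, later deltas can be merged with it in exactly the same way, and the same merge could just as well run from a periodic job instead of on request.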

The remainder of this post provides details of our implementation of this approach.

HBaseHUT

HBaseHUT is a tool that automates deferred processing of updates for HBase. It hides many details of updates processing from the client code, which makes it an easy-to-use solution. There are Put and ResultScanner wrappers that encapsulate all the needed logic.

To write new data you need to use the HutPut implementation of Put. Since HutPut is a standard HBase Put, only the instance creation code needs to be changed:

public class SlideStatsTracker {
  byte[] SLIDE_CF = Bytes.toBytes("slide");
  byte[] VIEWS_NUM_C = Bytes.toBytes("views");
  byte[] TOTAL_VTIME_C = Bytes.toBytes("total_time");
  byte[] MIN_VTIME_C = Bytes.toBytes("min_time");
  byte[] MAX_VTIME_C = Bytes.toBytes("max_time");
  ...
  void trackSlideView(byte[] presentationId, byte[] slideId, long timeSpent)
                               throws InterruptedException, IOException {
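    Put put = new HutPut(Bytes.add(presentationId, slideId));
    put.add(SLIDE_CF, VIEWS_NUM_C, Bytes.toBytes(1));
    put.add(SLIDE_CF, TOTAL_VTIME_C, Bytes.toBytes(timeSpent));
    // A single view is both the min and the max of its own delta; the update
    // processor reads these columns from every record.
    put.add(SLIDE_CF, MIN_VTIME_C, Bytes.toBytes(timeSpent));
    put.add(SLIDE_CF, MAX_VTIME_C, Bytes.toBytes(timeSpent));
    hTable.put(put);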
  }
  …
}

Fetching stats is performed using HutResultScanner, which implements ResultScanner and hence can also be integrated very easily into code that uses the normal HBase API:

  Scan scan = new Scan(presentationId);
  ResultScanner resultScanner =
      new HutResultScanner(hTable.getScanner(scan), updateProcessor);
  Result result = resultScanner.next();
  while (result != null) {
    // fetch data from result object
    result = resultScanner.next();
  }

The updateProcessor passed as a parameter encapsulates the update operation logic:

  class SlideStatsUpdateProcessor implements UpdateProcessor {
    @Override
    public void process(Iterable<Result> records, UpdateProcessingResult processingResult) {
      int viewsNumber = 0;
      long totalViewTime = 0;
      long minViewTime = Long.MAX_VALUE;
      long maxViewTime = Long.MIN_VALUE;
      // Merge all records (new deltas and previously processed results) into one
      for (Result record : records) {
        int views = Bytes.toInt(record.getValue(SLIDE_CF, VIEWS_NUM_C));
        long totalTime = Bytes.toLong(record.getValue(SLIDE_CF, TOTAL_VTIME_C));
        long minTime = Bytes.toLong(record.getValue(SLIDE_CF, MIN_VTIME_C));
        long maxTime = Bytes.toLong(record.getValue(SLIDE_CF, MAX_VTIME_C));

        viewsNumber += views;
        totalViewTime += totalTime;
        minViewTime = minTime < minViewTime ? minTime : minViewTime;
        maxViewTime = maxTime > maxViewTime ? maxTime : maxViewTime;
      }

      processingResult.add(SLIDE_CF, VIEWS_NUM_C, Bytes.toBytes(viewsNumber));
      processingResult.add(SLIDE_CF, TOTAL_VTIME_C, Bytes.toBytes(totalViewTime));
      processingResult.add(SLIDE_CF, MIN_VTIME_C, Bytes.toBytes(minViewTime));
      processingResult.add(SLIDE_CF, MAX_VTIME_C, Bytes.toBytes(maxViewTime));
    }
  }

As you can see, the HBaseHUT abstraction doesn’t change the HBase API, which makes it very easy to use in new code or to integrate into old code.

You can find more details about HBaseHUT features on the project’s wiki. HBaseHUT is a new project – your feedback/questions/ideas/feature requests/forks/pull requests, etc. are all very welcome. Please start new discussions on HBaseHUT’s mailing list.

If you read this far, we’d like to talk to you – we are hiring smart people who know about Hadoop, HBase, MapReduce, want to do large scale data (stream) processing, and build tools like HBaseHUT.

Hiring Search and Data Analytics Engineers

We are growing and looking for smart people to join us either in an “elastic”, on-demand, per-project, or more permanent role:

Lucene/Solr expert who…

  • Has built non-trivial applications with Lucene or Solr or Elastic Search, knows how to tune them, and can design systems for large volumes of data and queries
  • Is familiar with (some of the) internals of Lucene or Solr or Elastic Search, at least on the high level (yeah, a bit of an oxymoron)
  • Has a systems/ops bent or knows how to use performance-related UNIX and JVM tools for analyzing disk IO, CPU, GC, etc.

Data Analytics expert who…

  • Has used or built tools to process and analyze large volumes of data
  • Has experience using HDFS and MapReduce, and has ideally also worked with HBase, or Pig, or Hive, or Cassandra, or Voldemort, or Cascading or…
  • Has experience using Mahout or other similar tools
  • Has interest or background in Statistics, or Machine Learning, or Data Mining, or Text Analytics or…
  • Has interest in growing into a Lead role for the Data Analytics team

We like to dream that we can find a person who gets both Search and Data Analytics, and ideally wants or knows how to marry them.

Ideal candidates also have the ability to:

  • Write articles on interesting technical topics (that may or may not relate to Lucene/Solr) on Sematext Blog or elsewhere
  • Create and give technical talks/presentations (at conferences, local user groups, etc.)

Additional personal and professional traits we really like:

  • Proactive and analytical: takes initiative, doesn’t wait to be asked or told what to do and how to do it
  • Self-improving and motivated: acquires new knowledge and skills, reads books, follows relevant projects, keeps up with changes in the industry…
  • Self-managing and organized: knows how to parcel work into digestible tasks, organizes them into Sprints, updates and closes them, keeps team members in the loop…
  • Realistic: good estimator of time and effort (i.e. knows how to multiply by 2)
  • Active in OSS projects: participates in open source community (e.g. mailing list participation, patch contribution…) or at least keeps up with relevant projects via mailing list or some other means
  • Follows good development practices: from code style to code design to architecture
  • Productive, gets stuff done: minimal philosophizing and over-designing

Here are some of the Search things we do (i.e. that you will do if you join us):

  • Work with external clients on their Lucene/Solr projects.  This may involve anything from performance troubleshooting to development of custom components, to designing highly scalable, high performance, fault-tolerant architectures.  See our services page for common requests.
  • Provide Lucene/Solr technical support to our tech support customers
  • Work on search-related products and services

A few words about us:

We work with search and big data (Lucene, Solr, Nutch, Hadoop, MapReduce, HBase, etc.) on a daily basis.  Our projects with external clients range from 1 week to several months.  Some clients are small startups, some are large international organizations.  Some are top secret.  New customers knock on our door regularly and this keeps us busy at pretty much all times.  When we are not busy with clients we work on our products.  We run search-lucene.com and search-hadoop.com.  We participate in open-source projects and publish monthly Digest posts that cover Lucene, Solr, Nutch, Mahout, Hadoop, and HBase.  We don’t write huge spec docs, we work in sprints, we multitask, and try our best to be agile. We send people to conferences, trainings (Hadoop, HBase, Cassandra), and certifications (2 of our team members are Cloudera Certified Hadoop Developers).

We are a small and mostly office-free, highly distributed team that communicates via email, Skype voice/IM, and BaseCamp.  Some of our developers are in Eastern Europe, so we are especially open to new team members being in that area, but we are also interested in good people world-wide, from South America to the Far East.

Interested? Please send your resume to jobs @ sematext.com.

Search Analytics: Hadoop World Presentation

After our Lucene Revolution talk in Boston, we got ready for last week’s Hadoop World conference in New York.  Like at Lucene Revolution, we presented to a packed room of 200+ people. The topic of our talk was the Search Analytics tool we’ve built with the help of Flume, HBase, MapReduce, and other open-source tools, and which we are now starting to use for search-hadoop.com and search-lucene.com.  If you couldn’t make it to Hadoop World, have a look at our presentation below.  And if you’d like to work on Search, Analytics, and related areas, we’re looking for good people world-wide – see our jobs page.  Enjoy!
