Mahout Digest, October 2010

We’ve been very busy here at Sematext, so we haven’t covered Mahout during the last few months.  We are pleased with what’s been keeping us busy, but are not happy about our irregular Mahout Digests.  We covered the last (0.3) release with all of its features, and we are not going to miss covering a very important milestone for Mahout: release 0.4 is out! In this digest we’ll summarize the most important changes in Mahout since the last digest and add some perspective.

Before we dive into Mahout, please note that we are looking for people with Machine Learning skills and Mahout experience (as well as good Lucene/Solr search people).  See our Hiring Search and Data Analytics Engineers post.

This Mahout release brings overall changes regarding model refactoring and command line interface to Mahout aimed at improving integration and consistency (easier access to Mahout operations via the command line). The command line interface is pretty much standardized for working with all the various options now, which makes it easier to run and use. Interfaces are better and more consistent across algorithms and there have been many small fixes, improvements, refactorings, and clean-ups. Details on what’s included can be found in the release notes and download is available from the Apache Mirrors.

Now let’s add some context to various changes and new features.

GSoC projects

Mahout wrapped up its Google Summer of Code projects, and two of them completed successfully:

  • EigenCuts spectral clustering implementation on Map-Reduce for Apache Mahout (addresses issue MAHOUT-328), proposal and implementation details can be found in MAHOUT-363
  • Hidden Markov Models based sequence classification (proposal for a summer-term university project), proposal and implementation details in  MAHOUT-396

Two projects did not complete due to lack of student participation and one remains in progress.

Clustering

The biggest additions in the clustering department are the EigenCuts clustering algorithm (a GSoC project) and MinHash-based clustering, which we covered as a possible GSoC suggestion in one of the previous digests. MinHash clustering was implemented, but not as a GSoC project. In the first digest of the Mahout series we covered problems related to evaluating clustering results (an unsupervised learning issue), so another big addition to Mahout’s clustering is the set of Cluster Evaluation Tools. These feature the new ClusterEvaluator (which uses Mahout in Action code for inter-cluster density and similar code for intra-cluster density over a set of representative points, not the entire clustered data set) and CDbwEvaluator, which offers new ways to evaluate clustering effectiveness.

Logistic Regression

Online learning capabilities, such as a Stochastic Gradient Descent (SGD) algorithm implementation, are now part of Mahout. Logistic regression is a model used to predict the probability of occurrence of an event. It makes use of several predictor variables that may be either numerical or categorical. For example, the probability that a person has a heart attack within a specified time period might be predicted from knowledge of the person’s age, sex and body mass index. Logistic regression is used extensively in the medical and social sciences as well as in marketing applications, such as predicting a customer’s propensity to purchase a product or cancel a subscription. The Mahout implementation uses Stochastic Gradient Descent (SGD); see MAHOUT-228 for the initial request and development. The new sequential logistic regression training framework supports a feature vector encoding framework for high-speed vectorization without a pre-built dictionary. You can find more details on Mahout’s logistic regression wiki page.
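To make the two ideas in this section concrete, here is a minimal plain-Java sketch of online logistic regression trained with SGD over hashed features. It does not use Mahout’s classes: the class name, the slot count, the learning rate and the toy subscription features are all assumptions made up for illustration.

```java
public class SgdLogisticSketch {
    // Number of hashed feature slots (a hypothetical value chosen for this sketch).
    static final int DIM = 1024;

    // Hashing trick: a feature name maps straight to a slot, so no dictionary is built.
    static int slot(String feature) {
        return Math.floorMod(feature.hashCode(), DIM);
    }

    // Encode categorical features such as "plan=trial" into a fixed-size count vector.
    static double[] encode(String... features) {
        double[] x = new double[DIM];
        for (String f : features) {
            x[slot(f)] += 1.0;
        }
        return x;
    }

    // Logistic model: estimated probability that the event occurs for input x.
    static double predict(double[] w, double[] x) {
        double z = 0.0;
        for (int i = 0; i < w.length; i++) {
            z += w[i] * x[i];
        }
        return 1.0 / (1.0 + Math.exp(-z));
    }

    // One online SGD step for a single example with label y (0 or 1).
    static void train(double[] w, double[] x, int y, double rate) {
        double error = y - predict(w, x);
        for (int i = 0; i < w.length; i++) {
            w[i] += rate * error * x[i];
        }
    }

    public static void main(String[] args) {
        double[] w = new double[DIM];
        double[] churned = encode("plan=trial", "logins=low");
        double[] stayed = encode("plan=annual", "logins=high");
        // Examples are consumed one at a time, which is what makes the learner "online".
        for (int epoch = 0; epoch < 200; epoch++) {
            train(w, churned, 1, 0.5);
            train(w, stayed, 0, 0.5);
        }
        System.out.println("P(cancel | trial, low)   = " + predict(w, churned));
        System.out.println("P(cancel | annual, high) = " + predict(w, stayed));
    }
}
```

Because each update touches only one example, the model never needs the whole training set in memory, and hashing means a new feature value needs no dictionary rebuild. The price is that two features can occasionally collide in the same slot.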

Math

There has been a lot of cleanup done in the math module (you can check the details in the Cleanup Math discussion on the mailing list), lots of it related to the untested Colt framework integration (and deprecated code in the Colt framework). The discussion resulted in several pieces of the Colt framework getting promoted to tested status (QRdecomposition, in particular).

Classification

In addition to speedups and bug fixes, the main new features in classification are new classifiers (new classification algorithms) and more open/uniform input data formats (vectors). The most important changes are:

  • New SGD classifier
  • Experimental new type of Naive Bayes classifier (using vectors) and feature reduction options for the existing Naive Bayes classifier (variable length coding of vectors)
  • New VectorModelClassifier allows any set of clusters to be used for classification (clustering as input for classification)
  • Random forests can now be saved and used to classify new data. Read more on how to build a random forest and how to use it to classify new cases on this dedicated wiki page.

Recommendation Engine

The most important changes in this area relate to distributed similarity computations, which can be used in Collaborative Filtering (or in other areas, such as clustering). An implementation of a Map-Reduce job that computes item-item similarities for item-based Collaborative Filtering, based on the algorithm suggested in Elsayed et al: Pairwise Document Similarity in Large Collections with MapReduce, can be found in MAHOUT-362. A generalization of the algorithm, based on the mailing list discussion, led to an implementation of a Map-Reduce job that computes pairwise similarities of the rows of a matrix using a customizable similarity measure (with implementations already provided for Cooccurrence, Euclidean Distance, Loglikelihood, Pearson Correlation, Tanimoto coefficient, Cosine). More on the distributed version of any item similarity function (previously available only in a non-distributed implementation) can be found in MAHOUT-393. With pairwise similarity computation defined, RecommenderJob has evolved into a fully distributed item-based recommender (the implementation depends on how the pairwise similarities are computed). You can read more on the distributed item-based recommender in MAHOUT-420.
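The core trick behind pairwise row similarity can be sketched on a single machine: walk the matrix one column at a time, emit a partial product for every pair of rows that share that column, and sum the partial products per pair. The plain-Java sketch below does this for cosine similarity. It is not Mahout’s job code; the class name, the dense array representation and the sample data are assumptions chosen to keep the example short.

```java
public class RowSimilaritySketch {
    // Rows are items, columns are users; 0.0 means "no interaction".
    static double[][] pairwiseCosine(double[][] rows) {
        int n = rows.length;
        int cols = n == 0 ? 0 : rows[0].length;
        double[][] dot = new double[n][n];
        double[] squaredNorm = new double[n];

        // "Map" step: visit one column at a time; only rows with a value in it contribute.
        for (int c = 0; c < cols; c++) {
            for (int i = 0; i < n; i++) {
                if (rows[i][c] == 0.0) {
                    continue;
                }
                squaredNorm[i] += rows[i][c] * rows[i][c];
                for (int j = i + 1; j < n; j++) {
                    // "Reduce" step folded in: sum the partial products per (i, j) pair.
                    dot[i][j] += rows[i][c] * rows[j][c];
                }
            }
        }

        // Normalize the summed dot products into cosine similarities.
        double[][] sim = new double[n][n];
        for (int i = 0; i < n; i++) {
            sim[i][i] = squaredNorm[i] == 0.0 ? 0.0 : 1.0;
            for (int j = i + 1; j < n; j++) {
                double denom = Math.sqrt(squaredNorm[i] * squaredNorm[j]);
                double s = denom == 0.0 ? 0.0 : dot[i][j] / denom;
                sim[i][j] = s;
                sim[j][i] = s;
            }
        }
        return sim;
    }

    public static void main(String[] args) {
        double[][] itemsByUsers = {
            {1, 0, 2},
            {2, 0, 4},
            {0, 3, 0}
        };
        double[][] sim = pairwiseCosine(itemsByUsers);
        System.out.println("items 0 and 1: " + sim[0][1]);
        System.out.println("items 0 and 2: " + sim[0][2]);
    }
}
```

Swapping the normalization step for another formula is what makes the similarity measure customizable: the column-wise accumulation stays the same.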

The implementation of distributed operations on very large matrices is very important for a scalable machine learning library that supports large data sets. For example, when term vectors are built from textual documents/content, they tend to have high dimension. If we consider a term-document matrix where each row corresponds to a term and each column represents a document, we obviously end up with a high-dimensional matrix. The same thing occurs in Collaborative Filtering: it uses a user-item matrix whose values are ratings, where each row corresponds to a user and each column represents an item. Again we have a high-dimensional matrix that is sparse.

Now, in both cases (term-document matrix and user-item matrix) we are dealing with high matrix dimensionality that needs to be reduced while preserving as much information as possible. We therefore need some sort of matrix operation that produces a lower-dimensional matrix with the important information preserved. For example, a high-dimensional matrix may be approximated in lower dimensions using Singular Value Decomposition (SVD).
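As a small illustration of what a decomposition extracts, the sketch below estimates the largest singular value of a matrix with plain power iteration on AᵀA. This is a deliberately simple stand-in, not the algorithm Mahout uses, and it is single-machine only; the class and method names are hypothetical. It also assumes the all-ones start vector is not orthogonal to the dominant singular vector.

```java
public class TopSingularValueSketch {
    // y = A x
    static double[] multiply(double[][] a, double[] x) {
        double[] y = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < x.length; j++) {
                y[i] += a[i][j] * x[j];
            }
        }
        return y;
    }

    // y = A^T x
    static double[] multiplyTransposed(double[][] a, double[] x) {
        double[] y = new double[a[0].length];
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < y.length; j++) {
                y[j] += a[i][j] * x[i];
            }
        }
        return y;
    }

    static double norm(double[] x) {
        double sum = 0.0;
        for (double v : x) {
            sum += v * v;
        }
        return Math.sqrt(sum);
    }

    // Power iteration on A^T A converges towards the dominant right singular vector v;
    // the largest singular value is then the length of A v.
    static double topSingularValue(double[][] a, int iterations) {
        int n = a[0].length;
        double[] v = new double[n];
        java.util.Arrays.fill(v, 1.0 / Math.sqrt(n));
        for (int it = 0; it < iterations; it++) {
            double[] next = multiplyTransposed(a, multiply(a, v));
            double length = norm(next);
            if (length == 0.0) {
                return 0.0; // A maps the current vector to zero
            }
            for (int i = 0; i < n; i++) {
                v[i] = next[i] / length;
            }
        }
        return norm(multiply(a, v));
    }

    public static void main(String[] args) {
        double[][] a = {{3, 0}, {0, 1}};
        System.out.println("largest singular value ~ " + topSingularValue(a, 100));
    }
}
```

Keeping only the few largest singular values and their vectors is what yields the lower-dimensional approximation. Note that the loop only ever needs matrix-vector products, which is the kind of operation that suits sparse matrices.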

It’s obvious that we need some (Java) matrix framework capable of fundamental matrix decompositions. JAMA is a great example of a widely used linear algebra package for matrix operations, capable of SVD and other fundamental matrix decompositions (WEKA, for example, uses JAMA for matrix operations). Operations on high-dimensional matrices always require heavy computation, which places high hardware requirements on any ML production system. This is where Mahout, which features distributed operations on large matrices, should be the production choice for Machine Learning algorithms over frameworks like JAMA, which, although great, cannot distribute their operations.

In a typical recommendation setup, users often ‘have’ (used/interacted with) only a few items from the whole item set (which can be very large), so user-item matrices are sparse. Mahout’s (0.4) distributed Lanczos SVD implementation is particularly useful for finding decompositions of very large sparse matrices.

News and Roadmap

All of the new distributed similarity/recommender implementations we analyzed in the previous section were contributed by Sebastian Schelter, and in recognition of this important work he was elected as a new Mahout committer.

The book “Mahout in Action”, published by Manning, has reached 15/16 chapters complete and will soon enter final review.

This is all from us for now.  Any comments/questions/suggestions are more than welcome. Until the next Mahout digest, keep an eye on Mahout’s road map for 0.5 or on the discussion about what Mahout is missing to become a production-stable (1.0) framework.  We’ll see you next month – @sematext.

Hiring Search and Data Analytics Engineers

We are growing and looking for smart people to join us either in an “elastic”, on-demand, per-project, or more permanent role:

Lucene/Solr expert who…

  • Has built non-trivial applications with Lucene or Solr or Elastic Search, knows how to tune them, and can design systems for large volume of data and queries
  • Is familiar with (some of the) internals of Lucene or Solr or Elastic Search, at least on the high level (yeah, a bit of an oxymoron)
  • Has a systems/ops bent or knows how to use performance-related UNIX and JVM tools for analyzing disk IO, CPU, GC, etc.

Data Analytics expert who…

  • Has used or built tools to process and analyze large volumes of data
  • Has experience using HDFS and MapReduce, and has ideally also worked with HBase, or Pig, or Hive, or Cassandra, or Voldemort, or Cascading or…
  • Has experience using Mahout or other similar tools
  • Has interest or background in Statistics, or Machine Learning, or Data Mining, or Text Analytics or…
  • Has interest in growing into a Lead role for the Data Analytics team

We like to dream that we can find a person who gets both Search and Data Analytics, and ideally wants or knows how to marry them.

Ideal candidates also have the ability to:

  • Write articles on interesting technical topics (that may or may not relate to Lucene/Solr) on Sematext Blog or elsewhere
  • Create and give technical talks/presentations (at conferences, local user groups, etc.)

Additional personal and professional traits we really like:

  • Proactive and analytical: takes initiative, doesn’t wait to be asked or told what to do and how to do it
  • Self-improving and motivated: acquires new knowledge and skills, reads books, follows relevant projects, keeps up with changes in the industry…
  • Self-managing and organized: knows how to parcel work into digestible tasks, organizes them into Sprints, updates and closes them, keeps team members in the loop…
  • Realistic: good estimator of time and effort (i.e. knows how to multiply by 2)
  • Active in OSS projects: participates in open source community (e.g. mailing list participation, patch contribution…) or at least keeps up with relevant projects via mailing list or some other means
  • Follows good development practices: from code style to code design to architecture
  • Productive, gets stuff done: minimal philosophizing and over-designing

Here are some of the Search things we do (i.e. that you will do if you join us):

  • Work with external clients on their Lucene/Solr projects.  This may involve anything from performance troubleshooting to development of custom components, to designing highly scalable, high performance, fault-tolerant architectures.  See our services page for common requests.
  • Provide Lucene/Solr technical support to our tech support customers
  • Work on search-related products and services

A few words about us:

We work with search and big data (Lucene, Solr, Nutch, Hadoop, MapReduce, HBase, etc.) on a daily basis.  Our projects with external clients range from 1 week to several months.  Some clients are small startups, some are large international organizations.  Some are top secret.  New customers knock on our door regularly and this keeps us busy at pretty much all times.  When we are not busy with clients we work on our products.  We run search-lucene.com and search-hadoop.com.  We participate in open-source projects and publish monthly Digest posts that cover Lucene, Solr, Nutch, Mahout, Hadoop, and HBase.  We don’t write huge spec docs, we work in sprints, we multitask, and try our best to be agile. We send people to conferences, trainings (Hadoop, HBase, Cassandra), and certifications (2 of our team members are Cloudera Certified Hadoop Developers).

We are a small and mostly office-free, highly distributed team that communicates via email, Skype voice/IM, BaseCamp.  Some of our developers are in Eastern Europe, so we are especially open to new team members being in that area, but we are also interested in good people world-wide, from South America to Far East.

Interested? Please send your resume to jobs @ sematext.com.

HBase Case-Study: Using HBaseTestingUtility for Local Testing & Development

Motivation

As HBase becomes more mature there is a growing demand for tools and methods that make the development process easier. Here at Sematext (@sematext) we’ve gone through our own per aspera ad astra learning process in addition to Cloudera’s Hadoop trainings and certifications. In this post we share what we’ve learned and show how one can use HBaseTestingUtility for this.

Suppose there is a system that processes data stored in HBase and displays the stored data via a reporting application. Data processing is done using Hadoop MapReduce jobs. During development, it would be desirable to be able to:

  • debug MapReduce jobs in an IDE
  • run the reporting application locally (on the developer’s machine, without setting up a cluster) with the possibility of debugging in an IDE
  • easily access data stored in HBase for debugging purposes (easily means “naturally” as if all rows are in a text file)

Disclaimer

The described use-case and solution are just one option, an option that makes use of HBaseTestingUtility and the underlying “mini” clusters. Depending on the context, this solution might not be optimal, but it is a good fit for presenting the ideas. This solution and this post should encourage developers to look at HBase’s unit-test sources when constructing their own tests and/or when finding ways for easier debugging & development.

Problem Details

In our example there are two tables in HBase: one with raw data and another with processed data.  Let’s call them RawDataTable and ProcessedDataTable. We import data into RawDataTable via a simple importing MapReduce job which initially takes data from a log file. Subsequently, another MapReduce job processes data in that table and stores the outcome into ProcessedDataTable. We use HBase Scan and Get operations to access the processed data from the client.

Solution

As stated in the javadocs, HBaseTestingUtility is a “facility for testing HBase”. Its description comes with a bit more explanation: “Create an instance and keep it around doing HBase testing. This class is meant to be your one-stop shop for anything you might need testing. Manages one cluster at a time only.” In this post we describe one possible way to use it to achieve the goals described above.

Processing Data

Step 1: Init cluster.

The following code starts a “local” cluster and creates two tables:

private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
private HTable rawDataTable;
private HTable processedDataTable;
…
void initCluster() throws Exception {
  testUtil.getConfiguration().addResource("hbase-site-local.xml");
  testUtil.getConfiguration().reloadConfiguration();
  // start mini hbase cluster
  testUtil.startMiniCluster(1);
  // create tables
  rawDataTable = testUtil.createTable(RAW_TABLE_NAME, RAW_TABLE_COLUMN_FAMILIES);
  processedDataTable = testUtil.createTable(PROCESSED_TABLE_NAME, PROCESSED_TABLE_COLUMN_FAMILIES);
  // start MR cluster
  testUtil.startMiniMapReduceCluster();
}

testUtil.startMiniCluster(1) starts a cluster with 1 datanode and 1 regionserver. You can start a cluster with a greater number of servers for test purposes.

Step 2: Import Data

We use a simple map-only job to import data. Please refer to the org.apache.hadoop.hbase.mapreduce.ImportTsv class for an example of such a job. The following code runs the job, which uses locally stored files (e.g. a part of the log file of reasonable size), on the just-created cluster:

String[] importJobArgs = new String[] {RAW_TABLE_NAME, "file://" + inputFile};
if (!MyImportJob.createSubmittableJob(testUtil.getConfiguration(), importJobArgs).waitForCompletion(true)) {
  System.exit(1);
}

Step 3: Process Data

To process data in RawDataTable we run an appropriate MapReduce job in the same way as during the import:

if (!ProcessLogsJob.createSubmittableJob(testUtil.getConfiguration(), processLogsJobArgs).waitForCompletion(true)) {
  System.exit(1);
}

Step 4: Persist Processed Data

Since we need the processed data during development and debugging of our reporting application, we persist it in a local file. In order to have “easy” access to this data during debugging it makes sense to also store the table data in a text file in a readable form (so that we can run “grep” and other handy commands on it). So we actually write to two files at once: a binary one and a human-readable one. The Result class implements the Writable interface, so there is a natural way to serialize its data.

BufferedWriter bw = ...;
DataOutputStream dos = ...;
ResultScanner rs = processedDataTable.getScanner(new Scan());
Result next = rs.next();
while (next != null) {
  next.write(dos);
  bw.write(getHumanReadableString(next));
  bw.newLine();
  next = rs.next();
}

After this step, the processed data is stored on the local disk and can be used for running the reporting application. Importing and processing of data is performed locally and is thus easier to debug.
In order to add extra processed data incrementally to the already stored data, instead of rewriting it from scratch, we need to load it from the file after cluster initialization as described in the following section.

Fetching Data

In order to make the reporting application run on the “local” cluster instead of the “true” one, we create an alternative HTable factory. The reporting application code uses a single HTable object instantiated by the factory during its whole lifecycle, which is the best practice for minimizing creation of HTable objects.

Step 1: Init cluster.

This step is exactly the same as described previously.

Step 2: Load processed data.

We use the file created during the data processing stage to load the data back into the just-initialized cluster:

DataInputStream dis = ...;
Result next = new Result();
next.readFields(dis);
while (next.getRow() != null) {
  Put put = new Put(next.getRow());
  for (KeyValue kv : next.raw()) {
    put.add(kv);
  }
  processedDataTable.put(put);
  next = new Result();
  try {
    next.readFields(dis);
  } catch (EOFException e) {
    // file went to an end.
    break;
  }
}

After data is all loaded, the constructed processedDataTable can be used by the reporting application code. The app can now also be started and debugged easily from an IDE.
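The dump-and-reload pattern used in the two stages can be tried without a cluster. The sketch below is plain Java with no HBase classes: string pairs stand in for Result objects, writeUTF/readUTF stand in for Result’s write/readFields, an in-memory byte array stands in for the local file, and the class and method names are made up for illustration. Unlike the loop above, it also treats an end-of-file on the very first read as an empty dump.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordDumpSketch {
    // Serialize every (row key, value) pair back to back, with no record count up front.
    static byte[] dump(Map<String, String> rows) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            for (Map.Entry<String, String> row : rows.entrySet()) {
                dos.writeUTF(row.getKey());
                dos.writeUTF(row.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Read records until the stream ends; EOF on a record boundary means "done".
    static Map<String, String> load(byte[] data) {
        Map<String, String> rows = new LinkedHashMap<>();
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            while (true) {
                String key;
                try {
                    key = dis.readUTF();
                } catch (EOFException endOfDump) {
                    break;
                }
                rows.put(key, dis.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, String> processed = new LinkedHashMap<>();
        processed.put("row-1", "visits=10");
        processed.put("row-2", "visits=3");
        System.out.println(load(dump(processed)));
    }
}
```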

Next Steps

Internally HBaseTestingUtility makes use of a whole bunch of “mini” clusters: MiniZooKeeperCluster, MiniDFSCluster, MiniHBaseCluster and MiniMRCluster. Refer to the unit-test implementations in the source code of respective projects to get more examples on how to use them.

Thank you for reading, we hope you found this useful.  Follow @sematext on Twitter to be notified of new posts on Hadoop, HBase, Lucene, Solr, Mahout, and other related topics.

Introducing Cloud MapReduce

The following post is the introduction to Cloud MapReduce (CMR) written by Huan Liu, CMR’s main author and the Research Manager at Accenture Technology Labs.

MapReduce is a programming model (borrowed from functional programming languages) and its associated implementation. It was first proposed by Google in 2003 to cope with the challenge of processing an exponentially growing amount of data. In the same year the technology was invented, Google’s production index system was converted to MapReduce. Since then, it has quickly proven to be applicable to a wide range of problems. For example, roughly 10,000 MapReduce programs had been written at Google by June 2007, and there were 2,217,000 MapReduce job runs in the month of September 2007. MapReduce has also found wide application outside of the Google environment.

Cloud MapReduce is another implementation of the MapReduce programming model. Back in late 2008, we saw the emergence of a cloud Operating System (OS): a set of software managing a large cloud infrastructure rather than an individual PC. We asked ourselves the following questions: what if we build systems on top of a cloud OS instead of directly on bare metal? Can we dramatically simplify system design? We thought we would try implementing MapReduce as a proof of concept; thus, the Cloud MapReduce project was born. At the time, Amazon was the only one that had a “complete” cloud OS, so we built on top of it. In the course of the project, we encountered a lot of problems working with the Amazon cloud OS, most of which could be attributed to the weaker consistency model it presents. Fortunately, we were able to work through all the issues and successfully built MapReduce on top of the Amazon cloud OS. The end result surprised us somewhat. We were not only able to have a simpler design, but we were also able to make it more scalable, more fault tolerant and faster than Hadoop.

Why Cloud MapReduce

There are already several MapReduce implementations, including Hadoop, which is already widely used. So the natural question to ask is: why another implementation? The answer is that all previous implementations essentially copy what Google have described in their original MapReduce paper, but we need to explore alternative implementation approaches for the following reasons:

1) Patent risk. MapReduce is a core technology in Google. By using MapReduce, Google engineers are able to focus on their core algorithms, rather than being bogged down by parallelization details. As a result, MapReduce greatly increases their productivity. It is no surprise that Google would patent such a technology to maintain its competitive advantage. The MapReduce patent covers the Google implementation as described in its paper. Since CMR only implements the programming model, but has a totally different architecture and implementation, it poses a minimal risk w.r.t. the MapReduce patent. This is particularly important for enterprise customers who are concerned about potential risks.

2) Architectural exploration. Google only described one implementation of the MapReduce programming model in its paper. Is it the best one? What are the tradeoffs if using a different one? CMR is the first to explore a completely different architecture. In the following, I will describe what is unique about CMR’s architecture.

Architectural principle and advantages

CMR advocates component decoupling: separate out common components as independent cloud services. If we can separate out a common component as a stand-alone cloud service, the component not only can be leveraged for other systems, but it can also evolve independently. As we have seen in other contexts (e.g., SOA, virtualization), decoupling enables faster innovation.

CMR currently uses existing components offered by Amazon, including Amazon S3, SimpleDB, and SQS. By leveraging the concept of component decoupling, CMR achieves a couple of key advantages.

- A fully distributed architecture. Since each component is a smaller project, it is easier to build it as a fully distributed system. Amazon has done it for all its services (S3, SimpleDB, SQS). Building on what Amazon has done, we were able to build a fully distributed MapReduce implementation with only 3,000 lines of code. A fully distributed architecture has several advantages over a master/slave architecture. First, it is more fault tolerant. Many enterprise customers are not willing to adopt something with a single point of failure, especially for their mission critical data. Second, it is more scalable. In one comparison study, we were able to stress the master node in Hadoop so much that CMR had a 60x performance advantage.

- More efficient data shuffling between Map and Reduce. CMR uses queues as the intermediate point between Map and Reduce, which enables Map to “push” data to Reduce (rather than Reduce “pulling” data from Map). This design is similar to what is used in parallel databases, so it inherits the benefits of efficient data transfer as a result of pipelining. However, unlike parallel databases, CMR uses tagging, filtering and a commit mechanism to maintain the fine-grained fault tolerance offered by other MapReduce implementations. The majority of CMR’s performance gain (aside from the 60x gain, which comes from stressing the master node) comes from this optimization.
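The push-style shuffle can be illustrated with a toy word count in plain Java. This is only a sketch of the data flow, not CMR code: in-memory queues stand in for a cloud queue service, the class and method names are hypothetical, the two sides run one after the other for clarity, and the tagging, filtering and commit mechanisms are left out entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PushShuffleSketch {
    static Map<String, Integer> wordCount(List<String> lines, int reducers) {
        // One queue per reduce partition (in-memory stand-ins for queue-service queues).
        List<Deque<String[]>> queues = new ArrayList<>();
        for (int i = 0; i < reducers; i++) {
            queues.add(new ArrayDeque<>());
        }

        // Map side: push every (word, 1) pair to the queue that owns the word.
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                int partition = Math.floorMod(word.hashCode(), reducers);
                queues.get(partition).add(new String[] {word, "1"});
            }
        }

        // Reduce side: each reducer drains only its own queue and sums the counts.
        Map<String, Integer> counts = new TreeMap<>();
        for (Deque<String[]> queue : queues) {
            String[] pair;
            while ((pair = queue.poll()) != null) {
                counts.merge(pair[0], Integer.parseInt(pair[1]), Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("a b a", "b c"), 3));
    }
}
```

Because the map side decides where each pair goes and pushes it there, the reduce side never has to ask mappers for their output. With real queues the two sides could also run at the same time, which is where the pipelining benefit would come from.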

CMR is particularly attractive in a cloud environment due to its native integration with the cloud. Hadoop, on the other hand, is designed for a static environment inside an enterprise. If run in a cloud, Hadoop introduces additional overhead. For example, after launching a cluster of virtual machines, Amazon’s Elastic MapReduce (pay-per-use Hadoop) has to configure and set up Hadoop on the cluster, then copy data from S3 to the Hadoop file system before it can start data processing. At the end, it also has to copy results back to S3. All these steps are additional overheads that CMR does not incur, because CMR starts processing right away on S3 data directly; no cluster configuration or data copying is necessary.

CMR’s vision

Although CMR currently only runs on Amazon components, we envision that it will support a wide range of components in the future, including other clouds, such as Microsoft Windows Azure. There are a number of very interesting open source projects already, such as jclouds, libcloud, deltacloud and Dasein, that are building a layer of abstraction on top of various cloud services to hide their differences. These middleware would make it much easier for CMR to support a large number of cloud components.

At the same time, we are also looking at how to build these components and deploy them locally inside an enterprise. Although several products, such as vCloud and Eucalyptus, provide cloud services inside an enterprise, their current versions are limited to compute capability. There are other cloud services, such as storage and queues, that an enterprise has to deploy to provide a full cloud capability to its internal customers. At Accenture Technology Labs, we are helping to address some pieces of the puzzle. For example, we have started a research project to design a fully distributed and scalable queue service, which is similar to SQS in functionality but explores a different tradeoff point.

Synergy with Hadoop

Although, on the surface, CMR may seem to compete with Hadoop, there are actually quite a few synergies between the two projects. First, they are both moving towards the vision of component decoupling. In the recent 0.20.1 release of Hadoop, the HDFS file system is separated out as an independent component. This makes a lot of sense because HDFS is useful as a stand-alone component to store large data sets, even if the users are not interested in MapReduce at all. Second, there are lessons to be learned from each project. For example, CMR points the way on how to “push” data from Map to Reduce to streamline data transfer without sacrificing fine-grained fault tolerance. Similarly, Hadoop supports rich data types beyond simple strings, which is something that CMR will surely inherit in the near future.

Hopefully, by now, I have convinced you that CMR is at least worth a look. In the next post (coming in two weeks), I will follow up with a practical post showing step by step how to write a CMR application and how to run it. The current thinking is that I will demonstrate how to perform certain data analytics with data from search-hadoop.com, but I am open to suggestions. If you have suggestions, I would appreciate it if you could post them in the comments below.
