Deerwalk Blog

Bulk importing Data into HBase

Posted by Sumit Shrestha on July 26, 2011

HBase is a NoSQL cluster system whose schema differs from that of relational databases such as MySQL. One of our new products needs to load healthcare data stored in a relational format (MySQL) into HBase. Since HBase is an evolving technology, several methods exist for moving data from a relational format into a non-relational one. One such method is Sqoop, a tool that automatically imports data between these formats given the schema of both source and destination. We did some in-depth research on this tool and concluded it works well if you have relational data. Beyond Sqoop, Map/Reduce offers various other options, including a bulk import method. Bulk importing bypasses the HBase API and writes contents, properly formatted as HBase data files (HFiles), directly to the file system. For the purposes of this article we assume a working knowledge of Hadoop/HBase and its setup steps. The reader is also expected to know the Map/Reduce framework in detail and to have, at some point, run a map/reduce program on a multi-node (or at least pseudo-distributed) cluster.

There are two steps involved in a bulk import:

  • Preparing Data (HFiles) using Map/Reduce
  • Importing Prepared Data into an HBase table

Preparing Data (HFiles) using Map/Reduce

The main purpose of this first step is to create HFiles, which map directly to HBase data files. The HFiles are created using the HFileOutputFormat class. The Map function maps the input data as usual, so the input format for the mapper can be chosen by the user; in our case we used TextInputFormat, which simply presents one line of text as one record. The mapper must output either Put or KeyValue objects, and in this case we chose KeyValue, which is an HBase key/value pair. The map output key class can be user defined; in our case it was ImmutableBytesWritable. Here is a snippet of our Map class:

public static class Map extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    ImmutableBytesWritable r = …; // row key of the record being emitted
    KeyValue kv = …;              // one HBase cell (row, family, qualifier, timestamp, value)
    context.write(r, kv);
  }
}
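
For reference, here is a minimal sketch of what a complete mapper of this kind might look like, assuming a hypothetical comma-separated input of memberid, name and address; the field layout, family and qualifier names are our own illustration, not part of the original job:

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MemberMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Hypothetical input line: memberid,name,address
    String[] fields = value.toString().split(",");
    byte[] row = Bytes.toBytes(fields[0]);
    ImmutableBytesWritable rowKey = new ImmutableBytesWritable(row);
    // One KeyValue per cell; the framework sorts them before they reach the reducer.
    context.write(rowKey, new KeyValue(row, Bytes.toBytes("demographics"),
        Bytes.toBytes("name"), Bytes.toBytes(fields[1])));
    context.write(rowKey, new KeyValue(row, Bytes.toBytes("demographics"),
        Bytes.toBytes("address"), Bytes.toBytes(fields[2])));
  }
}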

The configuration for the Reduce side is somewhat more difficult to follow. There is a single method, HFileOutputFormat.configureIncrementalLoad, which does all of the configuration for the reducer and its output format based on the mapper and table configurations; a look at the code is recommended to understand the complete mechanics. One problem we found is that it installs its own Reduce class, to ensure total ordering, replacing any reducer class you may have set previously. Which reducer it installs depends on the mapper output value class you configured: if it is KeyValue, KeyValueSortReducer is used; if it is Put, PutSortReducer is used. The configureIncrementalLoad method also sets a partitioner class, TotalOrderPartitioner, to ensure total order partitioning. HFileOutputFormat must be configured so that each output HFile fits within a single region, so the job uses TotalOrderPartitioner to partition the map output into ranges of the key space that correspond to the key ranges of the regions in the table. The reducer output value class is set to KeyValue (not Put, even if that is what the mapper emitted), and the output format class is set to HFileOutputFormat.

The number of reduce tasks is set to the number of regions in the table into which the data must be loaded. A common rule of thumb for the optimal number of reduce slots on a multi-node cluster is 1.75 * number of slave nodes * number of reduce tasks per node (specified in mapred-site.xml). We have found that 63 regions perform well on our test system; this will obviously require tuning as we move to a production environment. If the table has not been pre-split into regions, only a single reduce task runs, which significantly slows the import, so for new tables we pre-split into our 63 regions to allow for an optimal number of reducers. The code for the overall map/reduce configuration is as follows:

Job job = new Job(config);
job.setJobName("Member Claims Schema type 2 Load Job Optimisation 1");
HFileOutputFormat.configureIncrementalLoad(job, table); // table is a handle to the existing HBase table being loaded; it must be passed in here
FileInputFormat.setInputPaths(job, new Path(args[1]));  // source data in HDFS
FileOutputFormat.setOutputPath(job, new Path(args[2])); // directory where the HFiles will be written
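
Because the number of reduce tasks follows the number of regions, it pays to pre-split a new table before running the job. The following is a hedged sketch of how a table could be created with a fixed number of regions; the split-key scheme (62 evenly numbered split points giving 63 regions) is purely illustrative and would in practice be derived from the distribution of row keys:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTable {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);

    HTableDescriptor desc = new HTableDescriptor("MemberDemographicsClaim");
    desc.addFamily(new HColumnDescriptor("demographics"));
    desc.addFamily(new HColumnDescriptor("claim"));

    // 62 split keys yield 63 regions; choose keys that match your row-key space.
    byte[][] splits = new byte[62][];
    for (int i = 0; i < splits.length; i++) {
      splits[i] = Bytes.toBytes(String.format("%02d", i + 1));
    }
    admin.createTable(desc, splits);
  }
}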

Because of the requirements of configureIncrementalLoad, we do not need to build our own reducer, but in some cases we do need a reducer to process a grouped task, such as writing all claims of a single member serially so that they can later be read with a range scan. Additionally, the HFileOutputFormat class shipped with the HBase distribution we use, Cloudera CDH3, does not support loading multiple column families; HBase 0.92 has a fix for this. So, instead of using the HFileOutputFormat and KeyValueSortReducer classes from the library, we patched our application to use the newer implementations of HFileOutputFormat and KeyValueSortReducer. The default KeyValueSortReducer simply writes to context, one by one and in sorted order, whatever KeyValues it receives; the row is the same one the mapper passed when it wrote to context. Every KeyValue object unambiguously specifies one value in the HBase table: it carries the row, column family, qualifier (column name) and timestamp, which together are all the dimensions needed to address a value in an HBase table.

Consider a table MemberDemographicsClaim with the following design: memberid as the row id; demographics as the first column family, with two columns, name and address; and claim as the second column family, with one column per claim for that member. Suppose a member with id XXX, two claims (A and B) and the demographics name=N, address=MMM is to be inserted. In the reduce method, you would create four KeyValue objects, one per value, like this (a code sketch follows the table):

RowId  Column Family  Column   Timestamp      Value
XXX    demographics   name     Not specified  N
XXX    demographics   address  Not specified  MMM
XXX    claim          c1       Not specified  A
XXX    claim          c2       Not specified  B
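
As a hedged sketch of that reduce-side logic (the qualifiers c1 and c2 and the hard-coded values are illustrative only), the four KeyValue objects might be built and written like this; note that they are emitted in KeyValue sort order, family first and then qualifier, which is the order HFileOutputFormat expects:

// Inside a reduce method of a Reducer<ImmutableBytesWritable, KeyValue,
// ImmutableBytesWritable, KeyValue>; rowKey wraps the member id "XXX".
byte[] row = rowKey.get();
context.write(rowKey, new KeyValue(row, Bytes.toBytes("claim"), Bytes.toBytes("c1"), Bytes.toBytes("A")));
context.write(rowKey, new KeyValue(row, Bytes.toBytes("claim"), Bytes.toBytes("c2"), Bytes.toBytes("B")));
context.write(rowKey, new KeyValue(row, Bytes.toBytes("demographics"), Bytes.toBytes("address"), Bytes.toBytes("MMM")));
context.write(rowKey, new KeyValue(row, Bytes.toBytes("demographics"), Bytes.toBytes("name"), Bytes.toBytes("N")));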

Each of these objects has to be written into context so that the system automatically orders them into the same row of the HFile. The KeyValueSortReducer class looks like this:

public class KeyValueSortReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<KeyValue> kvs,
      Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
      throws java.io.IOException, InterruptedException {
    // for each KeyValue k in kvs (in sorted order): context.write(row, k);
  }
}

Here, kvs is the list of KeyValues for a single row that the mapper output earlier. The default KeyValueSortReducer reduce method simply writes these into context. If your mapper already outputs data exactly the way you want it written into HBase, the default reduce method works well. However, there are instances where you want to override the default and apply custom logic, and we encountered many such cases. Suppose we are writing a memberclaim index table (which has a column family with claim attributes as columns) so that all claims of a member sit serially by row and a range scan can be applied. We configure the mapper to output the member id as the row id, so all claim rows for the same member are grouped into the same reducer call. In the KeyValueSortReducer reduce method, we then write memberid + claimid as the row id into context for the index table. So even though the mapper output KeyValues keyed by memberid, the index table ends up with memberid + claimid as the row id, simply by changing the default reduce method of KeyValueSortReducer.
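
A hedged sketch of such an override follows; how the claim id is recovered from the incoming KeyValues (here, from the qualifier) is our own assumption for illustration:

// A customised reduce method for the memberclaim index table: the mapper grouped
// claims under the member id, and here each cell is re-keyed as memberid + claimid.
@Override
protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs, Context context)
    throws IOException, InterruptedException {
  for (KeyValue kv : kvs) {
    byte[] claimId = kv.getQualifier();                // illustrative source of the claim id
    byte[] indexRow = Bytes.add(row.get(), claimId);   // memberid + claimid
    context.write(new ImmutableBytesWritable(indexRow),
        new KeyValue(indexRow, kv.getFamily(), kv.getQualifier(), kv.getValue()));
  }
}

Because this writes several different row ids from one reduce call, the lexical-ordering caveat discussed next applies to it.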

A frequently occurring problem while reducing is lexical ordering. It happens when the KeyValue list output from the reducer is not sorted: for example, when qualifier names for a single row are not written in lexically increasing order, or when multiple rows are written in the same reduce method and their row ids are not in lexically increasing order. This happens because the reducer output itself is never sorted; all sorting is applied to the KeyValues output by the mapper, before they enter the reduce method. The HFile writer then appends the KeyValues coming out of the reduce method incrementally, assuming they are pre-sorted. So, before KeyValues are written into context, they must be added to a sorted collection, such as a TreeSet with KeyValue.COMPARATOR as the comparator, and then written out in the order given by that collection.
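
A minimal sketch of that pattern, reusing the index-table reduce method above:

// Buffer the KeyValues in a TreeSet so they leave the reducer in the order
// the HFile writer expects; KeyValue.COMPARATOR is HBase's internal ordering.
java.util.TreeSet<KeyValue> sorted = new java.util.TreeSet<KeyValue>(KeyValue.COMPARATOR);
for (KeyValue kv : kvs) {
  byte[] indexRow = Bytes.add(row.get(), kv.getQualifier()); // memberid + claimid (illustrative)
  sorted.add(new KeyValue(indexRow, kv.getFamily(), kv.getQualifier(), kv.getValue()));
}
for (KeyValue kv : sorted) {
  context.write(new ImmutableBytesWritable(kv.getRow()), kv);
}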

Importing Prepared Data into an HBase table

There are two methods by which HFiles in HDFS can be bulk imported into an HBase table. The first is a command line tool called completebulkload, which provides a simpler approach but fewer options. The second approach is programmatic and is suitable for data prepared using the Map/Reduce method described above: it uses the LoadIncrementalHFiles.doBulkLoad method to load the HFiles from HDFS (the output of the data preparation map/reduce job). This step is very fast (30 GB of data imported in less than a minute) because the prepared HFiles are adopted directly by the HBase table's regions rather than being written through the normal write path.
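
A minimal sketch of the programmatic route (the table name and the HFile directory argument are placeholders; the directory is the one passed to FileOutputFormat.setOutputPath above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "MemberDemographicsClaim");

    // Adopt the HFiles produced by the preparation job into the table's regions.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path(args[0]), table);
  }
}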


Bulk import is a powerful option for importing data into HBase and can significantly reduce processing time; we have found it to be significantly faster than other methods. It is becoming one of the most widely used import methods in the industry: Facebook, for example, uses it to import massive volumes of message data into HBase. Here are a few metrics obtained while bulk loading claims data of about 35 million rows.

Number of Regions on Claims Table   1                                       63
Space taken by HFiles in HDFS       32068332831 bytes (29.865962296 GB)     32167007358 bytes (29.95786011 GB)
Total Map/Reduce running time       4 hrs, 20 mins, 37 secs                 1 hr, 39 mins, 20 secs
Directory count                     22                                      22
File count                          155                                     706
Incremental bulk load time          66091 milliseconds                      231875 milliseconds
