Thursday, April 25, 2013

Hadoop Writing Bytes

There are times when you might want to write bytes directly to HDFS. Maybe you're writing binary data. Maybe you're writing data with varying encodings. In our case, we were doing both (depending on the profile) and were trying to use MultipleOutputs to do so. We discovered that there was no built-in OutputFormat that simply wrote the raw bytes, nor were there any examples on the web of how to do this with the new (org.apache.hadoop.mapreduce) API. Granted, it's not overly complicated, but to save you a little time, here's what I came up with.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.DataOutputStream;
import java.io.IOException;


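// Writes each BytesWritable value verbatim (the key is ignored and no separators are added),
// optionally passing the bytes through the configured compression codec.
public class BytesValueOutputFormat extends FileOutputFormat<NullWritable, BytesWritable> {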

    @Override
    public RecordWriter<NullWritable, BytesWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Configuration conf = taskAttemptContext.getConfiguration();
        boolean isCompressed = getCompressOutput(taskAttemptContext);
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(taskAttemptContext, GzipCodec.class);
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(taskAttemptContext, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        if (!isCompressed) {
            return new ByteRecordWriter(fileOut);
        }
        // Wrap the raw file stream so every write passes through the codec.
        return new ByteRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)));
    }

    protected static class ByteRecordWriter extends RecordWriter<NullWritable, BytesWritable> {
        private DataOutputStream out;

        public ByteRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public void write(NullWritable key, BytesWritable value) throws IOException {
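            // A null value is silently skipped. Only the first getLength() bytes are written:
            // getBytes() returns the backing array, which can be longer than the valid data.
            if (value != null) {
                out.write(value.getBytes(), 0, value.getLength());
            }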
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            out.close();
        }
    }

}
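Since the writer does nothing beyond copying bytes, its behavior is easy to check locally. Below is a stdlib-only sketch (no Hadoop on the classpath) that mirrors ByteRecordWriter's logic under a couple of assumptions: a (buffer, length) pair stands in for BytesWritable, and java.util.zip.GZIPOutputStream stands in for the GzipCodec branch. RawBytesSink and RawBytesSinkDemo are hypothetical names for illustration, not part of Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stdlib-only stand-in for ByteRecordWriter (no Hadoop classes involved).
class RawBytesSink implements AutoCloseable {
    private final DataOutputStream out;

    RawBytesSink(OutputStream target, boolean compress) throws IOException {
        // Mirrors the two branches of getRecordWriter(): wrap the "file" stream
        // in a compressing stream only when compression is requested.
        this.out = new DataOutputStream(compress ? new GZIPOutputStream(target) : target);
    }

    // Writes only the first 'length' bytes; like BytesWritable's backing array,
    // the buffer may be longer. A null buffer is skipped, like a null value.
    void write(byte[] buffer, int length) throws IOException {
        if (buffer != null) {
            out.write(buffer, 0, length);
        }
    }

    @Override
    public void close() throws IOException {
        // Closing the outer stream also writes the gzip trailer when compressing.
        out.close();
    }

    // Returns the logical contents of 'data', gunzipping first if it was compressed.
    static byte[] readAll(byte[] data, boolean compressed) throws IOException {
        if (!compressed) {
            return data;
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                result.write(chunk, 0, n);
            }
            return result.toByteArray();
        }
    }
}

class RawBytesSinkDemo {
    // Writes three records (one padded, one normal, one null) and returns the raw "file" bytes.
    static byte[] writeSample(boolean compress) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        try (RawBytesSink sink = new RawBytesSink(file, compress)) {
            byte[] padded = Arrays.copyOf(new byte[] {1, 2, 3}, 8); // 3 valid bytes + 5 padding
            sink.write(padded, 3);
            sink.write(new byte[] {4, 5}, 2);
            sink.write(null, 0); // skipped
        }
        return file.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] plain = RawBytesSink.readAll(writeSample(false), false);
        byte[] gz = RawBytesSink.readAll(writeSample(true), true);
        System.out.println(Arrays.toString(plain)); // [1, 2, 3, 4, 5]
        System.out.println(Arrays.equals(plain, gz)); // true
    }
}
```

Two things fall out of this: the records come out concatenated with no separators, so whatever reads the file has to know where one record ends and the next begins; and writing the whole backing array instead of honoring the length would also dump the padding bytes.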

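Here's an example usage with MultipleOutputs, e.g. from a Reducer:
Instantiation
MultipleOutputs<NullWritable, BytesWritable> multipleOutputs = new MultipleOutputs<NullWritable, BytesWritable>(context);
Writing
byte[] bytesToWrite = someAppLogic();
multipleOutputs.write(NullWritable.get(), new BytesWritable(bytesToWrite), fileName);
Closing (once, e.g. in cleanup(); otherwise the record writers are never closed and buffered or compressed output can be lost)
multipleOutputs.close();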

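And of course, since it's like any other OutputFormat, it also works with LazyOutputFormat (as well as just about anything else you might choose to do with an OutputFormat). LazyOutputFormat only creates an output file once the first record is written to it, which avoids empty part files when all the writing goes through MultipleOutputs: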
LazyOutputFormat.setOutputFormatClass(job, BytesValueOutputFormat.class);

In our case, this was the last step in our sequence of Hadoop jobs, so we had no further need for the key. One could conceive of situations in which further manipulation is needed. In such cases, you could attempt some sort of delimited binary format (to separate the key from the value), but it might be easier to just keep everything as text and use Commons Codec's Base64 to pass the bytes value between jobs.
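Here's a minimal sketch of that text-based approach, assuming one key and one binary value per line, separated by a tab (the separator TextOutputFormat uses by default). It uses java.util.Base64 (Java 8+) in place of Commons Codec so it runs without extra dependencies; Base64LineCodec and the sample key "profile-42" are hypothetical.

```java
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper for carrying a key plus a binary value between jobs as one text line.
// java.util.Base64 stands in for Commons Codec's Base64 here.
class Base64LineCodec {
    // The basic Base64 alphabet is A-Z, a-z, 0-9, '+', '/' and '=', with no line wrapping,
    // so the encoded value never contains a tab or newline.
    static String encode(String key, byte[] value) {
        return key + "\t" + Base64.getEncoder().encodeToString(value);
    }

    // Split on the LAST tab: the encoded value never contains one, even if the key does.
    // Assumes the line was produced by encode().
    static String decodeKey(String line) {
        return line.substring(0, line.lastIndexOf('\t'));
    }

    static byte[] decodeValue(String line) {
        return Base64.getDecoder().decode(line.substring(line.lastIndexOf('\t') + 1));
    }

    public static void main(String[] args) {
        // Raw value contains a newline (0x0A) and a tab (0x09), which would break a text record.
        byte[] value = {0x00, (byte) 0xFF, 0x0A, 0x09};
        String line = encode("profile-42", value);
        System.out.println(line.replace("\t", "<TAB>")); // profile-42<TAB>AP8KCQ==
        System.out.println(Arrays.equals(decodeValue(line), value)); // true
    }
}
```

The trade-off is size: Base64 output is roughly 4/3 the size of the raw bytes, in exchange for records that any text-based job can split and read.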