HDFS storage is cheap—about 1% of the cost of storage on a data appliance such as Teradata. It takes some doing to use up all the disk space on even a small cluster of, say, 30 nodes. Such a cluster may have anywhere from 12 to 24 TB per node, so a cluster of that size has from 360 to 720 TB of storage space. If there’s no shortage of space, why bother wasting cycles on compression and decompression?
With Hadoop, that’s the wrong way to look at it because saving space is not the main reason to use compression in Hadoop clusters—minimizing disk and network I/O is usually more important. In a fully-used cluster, MapReduce and Tez, which do most of the work, tend to saturate disk-I/O capacity, while jobs that transform bulk data, such as ETL or sorting, can easily consume all available network I/O.
Which resource is swamped first depends on the kind of job and the characteristics of the cluster, but usually some kind of I/O is exhausted before CPU. The larger the cluster, the more network bandwidth limitations come into play because the network capacity does not automatically grow hand in hand with cluster size the way that disk-I/O capacity and CPU do.
- Disks are engaged for reading the input data, for scratch-writes during processing, and for writing the output.
- The network is heavily engaged when moving data from the map stage to the reduce stage (the shuffle sort) and in replicating the final output.
- Both resources are heavily used for ingesting data.
Where to Use Compression
Input: data at rest compressed so that it can be pulled from the disk more quickly.
Output: data should be written in compressed form for three reasons:
- It saves disk I/O bandwidth at the moment it is needed most—when the cluster is busy.
- It reduces the network cost of replication. Every block written results in two network hops, one inside the rack via the ToR switch and, unless it’s a tiny cluster, another over the main network to a different rack.
- The data is going to be read again, so the savings are collected on every subsequent read as well.
Intermediate data: compress whenever the output is large, when a lot of data moves from mapper to reducer for some other reason, or when output and intermediate data are moderate but the network is heavily used by other processing. Default to compressing intermediate data: it does no harm if the output is small, and much good if the output is large.
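As a concrete starting point, these defaults can be set from within a Hive session. This is a sketch for Hive on MapReduce; the property names are the standard Hadoop 2/Hive settings, but check your distribution’s defaults and available codecs (Snappy is shown here):

```sql
-- Compress intermediate data between Hive job stages and map output.
SET hive.exec.compress.intermediate=true;
SET mapreduce.map.output.compress=true;
SET mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

-- Compress final job output as well.
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
```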
One special case that makes disk compression valuable is recovery from hardware failures. When a disk, data node, or rack fails, the NameNode soon notices and starts recovery. This means that every block the NameNode knows was on the failed device is placed on a queue to be replicated to a third or, in some cases, a second and third location. All of these replications go over the network, and there may be hundreds of thousands of large blocks involved.
Compression Doesn’t Always Help
There are several things that can go wrong if you take the above good advice.
Compressibility varies. Some kinds of data don’t compress much and are just not worth the cycles. Byte-packed genome data, media files, and pseudo-random data all have high entropy, i.e., randomness, and compress poorly. Some data will actually expand when squeezed.
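The effect is easy to see with a quick experiment. This sketch uses Python’s zlib as a stand-in for whatever codec your cluster runs:

```python
import random
import zlib

# Deterministic "high-entropy" payload standing in for packed genome or media data.
noise = random.Random(42).randbytes(100_000)
# Low-entropy payload standing in for typical delimited text.
text = b"2015-10-03,peter,/user/peter/myfile.txt,OK\n" * 2500

packed_noise = zlib.compress(noise, 6)
packed_text = zlib.compress(text, 6)

# The random bytes come out slightly LARGER; the text shrinks dramatically.
print(f"noise: {len(noise)} -> {len(packed_noise)} bytes")
print(f"text:  {len(text)} -> {len(packed_text)} bytes")
```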
If CPU is already the bottleneck, compression may add to the problem, but don’t just turn it all off because the opportunities are specific to load at different points in the processing:
- A CPU-bound map stage makes compression of input less valuable.
- Large output (sorts, ETL, etc.) makes compression of the intermediate data more valuable.
- A CPU-bound reduce stage can make intermediate compression less advantageous, though not always: with very large computations, the extra network load of uncompressed intermediate data may end up slowing processing down even more.
Splitting and Compression
Compression at the file level has some quirks. Let’s look at what happens when you store a file from your local file system into HDFS (as opposed to writing data to HDFS from within Hive or other Hadoop tools).
When you move a large delimited text file into HDFS with the hdfs dfs -put command, a subsequent hdfs dfs -ls will show the size of the file you stored, but in the underlying physical file systems it is actually chopped into blocks of the default size, and each block is replicated to three different machines. For instance, when I save a 1.3 GB file called myfile.txt in directory /user/peter with
hdfs dfs -put myfile.txt /user/peter/
and then list it with
hdfs dfs -ls /user/peter/
I will see the result
-rw-r--r-- 1 hdfs hdfs 1342177280 2015-10-03 22:16 /user/peter/myfile.txt
In the physical file system (which one would not normally have any reason to look at), what you see is multiple files with names like blk_1073742155 and blk_1073742155.meta. The ones without the .meta extension are the data. If I list one with:
ls -l /hadoop/hdfs/data/current/BP-1593317786-10.0.2.151428328100292/current/finalized/subdir0/subdir1/blk_1073742155
I see that it contains 134,217,728 bytes, or 128MB, which is not coincidentally the value dfs.blocksize=134217728 in the file hdfs-site.xml. (You don’t have to locate and open hdfs-site.xml manually—you can see the configuration values and the names of the files they are stored in through the Ambari HDFS screen’s config tab.)
The original file was 1.3 GB, so it broke into 11 pieces: ten full 128 MB blocks plus a remainder that goes into an undersized final block. There is no problem here: although HDFS cuts the file at exact byte offsets, possibly in the middle of a line, the text input format stitches records back together across block boundaries at read time, so nothing is lost.
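The block count is just a ceiling division, which is easy to sanity-check. The 1.3 GiB byte count below is a hypothetical stand-in, since the exact size depends on your file:

```python
import math

DFS_BLOCKSIZE = 134_217_728  # dfs.blocksize: 128 MB

def hdfs_block_count(file_size: int, block_size: int = DFS_BLOCKSIZE) -> int:
    """Full blocks plus one undersized block for any remainder."""
    return math.ceil(file_size / block_size)

print(hdfs_block_count(1_395_864_371))       # ~1.3 GiB: 10 full blocks + remainder -> 11
print(hdfs_block_count(10 * DFS_BLOCKSIZE))  # exact multiple of the block size -> 10
```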
Incidentally, it’s easy to poke around the internals of HDFS storage. On a normal deployment you may have a hard time finding the block files resulting from an hdfs dfs -put, but it’s easy on a single node deployment (e.g. the Hortonworks Sandbox). You can use the Linux find command to explore how they are stored. I used find /hadoop/hdfs/data and find /hadoop/hdfs/data | xargs ls -l. If your cluster stores the blocks elsewhere you can find the location in the Ambari HDFS screen’s config tab under dfs.datanode.data.dir.
But what happens if you try this with a compressed file? Actually, HDFS does more or less the same thing that it would do with a delimited text file. If the compressed file is no larger than the dfs.blocksize, it is saved as a single block and replicated as needed. If it is larger than the block size, it will be broken into pieces just like a text file, and the pieces replicated around the cluster. Try it on the sandbox, where you can find all the blocks—the sizes will add up to exactly the size of the file as given by HDFS.
The catch is that compression algorithms come in two flavors: non-splittable and splittable. Files generated by a non-splittable algorithm cannot be used in pieces; they must be decompressed whole. A splittable algorithm inserts markers in the compressed output that indicate places where the file can be split and still leave well-formed pieces, and it distributes the information necessary to decompress each piece, so the chunks can be used singly.
If you use a non-splittable algorithm and HDFS splits the file because it is bigger than dfs.blocksize, Hadoop must send all the component blocks to the same mapper to be reassembled in the correct order for decompression, losing the advantages of both parallel processing and data locality, and generating a lot of unnecessary network traffic in the process. It works, but it’s very inefficient.
Note that non-splittable formats present this problem only when they are used at the file level. Sequence files and similar formats that compress data record-by-record can use any compression algorithm with impunity because the format separates the individually compressed records.
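To see why record-level compression sidesteps the problem, here is a toy sketch. It is not the real SequenceFile format (which uses sync markers rather than simple length prefixes), but it shows the idea: each record is compressed independently, so any record boundary is a valid split point.

```python
import struct
import zlib

def write_records(records):
    """Toy record-compressed container: each record is zlib-compressed
    independently and length-prefixed, so a reader positioned at any
    record boundary can proceed without the preceding bytes."""
    out = bytearray()
    for rec in records:
        packed = zlib.compress(rec)
        out += struct.pack(">I", len(packed)) + packed
    return bytes(out)

def read_records(blob, offset=0):
    """Decode records starting at any record boundary; nothing before
    the offset needs to be decompressed."""
    recs = []
    while offset < len(blob):
        (n,) = struct.unpack_from(">I", blob, offset)
        offset += 4
        recs.append(zlib.decompress(blob[offset:offset + n]))
        offset += n
    return recs

blob = write_records([b"row-1", b"row-2", b"row-3"])
print(read_records(blob))  # [b'row-1', b'row-2', b'row-3']
```

A single zlib stream over the whole file, by contrast, can only be decompressed from the beginning, which is exactly the non-splittable case described above.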
The available compression formats for Hadoop are:
- Non-splittable formats:
  - Zlib: high compression, modest CPU
  - Gzip: high compression, modest CPU
  - LZ4: lower compression, very low CPU
  - Snappy: very low compression, lowest CPU
- Splittable formats:
  - Bzip2: highest compression, highest CPU
  - LZO*: lower compression, very low CPU; you must build an index with a separate M/R step
*LZO is supported, but cannot be shipped with Hortonworks for legal reasons; you have to download it separately.
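The ratio-versus-CPU trade-off is easy to demonstrate with the codecs in the Python standard library. Snappy, LZO, and LZ4 need third-party bindings, so this sketch compares zlib and bzip2 on a made-up payload; the actual numbers depend entirely on your data:

```python
import bz2
import time
import zlib

# A made-up delimited-text payload; substitute a sample of your own data.
payload = b"".join(
    f"{i},2015-10-03,/user/peter/file{i % 7}.txt\n".encode() for i in range(20_000)
)

for name, compress in [("zlib", zlib.compress), ("bz2", bz2.compress)]:
    start = time.perf_counter()
    packed = compress(payload)
    elapsed = time.perf_counter() - start
    print(f"{name}: {len(payload)} -> {len(packed)} bytes in {elapsed:.3f}s")
```

Typically bzip2 squeezes harder but burns noticeably more CPU time per byte, which is the trade the format list above summarizes.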
There are a number of workarounds. The simplest is to ensure that the input files are all smaller than your block size, which is often possible if you control the software generating the data; loggers, for instance, usually have a rollover size. Another is to use Bzip2, or LZO with the appropriate indexing step; beware that while Bzip2 has the most effective compression and is splittable, it is very slow. One of the best solutions is discussed in detail below: use ORC for input and output.
If your data is columnar in nature, then the Optimized Row Columnar (ORC) format should be used whenever possible. In modern Hive applications, storage and manipulation of flat data in the form of character-delimited tables should be avoided except for the initial input. The reasons outlined below all relate to compression, but ORC also enables a number of other optimizations that have nothing to do with compression. Definitely make ORC your default choice.
Inherently hierarchical formats such as JSON often encode data that is basically a flat table except for the boilerplate—consider converting these to ORC too. In fact, even if JSON or XML is hierarchical, consider flattening it into ORC even if the hierarchical structure requires an extra table or two because the rewards are large. Hive’s ability to handle lists and maps can often be used to make flattening a second level of hierarchy relatively painless and easily handled in a single table.
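Converting is usually a one-liner in Hive. The table names here are hypothetical; the pattern is to land the raw delimited text in a staging table and then rewrite it as ORC:

```sql
-- logs_text is a hypothetical staging table over raw delimited files.
CREATE TABLE logs_orc
STORED AS ORC
TBLPROPERTIES ("orc.compress" = "SNAPPY")
AS SELECT * FROM logs_text;
```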
The ORC Reader and Writer use several strategies to achieve high compression and decompression rates and to intelligently minimize the amount of data that needs to be decompressed.
- Compression-related optimizations include:
  - ORC is aware of the data types of the individual columns.
    - You give ORC this information in the Hive table definitions.
    - The Writer applies type-specific compression methods to individual columns.
  - An overall compression algorithm can be selected for the ORC file as a whole: Snappy, Zlib, or none.
  - Partitioning compresses data because the partitioning field is removed from the file.
    - Partitioning field values are expressed as directory names.
    - The directory name is the column value for all rows stored under that directory.
  - Data that does not compress adequately is detected and transparently left uncompressed.
- Decompression-related optimizations include:
  - Only the columns your query actually uses are decompressed; the rest are ignored.
  - Predicate pushdown in the Reader:
    - uses indexes and aggregates that are recorded when data is stored, as well as other tricks, to skip entire groups of 10,000 rows when it can determine that the data cannot be relevant to your query;
    - ignores whole partitions that cannot contain relevant data.
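Put together, a partitioned ORC table lets the Reader skip most of the data before decompressing anything. A sketch with hypothetical table and column names:

```sql
-- Rows are stored under directories like .../dt=2015-10-03/
CREATE TABLE events (user_id BIGINT, action STRING)
PARTITIONED BY (dt STRING)
STORED AS ORC;

-- Only the dt='2015-10-03' partition directory is read at all, and within it
-- ORC's recorded min/max statistics can skip whole 10,000-row groups that
-- cannot match user_id = 42.
SELECT action
FROM events
WHERE dt = '2015-10-03' AND user_id = 42;
```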
The net effect of all this is that ORC
- Stores data significantly more efficiently than purely file-level compression methods.
- Omits much unnecessary decompression (and in some cases even file reading).
Compression is your friend in Hadoop: it minimizes disk and network bottlenecks and reduces the impact of hardware failures. Make it your default choice at all three stages of processing. Anywhere it’s feasible, convert table-like data to ORC, which compresses better, decompresses less, and can speed up your processing by a large multiple.