This is part two of an extended article. See part one here.
A full listing of Hive best practices and optimization would fill a book. All we’ll do here is skim over the topics that best indicate the spirit of Hive, and how it is used most successfully. There’s plenty of detail available in the documentation and on the Web at large. Hopefully, these quick run-downs will provide enough background and keywords for a rewarding Google search.
One of the most important things that YARN made possible is the processing model known as Tez. While Tez is technically a client-side library, it functions much like MapReduce as the “engine” under Hive and Pig.
To understand what Tez does, first consider the stereotyped processing sequence of a MapReduce job. Mapper processes read raw data from HDFS, and process it into key-value pairs. The key-value pairs are stored temporarily on local disk, grouped by key, and are then shipped over the network to the data nodes that host the reducers. There, they are again stored on local disk pending the arrival on each reducer of all values for a given key. The entire set of values for a given key is then sorted and read into a single reducer, processed, and written out to HDFS. HDFS then replicates the data two additional times, once within the same rack, and once to a different rack.
This many-step process may be repeated several times to accomplish a single Hive query, resulting in a large overhead of unnecessary intermediate HDFS writes and other operations, as well as in a separate setup process for each MapReduce job. The main reason for all that logically unnecessary work is that the individual MapReduce jobs don’t know about each other, and can’t cooperate.
Hive over Tez doesn’t break a job up into distinct MapReduce jobs. Because Tez processing is orchestrated on the client side, Tez executions are not limited to repeated MapReduce cycles, but have the flexibility to do things such as pipe the output of a reducer into the input of a subsequent mapper without any intervening HDFS activity, or cascade a series of reducers without any intervening mapper steps. The structure of Tez processing is often referred to as a DAG, for Directed Acyclic Graph, which means the graph of processing steps has no loops, and data flow in only one direction. Technically, the MapReduce processing graph is also a DAG, but its form is so stereotyped that it is not usually called such.
Tez also implements a number of other optimization techniques, such as allowing re-use of containers for successive phases of processing, and providing pre-warmed containers so that jobs can start instantly. These and other similar optimizations add up to a great improvement in efficiency and a lot of tuning options depending upon optimization goals.
- Always Turn On Tez unless you have an unusual reason why not. The choice of engine is set globally, but can be overridden per-query with set hive.execution.engine=tez (or mr).
- Use Pre-warmed Containers: The overhead of starting containers used to be a big part of the latency of smaller jobs. But you can tell Tez to keep some containers started by default. This is more important for clusters supporting interactive queries.
- Keep Containers Alive After Completion: Tez can be instructed to keep containers alive for a while instead of killing them immediately. If the next task is compatible with the container, it will be reused to save startup time. For some workloads this can save a lot of unnecessary container startup time.
Use Compression by Default
Data compression saves disk space, but that is often a secondary reason for using it in the Hive world. The more important reason is that large Hive queries tend to be dominated by either disk I/O or network I/O, and compression means less data to move around.
The most common data formats used in HDFS tend to be very compressible when they are in large chunks, with compression factors between 3X and 8X common for delimited text data, so the savings can be large. The tradeoff, of course, is that there is a CPU cost to compressing and decompressing, so you may find cases when the increased CPU cost is not worth the I/O savings, but in general, compression is your good friend. This is doubly true for operations such as sorts and ETL that move a lot of data end-to-end and then back to HDFS (as opposed to aggregation queries that read a lot of data, but only write a little.)
Any of the following points in the Map/Reduce or Tez workflow can contribute to a disk or network I/O bottleneck.
- Reading data from a local HDFS directory into the mappers: load on local disk
- Reading data from a non-local HDFS directory: load on remote disk, plus load on network
- Moving mapped data to the reducers (i.e., the shuffle-sort phase): load on network plus load on the scratch disks at both ends
- Moving the final mapper or reducer output to HDFS (including replication): load on local and remote HDFS disks plus load on network (both the top-of-rack (ToR) switch and the primary network switches.)
Default to saving data in compressed form at both ends unless you have a compelling reason not to. For the output, remember, it’s not just one write to disk: data is replicated twice more, with two network hops, one within the rack and one to a different rack. In jobs that involve moving a large amount of mapped data to the reducers (for instance, sorts) you’ll usually want to compress in the middle step as well to minimize network load.
Because HDFS by default keeps three copies of each block of data scattered around the cluster, when a disk, node, or rack fails, the blocks that were hosted on the failed equipment must be replicated to somewhere else from the other copies. Therefore, compressing data at rest also means less data to copy around the cluster in the event of disk or machine failure.
- When the data backing Hive tables is stored in compressed files in HDFS, it will be automatically and transparently uncompressed by Hive. This works whether you are using Hive “LOAD” or dropping the compressed files directly into the table directories with “hdfs dfs -put my_data.gz /apps/hive/warehouse/mytable”.
- A better way is to use Hive’s “insert” or “insert overwrite” to add to or overwrite a dataset as compressed data. See Compressed Storage for an example.
- Intermediate compression can be tuned using configuration parameters found on this page. Search for “compress.”
More about compression can be found here in What’s So Important About Compression.
Use the Optimized Row Columnar Format (ORC)
Before the ORC storage format was available, most Hive data lived on HDFS as delimited files. (Hive prefers non-printing delimiter characters, but comma, pipe, or tab-delimited were, and remain, common.) There were other specialized formats too, such as Sequence Files, but character-delimited files were the most common storage format for Hive data. Delimited text isn’t evil, but it’s the simplest format imaginable. Each block must be decompressed whole, and the entirety of each row must be read, parsed, and marshaled, in order to get access to a single column.
ORC files, together with their specialized Reader and Writer, enable numerous Hive optimizations, including among other things:
- Specialized forms of compression for individual column data types, and higher rates of compression
- The ability to skip processing entire ranges of rows within a block if they are not relevant to a query
- The ability to skip decompression of columns that are not relevant to a query projection
- Reduction of load on the NameNode because of fewer, bigger files.
- Support for reading multiple streams simultaneously from the same file, i.e., enables more parallelism
The ORC storage strategy is both column oriented and row oriented. An ORC format file consists of tabular data that is written in “stripes” of a convenient size to process in a single mapper (256 MB and 512 MB are common sizes.) Stripes may or may not be processed by separate tasks, and a single ORC file can contain many stripes, each of which is independent.
Type metadata from the Hive DDL is propagated into the storage format, which allows ORC readers and writers to do custom serialization and marshaling for columns based on column data type. Because the columns are compressed separately, only the columns that are in the projection for a query need be decompressed, which can greatly reduce not only CPU cost for decompression, but the memory use in the mapper.
Within a stripe, data is divided horizontally into row-groups, typically of 10,000 rows (the number of rows is called the “stride.”) For each row-group, an index contains:
- Statistics and aggregations, including min, max, sum, and whether or not there are any nulls, for every column
- The offset into the stripe where the stream for each column starts.
The ORC Reader uses the information in the index to enable a family of optimizations called “push-down predicates.” This means that Hive passes the where conditions for a query down to the ORC Reader, which can use the predicate to determine whether entire blocks of 10,000 rows are relevant to the query before bothering to unpack the relevant columns from the block. For instance, if the value from the where condition is not within the min/max range for a block, the entire block can be skipped.
Similarly, an “is-null” condition in a where clause can allow an entire block to be skipped if the Reader knows in advance that the column contains no nulls. Sums are another case where the benefits can be large. If the Reader can already tell that all rows in a block meet a where condition, the sum for the block can be returned without ever decompressing any columns. The result is that only a tiny proportion of the data in a stripe—or even no data at all—may end up being decompressed, marshaled and returned to Hive.
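The row-group skipping described above can be sketched in a few lines of Python. This is a toy model rather than the ORC Reader itself: the function names and the dictionary-based index are made up for illustration, and the stride is shrunk from 10,000 rows to 10 so the result is easy to follow.

```python
def build_index(values, stride):
    """Split one column into row groups and record min/max/has_null for each."""
    index = []
    for start in range(0, len(values), stride):
        group = values[start:start + stride]
        present = [v for v in group if v is not None]
        index.append({
            "start": start,
            "min": min(present) if present else None,
            "max": max(present) if present else None,
            "has_null": len(present) < len(group),
        })
    return index


def groups_for_equals(index, wanted):
    """Row groups that could hold `wanted`; all others are skipped unread."""
    return [g["start"] for g in index
            if g["min"] is not None and g["min"] <= wanted <= g["max"]]


def groups_for_is_null(index):
    """Row groups that contain at least one null."""
    return [g["start"] for g in index if g["has_null"]]


# Sorted data keeps the min/max ranges of the row groups from overlapping.
column = list(range(100))
column[57] = None
index = build_index(column, stride=10)

print(groups_for_equals(index, 42))  # [40]
print(groups_for_is_null(index))     # [50]
```

Only one of the ten row groups has to be unpacked for either predicate. If the column were unsorted, the min/max ranges would overlap and far fewer groups could be ruled out, which is why sorting matters so much for this optimization.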
ORC features yet another way for the pushdown predicate mechanism to ignore irrelevant data. A Bloom filter is a probabilistic data structure that tells you whether an element is present in a set. So why not just use a Java set? The beauty of Bloom filters is that they do the job with only a very small expenditure of memory. Whereas a set needs to keep a copy of each object in the set, a Bloom filter can do the job with just a few bits per element.
In an ORC file, the set in question is the collection of distinct values that a column takes within a row group. If a Bloom filter is specified for a column, even if the min/max values in a row-group’s index say that a given column value is within the range for the row group, the Bloom filter can answer specifically whether the value is actually present. If there is a significant probability that values in a where clause will not be present, this can save a lot of pointless data manipulation.
The catch with Bloom filters is that they will occasionally incorrectly answer that an element is present when it is not. Their saving grace, however, is that they will never tell you that an element is not present if it is. Therefore, in the rare case that a Bloom filter is wrong, the only consequence is that the reader may unnecessarily decompress the row group, and leave it to Hive to discover the same information the hard way. When a value is not present in the filter, checking is very cheap: in a correctly configured filter, on average it takes only two hashes to hit an unset bit.
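To make the one-sided error concrete, here is a minimal Bloom filter sketch in Python. The bit-array size, the number of hashes, and the use of salted SHA-256 are all assumptions chosen for readability; they are not what ORC uses internally.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter; sizes and hash function are illustrative only."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int doubles as the bit array

    def _positions(self, item):
        # Derive num_hashes bit positions by salting one hash function.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def might_contain(self, item):
        # The first unset bit proves absence; all bits set only means "maybe".
        return all((self.bits >> pos) & 1 for pos in self._positions(item))


row_group_values = ["US", "CA", "MX"]
bloom = BloomFilter()
for value in row_group_values:
    bloom.add(value)

# No false negatives: every value that was added is reported as possibly present.
print(all(bloom.might_contain(v) for v in row_group_values))  # True
```

Because `might_contain` stops at the first unset bit, a lookup for an absent value usually ends after one or two hash computations, which is what makes the check cheap enough to run for every row group.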
ORC uses specialized compression algorithms for individual column data types, but also uses an overall compression algorithm for the stripe as a whole. The choices for overall compression algorithm are ZLib, Snappy, and None. ZLib compresses the most, but Snappy is faster. Which to use? It depends: if I/O bandwidth is at a premium and CPU is available in abundance, go for ZLib to minimize data volume. If CPU is tight, you might want to use Snappy. Rarely will you want to use None, which is used primarily for diagnostic purposes.
The ORC Writer is intelligent enough to detect when a given column is not actually made smaller by application of compression, which sometimes happens with high-entropy (i.e., random or pseudo-random) data. When this is the case, the data for that column is written in the original uncompressed form even if compression is specified.
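The "keep the original if compression does not help" rule is easy to sketch with Python's standard zlib module. This only illustrates the idea; the function name and the (codec, payload) return shape are invented here and say nothing about how the ORC Writer lays out its data.

```python
import os
import zlib


def encode_column(raw: bytes):
    """Return (codec, payload), keeping the original bytes if zlib does not help."""
    packed = zlib.compress(raw)
    if len(packed) < len(raw):
        return "zlib", packed
    return "none", raw


text_column = b"2015-06-14,GET,/index.html,200\n" * 1000
random_column = os.urandom(len(text_column))  # high-entropy stand-in

codec, payload = encode_column(text_column)
print(codec, len(text_column) // len(payload))  # codec and rough compression factor
print(encode_column(random_column)[0])          # codec chosen for the random data
```

The repetitive delimited text shrinks dramatically, while the random bytes come back from zlib slightly larger than they went in, so they are stored as-is.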
Turn on Vectorized Processing
Vectorized processing is a low-level optimization that employs a combination of custom code (internal Hadoop code, not user code) for commonly used functions, together with compiler and hardware features of modern CPUs, in order to streamline processing of data in chunks of 1024 rows at a time. Execution is sped up because within a large block of data of the same type, the compiler can generate code to execute identical functions in a tight loop without going through the long code path that would be required for independent function calls. This SIMD-like behavior gives rise to a host of low-level efficiencies: fewer instructions executed, superior caching behavior, improved pipelining, more favorable TLB behavior, etc.
Not every data type and operation can benefit from vectorization, but most simple operations do, and the net result is usually a significant speedup.
This feature should usually be on by default:
- set hive.vectorized.execution.enabled=true
- set hive.vectorized.execution.reduce.enabled=true
- set hive.vectorized.execution.reduce.groupby.enabled=true
The “explain” utility will tell you what parts of the query are subject to vectorizing, and in the rare event that something does not work right, as with most Hive features, it can be turned off/on per-query.
- For predicate-pushdown to be most effective:
- The data should be sorted by the columns that will be the most exclusive as used in the where clauses.
- More than one sort column can be specified. Put the most exclusive column first in the sorted-by statement.
- The more columns to be returned in a query, the more effective predicate pushdown is, because skipping a block eliminates more decompression, marshaling, and data movement operations.
- Choose a compression algorithm with orc.compress, keeping the CPU vs. I/O load in mind
- Zlib compresses more
- Snappy is faster
- No compression is usually just for diagnostic purposes, but there are special cases.
- Consider Bloom Filters for columns that are important in where clauses, and for which it is probable that a given value will not be found.
- orc.bloom.filter.columns: A comma delimited list of fields to build the filter for.
- orc.bloom.filter.fpp: The probability of a false positive. The cost of the BF increases rapidly as probability goes to zero.
- Vectorized processing requires ORC format.
- Consider converting to ORC format even for one-shot data. ORC is so efficient, particularly when combined with partitioning and/or bucketing, that at times it can actually be faster to convert to ORC and then run a query, than to simply run the query. This is especially true for tables backed by JSON, XML, etc.
Use Partitioning and Bucketing
By default, the files for a given table are stored in a single logical HDFS directory. Hive has two ways to break up the data structure to facilitate both querying and data management: partitioning and bucketing.
With partitioning, the data is stored in a branching hierarchy of HDFS directories, each branch of which holds all the rows that contain a particular value in the column you have partitioned by. (Remember that the logical HDFS directories and files are actually physically located across multiple machines.)
Because this system guarantees that the data below a partition directory all has the same value for the partitioning column, the representation of the column is removed from the table data itself and indicated only by the name of the directory. For instance, if your data is often queried by date, you could partition by year, month, and day. These fields would no longer appear in the data, but would become the names of directories: one for each year; within each year, one for each month; and within each month, one for each day, 1-31.
Partitioning saves space on disk, but more importantly, it means that when reading data for a Hive query, the ORC reader does not have to traverse any branch of an HDFS directory tree that is disqualified by one of the predicates in the query’s where clause. For data partitioned as above, if the where clause specifies a particular day, month and year, the reader can skip directories for all but one year, and within that year, visit only one month, and under that single month, look only in blocks under the directory for the particular day.
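A small Python sketch shows why this pruning is so cheap: the decision is made from directory names alone, before any file is opened. Hive names partition directories column=value; the table root and the helper functions below are hypothetical.

```python
def parse_partition(path):
    """Turn '.../year=2015/month=06/day=14' into {'year': '2015', ...}."""
    return dict(seg.split("=", 1) for seg in path.split("/") if "=" in seg)


def prune(partition_dirs, **wanted):
    """Keep only the directories whose partition values match the predicate."""
    return [d for d in partition_dirs
            if all(parse_partition(d).get(col) == val for col, val in wanted.items())]


# Hypothetical table root; directory names follow the column=value pattern.
root = "/apps/hive/warehouse/weblogs"
partition_dirs = [
    f"{root}/year={y}/month={m:02d}/day={d:02d}"
    for y in (2014, 2015) for m in (5, 6) for d in (13, 14)
]

print(prune(partition_dirs, year="2015", month="06", day="14"))
```

Of the eight leaf directories in this example, only one survives the predicate, so seven-eighths of the table is never touched.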
To take advantage of partitioning, the fields in question must have reasonably low cardinality, because a directory and at least one file will be created for each distinct value. Year, month, and day are reasonable fields to partition by, but customer IDs, for instance, usually are not.
The HDFS directory structure under a partitioned table can be manipulated within or without Hive. One advantage of not using indexes is that you can simply delete an entire partition when it outlives its usefulness, or move the partition to archive storage without being encumbered by an index.
Note that you may need to re-order columns in order to use partitioning because the partition fields need to come last in the inserted data.
Bucketing is somewhat like partitioning, but it works for fields that have high cardinality. As with partitioning, bucketing breaks the data out under multiple directory branches, but unlike partitioning, the data in a bucketed column is not removed from the table rows. Instead, a fixed number of directories are created, and the directory to which a given row is to be written is computed by hashing the field of interest and taking the result modulo the number of buckets.
This technique gives a pseudo-random distribution of the distinct column values to all the buckets, but rows for any given value will always land in the same bucket. As with partitioned data, if the query allows it, Hive can push the predicate to the ORC Reader, which is able to limit the directory tree in which it will look for data to that which is under a single bucket.
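The hash-and-modulo assignment can be sketched as follows. CRC32 is used here only because it gives repeatable results in a short example; it is an assumption, not Hive's actual hash function, and the helper names are invented.

```python
import zlib
from collections import defaultdict


def bucket_for(value, num_buckets):
    """Hash the bucketing column and take the result modulo the bucket count."""
    # CRC32 is an assumption chosen for repeatability; Hive uses its own hash.
    return zlib.crc32(str(value).encode()) % num_buckets


def write_bucketed(rows, column, num_buckets):
    """Distribute rows into buckets keyed by bucket number."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_for(row[column], num_buckets)].append(row)
    return buckets


orders = [{"customer_id": c, "amount": a}
          for c, a in [(101, 5), (202, 7), (101, 9), (303, 1), (202, 4)]]
buckets = write_bucketed(orders, "customer_id", num_buckets=4)

# Every row for customer 101 is in exactly one bucket, so a lookup reads only that one.
target = bucket_for(101, 4)
print([r["amount"] for r in buckets[target] if r["customer_id"] == 101])  # [5, 9]
```

The same function applied to a second table bucketed on customer_id would send customer 101 to the same bucket number, which is the property bucket joins rely on.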
“Bucket joins” can take advantage of bucketing if rows are joined on a column that is bucketed in both tables. Some bucket joins require that the number of buckets be identical for the corresponding columns, and others only that the number of buckets in the second table is an integer multiple of the number of buckets in the first. The advantage is that individual mappers will always get rows bucketed by the same column values for both tables—a very large efficiency in many cases.
Buckets can also be used for sampling. Assignment to buckets is via hashing, so the contents of each bucket are pseudo-random with respect to the bucketing column. This means that you can query against M/N of a table if it is bucketed N ways, using table-sampling syntax that is part of Hive. This is a common strategy when analyzing very large data sets.
Details on the creation and manipulation of partitioned and bucketed tables are beyond the scope of this document, but are easily found.
- Choose partitioning fields that will not result in an excessive number of partitions. Year/Month/Day are ok. Timestamps are not.
- Keep in mind the total amount of data expected so as not to end up with a large number of under-size files, which could result in:
- Losing the advantage of large streaming reads
- Pointless load on the NameNode to track all the files.
- Choose partitioning fields that
- Appear in the where-clauses of the most important queries.
- Are the most restrictive on what data must be pulled in
- Favor wide partitioning over deep. One or two levels is usually plenty.
- Partitioning is pointless, even counter-productive, for “dimension” tables and other small tables.
- There is nothing to be gained by chopping them into smaller pieces, and more files to open.
- Such tables are used whole in other optimization strategies (e.g. broadcast joins), so chopping them up can actually hurt performance.
- Partitioning is pointless if the partitioning fields do not restrict the data that needs to be read.
- Bucketing and partitioning should be consistent with query types.
- Bucketing works best when queries
- Can be restricted by individual values such as customer ID
- Require joins on a shared column that can be bucketed.
- Will be used for sampling
- Partitioning works best when
- A field that is heavily used in where clauses will restrict the amount of data read.
- Data can be managed based on a particular field such as a date, e.g., you may want to archive older data
Data Updates Under the Append-Only Model
Since HDFS data is “append-only,” and Hive merely imposes a schema abstraction over the stored HDFS files, it is easy to see how appending to an existing table is not a problem. But how can you apply updates and deletes under this model?
This situation often arises when data from a relational source is imported, for instance, via Sqoop. In a typical use-case, there would be an initial load, followed by a periodic update that includes new rows, updates, and deletes. Short of dumping the entire database again, how can you get a freshened version of a table?
The classic way this was done is as follows, but new Hive features are making this less important.
- There is an initial load of a large data set (e.g., from Sqoop)
- Periodically, Hive receives a set of deltas (new rows, updates, deletes) from the RDBMS.
- Most databases have a “change data capture” facility that dumps these changes to a file.
- The current dataset is merged with the deltas and the results inserted into a temporary table.
- The current dataset is then dropped and replaced with the contents of the temporary table.
- This process is nicely described here: http://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/.
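The merge-and-replace cycle in the list above can be modeled in a few lines of Python. Rows are plain dictionaries, and the delta format of (operation, row) pairs is an assumption made for the sketch; a real change-data-capture feed will have its own layout.

```python
def merge_deltas(current_rows, deltas, key="id"):
    """Return the refreshed table: current rows with inserts, updates and deletes applied."""
    merged = {row[key]: row for row in current_rows}
    for op, row in deltas:
        if op == "delete":
            merged.pop(row[key], None)
        else:  # "insert" or "update": the newest version of the row wins
            merged[row[key]] = row
    return sorted(merged.values(), key=lambda r: r[key])


current = [{"id": 1, "city": "Austin"}, {"id": 2, "city": "Boston"}]
deltas = [
    ("update", {"id": 2, "city": "Chicago"}),
    ("insert", {"id": 3, "city": "Denver"}),
    ("delete", {"id": 1}),
]

temp_table = merge_deltas(current, deltas)  # merge into a temporary table
current = temp_table                        # replace the current dataset
print(current)  # [{'id': 2, 'city': 'Chicago'}, {'id': 3, 'city': 'Denver'}]
```

Note that nothing is modified in place: a whole new copy of the table is produced and swapped in, which is what makes the approach compatible with append-only storage.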
As of release 1.1, in 2015, Hive has the ability to update and modify a table under ACID semantics. This capability is not intended to give Hive the low-latency transactions of an RDBMS, but is intended to be used for periodic batch updates and several other use cases including streaming data ingestion and changes to dimension tables. One caveat is that as of HDP 2.3.1, it should still be regarded as a work in progress, and to be used in production only for simple cases.
Hive is still based on append-only tables whether you use transactions or not. What this feature does under the covers is a more elaborate version of the manually coded update process described above. The ACID transaction mechanism uses “delta tables” that record the changes associated with each transaction. The deltas are applied on the fly when data is queried again, and a periodic process folds the delta files from multiple completed transactions into a single delta file for the table as a whole. A second-level merging process periodically performs an operation similar to the manual process alluded to above to create a new version of the table that incorporates all the preceding updates.
Hive transactions are a tremendous advance to Hive for both the periodic update use case and to handle streaming ingestion gracefully with minimal coding impact. The main caveat is that they should not be abused in an attempt to replicate the transactional capabilities of an RDBMS.
Use Schema-on-Read Before Investing in Fancy ETL
Many computing environments literally have more data than anyone knows what to do with. Data streams in from Web-server logs, customer interactions, machine sources, etc., but is not exploited even though it might have real value, and instead sits unused in log files, or is simply discarded, because it’s too voluminous to store and query in an RDBMS. There are many variations of the problem:
- Data arrives too fast to effectively store in a database in usable form, i.e., normalized, indexed, etc.
- Data is too bulky even to preserve on disk in raw form because of both storage media cost and data management cost.
- There is a chicken-or-egg problem: the data is not understood well enough a priori for analysts to justify investing in the data reduction process and schema that would make it useful.
Hadoop environments tend not to have these problems because:
- Hadoop storage is dirt-cheap compared to storage on a jumbo Oracle system or data appliance—as little as 1% of the cost per GB, even including the HDFS three way replication.
- Neither HDFS nor Hive ingestion inherently require either indexing or any kind of ETL process—you can just drop the data into HDFS at essentially any rate at which you can copy the files in over the network.
The last of these reasons is the more interesting in some ways. Hive is very forgiving about how data is stored because of the principle of “schema on read,” which means you can store data raw, and apply format conversions and a table schema on the fly as the data is queried. This is impossible in a relational environment, but natural in Hive for several reasons.
- Unlike RDBMS tables, Hive tables do not describe how the data is stored; they are just metadata describing how it will be used in queries.
- The absence of indexing means that a row-by-row ingestion process is not necessary to make data useful.
- Shared-nothing parallel processing lets almost any number of processors work without communications bottlenecks or centralized orchestration. This means that there is almost no upper limit on the number of disks or CPUs that Hive can apply to the data.
Because Hive tables are strictly meta-data over what may be a very different representation of the data on disk, any number of tables, including zero, can be described over the same HDFS data. In other words, you can drop the data into HDFS first, and decide what to do with it later.
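As a rough illustration of schema on read, the Python sketch below applies two different "table definitions" to the same raw lines at query time. The log layout, the schema dictionaries, and the `read_table` helper are all invented for the example; in Hive the equivalent role is played by table DDL over files that stay untouched in HDFS.

```python
RAW_LINES = [
    "2015-06-14 10:01:07|alice|/home|200|512",
    "2015-06-14 10:01:09|bob|/cart|500|0",
]

# Two "tables" over the same bytes: column name -> (field position, converter).
TRAFFIC = {"user": (1, str), "page": (2, str)}
ERRORS = {"ts": (0, str), "status": (3, int)}


def read_table(lines, schema, delimiter="|"):
    """Apply a schema while reading; the stored data is never rewritten."""
    for line in lines:
        fields = line.split(delimiter)
        yield {name: convert(fields[pos]) for name, (pos, convert) in schema.items()}


print(list(read_table(RAW_LINES, TRAFFIC)))
print([r for r in read_table(RAW_LINES, ERRORS) if r["status"] >= 500])
```

Adding a third view later costs nothing more than another schema definition, which is the freedom the paragraph above describes.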
The abundant CPU and disk I/O bandwidth of Hive gives analysts tremendous freedom to defer expensive decisions on data processing until the data is understood. An extreme example of this is the ability to handle XML and JSON data, which may not even have a logically flat structure. To fully represent a complex hierarchical structure in a set of tables is a truly daunting exercise, but with Hive, you can simply leave the data set in JSON form, and on-the-fly, pull out various tabular views of aspects you happen to be interested in using the built-in “SerDe” mechanism (see below.)
This kind of flexibility makes it very easy to have a policy of storing all data by default, so that analysts can explore it, but there are still many situations in which raw storage is not appropriate, and an ETL process is appropriate. The key thing about schema-on-read is that it defers the design of the ETL process until after the data is thoroughly understood.
As long as data is in delimited or other common format, an analyst does not have to think much about the details of how Hive figures out what it is looking at. You have to specify what the format is, e.g., comma delimited text, but beyond that, Hive takes care of most of the details. What’s going on underneath is that Hive is calling a data-specific “SerDe,” which stands for “serializer-deserializer,” to parse the data. When your table create statement specifies “stored as textfile” you are actually telling it to use a particular SerDe to decipher the data as it is read in.
The same principle can be used to tell Hive how to handle special formats by specifying a SerDe explicitly. Using an appropriate SerDe, Hive can parse JSON, XML, or any other custom format. Some of these specialized SerDes are shipped with the product (e.g. JSON and XML) and others are available on Github and elsewhere.
If you have your own exotic format (say, some kind of binary data emitted by machines), it is not difficult to write a custom SerDe that will support the data in Hive. A SerDe is just a Java class that conforms to a certain interface. Your custom Java SerDe simply implements the required serialization and deserialization methods. If you are already writing and reading the data without Hadoop, chances are that the essential code already exists, and merely needs to be wrapped appropriately. The Java jar file holding the class can be specified within the DDL for your data table.
Normalize v Don’t Normalize v Denormalize
Unless carried to an extreme, normalization is generally a good and necessary thing in the RDBMS world. Among its many benefits in an RDBMS, normalization makes it easier to maintain consistency, reduces locking for updates, reduces the amount of data stored, and reduces total disk I/O.
The cost/benefit balance for normalization is different for Hive, resulting in much less emphasis on normalization, and in some cases, even in deliberately de-normalizing data imported from highly normalized relational sources. Some of the reasons are:
- The use-cases for Hive are not transactional but analytic. Normalization as a tool for maintaining consistency tends to be less important.
- The many-way joins required for normalized data tend to be more expensive for Hive
- Hive’s massive parallelism eliminates many of the disk-I/O limitations of an RDBMS, reducing the value of normalization for reducing data volume.
- Hive is often used with data volumes for which it would be impractical to use normalized data.
- It is common to drop anywhere from gigabytes to terabytes of data into HDFS and expect it to be accessible through Hive almost instantly.
- For many use-cases, e.g., log analysis, the cost of normalization and the cost of the joins it implies are high, with little benefit.
- “Save everything by default and let the analysts figure out how to use it.” is a common philosophy in the Hadoop world.
- Hadoop doesn’t require any schema, normalized or not, for ingestion
- Hive tables are metadata, and do not have to be isomorphic with the underlying data on disk. Many tables can be defined over a single raw data set.
- If more than one table can be usefully defined over a raw data set, then the underlying data set is almost certainly not normalized anyway
- Many raw data sets cannot be normalized without extreme effort, because they are not relations in the first place (e.g., JSON or XML)
- The savings in data volume that would result from normalization in an RDBMS are often more than offset by the high compression rates achievable with the large data blocks Hive deals with. Abundant CPU and the absence of a central processing bottleneck make compression and decompression relatively cheap.
Many Ways To Be Not Normal
While a fully normalized schema can often be defined for Hive data, it is common practice to relax many aspects of normalization, and in fact, Hive also explicitly supports some kinds of non-normalized data storage.
- First normal form prohibits multiple data elements in a single column. Hive, on the other hand, provides syntax for storing and accessing lists and maps within a column.
- A normalized table may not hold more than one kind of data; references to unrelated data are made through foreign keys to other tables. This requirement tends to be relaxed in order to:
- Reduce up-front costs of schema design and complexity of understanding a schema later
- Reduce the need for joins, which tend to be more expensive in Hive
- Minimize the amount of ETL to do for data that may only be analyzed once, rarely, or even never (e.g. logs or security data.)
- The property that differentiates Third Normal Form is often summarized as: “Each attribute must represent a fact about the key, the whole key, and nothing but the key.” Like the preceding item, this is often relaxed to allow fields that have a dependency on only part of a key.
Some Good Reasons to Normalize Hive Data
There are some use cases for which relatively strict normalization may be valuable in Hive. For instance, because of its abundant storage and processing capacity, Hive is often used to do ETL for OLTP or RDBMS systems. The output of such ETL is often both exported to the foreign systems, and kept in Hive to be used for bulk analytics. Such tables will usually be normalized to the requirements of the foreign system even if the Hive analyst doesn’t need it.
Another example is the classic “360 Degree View of a Customer.” This usually means that a refined view of a customer is gleaned from diverse sources, and added to over time. Source data may include records of purchases, Web server logs, external social media, and conventional RDBMS data ingested from other systems. This use case will often involve a well-defined schema with significant normalization for the result tables, while the source tables for the data might not be normalized at all.
A Sneaky Hive Trick
Beware that Hive has numerous tricks to reduce the cost that one would naively assume a join must incur when implemented with MapReduce. One thing that Hive does is detect that a table (typically a dimension table) is small enough to simply pull into each mapper whole. Such a table will be represented within the Map process as a hash-map that uses the field cited in the join condition as the key. In this way, a small table can be incorporated into the join results entirely on the map side and incur no corresponding shuffle-sort/reduce phase. This is called a map-side join or a broadcast join. If all but one table is small, the entire join can often be accomplished in this way, completely eliminating the shuffle and reduce phase.
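The hash-map mechanism described above can be sketched in a few lines of Python. This is only an illustration of the idea, not Hive's implementation; the function name, the table contents, and the column names are all made up for the example.

```python
def map_side_join(fact_rows, dim_rows, fact_key, dim_key):
    """Inner-join a large row stream against a small table held in memory."""
    # Build phase: every mapper loads the whole small table into a hash map
    # keyed on the join column.
    lookup = {}
    for dim in dim_rows:
        lookup.setdefault(dim[dim_key], []).append(dim)
    # Probe phase: each fact row is matched locally, so nothing has to be
    # shuffled to a reducer.
    for fact in fact_rows:
        for dim in lookup.get(fact[fact_key], []):
            yield {**dim, **fact}


# Hypothetical data: a "large" fact table and a small dimension table.
sales = [
    {"sale_id": 1, "store_id": 10, "amount": 25.0},
    {"sale_id": 2, "store_id": 20, "amount": 40.0},
    {"sale_id": 3, "store_id": 99, "amount": 5.0},  # no matching store
]
stores = [
    {"store_id": 10, "city": "Austin"},
    {"store_id": 20, "city": "Boston"},
]

joined = list(map_side_join(sales, stores, "store_id", "store_id"))
```

The cost of the build phase is paid once per mapper, which is why the trick only pays off when the dimension table is small.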
- Favor leaving data raw until/unless you have a compelling reason to impose a schema on the physical data.
- How normalization and de-normalizing will affect performance may not be obvious; don’t do the work to normalize data unless you’re sure there’s a benefit.
- Don’t do ETL on raw data that is intended for use in Hive until/unless you are sure there is a commensurate benefit.
- Beware of wasting analyst time and processing time on normalizing data that will rarely be accessed. Sometimes brute force is all you need.
Some SQL Query Best Practices and Optimization Techniques
Turn on the Cost Based Optimizer by default.
There are often multiple logically equivalent ways that Hive could execute a query. Until the advent of the Cost Based Optimizer (CBO), Hive used more-or-less hard-coded strategies to generate a specific query plan. The CBO lets Hive tune the query plan according to metadata that has been gathered in advance from the data. This has long been a feature of any RDBMS worth its salt, but it’s relatively new to Hive. The CBO draws on table statistics such as min, max, and number of distinct values, along with partition information and the table DDL. Turning on CBO will typically halve the execution time for a query, and may do much better than that.
- Metadata used by the CBO is stored in the Hive metastore database.
- Statistical data for use by the CBO is gathered in two ways:
- For new tables, it is gathered automatically unless hive.stats.autogather is set to false.
- For existing tables, or new data files, data is computed using the analyze command.
- Using analyze statistics
- CBO can be turned on and off with: hive.cbo.enable=true/false
- Tell the CBO to consider the column-stats
- Tell the CBO to use partition stats
The cost-based-optimizer chooses a query plan based on metadata generated by “analyze.” But this data isn’t always generated automatically. For instance, provided you have “autogather” turned on, it will be generated when you add data programmatically, but it will not be generated when you add data by dropping files into HDFS manually or add it with the “LOAD” command, which just moves pointers to the data rather than touching the data itself.
Analyze executes a MapReduce table scan to accumulate the statistics, so running it has a noticeable cost for large data sets, but the savings on run-time are almost always worth it. Some variants of analyze are:
- ANALYZE TABLE <t-name> COMPUTE STATISTICS;
- ANALYZE TABLE <t-name> COMPUTE STATISTICS FOR COLUMNS;
- ANALYZE TABLE <t-name> PARTITION (<col>='x') COMPUTE STATISTICS FOR COLUMNS;
Note that the computation is by table, not by the underlying data set.
Tips for analyze:
- Set hive.stats.autogather=true; This controls whether the statistics are automatically computed when new data is added to the table.
- If you are adding data by any means other than inserting it with a SQL statement, re-run analyze, including “for columns”
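As a rough sketch of how the statistics steps fit together, the Python helper below only builds the HiveQL text; it does not talk to Hive. The helper name, the table name, and the partition column are hypothetical. The three statistics-related properties after hive.cbo.enable are real Hive settings, but treating them as the ones the CBO bullets above refer to is an assumption.

```python
# Session settings commonly paired with the CBO (assumed to be the ones the
# bullets above refer to).
CBO_SETTINGS = [
    "SET hive.cbo.enable=true;",
    "SET hive.compute.query.using.stats=true;",
    "SET hive.stats.fetch.column.stats=true;",
    "SET hive.stats.fetch.partition.stats=true;",
]


def analyze_statements(table, partition=None):
    """Return HiveQL that refreshes basic and column statistics.

    `partition` is an optional dict of partition column -> string value.
    """
    target = f"ANALYZE TABLE {table}"
    if partition:
        spec = ", ".join(f"{col}='{val}'" for col, val in partition.items())
        target += f" PARTITION ({spec})"
    return [
        f"{target} COMPUTE STATISTICS;",
        f"{target} COMPUTE STATISTICS FOR COLUMNS;",
    ]


# Hypothetical table whose files were dropped into HDFS by hand.
script = CBO_SETTINGS + analyze_statements("web_logs", {"dt": "2015-06-01"})
print("\n".join(script))
```

Running the generated statements after every manual file drop or LOAD keeps the statistics in step with the data.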
The following are general guidelines for your DDL and queries. The Apache Wiki has some nice pages on joins that detail many of the subtleties regarding what is efficient or possible with joins: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins and https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization
- Don’t forget to specify ORC storage in the DDL. Many of the most important optimizations require it, so it should be your default format.
- Don’t try to use Hive like an OLTP system
- Transactions in Hive are best used for adding data and batch updates, not supporting concurrent updates
- There’s a separate file of deltas created for each transaction that changes the data—it is not a mechanism for high-frequency changes
- HBase (the NoSQL database included in HDP) is also not a relational database, but it can handle many of the high-frequency lookup use cases far better than Hive (and in some use cases, better than any RDBMS.)
- Hive is often used to prepare data for HBase, but the use cases are mostly distinct.
- Joins are allowed in Hive, and are highly useful, but in general:
- A good Hive schema will be less normalized than an RDBMS or OLAP schema, in large part to avoid joins.
- Consider de-normalizing big normalized tables that have been ingested from RDBMS’s or OLAP systems. One way to do this is to:
- Define a de-normalized version of the table in question
- Define a view over multiple tables
- Select everything in the view into your de-normalized table
- If the table is often used in conjunction with other large tables, consider setting up the partitioning/bucketing on all the joined tables to match
- If you’re using “dimension” tables, make sure you’ve enabled broadcast joins
- Even if the query is entirely map-side, there is still a cost to a “star” configuration in the time and memory to build the maps, copying the dimension tables across the network, etc. Don’t use a star schema style unnecessarily.
- If “rank() over” can be substituted for “max()” in a subquery, you will avoid a join and usually speed up a query.
- Look out for skewed data (in which some columns in the where-clauses have values that are grossly over-represented). If some mappers seem to take much longer than others, it is often a symptom of skew.
- Fix this in the DDL by creating the table with a “skewed by” column
- “Skewed by” splits out rows with the skewed value into multiple files of their own so they can be processed in parallel.
- If the data is already loaded, you can re-load the data by
- Moving the data to a temporary table
- Recreating the real table with the corrected skewed-by properties
- Inserting the data programmatically into the empty new table.
- There is no point in using “skewed by” if the column does not appear in where clauses.
- Buckets and partitions are your friends
- Partition and/or bucket large tables that are joined frequently on the same column in the same way, in order to take advantage of physical locality
- Sort the bucketed columns to increase pushdown predicate effectiveness—sorting allows the ORC reader to use metadata to ignore inapplicable row-groups more effectively
- Don’t divide the data set so finely that you have under-sized blocks or splits
- Minimize the depth of partitions—one or two at most unless your tables are huge
- Beware that bucketing helps most when it
- Enables effective predicate pushdown
- Facilitates bucket-joins
- Can be used for sampling
- Consider increasing the replication factor for dimension tables that are heavily used
- This is done at the HDFS level, not at the Hive level
- It may be worth it to replicate small heavily used dimension tables—up to several megabytes—to every node in the cluster.
- When using CPU-intensive formats such as XML or JSON, it may be more economical to write a temporary extract of the data to ORC format and then execute your query than to execute your query against the raw data.
- Consider “roll-ups” for large frequently queried datasets, i.e., pre-computed aggregations and summaries
- Views are handy, but beware that they are pure metadata: they are not materialized, and represent implicit computation
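To see why matching bucketing on joined tables helps, consider the following Python sketch. It is a toy model, not Hive code: an integer key modulo the bucket count stands in for Hive's bucketing hash, and the table contents are invented. Because both tables place a given key in the same bucket number, each pair of buckets can be joined independently of all the others.

```python
from collections import defaultdict


def bucketize(rows, key, num_buckets):
    """Assign each row to a bucket by its (integer) key."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[row[key] % num_buckets].append(row)
    return buckets


def bucket_join(left, right, key, num_buckets):
    """Join two tables bucket by bucket; bucket b only ever meets bucket b."""
    left_buckets = bucketize(left, key, num_buckets)
    right_buckets = bucketize(right, key, num_buckets)
    out = []
    for b in range(num_buckets):
        # Index just this bucket of the right-hand table.
        index = defaultdict(list)
        for r in right_buckets.get(b, []):
            index[r[key]].append(r)
        for l in left_buckets.get(b, []):
            for r in index.get(l[key], []):
                out.append({**l, **r})
    return out


# Hypothetical tables bucketed on the same column into the same bucket count.
orders = [{"cust": 1, "order": "a"}, {"cust": 2, "order": "b"}, {"cust": 5, "order": "c"}]
customers = [{"cust": 1, "name": "Ann"}, {"cust": 5, "name": "Eve"}, {"cust": 6, "name": "Flo"}]

result = bucket_join(orders, customers, "cust", 4)
```

If the two tables used different bucket counts or different bucketing columns, matching keys could land in different buckets and this shortcut would no longer be valid.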
Control the Parallelism
Hadoop and Hive are all about parallel processing, but the question of how to use and control parallelism, and to what end, is complicated. There is no one best way because the correct choice for a given situation depends on the workload and what you are trying to achieve. For instance, the following are all equally valid performance goals:
- (1) Maximizing the speed of a single huge query
- (2) Maximizing heavy batch throughput across the day
- (3) Minimizing the average wait time for a query to be served
- (4) Reducing the latency of interactive queries in a mixed-use environment with many kinds of processing going on.
Unfortunately, optimization strategies for different workloads are often in conflict. Consider a batch environment with zero interactive use. Tuning for maximum parallelism guarantees lowest average latency only for a single job. Why? Because starting up a container has a significant cost in initialization time during which that slice of cluster processing power is effectively idle. Therefore, if you run the maximum number of containers for every job, you are devoting proportionately more time (across the day’s jobs) to container start-up, because there is less reuse of containers.
At the other extreme, suppose you always have a backlog of jobs available to run anytime, i.e., you have an unlimited amount of work available. In this case, running multiple large jobs side by side makes more efficient use of cycles because average container reuse increases, and even though the elapsed time for the actual processing increases for all jobs, the average turnaround time for jobs entering the work queue decreases.
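The trade-off between single-job speed and overall efficiency can be made concrete with a toy cost model in Python. The model is an assumption made for illustration: every container pays a fixed start-up cost once, containers start in parallel, and the job's work divides evenly among them. Real clusters are messier, and the numbers below are invented.

```python
def job_profile(work_seconds, containers, startup_seconds):
    """Return (elapsed_seconds, efficiency) for one job under the toy model."""
    # Containers start in parallel, then split the work evenly.
    elapsed = startup_seconds + work_seconds / containers
    # Container-seconds held by the job, including idle start-up time.
    consumed = containers * elapsed
    efficiency = work_seconds / consumed
    return elapsed, efficiency


# One hour of total task work, 10 seconds of start-up per container.
narrow = job_profile(3600, containers=10, startup_seconds=10)
wide = job_profile(3600, containers=100, startup_seconds=10)

for label, (elapsed, eff) in (("10 containers", narrow), ("100 containers", wide)):
    print(f"{label}: elapsed {elapsed:.0f}s, efficiency {eff:.0%}")
```

Under these assumptions the wide job finishes far sooner, but a larger share of the container time it holds goes to start-up rather than useful work, which is the cost that accumulates across a day of jobs.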
Of course, you can only take this so far. Running too many large jobs in parallel will eventually drive down efficiency too, because container startup time isn’t the only factor. For instance, as the number of active concurrent jobs goes up, the ratio of working containers to ApplicationMasters goes down, and the ratio of working mappers to reducers goes up (because the number of reducers tends to be less flexible.) Some examples:
- If you’re Yahoo, running an endless series of gigantic batch jobs on trillions of rows of click data, then you might care only about number (1) because it’s all about maximizing total gigabytes per server per day.
- A more common pattern would be to have a relatively small number of massive batch queries for ETL daily, combined with numerous modest-sized queries for reporting, etc. Such an organization would care somewhat about (1) and (2), but much more about (3). Who cares if a two-hour batch job takes an extra half hour if hundreds of small jobs can run right away?
- In a multi-tenant environment, keeping a reasonable availability of resources for all paying organizational customers might be the main thing (3,4) because the cost is shared and nobody wants to be locked out of a cluster they are contributing money for.
- Human interactive use is the extreme case—in a cluster supporting interactive use by analysts, every other use of the cluster might be subordinated to the goal of minimizing the time humans spend twiddling their thumbs, i.e., (4)
A true “optimum” is hard to define, let alone find, because workloads almost never consist of a limitless supply of jobs ready to run. In real life, work tends to be feast or famine, so in addition to the size of the jobs, many factors, including the arrival time distribution, the urgency of different workload types, and the nature of the queries come into play. One of the most important factors in choosing a strategy is seldom mentioned: the analyst and developer cost of finding the best settings. “Pretty good” is often more efficient than perfect, when the cost of finding the best solution is figured in. One developer salary spent on optimizing can buy a lot of extra compute capacity over the course of a year.
For all that, parallelism can be usefully influenced on multiple levels. The most important strategies are:
- Controlling allocation of resources via YARN’s CapacityScheduler
- Controlling the number of containers at the Tez level
- Low-level control of the maximum supply of containers via manipulation of memory allocation for YARN containers
By default, Hadoop treats all inbound queries as a single queue of jobs. Within a single processing queue, the first query that comes along gets all the resources that it asks for. The next query must be satisfied with whatever is left over, and so on until there are no more available resources. Once all the resources are spoken for, jobs have to wait for tasks to finish and release resources.
YARN’s CapacityScheduler runs by default with a single queue, making its default scheduling behavior pretty similar to that of plain Hadoop without YARN. Therefore, if you do nothing, all resources will be allocated first-come/first-served. Fortunately, the CapacityScheduler lets you define hierarchies of queues each of which gets a proportion of available resources, and you can assign jobs/users to the queues either manually or by policy.
When multiple queues are set up, YARN will allocate resources from each queue in much the same way it does with the single default queue. If a queue is given 50% of the cluster’s total resources, YARN will allocate 50% of cluster resources to jobs assigned to that queue, first-come/first-served, and reserve the rest of the capacity for the remaining queues.
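A small Python sketch shows the effect of splitting the cluster into queues. It models only the first-come/first-served behavior described above, deliberately ignoring elasticity, per-user limits, and preemption; the queue names, job names, and container counts are hypothetical.

```python
def allocate(total_containers, queue_percent, requests):
    """Grant containers first-come/first-served within each queue.

    queue_percent: queue name -> integer percentage of the cluster.
    requests: list of (job, queue, containers_wanted) in arrival order.
    """
    remaining = {q: total_containers * pct // 100 for q, pct in queue_percent.items()}
    granted = {}
    for job, queue, wanted in requests:
        got = min(wanted, remaining[queue])
        remaining[queue] -= got
        granted[job] = got
    return granted


# Single default queue: the big jobs that arrive first take everything.
single = allocate(100, {"default": 100}, [
    ("etl_1", "default", 80),
    ("etl_2", "default", 40),
    ("adhoc_1", "default", 5),
])

# Two queues: the interactive query is served from its own reserved share.
split = allocate(100, {"batch": 80, "interactive": 20}, [
    ("etl_1", "batch", 80),
    ("etl_2", "batch", 40),
    ("adhoc_1", "interactive", 5),
])
```

In the single-queue case the late-arriving ad hoc query gets nothing until a batch task finishes; with a reserved interactive queue it is served immediately, at the price of making the second batch job wait.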
You can use this behavior in many ways, for instance, to arrange a small number of queues to which big jobs will be assigned so that they will run in parallel instead of lining up one by one. Then you can reserve another fraction of the space for, say, interactive Hive queries, to ensure live users will get served quickly. Note that the CapacityScheduler controls resource allocation for other kinds of jobs as well, including MapReduce2 jobs, Pig, etc.
A partial list of ways to tune the CapacityScheduler includes:
- Within a queue, the maximum capacity that will be allocated to any one user can be limited.
- Queues can be permitted to use resources beyond their assigned limit if those resources would otherwise be idle, and caps can be set for how much more than their limit a queue can use.
- Policies can be set for how to take back extra resources that properly belong to other queues when new demand comes along. Excess containers being usurped by the over-allocated queue can be:
- Terminated immediately
- Allowed to complete, and then taken back, rather than being reused
More about scheduling, and the CapacityScheduler in particular, can be found in Choosing a Scheduler.
Controlling Container Size and Number in Tez:
The number of mappers for a job cannot exceed the number of blocks or splits that are in the underlying dataset. Depending upon the maximum number of mappers the cluster can support and the data set size, in many cases, the number of mappers can be capped by controlling the number of splits or blocks, depending on what your storage format is. Details may be found here: How Task Parallelism Works
The main attributes to tune here are:
- tez.grouping.split-waves: Multiplier of the available capacity on the queue to determine the number of tasks for a vertex. Default is 1.7, i.e., it will generate 1.7 times as many tasks as the queue has resources for (70% more).
- tez.grouping.max-size: The most data Tez will assign to a task. Decreasing this means more parallelism
- tez.grouping.min-size: The least data Tez will assign to a task. Increasing this means less parallelism
- tez.grouping.split-count=n If you think you know better than Tez, you can override Tez logic for figuring out the number of splits, and set it by hand.
- hive.tez.container.size overrides tez.task.resource.memory.mb, and can be set per-query.
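The interaction between the waves multiplier and the size bounds can be approximated in Python as follows. This is a simplified model for building intuition, not Tez's actual grouping algorithm; the function name is made up, and the byte bounds are passed in as illustrative values rather than asserted as the defaults on your cluster.

```python
MIB = 2**20
GIB = 2**30


def estimate_tasks(total_bytes, available_slots, waves=1.7,
                   min_size=50 * MIB, max_size=1 * GIB):
    """Estimate the task count for a vertex under a simplified grouping model."""
    # Start from the capacity of the queue scaled by the waves multiplier.
    desired = max(1, round(available_slots * waves))
    per_task = total_bytes / desired
    if per_task > max_size:
        # Tasks would be too large: cap the data per task (more parallelism).
        return -(-total_bytes // max_size)  # integer ceiling division
    if per_task < min_size:
        # Tasks would be too small: give each at least min_size (less parallelism).
        return max(1, total_bytes // min_size)
    return desired


medium = estimate_tasks(100 * GIB, available_slots=100)   # bounds not hit
huge = estimate_tasks(1024 * GIB, available_slots=100)    # max-size cap applies
tiny = estimate_tasks(1 * GIB, available_slots=100)       # min-size floor applies
```

The three cases show when each knob matters: the waves multiplier decides the count for mid-sized inputs, while the max and min sizes take over for very large and very small inputs respectively.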
Controlling the number of Mappers
- The maximum number of mappers per hardware node can be limited by controlling the memory allocated to Tez containers. The memory available to YARN on a node divided by the memory per container is the maximum number of mappers per node.
- The typical ratio should be around containers per disk or core.
- Number of disks and number of cores should be in the same ballpark on a well-tuned commodity hardware cluster
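The per-node arithmetic is simple enough to check by hand, but a short Python sketch makes the relationship explicit. The node sizes below are hypothetical; on a real cluster the node figure would come from the memory YARN is allowed to hand out on each node (yarn.nodemanager.resource.memory-mb) and the container figure from hive.tez.container.size.

```python
def max_containers_per_node(node_memory_mb, container_mb):
    """Whole containers that fit in the memory YARN may allocate on one node."""
    return node_memory_mb // container_mb


def containers_per_core(node_memory_mb, container_mb, cores):
    """Ratio to compare against the number of cores (or disks) per node."""
    return max_containers_per_node(node_memory_mb, container_mb) / cores


# Hypothetical node: 96 GiB usable by YARN, 16 cores, 4 GiB containers.
per_node = max_containers_per_node(98304, 4096)
per_core = containers_per_core(98304, 4096, cores=16)
```

Doubling the container size halves the number of mappers a node can host, which is the low-level lever the bullets above describe.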
Store First and Ask Questions Later
HDFS storage is cheap—as little as 1% of the cost of a high-end data appliance, and, equally importantly, data storage does not require an ETL step. Unlike a relational database, Hive need not ingest data row-by-row because there is no indexing involved. Data files are typically simply added to an HDFS directory, either by executing an “hdfs dfs -put my-file hdfs-dir” command, or by using the Hive “LOAD” command, which also moves files into HDFS by the block.
These two properties of Hive let you store and explore data types that would be absurd in a relational database. Many forward-looking companies now default to storing all data in HDFS and exposing it to the analysts via Hive. The analysts can impose their own schema on the data ad hoc as needed for exploration. Later, and only if a clear need is established, add ETL steps, which may consist of little more than defining an appropriate storage format (which will usually be ORC with some kind of partitioning or bucketing.) The links below refer to articles that explain “save everything by default” at TrueCar, a company well known for its innovative Big Data strategy.
Defining a Storage Format on the Fly: Note that it is common for analysts to reprocess raw data into ORC format on the fly, even for a single query, if the dataset size or the queries warrant. This might typically be done because:
- Much of the data is not of interest, and the part that is can fit into significantly less space
- The raw data has many fields, but only a few appear in your where clauses and projection
- The raw data has large text data fields that won’t be part of your projection, and can thus be dropped entirely
- To take advantage of bucketing
- To take advantage of partitioning and sorting
- The raw data is in JSON or XML that is relatively expensive to parse on the fly.
Many Ways to Add Data To Hive
Relational databases have numerous requirements that make the acquisition of bulk data expensive, both in terms of the CPU cycles required to store a row, and the up-front planning necessary to prepare for storing the data. The underlying principle is that non-trivial data needs to be sliced and diced and brought to something like third normal form, and in most cases, indexed, in order to be useful.
Data may also require a preliminary conversion from a native format such as XML or JSON by a process outside of the RDBMS.
Probably the most important impediment to ingesting large amounts of data quickly is that storage mechanics require that data be dealt with row-by-row as it is read and inserted into disk-based data structures that require random access, as opposed to streaming. Table size also typically has a soft upper bound of a billion or so rows, after which storage cost soars and efficiency declines.
Hive lacks all of these constraints, because it does not use indexes (at least not global indexes in the RDBMS sense), does not require that files containing ingested data even be opened or decompressed, let alone processed row by row, and has no real maximum table size if you have enough data nodes to support the disk space.
The most common form of data ingestion in fact consists merely of copying files into HDFS, but there are a number of ways to do it that fall into three categories, as follows:
- Dropping data files into HDFS from outside of Hive
- Directly copying files into HDFS using the hdfs or hadoop utilities, WebHDFS, or an HDFS NFS mount.
- Data may be
- Directly dropped into the Hive table’s HDFS data warehouse directory
- Copied into an HDFS staging directory, then loaded into Hive using the “LOAD” command, which simply adjusts the HDFS links to move the files into the Hive data warehouse directory.
- Using the Hadoop “distcp” utility to do a distributed copy that accomplishes the processes described
- Copy data from HDFS on one cluster to HDFS on another
- Copy data within a cluster from one HDFS directory to another
- Copy data from a native file system to HDFS
- Using Apache Flume to automate the process of moving bulk files
- Flume can also “tail” files, such as log files, appending the data to the files backing a Hive table continuously.
- Using Sqoop to automate the process of downloading data from a relational source and inserting the resulting data into Hive directories.
- Using Hive to insert data into a Hive table
- Data is selected from one set of tables using Hive SQL, and inserted into another Hive table. This is often used to convert raw data (e.g., a CSV file) into a table backed by ORC, possibly with columns rearranged, deleted, cleaned up, etc.
- The underlying process is distributed MapReduce or Tez, and the result is data files dropped into the Hive data warehouse HDFS directories.
- Streaming processes
- Hive supports streaming in data from Storm and other sources and inserting it into a Hive table. The result is, once again, big data block files in the Hive warehouse HDFS directories.
It’s Not Just Hive
This document deals primarily with Hive, but most shops don’t confine themselves to Hive-over-HDFS. Other Hadoop components that are commonly used in conjunction with Hive include:
- Apache Pig is a scripting language that provides a wide range of scripting functionality to automate data conversion, ingestion, ETL, and batch processing.
- Apache Oozie provides higher level scheduling of Hive, Pig, and native OS operations such as bash scripts.
- Apache Flume provides diverse ingestion options for file-based data, particularly log files (hence the name)
- Apache Sqoop provides ingestion of data from relational sources (as well as returning processed data to relational destinations.)
- Apache Falcon orchestrates Flume and Sqoop ingestion and provides data governance, which includes scheduling of ingestion and end of life, tracking the provenance of data at a row level, etc.
- Apache Ranger provides authorization policies for both the underlying data and the metadata, giving SQL “grant” capabilities that can be centrally administered by policy.
- The hdfs and hadoop utilities provide command line access to
- POSIX-like functionality for HDFS, including ls, rm, chmod, chown, etc.
- Functionality for moving between HDFS and the native FS: put, get, etc.
- Functionality for moving data within and between clusters: distcp
- HDFS system maintenance such as fsck, du, df
Understand What’s Happening Under the Hood
Always Use the Query Summary—It’s Free
Using the setting set hive.tez.exec.print.summary = true; at the head of your query will tell Hive to print out an execution profile for your query. The output is full of useful information about where your query is spending its time and resources.
Like most RDBMS’s, Hive has an “explain” function that will tell you how it intends to lay out your query. You can try different queries that do the same thing to get a feel for what it is going to do. It is also interesting to do this before and after running analyze on your datasets. Sometimes the difference is very large.
Tez Execution Graph
The Tez execution graph can be viewed through YARN. This has somewhat similar information to the output of explain. It can be most easily examined through Ambari’s Tez View.