Hive Tez AM split computation based on the input format
Published Aug 03 2021 12:54 AM 5,888 Views
Microsoft

Introduction:

 

The performance of a query depends on the size and type of data that each Tez container processes. Split computation plays a major role in deciding how many containers are required to run a specific query. This in turn affects cluster resource utilization, the performance of that query, and the performance of concurrently running queries.

 

Commonly encountered problem:

 

With file formats such as Parquet and TextFormat for the data under Hive, computing the input splits is straightforward. It is calculated as:

             

 

No. of data files = No. of splits

 

 

These data files can then be combined by the Tez grouping algorithm, which takes data locality and rack awareness into account. The result depends on several factors. If TEZ_GROUPING_SPLIT_COUNT (tez.grouping.split-count) is set, its value is used as the initial count; otherwise, the count passed in as a parameter is used. The initial count is then corrected according to TEZ_GROUPING_SPLIT_MIN_SIZE (tez.grouping.min-size) and TEZ_GROUPING_SPLIT_MAX_SIZE (tez.grouping.max-size): if the initial count would make the grouped splits too small, it is overridden with total input size / TEZ_GROUPING_SPLIT_MIN_SIZE; if it would make them too large, it is overridden with total input size / TEZ_GROUPING_SPLIT_MAX_SIZE.
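The correction described above can be sketched in a few lines of Python. This is a simplified model, not the Tez source: the function and parameter names are hypothetical, the 50 MB and 1 GB limits are illustrative values (check the defaults of your Tez version), and rounding the overridden count up is an assumption.

```python
MB = 1024 * 1024

def corrected_group_count(total_bytes, passed_count, configured_count=None,
                          min_size=50 * MB, max_size=1024 * MB):
    """Model of the grouped-split count correction described in the text."""
    # tez.grouping.split-count, when set, replaces the count passed in.
    count = configured_count if configured_count else passed_count
    if total_bytes <= 0 or count <= 0:
        return max(count, 0)
    size_per_group = total_bytes // count
    if size_per_group < min_size:
        # Groups would be too small: fewer, larger groups.
        count = -(-total_bytes // min_size)   # ceiling division (assumption)
    elif size_per_group > max_size:
        # Groups would be too large: more, smaller groups.
        count = -(-total_bytes // max_size)   # ceiling division (assumption)
    return count

# 5 splits over 100 MB would be 20 MB each, below the minimum.
print(corrected_group_count(100 * MB, 5))
```

With these illustrative limits, a count that yields groups between 50 MB and 1 GB is left unchanged.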

 

// HiveSplitGenerator   

 

2017-02-16 15:56:48,862 [INFO] [InputInitializer {Map 1} #0] |tez.HiveSplitGenerator|: Number of input splits: 5. 3 available slots, 1.7 waves. Input format is: org.apache.hadoop.hive.ql.io.HiveInputFormat

 

 

// the grouping logic is then called from HiveSplitGenerator

 

 

2017-02-16 15:56:48,891 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? true

2017-02-16 15:56:48,891 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? false

2017-02-16 15:56:48,891 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? false

2017-02-16 15:56:48,891 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? false

2017-02-16 15:56:48,892 [DEBUG] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Adding split hdfs://azure/apps/hive/warehouse/managed/hivesample01/sample1.csv to src new group? false

2017-02-16 15:56:48,892 [INFO] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: # Src groups for split generation: 2

2017-02-16 15:56:48,893 [INFO] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Estimated number of tasks: 5 for bucket 1

 

 

The grouping itself is done by org.apache.hadoop.mapred.split.TezMapredSplitsGrouper#getGroupedSplits(org.apache.hadoop.conf.Configuration, org.apache.hadoop.mapred.InputSplit[], int, java.lang.String, org.apache.hadoop.mapred.split.SplitSizeEstimator), based on tez.grouping.split-count:

 

 

2017-02-16 15:56:48,894 [INFO] [InputInitializer {Map 1} #0] |split.TezMapredSplitsGrouper|: Grouping splits in Tez

2017-02-16 15:56:48,894 [INFO] [InputInitializer {Map 1} #0] |split.TezMapredSplitsGrouper|: Using original number of splits: 5 desired splits: 5

2017-02-16 15:56:48,897 [INFO] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Original split size is 5 grouped split size is 5, for bucket: 1

2017-02-16 15:56:48,899 [INFO] [InputInitializer {Map 1} #0] |tez.HiveSplitGenerator|: Number of grouped splits: 5

 

 

This could be addressed from the Hive end as well with hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.

 

If this parameter is set in hive-site.xml, it becomes the default Hive InputFormat, as long as 'hive.input.format' is not set explicitly before the HQL statement.

The InputFormat in this scenario serves a different function from the former one. First, let's take a look at 'org.apache.hadoop.mapred.FileInputFormat', which is the base class for all file-based InputFormats. There are three essential methods in this class:

 

 

boolean isSplitable(FileSystem fs, Path filename)
InputSplit[] getSplits(JobConf job, int numSplits)
RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter)

 

 

'isSplitable' is self-explanatory: it returns whether the given file can be split. This method applies when writing a MapReduce program. For Hive, an alternative way to achieve the same effect is to set 'mapreduce.input.fileinputformat.split.minsize' in hive-site.xml to a very large value.

'getSplits' returns an array of InputSplit objects whose length corresponds to the number of mappers for the HQL task. Every InputSplit contains one or more file chunks in the current file system; the details are discussed later.

'getRecordReader' returns an 'org.apache.hadoop.mapred.RecordReader' object, whose job is to read data record by record from the underlying file system. Its main methods are as follows:

 

K createKey()
V createValue()
boolean next(K key, V value)
float getProgress()

 

 

'createKey', 'createValue' and 'getProgress' are self-explanatory. 'next' fills the key and value parameters from the current read position and returns true; at EOF it returns false.

In the former case mentioned above, only the 'getRecordReader' method is used, whereas in the latter case only the 'getSplits' method is used.

 

Suppose you have a 200M file, a 128M block size, and two requested splits. With HiveInputFormat, the split algorithm works as follows:

  1. The total file size is 200M, goalSize = 200M/2 = 100M, minSize = 1, splitSize = max {1, min {100M, 128M}} = 100M
  2. 200M/100M > 1.1, so the size of the first block is 100M
  3. The remaining file size is 100M, and 100M/100M is not greater than 1.1, so the remainder becomes the second block, also 100M.
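The three steps above can be reproduced with a short Python sketch of the FileInputFormat-style size calculation. The function and parameter names are hypothetical; the 1.1 factor is the slop used in step 2, and the sketch covers only a single file.

```python
MB = 1024 * 1024
SPLIT_SLOP = 1.1  # a split is cut only while remaining/splitSize exceeds this

def file_splits(file_size, num_splits_hint, block_size, min_size=1):
    """Return the list of split sizes for one file."""
    goal_size = file_size // max(num_splits_hint, 1)
    split_size = max(min_size, min(goal_size, block_size))
    splits = []
    remaining = file_size
    while remaining / split_size > SPLIT_SLOP:
        splits.append(split_size)
        remaining -= split_size
    if remaining:
        # Whatever is left (up to 1.1 x split_size) becomes the last split.
        splits.append(remaining)
    return splits

print([s // MB for s in file_splits(200 * MB, 2, 128 * MB)])  # [100, 100]
```

The slop also means that a 130M file with a 128M split size stays in one split instead of producing a tiny 2M tail.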

With CombineHiveInputFormat, the split algorithm gives a different result:

  1. 128M < 200M < 128M × 2, so the size of the first block is 128M
  2. The remaining file size is 200M - 128M = 72M, and 72M < 128M, so the second block size is 72M
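The arithmetic of these two steps can be written as a small Python sketch. It models only the sizes described above for a single file, with a hypothetical function name and an assumed 128M maximum split size; it does not model how chunks from several files are packed together.

```python
MB = 1024 * 1024

def combine_chunks(file_size, max_split_size):
    """Cut a file into chunks of max_split_size, with the remainder last."""
    chunks = []
    remaining = file_size
    while remaining > max_split_size:
        chunks.append(max_split_size)
        remaining -= max_split_size
    if remaining:
        chunks.append(remaining)
    return chunks

print([c // MB for c in combine_chunks(200 * MB, 128 * MB)])  # [128, 72]
```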

 

What happens when it’s an ORC table, and why ORC:

 

Split Computation and Grouping by OrcInputFormat:

 

First, OrcInputFormat reads all stripes from all input ORC files and creates one split per stripe, unless the stripe is smaller than the mapreduce.input.fileinputformat.split.minsize value.

If a stripe is smaller than mapreduce.input.fileinputformat.split.minsize, OrcInputFormat combines multiple stripes into a single input split.
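A simplified way to picture this is to accumulate consecutive stripes until the minimum split size is reached. The Python sketch below is only an illustration of that idea under stated assumptions: the function name is hypothetical, stripes are represented by their sizes alone, and file boundaries, block boundaries and the maximum split size are ignored.

```python
def stripes_to_splits(stripe_sizes, min_split_size):
    """Group consecutive stripes so each split reaches min_split_size."""
    splits = []
    current, current_bytes = [], 0
    for size in stripe_sizes:
        current.append(size)
        current_bytes += size
        if current_bytes >= min_split_size:
            # Large enough: close this split and start a new one.
            splits.append(current)
            current, current_bytes = [], 0
    if current:
        # Trailing small stripes form the last split.
        splits.append(current)
    return splits

# Sizes in MB: two small stripes are merged, the large one stands alone.
print(stripes_to_splits([64, 64, 200, 10], 128))  # [[64, 64], [200], [10]]
```

With a minimum split size of 1, every stripe ends up in its own split, which matches the one-split-per-stripe case.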

 

Split Grouping by Tez:

 

In addition to OrcInputFormat, the Tez engine can further group the input splits.

Initially, Tez asks the YARN Resource Manager for the number of available containers, multiplies this number by tez.grouping.split-waves (1.7 by default; for more information about split waves, read Tez Internals #1 – Number of Map Tasks), and gets the desired number of input splits (and tasks).

From the Tez Application Master log you can see:

 

[INFO] [InputInitializer {Map 1} #0] |tez.HiveSplitGenerator|: Number of input splits: 1084. 337 available slots, 1.7 waves.
[INFO] [InputInitializer {Map 1} #0] |tez.SplitGrouper|: Estimated number of tasks: 572

 

1084 is the total number of stripes in all input ORC files, 337 is the total number of available containers, and 337 * 1.7 = 572.9, which is truncated to 572 tasks.
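The task estimate in the log lines above can be checked with a one-line calculation. The function name is hypothetical, and truncating the product to an integer is inferred from the logged values rather than taken from the Tez source.

```python
def estimated_tasks(available_slots, waves=1.7):
    """Available containers times tez.grouping.split-waves, truncated."""
    return int(available_slots * waves)

print(estimated_tasks(337))  # 572, as in the ORC example
print(estimated_tasks(3))    # 5, as in the earlier 3-slot example
```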

 

Tez knows that it wants to run 572 tasks, so it defines the average data size per task:

 

[INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Grouping splits in Tez
[INFO] [InputInitializer {Map 1} #0] |grouper.TezSplitGrouper|: Desired numSplits: 572 lengthPerGroup: 822989185 numLocations: 1 numSplitsPerLocation: 1084 numSplitsInGroup: 1 totalLength: 470749813876 numOriginalSplits: 1084 . Grouping by length: true count: false nodeLocalOnly: false

 

Tez calculates totalLength, which is the total volume of input data to process. Note that Tez uses the column statistics from ORC files, not from Hive Metastore (!) to get the estimated uncompressed data size. That’s why totalLength is 470 GB while the total size of input ORC files is just 143 GB.

 

Knowing the total data size and the desired number of tasks, Tez gets lengthPerGroup, the desired size of input split: 470749813876/572 = 822989185. So the desired input split is 822 MB (!), and again it is the uncompressed data size.
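The same division, using the values from the log line, looks like this in Python. The function name is hypothetical; integer division is assumed because it reproduces the logged lengthPerGroup exactly.

```python
def length_per_group(total_length, desired_num_splits):
    """Desired uncompressed bytes per grouped split."""
    return total_length // desired_num_splits

per_group = length_per_group(470749813876, 572)
print(per_group)                 # 822989185
print(round(per_group / 10**6))  # 823, i.e. roughly the 822 MB quoted above
```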

 

Similar to OrcInputFormat, Tez also goes through all ORC stripes (actually input splits created by OrcInputFormat), but now it deals with the uncompressed data sizes.

If the input split is smaller than tez.grouping.min-size then it is combined with another split trying to create input splits having the lengthPerGroup size.

 

It is hard to hit this exact size, because Tez has to operate on full ORC stripes and cannot split a single stripe into multiple input splits.

 

ORC Split strategy:

 

There are three available options for hive.exec.orc.split.strategy: HYBRID, ETL and BI.

 

HYBRID: Reads the footers for all files if there are fewer files than the expected mapper count, and switches over to generating one split per file if the average file size is smaller than the default HDFS block size.

BI: Generates one split per file. It is faster when the number of files is small.

ETL: Reads the file footer and then decides the number of splits. A searchArg is passed to the reader, which can eliminate ORC stripes/splits based on the filter condition provided in the query. Use this strategy when it is acceptable to spend time on split calculation, typically for large queries.
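The HYBRID decision described above can be sketched as a small Python function. This is an approximation of the prose description, not the Hive implementation: the function and parameter names are hypothetical, and the exact thresholds and comparison operators in Hive may differ.

```python
MB = 1024 * 1024

def choose_split_strategy(configured, num_files, total_bytes,
                          expected_mappers, block_size=128 * MB):
    """Return 'ETL' or 'BI' for a configured strategy and input shape."""
    if configured in ("ETL", "BI"):
        return configured
    # HYBRID: read footers (ETL) for few or large files,
    # otherwise fall back to one split per file (BI).
    avg_file_size = total_bytes / num_files if num_files else 0
    if num_files <= expected_mappers or avg_file_size > block_size:
        return "ETL"
    return "BI"

# Many small files: footer reads are skipped in favour of per-file splits.
print(choose_split_strategy("HYBRID", 1000, 1000 * MB, 100))  # BI
```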

 

Accelerate the ORC split computation:

 

The default is hive.orc.compute.splits.num.threads=10; set this parameter to match the number of available processors. It controls the number of parallel threads used to compute splits. For Parquet, split computation is still single-threaded, so it can take longer with Parquet and cloud object stores.

 

With hive.exec.orc.split.strategy=ETL, set hive.orc.splits.include.file.footer=true. This ensures that Hive and the Tez AM are aware of the split payload from the footer information.

 

References:

https://www.slideshare.net/Hadoop_Summit/orc-file-optimizing-your-big-data

https://community.hortonworks.com/articles/68631/optimizing-hive-queries-for-orc-formatted-tables.ht...
