Jerome Rajan
Staff Solutions Consultant at Google

Hadoop — Understanding Splits, Blocks & Everything In Between

April 21, 2023

Originally published at my Medium Blog

Understanding Hadoop is like trying to unravel a tangled ball of yarn while wearing oven mitts. I’ve had my fair share of struggles trying to wrap my head around mappers, reducers, splits, blocks, containers, heap memory, GC, et al. Oftentimes, in the deepest of rabbit holes, my ladder to escape was a story: a story that I could tell my 5-year-old self without censorship or fear of judgement. In this article, I am going to try to present some of those stories to you in the hope that you find them enlightening in your darkest hours. Let’s jump right in!

To keep my focus laser-sharp, we will try to answer a simple question over the course of this article:

How are mapper and reducer task counts determined?

To answer the primary question, we need to understand blocks and splits in HDFS.

What Is An HDFS Block?

The Toddler Tale

Imagine that you have a lot of marbles that you want to store. You could put them all in one big box, but it might be hard to find the marble you need. Also, if the big box were to develop a tear, all your marbles would be lost.

Instead, you can put them in smaller boxes (blocks) so you can easily find the one you want. If one of the smaller boxes starts going bad, you can replace just that box without touching the other marbles.

This is how HDFS blocks work.

The Senior Citizen’s Version

The block size refers to the size of the physical data blocks that are stored on the Hadoop Distributed File System (HDFS). When a large file is uploaded to HDFS, it is broken down into smaller blocks. These blocks are then distributed across the HDFS cluster and replicated for fault tolerance.

The property that controls the size of a block is dfs.blocksize (the older name dfs.block.size is deprecated but still honoured).
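By way of illustration, here is a minimal sketch (with a hypothetical path and a made-up 256 MB value) of how a client could override the block size for files it writes to HDFS; dfs.blocksize is read from the client’s configuration at file-creation time:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // dfs.blocksize is a client-side setting: it applies to files
        // created by this client, not to files that already exist.
        conf.setLong("dfs.blocksize", 256L * 1024 * 1024); // 256 MB, example value

        FileSystem fs = FileSystem.get(conf);
        // Hypothetical path, used purely for illustration.
        try (FSDataOutputStream out = fs.create(new Path("/data/marbles.csv"))) {
            out.writeBytes("red,blue,green\n");
        }
    }
}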

Unless this article has time-travelled back to the Stone Age, it is very possible that you are no longer using HDFS for storing persistent data but are using a cloud storage service like GCS or S3.

Let’s assume you use GCS. GCS doesn’t really expose a block size; in fact, it doesn’t store data as fixed-size blocks the way HDFS does.

Then what is fs.gs.block.size?

When Hadoop reads from or writes to GCS through the GCS connector, each file is stored as a single object, and GCS manages the physical layout of that object internally. The fs.gs.block.size parameter does not change how the data is stored; it is the block size that the connector reports back to Hadoop, and Hadoop uses this reported value when it divides the input into smaller chunks (splits) to parallelise processing. It is therefore still important from a performance point of view, because it influences how many splits (and hence mappers) are created, which in turn drives network usage, disk IOPS and, consequently, CPU utilisation.
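If you wanted fewer, larger splits over the same GCS data, you could raise this reported block size. A minimal, hypothetical sketch (the 256 MB figure is only an example, not a recommendation):

import org.apache.hadoop.conf.Configuration;

public class GcsBlockSizeExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // fs.gs.block.size does not change how GCS stores the objects; it is
        // only the block size the GCS connector reports to Hadoop, which then
        // feeds into the split-size calculation discussed below.
        conf.setLong("fs.gs.block.size", 256L * 1024 * 1024); // 256 MB, example value
    }
}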

This Sounds Similar To Splitting. What Is The Split Size Then?

The Toddler Version

Let’s take our box-of-marbles story a little further. Say you have a million marbles which you have split into smaller physical boxes, each of which can hold 10,000 marbles. You therefore now have:

Number of Blocks = Total Number of Marbles/Capacity of each block
Number of Blocks = 1,000,000/10,000 = 100

You now want to get a count of marbles by colour and realise that going through a million of them is a daunting task. So you decide to bring in a few friends to do the job for you. You tell your friends that each of them has to work through 5000 marbles.

This 5000 is nothing but the split size. It is a virtual split in the sense that, physically, the marbles are still grouped in boxes of 10,000, but your friends are going to go through them only 5000 at a time. So, how many friends do you need to go through the entire set?

Number of friends required = Total number of marbles/split size
Number of friends required = 1,000,000 / 5000 = 200

The Senior Citizen’s Version

The friends are your mappers, and the split size along with the input volume determines how many mappers your application will require. MapReduce uses the following properties to control the split size:

mapreduce.input.fileinputformat.split.minsize
mapreduce.input.fileinputformat.split.maxsize

Let’s say

mapreduce.input.fileinputformat.split.minsize=16777216    # 16 MB
mapreduce.input.fileinputformat.split.maxsize=1073741824  # 1 GB
fs.gs.block.size=134217728                                # 128 MB

If your file size is 5 GB, then Hadoop will create 40 splits:

Split size = max(minimum split size, min(maximum split size, block size))
Split size = max(16 MB, min(1 GB, 128 MB))
Hence, Split size = 128 MB
Number of splits = 5 GB / 128 MB = 40
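The same arithmetic written out as a small sketch; it simply mirrors the max/min formula above, which is essentially what Hadoop’s FileInputFormat does internally when it computes split sizes:

public class SplitMath {
    // Mirrors the formula: max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long minSize, long maxSize, long blockSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long minSize   = 16L * 1024 * 1024;       // mapreduce.input.fileinputformat.split.minsize = 16 MB
        long maxSize   = 1024L * 1024 * 1024;     // mapreduce.input.fileinputformat.split.maxsize = 1 GB
        long blockSize = 128L * 1024 * 1024;      // fs.gs.block.size = 128 MB
        long fileSize  = 5L * 1024 * 1024 * 1024; // 5 GB input file

        long splitSize = computeSplitSize(minSize, maxSize, blockSize);
        long numSplits = (long) Math.ceil((double) fileSize / splitSize);

        System.out.println("Split size = " + splitSize / (1024 * 1024) + " MB"); // 128 MB
        System.out.println("Num splits = " + numSplits);                         // 40
    }
}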

If, for whatever reason, 40 mappers are too many or too few, you will have to tune the split size, which in turn depends on the properties mentioned above.

Also note that the block size should generally be a whole multiple of the split size, which means it should be equal to or larger than the split size, so that a split never straddles block boundaries.

What about Mappers & Reducers? What’s An Ideal Count?

The Toddler Version

So these friends you’ve brought in, all 200 of them, are going to be working for you. These are your mappers. They are each going to go through 5000 marbles at a time, religiously keeping note of how many marbles of each colour are in their chunk. But as these 200 friends start entering your house, you soon realise that your small 1BHK apartment is not big enough to house them all. You have to re-strategise, and you start by figuring out a way to bring the number of friends down to 50, because that’s all the chairs you can accommodate. How do you do this? For starters, you increase the split size from 5000 to 20,000.

Number of friends = Total Volume / Split Size = 1,000,000 / 20,000 = 50

Now you have 50 friends neatly packed into your house each with a chair and working through chunks of 20,000 marbles in parallel!

Now each friend finishes counting and gives you their individual results, say in the following format:

red:5
blue:7
green:2

You collate all the individual results and combine them into the final output. In this case, you were the reducer! You didn’t need as many reducers as mappers. So, it is important to note that the reducer count need not be the same as the mapper count.

The Senior Citizen’s Version

In our example above, which factor determined the end goal of 50 friends? It was the fact that I had only 50 chairs. These 50 chairs are analogous to the cluster resources: CPU and memory. So, you’d typically want at least as many mappers as there are cores in the cluster. But more often than not, 100% of the cores are not used all the time. There’s always room for improving CPU utilisation by scheduling additional containers and letting the OS handle CPU scheduling and sharing. You can control the number of mappers by controlling the split size, as explained in the previous section.

A caveat: the number of input splits is also influenced by the number of files in the input directory.
For example, if you have 10,000 files, a total input size of 10 GB and a split size of 1 GB, Hadoop will create an input split for each file, resulting in 10,000 input splits and as many mappers. This can become a problem, especially when the number of cores in your cluster is limited. A general guideline for controlling the number of mappers in such scenarios is to use CombineFileInputFormat (for example, CombineTextInputFormat), which allows Hadoop to combine multiple small files into a single input split, thus reducing the number of mappers, as sketched below.
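A minimal sketch of that approach, assuming the newer org.apache.hadoop.mapreduce API (the job name, the paths taken from args, and the 128 MB cap are all just illustrative values):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SmallFilesJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine-small-files");
        job.setJarByClass(SmallFilesJob.class);

        // Pack many small files into each split instead of one split per file.
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Cap each combined split at roughly 128 MB (example value).
        CombineTextInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);

        // Input and output paths come from the command line; set your mapper,
        // reducer and output key/value classes here as you normally would.
        CombineTextInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}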

Determining the number of reducers is dependent on multiple other factors. The reducer count recommendations are slightly different for Hive and a standard MapReduce job. Although Hive runs MR under the hood, it provides certain optimisations that allow for more dynamic and efficient usage of resources.

Hive

For Hive, one way, which is NOT recommended, is to simply force Hadoop to use a fixed number of reducers by setting mapreduce.job.reduces=N.

By default, mapreduce.job.reduces=-1 which means that you are allowing the framework to determine the number of reducers required.

So, how does the framework determine the reducers and how can you tune it?

The key properties are hive.exec.reducers.max and hive.exec.reducers.bytes.per.reducer. There are some other factors at play, which are explained in the Cloudera article below.

Hive on Tez Performance Tuning – Determining Reducer Counts

But simplistically speaking, Hive uses the formula below to determine the reducer count:

Number of reducers = 
max(1, min(hive.exec.reducers.max, Input data to reducers / hive.exec.reducers.bytes.per.reducer))

This is simply Hive trying to ensure that the reducer count always stays between 1 and hive.exec.reducers.max, while using hive.exec.reducers.bytes.per.reducer to control how much work each reducer will have to do. Again, you would typically want to allow the reducer count to go as high as the number of cores in the cluster, and even higher if CPU utilisation allows for it.
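The same estimate as a small sketch; the 256 MB and 1009 figures below are common defaults rather than universal ones, and Hive’s real logic (especially on Tez) takes more inputs into account:

public class HiveReducerEstimate {
    // max(1, min(hive.exec.reducers.max, inputToReducers / bytesPerReducer))
    static int estimateReducers(long inputToReducers, long bytesPerReducer, int maxReducers) {
        long byData = (long) Math.ceil((double) inputToReducers / bytesPerReducer);
        return (int) Math.max(1, Math.min(maxReducers, byData));
    }

    public static void main(String[] args) {
        long bytesPerReducer = 256L * 1024 * 1024;       // hive.exec.reducers.bytes.per.reducer (example: 256 MB)
        int  maxReducers     = 1009;                     // hive.exec.reducers.max (a common default)
        long inputToReducers = 50L * 1024 * 1024 * 1024; // say 50 GB flowing into the reduce stage

        System.out.println(estimateReducers(inputToReducers, bytesPerReducer, maxReducers)); // 200
    }
}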

MapReduce Jobs

The story is a little different for plain MapReduce, where you are expected to set mapreduce.job.reduces=N yourself. The general recommendation for this value is documented on the official Hadoop website:

Source — https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
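For reference, the tutorial linked above suggests roughly 0.95 or 1.75 multiplied by (number of nodes * maximum containers per node). A small sketch of that rule of thumb, with made-up cluster numbers:

public class ReducerRuleOfThumb {
    public static void main(String[] args) {
        int nodes = 10;               // hypothetical cluster size
        int maxContainersPerNode = 8; // hypothetical per-node container capacity

        // With 0.95, all reducers can launch as soon as the maps finish;
        // with 1.75, faster nodes get a second wave of reducers, which helps load balancing.
        int conservative = (int) (0.95 * nodes * maxContainersPerNode); // 76
        int aggressive   = (int) (1.75 * nodes * maxContainersPerNode); // 140
        System.out.println("mapreduce.job.reduces ~= " + conservative + " or " + aggressive);
    }
}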

Motivation

The reason I started writing this article and learning more about this was an issue I was facing with a customer’s workloads.

I noticed almost zero CPU utilisation alongside increased network and disk IOPS in a Hadoop job, and realised that it was possibly due to a low number of reducers. This caused the job to spend more time waiting for data to be transferred between the mappers and reducers, which in turn increased the network and disk I/O usage.

Additionally, having too few reducers can aggravate data skew, where some reducers end up processing a disproportionately large share of the data. This can leave some reducers idle while others are overburdened, leading to lower overall CPU utilisation.

Hope this article was informative and helped you understand some of the inner workings of Hadoop. If you have any feedback for me, please drop a note in the comments and I’ll be happy to respond!
