In this tutorial, I am going to show you an example of a custom partitioner in Hadoop MapReduce.
In my previous tutorial, you have already seen an example of a Combiner in Hadoop MapReduce programming and the benefits of having one in the MapReduce framework. The Combiner (how combiner works in map reduce) is a very important feature of Hadoop, and it helps a lot to reduce the network traffic between the map and reduce phases.

What is Partitioner?
The partitioner controls the partitioning of the keys of the intermediate map outputs; in other words, it decides which reducer receives which part of the mapper output. When there are multiple reducers, each map task partitions its output, creating one partition per reduce task.
By default Hadoop uses a hash partitioner for this, but you can also take control by plugging in your own custom partitioner. The total number of partitions equals the total number of reduce tasks.

In this tutorial, I am going to show you an example of a custom partitioner. We have a clickstream dataset covering 10 countries, and from this dataset we want to analyse the clicks from India and the USA only. So here, I am going to use a custom partitioner called CountryPartitioner to send the data for India and the USA to specific reduce tasks. At the end we will have three output files.

Tools and Technologies we are using here:

  • Java 8
  • Eclipse Mars
  • Hadoop 2.7.1
  • Maven 3.3
  • Ubuntu 14 (Linux OS)
Main Objects of this project are:
  • ClickStreamDriver.java
  • ClickStreamReducer.java
  • ClickStreamMapper.java
  • ClickStreamVO.java
  • CountryPartitioner.java
  • pom.xml
Step 1. Create a new maven project
Go to the File menu, then New -> Maven Project, and provide the required details (group id, artifact id, and so on).

Step 2. Edit pom.xml
Double-click on your project's pom.xml file; initially it looks like this, with very limited information.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.poc</groupId>
  <artifactId>CustomPartitioner</artifactId>
  <version>1.0</version>
  <name>CustomPartitioner</name>
  <description>CustomPartitioner Example in Map Reduce</description>
</project>
Now edit this pom.xml file and add the Hadoop dependencies. Below is the complete pom.xml file; you can copy and paste it as-is.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.poc</groupId>
  <artifactId>CustomPartitioner</artifactId>
  <version>1.0</version>
  <name>CustomPartitioner</name>
  <description>CustomPartitioner Example</description>
  <dependencies>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.7.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.7.1</version>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
Step 3. ClickStreamMapper.java (Mapper)
This is our mapper class. It takes a LongWritable key and a Text value (one line of the clickstream file) as input and emits a Text key holding the country together with a ClickStreamVO value; for example, a line whose country column is INDIA is emitted under the key INDIA.
package com.javamakeuse.bd.poc;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ClickStreamMapper extends Mapper<LongWritable, Text, Text, ClickStreamVO> {

 // reusable output key holding the country name
 Text country = new Text();

 @Override
 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  String[] columns = value.toString().split("\\t");

  if (columns.length > 4) {
   ClickStreamVO clickStreamVO = new ClickStreamVO();
   clickStreamVO.setReferredUrl(columns[1]);
   clickStreamVO.setRequestedUrl(columns[2]);
   clickStreamVO.setUserAgent(columns[3]);
   clickStreamVO.setCountry(columns[4]);
   country.set(columns[4]);
   // emit the country as the key, e.g. (INDIA, clickStreamVO)
   context.write(country, clickStreamVO);
  }
 }
}
Step 4. ClickStreamReducer.java (Reducer)
This is our reducer class, which runs the reduce task; here it simply writes out every ClickStreamVO it receives under each country key.
package com.javamakeuse.bd.poc;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ClickStreamReducer extends Reducer<Text, ClickStreamVO, Text, ClickStreamVO> {

 @Override
 protected void reduce(Text key, Iterable<ClickStreamVO> values,
   Reducer<Text, ClickStreamVO, Text, ClickStreamVO>.Context context)
     throws IOException, InterruptedException {
  for (ClickStreamVO clickStream : values) {
   context.write(key, clickStream);
  }
 }
}

Step 5. ClickStreamVO.java (Custom Data type for values)
This class plays the role of a custom data type for values; it implements Hadoop's Writable interface. Note that readFields() must read the fields in exactly the same order in which write() writes them.
package com.javamakeuse.bd.poc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class ClickStreamVO implements Writable {
 private String referredUrl;
 private String requestedUrl;
 private String country;
 private String userAgent;

 public String getReferredUrl() {
  return referredUrl;
 }
 public void setReferredUrl(String referredUrl) {
  this.referredUrl = referredUrl;
 }
 public String getRequestedUrl() {
  return requestedUrl;
 }
 public void setRequestedUrl(String requestedUrl) {
  this.requestedUrl = requestedUrl;
 }
 public String getCountry() {
  return country;
 }
 public void setCountry(String country) {
  this.country = country;
 }
 public String getUserAgent() {
  return userAgent;
 }
 public void setUserAgent(String userAgent) {
  this.userAgent = userAgent;
 }
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeUTF(country);
  out.writeUTF(requestedUrl);
  out.writeUTF(referredUrl);
  out.writeUTF(userAgent);
 }

 @Override
 public void readFields(DataInput in) throws IOException {
  country = in.readUTF();
  requestedUrl = in.readUTF();
  referredUrl = in.readUTF();
  userAgent = in.readUTF();
 }

 @Override
 public String toString() {
  return "ClickStreamVO [referredUrl=" + referredUrl + ", requestedUrl=" + requestedUrl + ", country=" + country
    + ", userAgent=" + userAgent + "]";
 }

}

Step 6. CountryPartitioner.java (Custom Partitioner)
This is our custom partitioner; we are going to register it as the partitioner class inside the driver class. It sends INDIA to partition 0, USA to partition 1, and every other country to partition 2.
package com.javamakeuse.bd.poc;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CountryPartitioner extends Partitioner<Text, ClickStreamVO> {

 enum Country {
  INDIA, USA
 }

 @Override
 public int getPartition(Text key, ClickStreamVO value, int numPartitions) {
  
  String country = key.toString();
  
  if (Country.INDIA.toString().equalsIgnoreCase(country)) {
   return 0;
  } else if (Country.USA.toString().equalsIgnoreCase(country)) {
   return 1;
  } else {
   return 2;
  }
 }

}

Step 7. ClickStreamDriver.java (Driver class)
This is our driver/main class, which configures and runs the custom partitioner job.
package com.javamakeuse.bd.poc;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ClickStreamDriver extends Configured implements Tool {

 @Override
 public int run(String[] args) throws Exception {
  if (args.length != 2) {
   System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
   ToolRunner.printGenericCommandUsage(System.err);
   return -1;
  }

  // use the configuration populated by ToolRunner (so generic options apply)
  Job job = Job.getInstance(getConf());
  job.setJarByClass(getClass());

  // input path
  FileInputFormat.addInputPath(job, new Path(args[0]));

  // output path
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(ClickStreamMapper.class);
  job.setReducerClass(ClickStreamReducer.class);
  
  // our custom partitioner, with one reduce task per partition (INDIA, USA, others)
  job.setPartitionerClass(CountryPartitioner.class);
  job.setNumReduceTasks(3);

  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(ClickStreamVO.class);

  return job.waitForCompletion(true) ? 0 : 1;
 }

 public static void main(String[] args) throws Exception {
  int exitCode = ToolRunner.run(new ClickStreamDriver(), args);
  System.exit(exitCode);
 }
}
Done. Next, to run this program, you can either run it from Eclipse or follow the steps below to run it from the terminal.

Step 8. Steps to execute our Custom Partitioner project
i. Start the Hadoop components: open your terminal and type
subodh@subodh-Inspiron-3520:~/software$ start-all.sh
ii. Verify that Hadoop has started, using the jps command
subodh@subodh-Inspiron-3520:~/software$ jps
8385 NameNode
8547 DataNode
5701 org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
9446 Jps
8918 ResourceManager
9054 NodeManager
8751 SecondaryNameNode
You can also verify through the web UI at "http://localhost:50070/explorer.html#/".
iii. Create an input folder on HDFS with the command below.
subodh@subodh-Inspiron-3520:~/software$ hadoop fs -mkdir /input
The above command will create an input folder on HDFS; you can verify it using the web UI or with hadoop fs -ls /input. Now it is time to move the input file that we need to process. Below is the command to copy the click_stream_data.txt input file into the input folder on HDFS.
subodh@subodh-Inspiron-3520:~$ hadoop fs -copyFromLocal /home/subodh/programs/input/click_stream_data.txt /input/
Note - the click_stream_data.txt file is available inside this project's source code; you can get it from the download link at the end of this tutorial.

Step 9. Create & Execute jar file
We are almost done. Now create the jar file of the CustomPartitioner source code; you can create it using Eclipse or with the mvn package command.
To execute the CustomPartitioner-1.0.jar file, use the command below:
hadoop jar /home/subodh/CustomPartitioner-1.0.jar com.javamakeuse.bd.poc.ClickStreamDriver /input /output
The above command will print output like the log below and create an output folder containing the results of the CustomPartitioner project, split into three part files.
16/03/13 23:06:00 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
16/03/13 23:06:00 INFO mapred.Merger: Merging 1 sorted segments
16/03/13 23:06:00 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 159098 bytes
16/03/13 23:06:00 INFO reduce.MergeManagerImpl: Merged 1 segments, 159108 bytes to disk to satisfy reduce memory limit
16/03/13 23:06:00 INFO reduce.MergeManagerImpl: Merging 1 files, 159112 bytes from disk
16/03/13 23:06:00 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/03/13 23:06:00 INFO mapred.Merger: Merging 1 sorted segments
16/03/13 23:06:00 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 159098 bytes
16/03/13 23:06:00 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/13 23:06:01 INFO mapreduce.Job: Job job_local1322927531_0001 running in uber mode : false
16/03/13 23:06:01 INFO mapreduce.Job:  map 100% reduce 67%
16/03/13 23:06:01 INFO mapred.Task: Task:attempt_local1322927531_0001_r_000002_0 is done. And is in the process of committing
16/03/13 23:06:01 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/13 23:06:01 INFO mapred.Task: Task attempt_local1322927531_0001_r_000002_0 is allowed to commit now
16/03/13 23:06:01 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1322927531_0001_r_000002_0' to hdfs://localhost:9000/outputcp/_temporary/0/task_local1322927531_0001_r_000002
16/03/13 23:06:01 INFO mapred.LocalJobRunner: reduce > reduce
16/03/13 23:06:01 INFO mapred.Task: Task 'attempt_local1322927531_0001_r_000002_0' done.
16/03/13 23:06:01 INFO mapred.LocalJobRunner: Finishing task: attempt_local1322927531_0001_r_000002_0
16/03/13 23:06:01 INFO mapred.LocalJobRunner: reduce task executor complete.
16/03/13 23:06:02 INFO mapreduce.Job:  map 100% reduce 100%
16/03/13 23:06:02 INFO mapreduce.Job: Job job_local1322927531_0001 completed successfully
16/03/13 23:06:02 INFO mapreduce.Job: Counters: 35
 File System Counters
  FILE: Number of bytes read=372966
  FILE: Number of bytes written=1937150
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=639892
  HDFS: Number of bytes written=164562
  HDFS: Number of read operations=38
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=16
 Map-Reduce Framework
  Map input records=101
  Map output records=101
  Map output bytes=158704
  Map output materialized bytes=159124
  Input split bytes=114
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=159124
  Reduce input records=101
  Reduce output records=101
  Spilled Records=202
  Shuffled Maps =3
  Failed Shuffles=0
  Merged Map outputs=3
  GC time elapsed (ms)=6
  Total committed heap usage (bytes)=1022361600
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=159973
 File Output Format Counters 
  Bytes Written=164562


Step 10. Verify the output
subodh@subodh-Inspiron-3520:~$ hadoop fs -ls /output
16/03/13 23:18:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r--   3 subodh supergroup          0 2016-03-13 23:06 /output/_SUCCESS
-rw-r--r--   3 subodh supergroup          0 2016-03-13 23:06 /output/part-r-00000
-rw-r--r--   3 subodh supergroup          0 2016-03-13 23:06 /output/part-r-00001
-rw-r--r--   3 subodh supergroup     164562 2016-03-13 23:06 /output/part-r-00002


That's it.

Download the complete example from here: Source Code
