The Combiner is one of the most powerful features of Hadoop MapReduce and is used as an optimization technique. In my previous post, we saw an example of a custom data type in Hadoop here, using the WritableComparable interface.
In this tutorial, I am going to show you an example of MapReduce programming using a Combiner, although we have already seen an example of MapReduce programming in our wordcount tutorial.


What is Combiner?
A Combiner in MapReduce programming is known as a local reducer (or mini-reducer). It reduces the amount of data transferred between the map and reduce phases, and is therefore one of the key tools for cutting network traffic between them. The Combiner runs on the output of the map tasks.
Note that the Combiner may not run at all, even if you set one in your MapReduce program, so treat it purely as an optimization and never rely on it for correctness. Whether it runs depends on spilling: when the map output never needs to spill to disk, the Combiner will not execute even though it is configured.
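This spill-dependent behavior can even be tuned. If I recall the property correctly, in Hadoop 2.x the Combiner is applied again during the merge of spill files only when the number of spills reaches mapreduce.map.combine.minspills (default 3). A sketch of setting it explicitly in mapred-site.xml:

```xml
<!-- mapred-site.xml: apply the combiner during the merge phase
     only when at least this many spill files exist (default: 3) -->
<property>
  <name>mapreduce.map.combine.minspills</name>
  <value>3</value>
</property>
```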

Note - Don't use a Combiner blindly, because it can produce wrong output when the aggregation is not safe to apply in stages, as with an average function. A Combiner should only be used for commutative and associative operations, such as sum, min, and max.
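To see why only commutative and associative operations are safe, consider averaging. This standalone sketch (a hypothetical illustration, not part of this project) shows that averaging partial averages, which is effectively what a Combiner would do, gives the wrong answer, while grouping a sum any way you like does not:

```java
import java.util.Arrays;

public class CombinerPitfall {
    // average of an array of values
    static double avg(double... v) {
        return Arrays.stream(v).average().orElse(0);
    }

    public static void main(String[] args) {
        // Sum is associative: combining partial sums is always safe.
        double total = (1 + 2) + (3 + 4 + 5); // 15, however you group it

        // Average is NOT associative: averaging partial averages is wrong.
        double correct = avg(1, 2, 3, 4, 5);               // 3.0
        double viaCombiner = avg(avg(1, 2), avg(3, 4, 5)); // avg(1.5, 4.0) = 2.75

        System.out.println(total);       // 15.0
        System.out.println(correct);     // 3.0
        System.out.println(viaCombiner); // 2.75 -- wrong!
    }
}
```

Our click-count job only sums 1s, so running the reducer as a Combiner is safe.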

OK, let's implement it in a MapReduce program. I am going to apply a Combiner in our click stream analysis program. We have a click stream data set with these values in tab-separated format:
  • timestamp
  • requested_url
  • referring_url
  • user_agent
  • country
  • mac_address
From this dataset, I am going to calculate the clicks by country using a Combiner.
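The mapper will pull the country out of column index 4 of each tab-separated record. As a quick standalone sketch of just that parsing step (the sample line below is invented for illustration, not taken from the real dataset):

```java
public class RecordParseDemo {
    // Extract the country field from a tab-separated click stream record:
    // timestamp, requested_url, referring_url, user_agent, country, mac_address
    static String country(String line) {
        String[] columns = line.split("\\t");
        // country sits at index 4, so we need more than 4 columns
        return columns.length > 4 ? columns[4] : null;
    }

    public static void main(String[] args) {
        // hypothetical record, fields separated by tabs
        String line = "1457462400\t/home\t/search\tMozilla/5.0\tIndia\taa:bb:cc:dd:ee:ff";
        System.out.println(country(line)); // India
    }
}
```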

Tools and Technologies we are using here:

  • Java 8
  • Eclipse Mars
  • Hadoop 2.7.1
  • Maven 3.3
  • Ubuntu 14 (Linux)

Overview of the project structure:
Main Objects of this project are:
  • ClickStreamDriver.java
  • ClickStreamReducer.java
  • ClickStreamMapper.java
  • pom.xml
Step 1. Create a new maven project
Go to the File menu, then New -> Maven Project, and provide the required details.

Step 2. Edit pom.xml
Double-click your project's pom.xml file; it will look like this, with very limited information:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.exampl</groupId>
  <artifactId>CombinerExample</artifactId>
  <version>1.0</version>
  <name>CombinerExample</name>
  <description>Combiner Example in Map Reduce</description>
</project>
Now edit this pom.xml file and add the Hadoop dependencies. Below is the complete pom.xml file.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.exampl</groupId>
  <artifactId>CombinerExample</artifactId>
  <version>1.0</version>
  <name>CombinerExample</name>
  <description>Combiner Example in Map Reduce</description>
  <dependencies>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.7.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.7.1</version>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
Step 3. ClickStreamMapper.java (Mapper)
This is our mapper class. It takes a LongWritable key and Text value as input and emits Text as the output key and IntWritable as the output value.
package com.javamakeuse.bd;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ClickStreamMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

 IntWritable count = new IntWritable(1);
 Text country = new Text();

 @Override
 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  String[] columns = value.toString().split("\\t");

  if (columns.length > 4) {
   country.set(columns[4]);
   // India 1
   context.write(country, count);
  }
 }
}
Step 4. ClickStreamReducer.java (Reducer)
This class also plays the role of the Combiner. No special implementation is required for a Combiner - it is just a Reducer class, which can be set as the Combiner on the job.
package com.javamakeuse.bd;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ClickStreamReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

 @Override
 public void reduce(Text key, Iterable<IntWritable> values, Context context)
   throws IOException, InterruptedException {

  int noOfClicks = 0;
  for (IntWritable value : values) {
   // calculating the number of clicks per country.
   noOfClicks += value.get();
  }
  // writing to the disk
  context.write(key, new IntWritable(noOfClicks));
 }
}
Step 5. ClickStreamDriver.java (Driver main)
This is the main class to run; it produces the output at the destination. In this class we also register the Combiner, via the job.setCombinerClass(...) call.
package com.javamakeuse.bd;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ClickStreamDriver extends Configured implements Tool {

 @Override
 public int run(String[] args) throws Exception {
  if (args.length != 2) {
   System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
   ToolRunner.printGenericCommandUsage(System.err);
   return -1;
  }

  // use getConf() so generic command-line options parsed by ToolRunner take effect
  Job job = Job.getInstance(getConf());
  job.setJarByClass(getClass());
  job.setJobName("ClickStream");
  //input path
  FileInputFormat.addInputPath(job, new Path(args[0]));
  
  // output path
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(ClickStreamMapper.class);
  job.setCombinerClass(ClickStreamReducer.class);
  job.setReducerClass(ClickStreamReducer.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  return job.waitForCompletion(true) ? 0 : 1;
 }

 public static void main(String[] args) throws Exception {
  int exitCode = ToolRunner.run(new ClickStreamDriver(), args);
  System.exit(exitCode);
 }
}
Step 6. Steps to execute our Click Stream Program
i. Start the Hadoop components: open your terminal and type
subodh@subodh-Inspiron-3520:~/software$ start-all.sh
ii. Verify that Hadoop started, using the jps command
subodh@subodh-Inspiron-3520:~/software$ jps
8385 NameNode
8547 DataNode
5701 org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
9446 Jps
8918 ResourceManager
9054 NodeManager
8751 SecondaryNameNode
You can also verify via the web UI at "http://localhost:50070/explorer.html#/".
iii. Create input folder on HDFS with below command.
subodh@subodh-Inspiron-3520:~/software$ hadoop fs -mkdir /input
The above command will create an input folder on HDFS; you can verify it using the web UI. Now it's time to move the input file that we need to process. Below is the command to copy the click_stream_data.txt input file into the input folder on HDFS.
subodh@subodh-Inspiron-3520:~$ hadoop fs -copyFromLocal /home/subodh/programs/input/click_stream_data.txt /input/
Note - The click_stream_data.txt file is available inside this project's source code; you can download it from the link at the end of this tutorial.

Step 7. Create & Execute jar file
We are almost done. Now create a jar file of the CombinerExample source code. You can create the jar file using Eclipse or with the mvn package command.
To execute the CombinerExample-1.0.jar file, use the command below:
hadoop jar /home/subodh/CombinerExample-1.0.jar com.javamakeuse.bd.ClickStreamDriver /input /output
This will print the output below and also create an output folder containing the results of the CombinerExample project.
16/03/08 23:36:20 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/03/08 23:36:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/08 23:36:21 INFO input.FileInputFormat: Total input paths to process : 1
16/03/08 23:36:21 INFO mapreduce.JobSubmitter: number of splits:1
16/03/08 23:36:21 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1706395962_0001
16/03/08 23:36:21 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/03/08 23:36:21 INFO mapreduce.Job: Running job: job_local1706395962_0001
16/03/08 23:36:21 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/03/08 23:36:21 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/08 23:36:21 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/03/08 23:36:21 INFO mapred.LocalJobRunner: Waiting for map tasks
16/03/08 23:36:21 INFO mapred.LocalJobRunner: Starting task: attempt_local1706395962_0001_m_000000_0
16/03/08 23:36:21 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/08 23:36:21 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/03/08 23:36:21 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/click_stream_data.txt:0+159973
16/03/08 23:36:22 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/03/08 23:36:22 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/03/08 23:36:22 INFO mapred.MapTask: soft limit at 83886080
16/03/08 23:36:22 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/03/08 23:36:22 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/03/08 23:36:22 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/03/08 23:36:22 INFO mapred.LocalJobRunner: 
16/03/08 23:36:22 INFO mapred.MapTask: Starting flush of map output
16/03/08 23:36:22 INFO mapred.MapTask: Spilling map output
16/03/08 23:36:22 INFO mapred.MapTask: bufstart = 0; bufend = 1120; bufvoid = 104857600
16/03/08 23:36:22 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26213996(104855984); length = 401/6553600
16/03/08 23:36:22 INFO mapred.MapTask: Finished spill 0
16/03/08 23:36:22 INFO mapred.Task: Task:attempt_local1706395962_0001_m_000000_0 is done. And is in the process of committing
16/03/08 23:36:22 INFO mapred.LocalJobRunner: map
16/03/08 23:36:22 INFO mapred.Task: Task 'attempt_local1706395962_0001_m_000000_0' done.
16/03/08 23:36:22 INFO mapred.LocalJobRunner: Finishing task: attempt_local1706395962_0001_m_000000_0
16/03/08 23:36:22 INFO mapred.LocalJobRunner: map task executor complete.
16/03/08 23:36:22 INFO mapred.LocalJobRunner: Waiting for reduce tasks
16/03/08 23:36:22 INFO mapred.LocalJobRunner: Starting task: attempt_local1706395962_0001_r_000000_0
16/03/08 23:36:22 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/08 23:36:22 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/03/08 23:36:22 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@30d6304e
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10
16/03/08 23:36:22 INFO reduce.EventFetcher: attempt_local1706395962_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
16/03/08 23:36:22 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1706395962_0001_m_000000_0 decomp: 63 len: 67 to MEMORY
16/03/08 23:36:22 INFO reduce.InMemoryMapOutput: Read 63 bytes from map-output for attempt_local1706395962_0001_m_000000_0
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 63, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->63
16/03/08 23:36:22 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
16/03/08 23:36:22 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
16/03/08 23:36:22 INFO mapred.Merger: Merging 1 sorted segments
16/03/08 23:36:22 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 55 bytes
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: Merged 1 segments, 63 bytes to disk to satisfy reduce memory limit
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: Merging 1 files, 67 bytes from disk
16/03/08 23:36:22 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/03/08 23:36:22 INFO mapred.Merger: Merging 1 sorted segments
16/03/08 23:36:22 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 55 bytes
16/03/08 23:36:22 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/08 23:36:22 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/03/08 23:36:22 INFO mapreduce.Job: Job job_local1706395962_0001 running in uber mode : false
16/03/08 23:36:22 INFO mapreduce.Job:  map 100% reduce 0%
16/03/08 23:36:23 INFO mapred.Task: Task:attempt_local1706395962_0001_r_000000_0 is done. And is in the process of committing
16/03/08 23:36:23 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/08 23:36:23 INFO mapred.Task: Task attempt_local1706395962_0001_r_000000_0 is allowed to commit now
16/03/08 23:36:23 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1706395962_0001_r_000000_0' to hdfs://localhost:9000/user/subodh/outputc/_temporary/0/task_local1706395962_0001_r_000000
16/03/08 23:36:23 INFO mapred.LocalJobRunner: reduce > reduce
16/03/08 23:36:23 INFO mapred.Task: Task 'attempt_local1706395962_0001_r_000000_0' done.
16/03/08 23:36:23 INFO mapred.LocalJobRunner: Finishing task: attempt_local1706395962_0001_r_000000_0
16/03/08 23:36:23 INFO mapred.LocalJobRunner: reduce task executor complete.
16/03/08 23:36:23 INFO mapreduce.Job:  map 100% reduce 100%
16/03/08 23:36:23 INFO mapreduce.Job: Job job_local1706395962_0001 completed successfully
16/03/08 23:36:23 INFO mapreduce.Job: Counters: 35
 File System Counters
  FILE: Number of bytes read=11380
  FILE: Number of bytes written=565119
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=319946
  HDFS: Number of bytes written=47
  HDFS: Number of read operations=13
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=4
 Map-Reduce Framework
  Map input records=101
  Map output records=101
  Map output bytes=1120
  Map output materialized bytes=67
  Input split bytes=114
  Combine input records=101
  Combine output records=4
  Reduce input groups=4
  Reduce shuffle bytes=67
  Reduce input records=4
  Reduce output records=4
  Spilled Records=8
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=5
  Total committed heap usage (bytes)=526385152
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=159973
 File Output Format Counters 
  Bytes Written=47
Because we are using a Combiner, the reducer receives locally aggregated values from the map side. Without a Combiner, the mapper would emit one record per click, something like this:
China 1
China 1
China 1
Pakistan 1
Pakistan 1
Pakistan 1
Pakistan 1
United States 1
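With the Combiner in place, those per-record 1s are summed on the map side before the shuffle, so the reducer receives one pre-aggregated pair per country per map task. A plain-Java sketch of that local aggregation (the sample keys are invented for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalAggregation {
    // Simulates what the combiner does with one mapper's output:
    // many (country, 1) pairs collapse into one (country, count) pair each.
    static Map<String, Integer> combine(String[] mapperOutputKeys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String country : mapperOutputKeys) {
            counts.merge(country, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] keys = { "China", "China", "China", "Pakistan", "Pakistan",
                          "Pakistan", "Pakistan", "United States" };
        // 8 map output records shrink to 3 combiner output records
        System.out.println(combine(keys)); // {China=3, Pakistan=4, United States=1}
    }
}
```

This same shrinkage is visible in the counters above: Combine input records=101 became Combine output records=4.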

Step 8. Verify output
You can verify the output inside the output folder.
subodh@subodh-Inspiron-3520:~/programs$ hadoop fs -cat /output/pa*
16/03/08 23:52:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
China 84
Pakistan 4
United States 12

That's it :)

Download the complete example source code from here: Source Code
