In this tutorial, I am going to show you, step by step, how to create a hello world program (word count in MapReduce) in Hadoop. The word count program is the most basic program, and of course the first one to run, just like a hello world program in any other language.

If you are very new to big data technologies, then you can go back to my previous post for an overview of Big Data: What is Big Data?

Prerequisites for this Tutorial-

Tools and Technologies we are using here:

  • Java 8
  • Eclipse Mars
  • Hadoop 2.7.1
  • Maven 3.3
  • Ubuntu 14 (Linux OS)
Overview of the project structure-

Input file(sample.txt) contents-
apple mango banana papaya apple  banana papaya apple mango banana papaya apple mango banana papaya apple mango banana papaya 
apple mango banana papaya apple mango banana papaya apple mango banana 
OUTPUT:

apple 8
banana 8
mango 7
papaya 7

Main Objects of this project are:
  • WordCountMapper.java (mapper class)
  • WordCountReducer.java (reducer class)
  • WordCountDriver.java (main class) 
  • pom.xml
Step 1. Create a new maven project
Go to the File menu, then New -> Maven Project, and provide the required details.
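If you prefer the command line to the Eclipse wizard, an equivalent project skeleton can be generated with Maven's quickstart archetype, using the same groupId and artifactId that appear in the pom.xml below:
mvn archetype:generate -DgroupId=com.javamakeuse.hadoop.poc -DartifactId=WordCountExample -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false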
Step 2. Edit pom.xml
Double-click on your project's pom.xml file; it will look like this, with very limited information.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.hadoop.poc</groupId>
  <artifactId>WordCountExample</artifactId>
  <version>1.0</version>
  <name>WordCountExample</name>
  <description>WordCount example in map reduce</description>
</project>
Now add the Hadoop dependency entries to your pom.xml; paste the code below inside the project tag.
<dependencies>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.7.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.7.1</version>
  </dependency>
 </dependencies>
Here is the complete pom.xml file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.hadoop.poc</groupId>
  <artifactId>WordCountExample</artifactId>
  <version>1.0</version>
  <name>WordCountExample</name>
  <description>WordCount example in map reduce</description>
  <dependencies>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.7.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.7.1</version>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
Step 3. WordCountMapper.java (Mapper class)
This is our mapper class, which processes an Object as the key and Text as the value, and generates output with Text as the key and IntWritable as the value.

package com.javamakeuse.hadoop.poc.mapper;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

 @Override
 protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
   throws IOException, InterruptedException {

  // splitting the value (one line of the input file) into tokens
  StringTokenizer strTokens = new StringTokenizer(value.toString());

  // iterating over the tokens
  while (strTokens.hasMoreTokens()) {

   // the next word from the input line
   String word = strTokens.nextToken();

   // writing the word into context with occurrence as 1
   // example: apple 1, mango 1, apple 1
   context.write(new Text(word), new IntWritable(1));
  }

 }
 @Override
 protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context)
   throws IOException, InterruptedException {
  System.out.println("calls only once at startup");
 }
 @Override
 protected void cleanup(Mapper<Object, Text, Text, IntWritable>.Context context)
   throws IOException, InterruptedException {
  System.out.println("calls only once at end");
 }

}
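To see what the tokenizing step does in isolation, here is a minimal standalone sketch (TokenizerDemo is a name used only for this illustration; it is not part of the project):
import java.util.StringTokenizer;

public class TokenizerDemo {
 public static void main(String[] args) {
  // splits the line on whitespace, exactly what the mapper does for each input value
  StringTokenizer tokens = new StringTokenizer("apple mango banana papaya");
  while (tokens.hasMoreTokens()) {
   // prints apple, mango, banana, papaya (one per line)
   System.out.println(tokens.nextToken());
  }
 }
}
So for each input line, the mapper emits one (word, 1) pair per token, for example (apple, 1), (mango, 1), (banana, 1), (papaya, 1).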
Step 4. WordCountReducer.java (Reducer class)
This is our reducer class, which processes the output generated by the mapper and computes the final count for each word.
package com.javamakeuse.hadoop.poc.reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

 @Override
 protected void reduce(Text key, Iterable<IntWritable> values,
   Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

  int noOfFrequency = 0;
  for (IntWritable occurrence : values) {
   noOfFrequency += occurrence.get();
  }
  context.write(key, new IntWritable(noOfFrequency));
 }

}
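To make the grouping concrete: the mapper emits (apple, 1) eight times across the two input lines, so the framework groups those pairs and calls reduce once with key apple and values [1, 1, 1, 1, 1, 1, 1, 1]; the loop sums them to 8 and the reducer writes (apple, 8).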
Step 5. WordCountDriver.java (Main class to run map reduce program)
This is the main class, which configures the job and wires up the mapper and reducer classes.
package com.javamakeuse.hadoop.poc.driver;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.javamakeuse.hadoop.poc.mapper.WordCountMapper;
import com.javamakeuse.hadoop.poc.reducer.WordCountReducer;

public class WordCountDriver {
 public static void main(String[] args) {
  if (args.length < 2) {
   System.err.println("Usage: WordCountDriver <input path> <output path>");
   System.exit(-1);
  }

  try {
   Job job = Job.getInstance();
   job.setJobName("Word Count");

   // set file input/output path
   FileInputFormat.addInputPath(job, new Path(args[0]));
   FileOutputFormat.setOutputPath(job, new Path(args[1]));

   // set jar class name
   job.setJarByClass(WordCountDriver.class);

   // set mapper and reducer to job
   job.setMapperClass(WordCountMapper.class);
   job.setReducerClass(WordCountReducer.class);

   // set output key class
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(IntWritable.class);

   int returnValue = job.waitForCompletion(true) ? 0 : 1;
   System.out.println(job.isSuccessful());

   System.exit(returnValue);

  } catch (IOException | ClassNotFoundException | InterruptedException e) {
   e.printStackTrace();
  }
 }
}
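When you run the driver, the job logs may warn: "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this." This is optional for our purposes, but here is a minimal sketch of the same driver written with ToolRunner (WordCountTool is a name used only for this illustration; it is not part of the downloadable source):
package com.javamakeuse.hadoop.poc.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.javamakeuse.hadoop.poc.mapper.WordCountMapper;
import com.javamakeuse.hadoop.poc.reducer.WordCountReducer;

public class WordCountTool extends Configured implements Tool {

 @Override
 public int run(String[] args) throws Exception {
  // getConf() carries any -D options parsed by ToolRunner
  Job job = Job.getInstance(getConf(), "Word Count");
  job.setJarByClass(WordCountTool.class);
  job.setMapperClass(WordCountMapper.class);
  job.setReducerClass(WordCountReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  return job.waitForCompletion(true) ? 0 : 1;
 }

 public static void main(String[] args) throws Exception {
  // ToolRunner parses generic options (e.g. -D key=value) before calling run()
  System.exit(ToolRunner.run(new WordCountTool(), args));
 }
}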
Step 6. Steps to execute our Map-Reduce Program
i. Start the Hadoop components: open your terminal and type
subodh@subodh-Inspiron-3520:~/software$ start-all.sh
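Note: in Hadoop 2.x the start-all.sh script is deprecated; it still works, but the equivalent is to start HDFS and YARN with the two separate scripts:
subodh@subodh-Inspiron-3520:~/software$ start-dfs.sh
subodh@subodh-Inspiron-3520:~/software$ start-yarn.sh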
ii. Verify whether Hadoop started with the jps command
subodh@subodh-Inspiron-3520:~/software$ jps
8385 NameNode
8547 DataNode
5701 org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
9446 Jps
8918 ResourceManager
9054 NodeManager
8751 SecondaryNameNode
You can also verify it with the web UI, using the "http://localhost:50070/explorer.html#/" URL.
iii. Create an input folder on HDFS with the below command.
subodh@subodh-Inspiron-3520:~/software$ hadoop fs -mkdir /input
The above command will create an input folder on HDFS; you can verify it using the web UI. Now it's time to move the input file that we need to process. Below is the command to copy the sample.txt input file into the input folder on HDFS.
subodh@subodh-Inspiron-3520:~$ hadoop fs -copyFromLocal /home/subodh/programs/input/sample.txt /input/
Note - you can create the sample.txt file with any words, or of course you can download it from this tutorial; at the end you will find a download link.
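For example, a small sample.txt can be created directly from the terminal (the words below are just an illustration; use any you like):
subodh@subodh-Inspiron-3520:~$ echo "apple mango banana papaya apple mango banana" > /home/subodh/programs/input/sample.txt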

Step 7. Create & Execute jar file
We are almost done. Now create a jar file of the WordCountExample source code. You can create the jar file using Eclipse or by using the mvn package command.
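For example, from the project's root directory (the one containing pom.xml; the path shown is just an illustration), Maven will build the jar into the target folder as target/WordCountExample-1.0.jar:
subodh@subodh-Inspiron-3520:~/workspace/WordCountExample$ mvn clean package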
To execute the WordCountExample-1.0.jar file, use the below command:
hadoop jar /home/subodh/WordCountExample-1.0.jar com.javamakeuse.hadoop.poc.driver.WordCountDriver /input /output
The above command will generate the below output and also create an output folder with the result of the WordCountExample program.
16/01/26 22:52:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/26 22:52:26 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/01/26 22:52:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/01/26 22:52:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/01/26 22:52:26 INFO input.FileInputFormat: Total input paths to process : 1
16/01/26 22:52:26 INFO mapreduce.JobSubmitter: number of splits:1
16/01/26 22:52:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local932083413_0001
16/01/26 22:52:27 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/01/26 22:52:27 INFO mapreduce.Job: Running job: job_local932083413_0001
16/01/26 22:52:27 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/01/26 22:52:27 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/01/26 22:52:27 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/01/26 22:52:27 INFO mapred.LocalJobRunner: Waiting for map tasks
16/01/26 22:52:27 INFO mapred.LocalJobRunner: Starting task: attempt_local932083413_0001_m_000000_0
16/01/26 22:52:27 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/01/26 22:52:27 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/01/26 22:52:27 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/sample.txt:0+197
16/01/26 22:52:27 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/01/26 22:52:27 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/01/26 22:52:27 INFO mapred.MapTask: soft limit at 83886080
16/01/26 22:52:27 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/01/26 22:52:27 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/01/26 22:52:27 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
calls only once at startup
calls only once at end
16/01/26 22:52:27 INFO mapred.LocalJobRunner: 
16/01/26 22:52:27 INFO mapred.MapTask: Starting flush of map output
16/01/26 22:52:27 INFO mapred.MapTask: Spilling map output
16/01/26 22:52:27 INFO mapred.MapTask: bufstart = 0; bufend = 315; bufvoid = 104857600
16/01/26 22:52:27 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214280(104857120); length = 117/6553600
16/01/26 22:52:27 INFO mapred.MapTask: Finished spill 0
16/01/26 22:52:27 INFO mapred.Task: Task:attempt_local932083413_0001_m_000000_0 is done. And is in the process of committing
16/01/26 22:52:27 INFO mapred.LocalJobRunner: map
16/01/26 22:52:27 INFO mapred.Task: Task 'attempt_local932083413_0001_m_000000_0' done.
16/01/26 22:52:27 INFO mapred.LocalJobRunner: Finishing task: attempt_local932083413_0001_m_000000_0
16/01/26 22:52:27 INFO mapred.LocalJobRunner: map task executor complete.
16/01/26 22:52:27 INFO mapred.LocalJobRunner: Waiting for reduce tasks
16/01/26 22:52:27 INFO mapred.LocalJobRunner: Starting task: attempt_local932083413_0001_r_000000_0
16/01/26 22:52:27 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/01/26 22:52:27 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/01/26 22:52:27 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@6b374b09
16/01/26 22:52:27 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10
16/01/26 22:52:27 INFO reduce.EventFetcher: attempt_local932083413_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
16/01/26 22:52:27 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local932083413_0001_m_000000_0 decomp: 377 len: 381 to MEMORY
16/01/26 22:52:27 INFO reduce.InMemoryMapOutput: Read 377 bytes from map-output for attempt_local932083413_0001_m_000000_0
16/01/26 22:52:27 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 377, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->377
16/01/26 22:52:27 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
16/01/26 22:52:27 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/01/26 22:52:27 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
16/01/26 22:52:27 INFO mapred.Merger: Merging 1 sorted segments
16/01/26 22:52:27 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 369 bytes
16/01/26 22:52:27 INFO reduce.MergeManagerImpl: Merged 1 segments, 377 bytes to disk to satisfy reduce memory limit
16/01/26 22:52:27 INFO reduce.MergeManagerImpl: Merging 1 files, 381 bytes from disk
16/01/26 22:52:27 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/01/26 22:52:27 INFO mapred.Merger: Merging 1 sorted segments
16/01/26 22:52:27 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 369 bytes
16/01/26 22:52:27 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/01/26 22:52:27 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/01/26 22:52:27 INFO mapred.Task: Task:attempt_local932083413_0001_r_000000_0 is done. And is in the process of committing
16/01/26 22:52:27 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/01/26 22:52:27 INFO mapred.Task: Task attempt_local932083413_0001_r_000000_0 is allowed to commit now
16/01/26 22:52:27 INFO output.FileOutputCommitter: Saved output of task 'attempt_local932083413_0001_r_000000_0' to hdfs://localhost:9000/output/_temporary/0/task_local932083413_0001_r_000000
16/01/26 22:52:27 INFO mapred.LocalJobRunner: reduce > reduce
16/01/26 22:52:27 INFO mapred.Task: Task 'attempt_local932083413_0001_r_000000_0' done.
16/01/26 22:52:27 INFO mapred.LocalJobRunner: Finishing task: attempt_local932083413_0001_r_000000_0
16/01/26 22:52:27 INFO mapred.LocalJobRunner: reduce task executor complete.
16/01/26 22:52:28 INFO mapreduce.Job: Job job_local932083413_0001 running in uber mode : false
16/01/26 22:52:28 INFO mapreduce.Job:  map 100% reduce 100%
16/01/26 22:52:28 INFO mapreduce.Job: Job job_local932083413_0001 completed successfully
16/01/26 22:52:28 INFO mapreduce.Job: Counters: 35
 File System Counters
  FILE: Number of bytes read=13358
  FILE: Number of bytes written=562883
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=394
  HDFS: Number of bytes written=34
  HDFS: Number of read operations=13
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=4
 Map-Reduce Framework
  Map input records=2
  Map output records=30
  Map output bytes=315
  Map output materialized bytes=381
  Input split bytes=103
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=381
  Reduce input records=30
  Reduce output records=4
  Spilled Records=60
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=6
  Total committed heap usage (bytes)=497549312
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=197
 File Output Format Counters 
  Bytes Written=34
true

Step 8. Verify output
Now it's time to verify the output of the WordCountExample program. The above command created an output folder on HDFS, and inside that folder it created two files, _SUCCESS and part-r-00000. Let's see the output using the below cat command.
subodh@subodh-Inspiron-3520:~$ hadoop fs -cat /output/part-r-00000
16/01/26 23:07:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
apple 8
banana 8
mango 7
papaya 7
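You can also list the output folder to confirm that both _SUCCESS and part-r-00000 were created:
subodh@subodh-Inspiron-3520:~$ hadoop fs -ls /output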

You can also verify it on the web UI at "http://localhost:50070/explorer.html#/output".


Note - To verify your logic, you can also run the WordCountExample directly from Eclipse; supply the input and output paths as program arguments in the run configuration.

Now that you have executed the word count in MapReduce, the next step is to understand the internals of MapReduce, such as Hadoop data types and how MapReduce works.
Done :)

Download the complete example from here: Source Code


