Joins are an important aspect of any database, and Hadoop MapReduce also supports joining multiple datasets. Joins come into play whenever data has to be assembled from more than one table or dataset, for example fetching users' activities on social media, where the user and user_activity datasets must be combined to extract the information. We have already covered some of the most popular features of Hadoop MapReduce, such as Custom Data Types and Partitioner in MapReduce.

Joins in Hadoop MapReduce
Hadoop MapReduce supports two types of joins -
  • Map side join
  • Reduce side join
In this tutorial, I am going to show you an example of a reduce side join.
Reduce Side Join
A reduce side join is very simple and easy to implement compared to a map side join, but it is also the more expensive of the two, because both datasets need to go through the shuffle & sort phase; for more about the internals of MapReduce and how it works, see how MapReduce works. A reduce side join requires some additional code, implemented in combination with a secondary sort, so do look at an example of secondary sort as well. Below are the steps needed to achieve a reduce side join; a short illustration of the resulting ordering follows the list.
  1. The key of the mapper output must be the join key (such as the primary key), so that records with the same key go to the same reducer.
  2. While processing the datasets in the mapper phase, each record must be tagged with an identifier so that the two datasets can be told apart (the master dataset must reach the reducer before the dataset being joined to it).
  3. A secondary sort must be implemented to guarantee the order in which the values reach the reducer.
  4. If the input datasets are in different formats, then multiple mappers must be used with MultipleInputs to process the multiple input files.
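Put together, these rules mean that for any single user the reducer receives the user.log record first and the activity records after it. For example (with an illustrative id), the group for userId 101 arrives as the key (101, tag 1) carrying the user name, followed by (101, tag 2) once per activity record.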
In this tutorial, I am going to show you an example of a reduce side join in the MapReduce framework. OK, let's find the activities users performed on social media, such as what action a user took - commenting on a post, sharing something, and so on.
For this we have two different log files -
  • user.log (tab separated)
  • user_activity.log (pipe | separated)
Note: the input datasets used in this project are available inside the project source code, in an input folder, and can be downloaded from the download link given below.

Here is the tabular view of these datasets (a rough sketch of the record layout follows):
1. user.log
2. user_activity.log
3. Expected output
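As a rough sketch, the record layout that the mappers and the reducer below assume is the following (field names other than userId are illustrative placeholders):

user.log          : userId <TAB> userName
user_activity.log : field0 | userId | activityField1 | activityField2 [| ...]
Expected output   : userId <TAB> userName <TAB> activityField1 <TAB> activityField2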

Tools and Technologies we are using here:

  • Java 8
  • Eclipse Mars
  • Hadoop 2.7.1
  • Maven 3.3
  • Ubuntu 14(Linux OS)
Main Objects of this project are:
  • CustomKey.java (composite key custom datatype)
  • KeyPartitioner.java (custom partitioner)
  • GroupComparator.java (comparator)
  • UserMapper.java (for the tab separator)
  • UserActivityMapper.java (for the pipe | separator)
  • UserActivityReducer.java
  • UserActivityDriver.java (with multiple input paths)
Step 1. Create a new maven project
Go to the File menu, then New -> Maven Project, and provide the required details; see the screen attached below.
Step 2. Edit pom.xml
Double click on your project's pom.xml file; it will look like this, with very limited information.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.poc</groupId>
  <artifactId>ReduceSideJoin</artifactId>
  <version>1.0</version>
  <name>ReduceSideJoin</name>
  <description>ReduceSideJoin Example in Map Reduce</description>
</project>
Now edit this pom.xml file and add the Hadoop dependencies. Below is the complete pom.xml file; just copy and paste it and it will work.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.poc</groupId>
  <artifactId>ReduceSideJoin</artifactId>
  <version>1.0</version>
  <name>ReduceSideJoin</name>
  <description>ReduceSideJoin Example</description>
  <dependencies>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.7.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.7.1</version>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
Step 3. CustomKey.java
CustomKey is a custom datatype used as the key in the mapper and reducer phases. It contains userId and dataSetType, which identifies which dataset a record came from (1 = user.log, 2 = user_activity.log). Its compareTo() sorts first on userId and then on dataSetType, so for a given user the user.log record (tag 1) is ordered ahead of the user_activity.log records (tag 2); this is the secondary sort that guarantees the reducer sees the user name first.
package com.javamakeuse.bd.poc.util;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class CustomKey implements WritableComparable<CustomKey> {
 private Integer userId;
 private Integer dataSetType;

 public CustomKey() {
 }
 public CustomKey(Integer userId, Integer dataSetType) {
  super();
  this.userId = userId;
  this.dataSetType = dataSetType;
 }
 public Integer getUserId() {
  return userId;
 }
 public Integer getDataSetType() {
  return dataSetType;
 }
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeInt(userId);
  out.writeInt(dataSetType);
 }
 @Override
 public void readFields(DataInput in) throws IOException {
  userId = in.readInt();
  dataSetType = in.readInt();
 }
 @Override
 public int hashCode() {
  final int prime = 31;
  int result = 1;
  result = prime * result + ((dataSetType == null) ? 0 : dataSetType.hashCode());
  result = prime * result + ((userId == null) ? 0 : userId.hashCode());
  return result;
 }
 @Override
 public boolean equals(Object obj) {
  if (this == obj)
   return true;
  if (obj == null)
   return false;
  if (getClass() != obj.getClass())
   return false;
  CustomKey other = (CustomKey) obj;
  if (dataSetType == null) {
   if (other.dataSetType != null)
    return false;
  } else if (!dataSetType.equals(other.dataSetType))
   return false;
  if (userId == null) {
   if (other.userId != null)
    return false;
  } else if (!userId.equals(other.userId))
   return false;
  return true;
 }
 @Override
 public int compareTo(CustomKey o) {
  int returnValue = compare(userId, o.getUserId());
  if (returnValue != 0) {
   return returnValue;
  }
  return compare(dataSetType, o.getDataSetType());
 }
 public static int compare(int k1, int k2) {
  return (k1 < k2 ? -1 : (k1 == k2 ? 0 : 1));
 }
 @Override
 public String toString() {
  return "CustomKey [userId=" + userId + ", dataSetType=" + dataSetType + "]";
 }

}
Step 4. GroupComparator.java
Groups the values arriving at the reducer by the natural key (userId) alone, ignoring dataSetType, so that a user record and all of its activity records are delivered to a single reduce() call.
package com.javamakeuse.bd.poc.util;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class GroupComparator extends WritableComparator {
 protected GroupComparator() {
  super(CustomKey.class, true);
 }

 @Override
 public int compare(WritableComparable w1, WritableComparable w2) {
  CustomKey ck = (CustomKey) w1;
  CustomKey ck2 = (CustomKey) w2;
  return CustomKey.compare(ck.getUserId(), ck2.getUserId());
 }
}
Step 5. KeyPartitioner.java
Custom partitioner to control how the intermediate output of the mappers is distributed across reducers (see the custom partitioner tutorial). It partitions on userId alone, so records from both datasets for the same user always land on the same reducer; the default partitioner would hash the whole composite key and could send them to different reducers.
package com.javamakeuse.bd.poc.util;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class KeyPartitioner extends Partitioner<CustomKey, Text> {

 @Override
 public int getPartition(CustomKey key, Text value, int numPartitions) {
  return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
 }

}
Step 6. UserMapper.java
Mapper class to process the tab separated user.log dataset, which is treated as the master dataset for this project; here we tag the key with 1 for user.log records.
package com.javamakeuse.bd.poc.mapper;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.javamakeuse.bd.poc.util.CustomKey;

public class UserMapper extends Mapper<LongWritable, Text, CustomKey, Text> {
 @Override
 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CustomKey, Text>.Context context)
   throws IOException, InterruptedException {
  String[] columns = value.toString().split("\t");
  int userId = Integer.parseInt(columns[0]);
  // dataSetType 1= user.log data
  context.write(new CustomKey(userId, 1), new Text(columns[1]));
 }
}
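Note that this mapper assumes every line of user.log has at least two tab separated columns with a numeric user id in the first one; a malformed line would fail with a NumberFormatException or ArrayIndexOutOfBoundsException, so on a messier dataset you might want a length check similar to the one used in UserActivityMapper below.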
Step 7. UserActivityMapper.java
Mapper class to process the pipe (|) separated user_activity.log dataset; here we tag the key with 2 for user_activity.log records.
package com.javamakeuse.bd.poc.mapper;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.javamakeuse.bd.poc.util.CustomKey;

public class UserActivityMapper extends Mapper<LongWritable, Text, CustomKey, Text> {

 @Override
 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CustomKey, Text>.Context context)
   throws IOException, InterruptedException {

  String[] columns = value.toString().split("\\|");

  if (columns != null && columns.length > 3) {
   // dataSetType 2=user_activity.log data
   context.write(new CustomKey(Integer.parseInt(columns[1]), 2), new Text(columns[2] + "\t" + columns[3]));
  }
 }
}
Step 8. UserActivityReducer.java
This is our reducer class, and here we join the two datasets. We extract the first record from the values, which is the user data: because we tagged user records with 1 and user_activity records with 2, the first value for a key is the user name and the remaining values are the user_activity data.
package com.javamakeuse.bd.poc.reducer;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.javamakeuse.bd.poc.util.CustomKey;

public class UserActivityReducer extends Reducer<CustomKey, Text, IntWritable, Text> {
 @Override
 protected void reduce(CustomKey key, Iterable<Text> values,
   Reducer<CustomKey, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {
  Iterator<Text> itr = values.iterator();
  Text userName = new Text(itr.next());
  while (itr.hasNext()) {
   Text activityInfo = itr.next();
   Text userActivityInfo = new Text(userName.toString() + "\t" + activityInfo.toString());
   context.write(new IntWritable(key.getUserId()), userActivityInfo);
  }
 }
}
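This reducer assumes that every user id present in user_activity.log also has a matching record in user.log; if that is not guaranteed in your data, the first activity value would be mistaken for the user name. A minimal defensive variant of the reduce() method, assuming such orphan activity records are possible, could check the tag of the first key of the group before consuming the first value:

 @Override
 protected void reduce(CustomKey key, Iterable<Text> values,
   Reducer<CustomKey, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {
  Iterator<Text> itr = values.iterator();
  // before iterating, the key still belongs to the first value of the group,
  // so dataSetType == 1 means a user.log record is present for this user
  String userName = "UNKNOWN"; // placeholder for users missing from user.log (illustrative choice)
  if (key.getDataSetType() == 1 && itr.hasNext()) {
   userName = itr.next().toString();
  }
  while (itr.hasNext()) {
   Text activityInfo = itr.next();
   context.write(new IntWritable(key.getUserId()), new Text(userName + "\t" + activityInfo.toString()));
  }
 }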
Step 9. UserActivityDriver.java
Our main class to run the reduce side join example. Here we use MultipleInputs to register a separate mapper for each input path, and we also set the partitioner and the grouping comparator on the job instance.
package com.javamakeuse.bd.poc.driver;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.javamakeuse.bd.poc.mapper.UserActivityMapper;
import com.javamakeuse.bd.poc.mapper.UserMapper;
import com.javamakeuse.bd.poc.reducer.UserActivityReducer;
import com.javamakeuse.bd.poc.util.CustomKey;
import com.javamakeuse.bd.poc.util.GroupComparator;
import com.javamakeuse.bd.poc.util.KeyPartitioner;

public class UserActivityDriver extends Configured implements Tool {

 public static void main(String[] args) {
  try {
   int status = ToolRunner.run(new UserActivityDriver(), args);
   System.exit(status);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 public int run(String[] args) throws Exception {
  if (args.length != 3) {
   System.err.printf("Usage: %s [generic options] <user_activity input> <user input> <output>\n", getClass().getSimpleName());
   ToolRunner.printGenericCommandUsage(System.err);
   return -1;
  }
  Job job = Job.getInstance();
  job.setJarByClass(getClass());
  job.setJobName("ReduceSideJoin Example");
  // input paths
  MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserActivityMapper.class);
  MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, UserMapper.class);

  // output path
  FileOutputFormat.setOutputPath(job, new Path(args[2]));

  job.setReducerClass(UserActivityReducer.class);

  job.setPartitionerClass(KeyPartitioner.class);
  job.setGroupingComparatorClass(GroupComparator.class);

  job.setMapOutputKeyClass(CustomKey.class);
  job.setMapOutputValueClass(Text.class);

  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(Text.class);

  return job.waitForCompletion(true) ? 0 : 1;
 }

}
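Note that the driver wires args[0] to UserActivityMapper and args[1] to UserMapper, so on the command line the user_activity.log path must come first, then the user.log path, then the output directory (as in the hadoop jar command in Step 11). Also, the "Hadoop command-line option parsing not performed" warning visible in the job log below is most likely because the job is created with Job.getInstance() rather than Job.getInstance(getConf()); passing getConf() would forward the generic options parsed by ToolRunner to the job.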
Done. Next, to run this program: you can run it from Eclipse as well, but below are the steps to run it from the terminal.

Step 10. Steps to execute ReduceSideJoin project
i. Start the Hadoop components; open your terminal and type
subodh@subodh-Inspiron-3520:~/software$ start-all.sh
ii. Verify whether Hadoop has started, using the jps command
subodh@subodh-Inspiron-3520:~/software$ jps
8385 NameNode
8547 DataNode
5701 org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
9446 Jps
8918 ResourceManager
9054 NodeManager
8751 SecondaryNameNode
You can also verify through the web UI, using the "http://localhost:50070/explorer.html#/" URL.
iii. Create an input folder on HDFS using the command below.
subodh@subodh-Inspiron-3520:~/software$ hadoop fs -mkdir /input
The above command will create an input folder on HDFS; you can verify it using the web UI. Now it is time to move the input files that we need to process. Below is the command to copy the user.log and user_activity.log input files into the input folder on HDFS.
subodh@subodh-Inspiron-3520:~$ hadoop fs -copyFromLocal /home/subodh/programs/input/user.log /home/subodh/programs/input/user_activity.log /input/
Note - the user.log and user_activity.log files are available inside this project's source code; you will be able to download them from the download link at the end of this tutorial.

Step 11. Create & Execute jar file
We are almost done; now create the jar file of the ReduceSideJoin source code. You can create the jar file using Eclipse or by using the mvn package command, as shown below.
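From the project's root directory (the folder containing pom.xml), the standard Maven build below should place ReduceSideJoin-1.0.jar inside the target folder:
mvn clean package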
To execute the ReduceSideJoin-1.0.jar file, use the command below.
hadoop jar /home/subodh/ReduceSideJoin-1.0.jar com.javamakeuse.bd.poc.driver.UserActivityDriver /input/user_activity.log /input/user.log /output
The above command will print output like the following and will also create an output folder containing the result of the ReduceSideJoin project.
16/03/27 22:08:31 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/27 22:08:31 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/03/27 22:08:31 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/03/27 22:08:32 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/27 22:08:32 INFO input.FileInputFormat: Total input paths to process : 1
16/03/27 22:08:32 INFO input.FileInputFormat: Total input paths to process : 1
16/03/27 22:08:32 INFO mapreduce.JobSubmitter: number of splits:2
16/03/27 22:08:32 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local574821361_0001
16/03/27 22:08:32 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/03/27 22:08:32 INFO mapreduce.Job: Running job: job_local574821361_0001
16/03/27 22:08:32 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/03/27 22:08:32 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/27 22:08:32 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/03/27 22:08:32 INFO mapred.LocalJobRunner: Waiting for map tasks
16/03/27 22:08:32 INFO mapred.LocalJobRunner: Starting task: attempt_local574821361_0001_m_000000_0
16/03/27 22:08:33 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/27 22:08:33 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/03/27 22:08:33 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/user_activity.log:0+641
16/03/27 22:08:33 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/03/27 22:08:33 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/03/27 22:08:33 INFO mapred.MapTask: soft limit at 83886080
16/03/27 22:08:33 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/03/27 22:08:33 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/03/27 22:08:33 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/03/27 22:08:33 INFO mapred.LocalJobRunner: 
16/03/27 22:08:33 INFO mapred.MapTask: Starting flush of map output
16/03/27 22:08:33 INFO mapred.MapTask: Spilling map output
16/03/27 22:08:33 INFO mapred.MapTask: bufstart = 0; bufend = 632; bufvoid = 104857600
16/03/27 22:08:33 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
16/03/27 22:08:33 INFO mapred.MapTask: Finished spill 0
16/03/27 22:08:33 INFO mapred.Task: Task:attempt_local574821361_0001_m_000000_0 is done. And is in the process of committing
16/03/27 22:08:33 INFO mapred.LocalJobRunner: map
16/03/27 22:08:33 INFO mapred.Task: Task 'attempt_local574821361_0001_m_000000_0' done.
16/03/27 22:08:33 INFO mapred.LocalJobRunner: Finishing task: attempt_local574821361_0001_m_000000_0
16/03/27 22:08:33 INFO mapred.LocalJobRunner: Starting task: attempt_local574821361_0001_m_000001_0
16/03/27 22:08:33 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/27 22:08:33 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/03/27 22:08:33 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/user.log:0+157
16/03/27 22:08:33 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/03/27 22:08:33 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/03/27 22:08:33 INFO mapred.MapTask: soft limit at 83886080
16/03/27 22:08:33 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/03/27 22:08:33 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/03/27 22:08:33 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/03/27 22:08:33 INFO mapred.LocalJobRunner: 
16/03/27 22:08:33 INFO mapred.MapTask: Starting flush of map output
16/03/27 22:08:33 INFO mapred.MapTask: Spilling map output
16/03/27 22:08:33 INFO mapred.MapTask: bufstart = 0; bufend = 61; bufvoid = 104857600
16/03/27 22:08:33 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600
16/03/27 22:08:33 INFO mapred.MapTask: Finished spill 0
16/03/27 22:08:33 INFO mapred.Task: Task:attempt_local574821361_0001_m_000001_0 is done. And is in the process of committing
16/03/27 22:08:33 INFO mapred.LocalJobRunner: map
16/03/27 22:08:33 INFO mapred.Task: Task 'attempt_local574821361_0001_m_000001_0' done.
16/03/27 22:08:33 INFO mapred.LocalJobRunner: Finishing task: attempt_local574821361_0001_m_000001_0
16/03/27 22:08:33 INFO mapred.LocalJobRunner: map task executor complete.
16/03/27 22:08:33 INFO mapred.LocalJobRunner: Waiting for reduce tasks
16/03/27 22:08:33 INFO mapred.LocalJobRunner: Starting task: attempt_local574821361_0001_r_000000_0
16/03/27 22:08:33 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/27 22:08:33 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/03/27 22:08:33 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@4e12ef32
16/03/27 22:08:33 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10
16/03/27 22:08:33 INFO reduce.EventFetcher: attempt_local574821361_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
16/03/27 22:08:33 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local574821361_0001_m_000000_0 decomp: 652 len: 656 to MEMORY
16/03/27 22:08:33 INFO reduce.InMemoryMapOutput: Read 652 bytes from map-output for attempt_local574821361_0001_m_000000_0
16/03/27 22:08:33 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 652, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->652
16/03/27 22:08:33 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local574821361_0001_m_000001_0 decomp: 71 len: 75 to MEMORY
16/03/27 22:08:33 INFO reduce.InMemoryMapOutput: Read 71 bytes from map-output for attempt_local574821361_0001_m_000001_0
16/03/27 22:08:33 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 71, inMemoryMapOutputs.size() -> 2, commitMemory -> 652, usedMemory ->723
16/03/27 22:08:33 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
16/03/27 22:08:33 INFO mapred.LocalJobRunner: 2 / 2 copied.
16/03/27 22:08:33 INFO reduce.MergeManagerImpl: finalMerge called with 2 in-memory map-outputs and 0 on-disk map-outputs
16/03/27 22:08:33 INFO mapred.Merger: Merging 2 sorted segments
16/03/27 22:08:33 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 703 bytes
16/03/27 22:08:33 INFO reduce.MergeManagerImpl: Merged 2 segments, 723 bytes to disk to satisfy reduce memory limit
16/03/27 22:08:33 INFO reduce.MergeManagerImpl: Merging 1 files, 725 bytes from disk
16/03/27 22:08:33 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/03/27 22:08:33 INFO mapred.Merger: Merging 1 sorted segments
16/03/27 22:08:33 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 711 bytes
16/03/27 22:08:33 INFO mapred.LocalJobRunner: 2 / 2 copied.
16/03/27 22:08:33 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/03/27 22:08:33 INFO mapreduce.Job: Job job_local574821361_0001 running in uber mode : false
16/03/27 22:08:33 INFO mapreduce.Job:  map 100% reduce 0%
16/03/27 22:08:34 INFO mapred.Task: Task:attempt_local574821361_0001_r_000000_0 is done. And is in the process of committing
16/03/27 22:08:34 INFO mapred.LocalJobRunner: 2 / 2 copied.
16/03/27 22:08:34 INFO mapred.Task: Task attempt_local574821361_0001_r_000000_0 is allowed to commit now
16/03/27 22:08:34 INFO output.FileOutputCommitter: Saved output of task 'attempt_local574821361_0001_r_000000_0' to hdfs://localhost:9000/output2/_temporary/0/task_local574821361_0001_r_000000
16/03/27 22:08:34 INFO mapred.LocalJobRunner: reduce > reduce
16/03/27 22:08:34 INFO mapred.Task: Task 'attempt_local574821361_0001_r_000000_0' done.
16/03/27 22:08:34 INFO mapred.LocalJobRunner: Finishing task: attempt_local574821361_0001_r_000000_0
16/03/27 22:08:34 INFO mapred.LocalJobRunner: reduce task executor complete.
16/03/27 22:08:34 INFO mapreduce.Job:  map 100% reduce 100%
16/03/27 22:08:34 INFO mapreduce.Job: Job job_local574821361_0001 completed successfully
16/03/27 22:08:34 INFO mapreduce.Job: Counters: 35
 File System Counters
  FILE: Number of bytes read=36102
  FILE: Number of bytes written=869331
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=2237
  HDFS: Number of bytes written=669
  HDFS: Number of read operations=28
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=5
 Map-Reduce Framework
  Map input records=13
  Map output records=13
  Map output bytes=693
  Map output materialized bytes=731
  Input split bytes=519
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=731
  Reduce input records=13
  Reduce output records=9
  Spilled Records=26
  Shuffled Maps =2
  Failed Shuffles=0
  Merged Map outputs=2
  GC time elapsed (ms)=0
  Total committed heap usage (bytes)=938999808
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=0
 File Output Format Counters 
  Bytes Written=669
Step 12. Verify the output
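To view the joined result on HDFS, you can cat the reducer output file (assuming the /output path used above; with a single reducer the result file is named part-r-00000 by default):
subodh@subodh-Inspiron-3520:~$ hadoop fs -cat /output/part-r-00000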

That's it.

Download the complete example from here: Source Code
