We have already seen examples of a Combiner and a custom Partitioner in MapReduce programming. In this tutorial, I am going to show you an example of a map-side join in Hadoop MapReduce. If you want to dig deeper into MapReduce and how it works, you may like this article on how MapReduce works.
MapReduce processes big data sets, and processing large data sets very often requires joining datasets on a common key, much like we almost always do in any RDBMS using the primary/foreign key concept.

Joins in Hadoop MapReduce
Hadoop MapReduce supports two types of joins -
  • Map side join
  • Reduce side join
In this tutorial, I am going to explain the usage of Map side join.
Map side Join
You can use a map-side join in two different ways depending on your datasets; which way you pick depends on the conditions below -
  1. Both datasets are divided into the same number of partitions and are already sorted by the same key.
  2. One of the two datasets is small (something like a master dataset) and can fit into the memory of each node.
In this tutorial, I will show you the second approach; to use it you need the distributed cache to keep the small (master) dataset in the memory of each node.
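Conceptually, the second approach is just an in-memory hash join: load the small dataset into a map once, then stream the big dataset and probe that map record by record. Here is a tiny plain-Java sketch of the idea (no Hadoop involved yet; the sample values are only illustrative):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
 public static void main(String[] args) {
  // small (master) dataset: user_id -> user_name, fits comfortably in memory
  Map<Integer, String> users = new HashMap<>();
  users.put(1, "Susan");
  users.put(2, "Kathleen");

  // the large dataset would normally be streamed one record at a time
  List<String[]> activities = Arrays.asList(
    new String[] { "1", "looking awesome:)" },
    new String[] { "2", "full masti" });

  for (String[] activity : activities) {
   int userId = Integer.parseInt(activity[0]);
   // the "join" is just a map lookup - no shuffling of both datasets
   System.out.println(users.get(userId) + " -> " + activity[1]);
  }
 }
}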

OK, let's find the user activity on social media - what actions users performed on a popular social media site, such as commenting on a post, sharing something, liking something, etc.
And for this we have two different log files -
  • user.log
  • user_activity.log
Here is the tabular view of these datasets (the table screenshots are omitted here, so the column layout is summarized instead).
1. user.log - tab-separated columns: user_id, user_name

2. user_activity.log - tab-separated columns: an id column, followed by user_id, comments and post_shared

3. Expected output - every user_activity record enriched with the matching user_name (the actual result is shown in Step 9)

Tools and Technologies we are using here:

  • Java 8
  • Eclipse Mars
  • Hadoop 2.7.1
  • Maven 3.3
  • Ubuntu 14 (Linux OS)
Step 1. Create a new maven project
Go to the File menu, then New -> Maven Project, and provide the required details (the wizard screenshot is omitted here).
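If you prefer the terminal over the Eclipse wizard, roughly the same project skeleton can be generated with the standard Maven quickstart archetype (the groupId and artifactId below match what we use in the next step):

mvn archetype:generate -DgroupId=com.javamakeuse.bd.poc -DartifactId=MapSideJoin -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false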

Step 2. Edit pom.xml
Double click on your project's pom.xml file; it will look like this, with very limited information.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.poc</groupId>
  <artifactId>MapSideJoin</artifactId>
  <version>1.0</version>
  <name>MapSideJoin</name>
  <description>MapSideJoin Example in Map Reduce</description>
</project>
Now edit this pom.xml file and add the Hadoop dependencies. Below is the complete pom.xml file - just copy and paste it and it will work.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javamakeuse.bd.poc</groupId>
  <artifactId>MapSideJoin</artifactId>
  <version>1.0</version>
  <name>MapSideJoin</name>
  <description>MapSideJoin Example</description>
  <dependencies>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.7.1</version>
  </dependency>
  <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>2.7.1</version>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
Step 3. UserActivityVO.java
This is our value object class; it contains the fields that need to be written as the output of the job.
package com.javamakeuse.bd.poc.vo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class UserActivityVO implements Writable {

 private int userId;
 private String userName;
 private String comments;
 private String postShared;

 public int getUserId() {
  return userId;
 }
 public void setUserId(int userId) {
  this.userId = userId;
 }
 public String getUserName() {
  return userName;
 }
 public void setUserName(String userName) {
  this.userName = userName;
 }
 public String getComments() {
  return comments;
 }
 public void setComments(String comments) {
  this.comments = comments;
 }
 public String getPostShared() {
  return postShared;
 }
 public void setPostShared(String postShared) {
  this.postShared = postShared;
 }
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeInt(userId);
  out.writeUTF(userName);
  out.writeUTF(comments);
  out.writeUTF(postShared);
 }
 @Override
 public void readFields(DataInput in) throws IOException {
  userId = in.readInt();
  userName = in.readUTF();
  comments = in.readUTF();
  postShared = in.readUTF();
 }

 @Override
 public String toString() {
  return "UserActivityVO [userId=" + userId + ", userName=" + userName + ",   comments=" + comments
    + ", postShared=" + postShared + "]";
 }

}
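Because Hadoop serializes this object between the map and reduce phases, write() and readFields() must handle the fields in exactly the same order. A tiny optional sanity check (my own addition, not part of the job) could look like this:

package com.javamakeuse.bd.poc.vo;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class UserActivityVOCheck {
 public static void main(String[] args) throws Exception {
  UserActivityVO original = new UserActivityVO();
  original.setUserId(1);
  original.setUserName("Susan");
  original.setComments("looking awesome:)");
  original.setPostShared("http://dummyimage.com/160x166.gif/5fa2dd/ffffff");

  // serialize with write(), then read back with readFields()
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  original.write(new DataOutputStream(bytes));

  UserActivityVO copy = new UserActivityVO();
  copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

  // both lines should print identical field values
  System.out.println(original);
  System.out.println(copy);
 }
}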
Step 4. UserActivityMapper.java (Mapper)
Inside this mapper class we set the properties of the UserActivityVO class; the user.log file is read from the distributed cache (see the setup() method below).
package com.javamakeuse.bd.poc.mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.javamakeuse.bd.poc.vo.UserActivityVO;

public class UserActivityMapper extends Mapper<LongWritable, Text, IntWritable, UserActivityVO> {

 // user map to keep the userId-userName
 private Map<Integer, String> userMap = new HashMap<>();

 @Override
 protected void map(LongWritable key, Text value,
   Mapper<LongWritable, Text, IntWritable, UserActivityVO>.Context context)
     throws IOException, InterruptedException {

  String[] columns = value.toString().split("\t");
   if (columns != null && columns.length > 3) {
   UserActivityVO userActivityVO = new UserActivityVO();
   userActivityVO.setUserId(Integer.parseInt(columns[1]));
   userActivityVO.setComments(columns[2]);
   userActivityVO.setPostShared(columns[3]);
   userActivityVO.setUserName(userMap.get(userActivityVO.getUserId()));
   // writing into context
   context.write(new IntWritable(userActivityVO.getUserId()), userActivityVO);
  }
 }

 @Override
 protected void setup(Mapper<LongWritable, Text, IntWritable, UserActivityVO>.Context context)
   throws IOException, InterruptedException {
  // loading user map in context
  loadUserInMemory(context);
 }

 private void loadUserInMemory(Mapper<LongWritable, Text, IntWritable, UserActivityVO>.Context context) {
  // user.log is in distributed cache
  try (BufferedReader br = new BufferedReader(new FileReader("user.log"))) {
   String line;
   while ((line = br.readLine()) != null) {
    String columns[] = line.split("\t");
    userMap.put(Integer.parseInt(columns[0]), columns[1]);
   }
  } catch (IOException e) {
   e.printStackTrace();
  }

 }
}
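A small hardening worth considering (my own suggestion, not in the original code): if an activity record's user id has no entry in user.log, userMap.get() returns null, and writeUTF(null) will throw a NullPointerException when the UserActivityVO is serialized. Skipping such records inside map() keeps the job an inner join and avoids the failure:

   // suggested guard inside map(), replacing the single setUserName(...) call
   String userName = userMap.get(userActivityVO.getUserId());
   if (userName == null) {
    // no matching user in user.log - skip this activity record (inner-join behavior)
    return;
   }
   userActivityVO.setUserName(userName);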
Step 5. UserActivityReducer.java (Reducer)
The reducer class simply iterates over the values and writes each one into the context.
package com.javamakeuse.bd.poc.reducer;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import com.javamakeuse.bd.poc.vo.UserActivityVO;

public class UserActivityReducer extends Reducer<IntWritable, UserActivityVO, UserActivityVO, NullWritable> {

 NullWritable value = NullWritable.get();

 @Override
 protected void reduce(IntWritable key, Iterable<UserActivityVO> values,
   Reducer<IntWritable, UserActivityVO, UserActivityVO, NullWritable>.Context context)
     throws IOException, InterruptedException {
  for (UserActivityVO userActivityVO : values) {
   context.write(userActivityVO, value);
  }
 }
}
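Since the reducer above only passes records through, the join itself is already finished when the map phase ends; the reduce phase here mainly groups and sorts the output by user id. A possible variation (not what this tutorial does) is to skip the reduce phase entirely in the driver; note that the output files would then contain the mapper's key-value pairs, so each line would start with the user id:

  // Variation, not used in this tutorial: run the job map-only, so the
  // mapper output is written directly and no shuffle/sort or reducer runs.
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(IntWritable.class);      // the mapper's output key
  job.setOutputValueClass(UserActivityVO.class); // the mapper's output value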
Step 6. UserActivityDriver.java (Driver)
Here we add the user.log file to the distributed cache and wire up the rest of the job configuration.
package com.javamakeuse.bd.poc.driver;

import java.net.URI;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.javamakeuse.bd.poc.mapper.UserActivityMapper;
import com.javamakeuse.bd.poc.reducer.UserActivityReducer;
import com.javamakeuse.bd.poc.vo.UserActivityVO;

public class UserActivityDriver extends Configured implements Tool {

 public static void main(String[] args) {
  try {
   int status = ToolRunner.run(new UserActivityDriver(), args);
   System.exit(status);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 public int run(String[] args) throws Exception {
  if (args.length != 2) {
   System.err.printf("Usage: %s [generic options] <input1> <output>\n", getClass().getSimpleName());
   ToolRunner.printGenericCommandUsage(System.err);
   return -1;
  }
  Job job = Job.getInstance(getConf()); // use the configuration populated by ToolRunner so generic options are applied
  job.setJarByClass(getClass());
  job.setJobName("MapSideJoin Example");
  // input path
  FileInputFormat.addInputPath(job, new Path(args[0]));

  // output path
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(UserActivityMapper.class);
  job.setReducerClass(UserActivityReducer.class);

  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(UserActivityVO.class);

  job.addCacheFile(new URI("hdfs://localhost:9000/input/user.log"));
  job.setOutputKeyClass(UserActivityVO.class);
  job.setOutputValueClass(NullWritable.class);

  return job.waitForCompletion(true) ? 0 : 1;
 }

}
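One detail worth calling out: the mapper opens the cached file by the plain name "user.log", which works because the distributed cache symlinks each cached file into the task's working directory under its own file name (you can see the symlink being created in the job log below). If you ever want a different local name, a '#' fragment on the cache URI sets the symlink name (an optional variation, not needed here):

  // Optional variation: the part after '#' becomes the local symlink name,
  // so the mapper would then open "users.txt" instead of "user.log".
  job.addCacheFile(new URI("hdfs://localhost:9000/input/user.log#users.txt"));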
Done. You can also run this program from Eclipse; below are the steps to run it using the terminal.

Step 7. Steps to execute MapSideJoin project
i. Start the Hadoop components. Open your terminal and type
subodh@subodh-Inspiron-3520:~/software$ start-all.sh
ii. Verify whether Hadoop started with the jps command
subodh@subodh-Inspiron-3520:~/software$ jps
8385 NameNode
8547 DataNode
5701 org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
9446 Jps
8918 ResourceManager
9054 NodeManager
8751 SecondaryNameNode
You can also verify it with the web UI using the "http://localhost:50070/explorer.html#/" URL.
iii. Create the input folder on HDFS with the command below.
subodh@subodh-Inspiron-3520:~/software$ hadoop fs -mkdir /input
The above command will create an input folder on HDFS; you can verify it using the web UI. Now it is time to move the input files we need to process. Below is the command to copy the user.log and user_activity.log input files into the input folder on HDFS.
subodh@subodh-Inspiron-3520:~$ hadoop fs -copyFromLocal /home/subodh/programs/input/user.log /home/subodh/programs/input/user_activity.log /input/
Note - the user.log and user_activity.log files are available inside this project's source code; you will find the downloadable link at the end of this tutorial.
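Optionally, you can confirm from the terminal that both files landed in the input folder before running the job:

subodh@subodh-Inspiron-3520:~$ hadoop fs -ls /input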

Step 8. Create & Execute jar file
We are almost done. Now create the jar file of the MapSideJoin source code; you can create it using Eclipse or by using the mvn package command.
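If you go the Maven route, run the command below from the project's root directory (where pom.xml lives); with the pom.xml above it produces target/MapSideJoin-1.0.jar, which you can then copy to any convenient location such as /home/subodh.

mvn clean package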
To execute the MapSideJoin-1.0.jar file, use the command below:
hadoop jar /home/subodh/MapSideJoin-1.0.jar com.javamakeuse.bd.poc.driver.UserActivityDriver /input/user_activity.log /output
The above command will generate the output below and also create an output folder containing the result of the MapSideJoin project.
16/03/24 17:49:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/24 17:49:06 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/03/24 17:49:06 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/03/24 17:49:06 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/03/24 17:49:06 INFO input.FileInputFormat: Total input paths to process : 1
16/03/24 17:49:06 INFO mapreduce.JobSubmitter: number of splits:1
16/03/24 17:49:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1445861261_0001
16/03/24 17:49:07 INFO mapred.LocalDistributedCacheManager: Creating symlink: /tmp/hadoop-subodh/mapred/local/1458821947020/user.log <- /home/subodh/user.log
16/03/24 17:49:07 INFO mapred.LocalDistributedCacheManager: Localized hdfs://localhost:9000/input/user.log as file:/tmp/hadoop-subodh/mapred/local/1458821947020/user.log
16/03/24 17:49:07 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/03/24 17:49:07 INFO mapreduce.Job: Running job: job_local1445861261_0001
16/03/24 17:49:07 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/03/24 17:49:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/24 17:49:07 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Waiting for map tasks
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Starting task: attempt_local1445861261_0001_m_000000_0
16/03/24 17:49:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/24 17:49:07 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/03/24 17:49:07 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/user_activity.log:0+282
16/03/24 17:49:07 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/03/24 17:49:07 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/03/24 17:49:07 INFO mapred.MapTask: soft limit at 83886080
16/03/24 17:49:07 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/03/24 17:49:07 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/03/24 17:49:07 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/03/24 17:49:07 INFO mapred.LocalJobRunner: 
16/03/24 17:49:07 INFO mapred.MapTask: Starting flush of map output
16/03/24 17:49:07 INFO mapred.MapTask: Spilling map output
16/03/24 17:49:07 INFO mapred.MapTask: bufstart = 0; bufend = 339; bufvoid = 104857600
16/03/24 17:49:07 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600
16/03/24 17:49:07 INFO mapred.MapTask: Finished spill 0
16/03/24 17:49:07 INFO mapred.Task: Task:attempt_local1445861261_0001_m_000000_0 is done. And is in the process of committing
16/03/24 17:49:07 INFO mapred.LocalJobRunner: map
16/03/24 17:49:07 INFO mapred.Task: Task 'attempt_local1445861261_0001_m_000000_0' done.
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Finishing task: attempt_local1445861261_0001_m_000000_0
16/03/24 17:49:07 INFO mapred.LocalJobRunner: map task executor complete.
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Waiting for reduce tasks
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Starting task: attempt_local1445861261_0001_r_000000_0
16/03/24 17:49:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/03/24 17:49:07 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/03/24 17:49:07 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@396989af
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=334338464, maxSingleShuffleLimit=83584616, mergeThreshold=220663392, ioSortFactor=10, memToMemMergeOutputsThreshold=10
16/03/24 17:49:07 INFO reduce.EventFetcher: attempt_local1445861261_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
16/03/24 17:49:07 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1445861261_0001_m_000000_0 decomp: 349 len: 353 to MEMORY
16/03/24 17:49:07 INFO reduce.InMemoryMapOutput: Read 349 bytes from map-output for attempt_local1445861261_0001_m_000000_0
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 349, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->349
16/03/24 17:49:07 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
16/03/24 17:49:07 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
16/03/24 17:49:07 INFO mapred.Merger: Merging 1 sorted segments
16/03/24 17:49:07 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 343 bytes
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: Merged 1 segments, 349 bytes to disk to satisfy reduce memory limit
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: Merging 1 files, 353 bytes from disk
16/03/24 17:49:07 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/03/24 17:49:07 INFO mapred.Merger: Merging 1 sorted segments
16/03/24 17:49:07 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 343 bytes
16/03/24 17:49:07 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/24 17:49:07 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/03/24 17:49:07 INFO mapred.Task: Task:attempt_local1445861261_0001_r_000000_0 is done. And is in the process of committing
16/03/24 17:49:07 INFO mapred.LocalJobRunner: 1 / 1 copied.
16/03/24 17:49:07 INFO mapred.Task: Task attempt_local1445861261_0001_r_000000_0 is allowed to commit now
16/03/24 17:49:07 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1445861261_0001_r_000000_0' to hdfs://localhost:9000/ua6/_temporary/0/task_local1445861261_0001_r_000000
16/03/24 17:49:07 INFO mapred.LocalJobRunner: reduce > reduce
16/03/24 17:49:07 INFO mapred.Task: Task 'attempt_local1445861261_0001_r_000000_0' done.
16/03/24 17:49:07 INFO mapred.LocalJobRunner: Finishing task: attempt_local1445861261_0001_r_000000_0
16/03/24 17:49:07 INFO mapred.LocalJobRunner: reduce task executor complete.
16/03/24 17:49:08 INFO mapreduce.Job: Job job_local1445861261_0001 running in uber mode : false
16/03/24 17:49:08 INFO mapreduce.Job:  map 100% reduce 100%
16/03/24 17:49:08 INFO mapreduce.Job: Job job_local1445861261_0001 completed successfully
16/03/24 17:49:08 INFO mapreduce.Job: Counters: 35
 File System Counters
  FILE: Number of bytes read=17570
  FILE: Number of bytes written=576233
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=854
  HDFS: Number of bytes written=527
  HDFS: Number of read operations=31
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=4
 Map-Reduce Framework
  Map input records=4
  Map output records=4
  Map output bytes=339
  Map output materialized bytes=353
  Input split bytes=110
  Combine input records=0
  Combine output records=0
  Reduce input groups=4
  Reduce shuffle bytes=353
  Reduce input records=4
  Reduce output records=4
  Spilled Records=8
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=0
  Total committed heap usage (bytes)=534773760
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=282
 File Output Format Counters 
  Bytes Written=527


Step 9. Verify output
subodh@subodh-Inspiron-3520:~/Downloads$ hadoop fs -cat /output/par*
16/03/24 17:49:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
UserActivityVO [userId=1, userName=Susan, comments=looking awesome:), postShared=http://dummyimage.com/160x166.gif/5fa2dd/ffffff]
UserActivityVO [userId=2, userName=Kathleen, comments=full masti, postShared=http://dummyimage.com/250x142.png/ff4444/ffffff]
UserActivityVO [userId=3, userName=Marilyn, comments=wow gangnam style,cool, postShared=http://dummyimage.com/124x173.png/cc0000/ffffff]
UserActivityVO [userId=4, userName=Craig, comments=welcome to the heaven, postShared=http://dummyimage.com/148x156.png/ff4444/ffffff]


That's it.

Download the complete example from here: Source Code


Happy data analytics :)
2 comments:

  1. Hi There,

     Can anybody please tell me what is the meaning of the below line #49?

     userMap.put(Integer.parseInt(columns[0]), columns[1]);

     Reply:
      It's a map that keeps the user_id as the key and the user_name as the value.