MapReduce-找共同好友

MapReduce-找共同好友

package com.billstudy.mr.friends;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Finds the common friends of every pair of people with two chained MapReduce jobs.
 *
 * <p>Input: one line per person, {@code person:friend,friend,...}
 * <pre>
 * A:B,C,D,F,E,O
 * B:A,C,E,K
 * C:F,A,D,I
 * D:A,E,F,L
 * E:B,C,D,M,L
 * F:A,B,C,D,E,O,M
 * G:A,C,D,E,F
 * H:A,C,D,E,O
 * I:A,O
 * J:B,O
 * K:A,C,D
 * L:D,E,F
 * M:E,F,G
 * O:A,H,I,J
 * ...
 * </pre>
 *
 * <p>Output: one line per pair of people, {@code person-person<TAB>friend,friend,...},
 * listing the friends the two have in common. The friends appear in the order in
 * which the second reducer receives them.
 * <pre>
 * A-B	C,E
 * A-C	D,F
 * A-D	E,F
 * A-E	B,C,D
 * A-F	B,C,D,E,O
 * A-G	C,D,E,F
 * A-H	C,D,E,O
 * A-I	O
 * A-J	B,O
 * A-K	C,D
 * A-L	D,E,F
 * A-M	E,F
 * B-C	A
 * B-D	A,E
 * ...
 * </pre>
 *
 * <p>The intermediate file written by job 1 uses a tab between key and value and
 * {@code -} between names, so names must not contain a tab or {@code -}.
 *
 * @author Bill
 * @since V1.0 2015-06-24 16:53:01
 */
public class ShareFriends {

	/**
	 * Job 1 mapper: inverts the friend list so that everybody who lists the same
	 * friend ends up in the same reduce group.
	 *
	 * <p>Input line:
	 * <pre>
	 * A:B,C,D,F,E,O
	 * </pre>
	 * Output records (friend, owner of the list):
	 * <pre>
	 * B	A
	 * C	A
	 * D	A
	 * F	A
	 * ....
	 * </pre>
	 *
	 * @author Bill
	 * @since V1.0 2015-06-24 17:15:21
	 */
	static class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

		private final Text k = new Text();

		private final Text v = new Text();

		/**
		 * Emits one {@code (friend, person)} record per friend on the line.
		 * Lines that do not consist of exactly two {@code :}-separated fields,
		 * and blank names, are skipped.
		 *
		 * @param key     input key supplied by the input format (not used)
		 * @param value   one input line, {@code person:friend,friend,...}
		 * @param context sink for the emitted records
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String[] persons = value.toString().split(":");

			if (persons.length != 2) {
				return;
			}

			String self = persons[0].trim();
			if (self.isEmpty()) {
				return;
			}

			v.set(self);
			for (String rawFriend : persons[1].split(",")) {
				String friend = rawFriend.trim();
				// "A:B,,C" would otherwise produce a record with an empty key
				if (friend.isEmpty()) {
					continue;
				}
				k.set(friend);
				context.write(k, v);
			}
		}
	}

	/**
	 * Job 1 reducer: joins everybody who has the same friend into one line.
	 *
	 * <p>Input group:
	 * <pre>
	 * A	{B,C,D}
	 * </pre>
	 * Output record:
	 * <pre>
	 * A	B-C-D
	 * </pre>
	 *
	 * @author Bill
	 * @since V1.0 2015-06-24 17:14:26
	 */
	static class Reducer1 extends Reducer<Text, Text, Text, Text> {

		private final Text v = new Text();

		/**
		 * Writes {@code key} together with all of its values joined by {@code -}.
		 *
		 * @param key     the shared friend
		 * @param friends the people who list {@code key} as a friend
		 * @param context sink for the emitted record
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> friends, Context context)
				throws IOException, InterruptedException {

			v.set(join(friends, '-'));
			context.write(key, v);
		}
	}

	/**
	 * Job 2 mapper: emits every pair of people that share a friend, so that each
	 * pair ends up in its own reduce group.
	 *
	 * <p>Input line:
	 * <pre>
	 * A	B-C-D-E-F
	 * </pre>
	 * Output records (pair, shared friend):
	 * <pre>
	 * B-C	A
	 * B-D	A
	 * B-E	A
	 * B-F	A
	 * C-D	A
	 * ....
	 * </pre>
	 *
	 * @author Bill
	 * @since V1.0 2015-06-24 17:12:44
	 */
	static class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {

		private final Text k = new Text();

		private final Text v = new Text();

		/**
		 * Emits one {@code (personA-personB, friend)} record for every unordered
		 * pair of distinct people on the line. Lines without a tab-separated
		 * second field are skipped.
		 *
		 * @param key     input key supplied by the input format (not used)
		 * @param value   one line of job 1 output, {@code friend<TAB>p1-p2-...}
		 * @param context sink for the emitted records
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String[] persons = value.toString().split("\t");
			// A line without a value part would otherwise fail on persons[1]
			if (persons.length < 2) {
				return;
			}
			String self = persons[0];
			String[] friends = persons[1].split("-");

			// The names must be sorted: otherwise the same pair could be emitted as
			// both "A-B" and "B-A" and would be split over two reduce groups.
			Arrays.sort(friends);

			// Compact the sorted array in place, dropping blanks and duplicates, so
			// no self-pair such as "A-A" and no repeated pair is produced.
			int count = 0;
			for (int i = 0; i < friends.length; i++) {
				String name = friends[i];
				if (name.isEmpty()) {
					continue;
				}
				if (count > 0 && name.equals(friends[count - 1])) {
					continue;
				}
				friends[count++] = name;
			}

			v.set(self);

			// Emit every combination (i < j) of the remaining names
			for (int i = 0; i < count - 1; i++) {
				for (int j = i + 1; j < count; j++) {
					k.set(friends[i] + "-" + friends[j]);
					context.write(k, v);
				}
			}
		}
	}

	/**
	 * Job 2 reducer: joins all friends collected for one pair of people.
	 *
	 * <p>Input records:
	 * <pre>
	 * A-B	C
	 * A-B	D
	 * A-B	E
	 * </pre>
	 * Output record:
	 * <pre>
	 * A-B	C,D,E
	 * </pre>
	 *
	 * @author Bill
	 * @since V1.0 2015-06-24 17:11:24
	 */
	static class Reducer2 extends Reducer<Text, Text, Text, Text> {

		private final Text v = new Text();

		/**
		 * Writes {@code pair} together with all of its values joined by {@code ,}.
		 *
		 * @param pair    two people, {@code personA-personB}
		 * @param friends the friends both people have in common
		 * @param context sink for the emitted record
		 */
		@Override
		protected void reduce(Text pair, Iterable<Text> friends, Context context)
				throws IOException, InterruptedException {

			v.set(join(friends, ','));
			context.write(pair, v);
		}

	}

	/**
	 * Concatenates the string form of every value, placing {@code separator}
	 * between consecutive values (no leading or trailing separator).
	 */
	private static String join(Iterable<Text> values, char separator) {
		StringBuilder joined = new StringBuilder();
		boolean first = true;
		for (Text value : values) {
			if (!first) {
				joined.append(separator);
			}
			joined.append(value.toString());
			first = false;
		}
		return joined.toString();
	}

	/** Recursively deletes {@code path} on the file system that owns it, if it exists. */
	private static void deleteIfExists(Configuration conf, Path path) throws IOException {
		// Resolve the file system from the path so fully-qualified URIs also work
		FileSystem fs = path.getFileSystem(conf);
		if (fs.exists(path)) {
			fs.delete(path, true);
		}
	}

	/** Creates a job with Text keys and values reading {@code input} and writing {@code output}. */
	private static Job newTextJob(Configuration conf, String name,
			Class<? extends Mapper<?, ?, ?, ?>> mapperClass,
			Class<? extends Reducer<?, ?, ?, ?>> reducerClass,
			Path input, Path output) throws IOException {
		Job job = Job.getInstance(conf, name);
		job.setJarByClass(ShareFriends.class);
		job.setMapperClass(mapperClass);
		job.setReducerClass(reducerClass);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, input);
		FileOutputFormat.setOutputPath(job, output);
		return job;
	}

	/** Best effort: opens {@code dir} with the Windows shell; does nothing on other systems. */
	private static void openFolder(Path dir) {
		if (!System.getProperty("os.name", "").startsWith("Windows")) {
			return;
		}
		String localPath = dir.toUri().getPath();
		// Drop the leading slash of an absolute URI path (e.g. "/D:/out" -> "D:/out");
		// a relative path is passed through unchanged.
		if (localPath.startsWith("/")) {
			localPath = localPath.substring(1);
		}
		if (localPath.isEmpty()) {
			return;
		}
		try {
			// Argument list instead of a single command string: the path is not
			// re-split on whitespace.
			new ProcessBuilder("cmd.exe", "/c", "start", localPath).start();
		} catch (IOException e) {
			// Showing the folder is a convenience only; the jobs already succeeded.
			System.err.println("Could not open " + localPath + ": " + e.getMessage());
		}
	}

	/**
	 * Runs job 1 and then job 2, which depends on it.
	 *
	 * <p>Existing output directories are deleted first. The process exits with
	 * status 1 when the arguments are wrong or when a job fails.
	 *
	 * @param args {@code <input> <job1-output> <job2-output>}
	 * @throws Exception if job setup, file system access or waiting fails
	 */
	public static void main(String[] args) throws Exception {

		if (args.length != 3) {
			System.err.println("Usage: ShareFriends <input> <job1-output> <job2-output>");
			System.exit(1);
		}

		Configuration conf = new Configuration();

		Path job1InputPath = new Path(args[0]);
		Path job1OutputPath = new Path(args[1]);
		Path job2OutputPath = new Path(args[2]);

		// Remove results of earlier runs
		deleteIfExists(conf, job1OutputPath);
		deleteIfExists(conf, job2OutputPath);

		// job1: friend -> everybody who lists that friend
		Job job1 = newTextJob(conf, "share-friends-1",
				Mapper1.class, Reducer1.class, job1InputPath, job1OutputPath);

		// job2: pair of people -> their common friends; reads the output of job1
		Job job2 = newTextJob(conf, "share-friends-2",
				Mapper2.class, Reducer2.class, job1OutputPath, job2OutputPath);

		// Declare the dependency so job2 is only started after job1
		ControlledJob controlledJob1 = new ControlledJob(conf);
		ControlledJob controlledJob2 = new ControlledJob(conf);

		controlledJob1.setJob(job1);
		controlledJob2.setJob(job2);
		controlledJob2.addDependingJob(controlledJob1);

		JobControl jobControl = new JobControl("share-friends");
		jobControl.addJob(controlledJob1);
		jobControl.addJob(controlledJob2);

		// JobControl is a Runnable; run it on its own thread and poll for completion
		Thread shareFriendExecuteThread = new Thread(jobControl, "share-friends-control");
		shareFriendExecuteThread.setDaemon(true);
		shareFriendExecuteThread.start();

		try {
			while (!jobControl.allFinished()) {
				TimeUnit.SECONDS.sleep(1);
			}
		} finally {
			jobControl.stop();
		}

		// Report failures instead of returning as if everything had worked
		if (!jobControl.getFailedJobList().isEmpty()) {
			for (ControlledJob failed : jobControl.getFailedJobList()) {
				System.err.println("Job failed: " + failed.getJobName() + " - " + failed.getMessage());
			}
			System.exit(1);
		}

		// Show the output folders of both jobs
		openFolder(job1OutputPath);
		openFolder(job2OutputPath);
	}

}

版权声明:本文为博主原创文章,未经博主允许不得转载。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注