First, the data we will use is shown below; each line has the format user:friend1,friend2,… (a user followed by that user's friend list):
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
To find common friends, we first invert the relationship and work out, for each friend, which users have that friend: e.g. B is A's friend, C is A's friend, D is A's friend, and so on.
We then send data to the first job's Reduce with the friend as the key and the user as the value, so the records emitted look like:
key:B value:A
key:C value:A
key:D value:A
…
After the Reduce merges these records by key, this becomes:
key:B value:A,…
key:C value:A,…
key:D value:A,…
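Before writing the Hadoop job, this inversion-and-grouping logic can be sanity-checked with a minimal plain-Java sketch that simulates the map step and the shuffle (the class name and sample lines here are illustrative, not part of the job below):

import java.util.*;

// Sketch: invert each "user:friends" line so the friend becomes the key,
// then group users by friend -- a stand-in for MapReduce's shuffle phase.
public class Step1Sketch {
    public static void main(String[] args) {
        String[] lines = {"A:B,C,D,F,E,O", "B:A,C,E,K", "C:F,A,D,I"}; // a sample of the input
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] splits = line.split(":"); // splits[0] = user, splits[1] = friend list
            for (String friend : splits[1].split(",")) {
                // emit (friend, user); grouping by key emulates the shuffle
                grouped.computeIfAbsent(friend, k -> new ArrayList<>()).add(splits[0]);
            }
        }
        grouped.forEach((friend, users) ->
                System.out.println(friend + "\t" + String.join(",", users)));
    }
}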
MapReduce01
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FirendMapReduce01 {

    public static class MapTask extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Read one line of input
            String line = value.toString();
            // Split on ":" -- splits[0] is the user, splits[1] is that user's friend list
            String[] splits = line.split(":");
            // Split the friend list on ","
            String[] firends = splits[1].split(",");
            // Emit one record per friend, in the format key: friend, value: user
            for (String firend : firends) {
                context.write(new Text(firend), new Text(splits[0]));
            }
        }
    }

    public static class ReduceTask extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text firend, Iterable<Text> users, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            // Join all users that have this friend, separated by ","
            for (Text user : users) {
                sb.append(user).append(",");
            }
            // Write out, dropping the trailing ",", e.g.: A	I,K,C,B,G,F,H,O,D
            context.write(firend, new Text(sb.substring(0, sb.length() - 1)));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setJarByClass(FirendMapReduce01.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("F:\\hadoop\\friend.txt"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop\\test"));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
The result is as follows: each output line maps a friend to the comma-separated users who have that friend, e.g. A	I,K,C,B,G,F,H,O,D.
Once we have this data, we know that, for example, I, K, C, B, … all share the common friend A. All that remains is to split each of these user lists and recombine the users into pairs in a fixed format, as the sketch below illustrates.
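One point worth seeing up front is why the users must be sorted before pairing: without the sort, the same pair could be emitted as both B-I and I-B and land on two different reduce keys. A minimal sketch of the pair generation (the class name is illustrative; the input is friend A's user list from above):

import java.util.Arrays;

// Sketch: generate every unordered pair from a user list, sorting first so each
// pair always appears in one canonical order (B-I, never I-B).
public class PairSketch {
    public static void main(String[] args) {
        String[] users = "I,K,C,B".split(",");
        Arrays.sort(users); // -> B, C, I, K
        for (int i = 0; i < users.length; i++) {
            for (int j = i + 1; j < users.length; j++) {
                System.out.println(users[i] + "-" + users[j] + "\tA"); // key: pair, value: common friend A
            }
        }
    }
}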
MapReduce02
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;

public class FirendMapReduce02 {

    public static class MapTask extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Read one line of job 1's output
            String lines = value.toString();
            // Split on "\t" -- split[0] is the friend, split[1] is the users who have that friend
            String[] split = lines.split("\t");
            // Split the user list on ","
            String[] users = split[1].split(",");
            // Sort so every unordered pair is emitted in one canonical order,
            // preventing duplicate keys such as B-A vs A-B
            Arrays.sort(users);
            // Emit every pair of users, in the format key: A-B, value: the friend they share
            for (int i = 0; i < users.length; i++) {
                for (int j = i + 1; j < users.length; j++) {
                    context.write(new Text(users[i] + "-" + users[j]), new Text(split[0]));
                }
            }
        }
    }

    public static class ReduceTask extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> firends, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            // Join all common friends of this pair, separated by " "
            for (Text firend : firends) {
                sb.append(firend).append(" ");
            }
            // Write out, e.g.: A-B	E C
            context.write(key, new Text(sb.toString().trim()));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setMapperClass(FirendMapReduce02.MapTask.class);
        job.setReducerClass(FirendMapReduce02.ReduceTask.class);
        job.setJarByClass(FirendMapReduce02.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("F:\\hadoop\\test\\part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\hadoop\\test1"));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
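Since job 2 consumes job 1's output directly, the two jobs can also be chained in a single driver instead of being run one at a time. A minimal sketch, assuming the two classes above (the driver class name and paths are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch: one driver chaining the two jobs -- job 2 starts only after
// job 1 finishes, reading job 1's output directory as its input.
public class FirendDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("F:\\hadoop\\friend.txt"); // paths are illustrative
        Path mid   = new Path("F:\\hadoop\\test");
        Path out   = new Path("F:\\hadoop\\test1");

        Job job1 = Job.getInstance(conf, "friend-step1");
        job1.setJarByClass(FirendMapReduce01.class);
        job1.setMapperClass(FirendMapReduce01.MapTask.class);
        job1.setReducerClass(FirendMapReduce01.ReduceTask.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, input);
        FileOutputFormat.setOutputPath(job1, mid);
        if (!job1.waitForCompletion(true)) System.exit(1); // stop if step 1 fails

        Job job2 = Job.getInstance(conf, "friend-step2");
        job2.setJarByClass(FirendMapReduce02.class);
        job2.setMapperClass(FirendMapReduce02.MapTask.class);
        job2.setReducerClass(FirendMapReduce02.ReduceTask.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, mid); // job 1's output is job 2's input
        FileOutputFormat.setOutputPath(job2, out);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}

Note that hidden files such as the _SUCCESS marker in job 1's output directory are skipped by FileInputFormat by default, so the whole directory can be used as job 2's input.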
The result is as follows (for reference only):