MapReduce实例-Join连接 (Reduce Side Join)

发布时间:2017-3-25 15:38:44 编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"MapReduce实例-Join连接 (Reduce Side Join)",主要涉及MapReduce中Reduce Side Join连接方面的内容,对此感兴趣的同学可以参考一下。

/**
 * Reduce-side join of two comma-separated text inputs on their first field.
 *
 * <p>Each input path is read by its own mapper. Both mappers emit the first field of the line
 * (the user id, i.e. the foreign key) as the map output key, and the whole line prefixed with a
 * one-character source tag as the value. The reducer separates the values of one key by tag and
 * combines them according to the configured join type.
 *
 * <p>Command line: {@code <user input> <comment input> <join type> <output>}, where the join type
 * is one of {@code inner}, {@code leftouter}, {@code rightouter}, {@code fullouter} or
 * {@code anti} (case-insensitive).
 */
public class ReduceSideJoin extends Configured implements Tool {

    /** Configuration key through which the driver hands the join type to the reducer. */
    private static final String JOIN_TYPE_KEY = "join.type";

    /** Tag prepended to every record coming from the user input (left side). */
    private static final char USER_TAG = 'A';
    /** Tag prepended to every record coming from the comment input (right side). */
    private static final char COMMENT_TAG = 'B';

    /** Counter name used by the mappers for lines that yield no key. */
    private static final String ERROR_COUNTER = "errorlog";

    private static final String INNER = "inner";
    private static final String LEFT_OUTER = "leftouter";
    private static final String RIGHT_OUTER = "rightouter";
    private static final String FULL_OUTER = "fullouter";
    private static final String ANTI = "anti";

    /** Exit code returned by {@link #run(String[])} when the command line is invalid. */
    private static final int EXIT_USAGE = 2;

    /**
     * Tells whether {@code joinType} names one of the join types the reducer implements.
     *
     * @param joinType the value to check; may be {@code null}
     * @return {@code true} if the reducer has a branch for this join type
     */
    private static boolean isSupportedJoinType(String joinType) {
        return INNER.equalsIgnoreCase(joinType)
                || LEFT_OUTER.equalsIgnoreCase(joinType)
                || RIGHT_OUTER.equalsIgnoreCase(joinType)
                || FULL_OUTER.equalsIgnoreCase(joinType)
                || ANTI.equalsIgnoreCase(joinType);
    }

    /**
     * Shared implementation of the two input mappers: keys each line by its first
     * comma-separated field and tags the line with the source it came from.
     */
    abstract static class TaggingJoinMapper extends Mapper<Object, Text, Text, Text> {
        private final char tag;
        private final String counterGroup;
        // Reused across map() calls to avoid allocating two Text objects per record.
        private final Text outkey = new Text();
        private final Text outvalue = new Text();

        /**
         * @param tag          source tag prepended to every emitted value
         * @param counterGroup counter group under which unusable lines are counted
         */
        TaggingJoinMapper(char tag, String counterGroup) {
            this.tag = tag;
            this.counterGroup = counterGroup;
        }

        /**
         * Emits {@code (first field, tag + line)} for one input line.
         *
         * <p>A line that splits into no fields at all (for example {@code ","}) has no key; it is
         * counted under the {@code errorlog} counter and skipped. I/O failures while writing are
         * propagated instead of being counted as bad records.
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            if (fields.length == 0) {
                // String.split drops trailing empty strings, so a line of only commas has no sp[0].
                context.getCounter(counterGroup, ERROR_COUNTER).increment(1);
                return;
            }
            outkey.set(fields[0]);
            outvalue.set(tag + line);
            // The tagged record is the value; writing the key twice would lose the record.
            context.write(outkey, outvalue);
        }
    }

    /** Mapper for the user input: tags records with {@code 'A'} (left side of the join). */
    public static class UserJoinMapper extends TaggingJoinMapper {
        public UserJoinMapper() {
            super(USER_TAG, "UserJoinMapper");
        }
    }

    /** Mapper for the comment input: tags records with {@code 'B'} (right side of the join). */
    public static class CommentJoinMapper extends TaggingJoinMapper {
        public CommentJoinMapper() {
            super(COMMENT_TAG, "CommentJoinMapper");
        }
    }

    /**
     * Joins the {@code 'A'}-tagged and {@code 'B'}-tagged records of one key according to the
     * join type read from the {@code join.type} configuration entry.
     */
    public static class UserJoinReducer extends Reducer<Text, Text, Text, Text> {
        private static final Text EMPTY_TEXT = new Text("");
        // Per-key buffers, cleared at the start of every reduce() call.
        private final ArrayList<Text> listA = new ArrayList<Text>();
        private final ArrayList<Text> listB = new ArrayList<Text>();
        private String joinType = null;

        /**
         * Reads and validates the join type.
         *
         * @throws IllegalArgumentException if {@code join.type} is missing or not supported
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            joinType = context.getConfiguration().get(JOIN_TYPE_KEY);
            if (!isSupportedJoinType(joinType)) {
                // Fail loudly: an unknown type would otherwise produce silently empty output.
                throw new IllegalArgumentException(
                        "Unsupported " + JOIN_TYPE_KEY + ": " + joinType);
            }
        }

        /**
         * Splits the values of one key into the two sides by their tag, strips the tag, and
         * writes the joined output. Values carrying neither tag are ignored.
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            listA.clear();
            listB.clear();
            for (Text value : values) {
                int tag = value.charAt(0);
                // Copy into a new Text: the buffers must not hold the iterated instance itself.
                if (tag == USER_TAG) {
                    listA.add(new Text(value.toString().substring(1)));
                } else if (tag == COMMENT_TAG) {
                    listB.add(new Text(value.toString().substring(1)));
                }
            }
            executeJoinLogic(context);
        }

        /** Dispatches to the output strategy of the configured join type. */
        private void executeJoinLogic(Context context) throws IOException, InterruptedException {
            if (joinType.equalsIgnoreCase(INNER)) {
                // Inner join: output only when BOTH sides have records for this key.
                if (!listA.isEmpty() && !listB.isEmpty()) {
                    writeCrossProduct(context);
                }
            } else if (joinType.equalsIgnoreCase(LEFT_OUTER)) {
                if (listB.isEmpty()) {
                    writeLeftOnly(context);
                } else {
                    writeCrossProduct(context);
                }
            } else if (joinType.equalsIgnoreCase(RIGHT_OUTER)) {
                if (listA.isEmpty()) {
                    writeRightOnly(context);
                } else {
                    writeCrossProduct(context);
                }
            } else if (joinType.equalsIgnoreCase(FULL_OUTER)) {
                if (listA.isEmpty()) {
                    writeRightOnly(context);
                } else if (listB.isEmpty()) {
                    writeLeftOnly(context);
                } else {
                    writeCrossProduct(context);
                }
            } else if (joinType.equalsIgnoreCase(ANTI)) {
                // Anti join: output only keys present on exactly one side.
                if (listA.isEmpty() ^ listB.isEmpty()) {
                    writeLeftOnly(context);
                    writeRightOnly(context);
                }
            }
        }

        /** Writes every (A, B) pair; writes nothing if either side is empty. */
        private void writeCrossProduct(Context context) throws IOException, InterruptedException {
            for (Text a : listA) {
                for (Text b : listB) {
                    context.write(a, b);
                }
            }
        }

        /** Writes every A record paired with an empty right side. */
        private void writeLeftOnly(Context context) throws IOException, InterruptedException {
            for (Text a : listA) {
                context.write(a, EMPTY_TEXT);
            }
        }

        /** Writes every B record paired with an empty left side. */
        private void writeRightOnly(Context context) throws IOException, InterruptedException {
            for (Text b : listB) {
                context.write(EMPTY_TEXT, b);
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args {@code <user input> <comment input> <join type> <output>}
     * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments are invalid
     */
    @Override
    public int run(String[] args) throws Exception {
        // Validated here rather than in main(): these are the arguments left after ToolRunner
        // has consumed the generic Hadoop options.
        if (args.length < 4) {
            System.err.println("ERROR: Parameter format length ");
            System.err.println(
                    "Usage: ReduceSideJoin <user input> <comment input> <join type> <output>");
            return EXIT_USAGE;
        }
        String joinType = args[2];
        if (!isSupportedJoinType(joinType)) {
            System.err.println("ERROR: Unsupported join type: " + joinType
                    + " (expected inner, leftouter, rightouter, fullouter or anti)");
            return EXIT_USAGE;
        }

        Configuration conf = getConf();
        conf.set(JOIN_TYPE_KEY, joinType);

        Job job = new Job(conf, "ReduceSideJoin");
        job.setJarByClass(ReduceSideJoin.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Each input path is handled by its own mapper.
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,
                UserJoinMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class,
                CommentJoinMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[3]));
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setReducerClass(UserJoinReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Command-line entry point. Exits with the code returned by {@link #run(String[])}, or with 1
     * if the job could not be run because of an exception.
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        int ret;
        try {
            ret = ToolRunner.run(new ReduceSideJoin(), args);
        } catch (Exception e) {
            e.printStackTrace();
            // A failure must not be reported to the calling shell as success.
            ret = 1;
        }
        System.exit(ret);
    }
}
外键作为map输出的key,相同的外键值必然落在一个reduce中,在reduce端根据需要做不同形式的连接。

上一篇:ArrayList分析
下一篇:Java NIO系列教程(十一) Pipe

相关文章

相关评论

本站评论功能暂时取消,后续此功能恢复时另行通知。

一、不得利用本站危害国家安全、泄露国家秘密,不得侵犯国家社会集体的和公民的合法权益,不得利用本站制作、复制和传播不法有害信息!

二、互相尊重,对自己的言论和行为负责。

好贷网好贷款