MapReduce高级编程——自定义InputFormat

发布时间:2017-5-1 1:01:27 编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"MapReduce高级编程——自定义InputFormat",主要涉及到MapReduce高级编程——自定义InputFormat方面的内容,对于MapReduce高级编程——自定义InputFormat感兴趣的同学可以参考一下。

http://irwenqiang.iteye.com/blog/1448164 0、测试集样例 Java代码   ball, 3.5, 12.7, 9.0   car, 15, 23.76, 42.23   device, 0.0, 12.4, -67.1     1、测试Point3D InputFormat Java代码   import java.io.IOException;   import java.net.URI;      import javax.xml.soap.Text;      import org.apache.hadoop.conf.Configuration;   import org.apache.hadoop.fs.FileSystem;   import org.apache.hadoop.fs.Path;   import org.apache.hadoop.mapreduce.Job;   import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;      /**   * desc:Custom Data Types <code>TestPoint3DInputFormat</code>   *    * @author chenwq   */   public class TestPoint3DInputFormat {        /**       * @param args       * @throws IOException        * @throws ClassNotFoundException        * @throws InterruptedException        */       public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {           // TODO Auto-generated method stub           System.out.println("hello,chenwq!");           Job job=new Job();           Configuration conf=new Configuration();           FileSystem fs=FileSystem.get(URI.create(args[1]), conf);           fs.delete(new Path(args[1]));           job.setJobName("测试MyInputFormat程序。。。。。");           FileInputFormat.addInputPath(job, new Path(args[0]));           FileOutputFormat.setOutputPath(job, new Path(args[1]));           job.setInputFormatClass(Point3DinputFormat.class);           job.setMapOutputKeyClass(Text.class);           job.setMapOutputValueClass(Point3D.class);           job.setMapperClass(Point3DMapper.class);           job.setNumReduceTasks(0);           job.waitForCompletion(false);       }   }     2、自定义类型Point3D必须实现WritableComparable接口,才能在Hadoop环境中传输 Java代码   import java.io.DataInput;   import java.io.DataOutput;   import java.io.IOException;      import org.apache.hadoop.io.WritableComparable;      /**   * desc:Custom Data Types <code>Point</code>   *    * @author chenwq   */   public class Point3D implements WritableComparable {       public float x;       public float y;       public float z;          public Point3D(float x, float y, float z) {           this.x = x;           this.y = y;           this.z = z;       }          public Point3D() {           this(0.0f, 0.0f, 0.0f);       }          public void set(float x, float y, float z) {           this.x = x;           this.y = y;           this.z = z;       }          public void write(DataOutput out) throws IOException {           out.writeFloat(x);           out.writeFloat(y);           out.writeFloat(z);       }          public void readFields(DataInput in) throws IOException {           x = in.readFloat();           y = in.readFloat();           z = in.readFloat();       }          public String toString() {           return Float.toString(x) + ", " + Float.toString(y) + ", "                   + Float.toString(z);       }          public float distanceFromOrigin() {           return (float) Math.sqrt(x * x + y * y + z * z);       }          public int compareTo(Object other) {           float myDistance = this.distanceFromOrigin();           float otherDistance = ((Point3D) other).distanceFromOrigin();              return Float.compare(myDistance, otherDistance);       }          public boolean equals(Object o) {           Point3D other = (Point3D) o;           if (!(other instanceof Point3D)) {               return false;           }              return this.x == other.x && this.y == other.y && this.z == other.z;       }          public int hashCode() {           return Float.floatToIntBits(x) ^ Float.floatToIntBits(y)                   ^ Float.floatToIntBits(z);       }      }    3、自定义Point3DInputFormat类型,供MapReduce编程模型使用 Java代码   import java.io.IOException;      import java.util.StringTokenizer;      import org.apache.hadoop.conf.Configuration;   import org.apache.hadoop.fs.FSDataInputStream;   import org.apache.hadoop.fs.FileSystem;   import org.apache.hadoop.fs.Path;   import org.apache.hadoop.io.Text;   import org.apache.hadoop.mapreduce.InputSplit;   import org.apache.hadoop.mapreduce.JobContext;   import org.apache.hadoop.mapreduce.RecordReader;   import org.apache.hadoop.mapreduce.TaskAttemptContext;   import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   import org.apache.hadoop.mapreduce.lib.input.FileSplit;   import org.apache.hadoop.util.LineReader;      public class Point3DinputFormat extends FileInputFormat<Text, Point3D> {              @Override       protected boolean isSplitable(JobContext context, Path filename) {           // TODO Auto-generated method stub           return false;       }       @Override       public RecordReader<Text, Point3D> createRecordReader(InputSplit inputsplit,               TaskAttemptContext context) throws IOException, InterruptedException {           // TODO Auto-generated method stub           return new objPosRecordReader();       }       public static class objPosRecordReader extends RecordReader<Text,Point3D>{              public LineReader in;           public Text lineKey;           public Point3D lineValue;           public StringTokenizer token=null;                      public Text line;                    @Override           public void close() throws IOException {               // TODO Auto-generated method stub                          }              @Override           public Text getCurrentKey() throws IOException, InterruptedException {               //lineKey.set(token.nextToken());               return lineKey;           }              @Override           public Point3D getCurrentValue() throws IOException,                   InterruptedException {               // TODO Auto-generated method stub               return lineValue;           }              @Override           public float getProgress() throws IOException, InterruptedException {               // TODO Auto-generated method stub               return 0;           }              @Override           public void initialize(InputSplit input, TaskAttemptContext context)                   throws IOException, InterruptedException {               // TODO Auto-generated method stub               FileSplit split=(FileSplit)input;               Configuration job=context.getConfiguration();               Path file=split.getPath();               FileSystem fs=file.getFileSystem(job);                              FSDataInputStream filein=fs.open(file);               in=new LineReader(filein,job);                              line=new Text();               lineKey=new Text();               lineValue=new Point3D();           }              @Override           public boolean nextKeyValue() throws IOException, InterruptedException {               // TODO Auto-generated method stub               int linesize=in.readLine(line);               if(linesize==0)                   return false;                              String[] pieces = line.toString().split(",");               if(pieces.length != 4){                   throw new IOException("Invalid record received");               }                              // try to parse floating point components of value               float fx, fy, fz;               try{                   fx = Float.parseFloat(pieces[1].trim());                   fy = Float.parseFloat(pieces[2].trim());                   fz = Float.parseFloat(pieces[3].trim());               }catch(NumberFormatException nfe){                   throw new IOException("Error parsing floating poing value in record");               }               lineKey.set(pieces[0]);                              lineValue.set(fx, fy, fz);                              return true;           }       }   }     4、编写Mapper类,这里仅仅测试自定义类型Point3D的InputFormat,不需要Reducer Java代码   import java.io.IOException;      import org.apache.hadoop.io.Text;   import org.apache.hadoop.mapreduce.Mapper;         public class Point3DMapper extends Mapper<Text, Point3D, Text, Point3D>{       protected void map(Text key, Point3D value, Context context) throws IOException, InterruptedException{           context.write(key, value);       }   }   上面RecordReader<LongWritable, Text>的<key、value>都是可以自己定义的。但key必须实现WritableComparable类,而value必须实现Writable类。

上一篇:《高效学习OpenGL》之 投影变换 glFrustum(),gluPerspective(),glOrtho(),gluOrtho2D()
下一篇:iOS企业应用发布教程

相关文章

相关评论

本站评论功能暂时取消,后续此功能例行通知。

一、不得利用本站危害国家安全、泄露国家秘密,不得侵犯国家社会集体的和公民的合法权益,不得利用本站制作、复制和传播不法有害信息!

二、互相尊重,对自己的言论和行为负责。

好贷网好贷款