使用Netty进行文件传输

发布时间:2016-12-8 15:56:11 编辑:www.fx114.net(分享查询网) 我要评论
本文介绍如何使用Netty进行文件传输,并给出完整的客户端与服务端示例代码,对这一主题感兴趣的同学可以参考一下。

在写出了Netty Hello World 和 netty对象传输之后,又觉得不够,看了官网的例子,所以有了现在的这个文件传输。 顺便说下,netty官网的例子真的好,如果要学习netty,还是看官网例子的好。 不过我英文不太好,刚开始绕了好大一圈,但是现在熟悉了之后,回过头来看,还是官网的牛X。 在这里再说下netty的零拷贝,这个零拷贝是netty在3.2版本中新加入的功能。 其主要指的是在进行一些比较大的传输(比如对象或者文件传输)的时候,通过改变数组索引的方式,将数据传输到特定的channel上。 使得channel之间的转换是零拷贝,例如:ByteBuffer和ChannelBuffer。 下面就把文件传输的例子贴出来吧。 1、FileClient.java [java] view plaincopyprint? package filetrans;    import static org.jboss.netty.channel.Channels.pipeline;    import java.net.InetSocketAddress;  import java.util.concurrent.Executors;    import org.jboss.netty.bootstrap.ClientBootstrap;  import org.jboss.netty.channel.ChannelFuture;  import org.jboss.netty.channel.ChannelPipeline;  import org.jboss.netty.channel.ChannelPipelineFactory;  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;  import org.jboss.netty.handler.codec.http.DefaultHttpRequest;  import org.jboss.netty.handler.codec.http.HttpMethod;  import org.jboss.netty.handler.codec.http.HttpRequest;  import org.jboss.netty.handler.codec.http.HttpRequestEncoder;  import org.jboss.netty.handler.codec.http.HttpResponseDecoder;  import org.jboss.netty.handler.codec.http.HttpVersion;  import org.jboss.netty.handler.stream.ChunkedWriteHandler;    public class FileClient  {      public static void main(String[] args)      {            ClientBootstrap bootstrap = new ClientBootstrap(                  new NioClientSocketChannelFactory(                          Executors.newCachedThreadPool(),                          Executors.newCachedThreadPool()));          bootstrap.setPipelineFactory(new ChannelPipelineFactory()          {                @Override              public ChannelPipeline getPipeline() throws Exception              {                  ChannelPipeline pipeline = pipeline();                    pipeline.addLast("decoder", new HttpResponseDecoder());                                    /*                  * 不能添加这个,对传输文件 进行了大小的限制。。。。。                  */  //                pipeline.addLast("aggregator", 
new HttpChunkAggregator(6048576));                  pipeline.addLast("encoder", new HttpRequestEncoder());                  pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());                  pipeline.addLast("handler", new FileClientHandler());                    return pipeline;              }            });            ChannelFuture future = bootstrap.connect(new InetSocketAddress(                  "localhost", 8080));            /*          * 这里为了保证connect连接,所以才进行了sleep          * 当然也可以通过future的connect属性判断          */          try          {              Thread.sleep(3000);          } catch (InterruptedException e)          {              e.printStackTrace();          }          HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1,                  HttpMethod.GET, "DSC01575.JPG");          future.getChannel().write(request);            // Wait until the connection is closed or the connection attempt fails.          future.getChannel().getCloseFuture().awaitUninterruptibly();            // Shut down thread pools to exit.          
bootstrap.releaseExternalResources();        }  }   package filetrans; import static org.jboss.netty.channel.Channels.pipeline; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.http.DefaultHttpRequest; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpRequestEncoder; import org.jboss.netty.handler.codec.http.HttpResponseDecoder; import org.jboss.netty.handler.codec.http.HttpVersion; import org.jboss.netty.handler.stream.ChunkedWriteHandler; public class FileClient { public static void main(String[] args) { ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new HttpResponseDecoder()); /* * 不能添加这个,对传输文件 进行了大小的限制。。。。。 */ // pipeline.addLast("aggregator", new HttpChunkAggregator(6048576)); pipeline.addLast("encoder", new HttpRequestEncoder()); pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); pipeline.addLast("handler", new FileClientHandler()); return pipeline; } }); ChannelFuture future = bootstrap.connect(new InetSocketAddress( "localhost", 8080)); /* * 这里为了保证connect连接,所以才进行了sleep * 当然也可以通过future的connect属性判断 */ try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "DSC01575.JPG"); future.getChannel().write(request); // Wait 
until the connection is closed or the connection attempt fails. future.getChannel().getCloseFuture().awaitUninterruptibly(); // Shut down thread pools to exit. bootstrap.releaseExternalResources(); } } 2、FileClientHandler.java [java] view plaincopyprint? package filetrans;    import java.io.File;  import java.io.FileOutputStream;    import org.jboss.netty.buffer.ChannelBuffer;  import org.jboss.netty.channel.ChannelHandlerContext;  import org.jboss.netty.channel.ExceptionEvent;  import org.jboss.netty.channel.MessageEvent;  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;  import org.jboss.netty.handler.codec.http.DefaultHttpResponse;  import org.jboss.netty.handler.codec.http.HttpChunk;  import org.jboss.netty.handler.codec.http.HttpResponse;    public class FileClientHandler extends SimpleChannelUpstreamHandler  {      private volatile boolean readingChunks;      private File downloadFile;      private FileOutputStream fOutputStream = null;        @Override      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)              throws Exception      {          /*          * 按照channle的顺序进行处理          * server先发送HttpResponse过来,所以这里先对HttpResponse进行处理,进行文件判断之类          * 之后,server发送的都是ChunkedFile了。          */                    if (e.getMessage() instanceof HttpResponse)          {              DefaultHttpResponse httpResponse = (DefaultHttpResponse) e                      .getMessage();              String fileName = httpResponse.getHeader("fileName");              downloadFile = new File(System.getProperty("user.dir")                      + File.separator + "recived_" + fileName);              readingChunks = httpResponse.isChunked();          } else          {              HttpChunk httpChunk = (HttpChunk) e.getMessage();              if (!httpChunk.isLast())              {                  ChannelBuffer buffer = httpChunk.getContent();                  if (fOutputStream == null)                  {                      fOutputStream = 
new FileOutputStream(downloadFile);                  }                  while (buffer.readable())                  {                      byte[] dst = new byte[buffer.readableBytes()];                      buffer.readBytes(dst);                      fOutputStream.write(dst);                  }              } else              {                  readingChunks = false;              }              fOutputStream.flush();          }          if (!readingChunks)          {              fOutputStream.close();              e.getChannel().close();          }      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)              throws Exception      {          System.out.println(e.getCause());      }  }   package filetrans; import java.io.File; import java.io.FileOutputStream; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpResponse; public class FileClientHandler extends SimpleChannelUpstreamHandler { private volatile boolean readingChunks; private File downloadFile; private FileOutputStream fOutputStream = null; @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { /* * 按照channle的顺序进行处理 * server先发送HttpResponse过来,所以这里先对HttpResponse进行处理,进行文件判断之类 * 之后,server发送的都是ChunkedFile了。 */ if (e.getMessage() instanceof HttpResponse) { DefaultHttpResponse httpResponse = (DefaultHttpResponse) e .getMessage(); String fileName = httpResponse.getHeader("fileName"); downloadFile = new File(System.getProperty("user.dir") + File.separator + "recived_" + fileName); readingChunks = httpResponse.isChunked(); } else { HttpChunk httpChunk = 
(HttpChunk) e.getMessage(); if (!httpChunk.isLast()) { ChannelBuffer buffer = httpChunk.getContent(); if (fOutputStream == null) { fOutputStream = new FileOutputStream(downloadFile); } while (buffer.readable()) { byte[] dst = new byte[buffer.readableBytes()]; buffer.readBytes(dst); fOutputStream.write(dst); } } else { readingChunks = false; } fOutputStream.flush(); } if (!readingChunks) { fOutputStream.close(); e.getChannel().close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { System.out.println(e.getCause()); } } 3、FileServer.java [java] view plaincopyprint? package filetrans;    import static org.jboss.netty.channel.Channels.pipeline;    import java.net.InetSocketAddress;  import java.util.concurrent.Executors;    import org.jboss.netty.bootstrap.ServerBootstrap;  import org.jboss.netty.channel.ChannelPipeline;  import org.jboss.netty.channel.ChannelPipelineFactory;  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;  import org.jboss.netty.handler.codec.http.HttpChunkAggregator;  import org.jboss.netty.handler.codec.http.HttpRequestDecoder;  import org.jboss.netty.handler.codec.http.HttpResponseEncoder;  import org.jboss.netty.handler.stream.ChunkedWriteHandler;    public class FileServer  {      public static void main(String[] args)      {            ServerBootstrap bootstrap = new ServerBootstrap(                  new NioServerSocketChannelFactory(                          Executors.newCachedThreadPool(),                          Executors.newCachedThreadPool()));            bootstrap.setPipelineFactory(new ChannelPipelineFactory()          {                @Override              public ChannelPipeline getPipeline() throws Exception              {                  ChannelPipeline pipeline = pipeline();                  pipeline.addLast("decoder", new HttpRequestDecoder());                  pipeline.addLast("aggregator", new HttpChunkAggregator(65536));                  
pipeline.addLast("encoder", new HttpResponseEncoder());                  pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());                    pipeline.addLast("handler", new FileServerHandler());                  return pipeline;              }            });            bootstrap.bind(new InetSocketAddress(8080));      }  }   package filetrans; import static org.jboss.netty.channel.Channels.pipeline; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.handler.codec.http.HttpChunkAggregator; import org.jboss.netty.handler.codec.http.HttpRequestDecoder; import org.jboss.netty.handler.codec.http.HttpResponseEncoder; import org.jboss.netty.handler.stream.ChunkedWriteHandler; public class FileServer { public static void main(String[] args) { ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); pipeline.addLast("handler", new FileServerHandler()); return pipeline; } }); bootstrap.bind(new InetSocketAddress(8080)); } } 4、FileServerHandler.java [java] view plaincopyprint? 
package filetrans;    import static org.jboss.netty.handler.codec.http.HttpHeaders.*;  import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;  import static org.jboss.netty.handler.codec.http.HttpMethod.*;  import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;  import static org.jboss.netty.handler.codec.http.HttpVersion.*;    import java.io.File;  import java.io.FileNotFoundException;  import java.io.RandomAccessFile;  import java.io.UnsupportedEncodingException;  import java.net.URLDecoder;    import org.jboss.netty.buffer.ChannelBuffers;  import org.jboss.netty.channel.Channel;  import org.jboss.netty.channel.ChannelFuture;  import org.jboss.netty.channel.ChannelFutureListener;  import org.jboss.netty.channel.ChannelFutureProgressListener;  import org.jboss.netty.channel.ChannelHandlerContext;  import org.jboss.netty.channel.DefaultFileRegion;  import org.jboss.netty.channel.ExceptionEvent;  import org.jboss.netty.channel.FileRegion;  import org.jboss.netty.channel.MessageEvent;  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;  import org.jboss.netty.handler.codec.frame.TooLongFrameException;  import org.jboss.netty.handler.codec.http.DefaultHttpResponse;  import org.jboss.netty.handler.codec.http.HttpRequest;  import org.jboss.netty.handler.codec.http.HttpResponse;  import org.jboss.netty.handler.codec.http.HttpResponseStatus;  import org.jboss.netty.handler.ssl.SslHandler;  import org.jboss.netty.handler.stream.ChunkedFile;  import org.jboss.netty.util.CharsetUtil;    /**  * 这里的代码主要来源于官网的例子,http里面有个例子,自己仿照server写了client  * @author Ransom  */  public class FileServerHandler extends SimpleChannelUpstreamHandler  {        @Override      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)              throws Exception      {          HttpRequest request = (HttpRequest) e.getMessage();          if (request.getMethod() != GET)          {              sendError(ctx, METHOD_NOT_ALLOWED);              
return;          }            final String path = sanitizeUri(request.getUri());          if (path == null)          {              sendError(ctx, FORBIDDEN);              return;          }            File file = new File(path);          if (file.isHidden() || !file.exists())          {              sendError(ctx, NOT_FOUND);              return;          }          if (!file.isFile())          {              sendError(ctx, FORBIDDEN);              return;          }            RandomAccessFile raf;          try          {              raf = new RandomAccessFile(file, "r");          } catch (FileNotFoundException fnfe)          {              sendError(ctx, NOT_FOUND);              return;          }          long fileLength = raf.length();            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);                    /*          * 由于是异步传输,所以不得已加入了一些属性,用来进行文件识别          */          response.addHeader("fileName", request.getUri());                    setContentLength(response, fileLength);            Channel ch = e.getChannel();            // Write the initial line and the header.          ch.write(response);            // Write the content.          ChannelFuture writeFuture;          if (ch.getPipeline().get(SslHandler.class) != null)          {              // Cannot use zero-copy with HTTPS.              writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));          } else          {              // No encryption - use zero-copy.              
final FileRegion region = new DefaultFileRegion(raf.getChannel(),                      0, fileLength);              writeFuture = ch.write(region);              writeFuture.addListener(new ChannelFutureProgressListener()              {                  public void operationComplete(ChannelFuture future)                  {                      region.releaseExternalResources();                  }                    public void operationProgressed(ChannelFuture future,                          long amount, long current, long total)                  {                      System.out.printf("%s: %d / %d (+%d)%n", path, current,                              total, amount);                  }              });          }            // Decide whether to close the connection or not.          if (!isKeepAlive(request))          {              // Close the connection when the whole content is written out.              writeFuture.addListener(ChannelFutureListener.CLOSE);          }      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)              throws Exception      {          Channel ch = e.getChannel();          Throwable cause = e.getCause();          if (cause instanceof TooLongFrameException)          {              sendError(ctx, BAD_REQUEST);              return;          }            cause.printStackTrace();          if (ch.isConnected())          {              sendError(ctx, INTERNAL_SERVER_ERROR);          }      }        private String sanitizeUri(String uri)      {          // Decode the path.          try          {              uri = URLDecoder.decode(uri, "UTF-8");          } catch (UnsupportedEncodingException e)          {              try              {                  uri = URLDecoder.decode(uri, "ISO-8859-1");              } catch (UnsupportedEncodingException e1)              {                  throw new Error();              }          }            // Convert file separators.          
uri = uri.replace('/', File.separatorChar);            // Simplistic dumb security check.          // You will have to do something serious in the production environment.          if (uri.contains(File.separator + ".")                  || uri.contains("." + File.separator) || uri.startsWith(".")                  || uri.endsWith("."))          {              return null;          }            // Convert to absolute path.          return System.getProperty("user.dir") + File.separator + uri;      }        private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status)      {          HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);          response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");          response.setContent(ChannelBuffers.copiedBuffer(                  "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));            // Close the connection as soon as the error message is sent.          ctx.getChannel().write(response)                  .addListener(ChannelFutureListener.CLOSE);      }  }   package filetrans; import static org.jboss.netty.handler.codec.http.HttpHeaders.*; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*; import static org.jboss.netty.handler.codec.http.HttpMethod.*; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; import static org.jboss.netty.handler.codec.http.HttpVersion.*; import java.io.File; import java.io.FileNotFoundException; import java.io.RandomAccessFile; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelFutureProgressListener; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.DefaultFileRegion; import org.jboss.netty.channel.ExceptionEvent; import 
org.jboss.netty.channel.FileRegion; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.frame.TooLongFrameException; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.stream.ChunkedFile; import org.jboss.netty.util.CharsetUtil; /** * 这里的代码主要来源于官网的例子,http里面有个例子,自己仿照server写了client * @author Ransom */ public class FileServerHandler extends SimpleChannelUpstreamHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { HttpRequest request = (HttpRequest) e.getMessage(); if (request.getMethod() != GET) { sendError(ctx, METHOD_NOT_ALLOWED); return; } final String path = sanitizeUri(request.getUri()); if (path == null) { sendError(ctx, FORBIDDEN); return; } File file = new File(path); if (file.isHidden() || !file.exists()) { sendError(ctx, NOT_FOUND); return; } if (!file.isFile()) { sendError(ctx, FORBIDDEN); return; } RandomAccessFile raf; try { raf = new RandomAccessFile(file, "r"); } catch (FileNotFoundException fnfe) { sendError(ctx, NOT_FOUND); return; } long fileLength = raf.length(); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); /* * 由于是异步传输,所以不得已加入了一些属性,用来进行文件识别 */ response.addHeader("fileName", request.getUri()); setContentLength(response, fileLength); Channel ch = e.getChannel(); // Write the initial line and the header. ch.write(response); // Write the content. ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) != null) { // Cannot use zero-copy with HTTPS. writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192)); } else { // No encryption - use zero-copy. 
final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength); writeFuture = ch.write(region); writeFuture.addListener(new ChannelFutureProgressListener() { public void operationComplete(ChannelFuture future) { region.releaseExternalResources(); } public void operationProgressed(ChannelFuture future, long amount, long current, long total) { System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount); } }); } // Decide whether to close the connection or not. if (!isKeepAlive(request)) { // Close the connection when the whole content is written out. writeFuture.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { Channel ch = e.getChannel(); Throwable cause = e.getCause(); if (cause instanceof TooLongFrameException) { sendError(ctx, BAD_REQUEST); return; } cause.printStackTrace(); if (ch.isConnected()) { sendError(ctx, INTERNAL_SERVER_ERROR); } } private String sanitizeUri(String uri) { // Decode the path. try { uri = URLDecoder.decode(uri, "UTF-8"); } catch (UnsupportedEncodingException e) { try { uri = URLDecoder.decode(uri, "ISO-8859-1"); } catch (UnsupportedEncodingException e1) { throw new Error(); } } // Convert file separators. uri = uri.replace('/', File.separatorChar); // Simplistic dumb security check. // You will have to do something serious in the production environment. if (uri.contains(File.separator + ".") || uri.contains("." + File.separator) || uri.startsWith(".") || uri.endsWith(".")) { return null; } // Convert to absolute path. 
return System.getProperty("user.dir") + File.separator + uri; } private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); response.setContent(ChannelBuffers.copiedBuffer( "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); // Close the connection as soon as the error message is sent. ctx.getChannel().write(response) .addListener(ChannelFutureListener.CLOSE); } } Netty的例子到这里就算贴完了,后面还打算进行一些Netty实现原理,架构,代码解读方面的分析。到时候如果有什么心得,再写出来吧。  

上一篇:Mac 上Sublime Text 2配置lua环境
下一篇:浙江大学ACM俱乐部 1028:行车路程

相关文章

相关评论