Netty 基于Http协议下的数据传输示例

jopen 7年前

Http/Https协议是最重要最常用到的协议之一,Netty提供了一些了的Handler来处理Http协议下的编码工作。下面就介绍一个Netty实例:

1.通过HttpClient发送Protobuf类型数据到服务端

2.服务端Netty负责把接收到的Http请求中的数据再发送到客户端。

3.其中Netty对发送的数据量没有限制,因为Http发送的message往往是由一系列infragment构成,Netty可以把接收到的http请求片段信息整合(Aggregator)到一起,最终得到一个FullHttpRequest。

 

Client端:

    package NettyDemo.echo.client;                import java.io.IOException;        import org.apache.http.HttpEntity;        import org.apache.http.HttpHost;        import org.apache.http.HttpResponse;        import org.apache.http.client.ClientProtocolException;        import org.apache.http.client.methods.HttpPost;        import org.apache.http.conn.params.ConnRoutePNames;        import org.apache.http.entity.ByteArrayEntity;        import org.apache.http.impl.client.DefaultHttpClient;        import NettyDemo.echo.protocal.AddressBookProtos;        import NettyDemo.echo.server.HttpProtobufServer;                public class HttpClientDemo {            public static void main(String[] args) throws ClientProtocolException,                    IOException {                DefaultHttpClient httpclient = new DefaultHttpClient();                HttpHost proxy = new HttpHost(HttpProtobufServer.IP,                        HttpProtobufServer.PORT);                httpclient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY,                        proxy);                HttpPost httppost = new HttpPost("");                AddressBookProtos.AddressBook.Builder address = AddressBookProtos.AddressBook                        .newBuilder();                for (int i = 0; i < 20000; i++) {                    AddressBookProtos.Person.Builder person = AddressBookProtos.Person                            .newBuilder();                    person.setName("HeavenWang");                    person.setId(i);                    person.setEmail("wanghouda@126.com");                    address.addPerson(person);                }                ByteArrayEntity entity = new ByteArrayEntity(address.build().toByteArray());                httppost.setEntity(entity);                HttpResponse response = httpclient.execute(httppost);                HttpEntity receiveEntity = response.getEntity();                System.out.println("----------------------------------------");                System.out.println(response.getStatusLine());                if (receiveEntity != null) {                    System.out.println("Response content length: " + receiveEntity.getContentLength());                }                System.out.println("success");            }        }  

服务器端NettyService:


package NettyDemo.echo.server;        import io.netty.bootstrap.ServerBootstrap;    import io.netty.channel.ChannelInitializer;    import io.netty.channel.ChannelPipeline;    import io.netty.channel.EventLoopGroup;    import io.netty.channel.nio.NioEventLoopGroup;    import io.netty.channel.socket.SocketChannel;    import io.netty.channel.socket.nio.NioServerSocketChannel;    import io.netty.handler.codec.http.HttpObjectAggregator;    import io.netty.handler.codec.http.HttpRequestDecoder;    import io.netty.handler.codec.http.HttpResponseEncoder;    import io.netty.handler.codec.http.HttpServerCodec;    import NettyDemo.echo.handler.HttpProtobufServerHandler;        public class HttpProtobufServer {        public static final String IP = "127.0.0.1";        public static final int PORT = 8080;        //private static MessageLite lite=AddressBookProtos.AddressBook.getDefaultInstance();        /**用于分配处理业务线程的线程组个数 */        protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认        /** 业务出现线程大小*/        protected static final int BIZTHREADSIZE = 4;        private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);        private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);        protected static void run() throws Exception {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup);            b.channel(NioServerSocketChannel.class);            b.childHandler(new ChannelInitializer<SocketChannel>() {                @Override                public void initChannel(SocketChannel ch) throws Exception {                    ChannelPipeline pipeline = ch.pipeline();                                   pipeline.addLast("decoder", new HttpRequestDecoder());                    /**usually we receive http message infragment,if we want full http message,                    * we should bundle HttpObjectAggregator and we can get FullHttpRequest。                    * 我们通常接收到的是一个http片段,如果要想完整接受一次请求的所有数据,我们需要绑定HttpObjectAggregator,然后我们                    * 就可以收到一个FullHttpRequest-是一个完整的请求信息。                   **/                    pipeline.addLast("servercodec",new HttpServerCodec());                    pipeline.addLast("aggegator",new HttpObjectAggregator(1024*1024*64));//定义缓冲数据量                    pipeline.addLast(new HttpProtobufServerHandler());                    pipeline.addLast("responseencoder",new HttpResponseEncoder());                }            });            b.bind(IP, PORT).sync();            System.out.println("TCP服务器已启动");        }            protected static void shutdown() {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }            public static void main(String[] args) throws Exception {            System.out.println("开始启动http服务器...");            HttpProtobufServer.run();    //      TcpServer.shutdown();        }    }  

Handler:

    package NettyDemo.echo.handler;                import io.netty.channel.ChannelHandlerContext;        import io.netty.channel.SimpleChannelInboundHandler;        import io.netty.handler.codec.http.DefaultFullHttpRequest;        import io.netty.handler.codec.http.DefaultFullHttpResponse;        import io.netty.handler.codec.http.HttpResponseStatus;        import io.netty.handler.codec.http.HttpVersion;                public class HttpProtobufServerHandler extends SimpleChannelInboundHandler<Object>{            @Override            protected void channelRead0(ChannelHandlerContext ctx,  Object msg) throws Exception {                DefaultFullHttpRequest request=(DefaultFullHttpRequest)msg;                DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.ACCEPTED, request.content());                ctx.writeAndFlush(response);            }                            }  


附:FullHttpRequest构成,因此一个FullHttpRequest会包含请求message的所有片段。

20140402151452921.png


来自:http://blog.csdn.net/suifeng3051/article/details/22800171