Java,Netty,实现HTTP服务器案例,实现简单的TCP通信案例

Java
285
0
0
2023-06-28
标签   网络协议

netty 实现HTTP服务器

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Pipeline处理链,由一系列ChannelInboundHandler和ChannelOutboundHandler串联组成,ChannelInboundHandler是用来处理Inbound事件的处理程序,ChannelOutboundHandler是用来处理Outbound事件的处理程序。

HTTP请求及常见的 Content-Type 类型

1、application/x-www-form-urlencoded, POST提交数据的方式,原生Form表单,如果不设置enctype属性的默认方式;

2、multipart/form-data,POST提交数据的方式,Form表单的enctype设置为multipart/form-data,表单的数据处理为一条消息,以标签为单元,用分隔符(这就是boundary的作用)分开,这种方式将数据分为很多部分,它既可以上传键值对,也可以上传文件,甚至多个文件。

3、application/json,JSON格式。

4、binary (application/octet-stream),只能提交二进制数据。

实现HTTP服务器案例

<!-- netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.60.Final</version>
</dependency>
<!-- protobuf-java -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.15.6</version>
</dependency>
 package com.what.netty.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyHttpServer {

    public  static   void  main(String[] args) {
        // 访问地址:
        int serverPort =;
        // 初始化==>用于Acceptor的主" 线程池 "
        EventLoopGroup bossEventGroup = new NioEventLoopGroup();
        // 初始化==>用于I/O工作的从"线程池"
        NioEventLoopGroup workerEventGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // group方法设置主从线程池
            serverBootstrap.group(bossEventGroup, workerEventGroup)
                    // 指定通道channel类型,服务端为:NioServerSocketChannel
                    .channel(NioServerSocketChannel.class)
                    // 添加  Handler 
                    .childHandler(new NettyHttpServerInitializer());
            // 绑定并侦听端口
            ChannelFuture channelFuture = serverBootstrap.bind(serverPort).sync();
            // 等待服务监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Interrupted Exception  e) {
            e.printStackTrace();
        } finally {
            // 优雅退出,释放"线程池"
            bossEventGroup. shutdown Gracefully();
            workerEventGroup.shutdownGracefully();
        }
    }
} 
 package com.what.netty.http;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

/**
 * Channel initializer for the simple Netty HTTP server.
 *
 * <p>For each {@link SocketChannel} handed to {@link #initChannel}, it appends Netty's
 * HTTP codec followed by {@link NettyHttpServerHandler} to the channel's pipeline.
 */
public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Populates the pipeline of the given channel.
     *
     * @param socketChannel channel whose pipeline is configured
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // The codec (HTTP encode/decode) is added first, the request handler after it.
        socketChannel.pipeline()
                .addLast("HttpServerCodec", new HttpServerCodec())
                .addLast("MyNettyHttpServerHandler", new NettyHttpServerHandler());
    }

}
 package com.what.netty.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.net.URI;

/**
 * netty 实现简单的 http:handler
 */
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    /**
     * 触发读取事件
     *
     * @param channelHandlerContext 通道上下文
     * @param httpObject            http数据
     * @throws Exception 异常
     */
    @Override
    protected void channelRead(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
        if (httpObject  instanceof  Http Request ) {
            System.out.println("[服务端] 数据类型: " + httpObject.getClass());
            System.out.println("[服务端] 客户端地址: " + channelHandlerContext.channel().remoteAddress());

            HttpRequest httpRequest = (HttpRequest) httpObject;
            final URI uri = new URI(httpRequest.uri());
            if ("/favico.ico".equals(uri.getPath())) {
                System.out.println("[服务端]请求了 favico.ico,不做处理 ");
                return;
            }
            ByteBuf byteBuf = Unpooled.copiedBuffer("[http helloworld]", CharsetUtil.UTF_);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP__1, HttpResponseStatus.OK, byteBuf);
            response. Header s().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());

            channelHandlerContext.writeAndFlush(response);
        }
    }

} 

实现简单的TCP通信案例

服务器端:

 package com.what.netty.channel.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyTcpServer {

    public static void main(String[] args) {
        // 初始化==>用于Acceptor的主"线程池"
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 初始化==>用于I/O工作的从"线程池"
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // group方法设置主从线程池
            serverBootstrap.group(bossGroup, workerGroup)
                    // 使用 NioServerSocketChannel作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    // 设置 线程队列 等待 连接的 个数
                    .option(ChannelOption.SO_BACKLOG,)
                    // 设置保持 活动/生存的 连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // 设置管道工厂
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyTcpServerHandler());
                        }
                    });
            System.out.println("[服务器]启动成功");
            // 绑定并侦听端口
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            // 等待服务监听端口关闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅退出,释放"线程池"
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

} 
 package com.what.netty.channel.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slfj
public class NettyTcpServerHandler implements ChannelInboundHandler {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered()" + ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered()" + ctx);
    }

    /**
     * 客户端发起连接,服务端通道就绪,触发本方法
     *
     * @param ctx 通道
     * @throws Exception 异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive()" + ctx);
        System.out.println("[服务端]通道连接准备就绪");
    }

    @Override
    public void channelInactive(ChannelHandlerContext var) throws Exception {
        System.out.println("channelInactive()" + var);
    }

    /**
     * 客户端连接后发送数据,服务端接收数据时触发
     *
     * @param ctx 通道
     * @param msg 数据
     * @throws Exception 异常
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead()" + ctx);
        // 用户 自定义 普通任务
        ctx.channel().eventLoop().execute(() -> {
            try {
                TimeUnit.SECONDS.sleep();
                ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端处理耗时阻塞", CharsetUtil.UTF_));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 自定义 定时任务 提交到 ScheduledTaskQueue队列中。
        ctx.channel().eventLoop().schedule(() -> {
            try {
                TimeUnit.SECONDS.sleep();
                ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端定时任务", CharsetUtil.UTF_));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },, TimeUnit.SECONDS);
        // 读取 客户端上传的 信息
        System.out.println("[服务端]服务端读取线程:" + Thread.currentThread().getName());
        ByteBuf byteBuffer = (ByteBuf) msg;
         String  message = byteBuffer.toString(CharsetUtil.UTF_8);
        System.out.println("[服务端]服务端接收数据:" + message);
    }

    /**
     * 服务端数据读取结束后触发
     *
     * @param ctx 通道
     * @throws Exception 异常
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete()" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]数据接收完毕", CharsetUtil.UTF_));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("userEventTriggered()" + ctx);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelWritabilityChanged()" + ctx);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded()" + ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved()" + ctx);
    }

    /**
     * 发生异常,关闭通道
     *
     * @param ctx   通道
     * @param cause 原因
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

} 

客户端实现:

 package com.what.netty.channel.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * netty 实现 tcp服务
 */
public class NettyTcpClient {

    public static void main(String[] args) {
        // 初始化==>用于I/O工作的从"线程池"
        final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            // group方法设置线程池
            bootstrap.group(eventLoopGroup)
                    // 设置客户端 通道 是 NioSocketChannel 类型
                    .channel(NioSocketChannel.class)
                    // 设置管道工厂
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyTcpClientHandler());
                        }
                    });
            System.out.println("[客户端]启动成功");
            // 连接
            ChannelFuture channelFuture = bootstrap.connect("localhost",);
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 优雅退出,释放"线程池"
            eventLoopGroup.shutdownGracefully();
        }

    }

}
 package com.what.netty.channel.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * Client-side inbound handler of the simple Netty TCP demo.
 *
 * <p>Every lifecycle callback prints its own name together with the context. When the
 * channel becomes active the handler sends one greeting message; every inbound message
 * is printed and then released.
 */
@Slf4j
public class NettyTcpClientHandler implements ChannelInboundHandler {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered()" + ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered()" + ctx);
    }

    /**
     * Called when the channel becomes active; sends the initial message to the server.
     *
     * @param ctx channel context
     * @throws Exception declared by the interface
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive()" + ctx);
        System.out.println("[客户端]通道就绪");
        ctx.writeAndFlush(Unpooled.copiedBuffer("[这儿有内鬼,终止交易]", CharsetUtil.UTF_8));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive()" + ctx);
    }

    /**
     * Called when data sent by the server has been received; prints it.
     *
     * @param ctx channel context
     * @param msg inbound data; expected to be a {@link ByteBuf}
     * @throws Exception declared by the interface
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead()" + ctx);
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            String message = byteBuf.toString(CharsetUtil.UTF_8);
            System.out.println("[客户端]服务端地址:" + ctx.channel().remoteAddress() + ",服务端回复信息:" + message);
        } finally {
            // The message is not passed on to another handler, so release it here.
            byteBuf.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete()" + ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("userEventTriggered()" + ctx);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        // Label matches this callback (the previous text named userEventTriggered).
        System.out.println("channelWritabilityChanged()" + ctx);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded()" + ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved()" + ctx);
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   channel context
     * @param cause the failure
     * @throws Exception declared by the interface
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught()" + ctx);
        cause.printStackTrace();
        ctx.close();
    }

}