netty 实现HTTP服务器
异步事件驱动,网络应用程序框架,快速开发可维护的高性能协议服务器和客户端。
Pipeline处理链,由一系列ChannelInboundHandler和ChannelOutbound handler 串联组成,ChannelInboundHandler是用来Inbound事件的处理程序,ChannelOutboundHandler是Outbound事件的处理程序。
HTTP请求及常见的 Content-Type 类型
1、application/x-www-form-urlencoded, POST提交数据的方式,原生Form表单,如果不设置enctype属性的默认方式;
2、multipart/form-data,POST提交数据的方式,Form表单的enctype设置为multipart/form-data,表单的数据处理为一条消息,以标签为单元,用分隔符(这就是boundary的作用)分开,这种方式将数据有很多部分,它既可以上传键值对,也可以上传文件,甚至多个文件。
3、application/ JSON ,JSON格式。
4、binary (application/octet-stream),只能提交 二进制 。
实现HTTP服务器案例
<!-- Netty -all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>.1.60.Final</version>
</dependency>
<!-- protobuf -java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>.15.6</version>
</dependency>
package com.what.netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel. socket .nio.NioServerSocketChannel;
public class NettyHttpServer {
public static void main(String[] args) {
// 访问地址:
int serverPort =;
// 初始化==>用于Acceptor的主" 线程池 "
EventLoopGroup bossEventGroup = new NioEventLoopGroup();
// 初始化==>用于I/O工作的从"线程池"
NioEventLoopGroup workerEventGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// group方法设置主从线程池
serverBootstrap.group(bossEventGroup, workerEventGroup)
// 指定通道channel类型,服务端为:NioServerSocketChannel
.channel(NioServerSocketChannel.class)
// 添加 Handler
.childHandler(new NettyHttpServerInitializer());
// 绑定并侦听端口
ChannelFuture channelFuture = serverBootstrap.bind(serverPort).sync();
// 等待服务监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (Interrupted Exception e) {
e.printStackTrace();
} finally {
// 优雅退出,释放"线程池"
bossEventGroup. shutdown Gracefully();
workerEventGroup.shutdownGracefully();
}
}
}
package com.what.netty.http;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
/**
* netty 实现简单的 http 协议:配置 解码器、handler
*/
public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 管道中添加 netty 提供的 http 编解码器
// HttpServerCodec==>http编解码的处理类
pipeline.addLast("HttpServerCodec", new HttpServerCodec());
pipeline.addLast("MyNettyHttpServerHandler", new NettyHttpServerHandler());
}
}
package com.what.netty.http;
import io.netty.buffer. byte Buf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util. charset Util;
import java.net.URI;
/**
* netty 实现简单的 http:handler
*/
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 触发读取事件
*
* @param channelHandlerContext 通道上下文
* @param httpObject http数据
* @throws Exception 异常
*/
@Override
protected void channelRead(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
if (httpObject instanceof Http Request ) {
System.out.println("[服务端] 数据类型: " + httpObject.getClass());
System.out.println("[服务端] 客户端地址: " + channelHandlerContext.channel().remoteAddress());
HttpRequest httpRequest = (HttpRequest) httpObject;
final URI uri = new URI(httpRequest.uri());
if ("/favico.ico".equals(uri.getPath())) {
System.out.println("[服务端]请求了 favico.ico,不做处理 ");
return;
}
ByteBuf byteBuf = Unpooled.copiedBuffer("[http helloworld]", CharsetUtil.UTF_);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP__1, HttpResponseStatus.OK, byteBuf);
response. Header s().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
channelHandlerContext.writeAndFlush(response);
}
}
}
实现简单的TCP通信案例
服务器端:
package com.what.netty.channel.demo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyTcpServer {
public static void main(String[] args) {
// 初始化==>用于Acceptor的主"线程池"
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 初始化==>用于I/O工作的从"线程池"
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// group方法设置主从线程池
serverBootstrap.group(bossGroup, workerGroup)
// 使用 NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 设置 线程队列 等待 连接的 个数
.option(ChannelOption.SO_BACKLOG,)
// 设置保持 活动/生存的 连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 设置管道工厂
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyTcpServerHandler());
}
});
System.out.println("[服务器]启动成功");
// 绑定并侦听端口
ChannelFuture channelFuture = serverBootstrap.bind().sync();
// 等待服务监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅退出,释放"线程池"
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.what.netty.channel.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slfj.Slf4j;
import java .util.concurrent.TimeUnit;
@Slfj
public class NettyTcpServerHandler implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered()" + ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered()" + ctx);
}
/**
* 客户端发起连接,服务端通道就绪,触发本方法
*
* @param ctx 通道
* @throws Exception 异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive()" + ctx);
System.out.println("[服务端]通道连接准备就绪");
}
@Override
public void channelInactive(ChannelHandlerContext var) throws Exception {
System.out.println("channelInactive()" + var);
}
/**
* 客户端连接后发送数据,服务端接收数据时触发
*
* @param ctx 通道
* @param msg 数据
* @throws Exception 异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead()" + ctx);
// 用户 自定义 普通任务
ctx.channel().eventLoop().execute(() -> {
try {
TimeUnit.SECONDS.sleep();
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端处理耗时阻塞", CharsetUtil.UTF_));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 自定义 定时任务 提交到 ScheduledTaskQueue队列中。
ctx.channel().eventLoop().schedule(() -> {
try {
TimeUnit.SECONDS.sleep();
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端定时任务", CharsetUtil.UTF_));
} catch (InterruptedException e) {
e.printStackTrace();
}
},, TimeUnit.SECONDS);
// 读取 客户端上传的 信息
System.out.println("[服务端]服务端读取线程:" + Thread.currentThread().getName());
ByteBuf byteBuffer = (ByteBuf) msg;
String message = byteBuffer.toString(CharsetUtil.UTF_8);
System.out.println("[服务端]服务端接收数据:" + message);
}
/**
* 服务端数据读取结束后触发
*
* @param ctx 通道
* @throws Exception 异常
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete()" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]数据接收完毕", CharsetUtil.UTF_));
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("userEventTriggered()" + ctx);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged()" + ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded()" + ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved()" + ctx);
}
/**
* 发生异常,关闭通道
*
* @param ctx 通道
* @param cause 原因
* @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端实现:
package com.what.netty.channel.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* netty 实现 tcp服务
*/
public class NettyTcpClient {
public static void main(String[] args) {
// 初始化==>用于I/O工作的从"线程池"
final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
// group方法设置线程池
bootstrap.group(eventLoopGroup)
// 设置客户端 通道 是 NioSocketChannel 类型
.channel(NioSocketChannel.class)
// 设置管道工厂
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyTcpClientHandler());
}
});
System.out.println("[客户端]启动成功");
// 连接
ChannelFuture channelFuture = bootstrap.connect("localhost",);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅退出,释放"线程池"
eventLoopGroup.shutdownGracefully();
}
}
}
package com.what.netty.channel.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slfj.Slf4j;
@Slfj
public class NettyTcpClientHandler implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered()" + ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered()" + ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive()" + ctx);
System.out.println("[客户端]通道就绪");
ctx.writeAndFlush(Unpooled.copiedBuffer("[这儿有内鬼,终止交易]", CharsetUtil.UTF_));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive()" + ctx);
}
/**
* @param ctx 通道
* @param msg 数据
* @throws Exception 异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead()" + ctx);
ByteBuf byteBuf = (ByteBuf) msg;
String message = byteBuf.toString(CharsetUtil.UTF_);
System.out.println("[客户端]服务端地址:" + ctx.channel().remoteAddress() + ",服务端回复信息:" + message);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete()" + ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("userEventTriggered()" + ctx);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("userEventTriggered()" + ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded()" + ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved()" + ctx);
}
/**
* 发生异常,关闭通道
*
* @param ctx 通道
* @param cause 原因
* @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught()" + ctx);
cause.printStackTrace();
ctx.close();
}
}