netty 实现HTTP服务器
Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
Pipeline处理链,由一系列ChannelInboundHandler和ChannelOutboundHandler串联组成;ChannelInboundHandler是处理Inbound(入站)事件的处理程序,ChannelOutboundHandler是处理Outbound(出站)事件的处理程序。
HTTP请求及常见的 Content-Type 类型
1、application/x-www-form-urlencoded, POST提交数据的方式,原生Form表单,如果不设置enctype属性的默认方式;
2、multipart/form-data,POST提交数据的方式,Form表单的enctype设置为multipart/form-data;表单的数据被处理为一条消息,以标签为单元,用分隔符(这就是boundary的作用)分开。这种方式将数据分成多个部分,既可以上传键值对,也可以上传文件,甚至多个文件。
3、application/json,JSON格式。
4、binary(application/octet-stream),只能提交二进制数据。
实现HTTP服务器案例
<!-- Netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.60.Final</version>
</dependency>
<!-- protobuf-java -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.15.6</version>
</dependency>
package com.what.netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyHttpServer { | |
public static void main(String[] args) { | |
// 访问地址: | |
int serverPort =; | |
// 初始化==>用于Acceptor的主" 线程池 " | |
EventLoopGroup bossEventGroup = new NioEventLoopGroup(); | |
// 初始化==>用于I/O工作的从"线程池" | |
NioEventLoopGroup workerEventGroup = new NioEventLoopGroup(); | |
try { | |
ServerBootstrap serverBootstrap = new ServerBootstrap(); | |
// group方法设置主从线程池 | |
serverBootstrap.group(bossEventGroup, workerEventGroup) | |
// 指定通道channel类型,服务端为:NioServerSocketChannel | |
.channel(NioServerSocketChannel.class) | |
// 添加 Handler | |
.childHandler(new NettyHttpServerInitializer()); | |
// 绑定并侦听端口 | |
ChannelFuture channelFuture = serverBootstrap.bind(serverPort).sync(); | |
// 等待服务监听端口关闭 | |
channelFuture.channel().closeFuture().sync(); | |
} catch (Interrupted Exception e) { | |
e.printStackTrace(); | |
} finally { | |
// 优雅退出,释放"线程池" | |
bossEventGroup. shutdown Gracefully(); | |
workerEventGroup.shutdownGracefully(); | |
} | |
} | |
} | |
package com.what.netty.http;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
/** | |
* netty 实现简单的 http 协议:配置 解码器、handler | |
*/ | |
public class NettyHttpServerInitializer extends ChannelInitializer<SocketChannel> { | |
protected void initChannel(SocketChannel socketChannel) throws Exception { | |
ChannelPipeline pipeline = socketChannel.pipeline(); | |
// 管道中添加 netty 提供的 http 编解码器 | |
// HttpServerCodec==>http编解码的处理类 | |
pipeline.addLast("HttpServerCodec", new HttpServerCodec()); | |
pipeline.addLast("MyNettyHttpServerHandler", new NettyHttpServerHandler()); | |
} | |
} | |
package com.what.netty.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.net.URI;
/** | |
* netty 实现简单的 http:handler | |
*/ | |
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { | |
/** | |
* 触发读取事件 | |
* | |
* @param channelHandlerContext 通道上下文 | |
* @param httpObject http数据 | |
* @throws Exception 异常 | |
*/ | |
protected void channelRead(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { | |
if (httpObject instanceof Http Request ) { | |
System.out.println("[服务端] 数据类型: " + httpObject.getClass()); | |
System.out.println("[服务端] 客户端地址: " + channelHandlerContext.channel().remoteAddress()); | |
HttpRequest httpRequest = (HttpRequest) httpObject; | |
final URI uri = new URI(httpRequest.uri()); | |
if ("/favico.ico".equals(uri.getPath())) { | |
System.out.println("[服务端]请求了 favico.ico,不做处理 "); | |
return; | |
} | |
ByteBuf byteBuf = Unpooled.copiedBuffer("[http helloworld]", CharsetUtil.UTF_); | |
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP__1, HttpResponseStatus.OK, byteBuf); | |
response. Header s().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8"); | |
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes()); | |
channelHandlerContext.writeAndFlush(response); | |
} | |
} | |
} |
实现简单的TCP通信案例
服务器端:
package com.what.netty.channel.demo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyTcpServer { | |
public static void main(String[] args) { | |
// 初始化==>用于Acceptor的主"线程池" | |
EventLoopGroup bossGroup = new NioEventLoopGroup(); | |
// 初始化==>用于I/O工作的从"线程池" | |
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); | |
try { | |
ServerBootstrap serverBootstrap = new ServerBootstrap(); | |
// group方法设置主从线程池 | |
serverBootstrap.group(bossGroup, workerGroup) | |
// 使用 NioServerSocketChannel作为服务器的通道实现 | |
.channel(NioServerSocketChannel.class) | |
// 设置 线程队列 等待 连接的 个数 | |
.option(ChannelOption.SO_BACKLOG,) | |
// 设置保持 活动/生存的 连接状态 | |
.childOption(ChannelOption.SO_KEEPALIVE, true) | |
// 设置管道工厂 | |
.childHandler(new ChannelInitializer<SocketChannel>() { | |
protected void initChannel(SocketChannel socketChannel) throws Exception { | |
socketChannel.pipeline().addLast(new NettyTcpServerHandler()); | |
} | |
}); | |
System.out.println("[服务器]启动成功"); | |
// 绑定并侦听端口 | |
ChannelFuture channelFuture = serverBootstrap.bind().sync(); | |
// 等待服务监听端口关闭 | |
channelFuture.channel().closeFuture().sync(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} finally { | |
// 优雅退出,释放"线程池" | |
bossGroup.shutdownGracefully(); | |
workerGroup.shutdownGracefully(); | |
} | |
} | |
} | |
package com.what.netty.channel.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
public class NettyTcpServerHandler implements ChannelInboundHandler { | |
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelRegistered()" + ctx); | |
} | |
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelUnregistered()" + ctx); | |
} | |
/** | |
* 客户端发起连接,服务端通道就绪,触发本方法 | |
* | |
* @param ctx 通道 | |
* @throws Exception 异常 | |
*/ | |
public void channelActive(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelActive()" + ctx); | |
System.out.println("[服务端]通道连接准备就绪"); | |
} | |
public void channelInactive(ChannelHandlerContext var) throws Exception { | |
System.out.println("channelInactive()" + var); | |
} | |
/** | |
* 客户端连接后发送数据,服务端接收数据时触发 | |
* | |
* @param ctx 通道 | |
* @param msg 数据 | |
* @throws Exception 异常 | |
*/ | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
System.out.println("channelRead()" + ctx); | |
// 用户 自定义 普通任务 | |
ctx.channel().eventLoop().execute(() -> { | |
try { | |
TimeUnit.SECONDS.sleep(); | |
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端处理耗时阻塞", CharsetUtil.UTF_)); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
}); | |
// 自定义 定时任务 提交到 ScheduledTaskQueue队列中。 | |
ctx.channel().eventLoop().schedule(() -> { | |
try { | |
TimeUnit.SECONDS.sleep(); | |
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]客户端定时任务", CharsetUtil.UTF_)); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
},, TimeUnit.SECONDS); | |
// 读取 客户端上传的 信息 | |
System.out.println("[服务端]服务端读取线程:" + Thread.currentThread().getName()); | |
ByteBuf byteBuffer = (ByteBuf) msg; | |
String message = byteBuffer.toString(CharsetUtil.UTF_8); | |
System.out.println("[服务端]服务端接收数据:" + message); | |
} | |
/** | |
* 服务端数据读取结束后触发 | |
* | |
* @param ctx 通道 | |
* @throws Exception 异常 | |
*/ | |
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelReadComplete()" + ctx); | |
ctx.writeAndFlush(Unpooled.copiedBuffer("[服务端]数据接收完毕", CharsetUtil.UTF_)); | |
} | |
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception { | |
System.out.println("userEventTriggered()" + ctx); | |
} | |
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelWritabilityChanged()" + ctx); | |
} | |
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("handlerAdded()" + ctx); | |
} | |
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("handlerRemoved()" + ctx); | |
} | |
/** | |
* 发生异常,关闭通道 | |
* | |
* @param ctx 通道 | |
* @param cause 原因 | |
* @throws Exception 异常 | |
*/ | |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | |
ctx.close(); | |
} | |
} |
客户端实现:
package com.what.netty.channel.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/** | |
* netty 实现 tcp服务 | |
*/ | |
public class NettyTcpClient { | |
public static void main(String[] args) { | |
// 初始化==>用于I/O工作的从"线程池" | |
final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); | |
try { | |
Bootstrap bootstrap = new Bootstrap(); | |
// group方法设置线程池 | |
bootstrap.group(eventLoopGroup) | |
// 设置客户端 通道 是 NioSocketChannel 类型 | |
.channel(NioSocketChannel.class) | |
// 设置管道工厂 | |
.handler(new ChannelInitializer<SocketChannel>() { | |
protected void initChannel(SocketChannel socketChannel) throws Exception { | |
socketChannel.pipeline().addLast(new NettyTcpClientHandler()); | |
} | |
}); | |
System.out.println("[客户端]启动成功"); | |
// 连接 | |
ChannelFuture channelFuture = bootstrap.connect("localhost",); | |
channelFuture.channel().closeFuture().sync(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} finally { | |
// 优雅退出,释放"线程池" | |
eventLoopGroup.shutdownGracefully(); | |
} | |
} | |
} | |
package com.what.netty.channel.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
public class NettyTcpClientHandler implements ChannelInboundHandler { | |
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelRegistered()" + ctx); | |
} | |
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelUnregistered()" + ctx); | |
} | |
public void channelActive(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelActive()" + ctx); | |
System.out.println("[客户端]通道就绪"); | |
ctx.writeAndFlush(Unpooled.copiedBuffer("[这儿有内鬼,终止交易]", CharsetUtil.UTF_)); | |
} | |
public void channelInactive(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelInactive()" + ctx); | |
} | |
/** | |
* @param ctx 通道 | |
* @param msg 数据 | |
* @throws Exception 异常 | |
*/ | |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | |
System.out.println("channelRead()" + ctx); | |
ByteBuf byteBuf = (ByteBuf) msg; | |
String message = byteBuf.toString(CharsetUtil.UTF_); | |
System.out.println("[客户端]服务端地址:" + ctx.channel().remoteAddress() + ",服务端回复信息:" + message); | |
} | |
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("channelReadComplete()" + ctx); | |
} | |
public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception { | |
System.out.println("userEventTriggered()" + ctx); | |
} | |
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("userEventTriggered()" + ctx); | |
} | |
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("handlerAdded()" + ctx); | |
} | |
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { | |
System.out.println("handlerRemoved()" + ctx); | |
} | |
/** | |
* 发生异常,关闭通道 | |
* | |
* @param ctx 通道 | |
* @param cause 原因 | |
* @throws Exception 异常 | |
*/ | |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | |
System.out.println("exceptionCaught()" + ctx); | |
cause.printStackTrace(); | |
ctx.close(); | |
} | |
} |