目录
- 正文
- 1:如何运行项目
- 2:从客户端调用开始(springboot-zk-study项目)
- 3:服务端处理请求
- 4:接下来要做什么
正文
netty-study 这个项目是没用到的,可以删掉,主要是测试Netty自定义协议的
1:如何运行项目
1:本地起一个zookeeper服务
2: 只需要运行 rpc-server 和 springboot-zk-study二个项目即可
3: 二个项目的application.yml 都不需要改,唯一要改的就是zookeepr的连接配置信息
4:启动好之后,在浏览器访问
http://localhost:8081/zk/test
http://localhost:8081/zk/people
http://localhost:8081/zk/list
可以查看到返回结果
2:从客户端调用开始(springboot-zk-study项目)
@RestController
@RequestMapping("/zk")
public class ZkController {
@Resource
@MyResource
private UserService userService;
@Resource
@MyResource
private PeopleService peopleService;
@GetMapping("/test")
public String test() {
return userService.test("bjh-",);
}
@GetMapping("/people")
public Object people() {
return peopleService.query(L);
}
@GetMapping("/list")
public Object list() {
return peopleService.list();
}
}
只需要在我们需要进行RPC调用的接口上添加 @MyResource 注解即可,当我们执行这个方法之后,就会执行代理方法,代理方法在 rpc-core 项目中,为了阅读清晰,我只贴出重点的方法
@Slfj
public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner {
......省略一些代码
// 客户端执行方法之后,就会执行到这里的代理方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//从注册中心拿到服务列表
ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class);
List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList();
for(ZkProperties zkProperties : zkPropertiesList) {
String interfaceName = zkProperties.getInterfaceName();
Class<?> declaringClass = method.getDeclaringClass();
if(StringUtils.equals(declaringClass.getName(),interfaceName)) {
List<InterfaceInfo> info = zkProperties.getInfo();
InterfaceInfo interfaceInfo = info.get();
String ipAddress = interfaceInfo.getIpAddress();
List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo();
InterfaceImplInfo implInfo = interfaceImplInfo.get();
String[] strings = ipAddress.split(":");
//与远程Netty服务端发起连接
RpcClient rpcClient = connNettyServer(strings[], zkPropertiesSource.getNettyConnectPort());
/**
* 封装请求参数
*/
//获取方法参数类型
Class<?>[] parameterTypes = method.getParameterTypes();
List<String> types = getTypes(parameterTypes);
//同步调用
result = remoteCall(method.getName(), types, args, rpcClient, implInfo, interfaceName);
log.info("返回结果是:{}",result);
}
}
Class<?> returnType = method.getReturnType();
Object value = objectMapper.readValue(result.toString(), returnType);
return value;
}
private RpcClient connNettyServer(String ipAddress,Integer port) {
return new RpcClient(ipAddress,port);
}
private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{
RpcMessage rpcMessage = new RpcMessage();
......
//发送请求
Response result = rpcClient.sendRequest(rpcMessage);
log.info("请求结果是:{}", JSONUtil.toJsonPrettyStr(result));
return result.getData();
}
......省略一些代码
我们初始化客户端连接和发送请求都在一个RpcClient的类中,我们看下这个类的代码
@Slfj
public class RpcClient {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap;
private String ip;
private Integer port;
RpcClientHandler rpcClientHandler;
private ChannelFuture channelFuture;
public RpcClient(String ip,Integer port) {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
rpcClientHandler = new RpcClientHandler();
ch.pipeline().addLast(new RpcDecoder());
ch.pipeline().addLast(new RpcEncoder());
ch.pipeline().addLast(rpcClientHandler);
}
});
try {
// 和远程Nett服务端建立连接
channelFuture = bootstrap.connect(ip, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Response sendRequest(RpcMessage rpcMessage) throws Exception{
//发送请求
channelFuture.channel().writeAndFlush(rpcMessage).sync();
channelFuture.channel().closeFuture().sync();
log.info("获取返回结果=====================");
Response response = rpcClientHandler.getResponse();
return response;
}
}
客户端在这发送请求到服务端之后,就接收服务端返回回来的消息即可,然后将返回结果返回给我们的接口。客户端的调用就到这里了,现在看下服务端的
3:服务端处理请求
服务端处理请求的核心都在 rpc-core的 RpcServerHandler中
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void channelRead(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
Object obj = rpcMessage.getObj();
RpcMessage rpcMessageResponse = new RpcMessage();
Response response = new Response();
try{
Request request = objectMapper.readValue(obj.toString(), Request.class);
String interfaceImplName = request.getInterfaceImplName();
Class<?> aClass = Class.forName(interfaceImplName);
List<String> paramsTypes = request.getParamsTypes();
try {
Object result = null;
//判读方法是有参数的还是没有参数的
if(paramsTypes.isEmpty()) {
Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName());
result = declaredMethod.invoke(aClass.newInstance());
}else {
Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeStringClass(paramsTypes, request.getParams().toArray());
Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes");
Object[] args = (Object[]) paramsObjectMap.get("args");
result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args);
}
log.info("返回结果是:{}",result);
response.setData(objectMapper.writeValueAsString(result));
response.setIsOk();
response.setErrInfo("error");
rpcMessageResponse.setObj(response);
} catch (Throwable throwable) {
throwable.printStackTrace();
response.setData("error");
response.setIsOk();
response.setErrInfo(throwable.getMessage());
rpcMessageResponse.setObj(response);
}
}catch (Exception e) {
response.setData("error");
response.setIsOk();
response.setErrInfo(e.getMessage());
rpcMessageResponse.setObj(response);
}
String valueAsString = objectMapper.writeValueAsString(response);
rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-")).length);
rpcMessageResponse.setObj(valueAsString);
channelHandlerContext.writeAndFlush(rpcMessageResponse);
}
}
服务端就拿到客户端传过来的接口名称,从zookeeper获取到具体的实现类,然后通过反射调用即可
4:接下来要做什么
上面只是简单的介绍了下整个调用的大概过程,还有很多问题没有解释清楚,比如
1:在客户端我们要使用UserService,但是你会发现我们使用了二个注解,一个是我们自定义的,一个是spring注入用的,但是在项目中我们并没有这个接口的实现类,spring是怎么将这个接口注入到自己容器中的呢
2: 为什么调用使用了 @MyResource的接口方法都会走代理方法,是怎么做到的
@Resource
@MyResource
private PeopleService peopleService;
3:我们的服务是怎么在服务启动的时候注册到zookeeper的,注册的信息又是什么,可以看下我们服务注册到zookeeper的信息如下
{
"zkPropertiesList": [{
"interfaceName": "com.bjh.service.PeopleService",
"info": [{
"ipAddress": ".168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.PeopleServiceImpl",
"value": "com.bjh.service.PeopleServiceImpl"
}]
}]
}, {
"interfaceName": "com.bjh.service.UserService",
"info": [{
"ipAddress": ".168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.UserServiceImpl",
"value": "com.bjh.service.UserServiceImpl"
}]
}]
}]
}
4:在我们的服务端的实现类,我们只使用了我们自定义的 @Service注解,这个注解不是Spring的
@Service
public class PeopleServiceImpl implements PeopleService{
@Override
public People query(long id) {
People people = new People();
people.setId(id);
people.setName("coco");
return people;
}
@Override
public List<People> list() {
List<People> list = new ArrayList<>();
People people = new People();
people.setId(L);
people.setName("coco");
People people = new People();
people.setId(124L);
people.setName("baojh");
list.add(people);
list.add(people);
return list;
}
}
5:还有客户端请求的结构体是怎么样的,还有返回响应结果是怎么样的等等,后续我会继续更新