目录
- 正文
- 1:如何运行项目
- 2:从客户端调用开始(springboot-zk-study项目)
- 3:服务端处理请求
- 4:接下来要做什么
正文
netty-study 这个项目是没用到的,可以删掉,主要是测试Netty自定义协议的
1:如何运行项目
1:本地起一个zookeeper服务
2: 只需要运行 rpc-server 和 springboot-zk-study二个项目即可
3: 二个项目的application.yml 都不需要改,唯一要改的就是zookeepr的连接配置信息
4:启动好之后,在浏览器访问
http://localhost:8081/zk/test
http://localhost:8081/zk/people
http://localhost:8081/zk/list
可以查看到返回结果
2:从客户端调用开始(springboot-zk-study项目)
@RestController | |
@RequestMapping("/zk") | |
public class ZkController { | |
@Resource | |
@MyResource | |
private UserService userService; | |
@Resource | |
@MyResource | |
private PeopleService peopleService; | |
@GetMapping("/test") | |
public String test() { | |
return userService.test("bjh-",); | |
} | |
@GetMapping("/people") | |
public Object people() { | |
return peopleService.query(L); | |
} | |
@GetMapping("/list") | |
public Object list() { | |
return peopleService.list(); | |
} | |
} |
只需要在我们需要进行RPC调用的接口上添加 @MyResource 注解即可,当我们执行这个方法之后,就会执行代理方法,代理方法在 rpc-core 项目中,为了阅读清晰,我只贴出重点的方法
public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner { | |
......省略一些代码 | |
// 客户端执行方法之后,就会执行到这里的代理方法 | |
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { | |
//从注册中心拿到服务列表 | |
ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class); | |
List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList(); | |
for(ZkProperties zkProperties : zkPropertiesList) { | |
String interfaceName = zkProperties.getInterfaceName(); | |
Class<?> declaringClass = method.getDeclaringClass(); | |
if(StringUtils.equals(declaringClass.getName(),interfaceName)) { | |
List<InterfaceInfo> info = zkProperties.getInfo(); | |
InterfaceInfo interfaceInfo = info.get(); | |
String ipAddress = interfaceInfo.getIpAddress(); | |
List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo(); | |
InterfaceImplInfo implInfo = interfaceImplInfo.get(); | |
String[] strings = ipAddress.split(":"); | |
//与远程Netty服务端发起连接 | |
RpcClient rpcClient = connNettyServer(strings[], zkPropertiesSource.getNettyConnectPort()); | |
/** | |
* 封装请求参数 | |
*/ | |
//获取方法参数类型 | |
Class<?>[] parameterTypes = method.getParameterTypes(); | |
List<String> types = getTypes(parameterTypes); | |
//同步调用 | |
result = remoteCall(method.getName(), types, args, rpcClient, implInfo, interfaceName); | |
log.info("返回结果是:{}",result); | |
} | |
} | |
Class<?> returnType = method.getReturnType(); | |
Object value = objectMapper.readValue(result.toString(), returnType); | |
return value; | |
} | |
private RpcClient connNettyServer(String ipAddress,Integer port) { | |
return new RpcClient(ipAddress,port); | |
} | |
private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{ | |
RpcMessage rpcMessage = new RpcMessage(); | |
...... | |
//发送请求 | |
Response result = rpcClient.sendRequest(rpcMessage); | |
log.info("请求结果是:{}", JSONUtil.toJsonPrettyStr(result)); | |
return result.getData(); | |
} | |
......省略一些代码 |
我们初始化客户端连接和发送请求都在一个RpcClient的类中,我们看下这个类的代码
public class RpcClient { | |
EventLoopGroup group = new NioEventLoopGroup(); | |
Bootstrap bootstrap; | |
private String ip; | |
private Integer port; | |
RpcClientHandler rpcClientHandler; | |
private ChannelFuture channelFuture; | |
public RpcClient(String ip,Integer port) { | |
bootstrap = new Bootstrap(); | |
bootstrap.group(group) | |
.channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现 | |
.handler(new ChannelInitializer<SocketChannel>() { | |
protected void initChannel(SocketChannel ch) throws Exception { | |
//加入处理器 | |
rpcClientHandler = new RpcClientHandler(); | |
ch.pipeline().addLast(new RpcDecoder()); | |
ch.pipeline().addLast(new RpcEncoder()); | |
ch.pipeline().addLast(rpcClientHandler); | |
} | |
}); | |
try { | |
// 和远程Nett服务端建立连接 | |
channelFuture = bootstrap.connect(ip, port).sync(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
public Response sendRequest(RpcMessage rpcMessage) throws Exception{ | |
//发送请求 | |
channelFuture.channel().writeAndFlush(rpcMessage).sync(); | |
channelFuture.channel().closeFuture().sync(); | |
log.info("获取返回结果====================="); | |
Response response = rpcClientHandler.getResponse(); | |
return response; | |
} | |
} |
客户端在这发送请求到服务端之后,就接收服务端返回回来的消息即可,然后将返回结果返回给我们的接口。客户端的调用就到这里了,现在看下服务端的
3:服务端处理请求
服务端处理请求的核心都在 rpc-core的 RpcServerHandler中
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> { | |
ObjectMapper objectMapper = new ObjectMapper(); | |
protected void channelRead(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception { | |
Object obj = rpcMessage.getObj(); | |
RpcMessage rpcMessageResponse = new RpcMessage(); | |
Response response = new Response(); | |
try{ | |
Request request = objectMapper.readValue(obj.toString(), Request.class); | |
String interfaceImplName = request.getInterfaceImplName(); | |
Class<?> aClass = Class.forName(interfaceImplName); | |
List<String> paramsTypes = request.getParamsTypes(); | |
try { | |
Object result = null; | |
//判读方法是有参数的还是没有参数的 | |
if(paramsTypes.isEmpty()) { | |
Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName()); | |
result = declaredMethod.invoke(aClass.newInstance()); | |
}else { | |
Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeStringClass(paramsTypes, request.getParams().toArray()); | |
Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes"); | |
Object[] args = (Object[]) paramsObjectMap.get("args"); | |
result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args); | |
} | |
log.info("返回结果是:{}",result); | |
response.setData(objectMapper.writeValueAsString(result)); | |
response.setIsOk(); | |
response.setErrInfo("error"); | |
rpcMessageResponse.setObj(response); | |
} catch (Throwable throwable) { | |
throwable.printStackTrace(); | |
response.setData("error"); | |
response.setIsOk(); | |
response.setErrInfo(throwable.getMessage()); | |
rpcMessageResponse.setObj(response); | |
} | |
}catch (Exception e) { | |
response.setData("error"); | |
response.setIsOk(); | |
response.setErrInfo(e.getMessage()); | |
rpcMessageResponse.setObj(response); | |
} | |
String valueAsString = objectMapper.writeValueAsString(response); | |
rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-")).length); | |
rpcMessageResponse.setObj(valueAsString); | |
channelHandlerContext.writeAndFlush(rpcMessageResponse); | |
} | |
} |
服务端就拿到客户端传过来的接口名称,从zookeeper获取到具体的实现类,然后通过反射调用即可
4:接下来要做什么
上面只是简单的介绍了下整个调用的大概过程,还有很多问题没有解释清楚,比如
1:在客户端我们要使用UserService,但是你会发现我们使用了二个注解,一个是我们自定义的,一个是spring注入用的,但是在项目中我们并没有这个接口的实现类,spring是怎么将这个接口注入到自己容器中的呢
2: 为什么调用使用了 @MyResource的接口方法都会走代理方法,是怎么做到的
@Resource | |
@MyResource | |
private PeopleService peopleService; |
3:我们的服务是怎么在服务启动的时候注册到zookeeper的,注册的信息又是什么,可以看下我们服务注册到zookeeper的信息如下
{ | |
"zkPropertiesList": [{ | |
"interfaceName": "com.bjh.service.PeopleService", | |
"info": [{ | |
"ipAddress": ".168.83.1:9091", | |
"interfaceImplInfo": [{ | |
"name": "com.bjh.service.PeopleServiceImpl", | |
"value": "com.bjh.service.PeopleServiceImpl" | |
}] | |
}] | |
}, { | |
"interfaceName": "com.bjh.service.UserService", | |
"info": [{ | |
"ipAddress": ".168.83.1:9091", | |
"interfaceImplInfo": [{ | |
"name": "com.bjh.service.UserServiceImpl", | |
"value": "com.bjh.service.UserServiceImpl" | |
}] | |
}] | |
}] | |
} |
4:在我们的服务端的实现类,我们只使用了我们自定义的 @Service注解,这个注解不是Spring的
public class PeopleServiceImpl implements PeopleService{ | |
public People query(long id) { | |
People people = new People(); | |
people.setId(id); | |
people.setName("coco"); | |
return people; | |
} | |
public List<People> list() { | |
List<People> list = new ArrayList<>(); | |
People people = new People(); | |
people.setId(L); | |
people.setName("coco"); | |
People people = new People(); | |
people.setId(124L); | |
people.setName("baojh"); | |
list.add(people); | |
list.add(people); | |
return list; | |
} | |
} |
5:还有客户端请求的结构体是怎么样的,还有返回响应结果是怎么样的等等,后续我会继续更新