概述
采用SpringColud Gateway 和 Nacos作为注册中心时,当一个实例在发布了,Gateway 并不能立刻感知,还是会把请求转发到发布中的实例上去。这里面一共有2个地方需要设置,一个就是Gateway能够 获取到最新的服务实例列表,其次 Nacos 客户端服务下线之后能立刻通知到Nacos服务端。将结合Gateway源码和Nacos Diccovery源码进行分析,该篇所用到的jar版本号为
spring-cloud-gateway-server-2.2.6.RELEASE.jar
spring-cloud-alibaba-commons-2.2.6.RELEASE.jar
spring-cloud-alibaba-nacos-discovery-2.1.1.RELEASE.jar
Nacos的客户端版本为nacos-client-1.4.2.jar【之前一直以为自己用的是Nacos2.x,1.x与2.x 差异较大】
Gateway获取可用服务
先上配置,后面代码分析为何这样配置
#gateway yml配置 轮训获取Nacos最新实例的 间隔
ribbon:
ServerListRefreshInterval: 1000 #毫秒,默认为30毫秒
LoadBalancerClientFilter
会根据 lb:// 前缀过滤处理,做负载均衡,选择最终要调用的服务地址 ,也是本次调试的入口
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}
//选择服务实例,主要关注该方法
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
URI uri = exchange.getRequest().getURI();
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
RibbonLoadBalancerClient
通过Ribbon负载寻找一个服务
/**
* New: Select a server using a 'key'.
* @param serviceId of the service to choose an instance for
* @param hint to specify the service instance
* @return the selected {@link ServiceInstance}
*/
public ServiceInstance choose(String serviceId, Object hint) {
//关注获取Server方法
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}
ZoneAwareLoadBalancer
负载均衡器加入了区域的概念,本地调试直接进入父类
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
//调用父类选择服务方法
return super.chooseServer(key);
}
Server server = null;
...省略部分代码
}
BaseLoadBalancer
根据负载规则选择一个合适的服务
/*
* Get the alive server dedicated to key
*
* @return the dedicated server
*/
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
//核心方法
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
PredicateBasedRule
ZoneAvoidanceRule使用父类的choose方法
/**
* Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
* The performance for this method is O(n) where n is number of servers to be filtered.
*/
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
//关注getAllServers方法
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
BaseLoadBalancer
获取服务最终的地方 是一个本地List缓存
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Override
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
接下来只需要搞清楚allServerList 值是如何赋进去的即可
DynamicServerListLoadBalancer
负载均衡核心类,动态获取实例
// 通过构造方法初始化参数
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
//此处开启了一个轮训任务
restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
//开启轮训任务 serverListUpdater.start(updateAction);
enableAndInitLearnNewServersFeature();
//初始化的时候先立刻更新一次本地缓存
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
// 实现一个更新方法
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
//更新allServerList 本地缓存方法
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
// 真正意义上去NacosServerList 获取Nacos 服务端管理的服务实例
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
//
PollingServerListUpdater
负载任务寻轮的类,其时间配置也在这里
//通过构造方法 赋值 LISTOFSERVERS_CACHE_UPDATE_DELAY 也就是寻轮的间隔时间,可通过配置ribbon.ServerListRefreshInterval 来调整轮训的时间
public PollingServerListUpdater(IClientConfig clientConfig) {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
}
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//执行 DynamicServerListLoadBalancer 里的doUpdate方法
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
Nacos 获取服务列表
接着上面的 updateListOfServers 里面的getUpdatedListOfServers() ,方法将转向Nacos
NacosServerList
// 获取Nacos服务端注册过来的服务列表
private List<NacosServer> getServers() {
try {
String group = discoveryProperties.getGroup();
//主要关注selectInstances 方法
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return instancesToServerList(instances);
}
catch (Exception e) {
throw new IllegalStateException(
"Can not get service instances from nacos, serviceId=" + serviceId,
e);
}
}
NacosNamingService
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
boolean subscribe) throws NacosException {
ServiceInfo serviceInfo;
if (subscribe) {
//这里采用的是订阅的方式,将从serviceInfoMap中获取,详细见下个方法
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}
HostReactor
此方法可以看到Nacos 这里获取服务端列表也是从serviceInfoMap 本地缓存里面取,接下来看serviceInfoMap的值是什么时候赋进去的
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
//从 serviceInfoMap中获取服务实例
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
//此处会开启一个轮训更新 serviceInfoMap
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
//寻轮的定时任务
@Override
public void run() {
long delayTime = DEFAULT_DELAY;
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateService(serviceName, clusters);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
//通过api 真正意义去Nacos 服务端查询最新的实例,并更新本地缓存
updateService(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
lastRefTime = serviceObj.getLastRefTime();
if (!notifier.isSubscribed(serviceName, clusters) && !futureMap
.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
// abort the update task
NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
return;
}
if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
incFailCount();
return;
}
delayTime = serviceObj.getCacheMillis();
resetFailCount();
} catch (Throwable e) {
incFailCount();
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
//此处可以看出在不报错的情况下 每隔10s执行一次,cacheMillis 是Nacos 服务端返回的
,且系统参数设置 不能小于10s,可以用过openApi设置
executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
}
}
从上面可以看出会有个定时任务每隔10s 去轮训一次实例,且10s 不可设置更小,这个定时任务只是用来对账的,Nacos1.x 采用订阅的时候会有 启用一个udp长连接,当服务端有变化会立刻通知到服务端,接下来看 Nacos客户端接收服务端消息的代码
PushReceiver
接收服务端消息
public PushReceiver(HostReactor hostReactor) {
try {
this.hostReactor = hostReactor;
String udpPort = getPushReceiverUdpPort();
//构造一个udpSocket监听,此时并没有socketIp,后面会通过查询实例把udp id传入服务端
if (StringUtils.isEmpty(udpPort)) {
this.udpSocket = new DatagramSocket();
} else {
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
}
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
}
});
//开始执行
this.executorService.execute(this);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] init udp socket failed", e);
}
}
//监听方法
@Override
public void run() {
while (!closed) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);
//接收最新的服务实例消息
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
//此处更新了 serviceInfoMap
hostReactor.processServiceJson(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
+ "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
packet.getSocketAddress()));
} catch (Exception e) {
if (closed) {
return;
}
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
NamingProxy
此类主要封装了通过API 和服务端交互的方法,主要看下queryList udp的ip是如何传入的,这里会有坑
/**
* Query instance list.
*
* @param serviceName service name
* @param clusters clusters
* @param udpPort udp port
* @param healthyOnly healthy only
* @return instance list
* @throws NacosException nacos exception
*/
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
//这里获取了clientIp,这个ip可以通过 com.alibaba.nacos.client.naming.local.ip 环境变量配置
若不配置则采用InetAddress.getLocalHost().getHostAddress() 通过hostName 取对应的IP,
所以多IP的情况下,订阅到服务端的IP 可能不是互通的,可以通过Nacos控制台查看订阅的IP这里
建议 修改HostName对应的IP 与Naocs服务端是互通的或者通过环境变量指定
com.alibaba.nacos.client.naming.local.ip
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}
总结
到此为止 有两种方法可以更新到Nacos客户端本地的 serviceInfoMap,只需要保证服务端的实例下线之后可以立马通知过来,Gateway拿到的实例就是最新的,这样就可以达到一个秒级的下线效果。那么如何保证客户端下线之后服务端能够立马感知到呢,这里可以通过三个参数配置
heartBeatInterval 心跳监测的时间
heartBeatTimeout 心跳监测超时时间
ipDeleteTimeout 删除服务实例的时间
如果服务端停止了 想要Nacos服务端立马下线需要配置这三个参数时间尽量小,但是这样无疑会增加服务端的负担,不停的发送心跳监测,从而影响正常的业务,我们一般都是在发布服务 热部署的时候需要让客户尽量无感知的过度且不影响其他实例正常执行业务,我们在通过脚本发布的时候 先kill 进程同时执行openApi 删除实例的方法,curl -X DELETE ‘127.0.0.1:8848/nacos/v1/ns/instance?serviceName=nacos.test.1&ip=1.1.1.1&port=8888&clusterName=TEST1’,来达到Nacos服务立刻下线并通过UDP通知到客户端更新serviceInfoMap。来达到Nacos秒级下线的效果
评论区