更新時(shí)間:2021年03月26日13時(shí)53分 來(lái)源:傳智教育 瀏覽次數(shù):
因?yàn)槲⒎?wù)是目前互聯(lián)網(wǎng)公司比較流行的架構(gòu),所以spring就提供了一個(gè)頂級(jí)框架-spring cloud,來(lái)解決我們?cè)陂_(kāi)發(fā)微服務(wù)架構(gòu)中遇到的各種各樣的問(wèn)題,今天的主角是spring cloud 框架中集成的組件Ribbon,那么Ribbon能解決什么問(wèn)題呢,我們來(lái)思考下面的問(wèn)題。
微服務(wù)架構(gòu)中的每個(gè)服務(wù)為了高可用,很大程度上都會(huì)進(jìn)行集群,我們假設(shè)現(xiàn)在集群了3個(gè)user服務(wù),同時(shí)能提供相同的服務(wù),問(wèn)題來(lái)了,我們?nèi)绾螞Q定調(diào)用這3個(gè)user服務(wù)中的哪一個(gè)呢?
根據(jù)不同分析角度,會(huì)有不同的答案,也可以理解為根據(jù)不同的情況,我們可以寫(xiě)不同的算法,來(lái)決定到底此時(shí)此刻,調(diào)用這3個(gè)user服務(wù)的哪一個(gè),那么,Ribbon就給我們提供了不同的算法,我們可以根據(jù)業(yè)務(wù)場(chǎng)景,調(diào)整配置文件,決定到底使用哪個(gè)算法,這樣,算法中就會(huì)計(jì)算出調(diào)用哪個(gè)user服務(wù)了。
1)我們準(zhǔn)備一個(gè)eureka注冊(cè)中心
2)再準(zhǔn)備一個(gè)order服務(wù)
3)再準(zhǔn)備3個(gè)相同代碼的user服務(wù),這樣,order服務(wù)通過(guò)eureka注冊(cè)中心,就可以發(fā)現(xiàn)user的3個(gè)服務(wù)
Ribbon是通過(guò)IRule的這個(gè)接口來(lái)選擇3個(gè)user服務(wù)中的哪個(gè)的,但是實(shí)際執(zhí)行的代碼肯定是繼承了這個(gè)接口的實(shí)現(xiàn)類,所以選擇不同的實(shí)現(xiàn)類,就會(huì)選擇不同負(fù)載均衡策略
public interface IRule { Server choose(Object var1); void setLoadBalancer(ILoadBalancer var1); ILoadBalancer getLoadBalancer(); }
此策略是Ribbon的默認(rèn)策略,是按照順序,依次對(duì)所有的user服務(wù)進(jìn)行訪問(wèn)。
通過(guò)重寫(xiě)IRule的choose方法,來(lái)選擇并返回決定調(diào)用的user服務(wù),在下面的源碼中,List
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } else { Server server = null; int count = 0; while(true) { if (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); //總服務(wù)實(shí)例數(shù)量 int serverCount = allServers.size(); if (upCount != 0 && serverCount != 0) { int nextServerIndex = this.incrementAndGetModulo(serverCount); server = (Server)allServers.get(nextServerIndex); if (server == null) { Thread.yield(); } else { if (server.isAlive() && server.isReadyToServe()) { return server; } server = null; } continue; } log.warn("No up servers available from load balancer: " + lb); return null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } } }
debug的圖例:
第一次訪問(wèn):訪問(wèn)的是第一個(gè)實(shí)例
第二次訪問(wèn):訪問(wèn)的是第二個(gè)實(shí)例
就和這個(gè)策略的名字一樣,是對(duì)user的3個(gè)服務(wù)的隨機(jī)調(diào)用,所以不存在規(guī)律,如下源碼中int index = this.chooseRandomInt(serverCount); 通過(guò)隨機(jī)數(shù)來(lái)選擇下標(biāo),所以對(duì)user服務(wù)的調(diào)用是隨機(jī)的
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } else { Server server = null; while(server == null) { if (Thread.interrupted()) { return null; } List<Server> upList = lb.getReachableServers(); List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int index = this.chooseRandomInt(serverCount); server = (Server)upList.get(index); if (server == null) { Thread.yield(); } else { if (server.isAlive()) { return server; } server = null; Thread.yield(); } } return server; } }
debug的圖例:
第一次訪問(wèn):訪問(wèn)的是第一個(gè)實(shí)例
第二次訪問(wèn):訪問(wèn)的還是第一個(gè)實(shí)例
第三次訪問(wèn):訪問(wèn)的是第三個(gè)實(shí)例
根據(jù)user的3個(gè)服務(wù)的響應(yīng)時(shí)間來(lái)分配權(quán)重,響應(yīng)時(shí)間越長(zhǎng)的服務(wù),權(quán)重越低,那么被調(diào)用的概率也就越低。相反,響應(yīng)時(shí)間越短的服務(wù),權(quán)重越高,被調(diào)用的概率也就越高
響應(yīng)時(shí)間加權(quán)重策略的實(shí)現(xiàn)分為兩步:
extends RoundRobinRule
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } else { Server server = null; while(server == null) { List<Double> currentWeights = this.accumulatedWeights; if (Thread.interrupted()) { return null; } List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; double maxTotalWeight = currentWeights.size() == 0 ? 0.0D : (Double)currentWeights.get(currentWeights.size() - 1); //在30秒之內(nèi),maxTotalWeight變量會(huì)一直是0.0 if (maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) { double randomWeight = this.random.nextDouble() * maxTotalWeight; int n = 0; for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) { Double d = (Double)var13.next(); if (d >= randomWeight) { serverIndex = n; break; } } server = (Server)allList.get(serverIndex); } else { server = super.choose(this.getLoadBalancer(), key); if (server == null) { return server; } } if (server == null) { Thread.yield(); } else { if (server.isAlive()) { return server; } server = null; } } return server; } }
debug的圖例:
前幾次訪問(wèn):maxTotalWeight都是0.0,使用輪詢策略,但是開(kāi)始緩存權(quán)重?cái)?shù)據(jù)
30秒之后:開(kāi)始根據(jù)權(quán)重?cái)?shù)據(jù)來(lái)分配權(quán)重,選擇實(shí)例
如下圖:8081端口的權(quán)重顯然沒(méi)有8082的權(quán)重大,所以8082端口的user服務(wù)實(shí)例被訪問(wèn)的次數(shù)多
重試策略是指通過(guò)輪詢策略選出一個(gè)實(shí)例,然后去訪問(wèn),如果此實(shí)例為null或者已經(jīng)失效,那么會(huì)重試其他的實(shí)例,answer = this.subRule.choose(key); 會(huì)根據(jù)輪詢策略選擇一個(gè)實(shí)例,然后if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline)判斷如果實(shí)例為null或者失效,那么會(huì)重新選擇
public Server choose(ILoadBalancer lb, Object key) { long requestTime = System.currentTimeMillis(); long deadline = requestTime + this.maxRetryMillis; Server answer = null; answer = this.subRule.choose(key); if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline) { InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis()); while(!Thread.interrupted()) { answer = this.subRule.choose(key); if (answer != null && answer.isAlive() || System.currentTimeMillis() >= deadline) { break; } Thread.yield(); } task.cancel(); } return answer != null && answer.isAlive() ? answer : null; }
會(huì)根據(jù)每個(gè)服務(wù)實(shí)例的并發(fā)數(shù)量來(lái)決定,訪問(wèn)并發(fā)數(shù)最少的那個(gè)服務(wù),int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); 會(huì)獲得當(dāng)前遍歷的實(shí)例的并發(fā)數(shù),然后和其他的實(shí)例的并發(fā)數(shù)進(jìn)行判斷,最終訪問(wèn)并發(fā)量最少的那個(gè)實(shí)例
public Server choose(Object key) { if (this.loadBalancerStats == null) { return super.choose(key); } else { List<Server> serverList = this.getLoadBalancer().getAllServers(); int minimalConcurrentConnections = 2147483647; long currentTime = System.currentTimeMillis(); Server chosen = null; Iterator var7 = serverList.iterator(); while(var7.hasNext()) { //遍歷所有的實(shí)例 Server server = (Server)var7.next(); ServerStats serverStats = this.loadBalancerStats.getSingleServerStat(server); if (!serverStats.isCircuitBreakerTripped(currentTime)) { int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); //判斷并發(fā)數(shù),并和已經(jīng)判斷出的最少的并發(fā)數(shù)比較 if (concurrentConnections < minimalConcurrentConnections) { minimalConcurrentConnections = concurrentConnections; chosen = server; } } } if (chosen == null) { return super.choose(key); } else { return chosen; } } }
此策略會(huì)聰明的過(guò)濾掉一直失敗并被標(biāo)記為circuit tripped的user服務(wù),而且會(huì)過(guò)濾掉那些高并發(fā)的user服務(wù)
public Server choose(Object key) { int count = 0; for(Server server = this.roundRobinRule.choose(key); count++ <= 10; server = this.roundRobinRule.choose(key)) { //通過(guò)predicate來(lái)過(guò)濾 if (this.predicate.apply(new PredicateKey(server))) { return server; } } //過(guò)濾掉一些服務(wù)之后,會(huì)采用輪詢的方式調(diào)用剩下的服務(wù) return super.choose(key); }
此策略本身并沒(méi)有實(shí)現(xiàn)什么特殊的處理邏輯,但是可以通過(guò)重置LoadBalancer來(lái)達(dá)到自定義一些高級(jí)策略的目的,可以重寫(xiě)initWithNiwsConfig和setLoadBalancer
public void initWithNiwsConfig(IClientConfig clientConfig) { this.roundRobinRule = new RoundRobinRule(); } public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); this.roundRobinRule.setLoadBalancer(lb); } public Server choose(Object key) { if (this.roundRobinRule != null) { return this.roundRobinRule.choose(key); } else { throw new IllegalArgumentException("This class has not been initialized with the RoundRobinRule class"); } }
猜你喜歡:
Spring Cloud是什么?怎么理解Spring Cloud?
北京校區(qū)