Java 項目中使用 Resilience4j 框架實現(xiàn)故障隔離


到目前為止,在本系列中,我們已經(jīng)了解了 Resilience4j 及其?[Retry](https://icodewalker.com/blog/261/),?[RateLimiter](https://icodewalker.com/blog/288/)?和?[TimeLimiter](https://icodewalker.com/blog/302/)?模塊。在本文中,我們將探討 Bulkhead 模塊。我們將了解它解決了什么問題,何時以及如何使用它,并查看一些示例。
代碼示例
本文附有?[GitHub 上](https://github.com/thombergs/code-examples/tree/master/resilience4j/bulkhead)的工作代碼示例。
什么是 Resilience4j?
請參閱上一篇文章中的描述,快速了解?[Resilience4j 的一般工作原理](https://icodewalker.com/blog/261/#what-is-resilience4j)。
什么是故障隔離?
幾年前,我們遇到了一個生產(chǎn)問題,其中一臺服務(wù)器停止響應(yīng)健康檢查,負(fù)載均衡器將服務(wù)器從池中取出。
就在我們開始調(diào)查這個問題的時候,還有第二個警報——另一臺服務(wù)器已經(jīng)停止響應(yīng)健康檢查,也被從池中取出。
幾分鐘后,每臺服務(wù)器都停止響應(yīng)健康探測,我們的服務(wù)完全關(guān)閉。
我們使用 Redis 為應(yīng)用程序支持的幾個功能緩存一些數(shù)據(jù)。正如我們后來發(fā)現(xiàn)的那樣,Redis 集群同時出現(xiàn)了一些問題,它已停止接受新連接。我們使用 Jedis 庫連接到 Redis,該庫的默認(rèn)行為是無限期地阻塞調(diào)用線程,直到建立連接。
我們的服務(wù)托管在 Tomcat 上,它的默認(rèn)請求處理線程池大小為 200 個線程。因此,通過連接到 Redis 的代碼路徑的每個請求最終都會無限期地阻塞線程。
幾分鐘之內(nèi),集群中的所有 2000 個線程都無限期地阻塞了——甚至沒有空閑線程來響應(yīng)負(fù)載均衡器的健康檢查。
該服務(wù)本身支持多項功能,并非所有功能都需要訪問 Redis 緩存。但是當(dāng)這一方面出現(xiàn)問題時,它最終影響了整個服務(wù)。
這正是故障隔離要解決的問題——它可以防止某個服務(wù)區(qū)域的問題影響整個服務(wù)。
雖然我們的服務(wù)發(fā)生的事情是一個極端的例子,但我們可以看到緩慢的上游依賴如何影響調(diào)用服務(wù)的不相關(guān)區(qū)域。
如果我們在每個服務(wù)器實例上對 Redis 設(shè)置了 20 個并發(fā)請求的限制,那么當(dāng) Redis 連接問題發(fā)生時,只有這些線程會受到影響。剩余的請求處理線程可以繼續(xù)為其他請求提供服務(wù)。
故障隔離背后的想法是對我們對遠(yuǎn)程服務(wù)進(jìn)行的并發(fā)調(diào)用數(shù)量設(shè)置限制。我們將對不同遠(yuǎn)程服務(wù)的調(diào)用視為不同的、隔離的池,并對可以同時進(jìn)行的調(diào)用數(shù)量設(shè)置限制。
術(shù)語艙壁本身來自它在船舶中的使用,其中船舶的底部被分成彼此分開的部分。如果有裂縫,并且水開始流入,則只有該部分會充滿水。這可以防止整艘船沉沒。
Resilience4j 隔板概念
resilience4j-bulkhead?的工作原理類似于其他 Resilience4j 模塊。我們?yōu)樗峁┝宋覀兿胍鳛楹瘮?shù)構(gòu)造執(zhí)行的代碼——一個進(jìn)行遠(yuǎn)程調(diào)用的 lambda 表達(dá)式或一個從遠(yuǎn)程服務(wù)中檢索到的某個值的?Supplier,等等——并且隔板用代碼裝飾它以控制并發(fā)調(diào)用數(shù)。
Resilience4j 提供兩種類型的隔板 -?SemaphoreBulkhead?和?ThreadPoolBulkhead。
SemaphoreBulkhead?內(nèi)部使用?java.util.concurrent.Semaphore?來控制并發(fā)調(diào)用的數(shù)量并在當(dāng)前線程上執(zhí)行我們的代碼。
ThreadPoolBulkhead?使用線程池中的一個線程來執(zhí)行我們的代碼。它內(nèi)部使用?java.util.concurrent.ArrayBlockingQueue?和?java.util.concurrent.ThreadPoolExecutor?來控制并發(fā)調(diào)用的數(shù)量。
SemaphoreBulkhead
讓我們看看與信號量隔板相關(guān)的配置及其含義。
maxConcurrentCalls?確定我們可以對遠(yuǎn)程服務(wù)進(jìn)行的最大并發(fā)調(diào)用數(shù)。我們可以將此值視為初始化信號量的許可數(shù)。
任何嘗試超過此限制調(diào)用遠(yuǎn)程服務(wù)的線程都可以立即獲得?BulkheadFullException?或等待一段時間以等待另一個線程釋放許可。這由?maxWaitDuration?值決定。
當(dāng)有多個線程在等待許可時,fairCallHandlingEnabled?配置確定等待的線程是否以先進(jìn)先出的順序獲取許可。
最后,?writableStackTraceEnabled?配置讓我們可以在?BulkheadFullException?發(fā)生時減少堆棧跟蹤中的信息量。這很有用,因為如果沒有它,當(dāng)異常多次發(fā)生時,我們的日志可能會充滿許多類似的信息。通常在讀取日志時,只知道發(fā)生了?BulkheadFullException?就足夠了。
ThreadPoolBulkhead
coreThreadPoolSize?、?maxThreadPoolSize?、?keepAliveDuration?和?queueCapacity?是與?ThreadPoolBulkhead?相關(guān)的主要配置。ThreadPoolBulkhead?內(nèi)部使用這些配置來構(gòu)造一個 ThreadPoolExecutor。
internalThreadPoolExecutor?使用可用的空閑線程之一執(zhí)行傳入的任務(wù)。 如果沒有線程可以自由執(zhí)行傳入的任務(wù),則該任務(wù)將排隊等待線程可用時稍后執(zhí)行。如果已達(dá)到?queueCapacity,則遠(yuǎn)程調(diào)用將被拒絕并返回?BulkheadFullException。
ThreadPoolBulkhead?也有?writableStackTraceEnabled?配置來控制?BulkheadFullException?的堆棧跟蹤中的信息量。
使用 Resilience4j 隔板模塊
讓我們看看如何使用?resilience4j-bulkhead?模塊中可用的各種功能。
我們將使用與本系列前幾篇文章相同的示例。假設(shè)我們正在為一家航空公司建立一個網(wǎng)站,以允許其客戶搜索和預(yù)訂航班。我們的服務(wù)與?FlightSearchService?類封裝的遠(yuǎn)程服務(wù)對話。
SemaphoreBulkhead
使用基于信號量的隔板時,BulkheadRegistry、BulkheadConfig?和?Bulkhead?是我們使用的主要抽象。
BulkheadRegistry?是一個用于創(chuàng)建和管理?Bulkhead?對象的工廠。
BulkheadConfig?封裝了?maxConcurrentCalls、maxWaitDuration、writableStackTraceEnabled?和?fairCallHandlingEnabled?配置。每個 Bulkhead 對象都與一個?BulkheadConfig?相關(guān)聯(lián)。
第一步是創(chuàng)建一個?BulkheadConfig:
BulkheadConfig config = BulkheadConfig.ofDefaults();
這將創(chuàng)建一個?BulkheadConfig,其默認(rèn)值為?maxConcurrentCalls(25)、maxWaitDuration(0s)、writableStackTraceEnabled(true)?和?fairCallHandlingEnabled(true)。
假設(shè)我們希望將并發(fā)調(diào)用的數(shù)量限制為 2,并且我們愿意等待 2 秒讓線程獲得許可:
BulkheadConfig config = BulkheadConfig.custom()
?.maxConcurrentCalls(2)
?.maxWaitDuration(Duration.ofSeconds(2))
?.build();
然后我們創(chuàng)建一個?Bulkhead:
BulkheadRegistry registry = BulkheadRegistry.of(config);Bulkhead bulkhead = registry.bulkhead("flightSearchService");
現(xiàn)在讓我們表達(dá)我們的代碼以作為?Supplier?運行航班搜索并使用?bulkhead?裝飾它:
BulkheadRegistry registry = BulkheadRegistry.of(config);Bulkhead bulkhead = registry.bulkhead("flightSearchService");
最后,讓我們調(diào)用幾次裝飾操作來了解隔板的工作原理。我們可以使用?CompletableFuture?來模擬來自用戶的并發(fā)航班搜索請求:
for (int i=0; i<4; i++) { ?CompletableFuture
? ?.supplyAsync(decoratedFlightsSupplier) ? ?.thenAccept(flights -> System.out.println("Received results"));
}
輸出中的時間戳和線程名稱顯示,在 4 個并發(fā)請求中,前兩個請求立即通過:
Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-3Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-5Flight search successful at 11:42:13 226Flight search successful at 11:42:13 226Received resultsReceived resultsSearching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-9Searching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-7Flight search successful at 11:42:14 239Flight search successful at 11:42:14 239Received resultsReceived results
第三個和第四個請求僅在 1 秒后就能夠獲得許可,在之前的請求完成之后。
如果線程無法在我們指定的 2s?maxWaitDuration?內(nèi)獲得許可,則會拋出?BulkheadFullException:
Caused by: io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
? ?at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:49)
? ?at io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead.acquirePermission(SemaphoreBulkhead.java:164)
? ?at io.github.resilience4j.bulkhead.Bulkhead.lambda$decorateSupplier$5(Bulkhead.java:194)
? ?at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
? ?... 6 more
除了第一行,堆棧跟蹤中的其他行沒有增加太多價值。如果?BulkheadFullException?發(fā)生多次,這些堆棧跟蹤行將在我們的日志文件中重復(fù)。
我們可以通過將?writableStackTraceEnabled?配置設(shè)置為?false?來減少堆棧跟蹤中生成的信息量:
BulkheadConfig config = BulkheadConfig.custom()
? ?.maxConcurrentCalls(2)
? ?.maxWaitDuration(Duration.ofSeconds(1))
? ?.writableStackTraceEnabled(false)
.build();
現(xiàn)在,當(dāng)?BulkheadFullException?發(fā)生時,堆棧跟蹤中只存在一行:
Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further callsFlight search successful at 12:27:58 699Flight search successful at 12:27:58 699Received resultsReceived results
與我們見過的其他 Resilience4j 模塊類似,Bulkhead?還提供了額外的方法,如?decorateCheckedSupplier()、decorateCompletionStage()、decorateRunnable()、decorateConsumer()?等,因此我們可以在?Supplier?供應(yīng)商之外的其他結(jié)構(gòu)中提供我們的代碼。
ThreadPoolBulkhead
當(dāng)使用基于線程池的隔板時,?ThreadPoolBulkheadRegistry、ThreadPoolBulkheadConfig?和?ThreadPoolBulkhead?是我們使用的主要抽象。
ThreadPoolBulkheadRegistry?是用于創(chuàng)建和管理?ThreadPoolBulkhead?對象的工廠。
ThreadPoolBulkheadConfig?封裝了?coreThreadPoolSize?、?maxThreadPoolSize、?keepAliveDuration?和?queueCapacity?配置。每個?ThreadPoolBulkhead?對象都與一個?ThreadPoolBulkheadConfig?相關(guān)聯(lián)。
第一步是創(chuàng)建一個?ThreadPoolBulkheadConfig:
ThreadPoolBulkheadConfig config =
?ThreadPoolBulkheadConfig.ofDefaults();
這將創(chuàng)建一個?ThreadPoolBulkheadConfig,其默認(rèn)值為?coreThreadPoolSize(可用處理器數(shù)量 - 1)、maxThreadPoolSize(可用處理器最大數(shù)量)、keepAliveDuration(20ms)和?queueCapacity(100)。
假設(shè)我們要將并發(fā)調(diào)用的數(shù)量限制為 2:
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
?.maxThreadPoolSize(2)
?.coreThreadPoolSize(1)
?.queueCapacity(1)
?.build();
然后我們創(chuàng)建一個?ThreadPoolBulkhead:
ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);ThreadPoolBulkhead bulkhead = registry.bulkhead("flightSearchService");
現(xiàn)在讓我們表達(dá)我們的代碼以作為?Supplier?運行航班搜索并使用?bulkhead?裝飾它:
Supplier<List<Flight>> flightsSupplier = ?() -> service.searchFlightsTakingOneSecond(request);
Supplier<CompletionStage<List<Flight>>> decoratedFlightsSupplier =
?ThreadPoolBulkhead.decorateSupplier(bulkhead, flightsSupplier);
與返回一個?Supplier<List<Flight>>?的?SemaphoreBulkhead.decorateSupplier()?不同,ThreadPoolBulkhead.decorateSupplier()?返回一個?Supplier<CompletionStage<List<Flight>>。這是因為?ThreadPoolBulkHead?不會在當(dāng)前線程上同步執(zhí)行代碼。
最后,讓我們調(diào)用幾次裝飾操作來了解隔板的工作原理:
for (int i=0; i<3; i++) { ?decoratedFlightsSupplier
? ?.get() ? ?.whenComplete((r,t) -> { ? ? ?if (r != null) { ? ? ? ?System.out.println("Received results");
? ? ?} ? ? ?if (t != null) { ? ? ? ?t.printStackTrace();
? ? ?}
? ?});
}
輸出中的時間戳和線程名稱顯示,雖然前兩個請求立即執(zhí)行,但第三個請求已排隊,稍后由釋放的線程之一執(zhí)行:
Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-1Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-2Flight search successful at 16:15:00 136Flight search successful at 16:15:00 135Received resultsReceived resultsSearching for flights; current time = 16:15:01 151; current thread = bulkhead-flightSearchService-2Flight search successful at 16:15:01 151Received results
如果隊列中沒有空閑線程和容量,則拋出?BulkheadFullException:
Exception in thread "main" io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls
at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:64)
at io.github.resilience4j.bulkhead.internal.FixedThreadPoolBulkhead.submit(FixedThreadPoolBulkhead.java:157)
... other lines omitted ...
我們可以使用?writableStackTraceEnabled?配置來減少堆棧跟蹤中生成的信息量:
ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom()
?.maxThreadPoolSize(2)
?.coreThreadPoolSize(1)
?.queueCapacity(1)
?.writableStackTraceEnabled(false)
?.build();
現(xiàn)在,當(dāng)?BulkheadFullException?發(fā)生時,堆棧跟蹤中只存在一行:
Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further callsFlight search successful at 12:27:58 699Flight search successful at 12:27:58 699Received resultsReceived results
上下文傳播
有時我們將數(shù)據(jù)存儲在?ThreadLocal?變量中并在代碼的不同區(qū)域中讀取它。我們這樣做是為了避免在方法鏈之間顯式地將數(shù)據(jù)作為參數(shù)傳遞,尤其是當(dāng)該值與我們正在實現(xiàn)的核心業(yè)務(wù)邏輯沒有直接關(guān)系時。
例如,我們可能希望將當(dāng)前用戶 ID 或事務(wù) ID 或某個請求跟蹤 ID 記錄到每個日志語句中,以便更輕松地搜索日志。對于此類場景,使用?ThreadLocal?是一種有用的技術(shù)。
使用?ThreadPoolBulkhead?時,由于我們的代碼不在當(dāng)前線程上執(zhí)行,因此我們存儲在?ThreadLocal?變量中的數(shù)據(jù)在其他線程中將不可用。
讓我們看一個例子來理解這個問題。首先我們定義一個?RequestTrackingIdHolder?類,一個圍繞?ThreadLocal?的包裝類:
class RequestTrackingIdHolder { ?static ThreadLocal<String> threadLocal = new ThreadLocal<>(); ?static String getRequestTrackingId() { ? ?return threadLocal.get();
?} ?static void setRequestTrackingId(String id) { ? ?if (threadLocal.get() != null) {
? ? ?threadLocal.set(null);
? ? ?threadLocal.remove();
? ?}
? ?threadLocal.set(id);
?} ?static void clear() {
? ?threadLocal.set(null);
? ?threadLocal.remove();
?}
}
靜態(tài)方法可以輕松設(shè)置和獲取存儲在?ThreadLocal?上的值。我們接下來在調(diào)用隔板裝飾的航班搜索操作之前設(shè)置一個請求跟蹤 ID:
for (int i=0; i<2; i++) {
?String trackingId = UUID.randomUUID().toString();
?System.out.println("Setting trackingId " + trackingId + " on parent, main thread before calling flight search");
?RequestTrackingIdHolder.setRequestTrackingId(trackingId);
?decoratedFlightsSupplier
? ?.get()
? ?.whenComplete((r,t) -> { ? ? ? ?// other lines omitted
? ?});
}
示例輸出顯示此值在隔板管理的線程中不可用:
Setting trackingId 98ff99df-466a-47f7-88f7-5e31fc8fcb6b on parent, main thread before calling flight searchSetting trackingId 6b98d73c-a590-4a20-b19d-c85fea783caf on parent, main thread before calling flight searchSearching for flights; current time = 19:53:53 799; current thread = bulkhead-flightSearchService-1; Request Tracking Id = nullFlight search successful at 19:53:53 824Received resultsSearching for flights; current time = 19:53:54 836; current thread = bulkhead-flightSearchService-1; Request Tracking Id = nullFlight search successful at 19:53:54 836Received results
為了解決這個問題,ThreadPoolBulkhead?提供了一個?ContextPropagator。ContextPropagator?是一種用于跨線程邊界檢索、復(fù)制和清理值的抽象。它定義了一個接口,其中包含從當(dāng)前線程 (retrieve()) 獲取值、將其復(fù)制到新的執(zhí)行線程 (copy()) 并最終在執(zhí)行線程 (clear()) 上進(jìn)行清理的方法。
讓我們實現(xiàn)一個?RequestTrackingIdPropagator:
class RequestTrackingIdPropagator implements ContextPropagator { ?
?public Supplier<Optional> retrieve() {
? ?System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName()); ? ?return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId());
?} ?
?Consumer<Optional> copy() { ? ?return optional -> {
? ? ?System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName());
? ? ?optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString()));
? ?};
?} ?
?Consumer<Optional> clear() { ? ?return optional -> {
? ? ?System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName());
? ? ?optional.ifPresent(s -> RequestTrackingIdHolder.clear());
? ?};
?}
}
我們通過在?ThreadPoolBulkheadConfig?上的設(shè)置來為?ThreadPoolBulkhead?提供?ContextPropagator:
class RequestTrackingIdPropagator implements ContextPropagator { ?
?public Supplier<Optional> retrieve() {
? ?System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName()); ? ?return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId());
?} ?
?Consumer<Optional> copy() { ? ?return optional -> {
? ? ?System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName());
? ? ?optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString()));
? ?};
?} ?
?Consumer<Optional> clear() { ? ?return optional -> {
? ? ?System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName());
? ? ?optional.ifPresent(s -> RequestTrackingIdHolder.clear());
? ?};
?}
}
現(xiàn)在,示例輸出顯示請求跟蹤 ID 在隔板管理的線程中可用:
Setting trackingId 71d44cb8-dab6-4222-8945-e7fd023528ba on parent, main thread before calling flight searchGetting request tracking id from thread: mainSetting trackingId 5f9dd084-f2cb-4a20-804b-038828abc161 on parent, main thread before calling flight searchGetting request tracking id from thread: mainSetting request tracking id 71d44cb8-dab6-4222-8945-e7fd023528ba on thread: bulkhead-flightSearchService-1Searching for flights; current time = 20:07:56 508; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 71d44cb8-dab6-4222-8945-e7fd023528baFlight search successful at 20:07:56 538Clearing request tracking id on thread: bulkhead-flightSearchService-1Received resultsSetting request tracking id 5f9dd084-f2cb-4a20-804b-038828abc161 on thread: bulkhead-flightSearchService-1Searching for flights; current time = 20:07:57 542; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 5f9dd084-f2cb-4a20-804b-038828abc161Flight search successful at 20:07:57 542Clearing request tracking id on thread: bulkhead-flightSearchService-1Received results
Bulkhead事件
Bulkhead?和?ThreadPoolBulkhead?都有一個?EventPublisher?來生成以下類型的事件:
BulkheadOnCallPermittedEvent
BulkheadOnCallRejectedEvent?和
BulkheadOnCallFinishedEvent
我們可以監(jiān)聽這些事件并記錄它們,例如:
Bulkhead bulkhead = registry.bulkhead("flightSearchService");
bulkhead.getEventPublisher().onCallPermitted(e -> System.out.println(e.toString()));
bulkhead.getEventPublisher().onCallFinished(e -> System.out.println(e.toString()));
bulkhead.getEventPublisher().onCallRejected(e -> System.out.println(e.toString()));
示例輸出顯示了記錄的內(nèi)容:
2020-08-26T12:27:39.790435: Bulkhead 'flightSearch' permitted a call.
... other lines omitted ...2020-08-26T12:27:40.290987: Bulkhead 'flightSearch' rejected a call.
... other lines omitted ...2020-08-26T12:27:41.094866: Bulkhead 'flightSearch' has finished a call.
Bulkhead 指標(biāo)
SemaphoreBulkhead
Bulkhead?暴露了兩個指標(biāo):
可用權(quán)限的最大數(shù)量(resilience4j.bulkhead.max.allowed.concurrent.calls),和
允許的并發(fā)調(diào)用數(shù)(resilience4j.bulkhead.available.concurrent.calls)。
bulkhead.available?指標(biāo)與我們在?BulkheadConfig?上配置的?maxConcurrentCalls?相同。
首先,我們像前面一樣創(chuàng)建?BulkheadConfig、BulkheadRegistry?和?Bulkhead。然后,我們創(chuàng)建一個?MeterRegistry?并將?BulkheadRegistry?綁定到它:
MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedBulkheadMetrics.ofBulkheadRegistry(registry)
?.bindTo(meterRegistry);
運行幾次隔板裝飾操作后,我們顯示捕獲的指標(biāo):
Consumer<Meter> meterConsumer = meter -> { ?String desc = meter.getId().getDescription(); ?String metricName = meter.getId().getName();
?Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)
? ?.filter(m -> m.getStatistic().name().equals("VALUE"))
? ?.findFirst()
? ?.map(m -> m.getValue())
? ?.orElse(0.0);
?System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);
這是一些示例輸出:
The maximum number of available permissions - resilience4j.bulkhead.max.allowed.concurrent.calls: 8.0The number of available permissions - resilience4j.bulkhead.available.concurrent.calls: 3.0
ThreadPoolBulkhead
ThreadPoolBulkhead?暴露五個指標(biāo):
隊列的當(dāng)前長度(resilience4j.bulkhead.queue.depth),
當(dāng)前線程池的大?。╮esilience4j.bulkhead.thread.pool.size),
線程池的核心和最大容量(resilience4j.bulkhead.core.thread.pool.size?和?resilience4j.bulkhead.max.thread.pool.size),以及
隊列的容量(resilience4j.bulkhead.queue.capacity)。
首先,我們像前面一樣創(chuàng)建?ThreadPoolBulkheadConfig、ThreadPoolBulkheadRegistry?和?ThreadPoolBulkhead。然后,我們創(chuàng)建一個?MeterRegistry?并將?ThreadPoolBulkheadRegistry?綁定到它:
MeterRegistry meterRegistry = new SimpleMeterRegistry();
TaggedThreadPoolBulkheadMetrics.ofThreadPoolBulkheadRegistry(registry).bindTo(meterRegistry);
運行幾次隔板裝飾操作后,我們將顯示捕獲的指標(biāo):
The queue capacity - resilience4j.bulkhead.queue.capacity: 5.0The queue depth - resilience4j.bulkhead.queue.depth: 1.0The thread pool size - resilience4j.bulkhead.thread.pool.size: 5.0The maximum thread pool size - resilience4j.bulkhead.max.thread.pool.size: 5.0The core thread pool size - resilience4j.bulkhead.core.thread.pool.size: 3.0
在實際應(yīng)用中,我們會定期將數(shù)據(jù)導(dǎo)出到監(jiān)控系統(tǒng)并在儀表板上進(jìn)行分析。
實施隔板時的陷阱和良好實踐
使隔板成為單例
對給定遠(yuǎn)程服務(wù)的所有調(diào)用都應(yīng)通過同一個?Bulkhead?實例。對于給定的遠(yuǎn)程服務(wù),Bulkhead?必須是單例。
如果我們不強(qiáng)制執(zhí)行此操作,我們代碼庫的某些區(qū)域可能會繞過?Bulkhead?直接調(diào)用遠(yuǎn)程服務(wù)。為了防止這種情況,遠(yuǎn)程服務(wù)的實際調(diào)用應(yīng)該在一個核心、內(nèi)部層和其他區(qū)域應(yīng)該使用內(nèi)部層暴露的隔板裝飾器。
我們?nèi)绾未_保未來的新開發(fā)人員理解這一意圖? 查看 Tom 的文章,該文章展示了解決此類問題的一種方法,即通過組織包結(jié)構(gòu)來明確此類意圖。此外,它還展示了如何通過在 ArchUnit 測試中編碼意圖來強(qiáng)制執(zhí)行此操作。
與其他 Resilience4j 模塊結(jié)合
將隔板與一個或多個其他 Resilience4j 模塊(如重試和速率限制器)結(jié)合使用會更有效。例如,如果有?BulkheadFullException,我們可能希望在一些延遲后重試。
結(jié)論
在本文中,我們學(xué)習(xí)了如何使用 Resilience4j 的 Bulkhead 模塊對我們對遠(yuǎn)程服務(wù)進(jìn)行的并發(fā)調(diào)用設(shè)置限制。我們了解了為什么這很重要,還看到了一些有關(guān)如何配置它的實際示例。
您可以使用?[GitHub 上](https://github.com/thombergs/code-examples/tree/master/resilience4j/bulkhead)的代碼演示一個完整的應(yīng)用程序。
本文譯自: [Implementing Bulkhead with Resilience4j - Reflectoring](Implementing Bulkhead with Resilience4j - Reflectoring)