五月天青色头像情侣网名,国产亚洲av片在线观看18女人,黑人巨茎大战俄罗斯美女,扒下她的小内裤打屁股

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

Java 項目中使用 Resilience4j 框架實現(xiàn)故障隔離

2021-11-26 14:07 作者:信碼由韁  | 我要投稿



到目前為止,在本系列中,我們已經(jīng)了解了 Resilience4j 及其?[Retry](https://icodewalker.com/blog/261/),?[RateLimiter](https://icodewalker.com/blog/288/)?和?[TimeLimiter](https://icodewalker.com/blog/302/)?模塊。在本文中,我們將探討 Bulkhead 模塊。我們將了解它解決了什么問題,何時以及如何使用它,并查看一些示例。

代碼示例

本文附有?[GitHub 上](https://github.com/thombergs/code-examples/tree/master/resilience4j/bulkhead)的工作代碼示例。

什么是 Resilience4j?

請參閱上一篇文章中的描述,快速了解?[Resilience4j 的一般工作原理](https://icodewalker.com/blog/261/#what-is-resilience4j)。

什么是故障隔離?

幾年前,我們遇到了一個生產(chǎn)問題,其中一臺服務(wù)器停止響應(yīng)健康檢查,負(fù)載均衡器將服務(wù)器從池中取出。

就在我們開始調(diào)查這個問題的時候,還有第二個警報——另一臺服務(wù)器已經(jīng)停止響應(yīng)健康檢查,也被從池中取出。

幾分鐘后,每臺服務(wù)器都停止響應(yīng)健康探測,我們的服務(wù)完全關(guān)閉。

我們使用 Redis 為應(yīng)用程序支持的幾個功能緩存一些數(shù)據(jù)。正如我們后來發(fā)現(xiàn)的那樣,Redis 集群同時出現(xiàn)了一些問題,它已停止接受新連接。我們使用 Jedis 庫連接到 Redis,該庫的默認(rèn)行為是無限期地阻塞調(diào)用線程,直到建立連接。

我們的服務(wù)托管在 Tomcat 上,它的默認(rèn)請求處理線程池大小為 200 個線程。因此,通過連接到 Redis 的代碼路徑的每個請求最終都會無限期地阻塞線程。

幾分鐘之內(nèi),集群中的所有 2000 個線程都無限期地阻塞了——甚至沒有空閑線程來響應(yīng)負(fù)載均衡器的健康檢查。

該服務(wù)本身支持多項功能,并非所有功能都需要訪問 Redis 緩存。但是當(dāng)這一方面出現(xiàn)問題時,它最終影響了整個服務(wù)。

這正是故障隔離要解決的問題——它可以防止某個服務(wù)區(qū)域的問題影響整個服務(wù)。

雖然我們的服務(wù)發(fā)生的事情是一個極端的例子,但我們可以看到緩慢的上游依賴如何影響調(diào)用服務(wù)的不相關(guān)區(qū)域。

如果我們在每個服務(wù)器實例上對 Redis 設(shè)置了 20 個并發(fā)請求的限制,那么當(dāng) Redis 連接問題發(fā)生時,只有這些線程會受到影響。剩余的請求處理線程可以繼續(xù)為其他請求提供服務(wù)。

故障隔離背后的想法是對我們對遠(yuǎn)程服務(wù)進(jìn)行的并發(fā)調(diào)用數(shù)量設(shè)置限制。我們將對不同遠(yuǎn)程服務(wù)的調(diào)用視為不同的、隔離的池,并對可以同時進(jìn)行的調(diào)用數(shù)量設(shè)置限制。

術(shù)語艙壁本身來自它在船舶中的使用,其中船舶的底部被分成彼此分開的部分。如果有裂縫,并且水開始流入,則只有該部分會充滿水。這可以防止整艘船沉沒。

Resilience4j 隔板概念

resilience4j-bulkhead?的工作原理類似于其他 Resilience4j 模塊。我們?yōu)樗峁┝宋覀兿胍鳛楹瘮?shù)構(gòu)造執(zhí)行的代碼——一個進(jìn)行遠(yuǎn)程調(diào)用的 lambda 表達(dá)式或一個從遠(yuǎn)程服務(wù)中檢索到的某個值的?Supplier,等等——并且隔板用代碼裝飾它以控制并發(fā)調(diào)用數(shù)。

Resilience4j 提供兩種類型的隔板 -?SemaphoreBulkhead??ThreadPoolBulkhead。

SemaphoreBulkhead?內(nèi)部使用?java.util.concurrent.Semaphore?來控制并發(fā)調(diào)用的數(shù)量并在當(dāng)前線程上執(zhí)行我們的代碼。

ThreadPoolBulkhead?使用線程池中的一個線程來執(zhí)行我們的代碼。它內(nèi)部使用?java.util.concurrent.ArrayBlockingQueue?和?java.util.concurrent.ThreadPoolExecutor?來控制并發(fā)調(diào)用的數(shù)量。

SemaphoreBulkhead

讓我們看看與信號量隔板相關(guān)的配置及其含義。

maxConcurrentCalls?確定我們可以對遠(yuǎn)程服務(wù)進(jìn)行的最大并發(fā)調(diào)用數(shù)。我們可以將此值視為初始化信號量的許可數(shù)。

任何嘗試超過此限制調(diào)用遠(yuǎn)程服務(wù)的線程都可以立即獲得?BulkheadFullException?或等待一段時間以等待另一個線程釋放許可。這由?maxWaitDuration?值決定。

當(dāng)有多個線程在等待許可時,fairCallHandlingEnabled?配置確定等待的線程是否以先進(jìn)先出的順序獲取許可。

最后,?writableStackTraceEnabled?配置讓我們可以在?BulkheadFullException?發(fā)生時減少堆棧跟蹤中的信息量。這很有用,因為如果沒有它,當(dāng)異常多次發(fā)生時,我們的日志可能會充滿許多類似的信息。通常在讀取日志時,只知道發(fā)生了?BulkheadFullException?就足夠了。

ThreadPoolBulkhead

coreThreadPoolSize?、?maxThreadPoolSize?、?keepAliveDuration?和?queueCapacity?是與?ThreadPoolBulkhead?相關(guān)的主要配置。ThreadPoolBulkhead?內(nèi)部使用這些配置來構(gòu)造一個 ThreadPoolExecutor。

internalThreadPoolExecutor?使用可用的空閑線程之一執(zhí)行傳入的任務(wù)。 如果沒有線程可以自由執(zhí)行傳入的任務(wù),則該任務(wù)將排隊等待線程可用時稍后執(zhí)行。如果已達(dá)到?queueCapacity,則遠(yuǎn)程調(diào)用將被拒絕并返回?BulkheadFullException。

ThreadPoolBulkhead?也有?writableStackTraceEnabled?配置來控制?BulkheadFullException?的堆棧跟蹤中的信息量。

使用 Resilience4j 隔板模塊

讓我們看看如何使用?resilience4j-bulkhead?模塊中可用的各種功能。

我們將使用與本系列前幾篇文章相同的示例。假設(shè)我們正在為一家航空公司建立一個網(wǎng)站,以允許其客戶搜索和預(yù)訂航班。我們的服務(wù)與?FlightSearchService?類封裝的遠(yuǎn)程服務(wù)對話。

SemaphoreBulkhead

使用基于信號量的隔板時,BulkheadRegistry、BulkheadConfig?和?Bulkhead?是我們使用的主要抽象。

BulkheadRegistry?是一個用于創(chuàng)建和管理?Bulkhead?對象的工廠。

BulkheadConfig?封裝了?maxConcurrentCalls、maxWaitDuration、writableStackTraceEnabled?和?fairCallHandlingEnabled?配置。每個 Bulkhead 對象都與一個?BulkheadConfig?相關(guān)聯(lián)。

第一步是創(chuàng)建一個?BulkheadConfig:

BulkheadConfig config = BulkheadConfig.ofDefaults();

這將創(chuàng)建一個?BulkheadConfig,其默認(rèn)值為?maxConcurrentCalls(25)、maxWaitDuration(0s)、writableStackTraceEnabled(true)?和?fairCallHandlingEnabled(true)。

假設(shè)我們希望將并發(fā)調(diào)用的數(shù)量限制為 2,并且我們愿意等待 2 秒讓線程獲得許可:

BulkheadConfig config = BulkheadConfig.custom() ?.maxConcurrentCalls(2) ?.maxWaitDuration(Duration.ofSeconds(2)) ?.build();

然后我們創(chuàng)建一個?Bulkhead:

BulkheadRegistry registry = BulkheadRegistry.of(config);Bulkhead bulkhead = registry.bulkhead("flightSearchService");

現(xiàn)在讓我們表達(dá)我們的代碼以作為?Supplier?運行航班搜索并使用?bulkhead?裝飾它:

BulkheadRegistry registry = BulkheadRegistry.of(config);Bulkhead bulkhead = registry.bulkhead("flightSearchService");

最后,讓我們調(diào)用幾次裝飾操作來了解隔板的工作原理。我們可以使用?CompletableFuture?來模擬來自用戶的并發(fā)航班搜索請求:

for (int i=0; i<4; i++) { ?CompletableFuture ? ?.supplyAsync(decoratedFlightsSupplier) ? ?.thenAccept(flights -> System.out.println("Received results")); }

輸出中的時間戳和線程名稱顯示,在 4 個并發(fā)請求中,前兩個請求立即通過:

Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-3Searching for flights; current time = 11:42:13 187; current thread = ForkJoinPool.commonPool-worker-5Flight search successful at 11:42:13 226Flight search successful at 11:42:13 226Received resultsReceived resultsSearching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-9Searching for flights; current time = 11:42:14 239; current thread = ForkJoinPool.commonPool-worker-7Flight search successful at 11:42:14 239Flight search successful at 11:42:14 239Received resultsReceived results

第三個和第四個請求僅在 1 秒后就能夠獲得許可,在之前的請求完成之后。

如果線程無法在我們指定的 2s?maxWaitDuration?內(nèi)獲得許可,則會拋出?BulkheadFullException:

Caused by: io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls ? ?at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:49) ? ?at io.github.resilience4j.bulkhead.internal.SemaphoreBulkhead.acquirePermission(SemaphoreBulkhead.java:164) ? ?at io.github.resilience4j.bulkhead.Bulkhead.lambda$decorateSupplier$5(Bulkhead.java:194) ? ?at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ? ?... 6 more

除了第一行,堆棧跟蹤中的其他行沒有增加太多價值。如果?BulkheadFullException?發(fā)生多次,這些堆棧跟蹤行將在我們的日志文件中重復(fù)。

我們可以通過將?writableStackTraceEnabled?配置設(shè)置為?false?來減少堆棧跟蹤中生成的信息量:

BulkheadConfig config = BulkheadConfig.custom() ? ?.maxConcurrentCalls(2) ? ?.maxWaitDuration(Duration.ofSeconds(1)) ? ?.writableStackTraceEnabled(false) .build();

現(xiàn)在,當(dāng)?BulkheadFullException?發(fā)生時,堆棧跟蹤中只存在一行:

Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further callsFlight search successful at 12:27:58 699Flight search successful at 12:27:58 699Received resultsReceived results

與我們見過的其他 Resilience4j 模塊類似,Bulkhead?還提供了額外的方法,如?decorateCheckedSupplier()、decorateCompletionStage()、decorateRunnable()、decorateConsumer()?等,因此我們可以在?Supplier?供應(yīng)商之外的其他結(jié)構(gòu)中提供我們的代碼。

ThreadPoolBulkhead

當(dāng)使用基于線程池的隔板時,?ThreadPoolBulkheadRegistry、ThreadPoolBulkheadConfig?和?ThreadPoolBulkhead?是我們使用的主要抽象。

ThreadPoolBulkheadRegistry?是用于創(chuàng)建和管理?ThreadPoolBulkhead?對象的工廠。

ThreadPoolBulkheadConfig?封裝了?coreThreadPoolSize?、?maxThreadPoolSize、?keepAliveDuration?和?queueCapacity?配置。每個?ThreadPoolBulkhead?對象都與一個?ThreadPoolBulkheadConfig?相關(guān)聯(lián)。

第一步是創(chuàng)建一個?ThreadPoolBulkheadConfig:

ThreadPoolBulkheadConfig config = ?ThreadPoolBulkheadConfig.ofDefaults();

這將創(chuàng)建一個?ThreadPoolBulkheadConfig,其默認(rèn)值為?coreThreadPoolSize(可用處理器數(shù)量 - 1)、maxThreadPoolSize(可用處理器最大數(shù)量)、keepAliveDuration(20ms)和?queueCapacity(100)。

假設(shè)我們要將并發(fā)調(diào)用的數(shù)量限制為 2:

ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom() ?.maxThreadPoolSize(2) ?.coreThreadPoolSize(1) ?.queueCapacity(1) ?.build();

然后我們創(chuàng)建一個?ThreadPoolBulkhead:

ThreadPoolBulkheadRegistry registry = ThreadPoolBulkheadRegistry.of(config);ThreadPoolBulkhead bulkhead = registry.bulkhead("flightSearchService");

現(xiàn)在讓我們表達(dá)我們的代碼以作為?Supplier?運行航班搜索并使用?bulkhead?裝飾它:

Supplier<List<Flight>> flightsSupplier = ?() -> service.searchFlightsTakingOneSecond(request); Supplier<CompletionStage<List<Flight>>> decoratedFlightsSupplier = ?ThreadPoolBulkhead.decorateSupplier(bulkhead, flightsSupplier);

與返回一個?Supplier<List<Flight>>?的?SemaphoreBulkhead.decorateSupplier()?不同,ThreadPoolBulkhead.decorateSupplier()?返回一個?Supplier<CompletionStage<List<Flight>>。這是因為?ThreadPoolBulkHead?不會在當(dāng)前線程上同步執(zhí)行代碼。

最后,讓我們調(diào)用幾次裝飾操作來了解隔板的工作原理:

for (int i=0; i<3; i++) { ?decoratedFlightsSupplier ? ?.get() ? ?.whenComplete((r,t) -> { ? ? ?if (r != null) { ? ? ? ?System.out.println("Received results"); ? ? ?} ? ? ?if (t != null) { ? ? ? ?t.printStackTrace(); ? ? ?} ? ?}); }

輸出中的時間戳和線程名稱顯示,雖然前兩個請求立即執(zhí)行,但第三個請求已排隊,稍后由釋放的線程之一執(zhí)行:

Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-1Searching for flights; current time = 16:15:00 097; current thread = bulkhead-flightSearchService-2Flight search successful at 16:15:00 136Flight search successful at 16:15:00 135Received resultsReceived resultsSearching for flights; current time = 16:15:01 151; current thread = bulkhead-flightSearchService-2Flight search successful at 16:15:01 151Received results

如果隊列中沒有空閑線程和容量,則拋出?BulkheadFullException:

Exception in thread "main" io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further calls at io.github.resilience4j.bulkhead.BulkheadFullException.createBulkheadFullException(BulkheadFullException.java:64) at io.github.resilience4j.bulkhead.internal.FixedThreadPoolBulkhead.submit(FixedThreadPoolBulkhead.java:157) ... other lines omitted ...

我們可以使用?writableStackTraceEnabled?配置來減少堆棧跟蹤中生成的信息量:

ThreadPoolBulkheadConfig config = ThreadPoolBulkheadConfig.custom() ?.maxThreadPoolSize(2) ?.coreThreadPoolSize(1) ?.queueCapacity(1) ?.writableStackTraceEnabled(false) ?.build();

現(xiàn)在,當(dāng)?BulkheadFullException?發(fā)生時,堆棧跟蹤中只存在一行:

Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-3Searching for flights; current time = 12:27:58 658; current thread = ForkJoinPool.commonPool-worker-5io.github.resilience4j.bulkhead.BulkheadFullException: Bulkhead 'flightSearchService' is full and does not permit further callsFlight search successful at 12:27:58 699Flight search successful at 12:27:58 699Received resultsReceived results

上下文傳播

有時我們將數(shù)據(jù)存儲在?ThreadLocal?變量中并在代碼的不同區(qū)域中讀取它。我們這樣做是為了避免在方法鏈之間顯式地將數(shù)據(jù)作為參數(shù)傳遞,尤其是當(dāng)該值與我們正在實現(xiàn)的核心業(yè)務(wù)邏輯沒有直接關(guān)系時。

例如,我們可能希望將當(dāng)前用戶 ID 或事務(wù) ID 或某個請求跟蹤 ID 記錄到每個日志語句中,以便更輕松地搜索日志。對于此類場景,使用?ThreadLocal?是一種有用的技術(shù)。

使用?ThreadPoolBulkhead?時,由于我們的代碼不在當(dāng)前線程上執(zhí)行,因此我們存儲在?ThreadLocal?變量中的數(shù)據(jù)在其他線程中將不可用。

讓我們看一個例子來理解這個問題。首先我們定義一個?RequestTrackingIdHolder?類,一個圍繞?ThreadLocal?的包裝類:

class RequestTrackingIdHolder { ?static ThreadLocal<String> threadLocal = new ThreadLocal<>(); ?static String getRequestTrackingId() { ? ?return threadLocal.get(); ?} ?static void setRequestTrackingId(String id) { ? ?if (threadLocal.get() != null) { ? ? ?threadLocal.set(null); ? ? ?threadLocal.remove(); ? ?} ? ?threadLocal.set(id); ?} ?static void clear() { ? ?threadLocal.set(null); ? ?threadLocal.remove(); ?} }

靜態(tài)方法可以輕松設(shè)置和獲取存儲在?ThreadLocal?上的值。我們接下來在調(diào)用隔板裝飾的航班搜索操作之前設(shè)置一個請求跟蹤 ID:

for (int i=0; i<2; i++) { ?String trackingId = UUID.randomUUID().toString(); ?System.out.println("Setting trackingId " + trackingId + " on parent, main thread before calling flight search"); ?RequestTrackingIdHolder.setRequestTrackingId(trackingId); ?decoratedFlightsSupplier ? ?.get() ? ?.whenComplete((r,t) -> { ? ? ? ?// other lines omitted ? ?}); }

示例輸出顯示此值在隔板管理的線程中不可用:

Setting trackingId 98ff99df-466a-47f7-88f7-5e31fc8fcb6b on parent, main thread before calling flight searchSetting trackingId 6b98d73c-a590-4a20-b19d-c85fea783caf on parent, main thread before calling flight searchSearching for flights; current time = 19:53:53 799; current thread = bulkhead-flightSearchService-1; Request Tracking Id = nullFlight search successful at 19:53:53 824Received resultsSearching for flights; current time = 19:53:54 836; current thread = bulkhead-flightSearchService-1; Request Tracking Id = nullFlight search successful at 19:53:54 836Received results

為了解決這個問題,ThreadPoolBulkhead?提供了一個?ContextPropagator。ContextPropagator?是一種用于跨線程邊界檢索、復(fù)制和清理值的抽象。它定義了一個接口,其中包含從當(dāng)前線程 (retrieve()) 獲取值、將其復(fù)制到新的執(zhí)行線程 (copy()) 并最終在執(zhí)行線程 (clear()) 上進(jìn)行清理的方法。

讓我們實現(xiàn)一個?RequestTrackingIdPropagator:

class RequestTrackingIdPropagator implements ContextPropagator { ?@Override ?public Supplier<Optional> retrieve() { ? ?System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName()); ? ?return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId()); ?} ?@Override ?Consumer<Optional> copy() { ? ?return optional -> { ? ? ?System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName()); ? ? ?optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString())); ? ?}; ?} ?@Override ?Consumer<Optional> clear() { ? ?return optional -> { ? ? ?System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName()); ? ? ?optional.ifPresent(s -> RequestTrackingIdHolder.clear()); ? ?}; ?} }

我們通過在?ThreadPoolBulkheadConfig?上的設(shè)置來為?ThreadPoolBulkhead?提供?ContextPropagator:

class RequestTrackingIdPropagator implements ContextPropagator { ?@Override ?public Supplier<Optional> retrieve() { ? ?System.out.println("Getting request tracking id from thread: " + Thread.currentThread().getName()); ? ?return () -> Optional.of(RequestTrackingIdHolder.getRequestTrackingId()); ?} ?@Override ?Consumer<Optional> copy() { ? ?return optional -> { ? ? ?System.out.println("Setting request tracking id " + optional.get() + " on thread: " + Thread.currentThread().getName()); ? ? ?optional.ifPresent(s -> RequestTrackingIdHolder.setRequestTrackingId(s.toString())); ? ?}; ?} ?@Override ?Consumer<Optional> clear() { ? ?return optional -> { ? ? ?System.out.println("Clearing request tracking id on thread: " + Thread.currentThread().getName()); ? ? ?optional.ifPresent(s -> RequestTrackingIdHolder.clear()); ? ?}; ?} }

現(xiàn)在,示例輸出顯示請求跟蹤 ID 在隔板管理的線程中可用:

Setting trackingId 71d44cb8-dab6-4222-8945-e7fd023528ba on parent, main thread before calling flight searchGetting request tracking id from thread: mainSetting trackingId 5f9dd084-f2cb-4a20-804b-038828abc161 on parent, main thread before calling flight searchGetting request tracking id from thread: mainSetting request tracking id 71d44cb8-dab6-4222-8945-e7fd023528ba on thread: bulkhead-flightSearchService-1Searching for flights; current time = 20:07:56 508; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 71d44cb8-dab6-4222-8945-e7fd023528baFlight search successful at 20:07:56 538Clearing request tracking id on thread: bulkhead-flightSearchService-1Received resultsSetting request tracking id 5f9dd084-f2cb-4a20-804b-038828abc161 on thread: bulkhead-flightSearchService-1Searching for flights; current time = 20:07:57 542; current thread = bulkhead-flightSearchService-1; Request Tracking Id = 5f9dd084-f2cb-4a20-804b-038828abc161Flight search successful at 20:07:57 542Clearing request tracking id on thread: bulkhead-flightSearchService-1Received results

Bulkhead事件

Bulkhead?和?ThreadPoolBulkhead?都有一個?EventPublisher?來生成以下類型的事件:

  • BulkheadOnCallPermittedEvent

  • BulkheadOnCallRejectedEvent?和

  • BulkheadOnCallFinishedEvent

我們可以監(jiān)聽這些事件并記錄它們,例如:

Bulkhead bulkhead = registry.bulkhead("flightSearchService"); bulkhead.getEventPublisher().onCallPermitted(e -> System.out.println(e.toString())); bulkhead.getEventPublisher().onCallFinished(e -> System.out.println(e.toString())); bulkhead.getEventPublisher().onCallRejected(e -> System.out.println(e.toString()));

示例輸出顯示了記錄的內(nèi)容:

2020-08-26T12:27:39.790435: Bulkhead 'flightSearch' permitted a call. ... other lines omitted ...2020-08-26T12:27:40.290987: Bulkhead 'flightSearch' rejected a call. ... other lines omitted ...2020-08-26T12:27:41.094866: Bulkhead 'flightSearch' has finished a call.

Bulkhead 指標(biāo)

SemaphoreBulkhead

Bulkhead?暴露了兩個指標(biāo):

  • 可用權(quán)限的最大數(shù)量(resilience4j.bulkhead.max.allowed.concurrent.calls),和

  • 允許的并發(fā)調(diào)用數(shù)(resilience4j.bulkhead.available.concurrent.calls)。

bulkhead.available?指標(biāo)與我們在?BulkheadConfig?上配置的?maxConcurrentCalls?相同。

首先,我們像前面一樣創(chuàng)建?BulkheadConfig、BulkheadRegistry?和?Bulkhead。然后,我們創(chuàng)建一個?MeterRegistry?并將?BulkheadRegistry?綁定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry(); TaggedBulkheadMetrics.ofBulkheadRegistry(registry) ?.bindTo(meterRegistry);

運行幾次隔板裝飾操作后,我們顯示捕獲的指標(biāo):

Consumer<Meter> meterConsumer = meter -> { ?String desc = meter.getId().getDescription(); ?String metricName = meter.getId().getName(); ?Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false) ? ?.filter(m -> m.getStatistic().name().equals("VALUE")) ? ?.findFirst() ? ?.map(m -> m.getValue()) ? ?.orElse(0.0); ?System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);

這是一些示例輸出:

The maximum number of available permissions - resilience4j.bulkhead.max.allowed.concurrent.calls: 8.0The number of available permissions - resilience4j.bulkhead.available.concurrent.calls: 3.0

ThreadPoolBulkhead

ThreadPoolBulkhead?暴露五個指標(biāo):

  • 隊列的當(dāng)前長度(resilience4j.bulkhead.queue.depth),

  • 當(dāng)前線程池的大?。╮esilience4j.bulkhead.thread.pool.size),

  • 線程池的核心和最大容量(resilience4j.bulkhead.core.thread.pool.size?和?resilience4j.bulkhead.max.thread.pool.size),以及

  • 隊列的容量(resilience4j.bulkhead.queue.capacity)。

首先,我們像前面一樣創(chuàng)建?ThreadPoolBulkheadConfig、ThreadPoolBulkheadRegistry?和?ThreadPoolBulkhead。然后,我們創(chuàng)建一個?MeterRegistry?并將?ThreadPoolBulkheadRegistry?綁定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry(); TaggedThreadPoolBulkheadMetrics.ofThreadPoolBulkheadRegistry(registry).bindTo(meterRegistry);

運行幾次隔板裝飾操作后,我們將顯示捕獲的指標(biāo):

The queue capacity - resilience4j.bulkhead.queue.capacity: 5.0The queue depth - resilience4j.bulkhead.queue.depth: 1.0The thread pool size - resilience4j.bulkhead.thread.pool.size: 5.0The maximum thread pool size - resilience4j.bulkhead.max.thread.pool.size: 5.0The core thread pool size - resilience4j.bulkhead.core.thread.pool.size: 3.0

在實際應(yīng)用中,我們會定期將數(shù)據(jù)導(dǎo)出到監(jiān)控系統(tǒng)并在儀表板上進(jìn)行分析。

實施隔板時的陷阱和良好實踐

使隔板成為單例

對給定遠(yuǎn)程服務(wù)的所有調(diào)用都應(yīng)通過同一個?Bulkhead?實例。對于給定的遠(yuǎn)程服務(wù),Bulkhead?必須是單例。

如果我們不強(qiáng)制執(zhí)行此操作,我們代碼庫的某些區(qū)域可能會繞過?Bulkhead?直接調(diào)用遠(yuǎn)程服務(wù)。為了防止這種情況,遠(yuǎn)程服務(wù)的實際調(diào)用應(yīng)該在一個核心、內(nèi)部層和其他區(qū)域應(yīng)該使用內(nèi)部層暴露的隔板裝飾器。

我們?nèi)绾未_保未來的新開發(fā)人員理解這一意圖? 查看 Tom 的文章,該文章展示了解決此類問題的一種方法,即通過組織包結(jié)構(gòu)來明確此類意圖。此外,它還展示了如何通過在 ArchUnit 測試中編碼意圖來強(qiáng)制執(zhí)行此操作。

與其他 Resilience4j 模塊結(jié)合

將隔板與一個或多個其他 Resilience4j 模塊(如重試和速率限制器)結(jié)合使用會更有效。例如,如果有?BulkheadFullException,我們可能希望在一些延遲后重試。

結(jié)論

在本文中,我們學(xué)習(xí)了如何使用 Resilience4j 的 Bulkhead 模塊對我們對遠(yuǎn)程服務(wù)進(jìn)行的并發(fā)調(diào)用設(shè)置限制。我們了解了為什么這很重要,還看到了一些有關(guān)如何配置它的實際示例。

您可以使用?[GitHub 上](https://github.com/thombergs/code-examples/tree/master/resilience4j/bulkhead)的代碼演示一個完整的應(yīng)用程序。


本文譯自: [Implementing Bulkhead with Resilience4j - Reflectoring](Implementing Bulkhead with Resilience4j - Reflectoring)


Java 項目中使用 Resilience4j 框架實現(xiàn)故障隔離的評論 (共 條)

分享到微博請遵守國家法律
汉源县| 兴文县| 泗阳县| 蒙阴县| 西平县| 繁峙县| 莫力| 吉木乃县| 德保县| 西林县| 航空| 寿光市| 壤塘县| 克什克腾旗| 丽江市| 铜川市| 卓尼县| 汤阴县| 威海市| 祥云县| 靖边县| 德兴市| 泗洪县| 礼泉县| 商丘市| 凯里市| 娱乐| 永善县| 英德市| 梅河口市| 鄂托克旗| 九龙坡区| 绍兴市| 新巴尔虎右旗| 白玉县| 南澳县| 阳泉市| 莆田市| 绥阳县| 衡水市| 江城|