Gids voor CountDownLatch in Java

1. Inleiding

In dit artikel geven we een gids voor het CountDownLatch les en laat zien hoe het kan worden gebruikt in een paar praktische voorbeelden.

In wezen door een CountDownLatch we kunnen ervoor zorgen dat een thread blokkeert totdat andere threads een bepaalde taak hebben voltooid.

2. Gebruik bij gelijktijdig programmeren

Simpel gezegd, een CountDownLatch heeft een teller veld, dat u naar behoefte kunt verkleinen. We kunnen het dan gebruiken om een ​​aanroepende thread te blokkeren totdat deze naar nul is geteld.

Als we wat parallelle verwerking zouden uitvoeren, zouden we het CountDownLatch met dezelfde waarde voor de teller als een aantal threads waar we overheen willen werken. Dan kunnen we gewoon bellen aftellen () na elke draad eindigt, wat garandeert dat een afhankelijke draad aanroept wachten() wordt geblokkeerd totdat de werkdraden zijn voltooid.

3. Wachten tot een pool van discussies is voltooid

Laten we dit patroon uitproberen door een Werknemer en het gebruik van een CountDownLatch veld om aan te geven wanneer het is voltooid:

public class Worker implementeert Runnable {private List outputScraper; privé CountDownLatch countDownLatch; openbare Worker (lijst outputScraper, CountDownLatch countDownLatch) {this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run () {doSomeWork (); outputScraper.add ("Afgeteld"); countDownLatch.countDown (); }}

Laten we vervolgens een test maken om te bewijzen dat we een CountDownLatch wachten op de Werknemer instanties om te voltooien:

@Test openbare leegte whenParallelProcessing_thenMainThreadWillBlockUntilCompletion () gooit InterruptedException {List outputScraper = Collections.synchronizedList (nieuwe ArrayList ()); CountDownLatch countDownLatch = nieuwe CountDownLatch (5); List workers = Stream .generate (() -> nieuwe thread (nieuwe Worker (outputScraper, countDownLatch))) .limit (5) .collect (toList ()); workers.forEach (Thread :: start); countDownLatch.await (); outputScraper.add ("Vergrendeling vrijgegeven"); assertThat (outputScraper) .containsExactly ("Afgeteld", "Afgeteld", "Afgeteld", "Afgeteld", "Afgeteld", "Latch vrijgegeven"); }

Natuurlijk zal "Latch released" altijd de laatste output zijn - aangezien het afhankelijk is van de CountDownLatch vrijgeven.

Merk op dat als we niet hebben gebeld wachten(), zouden we de volgorde van de uitvoering van de threads niet kunnen garanderen, dus de test zou willekeurig mislukken.

4. Een pool van discussies die wachten om te beginnen

Als we het vorige voorbeeld hebben genomen, maar deze keer duizenden threads begonnen in plaats van vijf, is het waarschijnlijk dat veel van de eerdere threads zijn voltooid voordat we zelfs maar hebben gebeld begin() op de latere. Dit zou het moeilijk kunnen maken om te proberen een gelijktijdigheidsprobleem te reproduceren, omdat we niet in staat zouden zijn om al onze threads parallel te laten lopen.

Om dit te omzeilen, laten we de CountdownLatch anders werken dan in het vorige voorbeeld. In plaats van een bovenliggende thread te blokkeren totdat sommige onderliggende threads zijn voltooid, kunnen we elke onderliggende thread blokkeren totdat alle andere zijn gestart.

Laten we onze rennen() methode zodat het blokkeert voordat het wordt verwerkt:

openbare klasse WaitingWorker implementeert Runnable {private List outputScraper; privé CountDownLatch readyThreadCounter; privé CountDownLatch callingThreadBlocker; privé CountDownLatch voltooidThreadCounter; openbare WaitingWorker (lijst outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch voltooidThreadCounter) {this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completeThreadCounter; } @Override public void run () {readyThreadCounter.countDown (); probeer {callingThreadBlocker.await (); doe wat werk(); outputScraper.add ("Afgeteld"); } catch (InterruptedException e) {e.printStackTrace (); } eindelijk {completeThreadCounter.countDown (); }}}

Laten we nu onze test aanpassen zodat deze blokkeert tot alle Werknemers zijn begonnen, deblokkeert het Werknemers, en vervolgens blokken tot de Werknemers zijn klaar:

@Test openbare leegte whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime () gooit InterruptedException {Lijst outputScraper = Collections.synchronizedList (nieuwe ArrayList ()); CountDownLatch readyThreadCounter = nieuwe CountDownLatch (5); CountDownLatch callingThreadBlocker = nieuwe CountDownLatch (1); CountDownLatch voltooidThreadCounter = nieuwe CountDownLatch (5); List workers = Stream .generate (() -> nieuwe Thread (nieuwe WaitingWorker (outputScraper, readyThreadCounter, callingThreadBlocker, completeThreadCounter))) .limit (5) .collect (toList ()); workers.forEach (Thread :: start); readyThreadCounter.await (); outputScraper.add ("Werknemers gereed"); callingThreadBlocker.countDown (); voltooideThreadCounter.await (); outputScraper.add ("Werknemers compleet"); assertThat (outputScraper) .containsExactly ("Werknemers gereed", "Afgeteld", "Afgeteld", "Afgeteld", "Afgeteld", "Afgeteld", "Werknemers voltooid"); }

Dit patroon is erg handig om gelijktijdige bugs te reproduceren, zoals het kan worden gebruikt om duizenden threads te dwingen om te proberen wat logica parallel uit te voeren.

5. Beëindiging van een CountdownLatch Vroeg

Soms kunnen we een situatie tegenkomen waarin de Werknemers eindigen per ongeluk voordat de CountDownLatch. Dit kan ertoe leiden dat het nooit nul bereikt en wachten() nooit beëindigen:

@Override public void run () {if (true) {throw new RuntimeException ("Oh jee, ik ben een BrokenWorker"); } countDownLatch.countDown (); outputScraper.add ("Afgeteld"); }

Laten we onze eerdere test aanpassen om een BrokenWorker, om te laten zien hoe wachten() zal voor altijd blokkeren:

@Test openbare leegte whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck () gooit InterruptedException {List outputScraper = Collections.synchronizedList (nieuwe ArrayList ()); CountDownLatch countDownLatch = nieuwe CountDownLatch (5); Lijstwerkers = Stream .generate (() -> nieuwe thread (nieuwe BrokenWorker (outputScraper, countDownLatch))) .limit (5) .collect (toList ()); workers.forEach (Thread :: start); countDownLatch.await (); }

Dit is duidelijk niet het gedrag dat we willen - het zou veel beter zijn als de toepassing doorgaat dan oneindig blokkeren.

Om dit te omzeilen, voegen we een time-outargument toe aan onze aanroep naar wachten().

boolean voltooid = countDownLatch.await (3L, TimeUnit.SECONDS); assertThat (voltooid) .isFalse ();

Zoals we kunnen zien, zal de test uiteindelijk een time-out hebben en wachten() zal terugkeren false.

6. Conclusie

In deze korte handleiding hebben we laten zien hoe we een CountDownLatch om een ​​thread te blokkeren totdat andere threads een bewerking hebben voltooid.

We hebben ook laten zien hoe het kan worden gebruikt om gelijktijdigheidsproblemen op te lossen door ervoor te zorgen dat threads parallel lopen.

De implementatie van deze voorbeelden is te vinden op GitHub; dit is een op Maven gebaseerd project, dus het zou gemakkelijk moeten zijn zoals het is.