CyclicBarrier in Java

1. Inleiding

CyclicBarriers zijn synchronisatieconstructies die zijn geïntroduceerd met Java 5 als onderdeel van de java.util.concurrent pakket.

In dit artikel zullen we deze implementatie onderzoeken in een gelijktijdigheidsscenario.

2. Java Concurrency - Synchronizers

De java.util.concurrent pakket bevat verschillende klassen die helpen bij het beheren van een reeks discussielijnen die met elkaar samenwerken. Enkele hiervan zijn:

  • CyclicBarrier
  • Phaser
  • CountDownLatch
  • Uitwisselaar
  • Semafoor
  • Synchrone wachtrij

Deze klassen bieden kant-en-klare functionaliteit voor veelvoorkomende interactiepatronen tussen threads.

Als we een reeks threads hebben die met elkaar communiceren en lijken op een van de gemeenschappelijke patronen, we kunnen de juiste bibliotheekklassen (ook wel Synchronisatoren) in plaats van te proberen een aangepast schema te bedenken met behulp van een set vergrendelingen en voorwaardeobjecten en de gesynchroniseerd trefwoord.

Laten we ons concentreren op de CyclicBarrier vooruit gaan.

3. CyclicBarrier

EEN CyclicBarrier is een synchronisator waarmee een reeks threads op elkaar kan wachten om een ​​gemeenschappelijk uitvoeringspunt te bereiken, ook wel een barrière.

CyclicBarriers worden gebruikt in programma's waarin we een vast aantal threads hebben die op elkaar moeten wachten om een ​​gemeenschappelijk punt te bereiken alvorens verder te gaan met de uitvoering.

De slagboom heet cyclisch omdat het kan worden hergebruikt nadat de wachtende threads zijn vrijgegeven.

4. Gebruik

De constructor voor een CyclicBarrier is simpel. Er is één geheel getal nodig dat het aantal threads aangeeft dat de wachten() methode op de barrière-instantie om aan te geven dat het gemeenschappelijke uitvoeringspunt wordt bereikt:

openbare CyclicBarrier (int partijen)

De threads die hun uitvoering moeten synchroniseren, worden ook wel genoemd feesten en het bellen van de wachten() methode is hoe we kunnen registreren dat een bepaalde thread het barrièrepunt heeft bereikt.

Deze aanroep is synchroon en de thread die deze methode aanroept, schort de uitvoering op totdat een gespecificeerd aantal threads dezelfde methode op de barrière heeft aangeroepen. Deze situatie waarin het vereiste aantal threads is aangeroepen wachten(), wordt genoemd struikelen over de barrière.

Optioneel kunnen we het tweede argument doorgeven aan de constructor, namelijk een Runnable voorbeeld. Dit heeft logica die zou worden uitgevoerd door de laatste thread die de barrière uitschakelt:

openbare CyclicBarrier (int partijen, Runnable barrierAction)

5. Implementatie

Zien CyclicBarrier Laten we in actie kijken naar het volgende scenario:

Er is een bewerking die een vast aantal threads uitvoert en de bijbehorende resultaten in een lijst opslaat. Wanneer alle threads klaar zijn met het uitvoeren van hun actie, begint een van hen (meestal de laatste die de barrière doorbreekt) met het verwerken van de gegevens die door elk van deze threads zijn opgehaald.

Laten we de hoofdklasse implementeren waar alle actie plaatsvindt:

openbare klasse CyclicBarrierDemo {privé CyclicBarrier cyclicBarrier; privélijst PartialResults = Collections.synchronizedList (nieuwe ArrayList ()); private Random random = nieuwe Random (); privé int NUM_PARTIAL_RESULTS; privé int NUM_WORKERS; // ...}

Deze les is vrij ongecompliceerd - NUM_WORKERS is het aantal threads dat wordt uitgevoerd en NUM_PARTIAL_RESULTS is het aantal resultaten dat elk van de werkdraden gaat produceren.

Eindelijk hebben we gedeeltelijke resultaten dat is een lijst waarin de resultaten van elk van deze werkthreads worden opgeslagen. Houd er rekening mee dat deze lijst een SynchronizedList omdat er meerdere threads tegelijkertijd naar zullen schrijven, en de toevoegen() methode is niet thread-safe op een vlakte ArrayList.

Laten we nu de logica van elk van de werkthreads implementeren:

openbare klasse CyclicBarrierDemo {// ... klasse NumberCruncherThread implementeert Runnable {@Override public void run () {String thisThreadName = Thread.currentThread (). getName (); Lijst PartialResult = new ArrayList (); // Crunch enkele getallen en sla het gedeeltelijke resultaat op voor (int i = 0; i <NUM_PARTIAL_RESULTS; i ++) {Integer num = random.nextInt (10); System.out.println (thisThreadName + ": Enkele getallen kraken! Eindresultaat -" + num); PartialResult.add (num); } PartialResults.add (PartialResult); probeer {System.out.println (thisThreadName + "wachtend tot anderen de barrière bereiken."); cyclicBarrier.await (); } catch (InterruptedException e) {// ...} catch (BrokenBarrierException e) {// ...}}}}

We zullen nu de logica implementeren die wordt uitgevoerd wanneer de barrière is geactiveerd.

Laten we, om het simpel te houden, alle getallen in de lijst met gedeeltelijke resultaten toevoegen:

openbare klasse CyclicBarrierDemo {// ... klasse AggregatorThread implementeert Runnable {@Override public void run () {String thisThreadName = Thread.currentThread (). getName (); System.out.println (thisThreadName + ": Berekende som van" + NUM_WORKERS + "workers, met" + NUM_PARTIAL_RESULTS + "resultaten elk."); int som = 0; voor (Lijst threadResult: PartialResults) {System.out.print ("Toevoegen"); for (Geheel getal partieelResult: threadResult) {System.out.print (partieelResultaat + ""); som + = partieel resultaat; } System.out.println (); } System.out.println (thisThreadName + ": Eindresultaat =" + som); }}}

De laatste stap zou zijn om het CyclicBarrier en trappen af ​​met een hoofd() methode:

openbare klasse CyclicBarrierDemo {// Vorige code openbare leegte runSimulation (int numWorkers, int numberOfPartialResults) {NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = aantalWorkers; cyclicBarrier = nieuwe CyclicBarrier (NUM_WORKERS, nieuwe AggregatorThread ()); System.out.println ("Spawning" + NUM_WORKERS + "werkthreads om te berekenen" + NUM_PARTIAL_RESULTS + "gedeeltelijke resultaten elk"); voor (int i = 0; i <NUM_WORKERS; i ++) {Thread worker = new Thread (new NumberCruncherThread ()); worker.setName ("Thread" + i); worker.start (); }} public static void main (String [] args) {CyclicBarrierDemo demo = nieuwe CyclicBarrierDemo (); demo.runSimulation (5, 3); }} 

In de bovenstaande code hebben we de cyclische barrière geïnitialiseerd met 5 threads die elk 3 gehele getallen produceren als onderdeel van hun berekening en deze opslaan in de resulterende lijst.

Zodra de barrière is geactiveerd, voert de laatste thread die de barrière heeft geactiveerd de logica uit die is gespecificeerd in de AggregatorThread, namelijk - voeg alle nummers toe die door de threads zijn geproduceerd.

6. Resultaten

Hier is de uitvoer van één uitvoering van het bovenstaande programma - elke uitvoering kan verschillende resultaten opleveren omdat de threads in een andere volgorde kunnen worden voortgebracht:

5 werkthreads voortbrengen om elk 3 gedeeltelijke resultaten te berekenen Thread 0: enkele getallen kraken! Eindresultaat - 6 Onderwerp 0: Enkele getallen kraken! Eindresultaat - 2 Thread 0: Enkele getallen kraken! Eindresultaat - 2 Thread 0 wachtend tot anderen de barrière bereiken. Onderwerp 1: Enkele getallen kraken! Eindresultaat - 2 Onderwerp 1: Enkele cijfers kraken! Eindresultaat - 0 Discussie 1: Enkele cijfers kraken! Eindresultaat - 5 Thread 1 wachtend tot anderen de barrière bereiken. Onderwerp 3: Enkele getallen kraken! Eindresultaat - 6 Onderwerp 3: Enkele getallen kraken! Eindresultaat - 4 Discussie 3: Enkele cijfers kraken! Eindresultaat - 0 Thread 3 wacht tot anderen de barrière bereiken. Onderwerp 2: Enkele getallen kraken! Eindresultaat - 1 Onderwerp 2: Enkele getallen kraken! Eindresultaat - 1 Onderwerp 2: Enkele getallen kraken! Eindresultaat - 0 Thread 2 wacht tot anderen de barrière bereiken. Onderwerp 4: Enkele getallen kraken! Eindresultaat - 9 Onderwerp 4: Enkele cijfers kraken! Eindresultaat - 3 Thread 4: Enkele getallen kraken! Eindresultaat - 5 Thread 4 wachtend tot anderen de barrière bereiken. Onderwerp 4: De uiteindelijke som van 5 werknemers berekenen, met elk 3 resultaten. Toevoegen 6 2 2 Toevoegen 2 0 5 Toevoegen 6 4 0 Toevoegen 1 1 0 Toevoegen 9 3 5 Onderwerp 4: Eindresultaat = 46 

Zoals de bovenstaande uitvoer laat zien, Draad 4 is degene die de barrière doorbreekt en ook de laatste aggregatielogica uitvoert. Het is ook niet nodig dat threads daadwerkelijk worden uitgevoerd in de volgorde waarin ze zijn gestart, zoals het bovenstaande voorbeeld laat zien.

7. Conclusie

In dit artikel hebben we gezien wat een CyclicBarrier is, en in wat voor situaties het nuttig is.

We hebben ook een scenario geïmplementeerd waarin we een vast aantal threads nodig hadden om een ​​vast uitvoeringspunt te bereiken, voordat we verder gingen met andere programmalogica.

Zoals altijd is de code voor de tutorial te vinden op GitHub.