Gids voor de Java Phaser

1. Overzicht

In dit artikel zullen we kijken naar de Phaser construeren vanuit de java.util.concurrent pakket. Het is een constructie die erg lijkt op de CountDownLatch waarmee we de uitvoering van threads kunnen coördineren. In vergelijking met de CountDownLatch, het heeft wat extra functionaliteit.

De Phaser is een barrière waarop het dynamische aantal threads moet wachten voordat de uitvoering kan worden voortgezet. In de CountDownLatch dat nummer kan niet dynamisch worden geconfigureerd en moet worden opgegeven wanneer we de instantie maken.

2. Phaser API

De Phaser stelt ons in staat om logica te bouwen waarin threads moeten op de barrière wachten voordat ze naar de volgende uitvoeringsstap gaan.

We kunnen meerdere uitvoeringsfasen coördineren, waarbij we een Phaser bijvoorbeeld voor elke programmafase. Elke fase kan een ander aantal threads hebben dat wacht op het doorgaan naar een andere fase. We zullen later een voorbeeld bekijken van het gebruik van fasen.

Om deel te nemen aan de coördinatie, moet de draad registreren() zelf met de Phaser voorbeeld. Merk op dat dit alleen het aantal geregistreerde partijen verhoogt, en we kunnen niet controleren of de huidige thread is geregistreerd - we zouden de implementatie moeten subklasse om dit te ondersteunen.

De draad geeft aan dat het bij de slagboom is aangekomen door de comeAndAwaitAdvance (), wat een blokkeermethode is. Wanneer het aantal binnengekomen partijen gelijk is aan het aantal aangemelde partijen, wordt de uitvoering van het programma voortgezet, en het fasegetal zal toenemen. We kunnen het nummer van de huidige fase krijgen door de getPhase () methode.

Als de draad zijn werk heeft gedaan, moeten we de comeAndDeregister () methode om aan te geven dat de huidige thread niet langer in deze specifieke fase moet worden verantwoord.

3. Implementeren van logica met behulp van Phaser API

Laten we zeggen dat we meerdere fasen van acties willen coördineren. Drie threads zullen de eerste fase verwerken en twee threads zullen de tweede fase verwerken.

We maken een LongRunningAction klasse die de Runnable koppel:

klasse LongRunningAction implementeert Runnable {private String threadName; privé phaser ph; LongRunningAction (String threadName, Phaser ph) {this.threadName = threadName; this.ph = ph; ph.register (); } @Override public void run () {ph.arriveAndAwaitAdvance (); probeer {Thread.sleep (20); } catch (InterruptedException e) {e.printStackTrace (); } ph.arriveAndDeregister (); }}

Wanneer onze actieklasse wordt geïnstantieerd, registreren we ons bij de Phaser instantie met behulp van de registreren() methode. Dit verhoogt het aantal threads met dat specifieke Phaser.

De oproep naar de comeAndAwaitAdvance () zorgt ervoor dat de huidige thread op de barrière wacht. Zoals eerder vermeld, zal de uitvoering worden voortgezet wanneer het aantal aangekomen partijen gelijk wordt aan het aantal geregistreerde partijen.

Nadat de verwerking is voltooid, meldt de huidige thread zichzelf af door de comeAndDeregister () methode.

Laten we een testcase maken waarin we er drie beginnen LongRunningAction draden en blok op de barrière. Nadat de actie is voltooid, zullen we vervolgens twee extra maken LongRunningAction threads die de verwerking van de volgende fase zullen uitvoeren.

Bij het maken van Phaser instantie van de hoofdthread, we passeren 1 als argument. Dit komt overeen met het aanroepen van de registreren() methode van de huidige thread. We doen dit omdat, wanneer we drie worker-threads maken, de hoofdthread een coördinator is, en daarom de Phaser er moeten vier threads voor zijn geregistreerd:

ExecutorService executorService = Executors.newCachedThreadPool (); Phaser ph = nieuwe Phaser (1); assertEquals (0, ph.getPhase ());

De fase na de initialisatie is gelijk aan nul.

De Phaser class heeft een constructor waarin we er een bovenliggende instantie aan kunnen doorgeven. Het is handig in gevallen waarin we een groot aantal partijen hebben die te maken zouden krijgen met enorme kosten voor synchronisatieconflicten. In dergelijke situaties kunnen Fasers kan zo worden opgezet dat groepen subfasers een gemeenschappelijke ouder delen.

Laten we vervolgens drie beginnen LongRunningAction actiedraden, die op de slagboom zullen wachten totdat we de comeAndAwaitAdvance () methode uit de rode draad.

Houd er rekening mee dat we onze hebben geïnitialiseerd Phaser met 1 en belde registreren() nog drie keer. Nu hebben drie actiethreads aangekondigd dat ze bij de slagboom zijn aangekomen, dus nog een oproep comeAndAwaitAdvance () is nodig - die van de hoofdthread:

executorService.submit (nieuwe LongRunningAction ("thread-1", ph)); executorService.submit (nieuwe LongRunningAction ("thread-2", ph)); executorService.submit (nieuwe LongRunningAction ("thread-3", ph)); ph.arriveAndAwaitAdvance (); assertEquals (1, ph.getPhase ());

Na voltooiing van die fase, de getPhase () methode retourneert een omdat het programma klaar is met het verwerken van de eerste uitvoeringsstap.

Laten we zeggen dat twee threads de volgende verwerkingsfase moeten uitvoeren. We kunnen profiteren Phaser om dat te bereiken omdat het ons in staat stelt om dynamisch het aantal threads te configureren dat op de barrière moet wachten. We beginnen met twee nieuwe threads, maar deze worden pas uitgevoerd nadat de comeAndAwaitAdvance () van de hoofddraad (hetzelfde als in het vorige geval):

executorService.submit (nieuwe LongRunningAction ("thread-4", ph)); executorService.submit (nieuwe LongRunningAction ("thread-5", ph)); ph.arriveAndAwaitAdvance (); assertEquals (2, ph.getPhase ()); ph.arriveAndDeregister ();

Hierna is het getPhase () methode retourneert fasegetal gelijk aan twee. Als we ons programma willen afmaken, moeten we de comeAndDeregister () methode omdat de hoofdthread nog steeds is geregistreerd in de Phaser. Wanneer de uitschrijving ertoe leidt dat het aantal geregistreerde partijen nul wordt, wordt het Phaser is beëindigd. Alle oproepen naar synchronisatiemethoden worden niet meer geblokkeerd en komen onmiddellijk terug.

Het uitvoeren van het programma levert de volgende uitvoer op (de volledige broncode met de printline-instructies is te vinden in de coderepository):

Dit is fase 0 Dit is fase 0 Dit is fase 0 Thread thread-2 voor langlopende actie Thread thread-1 before long running action Thread thread-3 voor lang lopende actie Dit is fase 1 Dit is fase 1 Thread thread-4 binnenkort lopende actie Draad draad-5 voor lange lopende actie

We zien dat alle threads wachten op uitvoering totdat de slagboom opengaat. De volgende fase van de uitvoering wordt alleen uitgevoerd als de vorige met succes is voltooid.

4. Conclusie

In deze tutorial hebben we de Phaser construeren van java.util.concurrent en we implementeerden de coördinatielogica met meerdere fasen met behulp van Phaser klasse.

De implementatie van al deze voorbeelden en codefragmenten is te vinden in het GitHub-project - dit is een Maven-project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.