Een gids voor Java SynchronousQueue

1. Overzicht

In dit artikel zullen we kijken naar de Synchrone wachtrij van de java.util.concurrent pakket.

Simpel gezegd stelt deze implementatie ons in staat om op een thread-veilige manier informatie tussen threads uit te wisselen.

2. API-overzicht

De Synchrone wachtrij heeft alleen twee ondersteunde bewerkingen: nemen() en leggen(), en beiden blokkeren.

Als we bijvoorbeeld een element aan de wachtrij willen toevoegen, moeten we de leggen() methode. Die methode wordt geblokkeerd totdat een andere thread het nemen() methode, wat aangeeft dat het klaar is om een ​​element te nemen.

Hoewel de Synchrone wachtrij heeft een interface van een wachtrij, we zouden het moeten beschouwen als een uitwisselingspunt voor een enkel element tussen twee threads, waarin de ene thread een element overdraagt ​​en een andere thread dat element pakt.

3. Handoffs implementeren met een gedeelde variabele

Om te zien waarom het Synchrone wachtrij kan zo handig zijn, we zullen een logica implementeren met behulp van een gedeelde variabele tussen twee threads en vervolgens zullen we die logica herschrijven met Synchrone wachtrij waardoor onze code een stuk eenvoudiger en leesbaarder wordt.

Laten we zeggen dat we twee threads hebben - een producent en een consument - en wanneer de producent een waarde van een gedeelde variabele instelt, willen we dat feit aan de consumentendraad signaleren. Vervolgens haalt de consumententhread een waarde op van een gedeelde variabele.

We zullen de CountDownLatch om die twee threads te coördineren, om te voorkomen dat de consument toegang krijgt tot een waarde van een gedeelde variabele die nog niet is ingesteld.

We zullen een sharedState variabele en een CountDownLatch die zal worden gebruikt voor het coördineren van de verwerking:

ExecutorService executor = Executors.newFixedThreadPool (2); AtomicInteger sharedState = nieuwe AtomicInteger (); CountDownLatch countDownLatch = nieuwe CountDownLatch (1);

De producer slaat een willekeurig geheel getal op in het sharedState variabele en voer het countDown () methode op de countDownLatch, signalering aan de consument dat het een waarde kan ophalen uit de sharedState:

Runnable producer = () -> {Geheel getal produceerdeElement = ThreadLocalRandom .current () .nextInt (); sharedState.set (geproduceerdElement); countDownLatch.countDown (); };

De consument wacht op de countDownLatch de ... gebruiken wachten() methode. Wanneer de producent aangeeft dat de variabele is ingesteld, haalt de consument deze op van het sharedState:

Runnable consumer = () -> {probeer {countDownLatch.await (); Integer consumedElement = sharedState.get (); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Laten we tot slot ons programma starten:

executor.execute (producent); uitvoerder.execute (consument); executor.awaitTermination (500, TimeUnit.MILLISECONDS); executor.shutdown (); assertEquals (countDownLatch.getCount (), 0);

Het levert de volgende output op:

Het opslaan van een element: -1507375353 naar het uitwisselingspunt verbruikt een element: -1507375353 vanaf het uitwisselingspunt

We kunnen zien dat dit veel code is om zo'n eenvoudige functionaliteit te implementeren als het uitwisselen van een element tussen twee threads. In de volgende sectie zullen we proberen het te verbeteren.

4. Handoffs implementeren met behulp van de Synchrone wachtrij

Laten we nu dezelfde functionaliteit implementeren als in de vorige sectie, maar met een Synchrone wachtrij. Het heeft een dubbel effect omdat we het kunnen gebruiken voor het uitwisselen van status tussen threads en voor het coördineren van die actie, zodat we niets anders hoeven te gebruiken Synchrone wachtrij.

Ten eerste zullen we een wachtrij definiëren:

ExecutorService executor = Executors.newFixedThreadPool (2); SynchronousQueue-wachtrij = nieuwe SynchronousQueue ();

De producer belt een leggen() methode die zal blokkeren totdat een andere thread een element uit de wachtrij haalt:

Runnable producer = () -> {Integer produceElement = ThreadLocalRandom .current () .nextInt (); probeer {queue.put (produceElement); } catch (InterruptedException ex) {ex.printStackTrace (); }};

De consument haalt dat element gewoon op met de nemen() methode:

Runnable consumer = () -> {probeer {Integer consumedElement = queue.take (); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Vervolgens starten we ons programma:

uitvoerder.execute (producent); executor.execute (consument); executor.awaitTermination (500, TimeUnit.MILLISECONDS); executor.shutdown (); assertEquals (queue.size (), 0);

Het levert de volgende output op:

Het opslaan van een element: 339626897 naar het uitwisselingspunt verbruikt een element: 339626897 van het uitwisselingspunt

We kunnen zien dat a Synchrone wachtrij wordt gebruikt als een uitwisselingspunt tussen de threads, wat een stuk beter en begrijpelijker is dan het vorige voorbeeld dat de gedeelde status samen met een CountDownLatch.

5. Conclusie

In deze korte tutorial hebben we gekeken naar de Synchrone wachtrij construeren. We hebben een programma gemaakt dat gegevens uitwisselt tussen twee threads met behulp van gedeelde status, en vervolgens dat programma herschreven om gebruik te maken van de Synchrone wachtrij construeren. Dit dient als een uitwisselingspunt dat de producent en de consumentendraad coördineert.

De implementatie van al deze voorbeelden en codefragmenten is te vinden in het GitHub-project - dit is een Maven-project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.