Gids voor java.util.concurrent.BlockingQueue

1. Overzicht

In dit artikel zullen we een van de meest bruikbare constructies bekijken java.util.concurrent om het probleem van de concurrent producent-consument op te lossen. We kijken naar een API van de BlockingQueue interface en hoe methoden van die interface het schrijven van gelijktijdige programma's gemakkelijker maken.

Verderop in het artikel laten we een voorbeeld zien van een eenvoudig programma met meerdere producer-threads en meerdere consumententhreads.

2. BlockingQueue Types

We kunnen twee soorten BlockingQueue:

  • onbegrensde wachtrij - kan bijna oneindig groeien
  • begrensde wachtrij - met maximale capaciteit gedefinieerd

2.1. Onbegrensde wachtrij

Het creëren van onbegrensde wachtrijen is eenvoudig:

BlockingQueue blockingQueue = nieuwe LinkedBlockingDeque ();

De capaciteit van blockingQueue wordt ingesteld op Geheel getal.MAX_VALUE. Alle bewerkingen die een element aan de onbegrensde wachtrij toevoegen, worden nooit geblokkeerd, dus het kan tot een zeer grote omvang uitgroeien.

Het belangrijkste bij het ontwerpen van een producer-consumentprogramma met onbegrensde BlockingQueue is dat consumenten berichten net zo snel moeten kunnen consumeren als producenten berichten aan de wachtrij toevoegen. Anders zou het geheugen vol kunnen raken en zouden we een OutOfMemory uitzondering.

2.2. Begrensde wachtrij

Het tweede type wachtrij is de begrensde wachtrij. We kunnen dergelijke wachtrijen maken door de capaciteit als argument door te geven aan een constructor:

BlockingQueue blockingQueue = nieuwe LinkedBlockingDeque (10);

Hier hebben we een blockingQueue met een capaciteit gelijk aan 10. Het betekent dat wanneer een producent een element probeert toe te voegen aan een reeds volledige wachtrij, afhankelijk van een methode die werd gebruikt om het toe te voegen (aanbod(), toevoegen() of leggen()), wordt het geblokkeerd totdat er ruimte beschikbaar komt voor het invoegen van een object. Anders mislukken de bewerkingen.

Het gebruik van een begrensde wachtrij is een goede manier om gelijktijdige programma's te ontwerpen, omdat wanneer we een element invoegen in een reeds volledige wachtrij, die bewerkingen moeten wachten totdat de consument zijn achterstand heeft ingehaald en wat ruimte in de wachtrij vrijmaakt. Het geeft ons een beperking zonder enige inspanning van onze kant.

3. BlockingQueue API

Er zijn twee soorten methoden in het BlockingQueue koppelmethoden die verantwoordelijk zijn voor het toevoegen van elementen aan een wachtrij en methoden die deze elementen ophalen. Elke methode uit deze twee groepen gedraagt ​​zich anders als de wachtrij vol / leeg is.

3.1. Elementen toevoegen

  • toevoegen () - geeft terug waar als het inbrengen is gelukt, gooit anders een IllegalStateException
  • leggen() - voegt het opgegeven element in een wachtrij in en wacht indien nodig op een vrij slot
  • aanbod() - geeft terug waar als het inbrengen is gelukt, anders false
  • aanbieding (E e, lange time-out, TimeUnit-eenheid) - probeert een element in een wachtrij in te voegen en wacht op een beschikbaar slot binnen een opgegeven time-out

3.2. Elementen ophalen

  • nemen() - wacht op een hoofdelement van een wachtrij en verwijdert dit. Als de wachtrij leeg is, wordt deze geblokkeerd en wacht hij tot een element beschikbaar komt
  • poll (lange time-out, TimeUnit-eenheid) - haalt de kop van de wachtrij op en verwijdert deze, en wacht indien nodig tot de opgegeven wachttijd voordat een element beschikbaar komt. Geeft terug nul na een time-out

Deze methoden zijn de belangrijkste bouwstenen van BlockingQueue interface bij het bouwen van producer-consumer-programma's.

4. Multithreaded producent-consument voorbeeld

Laten we een programma maken dat uit twee delen bestaat: een producent en een consument.

De producent produceert een willekeurig getal van 0 tot 100 en zet dat getal in een BlockingQueue. We hebben 4 producer-threads en gebruiken de leggen() methode om te blokkeren totdat er ruimte beschikbaar is in de wachtrij.

Het belangrijkste om te onthouden is dat we moeten voorkomen dat onze consumententhreads wachten tot een element voor onbepaalde tijd in een wachtrij verschijnt.

Een goede techniek om van producent naar consument te signaleren dat er geen berichten meer te verwerken zijn, is het sturen van een speciaal bericht, een gifpil. We moeten zoveel gifpillen opsturen als we consumenten hebben. Wanneer een consument dan dat speciale gifpilbericht uit een wachtrij haalt, zal het de uitvoering netjes afronden.

Laten we eens kijken naar een producer-klasse:

openbare klasse NumbersProducer implementeert Runnable {private BlockingQueue numbersQueue; privé laatste int poisonPill; private laatste int poisonPillPerProducer; openbare NumbersProducer (BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) {this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run () {probeer {GenereerNumbers (); } catch (InterruptedException e) {Thread.currentThread (). interrupt (); }} private void generationNumbers () gooit InterruptedException {for (int i = 0; i <100; i ++) {numbersQueue.put (ThreadLocalRandom.current (). nextInt (100)); } for (int j = 0; j <poisonPillPerProducer; j ++) {numbersQueue.put (poisonPill); }}}

Onze producer constructor neemt als argument de BlockingQueue dat wordt gebruikt om de verwerking tussen de producent en de consument te coördineren. We zien die methode GenereerNummers () zet 100 elementen in een wachtrij. Er is ook een gifpilbericht nodig om te weten welk type bericht in een wachtrij moet worden geplaatst wanneer de uitvoering is voltooid. Die boodschap moet worden overgebracht poisonPillPerProducer keer in een wachtrij.

Elke consument neemt een element uit een BlockingQueue gebruik makend van nemen() methode zodat het blokkeert totdat er een element in een wachtrij staat. Na het nemen van een Geheel getal vanuit een wachtrij controleert het of het bericht een gifpil is, zo ja, dan is de uitvoering van een thread voltooid. Anders wordt het resultaat op standaarduitvoer afgedrukt samen met de naam van de huidige thread.

Dit geeft ons inzicht in de innerlijke werking van onze consumenten:

openbare klasse NumbersConsumer implementeert Runnable {private BlockingQueue-wachtrij; privé laatste int poisonPill; openbare NumbersConsumer (BlockingQueue wachtrij, int poisonPill) {this.queue = wachtrij; this.poisonPill = poisonPill; } public void run () {try {while (true) {Integer number = queue.take (); if (number.equals (poisonPill)) {return; } System.out.println (Thread.currentThread (). GetName () + "resultaat:" + nummer); }} catch (InterruptedException e) {Thread.currentThread (). interrupt (); }}}

Het belangrijkste om op te merken is het gebruik van een wachtrij. Hetzelfde als in de producer-constructor, een wachtrij wordt als argument doorgegeven. We kunnen het omdat BlockingQueue kan worden gedeeld tussen threads zonder enige expliciete synchronisatie.

Nu we onze producent en consument hebben, kunnen we ons programma starten. We moeten de capaciteit van de wachtrij definiëren, en we stellen deze in op 100 elementen.

We willen 4 producer-threads hebben en een aantal consumententhreads zal gelijk zijn aan het aantal beschikbare processors:

int GEBONDEN = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime (). availableProcessors (); int poisonPill = Geheel getal.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS% N_PRODUCERS; BlockingQueue-wachtrij = nieuwe LinkedBlockingQueue (GEBONDEN); voor (int i = 1; i <N_PRODUCERS; i ++) {nieuwe Thread (nieuwe NumbersProducer (wachtrij, poisonPill, poisonPillPerProducer)). start (); } for (int j = 0; j <N_CONSUMERS; j ++) {nieuwe Thread (nieuwe NumbersConsumer (wachtrij, poisonPill)). start (); } nieuwe thread (nieuwe NumbersProducer (wachtrij, poisonPill, poisonPillPerProducer + mod)). start (); 

BlockingQueue is gemaakt met behulp van construct met een capaciteit. We creëren 4 producenten en N consumenten. We specificeren ons gifpilbericht als een Geheel getal.MAX_VALUE omdat een dergelijke waarde nooit door onze producent zal worden verzonden onder normale werkomstandigheden. Het belangrijkste dat hier opvalt, is dat BlockingQueue wordt gebruikt om het werk tussen hen te coördineren.

Wanneer we het programma uitvoeren, worden 4 producer-threads willekeurig geplaatst Gehele getallen in een BlockingQueue en consumenten zullen die elementen uit de wachtrij halen. Elke thread zal naar standaarduitvoer de naam van de thread samen met een resultaat afdrukken.

5. Conclusie

Dit artikel toont een praktisch gebruik van BlockingQueue en legt de methoden uit die worden gebruikt om er elementen aan toe te voegen en op te halen. We hebben ook laten zien hoe u een multithreaded producer-consumer-programma kunt bouwen met BlockingQueue om het werk tussen producenten en consumenten te coördineren.

De implementatie van al deze voorbeelden en codefragmenten is te vinden in het GitHub-project - dit is een op Maven gebaseerd project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.


$config[zx-auto] not found$config[zx-overlay] not found