Gids voor DelayQueue

1. Overzicht

In dit artikel zullen we kijken naar de DelayQueue construeren vanuit de java.util.concurrent pakket. Dit is een blokkerende wachtrij die kan worden gebruikt in producenten-consumentenprogramma's.

Het heeft een zeer nuttige eigenschap - wanneer de consument een element uit de wachtrij wil halen, kan hij het alleen opnemen als de vertraging voor dat specifieke element is verstreken.

2. Implementeren Vertraagd voor Elements in the DelayQueue

Elk element dat we in het DelayQueue moet het Vertraagd koppel. Laten we zeggen dat we een DelayObject klasse. Instanties van die klasse worden in de DelayQueue.

We passeren de Draad gegevens en delayInMilliseconds als en argumenten voor de constructor:

openbare klasse DelayObject implementeert Delayed {private String-gegevens; privé lange starttijd; public DelayObject (String data, long delayInMilliseconds) {this.data = data; this.startTime = System.currentTimeMillis () + delayInMilliseconds; }

We definiëren een starttijd - dit is een tijd waarin het element uit de wachtrij moet worden geconsumeerd. Vervolgens moeten we het getDelay () methode - het zou de resterende vertraging moeten retourneren die is gekoppeld aan dit object in de gegeven tijdseenheid.

Daarom moeten we de TimeUnit.convert () methode om de resterende vertraging in de juiste terug te geven Tijdeenheid:

@Override public long getDelay (TimeUnit unit) {long diff = startTime - System.currentTimeMillis (); return unit.convert (diff, TimeUnit.MILLISECONDS); }

Wanneer de consument een element uit de wachtrij probeert te halen, wordt het DelayQueue zal uitvoeren getDelay () om erachter te komen of dat element uit de wachtrij mag worden geretourneerd. Als het getDelay () methode retourneert nul of een negatief getal, wat betekent dat het uit de wachtrij kan worden opgehaald.

We moeten ook het vergelijk met() methode, omdat de elementen in de DelayQueue wordt gesorteerd op de vervaltijd. Het item dat als eerste vervalt, wordt bovenaan de wachtrij gehouden en het element met de hoogste vervaltijd wordt achter in de wachtrij gehouden:

@Override public int CompareTo (Delayed o) {return Ints.saturatedCast (this.startTime - ((DelayObject) o) .startTime); }

3. DelayQueue Consumer en Producer

Om onze DelayQueue we moeten producenten- en consumentenlogica implementeren. De klasse producer neemt de wachtrij, het aantal te produceren elementen en de vertraging van elk bericht in milliseconden als argumenten.

Toen de rennen() methode wordt aangeroepen, het plaatst elementen in de wachtrij en slaapt 500 milliseconden na elke put:

openbare klasse DelayQueueProducer implementeert Runnable {private BlockingQueue-wachtrij; privé geheel getal numberOfElementsToProduce; privé Geheel getal delayOfEachProducedMessageMilliseconds; // standaard constructor @Override public void run () {for (int i = 0; i <numberOfElementsToProduce; i ++) {DelayObject object = nieuw DelayObject (UUID.randomUUID (). toString (), delayOfEachProducedMessageMilliseconds); System.out.println ("Put object:" + object); probeer {queue.put (object); Thread.sleep (500); } catch (InterruptedException ie) {ie.printStackTrace (); }}}}

De implementatie door de consument lijkt erg op elkaar, maar het houdt ook het aantal berichten bij dat is verbruikt:

openbare klasse DelayQueueConsumer implementeert Runnable {private BlockingQueue-wachtrij; privé geheel getal numberOfElementsToTake; openbare AtomicInteger numberOfConsumedElements = nieuwe AtomicInteger (); // standard constructors @Override public void run () {for (int i = 0; i <numberOfElementsToTake; i ++) {probeer {DelayObject object = queue.take (); numberOfConsumedElements.incrementAndGet (); System.out.println ("Consument nemen:" + object); } catch (InterruptedException e) {e.printStackTrace (); }}}}

4. DelayQueue Gebruikstest

Om het gedrag van de DelayQueue, we zullen een producer-thread en een consument-thread maken.

De producent zal leggen() twee objecten in de wachtrij met een vertraging van 500 milliseconden. De test stelt dat de consument twee berichten heeft verbruikt:

@Test openbare leegte gegevenDelayQueue_whenProduceElement _thenShouldConsumeAfterGivenDelay () gooit InterruptedException {// gegeven ExecutorService executor = Executors.newFixedThreadPool (2); BlockingQueue-wachtrij = nieuwe DelayQueue (); int numberOfElementsToProduce = 2; int delayOfEachProducedMessageMilliseconds = 500; DelayQueueConsumer consument = nieuwe DelayQueueConsumer (wachtrij, numberOfElementsToProduce); DelayQueueProducer producer = nieuwe DelayQueueProducer (wachtrij, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); // when executor.submit (producer); executor.submit (consument); // dan executor.awaitTermination (5, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), numberOfElementsToProduce); }

We kunnen zien dat het uitvoeren van dit programma de volgende uitvoer zal opleveren:

Put object: {data = '86046157-e8a0-49b2-9cbb-8326124bcab8', startTime = 1494069868007} Consumenten nemen: {data = '86046157-e8a0-49b2-9cbb-8326124bcab8', startTime = 1494069868007 data = 1494069868007 'd47927ef-18c7-449b-b491-5ff30e6795ed', startTime = 1494069868512} Consumenten nemen: {data = 'd47927ef-18c7-449b-b491-5ff30e6795ed', startTime = 1494069868512}

De producent plaatst het object en na een tijdje wordt het eerste object waarvoor de vertraging is verstreken verbruikt.

Dezelfde situatie deed zich voor bij het tweede element.

5. Consument kan in de gegeven tijd niet consumeren

Laten we zeggen dat we een producer hebben die een element produceert dat dat wel zal doen vervalt over 10 seconden:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = 10_000; DelayQueueConsumer consument = nieuwe DelayQueueConsumer (wachtrij, numberOfElementsToProduce); DelayQueueProducer producer = nieuwe DelayQueueProducer (wachtrij, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

We beginnen onze test, maar deze wordt na 5 seconden beëindigd. Vanwege de kenmerken van de DelayQueue, de consument zal het bericht uit de wachtrij niet kunnen consumeren omdat het element nog niet is verlopen:

executor.submit (producent); executor.submit (consument); executor.awaitTermination (5, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), 0);

Merk op dat de consument numberOfConsumedElements heeft een waarde gelijk aan nul.

6. Een element produceren met onmiddellijke vervaldatum

Wanneer de implementaties van de Vertraagd bericht getDelay () methode retourneert een negatief getal, wat betekent dat het gegeven element al is verlopen. In deze situatie zal de producent dat element onmiddellijk consumeren.

We kunnen de situatie testen waarbij een element met negatieve vertraging wordt geproduceerd:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = -10_000; DelayQueueConsumer consument = nieuwe DelayQueueConsumer (wachtrij, numberOfElementsToProduce); DelayQueueProducer producer = nieuwe DelayQueueProducer (wachtrij, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Wanneer we de testcase starten, zal de consument het element onmiddellijk consumeren omdat het al is verlopen:

executor.submit (producent); executor.submit (consument); executor.awaitTermination (1, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), 1);

7. Conclusie

In dit artikel keken we naar de DelayQueue construeren vanuit de java.util.concurrent pakket.

We hebben een Vertraagd element dat is geproduceerd en verbruikt vanuit de wachtrij.

We hebben onze implementatie van het DelayQueue om elementen te consumeren die zijn verlopen.

De implementatie van al deze voorbeelden en codefragmenten is te vinden in het GitHub-project - wat een Maven-project is, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.