Amazon SQS-wachtrijen beheren in Java

1. Overzicht

In deze zelfstudie onderzoeken we hoe u dit kunt doen gebruik Amazon's SQS (Simple Queue Service) met behulp van de Java SDK.

2. Vereisten

De Maven-afhankelijkheden, AWS-accountinstellingen en clientverbinding die nodig zijn om de Amazon AWS SDK voor SQS te gebruiken, zijn hetzelfde als in dit artikel hier.

Ervan uitgaande dat we een exemplaar van AWSCredentials, zoals beschreven in het vorige artikel, kunnen we doorgaan en onze SQS-client maken:

AmazonSQS sqs = AmazonSQSClientBuilder.standard () .withCredentials (nieuwe AWSStaticCredentialsProvider (referenties)) .withRegion (Regions.US_EAST_1) .build (); 

3. Wachtrijen aanmaken

Zodra we onze SQS-client hebben opgezet, wachtrijen aanmaken is redelijk eenvoudig.

3.1. Een standaardwachtrij maken

Laten we eens kijken hoe we een standaardwachtrij kunnen maken. Om dit te doen, we moeten een instantie maken van CreateQueueRequest:

CreateQueueRequest createStandardQueueRequest = nieuwe CreateQueueRequest ("baeldung-queue"); String standardQueueUrl = sqs.createQueue (createStandardQueueRequest) .getQueueUrl (); 

3.2. Een FIFO-wachtrij maken

Het maken van een FIFO is vergelijkbaar met het maken van een standaardwachtrij. We gebruiken nog steeds een exemplaar van CreateQueueRequest, zoals we eerder deden. Alleen deze keer, we moeten wachtrijattributen doorgeven en de FifoQueue toe te schrijven aan waar:

Map queueAttributes = nieuwe HashMap (); queueAttributes.put ("FifoQueue", "true"); queueAttributes.put ("ContentBasedDeduplication", "true"); CreateQueueRequest createFifoQueueRequest = nieuwe CreateQueueRequest ("baeldung-queue.fifo"). WithAttributes (queueAttributes); String fifoQueueUrl = sqs.createQueue (createFifoQueueRequest) .getQueueUrl (); 

4. Berichten in wachtrijen plaatsen

Zodra we onze wachtrijen hebben ingesteld, kunnen we beginnen met het verzenden van berichten.

4.1. Een bericht in een standaardwachtrij plaatsen

Om berichten naar een standaardwachtrij te sturen, zullen we moet een instantie van SendMessageRequest.

Vervolgens voegen we een kaart met berichtkenmerken toe aan dit verzoek:

Kaart messageAttributes = nieuwe HashMap (); messageAttributes.put ("AttributeOne", nieuwe MessageAttributeValue () .withStringValue ("Dit is een attribuut") .withDataType ("String")); SendMessageRequest sendMessageStandardQueue = nieuw SendMessageRequest () .withQueueUrl (standardQueueUrl) .withMessageBody ("Een eenvoudig bericht.") .WithDelaySeconds (30) .withMessageAttributes (messageAttributes); sqs.sendMessage (sendMessageStandardQueue); 

De metDelaySeconds () specificeert na hoe lang het bericht in de wachtrij moet aankomen.

4.2. Een bericht in een FIFO-wachtrij plaatsen

Het enige verschil is in dit geval dat we zullen de groep waartoe het bericht behoort:

SendMessageRequest sendMessageFifoQueue = nieuw SendMessageRequest () .withQueueUrl (fifoQueueUrl) .withMessageBody ("Nog een eenvoudig bericht.") .WithMessageGroupId ("baeldung-group-1") .withMessageAttributes (messageAttributes);

Zoals u kunt zien in het codevoorbeeld hierboven, specificeren we de groep met behulp van withMessageGroupId ().

4.3. Meerdere berichten in een wachtrij plaatsen

We kunnen ook plaats meerdere berichten in een wachtrij met één verzoek. We maken een lijst met SendMessageBatchRequestEntry die we zullen verzenden met een instantie van SendMessageBatchRequest:

Lijst messageEntries = nieuwe ArrayList (); messageEntries.add (nieuwe SendMessageBatchRequestEntry () .withId ("id-1") .withMessageBody ("batch-1") .withMessageGroupId ("baeldung-group-1")); messageEntries.add (nieuwe SendMessageBatchRequestEntry () .withId ("id-2") .withMessageBody ("batch-2") .withMessageGroupId ("baeldung-group-1")); SendMessageBatchRequest sendMessageBatchRequest = nieuwe SendMessageBatchRequest (fifoQueueUrl, messageEntries); sqs.sendMessageBatch (sendMessageBatchRequest);

5. Berichten lezen uit wachtrijen

We kunnen berichten uit onze wachtrijen ontvangen door het aanroepen van de ontvangenMessage () methode op een exemplaar van ReceiveMessageRequest:

ReceiveMessageRequest ontvangenMessageRequest = nieuwe ReceiveMessageRequest (fifoQueueUrl) .withWaitTimeSeconds (10) .withMaxNumberOfMessages (10); Lijst sqsMessages = sqs.receiveMessage (ontvangenMessageRequest) .getMessages (); 

Gebruik makend van withMaxNumberOfMessages (), we specificeren hoeveel berichten we uit de wachtrij moeten ontvangen - hoewel het moet worden opgemerkt dat het maximum is 10.

De methode withWaitTimeSeconds () maakt het mogelijk lange peiling. Lange peiling is een manier om het aantal ontvangen berichtverzoeken te beperken dat we naar SQS sturen.

Simpel gezegd betekent dit dat we tot het opgegeven aantal seconden wachten om een ​​bericht op te halen. Als er gedurende die tijd geen berichten in de wachtrij staan, wordt het verzoek leeg geretourneerd. Als er gedurende die tijd een bericht in de wachtrij komt, wordt het geretourneerd.

Wij kunnen verkrijg de attributen en de body van een bepaald bericht:

sqsMessages.get (0) .getAttributes (); sqsMessages.get (0) .getBody ();

6. Een bericht uit een wachtrij verwijderen

Om een ​​bericht te verwijderen, gebruiken we een DeleteMessageRequest:

sqs.deleteMessage (nieuwe DeleteMessageRequest () .withQueueUrl (fifoQueueUrl) .withReceiptHandle (sqsMessages.get (0) .getReceiptHandle ())); 

7. Wachtrijen met dode letters

Een wachtrij met een dode letter moet van hetzelfde type zijn als de basiswachtrij - het moet FIFO zijn als de basiswachtrij FIFO is, en standaard als de basiswachtrij standaard is. Voor dit voorbeeld gebruiken we een standaardwachtrij.

Het eerste dat we moeten doen, is maak wat onze wachtrij met dode letters wordt:

String deadLetterQueueUrl = sqs.createQueue ("baeldung-dode-letter-wachtrij"). GetQueueUrl (); 

Vervolgens zullen we haal de ARN van onze nieuw aangemaakte wachtrij (Amazon Resource Name) op:

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributes (nieuwe GetQueueAttributesRequest (deadLetterQueueUrl) .withAttributeNames ("QueueArn")); String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes () .get ("QueueArn"); 

Eindelijk, we stel deze nieuw aangemaakte wachtrij in als de wachtrij met dode letters van onze oorspronkelijke standaardwachtrij:

SetQueueAttributesRequest queueAttributesRequest = nieuwe SetQueueAttributesRequest () .withQueueUrl (standardQueueUrl) .addAttributesEntry ("RedrivePolicy", "{\" maxReceiveCount \ ": \" 2 \ "," + "deadLetter "}"); sqs.setQueueAttributes (queueAttributesRequest); 

Het JSON-pakket dat we in het addAttributesEntry () methode bij het bouwen van onze SetQueueAttributesRequest instantie bevat de informatie die we nodig hebben: de maxReceiveCount is 2, wat betekent dat als een bericht zo vaak wordt ontvangen, wordt aangenomen dat het niet correct is verwerkt en naar onze wachtrij met dode letters wordt gestuurd.

De deadLetterTargetArn attribuut verwijst onze standaard wachtrij naar onze nieuw aangemaakte wachtrij met dode letters.

8. Toezicht

Wij kunnen controleer hoeveel berichten er momenteel in een bepaalde wachtrij staan ​​en hoeveel er in de vlucht zijn met de SDK. Eerst moeten we een GetQueueAttributesRequest.

Van daaruit controleren we de staat van de wachtrij:

GetQueueAttributesRequest getQueueAttributesRequest = nieuwe GetQueueAttributesRequest (standardQueueUrl) .withAttributeNames ("All"); GetQueueAttributesResult getQueueAttributesResult = sqs.getQueueAttributes (getQueueAttributesRequest); System.out.println (String.format ("Het aantal berichten in de wachtrij:% s", getQueueAttributesResult.getAttributes () .get ("approximateNumberOfMessages"))); System.out.println (String.format ("Het aantal berichten tijdens de vlucht:% s", getQueueAttributesResult.getAttributes () .get ("approximateNumberOfMessagesNotVisible")));

Meer diepgaande monitoring kan worden bereikt met Amazon Cloud Watch.

9. Conclusie

In dit artikel hebben we gezien hoe beheer SQS-wachtrijen met behulp van de AWS Java SDK.

Zoals gewoonlijk zijn alle codevoorbeelden die in het artikel worden gebruikt, te vinden op GitHub.