RabbitMQ-berichtverzending met Spring AMQP

1. Inleiding

In deze tutorial verkennen we het concept van uitwaaieren en uitwisselingen van onderwerpen met Spring AMQP en RabbitMQ.

Op een hoog niveau, fanout uitwisselingen zullen hetzelfde bericht uitzenden naar alle gebonden wachtrijen, terwijl uitwisselingen van onderwerpen gebruik een routingsleutel voor het doorgeven van berichten aan een bepaalde gebonden wachtrij of wachtrijen.

Voorafgaand aan het lezen van Messaging With Spring AMQP wordt aanbevolen voor deze tutorial.

2. Opzetten van een Fanout Exchange

Laten we een fanout-uitwisseling opzetten met twee daaraan gebonden wachtrijen. Als we een bericht naar deze centrale sturen, ontvangen beide wachtrijen het bericht. Onze fanout-uitwisseling negeert elke routingsleutel die bij het bericht wordt geleverd.

Spring AMQP stelt ons in staat om alle declaraties van wachtrijen, uitwisselingen en bindingen samen te voegen in een Aangifteplichtigen voorwerp:

@Bean public Declarables fanoutBindings () {Wachtrij fanoutQueue1 = nieuwe wachtrij ("fanout.queue1", false); Wachtrij fanoutQueue2 = nieuwe wachtrij ("fanout.queue2", false); FanoutExchange fanoutExchange = nieuwe FanoutExchange ("fanout.exchange"); retourneer nieuwe Declarables (fanoutQueue1, fanoutQueue2, fanoutExchange, bind (fanoutQueue1) .to (fanoutExchange), BindingBuilder.bind (fanoutQueue2) .to (fanoutExchange)); }

3. Een onderwerpuitwisseling opzetten

Nu gaan we ook een onderwerpuitwisseling opzetten met twee wachtrijen, elk met een ander bindpatroon:

@Bean public Declarables topicBindings () {Wachtrij topicQueue1 = nieuwe wachtrij (topicQueue1Name, false); Wachtrij topicQueue2 = nieuwe wachtrij (topicQueue2Name, false); TopicExchange topicExchange = nieuwe TopicExchange (topicExchangeName); retourneer nieuwe Declarables (topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind (topicQueue1) .to (topicExchange) .with ("*. belangrijk. *"), BindingBuilder .bind (topicQueue2) .to (topicExchange) .with ("#. fout")); }

Een onderwerpuitwisseling stelt ons in staat om er wachtrijen aan te binden met verschillende sleutelpatronen. Dit is erg flexibel en stelt ons in staat om meerdere wachtrijen met hetzelfde patroon of zelfs meerdere patronen aan dezelfde wachtrij te binden.

Als de routeringssleutel van het bericht overeenkomt met het patroon, wordt deze in de wachtrij geplaatst. Als een wachtrij meerdere bindingen heeft die overeenkomen met de routeringssleutel van het bericht, wordt slechts één kopie van het bericht in de wachtrij geplaatst.

Onze bindpatronen kunnen een asterisk ("*") gebruiken om overeen te komen met een woord op een specifieke positie of een hekje ("#") om overeen te komen met nul of meer woorden.

Zo onze topicQueue1 ontvangt berichten met routingtoetsen met een patroon van drie woorden, waarbij het middelste woord 'belangrijk' is - bijvoorbeeld: "User.important.error" of "Blog.important.notification".

En onze topicQueue2 ontvangt berichten met routingsleutels die eindigen op het woord error; bijpassende voorbeelden zijn "fout", "User.important.error" of "Blog.post.save.error".

4. Opzetten van een producent

We gebruiken de convertAndSend methode van de KonijnTemplate om onze voorbeeldberichten te verzenden:

 String message = "payload wordt uitgezonden"; return args -> {rabbitTemplate.convertAndSend (FANOUT_EXCHANGE_NAME, "", "fanout" + bericht); rabbitTemplate.convertAndSend (TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic belangrijk waarschuwing" + bericht); rabbitTemplate.convertAndSend (TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "onderwerp belangrijke fout" + bericht); };

De KonijnTemplate biedt veel overbelast convertAndSend () methoden voor verschillende uitwisselingstypes.

Wanneer we een bericht naar een fanout-uitwisseling sturen, wordt de routingsleutel genegeerd en wordt het bericht doorgegeven aan alle gebonden wachtrijen.

Als we een bericht naar de topic exchange sturen, moeten we een routingsleutel doorgeven. Op basis van deze routingsleutel wordt het bericht afgeleverd bij specifieke wachtrijen.

5. Consumenten configureren

Laten we tot slot vier consumenten instellen - één voor elke wachtrij - om de geproduceerde berichten op te halen:

 @RabbitListener (wachtrijen = {FANOUT_QUEUE_1_NAME}) openbare ongeldige ontvangst ontvangenMessageFromFanout1 (String bericht) {System.out.println ("Fanout 1 bericht ontvangen:" + bericht); } @RabbitListener (wachtrijen = {FANOUT_QUEUE_2_NAME}) openbare ongeldige ontvangst ontvangenMessageFromFanout2 (String bericht) {System.out.println ("Fanout 2 bericht ontvangen:" + bericht); } @RabbitListener (wachtrijen = {TOPIC_QUEUE_1_NAME}) openbare ongeldige ontvangst ontvangenMessageFromTopic1 (String bericht) {System.out.println ("Ontvangen onderwerp 1 (" + BINDING_PATTERN_IMPORTANT + ") bericht:" + bericht); } @RabbitListener (wachtrijen = {TOPIC_QUEUE_2_NAME}) openbare ongeldige ontvangst ontvangenMessageFromTopic2 (String bericht) {System.out.println ("Ontvangen onderwerp 2 (" + BINDING_PATTERN_ERROR + ") bericht:" + bericht); }

We configureren consumenten met behulp van de @RabbitListener annotatie. Het enige argument dat hier wordt doorgegeven, is de naam van de wachtrij. Consumenten zijn hier niet op de hoogte van uitwisselingen of routeringssleutels.

6. Het voorbeeld uitvoeren

Ons voorbeeldproject is een Spring Boot-applicatie, en dus zal het de applicatie initialiseren samen met een verbinding met RabbitMQ en alle wachtrijen, uitwisselingen en bindingen opzetten.

Standaard verwacht onze applicatie een RabbitMQ-instantie die draait op de localhost op poort 5672. We kunnen deze en andere standaardinstellingen wijzigen in application.yaml.

Ons project stelt het HTTP-eindpunt op de URI bloot - /uitzending - die POST's accepteert met een bericht in de hoofdtekst van het verzoek.

Wanneer we een verzoek naar deze URI sturen met de hoofdtekst "Test", zouden we iets soortgelijks in de uitvoer moeten zien:

Ontvangen fanout 1 bericht: fanout payload wordt uitgezonden Ontvangen topic 1 (* .important. *) Bericht: topic belangrijk waarschuwen payload wordt uitgezonden Ontvangen topic 2 (# .error) bericht: topic belangrijk fout payload wordt uitgezonden Ontvangen fanout 2 bericht: fanout payload wordt uitgezonden Ontvangen onderwerp 1 (* .belangrijk. *) bericht: onderwerp belangrijke fout payload wordt uitgezonden

De volgorde waarin we deze berichten zullen zien, is natuurlijk niet gegarandeerd.

7. Conclusie

In deze korte tutorial hebben we fanout- en onderwerpuitwisselingen met Spring AMQP en RabbitMQ behandeld.

De volledige broncode en alle codefragmenten voor deze tutorial zijn beschikbaar in de GitHub-repository.