Apache RocketMQ met Spring Boot

1. Inleiding

In deze zelfstudie maken we een berichtproducent en -consument met behulp van Spring Boot en Apache RocketMQ, een open-source platform voor gedistribueerde berichten en streaminggegevens.

2. Afhankelijkheden

Voor Maven-projecten moeten we de afhankelijkheid van RocketMQ Spring Boot Starter toevoegen:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Berichten produceren

Voor ons voorbeeld maken we een basisberichtproducent die gebeurtenissen verzendt wanneer de gebruiker een item toevoegt aan of verwijdert uit het winkelwagentje.

Laten we eerst onze serverlocatie en groepsnaam instellen in onze application.properties:

rocketmq.name-server = 127.0.0.1: 9876 rocketmq.producer.group = winkelwagen-producent-groep

Merk op dat als we meer dan één naamserver hadden, we ze als host: poort; host: poort.

Om het simpel te houden, maken we een CommandLineRunner applicatie en genereer enkele gebeurtenissen tijdens het opstarten van de applicatie:

@SpringBootApplication openbare klasse CartEventProducer implementeert CommandLineRunner {@Autowired privé RocketMQTemplate rocketMQTemplate; openbare statische leegte hoofd (String [] args) {SpringApplication.run (CartEventProducer.class, args); } public void run (String ... args) gooit uitzondering {rocketMQTemplate.convertAndSend ("cart-item-add-topic", nieuwe CartItemEvent ("bike", 1)); rocketMQTemplate.convertAndSend ("winkelwagen-item-add-topic", nieuwe CartItemEvent ("computer", 2)); rocketMQTemplate.convertAndSend ("winkelwagen-item-verwijderd-onderwerp", nieuwe CartItemEvent ("fiets", 1)); }}

De CartItemEvent bestaat uit slechts twee eigenschappen - de id van het artikel en een hoeveelheid:

class CartItemEvent {private String itemId; private int hoeveelheid; // constructeur, getters en setters}

In het bovenstaande voorbeeld gebruiken we de convertAndSend () methode, een generieke methode gedefinieerd door de AbstractMessageSendingTemplate abstracte klasse, om onze winkelwagenevenementen te sturen. Er zijn twee parameters voor nodig: een bestemming, in ons geval een onderwerpnaam, en een berichtpayload.

4. Bericht consument

Het consumeren van RocketMQ-berichten is net zo eenvoudig als het maken van een Spring-component met annotaties @RotterdamZoo en het implementeren van de RocketMQListener koppel:

@SpringBootApplication openbare klasse CartEventConsumer {openbare statische leegte hoofd (String [] args) {SpringApplication.run (CartEventConsumer.class, args); } @Service @RocketMQMessageListener (topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") public class CardItemAddConsumer implementeert RocketMQListener {public void onMessage (CartItemEvent addItemEvent) {log.info ( "Item toevoegen: {}", addItemEvent); // aanvullende logica}} @Service @RocketMQMessageListener (topic = "cart-item-remove-topic", consumerGroup = "cart-consumer_cart-item-remove-topic") public class CardItemRemoveConsumer implementeert RocketMQListener {public void onMessage (CartItemEvent removeItemEvent) {log.info ("Item verwijderen: {}", removeItemEvent); // aanvullende logica}}}

We moeten een apart onderdeel maken voor elk berichtonderwerp waarnaar we luisteren. In elk van deze luisteraars definiëren we de naam van het onderwerp en de naam van de consumentengroep via de @RocketMQMessageListener annotatie.

5. Synchrone en asynchrone verzending

In de vorige voorbeelden hebben we de convertAndSend methode om onze berichten te verzenden. We hebben echter nog een aantal andere opties.

We kunnen bijvoorbeeld bellen syncSend die verschilt van convertAndSend omdat het terugkeert SendResult voorwerp.

Het kan bijvoorbeeld worden gebruikt om te controleren of ons bericht met succes is verzonden of om het ID te krijgen:

public void run (String ... args) gooit uitzondering {SendResult addBikeResult = rocketMQTemplate.syncSend ("cart-item-add-topic", nieuwe CartItemEvent ("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend ("cart-item-add-topic", nieuwe CartItemEvent ("computer", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend ("winkelwagen-item-verwijderd-onderwerp", nieuwe CartItemEvent ("fiets", 1)); }

Leuk vinden convertAndSend, deze methode wordt alleen geretourneerd als de verzendprocedure is voltooid.

We moeten synchrone verzending gebruiken in gevallen die een hoge betrouwbaarheid vereisen, zoals belangrijke meldingsberichten of sms-meldingen.

Aan de andere kant willen we in plaats daarvan het bericht asynchroon verzenden en op de hoogte worden gesteld wanneer het verzenden is voltooid.

We kunnen dit met asyncSend, waarvoor een Terugbellen als parameter en keert onmiddellijk terug:

rocketMQTemplate.asyncSend ("cart-item-add-topic", new CartItemEvent ("bike", 1), new SendCallback () {@Override public void onSuccess (SendResult sendResult) {log.error ("CartItemEvent succesvol verzonden") ;} @Override public void onException (Throwable throwable) {log.error ("Uitzondering tijdens het verzenden van winkelwagen-item", throwable);}});

We gebruiken asynchrone verzending in gevallen die een hoge doorvoer vereisen.

Ten slotte kunnen we voor scenario's waarin we zeer hoge doorvoervereisten hebben, gebruiken sendOneWay in plaats van asyncSend. sendOneWay verschilt van asyncSend in dat het niet garandeert dat het bericht wordt verzonden.

Verzending in één richting kan ook worden gebruikt voor gewone betrouwbaarheidsgevallen, zoals het verzamelen van logboeken.

6. Berichten in transactie verzenden

RocketMQ biedt ons de mogelijkheid om berichten binnen een transactie te verzenden. We kunnen het doen door de sendInTransaction () methode:

MessageBuilder.withPayload (nieuwe CartItemEvent ("bike", 1)). Build (); rocketMQTemplate.sendMessageInTransaction ("testtransactie", "onderwerpnaam", msg, null);

We moeten ook een RocketMQLocalTransactionListener koppel:

@RocketMQTransactionListener (txProducerGroup = "test-transaction") klasse TransactionListenerImpl implementeert RocketMQLocalTransactionListener {@Override openbaar RocketMQLocalTransactionState executeLocalTransaction (bericht msg, Object arg) {// ... lokaal transactieproces, retourneer ROLLocalUNBACK of ONBEKEND, COMMIT. } @Override openbare RocketMQLocalTransactionState checkLocalTransaction (Bericht msg) {// ... controleer transactiestatus en retourneer ROLLBACK, COMMIT of ONBEKEND return RocketMQLocalTransactionState.COMMIT; }}

In sendMessageInTransaction (), is de eerste parameter de transactienaam. Het moet hetzelfde zijn als het @RocketMQTransactionListener‘Ledenveld txProducerGroup.

7. Configuratie berichtproducent

We kunnen ook aspecten van de berichtproducent zelf configureren:

  • rocketmq.producer.send-bericht-time-out: De time-out voor het verzenden van het bericht in milliseconden - de standaardwaarde is 3000
  • rocketmq.producer.compress-message-body-drempel: Drempel waarboven RocketMQ berichten comprimeert - de standaardwaarde is 1024.
  • rocketmq.producer.max-berichtgrootte: De maximale berichtgrootte in bytes - de standaardwaarde is 4096.
  • rocketmq.producer.retry-times-when-send-async-failed: Het maximale aantal nieuwe pogingen om intern in asynchrone modus uit te voeren voordat de verzending mislukt - de standaardwaarde is 2.
  • rocketmq.producer.retry-next-server: Geeft aan of een andere broker opnieuw moet worden geprobeerd bij het intern verzenden van een fout - de standaardwaarde is false.
  • rocketmq.producer.retry-times-when-send-failed: Het maximale aantal nieuwe pogingen om intern in asynchrone modus uit te voeren voordat de verzending mislukt - de standaardwaarde is 2.

8. Conclusie

In dit artikel hebben we geleerd hoe we berichten kunnen verzenden en gebruiken met Apache RocketMQ en Spring Boot. Zoals altijd is alle broncode beschikbaar op GitHub.