PubSub Messaging met Spring Data Redis

1. Overzicht

In dit tweede artikel uit de serie over Spring Data Redis kijken we naar de pub / sub-berichtenwachtrijen.

In Redis zijn uitgevers niet geprogrammeerd om hun berichten naar specifieke abonnees te sturen. In plaats daarvan worden gepubliceerde berichten gekarakteriseerd in kanalen, zonder te weten welke (eventuele) abonnees er kunnen zijn.

Evenzo tonen abonnees interesse in een of meer onderwerpen en ontvangen ze alleen berichten die van belang zijn, zonder te weten welke (eventuele) uitgevers er zijn.

Deze ontkoppeling van uitgevers en abonnees kan een grotere schaalbaarheid en een meer dynamische netwerktopologie mogelijk maken.

2. Redis-configuratie

Laten we beginnen met het toevoegen van de configuratie die nodig is voor de berichtenwachtrijen.

Eerst definiëren we een MessageListenerAdapter bean die een aangepaste implementatie van de MessageListener interface genaamd RedisMessageSubscriber. Deze bean fungeert als een abonnee in het pub-sub-berichtenmodel:

@Bean MessageListenerAdapter messageListener () {retourneer nieuwe MessageListenerAdapter (nieuwe RedisMessageSubscriber ()); }

RedisMessageListenerContainer is een klasse die wordt aangeboden door Spring Data Redis en die asynchroon gedrag biedt voor Redis-berichtlisteners. Dit wordt intern genoemd en, volgens de Spring Data Redis-documentatie, "behandelt het de lage details van luisteren, converteren en verzenden van berichten."

@Bean RedisMessageListenerContainer redisContainer () {RedisMessageListenerContainer container = nieuwe RedisMessageListenerContainer (); container.setConnectionFactory (jedisConnectionFactory ()); container.addMessageListener (messageListener (), topic ()); retourcontainer; }

We zullen ook een boon maken met behulp van een op maat gemaakt MessagePublisher interface en een RedisMessagePublisher implementatie. Op deze manier kunnen we een generieke API voor het publiceren van berichten hebben en de Redis-implementatie een redisTemplate en onderwerp als constructorargumenten:

@Bean MessagePublisher redisPublisher () {retourneer nieuwe RedisMessagePublisher (redisTemplate (), topic ()); }

Ten slotte stellen we een onderwerp op waarnaar de uitgever berichten stuurt en de abonnee ontvangt ze:

@Bean ChannelTopic topic () {retourneer nieuwe ChannelTopic ("messageQueue"); }

3. Berichten publiceren

3.1. Het definiëren van de MessagePublisher Koppel

Spring Data Redis biedt geen MessagePublisher interface die moet worden gebruikt voor berichtendistributie. We kunnen een aangepaste interface definiëren die zal gebruiken redisTemplate in uitvoering:

openbare interface MessagePublisher {void publish (String bericht); }

3.2. RedisMessagePublisher Implementatie

Onze volgende stap is om een ​​implementatie van het MessagePublisher interface, het toevoegen van berichtpublicatiedetails en het gebruik van de functies in redisTemplate.

De sjabloon bevat een zeer rijke set functies voor een breed scala aan bewerkingen - waarvan convertAndSend is in staat om een ​​bericht naar een wachtrij te sturen via een onderwerp:

openbare klasse RedisMessagePublisher implementeert MessagePublisher {@Autowired privé RedisTemplate redisTemplate; @Autowired privé ChannelTopic-onderwerp; openbare RedisMessagePublisher () {} openbare RedisMessagePublisher (RedisTemplate redisTemplate, ChannelTopic-onderwerp) {this.redisTemplate = redisTemplate; this.topic = onderwerp; } public void publish (String bericht) {redisTemplate.convertAndSend (topic.getTopic (), bericht); }} 

Zoals u kunt zien, is de implementatie van de uitgever eenvoudig. Het maakt gebruik van de convertAndSend () methode van de redisTemplate om het gegeven bericht op te maken en te publiceren naar het geconfigureerde onderwerp.

Een onderwerp implementeert de semantiek van publiceren en abonneren: wanneer een bericht wordt gepubliceerd, gaat het naar alle geregistreerde abonnees om naar dat onderwerp te luisteren.

4. Abonneren op berichten

RedisMessageSubscriber implementeert de door Spring Data Redis geleverde MessageListener koppel:

@Service openbare klasse RedisMessageSubscriber implementeert MessageListener {openbare statische lijst messageList = nieuwe ArrayList (); openbare void onMessage (berichtbericht, byte [] patroon) {messageList.add (message.toString ()); System.out.println ("Bericht ontvangen:" + message.toString ()); }}

Merk op dat er een tweede parameter is genaamd patroon, die we in dit voorbeeld niet hebben gebruikt. In de Spring Data Redis-documentatie staat dat deze parameter het 'patroon dat overeenkomt met het kanaal (indien gespecificeerd)' vertegenwoordigt, maar dat het kan worden nul.

5. Berichten verzenden en ontvangen

Nu zullen we het allemaal samenvoegen. Laten we een bericht maken en het vervolgens publiceren met de RedisMessagePublisher:

String message = "Bericht" + UUID.randomUUID (); redisMessagePublisher.publish (bericht);

Als we bellen publiceren (bericht), wordt de inhoud naar Redis gestuurd, waar deze wordt doorgestuurd naar het berichtenwachtrijonderwerp dat is gedefinieerd in onze uitgever. Vervolgens wordt het verspreid onder de abonnees van dat onderwerp.

Dat is je misschien al opgevallen RedisMessageSubscriber is een luisteraar, die zichzelf registreert in de wachtrij om berichten op te halen.

Bij aankomst van het bericht, van de abonnee onMessage () methode gedefinieerd geactiveerd.

In ons voorbeeld kunnen we controleren of we berichten hebben ontvangen die zijn gepubliceerd door de berichtenlijst in onze RedisMessageSubscriber:

RedisMessageSubscriber.messageList.get (0) .contains (bericht) 

6. Conclusie

In dit artikel hebben we een pub / sub-berichtenwachtrij-implementatie onderzocht met behulp van Spring Data Redis.

De implementatie van het bovenstaande voorbeeld is te vinden in een GitHub-project.