Inleiding tot Spring Cloud Stream

1. Overzicht

Spring Cloud Stream is een framework dat bovenop Spring Boot en Spring Integration is gebouwd helpt bij het creëren van gebeurtenisgestuurde of berichtgestuurde microservices.

In dit artikel introduceren we concepten en constructies van Spring Cloud Stream met enkele eenvoudige voorbeelden.

2. Maven afhankelijkheden

Om te beginnen, moeten we de Spring Cloud Starter Stream met de makelaar RabbitMQ Maven-afhankelijkheid als messaging-middleware toevoegen aan onze pom.xml:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

En we zullen de module-afhankelijkheid van Maven Central toevoegen om ook JUnit-ondersteuning in te schakelen:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE-test 

3. Hoofdconcepten

Microservices-architectuur volgt het principe van "slimme eindpunten en domme pijpen". Communicatie tussen eindpunten wordt aangestuurd door messaging-middleware-partijen zoals RabbitMQ of Apache Kafka. Services communiceren door domeingebeurtenissen te publiceren via deze eindpunten of kanalen.

Laten we de concepten bekijken waaruit het Spring Cloud Stream-framework bestaat, samen met de essentiële paradigma's waarvan we op de hoogte moeten zijn om berichtgestuurde services te bouwen.

3.1. Constructen

Laten we eens kijken naar een eenvoudige service in Spring Cloud Stream die naar luistert invoer bindend en stuurt een reactie naar de output verbindend:

@SpringBootApplication @EnableBinding (Processor.class) openbare klasse MyLoggerServiceApplication {openbare statische leegte hoofd (String [] args) {SpringApplication.run (MyLoggerServiceApplication.class, args); } @StreamListener (Processor.INPUT) @SendTo (Processor.OUTPUT) openbare LogMessage enrichLogMessage (LogMessage-log) {retourneer nieuwe LogMessage (String.format ("[1]:% s", log.getMessage ())); }}

De annotatie @EnableBinding configureert de applicatie om de kanalen te binden INVOER en UITGANG gedefinieerd binnen de interface Verwerker. Beide kanalen zijn bindingen die kunnen worden geconfigureerd om een ​​concrete messaging-middleware of binder te gebruiken.

Laten we eens kijken naar de definitie van al deze concepten:

  • Bindingen - een verzameling interfaces die de input- en outputkanalen declaratief identificeren
  • Binder - messaging-middleware-implementatie zoals Kafka of RabbitMQ
  • Kanaal - vertegenwoordigt de communicatiepijp tussen messaging-middleware en de applicatie
  • StreamListeners - berichtafhandelingsmethoden in bonen die automatisch worden aangeroepen op een bericht van het kanaal na de MessageConverter doet de serialisatie / deserialisatie tussen middleware-specifieke gebeurtenissen en domeinobjecttypen / POJO's
  • Messalie Schema's - gebruikt voor serialisatie en deserialisatie van berichten, deze schema's kunnen statisch worden gelezen vanaf een locatie of dynamisch worden geladen, waardoor de evolutie van domeinobjecttypen wordt ondersteund

3.2. Communicatiepatronen

Berichten die zijn toegewezen aan bestemmingen worden bezorgd door de Publiceer-abonneer berichtenpatroon. Uitgevers categoriseren berichten in onderwerpen, elk aangeduid met een naam. Abonnees tonen interesse in een of meer onderwerpen. De middleware filtert de berichten en levert die van de interessante onderwerpen aan de abonnees.

Nu kunnen de abonnees worden gegroepeerd. EEN consumentengroep is een reeks abonnees of consumenten, aangeduid met een groeps-id, waarin berichten van een onderwerp of de partitie van een onderwerp op een load-balanced manier worden afgeleverd.

4. Programmeermodel

Dit gedeelte beschrijft de basisprincipes van het bouwen van Spring Cloud Stream-applicaties.

4.1. Functioneel testen

De testondersteuning is een binder-implementatie die interactie met de kanalen en het inspecteren van berichten mogelijk maakt.

Laten we een bericht sturen naar het bovenstaande enrichLogMessage service en controleer of het antwoord de tekst bevat “[1]: “ aan het begin van het bericht:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (classes = MyLoggerServiceApplication.class) @DirtiesContext openbare klasse MyLoggerApplicationTests {@Autowired privé Processorpijp; @Autowired privé MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText () {pipe.input () .send (MessageBuilder.withPayload (new LogMessage ("This is my message")) .build ()); Object payload = messageCollector.forChannel (pipe.output ()) .poll () .getPayload (); assertEquals ("[1]: Dit is mijn bericht", payload.toString ()); }}

4.2. Aangepaste kanalen

In het bovenstaande voorbeeld hebben we de Verwerker interface geleverd door Spring Cloud, die slechts één ingangs- en één uitgangskanaal heeft.

Als we iets anders nodig hebben, zoals één ingangs- en twee uitgangskanalen, kunnen we een aangepaste processor maken:

openbare interface MyProcessor {String INPUT = "myInput"; @Input SubscribableChannel myInput (); @Output ("myOutput") MessageChannel anOutput (); @Output MessageChannel anotherOutput (); }

Spring zorgt voor de juiste implementatie van deze interface voor ons. De kanaalnamen kunnen worden ingesteld met behulp van annotaties zoals in @Output ("myOutput").

Anders gebruikt Spring de namen van de methoden als de kanaalnamen. Daarom hebben we drie kanalen genaamd mijnInput, myOutput, en anotherOutput.

Laten we ons nu eens voorstellen dat we de berichten naar de ene uitvoer willen sturen als de waarde kleiner is dan 10 en naar een andere uitvoer als de waarde groter is dan of gelijk is aan 10:

@Autowired privé MyProcessor-processor; @StreamListener (MyProcessor.INPUT) public void routeValues ​​(geheel getal waarde) {if (val <10) {processor.anOutput (). Send (bericht (val)); } else {processor.anotherOutput (). send (bericht (val)); }} privé statisch laatste bericht bericht (T val) {return MessageBuilder.withPayload (val) .build (); }

4.3. Voorwaardelijke verzending

De ... gebruiken @StreamListener annotatie kunnen we ook filter de berichten die we bij de consument verwachten met behulp van een voorwaarde die we definiëren met SpEL-expressies.

We zouden bijvoorbeeld voorwaardelijke verzending kunnen gebruiken als een andere benadering om berichten naar verschillende uitgangen te routeren:

@Autowired privé MyProcessor-processor; @StreamListener (target = MyProcessor.INPUT, voorwaarde = "payload = 10") openbare ongeldige routeValuesToAnotherOutput (geheel getal) {processor.anotherOutput (). Send (bericht (waarde)); }

De enige beperking van deze benadering is dat deze methoden geen waarde mogen retourneren.

5. Installatie

Laten we de applicatie opzetten die het bericht van de RabbitMQ-makelaar zal verwerken.

5.1. Binder-configuratie

We kunnen onze applicatie configureren om de standaard binder-implementatie te gebruiken via META-INF / veerbinders:

konijn: \ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Of we kunnen de binder-bibliotheek voor RabbitMQ aan het klassenpad toevoegen door deze afhankelijkheid:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

Als er geen bindmiddelimplementatie wordt geleverd, gebruikt Spring directe berichtcommunicatie tussen de kanalen.

5.2. RabbitMQ-configuratie

Om het voorbeeld in sectie 3.1 te configureren om de RabbitMQ-binder te gebruiken, moeten we de application.yml gevestigd in src / main / resources:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: rabbit omgeving: spring: rabbitmq: host: port : 5672 gebruikersnaam: wachtwoord: virtual-host: /

De invoer binding gebruikt de exchange genaamd queue.log.messages, en de output binding gebruikt de uitwisseling queue.pretty.log.messages. Beide bindingen gebruiken de binder met de naam local_rabbit.

Merk op dat we de RabbitMQ-uitwisselingen of wachtrijen niet van tevoren hoeven aan te maken. Bij het uitvoeren van de applicatie, beide uitwisselingen worden automatisch gemaakt.

Om de applicatie te testen, kunnen we de RabbitMQ beheersite gebruiken om een ​​bericht te publiceren. In de Publiceer bericht paneel van de uitwisseling queue.log.messages, moeten we het verzoek in JSON-indeling invoeren.

5.3. Berichtconversie aanpassen

Spring Cloud Stream stelt ons in staat om berichtconversie toe te passen voor specifieke inhoudstypen. In het bovenstaande voorbeeld willen we in plaats van het JSON-formaat te gebruiken platte tekst.

Om dit te doen, zullen we dat doen pas een aangepaste transformatie toe op LogMessage gebruik maken van een MessageConverter:

@SpringBootApplication @EnableBinding (Processor.class) openbare klasse MyLoggerServiceApplication {// ... @Bean openbare MessageConverter biedtTextPlainMessageConverter () {retourneer nieuwe TextPlainMessageConverter (); } // ...}
openbare klasse TextPlainMessageConverter breidt AbstractMessageConverter uit {public TextPlainMessageConverter () {super (nieuw MimeType ("text", "plain")); } @Override beschermde booleaanse ondersteuningen (Class clazz) {return (LogMessage.class == clazz); } @Override beschermd Object convertFromInternal (Berichtbericht, Class targetClass, Object conversionHint) {Object payload = message.getPayload (); String text = payload instantie van String? (String) payload: nieuwe String ((byte []) payload); retourneer nieuwe LogMessage (tekst); }}

Nadat u deze wijzigingen hebt toegepast, gaat u terug naar het Publiceer bericht paneel, als we de koptekst 'contentTypes" naar "tekst / gewoon"En de payload naar"Hallo Wereld“, Het zou moeten werken zoals voorheen.

5.4. Consumentengroepen

Bij het uitvoeren van meerdere instanties van onze applicatie, elke keer dat er een nieuw bericht is in een invoerkanaal, worden alle abonnees op de hoogte gebracht.

Meestal hoeven we het bericht maar één keer te verwerken. Spring Cloud Stream implementeert dit gedrag via consumentengroepen.

Om dit gedrag mogelijk te maken, kan elke consumentenbinding de spring.cloud.stream.bindings..group eigenschap om een ​​groepsnaam op te geven:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit group: logMessageConsumers ...

6. Berichtgestuurde microservices

In dit gedeelte introduceren we alle vereiste functies voor het uitvoeren van onze Spring Cloud Stream-applicaties in een microservicescontext.

6.1. Opschalen

Als er meerdere applicaties worden uitgevoerd, is het belangrijk om ervoor te zorgen dat de gegevens correct over consumenten worden verdeeld. Om dit te doen, biedt Spring Cloud Stream twee eigenschappen:

  • spring.cloud.stream.instanceCount - aantal actieve applicaties
  • spring.cloud.stream.instanceIndex - index van de huidige applicatie

Als we bijvoorbeeld twee exemplaren van het bovenstaande hebben geïmplementeerd MyLoggerServiceApplication applicatie, de eigenschap spring.cloud.stream.instanceCount moet 2 zijn voor beide toepassingen en de eigenschap spring.cloud.stream.instanceIndex moeten respectievelijk 0 en 1 zijn.

Deze eigenschappen worden automatisch ingesteld als we de Spring Cloud Stream-applicaties implementeren met Spring Data Flow, zoals beschreven in dit artikel.

6.2. Verdeling

De domeingebeurtenissen zouden kunnen zijn Gepartitioneerd berichten. Dit helpt als we dat zijn het opschalen van de opslag en het verbeteren van de applicatieprestaties.

De domeingebeurtenis heeft meestal een partitiesleutel zodat deze in dezelfde partitie terechtkomt met gerelateerde berichten.

Laten we zeggen dat we willen dat de logboekberichten worden gepartitioneerd door de eerste letter in het bericht, wat de partitiesleutel zou zijn, en gegroepeerd in twee partities.

Er zou één partitie zijn voor de logboekberichten die beginnen met A-M en nog een partitie voor N-Z. Dit kan worden geconfigureerd met behulp van twee eigenschappen:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression - de uitdrukking om de payloads te partitioneren
  • spring.cloud.stream.bindings.output.producer.partitionCount - het aantal groepen

Soms is de te partitioneren uitdrukking te complex om deze op slechts één regel te schrijven. Voor deze gevallen kunnen we onze aangepaste partitiestrategie schrijven met behulp van de eigenschap spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Gezondheidsindicator

In de context van microservices moeten we dat ook doen detecteren wanneer een service uitvalt of begint te falen. Spring Cloud Stream biedt de accommodatie management.health.binders.ingeschakeld om de gezondheidsindicatoren voor bindmiddelen mogelijk te maken.

Bij het uitvoeren van de applicatie kunnen we de gezondheidsstatus opvragen op //:/Gezondheid.

7. Conclusie

In deze tutorial hebben we de belangrijkste concepten van Spring Cloud Stream gepresenteerd en laten zien hoe je het kunt gebruiken door middel van enkele eenvoudige voorbeelden op RabbitMQ. Meer informatie over Spring Cloud Stream vind je hier.

De broncode voor dit artikel is te vinden op GitHub.


$config[zx-auto] not found$config[zx-overlay] not found