Reactieve systemen in Java

1. Inleiding

In deze tutorial zullen we de basisprincipes begrijpen van het maken van reactieve systemen in Java met behulp van Spring en andere tools en frameworks.

Tijdens het proces zullen we bespreken hoe reactief programmeren slechts een drijfveer is voor het creëren van een reactief systeem. Dit zal ons helpen de grondgedachte te begrijpen achter het creëren van reactieve systemen en verschillende specificaties, bibliotheken en standaarden die het gaandeweg heeft geïnspireerd.

2. Wat zijn reactieve systemen?

In de afgelopen decennia heeft het technologielandschap verschillende verstoringen ondergaan die hebben geleid tot een complete transformatie in de manier waarop we waarde in technologie zien. De computerwereld vóór internet had nooit de manieren en middelen kunnen bedenken waarop het onze huidige tijd zal veranderen.

Met het bereik van internet naar de massa en de steeds evoluerende ervaring die het belooft, moeten applicatie-architecten alert zijn om aan hun vraag te voldoen.

Dit betekent in wezen dat we nooit een applicatie kunnen ontwerpen zoals we dat vroeger deden. EEN zeer responsieve applicatie is niet langer een luxe maar een noodzaak.

Ook dat is in het licht van willekeurige storingen en onvoorspelbare belasting. De behoefte aan het uur is niet alleen om het juiste resultaat te krijgen, maar om het snel te krijgen! Het is heel belangrijk om de geweldige gebruikerservaringen die we beloven te leveren, te stimuleren.

Dit is wat de behoefte creëert aan een architecturale stijl die ons Reactive Systems kan geven.

2.1. Reactief manifest

In het jaar 2013 een team van ontwikkelaars, geleid door Jonas Boner, kwam samen om een ​​reeks kernprincipes te definiëren in een document dat bekend staat als het Reactive Manifesto. Dit is wat de basis heeft gelegd voor een architectuurstijl om Reactive Systems te creëren. Sindsdien heeft dit manifest veel interesse gewekt bij de ontwikkelaarsgemeenschap.

In principe schrijft dit document voor het recept voor een reactief systeem om flexibel, losjes gekoppeld en schaalbaar te zijn. Dit maakt dergelijke systemen gemakkelijk te ontwikkelen, tolerant voor fouten en vooral zeer responsief, de basis voor ongelooflijke gebruikerservaringen.

Dus wat is dit geheime recept? Nou, het is nauwelijks een geheim! Het manifest definieert de fundamentele kenmerken of principes van een reactief systeem:

  • Snel reagerend: Een reactief systeem moet een snelle en consistente responstijd bieden en dus een consistente servicekwaliteit
  • Veerkrachtig: Een reactief systeem moet responsief blijven in geval van willekeurige storingen door replicatie en isolatie
  • Elastisch: Een dergelijk systeem moet responsief blijven onder onvoorspelbare workloads door middel van kosteneffectieve schaalbaarheid
  • Berichtgestuurd: Het zou moeten vertrouwen op asynchrone berichten die tussen systeemcomponenten worden doorgegeven

Deze principes klinken eenvoudig en verstandig, maar zijn niet altijd gemakkelijker te implementeren in een complexe bedrijfsarchitectuur. In deze tutorial zullen we een voorbeeldsysteem in Java ontwikkelen met deze principes in gedachten!

3. Wat is reactief programmeren?

Voordat we verder gaan, is het belangrijk om het verschil te begrijpen tussen reactief programmeren en reactieve systemen. We gebruiken beide termen vrij vaak en begrijpen gemakkelijk de ene voor de andere verkeerd. Zoals we eerder hebben gezien, zijn reactieve systemen het resultaat van een specifieke architecturale stijl.

In tegenstelling tot, reactief programmeren is een programmeerparadigma waarbij de focus ligt op het ontwikkelen van asynchrone en niet-blokkerende componenten. De kern van reactief programmeren is een datastroom die we kunnen observeren en waarop we kunnen reageren en zelfs tegendruk kunnen uitoefenen. Dit leidt tot niet-blokkerende uitvoering en dus tot een betere schaalbaarheid met minder uitvoeringsdraden.

Dit betekent niet dat reactieve systemen en reactief programmeren elkaar wederzijds uitsluiten. In feite is reactief programmeren een belangrijke stap om een ​​reactief systeem te realiseren, maar het is niet alles!

3.1. Reactieve streams

Reactive Streams is een gemeenschapsinitiatief dat begon in het jaar 2013 tot een standaard bieden voor asynchrone streamverwerking met niet-blokkerende tegendruk. Het doel hier was om een ​​reeks interfaces, methoden en protocollen te definiëren die de noodzakelijke bewerkingen en entiteiten kunnen beschrijven.

Sindsdien zijn er verschillende implementaties in meerdere programmeertalen verschenen die voldoen aan de specificatie van reactieve streams. Deze omvatten Akka Streams, Ratpack en Vert.x om er maar een paar te noemen.

3.2. Reactieve bibliotheken voor Java

Een van de oorspronkelijke doelstellingen van de reactieve streams was om uiteindelijk te worden opgenomen als een officiële Java-standaardbibliotheek. Als gevolg hiervan is de specificatie van reactieve streams semantisch equivalent aan de Java Flow-bibliotheek, geïntroduceerd in Java 9.

Afgezien daarvan zijn er een paar populaire keuzes om reactief programmeren in Java te implementeren:

  • Reactive Extensions: in de volksmond bekend als ReactiveX, bieden ze API voor asynchroon programmeren met waarneembare streams. Deze zijn beschikbaar voor meerdere programmeertalen en platforms, waaronder Java, waar het bekend staat als RxJava
  • Project Reactor: Dit is een andere reactieve bibliotheek, gebaseerd op de specificatie van reactieve stromen, gericht op het bouwen van niet-applicaties op de JVM. Het is toevallig ook de basis van de reactieve stapel in het Spring-ecosysteem

4. Een eenvoudige applicatie

Voor deze tutorial zullen we een eenvoudige applicatie ontwikkelen op basis van microservices-architectuur met een minimale frontend. De applicatiearchitectuur moet voldoende elementen bevatten om een ​​reactief systeem te creëren.

Voor onze applicatie gebruiken we end-to-end reactieve programmering en andere patronen en tools om de fundamentele kenmerken van een reactief systeem te bereiken.

4.1. Architectuur

We beginnen met het definiëren een eenvoudige applicatiearchitectuur die niet noodzakelijk de kenmerken van reactieve systemen vertoont. Vanaf dat moment zullen we de nodige wijzigingen aanbrengen om deze kenmerken een voor een te bereiken.

Laten we dus eerst beginnen met het definiëren van een eenvoudige architectuur:

Dit is een vrij eenvoudige architectuur met een heleboel microservices om een ​​commerce-use-case te vergemakkelijken waarin we een bestelling kunnen plaatsen. Het heeft ook een frontend voor gebruikerservaring en alle communicatie verloopt als REST via HTTP. Bovendien beheert elke microservice zijn gegevens in individuele databases, een praktijk die bekend staat als database-per-service.

We gaan door en maken deze eenvoudige applicatie in de volgende subsecties. Dit wordt onze basis om de drogredenen van deze architectuur te begrijpen en manieren en middelen om principes en praktijken toe te passen, zodat we dit kunnen omvormen tot een reactief systeem.

4.3. Inventaris Microservice

Inventaris microservice zal zijn verantwoordelijk voor het beheren van een lijst met producten en hun huidige voorraad. Het maakt het ook mogelijk om de voorraad te wijzigen terwijl bestellingen worden verwerkt. We gebruiken Spring Boot met MongoDB om deze service te ontwikkelen.

Laten we beginnen met het definiëren van een controller om enkele eindpunten bloot te leggen:

@GetMapping openbare lijst getAllProducts () {retourneer productService.getProducts (); } @PostMapping openbare Order processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping openbare bestelling revertOrder (@RequestBody Order bestelling) {return productService.revertOrder (bestelling); }

en een service om onze bedrijfslogica samen te vatten:

@Transactional public Order handleOrder (Order order) {order.getLineItems () .forEach (l -> {Product> p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> new RuntimeException ("Kon niet vinden het product: "+ l.getProductId ())); if (p.getStock ()> = l.getQuantity ()) {p.setStock (p.getStock () - l.getQuantity ()); productRepository.save ( p);} else {throw new RuntimeException ("Product is niet op voorraad:" + l.getProductId ());}}); retour order.setOrderStatus (OrderStatus.SUCCESS); } @Transactional public Order revertOrder (Order order) {order.getLineItems () .forEach (l -> {Product p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> nieuwe RuntimeException ("Kon niet vinden het product: "+ l.getProductId ())); p.setStock (p.getStock () + l.getQuantity ()); productRepository.save (p);}); retour order.setOrderStatus (OrderStatus.SUCCESS); }

Merk op dat we het voortbestaan ​​van de entiteiten binnen een transactie, wat ervoor zorgt dat er geen inconsistente toestand resulteert in het geval van uitzonderingen.

Afgezien van deze, zullen we ook de domeinentiteiten, de repository-interface en een aantal configuratieklassen moeten definiëren die nodig zijn om alles correct te laten werken.

Maar aangezien deze meestal standaard zijn, zullen we ze vermijden, en er kan naar worden verwezen in de GitHub-repository in het laatste gedeelte van dit artikel.

4.4. Verzending Microservice

De microservice voor verzending zal ook niet veel verschillen. Dit zal zijn verantwoordelijk voor het controleren of een zending kan worden gegenereerd voor de bestelling en maak er indien mogelijk een aan.

Zoals eerder zullen we een controller definiëren om onze eindpunten bloot te leggen, in feite slechts een enkel eindpunt:

@PostMapping openbaar bestelproces (@RequestBody Order-bestelling) {return shippingService.handleOrder (bestelling); }

en een service om de bedrijfslogica met betrekking tot het verzenden van bestellingen samen te vatten:

public Order handleOrder (Order order) {LocalDate shippingDate = null; if (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime.now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now () .plusDays (1); } else {throw new RuntimeException ("De huidige tijd is buiten de limieten om een ​​bestelling te plaatsen."); } shippingRepository.save (nieuwe verzending () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate)); retour order.setShippingDate (shippingDate) .setOrderStatus (OrderStatus.SUCCESS); }

Onze eenvoudige verzendservice controleert gewoon het geldige tijdvenster om bestellingen te plaatsen. We zullen de rest van de standaardcode niet zoals eerder bespreken.

4.5. Bestel Microservice

Ten slotte zullen we een order-microservice definiëren die zal zijn verantwoordelijk voor het creëren van een nieuwe bestelling, afgezien van andere dingen. Interessant is dat het ook zal spelen als een orchestrator-service, waar het zal communiceren met de voorraadservice en de verzendservice voor de bestelling.

Laten we onze controller definiëren met de vereiste eindpunten:

@PostMapping public Order create (@RequestBody Order order) {Order verwerktOrder = orderService.createOrder (order); if (OrderStatus.FAILURE.equals (verwerktOrder.getOrderStatus ())) {throw new RuntimeException ("Orderverwerking mislukt, probeer het later opnieuw."); } retour verwerkt } @GetMapping openbare lijst getAll () {retour orderService.getOrders (); }

En een service om de bedrijfslogica met betrekking tot bestellingen samen te vatten:

openbare order createOrder (ordervolgorde) {boolean success = true; Bestelling opgeslagenOrder = orderRepository.save (bestelling); Bestel inventoryResponse = null; probeer {inventoryResponse = restTemplate.postForObject (inventoryServiceUrl, order, Order.class); } catch (Exception ex) {success = false; } Bestelling shippingResponse = null; probeer {shippingResponse = restTemplate.postForObject (shippingServiceUrl, order, Order.class); } catch (uitzondering ex) {success = false; HttpEntity deleteRequest = nieuwe HttpEntity (bestelling); ResponseEntity deleteResponse = restTemplate.exchange (inventoryServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class); } if (succes) {savedOrder.setOrderStatus (OrderStatus.SUCCESS); opgeslagenOrder.setShippingDate (shippingResponse.getShippingDate ()); } anders {opgeslagenOrder.setOrderStatus (OrderStatus.FAILURE); } retour orderRepository.save (opgeslagenOrder); } openbare lijst getOrders () {retour orderRepository.findAll (); }

De afhandeling van bestellingen waarbij we oproepen naar voorraad- en verzendservices orkestreren, is verre van ideaal. Verdeeld transacties met meerdere microservices is een complex onderwerp op zich en valt buiten het bestek van deze tutorial.

We zullen later in deze tutorial echter zien hoe een reactief systeem de noodzaak van gedistribueerde transacties tot op zekere hoogte kan vermijden.

Zoals eerder zullen we de rest van de boilerplate-code niet doornemen. Er kan echter naar worden verwezen in de GitHub-opslagplaats.

4.6. Voorkant

Laten we ook een gebruikersinterface toevoegen om de discussie compleet te maken. De gebruikersinterface zal gebaseerd zijn op Angular en zal een eenvoudige applicatie van één pagina zijn.

We zullen moeten maak een eenvoudige component in Angular om bestellingen aan te maken en op te halen. Van specifiek belang is het gedeelte waar we onze API aanroepen om de bestelling te creëren:

createOrder () {let headers = new HttpHeaders ({'Content-Type': 'application / json'}); let options = {headers: headers} this.http.post ('// localhost: 8080 / api / orders', this.form.value, options) .subscribe ((response) => {this.response = response}, (error) => {this.error = error})}

Het bovenstaande codefragment verwacht dat ordergegevens worden vastgelegd in een vorm en beschikbaar zijn binnen de scope van de component. Angular biedt fantastische ondersteuning voor het maken van eenvoudige tot complexe formulieren met behulp van reactieve en sjabloongestuurde formulieren.

Ook belangrijk is het gedeelte waar we eerder gemaakte bestellingen krijgen:

getOrders () {this.previousOrders = this.http.get ('' // localhost: 8080 / api / orders '')}

Houd er rekening mee dat de Angular HTTP-module asynchroon van aard en retourneert daarom RxJS Waarneembaars. We kunnen het antwoord naar onze mening afhandelen door ze door een asynchrone buis te leiden:

Uw tot dusver geplaatste bestellingen:

  • Bestellings-ID: {{order.id}}, Bestellingsstatus: {{order.orderStatus}}, Bestellingsbericht: {{order.responseMessage}}

Natuurlijk heeft Angular sjablonen, stijlen en configuraties nodig om te werken, maar hier kan naar worden verwezen in de GitHub-repository. Houd er rekening mee dat we hier alles in een enkele component hebben gebundeld, wat idealiter niet iets is dat we zouden moeten doen.

Maar voor deze tutorial vallen die zorgen niet onder de aandacht.

4.7. De applicatie implementeren

Nu we alle afzonderlijke onderdelen van de applicatie hebben gemaakt, hoe moeten we ze dan implementeren? Nou, we kunnen dit altijd handmatig doen. Maar we moeten oppassen dat het snel vervelend kan worden.

Voor deze tutorial gebruiken we Docker Compose to bouw en implementeer onze applicatie op een Docker Machine. Dit vereist dat we een standaard Dockerfile toevoegen aan elke service en een Docker Compose-bestand maken voor de hele applicatie.

Laten we eens kijken hoe dit docker-compose.yml bestand ziet eruit:

versie: '3' services: frontend: build: ./frontend poorten: - "80:80" bestelservice: build: ./order-service poorten: - "8080: 8080" inventarisservice: build: ./inventaris -servicepoorten: - "8081: 8081" verzendservice: build: ./shipping-service poorten: - "8082: 8082"

Dit is een vrij standaarddefinitie van services in Docker Compose en vereist geen speciale aandacht.

4.8. Problemen met deze architectuur

Nu we een eenvoudige applicatie hebben met meerdere services die met elkaar communiceren, kunnen we de problemen in deze architectuur bespreken. Er zijn wat we zullen proberen aan te pakken in de volgende secties en uiteindelijk de staat bereiken waarin we onze applicatie zouden hebben getransformeerd in een reactief systeem!

Hoewel deze applicatie verre van software van productiekwaliteit is en er verschillende problemen zijn, zullen we focus op de kwesties die betrekking hebben op de motivaties voor reactieve systemen:

  • Storingen in de voorraadservice of de verzendservice kunnen een trapsgewijs effect hebben
  • De oproepen naar externe systemen en database zijn allemaal blokkerend van aard
  • De implementatie kan storingen en fluctuerende belastingen niet automatisch verwerken

5. Reactief programmeren

Oproepen vaak in elk programma blokkeren resulteren in kritieke bronnen die wachten tot er dingen gebeuren. Dit zijn onder meer databaseaanroepen, oproepen naar webservices en bestandssysteemoproepen. Als we door dit wachten threads van uitvoering kunnen vrijmaken en een mechanisme kunnen bieden om terug te keren zodra de resultaten beschikbaar zijn, zal dit een veel beter gebruik van bronnen opleveren.

Dit is wat het adopteren van het reactieve programmeerparadigma voor ons doet. Hoewel het voor veel van deze oproepen mogelijk is om over te schakelen naar een reactieve bibliotheek, is het misschien niet voor alles mogelijk. Gelukkig maakt Spring het voor ons veel gemakkelijker om reactief programmeren te gebruiken met MongoDB- en REST-API's:

Spring Data Mongo heeft ondersteuning voor reactieve toegang via de MongoDB Reactive Streams Java Driver. Het zorgt voor ReactiveMongoTemplate en ReactiveMongoRepository, die beide een uitgebreide kaartfunctionaliteit hebben.

Spring WebFlux biedt het reactive-stack webraamwerk voor Spring, waardoor niet-blokkerende code en Reactive Streams tegendruk mogelijk worden gemaakt. Het maakt gebruik van de Reactor als zijn reactieve bibliotheek. Verder biedt het Web cliënt voor het uitvoeren van HTTP-verzoeken met tegendruk van Reactive Streams. Het gebruikt Reactor Netty als de HTTP-clientbibliotheek.

5.1. Voorraadservice

We beginnen met het wijzigen van onze eindpunten om reactieve uitgevers uit te zenden:

@GetMapping openbare Flux getAllProducts () {retourneer productService.getProducts (); }
@PostMapping openbaar Mono processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping openbare Mono revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

Uiteraard zullen we ook de nodige wijzigingen in de service moeten aanbrengen:

@Transactional openbare Mono handleOrder (Order order) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = order. getLineItems (). stream () .filter (l -> l.getProductId (). is gelijk aan (p.getId ())) .findAny (). get () .getQuantity (); if (p.getStock ()> = q) {p.setStock (p.getStock () - q); return productRepository.save (p);} else {return Mono.error (new RuntimeException ("Product is niet op voorraad:" + p.getId ()) );}}). dan (Mono.just (order.setOrderStatus ("SUCCESS"))); } @Transactional public Mono revertOrder (Order order) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = order .getLineItems (). stream () .filter (l -> l.getProductId (). is gelijk aan (p.getId ())) .findAny (). get () .getQuantity (); p.setStock (p.getStock ( ) + q); retourneer productRepository.save (p);}) .then (Mono.just (order.setOrderStatus ("SUCCESS"))); }

5.2. Verzendservice

Op dezelfde manier veranderen we het eindpunt van onze verzendservice:

@PostMapping openbaar Mono-proces (@RequestBody Order order) {return shippingService.handleOrder (order); }

En overeenkomstige wijzigingen in de service om reactief programmeren te benutten:

public Mono handleOrder (Order order) {return Mono.just (order) .flatMap (o -> {LocalDate shippingDate = null; if (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime .now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now (). plusDays (1);} else {return Mono.error (new RuntimeException ("De huidige tijd is uitgeschakeld de limieten om een ​​bestelling te plaatsen. "));} retourzendingRepository.save (nieuwe verzending () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate));}) .map (s -> order.setShippingDate (s. getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS)); }

5.3. Bestelservice

We zullen soortgelijke wijzigingen moeten aanbrengen in de eindpunten van de bestelservice:

@PostMapping public Mono create (@RequestBody Order order) {return orderService.createOrder (order) .flatMap (o -> {if (OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return Mono.error (nieuwe RuntimeException ( "Orderverwerking mislukt, probeer het later opnieuw." + O.getResponseMessage ()));} else {return Mono.just (o);}}); } @GetMapping openbare Flux getAll () {retour orderService.getOrders (); }

De serviceveranderingen zullen er meer bij betrokken zijn, aangezien we gebruik moeten maken van Spring Web cliënt om de reactieve eindpunten voor inventaris en verzending aan te roepen:

openbare Mono createOrder (Order order) {return Mono.just (order) .flatMap (orderRepository :: save) .flatMap (o -> {retourneer webClient.method (HttpMethod.POST) .uri (inventoryServiceUrl) .body (BodyInserters.fromValue (o)) .exchange ();}) .onErrorResume (err -> {return Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return webClient.method (HttpMethod.POST) .uri (shippingServiceUrl) .body (BodyInserters.fromValue (o)) .exchange ();} anders { return Mono.just (o);}}) .onErrorResume (err -> {return webClient.method (HttpMethod.POST) .uri (inventoryServiceUrl) .body (BodyInserters.fromValue (order)) .retrieve () .bodyToMono (Order .class) .map (o -> o.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .map (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ( ))) {retour order.setShippingDate (o.getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS);} anders {retour order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (o.getResponseMessage ()); }}) .flatMap (orderRepository :: save); } openbare Flux getOrders () {retour orderRepository.findAll (); }

Dit soort orkestratie met reactieve API's is geen gemakkelijke oefening en vaak foutgevoelig en moeilijk te debuggen. We zullen zien hoe dit kan worden vereenvoudigd in de volgende sectie.

5.4. Voorkant

Nu onze API's in staat zijn om gebeurtenissen te streamen terwijl ze plaatsvinden, is het heel normaal dat we dat ook in onze front-end moeten kunnen gebruiken. Gelukkig ondersteunt Angular EventSource, de interface voor door de server verzonden gebeurtenissen.

Laten we eens kijken hoe we al onze eerdere bestellingen kunnen ophalen en verwerken als een stroom van evenementen:

getOrderStream () {return Observable.create ((observer) => {let eventSource = nieuwe EventSource ('// localhost: 8080 / api / orders') eventSource.onmessage = (event) => {let json = JSON.parse ( event.data) this.orders.push (json) this._zone.run (() => {observer.next (this.orders)})} eventSource.onerror = (error) => {if (eventSource.readyState = == 0) {eventSource.close () this._zone.run (() => {observer.complete ()})} anders {this._zone.run (() => {observer.error ('EventSource-fout: '+ error)})}}})}

6. Berichtgestuurde architectuur

Het eerste probleem dat we gaan aanpakken, heeft betrekking op service-to-service-communicatie. Direct, deze communicatie is synchroon, wat verschillende problemen oplevert. Deze omvatten trapsgewijze fouten, complexe orkestratie en gedistribueerde transacties om er maar een paar te noemen.

Een voor de hand liggende manier om dit probleem op te lossen, is door deze communicatie asynchroon te maken. EEN message broker voor het faciliteren van alle service-to-service-communicatie kan de truc voor ons doen. We gebruiken Kafka als onze berichtenmakelaar en Spring voor Kafka om berichten te produceren en te consumeren:

We gebruiken één onderwerp om bestelberichten met verschillende bestelstatussen te produceren en te consumeren, zodat services kunnen reageren.

Laten we eens kijken hoe elke service moet veranderen.

6.1. Voorraadservice

Laten we beginnen met het definiëren van de berichtproducent voor onze inventarisatiedienst:

@Autowired privé KafkaTemplate kafkaTemplate; public void sendMessage (Order order) {this.kafkaTemplate.send ("orders", order); }

Vervolgens moeten we een berichtgebruiker voor voorraadservice definiëren om op verschillende berichten over het onderwerp te reageren:

@KafkaListener (topics = "orders", groupId = "inventory") public void consume (Order order) gooit IOException {if (OrderStatus.RESERVE_INVENTORY.equals (order.getOrderStatus ())) {productService.handleOrder (order) .doOnSuccess ( o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_SUCCESS));}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_FAILURE) .setRespgetMessage (e.))); }). abonneren (); } anders if (OrderStatus.REVERT_INVENTORY.equals (order.getOrderStatus ())) {productService.revertOrder (order) .doOnSuccess (o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_SUCCESS)));}). e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_FAILURE) .setResponseMessage (e.getMessage ()));}). subscribe (); }}

Dit betekent ook dat we nu veilig enkele van de overtollige eindpunten van onze controller kunnen verwijderen. Deze wijzigingen zijn voldoende om asynchrone communicatie in onze applicatie te realiseren.

6.2. Verzendservice

De wijzigingen in de verzendservice zijn relatief vergelijkbaar met wat we eerder deden met de voorraadservice. De berichtproducent is dezelfde en de berichtgebruiker is specifiek voor verzendlogica:

@KafkaListener (topics = "orders", groupId = "verzending") public void consume (Order order) gooit IOException {if (OrderStatus.PREPARE_SHIPPING.equals (order.getOrderStatus ())) {shippingService.handleOrder (order) .doOnSuccess ( o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.SHIPPING_SUCCESS) .setShippingDate (o.getShippingDate ()));}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.SHIPPING). (e.getMessage ()));}). abonneren (); }}

We kunnen nu veilig alle eindpunten in onze controller laten vallen omdat we ze niet langer nodig hebben.

6.3. Bestelservice

De wijzigingen in de bestelservice zullen iets meer betrokken zijn, aangezien we hier eerder al de orkestratie deden.

Desalniettemin blijft de berichtproducent ongewijzigd en neemt de berichtconsument de bestelservice-specifieke logica over:

@KafkaListener (topics = "orders", groupId = "orders") public void consume (Order order) gooit IOException {if (OrderStatus.INITIATION_SUCCESS.equals (order.getOrderStatus ())) {orderRepository.findById (order.getId () ) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.RESERVE_INVENTORY)); retourneer o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository ());}). : opslaan) .subscribe (); } else if ("INVENTORY-SUCCESS" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.PREPARE_SHIPPING)) ; retourneer o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save) .subscribe (); } else if ("SHIPPING-FAILURE" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.REVERT_INVENTORY)) ; retourneer o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save) .subscribe (); } anders {orderRepository.findById (order.getId ()) .map (o -> {terug o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save ). abonneren (); }}

De de consument reageert hier alleen op bestelberichten met verschillende bestelstatussen. Dit is wat ons de choreografie geeft tussen verschillende diensten.

Ten slotte zal onze bestelservice ook moeten veranderen om deze choreografie te ondersteunen:

openbare Mono createOrder (Order order) {return Mono.just (order) .flatMap (orderRepository :: save) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.INITIATION_SUCCESS)); return o;}). onErrorResume (err -> {return Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (orderRepository :: save); }

Merk op dat dit veel eenvoudiger is dan de service die we moesten schrijven met reactieve eindpunten in de laatste sectie. Asynchroon choreografie resulteert vaak in veel eenvoudigere code, hoewel dit ten koste gaat van de uiteindelijke consistentie en complexe debugging en monitoring. Zoals we wellicht raden, krijgt onze front-end niet langer onmiddellijk de definitieve status van de bestelling.

7. Container Orchestration-service

Het laatste stukje van de puzzel dat we willen oplossen heeft te maken met inzet.

Wat we in de applicatie willen, is voldoende redundantie en de neiging om automatisch omhoog of omlaag te schalen, afhankelijk van de behoefte.

We hebben de services al gecontaineriseerd via Docker en beheren de onderlinge afhankelijkheden via Docker Compose. Hoewel dit op zichzelf al fantastische tools zijn, helpen ze ons niet om te bereiken wat we willen.

Vandaar dat wij een containerorkestratieservice nodig die kan zorgen voor redundantie en schaalbaarheid in onze applicatie. Hoewel er verschillende opties zijn, is een van de populaire opties Kubernetes. Kubernetes biedt ons een cloudleverancier-agnostische manier om zeer schaalbare implementaties van gecontaineriseerde workloads te realiseren.

Kubernetes wikkelt containers zoals Docker in Pods, die de kleinste implementatie-eenheid zijn. Verder kunnen we Deployment gebruiken om de gewenste staat declaratief te beschrijven.

Bij implementatie worden ReplicaSets gemaakt, die intern verantwoordelijk is voor het openen van de pods. We kunnen een minimumaantal identieke pods beschrijven die op elk moment in de tijd zouden moeten werken. Dit zorgt voor redundantie en daarmee voor een hoge beschikbaarheid.

Laten we eens kijken hoe we een Kubernetes-implementatie voor onze applicaties kunnen definiëren:

apiVersion: apps / v1 soort: implementatie metagegevens: naam: inventarisimplementatiespecificatie: replica's: 3 selector: matchLabels: naam: inventarisimplementatiesjabloon: metadata: labels: naam: inventarisimplementatiespecificatie: containers: - naam: inventarisimage: inventaris-service-async: laatste poorten: - containerPort: 8081 --- apiVersion: apps / v1 soort: implementatie metadata: naam: verzending-implementatie spec: replica's: 3 selector: matchLabels: naam: verzending-implementatie sjabloon: metadata: labels : naam: verzending-implementatie specificatie: containers: - naam: verzending afbeelding: verzending-service-async: laatste poorten: - containerPort: 8082 --- apiVersion: apps / v1 soort: implementatie metadata: naam: order-implementatie specificatie: replica's : 3 selector: matchLabels: name: order-deployment sjabloon: metadata: labels: name: order-deployment spec: containers: - name: order image: order-service-async: laatste poorten: - containerPort: 8080

Hier verklaren we onze implementatie om op elk moment drie identieke replica's van pods te behouden. Hoewel dit een goede manier is om redundantie toe te voegen, is het mogelijk niet voldoende voor wisselende belastingen. Kubernetes biedt een andere bron die bekend staat als de horizontale pod-autoscaler die dat wel kan schaal het aantal pods in een implementatie op basis van geobserveerde metrische gegevens zoals CPU-gebruik.

Houd er rekening mee dat we zojuist de schaalbaarheidsaspecten hebben behandeld van de applicatie die wordt gehost op een Kubernetes-cluster. Dit betekent niet noodzakelijk dat het onderliggende cluster zelf schaalbaar is. Het maken van een Kubernetes-cluster met hoge beschikbaarheid is een niet-triviale taak en valt buiten het bestek van deze zelfstudie.

8. Resulterend reactief systeem

Nu we verschillende verbeteringen in onze architectuur hebben aangebracht, is het misschien tijd om dit te evalueren aan de hand van de definitie van een reactief systeem. We houden de evaluatie af tegen de vier kenmerken van een reactief systeem dat we eerder in de tutorial bespraken:

  • Snel reagerend: De acceptatie van het reactieve programmeerparadigma zou ons moeten helpen om end-to-end niet-blokkerende en dus een responsieve applicatie te bereiken
  • Veerkrachtig: Kubernetes-implementatie met ReplicaSet van het gewenste aantal pods moet veerkracht bieden tegen willekeurige fouten
  • Elastisch: Kubernetes-cluster en resources moeten ons de nodige ondersteuning bieden om elastisch te zijn bij onvoorspelbare belastingen
  • Berichtgestuurd: Alle service-naar-service-communicatie asynchroon laten verlopen via een Kafka-makelaar zou ons hierbij moeten helpen

Hoewel dit er veelbelovend uitziet, is het nog lang niet voorbij. Om eerlijk te zijn, de de zoektocht naar een echt reactief systeem zou een voortdurende oefening van verbeteringen moeten zijn. We kunnen nooit alles voorkomen dat kan mislukken in een zeer complexe infrastructuur, waar onze applicatie maar een klein onderdeel is.

Een reactief systeem zal het dus doen eisen betrouwbaarheid van elk onderdeel dat het geheel maakt. Van het fysieke netwerk tot infrastructuurdiensten zoals DNS, ze moeten allemaal op één lijn liggen om ons te helpen het einddoel te bereiken.

Vaak is het voor ons niet mogelijk om voor al deze onderdelen de nodige garanties te beheren en te bieden. En dit is waar een beheerde cloudinfrastructuur helpt onze pijn te verlichten. We kunnen kiezen uit een scala aan diensten zoals IaaS (Infeastrure-as-a-Service), BaaS (Backend-as-a-Service) en PaaS (Platform-as-a-Service) om de verantwoordelijkheden aan externe partijen te delegeren. Hierdoor blijven wij zoveel mogelijk verantwoordelijk voor onze applicatie.

9. Conclusie

In deze tutorial hebben we de basisprincipes van reactieve systemen doorgenomen en hoe verhoudt deze zich tot reactief programmeren. We hebben een eenvoudige applicatie gemaakt met meerdere microservices en hebben de problemen belicht die we willen oplossen met een reactief systeem.

Verder gingen we door en introduceerden we reactieve programmering, berichtgebaseerde architectuur en containerorkestratieservice in de architectuur om een ​​reactief systeem te realiseren.

Ten slotte bespraken we de resulterende architectuur en hoe het een reis naar het reactieve systeem blijft! Deze tutorial laat ons niet alle tools, frameworks of patronen zien die ons kunnen helpen een reactief systeem te creëren, maar het laat ons kennismaken met de reis.

Zoals gewoonlijk is de broncode voor dit artikel te vinden op GitHub.