Inleiding tot Project Reactor Bus

1. Overzicht

In dit korte artikel introduceren we de reactorbus door een real-life scenario op te zetten voor een reactieve, gebeurtenisgestuurde applicatie.

2. De basisprincipes van Project Reactor

2.1. Waarom Reactor?

Moderne applicaties hebben te maken met een groot aantal gelijktijdige verzoeken en verwerken een aanzienlijke hoeveelheid gegevens. Standaard blokkeercode is niet langer voldoende om aan deze eisen te voldoen.

Het reactieve ontwerppatroon is een op gebeurtenissen gebaseerde architecturale benadering voor asynchrone afhandeling van een groot aantal gelijktijdige serviceverzoeken afkomstig van enkele of meerdere service handlers.

De projectreactor is gebaseerd op dit patroon en heeft een duidelijk en ambitieus doel om niet-blokkerende, reactieve applicaties te bouwen op de JVM.

2.2. Voorbeeldscenario's

Voordat we beginnen, zijn hier een paar interessante scenario's waarin het zinvol zou zijn om de reactieve architecturale stijl te gebruiken, om een ​​idee te krijgen van waar we deze zouden kunnen toepassen:

  • Meldingsservices voor een groot online winkelplatform zoals Amazon
  • Enorme transactieverwerkingsdiensten voor de banksector
  • Aandelenhandel in bedrijven waar de prijzen van aandelen gelijktijdig veranderen

3. Maven afhankelijkheden

Laten we Project Reactor Bus gaan gebruiken door de volgende afhankelijkheid toe te voegen aan onze pom.xml:

 io.projectreactor reactor-bus 2.0.8.RELEASE 

We kunnen de laatste versie van reactorbus in Maven Central.

4. Een demo-applicatie bouwen

Laten we eens kijken naar een praktisch voorbeeld om de voordelen van de reactorgebaseerde benadering beter te begrijpen.

We bouwen een eenvoudige applicatie die verantwoordelijk is voor het verzenden van meldingen naar de gebruikers van een online winkelplatform. Als een gebruiker bijvoorbeeld een nieuwe bestelling plaatst, stuurt de app een orderbevestiging via e-mail of sms.

Een typische synchrone implementatie zou natuurlijk worden beperkt door de verwerkingscapaciteit van de e-mail- of sms-service. Daarom zouden verkeerspieken, zoals vakanties, over het algemeen problematisch zijn.

Met een reactieve aanpak kunnen we ons systeem flexibeler ontwerpen en beter aanpassen aan storingen of time-outs die kunnen optreden in de externe systemen, zoals gatewayservers.

Laten we de toepassing eens bekijken - te beginnen met de meer traditionele aspecten en door te gaan met de meer reactieve constructies.

4.1. Eenvoudig POJO

Laten we eerst een POJO-klasse maken om de meldingsgegevens weer te geven:

openbare klasse NotificationData {privé lange id; private String naam; privé String-e-mail; privé String mobiel; // getter- en setter-methoden}

4.2. De servicelaag

Laten we nu een eenvoudige servicelaag definiëren:

openbare interface NotificationService {void initiateNotification (NotificationData notificationData) gooit InterruptedException; }

En de implementatie, die een langlopende operatie simuleert:

@Service public class NotificationServiceimpl implementeert NotificationService {@Override public void initiateNotification (NotificationData notificationData) gooit InterruptedException {System.out.println ("Notification service gestart voor" + "Notification ID:" + notificationData.getId ()); Thread.sleep (5000); System.out.println ("Meldingsservice beëindigd voor" + "Meldings-ID:" + notificationData.getId ()); }}

Merk op dat ter illustratie van een realistisch scenario van het verzenden van berichten via een sms- of e-mailgateway, we opzettelijk een vertraging van vijf seconden introduceren in de initiateNotification methode met Thread.sleep (5000).

Als een thread de service raakt, wordt deze daarom gedurende vijf seconden geblokkeerd.

4.3. De consument

Laten we nu ingaan op de meer reactieve aspecten van onze applicatie en een consument implementeren - die we vervolgens zullen toewijzen aan de reactorgebeurtenisbus:

@Service openbare klasse NotificationConsumer implementeert Consumer {@Autowired privé NotificationService notificationService; @Override public void accept (Event notificationDataEvent) {NotificationData notificationData = notificationDataEvent.getData (); probeer {notificationService.initiateNotification (notificationData); } catch (InterruptedException e) {// ignore}}}

Zoals we kunnen zien, implementeert de consument die we hebben gemaakt het Klant koppel. De belangrijkste logica zit in het aanvaarden methode.

Dit is een vergelijkbare benadering die we kunnen tegenkomen in een typische Spring listener-implementatie.

4.4. De controller

Eindelijk, nu we de evenementen kunnen consumeren, laten we ze ook genereren.

We gaan dat doen in een eenvoudige controller:

@Controller openbare klasse NotificationController {@Autowired privé EventBus eventBus; @GetMapping ("/ startNotification / {param}") public void startNotification (@PathVariable Integer param) {for (int i = 0; i <param; i ++) {NotificationData data = new NotificationData (); data.setId (i); eventBus.notify ("notificationConsumer", Event.wrap (data)); System.out.println ("Notificatie" + i + ": meldingstaak succesvol ingediend"); }}}

Dit spreekt voor zich: we zenden gebeurtenissen uit via de EventBus hier.

Als een client bijvoorbeeld de URL bereikt met een parameterwaarde van tien, worden tien gebeurtenissen via de gebeurtenisbus verzonden.

4.5. Het Java Config

Laten we nu alles samenvoegen en een eenvoudige Spring Boot-applicatie maken.

Eerst moeten we configureren EventBus en Milieu bonen:

@ Configuratie openbare klasse Config {@Bean openbare omgeving env () {return Environment.initializeIfEmpty (). AssignErrorJournal (); } @Bean openbare EventBus createEventBus (Omgeving env) {retourneer EventBus.create (env, Environment.THREAD_POOL); }}

In ons geval, we instantiëren de EventBus met een standaard threadpool die beschikbaar is in de omgeving.

Als alternatief kunnen we een op maat gemaakte Verzender voorbeeld:

EventBus evBus = EventBus.create (env, Environment.newDispatcher (REACTOR_CAPACITY, REACTOR_CONSUMERS_COUNT, DispatcherType.THREAD_POOL_EXECUTOR));

Nu zijn we klaar om een ​​hoofdtoepassingscode te maken:

importeer statische reactor.bus.selector.Selectors. $; @SpringBootApplication openbare klasse NotificationApplication implementeert CommandLineRunner {@Autowired privé EventBus eventBus; @Autowired privé NotificationConsumer notificationConsumer; @Override public void run (String ... args) gooit Uitzondering {eventBus.on ($ ("notificationConsumer"), notificationConsumer); } openbare statische leegte hoofd (String [] args) {SpringApplication.run (NotificationApplication.class, args); }}

In onze rennen methode we registreren het notificatie Consument wordt geactiveerd wanneer de melding overeenkomt met een bepaalde selector.

Merk op hoe we de statische import van het $ attribuut om een Selector voorwerp.

5. Test de applicatie

Laten we nu een test maken om onze Kennisgeving Toepassing in actie:

@RunWith (SpringRunner.class) @SpringBootTest (webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) openbare klasse NotificationApplicationIntegrationTest {@LocalServerPort privé int-poort; @Test openbare ongeldig gegevenAppStarted_whenNotificationTasksSubmitted_thenProcessed () {RestTemplate restTemplate = nieuwe RestTemplate (); restTemplate.getForObject ("// localhost:" + port + "/ startNotification / 10", String.class); }}

Zoals we kunnen zien, zodra het verzoek is uitgevoerd, alle tien taken worden direct verzonden zonder blokkering. En eenmaal ingediend, worden de meldingsgebeurtenissen parallel verwerkt.

Melding 0: meldingstaak succesvol ingediend Melding 1: meldingstaak succesvol ingediend Melding 2: meldingstaak succesvol ingediend Melding 3: meldingstaak succesvol ingediend Melding 4: meldingstaak succesvol ingediend Melding 5: meldingstaak succesvol ingediend Melding 6: meldingstaak succesvol ingediend Melding 7: meldingstaak succesvol ingediend Melding 8: meldingstaak succesvol verzonden Melding 9: meldingstaak succesvol ingediend Meldingsservice gestart voor meldings-ID: 1 Meldingsservice gestart voor meldings-ID: 2 Meldingsservice gestart voor meldings-ID: 3 Meldingsservice gestart voor meldings-ID : 0 Meldingsservice beëindigd voor meldings-ID: 1 Meldingsservice beëindigd voor meldings-ID: 0 Meldingsservice gestart voor meldings-ID: 4 Meldingsservice beëindigd voor meldings-ID: 3 Meldingsservice beëindigd voor meldings-ID: 2 Meldingsservice gestart voor meldings-ID: 6 Meldingsservice gestart voor meldings-ID: 5 Meldingsservice gestart voor meldings-ID: 7 Meldingsservice is beëindigd voor meldings-ID: 4 Meldingsservice gestart voor meldings-ID: 8 Meldingsservice is beëindigd voor Meldings-ID: 6 Meldingsservice is beëindigd voor meldings-ID: 5 Meldingsservice is gestart voor meldings-ID: 9 Meldingsservice is beëindigd voor meldings-ID: 7 Meldingsservice is beëindigd voor meldings-ID: 8 Meldingsservice is beëindigd voor meldings-ID: 9

Het is belangrijk om daar rekening mee te houden in ons scenario is het niet nodig om deze gebeurtenissen in een bepaalde volgorde te verwerken.

6. Conclusie

In deze korte tutorial, we hebben een eenvoudige gebeurtenisgestuurde applicatie gemaakt. We hebben ook gezien hoe we kunnen beginnen met het schrijven van een meer reactieve en niet-blokkerende code.

Echter, dit scenario krast gewoon het oppervlak van het onderwerp en vormt gewoon een goede basis om te beginnen met experimenteren met het reactieve paradigma.

Zoals altijd is de broncode beschikbaar op GitHub.


$config[zx-auto] not found$config[zx-overlay] not found