Aan de slag met stroomverwerking met Spring Cloud Data Flow

1. Inleiding

Spring Cloud-gegevensstroom is een cloud-native programmeer- en bedrijfsmodel voor composable data-microservices.

Met Spring Cloud-gegevensstroomkunnen ontwikkelaars datapijplijnen maken en orkestreren voor veelvoorkomende gebruiksscenario's, zoals het opnemen van gegevens, realtime analyses en het importeren / exporteren van gegevens.

Deze datapijplijnen zijn er in twee varianten: streaming- en batchgegevenspijplijnen.

In het eerste geval wordt een onbegrensde hoeveelheid gegevens verbruikt of geproduceerd via messaging-middleware. Terwijl in het tweede geval de kortstondige taak een eindige set gegevens verwerkt en vervolgens wordt beëindigd.

Dit artikel gaat over streamingverwerking.

2. Bouwkundig overzicht

De belangrijkste componenten van dit type architectuur zijn Toepassingen, de Gegevensstroom-server, en de beoogde looptijd.

Naast deze sleutelcomponenten hebben we meestal ook een Dataflow-shell en een bericht makelaar binnen de architectuur.

Laten we al deze componenten in meer detail bekijken.

2.1. Toepassingen

Typisch omvat een streaming-datapijplijn verbruiksgebeurtenissen van externe systemen, gegevensverwerking en meertalige persistentie. Deze fasen worden gewoonlijk aangeduid als Bron, Verwerker, en Wastafel in Lente Cloud terminologie:

  • Bron: is de applicatie die gebeurtenissen verbruikt
  • Bewerker: verbruikt gegevens van de Bron, verwerkt het en stuurt de verwerkte gegevens naar de volgende applicatie in de pijplijn
  • Wastafel: ofwel verbruikt van een Bron of Verwerker en schrijft de gegevens naar de gewenste persistentielaag

Deze applicaties kunnen op twee manieren worden verpakt:

  • Spring Boot uber-jar die wordt gehost in een maven-repository, bestand, http of een andere Spring-resource-implementatie (deze methode wordt in dit artikel gebruikt)
  • Docker

Veel bronnen, processor en sink-toepassingen voor algemeen gebruik (bijv. Jdbc, hdfs, http, router) zijn al beschikbaar en klaar voor gebruik door de Spring Cloud-gegevensstroom team.

2.2. Looptijd

Er is ook een runtime nodig om deze applicaties uit te voeren. De ondersteunde looptijden zijn:

  • Cloud Foundry
  • Apache GAREN
  • Kubernetes
  • Apache Mesos
  • Lokale server voor ontwikkeling (die in dit artikel wordt gebruikt)

2.3. Gegevensstroom-server

Het onderdeel dat verantwoordelijk is voor het implementeren van applicaties in een runtime is het Gegevensstroom-server. Er is een Gegevensstroom-server uitvoerbare jar verstrekt voor elk van de doelruntimes.

De Gegevensstroom-server is verantwoordelijk voor het tolken van:

  • Een stream DSL die de logische gegevensstroom door meerdere applicaties beschrijft.
  • Een implementatie manifest dat de toewijzing van applicaties aan de runtime beschrijft.

2.4. Dataflow-shell

De Data Flow Shell is een client voor de Data Flow Server. De shell stelt ons in staat om de DSL-opdracht uit te voeren die nodig is om met de server te communiceren.

Als voorbeeld zou de DSL om de gegevensstroom van een http-bron naar een jdbc-sink te beschrijven worden geschreven als "http | jdbc ”. Deze namen in de DSL zijn geregistreerd met de Gegevensstroom-server en toewijzen aan toepassingsartefacten die kunnen worden gehost in Maven- of Docker-opslagplaatsen.

Spring biedt ook een grafische interface, genaamd Flo, voor het maken en bewaken van streaminggegevenspijplijnen. Het gebruik ervan valt echter buiten de bespreking van dit artikel.

2.5. Berichtmakelaar

Zoals we hebben gezien in het voorbeeld van de vorige sectie, hebben we het pijpsymbool gebruikt bij de definitie van de gegevensstroom. Het pijpsymbool vertegenwoordigt de communicatie tussen de twee applicaties via messaging middleware.

Dit betekent dat we een message broker nodig hebben die actief is in de doelomgeving.

De twee ondersteunde middleware-brokers voor berichten zijn:

  • Apache Kafka
  • RabbitMQ

En dus, nu we een overzicht hebben van de architectonische componenten - het is tijd om onze eerste stream processing pipeline te bouwen.

3. Installeer een Message Broker

Zoals we hebben gezien, hebben de applicaties in de pijplijn een messaging-middleware nodig om te communiceren. Voor de toepassing van dit artikel gaan we verder met RabbitMQ.

Voor de volledige details van de installatie kunt u de instructies op de officiële site volgen.

4. De lokale gegevensstroomserver

Om het proces van het genereren van onze applicaties te versnellen, gebruiken we Spring Initializr; met zijn hulp kunnen we onze verkrijgen Spring Boot applicaties in een paar minuten.

Kies na het bereiken van de website een Groep en een Artefact naam.

Zodra dit is gebeurd, klikt u op de knop Genereer Project om de download van het Maven-artefact te starten.

Nadat de download is voltooid, pakt u het project uit en importeert u het als een Maven-project in uw IDE naar keuze.

Laten we een Maven-afhankelijkheid aan het project toevoegen. Zoals we nodig hebben Dataflow lokale server bibliotheken, laten we de spring-cloud-starter-dataflow-server-local afhankelijkheid toevoegen:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Nu moeten we de Spring Boot hoofdklasse met @EnableDataFlowServer annotatie:

@EnableDataFlowServer @SpringBootApplication openbare klasse SpringDataFlowServerApplication {openbare statische leegte hoofd (String [] args) {SpringApplication.run (SpringDataFlowServerApplication.class, args); }} 

Dat is alles. Onze Lokale gegevensstroomserver is klaar om te worden uitgevoerd:

mvn spring-boot: run

De applicatie zal opstarten op poort 9393.

5. De gegevensstroomschil

Nogmaals, ga naar de Spring Initializr en kies een Groep en Artefact naam.

Nadat we het project hebben gedownload en geïmporteerd, gaan we een spring-cloud-dataflow-shell-afhankelijkheid toevoegen:

 org.springframework.cloud spring-cloud-dataflow-shell 

Nu moeten we de @EnableDataFlowShell annotatie bij de Spring Boot hoofdklasse:

@EnableDataFlowShell @SpringBootApplication openbare klasse SpringDataFlowShellApplication {openbare statische leegte hoofd (String [] args) {SpringApplication.run (SpringDataFlowShellApplication.class, args); }} 

We kunnen nu de shell uitvoeren:

mvn spring-boot: run

Nadat de shell is gestart, kunnen we het helpen opdracht in de prompt om een ​​volledige lijst met opdrachten te zien die we kunnen uitvoeren.

6. De brontoepassing

Op dezelfde manier maken we op Initializr nu een eenvoudige applicatie en voegen we een Stroom Rabbit afhankelijkheid genaamd spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

We zullen dan het @EnableBinding (Source.class) annotatie bij de Spring Boot hoofdklasse:

@EnableBinding (Source.class) @SpringBootApplication openbare klasse SpringDataFlowTimeSourceApplication {openbare statische leegte hoofd (String [] args) {SpringApplication.run (SpringDataFlowTimeSourceApplication.class, args); }}

Nu moeten we de bron definiëren van de gegevens die moeten worden verwerkt. Deze bron kan elke potentieel eindeloze werklast zijn (sensordata van internet-of-things, 24/7 verwerking van gebeurtenissen, opname van online transactiegegevens).

In onze voorbeeldtoepassing produceren we elke 10 seconden één gebeurtenis (voor de eenvoud een nieuw tijdstempel) met een Poller.

De @BuienRadarNL annotatie stuurt een bericht naar het uitvoerkanaal van de bron, waarbij de retourwaarde wordt gebruikt als de lading van het bericht:

@Bean @InboundChannelAdapter (waarde = Source.OUTPUT, poller = @Poller (fixedDelay = "10000", maxMessagesPerPoll = "1")) openbare MessageSource timeMessageSource () {return () -> MessageBuilder.withPayload (nieuwe datum (). GetTime ()).bouwen(); } 

Onze databron is klaar.

7. De verwerkingsapplicatie

Vervolgens maken we een applicatie en voegen we een Stroom Rabbit afhankelijkheid.

We zullen dan het @EnableBinding (Processor.class) annotatie bij de Spring Boot hoofdklasse:

@EnableBinding (Processor.class) @SpringBootApplication openbare klasse SpringDataFlowTimeProcessorApplication {openbare statische leegte hoofd (String [] args) {SpringApplication.run (SpringDataFlowTimeProcessorApplication.class, args); }}

Vervolgens moeten we een methode definiëren om de gegevens te verwerken die afkomstig zijn van de brontoepassing.

Om een ​​transformator te definiëren, moeten we deze methode annoteren met @Transformator annotatie:

@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) openbare objecttransformatie (lange tijdstempel) {DateFormat dateFormat = nieuwe SimpleDateFormat ("jjjj / MM / dd uu: mm: jj"); String date = dateFormat.format (tijdstempel); retourdatum; }

Het converteert een tijdstempel van het ‘input'-kanaal naar een geformatteerde datum die naar het‘ output'-kanaal wordt gestuurd.

8. De gootsteentoepassing

De laatste applicatie die moet worden gemaakt, is de Sink-applicatie.

Nogmaals, ga naar de Spring Initializr en kies een Groep, een Artefact naam. Laten we na het downloaden van het project een Stroom Rabbit afhankelijkheid.

Voeg vervolgens het @EnableBinding (Sink.class) annotatie bij de Spring Boot hoofdklasse:

@EnableBinding (Sink.class) @SpringBootApplication openbare klasse SpringDataFlowLoggingSinkApplication {openbare statische leegte hoofd (String [] args) {SpringApplication.run (SpringDataFlowLoggingSinkApplication.class, args); }}

Nu hebben we een methode nodig om de berichten van de processortoepassing te onderscheppen.

Om dit te doen, moeten we het @StreamListener (Sink.INPUT) annotatie bij onze methode:

@StreamListener (Sink.INPUT) public void loggerSink (String datum) {logger.info ("Ontvangen:" + datum); }

De methode drukt eenvoudig het tijdstempel af dat is omgezet in een opgemaakte datum naar een logboekbestand.

9. Registreer een Stream-app

Met de Spring Cloud Data Flow Shell kunnen we een Stream-app registreren bij het App-register met behulp van de app registreren opdracht.

We moeten een unieke naam, toepassingstype en een URI opgeven die kunnen worden omgezet naar het app-artefact. Specificeer voor het type “bron“, “processor‘Of’wastafel“.

Wanneer u een URI met het maven-schema opgeeft, moet het formaat aan het volgende voldoen:

maven: //: [: [:]]:

Om het Bron, Verwerker en Wastafel eerder gemaakte applicaties, ga naar het Spring Cloud Data Flow Shell en voer de volgende opdrachten uit vanaf de prompt:

app register --naam tijdbron --type bron --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-source: jar: 0.0.1-SNAPSHOT app register --naam tijd -processor --type processor --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-processor: jar: 0.0.1-SNAPSHOT app-register --naam logging-sink --type sink --uri maven: //com.baeldung.spring.cloud: spring-data-flow-logging-sink: jar: 0.0.1-SNAPSHOT 

10. Maak en implementeer de stream

Ga naar het Spring Cloud Data Flow Shell en voer het volgende shell-commando uit:

stream create --name time-to-log --definition 'time-source | tijdprocessor | logging-sink '

Dit definieert een stream met de naam time-to-log gebaseerd op de DSL-expressie ‘Tijdsbron | tijdprocessor | logging-sink '.

Voer vervolgens de volgende shell-opdracht uit om de stream te implementeren:

stream deploy --name time-to-log

De Gegevensstroom-server lost op tijdbron, tijdprocessor, en logboekregistratie om coördinaten te maven en gebruikt die om het tijdbron, tijdprocessor en logboekregistratie toepassingen van de stream.

Als de stream correct is geïmplementeerd, ziet u in het Gegevensstroom-server registreert dat de modules zijn gestart en aan elkaar zijn gekoppeld:

2016-08-24 12: 29: 10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: implementatie van app time-to-log.logging-sink-instantie 0 Logboeken bevinden zich in PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink 2016-08-24 12: 29: 17.600 INFO 8096 --- [io-9393-exec-10] oscd spi.local.LocalAppDeployer: implementatie van app time-to-log.time-processor-instantie 0 Logboeken bevinden zich in PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034556862 / time-to-log.time-processor 2016-08-24 12: 29: 23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: implementatie van time-to-log.time-source-instantie van app 0 Logboeken bevinden zich in PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034562861 / time-to-log.time-source

11. Het resultaat bekijken

In dit voorbeeld verzendt de bron eenvoudig het huidige tijdstempel als een bericht elke seconde, formatteert de processor het en voert de log-sink het opgemaakte tijdstempel uit met behulp van het logging-framework.

De logboekbestanden bevinden zich in de directory die wordt weergegeven in het Gegevensstroom-server'S logboekuitvoer, zoals hierboven weergegeven. Om het resultaat te zien, kunnen we het logboek volgen:

tail -f PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink / stdout_0.log 2016-08-24 12: 40: 42.029 INFO 9488 --- [ r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Ontvangen: 2016/08/24 11:40:01 2016-08-24 12:40: 52.035 INFO 9488 --- [r.time-to-log-1 ] scSpringDataFlowLoggingSinkApplication: Ontvangen: 2016/08/24 11:40:11 2016-08-24 12: 41: 02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Ontvangen: 2016/08 / 24 11:40:21

12. Conclusie

In dit artikel hebben we gezien hoe we een datapijplijn kunnen bouwen voor streamverwerking door het gebruik van Spring Cloud-gegevensstroom.

We zagen ook de rol van Bron, Verwerker en Wastafel applicaties in de stream en hoe deze module in een Gegevensstroom-server door het gebruik van Dataflow-shell.

De voorbeeldcode is te vinden in het GitHub-project.


$config[zx-auto] not found$config[zx-overlay] not found