ETL met Spring Cloud Data Flow

1. Overzicht

Spring Cloud Data Flow is een cloud-native toolkit voor het bouwen van realtime datapijplijnen en batchprocessen. Spring Cloud Data Flow is klaar om te worden gebruikt voor een reeks toepassingen voor gegevensverwerking, zoals eenvoudige import / export, ETL-verwerking, gebeurtenisstreaming en voorspellende analyses.

In deze zelfstudie leren we een voorbeeld van real-time Extract Transform and Load (ETL) met behulp van een stroompijplijn die gegevens uit een JDBC-database haalt, deze omzet in eenvoudige POJO's en deze in een MongoDB laadt.

2. ETL en Event-Stream-verwerking

ETL - extraheren, transformeren en laden - werd gewoonlijk een proces genoemd dat gegevens uit verschillende databases en systemen in een gemeenschappelijk datawarehouse laadt. In dit datawarehouse is het mogelijk om zware gegevensanalyses uit te voeren zonder de algehele prestaties van het systeem in gevaar te brengen.

Nieuwe trends veranderen echter de manier waarop dit wordt gedaan. ETL speelt nog steeds een rol bij het overbrengen van data naar datawarehouses en datameren.

Tegenwoordig kan dit met streams in een event-stream-architectuur met behulp van Spring Cloud Data Flow.

3. Spring Cloud-gegevensstroom

Met Spring Cloud Data Flow (SCDF) kunnen ontwikkelaars datapijplijnen maken in twee smaken:

  • Langlevende real-time streamtoepassingen met Spring Cloud Stream
  • Kortstondige gegroepeerde taaktoepassingen met Spring Cloud Task

In dit artikel bespreken we de eerste, een langlevende streamingapplicatie op basis van Spring Cloud Stream.

3.1. Spring Cloud Stream-applicaties

De SCDF Stream-pijpleidingen zijn opgebouwd uit stappen, waarelke stap is een applicatie gebouwd in Spring Boot-stijl met behulp van het Spring Cloud Stream micro-framework. Deze applicaties zijn geïntegreerd met een messaging-middleware zoals Apache Kafka of RabbitMQ.

Deze applicaties zijn ingedeeld in bronnen, processors en sinks. In vergelijking met het ETL-proces zouden we kunnen zeggen dat de bron het "extract" is, de processor de "transformator" en de put het "load" -gedeelte.

In sommige gevallen kunnen we een applicatiestarter inzetten in één of meerdere stappen van de pijplijn. Dit betekent dat we voor een stap geen nieuwe applicatie hoeven te implementeren, maar in plaats daarvan een reeds geïmplementeerde applicatie-starter moeten configureren.

Een lijst met starters voor toepassingen vindt u hier.

3.2. Spring Cloud Data Flow Server

Het laatste stukje architectuur is de Spring Cloud Data Flow Server. De SCDF-server doet de implementatie van de applicaties en de pijplijnstroom met behulp van de Spring Cloud Deployer-specificatie. Deze specificatie ondersteunt de cloud-native SCDF-smaak door te implementeren in een reeks moderne runtimes, zoals Kubernetes, Apache Mesos, Yarn en Cloud Foundry.

We kunnen de stream ook als een lokale implementatie uitvoeren.

Meer informatie over de SCDF-architectuur vindt u hier.

4. Omgeving instellen

Voordat we beginnen, moeten we dat doen kies de onderdelen van deze complexe inzet. Het eerste stuk dat moet worden gedefinieerd, is de SCDF-server.

Om uit te proberen, we gebruiken SCDF Server Local voor lokale ontwikkeling. Voor de productie-implementatie kunnen we later een cloud-native runtime kiezen, zoals SCDF Server Kubernetes. We kunnen de lijst met serverruntimes hier vinden.

Laten we nu eens kijken naar de systeemvereisten om deze server te laten draaien.

4.1. systeem vereisten

Om de SCDF-server uit te voeren, moeten we twee afhankelijkheden definiëren en instellen:

  • de messaging middleware, en
  • het RDBMS.

Voor de messaging-middleware, we werken met RabbitMQ, en we kiezen PostgreSQL als RDBMS voor het opslaan van onze pijplijnstroomdefinities.

Om RabbitMQ uit te voeren, download je hier de nieuwste versie en start je een RabbitMQ-instantie met de standaardconfiguratie of voer je de volgende Docker-opdracht uit:

docker run --naam dataflow-rabbit -p 15672: 15672 -p 5672: 5672 -d rabbitmq: 3-beheer

Als laatste installatiestap installeert en voert u PostgreSQL RDBMS uit op de standaardpoort 5432. Maak daarna een database waarin SCDF zijn streamdefinities kan opslaan met behulp van het volgende script:

DATABASE-gegevensstroom maken;

4.2. Spring Cloud Data Flow Server Lokaal

Voor het draaien van de SCDF Server Local kunnen we ervoor kiezen om de server te starten met docker-compose, of we kunnen het starten als een Java-applicatie.

Hier voeren we de SCDF Server Local uit als een Java-applicatie. Om de applicatie te configureren, moeten we de configuratie definiëren als Java-applicatieparameters. We hebben Java 8 nodig in het systeempad.

Om de potten en afhankelijkheden te hosten, moeten we een basismap voor onze SCDF-server maken en de lokale SCDF-serverdistributie naar deze map downloaden. U kunt de meest recente distributie van SCDF Server Local hier downloaden.

We moeten ook een lib-map maken en daar een JDBC-stuurprogramma plaatsen. De nieuwste versie van de PostgreSQL-driver is hier beschikbaar.

Laten we tot slot de lokale SCDF-server uitvoeren:

$ java -Dloader.path = lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \ --spring.datasource.url = jdbc: postgresql: //127.0.0.1: 5432 / dataflow \ --spring.datasource.username = postgres_username \ --spring.datasource.password = postgres_password \ --spring.datasource.driver-class-name = org.postgresql.Driver \ --spring.rabbitmq.host = 127.0.0.1 \ --spring.rabbitmq.port = 5672 \ --spring.rabbitmq.username = gast \ --spring.rabbitmq.password = gast

We kunnen controleren of het actief is door naar deze URL te kijken:

// localhost: 9393 / dashboard

4.3. Spring Cloud Data Flow Shell

De SCDF-schaal is een opdrachtregelprogramma dat het gemakkelijk maakt om onze applicaties en pijplijnen samen te stellen en te implementeren. Deze Shell-opdrachten worden uitgevoerd via de Spring Cloud Data Flow Server REST API.

Download de nieuwste versie van de pot in uw SCDF-basismap, die hier beschikbaar is. Als het klaar is, voert u de volgende opdracht uit (update de versie indien nodig):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar ____ ____ _ __ / ___ | _ __ _ __ (_) _ __ __ _ / ___ | | ___ _ _ __ | | \ ___ \ | '_ \ | '__ | | '_ \ / _` | | | | | / _ \ | | | | / _` | ___) | | _) | | | | | | | (_ | | | | ___ | | (_) | | _ | | (_ | | | ____ / | .__ / | _ | | _ | _ | | _ | \ __, | \ ____ | _ | \ ___ / \ __, _ | \ __, _ | ____ | _ | _ __ | ___ / __________ | _ \ __ _ | | _ __ _ | ___ | | _____ __ \ \ \ \ \ \ | | | | / _` | __ / _` | | | _ | | / _ \ \ / \ / / \ \ \ \ \ \ | | _ | | (_ | | || (_ | | | _ | | | (_) \ VV / / / / / / / | ____ / \ __, _ | \ __ \ __, _ | | _ | | _ | \ ___ / \ _ / \ _ / / _ / _ / _ / _ / _ / Welkom bij de Spring Cloud Data Flow-shell. Voor hulp, klik op TAB of typ "help". dataflow:>

Als in plaats van 'dataflow:> " Jij krijgt "server-unknown:> " in de laatste regel draai je de SCDF-server niet op localhost. Voer in dat geval de volgende opdracht uit om verbinding te maken met een andere host:

server-onbekend:> dataflow-configuratieserver // {host}

Nu is Shell verbonden met de SCDF-server en kunnen we onze opdrachten uitvoeren.

Het eerste dat we in Shell moeten doen, is de applicatiestarters importeren. Vind hier de laatste versie voor RabbitMQ + Maven in Spring Boot 2.0.x, en voer de volgende opdracht uit (update nogmaals de versie, hier “Darwin-SR1", Indien nodig):

$ dataflow:> app importeren --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

Voer het volgende Shell-commando uit om de geïnstalleerde applicaties te controleren:

$ dataflow:> app-lijst

Als gevolg hiervan zouden we een tabel moeten zien met alle geïnstalleerde applicaties.

SCDF biedt ook een grafische interface met de naam Flo, waartoe we toegang hebben via dit adres: // localhost: 9393 / dashboard. Het gebruik ervan valt echter niet binnen het bestek van dit artikel.

5. Samenstellen van een ETL-pijplijn

Laten we nu onze streampijplijn maken. Om dit te doen, gebruiken we de JDBC Source-toepassingsstarter om informatie uit onze relationele database te extraheren.

We zullen ook een aangepaste processor maken voor het transformeren van de informatiestructuur en een aangepaste sink om onze gegevens in een MongoDB te laden.

5.1. Extract - Het voorbereiden van een relationele database voor extractie

Laten we een database maken met de naam van crm en een tafel met de naam van klant:

CREATE DATABASE crm;
CREATE TABLE klant (id bigint NOT NULL, geïmporteerde booleaanse DEFAULT false, klantnaam-teken varieert (50), PRIMAIRE SLEUTEL (id))

Merk op dat we een vlag gebruiken geïmporteerd, waarin wordt opgeslagen welk record al is geïmporteerd. Indien nodig kunnen we deze informatie ook in een andere tabel opslaan.

Laten we nu wat gegevens invoegen:

INVOEGEN IN klant (id, klantnaam, geïmporteerd) WAARDEN (1, 'Jan Jansen', false);

5.2. Transformeren - in kaart brengen JDBC Velden naar de MongoDB Veldenstructuur

Voor de transformatiestap doen we een eenvoudige vertaling van het veld klantnaam van de brontabel naar een nieuw veld naam. Andere transformaties kunnen hier worden gedaan, maar laten we het voorbeeld kort houden.

Om dit te doen, maken we een nieuw project met de naam klanttransformatie. De eenvoudigste manier om dit te doen, is door de Spring Initializr-site te gebruiken om het project te maken. Kies na het bereiken van de website een groep en een artefactnaam. We zullen gebruiken com.klant en klanttransformatie, respectievelijk.

Zodra dit is gebeurd, klikt u op de knop "Project genereren" om het project te downloaden. Pak het project vervolgens uit en importeer het in uw favoriete IDE, en voeg de volgende afhankelijkheid toe aan het pom.xml:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 

Nu zijn we klaar om de conversie van de veldnaam te coderen. Om dit te doen, maken we het Klant class om als een adapter te fungeren. Deze klas krijgt de klantnaam via de setName () methode en zal zijn waarde uitvoeren via getName methode.

De @JsonProperty annotaties zullen de transformatie uitvoeren terwijl ze deserialiseren van JSON naar Java:

openbare klasse Klant {privé Lange id; private String naam; @JsonProperty ("klantnaam") public void setName (String naam) {this.name = naam; } @JsonProperty ("naam") openbare String getName () {naam retourneren; } // Getters en Setters}

De processor moet gegevens van een input ontvangen, de transformatie uitvoeren en het resultaat aan een outputkanaal binden. Laten we een klas maken om dit te doen:

importeer org.springframework.cloud.stream.annotation.EnableBinding; importeer org.springframework.cloud.stream.messaging.Processor; importeer org.springframework.integration.annotation.Transformer; @EnableBinding (Processor.class) openbare klasse CustomerProcessorConfiguration {@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) openbaar Customer convertToPojo (Payload klant) {return payload; }}

In de bovenstaande code kunnen we zien dat de transformatie automatisch plaatsvindt. De invoer ontvangt de gegevens terwijl JSON en Jackson deze deserialiseren naar een Klant object met behulp van de set methoden.

Het tegenovergestelde is voor de uitvoer, de gegevens worden geserialiseerd naar JSON met behulp van de krijgen methoden.

5.3. Load - Sink in MongoDB

Net als bij de transformatiestap, we maken nog een maven-project, nu met de naam klant-mongodb-wastafel. Nogmaals, toegang tot de Spring Initializr, voor de groep kiest com.klant, en kies voor het Artefact klant-mongodb-gootsteen. Typ vervolgens MongoDB in het zoekvak voor afhankelijkheden en download het project.

Pak het vervolgens uit en importeer het naar uw favoriete IDE.

Voeg vervolgens dezelfde extra afhankelijkheid toe als in het klanttransformatie project.

Nu gaan we nog een maken Klant class, voor het ontvangen van input in deze stap:

importeer org.springframework.data.mongodb.core.mapping.Document; @Document (collection = "customer") openbare klasse Klant {privé Lange id; private String naam; // Getters en Setters}

Voor het zinken van de Klant, maken we een Listener-klasse die de klantentiteit opslaat met de CustomerRepository:

@EnableBinding (Sink.class) openbare klasse CustomerListener {@Autowired privé CustomerRepository-repository; @StreamListener (Sink.INPUT) public void save (klant klant) {repository.save (klant); }}

En de CustomerRepository, in dit geval is een MongoRepository van Spring Data:

importeer org.springframework.data.mongodb.repository.MongoRepository; importeer org.springframework.stereotype.Repository; @Repository openbare interface CustomerRepository breidt MongoRepository uit {} 

5.4. Streamdefinitie

Nu, beide maatwerkapplicaties zijn klaar om te worden geregistreerd op SCDF Server. Om dit te bereiken, compileert u beide projecten met de opdracht Maven mvn installeren.

We registreren ze vervolgens met behulp van de Spring Cloud Data Flow Shell:

app register --naam klanttransformatie --type processor --uri maven: //com.customer: klanttransformatie: 0.0.1-SNAPSHOT
app register --naam klant-mongodb-sink --type sink --uri maven: //com.customer: klant-mongodb-sink: jar: 0.0.1-SNAPSHOT

Laten we tot slot kijken of de applicaties zijn opgeslagen op SCDF, voer het commando applicatielijst uit in de shell:

app lijst

Als gevolg hiervan zouden we beide toepassingen in de resulterende tabel moeten zien.

5.4.1. Stream Pipeline Domain-Specific Language - DSL

Een DSL definieert de configuratie en gegevensstroom tussen de applicaties. De SCDF DSL is eenvoudig. In het eerste woord definiëren we de naam van de applicatie, gevolgd door de configuraties.

De syntaxis is ook een op Unix geïnspireerde Pipeline-syntaxis, die gebruikmaakt van verticale balken, ook wel "pipes" genoemd, om meerdere applicaties met elkaar te verbinden:

http --poort = 8181 | logboek

Dit creëert een HTTP-applicatie die wordt bediend in poort 8181 die elke ontvangen body-payload naar een logboek stuurt.

Laten we nu eens kijken hoe we de DSL-streamdefinitie van de JDBC-bron kunnen maken.

5.4.2. JDBC-bronstroomdefinitie

De belangrijkste configuraties voor de JDBC-bron zijn vraag en bijwerken.vraag zal ongelezen records selecteren terwijl bijwerken zal een vlag veranderen om te voorkomen dat de huidige records opnieuw worden gelezen.

We zullen ook de JDBC-bron definiëren om te pollen met een vaste vertraging van 30 seconden en maximaal 1000 rijen te pollen. Ten slotte zullen we de verbindingsconfiguraties definiëren, zoals stuurprogramma, gebruikersnaam, wachtwoord en verbindings-URL:

jdbc --query = 'SELECTEER id, klantnaam FROM public.customer WAAR geïmporteerd = false' --update = 'UPDATE public.customer SET geïmporteerd = waar WHERE id in (: id)' --max-rijen-per-poll = 1000 --fixed-delay = 30 --time-unit = SECONDS --driver-class-name = org.postgresql.Driver --url = jdbc: postgresql: // localhost: 5432 / crm --username = postgres - wachtwoord = postgres

Meer JDBC-bronconfiguratie-eigenschappen zijn hier te vinden.

5.4.3. Klant MongoDB Sink Stream-definitie

Omdat we de verbindingsconfiguraties niet hebben gedefinieerd in application.properties van klant-mongodb-gootsteen, zullen we configureren via DSL-parameters.

Onze applicatie is volledig gebaseerd op de MongoDataAutoConfiguration. Bekijk hier de andere mogelijke configuraties. Kortom, we zullen de spring.data.mongodb.uri:

klant-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main

5.4.4. Maak en implementeer de stream

Om de definitieve streamdefinitie te maken, gaat u eerst terug naar de Shell en voert u de volgende opdracht uit (zonder regeleinden, ze zijn zojuist ingevoegd voor leesbaarheid):

stream create --name jdbc-to-mongodb --definition "jdbc --query = 'SELECTEER id, klantnaam VAN public.customer WAAR geïmporteerd = false' --fixed-delay = 30 --max-rijen-per-poll = 1000 --update = 'UPDATE klantenset geïmporteerd = waar WHERE id in (: id)' --time-unit = SECONDS --password = postgres --driver-class-name = org.postgresql.Driver --username = postgres --url = jdbc: postgresql: // localhost: 5432 / crm | klanttransformatie | klant-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main " 

Deze stream DSL definieert een stream met de naam jdbc-naar-mongodb. De volgende, we zullen de stream bij zijn naam inzetten:

stream implementeren --naam jdbc-to-mongodb 

Ten slotte zouden we de locaties van alle beschikbare logboeken in de loguitvoer moeten zien:

Logboeken bevinden zich in {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink Logboeken bevinden zich in {PATH_TO_LOG} / spring-cloud-deployer / jdbc-to-mongodb /jdbc-to-mongodb.customer-transform Logboeken staan ​​in {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Conclusie

In dit artikel hebben we een volledig voorbeeld gezien van een ETL-datapijplijn met Spring Cloud Data Flow.

Het meest opmerkelijke was dat we de configuraties van een applicatiestarter zagen, een ETL-stroompijplijn creëerden met behulp van de Spring Cloud Data Flow Shell en aangepaste applicaties implementeerden voor ons lezen, transformeren en schrijven van gegevens.

Zoals altijd is de voorbeeldcode te vinden in het GitHub-project.


$config[zx-auto] not found$config[zx-overlay] not found