Inleiding tot Kafka-connectoren

1. Overzicht

Apache Kafka® is een gedistribueerd streamingplatform. In een eerdere tutorial hebben we besproken hoe Kafka-consumenten en -producenten met Spring kunnen worden geïmplementeerd.

In deze zelfstudie leren we hoe u Kafka-connectoren gebruikt.

We zullen kijken naar:

  • Verschillende soorten Kafka-connectoren
  • Functies en modi van Kafka Connect
  • Connectors configureren met behulp van eigenschappenbestanden en de REST API

2. Basisprincipes van Kafka Connect en Kafka-connectoren

Kafka Connect is een framework om Kafka te verbinden met externe systemen zoals databases, sleutelwaardearchieven, zoekindexen en bestandssystemen, met behulp van zogenaamde Connectoren.

Kafka-connectoren zijn kant-en-klare componenten, die ons kunnen helpen om gegevens van externe systemen te importeren in Kafka-onderwerpen en exporteer gegevens van Kafka-onderwerpen naar externe systemen. We kunnen bestaande connectorimplementaties gebruiken voor veelgebruikte gegevensbronnen en sinks of onze eigen connectoren implementeren.

EEN bronconnector verzamelt gegevens van een systeem. Bronsystemen kunnen volledige databases, streamtabellen of message brokers zijn. Een bronconnector kan ook metrische gegevens van applicatieservers verzamelen in Kafka-onderwerpen, waardoor de gegevens beschikbaar zijn voor streamverwerking met een lage latentie.

EEN gootsteen connector levert gegevens van Kafka-onderwerpen in andere systemen, zoals indexen zoals Elasticsearch, batch-systemen zoals Hadoop of welke database dan ook.

Sommige connectoren worden onderhouden door de community, terwijl andere worden ondersteund door Confluent of zijn partners. We kunnen echt connectoren vinden voor de meest populaire systemen, zoals S3, JDBC en Cassandra, om er maar een paar te noemen.

3. Eigenschappen

Kafka Connect-functies zijn onder meer:

  • Een raamwerk voor het verbinden van externe systemen met Kafka - het vereenvoudigt de ontwikkeling, implementatie en het beheer van connectoren
  • Gedistribueerde en zelfstandige modi - het helpt ons om grote clusters te implementeren door gebruik te maken van de gedistribueerde aard van Kafka, evenals setups voor ontwikkeling, testen en kleine productie-implementaties
  • REST-interface - we kunnen connectoren beheren met behulp van een REST API
  • Automatisch offsetbeheer - Kafka Connect helpt ons om de offset commit-proces, wat ons de moeite bespaart om dit foutgevoelige deel van de connectorontwikkeling handmatig te implementeren
  • Standaard gedistribueerd en schaalbaar - Kafka Connect gebruikt het bestaande protocol voor groepsbeheer; we kunnen meer workers toevoegen om een ​​Kafka Connect-cluster op te schalen
  • Streaming en batch-integratie - Kafka Connect is een ideale oplossing voor het overbruggen van streaming- en batchdatasystemen in combinatie met de bestaande mogelijkheden van Kafka
  • Transformaties - deze stellen ons in staat om eenvoudige en lichtgewicht wijzigingen aan te brengen in individuele berichten

4. Installatie

In plaats van de gewone Kafka-distributie te gebruiken, zullen we Confluent Platform downloaden, een Kafka-distributie van Confluent, Inc., het bedrijf achter Kafka. Confluent Platform wordt geleverd met enkele extra tools en clients, vergeleken met gewone Kafka, evenals enkele extra vooraf gebouwde connectoren.

Voor ons geval is de Open Source-editie voldoende, die te vinden is op de site van Confluent.

5. Snel starten met Kafka Connect

Om te beginnen bespreken we het principe van Kafka Connect, met behulp van de meest elementaire connectoren, namelijk het bestand bron connector en het bestand wastafel connector.

Handig is dat Confluent Platform wordt geleverd met beide connectoren, evenals met referentieconfiguraties.

5.1. Configuratie bronconnector

Voor de source-connector is de referentieconfiguratie beschikbaar op $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:

name = local-file-source connector.class = FileStreamSource-taken.max = 1 topic = connect-test-bestand = test.txt

Deze configuratie heeft enkele eigenschappen die gemeenschappelijk zijn voor alle bronconnectoren:

  • naam is een door de gebruiker opgegeven naam voor de connectorinstantie
  • connector.klasse specificeert de implementatieklasse, in feite het soort connector
  • taken.max specificeert hoeveel instanties van onze bronconnector parallel moeten worden uitgevoerd, en
  • onderwerp definieert het onderwerp waarnaar de connector de uitvoer moet sturen

In dit geval hebben we ook een connectorspecifiek kenmerk:

  • het dossier definieert het bestand waaruit de connector de invoer moet lezen

Om dit te laten werken, maken we een basisbestand met wat inhoud:

echo -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt

Merk op dat de werkdirectory $ CONFLUENT_HOME is.

5.2. Sink Connector Configuratie

Voor onze gootsteenconnector gebruiken we de referentieconfiguratie op $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:

name = local-file-sink connector.class = FileStreamSink taken.max = 1 bestand = test.sink.txt topics = connect-test

Logischerwijs bevat het exact dezelfde parameters, hoewel deze keer connector.klasse specificeert de implementatie van de sink-connector, en het dossier is de locatie waar de connector de inhoud moet schrijven.

5.3. Worker Config

Ten slotte moeten we de Connect-werker configureren, die onze twee connectoren zal integreren en het werk zal doen van het lezen van de bronconnector en het schrijven naar de gootsteenconnector.

Daar kunnen we gebruik van maken $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:

bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false value.converter. schemas.enable = false offset.storage.file.filename = / tmp / connect.offsets offset.flush.interval.ms = 10000 plugin.path = / share / java

Let daar op plugin.path kan een lijst met paden bevatten waar connectorimplementaties beschikbaar zijn

Omdat we connectoren zullen gebruiken die bij Kafka zijn gebundeld, kunnen we instellen plugin.path naar $ CONFLUENT_HOME / share / java. Als u met Windows werkt, kan het nodig zijn om hier een absoluut pad op te geven.

Voor de andere parameters kunnen we de standaardwaarden laten staan:

  • bootstrap.servers bevat de adressen van de Kafka-makelaars
  • key.converter en value.converter definieer convertorklassen, die de gegevens serialiseren en deserialiseren terwijl deze van de bron naar Kafka en vervolgens van Kafka naar de sink stromen
  • key.converter.schemas.enable en value.converter.schemas.enable zijn converter-specifieke instellingen
  • offset.storage.file.filename is de belangrijkste instelling bij het uitvoeren van Connect in standalone-modus: het bepaalt waar Connect zijn offsetgegevens moet opslaan
  • offset.flush.interval.ms definieert het interval waarmee de werknemer offsets voor taken probeert vast te leggen

En de lijst met parameters is behoorlijk volwassen, dus bekijk de officiële documentatie voor een volledige lijst.

5.4. Kafka Connect in zelfstandige modus

En daarmee kunnen we onze eerste connectorconfiguratie starten:

$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. eigendommen

Ten eerste kunnen we de inhoud van het onderwerp inspecteren met behulp van de opdrachtregel:

$ CONFLUENT_HOME / bin / kafka-console-consument --bootstrap-server localhost: 9092 --topic connect-test - vanaf het begin

Zoals we kunnen zien, heeft de bronconnector de gegevens van het test.txt bestand, transformeerde het naar JSON en stuurde het naar Kafka:

{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "payload": "bar"}

En als we de map eens bekijken $ CONFLUENT_HOME, kunnen we zien dat een bestand test.sink.txt is hier gemaakt:

cat $ CONFLUENT_HOME / test.sink.txt foo-balk

Omdat de sink-connector de waarde uit de laadvermogen attribuut en schrijft het naar het bestemmingsbestand, de gegevens in test.sink.txt heeft de inhoud van het origineel test.txt het dossier.

Laten we nu meer regels toevoegen aan test.txt.

Als we dat doen, zien we dat de bronconnector deze wijzigingen automatisch detecteert.

We hoeven er alleen voor te zorgen dat aan het einde een nieuwe regel wordt ingevoegd, anders houdt de bronconnector geen rekening met de laatste regel.

Laten we op dit punt het Connect-proces stoppen, aangezien we Connect starten gedistribueerde modus in een paar regels.

6. Connect's REST API

Tot nu toe hebben we alle configuraties gemaakt door eigenschappenbestanden via de opdrachtregel door te geven. Omdat Connect echter is ontworpen om als een service te worden uitgevoerd, is er ook een REST API beschikbaar.

Het is standaard beschikbaar op // localhost: 8083. Enkele eindpunten zijn:

  • GET / connectoren - geeft een lijst terug met alle gebruikte connectoren
  • GET / connectors / {naam} - geeft details over een specifieke connector terug
  • POST / connectoren - maakt een nieuwe connector aan; de hoofdtekst van het verzoek moet een JSON-object zijn met een stringnaamveld en een objectconfiguratieveld met de connectorconfiguratieparameters
  • GET / connectors / {naam} / status - geeft de huidige status van de connector terug - inclusief of deze actief, mislukt of gepauzeerd is - aan welke werknemer deze is toegewezen, foutinformatie als deze is mislukt en de status van al zijn taken
  • VERWIJDEREN / connectoren / {naam} - verwijdert een connector, stopt netjes alle taken en verwijdert de configuratie ervan
  • GET / connector-plug-ins - retourneert een lijst met connectorplug-ins die zijn geïnstalleerd in het Kafka Connect-cluster

De officiële documentatie biedt een lijst met alle eindpunten.

We gebruiken de REST API voor het maken van nieuwe connectoren in de volgende sectie.

7. Kafka Connect in gedistribueerde modus

De stand-alone modus werkt perfect voor ontwikkeling en testen, maar ook voor kleinere opstellingen. Als we echter volledig gebruik willen maken van de gedistribueerde aard van Kafka, moeten we Connect starten in de gedistribueerde modus.

Hierdoor worden connectorinstellingen en metagegevens opgeslagen in Kafka-onderwerpen in plaats van in het bestandssysteem. Als gevolg hiervan zijn de werkknooppunten echt staatloos.

7.1. Connect starten

Een referentieconfiguratie voor de gedistribueerde modus is te vinden op $ CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

Parameters zijn meestal hetzelfde als voor de stand-alone modus. Er zijn maar een paar verschillen:

  • group.id definieert de naam van de Connect-clustergroep. De waarde moet verschillen van elke consumentengroep-ID
  • offset.storage.topic, config.storage.topic en status.storage.topic definieer onderwerpen voor deze instellingen. Voor elk onderwerp kunnen we ook een replicatiefactor definiëren

Nogmaals, de officiële documentatie biedt een lijst met alle parameters.

We kunnen Connect in de gedistribueerde modus als volgt starten:

$ CONFLUENT_HOME / bin / connect-distributed $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

7.2. Connectors toevoegen met behulp van de REST API

Nu, in vergelijking met de zelfstandige opstartopdracht, hebben we geen connectorconfiguraties als argumenten doorgegeven. In plaats daarvan moeten we de connectoren maken met behulp van de REST API.

Om ons voorbeeld van vroeger in te stellen, moeten we twee POST-verzoeken sturen naar // localhost: 8083 / connectoren met de volgende JSON-structuren.

Eerst moeten we de body voor de bronconnector POST maken als een JSON-bestand. Hier noemen we het connect-file-source.json:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "taken.max": 1, "file": "test-distributed.txt", "topic ":" connect-distributed "}}

Merk op hoe dit er ongeveer hetzelfde uitziet als het referentieconfiguratiebestand dat we de eerste keer hebben gebruikt.

En dan posten we het:

curl -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Vervolgens doen we hetzelfde voor de sink-connector, waarbij we het bestand aanroepen connect-bestand-sink.json:

{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "taken.max": 1, "file": "test-distributed.sink.txt", "topics": "connect-distributed"}}

En voer de POST uit zoals eerder:

curl -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Indien nodig kunnen we controleren of deze installatie correct werkt:

$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --topic connect-distributed - vanaf het begin {"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "payload": "bar"}

En als we de map eens bekijken $ CONFLUENT_HOME, kunnen we zien dat een bestand test-distributed.sink.txt is hier gemaakt:

cat $ CONFLUENT_HOME / test-distributed.sink.txt foo-balk

Nadat we de gedistribueerde installatie hebben getest, gaan we opruimen door de twee connectoren te verwijderen:

curl -X DELETE // localhost: 8083 / connectors / local-file-source curl -X DELETE // localhost: 8083 / connectors / local-file-sink

8. Gegevens transformeren

8.1. Ondersteunde transformaties

Transformaties stellen ons in staat om eenvoudige en lichtgewicht wijzigingen aan te brengen in individuele berichten.

Kafka Connect ondersteunt de volgende ingebouwde transformaties:

  • InsertField - Voeg een veld toe met behulp van statische gegevens of record metagegevens
  • ReplaceField - Filter of hernoem velden
  • MaskField - Vervang een veld door de geldige null-waarde voor het type (bijvoorbeeld nul of een lege string)
  • HoistField - Wikkel het hele evenement als een enkel veld in een structuur of een kaart
  • ExtractField - Extraheer een specifiek veld uit struct en map en neem alleen dit veld op in de resultaten
  • SetSchemaMetadata - Wijzig de schemanaam of versie
  • TijdstempelRouter - Wijzig het onderwerp van een record op basis van het oorspronkelijke onderwerp en tijdstempel
  • RegexRouter - Wijzig het onderwerp van een record op basis van het oorspronkelijke onderwerp, een vervangende tekenreeks en een reguliere expressie

Een transformatie wordt geconfigureerd met de volgende parameters:

  • transformeert - Een door komma's gescheiden lijst met aliassen voor de transformaties
  • transformeert. $ alias.type - Klassenaam voor de transformatie
  • transformeert. $ alias. $ transformationSpecificConfig - Configuratie voor de respectievelijke transformatie

8.2. Een transformator toepassen

Laten we de volgende twee transformaties opzetten om enkele transformatiefuncties te testen:

  • Laten we eerst het hele bericht inpakken als een JSON-structuur
  • Laten we daarna een veld aan die structuur toevoegen

Voordat we onze transformaties toepassen, moeten we Connect configureren om schemaloze JSON te gebruiken door het connect-distributed.properties:

key.converter.schemas.enable = false value.converter.schemas.enable = false

Daarna moeten we Connect opnieuw opstarten, opnieuw in de gedistribueerde modus:

$ CONFLUENT_HOME / bin / connect-distributed $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

Nogmaals, we moeten de body voor de bronconnector POST maken als een JSON-bestand. Hier noemen we het connect-file-source-transform.json.

Naast de reeds bekende parameters, voegen we een paar regels toe voor de twee vereiste transformaties:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "taken.max": 1, "file": "test-transformation.txt", "topic ":" connect-transformation "," transforms ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-file-source "}}

Laten we daarna de POST uitvoeren:

curl -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Laten we een paar regels naar ons schrijven test-transformation.txt:

Foo Bar

Als we nu het connect-transformatie topic, zouden we de volgende regels moeten krijgen:

{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}

9. Gebruiksklare connectoren

Laten we, nadat we deze eenvoudige connectoren hebben gebruikt, eens kijken naar meer geavanceerde, gebruiksklare connectoren en hoe ze te installeren.

9.1. Waar vindt u connectoren

Voorgebouwde connectoren zijn verkrijgbaar bij verschillende bronnen:

  • Een paar connectoren zijn gebundeld met gewone Apache Kafka (bron en sink voor bestanden en console)
  • Nog enkele connectoren worden gebundeld met Confluent Platform (ElasticSearch, HDFS, JDBC en AWS S3)
  • Bekijk ook Confluent Hub, een soort app store voor Kafka-connectoren. Het aantal aangeboden connectoren groeit continu:
    • Confluent connectoren (ontwikkeld, getest, gedocumenteerd en worden volledig ondersteund door Confluent)
    • Gecertificeerde connectoren (geïmplementeerd door een derde partij en gecertificeerd door Confluent)
    • Door de gemeenschap ontwikkelde en ondersteunde connectoren
  • Daarnaast biedt Confluent ook een Connectors-pagina, met enkele connectoren die ook beschikbaar zijn op de Confluent Hub, maar ook met wat meer community-connectoren
  • En tot slot zijn er ook leveranciers die connectoren leveren als onderdeel van hun product. Landoop biedt bijvoorbeeld een streamingbibliotheek met de naam Lenzen, die ook een set van ~ 25 open source-connectoren bevat (waarvan er vele ook op andere plaatsen kruiselings worden vermeld)

9.2. Connectors installeren vanaf Confluent Hub

De enterprise-versie van Confluent biedt een script voor het installeren van Connectors en andere componenten van Confluent Hub (het script is niet opgenomen in de Open Source-versie). Als we de enterprise-versie gebruiken, kunnen we een connector installeren met de volgende opdracht:

$ CONFLUENT_HOME / bin / confluent-hub installeer confluentinc / kafka-connect-mqtt: 1.0.0-preview

9.3. Connectors handmatig installeren

Als we een connector nodig hebben die niet beschikbaar is op Confluent Hub of als we de Open Source-versie van Confluent hebben, kunnen we de vereiste connectoren handmatig installeren. Daarvoor moeten we de connector downloaden en uitpakken, en de meegeleverde libs naar de map verplaatsen die is opgegeven als plugin.path.

Voor elke connector moet het archief twee mappen bevatten die voor ons interessant zijn:

  • De lib map bevat de connector jar, bijvoorbeeld kafka-connect-mqtt-1.0.0-preview.jar, evenals wat meer potten die nodig zijn voor de connector
  • De enz map bevat een of meer referentieconfiguratiebestanden

We moeten de lib map naar $ CONFLUENT_HOME / share / java, of welk pad we ook hebben opgegeven plugin.path in connect-standalone.properties en connect-distributed.properties. Daarbij kan het ook zinvol zijn om de map een zinvolle naam te geven.

We kunnen de configuratiebestanden gebruiken van enz ofwel door ernaar te verwijzen terwijl we in de stand-alone modus beginnen, of we kunnen gewoon de eigenschappen pakken en er een JSON-bestand van maken.

10. Conclusie

In deze tutorial hebben we bekeken hoe we Kafka Connect kunnen installeren en gebruiken.

We hebben gekeken naar soorten connectoren, zowel source als sink. We hebben ook gekeken naar enkele functies en modi die Connect kan gebruiken. Vervolgens hebben we transformatoren bekeken. En tot slot hebben we geleerd waar we kunnen komen en hoe we aangepaste connectoren kunnen installeren.

Zoals altijd zijn de configuratiebestanden te vinden op GitHub.