Inleiding tot KafkaStreams in Java

1. Overzicht

In dit artikel zullen we kijken naar de KafkaStreams bibliotheek.

KafkaStreams is ontwikkeld door de makers van Apache Kafka. Het primaire doel van dit stukje software is om programmeurs in staat te stellen efficiënte, realtime streamingapplicaties te maken die zouden kunnen werken als microservices.

KafkaStreams stelt ons in staat om Kafka-onderwerpen te gebruiken, gegevens te analyseren of te transformeren en deze mogelijk naar een ander Kafka-onderwerp te sturen.

Laten zien KafkaStreams, we maken een eenvoudige applicatie die zinnen uit een onderwerp voorleest, woorden telt en de telling per woord afdrukt.

Belangrijk om op te merken is dat de KafkaStreams bibliotheek is niet reactief en heeft geen ondersteuning voor asynchrone bewerkingen en tegendrukafhandeling.

2. Maven Afhankelijkheid

Om te beginnen met het schrijven van streamverwerkingslogica met KafkaStreams, we moeten een afhankelijkheid toevoegen aan kafka-streams en kafka-klanten:

 org.apache.kafka kafka-streams 1.0.0 org.apache.kafka kafka-clients 1.0.0 

We moeten ook Apache Kafka hebben geïnstalleerd en gestart omdat we een Kafka-onderwerp zullen gebruiken. Dit onderwerp wordt de gegevensbron voor onze streamingtaak.

We kunnen Kafka en andere vereiste afhankelijkheden downloaden van de officiële website.

3. KafkaStreams Input configureren

Het eerste dat we zullen doen, is de definitie van het input-Kafka-onderwerp.

We kunnen de Samenvloeiend tool die we hebben gedownload - het bevat een Kafka-server. Het bevat ook de kafka-console-producent die we kunnen gebruiken om berichten naar Kafka te publiceren.

Laten we om te beginnen ons Kafka-cluster uitvoeren:

./confluent start

Zodra Kafka is gestart, kunnen we onze gegevensbron en naam van onze applicatie definiëren met APPLICATION_ID_CONFIG:

String inputTopic = "inputTopic";
Eigenschappen streamsConfiguration = nieuwe Eigenschappen (); streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Een cruciale configuratieparameter is de BOOTSTRAP_SERVER_CONFIG. Dit is de URL naar onze lokale Kafka-instantie die we net zijn gestart:

private String bootstrapServers = "localhost: 9092"; streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Vervolgens moeten we het type sleutel en de waarde van de berichten doorgeven waaruit zal worden geconsumeerd input Onderwerp:

streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ());

Streamverwerking is vaak stateful. Als we tussenresultaten willen opslaan, moeten we het STATE_DIR_CONFIG parameter.

In onze test gebruiken we een lokaal bestandssysteem:

streamsConfiguration.put (StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory (). getAbsolutePath ()); 

4. Het bouwen van een streamingtopologie

Zodra we ons invoeronderwerp hebben gedefinieerd, kunnen we een streamingtopologie maken - dat is een definitie van hoe gebeurtenissen moeten worden afgehandeld en getransformeerd.

In ons voorbeeld willen we een woordenteller implementeren. Voor elke zin verzonden naar input Onderwerp, we willen het opsplitsen in woorden en het voorkomen van elk woord berekenen.

We kunnen een instantie van de KStreamsBuilder klasse om te beginnen met het construeren van onze topologie:

KStreamBuilder-builder = nieuwe KStreamBuilder (); KStream textLines = builder.stream (inputTopic); Patroonpatroon = Pattern.compile ("\ W +", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues ​​(waarde -> Arrays.asList (patroon.split (waarde.toLowerCase ()))) .groupBy ((sleutel, woord) -> woord) .count ();

Om het aantal woorden te implementeren, moeten we eerst de waarden splitsen met behulp van de reguliere expressie.

De split-methode retourneert een array. We gebruiken de flatMapValues ​​() om het af te vlakken. Anders zouden we eindigen met een lijst met arrays en zou het onhandig zijn om code te schrijven met een dergelijke structuur.

Ten slotte verzamelen we de waarden voor elk woord en noemen we de tellen () dat zal het voorkomen van een specifiek woord berekenen.

5. Omgaan met resultaten

We hebben het aantal woorden van onze invoerberichten al berekend. Laten we nu de resultaten op de standaarduitvoer afdrukken met behulp van de foreach () methode:

wordCounts .foreach ((w, c) -> System.out.println ("word:" + w + "->" + c));

Bij productie publiceert een dergelijke streaming-taak vaak de uitvoer naar een ander Kafka-onderwerp.

We zouden dit kunnen doen met behulp van de to () methode:

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String (); Serde longSerde = Serdes.Long (); wordCounts.to (stringSerde, longSerde, outputTopic);

De Serde class geeft ons voorgeconfigureerde serializers voor Java-typen die zullen worden gebruikt om objecten te serialiseren naar een array van bytes. De reeks bytes wordt vervolgens naar het Kafka-onderwerp gestuurd.

We gebruiken Draad als sleutel tot ons onderwerp en Lang als een waarde voor de werkelijke telling. De naar() methode slaat de resulterende gegevens op in outputTopic.

6. De KafkaStream-taak starten

Tot nu toe hebben we een topologie gebouwd die kan worden uitgevoerd. De klus is echter nog niet begonnen.

We moeten ons werk expliciet beginnen door de begin() methode op de KafkaStreams voorbeeld:

KafkaStreams streams = nieuwe KafkaStreams (builder, streamsConfiguration); streams.start (); Thread.sleep (30000); streams.close ();

Houd er rekening mee dat we 30 seconden wachten totdat de taak is voltooid. In een realistisch scenario zou die taak de hele tijd draaien en gebeurtenissen van Kafka verwerken zodra ze binnenkomen.

We kunnen ons werk testen door enkele evenementen op ons Kafka-onderwerp te publiceren.

Laten we een kafka-console-producent en stuur enkele evenementen handmatig naar onze input Onderwerp:

./kafka-console-producer --topic inputTopic --broker-list localhost: 9092> "dit is een pony"> "dit is een paard en een pony" 

Op deze manier hebben we twee evenementen op Kafka gepubliceerd. Onze applicatie zal die gebeurtenissen gebruiken en de volgende uitvoer afdrukken:

woord: -> 1 woord: dit -> 1 woord: is -> 1 woord: a -> 1 woord: pony -> 1 woord: -> 2 woord: dit -> 2 woord: is -> 2 woord: a - > 2 woord: paard -> 1 woord: en -> 1 woord: pony -> 2

We kunnen zien dat toen het eerste bericht arriveerde, het woord pony kwam slechts één keer voor. Maar toen we het tweede bericht stuurden, het woord pony gebeurde voor de tweede keer afdrukken: "woord: pony -> 2 ″.

6. Conclusie

In dit artikel wordt beschreven hoe u een primaire streamverwerkingstoepassing maakt met Apache Kafka als gegevensbron en het KafkaStreams bibliotheek als de streamverwerkingsbibliotheek.

Al deze voorbeelden en codefragmenten zijn te vinden in het GitHub-project - dit is een Maven-project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.


$config[zx-auto] not found$config[zx-overlay] not found