Een datapijplijn bouwen met Kafka, Spark Streaming en Cassandra

1. Overzicht

Apache Kafka is een schaalbaar, hoogwaardig platform met lage latentie dat maakt het lezen en schrijven van gegevensstromen mogelijk, net als een berichtensysteem. We kunnen vrij gemakkelijk beginnen met Kafka in Java.

Spark Streaming maakt deel uit van het Apache Spark-platform dat maakt schaalbare, fouttolerante verwerking van gegevensstromen met hoge doorvoer mogelijk. Hoewel het in Scala is geschreven, biedt Spark Java API's om mee te werken.

Apache Cassandra is een gedistribueerde NoSQL-gegevensopslag met brede kolommen. Meer informatie over Cassandra is beschikbaar in ons vorige artikel.

In deze tutorial combineren we deze om een zeer schaalbare en fouttolerante datapijplijn voor een realtime datastroom.

2. Installaties

Om te beginnen hebben we Kafka, Spark en Cassandra lokaal geïnstalleerd op onze computer om de applicatie uit te voeren. We zullen gaandeweg zien hoe we een datapijplijn kunnen ontwikkelen met behulp van deze platforms.

We laten echter alle standaardconfiguraties, inclusief poorten, voor alle installaties staan, wat zal helpen om de tutorial soepel te laten verlopen.

2.1. Kafka

Het installeren van Kafka op onze lokale computer is redelijk eenvoudig en kan worden gevonden als onderdeel van de officiële documentatie. We zullen de 2.1.0-release van Kafka gebruiken.

Daarnaast, Kafka vereist dat Apache Zookeeper wordt uitgevoerd maar voor deze tutorial gebruiken we de Zookeeper-instantie met één knoop die is verpakt met Kafka.

Zodra we Zookeeper en Kafka lokaal hebben kunnen starten volgens de officiële gids, kunnen we doorgaan met het maken van ons onderwerp met de naam "berichten":

 $ KAFKA_HOME $ \ bin \ windows \ kafka-topics.bat --create \ --zookeeper localhost: 2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Merk op dat het bovenstaande script voor het Windows-platform is, maar er zijn ook vergelijkbare scripts beschikbaar voor Unix-achtige platforms.

2.2. Vonk

Spark gebruikt de clientbibliotheken van Hadoop voor HDFS en YARN. Bijgevolg, het kan erg lastig zijn om de compatibele versies van al deze samen te stellen. De officiële download van Spark wordt echter voorverpakt met populaire versies van Hadoop. Voor deze tutorial gebruiken we het pakket van versie 2.3.0 "vooraf gebouwd voor Apache Hadoop 2.7 en later".

Zodra het juiste pakket Spark is uitgepakt, kunnen de beschikbare scripts worden gebruikt om aanvragen in te dienen. We zullen dit later zien wanneer we onze applicatie in Spring Boot ontwikkelen.

2.3. Cassandra

DataStax stelt een community-editie van Cassandra beschikbaar voor verschillende platforms, waaronder Windows. We kunnen dit heel gemakkelijk downloaden en installeren op onze lokale computer volgens de officiële documentatie. We gebruiken versie 3.9.0.

Zodra het ons is gelukt om Cassandra op onze lokale computer te installeren en te starten, kunnen we doorgaan met het maken van onze sleutelruimte en tabel. Dit kan worden gedaan met behulp van de CQL-shell die bij onze installatie wordt geleverd:

CREËER KEYSPACE-vocabulaire MET REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}; GEBRUIK woordenschat; CREATE TABLE-woorden (woordtekst PRIMARY KEY, count int);

Merk op dat we een naamruimte hebben gemaakt met de naam woordenschat en een tafel daarin riep woorden met twee kolommen, woord, en tellen.

3. Afhankelijkheden

We kunnen Kafka- en Spark-afhankelijkheden in onze applicatie integreren via Maven. We halen deze afhankelijkheden uit Maven Central:

  • Kernvonk
  • SQL Spark
  • Streaming Spark
  • Kafka Spark streamen
  • Cassandra Spark
  • Cassandra Java Spark

En we kunnen ze dienovereenkomstig aan onze pom toevoegen:

 org.apache.spark spark-core_2.11 2.3.0 verstrekt org.apache.spark spark-sql_2.11 2.3.0 verstrekt org.apache.spark spark-streaming_2.11 2.3.0 verstrekt org.apache.spark spark-streaming -kafka-0-10_2.11 2.3.0 com.datastax.spark spark-cassandra-connector_2.11 2.3.0 com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Merk op dat sommige van deze afhankelijkheden zijn gemarkeerd als voorzien in omvang. Dit komt omdat deze beschikbaar worden gemaakt door de Spark-installatie waar we de aanvraag voor uitvoering indienen met behulp van spark-submit.

4. Spark-streaming - Kafka-integratiestrategieën

Op dit punt is het de moeite waard om kort te praten over de integratiestrategieën voor Spark en Kafka.

Kafka introduceerde een nieuwe consumenten-API tussen versie 0.8 en 0.10. Daarom zijn de overeenkomstige Spark Streaming-pakketten beschikbaar voor beide broker-versies. Het is belangrijk om het juiste pakket te kiezen, afhankelijk van de beschikbare makelaar en de gewenste functies.

4.1. Spark Streaming Kafka 0.8

De 0.8-versie is de stabiele integratie-API met opties voor het gebruik van de ontvanger-gebaseerde of de directe benadering. We zullen niet ingaan op de details van deze benaderingen die we kunnen vinden in de officiële documentatie. Een belangrijk punt om op te merken is dat dit pakket compatibel is met Kafka Broker-versies 0.8.2.1 of hoger.

4.2. Spark Streaming Kafka 0.10

Dit bevindt zich momenteel in een experimentele staat en is alleen compatibel met Kafka Broker-versies 0.10.0 of hoger. Dit pakket biedt alleen de directe benadering en maakt nu gebruik van de nieuwe Kafka-consumenten-API. We kunnen hierover meer informatie vinden in de officiële documentatie. Belangrijker is dat het zo is niet achterwaarts compatibel met oudere Kafka Broker-versies.

Houd er rekening mee dat we voor deze tutorial gebruik maken van het 0.10-pakket. De afhankelijkheid die in de vorige paragraaf werd genoemd, verwijst hier alleen naar.

5. Ontwikkeling van een datapijplijn

We zullen een eenvoudige applicatie in Java maken met Spark die zal integreren met het Kafka-onderwerp dat we eerder hebben gemaakt. De applicatie leest de berichten zoals gepost en telt de frequentie van woorden in elk bericht. Dit wordt dan bijgewerkt in de Cassandra-tabel die we eerder hebben gemaakt.

Laten we snel visualiseren hoe de gegevens zullen stromen:

5.1. Krijgen JavaStreamingContext

Ten eerste beginnen we met het initialiseren van het JavaStreamingContext dat is het toegangspunt voor alle Spark Streaming-applicaties:

SparkConf sparkConf = nieuwe SparkConf (); sparkConf.setAppName ("WordCountingApp"); sparkConf.set ("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = nieuwe JavaStreamingContext (sparkConf, Durations.seconds (1));

5.2. Krijgen DStream van Kafka

Nu kunnen we verbinding maken met het Kafka-onderwerp vanuit het JavaStreamingContext:

Kaart kafkaParams = nieuwe HashMap (); kafkaParams.put ("bootstrap.servers", "localhost: 9092"); kafkaParams.put ("key.deserializer", StringDeserializer.class); kafkaParams.put ("value.deserializer", StringDeserializer.class); kafkaParams.put ("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put ("auto.offset.reset", "latest"); kafkaParams.put ("enable.auto.commit", false); Verzameling onderwerpen = Arrays.asList ("berichten"); JavaInputDStream messages = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (), ConsumerStrategies. Abonneren (onderwerpen, kafkaParams));

Houd er rekening mee dat we hier deserializers voor sleutel en waarde moeten leveren. Voor veelgebruikte gegevenstypen zoals Draad, is de deserializer standaard beschikbaar. Als we echter aangepaste gegevenstypen willen ophalen, moeten we aangepaste deserializers leveren.

Hier hebben we verkregen JavaInputDStream wat een implementatie is van Discretized Streams of DStreams, de basis abstractie van Spark Streaming. Intern is DStreams niets anders dan een continue reeks RDD's.

5.3. Verwerking verkregen DStream

We zullen nu een reeks bewerkingen uitvoeren op het JavaInputDStream om woordfrequenties in de berichten te verkrijgen:

JavaPairDStream resultaten = berichten .mapToPair (record -> nieuwe Tuple2 (record.key (), record.value ())); JavaDStream-lijnen = resultaten .map (tuple2 -> tuple2._2 ()); JavaDStream-woorden = lijnen .flatMap (x -> Arrays.asList (x.split ("\ s +")). Iterator ()); JavaPairDStream wordCounts = woorden .mapToPair (s -> nieuwe Tuple2 (s, 1)) .reduceByKey ((i1, i2) -> i1 + i2);

5.4. Aanhoudend verwerkt DStream in Cassandra

Ten slotte kunnen we de verwerkte JavaPairDStream om ze in onze Cassandra-tafel in te voegen:

wordCounts.foreachRDD (javaRdd -> {Map wordCountMap = javaRdd.collectAsMap (); voor (String key: wordCountMap.keySet ()) {List wordList = Arrays.asList (nieuw woord (key, wordCountMap.get (key))); JavaRDD rdd = streamingContext.sparkContext (). Parallelize (wordList); javaFunctions (rdd) .writerBuilder ("vocabulary", "words", mapToRow (Word.class)). SaveToCassandra ();}});

5.5. De applicatie uitvoeren

Omdat dit een applicatie voor het verwerken van streams is, willen we deze draaiende houden:

streamingContext.start (); streamingContext.awaitTermination ();

6. Gebruikmaken van checkpoints

In een streamverwerkingsapplicatie, het is vaak handig om de status te behouden tussen batches met gegevens die worden verwerkt.

In onze vorige poging kunnen we bijvoorbeeld alleen de huidige frequentie van de woorden opslaan. Wat als we in plaats daarvan de cumulatieve frequentie willen opslaan? Spark Streaming maakt het mogelijk door middel van een concept genaamd checkpoints.

We zullen nu de pijplijn aanpassen die we eerder hebben gemaakt om gebruik te maken van controlepunten:

Houd er rekening mee dat we controlepunten alleen gebruiken voor de sessie van gegevensverwerking. Dit biedt geen fouttolerantie. Checkpointing kan echter ook worden gebruikt voor fouttolerantie.

Er zijn een paar wijzigingen die we in onze applicatie moeten aanbrengen om gebruik te maken van checkpoints. Dit omvat het verstrekken van de JavaStreamingContext met een checkpoint-locatie:

streamingContext.checkpoint ("./. checkpoint");

Hier gebruiken we het lokale bestandssysteem om controlepunten op te slaan. Voor robuustheid moet dit echter worden opgeslagen op een locatie zoals HDFS, S3 of Kafka. Meer hierover is beschikbaar in de officiële documentatie.

Vervolgens moeten we het ijkpunt ophalen en een cumulatief aantal woorden maken terwijl we elke partitie verwerken met behulp van een toewijzingsfunctie:

JavaMapWithStateDStream cumulativeWordCounts = wordCounts .mapWithState (StateSpec.function ((word, one, state) -> {int sum = one.orElse (0) + (state.exists ()? state.get (): 0); Tuple2 output = nieuw Tuple2 (woord, som); state.update (som); output teruggeven;}));

Zodra we het cumulatieve aantal woorden hebben, kunnen we doorgaan met herhalen en ze opslaan in Cassandra zoals eerder.

Houd er rekening mee dat while data checkpointing is handig voor stateful verwerking, het brengt latentiekosten met zich mee. Daarom is het noodzakelijk om dit verstandig te gebruiken, samen met een optimaal controlepuntinterval.

7. Offsets begrijpen

Als we ons enkele van de Kafka-parameters herinneren die we eerder hebben ingesteld:

kafkaParams.put ("auto.offset.reset", "latest"); kafkaParams.put ("enable.auto.commit", false);

Deze betekenen in feite dat we willen de offset niet automatisch vastleggen en willen elke keer dat een consumentengroep wordt geïnitialiseerd de laatste offset kiezen. Bijgevolg zal onze applicatie alleen berichten kunnen verwerken die zijn gepost tijdens de lopende periode.

Als we alle geposte berichten willen consumeren, ongeacht of de applicatie actief was of niet, en ook de reeds geplaatste berichten willen bijhouden, we zullen de offset op de juiste manier moeten configureren, samen met het opslaan van de offset-status, hoewel dit een beetje buiten het bereik van deze tutorial valt.

Dit is ook een manier waarop Spark Streaming een bepaald niveau van garantie biedt, zoals "precies één keer". Dit betekent in feite dat elk bericht dat op Kafka-onderwerp wordt gepost, slechts één keer door Spark Streaming wordt verwerkt.

8. Toepassing implementeren

Wij kunnen implementeer onze applicatie met behulp van het Spark-submit-script die voorverpakt wordt geleverd met de Spark-installatie:

$ SPARK_HOME $ \ bin \ spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local [2] \ target \ spark-streaming-app-0.0.1-SNAPSHOT-jar-met-afhankelijkheden .pot

Houd er rekening mee dat de jar die we maken met Maven de afhankelijkheden moet bevatten die niet zijn gemarkeerd als voorzien in omvang.

Zodra we deze aanvraag hebben ingediend en enkele berichten hebben geplaatst in het Kafka-onderwerp dat we eerder hebben gemaakt, zouden we het cumulatieve aantal woorden moeten zien dat wordt gepost in de Cassandra-tabel die we eerder hebben gemaakt.

9. Conclusie

Samenvattend hebben we in deze zelfstudie geleerd hoe we een eenvoudige gegevenspijplijn kunnen maken met Kafka, Spark Streaming en Cassandra. We hebben ook geleerd hoe we checkpoints in Spark Streaming kunnen gebruiken om de status tussen batches te behouden.

Zoals altijd is de code voor de voorbeelden beschikbaar op GitHub.