Precies één keer verwerken in Kafka met Java

1. Overzicht

In deze tutorial zullen we kijken hoe Kafka zorgt voor exact één keer levering tussen producenten- en consumententoepassingen via de nieuw geïntroduceerde Transactional API.

Bovendien gebruiken we deze API om transactionele producenten en consumenten te implementeren om in een WordCount-voorbeeld end-to-end exact één keer te leveren.

2. Berichtaflevering in Kafka

Vanwege verschillende storingen kunnen berichtensystemen de bezorging van berichten tussen producenten en consumententoepassingen niet garanderen. Afhankelijk van hoe de clienttoepassingen omgaan met dergelijke systemen, zijn de volgende berichtsemantiek mogelijk:

  • Als een berichtensysteem nooit een bericht dupliceert, maar af en toe een bericht mist, noemen we dat ten hoogste één keer
  • Of, als het nooit een bericht zal missen maar af en toe een bericht kan dupliceren, noemen we het ten minste een keer
  • Maar als het altijd alle berichten zonder duplicatie aflevert, tenminste precies één keer

Aanvankelijk ondersteunde Kafka alleen de bezorging van berichten ten minste één keer en ten minste één keer.

Echter, de introductie van transacties tussen Kafka-makelaars en klantapplicaties zorgt voor exact één levering in Kafka. Laten we, om het beter te begrijpen, snel de transactionele client-API bekijken.

3. Maven afhankelijkheden

Om met de transactie-API te werken, hebben we de Java-client van Kafka nodig in onze pom:

 org.apache.kafka kafka-clients 2.0.0 

4. Een transactie consumeren-transformeren-produceren Lus

Voor ons voorbeeld gaan we berichten gebruiken van een invoeronderwerp, zinnen.

Vervolgens tellen we voor elke zin elk woord en sturen we het individuele aantal woorden naar een uitvoeronderwerp, telt.

In het voorbeeld gaan we ervan uit dat er al transactiegegevens beschikbaar zijn in het zinnen onderwerp.

4.1. Een transactiebewuste producent

Laten we dus eerst een typische Kafka-producer toevoegen.

Eigenschappen producerProps = nieuwe Eigenschappen (); producerProps.put ("bootstrap.servers", "localhost: 9092");

Bovendien moeten we een transactional.id en inschakelen idempotentie:

producerProps.put ("enable.idempotence", "true"); producerProps.put ("transactional.id", "prod-1"); KafkaProducer producer = nieuwe KafkaProducer (producerProps);

Omdat we idempotence hebben ingeschakeld, zal Kafka deze transactie-ID gebruiken als onderdeel van zijn algoritme voor ontdubbel elk bericht van deze producentstuurt, zorgen voor idempotentie.

Simpel gezegd, als de producer per ongeluk hetzelfde bericht meer dan eens naar Kafka stuurt, kunnen deze instellingen het opmerken.

Het enige dat we hoeven te doen is zorg ervoor dat de transactie-ID voor elke producent verschillend is, hoewel consistent bij het opnieuw opstarten.

4.2. De producent inschakelen voor transacties

Als we klaar zijn, moeten we ook bellen initTransaction om de producent voor te bereiden op het gebruik van transacties:

producer.initTransactions ();

Dit registreert de producent bij de makelaar als degene die transacties kan gebruiken, het identificeren door zijn transactional.id en een volgnummer, of epoche. De makelaar zal deze op zijn beurt gebruiken om eventuele acties naar een transactielogboek te schrijven.

En bijgevolg, de makelaar zal alle acties uit dat logboek verwijderen die behoren tot een producent met hetzelfde transactie-ID en eerdertijdperk, ervan uitgaande dat ze afkomstig zijn van niet meer bestaande transacties.

4.3. Een transactiebewuste consument

Als we consumeren, kunnen we alle berichten op een onderwerppartitie op volgorde lezen. Hoewel, we kunnen aangeven met isolation.level dat we moeten wachten met het lezen van transactionele berichten totdat de bijbehorende transactie is vastgelegd:

Eigenschappen consumerProps = nieuwe Eigenschappen (); consumerProps.put ("bootstrap.servers", "localhost: 9092"); consumerProps.put ("group.id", "my-group-id"); consumerProps.put ("enable.auto.commit", "false"); consumerProps.put ("isolation.level", "read_committed"); KafkaConsumer consument = nieuwe KafkaConsumer (consumerProps); consumer.subscribe (singleton ("zinnen"));

Gebruik een waarde van read_committed zorgt ervoor dat we geen transactieberichten lezen voordat de transactie is voltooid.

De standaardwaarde van isolation.level is read_uncommitted.

4.4. Consumeren en transformeren door transactie

Nu we de producent en de consument beide hebben geconfigureerd om te schrijven en te lezen, kunnen we records uit ons invoeronderwerp gebruiken en elk woord in elk record tellen:

ConsumerRecords records = consumer.poll (ofSeconds (60)); Map wordCountMap = records.records (nieuwe TopicPartition ("input", 0)) .stream () .flatMap (record -> Stream.of (record.value (). Split (""))) .map (word -> Tuple.of (word, 1)) .collect (Collectors.toMap (tuple -> tuple.getKey (), t1 -> t1.getValue (), (v1, v2) -> v1 + v2));

Houd er rekening mee dat er niets transactiegericht is aan de bovenstaande code. Maar, sinds we gebruikten read_committed, het betekent dat geen berichten die in dezelfde transactie naar het invoeronderwerp zijn geschreven, door deze consument worden gelezen totdat ze allemaal zijn geschreven.

Nu kunnen we het berekende aantal woorden naar het uitvoeronderwerp sturen.

Laten we eens kijken hoe we onze resultaten kunnen produceren, ook op transactiebasis.

4.5. API verzenden

Om onze tellingen als nieuwe berichten te verzenden, maar in dezelfde transactie, bellen we beginTransaction:

producer.beginTransaction ();

Vervolgens kunnen we ze allemaal naar ons "counts" -onderwerp schrijven, waarbij de sleutel het woord is en de count de waarde:

wordCountMap.forEach ((sleutel, waarde) -> producer.send (nieuwe ProducerRecord ("counts", sleutel, waarde.toString ())));

Merk op dat omdat de producent de gegevens op de sleutel kan partitioneren, dit betekent dat transactionele berichten kunnen meerdere partities omvatten, die elk door afzonderlijke consumenten worden gelezen. Daarom zal Kafka-makelaar een lijst met alle bijgewerkte partities voor een transactie opslaan.

Merk ook op dat, binnen een transactie kan een producent meerdere threads gebruiken om records parallel te verzenden.

4.6. Compensaties plegen

En tot slot moeten we onze compensaties vastleggen die we net hebben verbruikt. Bij transacties verbinden we de offsets terug naar het invoeronderwerp waarvan we ze lezen, zoals normaal. Maar ook wij stuur ze naar de transactie van de producent.

We kunnen dit allemaal in een enkele aanroep doen, maar we moeten eerst de offsets voor elke onderwerppartitie berekenen:

Kaart offsetsToCommit = nieuwe HashMap (); voor (TopicPartition-partitie: records.partitions ()) {List partitionedRecords = records.records (partitie); lange offset = partitionedRecords.get (partitionedRecords.size () - 1) .offset (); offsetsToCommit.put (partitie, nieuwe OffsetAndMetadata (offset + 1)); }

Merk op dat wat we aan de transactie vastleggen, de aanstaande compensatie is, wat betekent dat we er 1 moeten toevoegen.

Vervolgens kunnen we onze berekende offsets naar de transactie sturen:

producer.sendOffsetsToTransaction (offsetsToCommit, "mijn-groeps-id");

4.7. De transactie vastleggen of afbreken

En tot slot kunnen we de transactie vastleggen, die de compensaties atomair naar de consumer_offsets onderwerp en de transactie zelf:

producer.commitTransaction ();

Dit verwijdert elk gebufferd bericht naar de respectievelijke partities. Daarnaast stelt de Kafka-makelaar alle berichten in die transactie beschikbaar voor de consumenten.

Als er iets misgaat tijdens de verwerking, bijvoorbeeld als we een uitzondering opvangen, kunnen we natuurlijk bellen transactie afbreken:

probeer {// ... lees van invoeronderwerp // ... transform // ... schrijf naar uitvoeronderwerp producer.commitTransaction (); } catch (uitzondering e) {producer.abortTransaction (); }

En laat alle gebufferde berichten vallen en verwijder de transactie van de makelaar.

Als we niet vastleggen of afbreken voordat de broker is geconfigureerd max.transaction.timeout.ms, de Kafka-makelaar zal de transactie zelf afbreken. De standaardwaarde voor deze eigenschap is 900.000 milliseconden of 15 minuten.

5. Andere consumeren-transform-produceren Loops

Wat we zojuist hebben gezien, is een basis consumeren-transformeren-produceren loop die leest en schrijft naar hetzelfde Kafka-cluster.

Omgekeerd, toepassingen die moeten lezen en schrijven naar verschillende Kafka-clusters, moeten de oudere commitSync en commitAsync API. Doorgaans slaan applicaties consumentenoffsets op in hun externe statusopslag om de transactionaliteit te behouden.

6. Conclusie

Voor datakritische applicaties is end-to-end exact één keer verwerking vaak noodzakelijk.

In deze tutorial we hebben gezien hoe we Kafka gebruiken om precies dit te doen, met behulp van transacties, en we hebben een op transacties gebaseerd voorbeeld van het tellen van woorden geïmplementeerd om het principe te illustreren.

Bekijk gerust alle codevoorbeelden op GitHub.