Inleiding tot Apache Kafka met Spring

Persistentie top

Ik heb zojuist het nieuwe aangekondigd Leer de lente natuurlijk, gericht op de basisprincipes van Spring 5 en Spring Boot 2:

>> BEKIJK DE CURSUS

1. Overzicht

Apache Kafka is een gedistribueerd en fouttolerant stroomverwerkingssysteem.

In dit artikel bespreken we Spring-ondersteuning voor Kafka en het niveau van abstracties dat het biedt via native Kafka Java-client-API's.

Spring Kafka brengt het eenvoudige en typische Spring-sjabloon programmeermodel met een Kafka-sjabloon en berichtgestuurde POJO's via @KafkaListener annotatie.

2. Installatie en configuratie

Raadpleeg de officiële gids hier om Kafka te downloaden en te installeren.

We moeten ook het lente-kafka afhankelijkheid van onze pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

De nieuwste versie van dit artefact is hier te vinden.

Onze voorbeeldtoepassing zal een Spring Boot-toepassing zijn.

In dit artikel wordt ervan uitgegaan dat de server wordt gestart met de standaardconfiguratie en dat er geen serverpoorten worden gewijzigd.

3. Onderwerpen configureren

Eerder gebruikten we opdrachtregelprogramma's om onderwerpen in Kafka te maken, zoals:

$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replicatiefactor 1 --partities 1 \ --topic mytopic

Maar met de introductie van AdminClient in Kafka kunnen we nu onderwerpen programmatisch maken.

We moeten de KafkaAdmin Spring bean, die automatisch onderwerpen toevoegt voor alle soorten bonen Nieuw onderwerp:

@Configuration openbare klasse KafkaTopicConfig {@Value (value = "$ {kafka.bootstrapAddress}") privé String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin () {Map configs = new HashMap (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); retourneer nieuwe KafkaAdmin (configuraties); } @Bean openbaar NewTopic topic1 () {retourneer nieuwe NewTopic ("baeldung", 1, (short) 1); }}

4. Berichten produceren

Om berichten te maken, moeten we eerst een ProducerFactory die de strategie bepaalt voor het maken van Kafka Producent gevallen.

Dan hebben we een Kafka-sjabloon die een Producent instantie en biedt handige methoden voor het verzenden van berichten naar Kafka-onderwerpen.

Producent instanties zijn thread-safe en daarom zal het gebruik van één instantie in een toepassingscontext betere prestaties opleveren. Bijgevolg, Kakfa-sjabloon instanties zijn ook thread-safe en het gebruik van één instantie wordt aanbevolen.

4.1. Producer Configuratie

@Configuration openbare klasse KafkaProducerConfig {@Bean openbare ProducerFactory producerFactory () {Map configProps = nieuwe HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); retourneer nieuwe DefaultKafkaProducerFactory (configProps); } @Bean openbare KafkaTemplate kafkaTemplate () {retourneer nieuwe KafkaTemplate (producerFactory ()); }}

4.2. Berichten publiceren

We kunnen berichten verzenden met behulp van de Kafka-sjabloon klasse:

@Autowired privé KafkaTemplate kafkaTemplate; public void sendMessage (String msg) {kafkaTemplate.send (topicName, msg); }

De sturen API retourneert een Luisterbare toekomst voorwerp. Als we de verzendende thread willen blokkeren en het resultaat van het verzonden bericht willen krijgen, kunnen we de krijgen API van de Luisterbare toekomst voorwerp. De thread wacht op het resultaat, maar het vertraagt ​​de producer.

Kafka is een platform voor het verwerken van snelle streams. Het is dus een beter idee om de resultaten asynchroon te behandelen, zodat de volgende berichten niet wachten op het resultaat van het vorige bericht. We kunnen dit doen door middel van een callback:

public void sendMessage (String bericht) {ListenableFuture future = kafkaTemplate.send (topicName, message); future.addCallback (nieuwe ListenableFutureCallback() {@Override public void onSuccess (SendResult resultaat) {System.out.println ("Verzonden bericht = [" + bericht + "] met offset = [" + resultaat.getRecordMetadata (). Offset () + "]") ; } @Override public void onFailure (Throwable ex) {System.out.println ("Kan bericht niet verzenden = [" + bericht + "] vanwege:" + ex.getMessage ()); }}); }

5. Berichten consumeren

5.1. Consumentenconfiguratie

Voor het consumeren van berichten moeten we een ConsumerFactory en een KafkaListenerContainerFactory. Zodra deze bonen beschikbaar zijn in de Spring bean-fabriek, kunnen op POJO gebaseerde consumenten worden geconfigureerd met @KafkaListener annotatie.

@EnableKafka annotatie is vereist voor de configuratieklasse om detectie van @KafkaListener annotatie op lente-beheerde bonen:

@EnableKafka @Configuration openbare klasse KafkaConsumerConfig {@Bean openbare ConsumerFactory consumerFactory () {Map props = new HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); retourneer nieuwe DefaultKafkaConsumerFactory (rekwisieten); } @Bean openbaar ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nieuw ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); terugkeer fabriek; }}

5.2. Berichten consumeren

@KafkaListener (topics = "topicName", groupId = "foo") public void listenGroupFoo (String bericht) {System.out.println ("Ontvangen bericht in groep foo:" + bericht); }

Er kunnen meerdere luisteraars worden geïmplementeerd voor een onderwerp, elk met een andere groeps-ID. Bovendien kan een consument berichten beluisteren over verschillende onderwerpen:

@KafkaListener (topics = "topic1, topic2", groupId = "foo")

Spring ondersteunt ook het ophalen van een of meer berichtkoppen met behulp van de @Koptekst annotatie in de luisteraar:

@KafkaListener (topics = "topicName") public void listenWithHeaders (@Payload String bericht, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partitie) {System.out.println ("Ontvangen bericht:" + bericht "+" van partitie: "+ partitie);}

5.3. Berichten van een specifieke partitie gebruiken

Zoals je misschien hebt gemerkt, hadden we het onderwerp gemaakt baeldung met slechts één partitie. Voor een onderwerp met meerdere partities geldt echter een @KafkaListener kan zich expliciet abonneren op een bepaalde partitie van een onderwerp met een initiële offset:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", initialOffset = "0"), @PartitionOffset (partition = "3", initialOffset = "0")}) , containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition (@Payload String bericht, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partitie) {System.out.println ("Ontvangen bericht:" + bericht "+" van partitie: "+ partitie) ;}

Sinds de initialOffset is verzonden naar 0 in deze listener, worden alle eerder verbruikte berichten van partities 0 en drie opnieuw gebruikt telkens wanneer deze listener wordt geïnitialiseerd. Als het instellen van de offset niet nodig is, kunnen we de partities eigendom van @TopicPartition annotatie om alleen de partities in te stellen zonder de offset:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitions = {"0", "1"}))

5.4. Berichtfilter voor luisteraars toevoegen

Luisteraars kunnen worden geconfigureerd om specifieke soorten berichten te consumeren door een aangepast filter toe te voegen. Dit kan gedaan worden door een RecordFilterStrategy naar de KafkaListenerContainerFactory:

@Bean openbaar ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nieuw ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (consumerFactory ()); factory.setRecordFilterStrategy (record -> record.value (). bevat ("Wereld")); terugkeer fabriek; }

Een luisteraar kan vervolgens worden geconfigureerd om deze containerfabriek te gebruiken:

@KafkaListener (topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter (String bericht) {System.out.println ("Ontvangen bericht in gefilterde luisteraar:" + bericht); }

In deze luisteraar zijn alle berichten die overeenkomen met het filter, worden verwijderd.

6. Aangepaste berichtconverters

Tot nu toe hebben we alleen het verzenden en ontvangen van strings als berichten behandeld. We kunnen echter ook aangepaste Java-objecten verzenden en ontvangen. Dit vereist het configureren van de juiste serialisator in ProducerFactory en deserializer in ConsumerFactory.

Laten we eens kijken naar een eenvoudige bonenklasse, die we als berichten zullen verzenden:

openbare klasse Begroeting {private String msg; private String naam; // standaard getters, setters en constructor}

6.1. Aangepaste berichten produceren

In dit voorbeeld gebruiken we JsonSerializer. Laten we eens kijken naar de code voor ProducerFactory en Kafka-sjabloon:

@Bean openbare ProducerFactory greetingProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); retourneer nieuwe DefaultKafkaProducerFactory (configProps); } @Bean openbare KafkaTemplate greetingKafkaTemplate () {retourneer nieuwe KafkaTemplate (greetingProducerFactory ()); }

Deze nieuwe Kafka-sjabloon kan worden gebruikt om de Groet bericht:

kafkaTemplate.send (topicName, nieuwe begroeting ("Hallo", "Wereld"));

6.2. Aangepaste berichten gebruiken

Laten we op dezelfde manier het ConsumerFactory en KafkaListenerContainerFactory om het begroetingsbericht correct te deserialiseren:

@Bean public ConsumerFactory greetingConsumerFactory () {// ... retourneer nieuwe DefaultKafkaConsumerFactory (rekwisieten, nieuwe StringDeserializer (), nieuwe JsonDeserializer (Greeting.class)); } @Bean openbaar ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = nieuw ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (greetingConsumerFactory ()); terugkeer fabriek; }

De spring-kafka JSON-serialisator en deserializer gebruiken de Jackson-bibliotheek die ook een optionele maven-afhankelijkheid is voor het spring-kafka-project. Dus laten we het toevoegen aan ons pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

In plaats van de nieuwste versie van Jackson te gebruiken, wordt aanbevolen om de versie te gebruiken die is toegevoegd aan het pom.xml van spring-kafka.

Ten slotte moeten we een luisteraar schrijven om te consumeren Groet berichten:

@KafkaListener (topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener (groetgroet) {// begroetingsbericht verwerken}

7. Conclusie

In dit artikel hebben we de basisprincipes van Spring-ondersteuning voor Apache Kafka besproken. We hebben kort gekeken naar de klassen die worden gebruikt voor het verzenden en ontvangen van berichten.

De volledige broncode voor dit artikel is te vinden op GitHub. Voordat u de code uitvoert, moet u ervoor zorgen dat de Kafka-server actief is en dat de onderwerpen handmatig zijn gemaakt.

Persistentie onderaan

Ik heb zojuist het nieuwe aangekondigd Leer de lente natuurlijk, gericht op de basisprincipes van Spring 5 en Spring Boot 2:

>> BEKIJK DE CURSUS