Een datapijplijn bouwen met Flink en Kafka

1. Overzicht

Apache Flink is een framework voor streamverwerking dat gemakkelijk met Java kan worden gebruikt. Apache Kafka is een gedistribueerd streamverwerkingssysteem dat een hoge fouttolerantie ondersteunt.

In deze zelfstudie gaan we bekijken hoe we een datapijplijn kunnen bouwen met behulp van deze twee technologieën.

2. Installatie

Raadpleeg de officiële gids om Apache Kafka te installeren en configureren. Na de installatie kunnen we de volgende opdrachten gebruiken om de nieuwe onderwerpen met de naam te maken flink_input en flink_output:

 bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replication-factor 1 --partitions 1 \ --topic flink_output bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replicatiefactor 1 --partitions 1 \ --topic flink_input

In het belang van deze tutorial gebruiken we de standaardconfiguratie en standaardpoorten voor Apache Kafka.

3. Flinkgebruik

Apache Flink maakt een real-time streamverwerkingstechnologie mogelijk. Het framework maakt het mogelijk om meerdere systemen van derden te gebruiken als streambronnen of sinks.

In Flink - er zijn verschillende connectoren beschikbaar:

  • Apache Kafka (bron / sink)
  • Apache Cassandra (gootsteen)
  • Amazon Kinesis Streams (bron / bron)
  • Elasticsearch (gootsteen)
  • Hadoop FileSystem (sink)
  • RabbitMQ (bron / afvoer)
  • Apache NiFi (bron / afvoer)
  • Twitter Streaming API (bron)

Om Flink aan ons project toe te voegen, moeten we de volgende Maven-afhankelijkheden opnemen:

 org.apache.flink flink-core 1.5.0 org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

Door deze afhankelijkheden toe te voegen, kunnen we Kafka-onderwerpen consumeren en van en naar Kafka-onderwerpen produceren. Je kunt de huidige versie van Flink vinden op Maven Central.

4. Kafka String-consument

Om gegevens van Kafka met Flink te gebruiken, moeten we een onderwerp en een Kafka-adres opgeven. We moeten ook een groeps-ID opgeven die wordt gebruikt om offsets vast te houden, zodat we niet altijd de volledige gegevens vanaf het begin lezen.

Laten we een statische methode maken die het maken van FlinkKafkaConsumer gemakkelijker:

openbare statische FlinkKafkaConsumer011 createStringConsumerForTopic (String-onderwerp, String kafkaAddress, String kafkaGroup) {Eigenschappen rekwisieten = nieuwe Eigenschappen (); props.setProperty ("bootstrap.servers", kafkaAddress); props.setProperty ("group.id", kafkaGroup); FlinkKafkaConsumer011 consument = nieuwe FlinkKafkaConsumer011 (onderwerp, nieuwe SimpleStringSchema (), rekwisieten); retour consument; }

Deze methode duurt een topic, kafkaAddress, en kafkaGroup en creëert het FlinkKafkaConsumer die gegevens van een bepaald onderwerp als een Draad sinds we hebben gebruikt SimpleStringSchema om gegevens te decoderen.

Het nummer 011 in de naam van de klasse verwijst naar de Kafka-versie.

5. Kafka String Producer

Om gegevens naar Kafka te produceren, moeten we het Kafka-adres en het onderwerp opgeven dat we willen gebruiken. Nogmaals, we kunnen een statische methode maken die ons zal helpen om producers te maken voor verschillende onderwerpen:

openbare statische FlinkKafkaProducer011 createStringProducer (String-onderwerp, String kafkaAddress) {retourneer nieuwe FlinkKafkaProducer011 (kafkaAddress, onderwerp, nieuwe SimpleStringSchema ()); }

Deze methode duurt slechts onderwerp en kafkaAddress als argumenten, aangezien het niet nodig is om groeps-ID op te geven wanneer we produceren naar Kafka-onderwerp.

6. Stringstroomverwerking

Als we een volledig werkende consument en producent hebben, kunnen we proberen om gegevens uit Kafka te verwerken en onze resultaten vervolgens weer op te slaan in Kafka. De volledige lijst met functies die kunnen worden gebruikt voor streamverwerking, vindt u hier.

In dit voorbeeld gaan we woorden in elk Kafka-item met een hoofdletter schrijven en het vervolgens terugschrijven naar Kafka.

Hiervoor moeten we een custom MapFunction:

public class WordsCapitalizer implementeert MapFunction {@Override public String map (String s) {return s.toUpperCase (); }}

Nadat we de functie hebben gemaakt, kunnen we deze gebruiken bij streamverwerking:

public static void capitalize () {String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String address = "localhost: 9092"; StreamExecutionEnvironment-omgeving = StreamExecutionEnvironment .getExecutionEnvironment (); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic (inputTopic, adres, consumerGroup); DataStream stringInputStream = omgeving .addSource (flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer (outputTopic, adres); stringInputStream .map (nieuwe WordsCapitalizer ()) .addSink (flinkKafkaProducer); }

De applicatie leest gegevens van het flink_input onderwerp, voer bewerkingen uit op de stream en sla de resultaten vervolgens op in het flink_output onderwerp in Kafka.

We hebben gezien hoe we met Strings moeten omgaan met Flink en Kafka. Maar vaak is het vereist om bewerkingen uit te voeren op aangepaste objecten. In de volgende hoofdstukken zullen we zien hoe u dit kunt doen.

7. Deserialisatie van aangepaste objecten

De volgende klasse vertegenwoordigt een eenvoudig bericht met informatie over afzender en ontvanger:

@JsonSerialize openbare klasse InputMessage {String-afzender; String ontvanger; LocalDateTime sentAt; String bericht; }

Eerder gebruikten we SimpleStringSchema om berichten van Kafka te deserialiseren, maar nu we willen gegevens rechtstreeks deserialiseren naar aangepaste objecten.

Hiervoor hebben we een maatwerk nodig Deserialisatie schema:

openbare klasse InputMessageDeserializationSchema implementeert DeserializationSchema {statische ObjectMapper objectMapper = nieuwe ObjectMapper () .registerModule (nieuwe JavaTimeModule ()); @Override openbare InputMessage deserialize (byte [] bytes) genereert IOException {return objectMapper.readValue (bytes, InputMessage.class); } @Override openbare boolean isEndOfStream (InputMessage inputMessage) {return false; } @Override openbare TypeInformation getProducedType () {retourneer TypeInformation.of (InputMessage.class); }}

We gaan er hier van uit dat de berichten in Kafka als JSON worden bewaard.

Omdat we een veld met type hebben LocalDateTime, moeten we de JavaTimeModule, die zorgt voor het in kaart brengen LocalDateTime objecten naar JSON.

Flink-schema's mogen geen velden hebben die niet serialiseerbaar zijn omdat alle operators (zoals schema's of functies) aan het begin van de taak worden geserialiseerd.

Er zijn vergelijkbare problemen in Apache Spark. Een van de bekende oplossingen voor dit probleem is het initialiseren van velden als statisch, zoals we deden met ObjectMapper bovenstaande. Het is niet de mooiste oplossing, maar het is relatief eenvoudig en doet zijn werk.

De methode isEndOfStream kan worden gebruikt voor het speciale geval waarin de stream alleen moet worden verwerkt totdat bepaalde specifieke gegevens zijn ontvangen. Maar het is in ons geval niet nodig.

8. Serialisatie van aangepaste objecten

Laten we nu aannemen dat we willen dat ons systeem de mogelijkheid heeft om een ​​back-up van berichten te maken. We willen dat het proces automatisch verloopt en elke back-up moet bestaan ​​uit berichten die gedurende een hele dag worden verzonden.

Ook moet aan een back-upbericht een uniek ID worden toegewezen.

Hiervoor kunnen we de volgende klasse aanmaken:

openbare klasse Backup {@JsonProperty ("inputMessages") Lijst inputMessages; @JsonProperty ("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty ("uuid") UUID uuid; openbare back-up (lijst inputMessages, LocalDateTime backupTimestamp) {this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID (); }}

Houd er rekening mee dat het UUID-generatiemechanisme niet perfect is, omdat het duplicaten toestaat. Dit is echter voldoende voor de reikwijdte van dit voorbeeld.

We willen ons redden Back-up object als JSON naar Kafka, dus we moeten onze SerializationSchema:

openbare klasse BackupSerializationSchema implementeert SerializationSchema {ObjectMapper objectMapper; Logger-logger = LoggerFactory.getLogger (BackupSerializationSchema.class); @Override openbare byte [] serialize (Backup backupMessage) {if (objectMapper == null) {objectMapper = nieuwe ObjectMapper () .registerModule (nieuwe JavaTimeModule ()); } probeer {return objectMapper.writeValueAsString (backupMessage) .getBytes (); } catch (com.fasterxml.jackson.core.JsonProcessingException e) {logger.error ("Kan JSON niet parseren", e); } retourneer nieuwe byte [0]; }}

9. Berichten met tijdstempels

Omdat we een back-up willen maken voor alle berichten van elke dag, hebben berichten een tijdstempel nodig.

Flink geeft de drie verschillende tijdskenmerken EventTime, ProcessingTime, en Inslikken Tijd.

In ons geval moeten we het tijdstip gebruiken waarop het bericht is verzonden, dus we gebruiken EventTime.

Gebruiken EventTimewe hebben een ... nodig TimestampAssigner die tijdstempels uit onze invoergegevens haalt:

openbare klasse InputMessageTimestampAssigner implementeert AssignerWithPunctuatedWatermarks {@Override openbaar lang extractTimestamp (InputMessage-element, lang previousElementTimestamp) {ZoneId zoneId = ZoneId.systemDefault (); return element.getSentAt (). atZone (zoneId) .toEpochSecond () * 1000; } @Nullable @Override public Watermark checkAndGetNextWatermark (InputMessage lastElement, long extractedTimestamp) {return new Watermark (extractedTimestamp - 1500); }}

We moeten onze LocalDateTime naar EpochSecond aangezien dit het formaat is dat door Flink wordt verwacht. Na het toewijzen van tijdstempels, gebruiken alle op tijd gebaseerde bewerkingen de tijd vanaf verzonden veld te bedienen.

Omdat Flink verwacht dat tijdstempels in milliseconden zijn en toEpochSecond () geeft de tijd terug in seconden die we nodig hadden om het te vermenigvuldigen met 1000, dus Flink zal op de juiste manier vensters maken.

Flink definieert het concept van een Watermerk. Watermerken zijn handig in het geval van gegevens die niet aankomen in de volgorde waarin ze zijn verzonden. Een watermerk definieert de maximale vertraging die is toegestaan ​​voor elementen die moeten worden verwerkt.

Elementen met tijdstempels die lager zijn dan het watermerk, worden helemaal niet verwerkt.

10. Tijdvensters maken

Om ervoor te zorgen dat onze back-up alleen berichten verzamelt die gedurende één dag zijn verzonden, kunnen we de timeWindowAll methode op de stream, die berichten in vensters zal splitsen.

We moeten echter nog steeds berichten uit elk venster verzamelen en ze retourneren als Back-up.

Hiervoor hebben we een aangepaste versie nodig AggregateFunction:

openbare klasse BackupAggregator implementeert AggregateFunction {@Override openbare lijst createAccumulator () {retourneer nieuwe ArrayList (); } @Override openbare lijst toevoegen (InputMessage inputMessage, Lijst inputMessages) {inputMessages.add (inputMessage); return inputMessages; } @Override public Backup getResult (List inputMessages) {return new Backup (inputMessages, LocalDateTime.now ()); } @Override openbare lijst samenvoegen (lijst inputMessages, lijst acc1) {inputMessages.addAll (acc1); return inputMessages; }}

11. Samenvoegen van back-ups

Na het toewijzen van de juiste tijdstempels en het implementeren van onze AggregateFunction, kunnen we eindelijk onze Kafka-input nemen en deze verwerken:

public static void createBackup () gooit uitzondering {String inputTopic = "flink_input"; String outputTopic = "flink_output"; String consumerGroup = "baeldung"; String kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment-omgeving = StreamExecutionEnvironment.getExecutionEnvironment (); environment.setStreamTimeCharacteristic (TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer (inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest (); flinkKafkaConsumer.assignTimestampsAndWatermarks (nieuwe InputMessageTimestampAssigner ()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer (outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource (flinkKafkaConsumer); inputMessagesStream .timeWindowAll (Time.hours (24)) .aggregate (nieuwe BackupAggregator ()) .addSink (flinkKafkaProducer); omgeving.execute (); }

12. Conclusie

In dit artikel hebben we laten zien hoe u een eenvoudige gegevenspijplijn maakt met Apache Flink en Apache Kafka.

Zoals altijd is de code te vinden op Github.