Introductie tot Netflix Mantis

1. Overzicht

In dit artikel kijken we naar het door Netflix ontwikkelde Mantis-platform.

We zullen de belangrijkste Mantis-concepten verkennen door een streamverwerkingstaak te maken, uit te voeren en te onderzoeken.

2. Wat is bidsprinkhaan?

Mantis is een platform voor het bouwen van stream-processing applicaties (banen). Het biedt een gemakkelijke manier om de inzet en levenscyclus van banen beheren. Bovendien is het vergemakkelijkt toewijzing van middelen, ontdekking en communicatie tussen deze taken.

Daarom kunnen ontwikkelaars zich concentreren op de feitelijke bedrijfslogica, terwijl ze de steun krijgen van een robuust en schaalbaar platform om hun niet-blokkerende applicaties met hoog volume en lage latentie uit te voeren.

Een Mantis-baan bestaat uit drie verschillende delen:

  • de bron, verantwoordelijk voor het ophalen van de gegevens uit een externe bron
  • een of meer stadia, verantwoordelijk voor het verwerken van de inkomende gebeurtenisstromen
  • en een wastafel die de verwerkte gegevens verzamelt

Laten we ze nu eens onderzoeken.

3. Installatie en afhankelijkheden

Laten we beginnen met het toevoegen van de mantis-runtime en jackson-databind afhankelijkheden:

 io.mantisrx mantis-runtime com.fasterxml.jackson.core jackson-databind 

Laten we nu de Mantis implementeren voor het opzetten van de gegevensbron van onze taak Bron koppel:

public class RandomLogSource implementeert Source {@Override public Observable call (Contextcontext, Indexindex) {return Observable.just (Observable .interval (250, TimeUnit.MILLISECONDS) .map (this :: createRandomLogEvent)); } private String createRandomLogEvent (Long tick) {// genereer een willekeurige tekenreeks voor logboekinvoer ...}}

Zoals we kunnen zien, genereert het eenvoudig meerdere keren per seconde willekeurige logboekvermeldingen.

4. Onze eerste baan

Laten we nu een Mantis-taak maken die eenvoudig logboekgebeurtenissen van ons verzamelt RandomLogSource. Later zullen we groeps- en aggregatietransformaties toevoegen voor een complexer en interessanter resultaat.

Laten we om te beginnen een LogEvent entiteit:

openbare klasse LogEvent implementeert JsonType {privé Long-index; privé String-niveau; privé String-bericht; // ...}

Laten we dan onze toevoegen TransformLogStage.

Het is een eenvoudige fase die de ScalarComputation-interface implementeert en een logboekitem splitst om een LogEvent. Het filtert ook alle verkeerd opgemaakte tekenreeksen uit:

public class TransformLogStage implementeert ScalarComputation {@Override public Observable call (Context context, Observable logEntry) {return logEntry .map (log -> log.split ("#")) .filter (parts -> parts.length == 3). map (LogEvent :: nieuw); }}

4.1. De taak uitvoeren

Op dit moment hebben we genoeg bouwstenen om onze Mantis-baan samen te stellen:

public class LogCollectingJob breidt MantisJobProvider uit {@Override public Job getJobInstance () {return MantisJob .source (nieuwe RandomLogSource ()) .stage (nieuwe TransformLogStage (), nieuwe ScalarToScalar.Config ()) .sink (Sinks.eagerSubscribe (Sinks.eagerSubscribe (Sinks.eagerSubse) LogEvent :: toJsonString))) .metadata (nieuwe Metadata.Builder (). Build ()) .create (); }}

Laten we ons werk eens nader bekijken.

Zoals we kunnen zien, strekt het zich uit MantisJobProvider. In eerste instantie haalt het gegevens op van ons RandomLogSource en past de TransformLogStage naar de opgehaalde gegevens. Ten slotte stuurt het de verwerkte gegevens naar de ingebouwde sink die zich gretig abonneert en gegevens levert via SSE.

Laten we nu onze taak configureren om lokaal uit te voeren bij het opstarten:

@SpringBootApplication openbare klasse MantisApplication implementeert CommandLineRunner {// ... @Override openbare ongeldige run (String ... args) {LocalJobExecutorNetworked.execute (nieuwe LogCollectingJob (). GetJobInstance ()); }}

Laten we de applicatie starten. We zien een logbericht zoals:

... Presenteren van moderne HTTP SSE-server-sink op poort: 86XX

Laten we nu verbinding maken met de gootsteen met krullen:

$ curl localhost: 86XX data: {"index": 86, "level": "WARN", "message": "inlogpoging"} data: {"index": 87, "level": "ERROR", "message ":" user created "} data: {" index ": 88," level ":" INFO "," message ":" user created "} data: {" index ": 89," level ":" INFO ", "message": "inlogpoging"} data: {"index": 90, "level": "INFO", "message": "user created"} data: {"index": 91, "level": "ERROR "," message ":" user created "} data: {" index ": 92," level ":" WARN "," message ":" inlogpoging "} data: {" index ": 93," level ": "INFO", "message": "user created"} ...

4.2. De gootsteen configureren

Tot nu toe hebben we de ingebouwde gootsteen gebruikt om onze verwerkte gegevens te verzamelen. Eens kijken of we kunnen meer flexibiliteit aan ons scenario toevoegen door een aangepaste gootsteen te bieden.

Wat als we bijvoorbeeld logboeken willen filteren op bericht?

Laten we een LogSink dat implementeert de Wastafel koppel:

public class LogSink implementeert Sink {@Override public void call (Context context, PortRequest portRequest, Observable logEventObservable) {SelfDocumentingSink sink = new ServerSentEventsSink.Builder () .withEncoder (LogEvent :: toJsonString) .withPredicate (filterBy )Log. ; logEventObservable.subscribe (); sink.call (context, portRequest, logEventObservable); } privé predikaat filterByLogMessage () {retourneer nieuw predikaat ("filter op bericht", parameters -> {if (parameters! = null && parameters.containsKey ("filter")) {retour logEvent -> logEvent.getMessage (). bevat ( parameters.get ("filter"). get (0));} return logEvent -> true;}); }}

In deze Sink-implementatie hebben we een predikaat geconfigureerd dat de filter parameter om alleen logboeken op te halen die de tekst bevatten die is ingesteld in de filter parameter:

$ curl localhost: 8874? filter = inloggegevens: {"index": 93, "level": "ERROR", "message": "inlogpoging"} data: {"index": 95, "level": "INFO "," message ":" inlogpoging "} gegevens: {" index ": 97," level ":" ERROR "," message ":" inlogpoging "} ...

Opmerking Mantis biedt ook een krachtige querytaal, MQL, die kan worden gebruikt voor het opvragen, transformeren en analyseren van stroomgegevens op een SQL-manier.

5. Stage Chaining

Laten we nu aannemen dat we geïnteresseerd zijn om te weten hoeveel FOUT, WAARSCHUWEN, of INFO logboekvermeldingen die we hebben in een bepaald tijdsinterval. Hiervoor voegen we nog twee fasen toe aan ons werk en koppelen we ze aan elkaar.

5.1. Groepering

Laten we eerst een GroupLogStage.

Deze fase is een ToGroupComputation implementatie die een LogEvent gegevens streamen van het bestaande TransformLogStage. Daarna groepeert het de vermeldingen op logboekniveau en stuurt ze naar de volgende fase:

openbare klasse GroupLogStage implementeert ToGroupComputation {@Override public Observable call (Contextcontext, Observable logEvent) {return logEvent.map (log -> nieuwe MantisGroup (log.getLevel (), log)); } openbare statische ScalarToGroup.Config config () {retourneer nieuwe ScalarToGroup.Config () .description ("Groepeer gebeurtenisgegevens op niveau") .codec (JacksonCodecs.pojo (LogEvent.class)) .concurrentInput (); }}

We hebben ook een aangepaste faseconfiguratie gemaakt door een beschrijving op te geven, de codec die moet worden gebruikt voor het serialiseren van de uitvoer, en hebben toegestaan ​​dat de aanroepmethode van deze fase gelijktijdig wordt uitgevoerd door gebruik te maken van concurrentInput ().

Een ding om op te merken is dat deze fase horizontaal schaalbaar is. Dit betekent dat we zoveel instanties van deze fase kunnen uitvoeren als nodig is. Ook het vermelden waard, wanneer ingezet in een Mantis-cluster, deze fase verzendt gegevens naar de volgende fase, zodat alle gebeurtenissen die tot een bepaalde groep behoren, op dezelfde werknemer van de volgende fase terechtkomen.

5.2. Aggregeren

Voordat we verder gaan en de volgende fase maken, voegen we eerst een LogAggregate entiteit:

public class LogAggregate implementeert JsonType {private final Integer count; privé laatste String-niveau; }

Laten we nu de laatste fase in de keten creëren.

Deze fase implementeert GroupToScalarComputation en transformeert een stroom logboekgroepen naar een scalair LogAggregate. Het doet dit door te tellen hoe vaak elk type log in de stream verschijnt. Daarnaast heeft het ook een LogAggregationDuration parameter, die kan worden gebruikt om de grootte van het aggregatievenster te regelen:

openbare klasse CountLogStage implementeert GroupToScalarComputation {privé int duur; @Override public void init (Context context) {duration = (int) context.getParameters (). Get ("LogAggregationDuration", 1000); } @Override public Observable call (Context context, Observable mantisGroup) {return mantisGroup .window (duration, TimeUnit.MILLISECONDS) .flatMap (o -> o.groupBy (MantisGroup :: getKeyValue) .flatMap (groep -> group.reduce (0, (count, value) -> count = count + 1) .map ((count) -> nieuwe LogAggregate (count, group.getKey ())))); } openbare statische GroupToScalar.Config config () {retourneer nieuwe GroupToScalar.Config () .description ("som gebeurtenissen voor een logboekniveau") .codec (JacksonCodecs.pojo (LogAggregate.class)) .withParameters (getParameters ()); } openbare statische lijst getParameters () {Lijst params = nieuwe ArrayList (); params.add (nieuwe IntParameter () .name ("LogAggregationDuration") .description ("venstergrootte voor aggregatie in milliseconden") .validator (Validators.range (100, 10000)) .defaultValue (5000) .build ()); retourparameters; }}

5.3. Configureer en voer de taak uit

Het enige dat u nu nog moet doen, is onze taak configureren:

public class LogAggregationJob breidt MantisJobProvider uit {@Override public Job getJobInstance () {return MantisJob .source (nieuwe RandomLogSource ()) .stage (nieuwe TransformLogStage (), TransformLogStage.stageConfig ()) .stage (nieuwe GroupLogStage ()., GroupLogStage (). )) .stage (nieuwe CountLogStage (), CountLogStage.config ()) .sink (Sinks.eagerSubscribe (Sinks.sse (LogAggregate :: toJsonString))) .metadata (nieuwe Metadata.Builder (). build ()) .create (); }}

Zodra we de applicatie starten en onze nieuwe taak uitvoeren, kunnen we zien dat de logtellingen om de paar seconden worden opgehaald:

$ curl localhost: 8133 data: {"count": 3, "level": "ERROR"} data: {"count": 13, "level": "INFO"} data: {"count": 4, "level ":" WARN "} data: {" count ": 8," level ":" ERROR "} data: {" count ": 5," level ":" INFO "} data: {" count ": 7," level ":" WARN "} ...

6. Conclusie

Samenvattend: in dit artikel hebben we gezien wat Netflix Mantis is en waarvoor het kan worden gebruikt. Verder hebben we gekeken naar de belangrijkste concepten, deze gebruikt om banen te bouwen en aangepaste configuraties voor verschillende scenario's onderzocht.

Zoals altijd is de volledige code beschikbaar op GitHub.