Apache Spark: verschillen tussen dataframes, gegevenssets en RDD's

1. Overzicht

Apache Spark is een snel, gedistribueerd gegevensverwerkingssysteem. Het verwerkt gegevens in het geheugen en gebruikt in-memory caching en geoptimaliseerde uitvoering, wat resulteert in snelle prestaties. Het biedt API's op hoog niveau voor populaire programmeertalen zoals Scala, Python, Java en R.

In deze korte zelfstudie bespreken we drie van de basisconcepten van Spark: dataframes, datasets en RDD's.

2. Dataframe

Spark SQL introduceerde sinds Spark 1.3 een data-abstractie in tabelvorm, een DataFrame genaamd. Sindsdien is het een van de belangrijkste features in Spark geworden. Deze API is handig wanneer we gestructureerde en semi-gestructureerde, gedistribueerde gegevens willen verwerken.

In sectie 3 bespreken we Resilient Distributed Datasets (RDD). Dataframes slaan gegevens op een efficiëntere manier op dan RDD's, dit komt omdat ze de onveranderlijke, in-memory, veerkrachtige, gedistribueerde en parallelle mogelijkheden van RDD's gebruiken, maar ze passen ook een schema op de gegevens toe. DataFrames vertalen ook SQL-code naar geoptimaliseerde RDD-bewerkingen op laag niveau.

We kunnen DataFrames op drie manieren maken:

  • Bestaande RDD's converteren
  • SQL-query's uitvoeren
  • Externe gegevens laden

Spark-team geïntroduceerd SparkSession in versie 2.0 verenigt het alle verschillende contexten, zodat ontwikkelaars zich geen zorgen hoeven te maken over het creëren van verschillende contexten:

SparkSession-sessie = SparkSession.builder () .appName ("TouristDataFrameExample") .master ("local [*]") .getOrCreate (); DataFrameReader dataFrameReader = session.read ();

We analyseren de Tourist.csv het dossier:

Dataset data = dataFrameReader.option ("header", "true") .csv ("data / Tourist.csv");

Sinds Spark 2.0 DataFrame een Dataset van het type Rij, dus we kunnen een DataFrame gebruiken als alias voor een Dataset.

We kunnen specifieke kolommen selecteren waarin we geïnteresseerd zijn. We kunnen ook filteren en groeperen op een bepaalde kolom:

data.select (col ("land"), col ("jaar"), col ("waarde")) .show (); data.filter (col ("land"). equalTo ("Mexico")) .show (); data.groupBy (col ("country")) .count () .show ();

3. Datasets

Een dataset is een set sterk getypeerde, gestructureerde data. Ze bieden de vertrouwde objectgeoriënteerde programmeerstijl plus de voordelen van typeveiligheid, aangezien datasets de syntaxis kunnen controleren en fouten kunnen detecteren tijdens het compileren.

Dataset is een uitbreiding van DataFrame, dus we kunnen een DataFrame beschouwen als een niet-getypeerde weergave van een dataset.

Het Spark-team heeft het Dataset API in Spark 1.6 en zoals ze al zeiden: "het doel van Spark Datasets is om een ​​API te bieden waarmee gebruikers eenvoudig transformaties op objectdomeinen kunnen uitdrukken, terwijl ze ook de prestatie- en robuustheidsvoordelen van de Spark SQL-uitvoeringsengine bieden".

Eerst moeten we een type type maken TouristData:

openbare klasse TouristData {privé String-regio; privé String land; privé String jaar; particuliere String-serie; privé Dubbele waarde; private String-voetnoten; private String-bron; // ... getters en setters}

Om elk van onze records aan het opgegeven type toe te wijzen, hebben we een encoder nodig. Encoders vertalen tussen Java-objecten en het interne binaire formaat van Spark:

// SparkSession-initialisatie en gegevens laden Dataset responseWithSelectedColumns = data.select (col ("region"), col ("country"), col ("year"), col ("series"), col ("value"). Cast ("double"), col ("footnotes"), col ("source")); Dataset typedDataset = responseWithSelectedColumns .as (Encoders.bean (TouristData.class));

Net als bij DataFrame kunnen we filteren en groeperen op specifieke kolommen:

typedDataset.filter ((FilterFunction) record -> record.getCountry () .equals ("Noorwegen")) .show (); typedDataset.groupBy (typedDataset.col ("land")) .count () .show ();

We kunnen ook bewerkingen uitvoeren zoals filteren op kolom die overeenkomt met een bepaald bereik of de som van een specifieke kolom berekenen om de totale waarde ervan te krijgen:

typedDataset.filter ((FilterFunction) record -> record.getYear ()! = null && (Long.valueOf (record.getYear ())> 2010 && Long.valueOf (record.getYear ()) record.getValue ()! = null && record.getSeries () .contains ("uitgaven")) .groupBy ("land") .agg (som ("waarde")) .show ();

4. RDD's

De Resilient Distributed Dataset of RDD is de primaire abstractie van Spark. Het vertegenwoordigt een verzameling elementen die zijn: onveranderlijk, veerkrachtig en verdeeld.

Een RDD kapselt een grote dataset in, Spark zal automatisch de data in RDD's over ons cluster verdelen en de bewerkingen die we daarop uitvoeren parallelliseren.

We kunnen RDD's alleen maken door bewerkingen van gegevens in stabiele opslag of bewerkingen op andere RDD's.

Fouttolerantie is essentieel als we te maken hebben met grote sets gegevens en de gegevens worden gedistribueerd op clustermachines. RDD's zijn veerkrachtig vanwege de ingebouwde mechanismen voor foutherstel van Spark. Spark vertrouwt op het feit dat RDD's onthouden hoe ze zijn gemaakt, zodat we de afstamming gemakkelijk kunnen traceren om de partitie te herstellen.

Er zijn twee soorten bewerkingen die we op RDD's kunnen uitvoeren: Transformaties en acties.

4.1. Transformaties

We kunnen transformaties toepassen op een RDD om de gegevens ervan te manipuleren. Nadat deze manipulatie is uitgevoerd, krijgen we een gloednieuwe RDD, aangezien RDD's onveranderlijke objecten zijn.

We zullen kijken hoe Map en Filter, twee van de meest voorkomende transformaties, geïmplementeerd kunnen worden.

Eerst moeten we een JavaSparkContext en laad de gegevens als een RDD vanuit het Tourist.csv het dossier:

SparkConf conf = nieuwe SparkConf (). SetAppName ("uppercaseCountries") .setMaster ("local [*]"); JavaSparkContext sc = nieuwe JavaSparkContext (conf); JavaRDD tourist = sc.textFile ("data / Tourist.csv");

Laten we vervolgens de kaartfunctie toepassen om de naam van het land uit elk record te halen en de naam om te zetten in hoofdletters. We kunnen deze nieuw gegenereerde dataset opslaan als een tekstbestand op schijf:

JavaRDD upperCaseCountries = tourist.map (line -> {String [] columns = line.split (COMMA_DELIMITER); retourkolommen [1] .toUpperCase ();}). Distinct (); upperCaseCountries.saveAsTextFile ("data / output / hoofdletters.txt");

Als we alleen een specifiek land willen selecteren, kunnen we de filterfunctie toepassen op onze oorspronkelijke RDD voor toeristen:

JavaRDD touristInMexico = tourist .filter (line -> line.split (COMMA_DELIMITER) [1] .equals ("Mexico")); touristInMexico.saveAsTextFile ("data / output / touristInMexico.txt");

4.2. Acties

Acties retourneren een definitieve waarde of slaan de resultaten op schijf op, na enige berekening van de gegevens.

Twee van de regelmatig gebruikte acties in Spark zijn Count en Reduce.

Laten we het totale aantal landen in ons CSV-bestand tellen:

// Spark Context-initialisatie en gegevensladen JavaRDD-landen = tourist.map (regel -> {String [] kolommen = regel.split (COMMA_DELIMITER); retourkolommen [1];}). Distinct (); Lange numberOfCountries = landen.count ();

Nu gaan we de totale uitgaven per land berekenen. We zullen de records met uitgaven in hun beschrijving moeten filteren.

In plaats van een JavaRDDgebruiken we een JavaPairRDD. Een RDD-paar is een type RDD dat sleutel-waardeparen kan opslaan. Laten we het hierna bekijken:

JavaRDD touristExpenditure = toeristen .filter (regel -> regel.split (COMMA_DELIMITER) [3] .contains ("uitgaven")); JavaPairRDD uitgavenPairRdd = touristExpenditure .mapToPair (line -> {String [] columns = line.split (COMMA_DELIMITER); retourneer nieuwe Tuple2 (kolommen [1], Double.valueOf (kolommen [6]));}); Lijst totalByCountry = uitgavenPairRdd .reduceByKey ((x, y) -> x + y) .collect ();

5. Conclusie

Samenvattend: we zouden DataFrames of Datasets moeten gebruiken als we domeinspecifieke API's nodig hebben, we hebben expressies op hoog niveau nodig, zoals aggregatie-, som- of SQL-query's. Of als we typeveiligheid willen tijdens het compileren.

Aan de andere kant moeten we RDD's gebruiken wanneer gegevens ongestructureerd zijn en we geen specifiek schema hoeven te implementeren of wanneer we transformaties en acties op laag niveau nodig hebben.

Zoals altijd zijn alle codevoorbeelden beschikbaar op GitHub.