# Chapitre 7 - Passage au big data (2ème partie)

**Dans le cadre de ce Notebook, nous allons parler de l'environnement Apache Spark. Ce notebook n'est donc pas applicable dans votre environnement "classique".**

**Pour que le code foctionnne, il vous faut un environnement Spark correctement installé.**

**N'essayez pas de faire fonctionner les cellules si votre environnemnt n'est pas correctement paramétré. Les cellules de code ont été passées au format RawNBConvert afin de ne pas rendre le Notebook inutilisable**

Apache Spark est un projet de la fondation Apache (actuellement dans sa version 3).

Il a pour objectif de pallier les lacunes de Hadoop quant au traitement nécessitant de nombreux allers-retours.

Si, malgré tous vos efforts, vous n’avez pas réussi à extraire des données de
manière qu’elles tiennent dans votre mémoire RAM, le recours à une autre solution deviendra indispensable. Cette solution est Apache Spark.

Cet environnement, développé à Berkeley, est un système de traitement distribué
sur les noeuds d’une infrastructure big data.

Si vous voulez tester Spark, je vous conseille d'essayer la version gratuite de Databricks qui est simple d'accèes :

https://databricks.com/signup#signup/community
    

### 7.4.3 Le DataFrame de Spark SQL

Nous allons nous concentrer sur Spark SQL. Ceci nous permettra d’introduire un objet : le DataFrame de Spark. Il s’agit d’un objet proche du RDD, mais qui permet de stocker de manière distribuée des données structurées, là où les RDD nous permettent de stocker des données non structurées.

Il se rapproche très fortement du DataFrame de Pandas.

#### Lancer votre session Spark
Commençons par lancer une session Spark en utilisant dans un premier temps le
package findspark et la classe SparkSession de pyspark.sql :

In [1]:
# on importe findspark
import findspark
# on initialise findspark pour identifier nos chemins Spark
findspark.init()
# on importe SparkSession
from pyspark.sql import SparkSession
# on crée une session Spark
spark = SparkSession.builder \
         .appName("Exemples avec Python et Spark SQL") \
         .getOrCreate()

24/01/07 22:02:02 WARN Utils: Your hostname, r2-60-gra7 resolves to a loopback address: 127.0.1.1; using 51.91.138.22 instead (on interface ens3)
24/01/07 22:02:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/07 22:02:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/07 22:02:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Lecture des données (json, parquet, csv, hive)

Spark vous permet de lire de nombreux types de données, que ce soit des données csv ou SQL classiques ou des données issues d’environnements big data. En voici quelques exemples :

In [2]:
# lecture d’un fichier json
df = spark.read.json("../data/data.json")
# lecture d’un fichier parquet
df3 = spark.read.load("../data/data.parquet")

24/01/07 22:02:10 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Spark permet aussi d’utiliser des données issues de fichiers csv :

In [3]:
data_idf=spark.read.format("csv").option("header", "true")\
              .option("delimiter",";")\
              .option("inferSchema", "true")\
              .load("../data/base-comparateur-de-territoires.csv")

Un autre format important dans le cadre du big data est le format Hive. Pour se
connecter à une base Hive et soumettre du code SQL, on utilisera :

On peut aussi transformer un DataFrame Pandas en DataFrame Spark en utilisant :

In [4]:
import pandas as pd
pandas_df = pd.DataFrame([10,20,30])
spark_df = spark.createDataFrame(pandas_df)

#### Manipuler des DataFrames
Il est très simple de manipuler des DataFrames de différentes manières. Spark part du principe que les calculs ne sont pas effectués chaque fois que vous soumettez du code. 

Ils le sont lorsque vous demandez explicitement à Spark de faire les calculs ou
d’afficher les résultats. Ces opérations de calcul ou d’affichage sont appliquées avec les méthodes .collect() ou .show().

Nous allons manipuler les données sur les communes d’Île-de-France. Nous
voulons extraire des informations de ces données sur les communes de la région
Île-de-France.

Les codes ci-dessous nous
permettent d’effectuer la plupart des manipulations dont nous avons besoin :

In [5]:
# on récupère les données d’Ile-de-France
# on a des titres dans la première ligne
# le séparateur est le;
# on demande à Spark d’inférer les types
data_idf = spark.read.format("csv").option("header", "true")\
           .option("delimiter",";")\
           .option("inferSchema", "true")\
           .load("../data/base-comparateur-de-territoires.csv")
# on peut afficher les 8 premiers noms de colonnes :
data_idf.columns[:8]

['CODGEO', 'LIBGEO', 'REG', 'DEP', 'P14_POP', 'P09_POP', 'SUPERF', 'NAIS0914']

In [6]:
# on sélectionne une colonne et on affiche le résultat
data_idf.select("LIBGEO").show()

+--------------------+
|              LIBGEO|
+--------------------+
|       Saint-Gratien|
|          Pierrelaye|
|Saint-Cyr-en-Arthies|
|      La Roche-Guyon|
|       Villiers-Adam|
|       Vallangoujard|
|   Le Plessis-Gassot|
|               Seugy|
|  Villers-en-Arthies|
|         Vaudherland|
|   Asnières-sur-Oise|
|       Saint-Maurice|
|          Arnouville|
|          Bray-et-Lû|
|             Santeny|
|  Le Plessis-Trévise|
|              Bezons|
|      Butry-sur-Oise|
|           Beauchamp|
|            Banthelu|
+--------------------+
only showing top 20 rows



Les opérations ci-dessus sont stockées en mémoire et ne renvoient rien. C’est
uniquement lorsqu’on ajoute show() ou collect() que les opérations sont
effectuées.

In [7]:
# on crée un DataFrame par opération avec deux colonnes dont une colonne
# qui indique si la commune est dans Paris
data_reduced = data_idf.select("P14_POP", data_idf["LIBGEO"].startswith("Paris"), "LIBGEO")
# on peut aussi travailler sur les colonnes
# on peut renommer une colonne :
data_col = data_idf.withColumnRenamed('P14_POP', 'Population_2014')
# on peut supprimer une colonne :
data_col = data_col.drop("LIBGEO", "Population_2014")

In [8]:
# on peut filtrer les observations
data_reduced.filter(data_reduced['startswith(LIBGEO, Paris)'] == True).show()

+--------+-------------------------+--------------------+
| P14_POP|startswith(LIBGEO, Paris)|              LIBGEO|
+--------+-------------------------+--------------------+
| 21263.0|                     true|Paris 2e Arrondis...|
| 26796.0|                     true|Paris 4e Arrondis...|
|165745.0|                     true|Paris 16e Arrondi...|
|170186.0|                     true|Paris 17e Arrondi...|
| 60030.0|                     true|Paris 5e Arrondis...|
| 55486.0|                     true|Paris 7e Arrondis...|
|182318.0|                     true|Paris 13e Arrondi...|
|141230.0|                     true|Paris 14e Arrondi...|
|195468.0|                     true|Paris 20e Arrondi...|
|199135.0|                     true|Paris 18e Arrondi...|
|187156.0|                     true|Paris 19e Arrondi...|
| 35077.0|                     true|Paris 3e Arrondis...|
| 43134.0|                     true|Paris 6e Arrondis...|
| 38257.0|                     true|Paris 8e Arrondis...|
| 59389.0|    

Nous avons sélectionné uniquement les observations commençant par « Paris »,
on obtient donc les 20 arrondissements et leurs populations.

On peut alors sauver ces données sous forme de fichiers parquet ou json :

In [9]:
data_reduced.select("P14_POP","LIBGEO").write.save("resultat.parquet")
data_reduced.select("P14_POP","LIBGEO").write.save("resultat.json",format="json")

[Stage 8:>                                                          (0 + 1) / 1]                                                                                

#### Afficher des statistiques descriptives
Spark permet aussi de calculer des statistiques sur les données en utilisant, par
exemple, une opération groupby :

In [10]:
# on utilise un groupBy par département et
# on affiche le salaire médian moyen
salaire_med_moy = data_idf.groupBy("DEP").agg({"MED14" :"mean"})
salaire_med_moy.show()

+---+------------------+
|DEP|        avg(MED14)|
+---+------------------+
| 78|27908.276609517692|
| 91|25505.856457531612|
| 93|18004.142505975004|
| 94|23223.062544887238|
| 92|27815.275831569437|
| 77|23544.776856209497|
| 95| 24901.96792722259|
| 75|29629.178876815004|
+---+------------------+



In [11]:
# on peut transformer le résultat en format Pandas
salaire_med_moy_pandas = salaire_med_moy.toPandas()
# on aura les sorties de Pandas
salaire_med_moy_pandas.head()

Unnamed: 0,DEP,avg(MED14)
0,78,27908.27661
1,91,25505.856458
2,93,18004.142506
3,94,23223.062545
4,92,27815.275832


De nombreuses opérations proches de celles de Pandas sont disponibles avec
Spark. 


#### Terminer votre session Spark
Une fois que vous avez terminé de travailler sur votre session Spark, vous pouvez la fermer :

In [12]:
spark.stop()

### 7.4.4 Le machine learning avec Spark

#### Préparation des données

Nous supposons que nous avons déjà créé notre session Spark. Nous devons
maintenant récupérer nos données :

In [13]:
# on crée une session Spark
spark = SparkSession.builder \
         .appName("Exemples avec Python et Spark SQL") \
         .getOrCreate()

24/01/07 22:02:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [14]:
# on récupère les données telecom
churn=spark.read.format("csv").option("header", "true")\
        .option("inferSchema", "true")\
        .load("../data/telecom.csv")

La phase de préparation qui suit est importante. Il s’agit de définir les variables explicatives (x) et la variable cible (y) tout en transformant les variables non adaptées :

In [15]:
# on importe une classe qui transforme les colonnes qualitatives en colonnes
# sous forme d’entiers (équivalent de LabelEncoder de Scikit-Learn)
from pyspark.ml.feature import StringIndexer

# on va transformer la colonne Churn? et on va la nommer Churn2
indexer = StringIndexer(inputCol='Churn?', outputCol='Churn2').fit(churn)

# on construit ensuite un vecteur rassemblant toutes les colonnes explicatives
from pyspark.ml.feature import VectorAssembler

# on rassemble la liste des colonnes numériques que l’on va utiliser
numericCols = ['Day Mins','Day Calls','Day Charge','Eve Mins',
               'Eve Calls','Eve Charge','Night Mins','Night Calls',
               'Night Charge','Intl Mins','Intl Calls']

# on crée un objet qui rassemble toutes ces colonnes dans une colonne
# nommée var_expl
assembler = VectorAssembler(inputCols=numericCols, outputCol="var_expl")

# on divise le DataFrame initial (churn) en deux DataFrame représentant
# respectivement 70% et 30% des données

(trainingData, testData) = churn.randomSplit([0.7, 0.3])

À la différence de Scikit-Learn, on va devoir nommer les groupes de variables en
entrée et en sortie lors de la création de l’objet à partir de la classe du modèle. 

Les données doivent donc avoir le format spécifié dans l’objet. Par ailleurs, on utilise un format spécifique pour les variables explicatives qui sont toutes stockées dans une structure à l’intérieur du DataFrame.

#### Création du modèle et du pipeline
Nous pouvons créer notre modèle de forêt aléatoire ainsi que le pipeline associé :

In [16]:
from pyspark.ml.classification import RandomForestClassifier
# on crée notre modèle
model=RandomForestClassifier(labelCol="Churn2", featuresCol="var_expl",
                             numTrees=100)

In [17]:
from pyspark.ml import Pipeline
# on construit le pipeline qui est composé des 3 étapes dévelopées auparavant
pipeline = Pipeline(stages=[indexer, assembler, model])

#### Ajustement et validation du modèle
Nous faisons les calculs sur les données d’apprentissage et testons sur les
données de validation :

In [18]:
# ajustement du modèle
model = pipeline.fit(trainingData)
# prévision sur les données de validation
predictions = model.transform(testData)

Par défaut, Spark va créer de nouvelles colonnes dans nos données avec les
prédictions (colonne prediction) et les probabilités de prédiction (colonne
rawPrediction).

Nous pouvons calculer des métriques comme l’AUC ou le pourcentage de bien
classés (accuracy) :

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# cette classe calcule l’AUC de notre modèle
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="Churn2")

# on applique les données prédites à notre objet d’évalaution
evaluator.evaluate(predictions)

# L’AUC est affichée

0.7575137686860707

In [20]:
# on calcule l’accuracy manuellement
accuracy = predictions.filter(predictions.Churn2==predictions.prediction)\
                        .count() / float(testData.count())
accuracy
# on obtient l’accuracy

0.8730314960629921

Les métriques utilisées nous permettent de voir que notre modèle ressemble à
celui de Scikit-Learn en termes de performance (il est moins bon car nous avons
moins de variables explicatives).

Nous avons effectué tous les calculs dans notre environnement big data. Le seul
moment où les données sont revenues vers nous est situé à la fin, pour récupérer le
résultat.

Cet exemple illustre bien la simplicité de Spark. L’utilisation de Spark pour des
tâches plus complexes demande plus de travail mais PySpark et les DataFrames
rendent ce passage très aisé pour un data scientist à l’aise avec les outils de traitement
de données de Python.