18.8 C
New York

Analyse approfondie de la dette method Spark


À quel level le mauvais code est-il mauvais : le retour sur investissement de la réparation du code Spark cassé

De temps en temps, je tombe sur du code Spark qui semble avoir été écrit par un développeur Java et cela ne manque jamais de me faire grimacer automobile c’est une event manquée d’écrire du code élégant et efficace : il est verbeux, difficile à lire et plein d’anti-modèles de traitement distribué.

Un tel événement s’est produit il y a quelques semaines lorsqu’un de mes collègues essayait de faire fonctionner un code d’analyse de désabonnement téléchargé à partir de GitHub.

Je cherchais un code cassé pour ajouter un atelier à notre cours Spark Efficiency Tuning et écrire un article de weblog à ce sujet, et cela correspondait parfaitement à la facture.

Pour des raisons de commodité, j’ai choisi de limiter la portée de cet exercice à une fonction spécifique qui prépare les données avant l’analyse de désabonnement.

Right here it's in all its wonderful juiciness:

from pyspark.sql.capabilities import udf,col

from pyspark.sql.sorts import IntegerType




def prepare_data_baseline(df):




    '''

    Operate to organize the given dataframe and divid into teams of churn and non churn

    customers whereas returnng the unique datafrme with a brand new label column right into a spark dataframe.

    Args:

        df- the unique dataframe

    Returns:

        df -  dataframe of the dataset with new column of churn added

        stayed -  dataframe of the non -churn consumer's actions solely.

        all_cancelled -  dataframe of the churn consumer's actions solely.

    '''




    #Outline a udf for cancelled

    canceled = udf(lambda x: 1 if x == 'Cancellation Affirmation' else 0)




    #outline a brand new column 'churn' the place 1 signifies cancellation of subscription, 0 in any other case

    df = df.withColumn('Churn', canceled(df.web page))





    #Dataframe of all that cancelled

    cancelled_df = df.choose('web page', 'userId','Churn').the place(col('churn')==1)

    #Record of cancelled

    list_cancelled = cancelled_df.choose('userId').distinct().accumulate()#listing of cancelled customers




    #Put in a listing format

    gb = ()#short-term variable to retailer lists

    for row in list_cancelled:

        gb.append(row(0))

    canc_list = (x for x in gb if x != '')#take away the invalid customers

    #Complete variety of customers who canceled

    print(f"The variety of churned customers is: {len(canc_list)}")




    #Record of staying customers

    all_users = df.choose('userId').distinct().accumulate()

    gh = ()#a brief variable to retailer all customers




    for row in all_users:

        gh.append(row(0))

    stayed_list = set(gh)-set(gb)#listing of customers staying

    stayed_list = (x for x in stayed_list if x != '')#take away the invalid customers




    #Complete variety of customers who didn't cancel

    print(f"The variety of staying customers is: {len(stayed_list)}")




    #Retailer each canceled and staying customers in new dataframes containng all actions they undertook

    all_cancelled = df.choose("*").the place(col('userId').isin(canc_list))

    stayed = df.choose('*').the place(col('userId').isin(stayed_list))




    #Redefine a udf for churn

    churned = udf(lambda x: 0 if x in stayed_list else 1, IntegerType())

    #Creat new column which will likely be our label column to trace all customers that ultimately cancelled their subscription

    df = df.withColumn('label', churned(col('userId')))




    return df, stayed, all_cancelled


Dans cet article de weblog, je vais décrire les étapes que j’ai suivies pour corriger ce code, puis mesurer la différence de performances d’exécution qui en résulte. Dans le processus, j’énoncerai explicitement les meilleures pratiques que je mettrai en œuvre.

Sautons dans ce terrier de lapin !

Définir un harnais de take a look at de non-régression

Arrêt!

Résistez à la tentation de commencer à peaufiner le code tout de suite !

Vous souhaitez pouvoir :

  • Assurez-vous de ne pas introduire de régression en corrigeant le code
  • Mesurer les améliorations en termes de performances

C’est là que limiter le périmètre de l’analyse à une fonction s’est avéré utile : cela m’a permis d’utiliser des outils advert hoc et simples :

  • J’ai isolé la fonction d’origine dans une fonction prepare_data_baseline dans un fichier séparé prepareData_baseline.py
  • J’ai créé un nouveau fichier appelé prepare_data.py avec la nouvelle model de la fonction prepare_data
  • J’ai mesuré le temps pour effectuer le traitement à l’aide de la bibliothèque de temps
  • Et j’ai comparé les DataFrames résultants avec subtract

Parce que l’évaluation paresseuse diffère le second où le code est réellement exécuté, j’ai ajouté du code qui enregistre les DataFrames dans des fichiers, forçant ainsi la matérialisation des DataFrames by way of l’exécution du code. J’ai également ajouté ces lignes dans le cadre de la mesure du temps.

Et voilà à quoi ça ressemble :

from pyspark.sql import SparkSession

import time, datetime

from prepareData import prepare_data

from prepareData_baseline import prepare_data_baseline




spark = SparkSession 

    .builder 

    .appName("Churn Evaluation Information Preparation Check Harness") 

    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")




spark.conf.set('spark.sql.adaptive.enabled','false')

print(f"AQE enabled: {spark.conf.get('spark.sql.adaptive.enabled')}")




df = spark.learn.json('knowledge/mini_sparkify_event_data.json')





#Baseline model




process_time_start = time.perf_counter()                   # Begin timer: start processing

df_baseline, stayed_baseline, all_cancelled_baseline = prepare_data_baseline(df)

df_baseline.write.mode("overwrite").json('knowledge/df_baseline')

stayed_baseline.write.mode("overwrite").json('knowledge/stayed_baseline')

all_cancelled_baseline.write.mode("overwrite").json('knowledge/all_cancelled_baseline')

process_time_end = time.perf_counter()                     # Cease timer: finish processing

process_time = process_time_end - process_time_start       # Elapsed time for processing

totalTime = datetime.timedelta(seconds = process_time)




print(f"Making ready knowledge took with the baseline model took {totalTime}")




#New model




process_time_start = time.perf_counter()                   # Begin timer: start processing

df, stayed, all_cancelled = prepare_data(df)

df.write.mode("overwrite").json('knowledge/df')

stayed.write.mode("overwrite").json('knowledge/stayed')

all_cancelled.write.mode("overwrite").json('knowledge/all_cancelled')

process_time_end = time.perf_counter()                     # Cease timer: finish processing

process_time = process_time_end - process_time_start       # Elapsed time for processing

totalTime = datetime.timedelta(seconds = process_time)




print(f"Making ready knowledge took with the brand new model took {totalTime}")




# Regression Testing




def diffDataFrame(df1,df2):

    return df1.subtract(df2).rely()




print(f"New processing launched {diffDataFrame(df,df_baseline)} variations in df.")

print(f"New processing launched {diffDataFrame(all_cancelled,all_cancelled_baseline)} variations in all_cancelled.")

print(f"New processing launched {diffDataFrame(stayed,stayed_baseline)} variations in stayed.")




spark.cease()


Retro documenter les exigences

Cette étape était assez facile à trigger des commentaires qui étaient présents dans le code preliminary.

Cette fonction :

  • Prend un DataFrame contenant les activités des utilisateurs,
  • le divise en deux groupes d’activités :
    • les activités des utilisateurs qui ont fini par retourner et
    • les activités des utilisateurs qui ne l’ont pas fait, et
  • ajoute une colonne « label » au DataFrame d’entrée pour baliser les activités appartenant aux utilisateurs qui ont finalement été désabonnés (1 si l’utilisateur a renvoyé 0 dans le cas contraire).

Si cela vous semble étrangement redondant, je suis d’accord. Mais laissons cette query de côté pour l’on the spot ; nous y reviendrons une fois que nous serons satisfaits de notre nouvelle model du code.

Refactoriser le code

Le principal problème du code est l’utilisation de listes Python pour obtenir les résultats requis. Ces listes sont créées en collectant les DataFrames sur le pilote Spark où les boucles for seront traitées, ce qui rend ce code non évolutif : au-delà d’un sure nombre d’utilisateurs, la mémoire du pilote peut être saturée et le programme plantera.

De plus, ce choix empêche le code de tirer parti de toutes les optimisations fournies avec les opérations DataFrames.

Ensuite, le code utilise des UDF Pyspark simples pour lesquelles vous encourez une pénalité de performances en raison de la nécessité de :

  • Désérialiser le Spark DataFrame en sa représentation Java
  • Transférez l’objet Java résultant au processus Python où l’UDF sera exécuté
  • Sérialiser la sortie de la fonction au format Spark

Méfiez-vous du coût des UDF Pyspark

Il existe des moyens d’atténuer ces problèmes en utilisant PyArrow et les UDF vectorielles lorsque vous en avez vraiment besoin, mais ce n’est pas le cas.

Tout d’abord, la fonction crée une colonne « Churn », ce qui, je suppose, est à des fins de commodité. Un utilisateur est identifié comme « baratté » s’il s’est rendu sur la web page « Affirmation d’annulation ».

Ceci est réalisé avec un appel withColumn et une UDF.

 #Outline a udf for cancelled

    canceled = udf(lambda x: 1 if x == 'Cancellation Affirmation' else 0)



    #outline a brand new column 'churn' the place 1 signifies cancellation of subscription, 0 in any other case

    df = df.withColumn('Churn', canceled(df.web page))




There isn't a want for a UDF in that case, these traces of code may be changed by a easy column expression like so:

    #outline a brand new column 'churn' the place 1 signifies cancellation of subscription, 0 in any other case

    df = df.withColumn('Churn', (df.web page == 'Cancellation Affirmation').solid('integer').solid('string'))


Je pense que le sort right pour cette nouvelle colonne serait booléen, mais à des fins de non-régression, j’ai dû le convertir en une chaîne de 0 ou 1.

Ensuite, l’auteur procède à la création de deux listes : une pour les utilisateurs qui se sont retirés et une pour les utilisateurs qui sont restés. Étant donné que mon objectif est d’éviter ces listes, je vais plutôt créer les DataFrames correspondants :

 all_users = df.choose(df.userId).distinct().the place(df.userId != '')

    churned_users = df.the place(df.Churn == '1').choose(df.userId).distinct().the place(df.userId != '')

    stayed_users = all_users.subtract(churned_users)


D’abord, je crée un DataFrame de tous les utilisateurs non vides, puis le DataFrame des utilisateurs qui se sont retournés, et je définis les utilisateurs qui sont restés comme la différence entre les deux.

L’auteur utilise les listes maladroitement créées avec les UDF pour créer les DataFrames all_cancelled et stayed. Voici le code du premier :

#Record of cancelled

    list_cancelled = cancelled_df.choose('userId').distinct().accumulate()#listing of cancelled customers




    #Put in a listing format

    gb = ()#short-term variable to retailer lists

    for row in list_cancelled:

        gb.append(row(0))

    canc_list = (x for x in gb if x != '')#take away the invalid customers



    all_cancelled = df.choose("*").the place(col('userId').isin(canc_list))
 

Je me rends compte maintenant que la boucle « Mettre au format liste » est probablement inutile.

Pour créer le même DataFrame, je fais simplement ce qui go well with :

all_cancelled = df.be part of(churned_users,'userId')

La même method est appliquée pour créer le DataFrame resté :

stayed = df.be part of(stayed_users,'userId')


Enfin, l’auteur ajoute la colonne « label » au DataFrame principal en utilisant un UDF :

#Redefine a udf for churn

    churned = udf(lambda x: 0 if x in stayed_list else 1, IntegerType())

    #Creat new column which will likely be our label column to trace all customers that ultimately cancelled their subscription

    df = df.withColumn('label', churned(col('userId')))
 

Au lieu de cela, j’utilise simplement une union:

    df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))


Cela a déclenché une régression automobile je n’ai pas inclus les utilisateurs nuls. Je me demande quelle utilisation pourrait être faite des enregistrements avec des utilisateurs nuls pour former un modèle afin de prédire le taux de désabonnement du comportement des utilisateurs, mais à des fins de non-régression, je les ai également ajoutés :

    empty_users = df.the place(df.userId.isNull())



    #Add empty customers for non regression functions




    df_label = df_label.union(empty_users.withColumn('label',lit(1)))


Enfin, j’ai également dû réordonner les colonnes de mes DataFrames pour que mes simples checks de non-régression réussissent :

# Trier les colonnes

    columns = ('artist','auth','firstName','gender','itemInSession','lastName','size','degree','location','methodology','web page','registration','sessionId','tune','standing','ts','userAgent','userId','Churn','label')

    df_label_sorted = df_label.choose(columns)

    columns = ('artist','auth','firstName','gender','itemInSession','lastName','size','degree','location','methodology','web page','registration','sessionId','tune','standing','ts','userAgent','userId','Churn')

    all_cancelled_sorted = all_cancelled.choose(columns)

    stayed_sorted = stayed.choose(columns)


Ceci est ma model complète de la fonction:

from pyspark.sql.capabilities import lit




def prepare_data(df):




    '''

    Operate to organize the given dataframe and divide into teams of churn and non churn

    customers whereas returning the unique DataFrame with a brand new label column right into a spark dataframe.

    Args:

        df- the unique dataframe

    Returns:

        df -  dataframe of the dataset with new column of churn added

        stayed -  dataframe of the non -churn consumer's actions solely.

        all_cancelled -  dataframe of the churn consumer's actions solely.

    '''




    #outline a brand new column 'churn' the place 1 signifies cancellation of subscription, 0 in any other case

    df = df.withColumn('Churn', (df.web page == 'Cancellation Affirmation').solid('integer').solid('string'))




    all_users = df.choose(df.userId).distinct().the place(df.userId != '')

    churned_users = df.the place(df.Churn == '1').choose(df.userId).distinct().the place(df.userId != '')

    stayed_users = all_users.subtract(churned_users)

    empty_users = df.the place(df.userId.isNull())




    #Retailer each canceled and staying customers in new DataFrames containing all actions they undertook




    all_cancelled = df.be part of(churned_users,'userId')

    stayed = df.be part of(stayed_users,'userId')

    df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))




    #Add empty customers for non regression functions




    df_label = df_label.union(empty_users.withColumn('label',lit(1)))




    # Type the columns

    columns = ('artist','auth','firstName','gender','itemInSession','lastName','size','degree','location','methodology','web page','registration','sessionId','tune','standing','ts','userAgent','userId','Churn','label')

    df_label_sorted = df_label.choose(columns)

    columns = ('artist','auth','firstName','gender','itemInSession','lastName','size','degree','location','methodology','web page','registration','sessionId','tune','standing','ts','userAgent','userId','Churn')

    all_cancelled_sorted = all_cancelled.choose(columns)

    stayed_sorted = stayed.choose(columns)




    #Complete variety of customers who canceled

    print(f"The variety of churned customers is: {churned_users.rely()}")

    #Complete variety of customers who didn't cancel

    print(f"The variety of staying customers is: {stayed_users.rely()}")


    return df_label_sorted, stayed_sorted, all_cancelled_sorted


Non régression et efficiency

J’ai pu vérifier que je n’avais introduit aucune régression dans ma model de la fonction sur mon bureau avec Spark 3.3.

Afin d’obtenir des mesures de performances significatives, j’avais besoin d’utiliser l’ensemble de données JSON 12G complet. Sinon, avec de petites données, la plupart du temps est consacré aux frais généraux et les résultats varient énormément.

Je suis donc passé à notre service de données CML en utilisant Spark 3.2 et j’ai adapté le code en conséquence.

CML utilise Spark sur Kubernetes et la valeur par défaut est l’allocation dynamique des exécuteurs. J’ai dû désactiver cela pour obtenir un environnement secure et donc des mesures significatives :

import time, datetime

from prepareData import prepare_data

from prepareData_baseline import prepare_data_baseline

from prepareData_improved import prepare_data_improved

import cml.data_v1 as cmldata

from env import S3_ROOT, S3_HOME, CONNECTION_NAME




conn = cmldata.get_connection(CONNECTION_NAME)

spark = (

            SparkSession.builder.appName(conn.app_name)

            .config("spark.sql.hive.hwc.execution.mode", "spark")

            .config("spark.dynamicAllocation.enabled","false")

            .config("spark.executor.cases", 3)

            .config("spark.executor.reminiscence","32g")

            .config("spark.executor.cores",4)

            .config("spark.yarn.entry.hadoopFileSystems", conn.hive_external_dir)

            .getOrCreate()

        )




spark.sparkContext.setLogLevel("ERROR")

spark.conf.set('spark.sql.adaptive.enabled','true')

print(f"AQE enabled: {spark.conf.get('spark.sql.adaptive.enabled')}")
 

Cela m’a donné le résultat souhaité:

J’ai ensuite découvert que l’ensemble de données 12G complet contenait un enregistrement corrompu auquel je devais faire face, et pendant que j’y étais, j’ai converti le fichier au format Parquet pour me faire gagner du temps :

Convertir les premiers codecs en colonnes compressées (Parquet, ORC)

J’ai créé une fonction qui effectue les checks pour éviter le code répétitif dans laquelle j’ai également ajouté des appels à setJobGroupsetJobGroup et setJobDescriptionsetJobDescription pour améliorer la lisibilité de l’interface utilisateur Spark :

def measureDataPreparation(df,f,versionName):

  spark.sparkContext.setJobGroup(versionName,"")

  # Begin timer: start processing

  process_time_start = time.perf_counter()                  

  df, stayed, all_cancelled = f(df)

  spark.sparkContext.setJobDescription("Write /knowledge/df")

  df.write.mode("overwrite").json(S3_HOME + '/knowledge/df')

  spark.sparkContext.setJobDescription("Write /knowledge/stayed")

  stayed.write.mode("overwrite").json(S3_HOME + '/knowledge/stayed')

  spark.sparkContext.setJobDescription("Write /knowledge/all_cancelled")

  all_cancelled.write.mode("overwrite").json(S3_HOME + '/knowledge/all_cancelled')

  # Cease timer: finish processing

  process_time_end = time.perf_counter()                    

  # Elapsed time for processing

  process_time = process_time_end - process_time_start      

  totalTime = datetime.timedelta(seconds = process_time)

  print(f"Making ready knowledge with the {versionName} took {totalTime}")

Utilisez setJobGroup et setJobDescription pour améliorer la lisibilité de l’interface utilisateur Spark

Et voici à quoi ressemble l’interface utilisateur Spark :

Comme j’avais établi que je n’avais introduit aucune régression, j’ai également supprimé les checks de régression.

Voici la partie pertinente du résultat de la session :

measureDataPreparation(df,prepare_data_baseline,"baseline model")

The variety of churned customers is: 4982

The variety of staying customers is: 17282

Making ready knowledge with the baseline model took 0:09:11.799036




measureDataPreparation(df,prepare_data,"no regression model"

The variety of churned customers is: 4982

The variety of staying customers is: 17282

Making ready knowledge with the no regression model took 0:01:48.224514



Grand succès! La nouvelle model est plus de quatre fois plus efficace !

Autres améliorations

Comme je n’ai plus besoin de tester la non régression, je peux supprimer le tri des colonnes.

Je peux également supprimer le code qui imprime le nombre d’utilisateurs abandonnés et restés. Ce code n’appartient pas à une fonction qui s’exécutera très probablement sans surveillance dans un pipeline de données.

Il déclenche une exécution distribuée pour calculer des résultats que personne ne verra. Il devrait être laissé au code qui appelle la fonction de consigner ou non ce sort d’informations.

Il s’agit également d’un exemple de violation de la règle suivante :

Supprimer le code qui a aidé au débogage avec rely(), take() ou present() en manufacturing

J’ai vérifié le reste du code preliminary, et après une exploration exhaustive des données et juste avant de diviser l’ensemble de données à des fins de formation, l’auteur supprime les lignes avec des utilisateurs nuls. Il ne sert à rien de transporter ce bagage supplémentaire tout ce temps. En fait, cela enfreint une autre règle du traitement des mégadonnées :

Filtrer tôt

Enfin, j’ai supprimé le casting de la colonne « Churn » et l’ai laissé en tant que booléen. J’ai également vérifié qu’il n’était pas utilisé en dehors de cette fonction et je l’ai renommé « churn » parce que je détestais ce « C » majuscule avec toute la ardour d’un millier de soleils brûlants.

Ceci est la model finale du code :

from pyspark.sql.capabilities import lit




def prepare_data_improved(df):




    '''

    Operate to organize the given DataFrame and divide into teams of churn and non churn

    customers whereas returning the unique DataFrame with a brand new label column right into a Spark DataFrame.

    Args:

        df- the unique DataFrame

    Returns:

        df -  DataFrame of the dataset with new column of churn added

        stayed -  DataFrame of the non -churn consumer's actions solely.

        all_cancelled -  DataFrame of the churn consumer's actions solely.

    '''




    #outline a brand new column 'churn' the place 1 signifies cancellation of subscription, 0 in any other case

    df = df.the place(df.userId != '').withColumn('churn', (df.web page == 'Cancellation Affirmation'))




    all_users = df.choose(df.userId).distinct()

    churned_users = df.the place(df.churn).choose(df.userId).distinct()

    stayed_users = all_users.subtract(churned_users)

 

    #Retailer each canceled and staying customers in new DataFrames containing all actions they undertook




    all_cancelled = df.be part of(churned_users,'userId')

    stayed = df.be part of(stayed_users,'userId')

    df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))

 

    return df_label, stayed, all_cancelled

Conclusion

Maintenant que j’ai réalisé la non régression en utilisant exclusivement DataFrame, et que j’ai aussi une model améliorée, je devrais pouvoir mesurer les bénéfices de l’utilisation du cache Spark et du moteur Adaptive Question Execution.

Voici les résultats complets :

Dans cette expérience limitée, le facteur numéro un qui affect les performances de l’exécution est la refactorisation du code Spark pour supprimer les anti-modèles de traitement distribué.

La mise en cache des données, l’amélioration supplémentaire du code ou l’utilisation d’AQE apportent toutes des améliorations marginales par rapport à l’élimination de la dette method.

Le retour sur investissement de la formation est toujours un problème épineux en raison de la difficulté à le mesurer facilement dans un tableur, mais avec cette expérience, j’espère avoir montré que le manque de compétences devrait être une préoccupation majeure pour toute organisation exécutant des expenses de travail Spark.

Si vous souhaitez acquérir une expérience pratique avec Spark 3.2, ainsi que d’autres outils et strategies pour que vos tâches Spark s’exécutent à des performances optimales, inscrivez-vous à Cloudera Cours de réglage des performances d’Apache Spark.

Si vous avez besoin d’une introduction à AQE, veuillez vous référer à mon précédent article de weblog.

Related Articles

LAISSER UN COMMENTAIRE

S'il vous plaît entrez votre commentaire!
S'il vous plaît entrez votre nom ici

Latest Articles