PySpark Pandas_Udf()

Pyspark Pandas Udf



Transformácia PySpark DataFrame je možná pomocou funkcie pandas_udf(). Je to užívateľsky definovaná funkcia, ktorá sa aplikuje na PySpark DataFrame so šípkou. Vektorizované operácie môžeme vykonávať pomocou pandas_udf(). Dá sa implementovať odovzdaním tejto funkcie ako dekoratér. Poďme sa ponoriť do tejto príručky, aby sme poznali syntax, parametre a rôzne príklady.

Téma obsahu:

Ak chcete vedieť o PySpark DataFrame a inštalácii modulu, prejdite si toto článok .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () je k dispozícii v module sql.functions v PySpark, ktorý je možné importovať pomocou kľúčového slova „from“. Používa sa na vykonávanie vektorizovaných operácií na našom PySpark DataFrame. Táto funkcia je implementovaná ako dekorátor odovzdaním troch parametrov. Potom môžeme vytvoriť užívateľom definovanú funkciu, ktorá vráti dáta vo vektorovom formáte (ako na to používame series/NumPy) pomocou šípky. V rámci tejto funkcie sme schopní vrátiť výsledok.



Štruktúra a syntax:



Najprv sa pozrime na štruktúru a syntax tejto funkcie:

@pandas_udf(datatype)
def názov_funkcie (operácia) -> convert_format:
návratový výkaz

Tu je názov_funkcie názov našej definovanej funkcie. Typ údajov určuje typ údajov, ktorý vracia táto funkcia. Výsledok môžeme vrátiť pomocou kľúčového slova „návrat“. Všetky operácie sa vykonávajú vo funkcii s priradením šípok.





Pandas_udf (funkcia a typ návratu)

  1. Prvý parameter je používateľom definovaná funkcia, ktorá sa mu odovzdá.
  2. Druhý parameter sa používa na určenie typu návratových údajov z funkcie.

údaje:

V celej tejto príručke používame na demonštráciu iba jeden PySpark DataFrame. Všetky užívateľom definované funkcie, ktoré definujeme, sú aplikované na tento PySpark DataFrame. Uistite sa, že tento DataFrame vytvoríte vo svojom prostredí najskôr po inštalácii PySpark.



importovať pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

z pyspark.sql.functions importujte pandas_udf

import z pyspark.sql.types *

importovať pandy ako pandy

# detaily zeleniny

zelenina =[{ 'typ' : 'zelenina' , 'názov' : 'paradajka' , 'locate_country' : 'USA' , 'množstvo' : 800 },

{ 'typ' : 'ovocie' , 'názov' : 'banán' , 'locate_country' : 'ČÍNA' , 'množstvo' : dvadsať },

{ 'typ' : 'zelenina' , 'názov' : 'paradajka' , 'locate_country' : 'USA' , 'množstvo' : 800 },

{ 'typ' : 'zelenina' , 'názov' : 'Mango' , 'locate_country' : 'JAPONSKO' , 'množstvo' : 0 },

{ 'typ' : 'ovocie' , 'názov' : 'citrón' , 'locate_country' : „INDIA“ , 'množstvo' : 1700 },

{ 'typ' : 'zelenina' , 'názov' : 'paradajka' , 'locate_country' : 'USA' , 'množstvo' : 1200 },

{ 'typ' : 'zelenina' , 'názov' : 'Mango' , 'locate_country' : 'JAPONSKO' , 'množstvo' : 0 },

{ 'typ' : 'ovocie' , 'názov' : 'citrón' , 'locate_country' : „INDIA“ , 'množstvo' : 0 }

]

# vytvorte trhový dátový rámec z vyššie uvedených údajov

market_df = linuxhint_spark_app.createDataFrame(zelenina)

market_df.show()

Výkon:

Tu vytvoríme tento DataFrame so 4 stĺpcami a 8 riadkami. Teraz používame pandas_udf() na vytvorenie užívateľom definovaných funkcií a aplikujeme ich na tieto stĺpce.

Pandas_udf() s rôznymi typmi údajov

V tomto scenári vytvoríme niektoré užívateľom definované funkcie pomocou pandas_udf() a aplikujeme ich na stĺpce a zobrazíme výsledky pomocou metódy select(). V každom prípade pri vykonávaní vektorizovaných operácií používame sériu pandas. Toto považuje hodnoty stĺpca za jednorozmerné pole a operácia sa použije na stĺpec. V samotnom dekorátore určíme návratový typ funkcie.

Príklad 1: Pandas_udf() s typom reťazca

Tu vytvoríme dve užívateľom definované funkcie s návratovým typom reťazca na prevod hodnôt stĺpcov typu reťazec na veľké a malé písmená. Nakoniec tieto funkcie aplikujeme na stĺpce „type“ a „locate_country“.

# Preveďte stĺpec typu na veľké písmená pomocou pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

vrátiť i.str.upper()

# Preveďte stĺpec locate_country na malé písmená pomocou pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Zobrazte stĺpce pomocou select()

market_df.select( 'typ' ,type_upper_case( 'typ' ), 'locate_country' ,
country_lower_case( 'locate_country' )).šou()

Výkon:

Vysvetlenie:

Funkcia StringType() je dostupná v module pyspark.sql.types. Tento modul sme už importovali pri vytváraní PySpark DataFrame.

  1. Po prvé, UDF (používateľom definovaná funkcia) vráti reťazce napísané veľkými písmenami pomocou funkcie str.upper(). Str.upper() je k dispozícii v štruktúre údajov série (keďže konvertujeme na sériu so šípkou vo funkcii), ktorá skonvertuje daný reťazec na veľké písmená. Nakoniec sa táto funkcia aplikuje na stĺpec „type“, ktorý je špecifikovaný v metóde select(). Predtým boli všetky reťazce v stĺpci typu písané malými písmenami. Teraz sú zmenené na veľké písmená.
  2. Po druhé, UDF vráti reťazce veľkými písmenami pomocou funkcie str.lower(). Str.lower() je k dispozícii v štruktúre údajov série, ktorá skonvertuje daný reťazec na malé písmená. Nakoniec sa táto funkcia aplikuje na stĺpec „type“, ktorý je špecifikovaný v metóde select(). Predtým boli všetky reťazce v stĺpci typu písané veľkými písmenami. Teraz sú zmenené na malé písmená.

Príklad 2: Pandas_udf() s typom Integer

Vytvorme UDF, ktorý prevedie celočíselný stĺpec PySpark DataFrame na sériu Pandas a ku každej hodnote pridá 100. V rámci metódy select() odovzdajte stĺpec „množstvo“ tejto funkcii.

# Pridajte 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

vrátiť i+ 100

# Preneste stĺpec množstva do vyššie uvedenej funkcie a displeja.

market_df.select( 'množstvo' ,add_100( 'množstvo' )).šou()

Výkon:

Vysvetlenie:

Vo vnútri UDF iterujeme všetky hodnoty a konvertujeme ich na Series. Potom ku každej hodnote v sérii pridáme 100. Nakoniec tejto funkcii odovzdáme stĺpec „množstvo“ a vidíme, že ku všetkým hodnotám sa pripočíta 100.

Pandas_udf() s rôznymi typmi údajov pomocou Groupby() a Agg()

Pozrime sa na príklady odovzdania UDF do agregovaných stĺpcov. Tu sú hodnoty stĺpcov najprv zoskupené pomocou funkcie groupby() a agregácia sa vykonáva pomocou funkcie agg(). Náš UDF odovzdávame do tejto agregovanej funkcie.

Syntax:

pyspark_dataframe_object.groupby( 'stĺpec_zoskupenia' ).agg(UDF
(pyspark_dataframe_object[ 'stĺpec' ]))

Tu sú najprv zoskupené hodnoty v stĺpci zoskupenia. Potom sa agregácia vykoná na všetkých zoskupených údajoch s ohľadom na náš UDF.

Príklad 1: Pandas_udf() s Aggregate Mean()

Tu vytvoríme užívateľom definovanú funkciu s návratovým typom float. Vo vnútri funkcie vypočítame priemer pomocou funkcie mean(). Tento UDF sa odovzdá do stĺpca „množstvo“, aby sa získalo priemerné množstvo pre každý typ.

# vráti priemer/priemer

@pandas_udf( 'plavák' )

def average_function(i: panda.Series) -> float:

vrátiť i.mean()

# Odovzdajte stĺpec množstva funkcii zoskupením stĺpca typu.

market_df.groupby( 'typ' ).agg(priemerná_funkcia(trh_df[ 'množstvo' ])).šou()

Výkon:

Zoskupujeme na základe prvkov v stĺpci „typ“. Vznikajú dve skupiny – „ovocie“ a „zelenina“. Pre každú skupinu sa vypočíta a vráti priemer.

Príklad 2: Pandas_udf() s Aggregate Max() a Min()

Tu vytvoríme dve užívateľom definované funkcie s návratovým typom integer (int). Prvý UDF vráti minimálnu hodnotu a druhý UDF vráti maximálnu hodnotu.

# pandas_udf, ktoré vrátia minimálnu hodnotu

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

vrátiť i.min()

# pandas_udf, ktoré vrátia maximálnu hodnotu

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

návrat i.max()

# Preneste stĺpec množstva do min_ pandas_udf zoskupením locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'množstvo' ])).šou()

# Odovzdajte stĺpec množstva do max_ pandas_udf zoskupením locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'množstvo' ])).šou()

Výkon:

Na vrátenie minimálnych a maximálnych hodnôt využívame funkcie min() a max() v návratovom type UDF. Teraz zoskupujeme údaje v stĺpci „locate_country“. Vytvoria sa štyri skupiny („ČÍNA“, „INDIA“, „JAPONSKO“, „USA“). Pre každú skupinu vraciame maximálne množstvo. Podobne vraciame minimálne množstvo.

Záver

V podstate sa pandas_udf () používa na vykonávanie vektorizovaných operácií na našom PySpark DataFrame. Videli sme, ako vytvoriť pandas_udf() a použiť ho na PySpark DataFrame. Pre lepšie pochopenie sme diskutovali o rôznych príkladoch zvážením všetkých dátových typov (reťazec, float a celé číslo). Je možné použiť pandas_udf() s groupby() prostredníctvom funkcie agg().