Konverzia PySpark DataFrame na CSV

Konverzia Pyspark Dataframe Na Csv



Pozrime sa na štyri rôzne scenáre prevodu dátového rámca PySpark na CSV. Priamo používame metódu write.csv() na konverziu PySpark DataFrame na CSV. Pomocou funkcie to_csv() konvertujeme PySpark Pandas DataFrame na CSV. Môže to byť možné aj jeho konverziou na pole NumPy.

Téma obsahu:

Ak chcete vedieť o PySpark DataFrame a inštalácii modulu, prejdite si toto článok .







PySpark DataFrame na CSV konverziou na Pandas DataFrame

To_csv() je metóda, ktorá je dostupná v module Pandas a ktorá konvertuje Pandas DataFrame na CSV. Najprv musíme previesť náš PySpark DataFrame na Pandas DataFrame. Používa sa na to metóda toPandas(). Pozrime sa na syntax to_csv() spolu s jej parametrami.



Syntax:



pandas_dataframe_obj.to_csv(cesta/ 'názov_súboru.csv' , hlavička ,index,stĺpce,režim...)
  1. Musíme zadať názov súboru CSV. Ak chcete stiahnutý súbor CSV uložiť na určité miesto v počítači, môžete spolu s názvom súboru zadať aj cestu.
  2. Stĺpce sú zahrnuté, ak je hlavička nastavená na „True“. Ak stĺpce nepotrebujete, nastavte hlavičku na hodnotu „False“.
  3. Indexy sú špecifikované, ak je index nastavený na „True“. Ak indexy nepotrebujete, nastavte index na „False“.
  4. Parameter Columns obsahuje zoznam názvov stĺpcov, v ktorom môžeme určiť, ktoré konkrétne stĺpce sa extrahujú do súboru CSV.
  5. Záznamy vieme pridať do CSV pomocou parametra mode. Append – používa sa na to „a“.

Príklad 1: S parametrami hlavičky a indexu

Vytvorte „skills_df“ PySpark DataFrame s 3 riadkami a 4 stĺpcami. Skonvertujte tento DataFrame na CSV tak, že ho najprv skonvertujete na Pandas DataFrame.





importovať pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# údaje o zručnostiach s 3 riadkami a 4 stĺpcami

schopnosti =[{ 'id' : 123 , 'osoba' : 'Med' , 'skill' : 'maľovanie' , 'cena' : 25 000 },

{ 'id' : 112 , 'osoba' : 'Mouni' , 'skill' : 'tancovať' , 'cena' : 2000 },

{ 'id' : 153 , 'osoba' : 'Tulasi' , 'skill' : 'čítanie' , 'cena' : 1200 }

]

# vytvorte dátový rámec zručností z vyššie uvedených údajov

skills_df = linuxhint_spark_app.createDataFrame(zručnosti)

skills_df.show()

# Previesť skills_df na pandas DataFrame

pandas_skills_df= skills_df.toPandas()

print(pandas_skills_df)

# Preveďte tento DataFrame na csv s hlavičkou a indexom

pandas_skills_df.to_csv( 'pandas_skills1.csv' , hlavička =True, index=True)

Výkon:



Vidíme, že PySpark DataFrame je konvertovaný na Pandas DataFrame. Pozrime sa, či sa prevedie na CSV s názvami stĺpcov a indexmi:

Príklad 2: Pripojte údaje do CSV

Vytvorte ešte jeden PySpark DataFrame s 1 záznamom a pripojte ho k CSV, ktorý je vytvorený ako súčasť nášho prvého príkladu. Uistite sa, že musíme nastaviť hlavičku na hodnotu „False“ spolu s parametrom režimu. V opačnom prípade sú názvy stĺpcov tiež pripojené ako riadok.

importovať pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

schopnosti =[{ 'id' : 90 , 'osoba' : 'Bhargav' , 'skill' : 'čítanie' , 'cena' : 12 000 }

]

# vytvorte dátový rámec zručností z vyššie uvedených údajov

skills_df = linuxhint_spark_app.createDataFrame(zručnosti)

# Previesť skills_df na pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Pridajte tento DataFrame do súboru pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , režim= 'a' , hlavička =False)

Výstup CSV:

Vidíme, že do súboru CSV sa pridá nový riadok.

Príklad 3: S parametrom Columns

Majme rovnaký DataFrame a konvertujeme ho na CSV s dvoma stĺpcami: „osoba“ a „cena“.

importovať pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# údaje o zručnostiach s 3 riadkami a 4 stĺpcami

schopnosti =[{ 'id' : 123 , 'osoba' : 'Med' , 'skill' : 'maľovanie' , 'cena' : 25 000 },

{ 'id' : 112 , 'osoba' : 'Mouni' , 'skill' : 'tancovať' , 'cena' : 2000 },

{ 'id' : 153 , 'osoba' : 'Tulasi' , 'skill' : 'čítanie' , 'cena' : 1200 }

]

# vytvorte dátový rámec zručností z vyššie uvedených údajov

skills_df = linuxhint_spark_app.createDataFrame(zručnosti)

# Previesť skills_df na pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Preveďte tento DataFrame na csv so špecifickými stĺpcami

pandas_skills_df.to_csv( 'pandas_skills2.csv' , stĺpce=[ 'osoba' , 'cena' ])

Výstup CSV:

Vidíme, že v súbore CSV existujú iba stĺpce „osoba“ a „cena“.

PySpark Pandas DataFrame do CSV pomocou metódy To_Csv().

To_csv() je metóda, ktorá je dostupná v module Pandas a ktorá konvertuje Pandas DataFrame na CSV. Najprv musíme previesť náš PySpark DataFrame na Pandas DataFrame. Používa sa na to metóda toPandas(). Pozrime sa na syntax to_csv() spolu s jej parametrami:

Syntax:

pyspark_pandas_dataframe_obj.to_csv(cesta/ 'názov_súboru.csv' , hlavička ,index,stĺpce,...)
  1. Musíme zadať názov súboru CSV. Ak chcete stiahnutý súbor CSV uložiť na určité miesto v počítači, môžete spolu s názvom súboru zadať aj cestu.
  2. Stĺpce sú zahrnuté, ak je hlavička nastavená na „True“. Ak stĺpce nepotrebujete, nastavte hlavičku na hodnotu „False“.
  3. Indexy sú špecifikované, ak je index nastavený na „True“. Ak indexy nepotrebujete, nastavte index na „False“.
  4. Parameter columns má zoznam názvov stĺpcov, v ktorom môžeme určiť, ktoré konkrétne stĺpce sa extrahujú do súboru CSV.

Príklad 1: S parametrom Columns

Vytvorte PySpark Pandas DataFrame s 3 stĺpcami a preveďte ho na CSV pomocou to_csv() so stĺpcami „osoba“ a „cena“.

z pyspark import pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Med' , 'Mouni' , 'sám' , 'radha' ], 'cena' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Preveďte tento DataFrame na csv so špecifickými stĺpcami

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , stĺpce=[ 'osoba' , 'cena' ])

Výkon:

Vidíme, že PySpark Pandas DataFrame je konvertovaný na CSV s dvoma oddielmi. Každý oddiel obsahuje 2 záznamy. Tiež stĺpce v CSV sú len „osoba“ a „cena“.

Súbor oddielu 1:

Súbor oddielu 2:

Príklad 2: S parametrom hlavičky

Použite predchádzajúci DataFrame a špecifikujte parameter hlavičky jeho nastavením na „True“.

z pyspark import pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Med' , 'Mouni' , 'sám' , 'radha' ], 'cena' :[ 1 , 2 , 3 , 4 ]})

# Preveďte tento DataFrame na csv s hlavičkou.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , hlavička = Pravda)

Výstup CSV:

Vidíme, že PySpark Pandas DataFrame je konvertovaný na CSV s dvoma oddielmi. Každý oddiel obsahuje 2 záznamy s názvami stĺpcov.

Súbor oddielu 1:

Súbor oddielu 2:

PySpark Pandas DataFrame na CSV konverziou na pole NumPy

Máme možnosť previesť PySpark Pandas DataFrame na CSV konverziou do poľa Numpy. To_numpy() je metóda, ktorá je dostupná v module PySpark Pandas a ktorá konvertuje PySpark Pandas DataFrame na pole NumPy.

Syntax:

pyspark_pandas_dataframe_obj.to_numpy()

Nepotrebuje žiadne parametre.

Pomocou metódy Tofile().

Po konverzii do poľa NumPy môžeme použiť metódu tofile() na konverziu NumPy na CSV. Tu uloží každý záznam do novej bunky stĺpcovo v súbore CSV.

Syntax:

array_obj.to_numpy(názov súboru/cesta,sep=’ ’)

Vyžaduje názov súboru alebo cestu súboru CSV a oddeľovač.

Príklad:

Vytvorte PySpark Pandas DataFrame s 3 stĺpcami a 4 záznamami a skonvertujte ho na CSV tak, že ho najskôr skonvertujete na pole NumPy.

z pyspark import pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Med' , 'Mouni' , 'sám' , 'radha' ], 'cena' :[ 1 , 2 , 3 , 4 ]})

# Preveďte vyššie uvedený DataFrame na numpy pole

konvertované = pyspark_pandas_dataframe.to_numpy()

vytlačiť (konvertované)

# Použitie tofile()

convert.tofile( 'converted1.csv' , september = ',' )

Výkon:

[[ 90 'Med' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'sám' 3 ]

[ 57 'radha' 4 ]]

Vidíme, že PySpark Pandas DataFrame je konvertovaný na pole NumPy (12 hodnôt). Ak vidíte údaje CSV, každá hodnota bunky sa uloží do nového stĺpca.

PySpark DataFrame do CSV pomocou metódy Write.Csv().

Metóda write.csv() berie ako parameter názov súboru/cestu, kam potrebujeme uložiť súbor CSV.

Syntax:

dataframe_object.coalesce( 1 ).write.csv( 'názov súboru' )

V skutočnosti sa CSV uloží ako oddiely (viac ako jedna). Aby sme sa toho zbavili, zlúčime všetky rozdelené súbory CSV do jedného. V tomto scenári používame funkciu coalesce(). Teraz môžeme vidieť iba jeden súbor CSV so všetkými riadkami z PySpark DataFrame.

Príklad:

Predstavte si PySpark DataFrame so 4 záznamami so 4 stĺpcami. Zapíšte tento DataFrame do CSV so súborom s názvom „market_details“.

importovať pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# trhové údaje so 4 riadkami a 4 stĺpcami

trh =[{ 'm_id' : 'mz-001' , 'm_name' : „ABC“ , 'm_city' : 'delhi' , 'm_state' : 'delhi' },

{ 'm_id' : 'mz-002' , 'm_name' : 'XYZ' , 'm_city' : 'patna' , 'm_state' : 'šťastie' },

{ 'm_id' : 'mz-003' , 'm_name' : 'PQR' , 'm_city' : 'florida' , 'm_state' : 'jeden' },

{ 'm_id' : 'mz-004' , 'm_name' : „ABC“ , 'm_city' : 'delhi' , 'm_state' : 'šťastie' }

]



# vytvorte trhový dátový rámec z vyššie uvedených údajov

market_df = linuxhint_spark_app.createDataFrame(trh)

# Aktuálne trhové údaje

market_df.show()

# write.csv()

market_df.coalesce( 1 ).write.csv( 'market_details' )

Výkon:

Pozrime sa na súbor:

Ak chcete zobraziť záznamy, otvorte posledný súbor.

Záver

Naučili sme sa štyri rôzne scenáre, ktoré konvertujú PySpark DataFrame na CSV, s príkladmi zvážením rôznych parametrov. Keď pracujete s PySpark DataFrame, máte dve možnosti, ako previesť tento DataFrame na CSV: jedným spôsobom je použitie metódy write() a druhým je použitie metódy to_csv() pomocou konverzie na Pandas DataFrame. Ak pracujete s PySpark Pandas DataFrame, môžete tiež použiť to_csv() a tofile() konverziou na pole NumPy.