Pyspark.sql.DataFrameWriter.saveAsTable()
Najprv uvidíme, ako zapísať existujúci PySpark DataFrame do tabuľky pomocou funkcie write.saveAsTable(). Na zápis DataFrame do tabuľky je potrebný názov tabuľky a ďalšie voliteľné parametre, ako sú režimy, partionBy atď. Je uložený ako parketový súbor.
Syntax:
dataframe_obj.write.saveAsTable(cesta/názov_tabuľky,režim,oddielBy,…)
- Table_name je názov tabuľky, ktorá je vytvorená z dataframe_obj.
- Údaje tabuľky môžeme pripojiť/prepísať pomocou parametra mode.
- PartitionBy používa jednotlivé/viacnásobné stĺpce na vytvorenie oddielov na základe hodnôt v týchto poskytnutých stĺpcoch.
Príklad 1:
Vytvorte PySpark DataFrame s 5 riadkami a 4 stĺpcami. Zapíšte tento dátový rámec do tabuľky s názvom „Agri_Table1“.
importovať pyspark
z pyspark.sql importujte SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# farmárske údaje s 5 riadkami a 5 stĺpcami
agri =[{ 'Soil_Type' : 'Čierna' , 'Irigation_availability' : 'nie' , 'Acres' : 2500 , 'Soil_status' : 'suchý' ,
'Krajina' : 'USA' },
{ 'Soil_Type' : 'Čierna' , 'Irigation_availability' : 'Áno' , 'Acres' : 3500 , 'Soil_status' : 'mokrý' ,
'Krajina' : 'India' },
{ 'Soil_Type' : 'červená' , 'Irigation_availability' : 'Áno' , 'Acres' : 210 , 'Soil_status' : 'suchý' ,
'Krajina' : 'UK' },
{ 'Soil_Type' : 'iné' , 'Irigation_availability' : 'nie' , 'Acres' : 1000 , 'Soil_status' : 'mokrý' ,
'Krajina' : 'USA' },
{ 'Soil_Type' : 'piesok' , 'Irigation_availability' : 'nie' , 'Acres' : 500 , 'Soil_status' : 'suchý' ,
'Krajina' : 'India' }]
# vytvorte dátový rámec z vyššie uvedených údajov
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Napíšte vyššie uvedený DataFrame do tabuľky.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Výkon:
Vidíme, že jeden parketový súbor je vytvorený s predchádzajúcimi dátami PySpark.
Príklad 2:
Zvážte predchádzajúci DataFrame a zapíšte „Agri_Table2“ do tabuľky rozdelením záznamov na základe hodnôt v stĺpci „Country“.
# Zapíšte vyššie uvedený DataFrame do tabuľky s parametrom partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Krajina' ])
Výkon:
V stĺpci „Country“ sú tri jedinečné hodnoty – „India“, „UK“ a „USA“. Takže sa vytvoria tri oddiely. Každý oddiel obsahuje parketové súbory.
Pyspark.sql.DataFrameReader.table()
Načítajme tabuľku do PySpark DataFrame pomocou funkcie spark.read.table(). Vyžaduje len jeden parameter, ktorým je cesta/názov tabuľky. Priamo načíta tabuľku do PySpark DataFrame a všetky funkcie SQL, ktoré sú aplikované na PySpark DataFrame, možno použiť aj na tento načítaný DataFrame.
Syntax:
spark_app.read.table(cesta/'názov_tabuľky')V tomto scenári používame predchádzajúcu tabuľku, ktorá bola vytvorená z PySpark DataFrame. Uistite sa, že vo svojom prostredí potrebujete implementovať útržky kódu z predchádzajúceho scenára.
Príklad:
Načítajte tabuľku „Agri_Table1“ do DataFrame s názvom „loaded_data“.
load_data = linuxhint_spark_app.read.table( 'Agri_Table1' )načítané_údaje.show()
Výkon:
Vidíme, že tabuľka je načítaná do PySpark DataFrame.
Vykonávanie SQL dotazov
Teraz vykonáme niekoľko SQL dotazov na načítanom DataFrame pomocou funkcie spark.sql().
# Na zobrazenie všetkých stĺpcov z vyššie uvedenej tabuľky použite príkaz SELECT.linuxhint_spark_app.sql( 'SELECT * z Agri_Table1' ).šou()
# KDE Klauzula
linuxhint_spark_app.sql( 'SELECT * z Agri_Table1 WHERE Soil_status='Suchý' ' ).šou()
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).šou()
Výkon:
- Prvý dotaz zobrazí všetky stĺpce a záznamy z DataFrame.
- Druhý dotaz zobrazí záznamy na základe stĺpca „Soil_status“. Existujú iba tri záznamy s prvkom „Suchý“.
- Posledný dotaz vráti dva záznamy s „akrami“, ktoré sú väčšie ako 2 000.
Pyspark.sql.DataFrameWriter.insertInto()
Pomocou funkcie insertInto() môžeme pridať DataFrame do existujúcej tabuľky. Túto funkciu môžeme použiť spolu s selectExpr() na definovanie názvov stĺpcov a potom ich vložiť do tabuľky. Táto funkcia tiež berie ako parameter tableName.
Syntax:
DataFrame_obj.write.insertInto('názov_tabuľky')V tomto scenári používame predchádzajúcu tabuľku, ktorá bola vytvorená z PySpark DataFrame. Uistite sa, že vo svojom prostredí potrebujete implementovať útržky kódu z predchádzajúceho scenára.
Príklad:
Vytvorte nový DataFrame s dvoma záznamami a vložte ich do tabuľky „Agri_Table1“.
importovať pysparkz pyspark.sql importujte SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# farmárske údaje s 2 riadkami
agri =[{ 'Soil_Type' : 'piesok' , 'Irigation_availability' : 'nie' , 'Acres' : 2500 , 'Soil_status' : 'suchý' ,
'Krajina' : 'USA' },
{ 'Soil_Type' : 'piesok' , 'Irigation_availability' : 'nie' , 'Acres' : 1200 , 'Soil_status' : 'mokrý' ,
'Krajina' : 'Japonsko' }]
# vytvorte dátový rámec z vyššie uvedených údajov
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Akry' , 'Krajina' , 'Dostupnosť_zavlažovania' , 'Soil_Type' ,
'Stav_pôdy' ).write.insertInto( 'Agri_Table1' )
# Zobrazte konečnú tabuľku Agri_Table1
linuxhint_spark_app.sql( 'SELECT * z Agri_Table1' ).šou()
Výkon:
Teraz je celkový počet riadkov prítomných v DataFrame 7.
Záver
Teraz viete, ako zapísať PySpark DataFrame do tabuľky pomocou funkcie write.saveAsTable(). Preberá názov tabuľky a ďalšie voliteľné parametre. Potom sme túto tabuľku načítali do PySpark DataFrame pomocou funkcie spark.read.table(). Vyžaduje len jeden parameter, ktorým je cesta/názov tabuľky. Ak chcete pridať nový DataFrame do existujúcej tabuľky, použite funkciu insertInto().