Ako čítať a zapisovať údaje tabuľky v PySpark

Ako Citat A Zapisovat Udaje Tabulky V Pyspark



Spracovanie dát v PySpark je rýchlejšie, ak sú dáta načítané vo forme tabuľky. S týmto, pomocou SQL Expressions, bude spracovanie rýchle. Takže konvertovanie PySpark DataFrame/RDD na tabuľku pred odoslaním na spracovanie je lepší prístup. Dnes uvidíme, ako načítať dáta tabuľky do PySpark DataFrame, zapísať PySpark DataFrame do tabuľky a vložiť nový DataFrame do existujúcej tabuľky pomocou vstavaných funkcií. Poďme!

Pyspark.sql.DataFrameWriter.saveAsTable()

Najprv uvidíme, ako zapísať existujúci PySpark DataFrame do tabuľky pomocou funkcie write.saveAsTable(). Na zápis DataFrame do tabuľky je potrebný názov tabuľky a ďalšie voliteľné parametre, ako sú režimy, partionBy atď. Je uložený ako parketový súbor.

Syntax:







dataframe_obj.write.saveAsTable(cesta/názov_tabuľky,režim,oddielBy,…)
  1. Table_name je názov tabuľky, ktorá je vytvorená z dataframe_obj.
  2. Údaje tabuľky môžeme pripojiť/prepísať pomocou parametra mode.
  3. PartitionBy používa jednotlivé/viacnásobné stĺpce na vytvorenie oddielov na základe hodnôt v týchto poskytnutých stĺpcoch.

Príklad 1:

Vytvorte PySpark DataFrame s 5 riadkami a 4 stĺpcami. Zapíšte tento dátový rámec do tabuľky s názvom „Agri_Table1“.



importovať pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# farmárske údaje s 5 riadkami a 5 stĺpcami

agri =[{ 'Soil_Type' : 'Čierna' , 'Irigation_availability' : 'nie' , 'Acres' : 2500 , 'Soil_status' : 'suchý' ,
'Krajina' : 'USA' },

{ 'Soil_Type' : 'Čierna' , 'Irigation_availability' : 'Áno' , 'Acres' : 3500 , 'Soil_status' : 'mokrý' ,
'Krajina' : 'India' },

{ 'Soil_Type' : 'červená' , 'Irigation_availability' : 'Áno' , 'Acres' : 210 , 'Soil_status' : 'suchý' ,
'Krajina' : 'UK' },

{ 'Soil_Type' : 'iné' , 'Irigation_availability' : 'nie' , 'Acres' : 1000 , 'Soil_status' : 'mokrý' ,
'Krajina' : 'USA' },

{ 'Soil_Type' : 'piesok' , 'Irigation_availability' : 'nie' , 'Acres' : 500 , 'Soil_status' : 'suchý' ,
'Krajina' : 'India' }]



# vytvorte dátový rámec z vyššie uvedených údajov

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Napíšte vyššie uvedený DataFrame do tabuľky.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Výkon:







Vidíme, že jeden parketový súbor je vytvorený s predchádzajúcimi dátami PySpark.



Príklad 2:

Zvážte predchádzajúci DataFrame a zapíšte „Agri_Table2“ do tabuľky rozdelením záznamov na základe hodnôt v stĺpci „Country“.

# Zapíšte vyššie uvedený DataFrame do tabuľky s parametrom partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Krajina' ])

Výkon:

V stĺpci „Country“ sú tri jedinečné hodnoty – „India“, „UK“ a „USA“. Takže sa vytvoria tri oddiely. Každý oddiel obsahuje parketové súbory.

Pyspark.sql.DataFrameReader.table()

Načítajme tabuľku do PySpark DataFrame pomocou funkcie spark.read.table(). Vyžaduje len jeden parameter, ktorým je cesta/názov tabuľky. Priamo načíta tabuľku do PySpark DataFrame a všetky funkcie SQL, ktoré sú aplikované na PySpark DataFrame, možno použiť aj na tento načítaný DataFrame.

Syntax:

spark_app.read.table(cesta/'názov_tabuľky')

V tomto scenári používame predchádzajúcu tabuľku, ktorá bola vytvorená z PySpark DataFrame. Uistite sa, že vo svojom prostredí potrebujete implementovať útržky kódu z predchádzajúceho scenára.

Príklad:

Načítajte tabuľku „Agri_Table1“ do DataFrame s názvom „loaded_data“.

load_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

načítané_údaje.show()

Výkon:

Vidíme, že tabuľka je načítaná do PySpark DataFrame.

Vykonávanie SQL dotazov

Teraz vykonáme niekoľko SQL dotazov na načítanom DataFrame pomocou funkcie spark.sql().

# Na zobrazenie všetkých stĺpcov z vyššie uvedenej tabuľky použite príkaz SELECT.

linuxhint_spark_app.sql( 'SELECT * z Agri_Table1' ).šou()

# KDE Klauzula

linuxhint_spark_app.sql( 'SELECT * z Agri_Table1 WHERE Soil_status='Suchý' ' ).šou()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).šou()

Výkon:

  1. Prvý dotaz zobrazí všetky stĺpce a záznamy z DataFrame.
  2. Druhý dotaz zobrazí záznamy na základe stĺpca „Soil_status“. Existujú iba tri záznamy s prvkom „Suchý“.
  3. Posledný dotaz vráti dva záznamy s „akrami“, ktoré sú väčšie ako 2 000.

Pyspark.sql.DataFrameWriter.insertInto()

Pomocou funkcie insertInto() môžeme pridať DataFrame do existujúcej tabuľky. Túto funkciu môžeme použiť spolu s selectExpr() na definovanie názvov stĺpcov a potom ich vložiť do tabuľky. Táto funkcia tiež berie ako parameter tableName.

Syntax:

DataFrame_obj.write.insertInto('názov_tabuľky')

V tomto scenári používame predchádzajúcu tabuľku, ktorá bola vytvorená z PySpark DataFrame. Uistite sa, že vo svojom prostredí potrebujete implementovať útržky kódu z predchádzajúceho scenára.

Príklad:

Vytvorte nový DataFrame s dvoma záznamami a vložte ich do tabuľky „Agri_Table1“.

importovať pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# farmárske údaje s 2 riadkami

agri =[{ 'Soil_Type' : 'piesok' , 'Irigation_availability' : 'nie' , 'Acres' : 2500 , 'Soil_status' : 'suchý' ,
'Krajina' : 'USA' },

{ 'Soil_Type' : 'piesok' , 'Irigation_availability' : 'nie' , 'Acres' : 1200 , 'Soil_status' : 'mokrý' ,
'Krajina' : 'Japonsko' }]

# vytvorte dátový rámec z vyššie uvedených údajov

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Akry' , 'Krajina' , 'Dostupnosť_zavlažovania' , 'Soil_Type' ,
'Stav_pôdy' ).write.insertInto( 'Agri_Table1' )

# Zobrazte konečnú tabuľku Agri_Table1

linuxhint_spark_app.sql( 'SELECT * z Agri_Table1' ).šou()

Výkon:

Teraz je celkový počet riadkov prítomných v DataFrame 7.

Záver

Teraz viete, ako zapísať PySpark DataFrame do tabuľky pomocou funkcie write.saveAsTable(). Preberá názov tabuľky a ďalšie voliteľné parametre. Potom sme túto tabuľku načítali do PySpark DataFrame pomocou funkcie spark.read.table(). Vyžaduje len jeden parameter, ktorým je cesta/názov tabuľky. Ak chcete pridať nový DataFrame do existujúcej tabuľky, použite funkciu insertInto().