V tejto príručke sa zameriame hlavne na čítanie/načítanie súboru parket do PySpark DataFrame/SQL pomocou funkcie read.parquet(), ktorá je dostupná v triede pyspark.sql.DataFrameReader.
Téma obsahu:
Prečítajte si súbor Parquet do PySpark DataFrame
Prečítajte si súbor Parquet do PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Táto funkcia sa používa na čítanie parketového súboru a jeho načítanie do PySpark DataFrame. Preberá cestu/názov súboru parketového súboru. Môžeme jednoducho použiť funkciu read.parquet(), pretože ide o všeobecnú funkciu.
Syntax:
Pozrime sa na syntax read.parquet():
spark_app.read.parquet(názov_súboru.parketa/cesta)Najprv nainštalujte modul PySpark pomocou príkazu pip:
pip install pyspark
Získajte súbor na parkety
Na čítanie súboru parkiet potrebujete údaje, v ktorých je z týchto údajov vygenerovaný súbor parkiet. V tejto časti uvidíme, ako vygenerovať parketový súbor z PySpark DataFrame.
Vytvorme PySpark DataFrame s 5 záznamami a zapíšme to do súboru parkiet „industry_parquet“.
importovať pysparkz pyspark.sql importujte SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# vytvorte dátový rámec, ktorý ukladá podrobnosti o odvetví
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Poľnohospodárstvo' ,Oblasť= 'USA' ,
Hodnotenie = 'horúce' ,Total_employees= 100 ),
Riadok (Typ= 'Poľnohospodárstvo' ,Oblasť= 'India' ,Hodnotenie= 'horúce' ,Total_employees= 200 ),
Riadok (Typ= 'vývoj' ,Oblasť= 'USA' ,Hodnotenie= 'teplé' ,Total_employees= 100 ),
Riadok (Typ= 'vzdelávanie' ,Oblasť= 'USA' ,Hodnotenie= 'cool' ,Total_employees= 400 ),
Riadok (Typ= 'vzdelávanie' ,Oblasť= 'USA' ,Hodnotenie= 'teplé' ,Total_employees= dvadsať )
])
# Skutočný DataFrame
industry_df.show()
# Napíšte industry_df do súboru parkiet
industry_df.coalesce( 1 ).napíšte.parkety( 'priemysel_parkety' )
Výkon:
Toto je DataFrame, ktorý obsahuje 5 záznamov.
Pre predchádzajúci DataFrame sa vytvorí parketový súbor. Tu je názov nášho súboru s príponou „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet“. Tento súbor používame v celom návode.
Prečítajte si súbor Parquet do PySpark DataFrame
Máme pilník na parkety. Prečítajme si tento súbor pomocou funkcie read.parquet() a načítajme ho do PySpark DataFrame.
importovať pysparkz pyspark.sql importujte SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# Prečítajte súbor parkiet do objektu dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parkquet' )
# Zobrazte dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Výkon:
DataFrame zobrazujeme pomocou metódy show(), ktorá bola vytvorená zo súboru parket.
SQL dotazy so súborom Parquet
Po načítaní do DataFrame je možné vytvárať SQL tabuľky a zobrazovať dáta, ktoré sú prítomné v DataFrame. Potrebujeme vytvoriť TEMPORARY VIEW a použiť príkazy SQL na vrátenie záznamov z DataFrame, ktorý je vytvorený zo súboru parket.
Príklad 1:
Vytvorte dočasné zobrazenie s názvom „Sectors“ a pomocou príkazu SELECT zobrazte záznamy v DataFrame. Môžete sa odvolať na toto tutoriál ktorý vysvetľuje, ako vytvoriť VIEW v Spark – SQL.
importovať pysparkz pyspark.sql importujte SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# Prečítajte súbor parkiet do objektu dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parkquet' )
# Vytvorte pohľad z vyššie uvedeného parketového súboru s názvom - 'Sektory'
dataframe_from_parquet.createOrReplaceTempView( 'Sektory' )
# Dopyt na zobrazenie všetkých záznamov zo sektorov
linuxhint_spark_app.sql( 'vyberte * zo sektorov' ).šou()
Výkon:
Príklad 2:
Pomocou predchádzajúceho VIEW napíšte dotaz SQL:
- Zobrazenie všetkých záznamov zo sektorov, ktoré patria do „India“.
- Na zobrazenie všetkých záznamov zo sektorov so zamestnancom, ktorý je väčší ako 100.
linuxhint_spark_app.sql( 'vyberte * zo sektorov, kde Area='India'' ).šou()
# Dopyt na zobrazenie všetkých záznamov zo sektorov so zamestnancom väčším ako 100
linuxhint_spark_app.sql( 'vyberte * zo sektorov, kde celkový_zamestnanci>100' ).šou()
Výkon:
Existuje len jeden záznam s oblasťou „India“ a dva záznamy so zamestnancami, ktoré sú väčšie ako 100.
Prečítajte si súbor Parquet do PySpark SQL
Najprv musíme vytvoriť VIEW pomocou príkazu CREATE. Pomocou kľúčového slova „cesta“ v rámci dotazu SQL môžeme prečítať súbor parketu do Spark SQL. Po ceste musíme zadať názov súboru/umiestnenie súboru.
Syntax:
spark_app.sql( 'VYTVORIŤ DOČASNÉ ZOBRAZENIE view_name POMOCOU MOŽNOSTÍ parkiet (cesta ' názov_súboru.parkety ')' )Príklad 1:
Vytvorte dočasné zobrazenie s názvom „Sector2“ a prečítajte si doň súbor parkety. Pomocou funkcie sql() napíšte výberový dotaz na zobrazenie všetkých záznamov, ktoré sú prítomné v zobrazení.
importovať pysparkz pyspark.sql importujte SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# Prečítajte si parketový súbor do Spark-SQL
linuxhint_spark_app.sql( 'VYTVORIŤ DOČASNÝ POHĽAD Sektor 2 POMOCOU MOŽNOSTÍ parkiet (cesta ' časť-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parkquet ')' )
# Dopyt na zobrazenie všetkých záznamov zo Sektora2
linuxhint_spark_app.sql( 'vyberte * zo sektora 2' ).šou()
Výkon:
Príklad 2:
Použite predchádzajúci VIEW a napíšte dotaz na zobrazenie všetkých záznamov s hodnotením „Hot“ alebo „Cool“.
# Dopyt na zobrazenie všetkých záznamov zo Sektora2 s hodnotením - Hot alebo Cool.linuxhint_spark_app.sql( 'vyberte * zo sektora 2, kde Hodnotenie='Hot' OR Rating='Cool'' ).šou()
Výkon:
Existujú tri záznamy s hodnotením „Hot“ alebo „Cool“.
Záver
V PySpark funkcia write.parquet() zapíše DataFrame do súboru parket. Funkcia read.parquet() načíta súbor parket do PySpark DataFrame alebo akéhokoľvek iného DataSource. Naučili sme sa čítať parketový súbor do PySpark DataFrame a do tabuľky PySpark. V rámci tohto tutoriálu sme tiež diskutovali o tom, ako vytvoriť tabuľky z PySpark DataFrame a filtrovať údaje pomocou klauzuly WHERE.