Tabelgegevens lezen en schrijven in PySpark

Tabelgegevens Lezen En Schrijven In Pyspark



Gegevensverwerking in PySpark gaat sneller als de gegevens in de vorm van een tabel worden geladen. Hiermee, met behulp van de SQL-expressies, zal de verwerking snel zijn. Dus het converteren van het PySpark DataFrame/RDD naar een tabel voordat het voor verwerking wordt verzonden, is de betere aanpak. Vandaag zullen we zien hoe we de tabelgegevens in het PySpark DataFrame kunnen lezen, het PySpark DataFrame naar de tabel kunnen schrijven en een nieuw DataFrame in de bestaande tabel kunnen invoegen met behulp van de ingebouwde functies. Laten we gaan!

Pyspark.sql.DataFrameWriter.saveAsTable()

Eerst zullen we zien hoe we het bestaande PySpark DataFrame in de tabel kunnen schrijven met behulp van de functie write.saveAsTable(). Er zijn de tabelnaam en andere optionele parameters zoals modes, partionBy, etc. nodig om het DataFrame naar de tabel te schrijven. Het wordt opgeslagen als parketmap.

Syntaxis:







dataframe_obj.write.saveAsTable(path/Table_name,mode,partitionBy,...)
  1. De Table_name is de naam van de tabel die is gemaakt op basis van de dataframe_obj.
  2. We kunnen de gegevens van de tabel toevoegen/overschrijven met behulp van de modusparameter.
  3. De partitionBy neemt de enkele/meerdere kolommen om partities te maken op basis van waarden in deze verstrekte kolommen.

Voorbeeld 1:

Maak een PySpark DataFrame met 5 rijen en 4 kolommen. Schrijf dit dataframe naar een tabel met de naam 'Agri_Table1'.



pyspark importeren

importeer vanuit pyspark.sql SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux-tip' ).getOrCreate()

# landbouwgegevens met 5 rijen en 5 kolommen

landbouw =[{ 'Grondsoort' : 'Zwart' , 'Irrigatie_beschikbaarheid' : 'Nee' , 'Acres' : 2500 , 'Bodem_status' : 'Droog' ,
'Land' : 'VERENIGDE STATEN VAN AMERIKA' },

{ 'Grondsoort' : 'Zwart' , 'Irrigatie_beschikbaarheid' : 'Ja' , 'Acres' : 3500 , 'Bodem_status' : 'Nat' ,
'Land' : 'Indië' },

{ 'Grondsoort' : 'Rood' , 'Irrigatie_beschikbaarheid' : 'Ja' , 'Acres' : 210 , 'Bodem_status' : 'Droog' ,
'Land' : 'VK' },

{ 'Grondsoort' : 'Ander' , 'Irrigatie_beschikbaarheid' : 'Nee' , 'Acres' : 1000 , 'Bodem_status' : 'Nat' ,
'Land' : 'VERENIGDE STATEN VAN AMERIKA' },

{ 'Grondsoort' : 'Zand' , 'Irrigatie_beschikbaarheid' : 'Nee' , 'Acres' : 500 , 'Bodem_status' : 'Droog' ,
'Land' : 'Indië' }]



# maak het dataframe van de bovenstaande gegevens

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Schrijf het bovenstaande DataFrame naar de tabel.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Uitgang:







We kunnen zien dat er één parketbestand wordt gemaakt met de vorige PySpark-gegevens.



Voorbeeld 2:

Overweeg het vorige DataFrame en schrijf de 'Agri_Table2' naar de tabel door de records te partitioneren op basis van de waarden in de kolom 'Land'.

# Schrijf het bovenstaande DataFrame naar de tabel met de parameter partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Land' ])

Uitgang:

Er zijn drie unieke waarden in de kolom 'Land' - 'India', 'VK' en 'VS'. Er worden dus drie partities gemaakt. Elke partitie bevat de parketbestanden.

Pyspark.sql.DataFrameReader.table()

Laten we de tabel in het PySpark DataFrame laden met behulp van de functie spark.read.table(). Er is slechts één parameter nodig, namelijk de pad-/tabelnaam. Het laadt de tabel rechtstreeks in het PySpark DataFrame en alle SQL-functies die op het PySpark DataFrame worden toegepast, kunnen ook op dit geladen DataFrame worden toegepast.

Syntaxis:

spark_app.read.table(path/'Table_name')

In dit scenario gebruiken we de vorige tabel die is gemaakt op basis van het PySpark DataFrame. Zorg ervoor dat u de codefragmenten van het vorige scenario in uw omgeving moet implementeren.

Voorbeeld:

Laad de tabel 'Agri_Table1' in het DataFrame met de naam 'loaded_data'.

geladen_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

geladen_gegevens.show()

Uitgang:

We kunnen zien dat de tabel is geladen in het PySpark DataFrame.

Het uitvoeren van de SQL-query's

Nu voeren we enkele SQL-query's uit op het geladen DataFrame met behulp van de functie spark.sql().

# Gebruik de opdracht SELECT om alle kolommen uit de bovenstaande tabel weer te geven.

linuxhint_spark_app.sql( 'SELECTEER * uit Agri_Table1' ).show()

# WHERE-clausule

linuxhint_spark_app.sql( 'SELECTEER * uit Agri_Table1 WHERE Soil_status='Droog' ' ).show()

linuxhint_spark_app.sql( 'SELECTEER * uit Agri_Table1 WAAR Acres > 2000 ' ).show()

Uitgang:

  1. De eerste query geeft alle kolommen en records uit het DataFrame weer.
  2. De tweede query geeft de records weer op basis van de kolom 'Soil_status'. Er zijn slechts drie records met het element 'Dry'.
  3. De laatste query retourneert twee records met 'Acres' die groter zijn dan 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Met behulp van de functie insertInto() kunnen we het DataFrame aan de bestaande tabel toevoegen. We kunnen deze functie samen met selectExpr() gebruiken om de kolomnamen te definiëren en deze vervolgens in de tabel in te voegen. Deze functie neemt ook de tableName als parameter.

Syntaxis:

DataFrame_obj.write.insertInto('Table_name')

In dit scenario gebruiken we de vorige tabel die is gemaakt op basis van het PySpark DataFrame. Zorg ervoor dat u de codefragmenten van het vorige scenario in uw omgeving moet implementeren.

Voorbeeld:

Maak een nieuw DataFrame met twee records en plaats deze in de tabel 'Agri_Table1'.

pyspark importeren

importeer vanuit pyspark.sql SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux-tip' ).getOrCreate()

# landbouwgegevens met 2 rijen

landbouw =[{ 'Grondsoort' : 'Zand' , 'Irrigatie_beschikbaarheid' : 'Nee' , 'Acres' : 2500 , 'Bodem_status' : 'Droog' ,
'Land' : 'VERENIGDE STATEN VAN AMERIKA' },

{ 'Grondsoort' : 'Zand' , 'Irrigatie_beschikbaarheid' : 'Nee' , 'Acres' : 1200 , 'Bodem_status' : 'Nat' ,
'Land' : 'Japan' }]

# maak het dataframe van de bovenstaande gegevens

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# schrijf.insertInto()

agri_df2.selectUitdr( 'Acres' , 'Land' , 'irrigatie_beschikbaarheid' , 'Grondsoort' ,
'Bodem_status' ).write.insertInto( 'Agri_Table1' )

# Geef de laatste Agri_Table1 weer

linuxhint_spark_app.sql( 'SELECTEER * uit Agri_Table1' ).show()

Uitgang:

Nu is het totale aantal rijen in het DataFrame 7.

Conclusie

U begrijpt nu hoe u het PySpark DataFrame naar de tabel schrijft met behulp van de functie write.saveAsTable(). Het heeft de tabelnaam en andere optionele parameters nodig. Vervolgens hebben we deze tabel in het PySpark DataFrame geladen met behulp van de functie spark.read.table(). Er is slechts één parameter nodig, namelijk de pad-/tabelnaam. Als u het nieuwe DataFrame aan de bestaande tabel wilt toevoegen, gebruikt u de functie insertInto().