PySpark Pandas_Udf()

Pyspark Pandas Udf



Het transformeren van het PySpark DataFrame is mogelijk met behulp van de pandas_udf() functie. Het is een door de gebruiker gedefinieerde functie die wordt toegepast op het PySpark DataFrame met pijl. We kunnen de gevectoriseerde bewerkingen uitvoeren met pandas_udf(). Het kan worden geïmplementeerd door deze functie door te geven als decorateur. Laten we in deze handleiding duiken om de syntaxis, parameters en verschillende voorbeelden te leren kennen.

Onderwerp van de inhoud:

Als u meer wilt weten over de PySpark DataFrame en module-installatie, neemt u dit door artikel .







Pyspark.sql.functions.pandas_udf()

De pandas_udf () is beschikbaar in de module sql.functions in PySpark die kan worden geïmporteerd met het sleutelwoord 'from'. Het wordt gebruikt om de gevectoriseerde bewerkingen op ons PySpark DataFrame uit te voeren. Deze functie wordt geïmplementeerd als een binnenhuisarchitect door drie parameters door te geven. Daarna kunnen we een door de gebruiker gedefinieerde functie maken die de gegevens in vectorformaat retourneert (zoals we hiervoor series/NumPy gebruiken) met behulp van een pijl. Binnen deze functie kunnen we het resultaat retourneren.



Structuur & Syntaxis:



Laten we eerst eens kijken naar de structuur en syntaxis van deze functie:

@pandas_udf(gegevenstype)
def functie_naam(operatie) -> convert_format:
retour verklaring

Hier is de functienaam de naam van onze gedefinieerde functie. Het gegevenstype specificeert het gegevenstype dat door deze functie wordt geretourneerd. We kunnen het resultaat retourneren met behulp van het sleutelwoord 'return'. Alle bewerkingen worden uitgevoerd binnen de functie met de pijltoewijzing.





Pandas_udf (Functie en ReturnType)

  1. De eerste parameter is de door de gebruiker gedefinieerde functie die eraan wordt doorgegeven.
  2. De tweede parameter wordt gebruikt om het retourgegevenstype van de functie op te geven.

Gegevens:

In deze hele gids gebruiken we slechts één PySpark DataFrame voor demonstratie. Alle door de gebruiker gedefinieerde functies die we definiëren, worden toegepast op dit PySpark DataFrame. Zorg ervoor dat u dit DataFrame eerst in uw omgeving aanmaakt na de installatie van PySpark.



pyspark importeren

importeer vanuit pyspark.sql SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux-tip' ).getOrCreate()

van pyspark.sql.functions importeer pandas_udf

van pyspark.sql.types importeren *

importeer panda's als panda

# plantaardige details

groente =[{ 'type' : 'groente' , 'naam' : 'tomaat' , 'locate_country' : 'VERENIGDE STATEN VAN AMERIKA' , 'hoeveelheid' : 800 },

{ 'type' : 'fruit' , 'naam' : 'banaan' , 'locate_country' : 'CHINA' , 'hoeveelheid' : twintig },

{ 'type' : 'groente' , 'naam' : 'tomaat' , 'locate_country' : 'VERENIGDE STATEN VAN AMERIKA' , 'hoeveelheid' : 800 },

{ 'type' : 'groente' , 'naam' : 'Mango' , 'locate_country' : 'JAPAN' , 'hoeveelheid' : 0 },

{ 'type' : 'fruit' , 'naam' : 'citroen' , 'locate_country' : 'INDIA' , 'hoeveelheid' : 1700 },

{ 'type' : 'groente' , 'naam' : 'tomaat' , 'locate_country' : 'VERENIGDE STATEN VAN AMERIKA' , 'hoeveelheid' : 1200 },

{ 'type' : 'groente' , 'naam' : 'Mango' , 'locate_country' : 'JAPAN' , 'hoeveelheid' : 0 },

{ 'type' : 'fruit' , 'naam' : 'citroen' , 'locate_country' : 'INDIA' , 'hoeveelheid' : 0 }

]

# maak het marktdataframe van de bovenstaande gegevens

market_df = linuxhint_spark_app.createDataFrame (groente)

markt_df.show()

Uitgang:

Hier maken we dit DataFrame met 4 kolommen en 8 rijen. Nu gebruiken we de pandas_udf() om de door de gebruiker gedefinieerde functies te maken en deze op deze kolommen toe te passen.

Pandas_udf() met verschillende gegevenstypen

In dit scenario maken we enkele door de gebruiker gedefinieerde functies met pandas_udf() en passen deze toe op kolommen en geven de resultaten weer met behulp van de select() methode. In elk geval gebruiken we de pandas.Series terwijl we de gevectoriseerde bewerkingen uitvoeren. Dit beschouwt de kolomwaarden als een eendimensionale matrix en de bewerking wordt toegepast op de kolom. In de decorateur zelf specificeren we het retourtype van de functie.

Voorbeeld 1: Pandas_udf() met String Type

Hier maken we twee door de gebruiker gedefinieerde functies met het tekenreeksretourtype om de kolomwaarden van het tekenreekstype om te zetten in hoofdletters en kleine letters. Ten slotte passen we deze functies toe op de kolommen 'type' en 'locate_country'.

# Converteer typekolom naar hoofdletters met pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

return i.str.upper()

# Converteer de kolom location_country naar kleine letters met pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

retourneer i.str.lower()

# Geef de kolommen weer met select()

market_df.select( 'type' ,type_hoofdletter( 'type' ), 'locate_land' ,
country_lower_case( 'locate_country' )).show()

Uitgang:

Uitleg:

De functie StringType() is beschikbaar in de module pyspark.sql.types. We hebben deze module al geïmporteerd tijdens het maken van het PySpark DataFrame.

  1. Eerst retourneert UDF (door de gebruiker gedefinieerde functie) de tekenreeksen in hoofdletters met behulp van de functie str.upper(). De str.upper() is beschikbaar in de Series Data Structure (aangezien we converteren naar series met een pijl in de functie) die de gegeven string omzet in hoofdletters. Ten slotte wordt deze functie toegepast op de kolom 'type' die is opgegeven in de select()-methode. Voorheen waren alle strings in de kolom type in kleine letters. Nu zijn ze veranderd in hoofdletters.
  2. Ten tweede retourneert UDF de tekenreeksen in hoofdletters met behulp van de str.lower()functie. De str.lower() is beschikbaar in de Series Data Structure die de gegeven string converteert naar kleine letters. Ten slotte wordt deze functie toegepast op de kolom 'type' die is opgegeven in de select()-methode. Voorheen waren alle strings in de kolom type in hoofdletters. Nu zijn ze veranderd in kleine letters.

Voorbeeld 2: Pandas_udf() met Integer Type

Laten we een UDF maken die de PySpark DataFrame integer-kolom converteert naar de Pandas-reeks en 100 toevoegen aan elke waarde. Geef de kolom 'hoeveelheid' door aan deze functie binnen de select() methode.

# Voeg 100 toe

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

retourneer ik+ 100

# Geef de hoeveelheidskolom door aan de bovenstaande functie en geef deze weer.

market_df.select( 'hoeveelheid' ,voeg_100 toe( 'hoeveelheid' )).show()

Uitgang:

Uitleg:

Binnen de UDF herhalen we alle waarden en converteren we ze naar Series. Daarna tellen we 100 op bij elke waarde in de reeks. Ten slotte geven we de kolom 'hoeveelheid' door aan deze functie en kunnen we zien dat 100 wordt toegevoegd aan alle waarden.

Pandas_udf() met verschillende gegevenstypen met behulp van Groupby() & Agg()

Laten we eens kijken naar de voorbeelden om de UDF door te geven aan de geaggregeerde kolommen. Hier worden de kolomwaarden eerst gegroepeerd met behulp van de functie groupby() en wordt de aggregatie uitgevoerd met behulp van de functie agg(). We geven onze UDF door binnen deze geaggregeerde functie.

Syntaxis:

pyspark_dataframe_object.groupby( 'groeperingskolom' ).agg(UDF
(pyspark_dataframe_object[ 'kolom' ]))

Hier worden eerst de waarden in de groeperingskolom gegroepeerd. Vervolgens wordt de aggregatie uitgevoerd op alle gegroepeerde gegevens met betrekking tot onze UDF.

Voorbeeld 1: Pandas_udf() met Aggregate Mean()

Hier maken we een door de gebruiker gedefinieerde functie met een float van het retourtype. Binnen de functie berekenen we het gemiddelde met behulp van de functie mean(). Deze UDF wordt doorgegeven aan de kolom 'hoeveelheid' om de gemiddelde hoeveelheid voor elk type te krijgen.

# retourneer het gemiddelde/gemiddelde

@panda's_udf( 'vlot' )

def gemiddelde_functie(i: panda.Series) -> float:

terug i.mean()

# Geef de hoeveelheidskolom door aan de functie door de typekolom te groeperen.

market_df.groupby( 'type' ).agg(gemiddelde_functie(markt_df[ 'hoeveelheid' ])).show()

Uitgang:

We groeperen op basis van elementen in de kolom 'type'. Er worden twee groepen gevormd: 'fruit' en 'groente'. Voor elke groep wordt het gemiddelde berekend en geretourneerd.

Voorbeeld 2: Pandas_udf() met Aggregate Max() en Min()

Hier maken we twee door de gebruiker gedefinieerde functies met het integer (int) retourtype. De eerste UDF retourneert de minimumwaarde en de tweede UDF retourneert de maximumwaarde.

# pandas_udf die de minimumwaarde retourneren

@panda's_udf( 'int' )

def min_(i: panda.Serie) -> int:

geef i.min() terug

# pandas_udf die de maximale waarde retourneren

@panda's_udf( 'int' )

def max_(i: panda.Serie) -> int:

geef i.max() terug

# Geef de hoeveelheidskolom door aan de min_pandas_udf door Locate_Country te groeperen.

market_df.groupby( 'locate_land' ).agg(min_(markt_df[ 'hoeveelheid' ])).show()

# Geef de hoeveelheidskolom door aan de max_ pandas_udf door Locate_Country te groeperen.

market_df.groupby( 'locate_land' .agg(max_(markt_df[ 'hoeveelheid' ])).show()

Uitgang:

Om minimale en maximale waarden te retourneren, gebruiken we de functies min() en max() in het retourtype van UDF's. Nu groeperen we de gegevens in de kolom 'locate_country'. Er worden vier groepen gevormd (“CHINA”, “INDIA”, “JAPAN”, “USA”). Voor elke groep geven we de maximale hoeveelheid terug. Op dezelfde manier retourneren we de minimale hoeveelheid.

Conclusie

Kortom, de pandas_udf () wordt gebruikt om de gevectoriseerde bewerkingen op ons PySpark DataFrame uit te voeren. We hebben gezien hoe je de pandas_udf() maakt en toepast op het PySpark DataFrame. Voor een beter begrip hebben we de verschillende voorbeelden besproken door rekening te houden met alle datatypes (string, float en integer). Het kan mogelijk zijn om de pandas_udf() te gebruiken met groupby() via de agg() functie.