Hoe real-time gegevensstreaming in Python te implementeren

Hoe Real Time Gegevensstreaming In Python Te Implementeren



Het beheersen van de implementatie van real-time datastreaming in Python fungeert als een essentiële vaardigheid in de hedendaagse data-geïmpliceerde wereld. Deze gids onderzoekt de kernstappen en essentiële hulpmiddelen voor het gebruik van realtime gegevensstreaming met authenticiteit in Python. Van het selecteren van een passend raamwerk zoals Apache Kafka of Apache Pulsar tot het schrijven van een Python-code voor moeiteloos dataverbruik, verwerking en effectieve visualisatie, we zullen de benodigde vaardigheden verwerven om de flexibele en efficiënte realtime datakanalen te bouwen.

Voorbeeld 1: Implementatie van realtime gegevensstreaming in Python

Het implementeren van realtime datastreaming in Python is van cruciaal belang in het huidige datagestuurde tijdperk en de huidige wereld. In dit gedetailleerde voorbeeld doorlopen we het proces van het bouwen van een realtime datastreamingsysteem met Apache Kafka en Python in Google Colab.







Om het voorbeeld te initialiseren voordat we beginnen met coderen, is het bouwen van een specifieke omgeving in Google Colab essentieel. Het eerste dat we moeten doen, is de benodigde bibliotheken installeren. We gebruiken de “kafka-python” -bibliotheek voor Kafka-integratie.



! Pip installeren kafka-python


Met deze opdracht installeert u de bibliotheek 'kafka-python' die de Python-functies en de bindingen voor Apache Kafka levert. Vervolgens importeren we de benodigde bibliotheken voor ons project. Het importeren van de vereiste bibliotheken, waaronder 'KafkaProducer' en 'KafkaConsumer', zijn de klassen uit de 'kafka-python' -bibliotheek waarmee we kunnen communiceren met Kafka-makelaars. JSON is de Python-bibliotheek om te werken met de JSON-gegevens die we gebruiken om de berichten te serialiseren en te deserialiseren.



van kafka import KafkaProducer, KafkaConsumer
json importeren


Oprichting van een Kafkaproducent





Dit is belangrijk omdat een Kafka-producent de gegevens naar een Kafka-onderwerp stuurt. In ons voorbeeld maken we een producer om gesimuleerde realtime gegevens naar een onderwerp met de naam 'real-time-topic' te sturen.

We maken een “KafkaProducer” -instantie die het adres van de Kafka-makelaar specificeert als “localhost:9092”. Vervolgens gebruiken we de “value_serializer”, een functie die de gegevens serialiseert voordat deze naar Kafka worden verzonden. In ons geval codeert een lambda-functie de gegevens als UTF-8-gecodeerde JSON. Laten we nu wat realtime gegevens simuleren en deze naar het Kafka-onderwerp sturen.



producer = KafkaProducer ( bootstrap_servers = 'lokalehost:9092' ,
waarde_serialisatie =lambda v: json.dumps ( in ) .coderen ( 'utf-8' ) )
# Gesimuleerde realtime gegevens
gegevens = { 'sensor_id' : 1 , 'temperatuur' : 25,5 , 'vochtigheid' : 60,2 }
# Gegevens naar het onderwerp verzenden
producent.verzenden ( 'real-time-onderwerp' , gegevens )


In deze regels definiëren we een ‘datawoordenboek’ dat gesimuleerde sensorgegevens vertegenwoordigt. Vervolgens gebruiken we de “send”-methode om deze gegevens naar het “real-time-topic” te publiceren.

Vervolgens willen we een Kafka-consument maken, en een Kafka-consument leest de gegevens uit een Kafka-onderwerp. We creëren een consument die de berichten in het ‘real-time-onderwerp’ consumeert en verwerkt. We maken een “KafkaConsumer”-instantie, waarin het onderwerp wordt gespecificeerd dat we willen consumeren, bijvoorbeeld (real-time-onderwerp) en het adres van de Kafka-makelaar. Vervolgens is de “value_deserializer” een functie die de gegevens deserialiseert die van Kafka worden ontvangen. In ons geval decodeert een lambda-functie de gegevens als UTF-8-gecodeerde JSON.

consument = KafkaConsument ( 'real-time-onderwerp' ,
bootstrap_servers = 'lokalehost:9092' ,
waarde_deserializer =lambda x: json.loads ( x.decoderen ( 'utf-8' ) ) )


We gebruiken een iteratieve lus om de berichten uit het onderwerp continu te consumeren en te verwerken.

# Realtime gegevens lezen en verwerken
voor bericht in klant:
data = bericht.waarde
afdrukken ( F 'Ontvangen gegevens: {data}' )


We halen de waarde van elk bericht en onze gesimuleerde sensorgegevens binnen de lus op en printen deze naar de console. Bij het uitvoeren van de Kafka-producent en -consument wordt deze code uitgevoerd in Google Colab en worden de codecellen afzonderlijk uitgevoerd. De producent stuurt de gesimuleerde gegevens naar het Kafka-onderwerp en de consument leest en drukt de ontvangen gegevens af.


Analyse van de uitvoer terwijl de code wordt uitgevoerd

We zullen realtime gegevens observeren die worden geproduceerd en geconsumeerd. Het gegevensformaat kan variëren, afhankelijk van onze simulatie of feitelijke gegevensbron. In dit gedetailleerde voorbeeld behandelen we het hele proces van het opzetten van een realtime datastreamingsysteem met Apache Kafka en Python in Google Colab. We zullen elke coderegel en de betekenis ervan bij het bouwen van dit systeem uitleggen. Real-time datastreaming is een krachtige mogelijkheid, en dit voorbeeld dient als basis voor complexere toepassingen in de echte wereld.

Voorbeeld 2: Implementatie van realtime gegevensstreaming in Python met behulp van beursgegevens

Laten we nog een uniek voorbeeld geven van het implementeren van realtime datastreaming in Python met behulp van een ander scenario; deze keer zullen we ons concentreren op aandelenmarktgegevens. We creëren een realtime datastreamingsysteem dat de koerswijzigingen van aandelen vastlegt en deze verwerkt met Apache Kafka en Python in Google Colab. Zoals uit het vorige voorbeeld blijkt, beginnen we met het configureren van onze omgeving in Google Colab. Eerst installeren we de vereiste bibliotheken:

! Pip installeren kafka-python yfinance


Hier voegen we de ‘yfinance’-bibliotheek toe waarmee we realtime aandelenmarktgegevens kunnen verkrijgen. Vervolgens importeren we de benodigde bibliotheken. We blijven de klassen “KafkaProducer” en “KafkaConsumer” uit de bibliotheek “kafka-python” gebruiken voor Kafka-interactie. We importeren JSON om met de JSON-gegevens te werken. We gebruiken ook “yfinance” om realtime beursgegevens te verkrijgen. We importeren ook de ‘time’-bibliotheek om een ​​tijdsvertraging toe te voegen om de realtime updates te simuleren.

van kafka import KafkaProducer, KafkaConsumer
json importeren
importeer financiering als yf
importeren tijd


Nu maken we een Kafka-producent voor voorraadgegevens. Onze Kafka-producent krijgt realtime voorraadgegevens en stuurt deze naar een Kafka-onderwerp met de naam 'aandelenprijs'.

producer = KafkaProducer ( bootstrap_servers = 'lokalehost:9092' ,
waarde_serialisatie =lambda v: json.dumps ( in ) .coderen ( 'utf-8' ) )

terwijl WAAR:
voorraad = yf.Ticker ( 'AAPL' ) # Voorbeeld: aandelen Apple Inc
stock_data = voorraad.geschiedenis ( periode = '1d' )
laatste_prijs = voorraadgegevens [ 'Dichtbij' ] .iloc [ - 1 ]
gegevens = { 'symbool' : 'AAPL' , 'prijs' : laatste prijs }
producent.verzenden ( 'aandelenprijs' , gegevens )
tijd.slaap ( 10 ) # Simuleer elke 10 seconden realtime updates


We maken een “KafkaProducer” -instantie met het adres van de Kafka-makelaar in deze code. Binnen de lus gebruiken we “yfinance” om de laatste aandelenkoers voor Apple Inc. (“AAPL”) te krijgen. Vervolgens extraheren we de laatste slotkoers en sturen deze naar het onderwerp ‘aandelenkoers’. Uiteindelijk introduceren we een tijdsvertraging om de realtime updates elke 10 seconden te simuleren.

Laten we een Kafka-consument maken om de aandelenkoersgegevens uit het onderwerp 'aandelenkoers' te lezen en te verwerken.

consument = KafkaConsument ( 'aandelenprijs' ,
bootstrap_servers = 'lokalehost:9092' ,
waarde_deserializer =lambda x: json.loads ( x.decoderen ( 'utf-8' ) ) )

voor bericht in klant:
stock_data = bericht.waarde
afdrukken ( F 'Ontvangen voorraadgegevens: {stock_data['symbol']} - Prijs: {stock_data['price']}' )


Deze code is vergelijkbaar met de consumentenconfiguratie van het vorige voorbeeld. Het leest en verwerkt voortdurend de berichten uit het onderwerp “aandelenkoers” en drukt het aandelensymbool en de koers af naar de console. We voeren de codecellen opeenvolgend uit, bijvoorbeeld één voor één in Google Colab om de producent en de consument uit te voeren. De producent ontvangt en verzendt de realtime aandelenkoersupdates, terwijl de consument deze gegevens leest en weergeeft.

! Pip installeren kafka-python yfinance
van kafka import KafkaProducer, KafkaConsumer
json importeren
importeer financiering als yf
importeren tijd
producer = KafkaProducer ( bootstrap_servers = 'lokalehost:9092' ,
waarde_serialisatie =lambda v: json.dumps ( in ) .coderen ( 'utf-8' ) )

terwijl WAAR:
voorraad = yf.Ticker ( 'AAPL' ) # Apple Inc.-aandelen
stock_data = voorraad.geschiedenis ( periode = '1d' )
laatste_prijs = voorraadgegevens [ 'Dichtbij' ] .iloc [ - 1 ]

gegevens = { 'symbool' : 'AAPL' , 'prijs' : laatste prijs }

producent.verzenden ( 'aandelenprijs' , gegevens )

tijd.slaap ( 10 ) # Simuleer elke 10 seconden realtime updates
consument = KafkaConsument ( 'aandelenprijs' ,
bootstrap_servers = 'lokalehost:9092' ,
waarde_deserializer =lambda x: json.loads ( x.decoderen ( 'utf-8' ) ) )

voor bericht in klant:
stock_data = bericht.waarde
afdrukken ( F 'Ontvangen voorraadgegevens: {stock_data['symbol']} - Prijs: {stock_data['price']}' )


Bij de analyse van de uitvoer nadat de code is uitgevoerd, zullen we de realtime aandelenkoersupdates voor Apple Inc. observeren die worden geproduceerd en geconsumeerd.

Conclusie

In dit unieke voorbeeld demonstreerden we de implementatie van realtime datastreaming in Python met behulp van Apache Kafka en de “yfinance” -bibliotheek om de aandelenmarktgegevens vast te leggen en te verwerken. We hebben elke regel van de code grondig uitgelegd. Realtime datastreaming kan op verschillende gebieden worden toegepast om real-world toepassingen op het gebied van financiën, IoT en meer te bouwen.