Ako implementovať streamovanie údajov v reálnom čase v Pythone

Ako Implementovat Streamovanie Udajov V Realnom Case V Pythone



Zvládnutie implementácie streamovania údajov v reálnom čase v Pythone pôsobí ako základná zručnosť v dnešnom svete s údajmi. Táto príručka skúma základné kroky a základné nástroje na využitie streamovania údajov v reálnom čase s autentickosťou v Pythone. Od výberu vhodného rámca, ako je Apache Kafka alebo Apache Pulsar, až po písanie kódu Python pre nenáročnú spotrebu údajov, ich spracovanie a efektívnu vizualizáciu, získame potrebné zručnosti na vytvorenie agilných a efektívnych dátových kanálov v reálnom čase.

Príklad 1: Implementácia streamovania údajov v reálnom čase v Pythone

Implementácia streamovania údajov v reálnom čase v Pythone je kľúčová v dnešnom veku a svete založenom na údajoch. V tomto podrobnom príklade prejdeme procesom vytvárania systému streamovania údajov v reálnom čase pomocou Apache Kafka a Pythonu v službe Google Colab.







Na inicializáciu príkladu predtým, ako začneme kódovať, je nevyhnutné vytvoriť konkrétne prostredie v službe Google Colab. Prvá vec, ktorú musíme urobiť, je nainštalovať potrebné knižnice. Na integráciu Kafka používame knižnicu „kafka-python“.



! pip Inštalácia kafka-pytón


Tento príkaz nainštaluje knižnicu „kafka-python“, ktorá poskytuje funkcie Pythonu a väzby pre Apache Kafka. Ďalej importujeme potrebné knižnice pre náš projekt. Import požadovaných knižníc vrátane „KafkaProducer“ a „KafkaConsumer“ sú triedy z knižnice „kafka-python“, ktoré nám umožňujú komunikovať s maklérmi Kafka. JSON je knižnica Pythonu na prácu s údajmi JSON, ktoré používame na serializáciu a deserializáciu správ.



od kafky import KafkaProducer, KafkaConsumer
importovať súbor json


Tvorba Kafkovho producenta





Je to dôležité, pretože producent Kafka posiela údaje ku Kafkovej téme. V našom príklade vytvoríme producenta, ktorý odošle simulované údaje v reálnom čase na tému s názvom „téma v reálnom čase“.

Vytvoríme inštanciu „KafkaProducer“, ktorá špecifikuje adresu makléra Kafka ako „localhost:9092“. Potom použijeme „value_serializer“, funkciu, ktorá serializuje dáta pred ich odoslaním do Kafky. V našom prípade funkcia lambda kóduje údaje ako JSON s kódovaním UTF-8. Teraz simulujme niektoré údaje v reálnom čase a pošlime ich na tému Kafka.



producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( v ) .kódovať ( 'utf-8' ) )
# Simulované údaje v reálnom čase
údaje = { 'sensor_id' : 1 , 'teplota' : 25.5 , 'vlhkosť' : 60,2 }
# Odosielanie údajov k téme
výrobca.odoslať ( „téma v reálnom čase“ , údaje )


V týchto riadkoch definujeme „dátový“ slovník, ktorý predstavuje simulované dáta senzora. Potom použijeme metódu „odoslať“ na zverejnenie týchto údajov do „témy v reálnom čase“.

Potom chceme vytvoriť spotrebiteľa Kafka a spotrebiteľ Kafka prečíta údaje z témy Kafka. Vytvárame spotrebiteľa, ktorý konzumuje a spracováva správy v „téme v reálnom čase“. Vytvoríme inštanciu „KafkaConsumer“, ktorá špecifikuje tému, ktorú chceme konzumovať, napr. (téma v reálnom čase) a adresu makléra Kafka. Potom „value_deserializer“ je funkcia, ktorá deserializuje údaje prijaté od Kafku. V našom prípade funkcia lambda dekóduje údaje ako JSON s kódovaním UTF-8.

spotrebiteľ = KafkaConsumer ( „téma v reálnom čase“ ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


Používame iteračnú slučku na nepretržité prijímanie a spracovanie správ z témy.

# Čítanie a spracovanie údajov v reálnom čase
pre správu v spotrebiteľ:
údaje = správa.hodnota
vytlačiť ( f 'Prijaté údaje: {data}' )


Načítame hodnotu každej správy a naše simulované údaje zo senzorov v slučke a vytlačíme ich do konzoly. Spustenie výrobcu a spotrebiteľa Kafka zahŕňa spustenie tohto kódu v službe Google Colab a samostatné spustenie buniek kódu. Výrobca odošle simulované dáta do Kafkovej témy a spotrebiteľ si prijaté dáta prečíta a vytlačí.


Analýza výstupu počas behu kódu

Budeme sledovať údaje v reálnom čase, ktoré sa vyrábajú a spotrebúvajú. Formát údajov sa môže líšiť v závislosti od našej simulácie alebo skutočného zdroja údajov. V tomto podrobnom príklade pokrývame celý proces nastavenia systému streamovania údajov v reálnom čase pomocou Apache Kafka a Pythonu v službe Google Colab. Vysvetlíme si každý riadok kódu a jeho význam pri budovaní tohto systému. Streamovanie údajov v reálnom čase je výkonná funkcia a tento príklad slúži ako základ pre komplexnejšie aplikácie v reálnom svete.

Príklad 2: Implementácia streamovania údajov v reálnom čase v Pythone pomocou údajov o akciovom trhu

Urobme ďalší jedinečný príklad implementácie streamovania údajov v reálnom čase v Pythone pomocou iného scenára; tentoraz sa zameriame na údaje o akciovom trhu. Vytvárame systém streamovania údajov v reálnom čase, ktorý zachytáva zmeny cien akcií a spracováva ich pomocou Apache Kafka a Python v Google Colab. Ako je znázornené v predchádzajúcom príklade, začneme konfiguráciou nášho prostredia v službe Google Colab. Najprv nainštalujeme potrebné knižnice:

! pip Inštalácia kafka-python yfinance


Tu pridávame knižnicu „yfinance“, ktorá nám umožňuje získať údaje o akciovom trhu v reálnom čase. Ďalej importujeme potrebné knižnice. Na interakciu s Kafkou naďalej používame triedy „KafkaProducer“ a „KafkaConsumer“ z knižnice „kafka-python“. Importujeme JSON, aby sme mohli pracovať s údajmi JSON. Na získanie údajov o akciovom trhu v reálnom čase používame aj „yfinance“. Importujeme aj knižnicu „času“, aby sme pridali časové oneskorenie na simuláciu aktualizácií v reálnom čase.

od kafky import KafkaProducer, KafkaConsumer
importovať súbor json
import yfinance ako yf
importovať čas


Teraz vytvárame výrobcu Kafka pre údaje o akciách. Náš výrobca Kafka získa údaje o akciách v reálnom čase a pošle ich do témy Kafka s názvom „akciová cena“.

producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( v ) .kódovať ( 'utf-8' ) )

zatiaľ čo pravda:
akcie = yf.Ticker ( 'AAPL' ) # Príklad: akcie spoločnosti Apple Inc
stock_data = stock.história ( obdobie = '1 d' )
last_price = stock_data [ 'Zavrieť' ] .iloc [ - 1 ]
údaje = { 'symbol' : 'AAPL' , 'cena' : posledná cena }
výrobca.odoslať ( 'cena akcií' , údaje )
čas.spánok ( 10 ) # Simulujte aktualizácie v reálnom čase každých 10 sekúnd


Vytvoríme inštanciu „KafkaProducer“ s adresou makléra Kafka v tomto kóde. V rámci cyklu používame „yfinance“ na získanie najnovšej ceny akcií spoločnosti Apple Inc. („AAPL“). Potom extrahujeme poslednú záverečnú cenu a pošleme ju do témy „akciová cena“. Nakoniec zavedieme časové oneskorenie na simuláciu aktualizácií v reálnom čase každých 10 sekúnd.

Vytvorme spotrebiteľa Kafka, ktorý bude čítať a spracovávať údaje o cene akcií z témy „cena akcií“.

spotrebiteľ = KafkaConsumer ( 'cena akcií' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

pre správu v spotrebiteľ:
stock_data = message.value
vytlačiť ( f 'Prijaté údaje o akciách: {stock_data['symbol']} - Cena: {stock_data['price']}' )


Tento kód je podobný spotrebiteľskému nastaveniu v predchádzajúcom príklade. Priebežne číta a spracováva správy z témy „akciová cena“ a tlačí symbol akcie a cenu do konzoly. Bunky kódu spúšťame postupne, napr. jednu po druhej v službe Google Colab, aby sme spustili producenta a spotrebiteľa. Výrobca získava a odosiela aktualizácie cien akcií v reálnom čase, zatiaľ čo spotrebiteľ tieto údaje číta a zobrazuje.

! pip Inštalácia kafka-python yfinance
od kafky import KafkaProducer, KafkaConsumer
importovať súbor json
import yfinance ako yf
importovať čas
producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( v ) .kódovať ( 'utf-8' ) )

zatiaľ čo pravda:
akcie = yf.Ticker ( 'AAPL' ) # Akcie spoločnosti Apple Inc
stock_data = stock.história ( obdobie = '1 d' )
last_price = stock_data [ 'Zavrieť' ] .iloc [ - 1 ]

údaje = { 'symbol' : 'AAPL' , 'cena' : posledná cena }

výrobca.odoslať ( 'cena akcií' , údaje )

čas.spánok ( 10 ) # Simulujte aktualizácie v reálnom čase každých 10 sekúnd
spotrebiteľ = KafkaConsumer ( 'cena akcií' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

pre správu v spotrebiteľ:
stock_data = message.value
vytlačiť ( f 'Prijaté údaje o akciách: {stock_data['symbol']} - Cena: {stock_data['price']}' )


Pri analýze výstupu po spustení kódu budeme sledovať, ako sa v reálnom čase vyrábajú a spotrebúvajú aktualizácie cien akcií spoločnosti Apple Inc.

Záver

V tomto jedinečnom príklade sme demonštrovali implementáciu streamovania údajov v reálnom čase v Pythone pomocou Apache Kafka a knižnice „yfinance“ na zachytávanie a spracovanie údajov o akciovom trhu. Dôkladne sme vysvetlili každý riadok kódu. Streamovanie údajov v reálnom čase je možné použiť v rôznych oblastiach na vytváranie reálnych aplikácií v oblasti financií, internetu vecí a ďalších.