Consume Data with Python
This section shows how to consume a data stream from Kafka using Python. Please note the following:
- We assume a UNIX environment for this guide.
- We provide examples for Python 3.7 (the latest stable Python version at the time of writing).
- If you are still using Python 2.7, we recommend upgrading, since it reaches end of life (EOL) in 2020.
- We use Confluent's Kafka Python Client to consume from Kafka.
Set Up a Python Virtual Environment
```shell
# Create the virtual environment
python3 -m venv datastreams
# Change into the virtual environment directory
cd datastreams
# Activate the virtual environment
source bin/activate
```
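To confirm that the virtual environment is actually active, you can run a quick check from Python. This is a minimal sketch that relies on the standard-library behaviour of `venv`: inside an active environment `sys.prefix` points at the environment directory, while `sys.base_prefix` still points at the base installation. The function name `in_virtualenv` is hypothetical.

```python
import sys


def in_virtualenv():
    """Return True when running inside a venv-created virtual environment."""
    # venv sets sys.prefix to the environment directory, while
    # sys.base_prefix keeps pointing at the base Python installation
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print("Python {}.{}".format(*sys.version_info[:2]))
    print("Inside virtual environment: {}".format(in_virtualenv()))
```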
Because Confluent's Kafka Python Client uses the librdkafka C/C++ library under the hood, you need to install that library on your system. The client includes the C bindings it needs to communicate with librdkafka.

```shell
# macOS (using the Homebrew package manager)
brew install librdkafka
# Debian-like GNU/Linux distributions (Debian, Ubuntu, Mint...)
apt install librdkafka-dev
# Red Hat-like GNU/Linux distributions (Red Hat Enterprise Linux, CentOS, Fedora...)
yum install librdkafka-devel python-devel
```
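If you want to verify that the system package manager installed librdkafka where the dynamic linker can find it, the standard library's `ctypes.util.find_library` offers a quick check. This is only a sketch: it searches the standard system library locations, so a `None` result means the lookup failed there, not necessarily that the library is absent. The function name `find_librdkafka` is hypothetical.

```python
import ctypes.util


def find_librdkafka():
    """Return the name or path of the system librdkafka, or None if not found."""
    # find_library takes the library name without the "lib" prefix
    # and without a platform-specific suffix such as .so or .dylib
    return ctypes.util.find_library("rdkafka")


if __name__ == "__main__":
    location = find_librdkafka()
    if location is None:
        print("librdkafka was not found in the standard library locations")
    else:
        print("Found librdkafka: {}".format(location))
```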
Next, upgrade pip (optional) and install Confluent's Kafka Python Client together with the Requests and Avro packages:
```shell
# Upgrade the Python package manager (optional)
pip install --upgrade pip
# Install Confluent's Kafka Python Client
pip install confluent-kafka
# Install the Requests package (needed by the Avro client)
pip install requests
# Install the Avro package
pip install avro-python3
```
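To check that the installation succeeded inside the active environment, you can look up each module without importing it, and then print the client and librdkafka versions. This is a minimal sketch: the module list assumes the packages installed above (`avro-python3` provides the top-level `avro` module), and the helper name `missing_modules` is hypothetical. `confluent_kafka.version()` and `confluent_kafka.libversion()` each return a `(version_string, version_int)` tuple.

```python
import importlib.util

# Top-level modules provided by the packages installed above
REQUIRED_MODULES = ["confluent_kafka", "requests", "avro"]


def missing_modules(names=REQUIRED_MODULES):
    """Return the names of modules that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules()
    if missing:
        print("Missing modules: {}".format(", ".join(missing)))
    else:
        import confluent_kafka

        # Report the Python client version and the librdkafka version it loads
        print("confluent-kafka {}".format(confluent_kafka.version()[0]))
        print("librdkafka {}".format(confluent_kafka.libversion()[0]))
```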
The following example consumes from a given JSON topic using Confluent's Kafka Python Client.
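```python
from confluent_kafka import Consumer


def error_cb(error):
    # Reports global client errors (e.g. authentication or broker
    # connection problems); the client tries to recover automatically
    print(error)


c = Consumer({
    # Replace the placeholders with your broker address and credentials
    'bootstrap.servers': 'host:port',
    # Start from the oldest message if the group has no committed offset
    'auto.offset.reset': 'earliest',
    'group.id': 'myGroupId',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'myuser',
    'sasl.password': 'mypassword',
    'error_cb': error_cb
})

c.subscribe(['mytopic'])

try:
    while True:
        # Wait up to 10 seconds for the next message
        msg = c.poll(10.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Stop consuming on Ctrl+C
    pass
finally:
    # Commit final offsets and leave the consumer group cleanly
    c.close()
```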
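The consumer example prints each message as raw text. Because the topic carries JSON, you will usually want to parse each payload into Python objects. The following is a minimal sketch, assuming every message value is a UTF-8 encoded JSON document; the helper name `decode_json_value` is hypothetical. It returns `None` for empty (tombstone) values and for malformed payloads, so a single bad record does not stop the poll loop.

```python
import json


def decode_json_value(raw):
    """Parse a Kafka message value (bytes) as UTF-8 JSON, or return None."""
    if raw is None:
        # Tombstone records have no value
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Skip payloads that are not valid UTF-8 or not valid JSON
        return None


# Inside the poll loop, instead of printing the raw text:
#     record = decode_json_value(msg.value())
#     if record is not None:
#         print('Received record: {}'.format(record))
```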
At the moment, consuming Avro with Confluent's Kafka Python Client is not possible, because the client does not support simple keys (as opposed to Avro-encoded keys): https://github.com/confluentinc/confluent-kafka-python/issues/608