Consume Data with Python

    This section shows how to consume a data stream from Kafka using Python. Complete the following setup steps first:

    1. Set up a Python virtual environment:

      # Creates the Virtual Environment
      python3 -m venv datastreams 
      
      # Change Directory to the Virtual Environment
      cd datastreams 
      
      # Activate the Virtual Environment
      source bin/activate
    2. Because Confluent's Kafka Python Client uses the librdkafka C/C++ library under the hood, you need to install that library on your system. The client includes the native bindings it needs to communicate with librdkafka:

      # MacOS (using Brew Package Manager)
      brew install librdkafka
       
      # Debian-like GNU/Linux Distributions (Debian, Ubuntu, Mint...)
      apt install librdkafka-dev
       
      # RedHat-like GNU/Linux Distributions (Red Hat Linux, CentOS, Fedora...)
      yum install librdkafka-devel python-devel
    3. Upgrade pip (optional), then install Confluent's Kafka Python Client and the packages needed for Avro:

      # Upgrade Python Package Manager (Optional)
      pip install --upgrade pip 
      
      # Install Confluent's Kafka Python Client
      pip install confluent-kafka 
      
      # Install Requests Package (needed by the Avro client)
      pip install requests 
      
      # Install Avro Package
      pip install avro-python3
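
    Once the steps above are complete, a quick check like the following can confirm that the interpreter is running inside the virtual environment and that the installed packages can be found. This is only a minimal sketch: the helper name `check_environment` is hypothetical, and the module list assumes that `avro-python3` is imported as `avro`.

```python
import importlib.util
import sys

# Modules installed in the steps above (assumption: avro-python3 is imported as "avro").
REQUIRED_MODULES = ("confluent_kafka", "requests", "avro")


def check_environment(modules=REQUIRED_MODULES):
    """Report whether a virtual environment is active and which modules can be found."""
    # Inside a venv, sys.prefix points at the environment and differs from sys.base_prefix.
    in_venv = sys.prefix != getattr(sys, "base_prefix", sys.prefix)
    # find_spec returns None for a top-level module that is not installed.
    found = {name: importlib.util.find_spec(name) is not None for name in modules}
    return {"in_venv": in_venv, "modules": found}


if __name__ == "__main__":
    report = check_environment()
    print("Virtual environment active:", report["in_venv"])
    for name, ok in report["modules"].items():
        print("{}: {}".format(name, "found" if ok else "MISSING"))
```

    If a module is reported as missing, re-run the corresponding `pip install` command with the virtual environment activated.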

    The following example consumes from a given JSON topic using Confluent's Kafka Python Client.

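    from confluent_kafka import Consumer

    def error_cb(error):
        # Called by the client for global errors (for example, broker connection failures)
        print(error)

    c = Consumer({
        'bootstrap.servers': 'host:port',
        # Start from the oldest available message when the group has no committed offset
        'auto.offset.reset': 'earliest',
        'group.id': 'myGroupId',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-256',
        'sasl.username': 'myuser',
        'sasl.password': 'mypassword',
        'error_cb': error_cb
    })

    c.subscribe(['mytopic'])

    try:
        while True:
            # Wait up to 10 seconds for the next message
            msg = c.poll(10.0)

            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            print('Received message: {}'.format(msg.value().decode('utf-8')))
    except KeyboardInterrupt:
        # Stop consuming on Ctrl+C
        pass
    finally:
        # Always close the consumer so it leaves the group cleanly
        c.close()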
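
    The consumer example prints each message value as decoded text. Because the topic carries JSON, the value can also be parsed into a Python object. The following is a minimal sketch of that step; the helper name `decode_json_value` and the sample payload are hypothetical, and returning `None` for unparsable values is just one possible policy.

```python
import json


def decode_json_value(raw):
    """Decode a Kafka message value (bytes) into a Python object, or None if it cannot be parsed."""
    if raw is None:
        # Messages without a payload have no value to decode
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not valid UTF-8 or not valid JSON
        return None


# Hypothetical sample payload standing in for msg.value()
record = decode_json_value(b'{"id": 1, "status": "ok"}')
print(record["status"])
```

    In the consumer loop, this helper would be called as `decode_json_value(msg.value())` in place of the plain `decode('utf-8')` call.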

    At the time of writing, consuming Avro topics with Confluent's Kafka Python Client is not possible, because its Avro consumer expects Avro-encoded keys and does not support simple (non-Avro) keys: https://github.com/confluentinc/confluent-kafka-python/issues/608
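
    One possible workaround, not confirmed by this article, is to read the topic with the plain `Consumer` shown earlier and handle the value bytes manually. The sketch below assumes the values were produced with Confluent's Schema Registry serializers, which prefix the Avro payload with a magic byte (0) and a 4-byte big-endian schema ID. The helper name `split_confluent_frame` and the sample bytes are hypothetical, and decoding the remaining Avro payload against the writer schema is not shown.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format


def split_confluent_frame(raw):
    """Split a framed message value into (schema_id, avro_payload)."""
    if raw is None or len(raw) < 5:
        raise ValueError("value is too short to carry a schema ID header")
    # ">bI" = big-endian, one signed byte (magic) followed by a 4-byte unsigned schema ID
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: {}".format(magic))
    return schema_id, raw[5:]


# Hypothetical framed value standing in for msg.value()
frame = b"\x00" + struct.pack(">I", 42) + b"\x06abc"
schema_id, payload = split_confluent_frame(frame)
print(schema_id, payload)
```

    With this approach the simple key can be read separately with `msg.key()`, since the plain `Consumer` does not try to deserialize it.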

