Consume Data through the Kafka Streams API
    The second alternative for consuming data programmatically is to use the Kafka Streams API to build a stream-processing application. To set it up, add the Kafka Streams library to your application and configure it as described below to consume data from the stream.

    JAVA Dependencies Information

    The Kafka Streams dependency is required to consume from Kafka through the Streams API; it transitively includes the Kafka client. Please check your favourite build tool's documentation on how to add Maven dependencies to your build.
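
    For example, with Maven the declarations could look like the sketch below; the version values are placeholders, so use the versions that match your environment. The kafka-streams artifact pulls in the Kafka client transitively, and the Confluent serde used by the AVRO examples below is hosted in the Confluent Maven repository (https://packages.confluent.io/maven/):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version> <!-- placeholder -->
    </dependency>
    <!-- Only needed for the AVRO examples below -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-streams-avro-serde</artifactId>
        <version>${confluent.version}</version> <!-- placeholder -->
    </dependency>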

    SCALA Dependencies Information

    The Kafka Streams dependency, together with its Scala wrapper (kafka-streams-scala), is required in order to consume from Kafka. Please check your favourite build tool's documentation on how to add Maven dependencies to your build.
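
    With sbt, for instance, this might look as follows (again a sketch with placeholder versions; the kafka-streams-scala wrapper is what the Scala examples below import from, and the Confluent serde is only needed for the AVRO example):

    resolvers += "Confluent" at "https://packages.confluent.io/maven/"  // needed for the Confluent serde only
    libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "<kafka-version>"     // placeholder version
    libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % "<confluent-version>" // placeholder version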

    Examples

    The examples provided for reading AVRO topics produce IndexedRecord / GenericRecord values, which offer a generic, map-like way of reading AVRO data.

    Mapp will soon also provide a Java library containing POJO classes representing the AVRO schema. If you are interested in this, please contact us.
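
    As a minimal sketch of that map-like access: inside the stream.foreach of the AVRO examples below, where value is a GenericRecord, individual fields can be read by name. The field name "userId" here is hypothetical; substitute the field names from your stream's AVRO schema:

    // Inside stream.foreach((key, value) -> ...) of the AVRO examples:
    Object userId = value.get("userId");                // hypothetical field name, for illustration only
    org.apache.avro.Schema schema = value.getSchema();  // the full AVRO schema is also available at runtime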

    See the section Configuration Properties for the Kafka Clients for how to set up the Kafka client configuration properly.

    Java Code Example using the Kafka Streams API when consuming a JSON topic:

    package mypackage;
     
    import java.util.Properties;
     
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
     
    import java.lang.Thread.UncaughtExceptionHandler;
     
    class MyClass {
     
        public static void main(String[] args) {
            MyKafkaStreamsFactory myKafkaStreamsFactory = new MyKafkaStreamsFactory();
            KafkaStreams kafkaStreams = myKafkaStreamsFactory.getKafkaStreams();
            kafkaStreams.setUncaughtExceptionHandler(myKafkaStreamsFactory.getUncaughtExceptionHandler());
            kafkaStreams.start();
     
            // Close the Stream when necessary
            // kafkaStreams.close()
        }
     
        private static class MyKafkaStreamsFactory {
     
            private String clientId = "myApplicationName";
            private String groupId = "mygroupId";
            private String endpoints = "host:port";
            private String topic = "mytopic";
            private String autoOffsetResetPolicy = "earliest";
            private String streamsNumOfThreads = "3";
            private String securityProtocol = "SASL_SSL";
            private String securitySaslMechanism = "SCRAM-SHA-256";
            private String keySerde = "org.apache.kafka.common.serialization.Serdes$LongSerde";
            private String valueSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde";
            private String deserializationExceptionHandler = LogAndContinueExceptionHandler.class.getCanonicalName();
     
            private KafkaStreams getKafkaStreams() {
     
                StreamsBuilder streamBuilder = new StreamsBuilder();
                KStream<Long, String> stream = streamBuilder.stream(topic, Consumed.with(Serdes.Long(), Serdes.String()));
                stream.foreach((key, value) -> System.out.printf("key = %d, value = %s%n", key, value));
     
                return new KafkaStreams(streamBuilder.build(), getProperties());
            }
     
            private Properties getProperties() {
                Properties props = new Properties();
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde);
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde);
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId);
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints);
                props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, streamsNumOfThreads);
                props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
                props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler);
                props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism);
                return props;
            }
     
            private UncaughtExceptionHandler getUncaughtExceptionHandler() {
                return (thread, exception) -> System.out.println("Exception running the Stream " + exception.getMessage());
            }
     
        }
     
    }

    See the section Configuration Properties for the Kafka Clients for how to set up the Kafka client configuration properly.

    Java Code Example using the Kafka Streams API when consuming an AVRO topic:

    package mypackage;

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import io.confluent.kafka.serializers.subject.*;
    import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;

    import java.lang.Thread.UncaughtExceptionHandler;

    class MyClass {

        public static void main(String[] args) {
            MyKafkaStreamsFactory myKafkaStreamsFactory = new MyKafkaStreamsFactory();
            KafkaStreams kafkaStreams = myKafkaStreamsFactory.getKafkaStreams();
            kafkaStreams.setUncaughtExceptionHandler(myKafkaStreamsFactory.getUncaughtExceptionHandler());
            kafkaStreams.start();

            // Close the Stream when necessary
            // kafkaStreams.close()
        }

        private static class MyKafkaStreamsFactory {

            private String clientId = "myApplicationName";
            private String groupId = "mygroupId";
            private String endpoints = "host:port";
            private String topic = "myAvroTopic";
            private String autoOffsetResetPolicy = "earliest";
            private String streamsNumOfThreads = "3";
            private String securityProtocol = "SASL_SSL";
            private String securitySaslMechanism = "SCRAM-SHA-256";
            private String schemaRegistryUrl = "host:port";
            private String keySerde = "org.apache.kafka.common.serialization.Serdes$LongSerde";
            private String valueSerde = GenericAvroSerde.class.getCanonicalName();
            private String deserializationExceptionHandler = LogAndContinueExceptionHandler.class.getCanonicalName();

            // If TOPIC is a Root Stream
            private String valueSubjectNamingStrategy = RecordNameStrategy.class.getCanonicalName();
            // If TOPIC is a Custom Stream
            //private String valueSubjectNamingStrategy = TopicNameStrategy.class.getCanonicalName();

            private KafkaStreams getKafkaStreams() {
                StreamsBuilder streamBuilder = new StreamsBuilder();

                Serde<GenericRecord> valueAvroSerde = new GenericAvroSerde();
                valueAvroSerde.configure(getSerdeProperties(), false);

                KStream<Long, GenericRecord> stream = streamBuilder.stream(topic, Consumed.with(Serdes.Long(), valueAvroSerde));
                stream.foreach((key, value) -> System.out.printf("key = %d, value = %s%n", key, value));

                return new KafkaStreams(streamBuilder.build(), getProperties());
            }

            private Properties getProperties() {
                Properties props = new Properties();
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy);
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId);
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde);
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde);
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints);
                props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, streamsNumOfThreads);
                props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
                props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler);
                props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
                props.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNamingStrategy);
                props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism);
                return props;
            }

            private Map<String, String> getSerdeProperties() {
                return Collections.singletonMap(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
            }

            private UncaughtExceptionHandler getUncaughtExceptionHandler() {
                return (thread, exception) -> exception.printStackTrace();
            }

        }

    }

    See the section Configuration Properties for the Kafka Clients for how to set up the Kafka client configuration properly.

    Scala Code Example using the Kafka Streams API when consuming a JSON topic:

    package mypackage
     
    import java.lang.Thread.UncaughtExceptionHandler
    import java.util.Properties
     
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.config.SaslConfigs
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
     
    import scala.util.control.NonFatal
     
     
    object MyClass {
     
      private val clientId = "myApplicationName"
      private val groupId = "mygroupId"
      private val endpoints = "host:port"
      private val topic = "mytopic"
      private val autoOffsetResetPolicy = "earliest"
      private val streamsNumOfThreads = "3"
      private val securityProtocol = "SASL_SSL"
      private val securitySaslMechanism = "SCRAM-SHA-256"
      private val keySerde = "org.apache.kafka.common.serialization.Serdes$LongSerde"
      private val valueSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde"
      private val deserializationExceptionHandler = classOf[LogAndContinueExceptionHandler]
       
      def main(args: Array[String]): Unit = {
        val kafkaStreams = getKafkaStreams
        kafkaStreams.setUncaughtExceptionHandler(getUncaughtExceptionHandler)
        kafkaStreams.start()
     
        // Close the Stream when necessary
        // kafkaStreams.close()
      }
     
      private def getKafkaStreams: KafkaStreams = {
        import org.apache.kafka.streams.scala.ImplicitConversions._
        import org.apache.kafka.streams.scala.Serdes._
     
        val streamBuilder = new StreamsBuilder()
     
        streamBuilder
          .stream[Long, String](topic)
          .foreach {
            case (key, value) =>
              println(s"key = $key, value = $value")
          }
     
        new KafkaStreams(streamBuilder.build(), getProperties)
      }
     
      private def getProperties: Properties = {
        val props = new Properties()
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy)
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde)
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde)
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId)
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints)
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, streamsNumOfThreads)
        props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol)
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
        props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism)
        props
      }
     
      private val getUncaughtExceptionHandler: UncaughtExceptionHandler = {
        case (_, NonFatal(ex)) =>
          println(s"Exception running the Stream $ex")
          ()
      }
     
    }

    See the section Configuration Properties for the Kafka Clients for how to set up the Kafka client configuration properly.

    Scala Code Example using the Kafka Streams API when consuming an AVRO topic:

    package mypackage
     
    import java.lang.Thread.UncaughtExceptionHandler
    import java.util.{Collections, Properties}
     
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
    import io.confluent.kafka.serializers.subject._
    import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
    import org.apache.avro.generic.GenericRecord
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.config.SaslConfigs
    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
     
    import scala.util.control.NonFatal
     
     
    object MyClass {
     
      private val clientId = "myApplicationName"
      private val groupId = "mygroupId"
      private val endpoints = "host:port"
      private val topic = "myAvroTopic"
      private val autoOffsetResetPolicy = "earliest"
      private val streamsNumOfThreads = "3"
      private val securityProtocol = "SASL_SSL"
      private val securitySaslMechanism = "SCRAM-SHA-256"
      private val schemaRegistryUrl = "host:port"
      private val keySerde: String = "org.apache.kafka.common.serialization.Serdes$LongSerde"
      private val valueSerde: String = classOf[GenericAvroSerde].getCanonicalName
      private val deserializationExceptionHandler = classOf[LogAndContinueExceptionHandler]
     
      // If TOPIC is a Root Stream
      private val valueSubjectNamingStrategy = classOf[RecordNameStrategy].getCanonicalName
      // If TOPIC is a Custom Stream
      //private val valueSubjectNamingStrategy = classOf[TopicNameStrategy].getCanonicalName
     
      implicit val valueAvroSerde: Serde[GenericRecord] = {
        val gas = new GenericAvroSerde
        gas.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl), false)
        gas
      }
     
      def main(args: Array[String]): Unit = {
        val kafkaStreams = getKafkaStreams
        kafkaStreams.setUncaughtExceptionHandler(getUncaughtExceptionHandler)
        kafkaStreams.start()
     
        // Close the Stream when necessary
        // kafkaStreams.close()
      }
     
      private def getKafkaStreams: KafkaStreams = {
        import org.apache.kafka.streams.scala.ImplicitConversions._
        import org.apache.kafka.streams.scala.Serdes._
     
        val streamBuilder = new StreamsBuilder()
     
        streamBuilder
          .stream[Long, GenericRecord](topic)
          .foreach {
            case (key, value) =>
              println(s"key = $key, value = $value")
          }
     
        new KafkaStreams(streamBuilder.build(), getProperties)
      }
     
      private def getProperties: Properties = {
        val props = new Properties()
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy)
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId)
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde)
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde)
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints)
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, streamsNumOfThreads)
        props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol)
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
        props.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNamingStrategy)
        props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism)
        props
      }
     
      private val getUncaughtExceptionHandler: UncaughtExceptionHandler = {
        case (_, NonFatal(ex)) =>
          println(s"Exception running the Stream $ex")
          ()
      }
     
    }

    This code prints the messages consumed from the topic for demonstration purposes. To store or process the data further, set up your application accordingly.
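
    The examples also leave closing the stream to you (see the kafkaStreams.close() comments). One common pattern, sketched here for illustration, is to register a JVM shutdown hook in main so the stream is closed cleanly when the application terminates:

    // Sketch: close the stream cleanly when the JVM shuts down
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));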

