- 7 Minutes to read
- Print
- DarkLight
Consume Data through the Kafka Consumer API
- 7 Minutes to read
- Print
- DarkLight
The first option for consuming data programmatically is the plain Kafka Consumer API. Add the Kafka client library to your application and configure it as described below to consume data from the stream.
Dependency information
The Kafka client library (org.apache.kafka:kafka-clients) is required to consume from Kafka. See your build tool's documentation for how to add Maven dependencies to your build.
For Java and Scala, see https://search.maven.org/artifact/org.apache.kafka/kafka-clients/2.3.0/jar for examples of how to add the Kafka library to your project with your build tool of choice.
Consuming an Avro topic additionally requires Confluent's Kafka Avro serializer/deserializer, io.confluent:kafka-avro-serializer:5.2.2, which is published in Confluent's Maven repository (https://packages.confluent.io/maven/).
Mapp will soon also provide a Java library containing POJO classes that represent the Avro schema. If you are interested in this, please contact us.
Examples
See the section "Configuration Properties for the Kafka Clients" for how to configure the Kafka client correctly.
Java Code Example using the Kafka Plain Consumer
package mypackage;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal example that consumes String values from a Kafka topic with the plain Consumer API and
 * prints every record.
 *
 * <p>The consumer runs until the JVM is shut down (e.g. Ctrl+C / SIGTERM). A shutdown hook then
 * ends the poll loop and waits until the consumer has been closed, so that it leaves its consumer
 * group cleanly and, with auto-commit enabled, commits its final offsets.
 */
public class MyClass {

    /** Set to {@code false} by the shutdown hook to end the poll loop. */
    private static volatile boolean running = true;

    public static void main(String[] args) {
        MyKafkaConsumerFactory kafkaConsumerFactory = new MyKafkaConsumerFactory();
        Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            running = false;
            try {
                // Keep the JVM alive until the poll loop has exited and the consumer is closed.
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        // KafkaConsumer is Closeable: try-with-resources closes it when the loop ends or fails.
        try (KafkaConsumer<byte[], String> consumer = kafkaConsumerFactory.getConsumer()) {
            while (running) {
                ConsumerRecords<byte[], String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<byte[], String> record : records) {
                    // The key is a byte[]; print its contents rather than the array's identity.
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), java.util.Arrays.toString(record.key()), record.value());
                }
            }
        }
    }

    /** Builds and subscribes the consumer; replace the placeholder values with your own. */
    private static class MyKafkaConsumerFactory {
        private final String clientId = "myApplicationName";
        private final String groupId = "mygroupId";
        private final String endpoints = "host:port";
        private final String topic = "mytopic";
        private final String autoOffsetResetPolicy = "earliest";
        private final String securityProtocol = "SASL_SSL";
        private final String securitySaslMechanism = "SCRAM-SHA-256";
        private final String keyDeserializer = ByteArrayDeserializer.class.getCanonicalName();
        private final String valueDeserializer = StringDeserializer.class.getCanonicalName();

        /** Creates a consumer from {@link #getProperties()} and subscribes it to {@code topic}. */
        public KafkaConsumer<byte[], String> getConsumer() {
            KafkaConsumer<byte[], String> consumer = new KafkaConsumer<>(getProperties());
            consumer.subscribe(Collections.singletonList(topic));
            return consumer;
        }

        /** Assembles the client, group, deserializer and security settings for the consumer. */
        private Properties getProperties() {
            Properties props = new Properties();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism);
            // SCRAM authentication also needs credentials (e.g. SaslConfigs.SASL_JAAS_CONFIG);
            // see "Configuration Properties for the Kafka Clients".
            return props;
        }
    }
}
The examples for reading Avro topics produce IndexedRecord/GenericRecord values, which give generic, map-like access to the Avro data.
See the section "Configuration Properties for the Kafka Clients" for how to configure the Kafka client correctly.
Java Code Example using the Kafka Plain Consumer when consuming an AVRO topic.
package mypackage;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.subject.*;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal example that consumes an Avro topic with the plain Consumer API and prints every record.
 *
 * <p>Values are deserialized by Confluent's {@code KafkaAvroDeserializer} using the Schema
 * Registry configured below, and are typed here as {@link IndexedRecord}. The consumer runs until
 * the JVM is shut down; a shutdown hook then ends the poll loop and waits until the consumer has
 * been closed, so that it leaves its consumer group cleanly.
 */
public class MyClass {

    /** Set to {@code false} by the shutdown hook to end the poll loop. */
    private static volatile boolean running = true;

    public static void main(String[] args) {
        MyKafkaConsumerFactory kafkaConsumerFactory = new MyKafkaConsumerFactory();
        Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            running = false;
            try {
                // Keep the JVM alive until the poll loop has exited and the consumer is closed.
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        // KafkaConsumer is Closeable: try-with-resources closes it when the loop ends or fails.
        try (KafkaConsumer<byte[], IndexedRecord> consumer = kafkaConsumerFactory.getConsumer()) {
            while (running) {
                ConsumerRecords<byte[], IndexedRecord> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<byte[], IndexedRecord> record : records) {
                    // The key is a byte[]; print its contents rather than the array's identity.
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), java.util.Arrays.toString(record.key()), record.value());
                }
            }
        }
    }

    /** Builds and subscribes the Avro consumer; replace the placeholder values with your own. */
    private static class MyKafkaConsumerFactory {
        private final String clientId = "myApplicationName";
        private final String groupId = "mygroupId";
        private final String endpoints = "host:port";
        private final String topic = "myAvroTopic";
        private final String autoOffsetResetPolicy = "latest";
        private final String securityProtocol = "SASL_SSL";
        private final String securitySaslMechanism = "SCRAM-SHA-256";
        // The Schema Registry URL must include its scheme (http:// or https://).
        private final String schemaRegistryUrl = "https://host:port";
        private final String keyDeserializer = ByteArrayDeserializer.class.getCanonicalName();
        private final String valueDeserializer = KafkaAvroDeserializer.class.getCanonicalName();
        // If TOPIC is a Root Stream
        private final String valueSubjectNameStrategy = RecordNameStrategy.class.getCanonicalName();
        // If TOPIC is a Custom Stream
        //private final String valueSubjectNameStrategy = TopicNameStrategy.class.getCanonicalName();

        /** Creates a consumer from {@link #getProperties()} and subscribes it to {@code topic}. */
        public KafkaConsumer<byte[], IndexedRecord> getConsumer() {
            KafkaConsumer<byte[], IndexedRecord> consumer = new KafkaConsumer<>(getProperties());
            consumer.subscribe(Collections.singletonList(topic));
            return consumer;
        }

        /** Assembles client, security, deserializer and Schema Registry settings. */
        private Properties getProperties() {
            Properties props = new Properties();
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
            props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism);
            // SCRAM authentication also needs credentials (e.g. SaslConfigs.SASL_JAAS_CONFIG);
            // see "Configuration Properties for the Kafka Clients".
            props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
            props.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNameStrategy);
            return props;
        }
    }
}
See the section "Configuration Properties for the Kafka Clients" for how to configure the Kafka client correctly.
Scala Code Example using the Kafka Plain Consumer.
package mypackage
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.jdk.CollectionConverters._
/**
 * Minimal example that consumes String values from a Kafka topic with the plain Consumer API and
 * prints every record.
 *
 * The consumer runs until the JVM is shut down (e.g. Ctrl+C / SIGTERM). A shutdown hook then ends
 * the poll loop and waits until the consumer has been closed, so that it leaves its consumer group
 * cleanly and, with auto-commit enabled, commits its final offsets.
 */
object MyClass {
  private val clientId = "myApplicationName"
  private val groupId = "mygroupId"
  private val endpoints = "host:port"
  private val topic = "mytopic"
  private val autoOffsetResetPolicy = "earliest"
  private val securityProtocol = "SASL_SSL"
  private val securitySaslMechanism = "SCRAM-SHA-256"
  private val keyDeserializer: String = classOf[ByteArrayDeserializer].getCanonicalName
  private val valueDeserializer: String = classOf[StringDeserializer].getCanonicalName

  /** Set to false by the shutdown hook to end the poll loop. */
  @volatile private var running = true

  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[Array[Byte], String](getProperties)
    val mainThread = Thread.currentThread()
    sys.addShutdownHook {
      running = false
      // Keep the JVM alive until the poll loop has exited and the consumer is closed.
      mainThread.join()
    }
    try {
      consumer.subscribe(List(topic).asJava)
      while (running) {
        val records = consumer.poll(Duration.ofMillis(100))
        records.asScala.foreach { record =>
          // The key is an Array[Byte]; print its contents rather than the array's identity.
          val key = java.util.Arrays.toString(record.key())
          println(s"offset = ${record.offset()}, key = $key, value = ${record.value()}")
        }
      }
    } finally {
      consumer.close()
    }
  }

  /** Client, group, deserializer and security settings for the consumer. */
  private val getProperties: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol)
    props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism)
    // SCRAM authentication also needs credentials (e.g. SaslConfigs.SASL_JAAS_CONFIG);
    // see "Configuration Properties for the Kafka Clients".
    props
  }
}
The examples for reading Avro topics produce IndexedRecord/GenericRecord values, which give generic, map-like access to the Avro data.
See the section "Configuration Properties for the Kafka Clients" for how to configure the Kafka client correctly.
Scala Code Example using the Kafka Plain Consumer when consuming an AVRO topic.
package mypackage
import java.time.Duration
import java.util.Properties
import io.confluent.kafka.serializers.subject._
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.IndexedRecord
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import scala.jdk.CollectionConverters._
/**
 * Minimal example that consumes an Avro topic with the plain Consumer API and prints every record.
 *
 * Values are deserialized by Confluent's KafkaAvroDeserializer using the Schema Registry
 * configured below, and are typed here as IndexedRecord. The consumer runs until the JVM is shut
 * down; a shutdown hook then ends the poll loop and waits until the consumer has been closed, so
 * that it leaves its consumer group cleanly.
 */
object MyClass {
  private val clientId = "myApplicationName"
  private val groupId = "mygroupId"
  private val endpoints = "host:port"
  private val topic = "myAvroTopic"
  private val autoOffsetResetPolicy = "latest"
  private val securityProtocol = "SASL_SSL"
  private val securitySaslMechanism = "SCRAM-SHA-256"
  // The Schema Registry URL must include its scheme (http:// or https://).
  private val schemaRegistryUrl = "https://host:port"
  private val keyDeserializer: String = classOf[ByteArrayDeserializer].getCanonicalName
  private val valueDeserializer: String = classOf[KafkaAvroDeserializer].getCanonicalName
  // If TOPIC is a Root Stream
  private val valueSubjectNameStrategy: String = classOf[RecordNameStrategy].getCanonicalName
  // If TOPIC is a Custom Stream
  //private val valueSubjectNameStrategy: String = classOf[TopicNameStrategy].getCanonicalName

  /** Set to false by the shutdown hook to end the poll loop. */
  @volatile private var running = true

  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[Array[Byte], IndexedRecord](getProperties)
    val mainThread = Thread.currentThread()
    sys.addShutdownHook {
      running = false
      // Keep the JVM alive until the poll loop has exited and the consumer is closed.
      mainThread.join()
    }
    try {
      consumer.subscribe(List(topic).asJava)
      while (running) {
        val records = consumer.poll(Duration.ofMillis(100))
        records.asScala.foreach { record =>
          // The key is an Array[Byte]; print its contents rather than the array's identity.
          val key = java.util.Arrays.toString(record.key())
          println(s"offset = ${record.offset()}, key = $key, value = ${record.value()}")
        }
      }
    } finally {
      consumer.close()
    }
  }

  /** Client, security, deserializer and Schema Registry settings for the consumer. */
  private val getProperties: Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoints)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetPolicy)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol)
    props.put(SaslConfigs.SASL_MECHANISM, securitySaslMechanism)
    // SCRAM authentication also needs credentials (e.g. SaslConfigs.SASL_JAAS_CONFIG);
    // see "Configuration Properties for the Kafka Clients".
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    props.put(AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, valueSubjectNameStrategy)
    props
  }
}
This code prints the consumed messages for demonstration purposes only. To store or further process the data, adapt your application accordingly.