8 October 2017
Apache Kafka is an open-source, distributed publish-subscribe messaging system rethought as a distributed commit log. Kafka stores messages in topics that are partitioned and replicated across multiple brokers in a cluster. Producers send messages to topics, from which consumers read.
Kafka uses ZooKeeper to store and share state between brokers. Each broker maintains a set of partitions, holding the leader and/or follower replica for each. A set of Kafka brokers working together maintains a set of topics. Each topic has its partitions distributed over the participating brokers, and the replication factor determines, intuitively, the number of copies of each partition kept for fault tolerance.
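How partitions and their replicas spread over brokers can be sketched with a simple round-robin assignment. This is a conceptual illustration only, not Kafka's actual assignment algorithm:

```python
# Conceptual sketch: distribute partition replicas over brokers round-robin.
# Illustration of the idea only; Kafka's real assignment logic differs.

def assign_replicas(num_partitions, replication_factor, brokers):
    """Return {partition: [broker, ...]} with replication_factor distinct brokers each."""
    assert replication_factor <= len(brokers)
    assignment = {}
    for p in range(num_partitions):
        # Start each partition at a different broker so leaders are spread out.
        assignment[p] = [brokers[(p + r) % len(brokers)]
                         for r in range(replication_factor)]
    return assignment

layout = assign_replicas(num_partitions=4, replication_factor=3, brokers=[0, 1, 2])
for partition, replicas in layout.items():
    print(partition, replicas)   # first broker in each list is the leader
```

Note how each partition's replica set starts on a different broker, so leadership (the first replica) is balanced across the cluster.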
While many brokered message-queue systems have the broker maintain the state of its consumers, Kafka does not, which frees up broker resources to ingest data faster. For more information about Kafka's performance, see the official Kafka documentation.
In traditional message processing, you apply simple computations to the messages, in most cases individually per message.
In stream processing, you apply complex operations on multiple input streams and multiple records (i.e., messages) at the same time, such as aggregations and joins.
Furthermore, traditional messaging systems cannot go "back in time": they automatically delete messages once they have been delivered to all subscribed consumers. In contrast, Kafka uses a pull-based model (i.e., consumers pull data out of Kafka) and keeps messages for a configurable retention period. This allows consumers to "rewind" and consume messages multiple times, and a newly added consumer can read the complete history. This makes stream processing possible, because it allows for more complex applications. Note that stream processing is not necessarily about real-time processing; it is about processing infinite input streams (in contrast to batch processing, which is applied to finite inputs).
Kafka also offers the Kafka Connect and Streams APIs, so it is a stream processing platform and not just a messaging/pub-sub system (even though messaging is at its core).
source: Scalability in Kafka
In Kafka, communication between clients and servers uses a simple, high-performance, language-agnostic TCP protocol. The protocol is versioned and maintains backwards compatibility with older versions. Kafka ships with a native Java client, and clients are available in many other languages as well.
Apache Kafka provides four core APIs with which applications can be built:
The Producer API: allows an application to publish a stream of records to one or more Kafka topics.
The Consumer API: allows an application to subscribe to one or more topics and process the stream of records produced to them.
The Streams API: allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming input streams into output streams.
The Connector API: allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
ZooKeeper is the coordination interface between the Kafka brokers and consumers. Kafka uses ZooKeeper to store the offsets of messages consumed for a specific topic and partition by a specific consumer group. It is not possible to bypass ZooKeeper and connect directly to the Kafka server; if ZooKeeper is down for some reason, no client request can be serviced.
A broker is the actual Kafka process, i.e., a single Kafka server instance. A Kafka cluster can have one or more brokers.
A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.
For each topic, the Kafka cluster maintains a partitioned log.
Partitions are the basic building blocks of a Kafka cluster. Each partition is an ordered, immutable sequence of records that is continually appended to: a structured commit log. The partitions of the log are distributed over the servers in the Kafka cluster, with each server handling data and requests for its share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.
The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition. The Kafka cluster retains all published records—whether or not they have been consumed—using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size so storing data for a long time is not a problem.
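The offset and retention semantics above can be sketched in a few lines. This is a conceptual illustration only; Kafka's real log is a set of segment files on disk, and offsets are never reused even after old records are discarded:

```python
import time

# Conceptual sketch of a partition: an append-only log where each record
# gets a sequential offset, and old records expire after a retention period.

class Partition:
    def __init__(self, retention_seconds):
        self.retention_seconds = retention_seconds
        self.records = []        # list of (offset, timestamp, value)
        self.next_offset = 0     # offsets keep increasing even after deletion

    def append(self, value, now=None):
        now = time.time() if now is None else now
        offset = self.next_offset
        self.records.append((offset, now, value))
        self.next_offset += 1
        return offset

    def enforce_retention(self, now=None):
        now = time.time() if now is None else now
        cutoff = now - self.retention_seconds
        self.records = [r for r in self.records if r[1] >= cutoff]

    def read_from(self, offset):
        """A consumer can 'rewind' to any retained offset and re-read."""
        return [value for (o, _, value) in self.records if o >= offset]

p = Partition(retention_seconds=2 * 24 * 3600)   # e.g. two-day retention
p.append("a", now=0)
p.append("b", now=1)
p.enforce_retention(now=3 * 24 * 3600)           # three days later: both expired
print(p.read_from(0), p.next_offset)             # [] 2 -- offsets are not reused
```

The key point the sketch shows: discarding expired records frees space but never renumbers the surviving ones, which is why consuming is cheap regardless of how much data is retained.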
Each partition has one server which acts as the “leader” and zero or more servers which act as “followers”. The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well-balanced within the cluster.
Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record).
The producer sends data directly to the broker that is the leader for the target partition. If a partition is specified, messages are published to that partition; otherwise, the producer's partitioner chooses one (for example, by hashing the record key).
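The partition-selection logic described above can be sketched as follows. The hash function here is an illustration; Kafka's default Java partitioner actually uses murmur2, and the round-robin fallback for keyless records is a simplification of its behaviour:

```python
import zlib
from itertools import count

# Conceptual sketch of the producer's partition choice: records with a key
# are hashed to a partition (so equal keys always land in the same partition,
# preserving per-key order), and keyless records are spread round-robin.

class Partitioner:
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._round_robin = count()

    def partition(self, key):
        if key is None:
            return next(self._round_robin) % self.num_partitions
        # crc32 used here as a stand-in for Kafka's murmur2 hash
        return zlib.crc32(key.encode()) % self.num_partitions

p = Partitioner(num_partitions=3)
print(p.partition("user-42") == p.partition("user-42"))  # True: same key, same partition
```

Because equal keys always map to the same partition, all records for one key are consumed in the order they were produced.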
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
The processes that subscribe to topics and process the feed of published messages are consumers. Messaging traditionally has two models:
In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe the message is broadcast to all consumers. Kafka offers a single consumer abstraction that generalizes both of these—the consumer group.
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.
If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
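The two behaviours above (queueing within a group, broadcast across groups) fall out of one rule: each partition is assigned to exactly one consumer per group. A minimal sketch, assuming a simple round-robin assignment rather than the real group coordinator protocol:

```python
# Conceptual sketch of consumer-group semantics: within one group, each
# partition goes to exactly one consumer (queue behaviour); every group
# independently receives all partitions (publish-subscribe behaviour).

def assign_partitions(partitions, consumers):
    """Round-robin partitions over the consumers of a single group."""
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

partitions = [0, 1, 2, 3]
# One group with two consumers: load is split (queue model).
print(assign_partitions(partitions, ["c1", "c2"]))
# Two single-consumer groups: each group gets every partition (pub-sub model).
print(assign_partitions(partitions, ["groupA-c1"]))
print(assign_partitions(partitions, ["groupB-c1"]))
```

This also shows why having more consumers than partitions in one group leaves some consumers idle: there is no partition left to assign them.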
Kafka MirrorMaker provides geo-replication support for your clusters. With MirrorMaker, messages are replicated across multiple datacenters or cloud regions. You can use this in active/passive scenarios for backup and recovery; or in active/active scenarios to place data closer to your users, or support data locality requirements.
Kafka delivers each message to a single consumer within a given consumer group. Depending on the combination of partitions and consumers, multiple consumers in the same group may receive data, but no message is duplicated within the group. Hence it is essential to form a correct logical grouping of consumers.
• Consumer is started for the very first time: the consumer consumes only messages produced after it first requested them; a new consumer group gets messages that are published AFTER its first fetch request.
• Consumer is restarted: the consumer resumes from where it left off earlier, based on the offset management functionality implemented by Kafka.
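The two cases above can be sketched with a toy offset store. This is an illustration of the semantics only (a new group defaulting to "latest", an existing group resuming from its committed offset), not Kafka's actual offset-commit protocol:

```python
# Conceptual sketch of offset management: a consumer commits the offset it
# has processed up to; on restart it resumes from the committed offset, while
# a brand-new group starts at the log end ("latest").

class OffsetStore:
    def __init__(self):
        self.committed = {}   # group name -> next offset to read

    def poll(self, group, log):
        if group not in self.committed:
            # New group defaulting to "latest": start after existing records.
            self.committed[group] = len(log)
        start = self.committed[group]
        records = log[start:]
        self.committed[group] = len(log)   # commit after processing
        return records

store = OffsetStore()
log = ["m0", "m1", "m2"]
print(store.poll("g1", log))   # [] -- a new group sees only future messages
log += ["m3", "m4"]
print(store.poll("g1", log))   # ['m3', 'm4'] -- resumes where it left off
```

A restart is invisible here because the position lives in the store, not in the consumer process, which is exactly why a restarted Kafka consumer continues from its last committed offset.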
Kafka only provides a total order over records within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over records, this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.
Serialization is the process of converting an object into a stream of bytes for transmission; Kafka stores and transmits these byte arrays in its log. Deserialization, as the name suggests, does the opposite: it converts a byte array back into the desired data type. Kafka provides serializers and deserializers for a few data types, such as String, Long, Double, Integer, and Bytes.
To create a serializer class we need to implement the org.apache.kafka.common.serialization.Serializer interface, and similarly, to create a deserializer class we need to implement the org.apache.kafka.common.serialization.Deserializer interface.
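The same round-trip idea, sketched in Python for illustration. The byte formats below mirror what Kafka's built-in StringSerializer and LongSerializer produce (UTF-8 text, 8-byte big-endian signed long), but this is a conceptual sketch, not the Kafka API:

```python
import struct

# Conceptual sketch of serialization: the producer turns objects into bytes
# and the consumer turns them back into the original type.

def serialize_string(s):
    return s.encode("utf-8")

def deserialize_string(b):
    return b.decode("utf-8")

def serialize_long(n):
    return struct.pack(">q", n)       # 8 bytes, big-endian, signed

def deserialize_long(b):
    return struct.unpack(">q", b)[0]

payload = serialize_string("hello kafka")
print(deserialize_string(payload))            # hello kafka
print(deserialize_long(serialize_long(42)))   # 42
```

The broker never interprets these bytes; only the producer and consumer need to agree on the format, which is why serializer and deserializer are configured on the client side.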
Each Kafka topic is broken down into partitions, which are the basic data building blocks. A partition is stored on a single disk. Kafka guarantees order of events within a partition and a partition can be either online (available) or offline (unavailable).
Each partition can have multiple replicas, one of which is the designated leader. All events are produced to and consumed from the leader replica. Other replicas just need to stay in sync with the leader and replicate all recent events on time. If the leader becomes unavailable, one of the in-sync replicas becomes the new leader. A replica is considered in-sync if it is the leader for a partition, or if it is a follower that maintains an active session with ZooKeeper and has kept up with the leader's recent messages within a configured lag window (replica.lag.time.max.ms).
Replication Factor: change the replication factor from the default (1) to 3.
Minimum In-sync Replicas: you can set the minimum number of in-sync replicas (ISRs) that must be available for the producer to successfully send messages to a partition using the min.insync.replicas setting. If min.insync.replicas is set to 2 and acks is set to all, each message must be written successfully to at least two replicas. This guarantees that the message is not lost unless both hosts crash.
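Putting these settings together, a minimal configuration sketch (the values are illustrative, assuming a three-broker cluster):

```properties
# Broker side (server.properties): topic defaults
default.replication.factor=3
min.insync.replicas=2

# Producer side (producer.properties): wait for all in-sync replicas to acknowledge
acks=all
```

With this combination, a write succeeds only once at least two replicas have it, and the cluster can tolerate the loss of one broker without losing acknowledged messages.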
Kafka's mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. The MirrorMaker tool mirrors a source Kafka cluster into a target (mirror) cluster: it uses a Kafka consumer to consume messages from the source cluster and re-publishes them to the target cluster using an embedded Kafka producer.
Kafka maintains message ordering at the partition level only. For example, consider a scenario where we send 10 messages to each of two partitions of a topic, using different partition keys. The first 10 messages (msg0, msg1, msg2, ...) are published with partition key "key" to partition 0, and the next 10 messages (data0, data1, data2, ...) are published with partition key "partition" to partition 1. If a single consumer consumes all the messages, some "data" messages may be consumed before "msg" messages, but the ordering within each partition (within the "msg" or "data" sequence) is maintained.
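The guarantee in the example above can be demonstrated with a small simulation: merge two partitions in an arbitrary interleaving and check that each partition's internal order survives. A conceptual sketch only:

```python
import random

# Conceptual sketch of the ordering guarantee: a consumer reading two
# partitions may interleave them arbitrarily, but within each partition
# the original order is always preserved.

partition0 = ["msg0", "msg1", "msg2"]
partition1 = ["data0", "data1", "data2"]

def consume_interleaved(p0, p1, seed=0):
    """Merge two partitions in a random interleaving that keeps per-partition order."""
    rng = random.Random(seed)
    i = j = 0
    out = []
    while i < len(p0) or j < len(p1):
        if j >= len(p1) or (i < len(p0) and rng.random() < 0.5):
            out.append(p0[i]); i += 1
        else:
            out.append(p1[j]); j += 1
    return out

seen = consume_interleaved(partition0, partition1)
# The global order varies with the seed, but each partition's order survives:
print([m for m in seen if m.startswith("msg")])   # ['msg0', 'msg1', 'msg2']
print([m for m in seen if m.startswith("data")])  # ['data0', 'data1', 'data2']
```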
Every partition in Kafka has one server that plays the role of leader and zero or more servers that act as followers. The leader handles all reads and writes for the partition while the followers passively replicate it. If the leader fails, one of the followers automatically takes over as the new leader.
The section below shows some common console-based commands for a Kafka cluster.
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper <IP Address>:2181 --replication-factor 1 --partitions 1 --topic topic1 --config retention.ms=7200000
$KAFKA_HOME/bin/kafka-topics.sh --delete --zookeeper <IP Address>:2181 --topic test
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list <IP Address>:9093 --topic topic1 --producer.config $KAFKA_HOME/config/producer.properties
sudo $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server <IP Address>:9093 --topic topic1 --consumer.config $KAFKA_HOME/config/consumer.properties