Distributed Systems — Messaging

Quite recently I got into the vast topic of messaging within distributed systems. I was assigned a project based on it, so I had to learn it, and it turned out to be quite an exciting one. The most interesting thing about studying distributed systems is that there is always one more factor to ponder, given how broad the subject is with the technology stack that exists today. The most prominent and reputable technology stack for distributed systems is, without a doubt, the Apache stack: by which I mean the technologies released under the Apache License.

One major difficulty I face in studying distributed systems is that it is not as easy as studying something like web development or native software development. You have to dig deep into the root concepts, and there are not many easy resources to work with. If you are studying something like web development, you can find answers to almost any question in popular forums like Stack Overflow, but the situation is quite different when it comes to distributed systems. The main sources of help are books (especially books), documentation, and conference videos. I found some documentation a little intimidating, while some is excellent (the Apache Kafka documentation is an example of excellent documentation). Another thing I have noticed is that books are the best way to learn this. Books are more often than not written by the people who invented these technologies, so they tend to explain the concepts really well.

Why messaging?

Take a look at this image.

Image source — https://www.comsol.com/blogs/intro-distributed-memory-computing/

When aggregate systems are horizontally scaled (by adding more systems), the maximum number of connections required for messaging between systems grows quickly. For a fully connected system, the maximum number of connections is

connections = n(n - 1) / 2

n = number of nodes in the system

Thus, an efficient solution is needed for this problem — enter middleware.
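The growth above can be sketched numerically. This is a minimal illustration, not part of any middleware library: a full mesh of n nodes needs n(n - 1)/2 links, while a central middleware needs only one link per node.

```python
def full_mesh_connections(n: int) -> int:
    """Maximum point-to-point links in a fully connected system of n nodes."""
    return n * (n - 1) // 2

def middleware_connections(n: int) -> int:
    """With a central middleware, each node needs only one link to the broker."""
    return n

# Full mesh grows quadratically; middleware grows linearly.
for n in (3, 10, 100):
    print(n, full_mesh_connections(n), middleware_connections(n))
# 3 3 3
# 10 45 10
# 100 4950 100
```

At 100 nodes the difference is already 4,950 links versus 100, which is why the middleware approach scales.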

In this case, we are talking about introducing a message oriented middleware in between the senders of messages and receivers of messages.

Image source — https://www.sciencedirect.com/science/article/pii/S0926580517304867

Middleware is a middle layer of logic (and persistence) where producers produce messages and consumers receive them. There is no communication medium between the systems (nodes) themselves; each node communicates only with the middleware. Data is transferred to the middleware, and the consumers consume it afterwards.

The advantages of using middleware are fairly clear. Middleware reduces the complexity of horizontal scaling. Think of the weight you would have to lift to introduce another node into a legacy messaging system like the one we discussed before middleware. With Message Oriented Middleware (MOM), you just plug the new node into the MOM.

The greatest advantage of MOMs is the loose coupling they introduce. In legacy systems, most communication was done using RPC (Remote Procedure Calls), where the sender is blocked until the receiver acknowledges receipt, incurring a higher overhead, so message passing was more often synchronous. With message brokers (implementations of MOMs), a fire-and-forget strategy is adopted instead, which introduces asynchronous message passing. In the fire-and-forget strategy, the producer simply fires the message at the message broker and is done with it; it does not wait for any acknowledgment or recognition. The consumer consumes the message and likewise requires no recognition. This allows producers and consumers to be unaware of each other, increasing loose coupling and allowing them to be implemented in different technologies (programming languages) and to use different protocols to send messages. Synchronous RPC is largely avoided because of the overhead it incurs, while asynchronous message passing (as in fire-and-forget) does not block the control flow of producers or consumers.
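The fire-and-forget idea can be sketched with an in-process stand-in for a broker. This is a toy simulation using Python's standard library, not a real MOM: the producer enqueues and moves on without waiting for any acknowledgment, while a consumer thread drains the queue independently.

```python
import queue
import threading

# Hypothetical in-process "broker": a thread-safe queue between
# a producer and a consumer that never talk to each other directly.
broker = queue.Queue()
received = []

def consumer():
    while True:
        msg = broker.get()
        if msg is None:          # sentinel used here to stop the consumer
            break
        received.append(msg)

t = threading.Thread(target=consumer)
t.start()

for i in range(3):
    broker.put(f"order-{i}")     # fire and forget: no acknowledgment awaited

broker.put(None)                 # shut the toy consumer down
t.join()
print(received)                  # ['order-0', 'order-1', 'order-2']
```

The producer's loop finishes regardless of how far the consumer has gotten, which is exactly the decoupling the paragraph above describes.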

Let us consider an example: say you are buying something from Amazon. You fill in your billing info, submit the form, and get a response. If the transaction completes successfully, you get a proper invoice by email. Behind this visible procedure, a larger process is going on at the backend. With the embrace of SOA (Service Oriented Architecture), most service-based companies switched to microservices, where each task is allocated to a service that is responsible only for that particular task. If that task fails, only the container in which that particular service runs needs to be repaired (with the inception of Kubernetes, the container orchestration project that originated at Google, it is possible to manage such architectures efficiently). So when you submit your order, your request passes synchronously through initial validation, including the authorization and authentication services, and those services return a response. If you have made a successful order, your request is processed further by the rest of the services, and the invoice is emailed to you. The processing by those latter services is usually done asynchronously using MOMs.

Types of MOMs

There are two conceptual types of MOM:

PTP — Point To Point MOMs.

Image source — http://www.cloudcasts.net/devguide/Default.aspx?id=12045

Here you have the notion of a Queue, in which producers persist messages. Note that even though this queue is comparable to a data store (or a warehouse), its primary concern is to hold data temporarily, not to act as a data store. This persistence exists so that if the consumer's logic becomes erroneous and malfunctions, the data is not lost. The PTP paradigm is a one-to-one paradigm where one producer is allocated to exactly one consumer, and each producer-consumer pair is given a queue to persist messages in. The concept can be altered slightly so that one queue serves multiple pairs of producers and consumers, with each consumer handed messages from the queue in a round-robin style.
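The round-robin variant can be sketched as follows. This is a toy model, not a real queue implementation: each message is delivered to exactly one consumer, and consumers take turns.

```python
from collections import deque
from itertools import cycle

# Hypothetical point-to-point queue shared by several consumers:
# every message goes to exactly one consumer, in round-robin order.
message_queue = deque(f"msg-{i}" for i in range(6))
consumers = {"c1": [], "c2": [], "c3": []}

turn = cycle(consumers)          # cycles over consumer names
while message_queue:
    consumers[next(turn)].append(message_queue.popleft())

print(consumers)
# each consumer receives every third message; no message is seen twice
```

Contrast this with pub/sub below, where every subscriber sees every message.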

Pub/Sub MOMs — Publisher Subscriber MOMs

Image source — http://www.cloudcasts.net/devguide/Default.aspx?id=12045

Here the main notion is the Topic. Publishers publish their messages to a topic, to which one or many consumers can subscribe. Thus this paradigm is clearly one-to-many. Data is persisted in a topic so that if there is a logical error in a subscriber, it can read the data back.

Another important fact about the pub/sub paradigm is that messages do not necessarily have to be persisted. There are two kinds of subscribers, namely Durable Subscribers and Non-Durable Subscribers. Messages for durable subscriptions are persisted, while messages for non-durable subscriptions are not.
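The durable versus non-durable distinction can be sketched with a toy topic class (an illustration only, not how any real broker is implemented): a durable subscription still collects messages published while the subscriber is offline, whereas a non-durable subscriber simply misses them.

```python
class Topic:
    """Toy pub/sub topic: every subscribed consumer gets every message,
    but only durable subscribers keep receiving while disconnected."""

    def __init__(self):
        self.subscribers = {}  # name -> {"inbox": [...], "durable": bool, "online": bool}

    def subscribe(self, name, durable=False):
        self.subscribers[name] = {"inbox": [], "durable": durable, "online": True}

    def disconnect(self, name):
        self.subscribers[name]["online"] = False

    def reconnect(self, name):
        self.subscribers[name]["online"] = True

    def publish(self, message):
        for sub in self.subscribers.values():
            # durable subscriptions buffer messages while offline;
            # non-durable subscribers simply miss them
            if sub["online"] or sub["durable"]:
                sub["inbox"].append(message)

topic = Topic()
topic.subscribe("audit", durable=True)
topic.subscribe("dashboard", durable=False)

topic.publish("m1")
topic.disconnect("audit"); topic.disconnect("dashboard")
topic.publish("m2")                    # only the durable subscriber keeps this
topic.reconnect("audit"); topic.reconnect("dashboard")
topic.publish("m3")

print(topic.subscribers["audit"]["inbox"])      # ['m1', 'm2', 'm3']
print(topic.subscribers["dashboard"]["inbox"])  # ['m1', 'm3']
```

Note that both subscribers receive every message while online; the one-to-many delivery is what distinguishes this from the PTP queue.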

In PTP, you can configure persistence in the message header if you are using a specification like JMS (Java Message Service).

Enter Kafka

Apache Kafka is itself a message broker. It is the newer alternative to Scribe, Data Highway, and Flume. It was built at LinkedIn to facilitate the transfer of messages with higher throughput and lower latency. Kafka uses the pub/sub paradigm and relies on Apache ZooKeeper for cluster coordination such as bootstrapping configuration, role allocation, etc.

Image source — http://kafka.apache.org/

Apache Kafka includes a very useful API called the Streams API, with which you can take persisted topics (input topics), process them, and store the results back in a Kafka topic (an output topic).

The core unit of Apache Kafka is the commit log.

Image source — http://kafka.apache.org/intro

A topic is further divided into partitions. Think of a topic as a table in an RDBMS: when a table is horizontally divided into multiple tables, you have made different partitions. In other words, you store the same type (topic) of data in different places (partitions) inside the topic itself. This is a little like Elasticsearch indexes dividing themselves into shards, though Elasticsearch and Kafka are two different things; the main similarity is that both persist data on disk.
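Partitioning can be sketched with a toy key-based partitioner. Kafka's default partitioner actually hashes the key bytes with murmur2; the simple stable hash below is a stand-in for illustration. The property that matters is that messages with the same key always land in the same partition.

```python
# Toy partitioner: same key -> same partition, similar in spirit to
# Kafka's default key hashing (which really uses murmur2).
NUM_PARTITIONS = 3
partitions = [[] for _ in range(NUM_PARTITIONS)]

def partition_for(key: str) -> int:
    # Simple stable hash over the key bytes (illustrative only)
    return sum(key.encode()) % NUM_PARTITIONS

for key, value in [("user-1", "a"), ("user-2", "b"), ("user-1", "c")]:
    partitions[partition_for(key)].append((key, value))

# All records for "user-1" sit in one partition, preserving their order.
print(partitions)
```

Because ordering is only guaranteed within a partition, routing by key like this is how per-key ordering is preserved.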

Image source — http://kafka.apache.org/intro

Another great thing about Kafka is that you can have groups of consumers subscribing to one topic. Thus, a whole topic can be consumed by several consumers, with each consumer consuming a part (a subset of the partitions) of the topic.

For a consumer to consume a message, a tuple of topic, partition, and offset is required. The offset is Kafka's equivalent of an ID for a message within a partition. A consumer consumes messages from a partition sequentially, so a given offset implies that all previous offsets have already been consumed.
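Sequential, offset-tracked consumption can be sketched with a toy partition log (an illustration, not Kafka's consumer implementation): the consumer holds a single integer, the next offset to read, and replay is nothing more than rewinding it.

```python
# Toy partition log: messages are addressed by offset 0..n-1.
log = ["m0", "m1", "m2", "m3"]

class Consumer:
    """Reads one partition sequentially, tracking its own offset."""

    def __init__(self):
        self.offset = 0          # next offset to read

    def poll(self):
        if self.offset < len(log):
            record = (self.offset, log[self.offset])
            self.offset += 1
            return record
        return None              # no new data yet

c = Consumer()
print(c.poll())   # (0, 'm0')
c.offset = 0      # replaying is just rewinding the offset
print(c.poll())   # (0, 'm0') again
```

Because the only consumer state is this integer, committing "offset 3" is enough to assert that offsets 0 through 2 were consumed.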

“At the heart of the consumer API is a simple loop for polling the server for more data. Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats, and data fetching, leaving the developer with a clean API that simply returns available data from the assigned partitions” — Neha Narkhede

More on consuming from Kafka can be read from here.

A Kafka cluster is a set of multiple Kafka broker (Kafka server) nodes, managed by spinning up an Apache ZooKeeper instance. Kafka is utilized best when there are multiple broker nodes in the cluster.

An Application

Log aggregation (getting logs into one place) is one good application of Apache Kafka. Logs are usually indexed in Elasticsearch, and the shipping of logs after parsing (using Logstash) can be done through Kafka (Redis, RabbitMQ, or ActiveMQ can be used as well). In this case, the producers for Kafka would be the Logstash instances, and the consumers would be the Elasticsearch nodes in the Elasticsearch cluster. If Elasticsearch hits a snag, the Kafka brokers can persist the parsed logs on their disks.

Kafka involves an I/O overhead in persisting data. According to the Kafka research paper (written by Jay Kreps, Neha Narkhede, and Jun Rao), logs (or messages) are available to consumers only after the Kafka brokers flush the data to disk. But judging by the published statistics, the added latency is very small.

Kafka is a great message broker that streams data to which groups of consumers can subscribe. Most of the hard parts are hidden behind the client API, making it developer friendly to use. The JVM-based broker is nicely explained in its documentation. It is used to great success at Netflix, at LinkedIn, and in many other organizations.

“We currently operate 36 Kafka clusters consisting of 4,000+ broker instances for both Fronting Kafka and Consumer Kafka. More than 700 billion messages are ingested on an average day. We are currently transitioning from Kafka version 0.8.2.1 to 0.9.0.1” — Netflix Techblog

Software Engineer @ CodeGen International, CSE Graduate @ University of Moratuwa
