
Using Zookeeper together with a distributed system

Zookeeper is a distributed service that helps coordinate distributed applications. It can be used to manage configuration, synchronization, or naming. Let's look at how Zookeeper is architected, and then walk through an example in Java.

Architecture

Zookeeper's core concept is a hierarchical structure in which each node contains data as well as child nodes. Each node is called a znode. The structure looks like a file system in which a file and a directory are the same thing. Each time a znode is updated, its version number increases. The data stored in a znode is typically on the order of kilobytes and should not exceed 1 MB.

[Figure: zk_namespace, the hierarchical znode namespace]
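
Since every update bumps the version number, a client can read a value and write it back only if nothing changed in the meantime. Here is a minimal, hypothetical helper sketching this compare-and-set pattern; the handle and the znode path are assumed to already exist:

import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class VersionedUpdate {
    // Hypothetical helper: replace a znode's value only if nobody else
    // modified it since we read it (compare-and-set on the version number).
    static byte[] readAndReplace(ZooKeeper zk, String path, byte[] newValue) throws Exception {
        Stat stat = new Stat();
        byte[] current = zk.getData(path, false, stat);   // fills in stat.getVersion()
        zk.setData(path, newValue, stat.getVersion());    // fails with BadVersionException if outdated
        return current;
    }
}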

The entire tree is replicated across several servers, the Zookeeper servers: together they form an ensemble. Each server holds the tree locally, in memory and on disk, and they all know about each other. One Zookeeper server acts as the leader: all writes are forwarded to it. The other servers, called followers, serve read requests.

[Figure: zk_overview, clients connected to the Zookeeper ensemble]

A client connects to a particular server and receives data from it. The server services read requests with its local data, but forwards all write requests to the leader. If the connection breaks, the client connects to another server.

Zookeeper maintains a set of guarantees that are important to note:

  • writes sent by a client are applied in the order they were sent
  • writes are atomic: they either succeed completely or fail
  • data is up to date everywhere within a certain time bound (on the order of 10 seconds); see the sketch below for forcing a fully up-to-date read
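
The last guarantee means a follower can briefly lag behind the leader. If a client needs a read that reflects every write committed so far, it can call sync before reading, which asks the server it is connected to to catch up with the leader. A minimal sketch, assuming an already connected handle and an existing znode path:

import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class FreshRead {
    // Hypothetical helper: ask the connected server to catch up with the
    // leader, then read, so the result is not bounded by the propagation delay.
    static byte[] readLatest(ZooKeeper zk, String path) throws Exception {
        CountDownLatch synced = new CountDownLatch(1);
        zk.sync(path, (rc, p, ctx) -> synced.countDown(), null);
        synced.await();
        return zk.getData(path, false, null);
    }
}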

Example in Java

Let's see how Zookeeper could be used to manage the configuration of a distributed application. For our example, a client will call a single Zookeeper server, with both running on the local machine. The terminal will be used to create the configuration, and the Java application will read it.

We start our zookeeper server in the terminal.

bin/zkServer.sh start

We connect a client to the server.

bin/zkCli.sh -server 127.0.0.1:2181

We create several znodes for our configuration system.

create /dev null

create /dev/appsettings null

create /dev/appsettings/apiurl myurl

create /dev/appsettings/dbconnection myconnection
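
The same znodes could also be created programmatically instead of through the CLI. A minimal sketch, assuming an already connected ZooKeeper handle and using an open ACL for simplicity:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateConfig {
    // Creates the same configuration tree as the zkCli commands above.
    static void createConfig(ZooKeeper zk) throws Exception {
        zk.create("/dev", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create("/dev/appsettings", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create("/dev/appsettings/apiurl", "myurl".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create("/dev/appsettings/dbconnection", "myconnection".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}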

We create a Java application that connects to the server and retrieves the data stored in the znodes. A config file for the Java application is still needed to locate the Zookeeper server.

The pom.xml to retrieve the Zookeeper client library via Maven:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>zkexample</groupId>
    <artifactId>zkexample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>
    </dependencies>
</project>

The config.properties file:

zkhostport=localhost:2181
env=dev

The Java program:

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Program {
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {
        try {
            // Locate the Zookeeper server from the local properties file.
            Properties prop = new Properties();
            prop.load(new FileInputStream("config.properties"));

            String hostPort = prop.getProperty("zkhostport");

            // The constructor returns immediately; wait until the session
            // is actually established before issuing requests.
            CountDownLatch connected = new CountDownLatch(1);
            zk = new ZooKeeper(hostPort, 3000, event -> {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            });
            connected.await();

            // Read the application settings stored in the znodes.
            String env = prop.getProperty("env");
            String url = new String(getData("/" + env + "/appsettings/apiurl"), StandardCharsets.UTF_8);
            String dbconnection = new String(getData("/" + env + "/appsettings/dbconnection"), StandardCharsets.UTF_8);

            // do something with the data from zookeeper
            System.out.println(url);
            System.out.println(dbconnection);
        }
        catch (Exception ex) {
            System.out.println(ex.getMessage());
        }
        finally {
            if (zk != null) zk.close();
        }
    }

    private static byte[] getData(String path) throws Exception {
        // No watch and no Stat: we only need the current value.
        return zk.getData(path, false, null);
    }
}

Zookeeper can be the bread and butter of another distributed application. It can be used in more advanced scenarios like synchronization, leader election, and notification systems.
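
As a small taste of the notification scenario, a read can register a watch, and the server will notify the client once the next time the znode changes. A minimal sketch, assuming an already connected handle; the path is the caller's choice:

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConfigWatch {
    // Reads a configuration znode and registers a one-shot watch so the
    // client is told the next time the value changes.
    static byte[] readAndWatch(ZooKeeper zk, String path) throws Exception {
        return zk.getData(path, event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                System.out.println(path + " changed, re-read it (and re-register the watch) to get the new value");
            }
        }, null);
    }
}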

Kafka core concepts

Kafka is a messaging framework for building real-time streaming applications. It allows you to build distributed publish-subscribe systems. In this article we will present the core concepts of the framework.

APIs

One way to start with Kafka is to understand its APIs. There are four of them:

  • The Producer API lets you publish records to a particular category of messages called a topic.
  • The Streams API lets you transform the messages of an input topic into messages of an output topic.
  • The Consumer API lets you subscribe to the messages published to a topic.
  • The Connect API lets you send messages to external systems such as a database.

Clients that expose those APIs are available in several languages such as Java, C#, C++ and Python.
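
As an illustration of the Producer API, here is a minimal sketch using the Java client. The broker address, topic name, key, and value are assumptions made up for the example:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Publish one record to the (assumed) topic "appsettings-changed".
            producer.send(new ProducerRecord<>("appsettings-changed", "apiurl", "myurl"));
        }
    }
}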

Architecture

Kafka is based on the concept of a topic. A topic is a category of messages. It is divided into several parts called partitions, and the topic as a whole represents a log. A partition is an append-only, ordered sequence of records.

[Figure: kafka_partition, the anatomy of a topic partition]

Each record in a partition is uniquely identified by an id called the offset. A record consists of a key, a timestamp, and a value. Records are saved on disk and have an automatic retention policy: they are erased after the configured period. Partitions can also be replicated.

The producer chooses which partition it will write to, for example in a round-robin fashion or based on a hash function.
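
A minimal sketch of how that choice surfaces in the Java client; the topic name, key, and values are made up for the illustration:

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionChoice {
    // With no key, the client spreads records across partitions
    // (round robin or sticky, depending on the client version).
    ProducerRecord<String, String> unkeyed = new ProducerRecord<>("mytopic", "some value");

    // With a key, the default partitioner hashes the key, so all records with
    // the same key land in the same partition and therefore stay ordered.
    ProducerRecord<String, String> keyed = new ProducerRecord<>("mytopic", "user-42", "some value");

    // A partition can also be chosen explicitly.
    ProducerRecord<String, String> explicit = new ProducerRecord<>("mytopic", 0, "user-42", "some value");
}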

[Figure: kafka_architecture, producers, partitions and consumer groups]

A consumer group represents a set of consumer instances. The consumption of the partitions is divided fairly among the consumer instances. Each consumer instance is the exclusive consumer of "its" partitions.
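
A minimal sketch of a consumer instance using the Java client (a 2.0+ client is assumed for poll(Duration)); the broker address, group id, and topic name are made up for the example. Starting several copies of this program with the same group.id makes Kafka split the topic's partitions among them:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "appsettings-readers");       // instances sharing this id split the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("appsettings-changed"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}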

Key elements to understand

Kafka can deal with large amounts of data with little overhead thanks to a few key features:

  • Consumption being managed by the consumers means less overhead on the server side
  • Partitions having different consumers provide parallelism and a way to scale
  • Automatic deletion is the fastest way to get rid of old records

The framework also gives some guarantees:

  • Partition replication provides fault tolerance
  • Persistent messages allow a consumer to replay a computation
  • A consumer has exclusivity (within its group) over the partitions it consumes
  • Writes to a partition are append-only and ordered

Kafka is built on simple yet rather innovative choices. Those choices make it robust, fast, and scalable.