Distributed programming

Discovering Kubernetes

Kubernetes is a distributed cluster management application that makes it easy to deploy, manage, and scale applications in a public or private cloud. It abstracts away the complexity of the infrastructure by presenting the cluster as a single gigantic machine with plenty of resources.

Core Concepts

Pod: unit of deployment

A pod is a collection of one or more containers (for example Docker containers) that work together. A pod is deployed as a single unit. Container technology makes an application easy to deploy anywhere, regardless of the operating system.


A pod typically runs on a worker node. Each pod in the cluster has a unique IP used to exchange traffic, but that IP is only visible inside the cluster. In addition, pods are short-lived, so their IPs cannot be relied upon. To access a pod consistently and from the outside, we need to use an object called a service. A pod can also contain other resources shared between its containers.

Service: external endpoint

A service is a way of grouping a set of pods that have common characteristics. The service has an IP that is visible from the outside. A service automatically selects its pods based on their labels. Its IP address is durable for the life of the Kubernetes cluster and can safely be used by external clients.


A service will typically load balance traffic among its pods.

Master / Workers architecture

Kubernetes has a master/worker distributed architecture. The master node is the cockpit of the cluster and interacts with clients through its API server. The worker nodes are responsible for running the pods through an agent called the kubelet. The container runtime needed to run images is also installed on each node.


Create a scalable app with minikube

In order to understand the concepts of Kubernetes, let’s use a small example. We will create a Python server application and deploy it in a small Kubernetes environment that runs on a single VM: minikube. Minikube is very useful because it lets us test Kubernetes locally without needing a real cluster. After deploying the app, we will expose it to the outside and scale it up.

Create the application simple-server.py

Our application is a simple REST server with a GET method that just counts the number of requests sent to that particular server instance during its lifetime.
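Such a server might look like the sketch below. This is an assumption of what simple-server.py contains, not the original source; it is written with Python 3’s http.server (the image used later in this article is based on Python 2.7, where the equivalent module is BaseHTTPServer), and the response format is made up.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer

request_count = 0  # number of GET requests served by this instance

class CounterHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        global request_count
        request_count += 1
        body = ("Request count: %d" % request_count).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the container logs quiet

def run(port=8080):
    # 8080 matches the --port flag passed to kubectl run later on
    HTTPServer(("0.0.0.0", port), CounterHandler).serve_forever()
```

Each replica keeps its own counter, which is what will let us observe load balancing later: different pods report different counts.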

Start minikube

After installing minikube and its prerequisites (including a VM driver such as VirtualBox), we can start it.
minikube start

Create a docker image for the application

In order to create a container image of our app in the local repository, we create the following Dockerfile next to our .py file.
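The Dockerfile itself is not reproduced here; given the base image and port used in this article (python:2.7.15-alpine3.7 and 8080), a plausible reconstruction is the following sketch, with the file layout assumed.

```dockerfile
# Hypothetical reconstruction: base image and port come from this
# article, the file paths are assumptions.
FROM python:2.7.15-alpine3.7
WORKDIR /app
COPY simple-server.py .
EXPOSE 8080
CMD ["python", "simple-server.py"]
```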

Then we need to point to minikube’s docker daemon:
eval $(minikube docker-env)

We can now build the Docker image of our app (the trailing dot is important). We build our image on top of python:2.7.15-alpine3.7, which is fairly lightweight.

cd /dir-with-py-file/
docker build -t simple-server:v1 .

Create a deployment for the application

We can now create a deployment for the application. This will actually deploy and run one pod containing our image.

Note: “--image-pull-policy=Never” indicates that we use the image from our local repository
kubectl run simple-server --image=simple-server:v1 --port=8080 --image-pull-policy=Never

Create a service

We create a service in order to expose our pod to the outside world. This service is of type “LoadBalancer”.

kubectl expose deployment simple-server --type=LoadBalancer

Scale up the application

We can now scale up our application by creating multiple replicas.
kubectl scale deployments/simple-server --replicas=3

And run this command to call the service.
minikube service simple-server

If we refresh the browser multiple times, we can see that the same pod is not always hit, and that load balancing works.

Clean up

We can now clean up what we did.

Clean kubernetes resources:
kubectl delete service simple-server
kubectl delete deployment simple-server

Remove docker image and reset docker environment:
docker rmi simple-server:v1 -f
eval $(minikube docker-env -u)

Stop and delete minikube
minikube stop
minikube delete

Distributed programming

Understanding Spark

Spark is a cluster computing framework for parallel and distributed processing of massive amounts of data. It is in many ways a replacement for MapReduce, yet it is not as simple.

A better Hadoop

Hadoop MapReduce is effective at processing huge amounts of data. It divides the data into many small parts, processes them locally in parallel on a cluster, and produces an output. It is very straightforward yet not completely satisfying:

  • performance issue: intermediary and final outputs are saved to disk,
  • interface/API issue: developers are forced to adapt their code to fit the map/reduce model

Spark is meant to address these concerns as an improved version of Hadoop. Yet the newer framework is not as easy to grasp.

Spark core concepts

In order to understand Spark, let’s take an overview of its core concepts.

Driver Program

The driver program is the cockpit of a spark application. It runs the user main function, triggers operations on the cluster, and receives the final output.

Spark Context

The Spark Context is the main entry point of a Spark application. It is created once, at the beginning of the driver program.

Cluster Manager

The cluster manager provides the driver program with the resources of the cluster. An external cluster manager (Mesos or YARN) can be used, as well as the built-in one.


Executor

An executor is a process that executes tasks on a worker node. It is acquired by the driver program through the cluster manager. Each Spark application has its own executors.

Dataset (previously Resilient Distributed Dataset)

A dataset is a partitioned, read-only collection of items. It can be loaded from an HDFS file and cached for future computations.

Directed Acyclic Graph (DAG)

The directed acyclic graph is a graph that represents the set of operations to be performed on the data, organized in several stages.


Transformation

A transformation (like map, filter, reduceByKey) creates a Dataset from an existing one. It is evaluated lazily, only when an action is called.


Action

An action (like reduce, collect, count) returns a value to the driver program. An action triggers the evaluation of the preceding transformations.

The execution of a distributed job

A spark job is executed as follows:

  • The driver program starts and creates several stages divided into tasks
  • The driver program contacts the cluster manager to start executors on the cluster
  • The executors run their assigned tasks and return the results to the driver program
  • The driver program releases the resources back to the cluster manager and exits


Spark specific features


Caching

Caching is a key feature that allows a Dataset to be stored and reused later in several different actions. Caching can be done in memory and/or on disk.

Normal variables

In the normal case, each variable passed into a function (such as map or reduce) is copied to each machine as a read-only variable.

Broadcast Variables

Broadcast variables are read-only variables cached on every node. They are useful when multiple stages need the same data.


Accumulators

Accumulators are variables that can only be added to. They support parallel operations because addition is commutative and associative.


Shuffle operations

Certain operations need data to be redistributed across the cluster. For example, in reduceByKey, all the values for a given key are not necessarily on the same machine.

Example in Scala explained

We can use the Spark shell to quickly create a Spark application. The shell can be started by running the following command in the Spark directory:

bin/spark-shell


The following program creates a Dataset from a text file, then applies a transformation (filter) and an action (collect) on the Dataset. Note that no operation on the data is performed before the action is called.
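A sketch of such a session (assuming a Spark 2.x spark-shell, where the `spark` session is predefined; the file name and the filter predicate are illustrative):

```scala
// Load a Dataset of lines from a text file: nothing is read yet.
val lines = spark.read.textFile("data.txt")

// Transformation: declared lazily, not executed.
val shortLines = lines.filter(line => line.length < 20)

// Action: only now is the file actually read and the filter applied.
val result = shortLines.collect()
```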

Distributed programming

Using Zookeeper together with a distributed system

Zookeeper is a distributed service that helps coordinate distributed applications. It can be used to manage configuration, synchronization, or naming. Let’s see how Zookeeper is architected, followed by an example in Java.


Znodes

Zookeeper’s core concept is a hierarchical structure where each node contains data as well as other nodes. Each node is called a znode. The structure looks like a file system in which files and directories are the same thing. Each time a znode is updated, its version number increases. The data stored in a znode is on the order of kilobytes, and should not exceed 1MB.


The ensemble

The entire tree is duplicated on several servers, the Zookeeper servers: together they form an ensemble. Each server holds the tree locally, in memory and on disk. They all know about each other. One Zookeeper server acts as the leader: all writes are forwarded to it. The other servers, called followers, serve read requests.


Clients

A client connects to a particular server and receives data from it. The server serves read requests from its local data, but forwards all write requests to the leader. If the connection breaks, the client connects to another server.

Zookeeper maintains a set of guarantees that are important to note:

  • writes sent by a client are applied in the order they were sent
  • writes are atomic
  • data is up to date everywhere within a certain time bound (on the order of 10 seconds)

Example in java

Let’s see how Zookeeper could be used to manage the configuration of a distributed application. In our example, a client calls a single Zookeeper server; both are located on the local machine. The terminal is used to create the configuration, and a Java application reads it.

We start our zookeeper server in the terminal.

bin/zkServer.sh start

We connect a client to the server.

bin/zkCli.sh -server 127.0.0.1:2181

We create several znodes for our configuration system.

create /dev null

create /dev/appsettings null

create /dev/appsettings/apiurl myurl

create /dev/appsettings/dbconnection myconnection

We create a Java application that connects to the server and retrieves the data stored in the znodes. A config file is still needed for the Java application to locate the Zookeeper server.

The pom.xml to retrieve Zookeeper via Maven.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <!-- the project coordinates below are placeholders -->
    <groupId>com.example</groupId>
    <artifactId>zookeeper-config-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>
    </dependencies>
</project>

The config.properties file (the values below assume a local server and the “dev” environment created above):

zkhostport=localhost:2181
env=dev

The java program:

import org.apache.zookeeper.ZooKeeper;

import java.io.FileInputStream;
import java.util.Properties;

public class Program {
    private static ZooKeeper zk = null;

    public static void main(String[] args) throws Exception {
        try {
            Properties prop = new Properties();
            prop.load(new FileInputStream("config.properties"));

            String hostPort = prop.getProperty("zkhostport");

            // connect with a 3000 ms session timeout and no watcher
            zk = new ZooKeeper(hostPort, 3000, null);

            String env = prop.getProperty("env");
            String url = new String(GetData("/" + env + "/appsettings/apiurl"));
            String dbconnection = new String(GetData("/" + env + "/appsettings/dbconnection"));

            // do something with the data from zookeeper
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (zk != null) zk.close();
        }
    }

    private static byte[] GetData(String path) throws Exception {
        // synchronous read: no watch, no stat needed
        return zk.getData(path, false, null);
    }
}
Zookeeper can be the backbone of another distributed application. It can be used in more advanced scenarios such as synchronization, leader election, and notification systems.

Distributed programming

Kafka core concepts

Kafka is a messaging framework for building real-time streaming applications. It allows us to build distributed publish-subscribe systems. In this article we will present the core concepts of the framework.


One way to start with Kafka is to understand its APIs. There are four of them:

  • The Producer API allows publishing records to a particular category of messages called a topic.
  • The Streams API allows transforming the messages of a topic into messages of an output topic.
  • The Consumer API allows subscribing to the published messages.
  • The Connect API allows sending messages to other applications, such as a database.

Clients that expose those APIs are available in several languages such as Java, C#, C++ and Python.


Topics and partitions

Kafka is based on the concept of a topic. A topic is a category of messages. It is divided into several parts called partitions. The whole topic represents a log. A partition is an append-only, ordered sequence of records.


Each record in a partition is uniquely identified by an id called the offset. A record consists of a key, a timestamp, and a value. Records are saved on disk and have an automatic retention policy: they are erased after a configured period. Partitions can also be replicated.

The producer chooses which partition it will write to, for example in a round-robin fashion or based on a hash function.
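As an illustration (a toy sketch, not Kafka’s actual partitioner), a hash-based assignment keeps every record that shares a key in the same partition, while keyless records can be spread round-robin:

```python
# Toy sketch of producer-side partition selection: hashing the key
# guarantees that all records with the same key land in the same
# partition; records without a key fall back to round-robin.
import zlib
from itertools import count

NUM_PARTITIONS = 3
_round_robin = count()

def choose_partition(key):
    if key is None:
        # no key: spread records evenly over the partitions
        return next(_round_robin) % NUM_PARTITIONS
    # stable hash, so the same key always maps to the same partition
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS
```

Because the assignment is deterministic, ordering per key is preserved: all records for a given key end up in one append-only partition.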



Consumer groups

A consumer group represents a set of consumer instances. The consumption of the partitions is fairly divided among the consumer instances. Each consumer instance is the exclusive consumer of “its” partitions.

Key elements to understand

Kafka can deal with large amounts of data with little overhead thanks to a few key features:

  • Consumption managed by the consumers means less overhead on the server side
  • Partitions having different consumers provides parallelism and a way to scale
  • Automatic deletion is the fastest way to get rid of old records

The framework also gives some guarantees:

  • Partition replication provides fault tolerance
  • Persistent messages allow a consumer to replay computations
  • A consumer has exclusivity (within its group) over the partitions it consumes
  • Writes to a partition are append-only and ordered

Kafka is made of simple yet rather innovative choices. Those choices make it robust, fast and scalable.


Distributed programming

Things to know when switching from an RDBMS to MongoDB

Before switching from an RDBMS such as Oracle or SQL Server to MongoDB, one should be familiar with some key concepts of the NoSQL database.

Translation needed

Before starting, we should translate some traditional concepts from the RDBMS world:

  • A collection is like a table
  • A document is like a row
  • A field is like a column

That said, we can take a look at the key concepts of MongoDB.

Key Concepts

Dynamic schema

The fields of a document can be changed at any time. Documents of a collection may have different fields, or even different data types for the same field.
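The idea can be illustrated with plain Python dictionaries standing in for documents (the documents below are made up for the example):

```python
# Documents in the same "collection" need not share the same fields.
persons = [
    {"person_id": 1, "name": "Ada"},                              # no extra field
    {"person_id": 2, "name": "Bob", "email": "bob@example.com"},  # has an email
    {"person_id": 3, "name": "Eve", "age": 42},                   # different extra field
]

# Code reading such a collection must tolerate missing fields,
# e.g. with dict.get instead of direct indexing.
with_email = [p["name"] for p in persons if p.get("email") is not None]
```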

Atomicity of write operations

Write operations are atomic for a single document: a document will never be partially updated, even if it contains multiple sub-documents. In contrast, write operations on multiple documents are never atomic.

Unique Index

A unique index enforces uniqueness on a field. For example, we can create a unique index on the person_id field of the persons collection.

Read uncommitted

Readers can see the result of writes before the writes are said to be durable. A durable write is a write that persists after a shutdown or restart of one or more servers.


Replication

The same data set can be replicated in order to provide high availability (for read performance) or redundancy (for data safety).


Sharding

A collection can be partitioned among multiple servers in order to offer horizontal scaling. Data partitioning is based on a key called the shard key.

MongoDB is easy to start with but can be difficult to master. These are the key concepts to know before diving deeper into this NoSQL database.

Distributed programming

A word on HDFS (Hadoop Distributed File System)

Last time we gave an introduction to Hadoop MapReduce, and we saw that it relies on its own file system: the Hadoop Distributed File System (HDFS). Today we are going to take a look at it.

What is HDFS?

The goal of HDFS is to store large amounts of data in a distributed and fault-tolerant way. A large file loaded into HDFS is divided into small chunks and replicated several times onto different nodes of the cluster. The block size and the replication factor are configurable per file. This makes it possible to run computations on the data locally using MapReduce. HDFS is said to be append-only: once a file is created in HDFS, it can only be appended to.

NameNodes and DataNodes

HDFS is based on a master/slave architecture. The NameNode is the single node responsible for managing client access to the data. The DataNodes are the nodes responsible for storing the data. The NameNode actually stores the metadata of all the data in HDFS, and maintains a traditional hierarchical file organisation: the file system namespace.

Some HDFS commands

These commands are rather simple. To try them, you can simply install Hadoop on a machine using a single-node setup.

List contents: bin/hadoop fs -ls

Load a file: bin/hadoop fs -put sourcedir targetdir

Delete a directory: bin/hadoop fs -rm -r dir

Distributed programming

An introduction to Hadoop MapReduce

Today we are going to talk about a famous Big Data framework called Hadoop MapReduce. In this article, after presenting the framework, we will build a small example using Java and Hadoop MapReduce on Linux Raspbian (yes, I am testing Hadoop on a Raspberry Pi!).

What is MapReduce?

MapReduce is a framework for processing very large amounts of data easily and efficiently. Typically, it answers this problem: we have terabytes of data, and we need to apply some computation to it to produce a result.

Ok, but how? In short, MapReduce uses the memory, CPU and storage capacity of a cluster of nodes. A developer using the framework just needs to:

  • install and configure Hadoop on each node
  • make a program that implements two methods: map and reduce

And it is magic? It is near magic, yes! The key to understanding MapReduce is actually its file system: the Hadoop Distributed File System (HDFS). It splits huge data sets into small chunks (typically 64MB or 128MB) and stores them on different nodes of a cluster. During computation, nodes execute in parallel on the data that is stored locally. Therefore, we are able to:

  • process extremely large data sets by splitting them into smaller ones
  • optimize CPU/RAM usage by executing on different nodes concurrently
  • reduce network bandwidth, because the data is located on the node where the computation happens

What does Map and Reduce Do?

Map Phase

Map takes key/value pairs and produces intermediary key/value pairs. This is known as the Map phase; it is executed in parallel by MapperTasks, where each MapperTask processes a different chunk of the input data.

(input) <k1, v1> -> Map -> <k2, v2> (intermediary output)

Reduce Phase

The framework then takes over in a process known as Shuffle and Sort. This happens before the ReducerTasks enter the Reduce phase.

Each intermediary key/value pair is assigned to a specific ReducerTask (Shuffle): the only task responsible for a given key. The pairs are then grouped by key (Sort), producing pairs of key/list of values. According to the documentation, the shuffle and sort phases occur simultaneously.

Finally, each ReducerTask produces the final output for the keys it is processing.

(int. output) <k2, v2> -> Shuffle&Sort -> <k2, list of v2> -> Reduce -> <k3, v3> (output)

The complete process is:

<k1, v1> -> Map -> <k2, v2> -> Shuffle&Sort -> <k2, list of v2> -> Reduce -> <k3, v3>
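The whole pipeline can be simulated in a few lines of Python (a single-process sketch, not Hadoop itself); the prefix “ha” and the input lines anticipate the WordSearch example of this article:

```python
# Single-process sketch of <k1,v1> -> Map -> Shuffle&Sort -> Reduce.
from collections import defaultdict

def map_phase(line, prefix):
    # emit (word, 1) for every word that starts with the prefix
    return [(w, 1) for w in line.split() if w.startswith(prefix)]

def shuffle_and_sort(pairs):
    # group the intermediary pairs by key, then order the keys
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())  # (k2, list of v2)

def reduce_phase(key, values):
    # count the occurrences of each word
    return (key, sum(values))

lines = ["hello world bye hadoop", "hello world hi harry home holiday"]
intermediate = [p for line in lines for p in map_phase(line, "ha")]
output = [reduce_phase(k, vs) for k, vs in shuffle_and_sort(intermediate)]
# output: [("hadoop", 1), ("harry", 1)]
```

In real Hadoop, the map and reduce functions run on different nodes and the framework performs the shuffle over the network; the data flow is the same.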

Example with WordSearch


We need to install Hadoop first. I have chosen to install it on Raspbian. For this example, we are going to set up a single-node system for the sake of simplicity.

Input data

The input consists of two files located in a folder on the Linux machine.

File01 contains: hello world bye hadoop

File02 contains: hello world hi harry home holiday

NB: for simplicity, these files and folders are created manually without using HDFS commands.

Source File

Our Hadoop program is a Java program built around the two methods. In the Map method, we keep only the words that start with a specific string. In the Reduce method, we simply count the occurrences of each word kept by the Map phase.

Running the MapReduce program

First, we need to compile and package our program into a .jar file. Then we can run the program with Hadoop, passing the following parameters:

  • the path of the .jar file
  • the name of the Java class containing Map/Reduce
  • the path of the input folder
  • the path of the output folder
  • the “startWith” string parameter

For the sake of simplicity, let’s say that our jar, input and output are all located in the Hadoop installation folder.

The command to run MapReduce is:

bin/hadoop jar wordsearch.jar WordSearch input output ha


Looking at the output folder, we find a file named “part-r-00000” with all the words starting with “ha” and the number of times they appear:

hadoop 1
harry 1

Which is correct!

Hadoop MapReduce is a cleanly thought out distributed framework. In order to use it well, one needs a good comprehension of how things work under the hood.