An Advanced Elasticsearch Architecture for High-volume Reindexing

by Fred de Villamil , Director of Infrastructure October 26, 2016 - 10 minutes read

Originally published at

I’ve found a new and fun way to play with Elasticsearch: reindexing a production cluster without disturbing our clients. If you haven’t already, you might enjoy what we did last summer, reindexing 36 billion documents in 5 days within the same cluster.

Reindexing that cluster was easy because it was not in production yet. Reindexing a whole cluster where regular clients expect to get their data in real time brings new challenges and more problems to solve.

As you can see on the screenshot below, our main bottleneck the first time we reindexed Blackhole, the aptly named, was the CPU. Having the whole cluster at 100% and a load of 20 is not an option, so we need to find a workaround.

This time, we won’t reindex Blackhole but Blink. Blink stores the data we display in our clients’ dashboards. We need to reindex it every time we change the mapping to enrich that data and add new features our clients and colleagues love.

A glimpse at our infrastructure

Blink is a group of 3 clusters built around 27 physical hosts each, having 64GB RAM and 4 core / 8 threads Xeon D-1520. They are small, affordable and disposable hosts. The topology is the same for each cluster:

  • 3 master nodes (2 in our main data center and 1 in our backup data center plus a virtual machine ready to launch in case of major outage)
  • 4 http query nodes (2 in each data center)
  • 20 data nodes (10 in each data center)

The data nodes have 4*800GB SSD drives in RAID0, about 58TB per cluster. The data and nodes are configured with Elasticsearch zone awareness. With 1 replica for each index, that makes sure we have 100% of the data in each data center, so we’re crash proof.

We didn’t allocate the http query nodes to a specific zone for a reason: we want to use the whole cluster when possible, at the cost of 1.2ms of network latency. From Elasticsearch documentation:

When executing search or GET requests, with shard awareness enabled, Elasticsearch will prefer using local shards — shards in the same awareness group — to execute the request. This is usually faster than crossing racks or awareness zones.

In front of the clusters, we have a layer 7 load balancer made of 2 servers, each running Haproxy and holding various virtual IP addresses (VIP). Keepalived ensures the active load balancer holds the VIP. Each load balancer runs in a different data center for fault tolerance. Haproxy uses the allbackups configuration directive, so we access the query nodes in the second data center only when the first two are down.

frontend blink_01
 default_backend be_blink01
backend be_blink01
 balance leastconn
 option allbackups
 option httpchk GET /_cluster/health 
 server esnode01 check port 9200 inter 3s fall 3
 server esnode02 check port 9200 inter 3s fall 3
 server esnode03 check port 9200 inter 3s fall 3 backup
 server esnode04 check port 9200 inter 3s fall 3 backup

So our infrastructure diagram becomes:

In front of the Haproxy, we have an applicative layer called Baldur. Baldur was developed by my colleague Nicolas Bazire to handle multiple versions of the same Elasticsearch index and route queries amongst multiple clusters.

There’s a reason why we had to split the infrastructure into multiple clusters even though they all run the same version of Elasticsearch, the same plugins, and they do exactly the same things. Each cluster supports about 10,000 indices and 30,000 shards. That’s a lot, and Elasticsearch master nodes have a hard time dealing with so many indexes and shards.

Baldur is both an API and an applicative load balancer built on Nginx with the LUA plugin. It connects to a MySQL database and has a local memcache based cache. Baldur was built for 2 reasons:

  • to tell our API the active index for a dashboard
  • to tell our indexers which indexes they should write in, since we manage multiple versions of the same index.

In Elasticsearch, each index has a defined naming:

<mapping version>_<dashboard id>

In Baldur, we have 2 tables:

The first one is the indexes table with the triplet

id / cluster id / mapping id

That’s how we manage to index the ongoing data into multiple versions of the same index during the migration process from one mapping to another.

The second table is the reports table with the triplet

client id / report id / active index id

So the API knows which index it should use as active.
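To make the lookup concrete, here is a minimal sketch of how the two tables could resolve the active index for a report. It is not Baldur's actual code: it uses an in-memory SQLite database as a stand-in for MySQL, the column names are guesses based on the two triplets above, and it assumes the report id is the dashboard id used in the index name.

```python
import sqlite3

# Hypothetical schema: column names are guesses based on the two triplets.
SCHEMA = """
CREATE TABLE indexes (id INTEGER PRIMARY KEY, cluster_id INTEGER, mapping_id INTEGER);
CREATE TABLE reports (client_id INTEGER, report_id INTEGER, active_index_id INTEGER);
"""


def active_index(db, client_id, report_id):
    """Return (cluster id, index name) for a report, or None if unknown."""
    row = db.execute(
        "SELECT i.cluster_id, i.mapping_id "
        "FROM reports r JOIN indexes i ON i.id = r.active_index_id "
        "WHERE r.client_id = ? AND r.report_id = ?",
        (client_id, report_id),
    ).fetchone()
    if row is None:
        return None
    cluster_id, mapping_id = row
    # Assumption: the report id is the dashboard id used in the index name.
    return cluster_id, f"{mapping_id}_{report_id}"


db = sqlite3.connect(":memory:")
db.executescript(SCHEMA)
# Two versions of the same dashboard's index during a migration (made-up ids).
db.executemany("INSERT INTO indexes VALUES (?, ?, ?)", [(1, 2, 12), (2, 2, 13)])
db.execute("INSERT INTO reports VALUES (?, ?, ?)", (7, 4242, 1))
print(active_index(db, 7, 4242))
```

Switching a client to the new mapping then comes down to pointing the active index id of its report at the other row of the indexes table.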

Just like the load balancers, Baldur holds a VIP managed by another Keepalived, for fail over.

Using Elasticsearch for fun and profit

Since you know everything you need about our infrastructure, let’s talk about playing with our Elasticsearch cluster the smart way for fun and, indeed, profit.

Elasticsearch and our index naming allow us to be lazy so we can watch more cute kitten videos on Youtube. To create an index with the right mapping and settings, we use Elasticsearch templates and auto create index patterns.

Every node in the cluster has the following configuration:

 action.auto_create_index: +<mapping id 1>_*,+<mapping id 2>_*,-*

And we create a template in Elasticsearch for every mapping we need.

PUT /_template/template_<mapping id>
{
  "template": "<mapping id>_*",
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "add some json": "here"
  }
}

Every time the indexer tries to write into a not yet existing index, Elasticsearch creates it with the right mapping. That’s the magic.
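As an illustration of how such a pattern list behaves, here is a small Python sketch. It is not Elasticsearch code: it assumes the patterns are evaluated from left to right, that the first matching pattern decides, and that an index matching nothing is refused. The mapping ids 12 and 13 and the dashboard id 4242 are made up.

```python
from fnmatch import fnmatchcase


def may_auto_create(index_name, setting):
    """Tell whether an auto_create_index value would let index_name be created."""
    for raw in setting.split(","):
        pattern = raw.strip()
        if not pattern:
            continue
        # A leading "-" forbids, a leading "+" (or nothing) allows.
        allow = not pattern.startswith("-")
        glob = pattern[1:] if pattern[0] in "+-" else pattern
        if fnmatchcase(index_name, glob):
            return allow  # the first matching pattern decides
    return False  # no pattern matched: refuse to auto-create


# Hypothetical mapping ids 12 and 13, hypothetical dashboard id 4242.
SETTING = "+12_*,+13_*,-*"
print(may_auto_create("12_4242", SETTING))
print(may_auto_create("99_4242", SETTING))
```

With that setting, only indexes whose name starts with a known mapping id get created on the fly; the trailing -* rejects everything else.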

Except this time, we don’t want to create empty indexes with a single shard as we’re going to copy existing data.

After playing with Elasticsearch for years, we’ve noticed that the best size / shard was about 10GB. This allows faster reallocation and recovery at a cost of more Lucene segments during heavy writing and more frequent optimization.

On Blink, 1,000,000 documents weigh about 2GB, so we’re creating indexes with 1 shard for each 5 million documents + 1 when the dashboard already has more than 5 million documents.
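The sizing rule can be written down in a few lines of Python. This is only a sketch of the arithmetic described above; the function and constant names are ours, and the formula is the integer division plus one used for dashboards above 5 million documents.

```python
TARGET_SHARD_GB = 10        # best shard size we observed
GB_PER_MILLION_DOCS = 2     # what 1,000,000 Blink documents weigh

# 10GB per shard at 2GB per million documents: 5 million documents per shard.
DOCS_PER_SHARD = TARGET_SHARD_GB // GB_PER_MILLION_DOCS * 1_000_000


def shard_count(doc_count, docs_per_shard=DOCS_PER_SHARD):
    """One shard per full slice of docs_per_shard documents, plus one."""
    return doc_count // docs_per_shard + 1


for docs in (1_200_000, 12_000_000, 48_000_000):
    print(docs, "documents ->", shard_count(docs), "shard(s)")
```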

Before reindexing a client, we run a small script to create the new indexes with the right amount of shards. Here’s a simplified version without error management for your eyes only.

$ curl -XPUT http://esnode01:9200/<new mapping id>_<dashboard id> -d '{ "settings.index.number_of_shards" : '$(( $(curl -XGET http://esnode01:9200/<old mapping id>_<dashboard_id>/_count | cut -f 2 -d : | cut -f 1 -d ",") / 5000000 + 1))'}'

Now we’re able to reindex, except we didn’t solve the CPU issue. That’s where fun things start.

What we’re going to do is to leverage Elasticsearch zone awareness to dedicate a few data nodes to the writing process. You can also add some new nodes if you can’t afford removing a few from your existing cluster, it works exactly the same way.

First, let’s kick out all the indexes from those nodes.

PUT /_cluster/settings
{
 "transient" : {
 "cluster.routing.allocation.exclude._ip" : "<data node 1>,<data node 2>,<data node x>"
 }
}

Elasticsearch then moves all the data from these nodes to the remaining ones. You can also shut down those nodes and wait for the indexes to recover, but you might lose data.

Then, for each node, we edit Elasticsearch configuration to assign these nodes to a new zone called envrack (f$#!ed up in French). We put all these machines in the secondary data center to use the spare http query nodes for the indexing process.

 zone: 'envrack'

Then restart Elasticsearch so it runs with the new configuration.

We don’t want Elasticsearch to allocate the existing indexes to the new zone when we bring back these nodes online, so we update these index settings accordingly.

$ curl -XPUT http://esmaster01:9200/<old mapping id>_*/_settings -d '{
 "" : "envrack"
}'

The same way, we don’t want the new indexes to be allocated to the production zones, so we update the creation script.

counter=$(curl -XGET http://esnode01:9200/<old mapping id>_<dashboard_id>/_count | cut -f 2 -d : | cut -f 1 -d ",")
# Default to a single shard for small dashboards
shards=1
if [ $counter -gt 5000000 ]; then
 shards=$(( $counter / 5000000 + 1 ))
fi
curl -XPUT http://esnode01:9200/<new mapping id>_<dashboard id> -d '{
 "settings" : {
 "index.number_of_shards" : '$shards',
 "index.number_of_replicas" : 0,
 "" : "barack,chirack"
 }
}'

More readable than a one-liner, isn’t it?

We don’t add a replica for 2 reasons:

  • The cluster is zone aware and we only have one zone for the reindexing
  • Indexing with a replica means indexing twice, so using twice as much CPU. Adding a replica after indexing is just transferring the data from one host to another.

Of course, losing a data node means losing data. If you can’t afford reindexing an index multiple times in case of a crash, don’t do this: add another zone, or allow your new indexes to use the data nodes from the existing zone in the backup data center.

There’s one more thing we want to do before we start indexing.

Since we’ve set the new zone in the secondary data center, we update the http query nodes configuration to make them zone aware so they read the local shards in priority. We do the same with the active nodes so they read their zone first. That way, we can query the passive http query nodes when reading during the reindexing process, with little impact on what the clients access.

In the main data center:

 zone: 'barack'

And in the secondary:

 zone: 'chirack'

Here’s what our infrastructure looks like now.

It’s now time to reindex.

We first tried to reindex taking the data from our database clusters, but it brought them to their knees. We have large databases and our dashboards are made of documents crawled over time, which means large queries on a huge dataset, with random accesses only. In one word: sluggish.

What we do instead is copy the existing data from the old indexes to the new ones, then add the stuff that makes our data richer.

To copy the content of an existing index into a new one, Logstash from Elastic is a convenient tool. It takes the data from a source, transforms it if needed and pushes it into a destination.

Our Logstash configuration is pretty straightforward:

input {
  elasticsearch {
    hosts => [ "esnode0{3,4}" ]
    index => "<old mapping id>_INDEX_ID"
    size => 1000
    scroll => "5m"
    docinfo => true
  }
}
output {
  elasticsearch {
    host => "esdataXX"
    index => "<new mapping id>_INDEX_ID"
    protocol => "http"
    index_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    workers => 10
  }
  stdout {
    codec => dots
  }
}

We can now run Logstash from a host inside the secondary data center.

Here, we:

  • read from the passive http query nodes. Since they’re zone aware, they query the data in the same zone in priority
  • write on the data nodes inside the indexing zone so we won’t load the nodes accessed by our clients

Once we’re done reindexing a client, we update Baldur to change the active indexes for that client. Then, we add a replica and move the freshly baked indexes inside the production zones.

$ curl -XPUT http://esnode01:9200/<new mapping id>_<dashboard id>/_settings -d '{
 "settings" : {
 "index.number_of_replicas" : 1,
 "" : "envrack",
 "" : "barack,chirack"
 }
}'

Now, we’re ready to delete the old indexes for that client.

$ curl -XDELETE http://esnode01:9200/<old mapping_id>_<dashboard id>


This post doesn’t deal with cluster optimization for massive indexing on purpose. The Web is full of articles on that topic so I decided it didn’t need another one.

What I wanted to show is how we managed to isolate the data within the same cluster so we didn’t disturb our clients. Considering our current infrastructure, building 3 more clusters might have been easier, but it has a double cost we didn’t want to pay.

First, it means doubling the infrastructure, so buying even more servers you won’t use anymore after the reindexing process. Second, it means buying these servers 1 or 2 months upfront to make sure they’re delivered in time.

I hope you enjoyed reading that post as much as I enjoyed sharing my experience on the topic. If you did, please share it around you, it might be helpful to someone!

An Elasticsearch cheat sheet

by Fred de Villamil , Director of Infrastructure August 27, 2016 - 4 minutes read

Originally published at

Sending out a search. Photo credit Robert Mc Goldrick

I’m using Elasticsearch a lot, which leads me to run the same commands again and again to manage my clusters. Even though they’re now all automated in Ansible, I thought it would be interesting to share them here.

Mass index deletion with pattern

I often have to delete hundreds of indexes at once. Their names usually follow some pattern, which makes batch deletion easier.

$ for index in $(curl -XGET esmaster:9200/_cat/indices | awk '/pattern/ {print $3}'); do curl -XDELETE esmaster:9200/$index?master_timeout=120s; done

Mass optimize, indexes with the most deleted docs first

Lucene, which powers Elasticsearch, has a specific behavior when it comes to deleting or updating documents. Instead of actually deleting or overwriting the data, it flags the document as deleted and writes a new one. The only way to get rid of a deleted document is to run an optimize on your indexes.

This snippet sorts your existing indexes by the number of deleted documents before it runs the optimize.

$ for indice in $(curl -XGET esmaster:9200/_cat/indices | sort -rk 7 | awk '{print $3}'); do curl -XPOST "http://esmaster:9200/${indice}/_optimize?max_num_segments=1"; done
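One thing to keep in mind: sort without -n compares the column as text, so "950" ranks above "12000". If the exact order matters, the same ranking can be sketched in Python with a numeric sort. This assumes the _cat/indices column layout used by the snippet above (index name in column 3, deleted documents in column 7); the sample lines are made up.

```python
def indices_by_deleted_docs(cat_output):
    """Return index names, the ones with the most deleted documents first."""
    rows = []
    for line in cat_output.splitlines():
        fields = line.split()
        # Skip blank lines, headers and rows without a numeric 7th column.
        if len(fields) < 7 or not fields[6].isdigit():
            continue
        rows.append((int(fields[6]), fields[2]))
    rows.sort(reverse=True)
    return [name for _, name in rows]


# Made-up sample of what `curl esmaster:9200/_cat/indices` could return.
SAMPLE = """\
green open 12_1001 1 1 1200000   950 2.3gb 1.1gb
green open 12_1002 2 1 7400000 12000  14gb   7gb
green open 12_1003 1 1  300000     0 600mb 300mb
"""

print(indices_by_deleted_docs(SAMPLE))
```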

Restart a cluster using rack awareness

Using rack awareness lets you split your replicated data evenly between hosts or data centers. It makes it convenient to restart half of your cluster at once instead of host by host.

$ curl -XPUT 'host:9200/_cluster/settings' -d '{ "transient" : { "cluster.routing.allocation.enable": "none" }}'; for host in $(curl -XGET esmaster:9200/_cat/nodeattrs?attr | awk '/rack_id/ {print $2}'); do ssh $host service elasticsearch restart; done; sleep 60; curl -XPUT 'host:9200/_cluster/settings' -d '{ "transient" : { "cluster.routing.allocation.enable": "all" }}'

Optimize your cluster restart

There’s a simple way to accelerate your cluster restart. Once you’ve brought your masters back, run this snippet. Most of the options are self explanatory:

$ curl -XPUT 'http://escluster:9200/_cluster/settings' -d '{
  "transient" : {
    "cluster.routing.allocation.cluster_concurrent_rebalance": 20,
    "indices.recovery.concurrent_streams": 20,
    "cluster.routing.allocation.node_initial_primaries_recoveries": 20,
    "cluster.routing.allocation.node_concurrent_recoveries": 20,
    "indices.recovery.max_bytes_per_sec": "2048mb",
    "cluster.routing.allocation.disk.threshold_enabled" : true,
    "cluster.routing.allocation.disk.watermark.low" : "90%",
    "cluster.routing.allocation.disk.watermark.high" : "98%",
    "cluster.routing.allocation.enable": "primary"
  }
}'

Then, once your cluster is back to yellow, run that one:

$ curl -XPUT 'http://escluster:9200/_cluster/settings' -d '{
  "transient" : {
    "cluster.routing.allocation.enable": "all"
  }
}'

Remove data nodes from a cluster without getting yellow

$ curl -XPUT 'http://escluster:9200/_cluster/settings' -d '{
  "transient" : {
    "cluster.routing.allocation.exclude._ip" : "<data node 1>,<data node 2>,<data node x>"
  }
}'

Get useful information about your cluster

Nodes information

This snippet gets the most useful information from your Elasticsearch nodes:

  • hostname
  • role (master, data, nothing)
  • free disk space
  • heap used
  • ram used
  • file descriptors used
  • load

$ curl -XGET 'https://escluster/_cat/nodes?v&h=host,r,d,hc,rc,fdc,l'
host         r        d     hc     rc   fdc    l
             d      1tb  9.4gb 58.2gb 20752 0.20
             d  988.4gb 16.2gb 59.3gb 21004 0.12
             d      1tb 14.1gb 59.2gb 20952 0.18
             d      1tb 14.3gb 58.8gb 20796 0.10
             d      1tb 16.1gb 60.5gb 21140 0.17
             d      1tb  9.5gb 59.4gb 20928 0.19

Then, it’s easy to sort the output to get interesting information.

Sort by free disk space:

$ curl -XGET https://escluster/_cat/nodes?h=host,r,d,hc,rc,fdc,l | sort -hrk 3

Sort by heap occupancy:

$ curl -XGET https://escluster/_cat/nodes?h=host,r,d,hc,rc,fdc,l | sort -hrk 4

And so on.
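If you'd rather do the sorting in a script, here is a rough Python equivalent. It is only a sketch: it assumes the size suffixes are the lowercase b/kb/mb/gb/tb shown in the output above and that they are powers of 1024, and the hostnames in the sample are made up.

```python
UNITS = {"b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3, "tb": 1024 ** 4}


def to_bytes(size):
    """Convert a size such as '988.4gb' or '1tb' to a number of bytes."""
    size = size.strip().lower()
    # Check the two-letter suffixes before the bare "b".
    for suffix in ("kb", "mb", "gb", "tb", "b"):
        if size.endswith(suffix):
            return float(size[: -len(suffix)]) * UNITS[suffix]
    return float(size)


def sort_nodes(cat_output, column):
    """Sort _cat/nodes rows on a size column (0-based), largest first."""
    rows = [line.split() for line in cat_output.splitlines() if line.strip()]
    return sorted(rows, key=lambda row: to_bytes(row[column]), reverse=True)


# Values taken from the output above, with made-up hostnames.
SAMPLE = """\
esdata01 d 1tb     9.4gb  58.2gb 20752 0.20
esdata02 d 988.4gb 16.2gb 59.3gb 21004 0.12
esdata03 d 1tb     14.1gb 59.2gb 20952 0.18
"""

for row in sort_nodes(SAMPLE, 3):  # column 3 is the heap used
    print(row[0], row[3])
```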

Monitor your search queues

It’s sometimes useful to know what happens on your data nodes’ search queues. Beyond the search thread pool (which defaults to ((CPU * 3) / 2) + 1 threads on each data node), queries get stacked into the search queue, a buffer of 1000 entries.

$ while true; do curl -XGET 'host:9200/_cat/thread_pool?v&h=host,search.queue,,search.rejected,search.completed' | sort -unk 2,3 ; sleep 5 ; done

That code snippet only displays the data nodes running active search queries, so it’s easier to read on a large cluster.
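To get a feel for these numbers, here is a small sketch of the arithmetic. It only restates the default pool formula and the 1000 entries queue mentioned above; the function names are ours, and the 8 CPU figure matches the 4 core / 8 thread hosts described in the Blink article.

```python
SEARCH_QUEUE_SIZE = 1000


def default_search_threads(cpus):
    """Default search thread pool size: ((CPU * 3) / 2) + 1."""
    return (cpus * 3) // 2 + 1


def split_load(concurrent_searches, cpus, queue_size=SEARCH_QUEUE_SIZE):
    """Split concurrent searches on one node into (running, queued, rejected)."""
    threads = default_search_threads(cpus)
    running = min(concurrent_searches, threads)
    queued = min(concurrent_searches - running, queue_size)
    rejected = concurrent_searches - running - queued
    return running, queued, rejected


print(default_search_threads(8))
print(split_load(1100, 8))
```

On an 8 thread node, that gives 13 search threads: anything beyond 13 concurrent searches waits in the queue, and anything beyond 1013 is rejected.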

Indices information

This snippet gets most information you need about your indices. You can then grep on what you need to know: open, closed, green / yellow / red…

$ curl -XGET https://escluster/_cat/indices?v

Shard allocation information

Shard movements have a big impact on your cluster performance. This snippet gives you the most critical information about your shards.

$ curl -XGET https://escluster/_cat/shards?v
17_20140829 4  r STARTED  2894319    4.3gb esdata89
17_20140829 10 p STARTED  2894440    4.3gb esdata87
17_20140829 10 r STARTED  2894440    4.3gb esdata44
17_20140829 3  p STARTED  2784067    4.1gb esdata48
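That output is easy to turn into a lookup table. As a sketch, the following Python builds a map from (index, shard number) to the node hosting the primary, assuming the column layout shown above (index, shard, p or r, state, docs, size, node as the last column).

```python
def primary_nodes(cat_output):
    """Map (index, shard number) to the node holding the primary shard."""
    primaries = {}
    for line in cat_output.splitlines():
        fields = line.split()
        # Keep started primaries only; the node name is the last column.
        if len(fields) >= 7 and fields[2] == "p" and fields[3] == "STARTED":
            primaries[(fields[0], int(fields[1]))] = fields[-1]
    return primaries


# The sample lines from the output above.
SAMPLE = """\
17_20140829 4  r STARTED  2894319    4.3gb esdata89
17_20140829 10 p STARTED  2894440    4.3gb esdata87
17_20140829 10 r STARTED  2894440    4.3gb esdata44
17_20140829 3  p STARTED  2784067    4.1gb esdata48
"""

print(primary_nodes(SAMPLE))
```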

Recovery information

Recovery information comes as JSON output, but it’s still easy to read and shows what is happening on your cluster.

$ curl -XGET 'https://escluster/_recovery?pretty&active_only'

Segments information (can be extremely verbose)

$ curl -XGET https://escluster/_segments

Cluster stats

$ curl -XGET https://escluster/_cluster/stats?pretty

Nodes stats

$ curl -XGET https://escluster/_nodes/stats?pretty

Index stats

$ curl -XGET https://escluster/someindice/_stats?pretty

Index mapping

$ curl -XGET https://escluster/someindice/_mapping

Index settings

$ curl -XGET https://escluster/someindice/_settings

Cluster dynamic settings

$ curl -XGET https://escluster/_cluster/settings

All the cluster settings (can be extremely verbose)

$ curl -XGET https://escluster/_settings

That’s all for now. Don’t hesitate to bookmark this post as I’ll update it when I add new useful Elasticsearch tricks.

How we reindexed 36 billion documents in 5 days within the same Elasticsearch cluster

by Fred de Villamil , Director of Infrastructure July 6, 2016 - 13 minutes read

Originally published at

At Synthesio, we use ElasticSearch at various places to run complex queries that fetch up to 50 million rich documents out of tens of billions in the blink of an eye. Elasticsearch makes it fast and easily scalable where running the same queries over multiple MySQL clusters would take minutes and crash a few servers on the way. Every day, we push Elasticsearch boundaries further, and going deeper and deeper in its internals leads to even more love.

Last week, we decided to reindex a 136TB dataset with a brand new mapping. Updating an Elasticsearch mapping on a large index is easy until you need to change an existing field type or delete one. Such updates require a complete reindexing in a separate index created with the right mapping so there was no easy way out for us.

The “Blackhole” cluster

We’ve called our biggest Elasticsearch cluster “Blackhole”, because that’s exactly what it is: a hot, ready to use datastore being able to contain virtually any amount of data. The only difference with a real blackhole is that we can get our data back at the speed of light.

When we designed Blackhole, we had to choose between 2 different models.

  • A few huge machines with 4 * 12 core CPU, 512GB of memory and 36 800GB SSD drives, each of them running multiple instances of Elasticsearch.
  • A lot of smaller machines we could scale horizontally as the cluster grows.

We opted for the latter since it would make scaling much easier and didn’t require spending too much money upfront.

Blackhole runs on 75 physical machines:

  • 2 http nodes, one in each data center behind a HAProxy to load balance the queries.
  • 3 master nodes located in 3 different data centers.
  • 70 data nodes spread across 2 different data centers.

Each node has a quad core Xeon D-1521 CPU running at 2.40GHz and 64GB of memory. The data nodes have a RAID0 over 4*800GB SSD drives with XFS. The whole cluster runs a systemd-less Debian Jessie with a 3.14.32 vanilla kernel. The current version of the cluster has 218.75TB of storage and 4.68TB of memory, with 2.39TB allocated to the Elasticsearch heap. That’s all for the numbers.
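For the curious, the storage and memory totals follow directly from the node counts. Here is the arithmetic as a quick Python check, assuming 1TB = 1024GB:

```python
TOTAL_NODES = 75
DATA_NODES = 70
DRIVES_PER_DATA_NODE = 4
DRIVE_GB = 800
RAM_GB_PER_NODE = 64
GB_PER_TB = 1024  # assumption: binary terabytes

storage_tb = DATA_NODES * DRIVES_PER_DATA_NODE * DRIVE_GB / GB_PER_TB
memory_tb = TOTAL_NODES * RAM_GB_PER_NODE / GB_PER_TB

print(f"storage: {storage_tb}TB")  # 70 * 4 * 800GB
print(f"memory: {memory_tb}TB")    # 75 * 64GB
```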

Elasticsearch configuration

Blackhole runs ElasticSearch 1.7.5 on Java 1.8. Indexes have 12 shards and 1 replica. We ensure each data center hosts 100% of our data using the Elasticsearch rack awareness feature. This setup lets us lose a whole data center with neither data loss nor downtime, which we test every month.

All the filtered queries are run with _cache=false. Otherwise, ElasticSearch caches the filtered query results in memory, which makes the whole cluster explode at the first search. When running queries on 100GB shards, this is not something you want to see.

When running in production, our configuration is:

    node_initial_primaries_recoveries: 20
    node_concurrent_recoveries: 20
    cluster_concurrent_rebalance: 20
      threshold_enabled: true
        low: 60%
        high: 78%
  number_of_shards: 12
  number_of_replicas: 1
      max_thread_count: 8
      type: 'concurrent'
        type: 'tiered'
        max_merged_segment: 100gb
        segments_per_tier: 4
        max_merge_at_once: 4
        max_merge_at_once_explicit: 4
    type: niofs
      max_clause_count: 10000
  auto_create_index: false
      max_bytes_per_sec: 2048mb
        limit: 80%
        size: 25%
        expire: 1m
        type: 'none'
    minimum_master_nodes: 2
        enabled: false
        hosts: ["master01","master02","master03"]
    queue_size: 3000
    type: cached
    queue_size: 3000
    type: cached
  mlockall: true
  index_buffer_size: 10%
  max_content_length: 1024mb

After trying both ElasticSearch default_fs and mmapfs, we picked niofs for file system storage.

The NIO FS type stores the shard index on the file system (maps to Lucene NIOFSDirectory) using NIO. It allows multiple threads to read from the same file concurrently.

The reason why we decided to go with niofs is to let the kernel manage the file system cache instead of relying on the broken, out of memory error generator mmapfs.

Tuning the Java virtual machine

We launch the java virtual machine with -Xms31g -Xmx31g. Combined with ElasticSearch mlockall=true, it ensures ElasticSearch gets enough memory to run and never swaps. The remaining 33GB are used for ElasticSearch threads and file system cache.

Despite ElasticSearch recommendations we have replaced the Concurrent Mark Sweep (CMS) garbage collector with the Garbage First Garbage Collector (G1GC). With CMS, we would run into a stop the world garbage collection for every single query on more than 1 month of data.

Our configuration of G1GC is relatively simple but does the job under pressure:

JAVA_OPTS="$JAVA_OPTS -XX:MaxGCPauseMillis=200"
JAVA_OPTS="$JAVA_OPTS -XX:GCPauseIntervalMillis=1000"
JAVA_OPTS="$JAVA_OPTS -XX:InitiatingHeapOccupancyPercent=35"

Blackhole Initial indexing

We started the initial indexing mid December 2015. It took 19 days from fetching the raw data to pushing it into ElasticSearch.

Back then, Blackhole only had 46 nodes:

  • 3 master nodes
  • 1 query node
  • 42 data nodes

This led to a cluster sized for 30 months of data with 1.29TB of memory and 134TB of storage, all SSD.

For this initial indexing, we decided to go with 1 index per month and 30 shards per index. This didn’t work as expected as each query on a month would request data from 3TB and 1.2 billion documents. As most queries went on 3 to 12 months, this made the cluster impossible to scale properly.

The first part of the process took 10 days. We had to fetch 30 billion documents from our main Galera datastore, turn them into JSON and push them into a Kafka queue, each month of data being pushed into a different Kafka partition. Since we were scanning the database incrementally, the process went pretty fast considering the amount of data we were processing.

The migration processes were running on 8 virtual machines with 4 cores and 8GB RAM. Each machine was running 8 processes of a homemade Scala program.

During the second part, we merged the data from Kafka with data from 2 other Galera clusters and an Elasticsearch cluster before pushing them into Blackhole.

Blackhole initial migration

The merge and indexing parts took place on 8 virtual machines, each having 4 cores and 8GB RAM. Each machine was running 8 indexing processes reading an offset of a Kafka partition.

The indexer was shard aware. It had a mapping between the index it was writing to, its shards and the data nodes they were hosted on. This allowed us to index directly on the right data nodes with the lowest possible network latency.

This part was not as smooth as we expected.

The first version of the indexer was developed in Scala, but for some reason it was slow as hell, not being able to index more than 30,000 documents per second. We rewrote it in Go in 2 days, and it was much better, with an average of 60,000 indexed documents per second and peaks at 120,000 documents per second.

Surprisingly, the main bottleneck was neither one of the Galera clusters nor the Elasticsearch metadata cluster, but the Kafka queues. For some reason, we could not read more than 10,000 documents per second per Kafka partition.

The other unexpected bottleneck was the CPU. Surprisingly, we were CPU bound but the disks were not a problem (which is normal since we’re using SSDs).

After 9 days, the data was fully indexed and we could start playing with the data.

Blackhole reindexing

When we decided to change Blackhole mapping, we had enough experience with the cluster and its content to avoid previous mistakes and go much faster.

Instead of monthly indexes, we decided to split the cluster into daily indexes. A few tests on a migrating index showed it was the way to go.

With the new mapping dropping a bunch of data, we moved from 3GB for 1 million documents (with a replica) to 2GB for 1 million documents. Going daily reduced the average index from 3TB to 120GB, and a single shard from 100GB to 10GB. With a large number of machines, this allowed better use of the resources, starting with the JVM heap, when running parallel queries.

The reindexing process

Instead of polling the data from our database clusters, we decided to reuse the data from Blackhole itself. This meant reading and writing on the same cluster simultaneously, adding some fun in the operation.

This time, we did not use separate virtual machines to host the indexing processes. Instead, we decided to run the indexers on the data nodes, read locally and write on their counterpart in the secondary data center. Considering a 10Gb link and a 46ms network latency, that solution was acceptable. It meant we had 70 machines to both read and write to, allowing maximum parallelism.

There are many solutions to copy an Elasticsearch index to another, but most of them allow neither splitting one index into many nor changing the data model. Unfortunately, the new mapping involved deleting some fields and moving other fields somewhere else. Since we did not have the time to build a homemade solution, we decided to go with Logstash.

Logstash has an Elasticsearch input for reading, an Elasticsearch output for writing, and a transform filter to change the data model. The input module accepts a classic Elasticsearch query and the output module can be parallelized.

We ran a few tests on Blackhole to determine which configuration was the best, and ended with 5000 documents scrolls and 10 indexing workers.

Testing with 5000 documents scroll and 10 workers

For these tests, we were running with a production configuration, which explains the refreshes and segment count madness. Of course, running with 0 replica was faster, but since we’re using RAID0, this configuration was a no go.

During the operation, both source and target nodes behaved without problems, specifically on the memory level.

Source node for reindexing

Target node behavior

For the first tests, we ran Logstash against a full day of data, using a simple Elasticsearch query:

query => '{ "query": { "range": { "date": { "gte": "yyyy-mm-ddT00:00.000", "lte": "yyyy-mm-dd+1T00:00.000+01:00" } } } }'

Unfortunately, we ended up with missing documents because our scroll keepalive of 5 minutes was too short. Catching up took too long since we had to replay the whole day, so we decided to run hourly queries instead.
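Splitting a day into hourly queries is easy to script. Here is a minimal Python sketch that generates the 24 query strings for a given day. The field name and the +01:00 offset come from the query above; the function name is ours. Note two deliberate differences: timestamps are written with full seconds, and the upper bound uses "lt" rather than "lte" so two consecutive hours don't both match a document sitting exactly on the boundary.

```python
import json
from datetime import datetime, timedelta


def hourly_queries(day, field="date", tz="+01:00"):
    """Return 24 range queries (as JSON strings) covering one day, hour by hour."""
    start = datetime.strptime(day, "%Y-%m-%d")
    queries = []
    for hour in range(24):
        lower = start + timedelta(hours=hour)
        upper = lower + timedelta(hours=1)
        bounds = {
            "gte": lower.strftime("%Y-%m-%dT%H:%M:%S.000") + tz,
            "lt": upper.strftime("%Y-%m-%dT%H:%M:%S.000") + tz,
        }
        queries.append(json.dumps({"query": {"range": {field: bounds}}}))
    return queries


queries = hourly_queries("2015-07-23")
print(len(queries))
print(queries[10])
```

Each string can then be dropped into the query option of the Logstash elasticsearch input.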

Logstash configuration

input {
  elasticsearch {
    hosts => [ "local elasticsearch node" ]
    index => "index to read from"
    size => 5000
    scroll => "20m" # 5 minutes initial
    docinfo => true
    query => '{ "query": { "range": { "date": { "gte": "2015-07-23T10:00.000+01:00", "lte": "2015-07-23T11:00.000+01:00" } } } }'
  }
}
output {
  elasticsearch {
    host => "remote elasticsearch node"
    index => "index to write to"
    protocol => "http"
    index_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
    workers => 10
  }
  stdout {
    codec => rubydebug # because removing the timestamp field makes logstash crash
  }
}
filter {
  mutate {
    rename => { "some field" => "some other field" }
    rename => { "another field" => "somewhere else" }
    remove_field => [ "something", "something else", "another field", "some field", "@timestamp", "@version" ]
  }
}

Reindexing Elasticsearch configuration

We changed only a few settings for that reindexing.

  index_buffer_size: 50% (instead of 10%)
      type : "none" (as fast as your SSD can go)
    disable_flush: true
  refresh_interval: -1 (instead of 1s)
      max_bytes_per_sec: "2gb"

We wanted to limit the Lucene refreshes as much as we could, preferring to manage hundreds of thousands of segments rather than lose throughput to CPU overhead.

Introducing Yoko and Moulinette

To manage the indexing process, we have created 2 simple tools: Yoko and Moulinette.

Yoko and Moulinette use a simple MySQL database with every index to process, query to run and status. The data model is pretty self explanatory:

  `index_from` varchar(16) NOT NULL,
  `index_to` varchar(16) NOT NULL,
  `logstash_query` text NOT NULL,
  `status` enum("todo", "processing", "done", "complete", "failed") DEFAULT "todo"

Before indexing, we fill in the Yoko database with every index we want to migrate along with all the logstash queries we need to run. One line contains the source index, destination index and the query to reindex 1 hour of data.

Yoko is a simple Python daemon that manages the global indexing processes. It:

  • Creates the daily indexes when they don’t exist yet with the right mapping.
  • Checks every “done” daily index and compares its document count with the number of documents the Logstash query returns on the initial index.
  • Moves each “done” line to “complete” if the count matches, or to “failed” otherwise.
  • Deletes each monthly index when every day of a month is “complete”.
  • Changes the refresh values when a daily index is “complete”.

PUT /index/_settings?master_timeout=120s
{
  "translog.disable_flush" : "false",
  "index" : {
    "refresh_interval" : "1s"
  }
}

Moulinette is the processing script. It’s a small daemon written in Bash (with some ugly bashisms) that runs on every indexing node. It fetches lines in “todo” from the yoko table, generates the logstash.conf with the source and destination index, and source and destination node and Logstash query. Then it runs Logstash, and once Logstash exits, switches the line to “done” if Logstash exit code is 0, or “failed” otherwise.
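Put together, the status of a line moves through a small state machine. The following Python sketch restates the rules described above; it is not the actual Yoko or Moulinette code, and the function names are ours.

```python
def status_after_logstash(exit_code):
    """Moulinette: a line becomes "done" when Logstash exits with 0."""
    return "done" if exit_code == 0 else "failed"


def status_after_count_check(source_count, target_count):
    """Yoko: a "done" line is "complete" only when both counts match."""
    return "complete" if source_count == target_count else "failed"


def month_can_be_deleted(statuses):
    """Yoko: a monthly index may go once every line of the month is "complete"."""
    statuses = list(statuses)
    return bool(statuses) and all(status == "complete" for status in statuses)


print(status_after_logstash(0))
print(status_after_count_check(5000, 4990))
print(month_can_be_deleted(["complete", "complete", "done"]))
```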

Reindexing in 5 days

Once again, the main problem was being CPU bound. As you can see on that Marvel screenshot, the cluster was put under heavy load during the whole indexing process. Considering that we were both reading and writing on the same cluster, with an indexing rate over 90,000 documents per second with 140,000 documents per second peaks, this is not surprising at all.

Reindexing blackhole, 2 days after

Having a look at the CPU graphs, there was little we could do to improve the throughput without dropping Logstash and relying on a faster solution running on fewer nodes.

CPU usage

The disk operations clearly show the scroll / index processing. There was certainly some latency inside Logstash for the transform process, but we didn’t track it.

Disks operations

The other problem was losing nodes. We had some hardware issues and lost some nodes here and there. This caused indexing from that node to crash and indexing to that node to stall, since Logstash does not exit when the output endpoint crashes.

This cost us a lot of time checking the logs (almost) manually on every node once or twice a day. If an hourly index took more than 3 hours to process, we would consider it lost, restart Moulinette and move the hourly index back to “todo”.

Lesson learned: Yoko and Moulinette V2 will handle silent errors better. When an index is blocked for more than 3 hours, Yoko will raise an alert and move the index back to “todo”. The alert will allow us to kill the locked Logstash process and restart Moulinette as soon as there’s a problem.
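The 3 hour rule can be sketched as a small Python check. This is only an illustration of the idea: the data model shown earlier has no timestamp column, so the `started_at` mapping (index name to the time processing began) and the `find_blocked` name are assumptions.

```python
from datetime import datetime, timedelta
from typing import Dict, List

# Threshold taken from the text: an index blocked for more than 3 hours is lost.
MAX_PROCESSING_TIME = timedelta(hours=3)


def find_blocked(
    started_at: Dict[str, datetime],
    now: datetime,
    limit: timedelta = MAX_PROCESSING_TIME,
) -> List[str]:
    """Return the indexes that have been "processing" for longer than limit."""
    return sorted(name for name, start in started_at.items() if now - start > limit)
```

Each index returned would trigger an alert and be moved back to “todo”; an index that has been running for exactly 3 hours is not yet reported.
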

The next step is optimizing the indexes, moving from an average of 1500 Lucene segments post indexing to 24 (1 segment per replica). This aims both at improving performance and at completely removing the deleted documents left over after we restarted the indexing post crash. When overwriting or deleting a document, Lucene does not actually delete it but flags it as “deleted” until an optimize is performed.

Our optimize script is extremely simple, starting with the indexes that have the highest number of deleted documents to save space.

#!/bin/sh
# Target host; assumed to be passed as the first argument.
HOST=$1
CURL_BIN=$(which curl)

if [ -z "$HOST" ]; then
  echo "Host is missing"
  exit 1
fi

if [ -z "$CURL_BIN" ]; then
  echo "Curl binary is missing"
  exit 1
fi

# Sort numerically on column 7 (the deleted documents count), highest first; column 3 is the index name.
for indice in $(${CURL_BIN} -s -XGET "http://${HOST}:9200/_cat/indices" | sort -k7,7nr | awk '{print $3}'); do
  if [ -n "$indice" ]; then
    echo "$(date +"%Y%m%d %H:%M") Processing indice ${indice}"
    ${CURL_BIN} -XPOST "http://${HOST}:9200/${indice}/_optimize?max_num_segments=1"
  fi
done

exit 0


Reindexing a large Elasticsearch cluster with major data model changes was quite interesting. It allowed us to push the boundaries of Elasticsearch and our hardware to reach a decent throughput. Yoko and Moulinette are now reusable for every Elasticsearch cluster we run at Synthesio, allowing reindexing within the same cluster or across clusters.

Thanks to Alexandre Heimburger.

Building an awesome Devops team on the ashes of an existing infrastructure

by Fred de Villamil , Director of Infrastructure May 6, 2016 - 18 minutes read

Originally published at

Devops Commando

5AM, the cellphone next to my bed rings continuously. The desperate CTO of a SAAS company is calling. Their infrastructure is a ruin. The system administration team has left the building, leaving all the credentials in an encrypted file. They’ll go out of business if I don’t agree to join them ASAP, he cries, threatening to hang himself with a WIFI cable.

I’m exaggerating. But not that much.

I’ve given my Devops Commando talk a couple of times lately. Devops Commando is about taking over and fixing an undocumented, unmanaged, crumbling infrastructure. Since my experience of avoiding disasters and building awesome devops teams raised many questions, I have decided to write everything down.

In the past 10 years, I’ve worked for a couple of SAAS companies and done some consulting for a few others. As a SAAS company, you need a solid infrastructure as much as a solid sales team. The first is often considered a cost for the company, while the second is supposed to keep it alive. Both should actually be considered assets, which most of my clients realised too late.


Taking over an existing infrastructure when everyone has left the building is a tough, long term job. Turning a ruin into something viable won’t happen without the management’s full support. Before accepting the job, you need to ensure a few prerequisites are met or you’ll face certain failure.

Control over the budget: having tight control of the budget is the most important part of taking over an infrastructure that requires a full replatforming. Since you have no idea how many things you’ll have to add or change, it’s a tricky exercise that needs either some experience or a budget at least twice that of the previous year. You’re not forced to spend all the money you’re allowed, but at least you’ll be able to achieve your mission during the first year.

Control over your team hires (and fires): whether you’re taking over an existing team or building one from scratch, be sure you have the final word on hiring (or firing). If the management can’t understand that people who used to “do the job” at a certain time of the company’s life don’t fit anymore, you’ll run into big trouble. Things get worse when you get people who’ve been slacking or underperforming for years. After all, if you’re jumping in, that’s because some things are really fishy, aren’t they?

Freedom of technical choices: even though you’ll have to deal with an existing infrastructure, be sure you’ll be given a free hand on the new technical choices when they happen. Being stuck with a manager who blocks every new technology he doesn’t know about, or being forced to pick up all the newest fancy, not production ready things they’ve read about on Hacker News, makes an ops’ life a nightmare. From my experience, keeping the technologies that work, even though they’re outdated or you don’t like them, can save you lots of problems, starting with managing other people’s egos.

Freedom of tools: managing an infrastructure requires a few tools, and you’d better pick the ones you’re familiar with. If you’re not allowed to switch from Trac to Jira, or refused a PagerDuty account for any reason, be sure you’ll get in trouble very soon for anything else you’ll have to change. Currently, my favorite, can’t live without tools are Jira for project management, PagerDuty for incident alerting, Zabbix for monitoring and ELK for log management.

Being involved early in the product roadmap: as an ops manager, it’s critical to be aware of what’s going on at the product level. You’ll have to deploy development, staging (we call it theory because “it works in theory”) and production infrastructure, and the sooner you know, the better you’ll be able to work. Being involved in the product roadmap also means that you’ll be able to help the backend developers in terms of architecture before they deliver something you won’t be able to manage.

Get an initial glance at the infrastructure: it’s not really a prerequisite, but it’s always good to know where you’re going. Having a glance at the infrastructure (and even better at the incident logs) allows you to set your priorities before you actually start the job.

Your priorities, according to the rest of the company

Priority is a word that should not have a plural

For some reason, most departments in a company have a defined, single priority. The sales priority is to bring in new clients, marketing’s is to build new leads, the devs’ is to create a great product without bugs. When it comes to the devops team, every department has a different view on what you should do first.

The sales, consulting and marketing teams expect stability first to get new clients and keep the existing ones. A SAAS company with an unstable infrastructure can’t get new clients or keep the existing ones, and gets bad press. Remember the Twitter Fail Whale era? Twitter was more famous for being down than for anything else.

The product team expects you to help deliver new features first, and they’re not the only ones. New features are important to stay up to date against your competitors. The market expects them, the analysts expect them, and you must deliver some if you want to look alive.

The development teams expect on demand environments. All of them. I’ve never seen a company where the development team was not asking for a virtual machine they could run the product on. And they consider it critical to be able to work.

The company execs, the legal team and your management expect documentation, conformity to the standards, certifications, and they expect you to deliver fast. It’s hard to answer an RFP without strong documentation showing you have a state of the art infrastructure and top notch IT processes.

As a devops manager, your only priority is to bring back confidence in the infrastructure, which implies meeting the whole company’s expectations.

The only way to reach that goal is to provide a clear, public roadmap of what you’re going to do, and why. All these points are critical, they all need to be addressed, not at the same time, but always with an ETA.

Our work organisation

I’m a fan of the Scrum agile methodology. Unfortunately, 2–3 weeks sprints and immutability do not fit a fast changing, unreliable environment. Kanban is great at managing ongoing events and issues but makes giving visibility on the projects harder. That’s why we’re using a mix of Scrum and Kanban.

We run 1 week sprints, with 50% of our time dedicated to the projects, and 50% dedicated to managing ongoing events. Ongoing events are both your daily system administration and requests from the developers that can’t wait for the following sprint.

Our work day officially starts at 10AM for the daily standup around the coffee machine. Having a coffee powered standup turns what can be seen as a meeting into a nice, devops-friendly moment where we share what we’ve done the day before, what we’re going to do, and which problems we have. If anyone’s stuck, we discuss the various solutions and plan a pair-working moment if it takes more than a minute to solve.

Sprint planning is done every Friday afternoon so everybody knows what they’ll do Monday morning. That’s a critical part of the week. We all gather around a coffee and start reviewing the backlog issues. Tasks we were not able to achieve during the week are added at the top of the backlog, then we move the developers’ requests we promised to take care of, then the new projects. People pick up the projects they want to work on, and as a last resort I say yes or no, or assign the projects I consider we must do first. We make sure everyone works on all the technologies we have so there’s no single point of failure in the team and everybody can learn and improve.

Each devops works alone on their projects. To avoid mistakes and share knowledge, nothing ships to production without a comprehensive code review so at least 2 people in the team are aware of what’s been done. That way, when someone is absent, the other person can take over the project and finish it. In addition to the code reviews, we take care of documentation, the minimum being operating instructions added to every Zabbix alert.

Managing the ongoing events

Managing the ongoing events is a tricky part because they often overlap with the planned projects and you can’t always postpone them. It will most probably take a few months before you’re able to do everything you planned within a week.

During the day, incident management is the duty of the whole team, not only the oncall person. Oncall devops also have their projects to do so they can’t be assigned all the incidents. Moreover, some of us are more at ease with some technologies or parts of the infrastructure and are more efficient when managing an incident. (Backend) developers are involved in incident management when possible. When pushing to production, they provide us with a HOWTO to fix most of the issues we’ll meet so we can add them to the Zabbix alert messages.

We try to define a weekly contact who manages the relationships with the development team, so we’re not disturbed 10 times a day, and who won’t move without a Jira ticket number. Then, the task is prioritised in the current sprint or another one, depending on the urgency. When managing relationships with the development teams, it’s important to remember that “no” is an acceptable answer if you explain why. The BOFH style is the best way to be hated by the whole company, and that’s not something you want, do you?

In any case, we always provide the requester with an ETA so they know when they can start working. If the project is delayed, we communicate about the delay as well.

When you have no one left

Building a new team from scratch because everyone has left the building before you join is a rewarding and exciting task, except you can’t stop the company’s infrastructure while you’re recruiting.

During the hiring process, which can take up to 6 months, I rely on external contractors. I hire experienced freelancers to help me fix and build the most important things. Over the years, I’ve built a good address book of freelancers skilled with specific technologies such as FreeBSD, database management or packaging so I always work with people I’ve worked with, or people who’ve worked with people I trust.

I also rely on vendors’ consulting and support to help on technologies I don’t know. They teach me a lot and help fix the most important issues. When I had to take over a massive Galera cluster, I relied on Percona support during the first 6 months so we’re now able to operate it fully.

Finally, we work a lot with the developers who wrote the code we operate. That’s an important part since they know most of the internals and traps of the existing machines. It also allows us to create a deep link with the team we’re going to work with the most.

Recruiting the perfect team

Recruiting the perfect devops team is a long and difficult process, even more so when you have to build a small team. When looking for people, I look for a few things:

Complementary and supplementary skills: a small team can’t afford having single points of failure, so we need at least 2 people knowing the same technology, at least a bit when possible. We also look for people knowing other technologies, whether or not we’ll deploy them someday. Having worked on various technologies gives you great insight into the problems you’ll encounter when working on similar ones.

Autonomy and curiosity: our way of working requires people to be autonomous and not wait until we help them when they’re blocked. I refuse to micro manage people and ask them what they’re doing every hour. They need to be honest enough to say “I don’t know” or “I’m stuck” before the project delays get out of control.

Knowledge of the technologies in place and fast learners: building a team from scratch on an existing platform requires learning fast how to operate and troubleshoot it. Having experience in some of the technologies in place is incredibly precious and limits either the number of incidents or their length. Since hiring people who know all the technologies in place is not possible, having fast learners is mandatory so they can operate them quickly. Being able to read the code is a plus I love.

Of course, every coin has two sides, and these people are both expensive and hard to find. It means you need to feed them enough to keep them in the long term.

Knowing who your clients are

The first thing before starting to communicate is to understand who your client is, the one you can get satisfaction metrics from. Working in a B2B company, I don’t have direct contact with our clients. It means my clients are the support people, sales persons, project managers, and the development team. If they’re satisfied, then you’ve done your job right.

This relationship is not immutable and you might reconsider it after a while. Sometimes, acting like a service provider for the development team does not work and you’ll have to create a deeper integration. Or, on the contrary, keep your distance if they prevent you from doing your job correctly, but that’s something you need time to know.

Communication and reporting

Communication within the company is critical, even more when the infrastructure is considered a source of problems.

Unify the communication around one person, even more so when managing incidents. We use a dedicated Slack channel to communicate about incidents, and only the infrastructure manager, or the person oncall during the night and weekend, communicates there. That way, we avoid conflicting messages with someone saying the incident is over while it’s not totally over. This also requires good communication within the team.

Don’t send alarming messages. Never. But be totally transparent with your management so they can work on a communication when the shit really hits the fan, which might happen. This might mean they’ll kick you in the butt if you’ve screwed up, but at least they’re prepared.

Finally, we always give an ETA when communicating about an incident, along with a precise functional perimeter. “A database server has crashed” has no meaning if you’re not in the technical field, “People can’t login anymore” does. And remember that “I don’t have an ETA yet” is something people can hear.

We do a 3-slide weekly report with the most important elements:

  • KPIs: budget, uptime, number of incidents, evolution of the number of oncall interventions.
  • Components still at risk (a lot in the beginning).
  • Main projects status and ETA.

Discovering the platform

So you’re in and it’s time for the things to get real. Here are a few things I use to discover the platform I’ll have to work on.

The monitoring: it’s the most useful thing to know about the servers and services you operate. It also provides a useful incident log so you know what breaks the most. Unfortunately, I’ve realised that the monitoring is not always as complete as it should be and you might get some surprises.

The hypervisor: when running on the cloud or a virtualised infrastructure, the hypervisor is the best place to discover the infrastructure even though it won’t tell you which services are running, and which machines are actually used. On AWS, the security groups provide useful information about the open ports, when it’s not 1–65534 TCP.

nmap + ssh + facter in a CSV: running nmap with OS and service discovery on your whole subnet(s) is the most efficient way of discovering a platform I know. It might provide some surprises as well: I once had a machine with 50 internal IP addresses to run a proxy for 50 geo located addresses! Be careful too, facter does not return the same information on Linux and FreeBSD.
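As an illustration of the nmap part of that inventory, here is a minimal Python sketch that turns nmap's grepable output (the `-oG` option) into CSV rows. It assumes the usual `Host: <address> (<name>)` / `Ports: port/state/protocol/owner/service/...` line layout; the function names and CSV column names are hypothetical, and the ssh and facter steps are left out.

```python
import csv
import io
import re
from typing import Iterable, List, Tuple

# Matches grepable lines that carry a port list, e.g.
# "Host: 10.0.0.5 (web01)<TAB>Ports: 22/open/tcp//ssh///, ..."
HOST_LINE = re.compile(r"^Host: (\S+) \(([^)]*)\)\s+Ports: (.*)$")


def parse_grepable(lines: Iterable[str]) -> List[Tuple[str, str, str]]:
    """Return (address, hostname, open ports) for each scanned host."""
    rows = []
    for line in lines:
        match = HOST_LINE.match(line.strip())
        if not match:
            continue  # comments, "Status: Up" lines, etc.
        address, hostname, ports = match.groups()
        ports = ports.split("\t")[0]  # drop trailing fields such as "Ignored State"
        open_ports = []
        for entry in ports.split(","):
            fields = entry.strip().split("/")
            if len(fields) >= 5 and fields[1] == "open":
                open_ports.append(f"{fields[0]}/{fields[2]}:{fields[4]}")
        rows.append((address, hostname, " ".join(open_ports)))
    return rows


def to_csv(rows: Iterable[Tuple[str, str, str]]) -> str:
    """Serialise the rows with a header line."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["address", "hostname", "open_ports"])
    writer.writerows(rows)
    return buffer.getvalue()
```

The resulting file can then be completed host by host with whatever facter returns over ssh, keeping in mind the Linux / FreeBSD differences mentioned above.
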

tcpdump on the most central nodes: running tcpdump and / or iftop on the most central nodes allows a better comprehension of the networking flows and service communication within your infrastructure. If you run internal and external load balancers, they’re the perfect place to sniff the traffic. Having a glance at their configuration also provides helpful information.

Puppet / Ansible: when it exists, the automation infrastructure provides great insight into the infrastructure. However, from experience, it’s often incomplete, messy as hell and outdated. I remember seeing the production infrastructure running on the CTO’s personal Puppet environment. Don’t ask why.

The great old ones: people working in the tech team for a while often have a deep knowledge of the infrastructure. More than how it works, they provide useful information on why things have been done this way and why it will be a nightmare to change.

The handover with the existing team: if you’re lucky, you’ll be able to work with the team you’re replacing for 1 or 2 days. Focus on the infrastructure overview, data workflows, technologies you don’t know about and most common incidents. In the worst case, they’ll answer “I don’t remember” to every question you ask.

In the beginning

In the beginning, there was Jack, and Jack had a groove.
And from this groove came the groove of all grooves.
And while one day viciously throwing down on his box, Jack boldly declared,
“Let there be HOUSE!”, and house music was born.
“I am, you see,
I am the creator, and this is my house!
And, in my house there is ONLY house music.”

So, you’re in and you need to start somewhere, so here are a few tricks to make the first months easier.

Let the teams who used to do it manage their part of the infrastructure. It might not be state of the art system administration, but if it works, it’s OK and it lets you focus on what doesn’t work.

Create an inventory as soon as you can. Rationalise the naming of your machines so you can group them into clusters and later on, automate everything.

Restart every service one by one, under control, to make sure they come back. Once, I had a Postfix configuration written to a file while the service had not been restarted for months. Another time, a cluster did not want to restart after a crash because the configuration files were referring to servers that had been removed a year earlier.

Focus on what you don’t know but works, then look at what you know but needs fixes. The first time I took over a sysadmin-less infrastructure, I left the load balancers aside because they were running smoothly, focusing on the always crashing PHP batches. A few weeks later, when both load balancers crashed at the same time, it took me 2 hours to understand how everything was working.

Automate on day one

In the beginning, you’ll have to do lots of repetitive tasks, so you’d better start automating early.

If I have to do the same task 3 times, I’ve already lost my time twice.

The most repetitive thing you’ll have to do is deployment, so you’d better start with it. We’re using an Ansible playbook triggered by a Jenkins build so the developers can deploy when they need to without us. If I didn’t want to know, I could be unaware of how many deployments to production are done every day.

Speaking of involving the developers, ask the backend developers to provide the Ansible material they need to deploy what they ask you to operate. It’s useful both for them to ensure dev, production and theory are the same, and to know things will be deployed the way they want with the right library.

Giving some power to the development team does not mean letting them play in the wild. Grant them only the tools they need, for example using Jenkins builds or users with limited privileges through automated deployment.

Resist in hostile environments

Hostile environment: anything you don’t have at least acceptable control over.

Developer designed servers are a nightmare to operate, so let’s talk about them first. A developer designed server is a machine providing a full feature without service segregation. The processes, database, cache stack… run on the same machine, making them hard to debug and impossible to scale horizontally. And they take a long time to split. They need to be split into smaller logical (virtual) machines you can expand horizontally. It provides reliability and scalability but has an important impact on your network in general and on your IP addressing in particular.

Private clouds operated by a third party are another nightmare since you don’t control resource allocation. I once had a MySQL server that crashed repeatedly and couldn’t understand why. After weeks of searching and debugging, we realised the hosting company was doing memory ballooning since they considered we used too much memory. Ballooning is a technique that fills part of the virtual machine memory so it won’t try to use everything it’s supposed to have. When MySQL started to use more than half of the RAM it was supposed to have, it crashed because it didn’t have enough, despite the operating system saying the contrary.

AWS is another hostile environment. Machines and storage might disappear anytime, and you can’t debug their NATted network. So you need to build your infrastructure for failure.

Write documentation early

Finally, let’s talk about documentation. Infrastructure documentation is often considered a burden, and with the infra as code fashion, automation scripts are supposed to be the real documentation. Or people say “I’ll write documentation when I have time, for now everything is on fire”.

Nothing’s more wrong (except running Docker in production). But yes, writing documentation takes time and organisation, so you need to iterate on it.

The tool is a critical part if you want your team to write documentation. I like using a Git powered wiki with flat Markdown files like the ones on Github or Gitlab, but it does not fit everyone, so I often fall back to Confluence. Whatever the tool, ensure the documentation is not hosted on your infrastructure!

I usually start small, writing down operation instructions in the monitoring alert messages. It’s a good start and allows you to solve an incident without digging into the documentation looking for what you need. Then, we build infrastructure diagrams using tools like Omnigraffle on Mac, or Lucidchart in the browser. Then we write comprehensive documentation around those 2.


Well, that’s all folks, or most probably only the beginning of a long journey in turning a ruin into a resilient, blazing fast, scalable infrastructure around an awesome devops team. Don’t hesitate to share and comment if you liked this post.

© Synthesio 2016
