Skip to main content

Chaos Testing a Distributed System with Jepsen — Part III

Serdar Serdar Benderli

Header (9)

Many thanks to Josh Kessler, the Data Layer team, Antonio Andrade, and Michael Crawford for their help in making this blog possible.

“We adore chaos because we love to produce order.” M. C. Escher

This is the third and final part in a series that explores running chaos tests against a distributed system using Jepsen. In Part I, we gave a brief introduction about the system-under-test (SUT), why we are doing chaos testing, and why we chose Jepsen as a chaos testing tool. In Part II, we talked about a few of the hurdles we faced and how we overcame them. In this final part, we will discuss the consistency problems we found and how we resolved those problems.

The “Model”

Before we can analyze our system for consistency and linearizability, we need to give Jepsen a model of our system. Under a linearizable data store, these two rules should hold true:

  1. A read that starts before a write ends can read either a previous write, or the current write.
  2. A read that starts after a write ends must read that write or a later write.

The model is an in-memory representation of the SUT and defines the operations that you are doing in your SUT, e.g. add, delete, update, etc. A randomly generated sequence of allowed operations are then applied both to the SUT and the model so that Jepsen can keep track of the expected state of the SUT and check whether these two constraints hold true throughout the test.

To simplify things, we are going to use our system as a simple key-value store: a single key that points to a single integer value (e.g “invoice”=1). In other words, throughout this test we will be changing and reading the value of a single key in our system over and over again. This simplification makes it much easier to model the system. Jepsen provides an existing model for such a data store called a Register. Think of a register as a single key which holds a single value and supports just two operations: read and write.

Although our SUT can support arbitrarily complex data, proving linearizability for a single key-value pair is the first step, and as we will see, it is enough to expose major flaws of a distributed system such as failover & leadership election, client-side request routing, and data consistency.

Chaos Time!

Jepsen supports many different kinds of chaos events out of the box. We will now discuss each chaos event we implemented, followed by the problems it helped us discover.

1. Random Partitions

This chaos event randomly partitions a cluster into two halves for five seconds using the built-in random-halves partitioner. Under the hood, Jepsen uses iptables rules to isolate nodes from one anotherIn a five-node cluster, a random-halves partition will inevitably create a majority (three or four nodes), and a minority (two or one nodes). If the leader remains in the majority, nothing will change after the partition. If the leader ends up in the minority, it will step down, and the majority will elect a new leader. During a leadership change, we would expect some writes to fail as the client figures out the new leader.

Just with this chaos event alone, we discovered numerous problems. Let’s talk about what they were and how we fixed them.

Lots of failed writes cause messy timelines

When the leader node goes down, writes may fail due to one of two exceptions:

  1. No leader found because the cluster has not yet elected a new leader.
  2. Replica can’t write because the previous leader is no longer the leader.

By default, Jepsen considers the result of failed writes as indeterminate because it cannot know whether and when the state of the system will change. Consequently, Jepsen timelines show the writes as lasting until the end of the test, as shown in the figure below:

Failed writes take up their own columns as Jepsen tries to display indeterminate operations

The reason there are many columns is because these timelines are visual aids for verifying linearizability. A write extending all the way to the end of the test is a visual representation of the fact that Jepsen has to consider this value as a possible value for all following reads. Since it would be much more confusing to overlay subsequent operations by that worker thread in the same column, Jepsen timelines opt to allocate new columns every time there is an indeterminate operation.

As we said, the result of an indeterminate write operation means that Jepsen will have to consider the intended write value as a possibility for every read that follows the write until the end of the test. Yet we know that the two exceptions we just mentioned are definitive: the write certainly did not take place, and the state of the system is unchanged. The solution is to catch these exceptions and tell Jepsen to ignore them in the model. We followed the robustness section of the tutorial and added this bit of code:

(defrecord Client [conn]
  client/Client
  (open! [this test node]
    (assoc this :conn client))

  (setup! [this test])

  (invoke! [_ test op]
    (try+
        (case (:f op)
          :read
          ;code to read stuff
          :write
          ;code to write stuff
        )
        (catch com.appian.data.client.AppianException e
           (assoc op
               :type :fail
               :error (let [message (.getMessage e)] (cond
                   (str/starts-with? message "ERR-000") :no-leader
                   (str/starts-with? message "ERR-001") :replica-cant-write
                   :else message
               ))
           )
        )
    )
  )
  (teardown! [this test])

  (close! [_ test])); no need to close client until the end of test


The timelines look much better after this change:

Looking better…

But now we need to answer another question: why are there failed writes after the network partition is healed? Even if the leader was partitioned off, the majority should have quickly elected a new leader (definitely quicker than the 5 seconds that the partition lasts), and the new leader should have remained undisturbed after the minority joined back. Even though some writes might fail during the failover, we would not expect any failures after a leader is elected, and certainly not after the partition is healed.

When a node rejoins the cluster, the leader steps down!

After sleuthing through the logs, we found out that the majority partition was losing its leader when the minority rejoins! We had not come across this in our existing integration tests because we were not creating partitions. This was a strange and surprising result, and not one we expected to see.

We have chosen Raft as the consensus algorithm for leader election and use the etcd’s excellent Raft implementation. Our issue is related to the Raft concept of PreVote which you can read more about in the Raft thesis (section 9.6). When a minority cluster is partitioned away (two out of the five nodes in our case), they continue to hold failed elections, incrementing their election term every time. When they rejoin the cluster, the leader sees a node with a higher term and steps down. This is the reason why we observed failed writes after a partition was healed. Enabling PreVote in our Raft nodes forces them to not increment their term if the pre-election indicates that they would not be elected. One caveat is that this makes leadership election slightly slower, but in our case this was acceptable.

After this, we observed that leadership election only happens when the leader itself gets partitioned away in a minority, and a new leader is elected. If the leader never gets partitioned, there are no other leadership elections except the one at the beginning of the test.

2. Killing Nodes

With this event, Jepsen randomly kills nodes. This isn’t as straightforward as killing random docker containers though, since the Jepsen control node does not have that level of access in the worker nodes. Instead, you need to execute some shell command or script that kills your system on a given node, e.g `kill -9 <pid>`. We have scripts for starting and stopping the system, so the chaos event looks like this:

(defn node-killer
     "Responds to :start by randomly killing Appian nodes, and to :stop by healing them."
     []
     (nemesis/node-start-stopper
       random-nonempty-max-quorum-subset
       (fn start [test node] (ctrl/su (ctrl/exec (str home "/bin/stop.sh"))) [:killed node])
       (fn stop [test node] (ctrl/su (ctrl/exec (str home "/bin/start.sh"))) [:healed node])))

After running this setup for a while, we finally saw the output we both hoped and feared to see:

{:op
{:process 4,
:type :ok,
:f :read,
:value 4,
:index 24,
:time 454538710},
:model {:msg “can’t read 4 from register 3”}}]
Analysis invalid! (ノಥ益ಥ)ノ ┻━┻

Something happened at index 24: Jepsen was able to read four, but the last write was a three. How exciting!

Failed writes are read by the system

First, note the yellow box at the top left. This is the nemesis event which is configured to kill three nodes. At this point, Jepsen begins to randomly issue the kill commands to the three nodes, and it waits for one to come down before moving on to another. Not shown in the timeline is a second yellow box later on that indicates when the chaos event completes killing all three nodes.

Next, note that writes started failing, indicated by the pink boxes. Since writes are only processed by the leader, we can correctly assume that at this point the cluster lost its leader, either because the leader stepped down after quorum is lost (majority of nodes are down) or the just-killed node was the leader. All the while, the reads are successful because some replicas are still available.

Jepsen found this inconsistency: Worker four was able to read a value of four for this key, but the last successful write was three. The two previous writes that both tried to write four failed because the leader was unavailable. Since we were able to read four, one of those writes must have accidentally made it into the system! The logs further show that the leader was killed immediately after the first failed write request was sent.

We need a little more context before we can understand the issue. Appian uses Kafka as the transaction log of all operations that change the system state. If the timing is such that the leader successfully produces a message and appends it to the Kafka transaction log, but is killed before it can respond to the client, then the request will timeout and the client will throw an exception even though the write actually made it to Kafka. Once a message is in Kafka, it will be seen and applied to the system, changing the system state. Ultimately, there’s no guarantee that a timed-out write didn’t actually change the system state.

While we can definitively say that writes that fail with certain exceptions (e.g. unreachable) have not changed the system state, we cannot be certain about writes that end with other exceptions (e.g timeouts) — their results are indeterminate. In other words, the indeterminacy here is a fundamental characteristic of the transaction-log backed system, and there really isn’t a clean and easy server-side solution. The solution is for the client to deal with indeterminate calls appropriately. Here are a few options we discussed to address how we might handle this in Jepsen.

Option 1 — use :info annotation

The built-in :info result type tells Jepsen “the result of this operation was indeterminate.” There are a couple of downsides to this. The first is the timelines we saw earlier where Jepsen treats timed-out writes as on-going operations which last until the end of the test. This in turn makes the timelines hard to decipher. The other is exponentially longer linearizability analysis at the end of each test because the intended write value needs to be considered as a possibility for every read that follows.

Option 2 — retry indefinitely

Another option is retrying timed-out writes indefinitely until they succeed. When the system finally elects a new leader, one of the retries will eventually succeed. The write operation will last until a new leader is elected, and its result will be conclusive. In the final linearizability analysis, Jepsen will consider this intended write as a possible value for all reads that took place during this write. This is closest to how the production code would deal with this problem, although we certainly would not want infinite retries in production.

For the time being, we decided not to do anything about it in Jepsen because it is a rare occurrence. When it does happen, it is easy to diagnose the failure by observing the timeline. If this becomes a burden, we will most likely implement the first option and accept the difficulties of diagnosing issues due to the timelines. In production, the caller needs to either retry until success, or accept the fact that the failed write might have actually gone through.

3. Kafka Unreachable

The next chaos event we introduced was to partition off Kafka. We already mentioned that Jepsen is running with Kafka and Zookeeper containers on the side. What we didn’t mention is that Jepsen effectively has no knowledge of or control over these containers beyond startup. Therefore, it’s not possible for the control node to execute commands on the Kafka and Zookeeper containers. We considered a few alternative solutions to this problem and ultimately decided to block all traffic to Kafka on each of the nodes, effectively partitioning off Kafka. This can be accomplished easily with a couple of iptables rules, similar to how Jepsen creates partitions under the hood:

(defn kafka-blocker
     "Responds to :start by blocking all outgoing messages to kafka, to :stop by unblocking."
     []
     (nemesis/node-start-stopper
       all-nodes
       (fn start [test node]
         (ctrl/su (ctrl/exec :iptables :-A :OUTPUT :-d (get-ip-of-host "jepsen-kafka") :-j :DROP))
         (info "kafka blocked on " node)
         [:kafka-blocked-on node])
       (fn stop [test node]
         (ctrl/su (ctrl/exec :iptables :-D :OUTPUT :-d (get-ip-of-host "jepsen-kafka") :-j :DROP))
         (info "kafka unblocked on " node)
         [:kafka-unblocked-on node])
     )
)

After playing around with the duration of the chaos event and isolating it to run by itself, we came across a very interesting issue.

Duplicate messages in Kafka make the system unavailable for long periods

The symptom of the problem is that all writes start failing, and the system does not seem to recover. This is already worrisome, since we discovered it during tests where the Kafka block was the only chaos event — there are no killed nodes and no leadership changes. These failures don’t affect the consistency of the data store (since these duplicate messages will be ignored by the system), but they do cause significant downtime that extends far beyond the Kafka partition event. We have a five second recovery time objective (RTO) but we observed that the system can be unavailable for over two minutes!

To understand the problem, we first need a little more context about how the system handles writes. When a write request is received, the leader submits this transaction to the transaction log along with the expected offset. The expected offset is updated asynchronously on all nodes via a consumer process that is continuously reading the transaction log and is set to the last transaction’s offset + 1. When a transaction is written successfully, the leader receives the actual offset from Kafka and compares the two offsets. The write is only considered successful if the actual offset matches the expected offset. This mechanism of ensuring offsets protects us against split-brain scenarios where there are multiple leaders that can write, which can happen during partitions and leadership changes because Raft does not guarantee a single leader at all times. When the system is behaving normally, we expect the leader to be the only writer to the transaction log, meaning that the actual offsets will always be the same as the expected offsets. If the offsets do not match, it means that someone else is writing to the transaction log, which indicates a split-brain scenario. When messages with unexpected offsets end up in the transaction log, they are ignored by the system when consumed.

The bug occurs when a transaction is sent to Kafka during the period when messages to Kafka are dropped on all nodes. The call to the Kafka client (we use sarama) can hang anywhere from 30s to over two minutes (yikes!) before it reports that the message has finally been sent successfully. This lag is a function of the retry and timeout settings on the Kafka client. However, when the consumer process reads the Kafka transaction log, it sees that the same message has been written to Kafka twice with the same expected offset: Once at the expected offset, and once more at a later offset. This extraneous second message which by definition has an unexpected offset then causes subsequent writes to fail with an unexpected offset error. Meanwhile, these transactions for failed writes still end up in the transaction log with unexpected offsets. Because Jepsen is generating many concurrent writes, the transaction log gets polluted with many messages that have unexpected offsets. The background process that is constantly updating the expected offset is thus unable to catch up, and the expected offset value is therefore always lagging behind, preventing the system from recovering.

See the figure below for a timeline that illustrates the issue. The red periods on the timeline indicate when Kafka is partitioned off. On the left you have the producer, the writer that appends messages to the Kafka transaction log. On the right you have the consumer, the background process that is responsible for keeping replicas up to date with the transaction log. Arrows going into the timeline indicate write requests made to Kafka, and arrows coming out indicate Kafka’s response—green arrows indicate that the write was successful because it had the expected offset, red arrows indicate that the write failed because it had an unexpected offset. For simplification, we exclude query requests made by the consumer and only show arrows going into the consumer indicating a replayed message. Messages with unexpected offsets are ignored by the consumer.

Timeline of events

The system only recovers if the writes slow down and the consumer is given a chance to catch up, but this is not acceptable: the system should not be down when load is the highest as that is when uptime is needed the most. After discussing a few options, we settled on a two-part solution:

  1. Reduce the Kafka client timeout from 30 seconds to 10 seconds. This will make it more likely for the client to retry successfully during network flakiness. This doesn’t solve the issue, but it does make faster recovery more likely
  2. Apply back-pressure by pausing writes to prevent messages with unexpected offsets to pollute the transaction log. This allows the system to catch up with the transaction log. Instead of suffering a prolonged down-time by doing nothing, we suffer a much shorter down-time to heal. The writes are queued-up in the meantime to be processed once the consumer catches up.

After implementing both of these, we stopped seeing prolonged down-times during Kafka partitions. Even when a write makes it into the transaction log with the unexpected offset during the Kafka partition, the system heals quickly by briefly refusing writes.

4. Combining Chaos Events

At this point, we would like to run our tests with all these chaos events enabled, randomly occurring one after another. Luckily, Jepsen provides a pattern for combining chaos events, and there are some examples in the Jepsen repo: aerospikecockroach, and dgraph tests are particularly helpful. The pattern goes like this:

  1. Create a nemesis composite using nemesis/compose and provide a mapping of events to operations.
  2. Create a generator that generates the chaos events either randomly or deterministically.
  3. Define each one of your operations and what they do. Most of our events use the nemesis/node-start-stopper where we define a start (to start the chaos) and a stop (to heal the system) function.

In addition to the three chaos events described in the previous sections, we have also implemented the following chaos events:

  1. Partition a single node
  2. Create partition rings
  3. Kill individual components as opposed to the entire node
  4. Pause individual components
  5. Create CPU and Memory spikes using stress

Below is a snippet of code that shows the final generator:

 

(defn node-killer
     "Responds to :start by killing nodes on random nodes and to :stop by healing."
     []
     (nemesis/node-start-stopper
       random-nonempty-max-quorum-subset
       (fn start [test node] (ctrl/su (ctrl/exec (str home "/bin/stop.sh"))) [:killed node])
       (fn stop [test node] (ctrl/su (ctrl/exec (str home "/bin/start.sh"))) [:healed node])))

(defn kafka-blocker
     "Responds to :start by blocking all outgoing messages to kafka, to :stop by unblocking."
     []
     (nemesis/node-start-stopper
       all-nodes
       (fn start [test node]
         (ctrl/su (ctrl/exec :iptables :-A :INPUT  :-d (get-ip-of-host "jepsen-kafka") :-j :DROP))
         (ctrl/su (ctrl/exec :iptables :-A :OUTPUT :-d (get-ip-of-host "jepsen-kafka") :-j :DROP))
         (info "kafka blocked on " node)
         [:kafka-blocked-on node])
       (fn stop [test node]
         (ctrl/su (ctrl/exec :iptables :-A :INPUT  :-d (get-ip-of-host "jepsen-kafka") :-j :DROP))
         (ctrl/su (ctrl/exec :iptables :-D :OUTPUT :-d (get-ip-of-host "jepsen-kafka") :-j :DROP))
         (info "kafka unblocked on " node)
         [:kafka-unblocked-on node])
     )
)

(defn memory-monster
      "Responds to :start by limiting memory on random nodes and to :stop by removing the memory restriction
      (if it hasn't timed out yet)."
      []
      (nemesis/node-start-stopper
        random-nonempty-max-quorum-subset
        (fn start [test node]
          (ctrl/su (ctrl/exec "stress" "--vm-bytes" "1g" "--vm-hang" "10" "--vm" "32" "--timeout" (randMax 30)))
          (info "memory limited on " node)
          [:limit-mem node])
        (fn stop [test node]
          (ctrl/su (ctrl/lit "pgrep stress | xargs kill -9 > /dev/null 2>&1 | true"))
          (info "memory freed on " node)
          [:unlimit-mem node])))

(defn cpu-monster
      "Responds to :start by hogging CPU on random nodes and to :stop by removing the CPU restriction
      (if it hasn't timed out yet)."
      []
      (nemesis/node-start-stopper
        random-nonempty-max-quorum-subset
        (fn start [test node]
          (ctrl/su (ctrl/exec "stress" "--cpu" "32" "--timeout" (randMax 30)))
          (info "cpu limited on " node)
          [:limit-cpu node])
        (fn stop [test node]
          (ctrl/su (ctrl/lit "pgrep stress | xargs kill -9 > /dev/null 2>&1 | true"))
          (info "cpu freed on " node)
          [:unlimit-cpu node])))

(defn component-killer
      "Responds to :start by killing component on random nodes and to :stop by healing it."
      []
      (nemesis/node-start-stopper
        random-nonempty-max-quorum-subset
        (fn start [test node]
          (ctrl/su (ctrl/exec (str home "/bin/kill-component.sh")))
          (info "component killed on " node)
          [:component-killed-on node])
        (fn stop [test node]
          (ctrl/su (ctrl/exec (str home "/bin/start.sh")))
          (info "component healed on " node)
          [:component-restarted-on node])))

(defn component-pauser
      "Responds to :start by pausing component on random nodes, :stop resumes the component."
      []
      (nemesis/node-start-stopper
        random-nonempty-max-quorum-subset
        (fn start [test node]
          (pause-processes (get-pids "component"))
          (info "component paused on " node)
          [:component-paused-on node])
        (fn stop [test node]
          (resume-processes (get-pids "component"))
          (info "component resumed on " node)
          [:component-resumed-on node])))

(defn full-nemesis
     "Creates a nemesis that creates the chaos events defined below."
     []
     (nemesis/compose
       {
        {:start-partition          :start
         :stop-partition           :stop}  (nemesis/partition-random-halves)
       
        {:start-partition-halves   :start
          :stop-partition-halves   :stop}  (nemesis/partition-random-halves)

        {:start-partition-ring     :start
          :stop-partition-ring     :stop}  (nemesis/partition-majorities-ring)
            
        {:kill-node                :start
         :heal-node                :stop}  (node-killer)
        
        {:kill-component           :start
         :heal-component           :stop}  (component-killer)
        
        {:pause-component          :start
         :resume-component         :stop}  (component-pauser)

        {:kafka-block              :start
         :kafka-unblock            :stop}  (kafka-blocker)
            
        {:limit-mem                :start
         :unlimit-mem              :stop}  (memory-monster)

        {:limit-cpu                :start
         :unlimit-cpu              :stop}  (cpu-monster)
       }))

(defn nemesis-events
     "Creates a randomized list of nemesis events that will run during the test. Disruptions last 1-30 seconds, and system is undisrupted for 5 seconds afterwards."
     []
     (let [events [
                   [{:type :info :f :start-partition}
                     (gen/sleep (+ 1 (rand-int 30)))
                    {:type :info :f :stop-partition}
                     (gen/sleep 5)]

                   [{:type :info :f :start-partition-halves}
                     (gen/sleep (randMax 30))
                    {:type :info :f :stop-partition-halves}
                     (gen/sleep 5)]

                   [{:type :info :f :start-partition-ring}
                     (gen/sleep (randMax 30))
                    {:type :info :f :stop-partition-ring}
                     (gen/sleep 5)]
                       
                   [{:type :info :f :kill-node}
                     (gen/sleep (+ 1 (rand-int 30)))
                    {:type :info :f :heal-node}
                     (gen/sleep 5)]
                       
                   [{:type :info :f :kill-component}
                     (gen/sleep (randMax 30))
                    {:type :info :f :heal-component}
                     (gen/sleep (randMax 30))]
                       
                   [{:type :info :f :pause-component}
                     (gen/sleep (randMax 30))
                    {:type :info :f :resume-component}
                     (gen/sleep 5)]

 			    [{:type :info :f :kafka-block}
                     (gen/sleep (randMax 30))
                    {:type :info :f :kafka-unblock}
                     (gen/sleep 5)]
                   
                   [{:type :info :f :limit-mem}
                    {:type :info :f :unlimit-mem}
                     (gen/sleep 5)]

                   [{:type :info :f :limit-cpu}
                    {:type :info :f :unlimit-cpu}
                     (gen/sleep 5)]
                 ]]
     (apply concat (repeatedly 100 #(rand-nth events)))))

(defn full-generator
     "Constructs a generator for the different types of nemesis operations."
     [opts]
     (->> (gen/mix [r w])
          (gen/stagger 1/10)
          (gen/nemesis (gen/seq (nemesis-events)))
          (gen/time-limit (:time-limit opts))))


The above generator generates a random list of 100 events and runs the test until time-limit is reached.

After months of running tests with this setup, we have not observed any additional problems.

Conclusion

The project we worked on the last couple of quarters was a first in Appian in a number of ways. We were the first team to use Raft for leadership election, and we were the first team to use a comprehensive chaos-testing framework like Jepsen. Throughout this journey, we uncovered some interesting and serious issues in our distributed system. Many times, we were amazed at the kinds of issues Jepsen helped us discover, because we simply could not have found them any other way except in production. Ultimately, we became a lot more confident in the system we built, not just because we observed that it works under extreme stress, but also because the very act of troubleshooting these issues led us to a deeper understanding of the system. As a result of all these fixes and improvements, we proved to ourselves and to the stakeholders that the system is robust enough to safely hold customer data even under extreme stress.

As a bonus, we have also improved our logging greatly because of the pains we faced during troubleshooting. By the end, everyone on the team became an expert at reading the myriad of logs and timelines, preparing us to better handle customer issues of the future. As part of Appian’s mission to ensure the highest standards on data integrity, we will continue to improve the Jepsen infrastructure and tests.

We hope that this post inspired you to start your own journey of chaos and consistency, and that the tips and the snippets of code included here will help you save time as you face the difficulties of chaos testing your distributed system.

Serdar

Written by

Serdar Benderli

Serder is a Technical Mentor of Software Development at Appian.