<Designing Data-Intensive Applications> Notes

Chapter 1 - Reliable, Scalable, and Maintainable Applications

Typical data-intensive application functions:

  • Store data so that they, or another application, can find it again later (databases)
  • Remember the result of an expensive operation, to speed up reads (caches)
  • Allow users to search data by keyword or filter it in various ways (search indexes)
  • Send a message to another process, to be handled asynchronously (stream processing)
  • Periodically crunch a large amount of accumulated data (batch processing)

An application has to meet functional requirements (e.g. allowing data to be stored, retrieved, searched, and processed in various ways) and nonfunctional requirements (e.g. security, reliability, compliance, scalability, compatibility, and maintainability).

  • Reliability: making systems work correctly, even when faults occur.
  • Scalability: having strategies for keeping performance good, even when load increases.
  • Maintainability: making life better for the engineering and operations teams who need to work with the system, through operability, simplicity, and evolvability.

Chapter 2 - Data Models and Query Languages

Most applications are built by layering one data model on top of another. Each layer hides the complexity of the layers below it by providing a clean data model.

Relational Model: data is organized into relations (called tables in SQL), where each relation is an unordered collection of tuples (rows in SQL).

Birth of NoSQL (Not Only SQL)

  • A need for greater scalability than relational databases can easily achieve, including very large datasets or very high write throughput
  • A widespread preference for free and open source software over commercial database products
  • Specialized query operations that are not well supported by the relational model
  • Frustration with the restrictiveness of relational schemas, and a desire for a more dynamic and expressive data model

NoSQL databases have diverged in two main directions:

  1. Document databases target use cases where data comes in self-contained documents and relationships between one document and another are rare. If the data in your application has a document-like structure (e.g. LinkedIn profiles), it’s good to use a document model.

  2. Graph databases go in the opposite direction, targeting use cases where anything is potentially related to everything. If the connections within your data become more complex, it becomes more natural to start modeling your data as a graph.

Chapter 3 - Storage and Retrieval

Key-value stores are quite similar to the dictionary type found in most programming languages, which is usually implemented as a hash map.

Log-structured storage engines append data to a file and break the log into segments: a segment file is closed when it reaches a certain size, and subsequent writes go to a new segment file.
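As a rough illustration of an append-only log with an in-memory hash map index, here is a minimal sketch. The class name `HashIndexedLog`, the `key,value` line format, and the tiny segment size are all assumptions made for the example; it also assumes keys contain no commas and values contain no newlines, and it ignores compaction, deletion, and crash recovery.

```python
import os
import tempfile


class HashIndexedLog:
    """Toy append-only key-value log with an in-memory hash index (hypothetical)."""

    def __init__(self, directory, max_segment_bytes=64):
        self.directory = directory
        self.max_segment_bytes = max_segment_bytes
        self.index = {}        # key -> (segment path, byte offset of the record)
        self.segment_no = 0
        self._open_new_segment()

    def _open_new_segment(self):
        self.segment_no += 1
        self.path = os.path.join(self.directory, f"segment-{self.segment_no}.log")
        self.file = open(self.path, "ab")

    def put(self, key, value):
        # Close the current segment and start a new one once it is large enough.
        if self.file.tell() >= self.max_segment_bytes:
            self.file.close()
            self._open_new_segment()
        offset = self.file.tell()
        self.file.write(f"{key},{value}\n".encode("utf-8"))
        self.file.flush()
        self.index[key] = (self.path, offset)   # the latest write wins

    def get(self, key):
        if key not in self.index:
            return None
        path, offset = self.index[key]
        with open(path, "rb") as f:
            f.seek(offset)
            line = f.readline().decode("utf-8").rstrip("\n")
        return line.split(",", 1)[1]

    def close(self):
        self.file.close()


def demo():
    with tempfile.TemporaryDirectory() as directory:
        db = HashIndexedLog(directory, max_segment_bytes=16)
        db.put("user:1", "alice")
        db.put("user:2", "bob")
        db.put("user:1", "alicia")   # an update is just a newer appended record
        result = (db.get("user:1"), db.get("user:2"), db.get("missing"), db.segment_no)
        db.close()
        return result
```

An update never touches the old record: it appends a new one and repoints the index, which is why old segments can later be compacted or deleted.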

Real implementations also have to deal with issues such as file format, deleting records, crash recovery, partially written records, and concurrency control.

A Sorted String Table (SSTable) requires that the key-value pairs in each segment file are sorted by key, and that each key appears only once within each merged segment file. Segments can be combined efficiently using a mergesort-like algorithm. This idea underlies the Log-Structured Merge-Tree (LSM-tree), which keeps a cascade of SSTables that are merged in the background.
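The merge step can be sketched as follows, assuming each segment is an in-memory list of `(key, value)` pairs sorted by key, and that segments are passed oldest first. The function name `merge_segments` is hypothetical; a real engine would stream from files rather than hold segments in memory.

```python
import heapq


def merge_segments(segments):
    """Merge sorted segments (ordered oldest first) into one sorted segment.

    Each segment is a list of (key, value) pairs sorted by key, with unique keys.
    When a key appears in several segments, the value from the newest one is kept.
    """
    # Tag each record with the negated segment age so that, for equal keys,
    # the record from the newest segment sorts first.
    tagged = [
        [(key, -age, value) for key, value in segment]
        for age, segment in enumerate(segments)
    ]
    merged = []
    for key, _, value in heapq.merge(*tagged):
        if not merged or merged[-1][0] != key:
            merged.append((key, value))   # first occurrence is the newest one
    return merged


older = [("apple", 1), ("cherry", 3)]
newer = [("apple", 10), ("banana", 2)]
merged = merge_segments([older, newer])
```

Because every input is already sorted, the merge reads each segment sequentially and never needs the whole dataset in memory at once.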

The most widely used indexing structure is the B-tree.

Storage engines mainly come from two schools of thought:

  • The log-structured school, which only permits appending to files and deleting obsolete files, but never updates a file that has been written. Bitcask, SSTables, LSM-trees, LevelDB, Cassandra, HBase, Lucene, and others belong to this group.

  • The update-in-place school, which treats the disk as a set of fixed-size pages that can be overwritten. B-trees are the biggest example of this philosophy, being used in all major relational databases and also many nonrelational ones.

In-memory databases keep data entirely in memory. They can be faster because they avoid the overhead of encoding in-memory data structures in a form that can be written to disk. They can also provide data models that are difficult to implement with disk-based indexes.


An application typically looks up a small number of records by some key, using an index. Records are inserted or updated based on the user’s input. The access pattern became known as online transaction processing (OLTP).

An analytic query needs to scan over a huge number of records, only reading a few columns per record, and calculates aggregate statistics (such as count, sum, or average) rather than returning the raw data to the user. This different pattern has been called online analytic processing (OLAP).

| Property | Transaction processing systems (OLTP) | Analytic systems (OLAP) |
|---|---|---|
| Main read pattern | Small number of records per query, fetched by key | Aggregate over large number of records |
| Main write pattern | Random-access, low-latency writes from user input | Bulk import (ETL) or event stream |
| Primarily used by | End user/customer, via web application | Internal analyst, for decision support |
| What data represents | Latest state of data (current point in time) | History of events that happened over time |
| Dataset size | Gigabytes to terabytes | Terabytes to petabytes |

Data Warehousing

Data Warehouse is a separate database that analysts can query to their hearts’ content, without affecting OLTP operations. It contains a read-only copy of the data in all the various OLTP systems in the company.

Data is extracted from OLTP databases (using either a periodic data dump or a continuous stream of updates), transformed into an analysis-friendly schema, cleaned up, and then loaded into the data warehouse. This process of getting data into the warehouse is known as Extract–Transform–Load (ETL).

Stars and Snowflakes

Star Schema: the fact table is in the middle, surrounded by its dimension tables; the connections to these tables are like the rays of a star

Snowflake Schema: dimensions are further broken down into subdimensions, more normalized than star schemas

Column Compression

The sequences of values for each column often look quite repetitive, which is a good sign for compression. We can take a column with n distinct values and turn it into n separate bitmaps: one bitmap for each distinct value, with one bit for each row. The bit is 1 if the row has that value, and 0 if not.
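A small sketch of bitmap encoding, using plain Python lists of 0/1 for readability (a real column store would use packed, often run-length-encoded, bitmaps). The function names and the sample column are made up for the example.

```python
def bitmap_encode(column):
    """Return {distinct value: list of bits}, with one bit per row."""
    bitmaps = {}
    for value in sorted(set(column)):
        bitmaps[value] = [1 if v == value else 0 for v in column]
    return bitmaps


def rows_matching(bitmaps, values):
    """Row numbers where the column equals any of `values` (bitwise OR of bitmaps)."""
    if not bitmaps:
        return []
    n_rows = len(next(iter(bitmaps.values())))
    combined = [0] * n_rows
    for value in values:
        bitmap = bitmaps.get(value, [0] * n_rows)
        combined = [a | b for a, b in zip(combined, bitmap)]
    return [row for row, bit in enumerate(combined) if bit]


product_sk = [69, 69, 74, 31, 31, 69]      # hypothetical column values
bitmaps = bitmap_encode(product_sk)
matches = rows_matching(bitmaps, [31, 74])  # like WHERE product_sk IN (31, 74)
```

A query such as `WHERE product_sk IN (31, 74)` then reduces to loading two bitmaps and OR-ing them together.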

For data warehouse queries that need to scan over millions of rows, a big bottleneck is the bandwidth for getting data from disk into memory. Another concern is efficiently using the bandwidth from main memory into the CPU cache.

Chapter 4 - Encoding and Evolution

Rolling upgrades allow new versions of a service to be released without downtime (thus encouraging frequent small releases over rare big releases) and make deployments less risky.

In order for the system to continue running smoothly, we need to maintain compatibility in both directions:

  • Backward compatibility: newer code can read data that was written by older code.
  • Forward compatibility: older code can read data that was written by newer code.
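To make the two directions concrete, here is a hedged sketch using JSON records. The field names (`name`, `email`) and the reader functions are hypothetical; the point is only that the old reader ignores fields it does not know, and the new reader supplies a default for a field that old data lacks.

```python
import json


def read_v1(encoded):
    """Old reader: knows only `name` and ignores any field it does not recognise."""
    record = json.loads(encoded)
    return {"name": record["name"]}


def read_v2(encoded):
    """New reader: knows `name` and `email`; `email` defaults to None when absent."""
    record = json.loads(encoded)
    return {"name": record["name"], "email": record.get("email")}


written_by_v1 = json.dumps({"name": "Ada"})
written_by_v2 = json.dumps({"name": "Ada", "email": "ada@example.com"})

backward = read_v2(written_by_v1)   # newer code reading older data
forward = read_v1(written_by_v2)    # older code reading newer data
```

Both directions work here only because the added field is optional; adding a required field without a default would break backward compatibility.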

Language-specific format encoding is often tied to a particular programming language, and reading the data in another language is very difficult.

JSON is less verbose than XML, but both still use a lot of space compared to binary formats.

Thrift and Protocol Buffers each come with a code generation tool that takes a schema definition and produces classes that implement the schema in various programming languages.

Binary encodings based on schemas have nice properties:

  • They can be much more compact than the various “binary JSON” variants, since they can omit field names from the encoded data

  • The schema is a valuable form of documentation, and because the schema is required for decoding, you can be sure that it is up to date

  • Keeping a database of schemas allows you to check forward and backward compatibility of schema changes, before anything is deployed.

  • For users of statically typed programming languages, the ability to generate code from the schema is useful, since it enables type checking at compile time

Schema evolution allows the same kind of flexibility as schemaless / schema-on-read JSON databases provide.

REST is not a protocol, but rather a design philosophy that builds upon the principles of HTTP. It emphasizes simple data formats, using URLs for identifying resources and using HTTP features for cache control, authentication, and content type negotiation.

Chapter 5 - Replication

Replication means keeping a copy of the same data on multiple machines that are connected via a network, and can serve several purposes:

  • High availability

    • Keeping the system running, even when one machine (or several machines, or an entire datacenter) goes down
  • Disconnected operation

    • Allowing an application to continue working when there is a network interruption
  • Latency

    • Placing data geographically close to users, so that users can interact with it faster
  • Scalability

    • Being able to handle a higher volume of reads than a single machine could handle, by performing reads on replicas

Each node that stores a copy of the database is called a replica, and we want to ensure that all the data ends up on all the replicas. Every write to the database needs to be processed by every replica; otherwise, the replicas would no longer contain the same data.

In leader-based replication, reads can be served by either the leader or any of the followers, but writes must be sent to the leader. Each follower takes the log from the leader and updates its local copy of the database accordingly.

| | Synchronous replication | Asynchronous replication |
|---|---|---|
| Definition | The leader waits until the follower has confirmed that it received the write, then reports success to the user | The leader sends the message but doesn’t wait for a response from the followers |
| Pros | The follower is guaranteed to have an up-to-date copy of the data that is consistent with the leader | The leader can continue processing writes |
| Cons | If the follower doesn’t respond, the write cannot be processed (the leader blocks all writes and waits for the replica) | A write is not guaranteed to be durable |

Sometimes nodes go down, so here are some approaches for handling node outages:

Follower failure: the follower recovers from its local log, then requests from the leader the changes it missed while it was down.

Leader failure: one of the followers needs to be promoted to be the new leader, clients need to be reconfigured to send their writes to the new leader, and the other followers need to start consuming data changes from the new leader. This process is called failover.

There are several ways to implement replication logs:

  • Statement-based replication: in the simplest case, the leader logs every write request (statement) that it executes and sends that statement log to its followers.

  • Write-ahead log (WAL) shipping: the log is an append-only sequence of bytes containing all writes to the database, and the leader sends it to its followers.

  • Logical (row-based) log replication: different log formats are used for replication and for the storage engine, which allows the replication log to be decoupled from the storage engine internals.

Leader-based replication has one major downside: there is only one leader, and all writes must go through it. The main approaches to replication are:

  • Single-leader replication (Leader-based replication)

    • Clients send all writes to a single node (the leader), which sends a stream of data change events to the other replicas (followers). Reads can be performed on any replica, but reads from followers might be stale.
  • Multi-leader replication

    • Clients send each write to one of several leader nodes, any of which can accept writes. The leaders send streams of data change events to each other and to any follower nodes.
  • Leaderless replication

    • Clients send each write to several nodes, and read from several nodes in parallel in order to detect and correct nodes with stale data.

Below are some consistency models which are helpful for deciding how an application should behave under replication lag:

  • Read-after-write consistency

    • Users should always see data that they submitted themselves.
  • Monotonic reads

    • After users have seen the data at one point in time, they shouldn’t later see the data from some earlier point in time.
  • Consistent prefix reads

    • Users should see the data in a state that makes causal sense: for example, seeing a question and its reply in the correct order.

Chapter 6 - Partitioning

For very large datasets, or very high query throughput, replication is not sufficient: we need to break the data up into partitions, also known as sharding.

Partitioning is usually combined with replication so that copies of each partition are stored on multiple nodes.

Our goal with partitioning is to spread the data and the query load evenly across nodes. If the partitioning is unfair, so that some partitions have more data or queries than others, we call it skewed. A partition with disproportionately high load is called a hot spot.

Key range partitioning

One way of partitioning is to assign a continuous range of keys (from some minimum to some maximum) to each partition. The downside of key range partitioning is that certain access patterns can lead to hot spots.

Hash partitioning

Because of this risk of skew and hot spots, many distributed datastores use a hash function to determine the partition for a given key. However, by using the hash of the key for partitioning we lose a nice property of key-range partitioning: the ability to do efficient range queries.
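The two schemes can be compared with a short sketch. The split points, partition count, and function names below are assumptions for illustration. MD5 is used only because it is stable across runs (Python's built-in `hash()` for strings is randomized per process); it is not a recommendation of a particular hash function.

```python
import bisect
import hashlib


def key_range_partition(key, boundaries):
    """Partition number for `key`, given sorted split points.

    With boundaries ["h", "p"] there are three partitions:
    keys < "h", keys from "h" up to (not including) "p", and keys >= "p".
    """
    return bisect.bisect_right(boundaries, key)


def hash_partition(key, num_partitions):
    """Partition number derived from a stable hash of the key."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


boundaries = ["h", "p"]
range_placement = [key_range_partition(k, boundaries) for k in ["apple", "house", "zebra"]]
hash_placement = [hash_partition(k, 8) for k in ["2024-01-01", "2024-01-02", "2024-01-03"]]
```

Adjacent keys stay together under key range partitioning, which is what makes range scans cheap and hot spots likely. Under hash partitioning, neighbouring keys such as consecutive dates generally land on unrelated partitions, so a range query has to ask all of them.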

The situation becomes more complicated if secondary indexes are involved. A secondary index usually doesn’t identify a record uniquely but rather is a way of searching for occurrences of a particular value. The problem with secondary indexes is that they don’t map neatly to partitions.

We can partition secondary indexes by document: each partition maintains its own secondary indexes, covering only the documents in that partition. A document-partitioned index is also known as a local index. Reads can become expensive, because a query on the secondary index must be sent to all partitions and the results combined; this approach is called scatter/gather.

Alternatively, we can construct a global index that covers data in all partitions. This kind of index is called term-partitioned, because the term we’re looking for determines the partition of the index. It makes reads more efficient, but writes are slower and more complicated.

Rebalancing Partitions

The process of moving load from one node in the cluster to another is called rebalancing.

The fixed-number-of-partitions approach creates many more partitions than there are nodes and assigns several partitions to each node. If a node is added to the cluster, the new node can steal a few partitions from every existing node until partitions are fairly distributed once again.
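Rebalancing with a fixed number of partitions might look roughly like this. The function `add_node` and the node names are hypothetical, and the "take from the most loaded node" policy is one simple choice among many.

```python
def add_node(assignment, new_node):
    """Return a new {partition: node} mapping after `new_node` joins.

    The new node takes partitions from the currently most loaded nodes
    until it holds its fair share; all other partitions stay where they are.
    """
    assignment = dict(assignment)
    nodes = sorted(set(assignment.values()) | {new_node})
    owned = {node: [] for node in nodes}
    for partition, node in sorted(assignment.items()):
        owned[node].append(partition)

    fair_share = len(assignment) // len(nodes)
    while len(owned[new_node]) < fair_share:
        donor = max((n for n in nodes if n != new_node), key=lambda n: len(owned[n]))
        partition = owned[donor].pop()
        owned[new_node].append(partition)
        assignment[partition] = new_node
    return assignment


before = {p: f"node{p % 3}" for p in range(12)}   # 12 partitions on 3 nodes
after = add_node(before, "node3")
moved = [p for p in before if before[p] != after[p]]
```

Only whole partitions move between nodes; the mapping from keys to partitions never changes, which keeps the amount of data transferred small.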

Fully automated rebalancing can be convenient, but it can be unpredictable. It can be a good thing to have a human in the loop for rebalancing.

Chapter 7 - Transactions

A transaction is a way for an application to group several reads and writes together into a logical unit. It aims to simplify the programming model for applications accessing a database.


ACID stands for Atomicity, Consistency, Isolation, and Durability. These are the safety guarantees provided by transactions.

Systems that do not meet the ACID criteria are sometimes called BASE, which stands for Basically Available, Soft state, and Eventual consistency.

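In general, atomic refers to something that cannot be broken down into smaller parts. In the context of ACID, atomicity means that if a transaction cannot be completed, it is aborted and any writes it has made so far are discarded.

Consistency means that certain statements about your data (invariants) must always be true.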

Isolation means that concurrently executing transactions are isolated from each other.

Durability is the promise that once a transaction has committed successfully, any data it has written will not be forgotten.

Weak Isolation Levels

If two transactions don’t touch the same data, they can safely be run in parallel. Serializable isolation has a performance cost, and many databases don’t want to pay that price. It’s therefore common for systems to use weaker levels of isolation, which protect against some concurrency issues, but not all.

The most basic level of transaction isolation is read committed. It makes two guarantees:

  1. When reading from the database, you will only see data that has been committed (no dirty reads).

  2. When writing to the database, you will only overwrite data that has been committed (no dirty writes).

Databases prevent dirty writes by using row-level locks: when a transaction wants to modify a particular object (row or document), it must first acquire a lock on that object.

Snapshot isolation: each transaction reads from a consistent snapshot of the database—that is, the transaction sees all the data that was committed in the database at the start of the transaction. Even if the data is subsequently changed by another transaction, each transaction sees only the old data from that particular point in time.
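One way to picture this is a store that keeps several committed versions of each object and lets a reader see only the versions committed before its snapshot was taken. The sketch below is a simplification under stated assumptions: the `MVCCStore` class is hypothetical, writes are committed one transaction at a time, and in-progress or aborted transactions are not modelled.

```python
class MVCCStore:
    """Toy multi-version store: a write adds a version instead of overwriting."""

    def __init__(self):
        self.versions = {}        # key -> list of (commit_txid, value), in commit order
        self.last_committed = 0

    def commit(self, writes):
        """Atomically commit a dict of writes and return the new transaction id."""
        self.last_committed += 1
        for key, value in writes.items():
            self.versions.setdefault(key, []).append((self.last_committed, value))
        return self.last_committed

    def begin(self):
        """Start a read-only transaction by remembering which commits are visible."""
        return self.last_committed

    def read(self, snapshot, key):
        """Return the newest value committed at or before `snapshot`."""
        for txid, value in reversed(self.versions.get(key, [])):
            if txid <= snapshot:
                return value
        return None


store = MVCCStore()
store.commit({"account1": 500, "account2": 500})
snapshot = store.begin()
store.commit({"account1": 400, "account2": 600})    # a concurrent transfer commits
old_view = (store.read(snapshot, "account1"), store.read(snapshot, "account2"))
new_view = (store.read(store.begin(), "account1"), store.read(store.begin(), "account2"))
```

The long-running reader sees a total of 1000 across both accounts before and after the transfer, never a mix of old and new balances.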


The simplest way of avoiding concurrency problems is to remove the concurrency entirely: to execute only one transaction at a time, in serial order, on a single thread. But this limits the transaction throughput of the database to the speed of a single CPU core on a single machine. Serial execution is viable only within certain constraints:

  • Every transaction must be small and fast, because it takes only one slow transaction to stall all transaction processing.

  • It is limited to use cases where the active dataset can fit in memory. Rarely accessed data could potentially be moved to disk, but if it needed to be accessed in a single-threaded transaction, the system would get very slow.

  • Write throughput must be low enough to be handled on a single CPU core, or else transactions need to be partitioned without requiring cross-partition coordination.

  • Cross-partition transactions are possible, but there is a hard limit to the extent to which they can be used.

Two-Phase Locking

Two-phase locking makes the lock requirements much stronger. Several transactions are allowed to concurrently read the same object as long as nobody is writing to it. But as soon as anyone wants to write (modify or delete) an object, exclusive access is required.

Transaction throughput and response times of queries are significantly worse under two-phase locking than under weak isolation.

Serializable snapshot isolation (SSI)

It uses an optimistic approach, allowing transactions to proceed without blocking. When a transaction wants to commit, it is checked, and it is aborted if the execution was not serializable.

Chapter 8 - The Trouble with Distributed Systems

Partial Failures

In a distributed system, there may well be some parts of the system that are broken in some unpredictable way, even though other parts of the system are working fine. This is known as a partial failure.

Problems can occur almost anywhere in distributed systems, including:

  • Whenever you try to send a packet over the network, it may be lost or arbitrarily delayed. Likewise, the reply may be lost or delayed, so if you don’t get a reply, you have no idea whether the message got through.

  • A node’s clock may be significantly out of sync with other nodes (despite your best efforts to set up NTP), it may suddenly jump forward or back in time, and relying on it is dangerous because you most likely don’t have a good measure of your clock’s error interval.

  • A process may pause for a substantial amount of time at any point in its execution (perhaps due to a stop-the-world garbage collector), be declared dead by other nodes, and then come back to life again without realizing that it was paused.

Most systems don’t have an accurate mechanism of detecting whether a node has failed, so most distributed algorithms rely on timeouts to determine whether a remote node is still available. However, timeouts can’t distinguish between network and node failures.

Once a fault is detected, making a system tolerate it is not easy either: there is no global variable, no shared memory, no common knowledge or any other kind of shared state between the machines.

Chapter 9 - Consistency and Consensus


If you want linearizability, the response time of read and write requests is at least proportional to the uncertainty of delays in the network.

Ordering Guarantees

There are deep connections between ordering, linearizability, and consensus.

Ordering helps preserve causality. Causality defines a partial order, not a total order: some operations are ordered with respect to each other, but some are incomparable.
Causal consistency is the strongest possible consistency model that does not slow down due to network delays, and remains available in the face of network failures.
In many cases, systems that appear to require linearizability in fact only really require causal consistency, which can be implemented more efficiently.
In a distributed system, getting all nodes to agree on the same total ordering of operations is tricky.

Total order broadcast can be used to implement serializable transactions: if every message represents a deterministic transaction to be executed as a stored procedure, and if every node processes those messages in the same order, then the partitions and replicas of the database are kept consistent with each other. This principle is known as state machine replication.
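The principle can be illustrated with a tiny deterministic state machine. The message format `(operation, account, amount)` and the function names are assumptions for the example; delivering the messages in the same order to every replica is assumed to be handled by the broadcast layer, which is not shown.

```python
def apply_message(state, message):
    """Apply one deterministic transaction to `state` in place."""
    operation, account, amount = message
    balance = state.get(account, 0)
    if operation == "deposit":
        state[account] = balance + amount
    elif operation == "withdraw" and balance >= amount:
        state[account] = balance - amount
    # A withdrawal that would overdraw the account is rejected: state is unchanged.
    return state


def replay(log):
    """Build a replica's state by applying every message in log order."""
    state = {}
    for message in log:
        apply_message(state, message)
    return state


log = [
    ("deposit", "alice", 100),
    ("withdraw", "alice", 30),
    ("withdraw", "alice", 100),   # rejected: only 70 is left
]
replica_a = replay(log)
replica_b = replay(log)
reordered = replay([log[0], log[2], log[1]])
```

Two replicas that process the same log in the same order end up identical. Swapping the last two messages produces a different final balance, which is why agreeing on a single total order matters.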

Distributed Transactions and Consensus

Consensus is one of the most important and fundamental problems in distributed computing.

Chapter 10 - Batch Processing

Three different types of systems:

Services (online systems)
A service waits for a request or instruction from a client to arrive. When one is received, the service tries to handle it as quickly as possible and sends a response back. Response time is usually the primary measure of performance of a service, and availability is often very important (if the client can’t reach the service, the user will probably get an error message).

Batch processing systems (offline systems)
A batch processing system takes a large amount of input data, runs a job to process it, and produces some output data. Jobs often take a while (from a few minutes to several days), so there normally isn’t a user waiting for the job to finish. Instead, batch jobs are often scheduled to run periodically (for example, once a day). The primary performance measure of a batch job is usually throughput (the time it takes to crunch through an input dataset of a certain size).

Stream processing systems (near-real-time systems)
Stream processing is somewhere between online and offline/batch processing (so it is sometimes called near-real-time or nearline processing). Like a batch processing system, a stream processor consumes inputs and produces outputs (rather than responding to requests). However, a stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data. This difference allows stream processing systems to have lower latency than the equivalent batch systems.

The Unix Philosophy
  • A uniform interface

    Programs share a uniform interface, so they can easily be plugged together.

  • Separation of logic and wiring

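    Programs read from standard input (stdin) and write to standard output (stdout), so the logic of a program is kept separate from how its input and output are wired up.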

  • Transparency and experimentation

MapReduce and Distributed Filesystems

MapReduce is a fairly blunt, brute-force, but surprisingly effective tool. HDFS is based on the shared-nothing principle: it requires no special hardware, only computers connected by a conventional datacenter network.

To create a MapReduce job, implement two callback functions, the mapper and reducer:

  • Mapper

    The mapper is called once for every input record, and its job is to extract the key and value from the input record. For each input, it may generate any number of key-value pairs (including none). It does not keep any state from one input record to the next, so each record is handled independently.

  • Reducer

    The MapReduce framework takes the key-value pairs produced by the mappers, collects all the values belonging to the same key, and calls the reducer with an iterator over that collection of values. The reducer can produce output records.
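The mapper and reducer callbacks described above can be sketched with a single-process stand-in for the framework. Word counting is used as the example job; `run_mapreduce` is a hypothetical helper that does in memory what a real framework does across machines (group by key, sort, call the reducer).

```python
from collections import defaultdict


def mapper(record):
    """Called once per input record; emits any number of (key, value) pairs."""
    for word in record.lower().split():
        yield word, 1


def reducer(key, values):
    """Called once per key with an iterator over all values for that key."""
    yield key, sum(values)


def run_mapreduce(records, mapper, reducer):
    """Single-process stand-in for the framework: map, group by key, reduce."""
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    output = []
    for key in sorted(groups):            # the framework sorts by key
        output.extend(reducer(key, iter(groups[key])))
    return output


counts = run_mapreduce(["the quick fox", "the lazy dog", ""], mapper, reducer)
```

The mapper keeps no state between records, so the framework is free to run it on different machines for different parts of the input.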

Beyond MapReduce

Downsides of MapReduce:

  • A MapReduce job can only start when all tasks in the preceding jobs (that generate its inputs) have completed. Skew or varying load on different machines means that a job often has a few straggler tasks that take much longer to complete than the others. Having to wait until all of the preceding job’s tasks have completed slows down the execution of the workflow as a whole.

  • Mappers are often redundant: they just read back the same file that was just written by a reducer, and prepare it for the next stage of partitioning and sorting.

  • Storing intermediate state in a distributed filesystem means those files are replicated across several nodes, which is often overkill for such temporary data.

Dataflow engines

Several new execution engines for distributed batch computations were developed, the most well known of which are Spark, Tez, and Flink. There are various differences in the way they are designed, but they have one thing in common: they handle an entire workflow as one job, rather than breaking it up into independent subjobs.

Spark, Flink, and Tez avoid writing intermediate state to HDFS, so they take a different approach to tolerating faults: if a machine fails and the intermediate state on that machine is lost, it is recomputed from other data that is still available. Spark uses the resilient distributed dataset (RDD) abstraction for tracking the ancestry of data, while Flink checkpoints operator state, allowing it to resume running an operator that ran into a fault during its execution.

Graphs and Iterative Processing

Many graph algorithms are expressed by traversing one edge at a time, joining one vertex with an adjacent vertex in order to propagate some information, and repeating until some condition is met. It is possible to store a graph in a distributed filesystem, but this idea of “repeating until done” cannot be expressed in plain MapReduce, since it only performs a single pass over the data.

An idea similar to MapReduce is behind Pregel: one vertex can “send a message” to another vertex, and typically those messages are sent along the edges in a graph. In each iteration, a function is called for each vertex, passing it all the messages that were sent to it. In the Pregel model, a vertex remembers its state in memory from one iteration to the next, so the function only needs to process new incoming messages. Fault tolerance is achieved by periodically checkpointing the state of all vertices at the end of an iteration. Because the programming model deals with just one vertex at a time (sometimes called “thinking like a vertex”), the framework may partition the graph in arbitrary ways. As a result, graph algorithms often have a lot of cross-machine communication overhead, and the intermediate state (messages sent between nodes) is often bigger than the original graph.

Chapter 11 - Stream Processing

In general, a “stream” refers to data that is incrementally made available over time.

In streaming terminology, an event is generated once by a producer (also known as a publisher or sender), and then potentially processed by multiple consumers (subscribers or recipients). In a streaming system, related events are usually grouped together into a topic or stream.

A common approach for notifying consumers about new events is to use a messaging system: a producer sends a message containing the event, which is then pushed to consumers.

Direct network communication between producers and consumers

  • UDP multicast is widely used in the financial industry for streams such as stock market feeds, where low latency is important.

  • Brokerless messaging libraries such as ZeroMQ and nanomsg take a similar approach, implementing publish/subscribe messaging over TCP or IP multicast.

  • StatsD and Brubeck use unreliable UDP messaging for collecting metrics from all machines on the network and monitoring them.

  • If the consumer exposes a service on the network, producers can make a direct HTTP or RPC request.

Message brokers

A message broker (also known as a message queue) is essentially a kind of database that is optimized for handling message streams. When multiple consumers read messages in the same topic, two main patterns of messaging are used:

  • Load balancing

    Each message is delivered to one of the consumers, so the consumers can share the work of processing the messages in the topic.

  • Fan-out

    Each message is delivered to all of the consumers.

The two patterns can be combined: for example, two separate groups of consumers may each subscribe to a topic, such that each group collectively receives all messages, but within each group only one of the nodes receives each message.

A log is simply an append-only sequence of records on disk. A message broker can be built on a log: a producer sends a message by appending it to the end of the log, and a consumer receives messages by reading the log sequentially. In order to scale to higher throughput than a single disk can offer, the log can be partitioned.
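A minimal in-memory sketch of a partitioned log with per-consumer-group offsets follows. The `LogBroker` class, its method names, and the byte-sum partitioner are all assumptions for illustration; they do not mirror the API of any real broker, and nothing here is persisted to disk.

```python
class LogBroker:
    """Toy log-based broker: partitioned append-only lists plus consumer offsets."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]
        self.offsets = {}     # (consumer group, partition) -> next offset to read

    def send(self, key, message):
        """Append to the partition chosen by the key; return (partition, offset)."""
        partition = sum(key.encode("utf-8")) % len(self.partitions)
        self.partitions[partition].append(message)
        return partition, len(self.partitions[partition]) - 1

    def poll(self, group, partition, max_messages=10):
        """Read sequentially from the group's current offset, then advance it."""
        start = self.offsets.get((group, partition), 0)
        batch = self.partitions[partition][start:start + max_messages]
        self.offsets[(group, partition)] = start + len(batch)
        return batch


broker = LogBroker(num_partitions=2)
broker.send("a", "m1")
broker.send("a", "m2")
broker.send("b", "m3")
first = broker.poll("analytics", 1)      # ['m1', 'm2']
second = broker.poll("analytics", 1)     # [] (nothing new yet)
other_group = broker.poll("billing", 1)  # ['m1', 'm2'] (reading does not delete)
```

Reading only advances an offset, so a second consumer group can read the same messages independently. This gives fan-out across groups while each partition is consumed sequentially within a group.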

Apache Kafka, Amazon Kinesis Streams, and Twitter’s DistributedLog are log-based message brokers. Google Cloud Pub/Sub is architecturally similar but exposes a JMS-style API rather than a log abstraction.

There has been growing interest in change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems. Essentially, change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers.

The principle of log compaction: the storage engine periodically looks for log records with the same key, throws away any duplicates, and keeps only the most recent update for each key.
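As a small illustration, compaction over an in-memory log of `(key, value)` records could look like this. The function name `compact` is hypothetical, and deletion markers (tombstones) are not handled.

```python
def compact(log):
    """Keep only the most recent record for each key, preserving log order.

    `log` is a list of (key, value) pairs in the order they were written.
    """
    latest = {}
    for position, (key, value) in enumerate(log):
        latest[key] = (position, value)      # later records replace earlier ones
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (position, value) in survivors]


log = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
compacted = compact(log)
```

The compacted log still contains the latest value of every key, so a new consumer can rebuild the full current state by reading it from the beginning.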

Three types of joins that may appear in stream processes:

  • Stream-stream joins

    Both input streams consist of activity events, and the join operator searches for related events that occur within some window of time.

  • Stream-table joins

    One input stream consists of activity events, while the other is a database changelog. The changelog keeps a local copy of the database up to date. For each activity event, the join operator queries the database and outputs an enriched activity event.

  • Table-table joins

    Both input streams are database changelogs. In this case, every change on one side is joined with the latest state of the other side. The result is a stream of changes to the materialized view of the join between the two tables.
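Of the three, the stream-table join is perhaps the easiest to sketch. In the example below the record shapes (`"profile"` changelog entries and `"activity"` events carrying a user id) and the function name are assumptions; both inputs are merged into one iterable in arrival order for simplicity.

```python
def stream_table_join(events):
    """Enrich activity events with the latest known profile for the same user.

    `events` yields ("profile", user_id, profile) changelog records and
    ("activity", user_id, action) activity events, in arrival order.
    """
    profiles = {}     # local copy of the table, kept up to date by the changelog
    for kind, user_id, payload in events:
        if kind == "profile":
            profiles[user_id] = payload
        else:
            yield {"user_id": user_id, "action": payload, "profile": profiles.get(user_id)}


events = [
    ("profile", 1, {"name": "Ada"}),
    ("activity", 1, "login"),
    ("profile", 1, {"name": "Ada L."}),
    ("activity", 1, "purchase"),
    ("activity", 2, "login"),          # no profile seen yet for user 2
]
enriched = list(stream_table_join(events))
```

Each activity event is joined with whatever the local table copy contains at the moment the event is processed, so the same event could be enriched differently if the two inputs were interleaved in another order.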

Chapter 12 - The Future of Data Systems

The core idea of the lambda architecture is that incoming data should be recorded by appending immutable events to an always-growing dataset. The lambda architecture proposes running two different systems in parallel: a batch processing system such as Hadoop MapReduce, and a separate stream processing system such as Storm.

Persistent local storage in web browsers and mobile apps has led to a renewed interest in offline-first applications that do as much as possible using a local database on the same device, without requiring an internet connection, and sync with remote servers in the background when a network connection is available.

Server-sent events (the EventSource API) and WebSockets provide communication channels by which a web browser can keep an open TCP connection to a server, and the server can actively push messages to the browser as long as it remains connected.

If you need stronger assurances of correctness, then serializability and atomic commit are established approaches, but they come at a cost: they typically only work in a single datacenter (ruling out geographically distributed architectures), and they limit the scale and fault-tolerance properties you can achieve.

Every system is built for a purpose; every action we take has both intended and unintended consequences.

Having privacy does not mean keeping everything secret; it means having the freedom to choose which things to reveal to whom, what to make public, and what to keep secret.

Chuanrong Li
