Inside Elasticsearch Shard

What is shard

To add data to Elasticsearch, we need an index — a place to store related data. In reality, an index is just a logical namespace that points to one or more physical shards. A shard is a low-level worker unit that holds just a slice of all the data in the index, and a shard is a single instance of Lucene, the Java libraries on which Elasticsearch is based. Although documents are stored and indexed in shards, during search, our applications talks to an index instead of a shard.

Shards are how Elasticsearch distributes data around the cluster. Documents are stored in shards, and shards are allocated to nodes in the cluster. As cluster grows or shrinks, Elasticsearch will automatically migrate shards between nodes so that the cluster remains balanced.

A shard can be either a primary shard or a replica shard. Each document in your index belongs to a single primary shard, so the number of primary shards that you have determines the maximum amount of data that your index can hold. A replica shard is just a copy of a primary shard. Replicas are used to provide redundant copies of your data to protect against hardware failure, and to serve read requests like searching or retrieving a document.

When search with a query, on the high level there’re two phase happening in shards, query and fetch.

Query Phase

During the initial query phase, the query is broadcast to a shard copy (a primary or replica shard) of every shard in the index. Each shard executes the search locally and builds a priority queue of matching documents.

When a search request is sent to a node, that node becomes the coordinating node. It is the job of this node to broadcast the search request to all involved shards, and to gather their responses into a globally sorted result set that it can return to the client.

The query phase consists of the following three steps when the query specifies the pagination of the results:

  1. The client sends a search request to a node, which creates an empty priority queue of size from + size.

  2. The node forwards the search request to a primary or replica copy of every shard in the index. Each shard executes the query locally and adds the results into a local sorted priority queue of size from + size.

  3. Each shard returns the doc IDs and sort values of all the docs in its priority queue to the coordinating node, which merges these values into its own priority queue to produce a globally sorted list of results.

Fetch Phase

The query phase identifies which documents satisfy the search request, but we still need to retrieve the documents themselves.

The distributed phase consists of the following steps:

  1. The coordinating node identifies which documents need to be fetched and issues a multi GET request to the relevant shards*.

  2. Each shard loads the documents and enriches them, if required, and then returns the documents to the coordinating node.

  3. Once all documents have been fetched, the coordinating node returns the results to the client.

*The coordinating node first decides which documents actually need to be fetched. For instance, if our query specified { “from”: 90, “size”: 10 }, the first 90 results would be discarded and only the next 10 results would need to be retrieved. These documents may come from one, some, or all of the shards involved in the original search request. The coordinating node builds a multi-get request for each shard that holds a pertinent document and sends the request to the same shard copy that handled the query phase.

How shards make search near real-time

One thing about data model in shards is, the inverted index that is written to disk is immutable. Making it immutable has certain benefits, a major one is that there is no need for locking, if you never have to update the index, you never have to worry about multiple processes trying to make changes at the same time. And as long as there is enough space in the filesystem cache, most reads will come from memory instead of having to hit disk. This provides a big performance boost.

Since the inverted index is immutable, the next problem that needed to be solved is how to make an inverted index updatable. In ElasticSearch the solution is using more than one index. Instead of rewriting the whole inverted index, it add new supplementary indices to reflect more-recent changes. Each inverted index can be queried in turn — starting with the oldest — and the results combined. For Lucene, a segment is an inverted index, and a shard is a collection of segments plus a commit point, that is a file that lists all known segments. When Elasticsearch searches an index, it sends the query out to a copy of every shard that belongs to the index, and then reduces the per-shards results to a global result set. With the development of per-segment search, the delay between indexing a document and making it visible to search dropped dramatically.

Then the bottleneck is the disk. Commiting a new segment to disk requires an fsync to ensure that the segment is physically written to disk and that data will not be lost if there is a power failure. But an fsync is costly; it cannot be performed every time a document is indexed without a big performance hit. So ElasticSearch reduces fsync times, the new segment is written to the filesystem cache first and only later is it flushed to disk. But once a file is in the cache, it can be opened and read, just like any other file.

In Elasticsearch, this lightweight process of writing and opening a new segment is called a refresh. By default, every shard is refreshed automatically once every second. This is why we say that Elasticsearch has near real-time search: document changes are not visible to search immediately, but will become visible within 1 second.

Reference

Clinton Gormley and Zachary Tong. 2015. Elasticsearch: The Definitive Guide (1st. ed.). O’Reilly Media, Inc.

Chuanrong Li

Read more posts by this author.