Monday 5 March 2018

MongooDB Concept Part 5


7.5.2 Data Distribution Process

You will next look at how the data is distributed among the shards for the collections where sharding is enabled. In MongoDB, the data is sharded or distributed at the collection level. The collection is partitioned by the shard key.

7.5.2.1 Shard Key

Any indexed single/compound field that exists within all documents of the collection can be a shard key. You specify that this is the field basis which the documents of the collection need to be distributed. Internally, MongoDB divides the documents based on the value of the field into chunks and distributes them across the shards.
There are two ways MongoDB enables distribution of the data: range-based partitioning and hash-based partitioning.
Range-Based Partitioning
In range-based partitioning , the shard key values are divided into ranges. Say you consider a timestamp field as the shard key. In this way of partitioning, the values are considered as a straight line starting from a Min value to Max value where Min is the starting period (say, 01/01/1970) and Max is the end period (say, 12/31/9999). Every document in the collection will have timestamp value within this range only, and it will represent some point on the line.
Based on the number of shards available, the line will be divided into ranges, and documents will be distributed based on them.
In this scheme of partitioning, shown in Figure 7-17, the documents where the values of the shard key are nearby are likely to fall on the same shard. This can significantly improve the performance of the range queries.



A332296_1_En_7_Fig17_HTML.jpg
Figure 7-17.
Range-based partitioning
However, the disadvantage is that it can lead to uneven distribution of data, overloading one of the shards, which may end up receiving majority of the requests, whereas the other shards remain underloaded, so the system will not scale properly.
Hash-Based Partitioning
In hash-based partitioning , the data is distributed on the basis of the hash value of the shard field. If selected, this will lead to a more random distribution compared to range-based partitioning.
It’s unlikely that the documents with close shard key will be part of the same chunk. For example, for ranges based on the hash of the _id field, there will be a straight line of hash values, which will again be partitioned on basis of the number of shards. On the basis of the hash values, the documents will lie in either of the shards. See Figure 7-18.



A332296_1_En_7_Fig18_HTML.jpg
Figure 7-18.
Hash-based partitioning
In contrast to range-based partitioning, this ensures that the data is evenly distributed, but it happens at the cost of efficient range queries.
Chunks
The data is moved between the shards in form of chunks. The shard key range is further partitioned into sub-ranges, which are also termed as chunks. See Figure 7-19.



A332296_1_En_7_Fig19_HTML.jpg
Figure 7-19.
Chunks
For a sharded cluster, 64MB is the default chunk size. In most situations, this is an apt size for chunk slitting and migration.
Let’s discuss the execution of sharding and chunks with an example. Say you have a blog posts collection which is sharded on the field date. This implies that the collection will be split up on the basis of the date field values. Let’s assume further that you have three shards. In this scenario the data might be distributed across shards as follows:
  • Shard #1: Beginning of time up to July 2009
  • Shard #2: August 2009 to December 2009
  • Shard #3: January 2010 to through the end of time
In order to retrieve documents from January 1, 2010 until today, the query is sent to mongos.
In this scenario,
1.
The client queries mongos.

2.
The mongos know which shards have the data, so mongos sends the queries to Shard #3.

3.
Shard #3 executes the query and returns the results to mongos.

4.
Mongos combines the data received from various shards, which in this case is Shard #3 only, and returns the final result back to the client.

The application doesn’t need to be sharding-aware. It can query the mongos as though it’s a normal mongod.
Let’s consider another scenario where you insert a new document. The new document has today’s date. The sequences of events are as follows:
1.
The document is sent to the mongos.

2.
Mongos checks the date and on basis of that, sends the document to Shard #3.

3.
Shard #3 inserts the document.

From a client’s point of view, this is again identical to a single server setup.
Role of ConfigServers in the Above Scenario
Consider a scenario where you start getting insert requests for millions of documents with the date of September 2009. In this case, Shard #2 begins to get overloaded.
The config server steps in once it realizes that Shard #2 is becoming too big. It will split the data on the shard and start migrating it to other shards. After the migration is completed, it sends the updated status to the mongos. So now Shard #2 has data from August 2009 until September 18, 2009 and Shard #3 contains data from September 19, 2009 until the end of time.
When a new shard is added to the cluster, it’s the config server’s responsibility to figure out what to do with it. The data may need to be immediately migrated to the new shard, or the new shard may need to be in reserve for some time. In summary, the config servers are the brains. Whenever any data is moved around, the config servers let the mongos know about the final configuration so that the mongos can continue doing proper routing.

7.5.3 Data Balancing Process

You will next look at how the cluster is kept balanced (i.e. how MongoDB ensures that all the shards are equally loaded).
The addition of new data or modification of existing data, or the addition or removal of servers, can lead to imbalance in the data distribution, which means either one shard is overloaded with more chunks and the other shards have less number of chunks, or it can lead to an increase in the chunk size, which is significantly greater than the other chunks.
MongoDB ensures balance with the following background processes:
  • Chunk splitting
  • Balancer

7.5.3.1 Chunk Splitting

Chunk splitting is one of the processes that ensures the chunks are of the specified size. As you have seen, a shard key is chosen and it is used to identify how the documents will be distributed across the shards. The documents are further grouped into chunks of 64MB (default and is configurable) and are stored in the shards based on the range it is hosting.
If the size of the chunk changes due to an insert or update operation, and exceeds the default chunk size, then the chunk is split into two smaller chunks by the mongos. See Figure 7-20.



A332296_1_En_7_Fig20_HTML.jpg
Figure 7-20.
Chunk splitting
This process keeps the chunks within a shard of the specified size or lesser than that (i.e. it ensures that the chunks are of the configured size).
Insert and update operations trigger splits. The split operation leads to modification of the data in the config server as the metadata is modified. Although splits don’t lead to migration of data, this operation can lead to an unbalance of the cluster with one shard having more chunks compared to another.

7.5.3.2 Balancer

Balancer is the background process that is used to ensure that all of the shards are equally loaded or are in a balanced state. This process manages chunk migrations.
Splitting of the chunk can cause imbalance. The addition or removal of documents can also lead to a cluster imbalance. In a cluster imbalance, balancer is used, which is the process of distributing data evenly.
When you have a shard with more chunks as compared to other shards, then the chunks balancing is done automatically by MongoDB across the shards. This process is transparent to the application and to you.
Any of the mongos within the cluster can initiate the balancer process. They do so by acquiring a lock on the config database of the config server, as balancer involves migration of chunks from one shard to another, which can lead to a change in the metadata, which will lead to change in the config server database. The balancer process can have huge impact on the database performance, so it can either
1.
Be configured to start the migration only when the migration threshold has reached. The migration threshold is the difference in the number of maximum and minimum chunks on the shards. Threshold is shown in Table 7-4.
Table 7-4.
Migration Threshold
Number of Chunks
Migration Threshold
< 20
2
21-80
4
>80
8

2.
Or it can be scheduled to run in a time period that will not impact the production traffic.

The balancer migrates one chunk at a time (see Figure 7-21) and follows these steps:
1.
The moveChunk command is sent to the source shard.

2.
An internal moveChunk command is started on the source where it creates the copy of the documents within the chunk and queues it. In the meantime, any operations for that chunk are routed to the source by the mongos because the config database is not yet changed and the source will be responsible for serving any read/write request on that chunk.

3.
The destination shard starts receiving the copy of the data from the source.

4.
Once all of the documents in the chunks have been received by the destination shard, the synchronization process is initiated to ensure that all changes that have happened to the data while migration are updated at the destination shard.

5.
Once the synchronization is completed, the next step is to update the metadata with the chunk’s new location in the config database. This activity is done by the destination shard that connects to the config database and carries out the necessary updates.

6.
Post successful completion of all the above, the document copy that is maintained at the source shard is deleted.


A332296_1_En_7_Fig21_HTML.jpg
Figure 7-21.
Chunk migration
If in the meanwhile the balancer needs additional chunk migration from the source shard, it can start with the new migration without even waiting for the deletion step to finish for the current migration.
In case of any error during the migration process, the process is aborted by the balancer, leaving the chunks on the original shard. On successful completion of the process, the chunk data is removed from the original shard by MongoDB.
Addition or removal of shards can also lead to cluster imbalance. When a new shard is added, data migration to the shard is started immediately. However, it takes time for the cluster to be balanced.
When a shard is removed, the balancer ensures that the data is migrated to the other shards and the metadata information is updated. Post completion of the two activities, the shard is removed safely.

7.5.4 Operations

You will next look at how the read and write operations are performed on the sharded cluster. As mentioned, the config servers maintain the cluster metadata. This data is stored in the config database. This data of the config database is used by the mongos to service the application read and write requests.
The data is cached by the mongos instances, which is then used for routing write and read operations to the shards. This way the config servers are not overburdened.
The mongos will only read from the config servers in the following scenarios:
  • The mongos has started for first time or
  • An existing mongos has restarted or
  • After chunk migration when the mongos needs to update its cached metadata with the new cluster metadata.
Whenever any operation is issued, the first step that the mongos need to do is to identify the shards that will be serving the request. Since the shard key is used to distribute data across the sharded cluster, if the operation is using the shard key field, then based on that specific shards can be targeted.
If the shard key is employeeid, the following things can happen:
1.
If the find query contains the employeeid field, then to satiate the query, only specific shards will be targeted by the mongos.

2.
If a single update operation uses employeeid for updating the document, the request will be routed to the shard holding that employee data.

However, if the operation is not using the shard key, then the request is broadcast to all the shards. Generally a multi-update or remove operation is targeted across the cluster.
While querying the data, there might be scenarios where in addition to identifying the shards and getting the data from them, the mongos might need to work on the data returned from various shards before sending the final output to the client.
Say an application has issued a find() request with sort(). In this scenario, the mongos will pass the $orderby option to the shards. The shards will fetch the data from their data set and will send the result in an ordered manner. Once the mongos has all the shard’s sorted data, it will perform an incremental merge sort on the entire data and then return the final output to the client.
Similar to sort are the aggregation functions such as limit(), skip(), etc., which require mongos to perform operations post receiving the data from the shards and before returning the final result set to the client.
The mongos consumes minimal system resources and has no persistent state. So if the application requirement is a simple find () queries that can be solely met by the shards and needs no manipulation at the mongos level, you can run the mongos on the same system where your application servers are running.

7.5.5 Implementing Sharding

In this section, you will learn to configure sharding in one machine on a Windows platform.
You will keep the example simple by using only two shards. In this configuration, you will be using the services listed in Table 7-5.
Table 7-5.
Sharding Cluster Configuration
Component
Type
Port
Datafile path
Shard Controller
Mongos
27021
-
Config Server
Mongod
27022
C:\db1\config\data
Shard0
Mongod
27023
C:\db1\shard1\data
Shard1
Mongod
27024
C:\db1\shard2\data
You will be focusing on the following:
1.
Setting up a sharded cluster.

2.
Creating a database and collection, and enable sharding on the collection.

3.
Using the import command to load data in the sharded collection.

4.
Distributed data amongst the shards.

5.
Adding and removing shards from the cluster and checking how data is distributed automatically.

7.5.5.1 Setting the Shard Cluster

In order to set up the cluster , the first step is to set up the configuration server. Enter the following code in a new terminal window to create the data directory for the config server and start the mongod:
C:\> mkdir C:\db1\config\data
C:\>CD C:\practicalmongodb\bin
C:\ practicalmongodb\bin>mongod --port 27022 --dbpath C:\db1\config\data --configsvr
2015-07-13T23:02:41.982-0700 I JOURNAL [journal writer] Journal writer thread started
2015-07-13T23:02:41.984-0700 I CONTROL [initandlisten] MongoDB starting : pid=3084 port=27022 dbpath=C:\db1\config\data master=1 64-bit host=ANOC9
......................................
2015-07-13T23:02:42.066-0700 I REPL [initandlisten] ******
2015-07-13T03:02:42.067-0700 I NETWORK [initandlisten] waiting for connections on port 27022
Next, start the mongos. Type the following in a new terminal window :
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongos --configdb localhost:27022 --port 27021 --chunkSize 1
2015-07-13T23:06:07.246-0700 W SHARDING running with 1 config server should be done only for testing purposes and is not recommended for production
...............................................................
2015-07-13T23:09:07.464-0700 I SHARDING [Balancer] distributed lock 'balancer/ ANOC9:27021:1429783567:41' unlocked
You now have the shard controller (i.e. the mongos) up and running.
If you switch to the window where the config server has been started, you will find a registration of the shard server to the config server.
In this example you have used chunk size of 1MB. Note that this is not ideal in a real-life scenario since the size is less than 4MB (a document’s maximum size). However, this is just for demonstration purpose since this creates the necessary amount of chunks without loading a large amount of data. The chunkSize is 128MB by default unless otherwise specified.
Next, bring up the shard servers, Shard0 and Shard1.
Open a fresh terminal window. Create the data directory for the first shard and start the mongod:
C:\>mkdir C:\db1\shard0\data
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongod --port 27023 --dbpath c:\db1\shard0\data –shardsvr
2015-07-13T23:14:58.076-0700 I CONTROL [initandlisten] MongoDB starting : pid=1996 port=27023 dbpath=c:\db1\shard0\data 64-bit host=ANOC9
.................................................................
2015-07-13T23:14:58.158-0700 I NETWORK [initandlisten] waiting for connections on port 27023
Open fresh terminal window. Create the data directory for the second shard and start the mongod:
C:\>mkdir c:\db1\shard1\data
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongod --port 27024 --dbpath C:\db1\shard1\data --shardsvr
2015-07-13T23:17:01.704-0700 I CONTROL [initandlisten] MongoDB starting : pid=3672 port=27024 dbpath=C:\db1\shard1\data 64-bit host=ANOC9
2015-07-13T23:17:01.704-0700 I NETWORK [initandlisten] waiting for connections on port 27024
All the servers relevant for the setup are up and running by the end of the above step. The next step is to add the shards information to the shard controller.
The mongos appears as a complete MongoDB instance to the application in spite of actually not being a full instance. The mongo shell can be used to connect to the mongos to perform any operation on it.
Open the mongos mongo console:
C:\>cd c:\practicalmongodb\bin
c:\ practicalmongodb\bin>mongo localhost:27021
MongoDB shell version: 3.0.4
connecting to: localhost:27021/test
mongos>
Switch to the admin database:
mongos> use admin
switched to db admin
mongos>
Add the shards information by running the following commands :
mongos> db.runCommand({addshard:"localhost:27023",allowLocal:true})
{ "shardAdded" : "shard0000", "ok" : 1 }
mongos> db.runCommand({addshard:"localhost:27024",allowLocal:true})
{ "shardAdded" : "shard0001", "ok" : 1 }
mongos>
This activates the two shard servers.
The next command checks the shards:
mongos> db.runCommand({listshards:1})
{
"shards" : [
{
"_id" : "shard0000",
"host" : "localhost:27023"
}, {
"_id" : "shard0001",
"host" : "localhost:27024"
}
], "ok" : 1}

7.5.5.2 Creating a Database and Shard Collection

In order to continue further with the example, you will create a database named testdb and a collection named testcollection, which you will be sharding on the key testkey.
Connect to the mongos console and issue the following command to get the database:
mongos> testdb=db.getSisterDB("testdb")
testdb
Next, enabling sharding at database level for testdb:
mongos> db.runCommand({enableSharding system: "testdb"})
{ "ok" : 1 }
mongos>
Next, specify the collection that needs to be sharded and the key on which the collection will be sharded:
mongos> db.runCommand({shardcollection: "testdb.testcollection", key: {testkey:1}})
{ "collectionsharded" : "testdb.testcollection", "ok" : 1 }
mongos>
With the completion of the above steps you now have a sharded cluster set up with all components up and running. You have also created a database and enabled sharding on the collection.
Next, import data into the collection so that you can check the data distribution on the shards.
You will be using the import command to load data in the testcollection. Connect to a new terminal window and execute the following:
C:\>cd C:\practicalmongodb\bin
C:\practicalmongodb\bin>mongoimport --host ANOC9 --port 27021 --db testdb --collection testcollection --type csv --file c:\mongoimport.csv –-headerline
2015-07-13T23:17:39.101-0700 connected to: ANOC9:27021
2015-07-13T23:17:42.298-0700 [##############..........] testdb.testcollection 1.1 MB/1.9 MB (59.6%)
2015-07-13T23:17:44.781-0700 imported 100000 documents
The mongoimport.csv consists of two fields. The first is the testkey, which is a randomly generated number. The second field is a text field; it is used to ensure that the documents occupy a sufficient number of chunks, making it feasible to use the sharding mechanism.
This inserts 100,000 objects in the collection.
In order to vet whether the records are inserted or not, connect to the mongo console of the mongos and issue the following command:
C:\Windows\system32>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongo localhost:27021
MongoDB shell version: 3.0.4
connecting to: localhost:27021/test
mongos> use testdb
switched to db testdb
mongos> db.testcollection.count()
100000
mongos>
Next, connect to the consoles of the two shards (Shard0 and Shard1) and look at how the data is distributed. Open a new terminal window and connect to Shard0’s console:
C:\>cd C:\practicalmongodb\bin
C:\ practicalmongodb\bin>mongo localhost:27023
MongoDB shell version: 3.0.4
connecting to: localhost:27023/test
Switch to testdb and issue the count() command to check number of documents on the shard:
> use testdb
switched to db testdb
> db.testcollection.count()
57998
Next, open a new terminal window, connect to Shard1’s console, and follow the steps as above (i.e. switch to testdb and check the count of testcollection collection):
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongo localhost:27024
MongoDB shell version: 3.0.4
connecting to: localhost:27024/test
> use testdb
switched to db testdb
> db.testcollection.count()
42002
>
You might see a difference in the document’s number in each shard when you run the above command for some time. When the documents are loaded, all of the chunks are placed on one shard by the mongos. In time the shard set is rebalanced by distributing the chunks evenly across all the shards.

7.5.5.3 Adding a New Shard

You have a sharded cluster set up and you also have sharded a collection and looked at how the data is distributed amongst the shards. Next, you’ll add a new shard to the cluster so that the load is spread out a little more.
You will be repeating the steps mentioned above. Begin by creating a data directory for the new shard in a new terminal window:
c:\>mkdir c:\db1\shard2\data
Next, start the mongod at port 27025:
c:\>cd c:\practicalmongodb\bin
c:\ practicalmongodb\bin>mongod --port 27025 --dbpath C:\db1\shard2\data --shardsvr
2015-07-13T23:25:49.103-0700 I CONTROL [initandlisten] MongoDB starting : pid=3744 port=27025 dbpath=C:\db1\shard2\data 64-bit host=ANOC9
................................
2015-07-13T23:25:49.183-0700 I NETWORK [initandlisten] waiting for connections on port 27025
Next, the new shard server will be added to the shard cluster. In order to configure it, open the mongos mongo console in a new terminal window:
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongo localhost:27021
MongoDB shell version: 3.0.4
connecting to: localhost:27021/test
mongos>
Switch to the admin database and run the addshard command . This command adds the shard server to the sharded cluster.
mongos> use admin
switched to db admin
mongos> db.runCommand({addshard: "localhost:27025", allowlocal: true})
{ "shardAdded" : "shard0002", "ok" : 1 }
mongos>
In order to vet whether the addition is successful or not, run the listshards command :
mongos> db.runCommand({listshards:1})
{
"shards" : [
{
"_id" : "shard0000",
"host" : "localhost:27023"
},
{
"_id" : "shard0001",
"host" : "localhost:27024"
},
{
"_id" : "shard0002",
"host" : "localhost:27025"
}
],
"ok" : 1
}
Next, check how the testcollection data is distributed. Connect to the new shard’s console in a new terminal window:
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongo localhost:27025
MongoDB shell version: 3.0.4
connecting to: localhost:27025/test
Switch to testdb and check the collections listed on the shard:
> use testdb
switched to db testdb
> show collections
system.indexes
testcollection
Issue a testcollection.count command three times:
> db.testcollection.count()
6928
> db.testcollection.count()
12928
> db.testcollection.count()
16928
Interestingly, the number of items in the collection is slowly going up. The mongos is rebalancing the cluster.
With time, the chunks will be migrated from the shard servers Shard0 and Shard1 to the newly added shard server, Shard2, so that the data is evenly distributed across all the servers. Post completion of this process the config server metadata is updated. This is an automatic process and it happens even if there’s no new data addition in the testcollection. This is one of the important factors you need to consider when deciding on the chunk size.
If the value of chunkSize is very large, you will end up having less even data distribution. The data is more evenly distributed when the chunkSize is smaller.

7.5.5.4 Removing a Shard

In the following example, you will see how to remove a shard server. For this example, you will be removing the server you added in the above example.
In order to initiate the process, you need to log on to the mongos console, switch to the admin db, and execute the following command to remove the shard from the shard cluster:
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongo localhost:27021
MongoDB shell version: 3.0.4
connecting to: localhost:27021/test
mongos> use admin
switched to db admin
mongos> db.runCommand({removeShard: "localhost:27025"})
{
"msg" : "draining started successfully",
"state" : "started",
"shard" : "shard0002",
"ok" : 1
}
mongos>
As you can see, the removeShard command returns a message. One of the message fields is state, which indicates the process state. The message also states that the draining process has started. This is indicated by the field msg.
You can reissue the removeShard command to check the progress:
mongos> db.runCommand({removeShard: "localhost:27025"})
{
"msg" : "draining ongoing",
"state" : "ongoing",
"remaining" : {
"chunks" : NumberLong(2),
"dbs" : NumberLong(0)
},
"ok" : 1
}
mongos>
The response tells you the number of chunks and databases that still need to be drained from the server. If you reissue the command and the process is terminated, the output of the command will depict the same.
mongos> db.runCommand({removeShard: "localhost:27025"})
{
"msg" : "removeshard completed successfully",
"state" : "completed",
"shard" : "shard0002",
"ok" : 1
}
mongos>
You can use the listshards to vet whether removeShard was successful or not.
As you can see, the data is successfully migrated to the other shards, so you can delete the storage files and terminate the Shard2 mongod process.
This ability to modify the shard cluster without going offline is one of the critical components of MongoDB, which enables it to support highly available, highly scalable, large capacity data stores.

7.5.5.5 Listing the Sharded Cluster Status

The printShardingStatus() command gives lots of insight into the sharding system internals.
mongos> db.printShardingStatus()
--- Sharding Status ---
sharding version: {
"_id" : 1,
"version" : 3,
"minCompatibleVersion" : 5,
"currentVersion" : 6,
"clusterId" : ObjectId("52fb7a8647e47c5884749a1a")
}
shards:
{ "_id" : "shard0000", "host" : "localhost:27023" }
{ "_id" : "shard0001", "host" : "localhost:27024" }
balancer:
Currently enabled: yes
Currently running: no
Failed balancer rounds in last 5 attempts: 0
Migration Results for the last 24 hours:
17 : Success
databases:
{ "_id" : "admin", "partitioned" : false, "primary" : "config" }
{ "_id" : "testdb", "partitioned" : true, "primary" : "shard0000" }
...............
The output lists the following:
  • All of the shard servers of the shard cluster
  • The configurations of each sharded database/collection
  • All of the chunks of the sharded dataset
Important information that can be obtained from the above command is the sharding keys range, which is associated with each chunk. This also shows where specific chunks are stored (on which shard server). The output can be used to analyse the shard server’s keys and chunks distribution.



No comments:

Post a Comment