Monday 5 March 2018

MongoDB Concept Part 6


7.5.6 Controlling Collection Distribution (Tag-Based Sharding)

In the previous section, you saw how data distribution happens. In this section, you will learn about tag-based sharding. This feature was introduced in version 2.2.0.
Tagging gives operators control over which collections go to which shard.
In order to understand tag-based sharding, let’s set up a sharded cluster. You will be using the shard cluster created above. For this example, you need three shards, so you will add Shard2 again to the cluster.

7.5.6.1 Prerequisite

You will start the cluster first. Just to reiterate, follow these steps.
1.
Start the config server. Enter the following command in a new terminal window (if it’s not already running):
C:\> mkdir C:\db1\config\data
C:\>cd c:\practicalmongodb\bin
C:\practicalmongodb\bin>mongod --port 27022 --dbpath C:\db1\config\data --configsvr

2.
Start the mongos. Enter the following command in a new terminal window (if it’s not already running):
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongos --configdb localhost:27022 --port 27021

3.
You will start the shard servers next.

Start Shard0. Enter the following command in a new terminal window (if it’s not already running):
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongod --port 27023 --dbpath c:\db1\shard0\data --shardsvr
Start Shard1. Enter the following command in a new terminal window (if it’s not already running):
C:\>cd c:\practicalmongodb\bin
C:\practicalmongodb\bin>mongod --port 27024 --dbpath c:\db1\shard1\data --shardsvr
Start Shard2. Enter the following command in a new terminal window (if it’s not already running):
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongod --port 27025 --dbpath c:\db1\shard2\data --shardsvr
Since you removed Shard2 from the sharded cluster in the earlier example, you must add it back, because this example needs three shards.
In order to do so, you need to connect to the mongos. Enter the following commands:
C:\Windows\system32>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongo localhost:27021
MongoDB shell version: 3.0.4
connecting to: localhost:27021/test
mongos>
Before the shard is added to the cluster you need to delete the testdb database:
mongos> use testdb
switched to db testdb
mongos> db.dropDatabase()
{ "dropped" : "testdb", "ok" : 1 }
mongos>
Next, add the Shard2 shard using the following steps:
mongos> use admin
switched to db admin
mongos> db.runCommand({addshard: "localhost:27025", allowlocal: true})
{ "shardAdded" : "shard0002", "ok" : 1 }
mongos>
If you try adding the removed shard without removing the testdb database, it will give the following error:
mongos>db.runCommand({addshard: "localhost:27025", allowlocal: true})
{
"ok" : 0,
"errmsg" : "can't add shard localhost:27025 because a local database 'testdb' exists in another shard0000:localhost:27023"}
To ensure that all three shards are present in the cluster, run the following command:
mongos> db.runCommand({listshards:1})
{
"shards" : [
{
"_id" : "shard0000",
"host" : "localhost:27023"
}, {
"_id" : "shard0001",
"host" : "localhost:27024"
}, {
"_id" : "shard0002",
"host" : "localhost:27025"
}
], "ok" : 1}

7.5.6.2 Tagging

By the end of the above steps you have your sharded cluster with a config server, three shards, and a mongos up and running. Next, start another mongos on port 30999, pointing at the config server on port 27022, in a new terminal window:
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongos --port 30999 --configdb localhost:27022
2015-07-13T23:24:39.674-0700 W SHARDING running with 1 config server should be done only for testing purposes and is not recommended for production
...................................
2015-07-13T23:24:39.931-0700 I SHARDING [Balancer] distributed lock 'balancer /ANOC9:30999: 1429851279:41' unlocked..
Next, start a new terminal window, connect to the mongos, and enable sharding on the collections:
C:\>cd c:\practicalmongodb\bin
c:\practicalmongodb\bin>mongo localhost:27021
MongoDB shell version: 3.0.4
connecting to: localhost:27021/test
mongos> show dbs
admin (empty)
config 0.016GB
testdb 0.078GB
mongos> conn=new Mongo("localhost:30999")
connection to localhost:30999
mongos> db=conn.getDB("movies")
movies
mongos> sh.enableSharding("movies")
{ "ok" : 1 }
mongos> sh.shardCollection("movies.drama", {originality:1})
{ "collectionsharded" : "movies.hindi", "ok" : 1 }
mongos> sh.shardCollection("movies.action", {distribution:1})
{ "collectionsharded" : "movies.action", "ok" : 1 }
mongos> sh.shardCollection("movies.comedy", {collections:1})
{ "collectionsharded" : "movies.comedy", "ok" : 1 }
mongos>
The steps are as follows:
1.
Connect to the mongos console.

2.
View the running databases connected to the mongos instance running at port 30999.

3.
Get reference to the database movies.

4.
Enable sharding of the database movies.

5.
Shard the collection movies.drama by shard key originality.

6.
Shard the collection movies.action by shard key distribution.

7.
Shard the collection movies.comedy by shard key collections.

Next, insert some data in the collections, using the following sequence of commands:
mongos>for(var i=0;i<100000;i++){db.drama.insert({originality:Math.random(), count:i, time:new Date()});}
mongos>for(var i=0;i<100000;i++){db.action.insert({distribution:Math.random(), count:i, time:new Date()});}
mongos>for(var i=0;i<100000;i++) {db.comedy.insert({collections:Math.random(), count:i, time:new Date()});}
mongos>
By the end of the above steps, you have three shards and three collections with sharding enabled. Next, you will see how the data is distributed across the shards.
Switch to configdb:
mongos> use config
switched to db config
mongos>
You can use chunks.find to look at how the chunks are distributed:
mongos> db.chunks.find({ns:"movies.drama"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
mongos> db.chunks.find({ns:"movies.action"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
mongos> db.chunks.find({ns:"movies.comedy"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
mongos>
As you can see, the chunks are pretty evenly spread out amongst the shards. See Figure 7-22.



Figure 7-22. Distribution without tagging
Next, you will use tags to separate the collections. The intent of this is to have one collection per shard (i.e. your goal is to have the chunk distribution shown in Table 7-6).
Table 7-6. Chunk Distribution

Collection Chunks    On Shard
movies.drama         Shard0000
movies.action        Shard0001
movies.comedy        Shard0002
A tag describes the shard’s property, which can be anything. Hence you might tag a shard as “slow” or “fast” or “rack space” or “west coast.”
In the following example, you will tag each shard as belonging to one of the collections:
mongos> sh.addShardTag("shard0000", "dramas")
mongos> sh.addShardTag("shard0001", "actions")
mongos> sh.addShardTag("shard0002", "comedies")
mongos>
This signifies the following:
  • Put the chunks tagged “dramas” on shard0000.
  • Put the chunks tagged “actions” on shard0001.
  • And put the chunks tagged “comedies” on shard0002.
Next, you will create rules to tag the collections' chunks accordingly.
Rule 1: All chunks created in the movies.drama collection will be tagged as "dramas":
mongos> sh.addTagRange("movies.drama", {originality:MinKey}, {originality:MaxKey}, "dramas")
mongos>
The rule uses MinKey, which means negative infinity, and MaxKey, which means positive infinity. Hence the above rule means mark all of the chunks of the collection movies.drama with the tag “dramas.”
Similarly, you will make rules for the other two collections.
Rule 2: All chunks created in the movies.action collection will be tagged as “actions.”
mongos> sh.addTagRange("movies.action", {distribution:MinKey}, {distribution:MaxKey}, "actions")
mongos>
Rule 3: All chunks created in the movies.comedy collection will be tagged as “comedies.”
mongos> sh.addTagRange("movies.comedy", {collection:MinKey}, {collection:MaxKey}, "comedies")
mongos>
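If you want to confirm that the tags and rules were registered, you can query the config database directly; the shard tags are stored in the shards collection and the tag ranges in the tags collection. This is just a read-only sanity check (output omitted here, since it depends on your cluster):
mongos> use config
switched to db config
mongos> db.shards.find({}, {tags:1})
mongos> db.tags.find({ns:"movies.drama"})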
You need to wait for the cluster to rebalance so that the chunks are distributed based on the tags and rules defined above. As mentioned, chunk distribution is an automatic process, so after some time the chunks will be redistributed to reflect the changes you have made.
Next, issue chunks.find to verify the chunk organization:
mongos> use config
switched to db config
mongos> db.chunks.find({ns:"movies.drama"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
mongos> db.chunks.find({ns:"movies.action"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
mongos> db.chunks.find({ns:"movies.comedy"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
mongos>
Thus the collection chunks have been redistributed based on the tags and rules defined (Figure 7-23).



Figure 7-23. Distribution with tagging

7.5.6.3 Scaling with Tagging

Next, you will look at how to scale with tagging. Let's change the scenario. Let's assume the collection movies.action needs two servers for its data. Since you have only three shards, this means the other two collections' data needs to be moved to a single shard.
In this scenario, you will change the tagging of the shards. You will add the tag “comedies” to Shard0 and remove the tag from Shard2, and further add the tag “actions” to Shard2.
This means that the chunks tagged “comedies” will be moved to Shard0 and chunks tagged “actions” will be spread to Shard2.
First, you move the movies.comedy chunks to Shard0 and remove them from Shard2:
mongos> sh.addShardTag("shard0000","comedies")
mongos> sh.removeShardTag("shard0002","comedies")
Next, you add the tag “actions” to Shard2, so that movies.action chunks are spread across Shard2 also:
mongos> sh.addShardTag("shard0002","actions")
Re-issuing the find command after some time will show the following results:
mongos> db.chunks.find({ns:"movies.drama"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
mongos> db.chunks.find({ns:"movies.action"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
mongos> db.chunks.find({ns:"movies.comedy"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
mongos>
The chunks have been redistributed reflecting the changes made (Figure 7-24).



Figure 7-24. Tagging with scaling

7.5.6.4 Multiple Tags

You can have multiple tags associated with the shards. Let’s add two different tags to the shards.
Say you want to distribute the writes based on the disk. You have one shard with a spinning disk and another with an SSD (solid state drive). You want to redirect 50% of the writes to the shard with the SSD and the remaining writes to the one with the spinning disk.
First, tag the shards based on these properties:
mongos> sh.addShardTag("shard0001", "spinning")
mongos> sh.addShardTag("shard0002", "ssd")
mongos>
Let's further assume that the distribution field of the movies.action collection, which you are using as the shard key, holds values between 0 and 1. Next, you want to say, "If distribution < .5, send this to the spinning disk. If distribution >= .5, send it to the SSD." So you define the rules as follows:
mongos>sh.addTagRange("movies.action", {distribution:MinKey} ,{distribution:.5} ,"spinning")
mongos>sh.addTagRange("movies.action" ,{distribution:.5} ,{distribution:MaxKey},"ssd")
mongos>
Now documents with distribution < .5 will be written to the spinning-disk shard and the others will be written to the SSD shard.
With tagging you can control the type of load that each newly added server will get.
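For example, if you add a brand-new server to this cluster and tag it at the same time, the balancer will only move onto it those chunks whose tag ranges match that tag. A minimal sketch (the host and port are hypothetical, and the shard name MongoDB assigns may differ):
mongos> sh.addShard("localhost:27026")
mongos> sh.addShardTag("shard0003", "ssd")
mongos>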

7.5.7 Points to Remember When Importing Data in a Sharded Environment

Here are some points to keep in mind when importing data.

7.5.7.1 Pre-Splitting of the Data

Instead of leaving chunk creation to MongoDB, you can tell MongoDB how to split the data using the following command:
db.runCommand( { split : "practicalmongodb.mycollection" , middle : { shardkey : value } } );
After this, you can also tell MongoDB which chunk goes to which node, as shown in the sketch that follows.
For all of this you need knowledge of the data you will be importing into the database. It also depends on the use case you are aiming to solve and on how the data is read by your application. When deciding where to place a chunk, keep things like data locality in mind.
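As a hedged sketch only (the namespace practicalmongodb.mycollection and the shard key field userid are hypothetical), pre-splitting at a known key value and then placing the resulting chunk on a specific shard might look like this:
mongos> use admin
switched to db admin
mongos> db.runCommand({split: "practicalmongodb.mycollection", middle: {userid: 50000}})
mongos> db.runCommand({moveChunk: "practicalmongodb.mycollection", find: {userid: 50000}, to: "shard0001"})
mongos>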

7.5.7.2 Deciding on the Chunk Size

You need to keep the following points in mind when deciding on the chunk size (a sketch for changing the default size follows these points):
1.
If the size is too small, the data will be distributed evenly, but you will end up with more frequent migrations, which is an expensive operation at the mongos layer.

2.
If the size is too large, it will lead to fewer migrations, reducing the expense at the mongos layer, but you may end up with uneven data distribution.
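If you decide the default does not suit your workload, the chunk size (64MB by default in this version) can be changed cluster-wide from a mongos by updating the settings collection of the config database. A minimal sketch, assuming you want 32MB chunks:
mongos> use config
switched to db config
mongos> db.settings.save({_id: "chunksize", value: 32})
mongos>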

7.5.7.3 Choosing a Good Shard Key

It is essential to pick a good shard key for good distribution of data among the nodes of the sharded cluster.
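For instance, a monotonically increasing field (such as a timestamp) funnels all inserts into a single chunk and hence a single shard, whereas a field with high cardinality, or a hashed key, spreads the writes. A hedged illustration only (the namespace and field are hypothetical):
mongos> sh.shardCollection("practicalmongodb.mycollection", {userid: "hashed"})
mongos>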

7.5.8 Monitoring for Sharding

In addition to the normal monitoring and analysis that is done for other MongoDB instances, a sharded cluster requires additional monitoring to ensure that all its operations function appropriately and that the data is distributed effectively among the nodes. In this section, you will see what to monitor for the proper functioning of the sharded cluster.

7.5.9 Monitoring the Config Servers

The config server, as you know by now, stores the metadata of the sharded cluster. The mongos caches this data and routes requests to the respective shards. If the config server goes down but there's a running mongos instance, there's no immediate impact on the sharded cluster and it will remain available for a while. However, you won't be able to perform operations like chunk migration or start a new mongos. In the long run, the unavailability of the config server can severely impact the availability of the cluster. To ensure that the cluster remains balanced and available, you should monitor the config servers.

7.5.9.1 Monitoring the Shard Status, Balancing, and Chunk Distribution

For the most effective sharded cluster deployment, the chunks must be distributed evenly among the shards. As you know by now, this is done automatically by MongoDB using a background process. You need to monitor the shard status to ensure that the process is working effectively; for this, you can use the db.printShardingStatus() or sh.status() command in the mongos mongo shell.
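You can also confirm from the mongos mongo shell that the balancer is enabled and whether a balancing round is currently in progress; the boolean results will of course depend on the state of your cluster:
mongos> sh.getBalancerState()
true
mongos> sh.isBalancerRunning()
false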

7.5.9.2 Monitoring the Lock Status

In almost all cases, the balancer releases its locks automatically after completing its process, but you should check the lock status of the database to ensure there's no long-lasting lock, because that can block future balancing and affect the availability of the cluster. Issue the following from the mongos mongo shell to check the lock status:
use config
db.locks.find()
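To focus on the balancer's own lock, you can filter on its _id; in this version the balancer's distributed lock is stored in config.locks with the _id "balancer", and a state value of 2 indicates the lock is currently held:
use config
db.locks.find({_id: "balancer"})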

7.6 Production Cluster Architecture

In this section, you will look at the production cluster architecture. In order to understand it, let’s consider a very generic use case of a social networking application where the user can create a circle of friends and can share their comments or pictures across the group. The user can also comment or like her friend’s comments or pictures. The users are geographically distributed.
The application requirement is immediate availability across geographies of all the comments; data should be redundant so that the user's comments, posts, and pictures are not lost; and it should be highly available. So the application's production cluster should have the following components:
1.
At least two mongos instances, but you can have more as needed.

2.
Three config servers, each on a separate system.

3.
Two or more replica sets serving as shards. The replica sets are distributed across geographies, with the read preference set to nearest. See Figure 7-25.

Figure 7-25. Production cluster architecture
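As a rough sketch of how these pieces are wired together (all host names and the database name below are hypothetical), each mongos is pointed at all three config servers, and the application can request the nearest read preference in its connection string:
c:\practicalmongodb\bin>mongos --port 27021 --configdb cfg1.example.net:27019,cfg2.example.net:27019,cfg3.example.net:27019
mongodb://mongos1.example.net:27021,mongos2.example.net:27021/socialdb?readPreference=nearest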

Now let's look at the possible failure scenarios in a MongoDB production deployment and their impact on the environment.

7.6.1 Scenario 1

Mongos becomes unavailable: The application server whose mongos has gone down will not be able to communicate with the cluster, but this will not lead to any data loss, since the mongos doesn't maintain any data of its own. The mongos can be restarted, and while restarting, it syncs up with the config servers to cache the cluster metadata, after which the application can resume its operations normally (Figure 7-26).



Figure 7-26. Mongos becomes unavailable

7.6.2 Scenario 2

One of the mongods of a replica set becomes unavailable in a shard: Since you used replica sets to provide high availability, there is no data loss. If the primary node is down, a new primary is chosen, whereas if it's a secondary node, it is disconnected and functioning continues normally (Figure 7-27).



Figure 7-27. One of the mongod of replica set is unavailable
The only difference is that the duplication of the data is reduced, making the system a little weaker, so in parallel you should check whether the mongod is recoverable. If it is, it should be recovered and restarted; if it's non-recoverable, you need to create a new replica set member to replace it as soon as possible.

7.6.3 Scenario 3

One of the shards becomes unavailable: In this scenario, the data on that shard will be unavailable, but the other shards will be available, so the application is not stopped. The application can continue with its read/write operations; however, partial results must be dealt with within the application. In parallel, the shard should be recovered as soon as possible (Figure 7-28).



Figure 7-28. Shard unavailable

7.6.4 Scenario 4

Only one config server is available out of three: In this scenario, the cluster's metadata becomes read-only; the cluster will not serve any operations that lead to changes in the cluster structure, and hence to changes of metadata, such as chunk migration or chunk splitting. The failed config servers should be replaced as soon as possible, because if all the config servers become unavailable, the cluster becomes inoperable (Figure 7-29).



Figure 7-29. Only one config server available

7.7 Summary

In this chapter you covered the core processes and tools, standalone deployment, sharding concepts, replication concepts, and production deployment. You also looked at how HA can be achieved.
In the following chapter, you will see how data is stored under the hood, how writes happen using journaling, what GridFS is used for, and the different types of indexes available in MongoDB.










