listing-8.9.js
"use strict";
const argv = require('yargs').argv;
const MongoClient = require('mongodb').MongoClient;
const spawn = require('child_process').spawn;
const parallel = require('async-await-parallel');
const hostName = "mongodb://127.0.0.1:7000";
const databaseName = "weather_stations";
const collectionName = "daily_readings";
//
// Open the connection to the database.
//
function openDatabase () {
return MongoClient.connect(hostName)
.then(client => {
const db = client.db(databaseName);
const collection = db.collection(collectionName);
return {
collection: collection,
close: () => {
return client.close();
},
};
});
};
//
// Run the slave process.
//
function runSlave (skip, limit, slaveIndex) {
    return new Promise((resolve, reject) => {
        const args = [ "listing-8.8.js", "--skip", skip, "--limit", limit ];
        const childProcess = spawn("node", args);

        childProcess.stdout.on("data", data => {
            console.log("[" + slaveIndex + "]: INF: " + data);
        });

        childProcess.stderr.on("data", data => {
            console.log("[" + slaveIndex + "]: ERR: " + data);
        });

        childProcess.on("close", code => {
            if (code === 0) {
                resolve();
            }
            else {
                reject(code);
            }
        });

        childProcess.on("error", err => {
            reject(err);
        });
    });
};
//
// Run the slave process for a particular batch of records.
//
function processBatch (batchIndex, batchSize) {
    const startIndex = batchIndex * batchSize;
    return () => { // Encapsulate in an anon fn so that execution is deferred until later.
        return runSlave(startIndex, batchSize, batchIndex);
    };
};
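//
// For example, processBatch(2, 100) defers a call to runSlave(200, 100, 2),
// which in turn spawns the equivalent of:
//
//      node listing-8.8.js --skip 200 --limit 100
//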
//
// Process the entire database in batches of 100 records.
// Two batches are processed in parallel, but this number can be tweaked based on
// the number of cores you want to throw at the problem.
//
function processDatabase (numRecords) {
    const batchSize = 100; // The number of records to process in each batch.
    const maxProcesses = 2; // The number of processes to run in parallel.
    const numBatches = numRecords / batchSize; // Total number of batches that we need to process.
    const slaveProcesses = [];
    for (let batchIndex = 0; batchIndex < numBatches; ++batchIndex) {
        slaveProcesses.push(processBatch(batchIndex, batchSize));
    }

    return parallel(slaveProcesses, maxProcesses);
};
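//
// Note: parallel (from async-await-parallel) takes an array of functions that
// each return a promise and runs at most maxProcesses of them at any one time,
// which is why processBatch wraps runSlave in a deferred anonymous function.
//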
openDatabase()
    .then(db => {
        return db.collection.find().count() // Determine number of records to process.
            .then(numRecords => processDatabase(numRecords)) // Process the entire database.
            .then(() => db.close()); // Close the database when done.
    })
    .then(() => {
        console.log("Done processing all records.");
    })
    .catch(err => {
        console.error("An error occurred reading the database.");
        console.error(err);
    });
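//
// For context, a minimal sketch of the worker script (listing-8.8.js) that this
// master spawns. This sketch is an assumption based on the --skip/--limit
// arguments passed above and the database constants in this file; the actual
// listing-8.8.js may process each record differently.
//
/*
"use strict";

const argv = require('yargs').argv;
const MongoClient = require('mongodb').MongoClient;

const hostName = "mongodb://127.0.0.1:7000";
const databaseName = "weather_stations";
const collectionName = "daily_readings";

MongoClient.connect(hostName)
    .then(client => {
        const collection = client.db(databaseName).collection(collectionName);
        return collection.find()
            .skip(argv.skip)    // Skip the records handled by other batches.
            .limit(argv.limit)  // Take only this batch of records.
            .toArray()
            .then(records => {
                // Placeholder: process this batch of records here.
                console.log("Processed " + records.length + " records.");
            })
            .then(() => client.close());
    })
    .catch(err => {
        console.error("An error occurred processing the batch.");
        console.error(err);
        process.exit(1); // Signal failure to the master via a non-zero exit code.
    });
*/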