How to use the eachLimit function from async

Find comprehensive JavaScript async.eachLimit code examples handpicked from public code repositories.

async.eachLimit is a function in the async library for Node.js that runs an iterator function on each element in a collection with a specified limit on the number of concurrent tasks.

106
107
108
109
110
111
112
113
114
115
  return callback(error);
}

debug("browse", "path=" , path, "returns length=" + files.length);

Async.eachLimit(files, BROWSE_FILES_LIMIT, (childURL, callback) => {

  if (/^\./.exec(childURL.basename)) {
    return callback();
  }
fork icon46
star icon169
watch icon18

1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
}));

store.buckets = bucketCount;
store.bucketList = retBucketInfos;

return async.eachLimit(bucketInfos, this.concurrentCursors,
    (bucketInfo, done) => {
        async.waterfall([
            next => this._getIsTransient(bucketInfo, log, next),
            (isTransient, next) => {
fork icon17
star icon13
watch icon56

How does async.eachLimit work?

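async.eachLimit is a function that takes three required arguments: a collection (an array or an object), a limit parameter (a number that specifies the maximum number of concurrent tasks), and an iterator function (a function that is called on each element in the collection). An optional fourth argument is a final callback function, which is called once when every element has been processed or as soon as an error is reported.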

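The iterator function takes two arguments: the current element in the collection and a callback function. The callback function must be called when the iterator has completed its work on the current element, and it takes a single optional error parameter. Unlike async.mapLimit, async.eachLimit does not collect results, so any additional values passed to this callback are not forwarded to the final callback.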

async.eachLimit works by starting the iterator function on the first element in the collection. If the limit parameter is greater than 1, the function will continue starting iterator functions on the next elements in the collection, up to the limit parameter. When one of the iterator functions completes, the next element in the collection is passed to the function. This continues until all elements in the collection have been processed.

If an iterator function passes an error to its callback, async.eachLimit stops starting iterator functions for any further elements and immediately calls the final callback function with that error. Iterator functions that are already running are not cancelled, but their outcomes are ignored. If all elements are processed successfully, the final callback function is called with no error; it does not receive a result parameter.

async.eachLimit provides a way to process a collection of items concurrently while also limiting the number of tasks that are running at the same time, which can help prevent resource exhaustion or other issues.

98
99
100
101
102
103
104
105
106
107
var async = require('async');
var fs = require('fs');

fs.readdir('./bufferconcat', function (err, files) {
  var count = 0;
  async.eachLimit(files, 2, function (file, callback) {
    if( file.length > 32 ) {
      console.log('This file name is too long');
      callback('File name too long');
    } else {
fork icon1
star icon8
watch icon3

58
59
60
61
62
63
64
65
66
67
    });
  });
};

// Run makeRequest over every entry of res, keeping at most five requests
// in flight at once so that no downstream service is overwhelmed.
async.eachLimit(res, 5, makeRequest, (err) => {
  if (err) {
    handleError(err);
  }
  // The cleanup step and the completion log run whether or not an error was reported.
  removeMongoEventsNotFoundInFacebook(facebookEventIds);
  console.log(`Facebook ETL Ended`);
});
fork icon8
star icon3
watch icon7

AI Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const async = require("async");
const http = require("http");
const fs = require("fs");

const filesToDownload = ["file1.jpg", "file2.jpg", "file3.jpg", "file4.jpg"];

/**
 * Download a single file from the example server into ./downloads.
 *
 * Intended as an async.eachLimit iteratee: `callback` is invoked exactly once,
 * with no arguments on success or with an Error on failure.
 *
 * @param {string} file - File name; appended to both the remote URL and the local path.
 * @param {function(Error=): void} callback - Completion callback.
 */
const downloadFile = (file, callback) => {
  const url = "http://example.com/files/" + file;
  const filePath = "./downloads/" + file;

  const fileStream = fs.createWriteStream(filePath);
  let request;
  let settled = false;

  // Several event sources (request, response, write stream) can report a
  // failure for the same download, so funnel them through one guard that
  // reports the first outcome only. async throws if an iteratee callback is
  // invoked more than once.
  const settle = (err) => {
    if (settled) {
      return;
    }
    settled = true;

    if (!err) {
      callback();
      return;
    }

    // Stop any transfer still in progress and drop the partial file.
    if (request) {
      request.destroy();
    }
    fileStream.destroy();
    // fs.unlink requires a callback on current Node versions. A failure to
    // remove the partial file (for example, it was never created) is
    // deliberately ignored so the original download error is what gets reported.
    fs.unlink(filePath, () => callback(err));
  };

  // Without this handler a write failure (such as a missing ./downloads
  // directory) would be an unhandled 'error' event and crash the process.
  fileStream.on("error", settle);
  // The write stream closes itself after finishing (autoClose defaults to true).
  fileStream.on("finish", () => settle());

  request = http
    .get(url, (response) => {
      if (response.statusCode !== 200) {
        // Drain the body so the socket is released, and do not save an
        // error page as if it were the requested file.
        response.resume();
        settle(new Error(`Download of ${url} failed with status ${response.statusCode}`));
        return;
      }
      // pipe() does not forward source errors to the destination, so a
      // connection dropped mid-transfer must be reported explicitly.
      response.on("error", settle);
      response.pipe(fileStream);
    })
    .on("error", settle);
};

// Called once by async.eachLimit: with the first error reported by a
// download, or with no error after every file has been processed.
const reportDownloadOutcome = (err) => {
  if (!err) {
    console.log("All files downloaded successfully!");
    return;
  }
  console.error(err);
};

// Download every listed file, running at most two downloads concurrently.
async.eachLimit(filesToDownload, 2, downloadFile, reportDownloadOutcome);

In this example, we have an array filesToDownload that contains the names of the files we want to download. We define a function downloadFile that takes a file name and a callback function. This function creates a write stream for the file, makes an HTTP request to download the file from a remote server, and pipes the response to the file stream. When the download is complete, it closes the file stream and calls the callback. We then use async.eachLimit to run the downloadFile function on each element in filesToDownload, with a limit of 2 concurrent tasks. Finally, we pass a callback function that is called when all downloads are complete, or when an error occurs. In this example, the callback function simply logs a message if all files were downloaded successfully, or logs an error if any downloads failed.

102
103
104
105
106
107
108
109
110
111
Fixture.find({ _id: { $in: gameticket.fixtures } }).toArray(function(err, fixtures){
  // Lancer une itération pour récupérer chaque grille appartenant au groupe du ticket
  _pendingGrids = [];
  //console.log("Lancement de la comparaisons des pronostics jouées pour chaque grille par rapport au résultat des matchs.");
  // Traiter chaque grille du ticket de jeu
  async.eachLimit(groupTicket, 1, function(grilleTicket, eachGrilleTicket){
    var countWin = 0;
    var countCanceled = 0;
    var points = 0;
    //console.log("Traîtement de la grille :", grilleTicket._id);
fork icon0
star icon2
watch icon1

+ 5 other calls in file

197
198
199
200
201
202
203
204
205
206

// Envoyer une notification push à tous les utilisateurs ayant jouées une grille gagnante
pushGrilleJackpot(db, _jackpotGrilles, gameticket).then(function(result){
  // Créer une demande de paiement pour chaque grille gagnante
  var payoutArr = [];
  async.eachLimit(_jackpotGrilles, 1, function(grille, eachJackpotGrille){
    createPayout(db, grille).then(function(payout){
      if (payout != null){
        payoutArr.push(payout);
        //console.log('Send jackpot email alert');
fork icon0
star icon2
watch icon1

+ 5 other calls in file

651
652
653
654
655
656
657
658
659
660

if (!resMessages || !resMessages.value || resMessages.value.length === 0) {
    return callback(null, { files: [] });
}

return Async.eachLimit(resMessages.value, concurrentLimitFiles, (message, callback) => {

    // Only select the properties we need so we don't fetch the content bytes (for performance reasons)
    return client.api(`/me/messages/${message.id}/attachments`)
        .select(['microsoft.graph.fileAttachment/contentId', 'microsoft.graph.fileAttachment/contentType', 'id', 'size', 'name', 'lastModifiedDateTime', 'isInline'])
fork icon2
star icon1
watch icon6

66
67
68
69
70
71
72
73
74
75
    })
  })
}
async eachLimit(coll, limit, iteratee) {    
  return new Promise(async (resolve, reject) => { 
    async.eachLimit(coll, limit, iteratee, (err, res) => {
      if (err) { 
        reject(err)
      } else {
        resolve(res)
fork icon1
star icon0
watch icon1

+ 21 other calls in file

155
156
157
158
159
160
161
162
163
164
    }
  });
},
// Vérifier la validité des paris
function(gameticket, done){
  async.eachLimit(gameticket.fixtures, 1, function(fixture, eachFixture){
    //console.log('Check if fixture exists in bets array :', fixture._id);
    //if (bets.hasOwnProperty(fixture._id)){
    //console.log(bets);
    var bet = bets.filter(e => e.fixture == fixture._id);
fork icon0
star icon2
watch icon1

1372
1373
1374
1375
1376
1377
1378
1379
1380
1381

const libDlQueue = []
let dlSize = 0

//Check validity of each library. If the hashs don't match, download the library.
async.eachLimit(libArr, 5, (lib, cb) => {
    if(Library.validateRules(lib.rules, lib.natives)){
        let artifact = (lib.natives == null) ? lib.downloads.artifact : lib.downloads.classifiers[lib.natives[Library.mojangFriendlyOS()].replace('${arch}', process.arch.replace('x', ''))]
        const libItm = new Library(lib.name, artifact.sha1, artifact.size, artifact.url, path.join(libPath, artifact.path))
        if(!AssetGuard._validateLocal(libItm.to, 'sha1', libItm.hash)){
fork icon0
star icon1
watch icon0

+ 15 other calls in file

1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
if (_shouldConditionallyDelete(request, locations)) {
    return _performConditionalDelete(
        request, response, locations, log, callback);
}
log.trace('batch delete locations', { locations });
return async.eachLimit(locations, 5, (loc, next) => {
    const _loc = Object.assign({}, loc);
    if (_loc.dataStoreVersionId !== undefined) {
        // required by cloud backends
        _loc.deleteVersion = true;
fork icon236
star icon0
watch icon89

+ 3 other calls in file

226
227
228
229
230
231
232
233
234
235
    });
    await db.deleteAll(deleteKeys);
};

async function removeTagsFromTopics(tags) {
    await async.eachLimit(tags, 50, async (tag) => {
        const tids = await db.getSortedSetRange(`tag:${tag}:topics`, 0, -1);
        if (!tids.length) {
            return;
        }
fork icon0
star icon0
watch icon1

+ 5 other calls in file

145
146
147
148
149
150
151
152
153
154
155
        await fs.promises.writeFile(data.destPath, output);
    }
};


actions.minifyJS_batch = async function minifyJS_batch(data) {
    await async.eachLimit(data.files, 100, async (fileObj) => {
        const source = await fs.promises.readFile(fileObj.srcPath, 'utf8');
        const filesToMinify = [
            {
                srcPath: fileObj.srcPath,
fork icon0
star icon0
watch icon1

44
45
46
47
48
49
50
51
52
53
    metricCollector[eMetricNames.activeTasks].inc(1);
},
this.dispose = (callback) => {
    logger.info(`${this.name}: Stop all tasks ...`);
    let keys = Object.keys(this.tasks);
    async.eachLimit(keys, 4, (key, next) => {
        let task = this.tasks[key];
        if (typeof task.dispose === 'function') {
            return task.dispose(next);
        }
fork icon0
star icon0
watch icon1

208
209
210
211
212
213
214
215
216
217
_raiseGracefulExitAlarm.call(this, () => {
    logger.info(`>>>>>> Stop all modules ...`);
    let layers = Object.keys(this._arch);
    async.eachSeries(layers, (layer, callback) => {
        logger.info(`>>>>>>>> ${layer}: Clean up modules ...`);
        async.eachLimit(this._arch[layer], 4, (m, next) => {
            if (typeof m.dispose !== 'function') {
                return process.nextTick(next);
            }
            return m.dispose(next);
fork icon0
star icon0
watch icon1

29
30
31
32
33
34
35
36
37
38
    return client;
}
this.dispose = (callback) => {
    logger.info(`${this.name}: Destroy all amqp clients ...`);
    let ids = Object.keys(this._clients);
    async.eachLimit(ids, 4, (id, next) => {
        let client = this._clients[id];
        if (client === undefined) {
            return process.nextTick(next);
        }
fork icon0
star icon0
watch icon1

144
145
146
147
148
149
150
151
152
153
    });
    return this._clients[name];
}
this.dispose = (callback) => {
    let names = Object.keys(this._clients);
    async.eachLimit(names, 2, (name, next) => {
        return this._clients[name].dispose(next);
    }, () => {
        return callback();
    });
fork icon0
star icon0
watch icon1

142
143
144
145
146
147
148
149
150

runPrereqs() {
  if (this.prereqs && this.prereqs.length) {

    if (this.concurrency > 1) {
      async.eachLimit(this.prereqs, this.concurrency,

        (name, cb) => {
          let parsed = parsePrereqName(name);
fork icon0
star icon0
watch icon1

25
26
27
28
29
30
31
32
33
34
35
    fs.readdir(path.resolve('lib', 'consumers'), (err, fileList) => {
      if (err) {
        return cb(err);
      }


      async.eachLimit(fileList, 1, initializeConsumer, cb);
    });
  });
}

fork icon0
star icon0
watch icon1