背景需求:有非常多的数据库分析请求,串行执行太慢,并行执行容易打爆服务器资源。
已经有npm库提供了解决方案,比如p-limit。我们也可以自己手动实现一个任务池来实现并发限制。我看了一下代码实现,发现涉及了不少异步编程的概念,因此记录一下。
代码如下:
// test-concurrency.js
// 限流并发执行函数
async function runWithConcurrency(tasks, limit) {
const results = [];
const executing = [];
for (const task of tasks) {
const p = task().then(r => {
executing.splice(executing.indexOf(p), 1);
return r;
});
executing.push(p);
results.push(p);
if (executing.length >= limit) {
await Promise.race(executing); // 等待最先完成的任务释放位置
}
}
return Promise.all(results);
}
async function test() {
const concurrency = 2; // 限制同时最多执行2个任务
// 模拟异步任务
const task1 = () => new Promise(resolve => setTimeout(() => resolve('Task 1'), 1000));
const task2 = () => new Promise(resolve => setTimeout(() => resolve('Task 2'), 3000));
const task3 = () => new Promise(resolve => setTimeout(() => resolve('Task 3'), 2000));
const tasks = [task1, task2, task3];
const startTime = Date.now();
const results = await runWithConcurrency(tasks, concurrency);
const endTime = Date.now();
console.log(`\nAll tasks finished in ${endTime - startTime}ms`);
console.log('Results:', results);
}
test();
首先,tasks参数是一个包含异步任务的数组。每个任务都是一个返回Promise的函数。注意不能是已执行的Promise对象,否则就不能限制并发数了。
然后,task()启动任务,then里面是任务成功后的回调。回调的逻辑是把任务从executing数组中删除,并返回结果。删除这个操作,保证了executing数组有空位出来给下一个任务。
results数组保存了所有任务的Promise对象。await Promise.race(executing)会等待最先完成的任务完成,并触发上面的回调。
就这样,遍历完毕时,results数组中保存了所有任务,且是有序的。Promise.all(results)会等待所有任务完成,并返回结果数组。
Content licenced under CC BY-NC-ND 4.0