Promise 并发流程控制
简介
本篇文章的由来是一道经典前端面试题 控制n个异步任务并发执行
在面试中,如果没有事先准备,或者对promise
的理解不够深入,很容易因为思维混乱而出现问题(本人在面试中也掉过坑)
下面一起看几道题
题目一:设计一个异步队列,控制并发数量为n
const tasks = new Array(10).fill(1).map((x, idx) => {
return () => new Promise((resolve) => {
setTimeout(() => {
console.log('executed', idx)
resolve(idx)
}, 1000)
})
})
concurrency([...tasks], 2)
// 输出:
// ... 1s
// executed 0 executed 1
// ... 2s
// executed 2 executed 3
// ... 3s
// executed 4 executed 5
// ... 4s
// executed 6 executed 7
// ... 5s
// executed 8 executed 9
解答
方法一: 使用async/await
思路
- 使用了Set来存储当前正在执行的任务
- 并使用Promise.race来等待最先完成的任务
- 然后,当一个任务完成时,它会从Set中删除
- 如果当前正在执行的任务数量达到了并发限制,那么它会等待至少一个任务完成才继续添加新的任务
- 最后,使用Promise.all来等待所有剩余的任务完成
const concurrency = async (tasks, concurrency) => {
const executing = new Set()
for (let task of tasks) {
const e = Promise.resolve().then(() => task()).then(() => executing.delete(e)).catch(() => executing.delete(e))
executing.add(e)
while (executing.size >= concurrency) {
await Promise.race(executing)
}
}
await Promise.all(executing)
}
concurrency([...tasks], 2)
如果想执行完后收集任务返回值呢?
其实我们只需要在每个task的then中收集任务返回值,并在所有任务执行完毕后返回即可
const concurrency = async (tasks, limit) => {
const executing = new Set()
const result = []
for (let task of tasks) {
const e = Promise.resolve().then(() => task()).then((val) =>{
executing.delete(e)
result.push(val)
}).catch(() => {
// 记录日志..其他操作
executing.delete(e)
})
executing.add(e)
while (executing.size >= limit) {
await Promise.race(executing)
}
}
await Promise.all(executing)
return result
}
concurrency([...tasks], 2).then((res) => {
console.log("result", res)
})
方法二:不使用async/await
要实现不使用async/await
, 则我们需要使用递归的方式调用每个任务,并控制并发数量
const concurrency = (tasks, limit) => {
const executing = new Set()
const runTask = (tasks, limit) => {
if (executing.size < limit && tasks.length > 0) {
const task = tasks.shift()
executing.add(task)
Promise.resolve().then(() => task()).finally(() => {
executing.delete(task)
concurrency(tasks, limit)
})
}
}
for (let i = 0; i < Math.min(limit, tasks.length); i++) {
runTask(tasks, limit)
}
}
concurrency([...tasks], 2)
收集任务返回值
我们只需要将上面的整个逻辑包裹在Promise
中即可,最后任务完成时,我们收集任务返回值
const concurrency = (tasks, limit) => {
const executing = new Set()
const result = []
const n = tasks.length
return new Promise((resolve) => {
const runTask = (tasks, limit) => {
if (executing.size < limit && tasks.length > 0) {
const task = tasks.shift()
executing.add(task)
Promise.resolve().then(() => task()).then((val) => {
result.push(val)
if (result.length === n) {
resolve(result)
}
executing.delete(task)
runTask(tasks, limit)
})
}
}
for (let i = 0; i < Math.min(limit, tasks.length); i++) {
runTask(tasks, limit)
}
})
}
concurrency([...tasks], 2).then((res) => {
console.log("res", res)
})
题目二:设计一个异步事件队列,能够由任务本身控制后续流程
设计一个类似koa中间件模型的任务队列,只有上一个任务完成才能执行下一个任务
function fn1(next) {
console.log('fn1');
next();
}
function fn2(next) {
setTimeout(() => {
console.log('fn2');
next();
}, 1000)
}
function fn3(next) {
setTimeout(() => {
console.log('fn3');
next();
}, 1000);
}
function fn4(next) {
setTimeout(() => {
console.log('fn4');
next();
}, 1000);
}
const asyncQueue = (tasks, cb) => {
// 你的代码
}
asyncQueue([fn1, fn2, fn3, fn4], () => {
console.log("任务结束")
})
// 预期输出
// fn1
// 1s
// fn2
// 1s
// fn3
// 1s
// fn4
// 任务结束
解答
这个解答的实现其实比较接近koa的中间件模型,其中,设计最巧妙的是dispatch
函数,它是一个递归调用的过程,并且只有上一个任务执行完成才能执行下一个任务
function fn1(next) {
console.log('fn1');
next();
}
function fn2(next) {
setTimeout(() => {
console.log('fn2');
next();
}, 1000)
}
function fn3(next) {
setTimeout(() => {
console.log('fn3');
next();
}, 1000);
}
function fn4(next) {
setTimeout(() => {
console.log('fn4');
next();
}, 1000);
}
const asyncQueue = async (tasks, cb) => {
const dispatch = (i) => {
const t = tasks[i]
if (!t) {
cb()
return
}
t(() => dispatch(i + 1))
}
dispatch(0)
}
asyncQueue([fn1, fn2, fn3, fn4], () => {
console.log("任务结束")
})