Profile
GitHub

Promise 并发流程控制

简介

本篇文章的由来是一道经典前端面试题 控制n个异步任务并发执行

在面试中,如果没有事先准备,或者对promise的理解不够深入,很容易因为思维混乱而出现问题(本人在面试中也掉过坑)

下面一起看几道题

题目一:设计一个异步队列,控制并发数量为n

const tasks = new Array(10).fill(1).map((x, idx) => {
    return () => new Promise((resolve) => {
        setTimeout(() => {
            console.log('executed', idx)
            resolve(idx)
        }, 1000)
    })
})

concurrency([...tasks], 2)

// 输出:
// ... 1s
// executed 0 executed 1
// ... 2s
// executed 2 executed 3
// ... 3s
// executed 4 executed 5
// ... 4s
// executed 6 executed 7
// ... 5s
// executed 8 executed 9

解答

方法一: 使用async/await

思路

  • 使用了Set来存储当前正在执行的任务
  • 并使用Promise.race来等待最先完成的任务
  • 然后,当一个任务完成时,它会从Set中删除
  • 如果当前正在执行的任务数量达到了并发限制,那么它会等待至少一个任务完成才继续添加新的任务
  • 最后,使用Promise.all来等待所有剩余的任务完成
const concurrency = async (tasks, concurrency) => {
    const executing = new Set()

    for (let task of tasks) {
        const e = Promise.resolve().then(() => task()).then(() => executing.delete(e)).catch(() => executing.delete(e))

        executing.add(e)
        while (executing.size >= concurrency) {
            await Promise.race(executing)
        }
    }

    await Promise.all(executing)
}   

concurrency([...tasks], 2)

如果想执行完后收集任务返回值呢?

其实我们只需要在每个task的then中收集任务返回值,并在所有任务执行完毕后返回即可

const concurrency = async (tasks, limit) => {
    const executing = new Set()
    const result = []

    for (let task of tasks) {
        const e = Promise.resolve().then(() => task()).then((val) =>{ 
            executing.delete(e)
            result.push(val)
        }).catch(() => {
            // 记录日志..其他操作
            executing.delete(e)
        })

        executing.add(e)
        while (executing.size >= limit) {
            await Promise.race(executing)
        }
    }

    await Promise.all(executing)
    return result
}  

concurrency([...tasks], 2).then((res) => {
    console.log("result", res)
})

方法二:不使用async/await

要实现不使用async/await, 则我们需要使用递归的方式调用每个任务,并控制并发数量

const concurrency = (tasks, limit) => {
    const executing = new Set()

    const runTask = (tasks, limit) => {
        if (executing.size < limit && tasks.length > 0) {

            const task = tasks.shift()
            executing.add(task)
    
            Promise.resolve().then(() => task()).finally(() => {
                executing.delete(task)
                concurrency(tasks, limit)
            })
        }
    }

    for (let i = 0; i < Math.min(limit, tasks.length); i++) {
        runTask(tasks, limit)
    }
}  

concurrency([...tasks], 2)

收集任务返回值

我们只需要将上面的整个逻辑包裹在Promise中即可,最后任务完成时,我们收集任务返回值

const concurrency = (tasks, limit) => {
    const executing = new Set()
    const result = []
    const n = tasks.length

    return new Promise((resolve) => {
        const runTask = (tasks, limit) => {

            if (executing.size < limit && tasks.length > 0) {
    
                const task = tasks.shift()
                executing.add(task)
        
                Promise.resolve().then(() => task()).then((val) => {
                    result.push(val)

                    if (result.length === n) {
                        resolve(result)
                    }

                    executing.delete(task)
                    runTask(tasks, limit)
                })
            }
    
        }
    
        for (let i = 0; i < Math.min(limit, tasks.length); i++) {
            runTask(tasks, limit)
        }
    })
}  

concurrency([...tasks], 2).then((res) => {
    console.log("res", res)
})

题目二:设计一个异步事件队列,能够由任务本身控制后续流程

设计一个类似koa中间件模型的任务队列,只有上一个任务完成才能执行下一个任务

function fn1(next) {
    console.log('fn1');
    next();
  }
  
  function fn2(next) {
    setTimeout(() => {
      console.log('fn2');
      next();
    }, 1000)
  }
  
  function fn3(next) {
    setTimeout(() => {
      console.log('fn3');
      next();
    }, 1000);
  }
  
  function fn4(next) {
    setTimeout(() => {
      console.log('fn4');
      next();
    }, 1000);
  }

const asyncQueue = (tasks, cb) => {
    // 你的代码
}


asyncQueue([fn1, fn2, fn3, fn4], () => {
    console.log("任务结束")
})

// 预期输出
// fn1
// 1s
// fn2
// 1s
// fn3
// 1s
// fn4
// 任务结束

解答

这个解答的实现其实比较接近koa的中间件模型,其中,设计最巧妙的是dispatch函数,它是一个递归调用的过程,并且只有上一个任务执行完成才能执行下一个任务

  
  function fn1(next) {
    console.log('fn1');
    next();
  }
  
  function fn2(next) {
    setTimeout(() => {
      console.log('fn2');
      next();
    }, 1000)
  }
  
  function fn3(next) {
    setTimeout(() => {
      console.log('fn3');
      next();
    }, 1000);
  }
  
  function fn4(next) {
    setTimeout(() => {
      console.log('fn4');
      next();
    }, 1000);
  }


const asyncQueue = async (tasks, cb) => {
    const dispatch = (i) => {
        const t = tasks[i]
        if (!t) {
            cb()
            return 
        }
        t(() => dispatch(i + 1))
    }

    dispatch(0)
}


  asyncQueue([fn1, fn2, fn3, fn4], () => {
    console.log("任务结束")
  })