Skip to content

Javascript promise pool

在日常搬砖中或者面试题都遇到过异步控制的需求,简单的比如直接处理成二维数组和 Promise.all 就完事了。 但是更好的方法是创建一个异步池效率会更佳,下面这个图片更加形象的说明了异步池和 Promise.all 分批的区别: promise pool figure 图片来自网络搜索。 使用浏览器加载大量资源的时候,也会看到类似的网络请求瀑布图。

控制并发数的有个很典型的场景,爬虫。nodejs 并没有像浏览器一样会限制并发数量,所以所有的请求一股脑全都发出去是非常不明智的行为。

最近搬砖的时候遇到了一个场景,页面上有很多图片需要加载,分批+懒加载优化之后,还是可能遇到同时加载超过六张图片。众所周知,Chrome 并发只有 6 个,如果所有的资源都给图片的话,会导致其他可能需要加载的资源就会卡住,而且可能会卡很久,所以很需要异步控制,趁着这个机会,把之前写过的代码整理一下。

代码

直接上 Typescript 的版本,适用的场景更多,给兄弟们复制粘贴的时候更加方便:wink:。

typescript
interface AsyncData<T> {
  success: boolean
  data?: T
}

function asyncPool<T>(asyncFns: (() => PromiseLike<T>)[], type = 'all', concurrency = 3, timeout = 0) {
  if (!asyncFns.length) return []
  let currentIndex = 0
  let runningJobs = 0
  const result: AsyncData<T>[] = []
  let timer: null | number = null
  return new Promise((resolve, reject) => {
    const responseHandler = (index: number, success: boolean) => (data: T) => {
      if (type === 'all' && !success) {
        if (timer) clearTimeout(timer)
        return reject(data)
      }
      result[index] = { data, success }
      runningJobs--
      getNewJob()
    }
    const getNewJob = () => {
      // clear timeout when all jobs are done
      if (currentIndex === asyncFns.length && runningJobs === 0) {
        if (timer) clearTimeout(timer)
        resolve(result)
      }
      while (runningJobs < concurrency && currentIndex < asyncFns.length) {
        asyncFns[currentIndex]().then(responseHandler(currentIndex, true), responseHandler(currentIndex, false))
        currentIndex++
        runningJobs++
      }
    }
    if (timeout > 0) {
      timer = setTimeout(() => reject('timeout'), timeout)
    }
    getNewJob()
  })
}

工具函数

为了测试方便,我们写一个生成 promise 的函数。

typescript
const createAsyncFn = (time: number, fail?: boolean) => () =>
  new Promise((resolve, reject) =>
    setTimeout(() => {
      if (fail) {
        reject(time)
      } else {
        resolve(time)
      }
    }, time)
  )

测试用例

使用 vitest,覆盖率 100%。

javascript
import { asyncPool, createAsyncFn } from './asyncPool.mjs'
import { describe, expect, vi, it } from 'vitest'
const maxAllowedDeviation = 100 // 允许最大误差。实际运行下来,50也是可以的
describe('tests of asyncPool', () => {
  it('should return [] when gives []', () => {
    expect(asyncPool([])).toEqual([])
  })

  it('all mode, should use time less than 980ms + 100ms', async () => {
    const fns = [
      createAsyncFn(300),
      createAsyncFn(100),
      createAsyncFn(200),

      createAsyncFn(300),
      createAsyncFn(280),
      createAsyncFn(150),

      createAsyncFn(300),
      createAsyncFn(400),
      createAsyncFn(500),
    ]
    // 300 + 150 + 400 = 850
    // 100 + 300 + 300 = 700
    // 200 + 280 + 500 = 980
    const startTime = Date.now()
    await asyncPool(fns)
    const endTime = Date.now()
    const elapsedTime = endTime - startTime
    const expectedTime = 980

    expect(elapsedTime).toBeGreaterThanOrEqual(expectedTime)
    expect(elapsedTime).toBeLessThanOrEqual(expectedTime + maxAllowedDeviation)
  })
  it('all mode, reject, should use time less than 850ms + 100ms', async () => {
    const fns = [
      createAsyncFn(300),
      createAsyncFn(100),
      createAsyncFn(200),

      createAsyncFn(300),
      createAsyncFn(280),
      createAsyncFn(150),

      createAsyncFn(300),
      createAsyncFn(400, true),
      createAsyncFn(500),
    ]
    // 300 + 150 + 400 = 850 reject here
    // 100 + 300 + 300 = 700
    // 200 + 280 + 500 = 980
    const startTime = Date.now()
    try {
      await asyncPool(fns)
    } catch (e) {}
    const endTime = Date.now()
    const elapsedTime = endTime - startTime
    const expectedTime = 850

    expect(elapsedTime).toBeGreaterThanOrEqual(expectedTime)
    expect(elapsedTime).toBeLessThanOrEqual(expectedTime + maxAllowedDeviation)
  })

  it('all mode with timeout', async () => {
    const fns = [
      createAsyncFn(300),
      createAsyncFn(100),
      createAsyncFn(200),

      createAsyncFn(300),
      createAsyncFn(280),
      createAsyncFn(150),

      createAsyncFn(300),
      createAsyncFn(400, true),
      createAsyncFn(500),
    ]
    // 300 + 150 + 400 = 850 reject here
    // 100 + 300 + 300 = 700
    // 200 + 280 + 500 = 980
    const startTime = Date.now()
    try {
      await asyncPool(fns, 'all', 3, 400)
    } catch (e) {}
    const endTime = Date.now()
    const elapsedTime = endTime - startTime
    const expectedTime = 400

    expect(elapsedTime).toBeGreaterThanOrEqual(expectedTime)
    expect(elapsedTime).toBeLessThanOrEqual(expectedTime + maxAllowedDeviation)
  })

  it('settle mode, should use time less than 1610ms + 100ms', async () => {
    const fns = [
      createAsyncFn(300),
      createAsyncFn(200),
      createAsyncFn(100),

      createAsyncFn(500),
      createAsyncFn(600),
      createAsyncFn(400, true),

      createAsyncFn(700),
      createAsyncFn(1000, true),
      createAsyncFn(800),
    ]
    // 0,300 + 5,400 + 8,1000 = 1710
    // 1,200 + 4,600 + 7,800 = 1600
    // 2,100 + 3,500 + 6,700 = 1300
    const startTime = Date.now()
    await asyncPool(fns, 'settle')
    const endTime = Date.now()
    const elapsedTime = endTime - startTime
    const expectedTime = 1710

    expect(elapsedTime).toBeGreaterThanOrEqual(expectedTime)
    expect(elapsedTime).toBeLessThanOrEqual(expectedTime + maxAllowedDeviation)
  })
})

总结

这个示例是拿到全部的 Promise 结果之后再统一返回,现实场景可能会更加复杂,比如可能需要维护一个 queue,动态的添加 promise,而且要每个 promise 执行完毕之后都直接返回,方便后续处理。

这里只实现了 Promise 异步控制的 Promise.allPromise.allSettled 两个场景的实现。其他实现比如 Promise.racePromise.any 一般来说很少有使用并发池的场景,可以尝试自己实现。

总之,这里的实现只是一个玩具类型的,实际项目推荐使用 promise-pool