返回
Featured image of post NodeJS 多线程编程

NodeJS 多线程编程

利用 worker_threads,充分利用多核CPU,提高计算效率

开发环境:Node.JS v14.8.0

整理自之前的《NodeJS多线程通信和共享内存 - worker_threads》&&《NodeJS多线程模块研究 - worker_threads》两篇文章。

快速开始

js 和 nodejs 一直都是单线程,直到官方推出了 worker_threads 模块,用来解决 CPU 密集型计算场景。

可以通过以下代码快速开启一个工作线程:

if (isMainThread) {
    // 这会在工作线程实例中重新加载当前文件。
    new Worker(__filename);
    console.log("在主进程中");
    console.log("isMainThread is", isMainThread);
    // Worker可以启动指定文件,也可以配合eval参数,直接启动代码
    // 由于在子线程中没有process.env,所以这里可以配合workerData选项
    // 那么在子线程中,workerData会按照HTML结构化克隆算法
    // 将workerData克隆到 require('worker_thread')中
} else {
    console.log("在工作线程中");
    console.log("isMainThread is", isMainThread); // 打印 'false'。
}

上面代码的输出是:

在主进程中
isMainThread is true
在工作线程中
isMainThread is false

多线程通信

父子线程通信-parentPort

master thread 中,Worker 返回的对象,代表着工作线程实例。

worker thread 中,require('worker_threads').parentPort 就是 master thread 的 MessagePort。

利用上面说到的两个对象,可以实现主线程和工作线程之间的“双工通信”。

const { Worker, isMainThread, parentPort } = require("worker_threads");

// 使用parentPort和worker进行双工通信
if (isMainThread) {
    const worker = new Worker(__filename);
    // 1. 针对工作线程(worker)开启消息监听
    worker.on("message", (message) => console.log(message));
    // 2. 像工作线程(worker)发送消息:"ping"
    worker.postMessage("ping");
} else {
    // 3. 针对主线程(parentPort)开启消息监听
    parentPort.on("message", (message) =>
        // 4. 接收到主线程的消息后,将其放入 pong 字段,并且发送给主线程(parentPort)
        parentPort.postMessage({ pong: message })
    );
}

上面输出:

{
    pong: "ping";
}
// handling

node 的设计参考了浏览器的 Web Worker

任意线程通信-MessageChannel

通信管道(MessageChannel)可以跨任何线程(比如多个工作线程之间)进行数据通信。对比上面通过require('worker_threads').parentPort 的方式,缺点是只能在主线程和工作线程之间进行通信。

管道通信的思路:

  • 通过 MessageChannel 创建通信管道
  • 通过 postMessage 传递数据
    • 第一个参数是传递的数据,包括通信管道的 Messageport
    • 第二个参数是 transformList,放入其中的对象,将在管道发送端中无法使用

来看一段利用通信管道,兄弟线程通信的实现:

const {
    isMainThread, parentPort, threadId, MessageChannel, Worker
} = require('worker_threads');
  
if (isMainThread) {
    const worker1 = new Worker(__filename);
    const worker2 = new Worker(__filename);
    const subChannel = new MessageChannel();
    worker1.postMessage({ yourPort: subChannel.port1 }, [subChannel.port1]);
    worker2.postMessage({ yourPort: subChannel.port2 }, [subChannel.port2]);
} else {
    parentPort.once('message', (value) => {
        value.yourPort.postMessage('hello');
        value.yourPort.on('message', msg => {
            console.log(`thread ${threadId}: receive ${msg}`);
        });
    });
}

控制台输出:

thread 1: receive hello
thread 2: receive hello

共享内存通信-SharedArrayBuffer

nodejs 多线程也可以通过“共享内存”进行通信。master thread 和 worker threads 都操作同一段物理存储上的数据,实现数据共享,并且避免了拷贝带来的开销。

共享内存通信的思路:

  • 通过 SharedArrayBuffer 创建二进制数据
  • SharedArrayBuffer 数据不能放在 postMessage 的第二个参数中

来看一段通过共享内存进行通信的代码:

const assert = require("assert");
const {
    Worker,
    MessageChannel,
    MessagePort,
    isMainThread,
    parentPort,
} = require("worker_threads");
if (isMainThread) {
    const worker = new Worker(__filename);
    const subChannel = new MessageChannel();
    // uint8Arr 是放在共享内存上的
    const sharedArray = new SharedArrayBuffer(4);
    const uint8Arr = new Uint8Array(sharedArray);
    console.log("[master] 原来的uint8Arr", uint8Arr);
    
    worker.postMessage({ hereIsYourPort: subChannel.port1, uint8Arr }, [
        subChannel.port1,
    ]);

    subChannel.port2.on("message", (msg) => {
        console.log("[master] 经过工作线程修改的uint8Arr", uint8Arr);
    });
} else {
    parentPort.on("message", (value) => {
        assert(value.hereIsYourPort instanceof MessagePort);
        value.uint8Arr[1] = 10;
        console.log("[worker] 修改出来的uint8Arr", value.uint8Arr);
        value.hereIsYourPort.postMessage("");
        value.hereIsYourPort.close();
    });
}

上面代码的输出是:

[master] 原来的uint8Arr Uint8Array(4) [ 0, 0, 0, 0 ]
[worker] 修改出来的uint8Arr Uint8Array(4) [ 0, 10, 0, 0 ]
[master] 经过工作线程修改的uint8Arr Uint8Array(4) [ 0, 10, 0, 0 ]

可以看到,worker thread 修改了 uint8Arr,由于采用共享内存,因此在 master thread 中,从自己的上下文中,也读取到修改后的 uint8Arr。

worker_threads 效果评测

单线程计算

以一个生成素数的计算为例,直接跑2-1e7范围内的素数:

const min = 2;
const max = 1e7;
const primes = [];
function generatePrimes(start, range) {
  let isPrime = true;
  let end = start + range;
  for (let i = start; i < end; i++) {
    for (let j = min; j < Math.sqrt(end); j++) {
      if (i !== j && i%j === 0) {
        isPrime = false;
        break;
      }
    }
    if (isPrime) {
      primes.push(i);
    }
    isPrime = true;
  }
}
generatePrimes(min, max);

通过 time 命令能看到,这里一共耗时10.36s,单核cpu占用率达到了99%

工作线程计算

主线程负责切割数据,将数据等分后,下发给工作线程。

工作线程从 workerData 上,读取计算范围,同时计算范围内的素数。

代码如下:

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const min = 2;
let primes = [];
function generatePrimes(start, range) {
  let isPrime = true;
  let end = start + range;
  for (let i = start; i < end; i++) {
    for (let j = min; j < Math.sqrt(end); j++) {
      if (i !== j && i%j === 0) {
        isPrime = false;
        break;
      }
    }
    if (isPrime) {
      primes.push(i);
    }
    isPrime = true;
  }
}
if (isMainThread) {
  const max = 1e7;
  const threadCount = +process.argv[2] || 4;
  const threads = new Set();;
  console.log(`Running with ${threadCount} threads...`);
  const range = Math.ceil((max - min) / threadCount);
  let start = min;
  for (let i = 0; i < threadCount - 1; i++) {
    const myStart = start;
    threads.add(new Worker(__filename, { workerData: { start: myStart, range }}));
    start += range;
  }
  threads.add(new Worker(__filename, { workerData: { start, range: range + ((max - min + 1) % threadCount)}}));
  for (let worker of threads) {
    worker.on('error', (err) => { throw err; });
    worker.on('exit', () => {
      threads.delete(worker);
      console.log(`Thread exiting, ${threads.size} running...`);
      if (threads.size === 0) {
        // console.log(primes.join('\n'));
      }
    })
    worker.on('message', (msg) => {
      primes = primes.concat(msg);
    });
  }
} else {
  generatePrimes(workerData.start, workerData.range);
  parentPort.postMessage(primes);
}

在本地开启4个工作线程,并且本地6核CPU不处于忙碌的状态,最终耗时是2.658s,缩短了4倍。CPU利用率是 314%,,调用了3+CPU资源来支持本次计算:

可以看到,优化效果非常明显,能充分利用多核CPU的优势

worker_threads 的底层模型

对于单线程,Nodejs 是由以下部分组成:

  • 一个进程
  • 一个线程
  • 一个事件循环
  • 一个 JS 引擎实例
  • 一个 Node.js 实例

对于工作线程,Nodejs 组成变成了:

  • 一个进程
  • 多个线程
  • 每个线程独立的事件循环
  • 每个线程独立的 JS 引擎实例
  • 每个线程独立的 Node.js 实例

如下图所示:

Q & A

多线程是否可以并行?workder_threads 能否提高CPU利用率?

并发是快速切换任务,本质还是串行执行;并行是同时执行任务。

而多线程可以并发,也可以并行,这取决于是否能抢占到 CPU 资源。这个过程是操作系统来调度,不可人为控制。

但总的来说,多线程和多进程类似,都可以 CPU 的利用率。

workder_threads和cluster/child_process的区别

child_process是nodejs的多进程模块。cluster是在child_process上封装的集群模块,提供了多进程并行下的各种API。他们都是基于进程模型

worker_threads 基于线程模型,开销小,更加轻量,并且在同一个进程下,还可以通过共享内存来通信。适合解决CPU密集问题

参考文章

Built with Hugo
Theme Stack designed by Jimmy