Skip to content

Commit d8811e1

Browse files
committed
New updates
1 parent c4e3303 commit d8811e1

13 files changed

+7479
-107
lines changed

README.md

+24
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,27 @@ mainWorker.getSharedData("test", msg => {
8787
console.log(msg); // { ok: "ok" }
8888
})
8989
```
90+
91+
92+
```javascript
93+
// main.js
94+
const { Pool } = require("ad-worker");
95+
96+
const threadPool = new Pool({ path: './test.js', quantityThread: 10 })
97+
threadPool.sendMessage('SELECT * FROM table', (res) => {
98+
console.log(res); // ['Sasha', 'Pasha', 'Oleg']
99+
})
100+
101+
// worker.js
102+
const { ChildWorker } = require('./index.js');
103+
const db = new Db();
104+
const childWorker = new ChildWorker();
105+
106+
childWorker.onMessage(async (msg) => {
107+
const res = await db.query(msg);
108+
childWorker.sendMessage(res);
109+
// Or
110+
childWorker.setSharedData(res);
111+
childWorker.sendMessage('done');
112+
});
113+
```

index.d.ts

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import {MainWorker} from './types/MainWorker'
2+
import {SharedData} from './types/SharedData'
3+
import {Pool} from './types/Pool'
4+
import {ChildWorker} from './types/ChildWorker'
5+
6+
export {
7+
MainWorker,
8+
SharedData,
9+
Pool,
10+
ChildWorker
11+
}

index.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
const SharedData = require("./libs/SharedData");
22
const ChildWorker = require("./libs/ChildWorker");
33
const MainWorker = require("./libs/MainWorker");
4+
const Pool = require("./libs/Pool");
45

56
module.exports = {
67
SharedData,
78
ChildWorker,
8-
MainWorker
9+
MainWorker,
10+
Pool
911
}

libs/ChildWorker.js

+12-9
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
"use strict";
22
const { parentPort, workerData } = require('node:worker_threads');
3-
const SharedData = require("./SharedData");
3+
const v8 = require('v8');
44
module.exports = class ChildWorker {
55
#sharedData;
6-
constructor(_workerData = {sharedData: {length: 1024, type: "int32"}, value: workerData || {}}) {
7-
this.#sharedData = new SharedData(_workerData.sharedData.length, _workerData.sharedData.type);
8-
this.#sharedData.add(_workerData.value || workerData);
6+
constructor() {
7+
this.#sharedData = Buffer.from(workerData || {})
98
}
109
onMessage(callback) {
1110
parentPort.on("message", callback);
@@ -20,29 +19,33 @@ module.exports = class ChildWorker {
2019
* @param {*} msg Your data
2120
*/
2221
setSharedData(msg) {
23-
this.#sharedData.add(msg);
22+
v8.serialize(msg).forEach((element, index) => {
23+
this.#sharedData[index] = element;
24+
});
2425
}
2526
getSharedData() {
26-
return this.#sharedData.deserialize();
27+
return v8.deserialize(
28+
Buffer.from(workerData)
29+
);
2730
}
2831
/**
2932
* @description Close thread
3033
*/
31-
close(){
34+
close() {
3235
parentPort.close();
3336
}
3437
/**
3538
* @param {callback|null} callback
3639
* @description Lock thread, if this owner
3740
*/
38-
lock(callback){
41+
lock(callback) {
3942
this.#sharedData.lock(callback)
4043
}
4144
/**
4245
* @param {callback|null} callback
4346
* @description Unlock thread, if this owner
4447
*/
45-
unlock(callback){
48+
unlock(callback) {
4649
this.#sharedData.unlock(callback)
4750
}
4851
}

libs/MainWorker.js

+39-42
Original file line numberDiff line numberDiff line change
@@ -3,36 +3,29 @@ const { Worker, isMainThread } = require('node:worker_threads');
33
const SharedData = require("./SharedData");
44

55
module.exports = class MainWorker {
6-
#group;
6+
#group = {};
77
#sharedData = {};
8-
/**
9-
* @param {Object} params
10-
* @param {Object[]} params.group
11-
* @param {string} params.group[].name
12-
* @param {string} params.group[].path
13-
* @param {Object} [params.group[].workerData]
14-
* @param {string} [params.group[].workerDataLink]
15-
* @param {number} params.group[].mutex
16-
* @param {Object} params.sharedData
17-
* @param {number} params.sharedData.length Size BufferArray
18-
* @param {'int8'|'int16'|'int32'|'uint8'|'uint16'|'uint32'|'float32'|'float64'} params.sharedData.type Type BufferArray
19-
*/
20-
constructor({ group = [], sharedData, mutex = 1 }) {
21-
this.path = __filename;
22-
this.#group = Object.assign({}, ...group.map(({ name, path, workerData, workerDataLink }) => {
23-
if (workerDataLink) {
24-
this.#sharedData[name] = this.#sharedData[workerDataLink];
25-
return { [name]: new Worker(path, { workerData: { sharedData: sharedData, value: this.#sharedData[name].na_get() } }) };
26-
}
27-
if (sharedData) {
28-
if (mutex < 1) return Error("MainWorker: mutex can not less one - " + name);
29-
this.#sharedData[name] = new SharedData(sharedData.length, sharedData.type);
30-
this.#sharedData[name].mutex(mutex);
31-
this.#sharedData[name].add(workerData);
32-
return { [name]: new Worker(path, { workerData: { sharedData: sharedData, value: this.#sharedData[name].na_get() } }) };
33-
}
34-
return { [name]: new Worker(path) };
35-
}));
8+
constructor(params) {
9+
if (params) {
10+
const { group = [], sharedData, mutex = 1 } = params;
11+
this.path = __filename;
12+
this.#group = Object.assign({}, ...group.map(({ name, path, workerData, workerDataLink }) => {
13+
if (workerDataLink) {
14+
this.#sharedData[name] = this.#sharedData[workerDataLink];
15+
return { [name]: new Worker(path, { workerData: { sharedData: sharedData, value: this.#sharedData[name].na_get() } }) };
16+
}
17+
if (sharedData) {
18+
if (mutex < 1) return Error("MainWorker: mutex can not less one - " + name);
19+
this.#sharedData[name] = new SharedData(sharedData.length, sharedData.type);
20+
// this.#sharedData[name].mutex(mutex);
21+
this.#sharedData[name].na_add(workerData);
22+
return { [name]: new Worker(path, { workerData: this.#sharedData[name].na_get() }) };
23+
}
24+
return { [name]: new Worker(path) };
25+
}));
26+
} else {
27+
28+
}
3629
}
3730

3831
/**
@@ -63,7 +56,7 @@ module.exports = class MainWorker {
6356
* @param {*} callback
6457
* @description Event "message"
6558
*/
66-
onceMessage(name, callback) {
59+
onceMessage(name, callback) {
6760
if (!this.#group[name]) {
6861
return Error("AddWorker: not found this name group - " + name);
6962
}
@@ -100,11 +93,15 @@ module.exports = class MainWorker {
10093
* @param {*} callback Fuction callback
10194
* @description Using for getting share data
10295
*/
103-
getSharedData(name, callback) {
96+
getSharedData(name) {
10497
if (!this.#group[name]) {
10598
return Error("AddWorker: not found this name group - " + name);
10699
}
107-
callback(this.#sharedData[name].deserialize());
100+
return this.#sharedData[name].deserialize()
101+
}
102+
103+
setSharedData(name, msg) {
104+
this.#sharedData[name].na_add(msg);
108105
}
109106

110107
/**
@@ -118,33 +115,33 @@ module.exports = class MainWorker {
118115
* @param {number} params.sharedData.length Size BufferArray
119116
* @param {'int8'|'int16'|'int32'|'uint8'|'uint16'|'uint32'|'float32'|'float64'} params.sharedData.type Type BufferArray
120117
*/
121-
newThread({ name, path, workerData, sharedData = {length: 1024, type: "int32"}, workerDataLink, mutex = 1 }) {
118+
newThread(params) {
119+
const { name, path, workerData, sharedData = { length: 1024, type: "int32" }, workerDataLink, mutex = 1 } = params;
122120
if (this.#group[name] !== undefined) return Error("It's name using");
123121
if (workerDataLink) {
124122
this.#group[name] = new Worker(path, this.#sharedData[workerDataLink]);
125123
return;
126124
}
127125
if (workerData) {
128126
const shar = new SharedData(sharedData.length, sharedData.type);
129-
shar.mutex(mutex);
130-
shar.add(workerData);
127+
// shar.mutex(mutex);
128+
shar.na_add(workerData);
131129
this.#sharedData[name] = shar;
132-
this.#group[name] = new Worker(path, { workerData: { sharedData: sharedData, value: this.#sharedData[name].na_get() } });
130+
this.#group[name] = new Worker(path, { workerData: this.#sharedData[name].na_get() });
133131
return;
134132
}
135133
const shar = new SharedData(sharedData.length, sharedData.type);
136-
shar.add({});
137-
shar.mutex(mutex);
134+
shar.na_add({});
135+
// shar.mutex(mutex);
138136
this.#sharedData[name] = shar;
139137
this.#group[name] = new Worker(path, { workerData: { sharedData: sharedData, value: this.#sharedData[name].na_get() } });
140138
}
141139

142-
spawn(callback){
143-
if(isMainThread){
140+
spawn(callback) {
141+
if (isMainThread) {
144142
new Worker((new Error()).stack.split("\n")[2].split("/").join("/").split("(")[1].split(":")[0]);
145-
}else {
143+
} else {
146144
return callback();
147145
}
148146
}
149-
pool(){}
150147
}

libs/Pool.js

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"use strict";
2+
const { Worker } = require('node:worker_threads');
3+
const SharedData = require("./SharedData");
4+
5+
module.exports = class Pool {
6+
#pool = {};
7+
#sharedData;
8+
/**
9+
* @param {Object} params
10+
* @param {string} params.path
11+
* @param {number} params.quantityThread
12+
* @param {Object} params.sharedDatas
13+
* @param {number} params.sharedDatas.length Size BufferArray
14+
* @param {'int8'|'int16'|'int32'|'uint8'|'uint16'|'uint32'|'float32'|'float64'} params.sharedData.type Type BufferArray
15+
*/
16+
constructor({ path, sharedDatas, quantityThread, workerData }) {
17+
this.#sharedData = new SharedData(sharedDatas?.length || 1024, sharedDatas?.type || 'int32');
18+
this.#sharedData.mutex(1);
19+
this.#sharedData.na_add(workerData || {});
20+
for (let i = 0; i < quantityThread; i++) {
21+
this.#pool[i] =
22+
{
23+
thread: new Worker(path, { workerData: this.#sharedData.na_get() }),
24+
count: 0,
25+
id: i
26+
}
27+
}
28+
}
29+
/**
30+
* @param {*} message Your message (string|object|nubmer)
31+
*/
32+
sendMessage(message, callback) {
33+
const pool = this.#choiceThread()
34+
pool.count += 1;
35+
pool.thread.postMessage(message);
36+
const _cb = (res) => {
37+
pool.count -= 1;
38+
return callback( pool.id + " " +res)
39+
}
40+
pool.thread.once("message", _cb);
41+
}
42+
getSharedData() {
43+
return this.#sharedData.deserialize()
44+
}
45+
stopAllThread() {
46+
for (const key of Object.keys(this.#pool)) {
47+
this.#pool[key].thread.terminate()
48+
}
49+
}
50+
#choiceThread() {
51+
let min = 999999
52+
let id
53+
for (const key of Object.keys(this.#pool)) {
54+
if(this.#pool[key].count < min){
55+
min = this.#pool[key].count
56+
id = this.#pool[key].id
57+
}
58+
}
59+
return this.#pool[id]
60+
}
61+
}

0 commit comments

Comments
 (0)