API allows workers to do initialization, but it is not clear how to do cleanup #87

@bhovhannes

Hi there,

The docs say that it is possible to do initialization inside a worker. I am using that feature to create a database connection like this:

async function initialize() {
  const conn = await connectToDb();
  return (query) => conn.exec(query);
}

module.exports = initialize();

During its lifetime, the pool may decide to close inactive threads. When that happens, I need to close the database connection.
However, I have not found any API that allows me to do that.

Currently I set minThreads and maxThreads to the same value and have the following as a workaround:

const Piscina = require("piscina")

const workerCount = 10;

const pool = new Piscina({
    filename: __dirname + "/searchMessagesWorker.cjs",
    minThreads: workerCount,
    maxThreads: workerCount,
    idleTimeout: Number.MAX_SAFE_INTEGER
})

// this will be set to `false` each time user calls `pool.runTask`
let poolIsEmpty = true
pool.on('drain', () => {
    poolIsEmpty = true
})
    
function shutdown() {
    // drainPromise resolves as soon as the pool is drained
    const drainPromise = new Promise((resolve) => {
        if (poolIsEmpty) {
           resolve()
        }
        else {
            pool.once('drain', resolve)
        }
    })

    return drainPromise.then(
        () => {
            // No tasks are scheduled on the pool at this moment.
            // Schedule `workerCount` "close" tasks in a single loop,
            // assuming Piscina will assign exactly one task to each thread.
            const promises = []
            for (let i = 0; i < workerCount; ++i) {
                // this tells worker to close DB connection
                promises.push(pool.runTask({
                    type: 'close'
                }))
            }
            return Promise.all(promises)
        }
    ).then(
        () => pool.destroy()
    )
}
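
For completeness, the worker side of this workaround could look roughly like the sketch below. It is only an illustration, not the actual worker: `connectToDb` is stubbed out here so the snippet runs on its own, and `exec`, `close`, and the `query` field on the task object are assumed names rather than anything Piscina provides. The only part taken from the code above is that a task with `type: 'close'` tells the worker to close its connection.

```javascript
// Hypothetical stand-in for a real database driver (assumed API).
async function connectToDb() {
  let open = true;
  return {
    exec: async (query) => {
      if (!open) throw new Error('connection is closed');
      return `result of ${query}`;
    },
    close: async () => {
      open = false;
    },
  };
}

async function initialize() {
  const conn = await connectToDb();
  // The returned function is what the pool calls for every task.
  return async (task) => {
    if (task.type === 'close') {
      // Sent by shutdown() on the main thread, once per worker.
      await conn.close();
      return 'closed';
    }
    // Any other task is treated as a query (assumed task shape).
    return conn.exec(task.query);
  };
}

// In the real worker file this would be exported as before:
// module.exports = initialize();
```

With this shape, every regular call has to pass an object such as `{ query: '...' }` instead of a bare query string, which is one more thing the workaround forces on the callers.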

Not only is this complicated, it is also fragile: it depends on how Piscina schedules tasks under the hood and on setting idleTimeout to a very high number. It also makes it impossible to have a dynamic number of threads in the pool.

Interestingly, cleanup functionality seems to be missing from every worker pool implementation I can find on npm. I wonder why.
Maybe there is a completely different way to do cleanup inside a worker thread?

I would be grateful for any explanation or reading material on this topic.
If this is within the scope of the Piscina package, I am ready to work on a PR as well.

Labels: enhancement (New feature or request)
