Skip to content

refactor: replace static resource callbacks with DI Container#74

Open
ChiragAgg5k wants to merge 3 commits intomainfrom
feat/di-container-refactor
Open

refactor: replace static resource callbacks with DI Container#74
ChiragAgg5k wants to merge 3 commits intomainfrom
feat/di-container-refactor

Conversation

@ChiragAgg5k
Copy link
Member

@ChiragAgg5k ChiragAgg5k commented Mar 16, 2026

Summary

  • Replace static $resourcesCallbacks and $resources array with Utopia DI Container for dependency injection
  • Add utopia-php/di as an explicit dependency
  • Introduce workerContainer (child of global container) for worker-scoped resources (workerId, message, error)
  • Global resources are set on the root container, worker-specific resources on workerContainer
  • Add getContainer() method with Swoole coroutine context support
  • Pass Container explicitly to getArguments() and validate() instead of relying on static state

Test plan

  • Verify queue jobs process correctly with the new DI container
  • Confirm worker start/stop hooks receive correct worker-scoped resources
  • Validate error hooks still receive error and message resources
  • Test with Swoole coroutines enabled to verify context isolation

Summary by CodeRabbit

  • Chores

    • Added new runtime dependency: utopia-php/di (v0.3.*)
  • Refactor

    • Queue server reworked to use per-instance container-based resource and state management, with optional container injection and a coroutine-mode toggle for per-worker/per-coroutine isolation. Dependency injection, validation and resource resolution now run through the container, improving concurrency isolation and reliability for long-running workers.

Migrate Server from static resource management to Utopia DI Container.
Introduce workerContainer for worker-scoped resources while keeping the
global container for shared state.
@coderabbitai
Copy link

coderabbitai bot commented Mar 16, 2026

Walkthrough

This PR adds a runtime dependency utopia-php/di and refactors Queue Server to use a Container for dependency injection and per-worker state. The constructor now accepts an optional Container; a per-coroutine container mode (setCoroutines) and WORKER_CONTAINER_CONTEXT_KEY were added. Static/global resource management was removed: setResource is now an instance method storing callbacks in the container. Container is threaded through argument resolution, validation, hooks/actions, and transient worker data (workerId, message, error). Method signatures for getArguments and validate were updated to accept a Container, and getContainer() was added.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 70.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'refactor: replace static resource callbacks with DI Container' accurately describes the main change: migrating from static resource management to a dependency injection container-based approach.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/di-container-refactor
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (2)
src/Queue/Server.php (2)

330-334: Silent exception swallowing in workerStop hooks.

Exceptions from workerStop hooks are silently caught and ignored. This could mask important errors during shutdown. Consider at minimum logging the exception.

♻️ Proposed improvement: Log or record the exception
                     try {
                         $hook->getAction()(...$this->getArguments($this->workerContainer, $hook));
                     } catch (Throwable $e) {
+                        // Consider logging: error_log("workerStop hook failed: " . $e->getMessage());
                     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Server.php` around lines 330 - 334, The workerStop hook loop
currently swallows all exceptions (catch (Throwable $e) { }) which hides
shutdown errors; update the catch to log the exception from $hook->getAction()
using an available logger (e.g. $this->logger->error) or a safe fallback
(error_log) and include $e->getMessage() and $e->getTraceAsString(); ensure you
still continue running other hooks and keep the invocation via
$this->getArguments($this->workerContainer, $hook) unchanged.

444-447: Validators accumulate in the container and may conflict.

Each call to validate() with a callable validator stores it in the container with key _validator:{key}. This has two concerns:

  1. Accumulation: Validator instances persist in the container, potentially growing unbounded over many job invocations.
  2. Key collision: If different hooks use the same param key with different validators, they'll share/overwrite the same container key.

Consider using a more unique key (include hook identity) or not storing validators in the container at all since they're transient.

♻️ Proposed fix: Invoke validator directly without storing
         if (\is_callable($validator)) {
-            $validatorKey = '_validator:' . $key;
-            $container->set($validatorKey, $validator, $param['injections']);
-            $validator = $container->get($validatorKey);
+            // Resolve injections and call the validator factory
+            $injectionValues = array_map(fn($name) => $container->get($name), $param['injections']);
+            $validator = $validator(...$injectionValues);
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Server.php` around lines 444 - 447, The validator instances are
being stored in the container under a shared key ($validatorKey = '_validator:'
. $key) via $container->set / $container->get which causes accumulation and
collisions; instead either invoke the callable validator directly (call
$validator with the param and injections) without storing it in the container,
or if you must register it, generate a truly unique key that includes the hook
identity (e.g. hook name/id or spl_object_hash($validator)) so it won't collide,
and ensure you remove/unset the key after invocation to avoid retention; update
the code paths around validate(), $validatorKey, $container->set and
$container->get to implement one of these approaches.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/Queue/Server.php`:
- Around line 236-237: The shared container is being mutated with
$this->getContainer()->set('message', ...) which can cause race conditions when
jobs run concurrently; instead, avoid writing to the global/shared container —
create an isolated per-job context (e.g., clone the container or create a
scoped/child container) and call set('message', ...) on that per-job container,
or store the message in coroutine-local storage and pass that scoped
container/context into the job processing flow so getContainer() is not mutated
globally; update the code paths that consume message to use the per-job
container/context rather than the shared $this->getContainer().
- Around line 311-319: The error and message are being stored on the shared
container inside the closure (see the anonymous function parameters,
getContainer()->set('error', fn () => $th) and set('message', ...)) which
creates a race when multiple jobs run concurrently; instead avoid mutating
shared container state by passing the error and message directly to each hook
invocation or by creating a per-hook scoped container before calling
$hook->getAction(). Concretely, update the loop that iterates $this->errorHooks
to either call $hook->getAction() with explicit parameters (e.g. append $message
and $th to the arguments returned by getArguments) or build a short-lived
cloned/scoped container containing only 'error' and 'message' for that single
hook call and do not write to the shared container; change getArguments or
call-site accordingly and remove the shared getContainer()->set(...) usage to
eliminate the race.
- Line 66: The $coroutines property on the Server class is never switched to
true so the coroutine branch in getContainer() is dead; add a public setter
(e.g., enableCoroutines() or setCoroutines(bool $enable)) on the class that sets
the protected bool $coroutines property and document intent, and update any
initialization path to call that setter when coroutine mode should be active;
reference the Server::$coroutines property and the Server::getContainer() method
so the coroutine check can become reachable.
- Around line 108-115: The coroutine context key WORKER_CONTAINER_CONTEXT_KEY
used by getContainer() is never set, so per-coroutine isolation never occurs;
populate \Swoole\Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY]
when a coroutine begins processing a job (e.g. in the job message consumer
callback or the worker loop that spawns coroutines) by creating a new Container
instance seeded from $this->workerContainer ?? $this->container and assigning it
into the coroutine context so getContainer() will return the per-coroutine
Container instead of the shared one.

---

Nitpick comments:
In `@src/Queue/Server.php`:
- Around line 330-334: The workerStop hook loop currently swallows all
exceptions (catch (Throwable $e) { }) which hides shutdown errors; update the
catch to log the exception from $hook->getAction() using an available logger
(e.g. $this->logger->error) or a safe fallback (error_log) and include
$e->getMessage() and $e->getTraceAsString(); ensure you still continue running
other hooks and keep the invocation via
$this->getArguments($this->workerContainer, $hook) unchanged.
- Around line 444-447: The validator instances are being stored in the container
under a shared key ($validatorKey = '_validator:' . $key) via $container->set /
$container->get which causes accumulation and collisions; instead either invoke
the callable validator directly (call $validator with the param and injections)
without storing it in the container, or if you must register it, generate a
truly unique key that includes the hook identity (e.g. hook name/id or
spl_object_hash($validator)) so it won't collide, and ensure you remove/unset
the key after invocation to avoid retention; update the code paths around
validate(), $validatorKey, $container->set and $container->get to implement one
of these approaches.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 53b554fa-b958-4789-813f-8534cbfffe92

📥 Commits

Reviewing files that changed from the base of the PR and between ffdc931 and 58f59a7.

⛔ Files ignored due to path filters (1)
  • composer.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • composer.json
  • src/Queue/Server.php

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/Queue/Server.php (1)

67-67: ⚠️ Potential issue | 🟠 Major

Coroutine mode cannot be enabled from this class API.

$coroutines is always false here, so coroutine container logic is effectively dead unless something outside this class mutates the property.

🔧 Suggested fix
 protected bool $coroutines = false;
 protected Container $container;
 protected ?Container $workerContainer = null;
+
+public function setCoroutines(bool $enabled): self
+{
+    $this->coroutines = $enabled;
+    return $this;
+}

Also applies to: 218-220

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Server.php` at line 67, The Server class hardcodes protected bool
$coroutines = false so coroutine-mode code paths never run; change the class API
to allow enabling coroutines (either accept a bool $coroutines parameter in the
constructor and assign it to $this->coroutines, or add a public
setCoroutines(bool $enabled) method and use that before initialization), update
code that initializes the coroutine container to read $this->coroutines (e.g.,
in the constructor or init method that references the coroutine logic around the
same area as the code at lines 218-220), and ensure any callers
(factory/instantiation sites) pass the desired flag so the coroutine container
logic is reachable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/Queue/Server.php`:
- Around line 109-115: The $workerContainer property is never initialized so
non-coroutine code falls back to the root container and pollutes global state;
initialize and assign a worker-scoped container instance to
$this->workerContainer before any worker-scoped writes occur (e.g., at worker
startup) and have getContainer() return that worker container when available
(preserve existing coroutine behavior using WORKER_CONTAINER_CONTEXT_KEY).
Concretely, when a worker is created or about to perform worker-scoped
operations, create/clone a child container and assign it to
$this->workerContainer so subsequent calls to getContainer() (and any write
paths noted in the review) use the worker-scoped container instead of the root
container.

---

Duplicate comments:
In `@src/Queue/Server.php`:
- Line 67: The Server class hardcodes protected bool $coroutines = false so
coroutine-mode code paths never run; change the class API to allow enabling
coroutines (either accept a bool $coroutines parameter in the constructor and
assign it to $this->coroutines, or add a public setCoroutines(bool $enabled)
method and use that before initialization), update code that initializes the
coroutine container to read $this->coroutines (e.g., in the constructor or init
method that references the coroutine logic around the same area as the code at
lines 218-220), and ensure any callers (factory/instantiation sites) pass the
desired flag so the coroutine container logic is reachable.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ce42f7d2-1ba4-486a-a807-0eb658c00071

📥 Commits

Reviewing files that changed from the base of the PR and between 58f59a7 and 17efc3f.

📒 Files selected for processing (1)
  • src/Queue/Server.php

Comment on lines +109 to +115
public function getContainer(): Container
{
if ($this->coroutines && \Swoole\Coroutine::getCid() !== -1) {
return \Swoole\Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] ?? $this->container;
}

return $this->workerContainer ?? $this->container;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Initialize workerContainer before worker-scoped writes.

$workerContainer is never assigned, so non-coroutine paths write workerId/message/error into the root container. That breaks worker-vs-global scoping.

🔧 Suggested fix
 public function getContainer(): Container
 {
     if ($this->coroutines && \Swoole\Coroutine::getCid() !== -1) {
-        return \Swoole\Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] ?? $this->container;
+        return \Swoole\Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY]
+            ?? $this->workerContainer
+            ?? $this->container;
     }

     return $this->workerContainer ?? $this->container;
 }
 $this->adapter->workerStart(function (string $workerId) {
-    $this->getContainer()->set('workerId', fn () => $workerId);
+    $this->workerContainer = new Container($this->container);
+    $this->workerContainer->set('workerId', fn () => $workerId);
 $this->adapter->workerStop(function (string $workerId) {
     $this->getContainer()->set('workerId', fn () => $workerId);
@@
     } finally {
         // Always close consumer connection, even if hooks throw
         $this->adapter->consumer->close();
+        $this->workerContainer = null;
     }
 });

Also applies to: 223-225, 327-329

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Server.php` around lines 109 - 115, The $workerContainer property
is never initialized so non-coroutine code falls back to the root container and
pollutes global state; initialize and assign a worker-scoped container instance
to $this->workerContainer before any worker-scoped writes occur (e.g., at worker
startup) and have getContainer() return that worker container when available
(preserve existing coroutine behavior using WORKER_CONTAINER_CONTEXT_KEY).
Concretely, when a worker is created or about to perform worker-scoped
operations, create/clone a child container and assign it to
$this->workerContainer so subsequent calls to getContainer() (and any write
paths noted in the review) use the worker-scoped container instead of the root
container.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/Queue/Server.php`:
- Around line 125-132: The workerContainer property is never initialized so
getContainer() falls back to the root container; in the workerStart callback
initialize a per-worker container (assign to $this->workerContainer) and seed it
with the worker-scoped entries (workerId, message, error) or copy/child the root
$this->container as appropriate, and in workerStop unset/cleanup
$this->workerContainer so it no longer persists; when coroutines are enabled
continue to use WORKER_CONTAINER_CONTEXT_KEY for coroutine-local access, but
ensure workerStart populates $this->workerContainer and workerStop clears it to
restore proper worker-scoped isolation.
- Around line 234-236: The container is being set in start() behind the false
condition Coroutine::getCid() !== -1 so WORKER_CONTAINER_CONTEXT_KEY is never
created for coroutines; move the container initialization into the message
consume callback (the function that runs per-message) so that inside the
callback where Coroutine::getCid() will be a coroutine id you set
Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] = new
Container($this->container) before processing the message; update getContainer()
usage to rely on this per-coroutine context and keep the fallback to
$this->container only if the context key is absent.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 603e2a9c-e980-48dd-abda-cd778fbf6dda

📥 Commits

Reviewing files that changed from the base of the PR and between 17efc3f and cd3b960.

📒 Files selected for processing (1)
  • src/Queue/Server.php

Comment on lines +125 to 132
public function getContainer(): Container
{
if ($this->coroutines && \Swoole\Coroutine::getCid() !== -1) {
return \Swoole\Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] ?? $this->container;
}

return $this->workerContainer ?? $this->container;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

workerContainer is never initialized, breaking worker-scoped isolation.

The workerContainer property remains null throughout execution. In workerStart callback (line 240), $this->getContainer() returns $this->container (root container) since workerContainer is never assigned. This causes worker-scoped resources (workerId, message, error) to pollute the root container.

🔧 Proposed fix

Initialize workerContainer in the workerStart callback and clean it up in workerStop:

 $this->adapter->workerStart(function (string $workerId) {
+    $this->workerContainer = new Container($this->container);
     $this->getContainer()->set('workerId', fn () => $workerId);
             } finally {
                 // Always close consumer connection, even if hooks throw
                 $this->adapter->consumer->close();
+                $this->workerContainer = null;
             }

Also applies to: 239-244

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Server.php` around lines 125 - 132, The workerContainer property is
never initialized so getContainer() falls back to the root container; in the
workerStart callback initialize a per-worker container (assign to
$this->workerContainer) and seed it with the worker-scoped entries (workerId,
message, error) or copy/child the root $this->container as appropriate, and in
workerStop unset/cleanup $this->workerContainer so it no longer persists; when
coroutines are enabled continue to use WORKER_CONTAINER_CONTEXT_KEY for
coroutine-local access, but ensure workerStart populates $this->workerContainer
and workerStop clears it to restore proper worker-scoped isolation.

Comment on lines +234 to +236
if ($this->coroutines && Coroutine::getCid() !== -1) {
Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] = new Container($this->container);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check Swoole adapter implementation for coroutine spawning patterns
ast-grep --pattern $'go(function() {
  $$$
})'

# Also check consume implementation
rg -n 'consume|Coroutine::create|go\(' src/Queue/Adapter/Swoole.php -A5

Repository: utopia-php/queue

Length of output: 649


🏁 Script executed:

# Get full Swoole adapter implementation
cat -n src/Queue/Adapter/Swoole.php

Repository: utopia-php/queue

Length of output: 3275


🏁 Script executed:

# Check Server.php for getContainer implementation and context usage
rg -n 'getContainer|WORKER_CONTAINER_CONTEXT_KEY' src/Queue/Server.php -B2 -A5

Repository: utopia-php/queue

Length of output: 6611


🏁 Script executed:

# Look for message processing and coroutine patterns in Swoole adapter
rg -n 'function.*message|consume.*function|Coroutine::' src/Queue/Adapter/Swoole.php -B2 -A10

Repository: utopia-php/queue

Length of output: 1150


🏁 Script executed:

# Find Consumer interface/implementations
fd -t f -e php Consumer | head -20

Repository: utopia-php/queue

Length of output: 82


🏁 Script executed:

# Search for how consume() method works
rg -n 'interface Consumer|function consume|public function consume' -A8

Repository: utopia-php/queue

Length of output: 3031


🏁 Script executed:

# Check if there are any tests that show coroutine spawning patterns
rg -n 'consume.*function.*message|go\(|Coroutine::create' tests/ -A5

Repository: utopia-php/queue

Length of output: 511


Container context is never created because condition evaluates to false in main thread.

The condition at lines 234-236 checks Coroutine::getCid() !== -1, but start() is called from the main thread where getCid() returns -1. This means the container is never created in any coroutine context. Message processing callbacks later call getContainer() attempting to retrieve WORKER_CONTAINER_CONTEXT_KEY from the coroutine context, but since it was never set, they fall back to the shared $this->container, providing no isolation between concurrent message jobs.

Move the container creation into the message consume callback where the condition will evaluate to true:

🔧 Proposed fix
-        if ($this->coroutines && Coroutine::getCid() !== -1) {
-            Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] = new Container($this->container);
-        }
-
         try {
             $this->adapter->workerStart(function (string $workerId) {
                     $this->adapter->queue,
                     function (Message $message) {
                         $receivedAtTimestamp = microtime(true);
                         try {
+                            if ($this->coroutines && Coroutine::getCid() !== -1) {
+                                Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] = new Container($this->workerContainer ?? $this->container);
+                            }
+
                             $waitDuration =
                                 microtime(true) - $message->getTimestamp();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if ($this->coroutines && Coroutine::getCid() !== -1) {
Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] = new Container($this->container);
}
try {
$this->adapter->workerStart(function (string $workerId) {
$this->workerContainer = new Container($this->container);
$this->workerContainer->set('workerId', fn () => $workerId);
});
$this->adapter->consume(
$this->adapter->queue,
function (Message $message) {
$receivedAtTimestamp = microtime(true);
try {
if ($this->coroutines && Coroutine::getCid() !== -1) {
Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] = new Container($this->workerContainer ?? $this->container);
}
$waitDuration =
microtime(true) - $message->getTimestamp();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/Queue/Server.php` around lines 234 - 236, The container is being set in
start() behind the false condition Coroutine::getCid() !== -1 so
WORKER_CONTAINER_CONTEXT_KEY is never created for coroutines; move the container
initialization into the message consume callback (the function that runs
per-message) so that inside the callback where Coroutine::getCid() will be a
coroutine id you set Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY]
= new Container($this->container) before processing the message; update
getContainer() usage to rely on this per-coroutine context and keep the fallback
to $this->container only if the context key is absent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant