diff --git a/composer.json b/composer.json index 4017e49..3065321 100644 --- a/composer.json +++ b/composer.json @@ -26,6 +26,7 @@ "require": { "php": ">=8.3", "php-amqplib/php-amqplib": "^3.7", + "utopia-php/di": "0.3.*", "utopia-php/servers": "0.3.*", "utopia-php/fetch": "0.5.*", "utopia-php/pools": "1.*", diff --git a/composer.lock b/composer.lock index c4d7746..b3e5401 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "c2011149e90fdd25593cfd8db6ee2601", + "content-hash": "872b99c340c46cf1d625d9a14749e459", "packages": [ { "name": "brick/math", @@ -145,21 +145,24 @@ }, { "name": "google/protobuf", - "version": "v4.33.5", + "version": "v4.32.0", "source": { "type": "git", "url": "https://github.com/protocolbuffers/protobuf-php.git", - "reference": "ebe8010a61b2ae0cff0d246fe1c4d44e9f7dfa6d" + "reference": "9a9a92ecbe9c671dc1863f6d4a91ea3ea12c8646" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/ebe8010a61b2ae0cff0d246fe1c4d44e9f7dfa6d", - "reference": "ebe8010a61b2ae0cff0d246fe1c4d44e9f7dfa6d", + "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/9a9a92ecbe9c671dc1863f6d4a91ea3ea12c8646", + "reference": "9a9a92ecbe9c671dc1863f6d4a91ea3ea12c8646", "shasum": "" }, "require": { "php": ">=8.1.0" }, + "provide": { + "ext-protobuf": "*" + }, "require-dev": { "phpunit/phpunit": ">=5.0.0 <8.5.27" }, @@ -183,9 +186,9 @@ "proto" ], "support": { - "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.33.5" + "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.32.0" }, - "time": "2026-01-29T20:49:00+00:00" + "time": "2025-08-14T20:00:33+00:00" }, { "name": "nyholm/psr7", @@ -4508,5 +4511,5 @@ "platform-dev": { "ext-redis": "*" }, - "plugin-api-version": "2.6.0" + "plugin-api-version": "2.9.0" } diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 4702ad8..f82b585 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -3,7 +3,9 @@ namespace Utopia\Queue; use Exception; +use Swoole\Coroutine; use Throwable; +use Utopia\DI\Container; use Utopia\Servers\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; @@ -12,6 +14,7 @@ class Server { + public const WORKER_CONTAINER_CONTEXT_KEY = '__utopia_queue_worker_container'; /** * Queue Adapter * @@ -61,17 +64,9 @@ class Server */ protected array $workerStopHooks = []; - /** - * @var array - */ - protected array $resources = [ - 'error' => null, - ]; - - /** - * @var array - */ - protected static array $resourcesCallbacks = []; + protected bool $coroutines = false; + protected Container $container; + protected ?Container $workerContainer = null; private Histogram $jobWaitTime; private Histogram $processDuration; @@ -79,89 +74,61 @@ class Server /** * Creates an instance of a Queue server. * @param Adapter $adapter + * @param Container|null $container */ - public function __construct(Adapter $adapter) + public function __construct(Adapter $adapter, ?Container $container = null) { $this->adapter = $adapter; + $this->container = $container ?? new Container(); $this->setTelemetry(new NoTelemetry()); } - public function job(): Job - { - $this->job = new Job(); - return $this->job; - } - /** - * If a resource has been created return it, otherwise create it and then return it + * Enable or disable coroutine mode for concurrent job processing. * - * @param string $name - * @param bool $fresh - * @return mixed - * @throws Exception + * When enabled, each coroutine gets its own child container via + * Swoole\Coroutine::getContext(), preventing shared state between + * concurrent jobs. Must be called before start(). + * + * @param bool $enable + * @return self */ - public function getResource(string $name, bool $fresh = false): mixed + public function setCoroutines(bool $enable): self { - if ( - !\array_key_exists($name, $this->resources) || - $fresh || - self::$resourcesCallbacks[$name]['reset'] - ) { - if (!\array_key_exists($name, self::$resourcesCallbacks)) { - throw new Exception("Failed to find resource: $name"); - } - - $this->resources[$name] = \call_user_func_array( - self::$resourcesCallbacks[$name]['callback'], - $this->getResources( - self::$resourcesCallbacks[$name]['injections'], - ), - ); - } - - self::$resourcesCallbacks[$name]['reset'] = false; - - return $this->resources[$name]; + $this->coroutines = $enable; + return $this; } - /** - * Get Resources By List - * - * @param array $list - * @return array - */ - public function getResources(array $list): array + public function job(): Job { - $resources = []; - - foreach ($list as $name) { - $resources[$name] = $this->getResource($name); - } - - return $resources; + $this->job = new Job(); + return $this->job; } /** - * Set a new resource callback + * Set a new resource on the container * * @param string $name * @param callable $callback * @param array $injections * - * @throws Exception - * * @return void */ - public static function setResource( + public function setResource( string $name, callable $callback, array $injections = [], ): void { - self::$resourcesCallbacks[$name] = [ - 'callback' => $callback, - 'injections' => $injections, - 'reset' => true, - ]; + $this->container->set($name, $callback, $injections); + } + + public function getContainer(): Container + { + if ($this->coroutines && \Swoole\Coroutine::getCid() !== -1) { + return \Swoole\Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] ?? $this->container; + } + + return $this->workerContainer ?? $this->container; } public function setTelemetry(Telemetry $telemetry): void @@ -237,9 +204,9 @@ public function stop(): self try { $this->adapter->stop(); } catch (Throwable $error) { - self::setResource('error', fn () => $error); + $this->getContainer()->set('error', fn () => $error); foreach ($this->errorHooks as $hook) { - $hook->getAction()(...$this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } } return $this; @@ -266,28 +233,33 @@ public function start(): self { try { $this->adapter->workerStart(function (string $workerId) { - self::setResource('workerId', fn () => $workerId); + $this->getContainer()->set('workerId', fn () => $workerId); foreach ($this->workerStartHooks as $hook) { - $hook->getAction()(...$this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } $this->adapter->consumer->consume( $this->adapter->queue, function (Message $message) { + if ($this->coroutines && Coroutine::getCid() !== -1) { + Coroutine::getContext()[self::WORKER_CONTAINER_CONTEXT_KEY] = new Container($this->container); + } + $receivedAtTimestamp = microtime(true); try { $waitDuration = microtime(true) - $message->getTimestamp(); $this->jobWaitTime->record($waitDuration); - $this->resources = []; - self::setResource('message', fn () => $message); + $this->getContainer()->set('message', fn () => $message); + if ($this->job->getHook()) { foreach ($this->initHooks as $hook) { // Global init hooks if (\in_array('*', $hook->getGroups())) { $arguments = $this->getArguments( + $this->getContainer(), $hook, $message->getPayload(), ); @@ -301,6 +273,7 @@ function (Message $message) { // Group init hooks if (\in_array($group, $hook->getGroups())) { $arguments = $this->getArguments( + $this->getContainer(), $hook, $message->getPayload(), ); @@ -312,6 +285,7 @@ function (Message $message) { return \call_user_func_array( $this->job->getAction(), $this->getArguments( + $this->getContainer(), $this->job, $message->getPayload(), ), @@ -323,11 +297,14 @@ function (Message $message) { } }, function (Message $message) { + $this->getContainer()->set('message', fn () => $message); + if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { - // Global init hooks + // Global shutdown hooks if (\in_array('*', $hook->getGroups())) { $arguments = $this->getArguments( + $this->getContainer(), $hook, $message->getPayload(), ); @@ -338,9 +315,10 @@ function (Message $message) { foreach ($this->job->getGroups() as $group) { foreach ($this->shutdownHooks as $hook) { - // Group init hooks + // Group shutdown hooks if (\in_array($group, $hook->getGroups())) { $arguments = $this->getArguments( + $this->getContainer(), $hook, $message->getPayload(), ); @@ -350,23 +328,26 @@ function (Message $message) { } }, function (?Message $message, Throwable $th) { - self::setResource('error', fn () => $th); + $this->getContainer()->set('error', fn () => $th); + if ($message !== null) { + $this->getContainer()->set('message', fn () => $message); + } foreach ($this->errorHooks as $hook) { - $hook->getAction()(...$this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } }, ); }); $this->adapter->workerStop(function (string $workerId) { - self::setResource('workerId', fn () => $workerId); + $this->getContainer()->set('workerId', fn () => $workerId); try { // Call user-defined workerStop hooks foreach ($this->workerStopHooks as $hook) { try { - $hook->getAction()(...$this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } catch (Throwable $e) { } } @@ -378,9 +359,9 @@ function (?Message $message, Throwable $th) { $this->adapter->start(); } catch (Throwable $error) { - self::setResource('error', fn () => $error); + $this->getContainer()->set('error', fn () => $error); foreach ($this->errorHooks as $hook) { - $hook->getAction()(...$this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } } return $this; @@ -431,11 +412,12 @@ public function getWorkerStop(): array /** * Get Arguments * + * @param Container $container * @param Hook $hook * @param array $payload * @return array */ - protected function getArguments(Hook $hook, array $payload = []): array + protected function getArguments(Container $container, Hook $hook, array $payload = []): array { $arguments = []; foreach ($hook->getParams() as $key => $param) { @@ -444,13 +426,13 @@ protected function getArguments(Hook $hook, array $payload = []): array $value = $value === '' || $value === null ? $param['default'] : $value; - $this->validate($key, $param, $value); + $this->validate($key, $param, $value, $container); $hook->setParamValue($key, $value); $arguments[$param['order']] = $value; } foreach ($hook->getInjections() as $key => $injection) { - $arguments[$injection['order']] = $this->getResource( + $arguments[$injection['order']] = $container->get( $injection['name'], ); } @@ -466,21 +448,21 @@ protected function getArguments(Hook $hook, array $payload = []): array * @param string $key * @param array $param * @param mixed $value + * @param Container $container * * @throws Exception * * @return void */ - protected function validate(string $key, array $param, mixed $value): void + protected function validate(string $key, array $param, mixed $value, Container $container): void { if ('' !== $value && $value !== null) { $validator = $param['validator']; // checking whether the class exists if (\is_callable($validator)) { - $validator = \call_user_func_array( - $validator, - $this->getResources($param['injections']), - ); + $validatorKey = '_validator:' . $key; + $container->set($validatorKey, $validator, $param['injections']); + $validator = $container->get($validatorKey); } if (!$validator instanceof Validator) {