diff --git a/.idea/php.xml b/.idea/php.xml index b479234..03580e5 100644 --- a/.idea/php.xml +++ b/.idea/php.xml @@ -15,8 +15,11 @@ - + + + + diff --git a/README.md b/README.md index ebc8811..4bac9a7 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ TamerLib\Tamer::init(\TamerLib\Abstracts\ProtocolType::Gearman, [ ## Supported Protocols * [x] Gearman - * [x] RabbitMQ + * [ ] RabbitMQ (Work in progress) * [ ] Redis # License diff --git a/src/TamerLib/Objects/Job.php b/src/TamerLib/Objects/Job.php index f8b3db3..2598993 100644 --- a/src/TamerLib/Objects/Job.php +++ b/src/TamerLib/Objects/Job.php @@ -92,6 +92,7 @@ public function toArray(): array { return [ + 'type' => 'tamer_job', 'id' => $this->id, 'name' => $this->name, 'data' => ($this->closure ? serialize(new SerializableClosure($this->data)) : $this->data), diff --git a/src/TamerLib/Objects/JobResults.php b/src/TamerLib/Objects/JobResults.php index c78ac10..83def50 100644 --- a/src/TamerLib/Objects/JobResults.php +++ b/src/TamerLib/Objects/JobResults.php @@ -78,6 +78,7 @@ public function toArray(): array { return [ + 'type' => 'tamer_job_results', 'id' => $this->id, 'data' => $this->data, 'status' => $this->status diff --git a/src/TamerLib/Protocols/RabbitMq/Client.php b/src/TamerLib/Protocols/RabbitMq/Client.php index 622f350..9fc3580 100644 --- a/src/TamerLib/Protocols/RabbitMq/Client.php +++ b/src/TamerLib/Protocols/RabbitMq/Client.php @@ -7,7 +7,9 @@ use Closure; use Exception; use PhpAmqpLib\Message\AMQPMessage; + use TamerLib\Abstracts\ObjectType; use TamerLib\Classes\Functions; + use TamerLib\Classes\Validate; use TamerLib\Exceptions\ConnectionException; use TamerLib\Interfaces\ClientProtocolInterface; use TamerLib\Objects\Job; @@ -326,6 +328,7 @@ $this->preformAutoreconf(); $correlationIds = []; $connection = $this->connections[array_rand($this->connections)]; + if($this->automatic_reconnect) $connection->preformAutoreconf(); @@ -348,12 +351,32 @@ // Register callback for each task $callback = function($msg) use (&$correlationIds, $connection) { + var_dump(Validate::getObjectType(msgpack_unpack($msg->body))); + if(Validate::getObjectType(msgpack_unpack($msg->body)) !== ObjectType::JobResults) + { + $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true); + return; + } + $job_result = JobResults::fromArray(msgpack_unpack($msg->body)); $task = $this->getTaskById($job_result->getId()); + if($task == null) + { + $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true); + return; + } + try { - $task->runCallback($job_result); + if($task->isClosure()) + { + $task->runCallback($job_result->getData()); + } + else + { + $task->runCallback($job_result); + } } catch(Exception $e) { diff --git a/src/TamerLib/Protocols/RabbitMq/Worker.php b/src/TamerLib/Protocols/RabbitMq/Worker.php index d8f41a8..14e9da1 100644 --- a/src/TamerLib/Protocols/RabbitMq/Worker.php +++ b/src/TamerLib/Protocols/RabbitMq/Worker.php @@ -8,6 +8,8 @@ use LogLib\Log; use PhpAmqpLib\Message\AMQPMessage; use TamerLib\Abstracts\JobStatus; + use TamerLib\Abstracts\ObjectType; + use TamerLib\Classes\Validate; use TamerLib\Exceptions\ConnectionException; use TamerLib\Interfaces\WorkerProtocolInterface; use TamerLib\Objects\Job; @@ -69,6 +71,7 @@ public function __construct(?string $username = null, ?string $password = null) { $this->defined_servers = []; + $this->connections = []; $this->functions = []; $this->automatic_reconnect = true; $this->username = $username; @@ -282,10 +285,17 @@ return; // Select a random connection - $connection = $this->connections[array_rand($this->connections)]; + $connection = $this->connections[array_rand($this->connections)]; $callback = function($message) use ($throw_errors, $connection) { + var_dump(Validate::getObjectType(msgpack_unpack($message->body))); + if(Validate::getObjectType(msgpack_unpack($message->body)) !== ObjectType::Job) + { + $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']); + return; + } + $received_job = Job::fromArray(msgpack_unpack($message->body)); if($received_job->isClosure()) @@ -364,6 +374,7 @@ { break; } + $connection->getChannel()->wait(); } } diff --git a/tests/tamer.php b/tests/tamer.php index 6c201ea..60bf683 100644 --- a/tests/tamer.php +++ b/tests/tamer.php @@ -8,7 +8,7 @@ import('net.nosial.tamerlib', 'latest'); - Tamer::init(ProtocolType::Gearman, + Tamer::init(ProtocolType::RabbitMQ, ['127.0.0.1:4730'] //['127.0.0.1:5672'], 'guest', 'guest' );