Some updates
This commit is contained in:
parent
33bb2e5035
commit
f82f1ca26a
7 changed files with 44 additions and 5 deletions
5
.idea/php.xml
generated
5
.idea/php.xml
generated
|
@ -15,8 +15,11 @@
|
||||||
<path value="/etc/ncc" />
|
<path value="/etc/ncc" />
|
||||||
<path value="/var/ncc/packages/net.nosial.optslib=1.0.0" />
|
<path value="/var/ncc/packages/net.nosial.optslib=1.0.0" />
|
||||||
<path value="/var/ncc/packages/net.nosial.loglib=1.0.0" />
|
<path value="/var/ncc/packages/net.nosial.loglib=1.0.0" />
|
||||||
<path value="/var/ncc/packages/com.opis.closure=3.6.3" />
|
<path value="/var/ncc/packages/com.phpseclib.phpseclib=3.0.18" />
|
||||||
<path value="/var/ncc/packages/com.php_amqplib.php_amqplib=3.5.1" />
|
<path value="/var/ncc/packages/com.php_amqplib.php_amqplib=3.5.1" />
|
||||||
|
<path value="/var/ncc/packages/com.paragonie.random_compat=9.99.100" />
|
||||||
|
<path value="/var/ncc/packages/com.paragonie.constant_time_encoding=2.6.3" />
|
||||||
|
<path value="/var/ncc/packages/com.opis.closure=3.6.3" />
|
||||||
</include_path>
|
</include_path>
|
||||||
</component>
|
</component>
|
||||||
<component name="PhpProjectSharedConfiguration" php_language_level="8.1" />
|
<component name="PhpProjectSharedConfiguration" php_language_level="8.1" />
|
||||||
|
|
|
@ -44,7 +44,7 @@ TamerLib\Tamer::init(\TamerLib\Abstracts\ProtocolType::Gearman, [
|
||||||
## Supported Protocols
|
## Supported Protocols
|
||||||
|
|
||||||
* [x] Gearman
|
* [x] Gearman
|
||||||
* [x] RabbitMQ
|
* [ ] RabbitMQ (Work in progress)
|
||||||
* [ ] Redis
|
* [ ] Redis
|
||||||
|
|
||||||
# License
|
# License
|
||||||
|
|
|
@ -92,6 +92,7 @@
|
||||||
public function toArray(): array
|
public function toArray(): array
|
||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
|
'type' => 'tamer_job',
|
||||||
'id' => $this->id,
|
'id' => $this->id,
|
||||||
'name' => $this->name,
|
'name' => $this->name,
|
||||||
'data' => ($this->closure ? serialize(new SerializableClosure($this->data)) : $this->data),
|
'data' => ($this->closure ? serialize(new SerializableClosure($this->data)) : $this->data),
|
||||||
|
|
|
@ -78,6 +78,7 @@
|
||||||
public function toArray(): array
|
public function toArray(): array
|
||||||
{
|
{
|
||||||
return [
|
return [
|
||||||
|
'type' => 'tamer_job_results',
|
||||||
'id' => $this->id,
|
'id' => $this->id,
|
||||||
'data' => $this->data,
|
'data' => $this->data,
|
||||||
'status' => $this->status
|
'status' => $this->status
|
||||||
|
|
|
@ -7,7 +7,9 @@
|
||||||
use Closure;
|
use Closure;
|
||||||
use Exception;
|
use Exception;
|
||||||
use PhpAmqpLib\Message\AMQPMessage;
|
use PhpAmqpLib\Message\AMQPMessage;
|
||||||
|
use TamerLib\Abstracts\ObjectType;
|
||||||
use TamerLib\Classes\Functions;
|
use TamerLib\Classes\Functions;
|
||||||
|
use TamerLib\Classes\Validate;
|
||||||
use TamerLib\Exceptions\ConnectionException;
|
use TamerLib\Exceptions\ConnectionException;
|
||||||
use TamerLib\Interfaces\ClientProtocolInterface;
|
use TamerLib\Interfaces\ClientProtocolInterface;
|
||||||
use TamerLib\Objects\Job;
|
use TamerLib\Objects\Job;
|
||||||
|
@ -326,6 +328,7 @@
|
||||||
$this->preformAutoreconf();
|
$this->preformAutoreconf();
|
||||||
$correlationIds = [];
|
$correlationIds = [];
|
||||||
$connection = $this->connections[array_rand($this->connections)];
|
$connection = $this->connections[array_rand($this->connections)];
|
||||||
|
|
||||||
if($this->automatic_reconnect)
|
if($this->automatic_reconnect)
|
||||||
$connection->preformAutoreconf();
|
$connection->preformAutoreconf();
|
||||||
|
|
||||||
|
@ -348,12 +351,32 @@
|
||||||
// Register callback for each task
|
// Register callback for each task
|
||||||
$callback = function($msg) use (&$correlationIds, $connection)
|
$callback = function($msg) use (&$correlationIds, $connection)
|
||||||
{
|
{
|
||||||
|
var_dump(Validate::getObjectType(msgpack_unpack($msg->body)));
|
||||||
|
if(Validate::getObjectType(msgpack_unpack($msg->body)) !== ObjectType::JobResults)
|
||||||
|
{
|
||||||
|
$connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
$job_result = JobResults::fromArray(msgpack_unpack($msg->body));
|
$job_result = JobResults::fromArray(msgpack_unpack($msg->body));
|
||||||
$task = $this->getTaskById($job_result->getId());
|
$task = $this->getTaskById($job_result->getId());
|
||||||
|
|
||||||
|
if($task == null)
|
||||||
|
{
|
||||||
|
$connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
$task->runCallback($job_result);
|
if($task->isClosure())
|
||||||
|
{
|
||||||
|
$task->runCallback($job_result->getData());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
$task->runCallback($job_result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch(Exception $e)
|
catch(Exception $e)
|
||||||
{
|
{
|
||||||
|
|
|
@ -8,6 +8,8 @@
|
||||||
use LogLib\Log;
|
use LogLib\Log;
|
||||||
use PhpAmqpLib\Message\AMQPMessage;
|
use PhpAmqpLib\Message\AMQPMessage;
|
||||||
use TamerLib\Abstracts\JobStatus;
|
use TamerLib\Abstracts\JobStatus;
|
||||||
|
use TamerLib\Abstracts\ObjectType;
|
||||||
|
use TamerLib\Classes\Validate;
|
||||||
use TamerLib\Exceptions\ConnectionException;
|
use TamerLib\Exceptions\ConnectionException;
|
||||||
use TamerLib\Interfaces\WorkerProtocolInterface;
|
use TamerLib\Interfaces\WorkerProtocolInterface;
|
||||||
use TamerLib\Objects\Job;
|
use TamerLib\Objects\Job;
|
||||||
|
@ -69,6 +71,7 @@
|
||||||
public function __construct(?string $username = null, ?string $password = null)
|
public function __construct(?string $username = null, ?string $password = null)
|
||||||
{
|
{
|
||||||
$this->defined_servers = [];
|
$this->defined_servers = [];
|
||||||
|
$this->connections = [];
|
||||||
$this->functions = [];
|
$this->functions = [];
|
||||||
$this->automatic_reconnect = true;
|
$this->automatic_reconnect = true;
|
||||||
$this->username = $username;
|
$this->username = $username;
|
||||||
|
@ -282,10 +285,17 @@
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// Select a random connection
|
// Select a random connection
|
||||||
$connection = $this->connections[array_rand($this->connections)];
|
$connection = $this->connections[array_rand($this->connections)];
|
||||||
|
|
||||||
$callback = function($message) use ($throw_errors, $connection)
|
$callback = function($message) use ($throw_errors, $connection)
|
||||||
{
|
{
|
||||||
|
var_dump(Validate::getObjectType(msgpack_unpack($message->body)));
|
||||||
|
if(Validate::getObjectType(msgpack_unpack($message->body)) !== ObjectType::Job)
|
||||||
|
{
|
||||||
|
$connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
$received_job = Job::fromArray(msgpack_unpack($message->body));
|
$received_job = Job::fromArray(msgpack_unpack($message->body));
|
||||||
|
|
||||||
if($received_job->isClosure())
|
if($received_job->isClosure())
|
||||||
|
@ -364,6 +374,7 @@
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
$connection->getChannel()->wait();
|
$connection->getChannel()->wait();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
|
|
||||||
import('net.nosial.tamerlib', 'latest');
|
import('net.nosial.tamerlib', 'latest');
|
||||||
|
|
||||||
Tamer::init(ProtocolType::Gearman,
|
Tamer::init(ProtocolType::RabbitMQ,
|
||||||
['127.0.0.1:4730']
|
['127.0.0.1:4730']
|
||||||
//['127.0.0.1:5672'], 'guest', 'guest'
|
//['127.0.0.1:5672'], 'guest', 'guest'
|
||||||
);
|
);
|
||||||
|
|
Loading…
Add table
Reference in a new issue