diff --git a/.idea/php.xml b/.idea/php.xml
index e01b5fd..2c4b796 100644
--- a/.idea/php.xml
+++ b/.idea/php.xml
@@ -15,9 +15,15 @@
+
+
+
+
+
+
diff --git a/.idea/tamer.iml b/.idea/tamer.iml
index 4348106..24c8dff 100644
--- a/.idea/tamer.iml
+++ b/.idea/tamer.iml
@@ -4,6 +4,7 @@
+
diff --git a/project.json b/project.json
index e57e387..26ada6b 100644
--- a/project.json
+++ b/project.json
@@ -40,6 +40,12 @@
"version": "latest",
"source_type": "remote",
"source": "nosial/libs.log=latest@n64"
+ },
+ {
+ "name": "com.opis.closure",
+ "version": "latest",
+ "source_type": "remote",
+ "source": "opis/closure=latest@composer"
}
],
"configurations": [
diff --git a/src/Tamer/Interfaces/ClientProtocolInterface.php b/src/Tamer/Interfaces/ClientProtocolInterface.php
index d0e2956..e03baff 100644
--- a/src/Tamer/Interfaces/ClientProtocolInterface.php
+++ b/src/Tamer/Interfaces/ClientProtocolInterface.php
@@ -2,7 +2,70 @@
namespace Tamer\Interfaces;
+ use Tamer\Objects\Task;
+
interface ClientProtocolInterface
{
+ /**
+ * Adds options to the client (client specific)
+ *
+ * @param array $options
+ * @return bool
+ */
+ public function addOptions(array $options): bool;
+ /**
+ * Adds a server to the list of servers to use
+ *
+ * @param string $host The host to connect to (eg; 127.0.0.1)
+ * @param int $port The port to connect to (eg; 4730)
+ * @return bool
+ */
+ public function addServer(string $host, int $port): bool;
+
+ /**
+ * Adds a list of servers to the list of servers to use
+ *
+ * @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...])
+ * @return bool
+ */
+ public function addServers(array $servers): bool;
+
+ /**
+ * Processes a task in the background (does not return a result)
+ *
+ * @param Task $task The task to process
+ * @return void
+ */
+ public function doBackground(Task $task): void;
+
+ /**
+ * Queues a task to be processed in parallel (returns a result handled by a callback)
+ *
+ * @param Task $task
+ * @return void
+ */
+ public function addTask(Task $task): void;
+
+ /**
+ * Executes all tasks in the queue and waits for them to complete
+ *
+ * @return bool
+ */
+ public function run(): bool;
+
+ /**
+ * Returns True if the client is set to automatically reconnect to the server after a period of time
+ *
+ * @return bool
+ */
+ public function isAutomaticReconnect(): bool;
+
+ /**
+ * Enables or disables automatic reconnecting to the server after a period of time
+ *
+ * @param bool $automatic_reconnect
+ * @return void
+ */
+ public function setAutomaticReconnect(bool $automatic_reconnect): void;
}
\ No newline at end of file
diff --git a/src/Tamer/Objects/Job.php b/src/Tamer/Objects/Job.php
index 36b9de0..8053815 100644
--- a/src/Tamer/Objects/Job.php
+++ b/src/Tamer/Objects/Job.php
@@ -27,14 +27,24 @@
*/
private $data;
- public function __construct(string $id, string $name, string $data)
+ /**
+ * Indicates if the data is a closure
+ *
+ * @var bool
+ */
+ private $closure;
+
+ public function __construct(Task $task)
{
- $this->id = $id;
- $this->name = $name;
- $this->data = $data;
+ $this->id = $task->getId();
+ $this->name = $task->getFunctionName();
+ $this->data = $task->getData();
+ $this->closure = $task->isClosure();
}
/**
+ * Returns the ID of the Job
+ *
* @return string
*/
public function getId(): string
@@ -43,6 +53,8 @@
}
/**
+ * Returns the function name of the Job
+ *
* @return string
*/
public function getName(): string
@@ -51,10 +63,56 @@
}
/**
+ * Returns the data of the Job
+ *
* @return string
*/
public function getData(): string
{
return $this->data;
}
+
+ /**
+ * @return bool
+ */
+ public function isClosure(): bool
+ {
+ return $this->closure;
+ }
+
+ /**
+ * Returns an array representation of the Job
+ *
+ * @return array
+ */
+ public function toArray(): array
+ {
+ return [
+ 'id' => $this->id,
+ 'name' => $this->name,
+ 'data' => ($this->closure ? \Opis\Closure\serialize($this->data) : $this->data),
+ 'closure' => $this->closure
+ ];
+ }
+
+ /**
+ * Constructs a Job from an array
+ *
+ * @param array $data
+ * @return Job
+ */
+ public static function fromArray(array $data): Job
+ {
+ $data = $data['data'];
+
+ if($data['closure'] === true)
+ $data = \Opis\Closure\unserialize($data['data']);
+
+ $job = new Job(new Task($data['name'], $data['data']));
+ $job->id = $data['id'];
+ $job->closure = $data['closure'];
+
+ return $job;
+ }
+
}
\ No newline at end of file
diff --git a/src/Tamer/Objects/JobResults.php b/src/Tamer/Objects/JobResults.php
index 09d8f59..2f9049d 100644
--- a/src/Tamer/Objects/JobResults.php
+++ b/src/Tamer/Objects/JobResults.php
@@ -1,48 +1,69 @@
task = $task;
- $this->status = $status;
- $this->result = $result;
+ if($job !== null)
+ {
+ $this->id = $job->getId();
+ $this->data = $results;
+ $this->status = $status;
+ }
}
/**
- * @return Task
+ * Returns the ID of the Job
+ *
+ * @return string
*/
- public function getTask(): Task
+ public function getId(): string
{
- return $this->task;
+ return $this->id;
+ }
+
+
+ /**
+ * Returns the data of the Job
+ *
+ * @return string
+ */
+ public function getData(): string
+ {
+ return $this->data;
}
/**
* @return int
+ * @noinspection PhpUnused
*/
public function getStatus(): int
{
@@ -50,10 +71,59 @@
}
/**
- * @return string|null
+ * Returns an array representation of the Job
+ *
+ * @return array
*/
- public function getResult(): ?string
+ public function toArray(): array
{
- return $this->result;
+ return [
+ 'id' => $this->id,
+ 'data' => $this->data,
+ 'status' => $this->status
+ ];
}
+
+ /**
+ * Constructs a Job from an array
+ *
+ * @param array $data
+ * @return JobResults
+ */
+ public static function fromArray(array $data): JobResults
+ {
+ $job = new JobResults();
+
+ $job->setId($data['id']);
+ $job->setData($data['data']);
+ $job->setStatus($data['status']);
+
+ return $job;
+ }
+
+ /**
+ * @param string $id
+ */
+ protected function setId(string $id): void
+ {
+ $this->id = $id;
+ }
+
+ /**
+ * @param string $data
+ */
+ protected function setData(string $data): void
+ {
+ $this->data = $data;
+ }
+
+ /**
+ * @param int|null $status
+ */
+ protected function setStatus(?int $status): void
+ {
+ $this->status = $status;
+ }
+
+
}
\ No newline at end of file
diff --git a/src/Tamer/Objects/Task.php b/src/Tamer/Objects/Task.php
index 4cfcbf5..54152ce 100644
--- a/src/Tamer/Objects/Task.php
+++ b/src/Tamer/Objects/Task.php
@@ -33,6 +33,11 @@
*/
private $callback;
+ /**
+ * @var bool
+ */
+ private $closure;
+
/**
* Public Constructor
*
@@ -47,6 +52,7 @@
$this->id = uniqid();
$this->priority = TaskPriority::Normal;
$this->callback = $callback;
+ $this->closure = false;
}
/**
@@ -147,4 +153,20 @@
call_user_func($this->callback, $result);
}
}
+
+ /**
+ * @return bool
+ */
+ public function isClosure(): bool
+ {
+ return $this->closure;
+ }
+
+ /**
+ * @param bool $closure
+ */
+ public function setClosure(bool $closure): void
+ {
+ $this->closure = $closure;
+ }
}
\ No newline at end of file
diff --git a/src/Tamer/Protocols/GearmanClient.php b/src/Tamer/Protocols/GearmanClient.php
index 1abe10a..37ec3ce 100644
--- a/src/Tamer/Protocols/GearmanClient.php
+++ b/src/Tamer/Protocols/GearmanClient.php
@@ -7,10 +7,12 @@
use Exception;
use GearmanTask;
use LogLib\Log;
+ use Opis\Closure\SerializableClosure;
use Tamer\Abstracts\JobStatus;
use Tamer\Abstracts\TaskPriority;
use Tamer\Exceptions\ServerException;
use Tamer\Interfaces\ClientProtocolInterface;
+ use Tamer\Objects\Job;
use Tamer\Objects\JobResults;
use Tamer\Objects\Task;
@@ -51,6 +53,7 @@
$this->tasks = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
+ $this->server_cache = [];
try
{
@@ -80,6 +83,11 @@
return $this->client->addOptions($options);
}
+ /**
+ * Registers callbacks for the client
+ *
+ * @return void
+ */
private function registerCallbacks(): void
{
$this->client->setCompleteCallback([$this, 'callbackHandler']);
@@ -134,14 +142,28 @@
return $this->client->addServers(implode(',', $servers));
}
+ /**
+ * Executes a closure in the background
+ *
+ * @param callable $function
+ * @return void
+ * @throws ServerException
+ */
+ public function closure(callable $function): void
+ {
+ $closure_task = new Task('tamer_closure', \Opis\Closure\serialize(new SerializableClosure($function)));
+ $closure_task->setClosure(true);
+ $this->doBackground($closure_task);
+ }
+
/**
* Processes a task in the background
*
* @param Task $task
- * @return bool
+ * @return void
* @throws ServerException
*/
- public function doBackground(Task $task): bool
+ public function doBackground(Task $task): void
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
@@ -150,29 +172,34 @@
}
$this->tasks[] = $task;
+ $job = new Job($task);
switch($task->getPriority())
{
case TaskPriority::High:
- return $this->client->doHighBackground($task->getFunctionName(), $task->getData(), $task->getId());
+ $this->client->doHighBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
+ break;
case TaskPriority::Low:
- return $this->client->doLowBackground($task->getFunctionName(), $task->getData(), $task->getId());
+ $this->client->doLowBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
+ break;
default:
case TaskPriority::Normal:
- return $this->client->doBackground($task->getFunctionName(), $task->getData(), $task->getId());
+ $this->client->doBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
+ break;
}
+
}
/**
- * Processes a task in the foreground
+ * Adds a task to the list of tasks to run
*
* @param Task $task
- * @return JobResults
+ * @return void
* @throws ServerException
*/
- public function do(Task $task): JobResults
+ public function addTask(Task $task): void
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
@@ -181,85 +208,45 @@
}
$this->tasks[] = $task;
+ $job = new Job($task);
switch($task->getPriority())
{
case TaskPriority::High:
- return new JobResults($task, JobStatus::Success, $this->client->doHigh($task->getFunctionName(), $task->getData(), $task->getId()));
+ $this->client->addTaskHigh($task->getFunctionName(), msgpack_pack($job->toArray()));
+ break;
case TaskPriority::Low:
- return new JobResults($task, JobStatus::Success, $this->client->doLow($task->getFunctionName(), $task->getData(), $task->getId()));
+ $this->client->addTaskLow($task->getFunctionName(), msgpack_pack($job->toArray()));
+ break;
default:
case TaskPriority::Normal:
- return new JobResults($task, JobStatus::Success, $this->client->doNormal($task->getFunctionName(), $task->getData(), $task->getId()));
+ $this->client->addTask($task->getFunctionName(), msgpack_pack($job->toArray()));
+ break;
}
}
- public function addTask(Task $task): ClientProtocolInterface
+ /**
+ * Adds a closure task to the list of tasks to run
+ *
+ * @param callable $function
+ * @param $callback
+ * @return void
+ * @throws ServerException
+ */
+ public function addClosureTask(callable $function, $callback): void
{
- if($this->automatic_reconnect && time() > $this->next_reconnect)
- {
- $this->reconnect();
- $this->next_reconnect = time() + 1800;
- }
-
- $this->tasks[] = $task;
-
- switch($task->getPriority())
- {
- case TaskPriority::High:
- $this->client->addTaskHigh($task->getFunctionName(), $task->getData(), $task->getId());
- break;
-
- case TaskPriority::Low:
- $this->client->addTaskLow($task->getFunctionName(), $task->getData(), $task->getId());
- break;
-
- default:
- case TaskPriority::Normal:
- $this->client->addTask($task->getFunctionName(), $task->getData(), $task->getId());
- break;
- }
-
- return $this;
- }
-
-
- public function addBackgroundTask(Task $task): ClientProtocolInterface
- {
- if($this->automatic_reconnect && time() > $this->next_reconnect)
- {
- $this->reconnect();
- $this->next_reconnect = time() + 1800;
- }
-
- $this->tasks[] = $task;
-
- switch($task->getPriority())
- {
- case TaskPriority::High:
- $this->client->addTaskHighBackground($task->getFunctionName(), $task->getData(), $task->getId());
- break;
-
- case TaskPriority::Low:
- $this->client->addTaskLowBackground($task->getFunctionName(), $task->getData(), $task->getId());
- break;
-
- default:
- case TaskPriority::Normal:
- $this->client->addTaskBackground($task->getFunctionName(), $task->getData(), $task->getId());
- break;
- }
-
- return $this;
+ $closure_task = new Task('tamer_closure', \Opis\Closure\serialize(new SerializableClosure($function)), $callback);
+ $closure_task->setClosure(true);
+ $this->addTask($closure_task);
}
/**
* @return bool
* @throws ServerException
*/
- public function doTasks(): bool
+ public function run(): bool
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
@@ -283,7 +270,8 @@
*/
public function callbackHandler(GearmanTask $task): void
{
- $internal_task = $this->getTaskById($task->unique());
+ $job_result = JobResults::fromArray(msgpack_unpack($task->data()));
+ $internal_task = $this->getTaskById($job_result->getId());
$job_status = match ($task->returnCode())
{
GEARMAN_WORK_EXCEPTION => JobStatus::Exception,
@@ -291,12 +279,10 @@
default => JobStatus::Success,
};
- $job_results = new JobResults($internal_task, $job_status, ($task->data() ?? null));
-
try
{
Log::debug('net.nosial.tamer', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_status . ' and data size ' . strlen($task->data()) . ' bytes');
- $internal_task->runCallback($job_results);
+ $internal_task->runCallback($job_result);
}
catch(Exception $e)
{
@@ -314,8 +300,6 @@
*/
private function getTaskById(string $id): ?Task
{
- var_dump($this->tasks);
- var_dump($id);
foreach($this->tasks as $task)
{
if($task->getId() === $id)
@@ -349,16 +333,15 @@
* Removes a task from the list of tasks
*
* @param Task $task
- * @return ClientProtocolInterface
+ * @return void
*/
- private function removeTask(Task $task): ClientProtocolInterface
+ private function removeTask(Task $task): void
{
$this->tasks = array_filter($this->tasks, function($item) use ($task)
{
return $item->getId() !== $task->getId();
});
- return $this;
}
/**
@@ -376,4 +359,21 @@
{
$this->automatic_reconnect = $automatic_reconnect;
}
+
+ /**
+ * Executes all remaining tasks and closes the connection
+ */
+ public function __destruct()
+ {
+ try
+ {
+ $this->client->runTasks();
+ }
+ catch(Exception $e)
+ {
+ unset($e);
+ }
+
+ unset($this->client);
+ }
}
\ No newline at end of file
diff --git a/src/Tamer/Protocols/GearmanWorker.php b/src/Tamer/Protocols/GearmanWorker.php
index 2e3d359..b5a4802 100644
--- a/src/Tamer/Protocols/GearmanWorker.php
+++ b/src/Tamer/Protocols/GearmanWorker.php
@@ -1,13 +1,18 @@
worker = null;
$this->server_cache = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
- $this->current_job = null;
try
{
@@ -120,10 +119,21 @@
{
$this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context)
{
- $this->current_job = $job;
- $job = new Job($job->unique(), $job->handle(), $job->workload());
- $function($job, $context);
- $this->current_job = null;
+ $received_job = Job::fromArray(msgpack_unpack($job->workload()));
+
+ try
+ {
+ $result = $function($received_job, $context);
+ }
+ catch(Exception $e)
+ {
+ $job->sendFail();
+ return;
+ }
+
+ $job_results = new JobResults($received_job, JobStatus::Success, $result);
+ $job->sendComplete(msgpack_pack($job_results->toArray()));
+
});
return $this;
}
@@ -173,6 +183,26 @@
$this->addServer($host, $port);
}
}
+
+ $this->worker->addFunction('tamer_closure', function(GearmanJob $job)
+ {
+ $received_job = Job::fromArray(msgpack_unpack($job->workload()));
+
+ try
+ {
+ /** @var SerializableClosure $closure */
+ $closure = $received_job->getData();
+ $result = $closure->getClosure()->__invoke($received_job);
+ }
+ catch(Exception $e)
+ {
+ $job->sendFail();
+ return;
+ }
+
+ $job_results = new JobResults($received_job, JobStatus::Success, $result);
+ $job->sendComplete(msgpack_pack($job_results->toArray()));
+ });
}
/**
diff --git a/tests/client_example.php b/tests/client_example.php
deleted file mode 100644
index 4a379eb..0000000
--- a/tests/client_example.php
+++ /dev/null
@@ -1,18 +0,0 @@
-addServer();
-
- $client->addTask(new Task('sleep', '5', function(Job $job) {
- echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
- }));
-
-
- $client->doTasks();
\ No newline at end of file
diff --git a/tests/gearman_client.php b/tests/gearman_client.php
new file mode 100644
index 0000000..7305c05
--- /dev/null
+++ b/tests/gearman_client.php
@@ -0,0 +1,26 @@
+addServer();
+
+ $client->doBackground(new Task('sleep', '5'));
+
+
+ $client->addTask(new Task('sleep', '5', function(JobResults $job) {
+ echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
+ }));
+
+
+ $client->addTask(new Task('sleep', '5', function(JobResults $job) {
+ echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
+ }));
+
+
+ $client->run();
\ No newline at end of file
diff --git a/tests/gearman_closure.php b/tests/gearman_closure.php
new file mode 100644
index 0000000..4aa3acd
--- /dev/null
+++ b/tests/gearman_closure.php
@@ -0,0 +1,15 @@
+addServer();
+
+ $client->closure(function () {
+ echo "This function was sent from a client, it should be executed on the worker";
+ });
\ No newline at end of file
diff --git a/tests/gearman_worker.php b/tests/gearman_worker.php
index 7783f89..5a48584 100644
--- a/tests/gearman_worker.php
+++ b/tests/gearman_worker.php
@@ -1,17 +1,21 @@
addServer();
- $worker->addFunction('sleep', function($task) {
- var_dump(get_class($task));
- echo "Task {$task->getId()} started with data: {$task->getData()} \n";
- sleep($task->getData());
- echo "Task {$task->getId()} completed with data: {$task->getData()} \n";
+ $worker->addFunction('sleep', function($job) {
+ /** @var Job $job */
+ var_dump(get_class($job));
+ echo "Task {$job->getId()} started with data: {$job->getData()} \n";
+ sleep($job->getData());
+ echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
+
+ return $job->getData();
});