Progress on closures

This commit is contained in:
Netkas 2023-02-01 23:42:41 -05:00
parent 39d1084a3f
commit f88f453578
13 changed files with 427 additions and 144 deletions

6
.idea/php.xml generated
View file

@ -15,9 +15,15 @@
<path value="/etc/ncc" />
<path value="/var/ncc/packages/net.nosial.optslib=1.0.0" />
<path value="/var/ncc/packages/net.nosial.loglib=1.0.0" />
<path value="/var/ncc/packages/com.opis.closure=3.6.3" />
</include_path>
</component>
<component name="PhpProjectSharedConfiguration" php_language_level="8.1" />
<component name="PhpRuntimeConfiguration">
<extensions>
<extension name="msgpack" enabled="true" />
</extensions>
</component>
<component name="PhpStanOptionsConfiguration">
<option name="transferred" value="true" />
</component>

1
.idea/tamer.iml generated
View file

@ -4,6 +4,7 @@
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests" isTestSource="true" />
<excludeFolder url="file://$MODULE_DIR$/build" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />

View file

@ -40,6 +40,12 @@
"version": "latest",
"source_type": "remote",
"source": "nosial/libs.log=latest@n64"
},
{
"name": "com.opis.closure",
"version": "latest",
"source_type": "remote",
"source": "opis/closure=latest@composer"
}
],
"configurations": [

View file

@ -2,7 +2,70 @@
namespace Tamer\Interfaces;
use Tamer\Objects\Task;
interface ClientProtocolInterface
{
/**
* Adds options to the client (client specific)
*
* @param array $options
* @return bool
*/
public function addOptions(array $options): bool;
/**
* Adds a server to the list of servers to use
*
* @param string $host The host to connect to (eg; 127.0.0.1)
* @param int $port The port to connect to (eg; 4730)
* @return bool
*/
public function addServer(string $host, int $port): bool;
/**
* Adds a list of servers to the list of servers to use
*
* @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...])
* @return bool
*/
public function addServers(array $servers): bool;
/**
* Processes a task in the background (does not return a result)
*
* @param Task $task The task to process
* @return void
*/
public function doBackground(Task $task): void;
/**
* Queues a task to be processed in parallel (returns a result handled by a callback)
*
* @param Task $task
* @return void
*/
public function addTask(Task $task): void;
/**
* Executes all tasks in the queue and waits for them to complete
*
* @return bool
*/
public function run(): bool;
/**
* Returns True if the client is set to automatically reconnect to the server after a period of time
*
* @return bool
*/
public function isAutomaticReconnect(): bool;
/**
* Enables or disables automatic reconnecting to the server after a period of time
*
* @param bool $automatic_reconnect
* @return void
*/
public function setAutomaticReconnect(bool $automatic_reconnect): void;
}

View file

@ -27,14 +27,24 @@
*/
private $data;
public function __construct(string $id, string $name, string $data)
/**
* Indicates if the data is a closure
*
* @var bool
*/
private $closure;
public function __construct(Task $task)
{
$this->id = $id;
$this->name = $name;
$this->data = $data;
$this->id = $task->getId();
$this->name = $task->getFunctionName();
$this->data = $task->getData();
$this->closure = $task->isClosure();
}
/**
* Returns the ID of the Job
*
* @return string
*/
public function getId(): string
@ -43,6 +53,8 @@
}
/**
* Returns the function name of the Job
*
* @return string
*/
public function getName(): string
@ -51,10 +63,56 @@
}
/**
* Returns the data of the Job
*
* @return string
*/
public function getData(): string
{
return $this->data;
}
/**
* @return bool
*/
public function isClosure(): bool
{
return $this->closure;
}
/**
* Returns an array representation of the Job
*
* @return array
*/
public function toArray(): array
{
return [
'id' => $this->id,
'name' => $this->name,
'data' => ($this->closure ? \Opis\Closure\serialize($this->data) : $this->data),
'closure' => $this->closure
];
}
/**
* Constructs a Job from an array
*
* @param array $data
* @return Job
*/
public static function fromArray(array $data): Job
{
$data = $data['data'];
if($data['closure'] === true)
$data = \Opis\Closure\unserialize($data['data']);
$job = new Job(new Task($data['name'], $data['data']));
$job->id = $data['id'];
$job->closure = $data['closure'];
return $job;
}
}

View file

@ -1,48 +1,69 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace Tamer\Objects;
use Tamer\Abstracts\JobStatus;
class JobResults
{
/**
* @var Task
*/
private Task $task;
/**
* @var int
*/
private int $status;
/**
* @var string|null
*/
private ?string $result;
/**
* Public Constructor
* The ID of the job
*
* @param Task $task
* @param int $status
* @param string|null $result
* @var string
*/
public function __construct(Task $task, int $status, ?string $result)
private $id;
/**
* The data to be passed to the function
*
* @var string
*/
private $data;
/**
* The status of the job
*
* @var int
* @see JobStatus
*/
private $status;
public function __construct(?Job $job=null, ?int $status=null, $results=null)
{
$this->task = $task;
$this->status = $status;
$this->result = $result;
if($job !== null)
{
$this->id = $job->getId();
$this->data = $results;
$this->status = $status;
}
}
/**
* @return Task
* Returns the ID of the Job
*
* @return string
*/
public function getTask(): Task
public function getId(): string
{
return $this->task;
return $this->id;
}
/**
* Returns the data of the Job
*
* @return string
*/
public function getData(): string
{
return $this->data;
}
/**
* @return int
* @noinspection PhpUnused
*/
public function getStatus(): int
{
@ -50,10 +71,59 @@
}
/**
* @return string|null
* Returns an array representation of the Job
*
* @return array
*/
public function getResult(): ?string
public function toArray(): array
{
return $this->result;
return [
'id' => $this->id,
'data' => $this->data,
'status' => $this->status
];
}
/**
* Constructs a Job from an array
*
* @param array $data
* @return JobResults
*/
public static function fromArray(array $data): JobResults
{
$job = new JobResults();
$job->setId($data['id']);
$job->setData($data['data']);
$job->setStatus($data['status']);
return $job;
}
/**
* @param string $id
*/
protected function setId(string $id): void
{
$this->id = $id;
}
/**
* @param string $data
*/
protected function setData(string $data): void
{
$this->data = $data;
}
/**
* @param int|null $status
*/
protected function setStatus(?int $status): void
{
$this->status = $status;
}
}

View file

@ -33,6 +33,11 @@
*/
private $callback;
/**
* @var bool
*/
private $closure;
/**
* Public Constructor
*
@ -47,6 +52,7 @@
$this->id = uniqid();
$this->priority = TaskPriority::Normal;
$this->callback = $callback;
$this->closure = false;
}
/**
@ -147,4 +153,20 @@
call_user_func($this->callback, $result);
}
}
/**
* @return bool
*/
public function isClosure(): bool
{
return $this->closure;
}
/**
* @param bool $closure
*/
public function setClosure(bool $closure): void
{
$this->closure = $closure;
}
}

View file

@ -7,10 +7,12 @@
use Exception;
use GearmanTask;
use LogLib\Log;
use Opis\Closure\SerializableClosure;
use Tamer\Abstracts\JobStatus;
use Tamer\Abstracts\TaskPriority;
use Tamer\Exceptions\ServerException;
use Tamer\Interfaces\ClientProtocolInterface;
use Tamer\Objects\Job;
use Tamer\Objects\JobResults;
use Tamer\Objects\Task;
@ -51,6 +53,7 @@
$this->tasks = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
$this->server_cache = [];
try
{
@ -80,6 +83,11 @@
return $this->client->addOptions($options);
}
/**
* Registers callbacks for the client
*
* @return void
*/
private function registerCallbacks(): void
{
$this->client->setCompleteCallback([$this, 'callbackHandler']);
@ -134,14 +142,28 @@
return $this->client->addServers(implode(',', $servers));
}
/**
* Executes a closure in the background
*
* @param callable $function
* @return void
* @throws ServerException
*/
public function closure(callable $function): void
{
$closure_task = new Task('tamer_closure', \Opis\Closure\serialize(new SerializableClosure($function)));
$closure_task->setClosure(true);
$this->doBackground($closure_task);
}
/**
* Processes a task in the background
*
* @param Task $task
* @return bool
* @return void
* @throws ServerException
*/
public function doBackground(Task $task): bool
public function doBackground(Task $task): void
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
@ -150,29 +172,34 @@
}
$this->tasks[] = $task;
$job = new Job($task);
switch($task->getPriority())
{
case TaskPriority::High:
return $this->client->doHighBackground($task->getFunctionName(), $task->getData(), $task->getId());
$this->client->doHighBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
case TaskPriority::Low:
return $this->client->doLowBackground($task->getFunctionName(), $task->getData(), $task->getId());
$this->client->doLowBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
default:
case TaskPriority::Normal:
return $this->client->doBackground($task->getFunctionName(), $task->getData(), $task->getId());
$this->client->doBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
}
}
/**
* Processes a task in the foreground
* Adds a task to the list of tasks to run
*
* @param Task $task
* @return JobResults
* @return void
* @throws ServerException
*/
public function do(Task $task): JobResults
public function addTask(Task $task): void
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
@ -181,85 +208,45 @@
}
$this->tasks[] = $task;
$job = new Job($task);
switch($task->getPriority())
{
case TaskPriority::High:
return new JobResults($task, JobStatus::Success, $this->client->doHigh($task->getFunctionName(), $task->getData(), $task->getId()));
$this->client->addTaskHigh($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
case TaskPriority::Low:
return new JobResults($task, JobStatus::Success, $this->client->doLow($task->getFunctionName(), $task->getData(), $task->getId()));
$this->client->addTaskLow($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
default:
case TaskPriority::Normal:
return new JobResults($task, JobStatus::Success, $this->client->doNormal($task->getFunctionName(), $task->getData(), $task->getId()));
$this->client->addTask($task->getFunctionName(), msgpack_pack($job->toArray()));
break;
}
}
public function addTask(Task $task): ClientProtocolInterface
/**
* Adds a closure task to the list of tasks to run
*
* @param callable $function
* @param $callback
* @return void
* @throws ServerException
*/
public function addClosureTask(callable $function, $callback): void
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
$this->reconnect();
$this->next_reconnect = time() + 1800;
}
$this->tasks[] = $task;
switch($task->getPriority())
{
case TaskPriority::High:
$this->client->addTaskHigh($task->getFunctionName(), $task->getData(), $task->getId());
break;
case TaskPriority::Low:
$this->client->addTaskLow($task->getFunctionName(), $task->getData(), $task->getId());
break;
default:
case TaskPriority::Normal:
$this->client->addTask($task->getFunctionName(), $task->getData(), $task->getId());
break;
}
return $this;
}
public function addBackgroundTask(Task $task): ClientProtocolInterface
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
$this->reconnect();
$this->next_reconnect = time() + 1800;
}
$this->tasks[] = $task;
switch($task->getPriority())
{
case TaskPriority::High:
$this->client->addTaskHighBackground($task->getFunctionName(), $task->getData(), $task->getId());
break;
case TaskPriority::Low:
$this->client->addTaskLowBackground($task->getFunctionName(), $task->getData(), $task->getId());
break;
default:
case TaskPriority::Normal:
$this->client->addTaskBackground($task->getFunctionName(), $task->getData(), $task->getId());
break;
}
return $this;
$closure_task = new Task('tamer_closure', \Opis\Closure\serialize(new SerializableClosure($function)), $callback);
$closure_task->setClosure(true);
$this->addTask($closure_task);
}
/**
* @return bool
* @throws ServerException
*/
public function doTasks(): bool
public function run(): bool
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
@ -283,7 +270,8 @@
*/
public function callbackHandler(GearmanTask $task): void
{
$internal_task = $this->getTaskById($task->unique());
$job_result = JobResults::fromArray(msgpack_unpack($task->data()));
$internal_task = $this->getTaskById($job_result->getId());
$job_status = match ($task->returnCode())
{
GEARMAN_WORK_EXCEPTION => JobStatus::Exception,
@ -291,12 +279,10 @@
default => JobStatus::Success,
};
$job_results = new JobResults($internal_task, $job_status, ($task->data() ?? null));
try
{
Log::debug('net.nosial.tamer', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_status . ' and data size ' . strlen($task->data()) . ' bytes');
$internal_task->runCallback($job_results);
$internal_task->runCallback($job_result);
}
catch(Exception $e)
{
@ -314,8 +300,6 @@
*/
private function getTaskById(string $id): ?Task
{
var_dump($this->tasks);
var_dump($id);
foreach($this->tasks as $task)
{
if($task->getId() === $id)
@ -349,16 +333,15 @@
* Removes a task from the list of tasks
*
* @param Task $task
* @return ClientProtocolInterface
* @return void
*/
private function removeTask(Task $task): ClientProtocolInterface
private function removeTask(Task $task): void
{
$this->tasks = array_filter($this->tasks, function($item) use ($task)
{
return $item->getId() !== $task->getId();
});
return $this;
}
/**
@ -376,4 +359,21 @@
{
$this->automatic_reconnect = $automatic_reconnect;
}
/**
* Executes all remaining tasks and closes the connection
*/
public function __destruct()
{
try
{
$this->client->runTasks();
}
catch(Exception $e)
{
unset($e);
}
unset($this->client);
}
}

View file

@ -1,13 +1,18 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace Tamer\Protocols;
use Exception;
use GearmanJob;
use Opis\Closure\SerializableClosure;
use Tamer\Abstracts\JobStatus;
use Tamer\Exceptions\ServerException;
use Tamer\Exceptions\WorkerException;
use Tamer\Interfaces\WorkerProtocolInterface;
use Tamer\Objects\Job;
use Tamer\Objects\JobResults;
class GearmanWorker implements WorkerProtocolInterface
{
@ -31,18 +36,12 @@
*/
private $next_reconnect;
/**
* @var GearmanJob|null
*/
private $current_job;
public function __construct()
{
$this->worker = null;
$this->server_cache = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
$this->current_job = null;
try
{
@ -120,10 +119,21 @@
{
$this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context)
{
$this->current_job = $job;
$job = new Job($job->unique(), $job->handle(), $job->workload());
$function($job, $context);
$this->current_job = null;
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
try
{
$result = $function($received_job, $context);
}
catch(Exception $e)
{
$job->sendFail();
return;
}
$job_results = new JobResults($received_job, JobStatus::Success, $result);
$job->sendComplete(msgpack_pack($job_results->toArray()));
});
return $this;
}
@ -173,6 +183,26 @@
$this->addServer($host, $port);
}
}
$this->worker->addFunction('tamer_closure', function(GearmanJob $job)
{
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
try
{
/** @var SerializableClosure $closure */
$closure = $received_job->getData();
$result = $closure->getClosure()->__invoke($received_job);
}
catch(Exception $e)
{
$job->sendFail();
return;
}
$job_results = new JobResults($received_job, JobStatus::Success, $result);
$job->sendComplete(msgpack_pack($job_results->toArray()));
});
}
/**

View file

@ -1,18 +0,0 @@
<?php
require 'ncc';
use Tamer\Objects\Job;
use Tamer\Objects\Task;
import('net.nosial.tamerlib', 'latest');
$client = new \Tamer\Protocols\GearmanClient();
$client->addServer();
$client->addTask(new Task('sleep', '5', function(Job $job) {
echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
}));
$client->doTasks();

26
tests/gearman_client.php Normal file
View file

@ -0,0 +1,26 @@
<?php
require 'ncc';
use Tamer\Objects\JobResults;
use Tamer\Objects\Task;
import('net.nosial.tamerlib', 'latest');
$client = new \Tamer\Protocols\GearmanClient();
$client->addServer();
$client->doBackground(new Task('sleep', '5'));
$client->addTask(new Task('sleep', '5', function(JobResults $job) {
echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
}));
$client->addTask(new Task('sleep', '5', function(JobResults $job) {
echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
}));
$client->run();

15
tests/gearman_closure.php Normal file
View file

@ -0,0 +1,15 @@
<?php
require 'ncc';
use Tamer\Objects\JobResults;
use Tamer\Objects\Task;
import('net.nosial.tamerlib', 'latest');
$client = new \Tamer\Protocols\GearmanClient();
$client->addServer();
$client->closure(function () {
echo "This function was sent from a client, it should be executed on the worker";
});

View file

@ -1,17 +1,21 @@
<?php
require 'ncc';
use Tamer\Objects\Task;
use Tamer\Objects\Job;
import('net.nosial.tamerlib', 'latest');
$worker = new \Tamer\Protocols\GearmanWorker();
$worker->addServer();
$worker->addFunction('sleep', function($task) {
var_dump(get_class($task));
echo "Task {$task->getId()} started with data: {$task->getData()} \n";
sleep($task->getData());
echo "Task {$task->getId()} completed with data: {$task->getData()} \n";
$worker->addFunction('sleep', function($job) {
/** @var Job $job */
var_dump(get_class($job));
echo "Task {$job->getId()} started with data: {$job->getData()} \n";
sleep($job->getData());
echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
return $job->getData();
});