This commit is contained in:
Netkas 2023-02-01 19:22:26 -05:00
parent bfe790cf35
commit 39d1084a3f
22 changed files with 1158 additions and 1 deletions

9
.idea/php.xml generated
View file

@ -9,6 +9,15 @@
<component name="PHPCodeSnifferOptionsConfiguration">
<option name="transferred" value="true" />
</component>
<component name="PhpIncludePathManager">
<include_path>
<path value="/usr/share/php" />
<path value="/etc/ncc" />
<path value="/var/ncc/packages/net.nosial.optslib=1.0.0" />
<path value="/var/ncc/packages/net.nosial.loglib=1.0.0" />
</include_path>
</component>
<component name="PhpProjectSharedConfiguration" php_language_level="8.1" />
<component name="PhpStanOptionsConfiguration">
<option name="transferred" value="true" />
</component>

5
.idea/tamer.iml generated
View file

@ -1,7 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/tests" isTestSource="true" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>

View file

@ -28,6 +28,20 @@
"build": {
"source_path": "src",
"default_configuration": "release",
"dependencies": [
{
"name": "net.nosial.optslib",
"version": "latest",
"source_type": "remote",
"source": "nosial/libs.opts=latest@n64"
},
{
"name": "net.nosial.loglib",
"version": "latest",
"source_type": "remote",
"source": "nosial/libs.log=latest@n64"
}
],
"configurations": [
{
"name": "release",

View file

@ -0,0 +1,16 @@
<?php
namespace Tamer\Abstracts\ExitCodes;
class WorkerExitCodes
{
const GracefulShutdown = 0;
const Exception = 1;
const UnsupervisedWorker = 2;
const ProtocolUnavailable = 3;
const ServerConnectionFailed = 4;
}

View file

@ -0,0 +1,12 @@
<?php
namespace Tamer\Abstracts;
abstract class JobStatus
{
const Success = 0;
const Failure = 1;
const Exception = 2;
}

View file

@ -0,0 +1,12 @@
<?php
namespace Tamer\Abstracts;
abstract class ProtocolType
{
const Gearman = 'gearman';
const RabbitMQ = 'rabbitmq';
const Redis = 'redis';
}

View file

@ -0,0 +1,12 @@
<?php
namespace Tamer\Abstracts;
abstract class TaskPriority
{
const Low = 0;
const Normal = 1;
const High = 2;
}

View file

@ -0,0 +1,29 @@
<?php
namespace Tamer\Classes;
use OptsLib\Parse;
class Functions
{
/**
* Attempts to get the worker id from the command line arguments or the environment variable TAMER_WORKER_ID
* If neither are set, returns null.
*
* @return string|null
*/
public static function getWorkerId(): ?string
{
$options = Parse::getArguments();
$worker_id = ($options['worker-id'] ?? null);
if($worker_id !== null)
return $worker_id;
$worker_id = getenv('TAMER_WORKER_ID');
if($worker_id !== false)
return $worker_id;
return null;
}
}

View file

@ -0,0 +1,39 @@
<?php
namespace Tamer\Classes;
use Tamer\Abstracts\ProtocolType;
use Tamer\Abstracts\TaskPriority;
class Validate
{
/**
* Returns true if the input is a valid protocol type.
*
* @param string $input
* @return bool
*/
public static function protocolType(string $input): bool
{
return match (strtolower($input))
{
ProtocolType::Gearman, ProtocolType::RabbitMQ, ProtocolType::Redis => true,
default => false,
};
}
/**
* Returns true if the input is a valid task priority.
*
* @param int $input
* @return bool
*/
public static function taskPriority(int $input): bool
{
return match ($input)
{
TaskPriority::Low, TaskPriority::Normal, TaskPriority::High => true,
default => false,
};
}
}

43
src/Tamer/Client.php Normal file
View file

@ -0,0 +1,43 @@
<?php
namespace Tamer;
use InvalidArgumentException;
use Tamer\Classes\Validate;
class Client
{
/**
* @var string
*/
private string $protocol;
/**
* Tamer client public constructor
*/
public function __construct(string $protocol)
{
$this->setProtocol($protocol);
}
/**
* @return string
*/
public function getProtocol(): string
{
return $this->protocol;
}
/**
* @param string $protocol
*/
public function setProtocol(string $protocol): void
{
if(!Validate::protocolType($protocol))
{
throw new InvalidArgumentException("Invalid protocol type: $protocol");
}
$this->protocol = $protocol;
}
}

View file

@ -0,0 +1,18 @@
<?php
namespace Tamer\Exceptions;
use Exception;
class ServerException extends Exception
{
/**
* @param string $message
* @param int $code
* @param Exception|null $previous
*/
public function __construct(string $message, int $code=0, Exception $previous=null)
{
parent::__construct($message, $code, $previous);
}
}

View file

@ -0,0 +1,18 @@
<?php
namespace Tamer\Exceptions;
use Exception;
class WorkerException extends Exception
{
/**
* @param string $message
* @param int $code
* @param Exception|null $previous
*/
public function __construct(string $message, int $code=0, Exception $previous=null)
{
parent::__construct($message, $code, $previous);
}
}

View file

@ -0,0 +1,8 @@
<?php
namespace Tamer\Interfaces;
interface ClientProtocolInterface
{
}

View file

@ -0,0 +1,8 @@
<?php
namespace Tamer\Interfaces;
interface WorkerProtocolInterface
{
}

60
src/Tamer/Objects/Job.php Normal file
View file

@ -0,0 +1,60 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace Tamer\Objects;
class Job
{
/**
* The ID of the job
*
* @var string
*/
private $id;
/**
* The name of the function
*
* @var string
*/
private $name;
/**
* The data to be passed to the function
*
* @var string
*/
private $data;
public function __construct(string $id, string $name, string $data)
{
$this->id = $id;
$this->name = $name;
$this->data = $data;
}
/**
* @return string
*/
public function getId(): string
{
return $this->id;
}
/**
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* @return string
*/
public function getData(): string
{
return $this->data;
}
}

View file

@ -0,0 +1,59 @@
<?php
namespace Tamer\Objects;
class JobResults
{
/**
* @var Task
*/
private Task $task;
/**
* @var int
*/
private int $status;
/**
* @var string|null
*/
private ?string $result;
/**
* Public Constructor
*
* @param Task $task
* @param int $status
* @param string|null $result
*/
public function __construct(Task $task, int $status, ?string $result)
{
$this->task = $task;
$this->status = $status;
$this->result = $result;
}
/**
* @return Task
*/
public function getTask(): Task
{
return $this->task;
}
/**
* @return int
*/
public function getStatus(): int
{
return $this->status;
}
/**
* @return string|null
*/
public function getResult(): ?string
{
return $this->result;
}
}

150
src/Tamer/Objects/Task.php Normal file
View file

@ -0,0 +1,150 @@
<?php
namespace Tamer\Objects;
use InvalidArgumentException;
use Tamer\Abstracts\TaskPriority;
use Tamer\Classes\Validate;
class Task
{
/**
* @var string
*/
private string $id;
/**
* @var string
*/
private string $function_name;
/**
* @var string
*/
private string $data;
/**
* @var int
*/
private int $priority;
/**
* @var callable|null
*/
private $callback;
/**
* Public Constructor
*
* @param string $function_name
* @param string $data
* @param callable|null $callback
*/
public function __construct(string $function_name, string $data, callable $callback=null)
{
$this->function_name = $function_name;
$this->data = $data;
$this->id = uniqid();
$this->priority = TaskPriority::Normal;
$this->callback = $callback;
}
/**
* Returns the function name for the task
*
* @return string
*/
public function getFunctionName(): string
{
return $this->function_name;
}
/**
* Sets the function name for the task
*
* @param string $function_name
* @return Task
*/
public function setFunctionName(string $function_name): self
{
$this->function_name = $function_name;
return $this;
}
/**
* Returns the arguments for the task
*
* @return string
*/
public function getData(): string
{
return $this->data;
}
/**
* Sets the arguments for the task
*
* @param string $data
* @return Task
*/
public function setData(string $data): self
{
$this->data = $data;
return $this;
}
/**
* Returns the Unique ID of the task
*
* @return string
*/
public function getId(): string
{
return $this->id;
}
/**
* @return int
*/
public function getPriority(): int
{
return $this->priority;
}
/**
* @param int $priority
* @return Task
*/
public function setPriority(int $priority): self
{
if(!Validate::taskPriority($priority))
{
throw new InvalidArgumentException("Invalid priority value");
}
$this->priority = $priority;
return $this;
}
/**
* @param callable|null $callback
*/
public function setCallback(?callable $callback): void
{
$this->callback = $callback;
}
/**
* Executes the callback function
*
* @param JobResults $result
* @return void
*/
public function runCallback(JobResults $result): void
{
if($this->callback !== null)
{
call_user_func($this->callback, $result);
}
}
}

View file

@ -0,0 +1,379 @@
<?php
/** @noinspection PhpMissingFieldTypeInspection */
namespace Tamer\Protocols;
use Exception;
use GearmanTask;
use LogLib\Log;
use Tamer\Abstracts\JobStatus;
use Tamer\Abstracts\TaskPriority;
use Tamer\Exceptions\ServerException;
use Tamer\Interfaces\ClientProtocolInterface;
use Tamer\Objects\JobResults;
use Tamer\Objects\Task;
class GearmanClient implements ClientProtocolInterface
{
/**
* @var \GearmanClient|null $client
*/
private $client;
/**
* @var array
*/
private $server_cache;
/**
* Used for tracking the current execution of tasks and run callbacks on completion
*
* @var Task[]
*/
private $tasks;
/**
* @var bool
*/
private $automatic_reconnect;
/**
* @var int
*/
private $next_reconnect;
/**
*/
public function __construct()
{
$this->client = null;
$this->tasks = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
try
{
$this->reconnect();
}
catch(ServerException $e)
{
unset($e);
}
}
/**
* Adds client options
*
* @link http://php.net/manual/en/gearmanclient.addoptions.php
* @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS)
* @return bool
*/
public function addOptions(array $options): bool
{
// Parse $options combination via bitwise OR operator
$options = array_reduce($options, function($carry, $item)
{
return $carry | $item;
});
return $this->client->addOptions($options);
}
private function registerCallbacks(): void
{
$this->client->setCompleteCallback([$this, 'callbackHandler']);
$this->client->setFailCallback([$this, 'callbackHandler']);
$this->client->setDataCallback([$this, 'callbackHandler']);
$this->client->setStatusCallback([$this, 'callbackHandler']);
}
/**
* Adds a server to the list of servers to use
*
* @link http://php.net/manual/en/gearmanclient.addserver.php
* @param string $host (127.0.0.1)
* @param int $port (default: 4730)
* @return bool
* @throws ServerException
*/
public function addServer(string $host='127.0.0.1', int $port=4730): bool
{
if(!isset($this->server_cache[$host]))
{
$this->server_cache[$host] = [];
}
if(in_array($port, $this->server_cache[$host]))
{
return true;
}
$this->server_cache[$host][] = $port;
try
{
return $this->client->addServer($host, $port);
}
catch(Exception $e)
{
throw new ServerException($e->getMessage(), $e->getCode(), $e);
}
}
/**
* Adds a list of servers to the list of servers to use
*
* @link http://php.net/manual/en/gearmanclient.addservers.php
* @param array $servers (host:port, host:port, ...)
* @return bool
*/
public function addServers(array $servers): bool
{
return $this->client->addServers(implode(',', $servers));
}
/**
* Processes a task in the background
*
* @param Task $task
* @return bool
* @throws ServerException
*/
public function doBackground(Task $task): bool
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
$this->reconnect();
$this->next_reconnect = time() + 1800;
}
$this->tasks[] = $task;
switch($task->getPriority())
{
case TaskPriority::High:
return $this->client->doHighBackground($task->getFunctionName(), $task->getData(), $task->getId());
case TaskPriority::Low:
return $this->client->doLowBackground($task->getFunctionName(), $task->getData(), $task->getId());
default:
case TaskPriority::Normal:
return $this->client->doBackground($task->getFunctionName(), $task->getData(), $task->getId());
}
}
/**
* Processes a task in the foreground
*
* @param Task $task
* @return JobResults
* @throws ServerException
*/
public function do(Task $task): JobResults
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
$this->reconnect();
$this->next_reconnect = time() + 1800;
}
$this->tasks[] = $task;
switch($task->getPriority())
{
case TaskPriority::High:
return new JobResults($task, JobStatus::Success, $this->client->doHigh($task->getFunctionName(), $task->getData(), $task->getId()));
case TaskPriority::Low:
return new JobResults($task, JobStatus::Success, $this->client->doLow($task->getFunctionName(), $task->getData(), $task->getId()));
default:
case TaskPriority::Normal:
return new JobResults($task, JobStatus::Success, $this->client->doNormal($task->getFunctionName(), $task->getData(), $task->getId()));
}
}
public function addTask(Task $task): ClientProtocolInterface
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
$this->reconnect();
$this->next_reconnect = time() + 1800;
}
$this->tasks[] = $task;
switch($task->getPriority())
{
case TaskPriority::High:
$this->client->addTaskHigh($task->getFunctionName(), $task->getData(), $task->getId());
break;
case TaskPriority::Low:
$this->client->addTaskLow($task->getFunctionName(), $task->getData(), $task->getId());
break;
default:
case TaskPriority::Normal:
$this->client->addTask($task->getFunctionName(), $task->getData(), $task->getId());
break;
}
return $this;
}
public function addBackgroundTask(Task $task): ClientProtocolInterface
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
$this->reconnect();
$this->next_reconnect = time() + 1800;
}
$this->tasks[] = $task;
switch($task->getPriority())
{
case TaskPriority::High:
$this->client->addTaskHighBackground($task->getFunctionName(), $task->getData(), $task->getId());
break;
case TaskPriority::Low:
$this->client->addTaskLowBackground($task->getFunctionName(), $task->getData(), $task->getId());
break;
default:
case TaskPriority::Normal:
$this->client->addTaskBackground($task->getFunctionName(), $task->getData(), $task->getId());
break;
}
return $this;
}
/**
* @return bool
* @throws ServerException
*/
public function doTasks(): bool
{
if($this->automatic_reconnect && time() > $this->next_reconnect)
{
$this->reconnect();
$this->next_reconnect = time() + 1800;
}
if(!$this->client->runTasks())
{
return false;
}
return true;
}
/**
* Processes a task callback in the foreground
*
* @param GearmanTask $task
* @return void
*/
public function callbackHandler(GearmanTask $task): void
{
$internal_task = $this->getTaskById($task->unique());
$job_status = match ($task->returnCode())
{
GEARMAN_WORK_EXCEPTION => JobStatus::Exception,
GEARMAN_WORK_FAIL => JobStatus::Failure,
default => JobStatus::Success,
};
$job_results = new JobResults($internal_task, $job_status, ($task->data() ?? null));
try
{
Log::debug('net.nosial.tamer', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_status . ' and data size ' . strlen($task->data()) . ' bytes');
$internal_task->runCallback($job_results);
}
catch(Exception $e)
{
Log::error('net.nosial.tamer', 'Callback for task ' . $internal_task->getId() . ' failed with error: ' . $e->getMessage(), $e);
}
finally
{
$this->removeTask($internal_task);
}
}
/**
* @param string $id
* @return Task|null
*/
private function getTaskById(string $id): ?Task
{
var_dump($this->tasks);
var_dump($id);
foreach($this->tasks as $task)
{
if($task->getId() === $id)
{
return $task;
}
}
return null;
}
/**
* @throws ServerException
*/
private function reconnect()
{
$this->client = new \GearmanClient();
foreach($this->server_cache as $host => $ports)
{
foreach($ports as $port)
{
$this->addServer($host, $port);
}
}
$this->registerCallbacks();
}
/**
* Removes a task from the list of tasks
*
* @param Task $task
* @return ClientProtocolInterface
*/
private function removeTask(Task $task): ClientProtocolInterface
{
$this->tasks = array_filter($this->tasks, function($item) use ($task)
{
return $item->getId() !== $task->getId();
});
return $this;
}
/**
* @return bool
*/
public function isAutomaticReconnect(): bool
{
return $this->automatic_reconnect;
}
/**
* @param bool $automatic_reconnect
*/
public function setAutomaticReconnect(bool $automatic_reconnect): void
{
$this->automatic_reconnect = $automatic_reconnect;
}
}

View file

@ -0,0 +1,219 @@
<?php
namespace Tamer\Protocols;
use Exception;
use GearmanJob;
use Tamer\Exceptions\ServerException;
use Tamer\Exceptions\WorkerException;
use Tamer\Interfaces\WorkerProtocolInterface;
use Tamer\Objects\Job;
class GearmanWorker implements WorkerProtocolInterface
{
/**
* @var \GearmanWorker|null
*/
private $worker;
/**
* @var array
*/
private $server_cache;
/**
* @var bool
*/
private $automatic_reconnect;
/**
* @var int
*/
private $next_reconnect;
/**
* @var GearmanJob|null
*/
private $current_job;
public function __construct()
{
$this->worker = null;
$this->server_cache = [];
$this->automatic_reconnect = false;
$this->next_reconnect = time() + 1800;
$this->current_job = null;
try
{
$this->reconnect();
}
catch(Exception $e)
{
unset($e);
}
}
/**
* Adds a server to the list of servers to use
*
* @link http://php.net/manual/en/gearmanworker.addserver.php
* @param string $host (
* @param int $port (default: 4730)
* @return bool
* @throws ServerException
*/
public function addServer(string $host='127.0.0.1', int $port=4730): bool
{
if(!isset($this->server_cache[$host]))
{
$this->server_cache[$host] = [];
}
if(in_array($port, $this->server_cache[$host]))
{
return true;
}
$this->server_cache[$host][] = $port;
try
{
return $this->worker->addServer($host, $port);
}
catch(Exception $e)
{
throw new ServerException($e->getMessage(), $e->getCode(), $e);
}
}
/**
* Adds a list of servers to the list of servers to use
*
* @link http://php.net/manual/en/gearmanworker.addservers.php
* @param string[] $servers (host:port, host:port, ...)
* @return WorkerProtocolInterface
* @throws ServerException
*/
public function addServers(array $servers): self
{
foreach($servers as $server)
{
$server = explode(':', $server);
$this->addServer($server[0], $server[1]);
}
return $this;
}
/**
* Adds a function to the list of functions to call
*
* @link http://php.net/manual/en/gearmanworker.addfunction.php
* @param string $function_name The name of the function to register with the job server
* @param callable $function The callback function to call when the job is received
* @param mixed|null $context (optional) The context to pass to the callback function
* @return WorkerProtocolInterface
*/
public function addFunction(string $function_name, callable $function, mixed $context=null): self
{
$this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context)
{
$this->current_job = $job;
$job = new Job($job->unique(), $job->handle(), $job->workload());
$function($job, $context);
$this->current_job = null;
});
return $this;
}
/**
* Removes a function from the list of functions to call
*
* @param string $function_name The name of the function to unregister
* @return WorkerProtocolInterface
*/
public function removeFunction(string $function_name): self
{
$this->worker->unregister($function_name);
return $this;
}
/**
* @return bool
*/
public function isAutomaticReconnect(): bool
{
return $this->automatic_reconnect;
}
/**
* @param bool $automatic_reconnect
* @return WorkerProtocolInterface
*/
public function setAutomaticReconnect(bool $automatic_reconnect): self
{
$this->automatic_reconnect = $automatic_reconnect;
return $this;
}
/**
* @throws ServerException
*/
private function reconnect()
{
$this->worker = new \GearmanWorker();
$this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ);
foreach($this->server_cache as $host => $ports)
{
foreach($ports as $port)
{
$this->addServer($host, $port);
}
}
}
/**
* Waits for a job and calls the appropriate callback function
*
* @link http://php.net/manual/en/gearmanworker.work.php
* @param bool $blocking (default: true) Whether to block until a job is received
* @param int $timeout (default: 500) The timeout in milliseconds
* @param bool $throw_errors (default: false) Whether to throw exceptions on errors
* @return void Returns nothing
* @throws ServerException If the worker cannot connect to the server
* @throws WorkerException If the worker encounters an error while working if $throw_errors is true
*/
public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
{
if($this->automatic_reconnect && (time() > $this->next_reconnect))
{
$this->reconnect();
$this->next_reconnect = time() + 1800;
}
$this->worker->setTimeout($timeout);
while(true)
{
@$this->worker->work();
if($this->worker->returnCode() == GEARMAN_COULD_NOT_CONNECT)
{
throw new ServerException('Could not connect to Gearman server');
}
if($this->worker->returnCode() == GEARMAN_TIMEOUT && !$blocking)
{
break;
}
if($this->worker->returnCode() != GEARMAN_SUCCESS && $throw_errors)
{
throw new WorkerException('Gearman worker error: ' . $this->worker->error(), $this->worker->returnCode());
}
}
}
}

View file

@ -0,0 +1,8 @@
<?php
namespace Tamer\Protocols;
class RabbitMqClient
{
}

18
tests/client_example.php Normal file
View file

@ -0,0 +1,18 @@
<?php
require 'ncc';
use Tamer\Objects\Job;
use Tamer\Objects\Task;
import('net.nosial.tamerlib', 'latest');
$client = new \Tamer\Protocols\GearmanClient();
$client->addServer();
$client->addTask(new Task('sleep', '5', function(Job $job) {
echo "Task {$job->getId()} completed with data: {$job->getData()} \n";
}));
$client->doTasks();

23
tests/gearman_worker.php Normal file
View file

@ -0,0 +1,23 @@
<?php
require 'ncc';
use Tamer\Objects\Task;
import('net.nosial.tamerlib', 'latest');
$worker = new \Tamer\Protocols\GearmanWorker();
$worker->addServer();
$worker->addFunction('sleep', function($task) {
var_dump(get_class($task));
echo "Task {$task->getId()} started with data: {$task->getData()} \n";
sleep($task->getData());
echo "Task {$task->getId()} completed with data: {$task->getData()} \n";
});
while(true)
{
echo "Waiting for job... \n";
$worker->work();
}