Refactored Interfaces and implemented Tamer, Gearman's protocol has been refactored by RabbitMQ needs refactoring
This commit is contained in:
parent
6d8d4a75a4
commit
c80b4b39c4
21 changed files with 1103 additions and 360 deletions
1
.idea/php.xml
generated
1
.idea/php.xml
generated
|
@ -16,6 +16,7 @@
|
|||
<path value="/var/ncc/packages/net.nosial.optslib=1.0.0" />
|
||||
<path value="/var/ncc/packages/net.nosial.loglib=1.0.0" />
|
||||
<path value="/var/ncc/packages/com.opis.closure=3.6.3" />
|
||||
<path value="/var/ncc/packages/com.php_amqplib.php_amqplib=3.5.1" />
|
||||
</include_path>
|
||||
</component>
|
||||
<component name="PhpProjectSharedConfiguration" php_language_level="8.1" />
|
||||
|
|
18
README.md
18
README.md
|
@ -2,6 +2,24 @@
|
|||
|
||||
Coming soon...
|
||||
|
||||
## Terminology
|
||||
|
||||
### Components
|
||||
|
||||
- **Supervisor** - The main component of the library, this is the component that is responsible for manging
|
||||
workers
|
||||
- **Worker** - The component that is responsible for executing tasks
|
||||
- **Task** - The component that is responsible for executing a function or closure
|
||||
|
||||
### Function Names
|
||||
- **do** - Execute a function in the background without blocking the current thread,
|
||||
this does not return a value. (This is a fire and forget function)
|
||||
- **doClosure** - Execute a closure in the background without blocking the current thread,
|
||||
this does not return a value. (This is a fire and forget function)
|
||||
- **queue** - Queues a function to be executed in the background until the next time the run function is called.
|
||||
- **queueClosure** - Queues a closure to be executed in th background until the next time the run function is called.
|
||||
- **run** - Executes all queued functions and closures in parallel and waits for the tasks to complete.
|
||||
|
||||
|
||||
# License
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
"assembly": {
|
||||
"name": "TamerLib",
|
||||
"package": "net.nosial.tamerlib",
|
||||
"description": "Telegram Bot API Library (A super-powered wrapper for longman/telegram-bot)",
|
||||
"description": "TamerLib allows the execution of parallel tasks",
|
||||
"company": "Nosial",
|
||||
"copyright": "Copyright (c) 2022-2023 Nosial, All Rights Reserved",
|
||||
"version": "1.0.0",
|
||||
|
|
10
src/Tamer/Abstracts/Mode.php
Normal file
10
src/Tamer/Abstracts/Mode.php
Normal file
|
@ -0,0 +1,10 @@
|
|||
<?php
|
||||
|
||||
namespace Tamer\Abstracts;
|
||||
|
||||
abstract class Mode
|
||||
{
|
||||
const Client = 'client';
|
||||
|
||||
const Worker = 'worker';
|
||||
}
|
|
@ -7,6 +7,4 @@
|
|||
const Gearman = 'gearman';
|
||||
|
||||
const RabbitMQ = 'rabbitmq';
|
||||
|
||||
const Redis = 'redis';
|
||||
}
|
|
@ -2,7 +2,11 @@
|
|||
|
||||
namespace Tamer\Classes;
|
||||
|
||||
use InvalidArgumentException;
|
||||
use OptsLib\Parse;
|
||||
use Tamer\Abstracts\ProtocolType;
|
||||
use Tamer\Interfaces\ClientProtocolInterface;
|
||||
use Tamer\Interfaces\WorkerProtocolInterface;
|
||||
|
||||
class Functions
|
||||
{
|
||||
|
@ -26,4 +30,40 @@
|
|||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a client protocol object based on the protocol type
|
||||
*
|
||||
* @param string $protocol
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
* @return ClientProtocolInterface
|
||||
*/
|
||||
public static function createClient(string $protocol, ?string $username=null, ?string $password=null): ClientProtocolInterface
|
||||
{
|
||||
/** @noinspection PhpFullyQualifiedNameUsageInspection */
|
||||
return match (strtolower($protocol))
|
||||
{
|
||||
ProtocolType::Gearman => new \Tamer\Protocols\Gearman\Client($username, $password),
|
||||
ProtocolType::RabbitMQ => new \Tamer\Protocols\RabbitMq\Client($username, $password),
|
||||
default => throw new InvalidArgumentException('Invalid protocol type'),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $protocol
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
* @return WorkerProtocolInterface
|
||||
*/
|
||||
public static function createWorker(string $protocol, ?string $username=null, ?string $password=null): WorkerProtocolInterface
|
||||
{
|
||||
/** @noinspection PhpFullyQualifiedNameUsageInspection */
|
||||
return match (strtolower($protocol))
|
||||
{
|
||||
ProtocolType::Gearman => new \Tamer\Protocols\Gearman\Worker($username, $password),
|
||||
ProtocolType::RabbitMQ => new \Tamer\Protocols\RabbitMq\Worker($username, $password),
|
||||
default => throw new InvalidArgumentException('Invalid protocol type'),
|
||||
};
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
namespace Tamer\Classes;
|
||||
|
||||
use Tamer\Abstracts\Mode;
|
||||
use Tamer\Abstracts\ProtocolType;
|
||||
use Tamer\Abstracts\TaskPriority;
|
||||
|
||||
|
@ -17,7 +18,20 @@
|
|||
{
|
||||
return match (strtolower($input))
|
||||
{
|
||||
ProtocolType::Gearman, ProtocolType::RabbitMQ, ProtocolType::Redis => true,
|
||||
ProtocolType::Gearman, ProtocolType::RabbitMQ => true,
|
||||
default => false,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $input
|
||||
* @return bool
|
||||
*/
|
||||
public static function mode(string $input): bool
|
||||
{
|
||||
return match (strtolower($input))
|
||||
{
|
||||
Mode::Client, Mode::Worker => true,
|
||||
default => false,
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1,43 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace Tamer;
|
||||
|
||||
use InvalidArgumentException;
|
||||
use Tamer\Classes\Validate;
|
||||
|
||||
class Client
|
||||
{
|
||||
/**
|
||||
* @var string
|
||||
*/
|
||||
private string $protocol;
|
||||
|
||||
/**
|
||||
* Tamer client public constructor
|
||||
*/
|
||||
public function __construct(string $protocol)
|
||||
{
|
||||
$this->setProtocol($protocol);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public function getProtocol(): string
|
||||
{
|
||||
return $this->protocol;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $protocol
|
||||
*/
|
||||
public function setProtocol(string $protocol): void
|
||||
{
|
||||
if(!Validate::protocolType($protocol))
|
||||
{
|
||||
throw new InvalidArgumentException("Invalid protocol type: $protocol");
|
||||
}
|
||||
|
||||
$this->protocol = $protocol;
|
||||
}
|
||||
}
|
|
@ -3,15 +3,16 @@
|
|||
namespace Tamer\Exceptions;
|
||||
|
||||
use Exception;
|
||||
use Throwable;
|
||||
|
||||
class ServerException extends Exception
|
||||
class ConnectionException extends Exception
|
||||
{
|
||||
/**
|
||||
* @param string $message
|
||||
* @param int $code
|
||||
* @param Exception|null $previous
|
||||
* @param Throwable|null $previous
|
||||
*/
|
||||
public function __construct(string $message, int $code=0, Exception $previous=null)
|
||||
public function __construct(string $message="", int $code=0, Throwable $previous=null)
|
||||
{
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
|
@ -1,18 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace Tamer\Exceptions;
|
||||
|
||||
use Exception;
|
||||
|
||||
class WorkerException extends Exception
|
||||
{
|
||||
/**
|
||||
* @param string $message
|
||||
* @param int $code
|
||||
* @param Exception|null $previous
|
||||
*/
|
||||
public function __construct(string $message, int $code=0, Exception $previous=null)
|
||||
{
|
||||
parent::__construct($message, $code, $previous);
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@
|
|||
namespace Tamer\Interfaces;
|
||||
|
||||
use Closure;
|
||||
use Tamer\Exceptions\ConnectionException;
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
interface ClientProtocolInterface
|
||||
|
@ -20,40 +21,84 @@
|
|||
*
|
||||
* @param string $host The host to connect to (eg; 127.0.0.1)
|
||||
* @param int $port The port to connect to (eg; 4730)
|
||||
* @return bool
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): bool;
|
||||
public function addServer(string $host, int $port): void;
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
*
|
||||
* @param array $servers An array of servers to connect to (eg; ['host:port', 'host:port', ...])
|
||||
* @return bool
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): bool;
|
||||
public function addServers(array $servers): void;
|
||||
|
||||
/**
|
||||
* Adds options to the client (client specific)
|
||||
* Connects to all the configured servers
|
||||
*
|
||||
* @throws ConnectionException
|
||||
* @return void
|
||||
*/
|
||||
public function connect(): void;
|
||||
|
||||
/**
|
||||
* Disconnects from all the configured servers
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void;
|
||||
|
||||
/**
|
||||
* Reconnects to all the configured servers
|
||||
*
|
||||
* @throws ConnectionException
|
||||
* @return void
|
||||
*/
|
||||
public function reconnect(): void;
|
||||
|
||||
/**
|
||||
* Returns True if the client is connected to the server (or servers)
|
||||
*
|
||||
* @param array $options
|
||||
* @return bool
|
||||
*/
|
||||
public function addOptions(array $options): bool;
|
||||
public function isConnected(): bool;
|
||||
|
||||
/**
|
||||
* Sets options to the client (client specific)
|
||||
*
|
||||
* @param array $options
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void;
|
||||
|
||||
/**
|
||||
* Returns the options set on the client
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array;
|
||||
|
||||
/**
|
||||
* Clears all options from the client
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void;
|
||||
|
||||
/**
|
||||
* Returns True if the client is set to automatically reconnect to the server after a period of time
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool;
|
||||
public function automaticReconnectionEnabled(): bool;
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server after a period of time
|
||||
*
|
||||
* @param bool $automatic_reconnect
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void;
|
||||
public function enableAutomaticReconnection(bool $enable): void;
|
||||
|
||||
/**
|
||||
* Processes a task in the background (does not return a result)
|
||||
|
@ -83,10 +128,10 @@
|
|||
* Queues a closure to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @param Closure $callback The closure to call when the operation is complete (local)
|
||||
* @param Closure|null $callback The closure to call when the operation is complete (local)
|
||||
* @return void
|
||||
*/
|
||||
public function queueClosure(Closure $closure, Closure $callback): void;
|
||||
public function queueClosure(Closure $closure, ?Closure $callback=null): void;
|
||||
|
||||
/**
|
||||
* Executes all tasks in the queue and waits for them to complete
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
namespace Tamer\Interfaces;
|
||||
|
||||
use Tamer\Exceptions\ConnectionException;
|
||||
|
||||
interface WorkerProtocolInterface
|
||||
{
|
||||
/**
|
||||
|
@ -17,9 +19,9 @@
|
|||
*
|
||||
* @param string $host The host to connect to (eg; 127.0.0.1)
|
||||
* @param int $port The port to connect to (eg; 4730)
|
||||
* @return bool
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host, int $port): bool;
|
||||
public function addServer(string $host, int $port): void;
|
||||
|
||||
/**
|
||||
* Adds a list of servers to the list of servers to use
|
||||
|
@ -30,37 +32,80 @@
|
|||
public function addServers(array $servers): void;
|
||||
|
||||
/**
|
||||
* Adds options to the worker (worker specific)
|
||||
* Connects to all the configured servers
|
||||
*
|
||||
* @throws ConnectionException
|
||||
* @return void
|
||||
*/
|
||||
public function connect(): void;
|
||||
|
||||
/**
|
||||
* Disconnects from all the configured servers
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void;
|
||||
|
||||
/**
|
||||
* Reconnects to all the configured servers
|
||||
*
|
||||
* @throws ConnectionException
|
||||
* @return void
|
||||
*/
|
||||
public function reconnect(): void;
|
||||
|
||||
/**
|
||||
* Returns True if the client is connected to the server (or servers)
|
||||
*
|
||||
* @param array $options
|
||||
* @return bool
|
||||
*/
|
||||
public function addOptions(array $options): bool;
|
||||
public function isConnected(): bool;
|
||||
|
||||
/**
|
||||
* Sets options to the worker (worker specific)
|
||||
*
|
||||
* @param array $options
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void;
|
||||
|
||||
/**
|
||||
* Returns the options set on the worker
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array;
|
||||
|
||||
/**
|
||||
* Clears all options from the worker
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void;
|
||||
|
||||
/**
|
||||
* Returns True if the worker is set to automatically reconnect to the server after a period of time
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool;
|
||||
public function automaticReconnectionEnabled(): bool;
|
||||
|
||||
/**
|
||||
* Enables or disables automatic reconnecting to the server after a period of time
|
||||
*
|
||||
* @param bool $automatic_reconnect
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void;
|
||||
public function enableAutomaticReconnection(bool $enable): void;
|
||||
|
||||
/**
|
||||
* Registers a function to the worker
|
||||
*
|
||||
* @param string $function_name The name of the function to add
|
||||
* @param callable $function The function to add
|
||||
* @param mixed $context (optional) The context to pass to the function
|
||||
* @param string $name The name of the function to add
|
||||
* @param callable $callable The function to add
|
||||
* @return void
|
||||
*/
|
||||
public function addFunction(string $function_name, callable $function, mixed $context=null): void;
|
||||
public function addFunction(string $name, callable $callable): void;
|
||||
|
||||
/**
|
||||
* Removes a function from the worker
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace Tamer\Objects;
|
||||
|
||||
use Closure;
|
||||
|
@ -17,20 +19,20 @@
|
|||
/**
|
||||
* @var string
|
||||
*/
|
||||
private string $function_name;
|
||||
private $function_name;
|
||||
|
||||
/**
|
||||
* @var string|Closure|null
|
||||
*/
|
||||
private string|null|Closure $data;
|
||||
private $data;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
private int $priority;
|
||||
private $priority;
|
||||
|
||||
/**
|
||||
* @var callable|null
|
||||
* @var Closure|null
|
||||
*/
|
||||
private $callback;
|
||||
|
||||
|
@ -44,9 +46,9 @@
|
|||
*
|
||||
* @param string $function_name
|
||||
* @param string|Closure|null $data
|
||||
* @param callable|null $callback
|
||||
* @param Closure|null $callback
|
||||
*/
|
||||
public function __construct(string $function_name, string|Closure|null $data, callable $callback=null)
|
||||
public function __construct(string $function_name, string|Closure|null $data, Closure $callback=null)
|
||||
{
|
||||
$this->function_name = $function_name;
|
||||
$this->data = $data;
|
||||
|
@ -56,6 +58,19 @@
|
|||
$this->closure = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Static Constructor
|
||||
*
|
||||
* @param string $function_name
|
||||
* @param string|Closure|null $data
|
||||
* @param callable|null $callback
|
||||
* @return static
|
||||
*/
|
||||
public static function create(string $function_name, string|Closure|null $data, callable $callback=null): self
|
||||
{
|
||||
return new self($function_name, $data, $callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the function name for the task
|
||||
*
|
||||
|
@ -134,20 +149,22 @@
|
|||
}
|
||||
|
||||
/**
|
||||
* @param callable|null $callback
|
||||
* @param Closure|null $callback
|
||||
* @return Task
|
||||
*/
|
||||
public function setCallback(?callable $callback): void
|
||||
public function setCallback(?Closure $callback): self
|
||||
{
|
||||
$this->callback = $callback;
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the callback function
|
||||
*
|
||||
* @param JobResults $result
|
||||
* @param string|JobResults|null $result
|
||||
* @return void
|
||||
*/
|
||||
public function runCallback(JobResults $result): void
|
||||
public function runCallback(string|JobResults|null $result): void
|
||||
{
|
||||
if($this->callback !== null)
|
||||
{
|
||||
|
@ -165,9 +182,11 @@
|
|||
|
||||
/**
|
||||
* @param bool $closure
|
||||
* @return Task
|
||||
*/
|
||||
public function setClosure(bool $closure): void
|
||||
public function setClosure(bool $closure): self
|
||||
{
|
||||
$this->closure = $closure;
|
||||
return $this;
|
||||
}
|
||||
}
|
|
@ -6,11 +6,12 @@
|
|||
|
||||
use Closure;
|
||||
use Exception;
|
||||
use GearmanClient;
|
||||
use GearmanTask;
|
||||
use LogLib\Log;
|
||||
use Tamer\Abstracts\JobStatus;
|
||||
use ncc\Utilities\Console;
|
||||
use Tamer\Abstracts\TaskPriority;
|
||||
use Tamer\Exceptions\ServerException;
|
||||
use Tamer\Exceptions\ConnectionException;
|
||||
use Tamer\Interfaces\ClientProtocolInterface;
|
||||
use Tamer\Objects\Job;
|
||||
use Tamer\Objects\JobResults;
|
||||
|
@ -19,14 +20,18 @@
|
|||
class Client implements ClientProtocolInterface
|
||||
{
|
||||
/**
|
||||
* @var \GearmanClient|null $client
|
||||
* The Gearman Client object
|
||||
*
|
||||
* @var GearmanClient|null $client
|
||||
*/
|
||||
private $client;
|
||||
|
||||
/**
|
||||
* An array of servers that have been defined
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $server_cache;
|
||||
private $defined_servers;
|
||||
|
||||
/**
|
||||
* Used for tracking the current execution of tasks and run callbacks on completion
|
||||
|
@ -36,15 +41,27 @@
|
|||
private $tasks;
|
||||
|
||||
/**
|
||||
* Indicates if the client should automatically reconnect to the server if the connection is lost
|
||||
* (default: true)
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* The Unix timestamp of the next time the client should attempt to reconnect to the server
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* The options to use when connecting to the server
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
* @param string|null $username
|
||||
|
@ -56,47 +73,8 @@
|
|||
$this->tasks = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->next_reconnect = time() + 1800;
|
||||
$this->server_cache = [];
|
||||
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch(ServerException $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds client options
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanclient.addoptions.php
|
||||
* @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS)
|
||||
* @return bool
|
||||
*/
|
||||
public function addOptions(array $options): bool
|
||||
{
|
||||
// Parse $options combination via bitwise OR operator
|
||||
$options = array_reduce($options, function($carry, $item)
|
||||
{
|
||||
return $carry | $item;
|
||||
});
|
||||
|
||||
return $this->client->addOptions($options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers callbacks for the client
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function registerCallbacks(): void
|
||||
{
|
||||
$this->client->setCompleteCallback([$this, 'callbackHandler']);
|
||||
$this->client->setFailCallback([$this, 'callbackHandler']);
|
||||
$this->client->setDataCallback([$this, 'callbackHandler']);
|
||||
$this->client->setStatusCallback([$this, 'callbackHandler']);
|
||||
$this->defined_servers = [];
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
|
||||
|
@ -106,31 +84,21 @@
|
|||
* @link http://php.net/manual/en/gearmanclient.addserver.php
|
||||
* @param string $host (127.0.0.1)
|
||||
* @param int $port (default: 4730)
|
||||
* @return bool
|
||||
* @throws ServerException
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host='127.0.0.1', int $port=4730): bool
|
||||
public function addServer(string $host, int $port): void
|
||||
{
|
||||
if(!isset($this->server_cache[$host]))
|
||||
if(!isset($this->defined_servers[$host]))
|
||||
{
|
||||
$this->server_cache[$host] = [];
|
||||
$this->defined_servers[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->server_cache[$host]))
|
||||
if(in_array($port, $this->defined_servers[$host]))
|
||||
{
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
|
||||
$this->server_cache[$host][] = $port;
|
||||
|
||||
try
|
||||
{
|
||||
return $this->client->addServer($host, $port);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new ServerException($e->getMessage(), $e->getCode(), $e);
|
||||
}
|
||||
$this->defined_servers[$host][] = $port;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -138,11 +106,160 @@
|
|||
*
|
||||
* @link http://php.net/manual/en/gearmanclient.addservers.php
|
||||
* @param array $servers (host:port, host:port, ...)
|
||||
* @return void
|
||||
*/
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
foreach($servers as $server)
|
||||
{
|
||||
$server = explode(':', $server);
|
||||
$this->addServer($server[0], $server[1]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects to the server(s)
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function connect(): void
|
||||
{
|
||||
if($this->isConnected())
|
||||
return;
|
||||
|
||||
$this->client = new GearmanClient();
|
||||
|
||||
// Parse $options combination via bitwise OR operator
|
||||
$options = array_reduce($this->options, function($carry, $item)
|
||||
{
|
||||
return $carry | $item;
|
||||
});
|
||||
|
||||
$this->client->addOptions($options);
|
||||
|
||||
foreach($this->defined_servers as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->client->addServer($host, $port);
|
||||
Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->client->setCompleteCallback([$this, 'callbackHandler']);
|
||||
$this->client->setFailCallback([$this, 'callbackHandler']);
|
||||
$this->client->setDataCallback([$this, 'callbackHandler']);
|
||||
$this->client->setStatusCallback([$this, 'callbackHandler']);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server(s)
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
Log::debug('net.nosial.tamerlib', 'disconnecting from gearman server(s)');
|
||||
$this->client->clearCallbacks();
|
||||
unset($this->client);
|
||||
$this->client = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server(s)
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function reconnect(): void
|
||||
{
|
||||
Console::outDebug('net.nosial.tamerlib', 'reconnecting to gearman server(s)');
|
||||
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current status of the client
|
||||
*
|
||||
* @inheritDoc
|
||||
* @return bool
|
||||
*/
|
||||
public function addServers(array $servers): bool
|
||||
public function isConnected(): bool
|
||||
{
|
||||
return $this->client->addServers(implode(',', $servers));
|
||||
if($this->client === null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* The automatic reconnect process
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function preformAutoreconf(): void
|
||||
{
|
||||
if($this->automatic_reconnect && $this->next_reconnect < time())
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch (Exception $e)
|
||||
{
|
||||
Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage());
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds client options
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanclient.addoptions.php
|
||||
* @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS)
|
||||
* @return void
|
||||
*/
|
||||
public function setOptions(array $options): void
|
||||
{
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current client options
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the current client options
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void
|
||||
{
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -150,7 +267,6 @@
|
|||
*
|
||||
* @param Closure $closure
|
||||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function doClosure(Closure $closure): void
|
||||
{
|
||||
|
@ -164,15 +280,10 @@
|
|||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function do(Task $task): void
|
||||
{
|
||||
if($this->automatic_reconnect && time() > $this->next_reconnect)
|
||||
{
|
||||
$this->reconnect();
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
$this->preformAutoreconf();
|
||||
|
||||
$this->tasks[] = $task;
|
||||
$job = new Job($task);
|
||||
|
@ -192,7 +303,6 @@
|
|||
$this->client->doBackground($task->getFunctionName(), msgpack_pack($job->toArray()));
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -200,15 +310,10 @@
|
|||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function queue(Task $task): void
|
||||
{
|
||||
if($this->automatic_reconnect && time() > $this->next_reconnect)
|
||||
{
|
||||
$this->reconnect();
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
$this->preformAutoreconf();
|
||||
|
||||
$this->tasks[] = $task;
|
||||
$job = new Job($task);
|
||||
|
@ -234,11 +339,10 @@
|
|||
* Adds a closure task to the list of tasks to run
|
||||
*
|
||||
* @param Closure $closure
|
||||
* @param $callback
|
||||
* @param Closure|null $callback
|
||||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function queueClosure(Closure $closure, $callback): void
|
||||
public function queueClosure(Closure $closure, ?Closure $callback=null): void
|
||||
{
|
||||
$closure_task = new Task('tamer_closure', $closure, $callback);
|
||||
$closure_task->setClosure(true);
|
||||
|
@ -247,15 +351,13 @@
|
|||
|
||||
/**
|
||||
* @return bool
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function run(): bool
|
||||
{
|
||||
if($this->automatic_reconnect && time() > $this->next_reconnect)
|
||||
{
|
||||
$this->reconnect();
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
if(!$this->isConnected())
|
||||
return false;
|
||||
|
||||
$this->preformAutoreconf();
|
||||
|
||||
if(!$this->client->runTasks())
|
||||
{
|
||||
|
@ -275,21 +377,24 @@
|
|||
{
|
||||
$job_result = JobResults::fromArray(msgpack_unpack($task->data()));
|
||||
$internal_task = $this->getTaskById($job_result->getId());
|
||||
$job_status = match ($task->returnCode())
|
||||
{
|
||||
GEARMAN_WORK_EXCEPTION => JobStatus::Exception,
|
||||
GEARMAN_WORK_FAIL => JobStatus::Failure,
|
||||
default => JobStatus::Success,
|
||||
};
|
||||
|
||||
Log::debug('net.nosial.tamerlib', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_result->getStatus() . ' and data size ' . strlen($task->data()) . ' bytes');
|
||||
|
||||
try
|
||||
{
|
||||
Log::debug('net.nosial.tamer', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_status . ' and data size ' . strlen($task->data()) . ' bytes');
|
||||
if($internal_task->isClosure())
|
||||
{
|
||||
// If the task is a closure, we need to run the callback with the closure's return value
|
||||
// instead of the job result object
|
||||
$internal_task->runCallback($job_result->getData());
|
||||
return;
|
||||
}
|
||||
|
||||
$internal_task->runCallback($job_result);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
Log::error('net.nosial.tamer', 'Callback for task ' . $internal_task->getId() . ' failed with error: ' . $e->getMessage(), $e);
|
||||
Log::error('net.nosial.tamerlib', 'Failed to run callback for task ' . $internal_task->getId() . ': ' . $e->getMessage(), $e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@ -314,24 +419,6 @@
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws ServerException
|
||||
*/
|
||||
private function reconnect()
|
||||
{
|
||||
$this->client = new \GearmanClient();
|
||||
|
||||
foreach($this->server_cache as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
$this->addServer($host, $port);
|
||||
}
|
||||
}
|
||||
|
||||
$this->registerCallbacks();
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a task from the list of tasks
|
||||
*
|
||||
|
@ -350,17 +437,17 @@
|
|||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $automatic_reconnect
|
||||
* @param bool $enable
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $automatic_reconnect;
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -370,13 +457,15 @@
|
|||
{
|
||||
try
|
||||
{
|
||||
$this->client->runTasks();
|
||||
$this->run();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
|
||||
unset($this->client);
|
||||
finally
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,10 +6,11 @@
|
|||
|
||||
use Exception;
|
||||
use GearmanJob;
|
||||
use GearmanWorker;
|
||||
use LogLib\Log;
|
||||
use Opis\Closure\SerializableClosure;
|
||||
use Tamer\Abstracts\JobStatus;
|
||||
use Tamer\Exceptions\ServerException;
|
||||
use Tamer\Exceptions\WorkerException;
|
||||
use Tamer\Exceptions\ConnectionException;
|
||||
use Tamer\Interfaces\WorkerProtocolInterface;
|
||||
use Tamer\Objects\Job;
|
||||
use Tamer\Objects\JobResults;
|
||||
|
@ -17,92 +18,76 @@
|
|||
class Worker implements WorkerProtocolInterface
|
||||
{
|
||||
/**
|
||||
* @var \GearmanWorker|null
|
||||
* The Gearman Worker Instance (if connected)
|
||||
*
|
||||
* @var GearmanWorker|null
|
||||
*/
|
||||
private $worker;
|
||||
|
||||
/**
|
||||
* The list of servers that have been added
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $server_cache;
|
||||
private $defined_servers;
|
||||
|
||||
/**
|
||||
* Indicates if the worker should automatically reconnect to the server
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private $automatic_reconnect;
|
||||
|
||||
/**
|
||||
* The Unix Timestamp of when the next reconnect should occur (if automatic_reconnect is true)
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
private $next_reconnect;
|
||||
|
||||
/**
|
||||
* The options to use when connecting to the server
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $options;
|
||||
|
||||
/**
|
||||
* Public Constructor with optional username and password
|
||||
*
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
*/
|
||||
public function __construct(?string $username=null, ?string $password=null)
|
||||
{
|
||||
$this->worker = null;
|
||||
$this->server_cache = [];
|
||||
$this->defined_servers = [];
|
||||
$this->automatic_reconnect = false;
|
||||
$this->next_reconnect = time() + 1800;
|
||||
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array $options
|
||||
* @return bool
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function addOptions(array $options): bool
|
||||
{
|
||||
if($this->worker == null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
$options = array_map(function($option)
|
||||
{
|
||||
return constant($option);
|
||||
}, $options);
|
||||
|
||||
return $this->worker->addOptions(array_sum($options));
|
||||
}
|
||||
/**
|
||||
* Adds a server to the list of servers to use
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.addserver.php
|
||||
* @param string $host (
|
||||
* @param int $port (default: 4730)
|
||||
* @return bool
|
||||
* @throws ServerException
|
||||
* @return void
|
||||
*/
|
||||
public function addServer(string $host='127.0.0.1', int $port=4730): bool
|
||||
public function addServer(string $host, int $port): void
|
||||
{
|
||||
if(!isset($this->server_cache[$host]))
|
||||
if(!isset($this->defined_servers[$host]))
|
||||
{
|
||||
$this->server_cache[$host] = [];
|
||||
$this->defined_servers[$host] = [];
|
||||
}
|
||||
|
||||
if(in_array($port, $this->server_cache[$host]))
|
||||
if(in_array($port, $this->defined_servers[$host]))
|
||||
{
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
|
||||
$this->server_cache[$host][] = $port;
|
||||
|
||||
try
|
||||
{
|
||||
return $this->worker->addServer($host, $port);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new ServerException($e->getMessage(), $e->getCode(), $e);
|
||||
}
|
||||
$this->defined_servers[$host][] = $port;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -111,7 +96,6 @@
|
|||
* @link http://php.net/manual/en/gearmanworker.addservers.php
|
||||
* @param string[] $servers (host:port, host:port, ...)
|
||||
* @return void
|
||||
* @throws ServerException
|
||||
*/
|
||||
public function addServers(array $servers): void
|
||||
{
|
||||
|
@ -122,85 +106,42 @@
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adds a function to the list of functions to call
|
||||
* Connects to the server
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.addfunction.php
|
||||
* @param string $function_name The name of the function to register with the job server
|
||||
* @param callable $function The callback function to call when the job is received
|
||||
* @param mixed|null $context (optional) The context to pass to the callback function
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function addFunction(string $function_name, callable $function, mixed $context=null): void
|
||||
public function connect(): void
|
||||
{
|
||||
$this->worker->addFunction($function_name, function(GearmanJob $job) use ($function, $context)
|
||||
{
|
||||
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
|
||||
|
||||
try
|
||||
{
|
||||
$result = $function($received_job, $context);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
$job->sendFail();
|
||||
if($this->isConnected())
|
||||
return;
|
||||
}
|
||||
|
||||
$job_results = new JobResults($received_job, JobStatus::Success, $result);
|
||||
$job->sendComplete(msgpack_pack($job_results->toArray()));
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a function from the list of functions to call
|
||||
*
|
||||
* @param string $function_name The name of the function to unregister
|
||||
* @return void
|
||||
*/
|
||||
public function removeFunction(string $function_name): void
|
||||
{
|
||||
$this->worker->unregister($function_name);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $automatic_reconnect
|
||||
* @return void
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void
|
||||
{
|
||||
$this->automatic_reconnect = $automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws ServerException
|
||||
*/
|
||||
private function reconnect()
|
||||
{
|
||||
$this->worker = new \GearmanWorker();
|
||||
$this->worker = new GearmanWorker();
|
||||
$this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ);
|
||||
|
||||
foreach($this->server_cache as $host => $ports)
|
||||
Log::debug('net.nosial.tamerlib', 'connecting to gearman server(s)');
|
||||
|
||||
foreach($this->defined_servers as $host => $ports)
|
||||
{
|
||||
foreach($ports as $port)
|
||||
{
|
||||
$this->addServer($host, $port);
|
||||
try
|
||||
{
|
||||
$this->worker->addServer($host, $port);
|
||||
Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->worker->addFunction('tamer_closure', function(GearmanJob $job)
|
||||
{
|
||||
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
|
||||
Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId());
|
||||
|
||||
try
|
||||
{
|
||||
|
@ -217,9 +158,163 @@
|
|||
|
||||
$job_results = new JobResults($received_job, JobStatus::Success, $result);
|
||||
$job->sendComplete(msgpack_pack($job_results->toArray()));
|
||||
Log::debug('net.nosial.tamerlib', 'completed closure: ' . $received_job->getId());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function disconnect(): void
|
||||
{
|
||||
if(!$this->isConnected())
|
||||
return;
|
||||
|
||||
$this->worker->unregisterAll();
|
||||
unset($this->worker);
|
||||
$this->worker = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server if the connection has been lost
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function reconnect(): void
|
||||
{
|
||||
$this->disconnect();
|
||||
$this->connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the worker is connected to the server
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isConnected(): bool
|
||||
{
|
||||
return $this->worker !== null;
|
||||
}
|
||||
|
||||
/**
|
||||
* The automatic reconnect process
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function preformAutoreconf(): void
|
||||
{
|
||||
if($this->automatic_reconnect && $this->next_reconnect < time())
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->reconnect();
|
||||
}
|
||||
catch (Exception $e)
|
||||
{
|
||||
Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage());
|
||||
}
|
||||
finally
|
||||
{
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the options to use when connecting to the server
|
||||
*
|
||||
* @param array $options
|
||||
* @return bool
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function setOptions(array $options): void
|
||||
{
|
||||
$this->options = $options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the options to use when connecting to the server
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getOptions(): array
|
||||
{
|
||||
return $this->options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the options to use when connecting to the server
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function clearOptions(): void
|
||||
{
|
||||
$this->options = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a function to the list of functions to call
|
||||
*
|
||||
* @link http://php.net/manual/en/gearmanworker.addfunction.php
|
||||
* @param string $name The name of the function to register with the job server
|
||||
* @param callable $callable The callback function to call when the job is received
|
||||
* @return void
|
||||
*/
|
||||
public function addFunction(string $name, callable $callable): void
|
||||
{
|
||||
$this->worker->addFunction($name, function(GearmanJob $job) use ($callable)
|
||||
{
|
||||
$received_job = Job::fromArray(msgpack_unpack($job->workload()));
|
||||
Log::debug('net.nosial.tamerlib', 'received job: ' . $received_job->getId());
|
||||
|
||||
try
|
||||
{
|
||||
$result = $callable($received_job);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
$job->sendFail();
|
||||
unset($e);
|
||||
return;
|
||||
}
|
||||
|
||||
$job_results = new JobResults($received_job, JobStatus::Success, $result);
|
||||
$job->sendComplete(msgpack_pack($job_results->toArray()));
|
||||
Log::debug('net.nosial.tamerlib', 'completed job: ' . $received_job->getId());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a function from the list of functions to call
|
||||
*
|
||||
* @param string $function_name The name of the function to unregister
|
||||
* @return void
|
||||
*/
|
||||
public function removeFunction(string $function_name): void
|
||||
{
|
||||
$this->worker->unregister($function_name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for a job and calls the appropriate callback function
|
||||
*
|
||||
|
@ -228,26 +323,20 @@
|
|||
* @param int $timeout (default: 500) The timeout in milliseconds (if $blocking is false)
|
||||
* @param bool $throw_errors (default: false) Whether to throw exceptions on errors
|
||||
* @return void Returns nothing
|
||||
* @throws ServerException If the worker cannot connect to the server
|
||||
* @throws WorkerException If the worker encounters an error while working if $throw_errors is true
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
|
||||
{
|
||||
if($this->automatic_reconnect && (time() > $this->next_reconnect))
|
||||
{
|
||||
$this->reconnect();
|
||||
$this->next_reconnect = time() + 1800;
|
||||
}
|
||||
|
||||
$this->worker->setTimeout($timeout);
|
||||
|
||||
while(true)
|
||||
{
|
||||
@$this->preformAutoreconf();
|
||||
@$this->worker->work();
|
||||
|
||||
if($this->worker->returnCode() == GEARMAN_COULD_NOT_CONNECT)
|
||||
{
|
||||
throw new ServerException('Could not connect to Gearman server');
|
||||
throw new ConnectionException('Could not connect to Gearman server');
|
||||
}
|
||||
|
||||
if($this->worker->returnCode() == GEARMAN_TIMEOUT && !$blocking)
|
||||
|
@ -257,8 +346,28 @@
|
|||
|
||||
if($this->worker->returnCode() != GEARMAN_SUCCESS && $throw_errors)
|
||||
{
|
||||
throw new WorkerException('Gearman worker error: ' . $this->worker->error(), $this->worker->returnCode());
|
||||
Log::error('net.nosial.tamerlib', 'Gearman worker error: ' . $this->worker->error());
|
||||
}
|
||||
|
||||
if($blocking)
|
||||
{
|
||||
usleep($timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes all remaining tasks and closes the connection
|
||||
*/
|
||||
public function __destruct()
|
||||
{
|
||||
try
|
||||
{
|
||||
$this->disconnect();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
unset($e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -90,7 +90,7 @@
|
|||
}
|
||||
}
|
||||
|
||||
public function addOptions(array $options): bool
|
||||
public function setOptions(array $options): bool
|
||||
{
|
||||
$this->options = $options;
|
||||
return true;
|
||||
|
@ -294,7 +294,7 @@
|
|||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
@ -302,12 +302,12 @@
|
|||
/**
|
||||
* Enables or disables automatic reconnecting to the server
|
||||
*
|
||||
* @param bool $automatic_reconnect
|
||||
* @param bool $enable
|
||||
* @return void
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $automatic_reconnect;
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
private function reconnect()
|
||||
|
|
|
@ -119,7 +119,7 @@
|
|||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function addOptions(array $options): bool
|
||||
public function setOptions(array $options): bool
|
||||
{
|
||||
$this->options = $options;
|
||||
return true;
|
||||
|
@ -128,7 +128,7 @@
|
|||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function isAutomaticReconnect(): bool
|
||||
public function automaticReconnectionEnabled(): bool
|
||||
{
|
||||
return $this->automatic_reconnect;
|
||||
}
|
||||
|
@ -136,18 +136,18 @@
|
|||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function setAutomaticReconnect(bool $automatic_reconnect): void
|
||||
public function enableAutomaticReconnection(bool $enable): void
|
||||
{
|
||||
$this->automatic_reconnect = $automatic_reconnect;
|
||||
$this->automatic_reconnect = $enable;
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function addFunction(string $function_name, callable $function, mixed $context = null): void
|
||||
public function addFunction(string $name, callable $callable, mixed $context = null): void
|
||||
{
|
||||
$this->functions[$function_name] = [
|
||||
'function' => $function,
|
||||
$this->functions[$name] = [
|
||||
'function' => $callable,
|
||||
'context' => $context
|
||||
];
|
||||
}
|
||||
|
|
336
src/Tamer/Tamer.php
Normal file
336
src/Tamer/Tamer.php
Normal file
|
@ -0,0 +1,336 @@
|
|||
<?php
|
||||
|
||||
/** @noinspection PhpMissingFieldTypeInspection */
|
||||
|
||||
namespace Tamer;
|
||||
|
||||
use Closure;
|
||||
use InvalidArgumentException;
|
||||
use Tamer\Abstracts\Mode;
|
||||
use Tamer\Classes\Functions;
|
||||
use Tamer\Classes\Validate;
|
||||
use Tamer\Exceptions\ConnectionException;
|
||||
use Tamer\Interfaces\ClientProtocolInterface;
|
||||
use Tamer\Interfaces\WorkerProtocolInterface;
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
class Tamer
|
||||
{
|
||||
/**
|
||||
* The protocol to use when connecting to the server
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
private static $protocol;
|
||||
|
||||
/**
|
||||
* The protocol to use when connecting to the server as a client
|
||||
*
|
||||
* @var ClientProtocolInterface|null
|
||||
*/
|
||||
private static $client;
|
||||
|
||||
/**
|
||||
* The protocol to use when connecting to the server as a worker
|
||||
*
|
||||
* @var WorkerProtocolInterface|null
|
||||
*/
|
||||
private static $worker;
|
||||
|
||||
/**
|
||||
* Indicates if Tamer is running as a client or worker
|
||||
*
|
||||
* @var string
|
||||
* @see Mode
|
||||
*/
|
||||
private static $mode;
|
||||
|
||||
/**
|
||||
* Indicates if Tamer is connected to the server
|
||||
*
|
||||
* @var bool
|
||||
*/
|
||||
private static $connected;
|
||||
|
||||
/**
|
||||
* Connects to a server using the specified protocol and mode (client or worker)
|
||||
*
|
||||
* @param string $protocol
|
||||
* @param string $mode
|
||||
* @param array $servers
|
||||
* @param string|null $username
|
||||
* @param string|null $password
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public static function connect(string $protocol, string $mode, array $servers, ?string $username=null, ?string $password=null): void
|
||||
{
|
||||
if(self::$connected)
|
||||
{
|
||||
throw new ConnectionException('Tamer is already connected to the server');
|
||||
}
|
||||
|
||||
if (!Validate::protocolType($protocol))
|
||||
{
|
||||
throw new InvalidArgumentException(sprintf('Invalid protocol type: %s', $protocol));
|
||||
}
|
||||
|
||||
if (!Validate::mode($mode))
|
||||
{
|
||||
throw new InvalidArgumentException(sprintf('Invalid mode: %s', $mode));
|
||||
}
|
||||
|
||||
self::$protocol = $protocol;
|
||||
self::$mode = $mode;
|
||||
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client = Functions::createClient($protocol, $username, $password);
|
||||
self::$client->addServers($servers);
|
||||
self::$client->connect();
|
||||
}
|
||||
elseif(self::$mode === Mode::Worker)
|
||||
{
|
||||
self::$worker = Functions::createWorker($protocol, $username, $password);
|
||||
self::$worker->addServers($servers);
|
||||
self::$worker->connect();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException(sprintf('Invalid mode: %s', $mode));
|
||||
}
|
||||
|
||||
self::$connected = true;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Disconnects from the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public static function disconnect(): void
|
||||
{
|
||||
if (!self::$connected)
|
||||
{
|
||||
throw new ConnectionException('Tamer is not connected to the server');
|
||||
}
|
||||
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->disconnect();
|
||||
}
|
||||
else
|
||||
{
|
||||
self::$worker->disconnect();
|
||||
}
|
||||
|
||||
self::$connected = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconnects to the server
|
||||
*
|
||||
* @return void
|
||||
* @throws ConnectionException
|
||||
*/
|
||||
public static function reconnect(): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->reconnect();
|
||||
}
|
||||
else
|
||||
{
|
||||
self::$worker->reconnect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a task to the queue to be executed by the worker
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public static function do(Task $task): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->do($task);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a closure operation in the background (does not return a result)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @return void
|
||||
*/
|
||||
public static function doClosure(Closure $closure): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->doClosure($closure);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a task to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Task $task
|
||||
* @return void
|
||||
*/
|
||||
public static function queue(Task $task): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->queue($task);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a closure to be processed in parallel (returns a result handled by a callback)
|
||||
*
|
||||
* @param Closure $closure The closure operation to perform (remote)
|
||||
* @param Closure|null $callback The closure to call when the operation is complete (local)
|
||||
* @return void
|
||||
*/
|
||||
public static function queueClosure(Closure $closure, ?Closure $callback=null): void
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
self::$client->queueClosure($closure, $callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes all tasks in the queue and waits for them to complete
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public static function run(): bool
|
||||
{
|
||||
if (self::$mode === Mode::Client)
|
||||
{
|
||||
return self::$client->run();
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in client mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a function to the worker
|
||||
*
|
||||
* @param string $name The name of the function to add
|
||||
* @param callable $callable The function to add
|
||||
* @return void
|
||||
*/
|
||||
public static function addFunction(string $name, callable $callable): void
|
||||
{
|
||||
if (self::$mode === Mode::Worker)
|
||||
{
|
||||
self::$worker->addFunction($name, $callable);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in worker mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a function from the worker
|
||||
*
|
||||
* @param string $function_name The name of the function to remove
|
||||
* @return void
|
||||
*/
|
||||
public static function removeFunction(string $function_name): void
|
||||
{
|
||||
if (self::$mode === Mode::Worker)
|
||||
{
|
||||
self::$worker->removeFunction($function_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in worker mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Works a job from the queue (blocking or non-blocking)
|
||||
*
|
||||
* @param bool $blocking (optional) Whether to block until a job is available
|
||||
* @param int $timeout (optional) The timeout to use when blocking
|
||||
* @param bool $throw_errors (optional) Whether to throw errors or not
|
||||
* @return void
|
||||
*/
|
||||
public static function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
|
||||
{
|
||||
if (self::$mode === Mode::Worker)
|
||||
{
|
||||
self::$worker->work($blocking, $timeout, $throw_errors);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidArgumentException('Tamer is not running in worker mode');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public static function getProtocol(): string
|
||||
{
|
||||
return self::$protocol;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return ClientProtocolInterface|null
|
||||
*/
|
||||
public static function getClient(): ?ClientProtocolInterface
|
||||
{
|
||||
return self::$client;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return WorkerProtocolInterface|null
|
||||
*/
|
||||
public static function getWorker(): ?WorkerProtocolInterface
|
||||
{
|
||||
return self::$worker;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return string
|
||||
*/
|
||||
public static function getMode(): string
|
||||
{
|
||||
return self::$mode;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
public static function isConnected(): bool
|
||||
{
|
||||
return self::$connected;
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
<?php
|
||||
|
||||
|
||||
use Tamer\Abstracts\TaskPriority;
|
||||
use Tamer\Objects\Task;
|
||||
|
||||
require 'ncc';
|
||||
|
@ -14,5 +14,7 @@
|
|||
|
||||
for($i = 0; $i < 500; $i++)
|
||||
{
|
||||
$client->do(new Task('sleep', '5'));
|
||||
$client->do(Task::create('sleep', '5')
|
||||
->setPriority(TaskPriority::High)
|
||||
);
|
||||
}
|
||||
|
|
57
tests/tamer_client.php
Normal file
57
tests/tamer_client.php
Normal file
|
@ -0,0 +1,57 @@
|
|||
<?php
|
||||
|
||||
use Tamer\Abstracts\Mode;
|
||||
use Tamer\Abstracts\ProtocolType;
|
||||
use Tamer\Tamer;
|
||||
|
||||
require 'ncc';
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
Tamer::connect(ProtocolType::Gearman, Mode::Client,
|
||||
['127.0.0.1:4730']
|
||||
);
|
||||
|
||||
// Pi calculation (closure)
|
||||
// Add it 10 times
|
||||
for($i = 0; $i < 100; $i++)
|
||||
{
|
||||
Tamer::queueClosure(function() {
|
||||
// Do Pi calculation
|
||||
$pi = 0;
|
||||
$top = 4.0;
|
||||
$bot = 1.0;
|
||||
$minus = true;
|
||||
|
||||
for($i = 0; $i < 1000000; $i++)
|
||||
{
|
||||
if($minus)
|
||||
{
|
||||
$pi -= ($top / $bot);
|
||||
$minus = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
$pi += ($top / $bot);
|
||||
$minus = true;
|
||||
}
|
||||
|
||||
$bot += 2.0;
|
||||
}
|
||||
|
||||
\LogLib\Log::info('net.nosial.tamerlib', sprintf('Pi: %s', $pi));
|
||||
return $pi;
|
||||
});
|
||||
}
|
||||
|
||||
// Sleep function (task)
|
||||
Tamer::queue(\Tamer\Objects\Task::create('sleep', 5, function(\Tamer\Objects\JobResults $data)
|
||||
{
|
||||
echo "Slept for {$data->getData()} seconds \n";
|
||||
}));
|
||||
|
||||
$a = microtime(true);
|
||||
Tamer::run();
|
||||
$b = microtime(true);
|
||||
|
||||
echo "Took " . ($b - $a) . " seconds \n";
|
20
tests/tamer_worker.php
Normal file
20
tests/tamer_worker.php
Normal file
|
@ -0,0 +1,20 @@
|
|||
<?php
|
||||
|
||||
use Tamer\Abstracts\Mode;
|
||||
use Tamer\Abstracts\ProtocolType;
|
||||
use Tamer\Tamer;
|
||||
|
||||
require 'ncc';
|
||||
|
||||
import('net.nosial.tamerlib', 'latest');
|
||||
|
||||
Tamer::connect(ProtocolType::Gearman, Mode::Worker,
|
||||
['127.0.0.1:4730']
|
||||
);
|
||||
|
||||
Tamer::addFunction('sleep', function(\Tamer\Objects\Job $job) {
|
||||
sleep($job->getData());
|
||||
return $job->getData();
|
||||
});
|
||||
|
||||
Tamer::work();
|
Loading…
Add table
Reference in a new issue