diff --git a/.idea/php.xml b/.idea/php.xml
index 666a0dd..8ba8e7b 100644
--- a/.idea/php.xml
+++ b/.idea/php.xml
@@ -15,11 +15,8 @@
-
-
-
-
+
diff --git a/project.json b/project.json
index 37ae278..0c2bd23 100644
--- a/project.json
+++ b/project.json
@@ -47,12 +47,6 @@
"source_type": "remote",
"source": "opis/closure=latest@composer"
},
- {
- "name": "com.php_amqplib.php_amqplib",
- "version": "latest",
- "source_type": "remote",
- "source": "php-amqplib/php-amqplib=latest@composer"
- },
{
"name": "com.symfony.process",
"version": "latest",
diff --git a/src/TamerLib/Abstracts/ExitCodes/WorkerExitCodes.php b/src/TamerLib/Abstracts/ExitCodes/WorkerExitCodes.php
deleted file mode 100644
index 4dd99f6..0000000
--- a/src/TamerLib/Abstracts/ExitCodes/WorkerExitCodes.php
+++ /dev/null
@@ -1,16 +0,0 @@
- new \TamerLib\Protocols\Gearman\Client($username, $password),
- ProtocolType::RabbitMQ => throw new InvalidArgumentException('RabbitMQ is not fully implemented yet'),
- default => throw new InvalidArgumentException('Invalid protocol type'),
- };
- }
-
- /**
- * @param string $protocol
- * @param string|null $username
- * @param string|null $password
- * @return WorkerProtocolInterface
- */
- public static function createWorker(string $protocol, ?string $username=null, ?string $password=null): WorkerProtocolInterface
- {
- /** @noinspection PhpFullyQualifiedNameUsageInspection */
- return match (strtolower($protocol))
- {
- ProtocolType::Gearman => new \TamerLib\Protocols\Gearman\Worker($username, $password),
- ProtocolType::RabbitMQ => throw new InvalidArgumentException('RabbitMQ is not fully implemented yet'),
- default => throw new InvalidArgumentException('Invalid protocol type'),
- };
- }
-
- /**
- * Returns the worker variables from the environment variables
- *
- * @return array
- */
- public static function getWorkerVariables(): array
- {
- if(self::$worker_variables == null)
- {
- self::$worker_variables = [
- 'TAMER_ENABLED' => getenv('TAMER_ENABLED') === 'true',
- 'TAMER_PROTOCOL' => getenv('TAMER_PROTOCOL'),
- 'TAMER_SERVERS' => getenv('TAMER_SERVERS'),
- 'TAMER_USERNAME' => getenv('TAMER_USERNAME'),
- 'TAMER_PASSWORD' => getenv('TAMER_PASSWORD'),
- 'TAMER_INSTANCE_ID' => getenv('TAMER_INSTANCE_ID'),
- ];
-
- if(self::$worker_variables['TAMER_SERVERS'] !== false)
- self::$worker_variables['TAMER_SERVERS'] = explode(',', self::$worker_variables['TAMER_SERVERS']);
- }
-
- return self::$worker_variables;
- }
-
- /**
- * Returns the path to the php binary
- *
- * @return string
- * @throws Exception
- */
- public static function findPhpBin(): string
- {
- if(self::$php_bin !== null)
- return self::$php_bin;
-
- $php_finder = new PhpExecutableFinder();
- $php_bin = $php_finder->find();
- if($php_bin === false)
- throw new Exception('Unable to find the php binary');
-
- self::$php_bin = $php_bin;
- return $php_bin;
- }
-
- /**
- * Calculates the priority for a task based on the priority level
- *
- * @param int $priority
- * @return int
- */
- public static function calculatePriority(int $priority): int
- {
- if($priority < TaskPriority::Low)
- return 0;
-
- if($priority > TaskPriority::High)
- return 255;
-
- return (int) round(($priority / TaskPriority::High) * 255);
- }
- }
\ No newline at end of file
diff --git a/src/TamerLib/Classes/RedisServer.php b/src/TamerLib/Classes/RedisServer.php
new file mode 100644
index 0000000..cc6f2d4
--- /dev/null
+++ b/src/TamerLib/Classes/RedisServer.php
@@ -0,0 +1,155 @@
+cmd = $cmd;
+ $this->host = $host;
+ $this->port = $port;
+ }
+
+ /**
+ * Returns the port that the Redis server is listening on.
+ *
+ * @return int|null
+ */
+ public function getPort(): ?int
+ {
+ return $this->port;
+ }
+
+ /**
+ * Determines if the Redis server is running.
+ *
+ * @return bool
+ */
+ public function isRunning(): bool
+ {
+ if(is_null($this->server_process))
+ {
+ return false;
+ }
+
+ return $this->server_process->isRunning();
+ }
+
+ /**
+ * Starts the Redis server.
+ *
+ * @param int $timeout
+ * @return bool
+ * @throws RedisServerException
+ * @throws RedisException
+ */
+ public function start(int $timeout=60): bool
+ {
+ if($this->isRunning())
+ {
+ return true;
+ }
+
+ Log::verbose('net.nosial.tamerlib', 'Starting Redis server on port ' . $this->port . '.');
+ $this->server_process = new Process([$this->cmd, '--port', $this->port]);
+ $this->server_process->start();
+
+ // Use a redis client and ping the server until it responds.
+ $redis_client = new Redis();
+ $timeout_counter = 0;
+
+ while(!$redis_client->isConnected())
+ {
+ if($timeout_counter >= $timeout)
+ {
+ throw new RedisServerException('Redis server failed to start within ' . $timeout . ' seconds.');
+ }
+
+ try
+ {
+ $redis_client->connect($this->host, $this->port);
+ }
+ catch (RedisException $e)
+ {
+ // Do nothing.
+ }
+ finally
+ {
+ sleep(1);
+ $timeout_counter++;
+ }
+ }
+
+ Log::verbose('net.nosial.tamerlib', 'Redis server started.');
+ return true;
+ }
+
+ /**
+ * Stops the Redis server.
+ *
+ * @return bool
+ */
+ public function stop(): bool
+ {
+ if(!$this->isRunning())
+ {
+ return true;
+ }
+
+ $this->server_process->stop();
+ Log::verbose('net.nosial.tamerlib', 'Redis server stopped.');
+ return true;
+ }
+
+ /**
+ * Terminates the Redis server.
+ */
+ public function __destruct()
+ {
+ $this->stop();
+ }
+ }
\ No newline at end of file
diff --git a/src/TamerLib/Classes/Supervisor.php b/src/TamerLib/Classes/Supervisor.php
deleted file mode 100644
index 5c5dabc..0000000
--- a/src/TamerLib/Classes/Supervisor.php
+++ /dev/null
@@ -1,194 +0,0 @@
-workers = [];
- $this->protocol = $protocol;
- $this->servers = $servers;
- $this->username = $username;
- $this->password = $password;
- }
-
- /**
- * Adds a worker to the supervisor instance
- *
- * @param string $target
- * @param int $instances
- * @return void
- * @throws Exception
- */
- public function addWorker(string $target, int $instances): void
- {
- for ($i = 0; $i < $instances; $i++)
- {
- $this->workers[] = new WorkerInstance($target, $this->protocol, $this->servers, $this->username, $this->password);
- }
- }
-
- /**
- * Starts all the workers
- *
- * @return void
- * @throws Exception
- */
- public function start(): void
- {
- /** @var WorkerInstance $worker */
- foreach ($this->workers as $worker)
- {
- $worker->start();
- }
-
- // Ensure that all the workers are running
- foreach($this->workers as $worker)
- {
- if (!$worker->isRunning())
- {
- throw new Exception("Worker {$worker->getId()} is not running");
- }
-
- while(true)
- {
- switch($worker->getProcess()->getStatus())
- {
- case Process::STATUS_STARTED:
- Log::debug('net.nosial.tamerlib', "worker {$worker->getId()} is running");
- break 2;
-
- case Process::STATUS_TERMINATED:
- throw new Exception("Worker {$worker->getId()} has terminated");
-
- default:
- echo "Worker {$worker->getId()} is {$worker->getProcess()->getStatus()}" . PHP_EOL;
- }
- }
- }
- }
-
- /**
- * Stops all the workers
- *
- * @return void
- * @throws Exception
- */
- public function stop(): void
- {
- /** @var WorkerInstance $worker */
- foreach ($this->workers as $worker)
- {
- $worker->stop();
- }
- }
-
- /**
- * Restarts all the workers
- *
- * @return void
- * @throws Exception
- */
- public function restart(): void
- {
- /** @var WorkerInstance $worker */
- foreach ($this->workers as $worker)
- {
- $worker->stop();
- $worker->start();
- }
- }
-
- /**
- * Monitors all the workers and restarts them if they are not running
- *
- * @param bool $blocking
- * @param bool $auto_restart
- * @return void
- * @throws Exception
- */
- public function monitor(bool $blocking=false, bool $auto_restart=true): void
- {
- while(true)
- {
- /** @var WorkerInstance $worker */
- foreach ($this->workers as $worker)
- {
- if (!$worker->isRunning())
- {
- if ($auto_restart)
- {
- Log::warning('net.nosial.tamerlib', "worker {$worker->getId()} is not running, restarting");
- $worker->start();
- }
- else
- {
- throw new Exception("Worker {$worker->getId()} is not running");
- }
- }
- }
-
- if (!$blocking)
- {
- break;
- }
-
- sleep(1);
- }
- }
-
- /**
- * @throws Exception
- */
- public function __destruct()
- {
- $this->stop();
- }
-
- }
\ No newline at end of file
diff --git a/src/TamerLib/Classes/Utilities.php b/src/TamerLib/Classes/Utilities.php
new file mode 100644
index 0000000..a2128a7
--- /dev/null
+++ b/src/TamerLib/Classes/Utilities.php
@@ -0,0 +1,38 @@
+ true,
- default => false,
- };
- }
-
- /**
- * @param string $input
- * @return bool
- */
- public static function mode(string $input): bool
- {
- return match (strtolower($input))
- {
- Mode::Client, Mode::Worker => true,
- default => false,
- };
- }
-
- /**
- * Returns true if the input is a valid task priority.
- *
- * @param int $input
- * @return bool
- */
- public static function taskPriority(int $input): bool
- {
- return match ($input)
- {
- TaskPriority::Low, TaskPriority::Normal, TaskPriority::High => true,
- default => false,
- };
- }
-
-
- /**
- * Determines the object type
- *
- * @param $input
- * @return string
- */
- public static function getObjectType($input): string
- {
- if(!is_array($input))
- {
- return ObjectType::Unknown;
- }
-
- if(!array_key_exists('type', $input))
- {
- return ObjectType::Unknown;
- }
-
- return match ($input['type'])
- {
- ObjectType::Job => ObjectType::Job,
- ObjectType::JobResults => ObjectType::JobResults,
- default => ObjectType::Unknown,
- };
- }
- }
\ No newline at end of file
diff --git a/src/TamerLib/Exceptions/ConnectionException.php b/src/TamerLib/Exceptions/ConnectionException.php
deleted file mode 100644
index e8ab5dd..0000000
--- a/src/TamerLib/Exceptions/ConnectionException.php
+++ /dev/null
@@ -1,19 +0,0 @@
-id = $task->getId();
- $this->name = $task->getFunctionName();
- $this->data = $task->getData();
- $this->closure = $task->isClosure();
- }
-
- /**
- * Returns the ID of the Job
- *
- * @return string
- */
- public function getId(): string
- {
- return $this->id;
- }
-
- /**
- * Returns the function name of the Job
- *
- * @return string
- */
- public function getName(): string
- {
- return $this->name;
- }
-
- /**
- * Returns the data of the Job
- *
- * @return string|Closure|null
- */
- public function getData(): Closure|string|null
- {
- return $this->data;
- }
-
- /**
- * @return bool
- */
- public function isClosure(): bool
- {
- return $this->closure;
- }
-
- /**
- * Returns an array representation of the Job
- *
- * @return array
- */
- public function toArray(): array
- {
- return [
- 'type' => 'tamer_job',
- 'id' => $this->id,
- 'name' => $this->name,
- 'data' => ($this->closure ? serialize(new SerializableClosure($this->data)) : $this->data),
- 'closure' => $this->closure
- ];
- }
-
- /**
- * Constructs a Job from an array
- *
- * @param array $data
- * @return Job
- */
- public static function fromArray(array $data): Job
- {
- $job_data = $data['data'];
-
- if($data['closure'] === true)
- {
- /** @var SerializableClosure $job_data */
- $job_data = unserialize($data['data']);
- $job_data = $job_data->getClosure();
- }
-
- $job = new Job(new Task($data['name'], $job_data));
- $job->id = $data['id'];
- $job->closure = $data['closure'];
-
- return $job;
- }
-
- }
\ No newline at end of file
diff --git a/src/TamerLib/Objects/JobResults.php b/src/TamerLib/Objects/JobResults.php
deleted file mode 100644
index 83def50..0000000
--- a/src/TamerLib/Objects/JobResults.php
+++ /dev/null
@@ -1,130 +0,0 @@
-id = $job->getId();
- $this->data = $results;
- $this->status = $status;
- }
- }
-
- /**
- * Returns the ID of the Job
- *
- * @return string
- */
- public function getId(): string
- {
- return $this->id;
- }
-
-
- /**
- * Returns the data of the Job
- *
- * @return string
- */
- public function getData(): string
- {
- return $this->data;
- }
-
- /**
- * @return int
- * @noinspection PhpUnused
- */
- public function getStatus(): int
- {
- return $this->status;
- }
-
- /**
- * Returns an array representation of the Job
- *
- * @return array
- */
- public function toArray(): array
- {
- return [
- 'type' => 'tamer_job_results',
- 'id' => $this->id,
- 'data' => $this->data,
- 'status' => $this->status
- ];
- }
-
- /**
- * Constructs a Job from an array
- *
- * @param array $data
- * @return JobResults
- */
- public static function fromArray(array $data): JobResults
- {
- $job = new JobResults();
-
- $job->setId($data['id']);
- $job->setData($data['data']);
- $job->setStatus($data['status']);
-
- return $job;
- }
-
- /**
- * @param string $id
- */
- protected function setId(string $id): void
- {
- $this->id = $id;
- }
-
- /**
- * @param string $data
- */
- protected function setData(string $data): void
- {
- $this->data = $data;
- }
-
- /**
- * @param int|null $status
- */
- protected function setStatus(?int $status): void
- {
- $this->status = $status;
- }
-
-
- }
\ No newline at end of file
diff --git a/src/TamerLib/Objects/Task.php b/src/TamerLib/Objects/Task.php
deleted file mode 100644
index da55d34..0000000
--- a/src/TamerLib/Objects/Task.php
+++ /dev/null
@@ -1,192 +0,0 @@
-function_name = $function_name;
- $this->data = $data;
- $this->id = uniqid();
- $this->priority = TaskPriority::Normal;
- $this->callback = $callback;
- $this->closure = false;
- }
-
- /**
- * Static Constructor
- *
- * @param string $function_name
- * @param string|Closure|null $data
- * @param callable|null $callback
- * @return static
- */
- public static function create(string $function_name, string|Closure|null $data, callable $callback=null): self
- {
- return new self($function_name, $data, $callback);
- }
-
- /**
- * Returns the function name for the task
- *
- * @return string
- */
- public function getFunctionName(): string
- {
- return $this->function_name;
- }
-
- /**
- * Sets the function name for the task
- *
- * @param string $function_name
- * @return Task
- */
- public function setFunctionName(string $function_name): self
- {
- $this->function_name = $function_name;
- return $this;
- }
-
- /**
- * Returns the arguments for the task
- *
- * @return string|Closure|null
- */
- public function getData(): string|null|Closure
- {
- return $this->data;
- }
-
- /**
- * Sets the arguments for the task
- *
- * @param string $data
- * @return Task
- */
- public function setData(string $data): self
- {
- $this->data = $data;
- return $this;
- }
-
- /**
- * Returns the Unique ID of the task
- *
- * @return string
- */
- public function getId(): string
- {
- return $this->id;
- }
-
- /**
- * @return int
- */
- public function getPriority(): int
- {
- return $this->priority;
- }
-
- /**
- * @param int $priority
- * @return Task
- */
- public function setPriority(int $priority): self
- {
- if(!Validate::taskPriority($priority))
- {
- throw new InvalidArgumentException("Invalid priority value");
- }
-
- $this->priority = $priority;
- return $this;
- }
-
- /**
- * @param Closure|null $callback
- * @return Task
- */
- public function setCallback(?Closure $callback): self
- {
- $this->callback = $callback;
- return $this;
- }
-
- /**
- * Executes the callback function
- *
- * @param string|JobResults|null $result
- * @return void
- */
- public function runCallback(string|JobResults|null $result): void
- {
- if($this->callback !== null)
- {
- call_user_func($this->callback, $result);
- }
- }
-
- /**
- * @return bool
- */
- public function isClosure(): bool
- {
- return $this->closure;
- }
-
- /**
- * @param bool $closure
- * @return Task
- */
- public function setClosure(bool $closure): self
- {
- $this->closure = $closure;
- return $this;
- }
- }
\ No newline at end of file
diff --git a/src/TamerLib/Objects/WorkerInstance.php b/src/TamerLib/Objects/WorkerInstance.php
deleted file mode 100644
index 52cb424..0000000
--- a/src/TamerLib/Objects/WorkerInstance.php
+++ /dev/null
@@ -1,186 +0,0 @@
-id = uniqid();
- $this->target = $target;
- $this->protocol = $protocol;
- $this->servers = $servers;
- $this->username = $username;
- $this->password = $password;
- $this->process = null;
-
- if($target !== 'closure' && file_exists($target) === false)
- {
- throw new Exception('The target file does not exist');
- }
- }
-
- /**
- * Returns the worker instance id
- *
- * @return string
- */
- public function getId(): string
- {
- return $this->id;
- }
-
- /**
- * Executes the worker instance in a separate process
- *
- * @return void
- * @throws Exception
- */
- public function start(): void
- {
- $target = $this->target;
- if($target == 'closure')
- {
- $target = __DIR__ . DIRECTORY_SEPARATOR . 'closure';
- }
-
- $argv = $_SERVER['argv'];
- array_shift($argv);
-
- $this->process = new Process(array_merge([Functions::findPhpBin(), $target], $argv));
- $this->process->setEnv([
- 'TAMER_ENABLED' => 'true',
- 'TAMER_PROTOCOL' => $this->protocol,
- 'TAMER_SERVERS' => implode(',', $this->servers),
- 'TAMER_USERNAME' => $this->username,
- 'TAMER_PASSWORD' => $this->password,
- 'TAMER_INSTANCE_ID' => $this->id
- ]);
-
-
- Log::debug('net.nosial.tamerlib', sprintf('starting worker %s', $this->id));
-
- // Callback for process output
- $this->process->start(function ($type, $buffer)
- {
- // Add newline if it's missing
- if(substr($buffer, -1) !== PHP_EOL)
- {
- $buffer .= PHP_EOL;
- }
-
- print($buffer);
- });
- }
-
- /**
- * Stops the worker instance
- *
- * @return void
- */
- public function stop(): void
- {
- if($this->process !== null)
- {
- Log::debug('net.nosial.tamerlib', sprintf('Stopping worker %s', $this->id));
- $this->process->stop();
- }
- }
-
- /**
- * Returns whether the worker instance is running
- *
- * @return bool
- */
- public function isRunning(): bool
- {
- if($this->process !== null)
- {
- return $this->process->isRunning();
- }
-
- return false;
- }
-
- /**
- * @return Process|null
- */
- public function getProcess(): ?Process
- {
- return $this->process;
- }
-
- /**
- * Destructor
- */
- public function __destruct()
- {
- $this->stop();
- }
- }
\ No newline at end of file
diff --git a/src/TamerLib/Objects/closure b/src/TamerLib/Objects/closure
deleted file mode 100644
index 309bd82..0000000
--- a/src/TamerLib/Objects/closure
+++ /dev/null
@@ -1,16 +0,0 @@
-getMessage(), $e);
- exit(1);
- }
diff --git a/src/TamerLib/Protocols/Gearman/Client.php b/src/TamerLib/Protocols/Gearman/Client.php
deleted file mode 100644
index 7ae3a0d..0000000
--- a/src/TamerLib/Protocols/Gearman/Client.php
+++ /dev/null
@@ -1,468 +0,0 @@
-client = null;
- $this->tasks = [];
- $this->automatic_reconnect = false;
- $this->next_reconnect = time() + 1800;
- $this->defined_servers = [];
- $this->options = [];
- }
-
-
- /**
- * Adds a server to the list of servers to use
- *
- * @link http://php.net/manual/en/gearmanclient.addserver.php
- * @param string $host (127.0.0.1)
- * @param int $port (default: 4730)
- * @return void
- */
- public function addServer(string $host, int $port): void
- {
- if(!isset($this->defined_servers[$host]))
- {
- $this->defined_servers[$host] = [];
- }
-
- if(in_array($port, $this->defined_servers[$host]))
- {
- return;
- }
-
- $this->defined_servers[$host][] = $port;
- }
-
- /**
- * Adds a list of servers to the list of servers to use
- *
- * @link http://php.net/manual/en/gearmanclient.addservers.php
- * @param array $servers (host:port, host:port, ...)
- * @return void
- */
- public function addServers(array $servers): void
- {
- foreach($servers as $server)
- {
- $server = explode(':', $server);
- $this->addServer($server[0], (int)$server[1]);
- }
- }
-
- /**
- * Connects to the server(s)
- *
- * @return void
- * @throws ConnectionException
- */
- public function connect(): void
- {
- if($this->isConnected())
- return;
-
- $this->client = new GearmanClient();
-
- // Parse $options combination via bitwise OR operator
- $options = array_reduce($this->options, function($carry, $item)
- {
- return $carry | $item;
- });
-
- $this->client->addOptions($options);
-
- foreach($this->defined_servers as $host => $ports)
- {
- foreach($ports as $port)
- {
- try
- {
- $this->client->addServer($host, $port);
- Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port);
- }
- catch(Exception $e)
- {
- throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e);
- }
- }
- }
-
- $this->client->setCompleteCallback([$this, 'callbackHandler']);
- $this->client->setFailCallback([$this, 'callbackHandler']);
- $this->client->setDataCallback([$this, 'callbackHandler']);
- $this->client->setStatusCallback([$this, 'callbackHandler']);
- }
-
- /**
- * Disconnects from the server(s)
- *
- * @return void
- */
- public function disconnect(): void
- {
- if(!$this->isConnected())
- return;
-
- Log::debug('net.nosial.tamerlib', 'disconnecting from gearman server(s)');
- $this->client->clearCallbacks();
- unset($this->client);
- $this->client = null;
- }
-
- /**
- * Reconnects to the server(s)
- *
- * @return void
- * @throws ConnectionException
- */
- public function reconnect(): void
- {
- Log::debug('net.nosial.tamerlib', 'reconnecting to gearman server(s)');
-
- $this->disconnect();
- $this->connect();
- }
-
- /**
- * Returns the current status of the client
- *
- * @inheritDoc
- * @return bool
- */
- public function isConnected(): bool
- {
- if($this->client === null)
- {
- return false;
- }
-
- return true;
- }
-
- /**
- * The automatic reconnect process
- *
- * @return void
- */
- private function preformAutoreconf(): void
- {
- if($this->automatic_reconnect && $this->next_reconnect < time())
- {
- try
- {
- $this->reconnect();
- }
- catch (Exception $e)
- {
- Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage());
- }
- finally
- {
- $this->next_reconnect = time() + 1800;
- }
- }
- }
-
- /**
- * Adds client options
- *
- * @link http://php.net/manual/en/gearmanclient.addoptions.php
- * @param int[] $options (GEARMAN_CLIENT_NON_BLOCKING, GEARMAN_CLIENT_UNBUFFERED_RESULT, GEARMAN_CLIENT_FREE_TASKS)
- * @return void
- */
- public function setOptions(array $options): void
- {
- $this->options = $options;
- }
-
- /**
- * Returns the current client options
- *
- * @return array
- */
- public function getOptions(): array
- {
- return $this->options;
- }
-
- /**
- * Clears the current client options
- *
- * @return void
- */
- public function clearOptions(): void
- {
- $this->options = [];
- }
-
- /**
- * Executes a closure in the background
- *
- * @param Closure $closure
- * @return void
- */
- public function doClosure(Closure $closure): void
- {
- $closure_task = new Task('tamer_closure', $closure);
- $closure_task->setClosure(true);
- $this->do($closure_task);
- }
-
- /**
- * Processes a task in the background
- *
- * @param Task $task
- * @return void
- */
- public function do(Task $task): void
- {
- $this->preformAutoreconf();
-
- $this->tasks[] = $task;
- $job = new Job($task);
- $job_data = msgpack_pack($job->toArray());
-
- Log::debug('net.nosial.tamerlib', 'sending closure to gearman server: ' . strlen($job_data) . ' bytes');
- switch($task->getPriority())
- {
- case TaskPriority::High:
- $this->client->doHighBackground($task->getFunctionName(), $job_data);
- break;
-
- case TaskPriority::Low:
- $this->client->doLowBackground($task->getFunctionName(), $job_data);
- break;
-
- default:
- case TaskPriority::Normal:
- $this->client->doBackground($task->getFunctionName(), $job_data);
- break;
- }
- }
-
- /**
- * Adds a task to the list of tasks to run
- *
- * @param Task $task
- * @return void
- */
- public function queue(Task $task): void
- {
- $this->preformAutoreconf();
-
- $this->tasks[] = $task;
- $job = new Job($task);
- $job_data = msgpack_pack($job->toArray());
-
- Log::debug('net.nosial.tamerlib', 'sending closure to gearman server: ' . strlen($job_data) . ' bytes');
- switch($task->getPriority())
- {
- case TaskPriority::High:
- $this->client->addTaskHigh($task->getFunctionName(), $job_data);
- break;
-
- case TaskPriority::Low:
- $this->client->addTaskLow($task->getFunctionName(), $job_data);
- break;
-
- default:
- case TaskPriority::Normal:
- $this->client->addTask($task->getFunctionName(), $job_data);
- break;
- }
- }
-
- /**
- * Adds a closure task to the list of tasks to run
- *
- * @param Closure $closure
- * @param Closure|null $callback
- * @return void
- */
- public function queueClosure(Closure $closure, ?Closure $callback=null): void
- {
- $closure_task = new Task('tamer_closure', $closure, $callback);
- $closure_task->setClosure(true);
- $this->queue($closure_task);
- }
-
- /**
- * @return bool
- */
- public function run(): bool
- {
- if(!$this->isConnected())
- return false;
-
- $this->preformAutoreconf();
-
- if(!$this->client->runTasks())
- return false;
-
- return true;
- }
-
- /**
- * Processes a task callback in the foreground
- *
- * @param GearmanTask $task
- * @return void
- */
- public function callbackHandler(GearmanTask $task): void
- {
- $job_result = JobResults::fromArray(msgpack_unpack($task->data()));
- $internal_task = $this->getTaskById($job_result->getId());
-
- Log::debug('net.nosial.tamerlib', 'callback for task ' . $internal_task->getId() . ' with status ' . $job_result->getStatus() . ' and data size ' . strlen($task->data()) . ' bytes');
-
- try
- {
- if($internal_task->isClosure())
- {
- // If the task is a closure, we need to run the callback with the closure's return value
- // instead of the job result object
- $internal_task->runCallback($job_result->getData());
- return;
- }
-
- $internal_task->runCallback($job_result);
- }
- catch(Exception $e)
- {
- Log::error('net.nosial.tamerlib', 'Failed to run callback for task ' . $internal_task->getId() . ': ' . $e->getMessage(), $e);
- }
- finally
- {
- $this->removeTask($internal_task);
- }
- }
-
- /**
- * @param string $id
- * @return Task|null
- */
- private function getTaskById(string $id): ?Task
- {
- foreach($this->tasks as $task)
- {
- if($task->getId() === $id)
- {
- return $task;
- }
- }
-
- return null;
- }
-
- /**
- * Removes a task from the list of tasks
- *
- * @param Task $task
- * @return void
- */
- private function removeTask(Task $task): void
- {
- $this->tasks = array_filter($this->tasks, function($item) use ($task)
- {
- return $item->getId() !== $task->getId();
- });
-
- }
-
- /**
- * @return bool
- */
- public function automaticReconnectionEnabled(): bool
- {
- return $this->automatic_reconnect;
- }
-
- /**
- * @param bool $enable
- */
- public function enableAutomaticReconnection(bool $enable): void
- {
- $this->automatic_reconnect = $enable;
- }
-
- /**
- * Executes all remaining tasks and closes the connection
- */
- public function __destruct()
- {
- try
- {
- $this->disconnect();
- }
- catch(Exception $e)
- {
- unset($e);
- }
- }
- }
\ No newline at end of file
diff --git a/src/TamerLib/Protocols/Gearman/Worker.php b/src/TamerLib/Protocols/Gearman/Worker.php
deleted file mode 100644
index 9c32810..0000000
--- a/src/TamerLib/Protocols/Gearman/Worker.php
+++ /dev/null
@@ -1,371 +0,0 @@
-worker = null;
- $this->defined_servers = [];
- $this->automatic_reconnect = false;
- $this->next_reconnect = time() + 1800;
- $this->options = [];
- }
-
- /**
- * Adds a server to the list of servers to use
- *
- * @link http://php.net/manual/en/gearmanworker.addserver.php
- * @param string $host (
- * @param int $port (default: 4730)
- * @return void
- */
- public function addServer(string $host, int $port): void
- {
- if(!isset($this->defined_servers[$host]))
- {
- $this->defined_servers[$host] = [];
- }
-
- if(in_array($port, $this->defined_servers[$host]))
- {
- return;
- }
-
- $this->defined_servers[$host][] = $port;
- }
-
- /**
- * Adds a list of servers to the list of servers to use
- *
- * @link http://php.net/manual/en/gearmanworker.addservers.php
- * @param string[] $servers (host:port, host:port, ...)
- * @return void
- */
- public function addServers(array $servers): void
- {
- foreach($servers as $server)
- {
- $server = explode(':', $server);
- $this->addServer($server[0], (int)$server[1]);
- }
- }
-
- /**
- * Connects to the server
- *
- * @return void
- * @throws ConnectionException
- */
- public function connect(): void
- {
- if($this->isConnected())
- return;
-
- $this->worker = new GearmanWorker();
- $this->worker->addOptions(GEARMAN_WORKER_GRAB_UNIQ);
-
- foreach($this->defined_servers as $host => $ports)
- {
- foreach($ports as $port)
- {
- try
- {
- $this->worker->addServer($host, $port);
- Log::debug('net.nosial.tamerlib', 'connected to gearman server: ' . $host . ':' . $port);
- }
- catch(Exception $e)
- {
- throw new ConnectionException('Failed to connect to Gearman server: ' . $host . ':' . $port, 0, $e);
- }
- }
- }
-
- $this->worker->addFunction('tamer_closure', function(GearmanJob $job)
- {
- $received_job = Job::fromArray(msgpack_unpack($job->workload()));
- Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId());
-
- try
- {
- /** @var SerializableClosure $closure */
- $closure = $received_job->getData();
- $result = $closure($received_job);
- }
- catch(Exception $e)
- {
- $job->sendFail();
- unset($e);
- return;
- }
-
- $job_results = new JobResults($received_job, JobStatus::Success, $result);
- $job->sendComplete(msgpack_pack($job_results->toArray()));
- Log::debug('net.nosial.tamerlib', 'completed closure: ' . $received_job->getId());
- });
- }
-
- /**
- * Disconnects from the server
- *
- * @return void
- */
- public function disconnect(): void
- {
- if(!$this->isConnected())
- return;
-
- $this->worker->unregisterAll();
- unset($this->worker);
- $this->worker = null;
- }
-
- /**
- * Reconnects to the server if the connection has been lost
- *
- * @return void
- * @throws ConnectionException
- */
- public function reconnect(): void
- {
- $this->disconnect();
- $this->connect();
- }
-
- /**
- * Returns true if the worker is connected to the server
- *
- * @return bool
- */
- public function isConnected(): bool
- {
- return $this->worker !== null;
- }
-
- /**
- * The automatic reconnect process
- *
- * @return void
- */
- private function preformAutoreconf(): void
- {
- if($this->automatic_reconnect && $this->next_reconnect < time())
- {
- try
- {
- $this->reconnect();
- }
- catch (Exception $e)
- {
- Log::error('net.nosial.tamerlib', 'Failed to reconnect to Gearman server: ' . $e->getMessage());
- }
- finally
- {
- $this->next_reconnect = time() + 1800;
- }
- }
- }
-
- /**
- * Sets the options to use when connecting to the server
- *
- * @param array $options
- * @return bool
- * @inheritDoc
- */
- public function setOptions(array $options): void
- {
- $this->options = $options;
- }
-
- /**
- * Returns the options to use when connecting to the server
- *
- * @return array
- */
- public function getOptions(): array
- {
- return $this->options;
- }
-
- /**
- * Clears the options to use when connecting to the server
- *
- * @return void
- */
- public function clearOptions(): void
- {
- $this->options = [];
- }
-
- /**
- * @return bool
- */
- public function automaticReconnectionEnabled(): bool
- {
- return $this->automatic_reconnect;
- }
-
- /**
- * @param bool $enable
- * @return void
- */
- public function enableAutomaticReconnection(bool $enable): void
- {
- $this->automatic_reconnect = $enable;
- }
-
- /**
- * Adds a function to the list of functions to call
- *
- * @link http://php.net/manual/en/gearmanworker.addfunction.php
- * @param string $name The name of the function to register with the job server
- * @param callable $callable The callback function to call when the job is received
- * @return void
- */
- public function addFunction(string $name, callable $callable): void
- {
- $this->worker->addFunction($name, function(GearmanJob $job) use ($callable)
- {
- $received_job = Job::fromArray(msgpack_unpack($job->workload()));
- Log::debug('net.nosial.tamerlib', 'received job: ' . $received_job->getId());
-
- try
- {
- $result = $callable($received_job);
- }
- catch(Exception $e)
- {
- $job->sendFail();
- unset($e);
- return;
- }
-
- $job_results = new JobResults($received_job, JobStatus::Success, $result);
- $job->sendComplete(msgpack_pack($job_results->toArray()));
- Log::debug('net.nosial.tamerlib', 'completed job: ' . $received_job->getId());
- });
- }
-
- /**
- * Removes a function from the list of functions to call
- *
- * @param string $function_name The name of the function to unregister
- * @return void
- */
- public function removeFunction(string $function_name): void
- {
- $this->worker->unregister($function_name);
- }
-
- /**
- * Waits for a job and calls the appropriate callback function
- *
- * @link http://php.net/manual/en/gearmanworker.work.php
- * @param bool $blocking (default: true) Whether to block until a job is received
- * @param int $timeout (default: 500) The timeout in milliseconds (if $blocking is false)
- * @param bool $throw_errors (default: false) Whether to throw exceptions on errors
- * @return void Returns nothing
- * @throws ConnectionException
- */
- public function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
- {
- $this->worker->setTimeout($timeout);
-
- while(true)
- {
- @$this->preformAutoreconf();
- @$this->worker->work();
-
- if($this->worker->returnCode() == GEARMAN_COULD_NOT_CONNECT)
- {
- throw new ConnectionException('Could not connect to Gearman server');
- }
-
- if($this->worker->returnCode() == GEARMAN_TIMEOUT && !$blocking)
- {
- break;
- }
-
- if($this->worker->returnCode() != GEARMAN_SUCCESS && $throw_errors)
- {
- Log::error('net.nosial.tamerlib', 'Gearman worker error: ' . $this->worker->error());
- }
-
- if($blocking)
- {
- usleep($timeout);
- }
- }
- }
-
- /**
- * Executes all remaining tasks and closes the connection
- */
- public function __destruct()
- {
- try
- {
- $this->disconnect();
- }
- catch(Exception $e)
- {
- unset($e);
- }
- }
- }
\ No newline at end of file
diff --git a/src/TamerLib/Protocols/RabbitMq/Client.php b/src/TamerLib/Protocols/RabbitMq/Client.php
deleted file mode 100644
index 9fc3580..0000000
--- a/src/TamerLib/Protocols/RabbitMq/Client.php
+++ /dev/null
@@ -1,467 +0,0 @@
-tasks = [];
- $this->automatic_reconnect = false;
- $this->defined_servers = [];
- $this->options = [];
- $this->username = $username;
- $this->password = $password;
- $this->connections = [];
- }
-
- /**
- * Adds a server to the list of servers to use
- *
- * @param string $host
- * @param int $port
- * @return void
- */
- public function addServer(string $host, int $port): void
- {
- if(!isset($this->defined_servers[$host]))
- {
- $this->defined_servers[$host] = [];
- }
-
- if(in_array($port, $this->defined_servers[$host]))
- {
- return;
- }
-
- $this->defined_servers[$host][] = $port;
- }
-
- /**
- * Adds a list of servers to the list of servers to use
- *
- * @param array $servers (host:port, host:port, ...)
- * @return void
- */
- public function addServers(array $servers): void
- {
- foreach($servers as $server)
- {
- $server = explode(':', $server);
- $this->addServer($server[0], (int)$server[1]);
- }
- }
-
- /**
- * Connects to the server(s) defined
- *
- * @return void
- * @throws ConnectionException
- */
- public function connect(): void
- {
- if($this->isConnected())
- return;
-
- if(count($this->defined_servers) === 0)
- return;
-
- foreach($this->defined_servers as $host => $ports)
- {
- foreach($ports as $port)
- {
- $connection = new Connection($host, $port, $this->username, $this->password);
- $connection->connect();
-
- $this->connections[] = $connection;
- }
- }
- }
-
- /**
- * Disconnects from the server
- *
- * @return void
- */
- public function disconnect(): void
- {
- if(!$this->isConnected())
- return;
-
- foreach($this->connections as $connection)
- {
- $connection->disconnect();
- }
-
- $this->connections = [];
- }
-
- /**
- * Reconnects to the server
- *
- * @return void
- * @throws ConnectionException
- */
- public function reconnect(): void
- {
- $this->disconnect();
- $this->connect();
- }
-
- /**
- * Returns True if one or more connections are connected, False otherwise
- * (Note, some connections may be disconnected, and this will still return True)
- *
- * @return bool
- */
- public function isConnected(): bool
- {
- if(count($this->connections) === 0)
- return false;
-
- foreach($this->connections as $connection)
- {
- if($connection->isConnected())
- return true;
- }
-
- return false;
- }
-
- /**
- * Sets the options array
- *
- * @param array $options
- * @return void
- */
- public function setOptions(array $options): void
- {
- $this->options = $options;
- }
-
- /**
- * Returns the options array
- *
- * @return array
- */
- public function getOptions(): array
- {
- return $this->options;
- }
-
- /**
- * Clears the options array
- *
- * @return void
- */
- public function clearOptions(): void
- {
- $this->options = [];
- }
-
- /**
- * Returns True if the client is automatically reconnecting to the server
- *
- * @return bool
- */
- public function automaticReconnectionEnabled(): bool
- {
- return $this->automatic_reconnect;
- }
-
- /**
- * Enables or disables automatic reconnecting to the server
- *
- * @param bool $enable
- * @return void
- */
- public function enableAutomaticReconnection(bool $enable): void
- {
- $this->automatic_reconnect = $enable;
- }
-
- /**
- * Runs a task in the background (Fire and Forget)
- *
- * @param Task $task
- * @return void
- */
- public function do(Task $task): void
- {
- if(!$this->isConnected())
- return;
-
- $job = new Job($task);
- $message = new AMQPMessage(msgpack_pack($job->toArray()), [
- 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
- 'correlation_id' => $task->getId(),
- 'priority' => Functions::calculatePriority($task->getPriority()),
- ]);
-
- // Select random connection
- $connection = $this->connections[array_rand($this->connections)];
- if($this->automatic_reconnect)
- $connection->preformAutoreconf();
- $connection->getChannel()->basic_publish($message, '', 'tamer_queue');
- }
-
- /**
- * Executes a closure in the background
- *
- * @param Closure $closure
- * @return void
- */
- public function doClosure(Closure $closure): void
- {
- $closure_task = new Task('tamer_closure', $closure);
- $closure_task->setClosure(true);
- $this->do($closure_task);
- }
-
- /**
- * Queues a task to be executed
- *
- * @param Task $task
- * @return void
- */
- public function queue(Task $task): void
- {
- $this->tasks[] = $task;
- }
-
- /**
- * Adds a closure task to the list of tasks to run
- *
- * @param Closure $closure
- * @param Closure|null $callback
- * @return void
- */
- public function queueClosure(Closure $closure, ?Closure $callback=null): void
- {
- $closure_task = new Task('tamer_closure', $closure, $callback);
- $closure_task->setClosure(true);
- $this->queue($closure_task);
- }
-
- /**
- * Executes all the tasks that has been added
- *
- * @return bool
- */
- public function run(): bool
- {
- if(count($this->tasks) === 0)
- return false;
-
- if(!$this->isConnected())
- return false;
-
- $this->preformAutoreconf();
- $correlationIds = [];
- $connection = $this->connections[array_rand($this->connections)];
-
- if($this->automatic_reconnect)
- $connection->preformAutoreconf();
-
- /** @var Task $task */
- foreach($this->tasks as $task)
- {
- $correlationIds[] = $task->getId();
- $job = new Job($task);
-
- $message = new AMQPMessage(msgpack_pack($job->toArray()), [
- 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
- 'correlation_id' => $task->getId(),
- 'reply_to' => 'tamer_queue',
- 'priority' => Functions::calculatePriority($task->getPriority()),
- ]);
-
- $connection->getChannel()->basic_publish($message, '', 'tamer_queue');
- }
-
- // Register callback for each task
- $callback = function($msg) use (&$correlationIds, $connection)
- {
- var_dump(Validate::getObjectType(msgpack_unpack($msg->body)));
- if(Validate::getObjectType(msgpack_unpack($msg->body)) !== ObjectType::JobResults)
- {
- $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
- return;
- }
-
- $job_result = JobResults::fromArray(msgpack_unpack($msg->body));
- $task = $this->getTaskById($job_result->getId());
-
- if($task == null)
- {
- $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
- return;
- }
-
- try
- {
- if($task->isClosure())
- {
- $task->runCallback($job_result->getData());
- }
- else
- {
- $task->runCallback($job_result);
- }
- }
- catch(Exception $e)
- {
- echo $e->getMessage();
- }
-
- // Remove the processed correlation_id
- $index = array_search($msg->get('correlation_id'), $correlationIds);
-
- if ($index !== false)
- {
- unset($correlationIds[$index]);
- $connection->getChannel()->basic_ack($msg->delivery_info['delivery_tag']);
- }
- else
- {
- $connection->getChannel()->basic_nack($msg->delivery_info['delivery_tag'], false, true);
- }
-
- // Stop consuming when all tasks are processed
- if(count($correlationIds) === 0)
- {
- $connection->getChannel()->basic_cancel($msg->delivery_info['consumer_tag']);
- }
- };
-
- $connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback);
-
- // Start consuming messages
- while(count($connection->getChannel()->callbacks))
- {
- $connection->getChannel()->wait();
- }
-
- return true;
- }
-
- /**
- * Returns a task by its id
- *
- * @param string $id
- * @return Task|null
- */
- private function getTaskById(string $id): ?Task
- {
- foreach($this->tasks as $task)
- {
- if($task->getId() === $id)
- {
- return $task;
- }
- }
-
- return null;
- }
-
- /**
- * The automatic reconnect process
- *
- * @return void
- */
- private function preformAutoreconf(): void
- {
- if($this->automatic_reconnect)
- {
- foreach($this->connections as $connection)
- {
- $connection->preformAutoreconf();
- }
- }
- }
-
- /**
- * Disconnects from the server when the object is destroyed
- */
- public function __destruct()
- {
- try
- {
- $this->disconnect();
- }
- catch(Exception $e)
- {
- unset($e);
- }
- }
-
- }
\ No newline at end of file
diff --git a/src/TamerLib/Protocols/RabbitMq/Connection.php b/src/TamerLib/Protocols/RabbitMq/Connection.php
deleted file mode 100644
index 0ed425e..0000000
--- a/src/TamerLib/Protocols/RabbitMq/Connection.php
+++ /dev/null
@@ -1,216 +0,0 @@
-id = uniqid();
- $this->host = $host;
- $this->port = $port;
- $this->username = $username;
- $this->password = $password;
- }
-
- /**
- * @return string
- */
- public function getId(): string
- {
- return $this->id;
- }
-
- /**
- * @return AMQPStreamConnection|null
- */
- public function getConnection(): ?AMQPStreamConnection
- {
- return $this->connection;
- }
-
- /**
- * @return AMQPChannel|null
- */
- public function getChannel(): ?AMQPChannel
- {
- return $this->channel;
- }
-
- /**
- * Returns True if the client is connected to the server
- *
- * @return bool
- */
- public function isConnected(): bool
- {
- return $this->connection !== null;
- }
-
- /**
- * Establishes a connection to the server
- *
- * @return void
- * @throws ConnectionException
- */
- public function connect(): void
- {
- if($this->isConnected())
- {
- return;
- }
-
- try
- {
- $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->username, $this->password);
- $this->channel = $this->connection->channel();
- $this->channel->queue_declare('tamer_queue', false, true, false, false);
- $this->next_reconnect = time() + 1800;
- }
- catch(Exception $e)
- {
- throw new ConnectionException(sprintf('Could not connect to RabbitMQ server: %s', $e->getMessage()), $e->getCode(), $e);
- }
- }
-
- /**
- * Closes the connection to the server
- *
- * @return void
- */
- public function disconnect(): void
- {
- if(!$this->isConnected())
- {
- return;
- }
-
- try
- {
- $this->channel?->close();
- }
- catch(Exception $e)
- {
- unset($e);
- }
-
- try
- {
- $this->connection?->close();
- }
- catch(Exception $e)
- {
- unset($e);
- }
-
- $this->channel = null;
- $this->connection = null;
- }
-
- /**
- * Reconnects to the server
- *
- * @return void
- * @throws ConnectionException
- */
- public function reconnect(): void
- {
- $this->disconnect();
- $this->connect();
- }
-
- /**
- * The automatic reconnect process
- *
- * @return void
- */
- public function preformAutoreconf(): void
- {
- if($this->next_reconnect < time())
- {
- try
- {
- $this->reconnect();
- }
- catch (Exception $e)
- {
- Log::error('net.nosial.tamerlib', 'Could not reconnect to RabbitMQ server: %s', $e);
- }
- finally
- {
- $this->next_reconnect = time() + 1800;
- }
- }
- }
-
- }
\ No newline at end of file
diff --git a/src/TamerLib/Protocols/RabbitMq/Worker.php b/src/TamerLib/Protocols/RabbitMq/Worker.php
deleted file mode 100644
index 14e9da1..0000000
--- a/src/TamerLib/Protocols/RabbitMq/Worker.php
+++ /dev/null
@@ -1,399 +0,0 @@
-defined_servers = [];
- $this->connections = [];
- $this->functions = [];
- $this->automatic_reconnect = true;
- $this->username = $username;
- $this->password = $password;
- }
-
- /**
- * Adds a server to the list of servers to use
- *
- * @param string $host
- * @param int $port
- * @return void
- */
- public function addServer(string $host, int $port): void
- {
- if(!isset($this->defined_servers[$host]))
- {
- $this->defined_servers[$host] = [];
- }
-
- if(in_array($port, $this->defined_servers[$host]))
- {
- return;
- }
-
- $this->defined_servers[$host][] = $port;
- }
-
- /**
- * Adds an array of servers to the list of servers to use
- *
- * @param array $servers (eg; [host:port, host:port, ...])
- * @return void
- */
- public function addServers(array $servers): void
- {
- foreach($servers as $server)
- {
- $server = explode(':', $server);
- $this->addServer($server[0], (int)$server[1]);
- }
- }
-
- /**
- * Establishes a connection to the server (or servers)
- *
- * @return void
- * @noinspection DuplicatedCode
- * @throws ConnectionException
- */
- public function connect(): void
- {
- if($this->isConnected())
- return;
-
- if(count($this->defined_servers) === 0)
- return;
-
- foreach($this->defined_servers as $host => $ports)
- {
- foreach($ports as $port)
- {
- $connection = new Connection($host, $port, $this->username, $this->password);
- $connection->connect();
-
- $this->connections[] = $connection;
- }
- }
- }
-
- /**
- * Disconnects from the server
- *
- * @return void
- */
- public function disconnect(): void
- {
- if(!$this->isConnected())
- return;
-
- foreach($this->connections as $connection)
- {
- $connection->disconnect();
- }
-
- $this->connections = [];
- }
-
- /**
- * Reconnects to the server (or servers)
- *
- * @return void
- * @throws ConnectionException
- */
- public function reconnect(): void
- {
- $this->disconnect();
- $this->connect();
- }
-
- /**
- * Returns True if one or more connections are connected, False otherwise
- * (Note, some connections may be disconnected, and this will still return True)
- *
- * @return bool
- */
- public function isConnected(): bool
- {
- if(count($this->connections) === 0)
- return false;
-
- foreach($this->connections as $connection)
- {
- if($connection->isConnected())
- return true;
- }
-
- return false;
- }
-
- /**
- * Sets the options to use for this client
- *
- * @param array $options
- * @return void
- */
- public function setOptions(array $options): void
- {
- $this->options = $options;
- }
-
- /**
- * Returns the current options for this client
- *
- * @return array
- */
- public function getOptions(): array
- {
- return $this->options;
- }
-
- /**
- * Clears the current options for this client
- *
- * @return void
- */
- public function clearOptions(): void
- {
- $this->options = [];
- }
-
- /**
- * Returns True if automatic reconnection is enabled, False otherwise
- *
- * @return bool
- */
- public function automaticReconnectionEnabled(): bool
- {
- return $this->automatic_reconnect;
- }
-
- /**
- * Enables or disables automatic reconnection
- *
- * @param bool $enable
- * @return void
- */
- public function enableAutomaticReconnection(bool $enable): void
- {
- $this->automatic_reconnect = $enable;
- }
-
- /**
- * Registers a new function to the worker to handle
- *
- * @param string $name
- * @param callable $callable
- * @param mixed|null $context
- * @return void
- */
- public function addFunction(string $name, callable $callable, mixed $context = null): void
- {
- $this->functions[$name] = [
- 'function' => $callable,
- 'context' => $context
- ];
- }
-
- /**
- * Removes an existing function from the worker
- *
- * @param string $function_name
- * @return void
- */
- public function removeFunction(string $function_name): void
- {
- unset($this->functions[$function_name]);
- }
-
- /**
- * Processes a job if there's one available
- *
- * @param bool $blocking
- * @param int $timeout
- * @param bool $throw_errors
- * @return void
- */
- public function work(bool $blocking = true, int $timeout = 500, bool $throw_errors = false): void
- {
- if(!$this->isConnected())
- return;
-
- // Select a random connection
- $connection = $this->connections[array_rand($this->connections)];
-
- $callback = function($message) use ($throw_errors, $connection)
- {
- var_dump(Validate::getObjectType(msgpack_unpack($message->body)));
- if(Validate::getObjectType(msgpack_unpack($message->body)) !== ObjectType::Job)
- {
- $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
- return;
- }
-
- $received_job = Job::fromArray(msgpack_unpack($message->body));
-
- if($received_job->isClosure())
- {
- Log::debug('net.nosial.tamerlib', 'received closure: ' . $received_job->getId());
-
- try
- {
- // TODO: Check back on this, looks weird.
- $closure = $received_job->getData();
- $result = $closure($received_job);
- }
- catch(Exception $e)
- {
- unset($e);
-
- // Do not requeue the job, it's a closure
- $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
- return;
- }
-
- $job_results = new JobResults($received_job, JobStatus::Success, $result);
- $connection->getChannel->basic_publish(
- new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()])
- );
- $connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']);
- return;
- }
-
- if(!isset($this->functions[$received_job->getName()]))
- {
- Log::debug('net.nosial.tamerlib', 'received unknown function: ' . $received_job->getId());
- $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag'], false, true);
- return;
- }
-
- Log::debug('net.nosial.tamerlib', 'received function: ' . $received_job->getId());
- $function = $this->functions[$received_job->getName()];
- $callback = $function['function'];
-
- try
- {
- $result = $callback($received_job->getData(), $function['context']);
- }
- catch(Exception $e)
- {
- unset($e);
-
- // Do not requeue the job, it's a closure
- $connection->getChannel()->basic_nack($message->delivery_info['delivery_tag']);
- return;
- }
-
- $job_results = new JobResults($received_job, JobStatus::Success, $result);
- $connection->getChannel->basic_publish(
- new AMQPMessage(msgpack_pack($job_results->toArray()), ['correlation_id' => $received_job->getId()])
- );
- $connection->getChannel()->basic_ack($message->delivery_info['delivery_tag']);
- };
-
- $connection->getChannel()->basic_consume('tamer_queue', '', false, false, false, false, $callback);
-
- if ($blocking)
- {
- while(true)
- {
- $connection->getChannel()->wait();
- }
- }
- else
- {
- $start = microtime(true);
- while (true)
- {
- if (microtime(true) - $start >= $timeout / 1000)
- {
- break;
- }
-
- $connection->getChannel()->wait();
- }
- }
- }
-
- /**
- * Disconnects from the server when the object is destroyed
- */
- public function __destruct()
- {
- try
- {
- $this->disconnect();
- }
- catch(Exception $e)
- {
- unset($e);
- // Ignore
- }
- }
-
- }
\ No newline at end of file
diff --git a/src/TamerLib/Tamer.php b/src/TamerLib/Tamer.php
index dbfd26b..11d82f3 100644
--- a/src/TamerLib/Tamer.php
+++ b/src/TamerLib/Tamer.php
@@ -4,444 +4,8 @@
namespace TamerLib;
- use Closure;
- use Exception;
- use InvalidArgumentException;
- use TamerLib\Abstracts\Mode;
- use TamerLib\Classes\Functions;
- use TamerLib\Classes\Supervisor;
- use TamerLib\Classes\Validate;
- use TamerLib\Exceptions\ConnectionException;
- use TamerLib\Exceptions\UnsupervisedWorkerException;
- use TamerLib\Interfaces\ClientProtocolInterface;
- use TamerLib\Interfaces\WorkerProtocolInterface;
- use TamerLib\Objects\Task;
class Tamer
{
- /**
- * The protocol to use when connecting to the server
- *
- * @var string
- */
- private static $protocol;
- /**
- * The protocol to use when connecting to the server as a client
- *
- * @var ClientProtocolInterface|null
- */
- private static $client;
-
- /**
- * The protocol to use when connecting to the server as a worker
- *
- * @var WorkerProtocolInterface|null
- */
- private static $worker;
-
- /**
- * Indicates if Tamer is running as a client or worker
- *
- * @var string
- * @see Mode
- */
- private static $mode;
-
- /**
- * Indicates if Tamer is connected to the server
- *
- * @var bool
- */
- private static $connected;
-
- /**
- * The supervisor that is supervising the workers
- *
- * @var Supervisor
- */
- private static $supervisor;
-
- /**
- * Initializes Tamer as a client and connects to the server
- *
- * @param string $protocol
- * @param array $servers
- * @param string|null $username
- * @param string|null $password
- * @return void
- * @throws ConnectionException
- */
- public static function init(string $protocol, array $servers, ?string $username=null, ?string $password=null): void
- {
- if(self::$connected)
- {
- throw new ConnectionException('Tamer is already connected to the server');
- }
-
- if (!Validate::protocolType($protocol))
- {
- throw new InvalidArgumentException(sprintf('Invalid protocol type: %s', $protocol));
- }
-
- self::$protocol = $protocol;
- self::$mode = Mode::Client;
- self::$client = Functions::createClient($protocol, $username, $password);
- self::$client->addServers($servers);
- self::$client->connect();
- self::$supervisor = new Supervisor($protocol, $servers, $username, $password);
- self::$connected = true;
- }
-
- /**
- * Initializes Tamer as a worker client and connects to the server
- *
- * @return void
- * @throws ConnectionException
- * @throws UnsupervisedWorkerException
- */
- public static function initWorker(): void
- {
- if(self::$connected)
- {
- throw new ConnectionException('Tamer is already connected to the server');
- }
-
- if(!Functions::getWorkerVariables()['TAMER_ENABLED'])
- {
- throw new UnsupervisedWorkerException('Tamer is not enabled for this worker');
- }
-
- self::$protocol = Functions::getWorkerVariables()['TAMER_PROTOCOL'];
- self::$mode = Mode::Worker;
- self::$worker = Functions::createWorker(self::$protocol);
- self::$worker->addServers(Functions::getWorkerVariables()['TAMER_SERVERS']);
- self::$worker->connect();
- self::$connected = true;
- }
-
-
- /**
- * Disconnects from the server
- *
- * @return void
- * @throws ConnectionException
- */
- public static function disconnect(): void
- {
- if (!self::$connected)
- {
- throw new ConnectionException('Tamer is not connected to the server');
- }
-
- if (self::$mode === Mode::Client)
- {
- self::$client->disconnect();
- }
- else
- {
- self::$worker->disconnect();
- }
-
- self::$connected = false;
- }
-
- /**
- * Reconnects to the server
- *
- * @return void
- * @throws ConnectionException
- */
- public static function reconnect(): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$client->reconnect();
- }
- else
- {
- self::$worker->reconnect();
- }
- }
-
- /**
- * Adds a task to the queue to be executed by the worker
- *
- * @param Task $task
- * @return void
- */
- public static function do(Task $task): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$client->do($task);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Executes a closure operation in the background (does not return a result)
- *
- * @param Closure $closure The closure operation to perform (remote)
- * @return void
- */
- public static function doClosure(Closure $closure): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$client->doClosure($closure);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Queues a task to be processed in parallel (returns a result handled by a callback)
- *
- * @param Task $task
- * @return void
- */
- public static function queue(Task $task): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$client->queue($task);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Queues a closure to be processed in parallel (returns a result handled by a callback)
- *
- * @param Closure $closure The closure operation to perform (remote)
- * @param Closure|null $callback The closure to call when the operation is complete (local)
- * @return void
- */
- public static function queueClosure(Closure $closure, ?Closure $callback=null): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$client->queueClosure($closure, $callback);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Executes all tasks in the queue and waits for them to complete
- *
- * @return bool
- */
- public static function run(): bool
- {
- if (self::$mode === Mode::Client)
- {
- return self::$client->run();
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Registers a function to the worker
- *
- * @param string $name The name of the function to add
- * @param callable $callable The function to add
- * @return void
- */
- public static function addFunction(string $name, callable $callable): void
- {
- if (self::$mode === Mode::Worker)
- {
- self::$worker->addFunction($name, $callable);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in worker mode');
- }
- }
-
- /**
- * Removes a function from the worker
- *
- * @param string $function_name The name of the function to remove
- * @return void
- */
- public static function removeFunction(string $function_name): void
- {
- if (self::$mode === Mode::Worker)
- {
- self::$worker->removeFunction($function_name);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in worker mode');
- }
- }
-
- /**
- * Works a job from the queue (blocking or non-blocking)
- *
- * @param bool $blocking (optional) Whether to block until a job is available
- * @param int $timeout (optional) The timeout to use when blocking
- * @param bool $throw_errors (optional) Whether to throw errors or not
- * @return void
- */
- public static function work(bool $blocking=true, int $timeout=500, bool $throw_errors=false): void
- {
- if (self::$mode === Mode::Worker)
- {
- self::$worker->work($blocking, $timeout, $throw_errors);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in worker mode');
- }
- }
-
- /**
- * Monitors the workers and restarts them if they die unexpectedly (monitor mode only)
- *
- * @param bool $blocking
- * @param bool $auto_restart
- * @return void
- * @throws Exception
- */
- public static function monitor(bool $blocking=false, bool $auto_restart=true): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$supervisor->monitor($blocking, $auto_restart);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Adds a worker to the supervisor
- *
- * @param string $target
- * @param int $instances
- * @return void
- * @throws Exception
- */
- public static function addWorker(string $target, int $instances): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$supervisor->addWorker($target, $instances);
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Starts all workers
- *
- * @return void
- * @throws Exception
- */
- public static function startWorkers(): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$supervisor->start();
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Stops all workers
- *
- * @return void
- * @throws Exception
- */
- public static function stopWorkers(): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$supervisor->stop();
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * Restarts all workers
- *
- * @return void
- * @throws Exception
- */
- public static function restartWorkers(): void
- {
- if (self::$mode === Mode::Client)
- {
- self::$supervisor->restart();
- }
- else
- {
- throw new InvalidArgumentException('Tamer is not running in client mode');
- }
- }
-
- /**
- * @return string
- */
- public static function getProtocol(): string
- {
- return self::$protocol;
- }
-
- /**
- * @return ClientProtocolInterface|null
- */
- public static function getClient(): ?ClientProtocolInterface
- {
- return self::$client;
- }
-
- /**
- * @return WorkerProtocolInterface|null
- */
- public static function getWorker(): ?WorkerProtocolInterface
- {
- return self::$worker;
- }
-
- /**
- * @return string
- */
- public static function getMode(): string
- {
- return self::$mode;
- }
-
- /**
- * @return bool
- */
- public static function isConnected(): bool
- {
- return self::$connected;
- }
}
\ No newline at end of file
diff --git a/tests/no_tamer.php b/tests/no_tamer.php
deleted file mode 100644
index 5f120fc..0000000
--- a/tests/no_tamer.php
+++ /dev/null
@@ -1,30 +0,0 @@
-start();
+
+ $redis_client = new \Redis();
+ $redis_client->connect('127.0.0.1', $redis_server->getPort());
+
+ $redis_client->set('foo', 'bar');
+ $value = $redis_client->get('foo');
+
+ echo $value . PHP_EOL;
+
+ $redis_client->close();
+ $redis_server->stop();
\ No newline at end of file
diff --git a/tests/tamer.php b/tests/tamer.php
deleted file mode 100644
index 7388283..0000000
--- a/tests/tamer.php
+++ /dev/null
@@ -1,71 +0,0 @@
-getData()} seconds \n";
- }));
- }
-
- echo "Waiting for jobs to finish \n";
- $a = microtime(true);
- Tamer::run();
- $b = microtime(true);
- echo "Took " . ($b - $a) . " seconds \n";
\ No newline at end of file
diff --git a/tests/tamer_worker.php b/tests/tamer_worker.php
deleted file mode 100644
index ddb5883..0000000
--- a/tests/tamer_worker.php
+++ /dev/null
@@ -1,18 +0,0 @@
-getData());
- return $job->getData();
- });
-
- Tamer::work();
\ No newline at end of file