getPayload()])) { Log::warning(Utilities::getName(), sprintf('Job %s requested function \'%s\' which does not exist, rejecting job.', $job_packet->getId(), $job_packet->getPayload())); self::$job_manager->rejectJob($job_packet); return; } $result = call_user_func_array(self::$function_pointers[$job_packet->getPayload()], unserialize($job_packet->getParameters(), ['allowed_classes'=>true])); self::$job_manager->returnJob($job_packet, $result); } catch(Exception $e) { self::$job_manager->returnException($job_packet, $e); } } /** * GLOBAL FUNCTIONS */ /** * Initializes Tamer in the specified mode * * Note that Tamer can only be initialized once per process, additionally not all functions are available in * all modes. Please review the documentation for CLIENT & WORKER mode usage as both modes operate differently. * * CLIENT MODE: * Client Mode supervises workers and optionally initializes a server instance if the $server_config parameter * is left null, in which case a server will be initialized with default parameters. If a server is already * initialized then CLIENT Mode can work in swarm mode where multiple clients can be initialized, and they will * all share the same server instance. This is done by passing the same $server_config object to each client. * * WORKER MODE: * Worker Mode is responsible for listening for jobs and executing them. Worker Mode can only be initialized * if the parent process is a client process, otherwise an exception will be thrown because the worker will * have no server to connect to if it is not initialized by a client. * * @param string $mode The mode to initialize Tamer in, must be one of the TamerMode constants * @param ServerConfiguration|null $server_config Optional. The server configuration to use, if null is provided * then a server will be initialized with default parameters. * @throws ServerException If the server fails to initialize * @return void */ public static function initialize(string $mode, ?ServerConfiguration $server_config=null): void { if(self::$mode !== null) { throw new RuntimeException('TamerLib has already been initialized.'); } if(!in_array(strtolower($mode), TamerMode::ALL, true)) { throw new InvalidArgumentException(sprintf('Invalid mode "%s" provided, must be one of "%s".', $mode, implode('", "', TamerMode::ALL))); } self::$mode = $mode; self::$server_configuration = $server_config; if($server_config === null && $mode === TamerMode::CLIENT) { try { // Initialize the server if no configuration was provided, and we are in client mode self::$server_configuration = new ServerConfiguration(); self::$server = new RedisServer(self::$server_configuration); self::$server->start(); // Register shutdown function to stop the server when the process exits register_shutdown_function(static function() { self::$server?->stop(); }); } catch(Exception $e) { throw new ServerException('Failed to initialize the server.', $e); } } if($mode === TamerMode::WORKER) { try { self::$worker_configuration = WorkerConfiguration::fromEnvironment(); self::$server_configuration = new ServerConfiguration(self::$worker_configuration->getHost(), self::$worker_configuration->getPort(), self::$worker_configuration->getPassword()); } catch(Exception $e) { throw new RuntimeException('Failed to initialize worker configuration. (is the process running as a worker?)', 0, $e); } } if($mode === TamerMode::CLIENT) { self::$supervisor = new WorkerSupervisor(self::$server_configuration); try { self::$return_channel = sprintf('rch%s', random_int(100000000, 999999999)); } catch(Exception $e) { throw new RuntimeException(sprintf('Bad environment, failed to generate random integer. (%s)', $e->getMessage()), $e); } } self::$job_manager = new JobManager(self::$server_configuration); } /** * Shuts down all workers * * @return void */ public static function shutdown(): void { // Do nothing if Tamer is not initialized if(self::$mode === null) { return; } // Close all subprocesses self::$supervisor?->stopAll(); self::$server?->stop(); self::$job_manager?->disconnect(); // Clear all static variables self::$mode = null; self::$server_configuration = null; self::$server = null; self::$watching_jobs = []; self::$supervisor = null; self::$job_manager = null; self::$worker_configuration = null; self::$function_pointers = []; self::$return_channel = null; } /** * Returns the current mode Tamer is running in * * @return string The current mode Tamer is running in */ public static function getMode(): string { return self::$mode ?? TamerMode::NONE; } /** * Monitors all internal processes * * @param int $timeout The timeout in seconds to monitor for, if -1 is provided then the monitor will run * for one iteration and then return. If 0 is provided then the monitor will run forever. * @return void */ public static function monitor(int $timeout=0): void { if($timeout > 0 || $timeout === -1) { try { self::$supervisor?->monitor($timeout); self::$server?->monitor($timeout); } catch(Exception $e) { Log::error(Utilities::getName(), $e->getMessage(), $e); } } else { $start_time = time(); while(true) { try { self::$supervisor?->monitor(-1); self::$server?->monitor(-1); } catch(Exception $e) { Log::error(Utilities::getName(), $e->getMessage(), $e); } if(time() - $start_time >= $timeout) { break; } } } } /** * CLIENT FUNCTIONS */ /** * Spawns a worker process by their count, if the path is null then a generic sub process will be spawned * that will only be capable of executing closures. * * @param int $count The number of workers to spawn (defaults to 8) * @param string|null $path The path to the worker file to spawn, if null is provided then a generic worker * will be spawned that can only execute closures (unimplemented) * @param int $channel The channel to spawn the workers on (defaults to 0) * @throws WorkerFailedException If the worker fails to spawn * @return void */ public static function createWorker(int $count=8, ?string $path=null, int $channel=0): void { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to spawn a worker in \'%s\' mode, only clients can spawn workers.', self::$mode)); } if($path === null) { self::$supervisor->spawnClosure($count, $channel); } else { self::$supervisor->spawnWorker($path, $count, $channel); } self::monitor(-1); } /** * Preforms a function call against a worker in the background, returns the Job ID to keep track of the job status. * * @param string $function The function to call * @param array $arguments The arguments to pass to the function * @param int $channel The channel to preform the function call on (defaults to 0) * @return int The Job ID of the function call */ public static function do(string $function, array $arguments=[], int $channel=0): int { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to do() in \'%s\' mode, only clients can preform do().', self::$mode)); } $job_packet = new JobPacket(); $job_packet->setParameters(serialize($arguments)); $job_packet->setPayload($function); $job_packet->setChannel($channel); $job_packet->setReturnChannel(self::$return_channel); try { self::$job_manager->pushJob($job_packet); } catch(ConnectionException $e) { throw new RuntimeException('do() failed, failed to connect to the server', 0, $e); } catch(JobManagerException $e) { throw new RuntimeException('do() failed, failed to push the job to the server', 0, $e); } self::addToWatchlist($job_packet->getId()); return $job_packet->getId(); } /** * Sends a function call to a worker in the background, but once the job is completed it will be forgotten and * the result will not be returned, this also means that the job will not be added to the watchlist. * * @param string $function The function to call * @param array $arguments The arguments to pass to the function * @param int $channel The channel to preform the function call on (defaults to 0) * @return void */ public static function dof(string $function, array $arguments=[], int $channel=0): void { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to dof() in \'%s\' mode, only clients can preform dof().', self::$mode)); } $job_packet = new JobPacket(); $job_packet->setParameters(serialize($arguments)); $job_packet->setPayload($function); $job_packet->setChannel($channel); try { self::$job_manager->pushJob($job_packet); } catcH(Exception $e) { throw new RuntimeException('dof() failed, failed to push job to the server', 0, $e); } } /** * Waits for all the dispatched jobs to complete, this is a blocking function and will not return until all the * jobs have completed. If a timeout is specified, the function will return after the timeout has been reached. * * @param callable|null $callback A callback function that will be called after each iteration of the wait loop * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs * have completed, if -1 is provided then the function run for one iteration and return * @return void * @throws ConnectionException If the client fails to connect to the server * @throws JobManagerException If the JobManager throws an exception * @throws TamerException If the Tamer throws an exception * @throws Throwable If a job fails * @throws TimeoutException If the timeout is reached */ public static function wait(?callable $callback=null, int $timeout=0): void { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to wait() in \'%s\' mode, only clients can preform wait().', self::$mode)); } $time_start = time(); while(true) { self::monitor(-1); if(count(self::$watching_jobs) === 0) { return; } Log::debug(Utilities::getName(), 'Waiting for jobs to complete'); $job_packet = self::$job_manager->listenReturnChannel(self::$return_channel); if(!in_array($job_packet->getId(), self::$watching_jobs)) { Log::debug(Utilities::getName(), sprintf('Job \'%s\' has returned, but is not in the watchlist', $job_packet->getId())); self::$job_manager->dropJob($job_packet->getId()); continue; } self::removeFromWatchlist($job_packet->getId()); self::$job_manager->dropJob($job_packet->getId()); if($callback !== null && $job_packet->getStatus() === JobStatus::FINISHED) { $return_value = $job_packet->getReturnValue(); if($return_value !== null) { $return_value = unserialize($return_value, ['allowed_classes' => true]); } $callback($job_packet->getId(), $return_value); } elseif($job_packet->getStatus() === JobStatus::FAILED) { try { $e = unserialize($job_packet->getException(), ['allowed_classes' => true]); } catch(Exception $e) { Log::error(Utilities::getName(), 'Failed to unserialize exception, exception was dropped', $e); } finally { if(isset($e) && $e instanceof Throwable) { throw $e; } /** @noinspection ThrowRawExceptionInspection */ throw new Exception(sprintf('wait() failed, job \'%s\' failed with an unknown exception', $job_packet->getId())); } } else { throw new TamerException(sprintf('wait() failed, job \'%s\' returned with an unknown status \'%s\'', $job_packet->getId(), $job_packet->getStatus())); } if ($timeout < 0) { throw new TimeoutException('wait() timed out'); } if($timeout > 0 && (time() - $time_start) >= $timeout) { throw new TimeoutException('wait() timed out'); } usleep(1000); } } /** * Waits for a job to complete, returns the result of the job. * * @param int $job_id The ID of the job to wait for * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs * have completed, if -1 is provided then the function run for one iteration and return * @throws ConnectionException If the client fails to connect to the server * @throws JobManagerException If the JobManager throws an exception * @throws JobNotFoundException If the job is not found * @throws TimeoutException If the timeout is reached * @throws Throwable If the job fails * @throws Exception If the job fails * @return mixed The return value of the job */ public static function waitFor(int $job_id, int $timeout=0): mixed { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to waitFor() in \'%s\' mode, only clients can preform waitFor().', self::$mode)); } $time_start = time(); while(true) { self::monitor(-1); switch(self::$job_manager->getJobStatus($job_id)) { case JobStatus::FINISHED: $return = self::$job_manager->getJobResult($job_id); self::$job_manager->dropJob($job_id); return $return; case JobStatus::FAILED: $throwable = self::$job_manager->getJobException($job_id); self::$job_manager->dropJob($job_id); throw $throwable; } if($timeout < 0) { throw new TimeoutException('waitFor() timed out'); } if($timeout > 0 && (time() - $time_start) >= $timeout) { throw new TimeoutException(sprintf('waitFor() timed out after %d seconds', $timeout)); } usleep(10); } } /** * Preforms a do() call on a waitFor() call all in one function. * * @param string $function The function to call * @param array $arguments The arguments to pass to the function * @param int $channel The channel to use * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until all the jobs * @throws ConnectionException If the client fails to connect to the server * @throws JobManagerException If the JobManager throws an exception * @throws JobNotFoundException If the job is not found * @throws TimeoutException If the timeout is reached * @throws Throwable If the job fails * @return mixed The return value of the job */ public static function doWait(string $function, array $arguments=[], int $channel=0, int $timeout=0): mixed { return self::waitFor(self::do($function, $arguments, $channel), $timeout); } /** * Clears the watchlist, this will remove all jobs from the watchlist. * * @return void */ public static function clear(): void { if(self::$mode !== TamerMode::CLIENT) { throw new RuntimeException(sprintf('Attempting to clear() in \'%s\' mode, only clients can preform clear().', self::$mode)); } self::$watching_jobs = []; } /** * Invokes the do() function, returns the Job ID. * * @param string $name * @param array $arguments * @return int */ public static function __callStatic(string $name, array $arguments=[]) { return self::do($name, $arguments); } /** * WORKER FUNCTIONS */ /** * Registers a new function to be called by the worker, this function will be called when the worker receives * a job with the same name as the function. Internally this function uses call_user_func_array() to call the * function. * * @param string $function The name of the function to register * @param callable $callback The callback to call when the function is called * @return void */ public static function addFunction(string $function, callable $callback): void { if(self::$mode !== TamerMode::WORKER) { throw new RuntimeException(sprintf('Attempting to addFunction() in \'%s\' mode, only workers can preform addFunction().', self::$mode)); } if (!preg_match('/^[a-zA-Z_\x80-\xff][a-zA-Z0-9_\x80-\xff]*$/', $function)) { throw new InvalidArgumentException("Invalid function name: $function"); } if(method_exists(__CLASS__, $function)) { throw new InvalidArgumentException(sprintf('Attempting to addFunction() with a function name of \'%s\', this is a reserved function name.', $function)); } self::$function_pointers[$function] = $callback; } /** * Removes a function from the worker, this function will no longer be called when a job with the same name * is received. * * @param string $function The name of the function to remove * @return void */ public static function removeFunction(string $function): void { if(self::$mode !== TamerMode::WORKER) { throw new RuntimeException(sprintf('Attempting to removeFunction() in \'%s\' mode, only workers can preform removeFunction().', self::$mode)); } if(!isset(self::$function_pointers[$function])) { throw new InvalidArgumentException(sprintf('Attempting to removeFunction() with a function name of \'%s\', this function does not exist.', $function)); } unset(self::$function_pointers[$function]); } /** * Returns an array of all the registered functions. * * @return array An array of all the registered functions */ public static function getFunctions(): array { if(self::$mode !== TamerMode::WORKER) { throw new RuntimeException(sprintf('Attempting to getFunctions() in \'%s\' mode, only workers can preform getFunctions().', self::$mode)); } return array_keys(self::$function_pointers); } /** * Runs the worker, this function will block and listen for incoming jobs, if a job is received then the * job will be processed and optionally returned back to the client via a return channel. * * @param int|array $channel The channel to listen on, if an array is provided then the worker will listen on * @param int $timeout The timeout in seconds, if 0 is provided then the function will block until a job is received * @param bool $ignore_errors If set to true then the worker will not throw exceptions, instead it will log the * error and continue to run. * @throws ConnectionException If the client fails to connect to the server * @return void */ public static function run(int|array $channel=0, int $timeout=0, bool $ignore_errors=false): void { if(self::$mode !== TamerMode::WORKER) { throw new RuntimeException(sprintf('Attempting to run() in \'%s\' mode, only workers can preform run().', self::$mode)); } $start_time = time(); $error_time = null; while(true) { try { $job_packet = self::$job_manager->listenForJob(self::$worker_configuration->getWorkerId(), $channel, $timeout); self::executeJob($job_packet); } catch(TimeoutException $e) { unset($e); return; } /** @noinspection PhpRedundantCatchClauseInspection */ catch(RedisException $e) { if($ignore_errors === false && strtolower($e->getMessage()) === 'redis server went away') { if($error_time === null) { $error_time = time(); } else if((time() - $error_time) >= 5) { throw new ConnectionException('Redis server went away, and did not come back.'); } } } catch(Exception $e) { Log::error(Utilities::getName(), sprintf('Worker %s encountered an error while listening for jobs: %s', self::$worker_configuration->getWorkerId(), $e->getMessage()), $e); unset($e); } if($timeout === -1) { return; } if($timeout > 0 && (time() - $start_time) >= $timeout) { return; } } } }