diff --git a/src/TgBotLib/Bot.php b/src/TgBotLib/Bot.php index 0b48e37..1b21a77 100644 --- a/src/TgBotLib/Bot.php +++ b/src/TgBotLib/Bot.php @@ -360,6 +360,8 @@ */ public function handleUpdate(Update $update): void { + Logger::getLogger()->debug(sprintf('Handling update %s', $update->getUpdateId())); + $command = $update?->getAnyMessage()?->getCommand(); if($command !== null) { @@ -368,6 +370,7 @@ /** @var CommandEvent $eventHandler */ foreach($this->getEventHandlersByCommand($command) as $eventHandler) { + Logger::getLogger()->debug(sprintf('Executing command %s for update %s', $command, $update->getUpdateId())); (new $eventHandler($update))->handle($this); $commandExecuted = true; } @@ -386,6 +389,7 @@ { foreach ($this->getEventHandlersByType(EventType::UPDATE_EVENT) as $eventHandler) { + Logger::getLogger()->debug(sprintf('Executing generic update event handler for update %s', $update->getUpdateId())); /** @var UpdateEvent $eventHandler */ (new $eventHandler($update))->handle($this); } @@ -398,6 +402,7 @@ /** @var UpdateEvent $eventHandler */ foreach($eventHandlers as $eventHandler) { + Logger::getLogger()->debug(sprintf('Executing event handler for type %s for update %s', $eventHandler::getEventType()->value, $update->getUpdateId())); (new $eventHandler($update))->handle($this); } } @@ -462,6 +467,7 @@ } // Support named and positional arguments + Logger::getLogger()->debug(sprintf('Calling method %s with arguments %s', $name, json_encode($arguments))); $parameters = $this->parseArguments($name, $arguments); return $this->sendRequest($name, $parameters); } diff --git a/src/TgBotLib/PollingBot.php b/src/TgBotLib/PollingBot.php index 1012057..1dfd5a3 100644 --- a/src/TgBotLib/PollingBot.php +++ b/src/TgBotLib/PollingBot.php @@ -3,6 +3,7 @@ namespace TgBotLib; use RuntimeException; + use function RuntimeException; /** * PollingBot class that extends Bot for handling updates using polling. @@ -14,6 +15,7 @@ private int $timeout; private array $allowedUpdates; private bool $fork; + private array $childPids; /** * Constructor for the class, initializing with a Bot instance. @@ -29,6 +31,13 @@ $this->timeout = 0; $this->allowedUpdates = []; $this->fork = false; + $this->childPids = []; + + // Register signal handler for child processes + if (function_exists('pcntl_signal')) + { + pcntl_signal(SIGCHLD, [$this, 'signalHandler']); + } } /** @@ -128,9 +137,9 @@ } /** - * Sets whether updates should be processed in a forked process + * Sets the fork value. * - * @param bool $fork + * @param bool $fork The fork value to set. * @return void */ public function setFork(bool $fork): void @@ -139,15 +148,31 @@ } /** - * Gets the current fork setting + * Retrieves the current fork setting. * - * @return bool + * @return bool The configured fork value. */ public function getFork(): bool { return $this->fork; } + private function signalHandler(int $signal): void + { + if ($signal === SIGCHLD) + { + $i = -1; + while (($pid = pcntl_wait($i, WNOHANG)) > 0) + { + $key = array_search($pid, $this->childPids); + if ($key !== false) + { + unset($this->childPids[$key]); + } + } + } + } + /** * Handles incoming updates by fetching and processing them with appropriate event handlers. * @@ -155,6 +180,12 @@ */ public function handleUpdates(): void { + // Install signal handler + if ($this->fork && function_exists('pcntl_signal_dispatch')) + { + pcntl_signal_dispatch(); + } + $updates = $this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates()); if (empty($updates)) @@ -162,59 +193,90 @@ return; } - // Update the offset based on the last update ID - $lastUpdate = end($updates); - if ($lastUpdate->getUpdateId() > ($this->offset ?? 0)) + // Track the highest update ID we've seen + $maxUpdateId = null; + + foreach ($updates as $update) { - $this->offset = $lastUpdate->getUpdateId() + 1; + // Update the maximum update ID as we go + if ($maxUpdateId === null || $update->getUpdateId() > $maxUpdateId) + { + $maxUpdateId = $update->getUpdateId(); + } + + if ($this->fork) + { + // Clean up any finished processes + if (function_exists('pcntl_signal_dispatch')) + { + pcntl_signal_dispatch(); + } + + $pid = pcntl_fork(); + + if ($pid == -1) + { + // Fork failed + throw new RuntimeException('Failed to fork process for update handling'); + } + elseif ($pid) + { + // Parent process + $this->childPids[] = $pid; + + // If we have too many child processes, wait for some to finish + $maxChildren = 32; // Adjust this value based on your system's capabilities + while (count($this->childPids) >= $maxChildren) + { + if (function_exists('pcntl_signal_dispatch')) + { + pcntl_signal_dispatch(); + } + + usleep(10000); // Sleep for 10ms to prevent CPU hogging + } + } + else + { + // Child process + try + { + $this->handleUpdate($update); + } + finally + { + exit(0); + } + } + } + else + { + $this->handleUpdate($update); + } } + // Update the offset based on the highest update ID we've seen + if ($maxUpdateId !== null) + { + $this->offset = $maxUpdateId + 1; + } + + // If forking is enabled, ensure we clean up any remaining child processes if ($this->fork) { - $this->handleUpdatesInFork($updates); - } - else - { - foreach($updates as $update) + // Wait for remaining child processes to finish + while (!empty($this->childPids)) { - $this->handleUpdate($update); + if (function_exists('pcntl_signal_dispatch')) + { + pcntl_signal_dispatch(); + } + + usleep(10000); // Sleep for 10ms to prevent CPU hogging } } } - /** - * Handles updates in a forked child process - * - * @param array $updates - * @return void - */ - private function handleUpdatesInFork(array $updates): void - { - $pid = pcntl_fork(); - - if ($pid == -1) - { - // Fork failed - throw new RuntimeException('Failed to fork process for update handling'); - } - elseif ($pid) - { - // Parent process - // Wait for child to finish to prevent zombie processes - pcntl_wait($status); - } - else - { - // Child process - foreach($updates as $update) - { - $this->handleUpdate($update); - } - - exit(0); - } - } - /** * Retrieves the allowed updates. *