Add child process management for polling updates

This commit is contained in:
netkas 2024-11-04 22:04:58 -05:00
parent 87f9321280
commit 946e110131
2 changed files with 115 additions and 47 deletions

View file

@ -360,6 +360,8 @@
*/ */
public function handleUpdate(Update $update): void public function handleUpdate(Update $update): void
{ {
Logger::getLogger()->debug(sprintf('Handling update %s', $update->getUpdateId()));
$command = $update?->getAnyMessage()?->getCommand(); $command = $update?->getAnyMessage()?->getCommand();
if($command !== null) if($command !== null)
{ {
@ -368,6 +370,7 @@
/** @var CommandEvent $eventHandler */ /** @var CommandEvent $eventHandler */
foreach($this->getEventHandlersByCommand($command) as $eventHandler) foreach($this->getEventHandlersByCommand($command) as $eventHandler)
{ {
Logger::getLogger()->debug(sprintf('Executing command %s for update %s', $command, $update->getUpdateId()));
(new $eventHandler($update))->handle($this); (new $eventHandler($update))->handle($this);
$commandExecuted = true; $commandExecuted = true;
} }
@ -386,6 +389,7 @@
{ {
foreach ($this->getEventHandlersByType(EventType::UPDATE_EVENT) as $eventHandler) foreach ($this->getEventHandlersByType(EventType::UPDATE_EVENT) as $eventHandler)
{ {
Logger::getLogger()->debug(sprintf('Executing generic update event handler for update %s', $update->getUpdateId()));
/** @var UpdateEvent $eventHandler */ /** @var UpdateEvent $eventHandler */
(new $eventHandler($update))->handle($this); (new $eventHandler($update))->handle($this);
} }
@ -398,6 +402,7 @@
/** @var UpdateEvent $eventHandler */ /** @var UpdateEvent $eventHandler */
foreach($eventHandlers as $eventHandler) foreach($eventHandlers as $eventHandler)
{ {
Logger::getLogger()->debug(sprintf('Executing event handler for type %s for update %s', $eventHandler::getEventType()->value, $update->getUpdateId()));
(new $eventHandler($update))->handle($this); (new $eventHandler($update))->handle($this);
} }
} }
@ -462,6 +467,7 @@
} }
// Support named and positional arguments // Support named and positional arguments
Logger::getLogger()->debug(sprintf('Calling method %s with arguments %s', $name, json_encode($arguments)));
$parameters = $this->parseArguments($name, $arguments); $parameters = $this->parseArguments($name, $arguments);
return $this->sendRequest($name, $parameters); return $this->sendRequest($name, $parameters);
} }

View file

@ -3,6 +3,7 @@
namespace TgBotLib; namespace TgBotLib;
use RuntimeException; use RuntimeException;
use function RuntimeException;
/** /**
* PollingBot class that extends Bot for handling updates using polling. * PollingBot class that extends Bot for handling updates using polling.
@ -14,6 +15,7 @@
private int $timeout; private int $timeout;
private array $allowedUpdates; private array $allowedUpdates;
private bool $fork; private bool $fork;
private array $childPids;
/** /**
* Constructor for the class, initializing with a Bot instance. * Constructor for the class, initializing with a Bot instance.
@ -29,6 +31,13 @@
$this->timeout = 0; $this->timeout = 0;
$this->allowedUpdates = []; $this->allowedUpdates = [];
$this->fork = false; $this->fork = false;
$this->childPids = [];
// Register signal handler for child processes
if (function_exists('pcntl_signal'))
{
pcntl_signal(SIGCHLD, [$this, 'signalHandler']);
}
} }
/** /**
@ -128,9 +137,9 @@
} }
/** /**
* Sets whether updates should be processed in a forked process * Sets the fork value.
* *
* @param bool $fork * @param bool $fork The fork value to set.
* @return void * @return void
*/ */
public function setFork(bool $fork): void public function setFork(bool $fork): void
@ -139,15 +148,31 @@
} }
/** /**
* Gets the current fork setting * Retrieves the current fork setting.
* *
* @return bool * @return bool The configured fork value.
*/ */
public function getFork(): bool public function getFork(): bool
{ {
return $this->fork; return $this->fork;
} }
private function signalHandler(int $signal): void
{
if ($signal === SIGCHLD)
{
$i = -1;
while (($pid = pcntl_wait($i, WNOHANG)) > 0)
{
$key = array_search($pid, $this->childPids);
if ($key !== false)
{
unset($this->childPids[$key]);
}
}
}
}
/** /**
* Handles incoming updates by fetching and processing them with appropriate event handlers. * Handles incoming updates by fetching and processing them with appropriate event handlers.
* *
@ -155,6 +180,12 @@
*/ */
public function handleUpdates(): void public function handleUpdates(): void
{ {
// Install signal handler
if ($this->fork && function_exists('pcntl_signal_dispatch'))
{
pcntl_signal_dispatch();
}
$updates = $this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates()); $updates = $this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates());
if (empty($updates)) if (empty($updates))
@ -162,59 +193,90 @@
return; return;
} }
// Update the offset based on the last update ID // Track the highest update ID we've seen
$lastUpdate = end($updates); $maxUpdateId = null;
if ($lastUpdate->getUpdateId() > ($this->offset ?? 0))
foreach ($updates as $update)
{ {
$this->offset = $lastUpdate->getUpdateId() + 1; // Update the maximum update ID as we go
if ($maxUpdateId === null || $update->getUpdateId() > $maxUpdateId)
{
$maxUpdateId = $update->getUpdateId();
}
if ($this->fork)
{
// Clean up any finished processes
if (function_exists('pcntl_signal_dispatch'))
{
pcntl_signal_dispatch();
}
$pid = pcntl_fork();
if ($pid == -1)
{
// Fork failed
throw new RuntimeException('Failed to fork process for update handling');
}
elseif ($pid)
{
// Parent process
$this->childPids[] = $pid;
// If we have too many child processes, wait for some to finish
$maxChildren = 32; // Adjust this value based on your system's capabilities
while (count($this->childPids) >= $maxChildren)
{
if (function_exists('pcntl_signal_dispatch'))
{
pcntl_signal_dispatch();
}
usleep(10000); // Sleep for 10ms to prevent CPU hogging
}
}
else
{
// Child process
try
{
$this->handleUpdate($update);
}
finally
{
exit(0);
}
}
}
else
{
$this->handleUpdate($update);
}
} }
// Update the offset based on the highest update ID we've seen
if ($maxUpdateId !== null)
{
$this->offset = $maxUpdateId + 1;
}
// If forking is enabled, ensure we clean up any remaining child processes
if ($this->fork) if ($this->fork)
{ {
$this->handleUpdatesInFork($updates); // Wait for remaining child processes to finish
} while (!empty($this->childPids))
else
{
foreach($updates as $update)
{ {
$this->handleUpdate($update); if (function_exists('pcntl_signal_dispatch'))
{
pcntl_signal_dispatch();
}
usleep(10000); // Sleep for 10ms to prevent CPU hogging
} }
} }
} }
/**
* Handles updates in a forked child process
*
* @param array $updates
* @return void
*/
private function handleUpdatesInFork(array $updates): void
{
$pid = pcntl_fork();
if ($pid == -1)
{
// Fork failed
throw new RuntimeException('Failed to fork process for update handling');
}
elseif ($pid)
{
// Parent process
// Wait for child to finish to prevent zombie processes
pcntl_wait($status);
}
else
{
// Child process
foreach($updates as $update)
{
$this->handleUpdate($update);
}
exit(0);
}
}
/** /**
* Retrieves the allowed updates. * Retrieves the allowed updates.
* *