Add forking capability for update processing

This commit is contained in:
netkas 2024-11-04 00:56:21 -05:00
parent 1eaf2e7d91
commit e88e18995e

View file

@ -2,8 +2,11 @@
namespace TgBotLib;
use RuntimeException;
use TgBotLib\Abstracts\UpdateEvent;
use TgBotLib\Classes\Logger;
use TgBotLib\Enums\EventType;
use Throwable;
/**
* PollingBot class that extends Bot for handling updates using polling.
@ -14,6 +17,7 @@
private int $limit;
private int $timeout;
private array $allowedUpdates;
private bool $fork;
/**
* Constructor for the class, initializing with a Bot instance.
@ -28,8 +32,15 @@
$this->limit = 100;
$this->timeout = 0;
$this->allowedUpdates = [];
$this->fork = false;
}
/**
* Sets the offset value.
*
* @param int $offset The offset value to be set.
* @return void
*/
public function setOffset(int $offset): void
{
$this->offset = $offset;
@ -120,6 +131,27 @@
$this->allowedUpdates[] = $allowedUpdate;
}
/**
* Sets whether updates should be processed in a forked process
*
* @param bool $fork
* @return void
*/
public function setFork(bool $fork): void
{
$this->fork = $fork;
}
/**
* Gets the current fork setting
*
* @return bool
*/
public function getFork(): bool
{
return $this->fork;
}
/**
* Handles incoming updates by fetching and processing them with appropriate event handlers.
*
@ -127,32 +159,94 @@
*/
public function handleUpdates(): void
{
foreach($this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates()) as $update)
$updates = $this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates());
if (empty($updates))
{
// Update the last offset if the current Update ID is greater than the offset ID
if($update->getUpdateId() > $this->offset)
{
$this->offset = $update->getUpdateId() + 1;
return;
}
// Update the offset based on the last update ID
$lastUpdate = end($updates);
if ($lastUpdate->getUpdateId() > ($this->offset ?? 0))
{
$this->offset = $lastUpdate->getUpdateId() + 1;
}
if ($this->fork)
{
$this->handleUpdatesInFork($updates);
}
else
{
$this->processUpdates($updates);
}
}
/**
* Handles updates in a forked child process
*
* @param array $updates
* @return void
*/
private function handleUpdatesInFork(array $updates): void
{
$pid = pcntl_fork();
if ($pid == -1)
{
// Fork failed
throw new RuntimeException('Failed to fork process for update handling');
}
elseif ($pid)
{
// Parent process
// Wait for child to finish to prevent zombie processes
pcntl_wait($status);
}
else
{
// Child process
try
{
$this->processUpdates($updates);
exit(0);
}
catch (Throwable $e)
{
Logger::getLogger()->error("Error in forked process: " . $e->getMessage());
exit(1);
}
}
}
/**
* Processes a batch of updates with appropriate event handlers
*
* @param array $updates
* @return void
*/
private function processUpdates(array $updates): void
{
foreach ($updates as $update)
{
$updateByType = $this->getEventHandlersByType(EventType::determineEventType($update));
if(count($updateByType) === 0)
if (count($updateByType) === 0)
{
// If no event handlers are found appropriate for the update type, use the generic update event handler
// So that we don't miss any updates
/** @var UpdateEvent $eventHandler */
foreach($this->getEventHandlersByType(EventType::UPDATE_EVENT) as $eventHandler)
foreach ($this->getEventHandlersByType(EventType::UPDATE_EVENT) as $eventHandler)
{
/** @var UpdateEvent $eventHandler */
(new $eventHandler($update))->handle($this);
}
}
else
{
// Otherwise, use the appropriate event handler for the update type
/** @var UpdateEvent $eventHandler */
foreach($this->getEventHandlersByType(EventType::determineEventType($update)) as $eventHandler)
foreach ($updateByType as $eventHandler)
{
/** @var UpdateEvent $eventHandler */
(new $eventHandler($update))->handle($this);
}
}
@ -166,7 +260,7 @@
*/
private function retrieveAllowedUpdates(): ?array
{
if(count($this->allowedUpdates) === 0)
if (count($this->allowedUpdates) === 0)
{
return null;
}