Refactor PollingBot to use PsyncLib for updates

This commit is contained in:
netkas 2024-12-04 00:45:19 -05:00
parent a66f27b5e6
commit 8408430ef8
3 changed files with 34 additions and 117 deletions

View file

@ -36,6 +36,12 @@
"version": "latest", "version": "latest",
"source_type": "remote", "source_type": "remote",
"source": "nosial/libs.opts=latest@n64" "source": "nosial/libs.opts=latest@n64"
},
{
"name": "net.nosial.psynclib",
"version": "latest",
"source_type": "remote",
"source": "nosial/libs.psync=latest@n64"
} }
], ],
"configurations": [ "configurations": [

View file

@ -504,6 +504,20 @@
} }
} }
/**
* Processes an array of updates.
*
* @param array $updates An array containing update objects to be handled. Each update will be individually processed by calling the handleUpdate method.
* @return void This method does not return a value.
*/
public function handleUpdates(array $updates): void
{
foreach($updates as $update)
{
$this->handleUpdate($update);
}
}
/** /**
* Sends a request by executing the specified method with the given parameters. * Sends a request by executing the specified method with the given parameters.
* *

View file

@ -2,8 +2,7 @@
namespace TgBotLib; namespace TgBotLib;
use RuntimeException; use PsyncLib\Psync;
use function RuntimeException;
/** /**
* PollingBot class that extends Bot for handling updates using polling. * PollingBot class that extends Bot for handling updates using polling.
@ -14,8 +13,6 @@
private int $limit; private int $limit;
private int $timeout; private int $timeout;
private array $allowedUpdates; private array $allowedUpdates;
private bool $fork;
private array $childPids;
/** /**
* Constructor for the class, initializing with a Bot instance. * Constructor for the class, initializing with a Bot instance.
@ -30,8 +27,6 @@
$this->limit = 100; $this->limit = 100;
$this->timeout = 0; $this->timeout = 0;
$this->allowedUpdates = []; $this->allowedUpdates = [];
$this->fork = false;
$this->childPids = [];
// Register signal handler for child processes // Register signal handler for child processes
if (function_exists('pcntl_signal')) if (function_exists('pcntl_signal'))
@ -115,6 +110,11 @@
$this->allowedUpdates = $allowedUpdates; $this->allowedUpdates = $allowedUpdates;
} }
/**
* Retrieves the list of allowed updates.
*
* @return array Returns an array containing the allowed updates.
*/
public function getAllowedUpdates(): array public function getAllowedUpdates(): array
{ {
return $this->allowedUpdates; return $this->allowedUpdates;
@ -137,55 +137,14 @@
} }
/** /**
* Sets the fork value. * Polls for updates and processes them. Installs a signal handler if
* needed, fetches updates, tracks the highest update ID, updates the
* polling offset, and handles the updates using the Psync mechanism.
* *
* @param bool $fork The fork value to set. * @return void This method does not return any value.
* @return void
*/ */
public function setFork(bool $fork): void public function poll(): void
{ {
$this->fork = $fork;
}
/**
* Retrieves the current fork setting.
*
* @return bool The configured fork value.
*/
public function getFork(): bool
{
return $this->fork;
}
private function signalHandler(int $signal): void
{
if ($signal === SIGCHLD)
{
$i = -1;
while (($pid = pcntl_wait($i, WNOHANG)) > 0)
{
$key = array_search($pid, $this->childPids);
if ($key !== false)
{
unset($this->childPids[$key]);
}
}
}
}
/**
* Handles incoming updates by fetching and processing them with appropriate event handlers.
*
* @return void
*/
public function handleUpdates(): void
{
// Install signal handler
if ($this->fork && function_exists('pcntl_signal_dispatch'))
{
pcntl_signal_dispatch();
}
$updates = $this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates()); $updates = $this->getUpdates(offset: ($this->offset ?: 0), limit: $this->limit, timeout: $this->timeout, allowed_updates: $this->retrieveAllowedUpdates());
if (empty($updates)) if (empty($updates))
@ -195,7 +154,6 @@
// Track the highest update ID we've seen // Track the highest update ID we've seen
$maxUpdateId = null; $maxUpdateId = null;
foreach ($updates as $update) foreach ($updates as $update)
{ {
// Update the maximum update ID as we go // Update the maximum update ID as we go
@ -203,56 +161,6 @@
{ {
$maxUpdateId = $update->getUpdateId(); $maxUpdateId = $update->getUpdateId();
} }
if ($this->fork)
{
// Clean up any finished processes
if (function_exists('pcntl_signal_dispatch'))
{
pcntl_signal_dispatch();
}
$pid = pcntl_fork();
if ($pid == -1)
{
// Fork failed
throw new RuntimeException('Failed to fork process for update handling');
}
elseif ($pid)
{
// Parent process
$this->childPids[] = $pid;
// If we have too many child processes, wait for some to finish
$maxChildren = 32; // Adjust this value based on your system's capabilities
while (count($this->childPids) >= $maxChildren)
{
if (function_exists('pcntl_signal_dispatch'))
{
pcntl_signal_dispatch();
}
usleep(10000); // Sleep for 10ms to prevent CPU hogging
}
}
else
{
// Child process
try
{
$this->handleUpdate($update);
}
finally
{
exit(0);
}
}
}
else
{
$this->handleUpdate($update);
}
} }
// Update the offset based on the highest update ID we've seen // Update the offset based on the highest update ID we've seen
@ -261,20 +169,9 @@
$this->offset = $maxUpdateId + 1; $this->offset = $maxUpdateId + 1;
} }
// If forking is enabled, ensure we clean up any remaining child processes // Pass the method name as a string and the object
if ($this->fork) Psync::do([$this, 'handleUpdates'], [$updates]);
{ Psync::clean();
// Wait for remaining child processes to finish
while (!empty($this->childPids))
{
if (function_exists('pcntl_signal_dispatch'))
{
pcntl_signal_dispatch();
}
usleep(10000); // Sleep for 10ms to prevent CPU hogging
}
}
} }
/** /**