Implemented PeerManager
This commit is contained in:
parent
143e50c1ed
commit
80f6650eaf
6 changed files with 382 additions and 13 deletions
|
@ -54,6 +54,8 @@
|
|||
|
||||
public const PEER_METADATA_NOT_FOUND = 2001;
|
||||
|
||||
public const INVALID_FEDERATED_ADDRESS = 2002;
|
||||
|
||||
|
||||
public const ALL = [
|
||||
self::INTERNAL_SERVER_ERROR,
|
||||
|
@ -66,6 +68,7 @@
|
|||
self::CLIENT_DISABLED,
|
||||
|
||||
self::INVALID_PEER_METADATA,
|
||||
|
||||
self::PEER_METADATA_NOT_FOUND,
|
||||
self::INVALID_FEDERATED_ADDRESS
|
||||
];
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
<?php
|
||||
|
||||
namespace FederationLib\Exceptions\Standard;
|
||||
|
||||
use Exception;
|
||||
use FederationLib\Enums\Standard\ErrorCodes;
|
||||
use Throwable;
|
||||
|
||||
class InvalidFederatedAddressException extends Exception
|
||||
{
|
||||
/**
|
||||
* @param string $message
|
||||
* @param Throwable|null $previous
|
||||
*/
|
||||
public function __construct(string $message = "", ?Throwable $previous = null)
|
||||
{
|
||||
parent::__construct($message, ErrorCodes::INVALID_FEDERATED_ADDRESS, $previous);
|
||||
}
|
||||
}
|
|
@ -199,7 +199,7 @@
|
|||
* @param ClientIdentity|null $identity
|
||||
* @param string|ClientRecord $client_uuid
|
||||
* @return Objects\Standard\Client
|
||||
*@throws ClientNotFoundException
|
||||
* @throws ClientNotFoundException
|
||||
* @throws DatabaseException
|
||||
* @throws InternalServerException
|
||||
* @throws AccessDeniedException
|
||||
|
|
|
@ -136,10 +136,7 @@
|
|||
{
|
||||
try
|
||||
{
|
||||
$redis = RedisConnectionManager::getConnection(
|
||||
Configuration::getObjectCacheServerPreference('client_objects'),
|
||||
Configuration::getObjectCacheServerFallback('client_objects')
|
||||
);
|
||||
$redis = RedisConnectionManager::getConnectionFromConfig('client_objects');
|
||||
|
||||
if($redis->exists($client_uuid))
|
||||
{
|
||||
|
@ -186,10 +183,10 @@
|
|||
{
|
||||
try
|
||||
{
|
||||
$redis = RedisConnectionManager::getConnection(
|
||||
Configuration::getObjectCacheServerPreference('client_objects'),
|
||||
Configuration::getObjectCacheServerFallback('client_objects')
|
||||
);
|
||||
if(!isset($redis))
|
||||
{
|
||||
$redis = RedisConnectionManager::getConnectionFromConfig('client_objects');
|
||||
}
|
||||
|
||||
$redis->hMSet($client->getUuid(), $client->toArray());
|
||||
if(Configuration::getObjectCacheTTL('client_objects') > 0)
|
||||
|
|
|
@ -2,11 +2,28 @@
|
|||
|
||||
namespace FederationLib\Managers;
|
||||
|
||||
use Doctrine\DBAL\ParameterType;
|
||||
use Exception;
|
||||
use FederationLib\Classes\Configuration;
|
||||
use FederationLib\Classes\Database;
|
||||
use FederationLib\Enums\DatabaseTables;
|
||||
use FederationLib\Enums\Misc;
|
||||
use FederationLib\Enums\Standard\UserPeerType;
|
||||
use FederationLib\Exceptions\CacheConnectionException;
|
||||
use FederationLib\Exceptions\DatabaseException;
|
||||
use FederationLib\Exceptions\Standard\InvalidFederatedAddressException;
|
||||
use FederationLib\Exceptions\Standard\InvalidPeerMetadataException;
|
||||
use FederationLib\Exceptions\Standard\PeerNotFoundException;
|
||||
use FederationLib\Exceptions\Standard\UnsupportedPeerType;
|
||||
use FederationLib\FederationLib;
|
||||
use FederationLib\Interfaces\PeerMetadataInterface;
|
||||
use FederationLib\Interfaces\PeerMetadataManagerInterface;
|
||||
use FederationLib\Objects\ClientRecord;
|
||||
use FederationLib\Objects\ParsedFederatedAddress;
|
||||
use FederationLib\Objects\PeerRecord;
|
||||
use FederationLib\Objects\Standard\PeerMetadata\TelegramUserMetadata;
|
||||
use LogLib\Log;
|
||||
use RedisException;
|
||||
|
||||
class PeerManager
|
||||
{
|
||||
|
@ -31,8 +48,320 @@
|
|||
$this->metadata_managers[UserPeerType::TELEGRAM_USER] = new TelegramUserMetadata();
|
||||
}
|
||||
|
||||
public function registerPeer(string $federated_address, PeerMetadataInterface $peer_metadata): void
|
||||
/**
|
||||
* Syncs the metadata of the given peer
|
||||
*
|
||||
* @param string $peer_type
|
||||
* @param ClientRecord|string $client_uuid
|
||||
* @param PeerMetadataInterface $peer_metadata
|
||||
* @return void
|
||||
*/
|
||||
private function syncMetadata(string $peer_type, ClientRecord|string $client_uuid, PeerMetadataInterface $peer_metadata): void
|
||||
{
|
||||
$this->metadata_managers[$peer_type]->syncMetadata($client_uuid, $peer_metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Syncs the peer with the given federated address and metadata
|
||||
*
|
||||
* @param ClientRecord|string $client_uuid
|
||||
* @param ParsedFederatedAddress|string $federated_address
|
||||
* @param array $peer_metadata
|
||||
* @return void
|
||||
* @throws CacheConnectionException
|
||||
* @throws DatabaseException
|
||||
* @throws InvalidFederatedAddressException
|
||||
* @throws InvalidPeerMetadataException
|
||||
* @throws RedisException
|
||||
* @throws UnsupportedPeerType
|
||||
*/
|
||||
public function syncPeer(ClientRecord|string $client_uuid, ParsedFederatedAddress|string $federated_address, array $peer_metadata): void
|
||||
{
|
||||
if(!($federated_address instanceof ParsedFederatedAddress))
|
||||
{
|
||||
$federated_address = new ParsedFederatedAddress($federated_address);
|
||||
}
|
||||
|
||||
if($client_uuid instanceof ClientRecord)
|
||||
{
|
||||
$client_uuid = $client_uuid->getUuid();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$this->updateLastSeen($client_uuid, $this->getPeer($federated_address));
|
||||
}
|
||||
catch(PeerNotFoundException $e)
|
||||
{
|
||||
// Registering the peer will also sync the metadata
|
||||
$this->registerPeer($client_uuid, $federated_address, $peer_metadata);
|
||||
unset($e);
|
||||
return;
|
||||
}
|
||||
|
||||
// Parse the peer metadata
|
||||
$metadata = match ($federated_address->getPeerClass())
|
||||
{
|
||||
UserPeerType::TELEGRAM_USER => TelegramUserMetadata::fromArray($peer_metadata),
|
||||
default => throw new UnsupportedPeerType(sprintf('Peer type %s is not supported', $federated_address->getPeerClass())),
|
||||
};
|
||||
|
||||
// Validate the metadata & federated address
|
||||
if($metadata->validate() && ($metadata->getFederatedAddress() !== $federated_address->getAddress()))
|
||||
{
|
||||
throw new InvalidFederatedAddressException(sprintf('Peer metadata federated address (%s) does not match the provided federated address (%s)', $metadata->getFederatedAddress(), $federated_address->getAddress()));
|
||||
}
|
||||
|
||||
$this->syncMetadata($federated_address->getPeerType(), $client_uuid, $metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the peer with the given federated address and metadata
|
||||
*
|
||||
* @param ClientRecord|string $client_uuid
|
||||
* @param ParsedFederatedAddress|string $federated_address
|
||||
* @param array $peer_metadata
|
||||
* @return void
|
||||
* @throws DatabaseException
|
||||
* @throws InvalidFederatedAddressException
|
||||
* @throws InvalidPeerMetadataException
|
||||
* @throws UnsupportedPeerType
|
||||
*/
|
||||
public function registerPeer(ClientRecord|string $client_uuid, ParsedFederatedAddress|string $federated_address, array $peer_metadata): void
|
||||
{
|
||||
if(!($federated_address instanceof ParsedFederatedAddress))
|
||||
{
|
||||
$federated_address = new ParsedFederatedAddress($federated_address);
|
||||
}
|
||||
|
||||
if($client_uuid instanceof ClientRecord)
|
||||
{
|
||||
$client_uuid = $client_uuid->getUuid();
|
||||
}
|
||||
|
||||
// Parse the peer metadata
|
||||
$metadata = match ($federated_address->getPeerClass())
|
||||
{
|
||||
UserPeerType::TELEGRAM_USER => TelegramUserMetadata::fromArray($peer_metadata),
|
||||
default => throw new UnsupportedPeerType(sprintf('Peer type %s is not supported', $federated_address->getPeerClass())),
|
||||
};
|
||||
|
||||
// Validate the metadata & federated address
|
||||
if($metadata->validate() && ($metadata->getFederatedAddress() !== $federated_address->getAddress()))
|
||||
{
|
||||
throw new InvalidFederatedAddressException(sprintf('Peer metadata federated address (%s) does not match the provided federated address (%s)', $metadata->getFederatedAddress(), $federated_address->getAddress()));
|
||||
}
|
||||
|
||||
// Register the peer into the database.
|
||||
$timestamp = time();
|
||||
$qb = Database::getConnection()->createQueryBuilder();
|
||||
$qb->insert(DatabaseTables::PEERS);
|
||||
$qb->setValue('federated_address', $qb->createNamedParameter($federated_address->getAddress()));
|
||||
$qb->setValue('client_first_seen', $qb->createNamedParameter($client_uuid));
|
||||
$qb->setValue('client_last_seen', $qb->createNamedParameter($client_uuid));
|
||||
$qb->setValue('discovered_timestamp', $qb->createNamedParameter($timestamp, ParameterType::INTEGER));
|
||||
$qb->setValue('seen_timestamp', $qb->createNamedParameter($timestamp, ParameterType::INTEGER));
|
||||
$qb->setMaxResults(1);
|
||||
|
||||
try
|
||||
{
|
||||
$qb->executeQuery();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new DatabaseException('Failed to register peer into database', $e);
|
||||
}
|
||||
|
||||
// Finally, sync the metadata
|
||||
try
|
||||
{
|
||||
$this->syncMetadata($federated_address->getPeerType(), $client_uuid, $metadata);
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
$this->deletePeer($federated_address->getAddress());
|
||||
throw new InvalidPeerMetadataException('Failed to sync peer metadata', $e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the given peer from the database
|
||||
*
|
||||
* @param ParsedFederatedAddress|string $federated_address
|
||||
* @throws DatabaseException
|
||||
* @return void
|
||||
*/
|
||||
public function deletePeer(ParsedFederatedAddress|string $federated_address): void
|
||||
{
|
||||
if(!($federated_address instanceof ParsedFederatedAddress))
|
||||
{
|
||||
$federated_address = new ParsedFederatedAddress($federated_address);
|
||||
}
|
||||
|
||||
$qb = Database::getConnection()->createQueryBuilder();
|
||||
$qb->delete(DatabaseTables::PEERS);
|
||||
$qb->where('federated_address = :federated_address');
|
||||
$qb->setParameter('federated_address', $federated_address->getAddress());
|
||||
$qb->setMaxResults(1);
|
||||
|
||||
try
|
||||
{
|
||||
$qb->executeQuery();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new DatabaseException('Failed to delete peer from database', $e);
|
||||
}
|
||||
|
||||
if(Configuration::isCacheSystemEnabled() && Configuration::getObjectCacheEnabled('peer_objects'))
|
||||
{
|
||||
try
|
||||
{
|
||||
$redis = RedisConnectionManager::getConnectionFromConfig('peer_objects');
|
||||
$redis->del($federated_address->getAddress());
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
Log::warning(Misc::FEDERATIONLIB, sprintf('Failed to delete peer object %s from cache', $federated_address->getAddress()), $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a peer record for the given federated address
|
||||
*
|
||||
* @param string $federated_address
|
||||
* @return PeerRecord
|
||||
* @throws CacheConnectionException
|
||||
* @throws DatabaseException
|
||||
* @throws PeerNotFoundException
|
||||
* @throws RedisException
|
||||
*/
|
||||
public function getPeer(string $federated_address): PeerRecord
|
||||
{
|
||||
if(Configuration::isCacheSystemEnabled() && Configuration::getObjectCacheEnabled('peer_objects'))
|
||||
{
|
||||
$redis = RedisConnectionManager::getConnectionFromConfig('peer_objects');
|
||||
|
||||
if($redis->exists($federated_address))
|
||||
{
|
||||
$peer = PeerRecord::fromArray($redis->hGetAll($federated_address));
|
||||
Log::debug(Misc::FEDERATIONLIB, sprintf('Loaded client object %s from cache', $federated_address));
|
||||
return $peer;
|
||||
}
|
||||
}
|
||||
|
||||
$qb = Database::getConnection()->createQueryBuilder();
|
||||
$qb->select(
|
||||
'federated_address',
|
||||
'client_first_seen',
|
||||
'client_last_seen',
|
||||
'active_restriction',
|
||||
'permission_role',
|
||||
'discovered_timestamp',
|
||||
'seen_timestamp'
|
||||
);
|
||||
$qb->from(DatabaseTables::PEERS);
|
||||
$qb->where('federated_address = :federated_address');
|
||||
$qb->setParameter('federated_address', $federated_address);
|
||||
$qb->setMaxResults(1);
|
||||
|
||||
try
|
||||
{
|
||||
$result = $qb->executeQuery();
|
||||
|
||||
if($result->rowCount() === 0)
|
||||
{
|
||||
throw new PeerNotFoundException($federated_address);
|
||||
}
|
||||
|
||||
$peer = PeerRecord::fromArray($result->fetchAssociative());
|
||||
}
|
||||
catch(PeerNotFoundException $e)
|
||||
{
|
||||
throw $e;
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new DatabaseException('Failed to get peer from database', $e);
|
||||
}
|
||||
|
||||
if(Configuration::isCacheSystemEnabled() && Configuration::getObjectCacheEnabled('peer_objects'))
|
||||
{
|
||||
try
|
||||
{
|
||||
if(!isset($redis))
|
||||
{
|
||||
$redis = RedisConnectionManager::getConnectionFromConfig('peer_objects');
|
||||
}
|
||||
|
||||
$redis->hMSet($peer->getFederatedAddress(), $peer->toArray());
|
||||
|
||||
if(Configuration::getObjectCacheTtl('peer_objects') > 0)
|
||||
{
|
||||
$redis->expire($peer->getFederatedAddress(), Configuration::getObjectCacheTtl('peer_objects'));
|
||||
}
|
||||
|
||||
Log::debug(Misc::FEDERATIONLIB, sprintf('Cached peer object %s', $peer->getFederatedAddress()));
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
Log::warning(Misc::FEDERATIONLIB, sprintf('Failed to cache peer object %s', $peer->getFederatedAddress()), $e);
|
||||
}
|
||||
}
|
||||
|
||||
return $peer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the peer's last seen timestamp
|
||||
*
|
||||
* @param ClientRecord|string $client_uuid
|
||||
* @param PeerRecord|string $federated_address
|
||||
* @return void
|
||||
* @throws DatabaseException
|
||||
*/
|
||||
public function updateLastSeen(ClientRecord|string $client_uuid, PeerRecord|string $federated_address): void
|
||||
{
|
||||
if($client_uuid instanceof ClientRecord)
|
||||
{
|
||||
$client_uuid = $client_uuid->getUuid();
|
||||
}
|
||||
|
||||
if($federated_address instanceof PeerRecord)
|
||||
{
|
||||
$federated_address = $federated_address->getFederatedAddress();
|
||||
}
|
||||
|
||||
$qb = Database::getConnection()->createQueryBuilder();
|
||||
$qb->update(DatabaseTables::PEERS);
|
||||
$qb->set('client_last_seen', $qb->createNamedParameter($client_uuid));
|
||||
$qb->set('seen_timestamp', $qb->createNamedParameter(time(), ParameterType::INTEGER));
|
||||
$qb->where('federated_address = :federated_address');
|
||||
$qb->setParameter('federated_address', $federated_address);
|
||||
$qb->setMaxResults(1);
|
||||
|
||||
try
|
||||
{
|
||||
$qb->executeQuery();
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
throw new DatabaseException('Failed to update peer last seen', $e);
|
||||
}
|
||||
|
||||
if(Configuration::isCacheSystemEnabled() && Configuration::getObjectCacheEnabled('peer_objects'))
|
||||
{
|
||||
try
|
||||
{
|
||||
$redis = RedisConnectionManager::getConnectionFromConfig('peer_objects');
|
||||
$redis->hSet($federated_address, 'client_last_seen', $client_uuid);
|
||||
$redis->hSet($federated_address, 'seen_timestamp', time());
|
||||
}
|
||||
catch(Exception $e)
|
||||
{
|
||||
Log::warning(Misc::FEDERATIONLIB, sprintf('Failed to update peer object %s in cache', $federated_address), $e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -50,7 +50,7 @@
|
|||
*/
|
||||
public function getSource(): string
|
||||
{
|
||||
return $this->source;
|
||||
return strtolower($this->source);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -60,7 +60,18 @@
|
|||
*/
|
||||
public function getPeerType(): string
|
||||
{
|
||||
return $this->peer_type;
|
||||
return strtolower($this->peer_type);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the full peer class name without the ID
|
||||
* Eg; telegram.user
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getPeerClass(): string
|
||||
{
|
||||
return strtolower(sprintf('%s.%s', $this->source, $this->peer_type));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -83,4 +94,14 @@
|
|||
return sprintf('%s.%s:%s', $this->source, $this->peer_type, $this->unique_identifier);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Standard Federated Address of the peer
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function __toString(): string
|
||||
{
|
||||
return $this->getAddress();
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Reference in a new issue