private fork from github

This commit is contained in:
soneill 2020-08-30 19:59:19 +12:00
commit ddcacca5c3
37 changed files with 5748 additions and 0 deletions

View file

@ -0,0 +1,269 @@
<?php
namespace Exodus4D\Socket\Component;
use Exodus4D\Socket\Data\Payload;
use Exodus4D\Socket\Log\Store;
use Ratchet\ConnectionInterface;
use Ratchet\MessageComponentInterface;
use React\EventLoop\TimerInterface;
abstract class AbstractMessageComponent implements MessageComponentInterface {
/**
* unique name for this component
* -> should be overwritten in child instances
* -> is used as "log store" name
*/
const COMPONENT_NAME = 'default';
/**
* log message server start
*/
const LOG_TEXT_SERVER_START = 'start WebSocket server…';
/**
* store for logs
* @var Store
*/
protected $logStore;
/**
* stores all active connections
* -> regardless of its subscription state
* [
* '$conn1->resourceId' => [
* 'connection' => $conn1,
* 'data' => null
* ],
* '$conn2->resourceId' => [
* 'connection' => $conn2,
* 'data' => null
* ]
* ]
* @var array
*/
private $connections;
/**
* max count of concurrent open connections
* @var int
*/
private $maxConnections = 0;
/**
* AbstractMessageComponent constructor.
* @param Store $store
*/
public function __construct(Store $store){
$this->connections = [];
$this->logStore = $store;
$this->log(['debug', 'info'], null, 'START', static::LOG_TEXT_SERVER_START);
}
// Connection callbacks from MessageComponentInterface ============================================================
/**
* new client connection onOpen
* @param ConnectionInterface $conn
*/
public function onOpen(ConnectionInterface $conn){
$this->log(['debug'], $conn, __FUNCTION__, 'open connection');
$this->addConnection($conn);
}
/**
* client connection onClose
* @param ConnectionInterface $conn
*/
public function onClose(ConnectionInterface $conn){
$this->log(['debug'], $conn, __FUNCTION__, 'close connection');
$this->removeConnection($conn);
}
/**
* client connection onError
* @param ConnectionInterface $conn
* @param \Exception $e
*/
public function onError(ConnectionInterface $conn, \Exception $e){
$this->log(['debug', 'error'], $conn, __FUNCTION__, $e->getMessage());
}
/**
* new message received from client connection
* @param ConnectionInterface $conn
* @param string $msg
*/
public function onMessage(ConnectionInterface $conn, $msg){
// parse message into payload object
$payload = $this->getPayloadFromMessage($msg);
if($payload){
$this->dispatchWebSocketPayload($conn, $payload);
}
}
// Connection handling ============================================================================================
/**
* add connection
* @param ConnectionInterface $conn
*/
private function addConnection(ConnectionInterface $conn) : void {
$this->connections[$conn->resourceId] = [
'connection' => $conn,
];
$this->maxConnections = max(count($this->connections), $this->maxConnections);
}
/**
* remove connection
* @param ConnectionInterface $conn
*/
private function removeConnection(ConnectionInterface $conn) : void {
if($this->hasConnection($conn)){
unset($this->connections[$conn->resourceId]);
}
}
/**
* @param ConnectionInterface $conn
* @return bool
*/
protected function hasConnection(ConnectionInterface $conn) : bool {
return isset($this->connections[$conn->resourceId]);
}
/**
* @param int $resourceId
* @return bool
*/
protected function hasConnectionId(int $resourceId) : bool {
return isset($this->connections[$resourceId]);
}
/**
* @param int $resourceId
* @return ConnectionInterface|null
*/
protected function getConnection(int $resourceId) : ?ConnectionInterface {
return $this->hasConnectionId($resourceId) ? $this->connections[$resourceId]['connection'] : null;
}
/**
* update meta data for $conn
* @param ConnectionInterface $conn
*/
protected function updateConnection(ConnectionInterface $conn){
if($this->hasConnection($conn)){
$meta = [
'mTimeSend' => microtime(true)
];
$this->connections[$conn->resourceId]['data'] = array_merge($this->getConnectionData($conn), $meta);
}
}
/**
* get meta data from $conn
* @param ConnectionInterface $conn
* @return array
*/
protected function getConnectionData(ConnectionInterface $conn) : array {
$meta = [];
if($this->hasConnection($conn)){
$meta = (array)$this->connections[$conn->resourceId]['data'];
}
return $meta;
}
/**
* wrapper for ConnectionInterface->send()
* -> this stores some meta data to the $conn
* @param ConnectionInterface $conn
* @param $data
*/
protected function send(ConnectionInterface $conn, $data){
$conn->send($data);
$this->updateConnection($conn);
}
/**
* @param ConnectionInterface $conn
* @param Payload $payload
*/
abstract protected function dispatchWebSocketPayload(ConnectionInterface $conn, Payload $payload) : void;
/**
* get Payload class from client message
* @param mixed $msg
* @return Payload|null
*/
protected function getPayloadFromMessage($msg) : ?Payload {
$payload = null;
$msg = (array)json_decode($msg, true);
if(isset($msg['task'], $msg['load'])){
$payload = $this->newPayload((string)$msg['task'], $msg['load']);
}
return $payload;
}
/**
* @param string $task
* @param null $load
* @param array|null $characterIds
* @return Payload|null
*/
protected function newPayload(string $task, $load = null, ?array $characterIds = null) : ?Payload {
$payload = null;
try{
$payload = new Payload($task, $load, $characterIds);
}catch(\Exception $e){
$this->log(['debug', 'error'], null, __FUNCTION__, $e->getMessage());
}
return $payload;
}
/**
* get WebSocket stats data
* @return array
*/
public function getSocketStats() : array {
return [
'connections' => count($this->connections),
'maxConnections' => $this->maxConnections,
'logs' => array_reverse($this->logStore->getStore())
];
}
/**
* @param $logTypes
* @param ConnectionInterface|null $connection
* @param string $action
* @param string $message
*/
protected function log($logTypes, ?ConnectionInterface $connection, string $action, string $message = '') : void {
if($this->logStore){
$remoteAddress = $connection ? $connection->remoteAddress : null;
$resourceId = $connection ? $connection->resourceId : null;
$this->logStore->log($logTypes, $remoteAddress, $resourceId, $action, $message);
}
}
/**
*
* @param TimerInterface $timer
*/
public function housekeeping(TimerInterface $timer) : void {
}
}

View file

@ -0,0 +1,41 @@
<?php
/**
* Created by PhpStorm.
* User: exodu
* Date: 31.03.2018
* Time: 13:09
*/
namespace Exodus4D\Socket\Component\Formatter;
class SubscriptionFormatter{
/**
* group charactersData by systemId based on their current 'log' data
* @param array $charactersData
* @return array
*/
static function groupCharactersDataBySystem(array $charactersData) : array {
$data = [];
foreach($charactersData as $characterId => $characterData){
// check if characterData has an active log (active system for character)
$systemId = 0;
if(isset($characterData['log']['system']['id'])){
$systemId = (int)$characterData['log']['system']['id'];
}
if( !isset($data[$systemId]) ){
$systemData = (object)[];
$systemData->id = $systemId;
$data[$systemId] = $systemData;
}
$data[$systemId]->user[] = $characterData;
}
$data = array_values($data);
return $data;
}
}

View file

@ -0,0 +1,76 @@
<?php
/**
* Created by PhpStorm.
* User: exodu
* Date: 03.09.2017
* Time: 17:02
*/
namespace Exodus4D\Socket\Component\Handler;
class LogFileHandler {
const ERROR_DIR_CREATE = 'There is no existing directory at "%s" and its not buildable.';
/**
* steam uri
* @var string
*/
private $stream = '';
/**
* stream dir
* @var string
*/
private $dir = '.';
/**
* file base dir already created
* @var bool
*/
private $dirCreated = false;
public function __construct(string $stream){
$this->stream = $stream;
$this->dir = dirname($this->stream);
$this->createDir();
}
/**
* write log data into to file
* @param array $log
*/
public function write(array $log){
$log = (string)json_encode($log, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
if( !empty($log) ){
if($stream = fopen($this->stream, 'a')){
flock($stream, LOCK_EX);
fwrite($stream, $log . PHP_EOL);
flock($stream, LOCK_UN);
fclose($stream);
// logs should be writable for non webSocket user too
@chmod($this->stream, 0666);
}
}
}
/**
* create directory
*/
private function createDir(){
// Do not try to create dir if it has already been tried.
if ($this->dirCreated){
return;
}
if ($this->dir && !is_dir($this->dir)){
$status = mkdir($this->dir, 0777, true);
if (false === $status) {
throw new \UnexpectedValueException(sprintf(self::ERROR_DIR_CREATE, $this->dir));
}
}
$this->dirCreated = true;
}
}

View file

@ -0,0 +1,943 @@
<?php
/**
* Created by PhpStorm.
* User: Exodus
* Date: 02.12.2016
* Time: 22:29
*/
namespace Exodus4D\Socket\Component;
use Exodus4D\Socket\Component\Handler\LogFileHandler;
use Exodus4D\Socket\Component\Formatter\SubscriptionFormatter;
use Exodus4D\Socket\Data\Payload;
use Exodus4D\Socket\Log\Store;
use Ratchet\ConnectionInterface;
class MapUpdate extends AbstractMessageComponent {
/**
* unique name for this component
* -> should be overwritten in child instances
* -> is used as "log store" name
*/
const COMPONENT_NAME = 'webSock';
/**
* log message unknown task name
*/
const LOG_TEXT_TASK_UNKNOWN = 'unknown task: %s';
/**
* log message for denied subscription attempt. -> character data unknown
*/
const LOG_TEXT_SUBSCRIBE_DENY = 'sub. denied for charId: %d';
/**
* log message for invalid subscription data
*/
const LOG_TEXT_SUBSCRIBE_INVALID = 'sub. data invalid';
/**
* log message for subscribe characterId
*/
const LOG_TEXT_SUBSCRIBE = 'sub. charId: %s to mapIds: [%s]';
/**
* log message unsubscribe characterId
*/
const LOG_TEXT_UNSUBSCRIBE = 'unsub. charId: %d from mapIds: [%s]';
/**
* log message for map data updated broadcast
*/
const LOG_TEXT_MAP_UPDATE = 'update map data, mapId: %d → broadcast to %d connections';
/**
* log message for map subscriptions data updated broadcast
*/
const LOG_TEXT_MAP_SUBSCRIPTIONS = 'update map subscriptions data, mapId: %d. → broadcast to %d connections';
/**
* log message for delete mapId broadcast
*/
const LOG_TEXT_MAP_DELETE = 'delete mapId: $d → broadcast to %d connections';
/**
* timestamp (ms) from last healthCheck ping
* -> timestamp received from remote TCP socket
* @var int|null
*/
protected $healthCheckToken;
/**
* expire time for map access tokens (seconds)
* @var int
*/
protected $mapAccessExpireSeconds = 30;
/**
* character access tokens for clients
* -> tokens are unique and expire onSubscribe!
* [
* 'charId_1' => [
* [
* 'token' => $characterToken1,
* 'expire' => $expireTime1,
* 'characterData' => $characterData1
* ],
* [
* 'token' => $characterToken2,
* 'expire' => $expireTime2,
* 'characterData' => $characterData1
* ]
* ],
* 'charId_2' => [
* [
* 'token' => $characterToken3,
* 'expire' => $expireTime3,
* 'characterData' => $characterData2
* ]
* ]
* ]
* @var array
*/
protected $characterAccessData;
/**
* access tokens for clients grouped by mapId
* -> tokens are unique and expire onSubscribe!
* @var array
*/
protected $mapAccessData;
/**
* connected characters
* [
* 'charId_1' => [
* '$conn1->resourceId' => $conn1,
* '$conn2->resourceId' => $conn2
* ],
* 'charId_2' => [
* '$conn1->resourceId' => $conn1,
* '$conn3->resourceId' => $conn3
* ]
* ]
* @var array
*/
protected $characters;
/**
* valid client connections subscribed to maps
* [
* 'mapId_1' => [
* 'charId_1' => $charId_1,
* 'charId_2' => $charId_2
* ],
* 'mapId_2' => [
* 'charId_1' => $charId_1,
* 'charId_3' => $charId_3
* ]
* ]
*
* @var array
*/
protected $subscriptions;
/**
* collection of characterData for valid subscriptions
* [
* 'charId_1' => $characterData1,
* 'charId_2' => $characterData2
* ]
*
* @var array
*/
protected $characterData;
/**
* MapUpdate constructor.
* @param Store $store
*/
public function __construct(Store $store){
parent::__construct($store);
$this->characterAccessData = [];
$this->mapAccessData = [];
$this->characters = [];
$this->subscriptions = [];
$this->characterData = [];
}
/**
* new client connection
* @param ConnectionInterface $conn
*/
public function onOpen(ConnectionInterface $conn){
parent::onOpen($conn);
}
/**
* @param ConnectionInterface $conn
*/
public function onClose(ConnectionInterface $conn){
parent::onClose($conn);
$this->unSubscribeConnection($conn);
}
/**
* @param ConnectionInterface $conn
* @param \Exception $e
*/
public function onError(ConnectionInterface $conn, \Exception $e){
parent::onError($conn, $e);
// close connection should trigger the onClose() callback for unSubscribe
$conn->close();
}
/**
* @param ConnectionInterface $conn
* @param string $msg
*/
public function onMessage(ConnectionInterface $conn, $msg){
parent::onMessage($conn, $msg);
}
/**
* @param ConnectionInterface $conn
* @param Payload $payload
*/
protected function dispatchWebSocketPayload(ConnectionInterface $conn, Payload $payload) : void {
switch($payload->task){
case 'healthCheck':
$this->broadcastHealthCheck($conn, $payload);
break;
case 'subscribe':
$this->subscribe($conn, (array)$payload->load);
break;
case 'unsubscribe':
// make sure characterIds got from client are valid
// -> intersect with subscribed characterIds for current $conn
$characterIds = array_intersect((array)$payload->load, $this->getCharacterIdsByConnection($conn));
if(!empty($characterIds)){
$this->unSubscribeCharacterIds($characterIds, $conn);
}
break;
default:
$this->log(['debug', 'error'], $conn, __FUNCTION__, sprintf(static::LOG_TEXT_TASK_UNKNOWN, $payload->task));
break;
}
}
/**
* checks healthCheck $token and respond with validation status + subscription stats
* @param ConnectionInterface $conn
* @param Payload $payload
*/
private function broadcastHealthCheck(ConnectionInterface $conn, Payload $payload) : void {
$isValid = $this->validateHealthCheckToken((int)$payload->load);
$load = [
'isValid' => $isValid,
];
// Make sure WebSocket client request is valid
if($isValid){
// set new healthCheckToken for next check
$load['token'] = $this->setHealthCheckToken(microtime(true));
// add subscription stats if $token is valid
$load['subStats'] = $this->getSubscriptionStats();
}
$payload->setLoad($load);
$connections = new \SplObjectStorage();
$connections->attach($conn);
$this->broadcast($connections, $payload);
}
/**
* compare token (timestamp from initial TCP healthCheck message) with token send from WebSocket
* @param int $token
* @return bool
*/
private function validateHealthCheckToken(int $token) : bool {
$isValid = false;
if($token && $this->healthCheckToken && $token === (int)$this->healthCheckToken){
$isValid = true;
}
// reset token
$this->healthCheckToken = null;
return $isValid;
}
/**
* subscribes a connection to valid accessible maps
* @param ConnectionInterface $conn
* @param $subscribeData
*/
private function subscribe(ConnectionInterface $conn, array $subscribeData) : void {
$characterId = (int)$subscribeData['id'];
$characterToken = (string)$subscribeData['token'];
if($characterId && $characterToken){
// check if character access token is valid (exists and not expired in $this->characterAccessData)
if($characterData = $this->checkCharacterAccess($characterId, $characterToken)){
$this->characters[$characterId][$conn->resourceId] = $conn;
// insert/update characterData cache
// -> even if characterId does not have access to a map "yet"
// -> no maps found but character can get map access at any time later
$this->setCharacterData($characterData);
// valid character -> check map access
$changedSubscriptionsMapIds = [];
foreach((array)$subscribeData['mapData'] as $data){
$mapId = (int)$data['id'];
$mapToken = (string)$data['token'];
$mapName = (string)$data['name'];
if($mapId && $mapToken){
// check if token is valid (exists and not expired) in $this->mapAccessData
if($this->checkMapAccess($characterId, $mapId, $mapToken)){
// valid map subscribe request
$this->subscriptions[$mapId]['characterIds'][$characterId] = $characterId;
$this->subscriptions[$mapId]['data']['name'] = $mapName;
$changedSubscriptionsMapIds[] = $mapId;
}
}
}
sort($changedSubscriptionsMapIds, SORT_NUMERIC);
$this->log(['debug', 'info'], $conn, __FUNCTION__,
sprintf(static::LOG_TEXT_SUBSCRIBE, $characterId, implode(',', $changedSubscriptionsMapIds))
);
// broadcast all active subscriptions to subscribed connections -------------------------------------------
$this->broadcastMapSubscriptions($changedSubscriptionsMapIds);
}else{
$this->log(['debug', 'info'], $conn, __FUNCTION__, sprintf(static::LOG_TEXT_SUBSCRIBE_DENY, $characterId));
}
}else{
$this->log(['debug', 'error'], $conn, __FUNCTION__, static::LOG_TEXT_SUBSCRIBE_INVALID);
}
}
/**
* subscribes an active connection from maps
* @param ConnectionInterface $conn
*/
private function unSubscribeConnection(ConnectionInterface $conn){
$characterIds = $this->getCharacterIdsByConnection($conn);
$this->unSubscribeCharacterIds($characterIds, $conn);
}
/**
* unSubscribe a $characterId from ALL maps
* -> if $conn is set -> just unSub the $characterId from this $conn
* @param int $characterId
* @param ConnectionInterface|null $conn
* @return bool
*/
private function unSubscribeCharacterId(int $characterId, ?ConnectionInterface $conn = null) : bool {
if($characterId){
// unSub from $this->characters ---------------------------------------------------------------------------
if($conn){
// just unSub a specific connection (e.g. single browser window)
unset($this->characters[$characterId][$conn->resourceId]);
if( !count($this->characters[$characterId]) ){
// no connection left for this character
unset($this->characters[$characterId]);
}
// TODO unset $this->>$characterData if $characterId does not have any other map subscribed to
}else{
// unSub ALL connections from a character (e.g. multiple browsers)
unset($this->characters[$characterId]);
// unset characterData cache
$this->deleteCharacterData($characterId);
}
// unSub from $this->subscriptions ------------------------------------------------------------------------
$changedSubscriptionsMapIds = [];
foreach($this->subscriptions as $mapId => $subData){
if(array_key_exists($characterId, (array)$subData['characterIds'])){
unset($this->subscriptions[$mapId]['characterIds'][$characterId]);
if( !count($this->subscriptions[$mapId]['characterIds']) ){
// no characters left on this map
unset($this->subscriptions[$mapId]);
}
$changedSubscriptionsMapIds[] = $mapId;
}
}
sort($changedSubscriptionsMapIds, SORT_NUMERIC);
$this->log(['debug', 'info'], $conn, __FUNCTION__,
sprintf(static::LOG_TEXT_UNSUBSCRIBE, $characterId, implode(',', $changedSubscriptionsMapIds))
);
// broadcast all active subscriptions to subscribed connections -------------------------------------------
$this->broadcastMapSubscriptions($changedSubscriptionsMapIds);
}
return true;
}
/**
* unSubscribe $characterIds from ALL maps
* -> if $conn is set -> just unSub the $characterId from this $conn
* @param int[] $characterIds
* @param ConnectionInterface|null $conn
* @return bool
*/
private function unSubscribeCharacterIds(array $characterIds, ?ConnectionInterface $conn = null) : bool {
$response = false;
foreach($characterIds as $characterId){
$response = $this->unSubscribeCharacterId($characterId, $conn);
}
return $response;
}
/**
* delete mapId from subscriptions and broadcast "delete msg" to clients
* @param string $task
* @param int $mapId
* @return int
*/
private function deleteMapId(string $task, int $mapId) : int {
$connectionCount = $this->broadcastMapData($task, $mapId, $mapId);
// remove map from subscriptions
if(isset($this->subscriptions[$mapId])){
unset($this->subscriptions[$mapId]);
}
$this->log(['debug', 'info'], null, __FUNCTION__,
sprintf(static::LOG_TEXT_MAP_DELETE, $mapId, $connectionCount)
);
return $connectionCount;
}
/**
* get all mapIds a characterId has subscribed to
* @param int $characterId
* @return int[]
*/
private function getMapIdsByCharacterId(int $characterId) : array {
$mapIds = [];
foreach($this->subscriptions as $mapId => $subData) {
if(array_key_exists($characterId, (array)$subData['characterIds'])){
$mapIds[] = $mapId;
}
}
return $mapIds;
}
/**
* @param ConnectionInterface $conn
* @return int[]
*/
private function getCharacterIdsByConnection(ConnectionInterface $conn) : array {
$characterIds = [];
$resourceId = $conn->resourceId;
foreach($this->characters as $characterId => $resourceIDs){
if(
array_key_exists($resourceId, $resourceIDs) &&
!in_array($characterId, $characterIds)
){
$characterIds[] = $characterId;
}
}
return $characterIds;
}
/**
* @param $mapId
* @return array
*/
private function getCharacterIdsByMapId(int $mapId) : array {
$characterIds = [];
if(
array_key_exists($mapId, $this->subscriptions) &&
is_array($this->subscriptions[$mapId]['characterIds'])
){
$characterIds = array_keys($this->subscriptions[$mapId]['characterIds']);
}
return $characterIds;
}
/**
* get connections by $characterIds
* @param int[] $characterIds
* @return \SplObjectStorage
*/
private function getConnectionsByCharacterIds(array $characterIds) : \SplObjectStorage {
$connections = new \SplObjectStorage;
foreach($characterIds as $characterId){
$connections->addAll($this->getConnectionsByCharacterId($characterId));
}
return $connections;
}
/**
* get connections by $characterId
* @param int $characterId
* @return \SplObjectStorage
*/
private function getConnectionsByCharacterId(int $characterId) : \SplObjectStorage {
$connections = new \SplObjectStorage;
if(isset($this->characters[$characterId])){
foreach(array_keys($this->characters[$characterId]) as $resourceId){
if(
$this->hasConnectionId($resourceId) &&
!$connections->contains($conn = $this->getConnection($resourceId))
){
$connections->attach($conn);
}
}
}
return $connections;
}
/**
* check character access against $this->characterAccessData whitelist
* @param $characterId
* @param $characterToken
* @return array
*/
private function checkCharacterAccess(int $characterId, string $characterToken) : array {
$characterData = [];
if( !empty($characterAccessData = (array)$this->characterAccessData[$characterId]) ){
// check expire for $this->characterAccessData -> check ALL characters and remove expired
foreach($characterAccessData as $i => $data){
$deleteToken = false;
if( ((int)$data['expire'] - time()) > 0 ){
// still valid -> check token
if($characterToken === $data['token']){
$characterData = $data['characterData'];
$deleteToken = true;
// NO break; here -> check other characterAccessData as well
}
}else{
// token expired
$deleteToken = true;
}
if($deleteToken){
unset($this->characterAccessData[$characterId][$i]);
// -> check if tokens for this charId is empty
if( empty($this->characterAccessData[$characterId]) ){
unset($this->characterAccessData[$characterId]);
}
}
}
}
return $characterData;
}
/**
* check map access against $this->mapAccessData whitelist
* @param $characterId
* @param $mapId
* @param $mapToken
* @return bool
*/
private function checkMapAccess(int $characterId, int $mapId, string $mapToken) : bool {
$access = false;
if( !empty($mapAccessData = (array)$this->mapAccessData[$mapId][$characterId]) ){
foreach($mapAccessData as $i => $data){
$deleteToken = false;
// check expire for $this->mapAccessData -> check ALL characters and remove expired
if( ((int)$data['expire'] - time()) > 0 ){
// still valid -> check token
if($mapToken === $data['token']){
$access = true;
$deleteToken = true;
}
}else{
// token expired
$deleteToken = true;
}
if($deleteToken){
unset($this->mapAccessData[$mapId][$characterId][$i]);
// -> check if tokens for this charId is empty
if( empty($this->mapAccessData[$mapId][$characterId]) ){
unset($this->mapAccessData[$mapId][$characterId]);
// -> check if map has no access tokens left for characters
if( empty($this->mapAccessData[$mapId]) ){
unset($this->mapAccessData[$mapId]);
}
}
}
}
}
return $access;
}
/**
* broadcast $payload to $connections
* @param \SplObjectStorage $connections
* @param Payload $payload
*/
private function broadcast(\SplObjectStorage $connections, Payload $payload) : void {
$data = json_encode($payload);
foreach($connections as $conn){
$this->send($conn, $data);
}
}
// custom calls ===================================================================================================
/**
* receive data from TCP socket (main App)
* -> send response back
* @param string $task
* @param null|int|array $load
* @return bool|float|int|null
*/
public function receiveData(string $task, $load = null){
$responseLoad = null;
switch($task){
case 'healthCheck':
$responseLoad = $this->setHealthCheckToken((float)$load);
break;
case 'characterUpdate':
$this->updateCharacterData((array)$load);
$mapIds = $this->getMapIdsByCharacterId((int)$load['id']);
$this->broadcastMapSubscriptions($mapIds);
break;
case 'characterLogout':
$responseLoad = $this->unSubscribeCharacterIds((array)$load);
break;
case 'mapConnectionAccess':
$responseLoad = $this->setConnectionAccess($load);
break;
case 'mapAccess':
$responseLoad = $this->setAccess($task, $load);
break;
case 'mapUpdate':
$responseLoad = $this->broadcastMapUpdate($task, (array)$load);
break;
case 'mapDeleted':
$responseLoad = $this->deleteMapId($task, (int)$load);
break;
case 'logData':
$this->handleLogData((array)$load['meta'], (array)$load['log']);
break;
}
return $responseLoad;
}
/**
* @param float $token
* @return float
*/
private function setHealthCheckToken(float $token) : float {
$this->healthCheckToken = $token;
return $this->healthCheckToken;
}
/**
* @param array $characterData
*/
private function setCharacterData(array $characterData) : void {
if($characterId = (int)$characterData['id']){
$this->characterData[$characterId] = $characterData;
}
}
/**
* @param int $characterId
* @return array
*/
private function getCharacterData(int $characterId) : array {
return empty($this->characterData[$characterId]) ? [] : $this->characterData[$characterId];
}
/**
* @param array $characterIds
* @return array
*/
private function getCharactersData(array $characterIds) : array {
return array_filter($this->characterData, function($characterId) use($characterIds) {
return in_array($characterId, $characterIds);
}, ARRAY_FILTER_USE_KEY);
}
/**
* @param array $characterData
*/
private function updateCharacterData(array $characterData) : void {
$characterId = (int)$characterData['id'];
if($this->getCharacterData($characterId)){
$this->setCharacterData($characterData);
}
}
/**
* @param int $characterId
*/
private function deleteCharacterData(int $characterId) : void {
unset($this->characterData[$characterId]);
}
/**
* @param array $mapIds
*/
private function broadcastMapSubscriptions(array $mapIds) : void {
$mapIds = array_unique($mapIds);
foreach($mapIds as $mapId){
if(
!empty($characterIds = $this->getCharacterIdsByMapId($mapId)) &&
!empty($charactersData = $this->getCharactersData($characterIds))
){
$systems = SubscriptionFormatter::groupCharactersDataBySystem($charactersData);
$mapUserData = (object)[];
$mapUserData->config = (object)['id' => $mapId];
$mapUserData->data = (object)['systems' => $systems];
$connectionCount = $this->broadcastMapData('mapSubscriptions', $mapId, $mapUserData);
$this->log(['debug'], null, __FUNCTION__,
sprintf(static::LOG_TEXT_MAP_SUBSCRIPTIONS, $mapId, $connectionCount)
);
}
}
}
/**
* @param string $task
* @param array $mapData
* @return int
*/
private function broadcastMapUpdate(string $task, array $mapData) : int {
$mapId = (int)$mapData['config']['id'];
$connectionCount = $this->broadcastMapData($task, $mapId, $mapData);
$this->log(['debug'], null, __FUNCTION__,
sprintf(static::LOG_TEXT_MAP_UPDATE, $mapId, $connectionCount)
);
return $connectionCount;
}
/**
* send map data to ALL connected clients
* @param string $task
* @param int $mapId
* @param mixed $load
* @return int
*/
private function broadcastMapData(string $task, int $mapId, $load) : int {
$characterIds = $this->getCharacterIdsByMapId($mapId);
$connections = $this->getConnectionsByCharacterIds($characterIds);
$this->broadcast($connections, $this->newPayload($task, $load, $characterIds));
return count($connections);
}
/**
* set/update map access for allowed characterIds
* @param string $task
* @param array $accessData
* @return int count of connected characters
*/
private function setAccess(string $task, $accessData) : int {
$newMapCharacterIds = [];
if($mapId = (int)$accessData['id']){
$mapName = (string)$accessData['name'];
$characterIds = (array)$accessData['characterIds'];
// check all charactersIds that have map access... --------------------------------------------------------
foreach($characterIds as $characterId){
// ... for at least ONE active connection ...
// ... and characterData cache exists for characterId
if(
!empty($this->characters[$characterId]) &&
!empty($this->getCharacterData($characterId))
){
$newMapCharacterIds[$characterId] = $characterId;
}
}
$currentMapCharacterIds = (array)$this->subscriptions[$mapId]['characterIds'];
// broadcast "map delete" to no longer valid characters ---------------------------------------------------
$removedMapCharacterIds = array_keys(array_diff_key($currentMapCharacterIds, $newMapCharacterIds));
$removedMapCharacterConnections = $this->getConnectionsByCharacterIds($removedMapCharacterIds);
$this->broadcast($removedMapCharacterConnections, $this->newPayload($task, $mapId, $removedMapCharacterIds));
// update map subscriptions -------------------------------------------------------------------------------
if( !empty($newMapCharacterIds) ){
// set new characters that have map access (overwrites existing subscriptions for that map)
$this->subscriptions[$mapId]['characterIds'] = $newMapCharacterIds;
$this->subscriptions[$mapId]['data']['name'] = $mapName;
// check if subscriptions have changed
if( !$this->arraysEqualKeys($currentMapCharacterIds, $newMapCharacterIds) ){
$this->broadcastMapSubscriptions([$mapId]);
}
}else{
// no characters (left) on this map
unset($this->subscriptions[$mapId]);
}
}
return count($newMapCharacterIds);
}
/**
* set map access data (whitelist) tokens for map access
* @param $connectionAccessData
* @return bool
*/
private function setConnectionAccess($connectionAccessData){
$response = false;
$characterId = (int)$connectionAccessData['id'];
$characterData = $connectionAccessData['characterData'];
$characterToken = $connectionAccessData['token'];
if(
$characterId &&
$characterData &&
$characterToken
){
// expire time for character and map tokens
$expireTime = time() + $this->mapAccessExpireSeconds;
// tokens for character access
$this->characterAccessData[$characterId][] = [
'token' => $characterToken,
'expire' => $expireTime,
'characterData' => $characterData
];
foreach((array)$connectionAccessData['mapData'] as $mapData){
$mapId = (int)$mapData['id'];
$this->mapAccessData[$mapId][$characterId][] = [
'token' => $mapData['token'],
'expire' => $expireTime
];
}
$response = 'OK';
}
return $response;
}
/**
* get stats data
* -> lists all channels, subscribed characters + connection info
* @return array
*/
protected function getSubscriptionStats() : array {
$uniqueConnections = [];
$uniqueSubscriptions = [];
$channelsStats = [];
foreach($this->subscriptions as $mapId => $subData){
$characterIds = $this->getCharacterIdsByMapId($mapId);
$uniqueMapConnections = [];
$channelStats = [
'channelId' => $mapId,
'channelName' => $subData['data']['name'],
'countSub' => count($characterIds),
'countCon' => 0,
'subscriptions' => []
];
foreach($characterIds as $characterId){
$characterData = $this->getCharacterData($characterId);
$connections = $this->getConnectionsByCharacterId($characterId);
$characterStats = [
'characterId' => $characterId,
'characterName' => isset($characterData['name']) ? $characterData['name'] : null,
'countCon' => $connections->count(),
'connections' => []
];
foreach($connections as $connection){
if(!in_array($connection->resourceId, $uniqueMapConnections)){
$uniqueMapConnections[] = $connection->resourceId;
}
$metaData = $this->getConnectionData($connection);
$microTime = (float)$metaData['mTimeSend'];
$logTime = Store::getDateTimeFromMicrotime($microTime);
$characterStats['connections'][] = [
'resourceId' => $connection->resourceId,
'remoteAddress' => $connection->remoteAddress,
'mTimeSend' => $microTime,
'mTimeSendFormat1' => $logTime->format('Y-m-d H:i:s.u'),
'mTimeSendFormat2' => $logTime->format('H:i:s')
];
}
$channelStats['subscriptions'][] = $characterStats;
}
$uniqueConnections = array_unique(array_merge($uniqueConnections, $uniqueMapConnections));
$uniqueSubscriptions = array_unique(array_merge($uniqueSubscriptions, $characterIds));
$channelStats['countCon'] = count($uniqueMapConnections);
$channelsStats[] = $channelStats;
}
return [
'countSub' => count($uniqueSubscriptions),
'countCon' => count($uniqueConnections),
'channels' => $channelsStats
];
}
/**
* compare two assoc arrays by keys. Key order is ignored
* -> if all keys from array1 exist in array2 && all keys from array2 exist in array 1, arrays are supposed to be equal
* @param array $array1
* @param array $array2
* @return bool
*/
protected function arraysEqualKeys(array $array1, array $array2) : bool {
return !array_diff_key($array1, $array2) && !array_diff_key($array2, $array1);
}
/**
* dispatch log writing to a LogFileHandler
* @param array $meta
* @param array $log
*/
private function handleLogData(array $meta, array $log){
$logHandler = new LogFileHandler((string)$meta['stream']);
$logHandler->write($log);
}
}

View file

@ -0,0 +1,94 @@
<?php
namespace Exodus4D\Socket\Data;
/**
* Class Payload
* @package Exodus4D\Socket\Data
* @property string $task
* @property mixed $load
*/
class Payload implements \JsonSerializable {
/**
* error message for missing 'task' name
*/
const ERROR_TASK_MISSING = "'task' must be a not empty string";
/**
* task name
* @var string
*/
private $task = '';
/**
* payload data
* @var mixed
*/
private $load;
/**
* optional characterId array -> recipients
* -> e.g if multiple browser tabs are open
* @var null|array
*/
private $characterIds;
/**
* Payload constructor.
* @param string $task
* @param null $load
* @param array|null $characterIds
*/
public function __construct(string $task, $load = null, ?array $characterIds = null){
$this->setTask($task);
$this->setLoad($load);
$this->setCharacterIds($characterIds);
}
/**
* @param string $task
*/
public function setTask(string $task){
if($task){
$this->task = $task;
}else{
throw new \InvalidArgumentException(self::ERROR_TASK_MISSING);
}
}
/**
* @param null $load
*/
public function setLoad($load = null){
$this->load = $load;
}
/**
* @param array|null $characterIds
*/
public function setCharacterIds(?array $characterIds){
if(is_array($characterIds)){
$this->characterIds = $characterIds;
}else{
$this->characterIds = null;
}
}
/**
* @param $name
* @return mixed
*/
public function __get($name){
return $this->$name;
}
/**
* @return array|mixed
*/
public function jsonSerialize(){
return get_object_vars($this);
}
}

View file

@ -0,0 +1,90 @@
<?php
namespace Exodus4D\Socket\Log;
class ShellColors {
/**
* all foreground color codes
* @var array
*/
private $foregroundColors = [];
/**
* all background color codes
* @var array
*/
private $backgroundColors = [];
public function __construct() {
// set up "Shell" colors
$this->foregroundColors['black'] = '0;30';
$this->foregroundColors['dark_gray'] = '1;30';
$this->foregroundColors['blue'] = '0;34';
$this->foregroundColors['light_blue'] = '1;34';
$this->foregroundColors['green'] = '0;32';
$this->foregroundColors['light_green'] = '1;32';
$this->foregroundColors['cyan'] = '0;36';
$this->foregroundColors['light_cyan'] = '1;36';
$this->foregroundColors['red'] = '0;31';
$this->foregroundColors['light_red'] = '1;31';
$this->foregroundColors['purple'] = '0;35';
$this->foregroundColors['light_purple'] = '1;35';
$this->foregroundColors['brown'] = '0;33';
$this->foregroundColors['yellow'] = '1;33';
$this->foregroundColors['light_gray'] = '0;37';
$this->foregroundColors['white'] = '1;37';
$this->backgroundColors['black'] = '40';
$this->backgroundColors['red'] = '41';
$this->backgroundColors['green'] = '42';
$this->backgroundColors['yellow'] = '43';
$this->backgroundColors['blue'] = '44';
$this->backgroundColors['magenta'] = '45';
$this->backgroundColors['cyan'] = '46';
$this->backgroundColors['light_gray'] = '47';
}
/**
* get colored string
* @param string $string
* @param string|null $foregroundColor
* @param string|null $backgroundColor
* @return string
*/
public function getColoredString(string $string, ?string $foregroundColor = null, ?string $backgroundColor = null) : string {
$coloredString = "";
// Check if given foreground color found
if (isset($this->foregroundColors[$foregroundColor])) {
$coloredString .= "\033[" . $this->foregroundColors[$foregroundColor] . "m";
}
// Check if given background color found
if (isset($this->backgroundColors[$backgroundColor])) {
$coloredString .= "\033[" . $this->backgroundColors[$backgroundColor] . "m";
}
// Add string and end coloring
$coloredString .= $string . "\033[0m";
return $coloredString;
}
/**
* returns all foreground color names
* @return array
*/
public function getForegroundColors() : array {
return array_keys($this->foregroundColors);
}
/**
* returns all background color names
* @return array
*/
public function getBackgroundColors() : array {
return array_keys($this->backgroundColors);
}
}

220
websocket/app/Log/Store.php Normal file
View file

@ -0,0 +1,220 @@
<?php
namespace Exodus4D\Socket\Log;
class Store {
/**
* default for: unique store name
*/
const DEFAULT_NAME = 'store';
/**
* default for: echo log data in terminal
*/
const DEFAULT_LOG_TO_STDOUT = true;
/**
* default for: max cached log entries
*/
const DEFAULT_LOG_STORE_SIZE = 50;
/**
* @see Store::DEFAULT_NAME
* @var string
*/
private $name = self::DEFAULT_NAME;
/**
* log store for log entries
* -> store size should be limited for memory reasons
* @var array
*/
private $store = [];
/**
* all valid types for custom log events
* if value is false, logs for this type are ignored
* @var array
*/
protected $logTypes = [
'error' => true,
'info' => true,
'debug' => true
];
/**
* if Store is locked, current state can not be changed
* @var bool
*/
protected $locked = false;
/**
* @var ShellColors
*/
static $colors;
/**
* Store constructor.
* @param string $name
*/
public function __construct(string $name){
$this->name = $name;
}
/**
* get all stored log entries
* @return array
*/
public function getStore() : array {
return $this->store;
}
/**
* @param bool $locked
*/
public function setLocked(bool $locked){
$this->locked = $locked;
}
/**
* @return bool
*/
public function isLocked() : bool {
return $this->locked;
}
/**
* @param int $logLevel
*/
public function setLogLevel(int $logLevel){
switch($logLevel){
case 3:
$this->logTypes['error'] = true;
$this->logTypes['info'] = true;
$this->logTypes['debug'] = true;
break;
case 2:
$this->logTypes['error'] = true;
$this->logTypes['info'] = true;
$this->logTypes['debug'] = false;
break;
case 1:
$this->logTypes['error'] = true;
$this->logTypes['info'] = false;
$this->logTypes['debug'] = false;
break;
case 0:
default:
$this->setLocked(true); // no logging
}
}
/**
* this is used for custom log events like 'error', 'debug',...
* works as dispatcher method that calls individual log*() methods
* @param $logTypes
* @param string|null $remoteAddress
* @param int|null $resourceId
* @param string $action
* @param string $message
*/
public function log($logTypes, ?string $remoteAddress, ?int $resourceId, string $action, string $message = '') : void {
if(!$this->isLocked()){
// filter out logTypes that should not be logged
$logTypes = array_filter((array)$logTypes, function(string $type) : bool {
return array_key_exists($type, $this->logTypes) && $this->logTypes[$type];
});
if($logTypes){
// get log entry data
$logData = $this->getLogData($logTypes, $remoteAddress, $resourceId, $action, $message);
if(self::DEFAULT_LOG_TO_STDOUT){
$this->echoLog($logData);
}
// add entry to local store and check size limit for store
$this->store[] = $logData;
$this->store = array_slice($this->store, self::DEFAULT_LOG_STORE_SIZE * -1);
}
}
}
/**
* get log data as array for a custom log entry
* @param array $logTypes
* @param string|null $remoteAddress
* @param int|null $resourceId
* @param string $action
* @param string $message
* @return array
*/
private function getLogData(array $logTypes, ?string $remoteAddress, ?int $resourceId, string $action, string $message = '') : array {
$file = null;
$lineNum = null;
$function = null;
$traceIndex = 4;
$backtrace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, $traceIndex);
if(count($backtrace) == $traceIndex){
$caller = $backtrace[$traceIndex - 2];
$callerOrig = $backtrace[$traceIndex - 1];
$file = substr($caller['file'], strlen(dirname(dirname(dirname($caller['file'])))) + 1);
$lineNum = $caller['line'];
$function = $callerOrig['function'];
}
$microTime = microtime(true);
$logTime = self::getDateTimeFromMicrotime($microTime);
return [
'store' => $this->name,
'mTime' => $microTime,
'mTimeFormat1' => $logTime->format('Y-m-d H:i:s.u'),
'mTimeFormat2' => $logTime->format('H:i:s'),
'logTypes' => $logTypes,
'remoteAddress' => $remoteAddress,
'resourceId' => $resourceId,
'fileName' => $file,
'lineNumber' => $lineNum,
'function' => $function,
'action' => $action,
'message' => $message
];
}
/**
* echo log data to stdout -> terminal
* @param array $logData
*/
private function echoLog(array $logData) : void {
if(!self::$colors){
self::$colors = new ShellColors();
}
$data = [
self::$colors->getColoredString($logData['mTimeFormat1'], 'dark_gray'),
self::$colors->getColoredString($logData['store'], $logData['store'] == 'webSock' ? 'brown' : 'cyan'),
$logData['remoteAddress'] . ($logData['resourceId'] ? ' #' . $logData['resourceId'] : ''),
self::$colors->getColoredString($logData['fileName'] . ' line ' . $logData['lineNumber'], 'dark_gray'),
self::$colors->getColoredString($logData['function'] . '()' . (($logData['function'] !== $logData['action']) ? ' [' . $logData['action'] . ']' : ''), 'dark_gray'),
implode(',', (array)$logData['logTypes']),
self::$colors->getColoredString($logData['message'], 'light_purple')
];
echo implode(' | ', array_filter($data)) . PHP_EOL;
}
/**
* @see https://stackoverflow.com/a/29598719/4329969
* @param float $mTime
* @return \DateTime
*/
public static function getDateTimeFromMicrotime(float $mTime) : \DateTime {
return \DateTime::createFromFormat('U.u', number_format($mTime, 6, '.', ''));
}
}

View file

@ -0,0 +1,67 @@
<?php
namespace Exodus4D\Socket\Socket;
use Exodus4D\Socket\Log\Store;
use React\EventLoop;
use React\Socket;
use Ratchet\MessageComponentInterface;
abstract class AbstractSocket {
/**
* unique name for this component
* -> should be overwritten in child instances
* -> is used as "log store" name
*/
const COMPONENT_NAME = 'default';
/**
* global server loop
* @var EventLoop\LoopInterface
*/
protected $loop;
/**
* @var MessageComponentInterface
*/
protected $handler;
/**
* @var Store
*/
protected $logStore;
/**
* AbstractSocket constructor.
* @param EventLoop\LoopInterface $loop
* @param MessageComponentInterface $handler
* @param Store $store
*/
public function __construct(
EventLoop\LoopInterface $loop,
MessageComponentInterface $handler,
Store $store
){
$this->loop = $loop;
$this->handler = $handler;
$this->logStore = $store;
$this->log(['debug', 'info'], null, 'START', 'start Socket server…');
}
/**
* @param $logTypes
* @param Socket\ConnectionInterface|null $connection
* @param string $action
* @param string $message
*/
public function log($logTypes, ?Socket\ConnectionInterface $connection, string $action, string $message = '') : void {
if(!$this->logStore->isLocked()){
$remoteAddress = $connection ? $connection->getRemoteAddress() : null;
$this->logStore->log($logTypes, $remoteAddress, null, $action, $message);
}
}
}

View file

@ -0,0 +1,551 @@
<?php
/**
* Created by PhpStorm.
* User: Exodus 4D
* Date: 15.02.2019
* Time: 14:29
*/
namespace Exodus4D\Socket\Socket;
use Exodus4D\Socket\Log\Store;
use React\EventLoop;
use React\Socket;
use React\Promise;
use React\Stream;
use Clue\React\NDJson;
use Ratchet\MessageComponentInterface;
class TcpSocket extends AbstractSocket{
/**
* unique name for this component
* -> should be overwritten in child instances
* -> is used as "log store" name
*/
const COMPONENT_NAME = 'tcpSock';
/**
* error message for unknown acceptType
* @see TcpSocket::DEFAULT_ACCEPT_TYPE
*/
const ERROR_ACCEPT_TYPE = "Unknown acceptType: '%s'";
/**
* error message for connected stream is not readable
*/
const ERROR_STREAM_NOT_READABLE = "Stream is not readable. Remote address: '%s'";
/**
* error message for connection stream is not writable
*/
const ERROR_STREAM_NOT_WRITABLE = "Stream is not writable. Remote address: '%s'";
/**
* error message for missing 'task' key in payload
*/
const ERROR_TASK_MISSING = "Missing 'task' in payload";
/**
* error message for unknown 'task' key in payload
*/
const ERROR_TASK_UNKNOWN = "Unknown 'task': '%s' in payload";
/**
* error message for missing method
*/
const ERROR_METHOD_MISSING = "Method '%S' not found";
/**
* error message for waitTimeout exceeds
* @see TcpSocket::DEFAULT_WAIT_TIMEOUT
*/
const ERROR_WAIT_TIMEOUT = "Exceeds 'waitTimeout': %ss";
/**
* default for: accepted data type
* -> affects en/decoding socket data
*/
const DEFAULT_ACCEPT_TYPE = 'json';
/**
* default for: wait timeout
* -> timeout until connection gets closed
* timeout should be "reset" right after successful response send to client
*/
const DEFAULT_WAIT_TIMEOUT = 3.0;
/**
* default for: send response by end() method (rather than write())
* -> connection will get closed right after successful response send to client
*/
const DEFAULT_END_WITH_RESPONSE = true;
/**
* default for: add socket statistic to response payload
*/
const DEFAULT_ADD_STATS = false;
/**
* max length for JSON data string
* -> throw OverflowException on exceed
*/
const JSON_DECODE_MAX_LENGTH = 65536 * 4;
/**
* @see TcpSocket::DEFAULT_ACCEPT_TYPE
* @var string
*/
private $acceptType = self::DEFAULT_ACCEPT_TYPE;
/**
* @see TcpSocket::DEFAULT_WAIT_TIMEOUT
* @var float
*/
private $waitTimeout = self::DEFAULT_WAIT_TIMEOUT;
/**
* @see TcpSocket::DEFAULT_END_WITH_RESPONSE
* @var bool
*/
private $endWithResponse = self::DEFAULT_END_WITH_RESPONSE;
/**
* @see TcpSocket::DEFAULT_STATS
* @var bool
*/
private $addStats = self::DEFAULT_ADD_STATS;
/**
* storage for all active connections
* -> can be used to get current count of connected clients
* @var \SplObjectStorage
*/
private $connections;
/**
* max count of concurrent open connections
* -> represents number of active connected clients
* @var int
*/
private $maxConnections = 0;
/**
* timestamp on startup
* @var int
*/
private $startupTime = 0;
/**
* TcpSocket constructor.
* @param EventLoop\LoopInterface $loop
* @param MessageComponentInterface $handler
* @param Store $store
* @param string $acceptType
* @param float $waitTimeout
* @param bool $endWithResponse
*/
public function __construct(
EventLoop\LoopInterface $loop,
MessageComponentInterface $handler,
Store $store,
string $acceptType = self::DEFAULT_ACCEPT_TYPE,
float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT,
bool $endWithResponse = self::DEFAULT_END_WITH_RESPONSE
){
parent::__construct($loop, $handler, $store);
$this->acceptType = $acceptType;
$this->waitTimeout = $waitTimeout;
$this->endWithResponse = $endWithResponse;
$this->connections = new \SplObjectStorage();
$this->startupTime = time();
}
/**
* @param Socket\ConnectionInterface $connection
*/
public function onConnect(Socket\ConnectionInterface $connection){
$this->log('debug', $connection, __FUNCTION__, 'open connection…');
if($this->isValidConnection($connection)){
// connection can be used
// add connection to global connection pool
$this->addConnection($connection);
// set waitTimeout timer for connection
$this->setTimerTimeout($connection, $this->waitTimeout);
// register connection events ... -------------------------------------------------------------------------
$this->initRead($connection)
->then($this->initDispatch($connection))
->then($this->initResponse($connection))
->then(
function(array $payload) use ($connection) {
$this->log(['debug', 'info'], $connection,'DONE', 'task "' . $payload['task'] . '" done → response send');
},
function(\Exception $e) use ($connection) {
$this->log(['debug', 'error'], $connection, 'ERROR', $e->getMessage());
$this->connectionError($connection, $e);
});
$connection->on('end', function() use ($connection) {
$this->log('debug', $connection, 'onEnd');
});
$connection->on('close', function() use ($connection) {
$this->log(['debug'], $connection, 'onClose', 'close connection');
$this->removeConnection($connection);
});
$connection->on('error', function(\Exception $e) use ($connection) {
$this->log(['debug', 'error'], $connection, 'onError', $e->getMessage());
});
}else{
// invalid connection -> can not be used
$connection->close();
}
}
/**
* @param Socket\ConnectionInterface $connection
* @return Promise\PromiseInterface
*/
protected function initRead(Socket\ConnectionInterface $connection) : Promise\PromiseInterface {
if($connection->isReadable()){
if('json' == $this->acceptType){
// new empty stream for processing JSON
$stream = new Stream\ThroughStream();
$streamDecoded = new NDJson\Decoder($stream, true, 512, 0, self::JSON_DECODE_MAX_LENGTH);
// promise get resolved on first emit('data')
$promise = Promise\Stream\first($streamDecoded);
// register on('data') for main input stream
$connection->on('data', function ($chunk) use ($stream) {
// send current data chunk to processing stream -> resolves promise
$stream->emit('data', [$chunk]);
});
return $promise;
}else{
return new Promise\RejectedPromise(
new \InvalidArgumentException(
sprintf(self::ERROR_ACCEPT_TYPE, $this->acceptType)
)
);
}
}else{
return new Promise\RejectedPromise(
new \Exception(
sprintf(self::ERROR_STREAM_NOT_READABLE, $connection->getRemoteAddress())
)
);
}
}
/**
* init dispatcher for payload
* @param Socket\ConnectionInterface $connection
* @return callable
*/
protected function initDispatch(Socket\ConnectionInterface $connection) : callable {
return function(array $payload) use ($connection) : Promise\PromiseInterface {
$task = (string)$payload['task'];
if(!empty($task)){
$load = $payload['load'];
$deferred = new Promise\Deferred();
$this->dispatch($connection, $deferred, $task, $load);
return $deferred->promise();
}else{
return new Promise\RejectedPromise(
new \InvalidArgumentException(self::ERROR_TASK_MISSING)
);
}
};
}
/**
* @param Socket\ConnectionInterface $connection
* @param Promise\Deferred $deferred
* @param string $task
* @param null $load
*/
protected function dispatch(Socket\ConnectionInterface $connection, Promise\Deferred $deferred, string $task, $load = null) : void {
$addStatusData = false;
switch($task){
case 'getStats':
$addStatusData = true;
$deferred->resolve($this->newPayload($task, null, $addStatusData));
break;
case 'healthCheck':
$addStatusData = true;
case 'characterUpdate':
case 'characterLogout':
case 'mapConnectionAccess':
case 'mapAccess':
case 'mapUpdate':
case 'mapDeleted':
case 'logData':
if(method_exists($this->handler, 'receiveData')){
$this->log(['info'], $connection, __FUNCTION__, 'task "' . $task . '" processing…');
$deferred->resolve(
$this->newPayload(
$task,
call_user_func_array([$this->handler, 'receiveData'], [$task, $load]),
$addStatusData
)
);
}else{
$deferred->reject(new \Exception(sprintf(self::ERROR_METHOD_MISSING, 'receiveData')));
}
break;
default:
$deferred->reject(new \InvalidArgumentException(sprintf(self::ERROR_TASK_UNKNOWN, $task)));
}
}
/**
* @param Socket\ConnectionInterface $connection
* @return callable
*/
protected function initResponse(Socket\ConnectionInterface $connection) : callable {
return function(array $payload) use ($connection) : Promise\PromiseInterface {
$this->log('debug', $connection, 'initResponse', 'task "' . $payload['task'] . '" → init response');
$deferred = new Promise\Deferred();
$this->write($deferred, $connection, $payload);
return $deferred->promise();
};
}
/**
* @param Promise\Deferred $deferred
* @param Socket\ConnectionInterface $connection
* @param array $payload
*/
protected function write(Promise\Deferred $deferred, Socket\ConnectionInterface $connection, array $payload) : void {
$write = false;
if($connection->isWritable()){
if('json' == $this->acceptType){
$connection = new NDJson\Encoder($connection);
}
// write a new chunk of data to connection stream
$write = $connection->write($payload);
if($this->endWithResponse){
// connection should be closed (and removed from this socket server)
$connection->end();
}
}
if($write){
$deferred->resolve($payload);
}else{
$deferred->reject(new \Exception(
sprintf(self::ERROR_STREAM_NOT_WRITABLE, $connection->getRemoteAddress())
));
}
}
/**
* $connection has error
* -> if writable -> end() connection with $payload (close() is called by default)
* -> if readable -> close() connection
* @param Socket\ConnectionInterface $connection
* @param \Exception $e
*/
protected function connectionError(Socket\ConnectionInterface $connection, \Exception $e){
$errorMessage = $e->getMessage();
$this->log(['debug', 'error'], $connection, __FUNCTION__, $errorMessage);
if($connection->isWritable()){
if('json' == $this->acceptType){
$connection = new NDJson\Encoder($connection);
}
// send "end" data, then close
$connection->end($this->newPayload('error', $errorMessage, true));
}else{
// close connection
$connection->close();
}
}
/**
* check if $connection is found in global pool
* @param Socket\ConnectionInterface $connection
* @return bool
*/
protected function hasConnection(Socket\ConnectionInterface $connection) : bool {
return $this->connections->contains($connection);
}
/**
* cancels a previously set timer callback for a $connection
* @param Socket\ConnectionInterface $connection
* @param string $timerName
*/
protected function cancelTimer(Socket\ConnectionInterface $connection, string $timerName){
if(
$this->hasConnection($connection) &&
($data = (array)$this->connections->offsetGet($connection)) &&
isset($data['timers']) && isset($data['timers'][$timerName]) &&
($data['timers'][$timerName] instanceof EventLoop\TimerInterface)
){
$this->loop->cancelTimer($data['timers'][$timerName]);
unset($data['timers'][$timerName]);
$this->connections->offsetSet($connection, $data);
}
}
/**
* cancels all previously set timers for a $connection
* @param Socket\ConnectionInterface $connection
*/
protected function cancelTimers(Socket\ConnectionInterface $connection){
if(
$this->hasConnection($connection) &&
($data = (array)$this->connections->offsetGet($connection)) &&
isset($data['timers'])
){
foreach((array)$data['timers'] as $timerName => $timer){
$this->loop->cancelTimer($timer);
}
$data['timers'] = [];
$this->connections->offsetSet($connection, $data);
}
}
/**
* @param Socket\ConnectionInterface $connection
* @param string $timerName
* @param float $interval
* @param callable $timerCallback
*/
protected function setTimer(Socket\ConnectionInterface $connection, string $timerName, float $interval, callable $timerCallback){
if(
$this->hasConnection($connection) &&
($data = (array)$this->connections->offsetGet($connection)) &&
isset($data['timers'])
){
$data['timers'][$timerName] = $this->loop->addTimer($interval, function() use ($connection, $timerCallback) {
$timerCallback($connection);
});
// store new timer to $connection
$this->connections->offsetSet($connection, $data);
}
}
/**
* cancels and removes previous connection timeout timers
* -> set new connection timeout
* @param Socket\ConnectionInterface $connection
* @param float $waitTimeout
*/
protected function setTimerTimeout(Socket\ConnectionInterface $connection, float $waitTimeout = self::DEFAULT_WAIT_TIMEOUT){
$this->cancelTimer($connection, 'disconnectTimer');
$this->setTimer($connection, 'disconnectTimer', $waitTimeout, function(Socket\ConnectionInterface $connection) use ($waitTimeout) {
$errorMessage = sprintf(self::ERROR_WAIT_TIMEOUT, $waitTimeout);
$this->connectionError(
$connection,
new Promise\Timer\TimeoutException($waitTimeout, $errorMessage)
);
});
}
/**
* add new connection to global pool
* @param Socket\ConnectionInterface $connection
*/
protected function addConnection(Socket\ConnectionInterface $connection){
if(!$this->hasConnection($connection)){
$this->connections->attach($connection, [
'remoteAddress' => $connection->getRemoteAddress(),
'timers' => []
]);
// update maxConnections count
$this->maxConnections = max($this->connections->count(), $this->maxConnections);
$this->log(['debug'], $connection, __FUNCTION__, 'add new connection');
}else{
$this->log(['debug'], $connection, __FUNCTION__, 'connection already exists');
}
}
/**
* remove $connection from global connection pool
* @param Socket\ConnectionInterface $connection
*/
protected function removeConnection(Socket\ConnectionInterface $connection){
if($this->hasConnection($connection)){
$this->log(['debug'], $connection, __FUNCTION__, 'remove connection');
$this->cancelTimers($connection);
$this->connections->detach($connection);
}
}
/**
* get new payload
* @param string $task
* @param null $load
* @param bool $addStats
* @return array
*/
protected function newPayload(string $task, $load = null, bool $addStats = false) : array {
$payload = [
'task' => $task,
'load' => $load
];
if($addStats || $this->addStats){
// add socket statistics
$payload['stats'] = $this->getStats();
}
return $payload;
}
/**
* check if connection is "valid" and can be used for data transfer
* @param Socket\ConnectionInterface $connection
* @return bool
*/
protected function isValidConnection(Socket\ConnectionInterface $connection) : bool {
return $connection->isReadable() || $connection->isWritable();
}
/**
* get socket server statistics
* -> e.g. connected clients count
* @return array
*/
protected function getStats() : array {
return [
'tcpSocket' => $this->getSocketStats(),
'webSocket' => $this->handler->getSocketStats()
];
}
/**
* get TcpSocket stats data
* @return array
*/
protected function getSocketStats() : array {
return [
'startup' => time() - $this->startupTime,
'connections' => $this->connections->count(),
'maxConnections' => $this->maxConnections,
'logs' => array_reverse($this->logStore->getStore())
];
}
}

View file

@ -0,0 +1,113 @@
<?php
/**
* Created by PhpStorm.
* User: Exodus
* Date: 01.11.2016
* Time: 18:21
*/
namespace Exodus4D\Socket;
use Exodus4D\Socket\Log\Store;
use Exodus4D\Socket\Socket\TcpSocket;
use React\EventLoop;
use React\Socket;
use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;
class WebSockets {
/**
* @var string
*/
protected $dsn;
/**
* @var int
*/
protected $wsListenPort;
/**
* @var string
*/
protected $wsListenHost;
/**
* @var int
*/
protected $debug;
/**
* WebSockets constructor.
* @param string $dsn
* @param int $wsListenPort
* @param string $wsListenHost
* @param int $debug
*/
function __construct(string $dsn, int $wsListenPort, string $wsListenHost, int $debug = 1){
$this->dsn = $dsn;
$this->wsListenPort = $wsListenPort;
$this->wsListenHost = $wsListenHost;
$this->debug = $debug;
$this->startMapSocket();
}
private function startMapSocket(){
// global EventLoop
$loop = EventLoop\Factory::create();
// new Stores for logging -------------------------------------------------------------------------------------
$webSocketLogStore = new Store(Component\MapUpdate::COMPONENT_NAME);
$webSocketLogStore->setLogLevel($this->debug);
$tcpSocketLogStore = new Store(TcpSocket::COMPONENT_NAME);
$tcpSocketLogStore->setLogLevel($this->debug);
// global MessageComponent (main app) (handles all business logic) --------------------------------------------
$mapUpdate = new Component\MapUpdate($webSocketLogStore);
$loop->addPeriodicTimer(3, function(EventLoop\TimerInterface $timer) use ($mapUpdate) {
$mapUpdate->housekeeping($timer);
});
// TCP Socket -------------------------------------------------------------------------------------------------
$tcpSocket = new TcpSocket($loop, $mapUpdate, $tcpSocketLogStore);
// TCP Server (WebServer <-> TCPServer <-> TCPSocket communication)
$server = new Socket\Server($this->dsn, $loop, [
'tcp' => [
'backlog' => 20,
'so_reuseport' => true
]
]);
$server->on('connection', function(Socket\ConnectionInterface $connection) use ($tcpSocket) {
$tcpSocket->onConnect($connection);
});
$server->on('error', function(\Exception $e) use ($tcpSocket) {
$tcpSocket->log(['debug', 'error'], null, 'onError', $e->getMessage());
});
// WebSocketServer --------------------------------------------------------------------------------------------
// Binding to 0.0.0.0 means remotes can connect (Web Clients)
$webSocketURI = $this->wsListenHost . ':' . $this->wsListenPort;
// Set up our WebSocket server for clients subscriptions
$webSock = new Socket\TcpServer($webSocketURI, $loop);
new IoServer(
new HttpServer(
new WsServer(
$mapUpdate
)
),
$webSock
);
$loop->run();
}
}