* Portions created by the Initial Developer are Copyright (C) 2008-2025
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
* Mark J Crane
* Tim Fry
*/

/**
 * Represents a single client connected through a socket.
 *
 * A subscriber wraps the connected socket resource together with the token,
 * permission, domain and service information that is loaded when the client
 * authenticates with a token previously written by subscriber::save_token().
 * All outgoing data is handed to the frame wrapper callback given to the
 * constructor.
 *
 * @author Tim Fry
 */
class subscriber
{
    /**
     * Unique ID of this object: the md5 of spl_object_hash($this)
     *
     * @var string
     */
    private $id;

    /**
     * The connected socket
     *
     * @var resource
     */
    private $socket;

    /**
     * The socket resource cast to an integer when the subscriber object was
     * created. It allows an integer to be matched to the original socket,
     * for example in the equals method.
     *
     * @var int
     */
    private $socket_id;

    /**
     * Remote IP of the socket resource connection
     *
     * @var string
     */
    private $remote_ip;

    /**
     * Remote port of the socket resource connection. The value is the numeric
     * string parsed from the socket name (empty when unavailable).
     *
     * @var string
     */
    private $remote_port;

    /**
     * Services the subscriber has subscribed to, keyed by service name
     *
     * @var array
     */
    private $services;

    /**
     * Permissions array of the subscriber, keyed by permission name
     *
     * @var array
     */
    private $permissions;

    /**
     * Domain name the subscriber belongs to
     *
     * @var string|null
     */
    private $domain_name;

    /**
     * Domain UUID the subscriber belongs to
     *
     * @var string|null
     */
    private $domain_uuid;

    /**
     * Token hash used to validate this subscriber
     *
     * @var string|null
     */
    private $token_hash;

    /**
     * Token name used to validate this subscriber
     *
     * @var string|null
     */
    private $token_name;

    /**
     * Epoch time the token was issued
     *
     * @var int
     */
    private $token_time;

    /**
     * Time limit in seconds
     *
     * @var int
     */
    private $token_limit;

    /**
     * Whether the subscriber has a time limit set for their token or not
     *
     * @var bool True when there is a time limit. False if no time limit set.
     */
    private $enable_token_time_limit;

    /**
     * Whether the subscriber is able to broadcast messages as a service
     *
     * @var bool
     */
    private $service;

    /**
     * The name of the service class object to handle callbacks
     *
     * @var string|null
     */
    private $service_class;

    /**
     * If the subscriber is a service the service name used
     *
     * @var string|null
     */
    private $service_name;

    /**
     * The filter used to send web socket messages (null until set_filter is called)
     *
     * @var filter|null
     */
    private $filter;

    /**
     * Function or method name to call when sending information through the socket
     *
     * @var callable
     */
    private $callback;

    /**
     * Subscriptions to services
     *
     * @var array
     */
    private $subscriptions;

    /**
     * Whether or not this subscriber has been authenticated
     *
     * @var bool
     */
    private $authenticated;

    /**
     * User information
     *
     * @var array
     */
    private $user;

    /**
     * Creates a subscriber object.
     *
     * @param resource|stream $socket        Connected socket
     * @param callable        $frame_wrapper The callback used to wrap communication in a web socket frame. It is
     *                                       called with the socket resource and the payload. Sending NULL to the
     *                                       frame wrapper should send a disconnect.
     *
     * @throws \socket_exception Thrown when the passed socket is not a valid (open) resource
     * @throws \InvalidArgumentException Thrown when the $frame_wrapper is not a valid callback
     */
    public function __construct($socket, callable $frame_wrapper)
    {
        if (!is_resource($socket)) {
            throw new \socket_exception('Socket must be a valid resource');
        }

        // check for valid callback so we can send websocket data when required
        if (!is_callable($frame_wrapper)) {
            throw new \InvalidArgumentException('Websocket callable method must be a valid callable function or method');
        }

        // set object identifiers
        // PHP unique object hash is similar to 000000000000000f0000000000000000 so we use md5
        $this->id = md5(spl_object_hash($this));
        $this->socket = $socket;
        $this->socket_id = (int) $socket;
        $this->domain_name = '';
        $this->domain_uuid = '';

        // always use the same formula from the static functions
        [$this->remote_ip, $this->remote_port] = self::get_remote_information_from_socket($socket);

        // set defaults
        $this->authenticated = false;
        $this->permissions = [];
        $this->services = [];
        $this->enable_token_time_limit = false;
        $this->token_time = 0;
        $this->token_limit = 0;
        $this->subscriptions = [];
        $this->service = false;
        $this->service_name = '';
        $this->user = [];

        // Save the websocket frame wrapper used to communicate to this subscriber
        $this->callback = $frame_wrapper;

        // No filter initially
        $this->filter = null;
    }

    /**
     * Returns the user array information in this subscriber
     *
     * @return array
     */
    public function get_user_array(): array
    {
        return $this->user;
    }

    /**
     * Retrieves a user setting by its key.
     *
     * @param string $key           The name of the user setting to retrieve
     * @param mixed  $default_value The default value to return if the setting is not found (optional)
     *
     * @return mixed The value of the user setting, or the default value if it does not exist
     */
    public function get_user_setting($key, $default_value = null)
    {
        return $this->user[$key] ?? $default_value;
    }

    /**
     * Gets or sets the list of services this subscriber is subscribed to.
     *
     * When called with an argument the current subscriptions are replaced by the given list.
     *
     * @param array $services Optional list of service names, e.g. [active.calls, inactive.calls]
     *
     * @return mixed This object (when setting) or an array of subscribed service names (when getting)
     */
    public function subscribed_to($services = [])
    {
        if (func_num_args() > 0) {
            // flip so the service names become the keys used by has_subscribed_to()
            $this->services = array_flip($services);
            return $this;
        }
        return array_keys($this->services);
    }

    /**
     * Gets or sets the service class name for this subscriber
     *
     * @param ?string $service_class
     *
     * @return $this|string
     */
    public function service_class($service_class = null)
    {
        if (func_num_args() > 0) {
            $this->service_class = $service_class;
            return $this;
        }
        return $this->service_class;
    }

    /**
     * Sets the filter used for this subscriber
     *
     * @param filter $filter
     *
     * @return $this
     */
    public function set_filter(filter $filter)
    {
        $this->filter = $filter;
        return $this;
    }

    /**
     * Returns the filter used for this subscriber
     *
     * @return filter|null The filter or null when no filter has been set
     */
    public function get_filter()
    {
        return $this->filter;
    }

    /**
     * When there is no more references to the object we ensure that we disconnect from the subscriber
     */
    public function __destruct()
    {
        // disconnect the socket
        $this->disconnect();
    }

    /**
     * Disconnects the socket resource used for this subscriber
     *
     * @return bool true on success and false on failure (including when the socket is already closed)
     */
    public function disconnect(): bool
    {
        if (!is_resource($this->socket)) {
            return false;
        }

        // Send null to the frame wrapper to send a disconnect frame. The wrapper receives the socket
        // resource, exactly as it does in send(), so that it is able to write to the connection.
        call_user_func($this->callback, $this->socket, null);

        //return success if close was successful
        return (@fclose($this->socket) !== false);
    }

    /**
     * Checks if this subscriber is equal to the given object, resource or id.
     *
     * - a resource is compared with the socket of this subscriber
     * - an integer is compared with the socket ID (see socket_id())
     * - a string is compared with the subscriber ID (see id())
     * - a subscriber object is compared using its subscriber ID
     *
     * @param mixed $object_or_resource_or_id The object, resource or id to compare with
     *
     * @return bool True if the subscribers are equal, false otherwise
     */
    public function equals($object_or_resource_or_id): bool
    {
        // Compare by resource
        if (is_resource($object_or_resource_or_id)) {
            return $object_or_resource_or_id === $this->socket;
        }

        // Compare by the integer socket ID. The subscriber ID is an md5 string so an integer can never match it.
        if (is_int($object_or_resource_or_id)) {
            return $object_or_resource_or_id === $this->socket_id;
        }

        // Compare by subscriber ID
        if (is_string($object_or_resource_or_id)) {
            return $object_or_resource_or_id === $this->id;
        }

        // Compare by object using the subscriber ID to match
        if ($object_or_resource_or_id instanceof self) {
            return $object_or_resource_or_id->id() === $this->id;
        }

        // Not a subscriber object
        return false;
    }

    /**
     * Compares this object to another object or resource id.
     *
     * @param $object_or_resource The object or resource to compare
     *
     * @return bool True if this object is not equal to the other object or resource. False otherwise.
     * @see subscriber::equals()
     */
    public function not_equals($object_or_resource): bool
    {
        return !$this->equals($object_or_resource);
    }

    /**
     * Allow accessing copies of the private values to ensure the object values are immutable.
     *
     * @param string $name The name of the attribute to be accessed
     *
     * @return mixed The value of the requested attribute
     * @throws \InvalidArgumentException If the attribute does not exist or direct access is prohibited
     */
    public function __get(string $name)
    {
        switch ($name) {
            case 'id':
            case 'socket_id':
            case 'remote_ip':
            case 'remote_port':
            case 'token_name':
            case 'token_hash':
            case 'token_time':
            case 'domain_name':
            case 'permissions':
            case 'services':
                return $this->{$name};
            default:
                throw new \InvalidArgumentException("Property '$name' does not exist or direct access is prohibited. Try using '$name()' for access.");
        }
    }

    /**
     * Returns the current ID of this subscriber.
     * The ID is set in the constructor using the md5 of the spl_object_hash given by PHP
     *
     * @return string
     */
    public function id(): string
    {
        return $this->id;
    }

    /**
     * Checks if this subscriber has the permission given in $permission
     *
     * @param string $permission The permission to check
     *
     * @return bool True when this subscriber has the permission and false otherwise
     */
    public function has_permission(string $permission): bool
    {
        // Do not allow empty names
        if (empty($this->permissions) || strlen($permission) === 0) {
            return false;
        }
        return isset($this->permissions[$permission]);
    }

    /**
     * Returns the array of permissions this subscriber has been assigned.
     *
     * @return array
     */
    public function get_permissions(): array
    {
        return $this->permissions;
    }

    /**
     * Returns the domain name used.
     *
     * Note: This value is not validated in the object and must be validated.
     *
     * @return string
     */
    public function get_domain_name(): string
    {
        return $this->domain_name;
    }

    /**
     * Returns the associated socket
     *
     * @return resource
     */
    public function socket()
    {
        return $this->socket;
    }

    /**
     * Returns the socket ID that was cast to an integer when the object was created.
     *
     * @return int The socket ID cast as an integer.
     */
    public function socket_id(): int
    {
        return $this->socket_id;
    }

    /**
     * Validates the given token against the loaded token in the this subscriber
     *
     * @param array $token Must be an associative array with name and hash as the keys.
     *
     * @return bool True when the name and hash match the stored token and, when a time limit is enabled, the
     *              token has not yet expired.
     */
    public function is_valid_token(array $token): bool
    {
        // get the name and hash from array
        $token_name = $token['name'] ?? '';
        $token_hash = $token['hash'] ?? '';

        // empty values are not allowed
        if (empty($token_name) || empty($token_hash)) {
            return false;
        }

        // only strings can match; this also covers a subscriber that has no token stored yet
        if (!is_string($token_name) || !is_string($token_hash) || !is_string($this->token_hash)) {
            return false;
        }

        // validate the name and hash (hash_equals compares in constant time)
        $valid = ($token_name === $this->token_name) && hash_equals($this->token_hash, $token_hash);

        // check time validation required
        if ($valid && $this->enable_token_time_limit) {
            // token_limit is already stored in seconds by authenticate_token()
            $valid = (time() - $this->token_time) < $this->token_limit;
        }

        return $valid;
    }

    /**
     * Validates the given token array against the token previously saved in the file system. When the token is valid
     * the token will be saved in this object. The token file is removed after the attempt whether it was valid or
     * not. This method should not be called a second time once a token has been authenticated.
     *
     * @param array $request_token Associative array with name and hash as the keys
     *
     * @return bool True when the token was valid and false otherwise
     * @throws \socket_disconnected_exception Thrown when the socket is no longer connected
     */
    public function authenticate_token(array $request_token): bool
    {
        // Check connection
        if (!$this->is_connected()) {
            throw new \socket_disconnected_exception($this->id);
        }

        // Check for required fields
        if (empty($request_token)) {
            return false;
        }

        $request_name = $request_token['name'] ?? '';
        $request_hash = $request_token['hash'] ?? '';

        // Set default return value of false
        $valid = false;

        // The name comes from the remote client and becomes part of a path that is included as PHP code,
        // so it must never be able to leave the token folder.
        if (self::is_safe_token_name($request_name) && is_string($request_hash) && $request_hash !== '') {
            // Set local storage
            $token_file = self::get_token_file($request_name);

            // Ensure the file is there
            if (is_file($token_file)) {
                // Get the token using PHP engine parsing
                $array = include $token_file;
                if (!is_array($array)) {
                    $array = [];
                }

                // Assign to local variables to reflect local storage
                $token_name = $array['token']['name'] ?? '';
                $token_hash = $array['token']['hash'] ?? '';
                $token_time = intval($array['token']['time'] ?? 0);
                $token_limit = intval($array['token']['limit'] ?? 0);

                // Compare the token given in the request with the one that was in local storage
                $valid = $token_name === $request_name
                    && is_string($token_hash)
                    && hash_equals($token_hash, $request_hash);

                // If the token is supposed to have a time limit then check the token time
                if ($token_limit > 0) {
                    // the limit is stored in minutes so it is converted to seconds for the comparison
                    $valid = $valid && (time() - $token_time < $token_limit * 60);
                }

                // When token is valid
                if ($valid) {
                    $this->load_token_array($array, $token_name, $token_hash, $token_time, $token_limit);
                }

                // Remove the token from local storage
                @unlink($token_file);
            }
        }

        // store the result
        $this->authenticated = $valid;

        // return success or failed
        return $valid;
    }

    /**
     * Copies the information of a validated token file into this object.
     *
     * @param array  $array       The array returned by the token file
     * @param string $token_name  Name of the validated token
     * @param string $token_hash  Hash of the validated token
     * @param int    $token_time  Epoch time the token was issued
     * @param int    $token_limit Time limit of the token in minutes (zero when there is no limit)
     *
     * @return void
     */
    private function load_token_array(array $array, string $token_name, string $token_hash, int $token_time, int $token_limit)
    {
        // Store the valid token information in this object
        $this->token_name = $token_name;
        $this->token_hash = $token_hash;
        $this->token_time = $token_time;
        $this->enable_token_time_limit = $token_limit > 0;
        $this->token_limit = $token_limit * 60; // convert to seconds for time() comparison

        // Add the domain
        $this->domain_name = $array['domain']['name'] ?? '';
        $this->domain_uuid = $array['domain']['uuid'] ?? '';

        // Store the permissions
        $this->permissions = $array['user']['permissions'] ?? [];

        // Remove the permissions from the user array because this class handles them separately
        unset($array['user']['permissions']);

        // Add the user information when available
        $this->user = $array['user'] ?? [];

        // Add subscriptions for services
        foreach ($array['services'] ?? [] as $service) {
            $this->subscribe($service);
        }

        // Check for service
        if (isset($array['service'])) {
            // Set the service information in the object
            $this->service_name = (string) ($array['service_name'] ?? '');
            $this->service_class = (string) ($array['service_class'] ?? '');

            // Ensure we can call the methods we need by checking for the interface. Checking the interface
            // means a single check instead of one method_exists call per required method, and the interface
            // can gain new methods while this code remains the same. The instanceof operator would require an
            // object first; is_a() with $allow_string set to true only needs the class name, which allows
            // static methods to be called without first creating an object.
            $this->service = is_a($this->service_class, 'websocket_service_interface', true);
        }
    }

    /**
     * Checks that a token name can be used as a file name without leaving the token folder.
     *
     * @param mixed $token_name The token name to check
     *
     * @return bool True when the name is a non-empty string without a path separator or NUL byte
     */
    private static function is_safe_token_name($token_name): bool
    {
        return is_string($token_name)
            && $token_name !== ''
            && strpbrk($token_name, "/\\\0") === false;
    }

    /**
     * Returns whether or not this subscriber has been authenticated.
     *
     * @return bool
     */
    public function is_authenticated(): bool
    {
        return $this->authenticated;
    }

    /**
     * Allows overriding the token authentication
     *
     * @param bool $authenticated
     *
     * @return self
     */
    public function set_authenticated(bool $authenticated): self
    {
        $this->authenticated = $authenticated;
        return $this;
    }

    /**
     * Sets the domain UUID and name
     *
     * @param string $uuid
     * @param string $name
     *
     * @return self
     * @throws invalid_uuid_exception Thrown when $uuid is not a valid UUID
     * @depends is_uuid()
     * @see is_uuid()
     */
    public function set_domain(string $uuid, string $name): self
    {
        if (!is_uuid($uuid)) {
            throw new invalid_uuid_exception("UUID is not valid");
        }
        $this->domain_uuid = $uuid;
        $this->domain_name = $name;
        return $this;
    }

    /**
     * Returns whether or not this subscriber is a service.
     *
     * @return bool True if this subscriber is a service and false if this subscriber is not a service.
     */
    public function is_service(): bool
    {
        return $this->service;
    }

    /**
     * Alias of service_name without the parameters
     *
     * @return string
     */
    public function get_service_name(): string
    {
        return $this->service_name;
    }

    /**
     * Get or set the service_name
     *
     * @param string|null $service_name
     *
     * @return string|$this
     */
    public function service_name($service_name = null)
    {
        if (func_num_args() > 0) {
            $this->service_name = $service_name;
            return $this;
        }
        return $this->service_name;
    }

    /**
     * Returns whether or not the service name matches this subscriber
     *
     * @param string $service_name Name of the service
     *
     * @return bool True if this subscriber matches the provided service name. False if this subscriber does not
     *              match or this subscriber is not a service.
     */
    public function service_equals(string $service_name): bool
    {
        return ($this->service && $this->service_name === $service_name);
    }

    /**
     * Returns true if the socket/stream is still open (not at EOF).
     *
     * @return bool Returns true if connected and false if the connection has closed
     */
    public function is_connected(): bool
    {
        return is_resource($this->socket) && !feof($this->socket);
    }

    /**
     * Returns true if the subscriber is no longer connected
     *
     * @return bool Returns true if the subscriber is no longer connected
     */
    public function is_not_connected(): bool
    {
        return !$this->is_connected();
    }

    /**
     * Checks if this subscriber is subscribed to the given service name
     *
     * @param string $service_name The service name ie. active.calls
     *
     * @return bool
     * @see subscriber::subscribe
     */
    public function has_subscribed_to(string $service_name): bool
    {
        return isset($this->services[$service_name]);
    }

    /**
     * Subscribes this subscriber to a service. No permission check is made by this method.
     *
     * @param string $service_name
     *
     * @return self
     */
    public function subscribe(string $service_name): self
    {
        $this->services[$service_name] = true;
        return $this;
    }

    /**
     * Sends a response to the subscriber using the provided callback web socket wrapper in the constructor
     *
     * @param string $json Valid JSON response to send to the connected client
     *
     * @throws subscriber_token_expired_exception Thrown when the time limit set in the token has expired
     */
    public function send(string $json)
    {
        //ensure the token is still valid
        if ($this->token_time_exceeded()) {
            throw new subscriber_token_expired_exception($this->id);
        }
        call_user_func($this->callback, $this->socket, $json);
    }

    /**
     * Sends the given message through the websocket
     *
     * The filter of this subscriber (when set) is applied first. A message without a service name is dropped
     * silently.
     *
     * @param websocket_message $message
     *
     * @throws subscriber_not_subscribed_exception Thrown when not subscribed to the service of the message
     * @throws socket_disconnected_exception Thrown when the socket is no longer connected
     * @throws subscriber_token_expired_exception Thrown when the time limit set in the token has expired
     */
    public function send_message(websocket_message $message)
    {
        // Filter the message
        if ($this->filter !== null) {
            $message->apply_filter($this->filter);
        }

        if (empty($message->service_name())) {
            return;
        }

        // Check that we are subscribed to the event
        if (!$this->has_subscribed_to($message->service_name())) {
            throw new subscriber_not_subscribed_exception($this->id);
        }

        // Ensure we are still connected
        if (!$this->is_connected()) {
            throw new \socket_disconnected_exception($this->id);
        }

        $this->send((string) $message);
    }

    /**
     * The remote information is retrieved using the stream_socket_get_name function.
     *
     * @param resource $socket
     *
     * @return array Returns a zero-based indexed array of first the IP address and then the port of the remote
     *               machine. Both elements are always present and are empty strings when not available.
     * @see  stream_socket_get_name();
     * @link https://php.net/stream_socket_get_name PHP documentation for underlying function used to return
     *       information.
     */
    public static function get_remote_information_from_socket($socket): array
    {
        return self::split_remote_name(stream_socket_get_name($socket, true));
    }

    /**
     * The remote information is retrieved using the stream_socket_get_name function.
     *
     * @param resource $socket
     *
     * @return string Returns the IP address of the remote machine or an empty string.
     * @see  stream_socket_get_name();
     * @link https://php.net/stream_socket_get_name PHP documentation for underlying function used to return
     *       information.
     */
    public static function get_remote_ip_from_socket($socket): string
    {
        return self::split_remote_name(stream_socket_get_name($socket, true))[0];
    }

    /**
     * The remote information is retrieved using the stream_socket_get_name function.
     *
     * @param resource $socket
     *
     * @return string Returns the port of the remote machine as a string or an empty string.
     * @see  stream_socket_get_name();
     * @link https://php.net/stream_socket_get_name PHP documentation for underlying function used to return
     *       information.
     */
    public static function get_remote_port_from_socket($socket): string
    {
        return self::split_remote_name(stream_socket_get_name($socket, true))[1];
    }

    /**
     * Splits a socket name in the form of address:port into the address and the port.
     *
     * The split is made on the last colon so that an IPv6 address, which contains colons itself, is kept whole.
     * Square brackets surrounding the address are removed.
     *
     * @param string|false $name The value returned by stream_socket_get_name()
     *
     * @return array Zero-based indexed array of the address and the port, both as strings
     */
    private static function split_remote_name($name): array
    {
        if (!is_string($name) || $name === '') {
            return ['', ''];
        }

        $separator = strrpos($name, ':');
        if ($separator === false) {
            // no port in the name
            return [$name, ''];
        }

        $address = substr($name, 0, $separator);
        $port = substr($name, $separator + 1);

        if (strlen($address) > 1 && $address[0] === '[' && substr($address, -1) === ']') {
            $address = substr($address, 1, -1);
        }

        return [$address, $port];
    }

    /**
     * Returns the name and path for the token.
     * Priority is given to the /dev/shm folder when it exists and is writable. If that is not available, then the
     * sys_get_temp_dir() function is called to get a storage location.
     *
     * The token name is used as given; callers handling untrusted names must validate them first.
     *
     * @param string $token_name
     *
     * @return string
     * @see  sys_get_temp_dir()
     * @link https://php.net/sys_get_temp_dir PHP Documentation for the function used to get the temporary storage
     *       location.
     */
    public static function get_token_file($token_name): string
    {
        // Try to store in RAM first
        if (is_dir('/dev/shm') && is_writable('/dev/shm')) {
            return '/dev/shm/' . $token_name . '.php';
        }

        // Use the filesystem
        return sys_get_temp_dir() . DIRECTORY_SEPARATOR . $token_name . '.php';
    }

    /**
     * Saves the token array to local file system
     *
     * The web socket server runs in a separate process so it is unable to use
     * sessions. Therefore, the token must be stored in a temp folder to be
     * verified by the web socket server. It is possible to use a database
     * but the database connection process is very slow compared to the file
     * system. If the database resides on a remote system instead of local,
     * the web socket service may not yet have access to the token before the
     * web socket client requests authorization.
     *
     * @param array $token                 Standard token issued from the token object
     * @param array $services              A simple array list of service names to subscribe to
     * @param int   $time_limit_in_minutes Set a token time limit. Setting to zero will disable the time limit
     *
     * @see token::create()
     */
    public static function save_token(array $token, array $services, int $time_limit_in_minutes = 0)
    {
        $array = [];

        // Store the currently logged in user when available
        $array['user'] = $_SESSION['user'] ?? [];

        // Store the token service and events
        $array['services'] = $services;

        // Store the name and hash of the token
        $array['token']['name'] = $token['name'];
        $array['token']['hash'] = $token['hash'];

        // Store the epoch time and time limit
        $array['token']['time'] = (string) time();
        $array['token']['limit'] = $time_limit_in_minutes;

        // Store the domain name in this session
        $array['domain']['name'] = $_SESSION['domain_name'] ?? '';
        $array['domain']['uuid'] = $_SESSION['domain_uuid'] ?? '';

        // Get the full path and file name for storing the token
        $token_file = self::get_token_file($token['name']);

        // NOTE(review): the remainder of this method was not readable in the reviewed copy of the file. It is
        // written to produce what authenticate_token() consumes: a PHP file that returns the array when
        // included. Confirm against the original (for example any file permission handling).
        $file_contents = "<?php\nreturn " . var_export($array, true) . ";\n";
        file_put_contents($token_file, $file_contents);
    }

    /**
     * Checks if the time limit of the token has been exceeded.
     *
     * NOTE(review): the declaration of this method was not readable in the reviewed copy of the file; the name
     * is taken from the call made in send() and public visibility is assumed - confirm.
     *
     * @return bool True when a time limit is enabled and it has been exceeded. False otherwise.
     */
    public function token_time_exceeded(): bool
    {
        if (!$this->enable_token_time_limit) {
            return false;
        }

        //test the time on the token to ensure it is valid (token_limit is in seconds)
        return (time() - $this->token_time) > $this->token_limit;
    }
}