* Portions created by the Initial Developer are Copyright (C) 2008-2025 * the Initial Developer. All Rights Reserved. * * Contributor(s): * Mark J Crane * Tim Fry */ /** * Description of subscriber * @author Tim Fry */ class subscriber { public $show_all; /** * The ID of the object given by PHP * @var spl_object_id */ private $id; private $socket; /** * Stores the original socket ID used when the subscriber object was created. * The resource is cast to an integer and then saved in order to match the * a resource to the original socket. This is primarily used in the equals * method to test for equality. * @var int */ private $socket_id; private $remote_ip; private $remote_port; private $services; private $permissions; private $domain_name; private $domain_uuid; private $token_hash; private $token_name; private $token_time; private $token_limit; private $enable_token_time_limit; private $service; private $service_class; private $service_name; private $filter; /** * Function or method name to call when sending information through the socket * @var callable */ private $callback; private $send_all; private $subscriptions; private $authenticated; /** * Creates a subscriber object. * @param resource|stream $socket Connected socket * @param callable $frame_wrapper The callback used to wrap communication in a web socket frame. Sending NULL to the frame wrapper should send a disconnect. * @throws \socket_exception Thrown when the passed socket is already closed * @throws \InvalidArgumentException Thrown when the $callback is not a valid callback */ public function __construct($socket, callable $frame_wrapper) { if (!is_resource($socket)) { throw new \socket_exception('Socket must be a valid resource'); } // check for valid callback so we can send websocket data when required if (!is_callable($frame_wrapper)) { throw new \InvalidArgumentException('Websocket callable method must be a valid callable function or method'); } // set object identifiers $this->id = md5(spl_object_hash($this)); // PHP unique object hash is similar to 000000000000000f0000000000000000 so we use md5 $this->socket = $socket; $this->socket_id = (int) $socket; $this->domain_name = ''; $this->domain_uuid = ''; // always use the same formula from the static functions [$this->remote_ip, $this->remote_port] = self::get_remote_information_from_socket($socket); // set defaults $this->authenticated = false; $this->permissions = []; $this->services = []; $this->show_all = false; $this->enable_token_time_limit = false; $this->subscriptions = []; $this->service = false; $this->service_name = ''; // Save the websocket frame wrapper used to communicate to this subscriber $this->callback = $frame_wrapper; // No filter initially $this->filter = null; } /** * Gets or sets the subscribed to services * @param array $services * @return $this|array */ public function subscribed_to($services = []) { if (func_num_args() > 0) { $this->services = array_flip($services); return $this; } return array_keys($this->services); } public function service_class($service_class = null) { if (func_num_args() > 0) { $this->service_class = $service_class; return $this; } return $this->service_class; } public function set_filter(filter $filter) { $this->filter = $filter; return $this; } public function get_filter() { return $this->filter; } /** * When there is no more references to the object we ensure that we disconnect from the subscriber */ public function __destruct() { // disconnect the socket $this->disconnect(); } /** * Disconnects the socket resource used for this subscriber * @return bool true on success and false on failure */ public function disconnect(): bool { //return success if close was successful if (is_resource($this->socket)) { //self::$logger->info("Subscriber $this->id has been disconnected"); // Send null to the frame wrapper to send a disconnect frame call_user_func($this->callback, $this->socket_id, null); return (@fclose($this->socket) !== false); } return false; } /** * Compares the current object with another object to see if they are exactly the same object * @param subscriber|resource $object_or_resource_or_id * @return bool */ public function equals($object_or_resource_or_id): bool { // Compare by resource if (is_resource($object_or_resource_or_id)) { return $object_or_resource_or_id === $this->socket; } // Compare by spl_object_id or spl_object_hash if (gettype($object_or_resource_or_id) === 'integer' || gettype($object_or_resource_or_id) === 'string') { return $object_or_resource_or_id === $this->id; } // Ensure it is the same type of object if (!($object_or_resource_or_id instanceof subscriber)) { // Not a subscriber object return false; } // Compare by object using the spl_object_id to match return $object_or_resource_or_id->id() === $this->id; } public function not_equals($object_or_resource): bool { return !$this->equals($object_or_resource); } /** * Allow accessing copies of the private values * @param string $name * @return mixed * @throws \InvalidArgumentException */ public function __get(string $name) { switch ($name) { case 'id': case 'socket_id': case 'remote_ip': case 'remote_port': case 'token_name': case 'token_hash': case 'token_time': case 'domain_name': case 'permissions': case 'services': return $this->{$name}; default: throw new \InvalidArgumentException("Property '$name' does not exist or direct access is prohibited. Try using '$name()' for access."); } } /** * Returns the current ID of this subscriber. * The ID is set in the constructor using the spl_object_id given by PHP * @return string */ public function id(): string { return "$this->id"; } /** * Checks if this subscriber has the permission given in $permission * @param string $permission * @return bool True when this subscriber has the permission and false otherwise */ public function has_permission(string $permission): bool { // Do not allow empty names if (empty($this->permissions) || strlen($permission) === 0) { return false; } return isset($this->permissions[$permission]); } public function get_permissions(): array { return $this->permissions; } public function get_domain_name(): string { return $this->domain_name; } /** * Returns the current socket resource used to communicate with this subscriber * @return resource|stream Resource Id or stream used */ public function socket() { return $this->socket; } /** * Returns the socket ID that was cast to an integer when the object was * created */ public function socket_id(): int { return $this->socket_id; } /** * Validates the given token against the loaded token in the this subscriber * @param array $token Must be an associative array with name and hash as the keys. * @return bool */ public function is_valid_token(array $token): bool { if (!is_array($token)) { throw new \InvalidArgumentException('Token must be an array'); } // get the name and hash from array $token_name = $token['name'] ?? ''; $token_hash = $token['hash'] ?? ''; // empty values are not allowed if (empty($token_name) || empty($token_hash)) { return false; } // validate the name and hash $valid = ($token_name === $this->token_name && $token_hash === $this->token_hash); // Get the current epoch time $server_time = time(); // check time validation required if ($this->enable_token_time_limit) { // compare against time limit in minutes $valid = $valid && ($server_time - $this->token_time < $this->token_limit * 60); } //self::$logger->debug("------------------ Token Compare ------------------"); //self::$logger->debug("Subscriber token time: $this->token_time"); //self::$logger->debug(" Server time: $server_time"); //self::$logger->debug("Subscriber token name: $this->token_name"); //self::$logger->debug(" Server token name: $token_name"); //self::$logger->debug("Subscriber token hash: $this->token_hash"); //self::$logger->debug(" Server token hash: $token_hash"); //self::$logger->debug("Returning: " . ($valid ? 'true' : 'false')); //self::$logger->debug("---------------------------------------------------"); return $valid; } /** * Validates the given token array against the token previously saved in the file system. When the token is valid * the token will be saved in this object and the file removed. This method should not be called a second time * once a token has be authenticated. * @param array $request_token * @return bool */ public function authenticate_token(array $request_token): bool { // Check connection if (!$this->is_connected()) { throw new \socket_disconnected_exception($this->id); } // Check for required fields if (empty($request_token)) { $date = date('Y/m/d H:i:s', time()); //self::$logger->warn("Empty token given for $this->id"); return false; } // Set local storage $token_file = self::get_token_file($request_token['name'] ?? ''); // Set default return value of false $valid = false; //self::$logger->debug("Using file: $token_file"); // Ensure the file is there if (file_exists($token_file)) { //self::$logger->debug("Using $token_file for token"); // Get the token using PHP engine parsing (fastest method) $array = include($token_file); // Assign to local variables to reflect local storage $token_name = $array['token']['name'] ?? ''; $token_hash = $array['token']['hash'] ?? ''; $token_time = intval($array['token']['time'] ?? 0); $token_limit = intval($array['token']['limit'] ?? 0); // Compare the token given in the request with the one that was in local storage $valid = $token_name === $request_token['name'] && $token_hash === $request_token['hash']; // If the token is supposed to have a time limit then check the token time if ($token_limit > 0) { // check time has expired or not and put it in valid $valid = $valid && (time() - $token_time < $token_limit * 60); // token_time_limit * 60 seconds = 15 minutes } // Debug information if (true) { //self::$logger->debug("------------------ Authenticate Token Compare ------------------"); //self::$logger->debug(" Subscriber token name: ".$request_token['name']); //self::$logger->debug(" Subscriber token hash: ".$request_token['hash']); //self::$logger->debug(" Server token name: $token_name"); //self::$logger->debug(" Server token hash: $token_hash"); //self::$logger->debug(" Server token time: $token_time"); //self::$logger->debug(" Server token limit: $token_limit"); //self::$logger->debug("Valid: " . ($valid ? 'yes' : 'no')); //self::$logger->debug("----------------------------------------------------------------"); } // When token is valid if ($valid) { // Store the valid token information in this object $this->token_name = $token_name; $this->token_hash = $token_hash; $this->token_time = $token_time; $this->enable_token_time_limit = $token_limit > 0; $this->token_limit = $token_limit * 60; // convert to seconds for time() comparison // Add the domain $this->domain_name = $array['domain']['name'] ?? ''; $this->domain_uuid = $array['domain']['uuid'] ?? ''; // Add subscriptions for services $services = $array['services'] ?? []; foreach ($services as $service) { $this->subscribe($service); } // Store the permissions $this->permissions = $array['permissions'] ?? []; // Check for service if (isset($array['service'])) { // // Set the service information in the object // $this->service_name = "" . ($array['service_name'] ?? ''); $this->service_class = "" . ($array['service_class'] ?? ''); // // Ensure we can call the method we need by checking for the interface. // Using the interface instead of calling method_exists means we only have to check once // for the interface instead of checking for each individual method required for it to be // considered a service. We can also adjust the interface with new methods and this code // remains the same. It is also possbile for us to use the 'instanceof' operator to check // that the object is what we require. However, using the instanceof operator requires anc // object first. Here we only check that the class has implemented the interface allowing // us to call static methods without first creating an object. // $this->service = is_a($this->service_class, 'websocket_service_interface', true); } //self::$logger->debug("Permission count(".count($this->permissions) . ")"); } // Remove the token from local storage @unlink($token_file); } // store the result $this->authenticated = $valid; // return success or failed return $valid; } public function is_authenticated(): bool { return $this->authenticated; } public function set_authenticated(bool $authenticated): self { return $this; } public function set_domain(string $uuid, string $name): self { if (is_uuid($uuid)) { $this->uuid = $uuid; } else { throw new invalid_uuid_exception("UUID is not valid"); } $this->domain_name = $name; return $this; } public function is_service(): bool { return $this->service; } /** * Get or set the service_name * @param string|null $service_name * @return string|$this */ public function service_name($service_name = null) { /* : string|self */ if (func_num_args() > 0) { $this->service_name = $service_name; return $this; } return $this->service_name; } public function service_equals(string $service_name): bool { return ($this->service && $this->service_name === $service_name); } /** * Returns true if the socket/stream is still open (not at EOF). * @return bool Returns true if connected and false if the connection has closed */ public function is_connected(): bool { return is_resource($this->socket) && !feof($this->socket); } /** * Returns true if the subscriber is no longer connected * @return bool Returns true if the subscriber is no longer connected */ public function is_not_connected(): bool { return !$this->is_connected(); } /** * Checks if this subscriber is subscribed to the given service name * @param string $service_name The service name ie. active.calls * @return bool * @see subscriber::subscribe */ public function has_subscribed_to(string $service_name): bool { return isset($this->services[$service_name]); } public function subscribe(string $service_name): self { $this->services[$service_name] = true; return $this; } /** * Sends a response to the subscriber using the provided callback web socket wrapper in the constructor * @param string $json Valid JSON response to send to the connected client * @throws subscriber_token_expired_exception Thrown when the time limit set in the token has expired */ public function send(string $json) { //ensure the token is still valid if (!$this->token_time_exceeded()) { call_user_func($this->callback, $this->socket, $json); } else { throw new subscriber_token_expired_exception($this->id); } } /** * Sends the given message through the websocket * @param websocket_message $message * @throws socket_disconnected_exception */ public function send_message(websocket_message $message) { // Filter the message if ($this->filter !== null) { $message->apply_filter($this->filter); } if (empty($message->service_name())) { return; } // Check that we are subscribed to the event if (!$this->has_subscribed_to($message->service_name())) { //self::$logger->warn("Subscriber not subscribed to " . $message->service_name()); throw new subscriber_not_subscribed_exception($this->id); } // Ensure we are still connected if (!$this->is_connected()) { throw new \socket_disconnected_exception($this->id); } $this->send((string) $message); return; } public static function get_remote_information_from_socket($socket): array { return explode(':', stream_socket_get_name($socket, true), 2); } public static function get_remote_ip_from_socket($socket): string { $array = explode(':', stream_socket_get_name($socket, true), 2); return $array[0] ?? ''; } public static function get_remote_port_from_socket($socket): string { $array = explode(':', stream_socket_get_name($socket, true), 2); return $array[1] ?? ''; } public static function get_token_file($token_name): string { // Try to store in RAM first if (is_dir('/dev/shm') && is_writable('/dev/shm')) { $token_file = '/dev/shm/' . $token_name . '.php'; } else { // Use the filesystem $token_file = sys_get_temp_dir() . DIRECTORY_SEPARATOR . $token_name . '.php'; } return $token_file; } /** * Saves the token array to local file system * * The web socket server runs in a separate process so it is unable to use * sessions. Therefor, the token must be stored in a temp folder to be * verified by the web socket server. It is possible to use a database * but the database connection process is very slow compared to the file * system. If the database resides on a remote system instead of local, * the web socket service may not yet have access to the token before the * web socket client requests authorization. * * @param array $token Standard token issued from the token object * @param array $services A simple array list of service names to subscribe to * @param int $time_limit_in_minutes Set a token time limit. Setting to zero will disable the time limit * @see token::create() */ public static function save_token(array $token, array $services, int $time_limit_in_minutes = 0) { // // Put the domain_name, permissions, and token in local storage so we can use all the information // to authenticate an incoming connection from the websocket service. // $array['permissions'] = $_SESSION['permissions']; // // Store the token service and events // $array['services'] = $services; // // Store the name and hash of the token // $array['token']['name'] = $token['name']; $array['token']['hash'] = $token['hash']; // // Store the epoch time and time limit // $array['token']['time'] = "" . time(); $array['token']['limit'] = $time_limit_in_minutes; // // Store the domain name in this session // $array['domain']['name'] = $_SESSION['domain_name']; $array['domain']['uuid'] = $_SESSION['domain_uuid']; // // Get the full path and file name for storing the token // $token_file = self::get_token_file($token['name']); $file_contents = "enable_token_time_limit) return false; //self::$logger->debug("------------- TOKEN TIME LIMIT -------------"); //self::$logger->debug(" Token Limit: $this->token_limit"); //self::$logger->debug(" Token Time: $this->token_time"); //self::$logger->debug(" Current Time: " . time()); //self::$logger->debug("time-token_time: " . (time() - $this->token_time)); //self::$logger->debug(" Time Exceeded: " . ((time() - $this->token_time) > $this->token_limit ? 'Yes' : 'No')); //self::$logger->debug("--------------------------------------------"); //test the time on the token to ensure it is valid return (time() - $this->token_time) > $this->token_limit; } }