mirror of
https://github.com/fusionpbx/fusionpbx.git
synced 2026-02-22 10:56:31 +00:00
Documentation, format class, no modification. (#7628)
This commit is contained in:
@@ -35,48 +35,56 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Address to bind to. (Default 8080)
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
protected $ip;
|
||||
|
||||
/**
|
||||
* Port to bind to. (Default 0.0.0.0 - all PHP detected IP addresses of the system)
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
protected $port;
|
||||
|
||||
/**
|
||||
* Resource or stream of the server socket binding
|
||||
*
|
||||
* @var resource|stream
|
||||
*/
|
||||
protected $server_socket;
|
||||
|
||||
/**
|
||||
* List of connected client sockets
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $clients;
|
||||
|
||||
/**
|
||||
* Used to track on_message events
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $message_callbacks;
|
||||
|
||||
/**
|
||||
* Used to track on_connect events
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $connect_callbacks;
|
||||
|
||||
/**
|
||||
* Used to track on_disconnect events
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $disconnect_callbacks;
|
||||
|
||||
/**
|
||||
* Used to track switch listeners or other socket connection types
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $listeners;
|
||||
@@ -84,12 +92,14 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Subscriber Objects
|
||||
*
|
||||
* @var subscriber
|
||||
*/
|
||||
protected $subscribers;
|
||||
|
||||
/**
|
||||
* Array of registered services
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
private $services;
|
||||
@@ -100,18 +110,19 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Reload settings
|
||||
*
|
||||
* @return void
|
||||
* @throws \RuntimeException
|
||||
* @access protected
|
||||
*/
|
||||
protected function reload_settings(): void {
|
||||
// Initialize tracking arrays
|
||||
$this->listeners = [];
|
||||
$this->clients = [];
|
||||
$this->message_callbacks = [];
|
||||
$this->connect_callbacks = [];
|
||||
$this->listeners = [];
|
||||
$this->clients = [];
|
||||
$this->message_callbacks = [];
|
||||
$this->connect_callbacks = [];
|
||||
$this->disconnect_callbacks = [];
|
||||
$this->subscribers = [];
|
||||
$this->subscribers = [];
|
||||
|
||||
$settings = new settings(['database' => database::new(['config' => config::load()])]);
|
||||
|
||||
@@ -134,6 +145,7 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Display the version on the console
|
||||
*
|
||||
* @return void
|
||||
* @access protected
|
||||
*/
|
||||
@@ -143,6 +155,7 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Set extra command options from the command line
|
||||
*
|
||||
* @access protected
|
||||
*/
|
||||
protected static function set_command_options() {
|
||||
@@ -170,6 +183,13 @@ class websocket_service extends service {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves a subscriber object from the given socket ID.
|
||||
*
|
||||
* @param mixed $socket The socket ID to search for
|
||||
*
|
||||
* @return subscriber|null The matching subscriber, or null if not found
|
||||
*/
|
||||
private function get_subscriber_from_socket_id($socket): ?subscriber {
|
||||
$subscriber = null;
|
||||
// Get the subscriber based on their socket ID
|
||||
@@ -182,6 +202,16 @@ class websocket_service extends service {
|
||||
return $subscriber;
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticate a subscriber based on the provided token and message.
|
||||
*
|
||||
* @param subscriber $subscriber The subscriber to authenticate.
|
||||
* @param websocket_message $message The message containing the authentication token.
|
||||
*
|
||||
* @return bool Whether the authentication was successful or not.
|
||||
* @throws \socket_disconnected_exception
|
||||
* @throws \subscriber_token_expired_exception
|
||||
*/
|
||||
private function authenticate_subscriber(subscriber $subscriber, websocket_message $message) {
|
||||
$this->info("Authenticating client: $subscriber->id");
|
||||
|
||||
@@ -204,7 +234,7 @@ class websocket_service extends service {
|
||||
foreach ($subscriptions as $subscribed_to) {
|
||||
if (isset($this->services[$subscribed_to])) {
|
||||
$subscriber_service = $this->services[$subscribed_to];
|
||||
$class_name = $subscriber_service->service_class();
|
||||
$class_name = $subscriber_service->service_class();
|
||||
// Make sure we can call the 'create_filter_chain_for' method
|
||||
if (is_a($class_name, 'websocket_service_interface', true)) {
|
||||
// Call the service class method to validate the subscriber
|
||||
@@ -227,6 +257,14 @@ class websocket_service extends service {
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast a message to all subscribers except the specified broadcaster.
|
||||
*
|
||||
* @param subscriber $broadcaster The subscriber that is broadcasting the message.
|
||||
* @param websocket_message|null $message The message being broadcasted. If null, do nothing.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function broadcast_service_message(subscriber $broadcaster, ?websocket_message $message = null) {
|
||||
|
||||
$this->debug("Processing Broadcast");
|
||||
@@ -268,8 +306,7 @@ class websocket_service extends service {
|
||||
$this->handle_disconnect($subscriber->socket_id());
|
||||
}
|
||||
}
|
||||
}
|
||||
// Route a specific request from a service back to a subscriber
|
||||
} // Route a specific request from a service back to a subscriber
|
||||
else {
|
||||
// Get the subscriber object hash
|
||||
$object_id = $message->resource_id;
|
||||
@@ -288,9 +325,11 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Filters subscribers based on the service name given
|
||||
* @param array $subscribers
|
||||
*
|
||||
* @param array $subscribers
|
||||
* @param websocket_message $message
|
||||
* @param string $service_name
|
||||
* @param string $service_name
|
||||
*
|
||||
* @return array List of subscriber objects or an empty array if there are no subscribers to that service name
|
||||
*/
|
||||
private function filter_subscribers(array $subscribers, websocket_message $message, string $service_name): array {
|
||||
@@ -299,7 +338,7 @@ class websocket_service extends service {
|
||||
foreach ($subscribers as $subscriber) {
|
||||
$caller_context = strtolower($message->caller_context ?? '');
|
||||
if (!empty($caller_context) && $subscriber->has_subscribed_to($service_name) && ($subscriber->show_all || $caller_context === $subscriber->domain_name || $caller_context === 'public' || $caller_context === 'default'
|
||||
)
|
||||
)
|
||||
) {
|
||||
$filtered[] = $subscriber;
|
||||
} else {
|
||||
@@ -313,13 +352,15 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Create a subscriber for each connection
|
||||
*
|
||||
* @param resource $socket
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function handle_connect($socket) {
|
||||
// We catch only the socket disconnection exception as there is a general try/catch already
|
||||
try {
|
||||
$subscriber = new subscriber($socket, [websocket_service::class, 'send']);
|
||||
$subscriber = new subscriber($socket, [websocket_service::class, 'send']);
|
||||
$this->subscribers[$subscriber->id] = $subscriber;
|
||||
$subscriber->send(websocket_message::connected());
|
||||
} catch (\socket_disconnected_exception $sde) {
|
||||
@@ -332,6 +373,7 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Web socket client disconnected from the server or this service has requested a disconnect from the subscriber
|
||||
*
|
||||
* @param subscriber|resource|int|string $object_or_resource_or_id
|
||||
*/
|
||||
private function handle_disconnect($object_or_resource_or_id) {
|
||||
@@ -377,8 +419,9 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* When a message event occurs, send to all the subscribers
|
||||
*
|
||||
* @param resource $socket
|
||||
* @param mixed $data
|
||||
* @param mixed $data
|
||||
*/
|
||||
private function handle_message($socket, $data) {
|
||||
$subscriber = $this->get_subscriber_from_socket_id($socket);
|
||||
@@ -436,6 +479,14 @@ class websocket_service extends service {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a client message by routing it to its intended service.
|
||||
*
|
||||
* @param subscriber $subscriber The client subscriber instance associated with the incoming message.
|
||||
* @param websocket_message $message The incoming WebSocket message containing details about the request.
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
private function handle_client_message(subscriber $subscriber, websocket_message $message) {
|
||||
//find the service with that name
|
||||
foreach ($this->subscribers as $service) {
|
||||
@@ -454,7 +505,7 @@ class websocket_service extends service {
|
||||
$message->resource_id = $subscriber->id;
|
||||
|
||||
//send the modified web socket message to the service
|
||||
$service->send((string) $message);
|
||||
$service->send((string)$message);
|
||||
|
||||
//continue searching for service providers
|
||||
continue;
|
||||
@@ -465,6 +516,7 @@ class websocket_service extends service {
|
||||
/**
|
||||
* Runs the web socket server binding to the ip and port set in default settings
|
||||
* The run method will stop if the SIG_TERM or SIG_HUP signal is processed in the parent
|
||||
*
|
||||
* @return int
|
||||
* @throws \RuntimeException
|
||||
* @throws socket_exception
|
||||
@@ -494,7 +546,7 @@ class websocket_service extends service {
|
||||
//
|
||||
// Merge all sockets to a single array
|
||||
//
|
||||
$read = array_merge([$this->server_socket], $this->clients);
|
||||
$read = array_merge([$this->server_socket], $this->clients);
|
||||
$write = $except = [];
|
||||
|
||||
//$this->debug("Waiting on event. Connected Clients: (".count($this->clients).")", LOG_DEBUG);
|
||||
@@ -577,10 +629,10 @@ class websocket_service extends service {
|
||||
// Get the error details
|
||||
//
|
||||
$subscriber_id = $se->getSubscriberId();
|
||||
$message = $se->getMessage();
|
||||
$code = $se->getCode();
|
||||
$file = $se->getFile();
|
||||
$line = $se->getLine();
|
||||
$message = $se->getMessage();
|
||||
$code = $se->getCode();
|
||||
$file = $se->getFile();
|
||||
$line = $se->getLine();
|
||||
|
||||
//
|
||||
// Dump the details in the log
|
||||
@@ -595,10 +647,12 @@ class websocket_service extends service {
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Overrides the parent class to shutdown all sockets
|
||||
*
|
||||
* @override service
|
||||
*/
|
||||
public function __destruct() {
|
||||
@@ -610,12 +664,18 @@ class websocket_service extends service {
|
||||
parent::__destruct();
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves an array of open sockets.
|
||||
*
|
||||
* @return array An array containing open socket connections.
|
||||
*/
|
||||
public function get_open_sockets(): array {
|
||||
return $this->clients;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if there are connected web socket clients.
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public function has_clients(): bool {
|
||||
@@ -625,7 +685,9 @@ class websocket_service extends service {
|
||||
/**
|
||||
* When a web socket message is received the $on_message_callback function is called.
|
||||
* Multiple on_message functions can be specified.
|
||||
*
|
||||
* @param callable $on_message_callback
|
||||
*
|
||||
* @throws InvalidArgumentException
|
||||
*/
|
||||
public function on_message(callable $on_message_callback) {
|
||||
@@ -637,8 +699,10 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Calls all the on_message functions
|
||||
*
|
||||
* @param resource $resource
|
||||
* @param string $message
|
||||
* @param string $message
|
||||
*
|
||||
* @return void
|
||||
* @access protected
|
||||
*/
|
||||
@@ -652,7 +716,9 @@ class websocket_service extends service {
|
||||
/**
|
||||
* When a web socket handshake has completed, the $on_connect_callback function is called.
|
||||
* Multiple on_connect functions can be specified.
|
||||
*
|
||||
* @param callable $on_connect_callback
|
||||
*
|
||||
* @throws InvalidArgumentException
|
||||
*/
|
||||
public function on_connect(callable $on_connect_callback) {
|
||||
@@ -664,7 +730,9 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Calls all the on_connect functions
|
||||
*
|
||||
* @param resource $resource
|
||||
*
|
||||
* @access protected
|
||||
*/
|
||||
protected function trigger_connect($resource) {
|
||||
@@ -676,7 +744,9 @@ class websocket_service extends service {
|
||||
/**
|
||||
* When a web socket has disconnected, the $on_disconnect_callback function is called.
|
||||
* Multiple functions can be specified with subsequent calls
|
||||
*
|
||||
* @param string|callable $on_disconnect_callback
|
||||
*
|
||||
* @throws InvalidArgumentException
|
||||
*/
|
||||
public function on_disconnect($on_disconnect_callback) {
|
||||
@@ -688,7 +758,9 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Calls all the on_disconnect_callback functions
|
||||
*
|
||||
* @param resource $socket
|
||||
*
|
||||
* @access protected
|
||||
*/
|
||||
protected function trigger_disconnect($socket) {
|
||||
@@ -699,6 +771,7 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Returns the socket used in the server connection
|
||||
*
|
||||
* @return resource
|
||||
*/
|
||||
public function get_socket() {
|
||||
@@ -707,7 +780,9 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Remove a client socket on disconnect.
|
||||
*
|
||||
* @param resource $resource Resource for the socket connection
|
||||
*
|
||||
* @return bool Returns true on client disconnect and false when the client is not found in the tracking array
|
||||
* @access protected
|
||||
*/
|
||||
@@ -750,6 +825,8 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Performs web socket handshake on new connection.
|
||||
*
|
||||
* @param resource $resource
|
||||
* @access protected
|
||||
*/
|
||||
protected function handshake($resource): void {
|
||||
@@ -765,21 +842,23 @@ class websocket_service extends service {
|
||||
if (!preg_match("/Sec-WebSocket-Key: (.*)\r\n/i", $request_header, $matches)) {
|
||||
throw new invalid_handshake_exception($resource, "Invalid WebSocket handshake");
|
||||
}
|
||||
$key = trim($matches[1]);
|
||||
$accept_key = base64_encode(
|
||||
sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)
|
||||
$key = trim($matches[1]);
|
||||
$accept_key = base64_encode(
|
||||
sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)
|
||||
);
|
||||
$response_header = "HTTP/1.1 101 Switching Protocols\r\n"
|
||||
. "Upgrade: websocket\r\n"
|
||||
. "Connection: Upgrade\r\n"
|
||||
. "Sec-WebSocket-Accept: {$accept_key}\r\n\r\n";
|
||||
. "Upgrade: websocket\r\n"
|
||||
. "Connection: Upgrade\r\n"
|
||||
. "Sec-WebSocket-Accept: {$accept_key}\r\n\r\n";
|
||||
fwrite($resource, $response_header);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read specific number of bytes from a websocket
|
||||
*
|
||||
* @param resource $socket
|
||||
* @param int $length
|
||||
* @param int $length
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
private function read_bytes($socket, int $length): string {
|
||||
@@ -797,7 +876,9 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Reads a websocket data frame and converts it to a regular string
|
||||
*
|
||||
* @param resource $socket
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
private function receive_frame($socket): string {
|
||||
@@ -810,8 +891,8 @@ class websocket_service extends service {
|
||||
$this->update_connected_clients();
|
||||
return '';
|
||||
}
|
||||
$bytes = unpack('Cfirst/Csecond', $hdr);
|
||||
$fin = ($bytes['first'] >> 7) & 0x1;
|
||||
$bytes = unpack('Cfirst/Csecond', $hdr);
|
||||
$fin = ($bytes['first'] >> 7) & 0x1;
|
||||
$opcode = $bytes['first'] & 0x0F;
|
||||
$masked = ($bytes['second'] >> 7) & 0x1;
|
||||
$length = $bytes['second'] & 0x7F;
|
||||
@@ -829,7 +910,7 @@ class websocket_service extends service {
|
||||
if (strlen($ext) < 8)
|
||||
return '';
|
||||
// unpack 64-bit BE; PHP 7.0+: use J, else fallback
|
||||
$arr = unpack('J', $ext);
|
||||
$arr = unpack('J', $ext);
|
||||
$length = $arr[1];
|
||||
}
|
||||
|
||||
@@ -863,8 +944,10 @@ class websocket_service extends service {
|
||||
/**
|
||||
* Send text frame to client. If the socket connection is not a valid resource, the send
|
||||
* method will fail silently and return false.
|
||||
* @param resource $resource The socket or resource id to communicate on.
|
||||
* @param string|null $payload The string to wrap in a web socket frame to send to the clients
|
||||
*
|
||||
* @param resource $resource The socket or resource id to communicate on.
|
||||
* @param string|null $payload The string to wrap in a web socket frame to send to the clients
|
||||
*
|
||||
* @return bool
|
||||
*/
|
||||
public static function send($resource, ?string $payload): bool {
|
||||
@@ -878,7 +961,7 @@ class websocket_service extends service {
|
||||
}
|
||||
|
||||
$payload_length = strlen($payload);
|
||||
$frame_header = "\x81"; // FIN = 1, text frame
|
||||
$frame_header = "\x81"; // FIN = 1, text frame
|
||||
// Create frame header
|
||||
if ($payload_length <= 125) {
|
||||
$frame_header .= chr($payload_length);
|
||||
@@ -893,7 +976,7 @@ class websocket_service extends service {
|
||||
// Attempt to write full frame
|
||||
$written = @fwrite($resource, $frame);
|
||||
if ($written === false) {
|
||||
self::log("fwrite() failed for socket " . (int) $resource, LOG_ERR);
|
||||
self::log("fwrite() failed for socket " . (int)$resource, LOG_ERR);
|
||||
throw new socket_disconnected_exception($resource);
|
||||
}
|
||||
|
||||
@@ -907,7 +990,9 @@ class websocket_service extends service {
|
||||
|
||||
/**
|
||||
* Get the IP and port of the connected remote system.
|
||||
*
|
||||
* @param resource $resource The resource or stream of the connection
|
||||
*
|
||||
* @return array An associative array of remote_ip and remote_port
|
||||
*/
|
||||
public static function get_remote_info($resource): array {
|
||||
|
||||
Reference in New Issue
Block a user