mirror of
https://github.com/fusionpbx/fusionpbx.git
synced 2025-12-30 00:53:50 +00:00
Fix bug by adding a 'tries' counter so reading websocket data can return (#7470)
* add a tries counter so reading websocket data can fail * fix empty constructor parameters when creating a permission_filter * Use the message topic for debugging info instead of payload data * use a socket disconnect exception instead of runtime exception * Add a wait state for re-connecting to websocket server * Add PHPDoc return information for connect_to_ws_server method * Remove the cannot send message for already removed resource * Remove param in the method call to handle_websocket_event
This commit is contained in:
@@ -165,7 +165,7 @@ abstract class base_websocket_system_service extends service implements websocke
|
||||
|
||||
/**
|
||||
* Connects to the web socket server using a websocket_client object
|
||||
* @return bool
|
||||
* @return bool True if connected and False if not able to connect
|
||||
*/
|
||||
protected function connect_to_ws_server(): bool {
|
||||
if ($this->ws_client !== null && $this->ws_client->is_connected()) return true;
|
||||
|
||||
@@ -154,7 +154,7 @@ class websocket_client {
|
||||
*/
|
||||
public static function send($resource, ?string $payload): bool {
|
||||
if (!is_resource($resource)) {
|
||||
throw new \RuntimeException("Not connected");
|
||||
throw new \socket_disconnected_exception($resource, 400);
|
||||
}
|
||||
|
||||
// Check for a null message and send a disconnect frame
|
||||
@@ -254,8 +254,9 @@ class websocket_client {
|
||||
|
||||
while (!$final_frame) {
|
||||
$header = $this->read_bytes(2);
|
||||
if ($header === null)
|
||||
if (strlen($header) !== 2) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$byte1 = ord($header[0]);
|
||||
$byte2 = ord($header[1]);
|
||||
@@ -338,7 +339,9 @@ class websocket_client {
|
||||
$data = '';
|
||||
$max_chunk_size = stream_get_chunk_size($this->resource);
|
||||
|
||||
while (strlen($data) < $length) {
|
||||
// 20 tries waits 200 ms total per chunk
|
||||
$tries = 0;
|
||||
while (strlen($data) < $length && $tries < 20) {
|
||||
$remaining = $length - strlen($data);
|
||||
$read_size = min($max_chunk_size, $remaining);
|
||||
|
||||
@@ -360,8 +363,14 @@ class websocket_client {
|
||||
usleep(10000);
|
||||
|
||||
// Try again
|
||||
$tries++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// reset count for successfully received chunk
|
||||
$tries = 0;
|
||||
|
||||
// append the chunk to the received data
|
||||
$data .= $chunk;
|
||||
}
|
||||
return $data;
|
||||
|
||||
@@ -260,7 +260,7 @@ class websocket_service extends service {
|
||||
foreach ($send_to as $subscriber) {
|
||||
try {
|
||||
// Notify of the message we are broadcasting
|
||||
$this->debug("Broadcasting message '" . $message->payload['event_name'] . "' for service '" . $message->service_name . "' to subscriber $subscriber->id");
|
||||
$this->debug("Broadcasting message '" . $message->topic() . "' for service '" . $message->service_name . "' to subscriber $subscriber->id");
|
||||
$subscriber->send_message($message);
|
||||
} catch (subscriber_token_expired_exception $ste) {
|
||||
$this->info("Subscriber $ste->id token expired");
|
||||
@@ -806,6 +806,7 @@ class websocket_service extends service {
|
||||
// Ensure we have the correct number of bytes
|
||||
if (strlen($hdr) !== 2) {
|
||||
$this->warning('Header is empty!');
|
||||
$this->debug('Header content: ' . bin2hex($hdr) . '(' . strlen($hdr) . ' bytes)');
|
||||
$this->update_connected_clients();
|
||||
return '';
|
||||
}
|
||||
@@ -884,7 +885,6 @@ class websocket_service extends service {
|
||||
*/
|
||||
public static function send($resource, ?string $payload): bool {
|
||||
if (!is_resource($resource)) {
|
||||
self::log("Cannot send: invalid resource", LOG_ERR);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user