mirror of
https://github.com/fusionpbx/fusionpbx.git
synced 2025-12-30 00:53:50 +00:00
Fix websocket in progress calls (#7394)
* Add a max chunking size when reading bytes to prevent failure * Add more debug information * Update the active calls response to be better
This commit is contained in:
@@ -424,11 +424,6 @@ class active_calls_service extends service implements websocket_service_interfac
|
||||
websocket_client::send($this->ws_client->socket(), websocket_message::request_forbidden($websocket_message->request_id, SERVICE_NAME, $websocket_message->topic));
|
||||
}
|
||||
|
||||
// Make sure we are connected
|
||||
if (!$this->event_socket->is_connected()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Set up the response array
|
||||
$response = [];
|
||||
$response['service_name'] = SERVICE_NAME;
|
||||
@@ -464,29 +459,55 @@ class active_calls_service extends service implements websocket_service_interfac
|
||||
websocket_client::send($this->ws_client->socket(), websocket_message::request_forbidden($websocket_message->request_id, SERVICE_NAME, $websocket_message->topic));
|
||||
}
|
||||
|
||||
// Get the payload
|
||||
$payload = $websocket_message->payload();
|
||||
|
||||
// Get the request ID so we can route it back
|
||||
$request_id = $websocket_message->request_id() ?? '';
|
||||
|
||||
// Get the UUID from the payload
|
||||
$uuid = $payload['unique_id'] ?? '';
|
||||
|
||||
// Respond with bad command
|
||||
if (empty($uuid)) {
|
||||
websocket_client::send(websocket_message::request_is_bad($request_id, SERVICE_NAME, 'hangup'));
|
||||
}
|
||||
|
||||
$host = self::$switch_host ?? parent::$config->get('switch.event_socket.host', '127.0.0.1');
|
||||
$port = self::$switch_port ?? parent::$config->get('switch.event_socket.port', 8021);
|
||||
$password = self::$switch_password ?? parent::$config->get('switch.event_socket.password', 'ClueCon');
|
||||
|
||||
//
|
||||
// We use a new socket connection to get the response because the switch
|
||||
// can be firing events while we are processing so we need only this
|
||||
// request answered
|
||||
//
|
||||
$event_socket = new event_socket();
|
||||
$event_socket->connect($host, $port, $password);
|
||||
|
||||
// Make sure we are connected
|
||||
if (!$this->event_socket->is_connected()) {
|
||||
$this->warn("Failed to hangup call because event socket no longer connected");
|
||||
if (!$event_socket->is_connected()) {
|
||||
$this->warn("Unable to connect to event socket");
|
||||
return;
|
||||
}
|
||||
|
||||
// Send the command on a new channel that does not have events
|
||||
$reply = trim($event_socket->request("api uuid_kill $uuid"));
|
||||
|
||||
// Close the connection
|
||||
$event_socket->close();
|
||||
|
||||
// Set up the response array
|
||||
$response = [];
|
||||
$response['service_name'] = SERVICE_NAME;
|
||||
$response['topic'] = $websocket_message->topic;
|
||||
$response['request_id'] = $websocket_message->request_id;
|
||||
|
||||
// Get the payload
|
||||
$payload = $websocket_message->payload;
|
||||
|
||||
// Get the UUID from the payloadc
|
||||
$uuid = $payload['unique_id'] ?? '';
|
||||
|
||||
$response['status_message'] = 'success';
|
||||
$response['status_code'] = 200;
|
||||
|
||||
//Notify switch to hangup and ignore the response
|
||||
$response = $this->event_socket->request("bgapi uuid_kill $uuid");
|
||||
// Set the response payload to the reply received from the switch
|
||||
$response['payload'] = $reply;
|
||||
|
||||
// Notify websocket server of the result
|
||||
websocket_client::send($this->ws_client->socket(), new websocket_message($response));
|
||||
@@ -658,13 +679,17 @@ class active_calls_service extends service implements websocket_service_interfac
|
||||
private function get_active_calls(): array {
|
||||
$calls = [];
|
||||
|
||||
$host = self::$switch_host ?? parent::$config->get('switch.event_socket.host', '127.0.0.1');
|
||||
$port = self::$switch_port ?? parent::$config->get('switch.event_socket.port', 8021);
|
||||
$password = self::$switch_password ?? parent::$config->get('switch.event_socket.password', 'ClueCon');
|
||||
|
||||
//
|
||||
// We use a new socket connection to get the response because the switch
|
||||
// can be firing events while we are processing so we need only this
|
||||
// request answered
|
||||
//
|
||||
$event_socket = new event_socket();
|
||||
$event_socket->connect();
|
||||
$event_socket->connect($host, $port, $password);
|
||||
|
||||
// Make sure we are connected
|
||||
if (!$event_socket->is_connected()) {
|
||||
|
||||
@@ -336,14 +336,35 @@ class websocket_client {
|
||||
// Helper function to fully read N bytes
|
||||
private function read_bytes(int $length): ?string {
|
||||
$data = '';
|
||||
$max_chunk_size = stream_get_chunk_size($this->resource);
|
||||
|
||||
while (strlen($data) < $length) {
|
||||
$chunk = fread($this->resource, $length - strlen($data));
|
||||
if ($chunk === false || $chunk === '') {
|
||||
break;
|
||||
$remaining = $length - strlen($data);
|
||||
$read_size = min($max_chunk_size, $remaining);
|
||||
|
||||
// Read maximum chunk size or what is remaining
|
||||
$chunk = fread($this->resource, $read_size);
|
||||
|
||||
if ($chunk === false) {
|
||||
echo "[ERROR] fread() failed to read stream\n";
|
||||
return null;
|
||||
}
|
||||
|
||||
if ($chunk === '') {
|
||||
$meta = stream_get_meta_data($this->resource);
|
||||
if (!empty($meta['timed_out'])) {
|
||||
echo "[ERROR] Socket timed out after reading " . strlen($data) . " of $length bytes\n";
|
||||
return null;
|
||||
}
|
||||
// Jitter or other read issues on the socket so wait 10 ms
|
||||
usleep(10000);
|
||||
|
||||
// Try again
|
||||
continue;
|
||||
}
|
||||
$data .= $chunk;
|
||||
}
|
||||
return strlen($data) === $length ? $data : null;
|
||||
return $data;
|
||||
}
|
||||
|
||||
public function authenticate($token_name, $token_hash) {
|
||||
@@ -409,6 +430,14 @@ class websocket_client {
|
||||
}
|
||||
}
|
||||
|
||||
// PHP <=7.4 compatibility - Replaced in PHP 8.0+
|
||||
if (!function_exists('stream_get_chunk_size')) {
|
||||
function stream_get_chunk_size($stream): int {
|
||||
// For PHP versions lower then 8 we send the maximum size defined from https://php.net/stream_get_chunk_size
|
||||
return 8192;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Example usage:
|
||||
*/
|
||||
|
||||
@@ -401,6 +401,7 @@ class websocket_service extends service {
|
||||
$message = websocket_message::create_from_json_message($json_array);
|
||||
|
||||
if ($message === null) {
|
||||
$this->warn("Message is empty");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -419,6 +420,7 @@ class websocket_service extends service {
|
||||
|
||||
// Message is from the client so check the service_name that needs to get the message
|
||||
if (!empty($message->service_name())) {
|
||||
$this->debug("Message is from subscriber");
|
||||
$this->handle_client_message($subscriber, $message);
|
||||
} else {
|
||||
// Message does not have a service name
|
||||
@@ -435,6 +437,9 @@ class websocket_service extends service {
|
||||
foreach ($this->subscribers as $service) {
|
||||
//when we find the service send the request
|
||||
if ($service->service_equals($message->service_name())) {
|
||||
//notify we found the service
|
||||
$this->debug("Routing message to service '" . $message->service_name() . "' for topic '" . $message->topic() . "'");
|
||||
|
||||
//attach the current subscriber permissions so the service can verify
|
||||
$message->permissions($subscriber->get_permissions());
|
||||
|
||||
@@ -446,6 +451,7 @@ class websocket_service extends service {
|
||||
|
||||
//send the modified web socket message to the service
|
||||
$service->send((string) $message);
|
||||
|
||||
//continue searching for service providers
|
||||
continue;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user