mirror of
https://github.com/fusionpbx/fusionpbx.git
synced 2026-02-12 22:24:59 +00:00
Fix active_conferences high cpu bug on switch disconnect (#7742)
This commit is contained in:
@@ -243,6 +243,8 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
// Re-connect to the switch server
|
||||
if ($this->connect_to_switch_server()) {
|
||||
$this->register_event_socket_filters();
|
||||
} else {
|
||||
$this->warning('Failed to connect to switch server - conference events will not be received');
|
||||
}
|
||||
|
||||
// Add the switch event socket to the base websocket listener
|
||||
@@ -256,6 +258,20 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
*/
|
||||
protected function handle_switch_events(): void {
|
||||
$event = $this->event_socket->read_event();
|
||||
// Make sure the connection is still alive and if not, attempt to reconnect
|
||||
if ($event === false || ($this->event_socket === null && $this->switch_socket !== null)) {
|
||||
// Notify log
|
||||
$this->warning('Lost connection to switch server');
|
||||
|
||||
// Remove the broken socket listener for the switch socket and clear the socket and event socket properties
|
||||
$this->remove_listener($this->switch_socket);
|
||||
$this->switch_socket = null;
|
||||
$this->event_socket = null;
|
||||
|
||||
// Attempt to reconnect
|
||||
$this->reload_settings();
|
||||
return;
|
||||
}
|
||||
$event_message = event_message::create_from_switch_event($event, $this->event_filter);
|
||||
|
||||
// Set the event message topic as the event name
|
||||
@@ -364,15 +380,21 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
|
||||
// Create a new switch server connection object
|
||||
try {
|
||||
$this->switch_socket = stream_socket_client("tcp://$host:$port", $errno, $errstr, 5);
|
||||
// Wait indefinately to re-connect to the switch server
|
||||
while (true) {
|
||||
$this->switch_socket = stream_socket_client("tcp://$host:$port", $errno, $errstr, 5);
|
||||
if ($this->switch_socket) {
|
||||
// Successfully connected to the switch server
|
||||
$this->notice("Connected to switch server at $host:$port");
|
||||
break;
|
||||
}
|
||||
//sleep timeout before retrying to avoid busy loop if switch is down
|
||||
sleep(3);
|
||||
}
|
||||
} catch (\RuntimeException $re) {
|
||||
$this->warning('Unable to connect to event socket');
|
||||
}
|
||||
|
||||
if (!$this->switch_socket) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Block (wait) for responses so we can authenticate
|
||||
stream_set_blocking($this->switch_socket, true);
|
||||
|
||||
@@ -417,6 +439,10 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
$this->reload_settings();
|
||||
}
|
||||
|
||||
protected function on_ws_authenticated(websocket_message $websocket_message): void {
|
||||
$this->notice("Websocket client authenticated successfully");
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle ping requests to keep the connection alive
|
||||
*
|
||||
@@ -726,7 +752,7 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
*/
|
||||
protected function subscribe_room(websocket_message $message): void {
|
||||
$this->debug('Room subscription requested');
|
||||
|
||||
|
||||
$response = new websocket_message();
|
||||
$response
|
||||
->payload(['subscribed' => 'room'])
|
||||
@@ -981,7 +1007,7 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
|
||||
/**
|
||||
* Lookup human-readable conference display name from cache or database
|
||||
*
|
||||
*
|
||||
* Conference Center rooms use UUID as identifier and have conference_room_name
|
||||
* Simple Conferences use extension as identifier and have conference_name
|
||||
*
|
||||
@@ -1030,7 +1056,7 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
}
|
||||
unset($parameters);
|
||||
}
|
||||
|
||||
|
||||
if (is_numeric($conference_key)) {
|
||||
// Simple Conference - lookup by extension
|
||||
$sql = "SELECT c.conference_name ";
|
||||
@@ -1181,7 +1207,7 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!$member_found) {
|
||||
$this->debug("Member $member_id not found in members list. Available IDs: " . implode(', ', array_column($members, 'id')));
|
||||
}
|
||||
@@ -1282,7 +1308,7 @@ class active_conferences_service extends base_websocket_system_service implement
|
||||
private function broadcast_enriched_event(array $event_data, string $action, string $conference_name): void {
|
||||
$this->debug("Broadcasting enriched event - action: $action, conference: $conference_name");
|
||||
$this->debug("Payload: " . json_encode($event_data));
|
||||
|
||||
|
||||
$message = new websocket_message();
|
||||
$message
|
||||
->service_name(self::get_service_name())
|
||||
|
||||
@@ -41,7 +41,7 @@ abstract class base_websocket_system_service extends service implements websocke
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $listeners;
|
||||
private $listeners;
|
||||
|
||||
/**
|
||||
* Outputs the version of the Service.
|
||||
@@ -116,8 +116,23 @@ abstract class base_websocket_system_service extends service implements websocke
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
protected function add_listener($socket, callable $callback): void {
|
||||
$this->listeners[] = [$socket, $callback];
|
||||
protected function add_listener($socket, callable $callback, array $args = []): void {
|
||||
$this->listeners[] = [$socket, $callback, $args];
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a socket listener
|
||||
*
|
||||
* @param $socket
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
protected function remove_listener($socket): void {
|
||||
foreach ($this->listeners as $key => $listener) {
|
||||
if ($listener[0] === $socket) {
|
||||
unset($this->listeners[$key]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -203,9 +218,12 @@ abstract class base_websocket_system_service extends service implements websocke
|
||||
}
|
||||
// Other listeners
|
||||
foreach ($this->listeners as $listener) {
|
||||
if ($resource === $listener[0]) {
|
||||
$socket = $listener[0];
|
||||
if ($resource === $socket) {
|
||||
// Call the callback function provided by the add_listener function
|
||||
call_user_func($listener[1]);
|
||||
$callback = $listener[1];
|
||||
$args = $listener[2] ?? [];
|
||||
call_user_func($callback, $args);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -269,7 +287,7 @@ abstract class base_websocket_system_service extends service implements websocke
|
||||
}
|
||||
|
||||
private function handle_ws_connected(): void {
|
||||
$this->info("Websocket connection established to server");
|
||||
$this->notice("Websocket connection established to server");
|
||||
$this->debug(static::class . " RESOURCE ID: " . $this->ws_client->socket());
|
||||
$this->on_ws_connected();
|
||||
}
|
||||
@@ -284,15 +302,15 @@ abstract class base_websocket_system_service extends service implements websocke
|
||||
}
|
||||
|
||||
private function handle_ws_authenticated(websocket_message $websocket_message): void {
|
||||
$this->info("Successfully authenticated with websocket server");
|
||||
$this->on_ws_authenticated();
|
||||
// Call the on authenticated event function in the child class to perform any necessary actions after authentication ie. logging
|
||||
$this->on_ws_authenticated($websocket_message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the service has successfully authenticated with the websocket server.
|
||||
* Override in child class to perform actions after authentication.
|
||||
*/
|
||||
protected function on_ws_authenticated(): void {
|
||||
protected function on_ws_authenticated(websocket_message $websocket_message): void {
|
||||
// Override in child class if needed
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user