mirror of
https://github.com/fusionpbx/fusionpbx.git
synced 2025-12-30 00:53:50 +00:00
Add. Support onInterval method to EventConsumer class (#1747)
* Add. Support `onInterval` method to EventConsumer class Usage ```Lua -- execute action each 30 sec events:onInterval(30*1000, function() end) -- execute action once after 5 min events:onIntervalOnce(5*60*1000, function() end) ``` * Fix. Remove timers * Fix. Reset timer before callback It allows stop timer inside callback. Also it produce more accurate interval invocation if callback take quite a long time. E.g. Interval = 10 sec and callback took 5 sec then if we reset timer after this callback then gap between invocation will be 15 sec. * Add. Timers now have TimeEvent class type. Add. `reset` method to IntervalTimer class. ```Lua events:onIntervalOnce(1000, function(self, timer) -- timer has type TimeEvent -- restart timer so it will be invoke again timer:restart() -- or reset new interval -- timer:reset(5000) end) ``` * Fix. Typo in variable name
This commit is contained in:
committed by
FusionPBX
parent
9da40dc7b1
commit
1834be5042
@@ -40,6 +40,15 @@ local function split_event(event_name)
|
||||
return name
|
||||
end
|
||||
|
||||
local function append(t, v)
|
||||
t[#t+1]=v return t
|
||||
end
|
||||
|
||||
local function remove(t, i)
|
||||
table.remove(t, i)
|
||||
return t
|
||||
end
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
local BasicEventEmitter = class() do
|
||||
|
||||
@@ -300,13 +309,133 @@ EventEmitter.removeListener = EventEmitter.off
|
||||
end
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
local TimeEvent = class() do
|
||||
|
||||
function TimeEvent:__init(interval, callback, once)
|
||||
self._timer = IntervalTimer.new(interval):start()
|
||||
self._callback = callback
|
||||
self._once = once
|
||||
|
||||
return self
|
||||
end
|
||||
|
||||
function TimeEvent:started()
|
||||
return self._timer:started()
|
||||
end
|
||||
|
||||
function TimeEvent:restart()
|
||||
return self._timer:restart()
|
||||
end
|
||||
|
||||
function TimeEvent:rest()
|
||||
return self._timer:rest()
|
||||
end
|
||||
|
||||
function TimeEvent:reset(interval)
|
||||
self._timer:reset(interval)
|
||||
return self
|
||||
end
|
||||
|
||||
function TimeEvent:stop()
|
||||
return self._timer:stop()
|
||||
end
|
||||
|
||||
function TimeEvent:once()
|
||||
return self._once
|
||||
end
|
||||
|
||||
function TimeEvent:fire(...)
|
||||
-- !!! do not pass self
|
||||
return self._callback(...)
|
||||
end
|
||||
|
||||
end
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
local TimeEvents = class() do
|
||||
|
||||
function TimeEvents:__init()
|
||||
self._events = {}
|
||||
return self
|
||||
end
|
||||
|
||||
function TimeEvents:sleepInterval(max_interval)
|
||||
local events = self._events
|
||||
for i = 1, #events do
|
||||
local event = events[i]
|
||||
if event:started() then
|
||||
local rest = event:rest()
|
||||
if max_interval > rest then max_interval = rest end
|
||||
end
|
||||
end
|
||||
return max_interval
|
||||
end
|
||||
|
||||
function TimeEvents:fire(this, ...)
|
||||
self._lock = true
|
||||
local i, events = 0, self._events
|
||||
for i = 1, #events do
|
||||
local event = events[i]
|
||||
if event:rest() == 0 then
|
||||
if event:once() then
|
||||
event:stop()
|
||||
else
|
||||
event:restart()
|
||||
end
|
||||
event:fire(this, event, ...)
|
||||
end
|
||||
end
|
||||
self._lock = false
|
||||
|
||||
for i = #events, 1, -1 do
|
||||
local event = events[i]
|
||||
if not event:started() then
|
||||
remove(events, i)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
function TimeEvents:setInterval(interval, callback)
|
||||
local event = TimeEvent.new(interval, callback, false)
|
||||
append(self._events, event)
|
||||
return event
|
||||
end
|
||||
|
||||
function TimeEvents:setIntervalOnce(interval, callback)
|
||||
local event = TimeEvent.new(interval, callback, true)
|
||||
append(self._events, event)
|
||||
return event
|
||||
end
|
||||
|
||||
function TimeEvents:removeInterval(timer)
|
||||
local events = self._events
|
||||
for i = #events, 1, -1 do
|
||||
if events[i] == timer then
|
||||
if self._lock then
|
||||
events[i]:stop()
|
||||
else
|
||||
remove(events, i)
|
||||
end
|
||||
return true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
local EventConsumer = class(EventEmitter) do
|
||||
|
||||
local default_timeout = 60000
|
||||
local default_poll_interval = 60000 * 30
|
||||
|
||||
function EventConsumer:__init(timeout, pid_file)
|
||||
self.__base.__init(self)
|
||||
|
||||
if pid_file then timeout = timeout or 60000 end
|
||||
if pid_file then timeout = timeout or default_timeout end
|
||||
|
||||
if timeout then assert(timeout > 0) end
|
||||
|
||||
@@ -314,7 +443,7 @@ function EventConsumer:__init(timeout, pid_file)
|
||||
self._running = false
|
||||
self._consumer = freeswitch.EventConsumer()
|
||||
self._timeout = timeout
|
||||
self._timer = timeout and IntervalTimer.new(timeout)
|
||||
self._timers = TimeEvents.new()
|
||||
self._pid = api:execute("create_uuid") or tostring(api:getTime())
|
||||
self._pid_file = pid_file
|
||||
|
||||
@@ -324,6 +453,13 @@ function EventConsumer:__init(timeout, pid_file)
|
||||
assert(file.write(self._pid_file, self._pid))
|
||||
end
|
||||
|
||||
if self._timeout then
|
||||
self:onInterval(self._timeout, function(self)
|
||||
if not self:_check_pid_file() then return self:stop() end
|
||||
self:emit('TIMEOUT')
|
||||
end)
|
||||
end
|
||||
|
||||
return self
|
||||
end
|
||||
|
||||
@@ -366,41 +502,37 @@ function EventConsumer:bind(event_name, cb)
|
||||
end
|
||||
|
||||
function EventConsumer:run()
|
||||
self._timer:restart()
|
||||
self._running = true
|
||||
|
||||
-- set some huge default interval
|
||||
-- if there no time events then we wait this amount of time
|
||||
local max_interval = self._timeout or default_poll_interval
|
||||
|
||||
while self._running do
|
||||
local timeout
|
||||
if self._timer then
|
||||
timeout = self._timer:rest()
|
||||
if timeout == 0 then
|
||||
if not self:_check_pid_file() then
|
||||
return
|
||||
end
|
||||
timeout = self._timeout
|
||||
self._timer:restart()
|
||||
self:emit('TIMEOUT')
|
||||
end
|
||||
self._timers:fire(self)
|
||||
if not self._running then break end
|
||||
local timeout = self._timers:sleepInterval(max_interval)
|
||||
|
||||
local event
|
||||
if timeout == 0 then
|
||||
-- we have some time based events.
|
||||
-- so we just try get fs event without wait
|
||||
event = self._consumer:pop(0)
|
||||
else
|
||||
-- wait infinity
|
||||
timeout = 0
|
||||
event = self._consumer:pop(1, timeout)
|
||||
end
|
||||
|
||||
local event = self._consumer:pop(1, timeout)
|
||||
|
||||
if not event then
|
||||
if not self:_check_pid_file() then
|
||||
return
|
||||
end
|
||||
else
|
||||
local event_name = event:getHeader('Event-Name')
|
||||
if event then
|
||||
local event_name = event:getHeader('Event-Name')
|
||||
if self._bound[event_name] then
|
||||
self:emit(event_name, event)
|
||||
if not self._running then break end
|
||||
end
|
||||
local event_class = event:getHeader('Event-Subclass')
|
||||
if event_class and #event_class > 0 then
|
||||
event_name = event_name .. '::' .. event_class
|
||||
self:emit(event_name, event)
|
||||
if not self._running then break end
|
||||
end
|
||||
end
|
||||
end
|
||||
@@ -415,6 +547,18 @@ function EventConsumer:stop()
|
||||
end
|
||||
end
|
||||
|
||||
function EventConsumer:onInterval(interval, callback)
|
||||
return self._timers:setInterval(interval, callback)
|
||||
end
|
||||
|
||||
function EventConsumer:onIntervalOnce(interval, callback)
|
||||
return self._timers:setIntervalOnce(interval, callback)
|
||||
end
|
||||
|
||||
function EventConsumer:offInterval(timer)
|
||||
return self._timers:removeInterval(timer)
|
||||
end
|
||||
|
||||
end
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
@@ -472,4 +616,4 @@ return {
|
||||
EventConsumer = EventConsumer;
|
||||
new = EventConsumer.new;
|
||||
ievents = ievents;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +68,12 @@ function IntervalTimer:rest()
|
||||
return d
|
||||
end
|
||||
|
||||
function IntervalTimer:reset(interval)
|
||||
self._interval = interval
|
||||
if self._begin then self:restart() end
|
||||
return self
|
||||
end
|
||||
|
||||
function IntervalTimer:stop()
|
||||
if self:started() then
|
||||
local d = self:elapsed()
|
||||
|
||||
Reference in New Issue
Block a user