From 1834be5042f11399108465b011c9188ed83dc79f Mon Sep 17 00:00:00 2001 From: Alexey Melnichuk Date: Wed, 13 Jul 2016 18:43:51 +0300 Subject: [PATCH] Add. Support `onInterval` method to EventConsumer class (#1747) * Add. Support `onInterval` method to EventConsumer class Usage ```Lua -- execute action each 30 sec events:onInterval(30*1000, function() end) -- execute action once after 5 min events:onIntervalOnce(5*60*1000, function() end) ``` * Fix. Remove timers * Fix. Reset timer before callback It allows stop timer inside callback. Also it produce more accurate interval invocation if callback take quite a long time. E.g. Interval = 10 sec and callback took 5 sec then if we reset timer after this callback then gap between invocation will be 15 sec. * Add. Timers now have TimeEvent class type. Add. `reset` method to IntervalTimer class. ```Lua events:onIntervalOnce(1000, function(self, timer) -- timer has type TimeEvent -- restart timer so it will be invoke again timer:restart() -- or reset new interval -- timer:reset(5000) end) ``` * Fix. Typo in variable name --- .../resources/functions/event_consumer.lua | 194 +++++++++++++++--- .../resources/functions/interval_timer.lua | 6 + 2 files changed, 175 insertions(+), 25 deletions(-) diff --git a/resources/install/scripts/resources/functions/event_consumer.lua b/resources/install/scripts/resources/functions/event_consumer.lua index 92da61cc7c..dd4de925ec 100644 --- a/resources/install/scripts/resources/functions/event_consumer.lua +++ b/resources/install/scripts/resources/functions/event_consumer.lua @@ -40,6 +40,15 @@ local function split_event(event_name) return name end +local function append(t, v) + t[#t+1]=v return t +end + +local function remove(t, i) + table.remove(t, i) + return t +end + ------------------------------------------------------------------------------- local BasicEventEmitter = class() do @@ -300,13 +309,133 @@ EventEmitter.removeListener = EventEmitter.off end ------------------------------------------------------------------------------- +------------------------------------------------------------------------------- +local TimeEvent = class() do + +function TimeEvent:__init(interval, callback, once) + self._timer = IntervalTimer.new(interval):start() + self._callback = callback + self._once = once + + return self +end + +function TimeEvent:started() + return self._timer:started() +end + +function TimeEvent:restart() + return self._timer:restart() +end + +function TimeEvent:rest() + return self._timer:rest() +end + +function TimeEvent:reset(interval) + self._timer:reset(interval) + return self +end + +function TimeEvent:stop() + return self._timer:stop() +end + +function TimeEvent:once() + return self._once +end + +function TimeEvent:fire(...) + -- !!! do not pass self + return self._callback(...) +end + +end +------------------------------------------------------------------------------- + +------------------------------------------------------------------------------- +local TimeEvents = class() do + +function TimeEvents:__init() + self._events = {} + return self +end + +function TimeEvents:sleepInterval(max_interval) + local events = self._events + for i = 1, #events do + local event = events[i] + if event:started() then + local rest = event:rest() + if max_interval > rest then max_interval = rest end + end + end + return max_interval +end + +function TimeEvents:fire(this, ...) + self._lock = true + local i, events = 0, self._events + for i = 1, #events do + local event = events[i] + if event:rest() == 0 then + if event:once() then + event:stop() + else + event:restart() + end + event:fire(this, event, ...) + end + end + self._lock = false + + for i = #events, 1, -1 do + local event = events[i] + if not event:started() then + remove(events, i) + end + end +end + +function TimeEvents:setInterval(interval, callback) + local event = TimeEvent.new(interval, callback, false) + append(self._events, event) + return event +end + +function TimeEvents:setIntervalOnce(interval, callback) + local event = TimeEvent.new(interval, callback, true) + append(self._events, event) + return event +end + +function TimeEvents:removeInterval(timer) + local events = self._events + for i = #events, 1, -1 do + if events[i] == timer then + if self._lock then + events[i]:stop() + else + remove(events, i) + end + return true + end + end +end + +end +------------------------------------------------------------------------------- + ------------------------------------------------------------------------------- local EventConsumer = class(EventEmitter) do +local default_timeout = 60000 +local default_poll_interval = 60000 * 30 + function EventConsumer:__init(timeout, pid_file) self.__base.__init(self) - if pid_file then timeout = timeout or 60000 end + if pid_file then timeout = timeout or default_timeout end if timeout then assert(timeout > 0) end @@ -314,7 +443,7 @@ function EventConsumer:__init(timeout, pid_file) self._running = false self._consumer = freeswitch.EventConsumer() self._timeout = timeout - self._timer = timeout and IntervalTimer.new(timeout) + self._timers = TimeEvents.new() self._pid = api:execute("create_uuid") or tostring(api:getTime()) self._pid_file = pid_file @@ -324,6 +453,13 @@ function EventConsumer:__init(timeout, pid_file) assert(file.write(self._pid_file, self._pid)) end + if self._timeout then + self:onInterval(self._timeout, function(self) + if not self:_check_pid_file() then return self:stop() end + self:emit('TIMEOUT') + end) + end + return self end @@ -366,41 +502,37 @@ function EventConsumer:bind(event_name, cb) end function EventConsumer:run() - self._timer:restart() self._running = true + -- set some huge default interval + -- if there no time events then we wait this amount of time + local max_interval = self._timeout or default_poll_interval + while self._running do - local timeout - if self._timer then - timeout = self._timer:rest() - if timeout == 0 then - if not self:_check_pid_file() then - return - end - timeout = self._timeout - self._timer:restart() - self:emit('TIMEOUT') - end + self._timers:fire(self) + if not self._running then break end + local timeout = self._timers:sleepInterval(max_interval) + + local event + if timeout == 0 then + -- we have some time based events. + -- so we just try get fs event without wait + event = self._consumer:pop(0) else - -- wait infinity - timeout = 0 + event = self._consumer:pop(1, timeout) end - local event = self._consumer:pop(1, timeout) - - if not event then - if not self:_check_pid_file() then - return - end - else - local event_name = event:getHeader('Event-Name') + if event then + local event_name = event:getHeader('Event-Name') if self._bound[event_name] then self:emit(event_name, event) + if not self._running then break end end local event_class = event:getHeader('Event-Subclass') if event_class and #event_class > 0 then event_name = event_name .. '::' .. event_class self:emit(event_name, event) + if not self._running then break end end end end @@ -415,6 +547,18 @@ function EventConsumer:stop() end end +function EventConsumer:onInterval(interval, callback) + return self._timers:setInterval(interval, callback) +end + +function EventConsumer:onIntervalOnce(interval, callback) + return self._timers:setIntervalOnce(interval, callback) +end + +function EventConsumer:offInterval(timer) + return self._timers:removeInterval(timer) +end + end ------------------------------------------------------------------------------- @@ -472,4 +616,4 @@ return { EventConsumer = EventConsumer; new = EventConsumer.new; ievents = ievents; -} \ No newline at end of file +} diff --git a/resources/install/scripts/resources/functions/interval_timer.lua b/resources/install/scripts/resources/functions/interval_timer.lua index 0cc496421e..d5781c50a3 100644 --- a/resources/install/scripts/resources/functions/interval_timer.lua +++ b/resources/install/scripts/resources/functions/interval_timer.lua @@ -68,6 +68,12 @@ function IntervalTimer:rest() return d end +function IntervalTimer:reset(interval) + self._interval = interval + if self._begin then self:restart() end + return self +end + function IntervalTimer:stop() if self:started() then local d = self:elapsed()