Add. EventConsumer class. (#1741)

Rewrite MWI and CallFlow subscribe handlers based on EventConsumer class.
Also on my test VirtualBox/Debian system Lua function `os.clock` produce
very strange result(delta ~0.015 for 1 second) so I switch to `os.time`.
Now to to stop this background Lua scripts it possible send CUSTOM event
with subclass `fusion::XXX::shutdown`. Where XXX is `mwi` or `flow`.

Usage of EventConsumer class
```Lua
-- create new object with timeout one minute
local events = EventConsumer.new(60000)

-- bind to some FS event
events:bind("SHUTDOW", function(self, name, event) ... end)

-- bind to another FS event with subclass
events:bind("CUSTOM::fusion::mwi::shutdown", function(self, name, event) ... end)

-- handle timeout event
events:on("TIMEOUT", function(self, name) ... end)

--run event loop
events:run()
```
This commit is contained in:
Alexey Melnichuk
2016-07-08 22:10:43 +03:00
committed by FusionPBX
parent 666ccddb34
commit cae644c8a1
5 changed files with 588 additions and 131 deletions

View File

@@ -0,0 +1,475 @@
require "resources.functions.mkdir";
require "resources.functions.split"
local IntervalTimer = require "resources.functions.interval_timer"
local file = require "resources.functions.file"
local api = require "resources.functions.api"
local function class(base)
local t = base and setmetatable({}, base) or {}
t.__index = t
t.__class = t
t.__base = base
function t.new(...)
local o = setmetatable({}, t)
if o.__init then
if t == ... then -- we call as Class:new()
return o:__init(select(2, ...))
else -- we call as Class.new()
return o:__init(...)
end
end
return o
end
return t
end
local function callable(f)
return type(f) == 'function'
end
local function basename(p)
return (string.match(p, '^(.-)[/\\][^/\\]+$'))
end
local function split_event(event_name)
local name, class = split_first(event_name, "::", true)
if class then return name, class end
return name
end
-------------------------------------------------------------------------------
local BasicEventEmitter = class() do
local ANY_EVENT = {}
BasicEventEmitter.ANY = ANY_EVENT
function BasicEventEmitter:__init()
-- map of array of listeners
self._handlers = {}
-- map to convert user's listener to internal wrapper
self._once = {}
return self
end
function BasicEventEmitter:on(event, handler)
local list = self._handlers[event] or {}
for i = 1, #list do
if list[i] == handler then
return self
end
end
list[#list + 1] = handler
self._handlers[event] = list
return self
end
function BasicEventEmitter:many(event, ttl, handler)
self:off(event, handler)
local function listener(...)
ttl = ttl - 1
if ttl == 0 then self:off(event, handler) end
handler(...)
end
self:on(event, listener)
self._once[handler] = listener
return self
end
function BasicEventEmitter:once(event, handler)
return self:many(event, 1, handler)
end
function BasicEventEmitter:off(event, handler)
local list = self._handlers[event]
if not list then return self end
if handler then
local listener = self._once[handler] or handler
self._once[handler] = nil
for i = 1, #list do
if list[i] == listener then
table.remove(list, i)
break
end
end
if #list == 0 then self._handlers[event] = nil end
else
for handler, listener in pairs(self._once) do
for i = 1, #list do
if list[i] == listener then
self._once[handler] = nil
break
end
end
end
self._handlers[event] = nil
end
return self
end
function BasicEventEmitter:onAny(handler)
return self:on(ANY_EVENT, handler)
end
function BasicEventEmitter:manyAny(ttl, handler)
return self:many(ANY_EVENT, ttl, handler)
end
function BasicEventEmitter:onceAny(handler)
return self:once(ANY_EVENT, handler)
end
function BasicEventEmitter:offAny(handler)
return self:off(ANY_EVENT, handler)
end
function BasicEventEmitter:_emit_impl(call_any, event, ...)
local ret = false
if call_any and ANY_EVENT ~= event then
ret = self:_emit_impl(false, ANY_EVENT, ...) or ret
end
local list = self._handlers[event]
if list then
for i = #list, 1, -1 do
if list[i] then
-- we need this check because cb could remove some listeners
list[i](...)
ret = true
end
end
end
return ret
end
function BasicEventEmitter:emit(event, ...)
return self:_emit_impl(true, event, ...)
end
function BasicEventEmitter:_emit_all(...)
-- we have to copy because cb can remove/add some events
-- and we do not need call new one or removed one
local names = {}
for name in pairs(self._handlers) do
names[#names+1] = name
end
local ret = false
for i = 1, #names do
ret = self:_emit_impl(false, names[i], ...) or ret
end
return ret
end
function BasicEventEmitter:_empty()
return nil == next(self._handlers)
end
function BasicEventEmitter:removeAllListeners(eventName)
if not eventName then
self._handlers = {}
self._once = {}
else
self:off(eventName)
end
return self
end
end
-------------------------------------------------------------------------------
-------------------------------------------------------------------------------
local EventEmitter = class() do
function EventEmitter:__init(opt)
if opt and opt.wildcard then
assert('`EventEmitter::wildcard` not supported')
-- self._EventEmitter = TreeEventEmitter.new(opt.delimiter)
else
self._EventEmitter = BasicEventEmitter.new()
end
self._EventEmitter_self = opt and opt.self or self
return self
end
function EventEmitter:on(event, listener)
assert(event, 'event expected')
assert(callable(listener), 'function expected')
self._EventEmitter:on(event, listener)
return self
end
function EventEmitter:many(event, ttl, listener)
assert(event, 'event expected')
assert(type(ttl) == 'number', 'number required')
assert(callable(listener), 'function expected')
self._EventEmitter:many(event, ttl, listener)
return self
end
function EventEmitter:once(event, listener)
assert(event, 'event expected')
assert(callable(listener), 'function expected')
self._EventEmitter:once(event, listener)
return self
end
function EventEmitter:off(event, listener)
assert(event, 'event expected')
assert((listener == nil) or callable(listener), 'function expected')
self._EventEmitter:off(event, listener)
return self
end
function EventEmitter:emit(event, ...)
assert(event, 'event expected')
return self._EventEmitter:emit(event, self._EventEmitter_self, event, ...)
end
function EventEmitter:onAny(listener)
assert(callable(listener), 'function expected')
self._EventEmitter:onAny(listener)
return self
end
function EventEmitter:manyAny(ttl, listener)
assert(type(ttl) == 'number', 'number required')
assert(callable(listener), 'function expected')
self._EventEmitter:manyAny(ttl, listener)
return self
end
function EventEmitter:onceAny(listener)
assert(callable(listener), 'function expected')
self._EventEmitter:onceAny(listener)
return self
end
function EventEmitter:offAny(listener)
assert((listener == nil) or callable(listener), 'function expected')
self._EventEmitter:offAny(listener)
return self
end
function EventEmitter:removeAllListeners(eventName)
self._EventEmitter:removeAllListeners(eventName)
return self
end
-- aliases
EventEmitter.addListener = EventEmitter.on
EventEmitter.removeListener = EventEmitter.off
end
-------------------------------------------------------------------------------
-------------------------------------------------------------------------------
local EventConsumer = class(EventEmitter) do
function EventConsumer:__init(timeout, pid_file)
self.__base.__init(self)
if pid_file then timeout = timeout or 60000 end
if timeout then assert(timeout > 0) end
self._bound = {}
self._running = false
self._consumer = freeswitch.EventConsumer()
self._timeout = timeout
self._timer = timeout and IntervalTimer.new(timeout)
self._pid = api:execute("create_uuid") or tostring(api:getTime())
self._pid_file = pid_file
if pid_file then
local pid_path = basename(self._pid_file)
mkdir(pid_path)
assert(file.write(self._pid_file, self._pid))
end
return self
end
function EventConsumer:_check_pid_file()
if not self._pid_file then
return true
end
if not file.exists(self._pid_file) then
return false
end
local stored = file.read(self._pid_file)
if stored and stored ~= self._pid then
return false
end
return true
end
function EventConsumer:bind(event_name, cb)
if not self._bound[event_name] then
local name, class = split_event(event_name)
local ok, err
if not class then ok, err = self._consumer:bind(name)
else ok, err = self._consumer:bind(name, class) end
if ok then self._bound[event_name] = true end
end
if self._bound[event_name] and cb then
if event_name == 'ALL' then
self:onAny(function(self, name, event)
if event then return cb(self, name, event) end
end)
else
self:on(event_name, cb)
end
end
end
function EventConsumer:run()
self._timer:restart()
self._running = true
while self._running do
local timeout
if self._timer then
timeout = self._timer:rest()
if timeout == 0 then
if not self:_check_pid_file() then
return
end
timeout = self._timeout
self._timer:restart()
self:emit('TIMEOUT')
end
else
-- wait infinity
timeout = 0
end
local event = self._consumer:pop(1, timeout)
if not event then
if not self:_check_pid_file() then
return
end
else
local event_name = event:getHeader('Event-Name')
if self._bound[event_name] then
self:emit(event_name, event)
end
local event_class = event:getHeader('Event-Subclass')
if event_class and #event_class > 0 then
event_name = event_name .. '::' .. event_class
self:emit(event_name, event)
end
end
end
self._running = false
end
function EventConsumer:stop()
self._running = false
if self._pid_file and self:_check_pid_file() then
file.remove(self._pid_file)
end
end
end
-------------------------------------------------------------------------------
---
--
-- @param events [string|array]- array of events to subscribe. To specify subclass you
-- can use string like `<EVENT>::<SUBCLASS>` or array like `{<EVENT>, <SUBCLASS>}`.
-- If `events` is string then it specify single event.
-- @param block [booolean?] - by default it use block
-- @param timeout [number?] - by default it 0. If set `block` that means infinity wait.
--
-- @usage
-- -- do blocked itarate over 'MEMCACHE' and 'SHUTDOWN' events
-- for event in ievents{'MEMCACHE','SHUTDOWN'} do ... end
--
-- -- do blocked iterate with timeout 1 sec
-- for event in ievents('SHUTDOWN', 1000) do
-- if event then -- has event
-- else -- timeout
-- end
-- end
local ievents = function(events, block, timeout)
if type(events) == 'string' then
events = freeswitch.EventConsumer(split_event(events))
elseif type(events) == 'table' then
local array = events
events = freeswitch.EventConsumer()
for _, event in ipairs(array) do
local name, class
if type(event) == 'table' then
base, sub = event[1], event[2]
else
name, class = split_event(base)
end
if not class then events:bind(name)
else events:bind(name, class) end
end
end
if type(block) == 'number' then
block, timeout = true, block
end
timeout = timeout or 0
block = block and 1 or 0
return function()
local event = events:pop(block, timeout)
if not event then return false end
return event
end
end
return {
EventConsumer = EventConsumer;
new = EventConsumer.new;
ievents = ievents;
}

View File

@@ -1,30 +0,0 @@
local ievents = function(events, ...)
if type(events) == 'string' then
events = freeswitch.EventConsumer(events)
elseif type(events) == 'table' then
local array = events
events = freeswitch.EventConsumer()
for _, event in ipairs(array) do
local base, sub
if type(event) == 'table' then
base, sub = event[1], event[2]
else
base = event
end
if not sub then events:bind(base)
else events:bind(base, sub) end
end
end
local block, timeout = ...
if timeout and (timeout == 0) then block, timeout = 0, 0 end
timeout = timeout or 0
return function()
local event = events:pop(block, timeout)
if not event then return false end
return event
end
end
return ievents

View File

@@ -1,3 +1,15 @@
-- absolute timer
local fs_time if freeswitch then
local api = require "resources.functions.api"
fs_time = {
now = function() return api:getTime() end;
elapsed = function(t) return api:getTime() - t end;
ms_to_time = function(ms) return ms end;
time_to_ms = function(t) return t end;
}
end
-- absolute timer
local os_time = {
now = function() return os.time() end;
elapsed = function(t) return os.difftime(os.time(), t) end;
@@ -5,6 +17,7 @@ local os_time = {
time_to_ms = function(t) return t * 1000 end;
}
-- monotonic timer (not work on my test Debian system)
local os_clock = {
now = function() return os.clock() end;
elapsed = function(t) return os.clock() - t end;
@@ -12,13 +25,19 @@ local os_clock = {
time_to_ms = function(t) return t * 1000 end;
}
local timers = {
freeswitch = fs_time;
time = os_time;
clock = os_clock;
}
local IntervalTimer = {} do
IntervalTimer.__index = IntervalTimer
function IntervalTimer.new(interval, timer)
local o = setmetatable({}, IntervalTimer)
o._interval = interval
o._timer = timer or os_clock
o._timer = timer and assert(timers[timer], "unknown timer: " .. timer) or os_time
return o
end