Merge pull request #1251 from moteus/fax_queue

Fax queue
This commit is contained in:
FusionPBX
2015-11-25 07:37:28 -08:00
21 changed files with 2222 additions and 265 deletions

View File

@@ -0,0 +1,181 @@
-- @usage without queue
-- api: originate {fax_file='',wav_file='',fax_dtmf=''}user/108@domain.local &lua(fax_queue/exec.lua)
-- @usage with queue task
-- api: originate {task_uuid=''}user/108@domain.local &lua(fax_queue/exec.lua)
-- @fax_dtmf
-- 0-9*# - dtmf symbols
-- @200 - dtmf duration in ms
-- p - pause 500 ms
-- P - pause 1000 ms
--
-- example: pause 5 sec dial 008 pause 2 sec paly greeting
-- PPPPP008@300PP
--
require "resources.functions.config"
local log = require "resources.functions.log".fax_task
-- If we handle queue task
local task_uuid = session:getVariable('task_uuid')
local task if task_uuid then
local Tasks = require "fax_queue.tasks"
task = Tasks.select_task(task_uuid)
if not task then
log.warningf("Can not found fax task: %q", tostring(task_uuid))
return
end
end
if task then
local str = 'Queue task :'
for k, v in pairs(task) do
str = str .. ('\n %q = %q'):format(k, v)
end
log.info(str)
else
log.info('Not queued task')
end
local function empty(t) return (not t) or (#t == 0) end
local function not_empty(t) if not empty(t) then return t end end
local dtmf, wav_file, fax_file
if task then
dtmf = not_empty(task.dtmf)
wav_file = not_empty(task.wav_file) or not_empty(task.greeting)
fax_file = not_empty(task.fax_file)
else
dtmf = not_empty(session:getVariable('fax_dtmf'))
wav_file = not_empty(session:getVariable('wav_file'))
fax_file = not_empty(session:getVariable('fax_file'))
end
if not (wav_file or fax_file) then
log.warning("No fax or wav file")
return
end
local function decode_dtmf(dtmf)
local r, sleep, seq = {}
dtmf:gsub('P', 'pp'):gsub('.', function(ch)
if ch == ';' or ch == ',' then
r[#r + 1] = sleep or seq
sleep, seq = nil
elseif ch == 'p' then
r[#r + 1] = seq
sleep = (sleep or 0) + 500
seq = nil
else
r[#r + 1] = sleep
seq = (seq or '') .. ch
sleep = nil
end
end)
r[#r + 1] = sleep or seq
return r
end
local function send_fax()
session:execute("txfax", fax_file)
end
local function start_fax_detect(detect_duration)
if not tone_detect_cb then
function tone_detect_cb(s, type, obj, arg)
if type == "event" then
if obj:getHeader('Event-Name') == 'DETECTED_TONE' then
return "false"
end
end
end
end
log.notice("Start detecting fax")
detect_duration = detect_duration or 60000
session:setInputCallback("tone_detect_cb")
session:execute("tone_detect", "txfax 2100 r +" .. tostring(detect_duration) .. " set remote_fax_detected=txfax")
session:execute("tone_detect", "rxfax 1100 r +" .. tostring(detect_duration) .. " set remote_fax_detected=rxfax")
session:setVariable("sip_api_on_image", "uuid_break " .. session:getVariable("uuid") .. " all")
end
local function stop_fax_detect()
session:unsetInputCallback()
session:execute("stop_tone_detect")
session:setVariable("sip_api_on_image", "")
end
local function fax_deteced()
if session:getVariable('has_t38') == 'true' then
log.noticef('Detected t38')
session:setVariable('remote_fax_detected', 'txfax')
end
if fax_file and session:getVariable('remote_fax_detected') then
log.noticef("Detected %s", session:getVariable('remote_fax_detected'))
if session:getVariable('remote_fax_detected') == 'txfax' then
send_fax()
else
log.warning('Remote fax try send fax')
end
return true
end
end
local function check()
if not session:ready() then return false end
if fax_deteced() then return false end
return true
end
local function task()
session:waitForAnswer(session)
while not session:answered() do
if not session:ready() then return end
session:sleep(500)
end
if not (session:ready() and session:answered()) then return end
if fax_file and wav_file then
start_fax_detect()
end
if dtmf then
dtmf = decode_dtmf(dtmf)
for _, element in ipairs(dtmf) do
if type(element) == 'number' then
session:streamFile("silence_stream://" .. tostring(element))
else
session:execute("send_dtmf", element)
end
if not check() then return end
end
end
if wav_file then
session:streamFile(wav_file)
if not check() then return end
end
if fax_file then
if wav_file then
stop_fax_detect()
end
send_fax()
end
end
log.noticef("START TASK")
log.notice("Fax:" .. tostring(fax_file))
log.notice("Wav:" .. tostring(wav_file))
task()
log.noticef("STOP TASK")
log.notice("Ready: " .. tostring(session:ready()))
log.notice("Answered: " .. tostring(session:answered()))

View File

@@ -0,0 +1,69 @@
require "resources.functions.config"
require "resources.functions.sleep"
local log = require "resources.functions.log".next_fax_task
local Tasks = require "fax_queue.tasks"
local Esl = require "resources.functions.esl"
local FAX_OPTIONS = {
"fax_use_ecm=false,fax_enable_t38=true,fax_enable_t38_request=true,fax_disable_v17=default";
"fax_use_ecm=true,fax_enable_t38=true,fax_enable_t38_request=true,fax_disable_v17=false";
"fax_use_ecm=true,fax_enable_t38=false,fax_enable_t38_request=false,fax_disable_v17=false";
"fax_use_ecm=true,fax_enable_t38=true,fax_enable_t38_request=true,fax_disable_v17=true";
"fax_use_ecm=false,fax_enable_t38=false,fax_enable_t38_request=false,fax_disable_v17=false";
}
local function next_task()
local task, err = Tasks.next_task()
if not task then
if err then
log.noticef('Can not select next task: %s', tostring(err))
else
log.notice("No task")
end
return
end
local esl
local ok, err = pcall(function()
for k, v in pairs(task) do
print(string.format(" `%s` => `%s`", tostring(k), tostring(v)))
end
local mode = (task.retry_counter % #FAX_OPTIONS) + 1
local dial_string = '{' ..
task.dial_string .. "api_hangup_hook='lua fax_queue/retry.lua'," ..
FAX_OPTIONS[mode] ..
'}' .. task.uri
local originate = 'originate ' .. dial_string .. ' &lua(fax_queue/exec.lua)'
log.notice(originate)
esl = assert(Esl.new())
local ok, err = esl:api(originate)
log.notice(ok or err)
end)
if esl then esl:close() end
if not ok then
Tasks.release_task(task)
log.noticef("Error execute task: %s", tostring(err))
end
return true
end
local function poll_once()
Tasks.cleanup_tasks()
while next_task() do
sleep(5000)
end
Tasks.release_db()
end
return {
poll_once = poll_once;
}

View File

@@ -0,0 +1,346 @@
-- include libraries
require "resources.functions.config";
require "resources.functions.explode";
require "resources.functions.split";
require "resources.functions.count";
local log = require "resources.functions.log".fax_retry
local Database = require "resources.functions.database"
local Settings = require "resources.functions.lazy_settings"
local Tasks = require "fax_queue.tasks"
local task_uuid = env:getHeader('task_uuid')
local task = Tasks.select_task(task_uuid)
if not task then
log.warningf("Can not find fax task: %q", tostring(task_uuid))
return
end
-- show all channel variables
if debug["fax_serialize"] then
log.noticef("info:\n%s", env:serialize())
end
local dbh = Database.new('system')
-- Channel/FusionPBX variables
local uuid = env:getHeader("uuid")
local domain_uuid = env:getHeader("domain_uuid") or task.domain_uuid
local domain_name = env:getHeader("domain_name") or task.domain_name
local origination_caller_id_name = env:getHeader("origination_caller_id_name") or '000000000000000'
local origination_caller_id_number = env:getHeader("origination_caller_id_number") or '000000000000000'
local accountcode = env:getHeader("accountcode")
local duration = tonumber(env:getHeader("billmsec")) or 0
local sip_to_user = env:getHeader("sip_to_user")
local bridge_hangup_cause = env:getHeader("bridge_hangup_cause")
local hangup_cause_q850 = tonumber(env:getHeader("hangup_cause_q850"))
local answered = duration > 0
-- fax variables
local fax_success = env:getHeader('fax_success')
local has_t38 = env:getHeader('has_t38') or 'false'
local t38_broken_boolean = env:getHeader('t38_broken_boolean') or ''
local fax_result_code = tonumber(env:getHeader('fax_result_code')) or 2
local fax_result_text = env:getHeader('fax_result_text') or 'FS_NOT_SET'
local fax_ecm_used = env:getHeader('fax_ecm_used') or ''
local fax_local_station_id = env:getHeader('fax_local_station_id') or ''
local fax_document_transferred_pages = env:getHeader('fax_document_transferred_pages') or nil
local fax_document_total_pages = env:getHeader('fax_document_total_pages') or nil
local fax_image_resolution = env:getHeader('fax_image_resolution') or ''
local fax_image_size = env:getHeader('fax_image_size') or nil
local fax_bad_rows = env:getHeader('fax_bad_rows') or nil
local fax_transfer_rate = env:getHeader('fax_transfer_rate') or nil
local fax_v17_disabled = env:getHeader('fax_v17_disabled') or ''
local fax_ecm_requested = env:getHeader('fax_ecm_requested') or ''
local fax_remote_station_id = env:getHeader('fax_remote_station_id') or ''
local fax_options = ("fax_use_ecm=%s,fax_enable_t38=%s,fax_enable_t38_request=%s,fax_disable_v17=%s"):format(
env:getHeader('fax_use_ecm') or '',
env:getHeader('fax_enable_t38') or '',
env:getHeader('fax_enable_t38_request') or '',
env:getHeader('fax_disable_v17') or ''
)
-- Fax task params
local fax_uri = env:getHeader("fax_uri") or task.uri
local fax_file = env:getHeader("fax_file") or task.fax_file
local wav_file = env:getHeader("wav_file") or task.wav_file
local fax_uuid = task.fax_uuid
-- Email variables
local email_address = env:getHeader("mailto_address")
local from_address = env:getHeader("mailfrom_address") or email_address
local number_dialed = fax_uri:match("/([^/]-)%s*$")
local email_message_fail = "We are sorry the fax failed to go through. It has been attached. Please check the number "..number_dialed..", and if it was correct you might consider emailing it instead."
local email_message_success = "We are happy to report the fax was sent successfully. It has been attached for your records."
log.noticef([[<<< CALL RESULT >>>
uuid: = '%s'
answered: = '%s'
fax_file: = '%s'
wav_file: = '%s'
fax_uri: = '%s'
sip_to_user: = '%s'
accountcode: = '%s'
origination_caller_id_name: = '%s'
origination_caller_id_number: = '%s'
mailfrom_address: = '%s'
mailto_address: = '%s'
hangup_cause_q850: = '%s'
fax_options = '%s'
]],
tostring(uuid) ,
tostring(answered) ,
tostring(fax_file) ,
tostring(wav_file) ,
tostring(fax_uri) ,
tostring(sip_to_user) ,
tostring(accountcode) ,
tostring(origination_caller_id_name) ,
tostring(origination_caller_id_number) ,
tostring(from_address) ,
tostring(email_address) ,
tostring(hangup_cause_q850) ,
fax_options
)
if fax_success then
log.noticef([[<<< FAX RESULT >>>
fax_success = '%s'
has_t38 = '%s'
t38_broken_boolean = '%s'
fax_result_code = '%s'
fax_result_text = '%s'
fax_ecm_used = '%s'
fax_local_station_id = '%s'
fax_document_transferred_pages = '%s'
fax_document_total_pages = '%s'
fax_image_resolution = '%s'
fax_image_size = '%s'
fax_bad_rows = '%s'
fax_transfer_rate = '%s'
fax_v17_disabled = '%s'
fax_ecm_requested = '%s'
fax_remote_station_id = '%s'
'%s'
]],
fax_success ,
has_t38 ,
t38_broken_boolean ,
fax_result_code ,
fax_result_text ,
fax_ecm_used ,
fax_local_station_id ,
fax_document_transferred_pages ,
fax_document_total_pages ,
fax_image_resolution ,
fax_image_size ,
fax_bad_rows ,
fax_transfer_rate ,
fax_v17_disabled ,
fax_ecm_requested ,
fax_remote_station_id ,
'---------------------------------'
)
end
--get the values from the fax file
if not (fax_uuid and domain_name) then
local array = split(fax_file, "[\\/]+")
domain_name = domain_name or array[#array - 3]
local fax_extension = fax_extension or array[#array - 2]
if not fax_uuid then
local sql = "SELECT fax_uuid FROM v_fax "
sql = sql .. "WHERE domain_uuid = '" .. domain_uuid .. "' "
sql = sql .. "AND fax_extension = '" .. fax_extension .. "' "
fax_uuid = dbh:first_value(sql);
end
end
--get the domain_uuid using the domain name required for multi-tenant
if domain_name and not domain_uuid then
local sql = "SELECT domain_uuid FROM v_domains ";
sql = sql .. "WHERE domain_name = '" .. domain_name .. "' "
domain_uuid = dbh:first_value(sql)
end
assert(domain_name and domain_uuid)
--settings
local settings = Settings.new(dbh, domain_name, domain_uuid)
local keep_local = settings:get('fax', 'keep_local','boolean')
local storage_type = (keep_local == "false") and "" or settings:get('fax', 'storage_type', 'text')
--be sure accountcode is not empty
if (accountcode == nil) then
accountcode = domain_name
end
local function opt(v, default)
if v then return "'" .. v .. "'" end
return default or 'NULL'
end
local function now_sql()
return (database["type"] == "sqlite") and "'"..os.date("%Y-%m-%d %X").."'" or "now()";
end
--add to fax logs
do
local fields = {
"fax_log_uuid";
"domain_uuid";
"fax_uuid";
"fax_success";
"fax_result_code";
"fax_result_text";
"fax_file";
"fax_ecm_used";
"fax_local_station_id";
"fax_document_transferred_pages";
"fax_document_total_pages";
"fax_image_resolution";
"fax_image_size";
"fax_bad_rows";
"fax_transfer_rate";
"fax_retry_attempts";
"fax_retry_limit";
"fax_retry_sleep";
"fax_uri";
"fax_date";
"fax_epoch";
}
local values = {
"'"..uuid .. "'";
"'"..domain_uuid .. "'";
opt(fax_uuid);
opt(fax_success);
opt(fax_result_code);
opt(fax_result_text);
opt(fax_file);
opt(fax_ecm_used);
opt(fax_local_station_id);
opt(fax_document_transferred_pages, "'0'");
opt(fax_document_total_pages, "'0'");
opt(fax_image_resolution);
opt(fax_image_size);
opt(fax_bad_rows);
opt(fax_transfer_rate);
opt(fax_retry_attempts);
opt(fax_retry_limit);
opt(fax_retry_sleep);
opt(fax_uri);
now_sql();
"'"..os.time().."' ";
}
local sql = "insert into v_fax_logs(" .. table.concat(fields, ",") .. ")" ..
"values(" .. table.concat(values, ",") .. ")"
if (debug["sql"]) then
log.noticef("SQL: %s", sql);
end
dbh:query(sql);
end
-- add the fax files
if fax_success == "1" then
if storage_type == "base64" then
--include the base64 function
require "resources.functions.base64";
--base64 encode the file
local f = io.open(fax_file, "rb");
if not f then
log.waitng("Can not find file %s", fax_file)
storage_type = nil
else
local file_content = f:read("*all");
f:close()
fax_base64 = base64.encode(file_content)
end
end
-- build SQL
local sql do
sql = {
"insert into v_fax_files(";
"fax_file_uuid"; ",";
"fax_uuid"; ",";
"fax_mode"; ",";
"fax_destination"; ",";
"fax_file_type"; ",";
"fax_file_path"; ",";
"fax_caller_id_name"; ",";
"fax_caller_id_number"; ",";
"fax_date"; ",";
"fax_epoch"; ",";
"fax_base64"; ",";
"domain_uuid"; " ";
") values (";
opt(uuid); ",";
opt(fax_uuid); ",";
"'tx'"; ",";
opt(sip_to_user); ",";
"'tif'"; ",";
opt(fax_file); ",";
opt(origination_caller_id_name); ",";
opt(origination_caller_id_number); ",";
now_sql(); ",";
"'" .. os.time() .. "'"; ",";
opt(fax_base64); ",";
opt(domain_uuid); " ";
")"
}
sql = table.concat(sql, "\n");
if (debug["sql"]) then
log.noticef("SQL: %s", sql);
end
end
if storage_type == "base64" then
local db_type, db_cnn = split_first(database["system"], "://", true)
local luasql = require ("luasql." .. db_type);
local env = assert (luasql[db_type]());
local dbh = env:connect(db_cnn);
dbh:execute(sql)
dbh:close()
env:close()
else
result = dbh:query(sql)
end
end
if fax_success == "1" then
--Success
log.infof("RETRY STATS SUCCESS: GATEWAY[%s]", fax_options);
if keep_local == "false" then
os.remove(fax_file);
end
Tasks.remove_task(task)
end
if fax_success ~= "1" then
if not answered then
log.noticef("no answer: %d", hangup_cause_q850)
else
if not fax_success then
log.noticef("Fax not detected: %s", fax_options)
else
log.noticef("fax fail %s", fax_options)
end
end
Tasks.wait_task(task, answered, hangup_cause_q850)
if task.status ~= 0 then
Tasks.remove_task(task)
end
end

View File

@@ -0,0 +1,251 @@
local Database = require "resources.functions.database"
local Settings = require "resources.functions.lazy_settings"
local db
local date_utc_now_sql
local now_add_sec_sql
if database.type == 'pgsql' then
date_utc_now_sql = "NOW() at time zone 'utc'"
now_add_sec_sql = "NOW() at time zone 'utc' + interval '%s second'"
elseif database.type == 'mysql' then
date_utc_now_sql = "UTC_TIMESTAMP()"
now_add_sec_sql = "DATE_ADD(UTC_TIMESTAMP(), INTERVAL %s SECOND)"
elseif database.type == 'sqlite' then
date_utc_now_sql = "datetime('now')"
now_add_sec_sql = "datetime('now', '%s seconds')"
else
error("unsupported database type: " .. database.type)
end
-- Broken on FS 1.4 with native postgresql
-- Fixed on 1.6.0
-- Also works with ODBC
local ignore_affected_rows = true
if dbh_affected_rows_broken ~= nil then
ignore_affected_rows = dbh_affected_rows_broken
end
local Q850_TIMEOUT = {
[17] = 60;
}
local select_task_common_sql = [[
select
t1.task_uuid as uuid,
t1.fax_uuid as fax_uuid,
t3.domain_name,
t3.domain_uuid,
t1.task_status as status,
t1.task_uri as uri,
t1.task_dial_string as dial_string,
t1.task_dtmf as dtmf,
t1.task_fax_file as fax_file,
t1.task_wav_file as wav_file,
t1.task_no_answer_counter as no_answer_counter,
t1.task_no_answer_retry_counter as no_answer_retry_counter,
t1.task_retry_counter as retry_counter,
t2.fax_send_greeting as greeting
from v_fax_tasks t1
inner join v_fax t2 on t2.fax_uuid = t1.fax_uuid
inner join v_domains t3 on t2.domain_uuid = t3.domain_uuid
where t1.task_interrupted <> 'true'
]]
local next_task_sql = select_task_common_sql .. [[
and t1.task_status = 0 and t1.task_next_time < ]] .. date_utc_now_sql .. [[
and t2.fax_send_channels > (select count(*) from v_fax_tasks as tasks
where tasks.fax_uuid = t1.fax_uuid and
tasks.task_status > 0 and tasks.task_status <= 2
)
order by t1.task_next_time
]]
local select_task_sql = select_task_common_sql .. "and t1.task_uuid='%s'"
local aquire_task_sql = [[
update v_fax_tasks set task_status = 1, task_lock_time = ]] .. date_utc_now_sql .. [[
where task_uuid = '%s' and task_status = 0
]]
local wait_task_sql = [[
update v_fax_tasks
set task_status = %s,
task_lock_time = NULL,
task_no_answer_counter = %s,
task_no_answer_retry_counter = %s,
task_retry_counter = %s,
task_next_time = ]] .. now_add_sec_sql .. [[
where task_uuid = '%s'
]]
local remove_task_task_sql = [[
delete from v_fax_tasks
where task_uuid = '%s'
]]
local release_task_sql = [[
update v_fax_tasks
set task_status = 0, task_lock_time = NULL,
task_next_time = ]] .. now_add_sec_sql .. [[
where task_uuid = '%s'
]]
local release_stuck_tasks_sql = [[
update v_fax_tasks
set task_status = 0, task_lock_time = NULL,
task_next_time = ]] .. date_utc_now_sql .. [[
where task_lock_time < ]] .. now_add_sec_sql:format('-3600') .. [[
]]
local remove_finished_tasks_sql = [[
delete from v_fax_tasks where task_status > 3
]]
local function get_db()
if not db then
db = assert(Database.new('system'))
end
return db
end
local function next_task()
local db = get_db()
while true do
local task, err = db:first_row(next_task_sql)
if not task then return nil, err end
local ok, err = db:query( aquire_task_sql:format(task.uuid) )
if not ok then return nil, err end
local rows = db:affected_rows()
if ignore_affected_rows then
rows = 1
end
if rows == 1 then
task.no_answer_counter = tonumber(task.no_answer_counter)
task.no_answer_retry_counter = tonumber(task.no_answer_retry_counter)
task.retry_counter = tonumber(task.retry_counter)
return task
end
end
end
local function select_task(task_uuid)
local db = get_db()
local task, err = db:first_row(select_task_sql:format(task_uuid))
if not task then return nil, err end
task.no_answer_counter = tonumber(task.no_answer_counter)
task.no_answer_retry_counter = tonumber(task.no_answer_retry_counter)
task.retry_counter = tonumber(task.retry_counter)
return task
end
local function wait_task(task, answered, q850)
local db = get_db()
local interval = 30
local settings = Settings.new(db, task.domain_name, task.domain_uuid)
task.status = 0
if not answered then
interval = Q850_TIMEOUT[q850 or 17] or interval
end
if not answered then
local fax_send_no_answer_retry_limit = tonumber(settings:get('fax', 'send_no_answer_retry_limit', 'numeric')) or 0
task.no_answer_retry_counter = task.no_answer_retry_counter + 1
if task.no_answer_retry_counter >= fax_send_no_answer_retry_limit then
task.no_answer_retry_counter = 0
task.no_answer_counter = task.no_answer_counter + 1
local fax_send_no_answer_limit = tonumber(settings:get('fax', 'send_no_answer_limit', 'numeric')) or 0
if task.no_answer_counter >= fax_send_no_answer_limit then
task.status = 4
else
interval = tonumber(settings:get('fax', 'send_no_answer_interval', 'numeric')) or interval
end
else
interval = tonumber(settings:get('fax', 'send_no_answer_retry_interval', 'numeric')) or interval
end
else
task.retry_counter = task.retry_counter + 1
local fax_send_retry_limit = tonumber(settings:get('fax', 'send_retry_limit', 'numeric')) or 0
if task.retry_counter >= fax_send_retry_limit then
task.status = 4
else
interval = tonumber(settings:get('fax', 'send_retry_interval', 'numeric')) or interval
task.task_seq_call_counter = 0
end
end
local sql = wait_task_sql:format(
tostring(task.status),
tostring(task.no_answer_counter),
tostring(task.no_answer_retry_counter),
tostring(task.retry_counter),
tostring(interval),
task.uuid
)
print(sql)
local ok, err = db:query( sql )
if not ok then return nil, err end
return task
end
local function remove_task(task)
local db = get_db()
local sql = remove_task_task_sql:format(task.uuid)
local ok, err = db:query( sql )
if not ok then return nil, err end
return db:affected_rows()
end
local function release_task(task)
local db = get_db()
local interval = 30
local sql = release_task_sql:format(
tostring(interval),
task.uuid
)
local ok, err = db:query( sql )
if not ok then return nil, err end
return task
end
local function cleanup_tasks()
local db = get_db()
db:query(release_stuck_tasks_sql)
db:query(remove_finished_tasks_sql)
end
return {
release_db = function()
if db then
db:release()
db = nil
end
end;
next_task = next_task;
wait_task = wait_task;
select_task = select_task;
remove_task = remove_task;
release_task = release_task;
cleanup_tasks = cleanup_tasks;
}

View File

@@ -0,0 +1,42 @@
local sleep_interval = 60;
--include config.lua
require "resources.functions.config";
--general functions
require "resources.functions.file_exists";
require "resources.functions.mkdir";
require "resources.functions.sleep";
local log = require "resources.functions.log".fax_queue_monitor
local Next = require "fax_queue.next"
mkdir(scripts_dir .. "/run");
--define the run file
local run_file = scripts_dir .. "/run/fax_queue.tmp";
--used to stop the lua service
local file = assert(io.open(run_file, "w"));
file:write("remove this file to stop the script");
file:close()
log.notice("Start")
while true do
local ok, err = pcall(function()
Next.poll_once()
end)
if not ok then
log.errf("fail poll queue: %s", tostring(err))
end
if not file_exists(run_file) then
break;
end
sleep(sleep_interval * 1000)
end
log.notice("Stop")

View File

@@ -0,0 +1 @@
require "fax_queue.next".poll_once()

View File

@@ -8,7 +8,13 @@
require "resources.functions.trim";
local api = api or freeswitch.API();
local api = api
if (not api) and freeswitch then api = freeswitch.API() else
api = {}
function api:execute()
return '-ERR UNSUPPORTTED'
end
end
local function send_event(action, key)
local event = freeswitch.Event("MEMCACHE", action);

View File

@@ -1,115 +1,190 @@
require 'resources.functions.config'
require 'resources.functions.file_exists'
require 'resources.functions.database_handle'
local unpack = unpack or table.unpack
-----------------------------------------------------------
local OdbcDatabase = {} if not freeswitch then
OdbcDatabase.__index = OdbcDatabase
local Database = {} do
local odbc = require "odbc.dba"
Database.__index = Database
function OdbcDatabase.new(name)
local self = setmetatable({}, OdbcDatabase)
function Database.new(name)
local dbh = assert(name)
if type(name) == 'string' then
if name == 'switch' and file_exists(database_dir.."/core.db") then
dbh = freeswitch.Dbh("sqlite://"..database_dir.."/core.db")
else
dbh = database_handle(name)
end
end
assert(dbh:connected())
local connection_string = assert(database[name])
local self = setmetatable({
_dbh = dbh;
}, Database)
local typ, dsn, user, password = connection_string:match("^(.-)://(.-):(.-):(.-)$")
assert(typ == 'odbc', "unsupported connection string:" .. connection_string)
return self
self._dbh = odbc.Connect(dsn, user, password)
return self
end
function Database:query(sql, fn)
if (fn == nil) then
return self._dbh:query(sql)
else
return self._dbh:query(sql, fn)
end
function OdbcDatabase:query(sql, fn)
self._rows_affected = nil
if fn then
return self._dbh:neach(sql, function(row)
local o = {}
for k, v in pairs(row) do
if v == odbc.NULL then
o[k] = nil
else
o[k] = tostring(v)
end
end
return fn(o)
end)
end
local ok, err = self._dbh:exec(sql)
if not ok then return nil, err end
self._rows_affected = ok
return self._rows_affected
end
function OdbcDatabase:affected_rows()
return self._rows_affected;
end
function OdbcDatabase:release()
if self._dbh then
self._dbh:destroy()
self._dbh = nil
end
end
function OdbcDatabase:connected()
return self._dbh and self._dbh:connected()
end
end
-----------------------------------------------------------
-----------------------------------------------------------
local FsDatabase = {} if freeswitch then
require "resources.functions.file_exists"
require "resources.functions.database_handle"
FsDatabase.__index = FsDatabase
function FsDatabase.new(name)
local dbh = assert(name)
if type(name) == 'string' then
if name == 'switch' and file_exists(database_dir.."/core.db") then
dbh = freeswitch.Dbh("sqlite://"..database_dir.."/core.db")
else
dbh = database_handle(name)
end
end
assert(dbh:connected())
local self = setmetatable({
_dbh = dbh;
}, FsDatabase)
return self
end
function FsDatabase:query(sql, fn)
if fn then
return self._dbh:query(sql, fn)
end
return self._dbh:query(sql)
end
function FsDatabase:affected_rows()
if self._dbh then
return self._dbh:affected_rows()
end
end
function FsDatabase:release()
if self._dbh then
self._dbh:release()
self._dbh = nil
end
end
function FsDatabase:connected()
return self._dbh and self._dbh:connected()
end
end
-----------------------------------------------------------
-----------------------------------------------------------
local Database = {} do
Database.__index = Database
Database.__base = freeswitch and FsDatabase or OdbcDatabase
Database = setmetatable(Database, Database.__base)
function Database.new(...)
local self = Database.__base.new(...)
setmetatable(self, Database)
return self
end
function Database:first_row(sql)
local result
local ok, err = self:query(sql, function(row)
result = row
return 1
end)
if not ok then return nil, err end
return result
local result
local ok, err = self:query(sql, function(row)
result = row
return 1
end)
if not ok then return nil, err end
return result
end
function Database:first_value(sql)
local result, err = self:first_row(sql)
if not result then return nil, err end
local k, v = next(result)
return v
local result, err = self:first_row(sql)
if not result then return nil, err end
local k, v = next(result)
return v
end
function Database:first(sql, ...)
local result, err = self:first_row(sql)
if not result then return nil, err end
local t, n = {}, select('#', ...)
for i = 1, n do
t[i] = result[(select(i, ...))]
end
return unpack(t, 1, n)
local result, err = self:first_row(sql)
if not result then return nil, err end
local t, n = {}, select('#', ...)
for i = 1, n do
t[i] = result[(select(i, ...))]
end
return unpack(t, 1, n)
end
function Database:fetch_all(sql)
local result = {}
local ok, err = self:query(sql, function(row)
result[#result + 1] = row
end)
if not ok then return nil, err end
return result
local result = {}
local ok, err = self:query(sql, function(row)
result[#result + 1] = row
end)
if (not ok) and err then return nil, err end
return result
end
function Database:release(sql)
if self._dbh then
self._dbh:release()
self._dbh = nil
end
end
function Database.__self_test__(...)
local db = Database.new(...)
assert(db:connected())
function Database:connected(sql)
return self._dbh and self._dbh:connected()
end
assert("1" == db:first_value("select 1 as v union all select 2 as v"))
function Database.__self_test__(name)
local db = Database.new(name or 'system')
assert(db:connected())
local t = assert(db:first_row("select '1' as v union all select '2' as v"))
assert(t.v == "1")
assert("1" == db:first_value("select 1 as v union all select 2 as v"))
t = assert(db:fetch_all("select '1' as v union all select '2' as v"))
assert(#t == 2)
assert(t[1].v == "1")
assert(t[2].v == "2")
local t = assert(db:first_row("select 1 as v union all select 2 as v"))
assert(t.v == "1")
local a, b = assert(db:first("select '1' as b, '2' as a", 'a', 'b'))
assert(a == "2")
assert(b == "1")
t = assert(db:fetch_all("select 1 as v union all select 2 as v"))
assert(#t == 2)
assert(t[1].v == "1")
assert(t[2].v == "2")
-- assert(nil == db:first_value("some non sql query"))
local a, b = assert(db:first("select 1 as b, 2 as a", 'a', 'b'))
assert(a == "2")
assert(b == "1")
-- assert(nil == db:first_value("some non sql query"))
db:release()
assert(not db:connected())
print(" * databse - OK!")
db:release()
assert(not db:connected())
print(" * databse - OK!")
end
end
-- if debug.self_test then
-- Database.__self_test__()
-- end
-----------------------------------------------------------
return Database

View File

@@ -0,0 +1,159 @@
local function class(base)
local t = base and setmetatable({}, base) or {}
t.__index = t
t.__class = t
t.__base = base
function t.new(...)
local o = setmetatable({}, t)
if o.__init then
if t == ... then -- we call as Class:new()
return o:__init(select(2, ...))
else -- we call as Class.new()
return o:__init(...)
end
end
return o
end
return t
end
local EventSocket = class() do
if not freeswitch then
local socket = require "socket"
local ESLParser = require "lluv.esl".ESLParser
local split_status = require "lluv.esl.utils".split_status
local Database = require "resources.functions.database"
local EOL = '\n'
local host, port, auth
function EventSocket:__init()
if not host then
local db = Database.new('system')
local settings, err = db:first_row("select event_socket_ip_address, event_socket_port, event_socket_password from v_settings")
if not settings then return nil, err end
host, port, auth = settings.event_socket_ip_address, settings.event_socket_port, settings.event_socket_password
end
return self:_connect(host, port, auth)
end
function EventSocket:_connect(host, port, password)
local err
self._cnn, err = socket.connect(host, port)
if not self._cnn then return nil, err end
self._cnn:settimeout(1)
self._parser = ESLParser.new()
local auth
while true do
local event
event, err = self:_recv_event()
if not event then break end
local ct = event:getHeader('Content-Type')
if ct == 'auth/request' then
self._cnn:send('auth ' .. password .. EOL .. EOL)
elseif ct == 'command/reply' then
local reply = event:getHeader('Reply-Text')
if reply then
local ok, status, msg = split_status(reply)
if ok then auth = true else err = msg end
else
err = 'invalid response'
end
break
end
end
if not auth then
self._cnn:close()
self._cnn = nil
return nil, err
end
return self
end
function EventSocket:_recv_event()
local event, err = self._parser:next_event()
while event == true do
local str, rst
str, err, rst = self._cnn:receive("*l")
if str then self._parser:append(str):append(EOL) end
if rst then self._parser:append(rst) end
if err and err ~= 'timeout' then
break
end
event = self._parser:next_event()
end
if (not event) or (event == true) then
return nil, err
end
return event
end
function EventSocket:_request(cmd)
if not self._cnn then return nil, 'closed' end
for str in (cmd .. '\n'):gmatch("(.-)\n") do
self._cnn:send(str .. EOL)
end
self._cnn:send(EOL)
return self:_recv_event()
end
function EventSocket:api(cmd)
local event, err = self:_request('api ' .. cmd)
if not event then return nil, err end
local body = event:getBody()
if body then return body end
return event:getReply()
end
function EventSocket:close()
if self._cnn then
self._cnn:close()
self._cnn = nil
end
end
end
if freeswitch then
local api
function EventSocket:__init()
self._api = api or freeswitch.API()
api = self._api
return self
end
function EventSocket:api(cmd)
local result = self._api:executeString(cmd)
if result and result:sub(1, 4) == '-ERR' then
return nil, result:sub(5)
end
return result
end
function EventSocket:close()
self._api = nil
end
end
end
return EventSocket

View File

@@ -0,0 +1,172 @@
-- -- Global settings
-- local settings = Settings.new('system')
-- print(settings:get('switch', 'base', 'dir'))
--
-- Domain settings (to `fax_retry.lua`)
-- local Settings = require "resources.functions.settings"
-- local settings = Settings.new(dbh, domain_name, domain_uuid)
-- storage_type = settings:get('fax', 'storage_type', 'text') or ''
-- storage_path = settings:get('fax', 'storage_path', 'text') or ''
-- storage_path = storage_path
-- :gsub("${domain_name}", domain_name)
-- :gsub("${voicemail_id}", voicemail_id)
-- :gsub("${voicemail_dir}", voicemail_dir)
local Database = require "resources.functions.database"
local cache = require "resources.functions.cache"
require "resources.functions.split"
-----------------------------------------------------------
local Settings = {} do
Settings.__index = Settings
local NONE = '15783958-912c-4893-8866-4ccd1ca73c6e'
local function append(t, v)
t[#t+1] = v
return t
end
local function append_setting(array, category, subcategory, name, value)
--add the category array
if not array[category] then
array[category] = {}
end
--add the subcategory array
if not array[category][subcategory] then
array[category][subcategory] = {}
end
--set the name and value
if (name == "array") then
if not array[category][subcategory][name] then
array[category][subcategory][name] = {}
end
append(array[category][subcategory][name], value);
elseif value ~= nil then
array[category][subcategory][name] = value;
end
end
function Settings.new(db, domain_name, domain_uuid)
local self = setmetatable({}, Settings)
self._array = {}
self._db = db
self._domain_name = domain_name
self._domain_uuid = domain_uuid
return self
end
function Settings:_cache_key(category, subcategory, name)
return 'setting:' .. (self._domain_name or '') .. ':' .. category .. ':' .. subcategory .. ':' .. name
end
function Settings:set(category, subcategory, name, value)
append_setting(self._array, category, subcategory, name, value)
return self
end
function Settings:get(category, subcategory, name)
local a = self._array
local v = a[category] and a[category][subcategory] and a[category][subcategory][name]
if v == NONE then return nil end
if v ~= nil then return v end
local key = self:_cache_key(category, subcategory, name)
v = cache.get(key)
if v then
if v ~= NONE and name == 'array' then
v = split(v, '/+/', true)
end
self:set(category, subcategory, name, v)
if v == NONE then return nil end
return v
end
return self:_load(category, subcategory, name)
end
function Settings:_load(category, subcategory, name)
local domain_uuid = self._domain_uuid
local db = self._db
if type(self._db) == 'string' then
db = Database.new(self._db)
end
local found = false
--get the domain settings
if domain_uuid then
sql = "SELECT domain_setting_uuid,domain_setting_category,domain_setting_subcategory,domain_setting_name,domain_setting_value "
sql = sql .. "FROM v_domain_settings ";
sql = sql .. "WHERE domain_uuid = '" .. domain_uuid .. "'";
sql = sql .. "AND domain_setting_enabled = 'true' ";
sql = sql .. "AND domain_setting_category = '" .. category .."'";
sql = sql .. "AND domain_setting_subcategory = '" .. subcategory .. "'";
sql = sql .. "AND domain_setting_name = '" .. name .. "'";
sql = sql .. "AND domain_setting_value is not null ";
sql = sql .. "ORDER BY domain_setting_category, domain_setting_subcategory ASC ";
db:query(sql, function(row)
found = true;
self:set(
row.domain_setting_category,
row.domain_setting_subcategory,
row.domain_setting_name,
row.domain_setting_value
)
end)
end
if not found then
local sql = "SELECT default_setting_uuid,default_setting_category,default_setting_subcategory,default_setting_name,default_setting_value "
sql = sql .. "FROM v_default_settings ";
sql = sql .. "WHERE default_setting_enabled = 'true' ";
sql = sql .. "AND default_setting_category = '" .. category .."'";
sql = sql .. "AND default_setting_subcategory = '" .. subcategory .. "'";
sql = sql .. "AND default_setting_name = '" .. name .. "'";
sql = sql .. "AND default_setting_value is not null ";
sql = sql .. "ORDER BY default_setting_category, default_setting_subcategory ASC";
db:query(sql, function(row)
found = true;
self:set(
row.default_setting_category,
row.default_setting_subcategory,
row.default_setting_name,
row.default_setting_value
)
end)
end
if not found then
self:set(category, subcategory, name, NONE)
end
local a = self._array
local v = a[category] and a[category][subcategory] and a[category][subcategory][name]
if cache.support() then
local key = self:_cache_key(category, subcategory, name)
local value = v
if v ~= NONE and name == 'array' then
value = table.concat(v, '/+/')
end
local exp = expire and expire["settings"] or 3600
cache.set(key, value, exp)
end
if type(self._db) == 'string' then
db:release()
end
if v == NONE then return nil end
return v
end
end
-----------------------------------------------------------
return Settings

View File

@@ -3,9 +3,14 @@
-- log.noticef("%s %s", "hello", "world")
-- -- log if debug.SQL or debug.xml_handler.SQL then
-- log.tracef("SQL", "SQL is %s", sql)
local function log(name, level, msg)
freeswitch.consoleLog(level, "[" .. name .. "] " .. msg .. "\n")
local log if freeswitch then
log = function (name, level, msg)
freeswitch.consoleLog(level, "[" .. name .. "] " .. msg .. "\n")
end
else
log = function (name, level, msg)
print(os.date("%Y-%m-%d %X") .. '[' .. level:upper() .. '] [' .. name .. '] ' .. msg)
end
end
local function logf(name, level, ...)

View File

@@ -0,0 +1,15 @@
if freeswitch then
function sleep(ms)
freeswitch.msleep(ms)
end
else
local socket = require "socket"
function sleep(ms)
socket.sleep(ms/1000)
end
end

View File

@@ -0,0 +1,27 @@
function split(str, sep, plain)
local b, res = 1, {}
while b <= #str do
local e, e2 = string.find(str, sep, b, plain)
if e then
res[#res + 1] = string.sub(str, b, e-1)
b = e2 + 1
else
res[#res + 1] = string.sub(str, b)
break
end
end
return res
end
function split_first(str, sep, plain)
local e, e2 = string.find(str, sep, nil, plain)
if e then
return string.sub(str, 1, e - 1), string.sub(str, e2 + 1)
end
return str
end
local unpack = unpack or table.unpack
function usplit(...) return unpack(split(...)) end