From 871b632d4935d7a67695be81fda70b4b62565fc7 Mon Sep 17 00:00:00 2001 From: Alexey Melnichuk Date: Wed, 18 Nov 2015 16:27:55 +0300 Subject: [PATCH] Add. Lua part of fax_queue. To poll active task you can run `luarun fax_queue_poll_once.lua` --- resources/install/scripts/fax_queue/exec.lua | 185 ++++++++++ resources/install/scripts/fax_queue/next.lua | 69 ++++ resources/install/scripts/fax_queue/retry.lua | 345 ++++++++++++++++++ resources/install/scripts/fax_queue/tasks.lua | 223 +++++++++++ .../install/scripts/fax_queue_poll_once.lua | 1 + .../scripts/resources/functions/cache.lua | 8 +- .../scripts/resources/functions/database.lua | 237 ++++++++---- .../scripts/resources/functions/esl.lua | 159 ++++++++ .../resources/functions/lazy_settings.lua | 172 +++++++++ .../scripts/resources/functions/log.lua | 11 +- .../scripts/resources/functions/sleep.lua | 15 + 11 files changed, 1340 insertions(+), 85 deletions(-) create mode 100644 resources/install/scripts/fax_queue/exec.lua create mode 100644 resources/install/scripts/fax_queue/next.lua create mode 100644 resources/install/scripts/fax_queue/retry.lua create mode 100644 resources/install/scripts/fax_queue/tasks.lua create mode 100644 resources/install/scripts/fax_queue_poll_once.lua create mode 100644 resources/install/scripts/resources/functions/esl.lua create mode 100644 resources/install/scripts/resources/functions/lazy_settings.lua create mode 100644 resources/install/scripts/resources/functions/sleep.lua diff --git a/resources/install/scripts/fax_queue/exec.lua b/resources/install/scripts/fax_queue/exec.lua new file mode 100644 index 0000000000..2ecf66bcc7 --- /dev/null +++ b/resources/install/scripts/fax_queue/exec.lua @@ -0,0 +1,185 @@ +-- @usage without queue +-- api: originate {fax_file='',wav_file='',fax_dtmf=''}user/108@domain.local &lua(fax_task.lua) +-- @usage with queue task +-- api: originate {task_uuid=''}user/108@domain.local &lua(fax_task.lua) +-- @fax_dtmf +-- 0-9*# - dtmf symbols +-- @200 - dtmf duration in ms +-- p - pause 500 ms +-- P - pause 1000 ms +-- +-- example: pause 5 sec dial 008 pause 2 sec paly greeting +-- PPPPP008@300PP +-- +-- api: originate {fax_file='d:/fax/file1.tiff',wav_file='c:/FreeSWITCH/sounds/music/8000/suite-espanola-op-47-leyenda_x8.wav',fax_dtmf=''}user/101@sip.office.intelcom-tg.ru &lua(fax_task.lua) + +require "resources.functions.config" +local log = require "resources.functions.log".fax_task + +-- If we handle queue task +local task_uuid = session:getVariable('task_uuid') +local task if task_uuid then + local Tasks = require "fax_queue.tasks" + task = Tasks.select_task(task_uuid) + if not task then + log.warningf("Can not found fax task: %q", tostring(task_uuid)) + return + end +end + +if task then + local str = 'Queue task :' + for k, v in pairs(task) do + str = str .. ('\n %q = %q'):format(k, v) + end + log.info(str) +else + log.info('Not queued task') +end + +local function empty(t) return (not t) or (#t == 0) end + +local function not_empty(t) if not empty(t) then return t end end + +local dtmf, wav_file, fax_file + +if task then + dtmf = not_empty(task.dtmf) + wav_file = not_empty(task.wav_file) or not_empty(task.greeting) + fax_file = not_empty(task.fax_file) +else + dtmf = not_empty(session:getVariable('fax_dtmf')) + wav_file = not_empty(session:getVariable('wav_file')) + fax_file = not_empty(session:getVariable('fax_file')) +end + +if not (wav_file or fax_file) then + log.warning("No fax or wav file") + return +end + +local function decode_dtmf(dtmf) + local r, sleep, seq = {} + dtmf:gsub('P', 'pp'):gsub('.', function(ch) + -- print(ch) + if ch == ';' or ch == ',' then + r[#r + 1] = sleep or seq + sleep, seq = nil + elseif ch == 'p' then + r[#r + 1] = seq + sleep = (sleep or 0) + 500 + seq = nil + else + r[#r + 1] = sleep + seq = (seq or '') .. ch + sleep = nil + end + end) + r[#r + 1] = sleep or seq + return r +end + +local function send_fax() + session:execute("txfax", fax_file) +end + +local function start_fax_detect(detect_duration) + if not tone_detect_cb then + function tone_detect_cb(s, type, obj, arg) + if type == "event" then + if obj:getHeader('Event-Name') == 'DETECTED_TONE' then + return "false" + end + end + end + end + + log.notice("Start detecting fax") + + detect_duration = detect_duration or 60000 + + session:setInputCallback("tone_detect_cb") + session:execute("tone_detect", "txfax 2100 r +" .. tostring(detect_duration) .. " set remote_fax_detected=txfax") + session:execute("tone_detect", "rxfax 1100 r +" .. tostring(detect_duration) .. " set remote_fax_detected=rxfax") + session:setVariable("sip_api_on_image", "uuid_break " .. session:getVariable("uuid") .. " all") +end + +local function stop_fax_detect() + session:unsetInputCallback() + session:execute("stop_tone_detect") + session:setVariable("sip_api_on_image", "") +end + +local function fax_deteced() + if session:getVariable('has_t38') == 'true' then + log.noticef('Detected t38') + session:setVariable('remote_fax_detected', 'txfax') + end + + if fax_file and session:getVariable('remote_fax_detected') then + log.noticef("Detected %s", session:getVariable('remote_fax_detected')) + if session:getVariable('remote_fax_detected') == 'txfax' then + send_fax() + else + log.warning('Remote fax try send fax') + end + return true + end +end + +local function check() + if not session:ready() then return false end + if fax_deteced() then return false end + return true +end + +local function task() + session:waitForAnswer(session) + + while not session:answered() do + if not session:ready() then return end + session:sleep(500) + end + + log.notice("11111") + if not (session:ready() and session:answered()) then return end + log.notice("2222") + + if fax_file and wav_file then + start_fax_detect() + end + + if dtmf then + dtmf = decode_dtmf(dtmf) + for _, element in ipairs(dtmf) do + if type(element) == 'number' then + session:streamFile("silence_stream://" .. tostring(element)) + else + session:execute("send_dtmf", element) + end + if not check() then return end + end + end + + if wav_file then + session:streamFile(wav_file) + if not check() then return end + end + + if fax_file then + if wav_file then + stop_fax_detect() + end + send_fax() + end +end + +log.noticef("START TASK") +log.notice("Fax:" .. tostring(fax_file)) +log.notice("Wav:" .. tostring(wav_file)) + +task() + +log.noticef("STOP TASK") +log.notice("Ready: " .. tostring(session:ready())) +log.notice("answered: " .. tostring(session:answered())) diff --git a/resources/install/scripts/fax_queue/next.lua b/resources/install/scripts/fax_queue/next.lua new file mode 100644 index 0000000000..0067d0e9b4 --- /dev/null +++ b/resources/install/scripts/fax_queue/next.lua @@ -0,0 +1,69 @@ +require "resources.functions.config" + +require "resources.functions.sleep" +local log = require "resources.functions.log".next_fax_task +local Tasks = require "fax_queue.tasks" +local Esl = require "resources.functions.esl" + +local FAX_OPTIONS = { + "fax_use_ecm=false,fax_enable_t38=true,fax_enable_t38_request=true,fax_disable_v17=default"; + "fax_use_ecm=true,fax_enable_t38=true,fax_enable_t38_request=true,fax_disable_v17=false"; + "fax_use_ecm=true,fax_enable_t38=false,fax_enable_t38_request=false,fax_disable_v17=false"; + "fax_use_ecm=true,fax_enable_t38=true,fax_enable_t38_request=true,fax_disable_v17=true"; + "fax_use_ecm=false,fax_enable_t38=false,fax_enable_t38_request=false,fax_disable_v17=false"; +} + +local function next_task() + local task, err = Tasks.next_task() + + if not task then + if err then + log.noticef('Can not select next task: %s', tostring(err)) + else + log.notice("No task") + end + return + end + + local esl + local ok, err = pcall(function() + + for k, v in pairs(task) do + print(string.format(" `%s` => `%s`", tostring(k), tostring(v))) + end + + local mode = (task.retry_counter % #FAX_OPTIONS) + 1 + local dial_string = '{' .. + task.dial_string .. "api_hangup_hook='lua fax_queue/retry.lua'," .. + FAX_OPTIONS[mode] .. + '}' .. task.uri + + local originate = 'originate ' .. dial_string .. ' &lua(fax_queue/exec.lua)' + + log.notice(originate) + esl = assert(Esl.new()) + local ok, err = esl:api(originate) + log.notice(ok or err) + end) + + if esl then esl:close() end + + if not ok then + Tasks.release_task(task) + log.noticef("Error execute task: %s", tostring(err)) + end + + return true +end + +local function poll_once() + Tasks.cleanup_tasks() + while next_task() do + sleep(5000) + end + Tasks.release_db() +end + +return { + poll_once = poll_once; +} diff --git a/resources/install/scripts/fax_queue/retry.lua b/resources/install/scripts/fax_queue/retry.lua new file mode 100644 index 0000000000..2adcd45b27 --- /dev/null +++ b/resources/install/scripts/fax_queue/retry.lua @@ -0,0 +1,345 @@ +-- include libraries + require "resources.functions.config"; + require "resources.functions.explode"; + require "resources.functions.split"; + require "resources.functions.count"; + + local log = require "resources.functions.log".fax_retry + local Database = require "resources.functions.database" + local Settings = require "resources.functions.lazy_settings" + local Tasks = require "fax_queue.tasks" + + local task_uuid = env:getHeader('task_uuid') + local task = Tasks.select_task(task_uuid) + if not task then + log.warningf("Can not find fax task: %q", tostring(task_uuid)) + return + end + +-- show all channel variables + if debug["fax_serialize"] then + log.noticef("info:\n%s", env:serialize()) + end + + local dbh = Database.new('system') + +-- Channel/FusionPBX variables + local uuid = env:getHeader("uuid") + local domain_uuid = env:getHeader("domain_uuid") or task.domain_uuid + local domain_name = env:getHeader("domain_name") or task.domain_name + local origination_caller_id_name = env:getHeader("origination_caller_id_name") or '000000000000000' + local origination_caller_id_number = env:getHeader("origination_caller_id_number") or '000000000000000' + local accountcode = env:getHeader("accountcode") + local duration = tonumber(env:getHeader("billmsec")) or 0 + local sip_to_user = env:getHeader("sip_to_user") + local bridge_hangup_cause = env:getHeader("bridge_hangup_cause") + local hangup_cause_q850 = tonumber(env:getHeader("hangup_cause_q850")) + local answered = duration > 0 + +-- fax variables + local fax_success = env:getHeader('fax_success') + local has_t38 = env:getHeader('has_t38') or 'false' + local t38_broken_boolean = env:getHeader('t38_broken_boolean') or '' + local fax_result_code = tonumber(env:getHeader('fax_result_code')) or 2 + local fax_result_text = env:getHeader('fax_result_text') or 'FS_NOT_SET' + local fax_ecm_used = env:getHeader('fax_ecm_used') or '' + local fax_local_station_id = env:getHeader('fax_local_station_id') or '' + local fax_document_transferred_pages = env:getHeader('fax_document_transferred_pages') or nil + local fax_document_total_pages = env:getHeader('fax_document_total_pages') or nil + local fax_image_resolution = env:getHeader('fax_image_resolution') or '' + local fax_image_size = env:getHeader('fax_image_size') or nil + local fax_bad_rows = env:getHeader('fax_bad_rows') or nil + local fax_transfer_rate = env:getHeader('fax_transfer_rate') or nil + local fax_v17_disabled = env:getHeader('fax_v17_disabled') or '' + local fax_ecm_requested = env:getHeader('fax_ecm_requested') or '' + local fax_remote_station_id = env:getHeader('fax_remote_station_id') or '' + + local fax_options = ("fax_use_ecm=%s,fax_enable_t38=%s,fax_enable_t38_request=%s,fax_disable_v17=%s"):format( + env:getHeader('fax_use_ecm') or '', + env:getHeader('fax_enable_t38') or '', + env:getHeader('fax_enable_t38_request') or '', + env:getHeader('fax_disable_v17') or '' + ) + +-- Fax task params + local fax_uri = env:getHeader("fax_uri") or task.uri + local fax_file = env:getHeader("fax_file") or task.fax_file + local wav_file = env:getHeader("wav_file") or task.wav_file + local fax_uuid = task.fax_uuid + +-- Email variables + local email_address = env:getHeader("mailto_address") + local from_address = env:getHeader("mailfrom_address") or email_address + local number_dialed = fax_uri:match("/([^/]-)%s*$") + local email_message_fail = "We are sorry the fax failed to go through. It has been attached. Please check the number "..number_dialed..", and if it was correct you might consider emailing it instead." + local email_message_success = "We are happy to report the fax was sent successfully. It has been attached for your records." + + log.noticef([[<<< CALL RESULT >>> + uuid: = '%s' + answered: = '%s' + fax_file: = '%s' + wav_file: = '%s' + fax_uri: = '%s' + sip_to_user: = '%s' + accountcode: = '%s' + origination_caller_id_name: = '%s' + origination_caller_id_number: = '%s' + mailfrom_address: = '%s' + mailto_address: = '%s' + hangup_cause_q850: = '%s' + fax_options = '%s' +]], + tostring(uuid) , + tostring(answered) , + tostring(fax_file) , + tostring(wav_file) , + tostring(fax_uri) , + tostring(sip_to_user) , + tostring(accountcode) , + tostring(origination_caller_id_name) , + tostring(origination_caller_id_number) , + tostring(mailfrom_address) , + tostring(mailto_address) , + tostring(hangup_cause_q850) , + fax_options +) + + if fax_success then + log.noticef([[<<< FAX RESULT >>> + fax_success = '%s' + has_t38 = '%s' + t38_broken_boolean = '%s' + fax_result_code = '%s' + fax_result_text = '%s' + fax_ecm_used = '%s' + fax_local_station_id = '%s' + fax_document_transferred_pages = '%s' + fax_document_total_pages = '%s' + fax_image_resolution = '%s' + fax_image_size = '%s' + fax_bad_rows = '%s' + fax_transfer_rate = '%s' + fax_v17_disabled = '%s' + fax_ecm_requested = '%s' + fax_remote_station_id = '%s' + '%s' +]], + fax_success , + has_t38 , + t38_broken_boolean , + fax_result_code , + fax_result_text , + fax_ecm_used , + fax_local_station_id , + fax_document_transferred_pages , + fax_document_total_pages , + fax_image_resolution , + fax_image_size , + fax_bad_rows , + fax_transfer_rate , + fax_v17_disabled , + fax_ecm_requested , + fax_remote_station_id , + '---------------------------------' + ) + end + +--get the values from the fax file + if not (fax_uuid and domain_name) then + local array = split(fax_file, "[\\/]+") + domain_name = domain_name or array[#array - 3] + local fax_extension = fax_extension or array[#array - 2] + + if not fax_uuid then + local sql = "SELECT fax_uuid FROM v_fax " + sql = sql .. "WHERE domain_uuid = '" .. domain_uuid .. "' " + sql = sql .. "AND fax_extension = '" .. fax_extension .. "' " + fax_uuid = dbh:first_value(sql); + end + end + +--get the domain_uuid using the domain name required for multi-tenant + if domain_name and not domain_uuid then + local sql = "SELECT domain_uuid FROM v_domains "; + sql = sql .. "WHERE domain_name = '" .. domain_name .. "' " + domain_uuid = dbh:first_value(sql) + end + + assert(domain_name and domain_uuid) + +--settings + local settings = Settings.new(dbh, domain_name, domain_uuid) + local keep_local = settings:get('fax', 'keep_local','boolean') + local storage_type = (keep_local == "false") and "" or settings:get('fax', 'storage_type', 'text') + +--be sure accountcode is not empty + if (accountcode == nil) then + accountcode = domain_name + end + + local function opt(v, default) + if v then return "'" .. v .. "'" end + return default or 'NULL' + end + + local function now_sql() + return (database["type"] == "sqlite") and "'"..os.date("%Y-%m-%d %X").."'" or "now()"; + end + +--add to fax logs + do + local fields = { + "fax_log_uuid"; + "domain_uuid"; + "fax_uuid"; + "fax_success"; + "fax_result_code"; + "fax_result_text"; + "fax_file"; + "fax_ecm_used"; + "fax_local_station_id"; + "fax_document_transferred_pages"; + "fax_document_total_pages"; + "fax_image_resolution"; + "fax_image_size"; + "fax_bad_rows"; + "fax_transfer_rate"; + "fax_retry_attempts"; + "fax_retry_limit"; + "fax_retry_sleep"; + "fax_uri"; + "fax_date"; + "fax_epoch"; + } + + local values = { + "'"..uuid .. "'"; + "'"..domain_uuid .. "'"; + opt(fax_uuid); + opt(fax_success); + opt(fax_result_code); + opt(fax_result_text); + opt(fax_file); + opt(fax_ecm_used); + opt(fax_local_station_id); + opt(fax_document_transferred_pages, "'0'"); + opt(fax_document_total_pages, "'0'"); + opt(fax_image_resolution); + opt(fax_image_size); + opt(fax_bad_rows); + opt(fax_transfer_rate); + opt(fax_retry_attempts); + opt(fax_retry_limit); + opt(fax_retry_sleep); + opt(fax_uri); + now_sql(); + "'"..os.time().."' "; + } + + local sql = "insert into v_fax_logs(" .. table.concat(fields, ",") .. ")" .. + "values(" .. table.concat(values, ",") .. ")" + + if (debug["sql"]) then + freeswitch.consoleLog("notice", "[FAX] retry: "..sql.."\n"); + end + dbh:query(sql); + end + +-- add the fax files + if fax_success == "1" then + + if storage_type == "base64" then + --include the base64 function + require "resources.functions.base64"; + + --base64 encode the file + local f = io.open(fax_file, "rb"); + if not f then + log.waitng("Can not find file %s", fax_file) + storage_type = nil + else + local file_content = f:read("*all"); + f:close() + fax_base64 = base64.encode(file_content) + end + end + + -- build SQL + local sql do + sql = { + "insert into v_fax_files("; + "fax_file_uuid"; ","; + "fax_uuid"; ","; + "fax_mode"; ","; + "fax_destination"; ","; + "fax_file_type"; ","; + "fax_file_path"; ","; + "fax_caller_id_name"; ","; + "fax_caller_id_number"; ","; + "fax_date"; ","; + "fax_epoch"; ","; + "fax_base64"; ","; + "domain_uuid"; " "; + ") values ("; + opt(uuid); ","; + opt(fax_uuid); ","; + "'tx'"; ","; + opt(sip_to_user); ","; + "'tif'"; ","; + opt(fax_file); ","; + opt(origination_caller_id_name); ","; + opt(origination_caller_id_number); ","; + now_sql(); ","; + "'" .. os.time() .. "'"; ","; + opt(fax_base64); ","; + opt(domain_uuid); " "; + ")" + } + + sql = table.concat(sql, "\n"); + if (debug["sql"]) then + freeswitch.consoleLog("notice", "[FAX] SQL: " .. sql .. "\n"); + end + end + + if storage_type == "base64" then + local db_type, db_cnn = split_first(database["system"], "://", true) + local luasql = require ("luasql." .. db_type); + local env = assert (luasql[db_type]()); + local dbh = env:connect(db_cnn); + dbh:execute(sql) + dbh:close() + env:close() + else + result = dbh:query(sql) + end + end + + if fax_success == "1" then + --Success + log.info("RETRY STATS SUCCESS: GATEWAY[%s] VARS[%s]", fax_options, fax_trial); + + if keep_local == "false" then + os.remove(fax_file); + end + + Tasks.remove_task(task) + end + + if fax_success ~= "1" then + if not answered then + log.noticef("no answer: %d", hangup_cause_q850) + else + if not fax_success then + log.noticef("Fax not detected: %s", fax_options) + else + log.noticef("fax fail %s", fax_options) + end + end + + Tasks.wait_task(task, answered, hangup_cause_q850) + if task.status ~= 0 then + Tasks.remove_task(task) + end + end + diff --git a/resources/install/scripts/fax_queue/tasks.lua b/resources/install/scripts/fax_queue/tasks.lua new file mode 100644 index 0000000000..31eeb70009 --- /dev/null +++ b/resources/install/scripts/fax_queue/tasks.lua @@ -0,0 +1,223 @@ +local Database = require "resources.functions.database" +local Settings = require "resources.functions.lazy_settings" + +local db + +local Q850_TIMEOUT = { + [17] = 60; +} + +local select_task_common_sql = [[ +select + t1.task_uuid as uuid, + t1.fax_uuid as fax_uuid, + t3.domain_name, + t3.domain_uuid, + t1.task_status as status, + t1.task_uri as uri, + t1.task_dial_string as dial_string, + t1.task_dtmf as dtmf, + t1.task_fax_file as fax_file, + t1.task_wav_file as wav_file, + t1.task_no_answer_counter as no_answer_counter, + t1.task_no_answer_retry_counter as no_answer_retry_counter, + t1.task_retry_counter as retry_counter, + t2.fax_send_greeting as greeting +from v_fax_tasks t1 + inner join v_fax t2 on t2.fax_uuid = t1.fax_uuid + inner join v_domains t3 on t2.domain_uuid = t3.domain_uuid +where t1.task_interrupted <> 'true' +]] + +local next_task_sql = select_task_common_sql .. [[ +and t1.task_status = 0 and t1.task_next_time < NOW() +and t2.fax_send_channels > (select count(*) from v_fax_tasks as tasks + where tasks.fax_uuid = t1.fax_uuid and + tasks.task_status > 0 and tasks.task_status <= 2 +) +order by t1.task_next_time +]] + +local select_task_sql = select_task_common_sql .. "and t1.task_uuid='%s'" + +local aquire_task_sql = [[ + update v_fax_tasks set task_status = 1, task_lock_time = NOW() + where task_uuid = '%s' and task_status = 0 +]] + +local wait_task_sql = [[ + update v_fax_tasks + set task_status = %s, + task_lock_time = NULL, + task_no_answer_counter = %s, + task_no_answer_retry_counter = %s, + task_retry_counter = %s, + task_next_time = NOW() + interval '%s second' + where task_uuid = '%s' +]] + +local remove_task_task_sql = [[ + delete from v_fax_tasks + where task_uuid = '%s' +]] + +local release_task_sql = [[ + update v_fax_tasks + set task_status = 0, task_lock_time = NULL, + task_next_time = NOW() + interval '%s second' + where task_uuid = '%s' +]] + +local release_stuck_tasks_sql = [[ + update v_fax_tasks + set task_status = 0, task_lock_time = NULL, + task_next_time = NOW() + where task_lock_time < NOW() + interval '3600 second' +]] + +local remove_finished_tasks_sql = [[ + delete from v_fax_tasks where task_status > 3 +]] + +local function get_db() + if not db then + db = assert(Database.new('system')) + end + return db +end + +local function next_task() + local db = get_db() + + while true do + local task, err = db:first_row(next_task_sql) + if not task then return nil, err end + local ok, err = db:query( aquire_task_sql:format(task.uuid) ) + if not ok then return nil, err end + if db:affected_rows() == 1 then + task.no_answer_counter = tonumber(task.no_answer_counter) + task.no_answer_retry_counter = tonumber(task.no_answer_retry_counter) + task.retry_counter = tonumber(task.retry_counter) + return task + end + end +end + +local function select_task(task_uuid) + local db = get_db() + + local task, err = db:first_row(select_task_sql:format(task_uuid)) + if not task then return nil, err end + + task.no_answer_counter = tonumber(task.no_answer_counter) + task.no_answer_retry_counter = tonumber(task.no_answer_retry_counter) + task.retry_counter = tonumber(task.retry_counter) + + return task +end + +local function wait_task(task, answered, q850) + local db = get_db() + + local interval = 30 + + local settings = Settings.new(db, task.domain_name, task.domain_uuid) + task.status = 0 + + if not answered then + interval = Q850_TIMEOUT[q850 or 17] or interval + end + + if not answered then + local fax_send_no_answer_retry_limit = tonumber(settings:get('fax', 'send_no_answer_retry_limit', 'numeric')) or 0 + task.no_answer_retry_counter = task.no_answer_retry_counter + 1 + + if task.no_answer_retry_counter >= fax_send_no_answer_retry_limit then + task.no_answer_retry_counter = 0 + task.no_answer_counter = task.no_answer_counter + 1 + local fax_send_no_answer_limit = tonumber(settings:get('fax', 'send_no_answer_limit', 'numeric')) or 0 + if task.no_answer_counter >= fax_send_no_answer_limit then + task.status = 4 + else + interval = tonumber(settings:get('fax', 'send_no_answer_interval', 'numeric')) or interval + end + else + interval = tonumber(settings:get('fax', 'send_no_answer_retry_interval', 'numeric')) or interval + end + else + task.retry_counter = task.retry_counter + 1 + local fax_send_retry_limit = tonumber(settings:get('fax', 'send_retry_limit', 'numeric')) or 0 + + if task.retry_counter >= fax_send_retry_limit then + task.status = 4 + else + interval = tonumber(settings:get('fax', 'send_retry_interval', 'numeric')) or interval + task.task_seq_call_counter = 0 + end + end + + local sql = wait_task_sql:format( + tostring(task.status), + tostring(task.no_answer_counter), + tostring(task.no_answer_retry_counter), + tostring(task.retry_counter), + tostring(interval), + task.uuid + ) + + print(sql) + + local ok, err = db:query( sql ) + + if not ok then return nil, err end + + return task +end + +local function remove_task(task) + local db = get_db() + + local sql = remove_task_task_sql:format(task.uuid) + local ok, err = db:query( sql ) + if not ok then return nil, err end + return db:affected_rows() +end + +local function release_task(task) + local db = get_db() + + local interval = 30 + + local sql = release_task_sql:format( + tostring(interval), + task.uuid + ) + + local ok, err = db:query( sql ) + + if not ok then return nil, err end + + return task +end + +local function cleanup_tasks() + local db = get_db() + + db:query(release_stuck_tasks_sql) + db:query(remove_finished_tasks_sql) +end + +return { + release_db = function() + if db then + db:release() + db = nil + end + end; + next_task = next_task; + wait_task = wait_task; + select_task = select_task; + remove_task = remove_task; + release_task = release_task; + cleanup_tasks = cleanup_tasks; +} diff --git a/resources/install/scripts/fax_queue_poll_once.lua b/resources/install/scripts/fax_queue_poll_once.lua new file mode 100644 index 0000000000..c42ee62c72 --- /dev/null +++ b/resources/install/scripts/fax_queue_poll_once.lua @@ -0,0 +1 @@ +require "fax_queue.next".poll_once() \ No newline at end of file diff --git a/resources/install/scripts/resources/functions/cache.lua b/resources/install/scripts/resources/functions/cache.lua index 06548af1cd..6d121eb995 100644 --- a/resources/install/scripts/resources/functions/cache.lua +++ b/resources/install/scripts/resources/functions/cache.lua @@ -8,7 +8,13 @@ require "resources.functions.trim"; -local api = api or freeswitch.API(); +local api = api +if (not api) and freeswitch then api = freeswitch.API() else +api = {} +function api:execute() + return '-ERR UNSUPPORTTED' +end +end local function send_event(action, key) local event = freeswitch.Event("MEMCACHE", action); diff --git a/resources/install/scripts/resources/functions/database.lua b/resources/install/scripts/resources/functions/database.lua index f8939126ff..17abe280ba 100644 --- a/resources/install/scripts/resources/functions/database.lua +++ b/resources/install/scripts/resources/functions/database.lua @@ -1,115 +1,190 @@ require 'resources.functions.config' -require 'resources.functions.file_exists' -require 'resources.functions.database_handle' -local unpack = unpack or table.unpack +----------------------------------------------------------- +local OdbcDatabase = {} if not freeswitch then +OdbcDatabase.__index = OdbcDatabase -local Database = {} do +local odbc = require "odbc.dba" -Database.__index = Database +function OdbcDatabase.new(name) + local self = setmetatable({}, OdbcDatabase) -function Database.new(name) - local dbh = assert(name) - if type(name) == 'string' then - if name == 'switch' and file_exists(database_dir.."/core.db") then - dbh = freeswitch.Dbh("sqlite://"..database_dir.."/core.db") - else - dbh = database_handle(name) - end - end - assert(dbh:connected()) + local connection_string = assert(database[name]) - local self = setmetatable({ - _dbh = dbh; - }, Database) + local typ, dsn, user, password = connection_string:match("^(.-)://(.-):(.-):(.-)$") + assert(typ == 'odbc', "unsupported connection string:" .. connection_string) - return self + self._dbh = odbc.Connect(dsn, user, password) + + return self end -function Database:query(sql, fn) - if (fn == nil) then - return self._dbh:query(sql) - else - return self._dbh:query(sql, fn) - end +function OdbcDatabase:query(sql, fn) + self._rows_affected = nil + if fn then + return self._dbh:neach(sql, function(row) + local o = {} + for k, v in pairs(row) do + if v == odbc.NULL then + o[k] = nil + else + o[k] = tostring(v) + end + end + return fn(o) + end) + end + local ok, err = self._dbh:exec(sql) + if not ok then return nil, err end + self._rows_affected = ok + return self._rows_affected +end + +function OdbcDatabase:affected_rows() + return self._rows_affected; +end + +function OdbcDatabase:release() + if self._dbh then + self._dbh:destroy() + self._dbh = nil + end +end + +function OdbcDatabase:connected() + return self._dbh and self._dbh:connected() +end + +end +----------------------------------------------------------- + +----------------------------------------------------------- +local FsDatabase = {} if freeswitch then + +require "resources.functions.file_exists" +require "resources.functions.database_handle" + +FsDatabase.__index = FsDatabase + +function FsDatabase.new(name) + local dbh = assert(name) + if type(name) == 'string' then + if name == 'switch' and file_exists(database_dir.."/core.db") then + dbh = freeswitch.Dbh("sqlite://"..database_dir.."/core.db") + else + dbh = database_handle(name) + end + end + assert(dbh:connected()) + + local self = setmetatable({ + _dbh = dbh; + }, FsDatabase) + + return self +end + +function FsDatabase:query(sql, fn) + if fn then + return self._dbh:query(sql, fn) + end + return self._dbh:query(sql) +end + +function FsDatabase:affected_rows() + if self._dbh then + return self._dbh:affected_rows() + end +end + +function FsDatabase:release() + if self._dbh then + self._dbh:release() + self._dbh = nil + end +end + +function FsDatabase:connected() + return self._dbh and self._dbh:connected() +end + +end +----------------------------------------------------------- + +----------------------------------------------------------- +local Database = {} do +Database.__index = Database +Database.__base = freeswitch and FsDatabase or OdbcDatabase +Database = setmetatable(Database, Database.__base) + +function Database.new(...) + local self = Database.__base.new(...) + setmetatable(self, Database) + return self end function Database:first_row(sql) - local result - local ok, err = self:query(sql, function(row) - result = row - return 1 - end) - if not ok then return nil, err end - return result + local result + local ok, err = self:query(sql, function(row) + result = row + return 1 + end) + if not ok then return nil, err end + return result end function Database:first_value(sql) - local result, err = self:first_row(sql) - if not result then return nil, err end - local k, v = next(result) - return v + local result, err = self:first_row(sql) + if not result then return nil, err end + local k, v = next(result) + return v end function Database:first(sql, ...) - local result, err = self:first_row(sql) - if not result then return nil, err end - local t, n = {}, select('#', ...) - for i = 1, n do - t[i] = result[(select(i, ...))] - end - return unpack(t, 1, n) + local result, err = self:first_row(sql) + if not result then return nil, err end + local t, n = {}, select('#', ...) + for i = 1, n do + t[i] = result[(select(i, ...))] + end + return unpack(t, 1, n) end function Database:fetch_all(sql) - local result = {} - local ok, err = self:query(sql, function(row) - result[#result + 1] = row - end) - if not ok then return nil, err end - return result + local result = {} + local ok, err = self:query(sql, function(row) + result[#result + 1] = row + end) + if (not ok) and err then return nil, err end + return result end -function Database:release(sql) - if self._dbh then - self._dbh:release() - self._dbh = nil - end -end +function Database.__self_test__(...) + local db = Database.new(...) + assert(db:connected()) -function Database:connected(sql) - return self._dbh and self._dbh:connected() -end + assert("1" == db:first_value("select 1 as v union all select 2 as v")) -function Database.__self_test__(name) - local db = Database.new(name or 'system') - assert(db:connected()) + local t = assert(db:first_row("select '1' as v union all select '2' as v")) + assert(t.v == "1") - assert("1" == db:first_value("select 1 as v union all select 2 as v")) + t = assert(db:fetch_all("select '1' as v union all select '2' as v")) + assert(#t == 2) + assert(t[1].v == "1") + assert(t[2].v == "2") - local t = assert(db:first_row("select 1 as v union all select 2 as v")) - assert(t.v == "1") + local a, b = assert(db:first("select '1' as b, '2' as a", 'a', 'b')) + assert(a == "2") + assert(b == "1") - t = assert(db:fetch_all("select 1 as v union all select 2 as v")) - assert(#t == 2) - assert(t[1].v == "1") - assert(t[2].v == "2") + -- assert(nil == db:first_value("some non sql query")) - local a, b = assert(db:first("select 1 as b, 2 as a", 'a', 'b')) - assert(a == "2") - assert(b == "1") - - -- assert(nil == db:first_value("some non sql query")) - - db:release() - assert(not db:connected()) - print(" * databse - OK!") + db:release() + assert(not db:connected()) + print(" * databse - OK!") end end - --- if debug.self_test then --- Database.__self_test__() --- end +----------------------------------------------------------- return Database \ No newline at end of file diff --git a/resources/install/scripts/resources/functions/esl.lua b/resources/install/scripts/resources/functions/esl.lua new file mode 100644 index 0000000000..fcd4898754 --- /dev/null +++ b/resources/install/scripts/resources/functions/esl.lua @@ -0,0 +1,159 @@ +local function class(base) + local t = base and setmetatable({}, base) or {} + t.__index = t + t.__class = t + t.__base = base + + function t.new(...) + local o = setmetatable({}, t) + if o.__init then + if t == ... then -- we call as Class:new() + return o:__init(select(2, ...)) + else -- we call as Class.new() + return o:__init(...) + end + end + return o + end + + return t +end + +local EventSocket = class() do + +if not freeswitch then + +local socket = require "socket" +local ESLParser = require "lluv.esl".ESLParser +local split_status = require "lluv.esl.utils".split_status +local Database = require "resources.functions.database" + +local EOL = '\n' + +local host, port, auth + +function EventSocket:__init() + if not host then + local db = Database.new('system') + local settings, err = db:first_row("select event_socket_ip_address, event_socket_port, event_socket_password from v_settings") + if not settings then return nil, err end + host, port, auth = settings.event_socket_ip_address, settings.event_socket_port, settings.event_socket_password + end + + return self:_connect(host, port, auth) +end + +function EventSocket:_connect(host, port, password) + local err + self._cnn, err = socket.connect(host, port) + if not self._cnn then return nil, err end + + self._cnn:settimeout(1) + + self._parser = ESLParser.new() + local auth + while true do + local event + event, err = self:_recv_event() + if not event then break end + + local ct = event:getHeader('Content-Type') + if ct == 'auth/request' then + self._cnn:send('auth ' .. password .. EOL .. EOL) + elseif ct == 'command/reply' then + local reply = event:getHeader('Reply-Text') + if reply then + local ok, status, msg = split_status(reply) + if ok then auth = true else err = msg end + else + err = 'invalid response' + end + break + end + end + + if not auth then + self._cnn:close() + self._cnn = nil + return nil, err + end + + return self +end + +function EventSocket:_recv_event() + local event, err = self._parser:next_event() + + while event == true do + local str, rst + str, err, rst = self._cnn:receive("*l") + if str then self._parser:append(str):append(EOL) end + if rst then self._parser:append(rst) end + if err and err ~= 'timeout' then + break + end + event = self._parser:next_event() + end + + if (not event) or (event == true) then + return nil, err + end + + return event +end + +function EventSocket:_request(cmd) + if not self._cnn then return nil, 'closed' end + + for str in (cmd .. '\n'):gmatch("(.-)\n") do + self._cnn:send(str .. EOL) + end + self._cnn:send(EOL) + + return self:_recv_event() +end + +function EventSocket:api(cmd) + local event, err = self:_request('api ' .. cmd) + if not event then return nil, err end + local body = event:getBody() + if body then return body end + return event:getReply() +end + +function EventSocket:close() + if self._cnn then + self._cnn:close() + self._cnn = nil + end +end + +end + +if freeswitch then + +local api + +function EventSocket:__init() + self._api = api or freeswitch.API() + api = self._api + return self +end + +function EventSocket:api(cmd) + local result = self._api:executeString(cmd) + if result and result:sub(1, 4) == '-ERR' then + return nil, result:sub(5) + end + return result +end + +function EventSocket:close() + self._api = nil +end + +end + +end + +return EventSocket diff --git a/resources/install/scripts/resources/functions/lazy_settings.lua b/resources/install/scripts/resources/functions/lazy_settings.lua new file mode 100644 index 0000000000..3d3b72b342 --- /dev/null +++ b/resources/install/scripts/resources/functions/lazy_settings.lua @@ -0,0 +1,172 @@ +-- -- Global settings +-- local settings = Settings.new('system') +-- print(settings:get('switch', 'base', 'dir')) +-- +-- Domain settings (to `fax_retry.lua`) +-- local Settings = require "resources.functions.settings" +-- local settings = Settings.new(dbh, domain_name, domain_uuid) +-- storage_type = settings:get('fax', 'storage_type', 'text') or '' +-- storage_path = settings:get('fax', 'storage_path', 'text') or '' +-- storage_path = storage_path +-- :gsub("${domain_name}", domain_name) +-- :gsub("${voicemail_id}", voicemail_id) +-- :gsub("${voicemail_dir}", voicemail_dir) + +local Database = require "resources.functions.database" +local cache = require "resources.functions.cache" +require "resources.functions.split" + +----------------------------------------------------------- +local Settings = {} do +Settings.__index = Settings + +local NONE = '15783958-912c-4893-8866-4ccd1ca73c6e' + +local function append(t, v) + t[#t+1] = v + return t +end + +local function append_setting(array, category, subcategory, name, value) + --add the category array + if not array[category] then + array[category] = {} + end + + --add the subcategory array + if not array[category][subcategory] then + array[category][subcategory] = {} + end + + --set the name and value + if (name == "array") then + if not array[category][subcategory][name] then + array[category][subcategory][name] = {} + end + append(array[category][subcategory][name], value); + elseif value ~= nil then + array[category][subcategory][name] = value; + end +end + +function Settings.new(db, domain_name, domain_uuid) + local self = setmetatable({}, Settings) + self._array = {} + self._db = db + self._domain_name = domain_name + self._domain_uuid = domain_uuid + + return self +end + +function Settings:_cache_key(category, subcategory, name) + return 'setting:' .. (self._domain_name or '') .. ':' .. category .. ':' .. subcategory .. ':' .. name +end + +function Settings:set(category, subcategory, name, value) + append_setting(self._array, category, subcategory, name, value) + return self +end + +function Settings:get(category, subcategory, name) + local a = self._array + local v = a[category] and a[category][subcategory] and a[category][subcategory][name] + if v == NONE then return nil end + if v ~= nil then return v end + + local key = self:_cache_key(category, subcategory, name) + + v = cache.get(key) + if v then + if v ~= NONE and name == 'array' then + v = split(v, '/+/', true) + end + self:set(category, subcategory, name, v) + if v == NONE then return nil end + return v + end + + return self:_load(category, subcategory, name) +end + +function Settings:_load(category, subcategory, name) + local domain_uuid = self._domain_uuid + local db = self._db + if type(self._db) == 'string' then + db = Database.new(self._db) + end + + local found = false + --get the domain settings + if domain_uuid then + sql = "SELECT domain_setting_uuid,domain_setting_category,domain_setting_subcategory,domain_setting_name,domain_setting_value " + sql = sql .. "FROM v_domain_settings "; + sql = sql .. "WHERE domain_uuid = '" .. domain_uuid .. "'"; + sql = sql .. "AND domain_setting_enabled = 'true' "; + sql = sql .. "AND domain_setting_category = '" .. category .."'"; + sql = sql .. "AND domain_setting_subcategory = '" .. subcategory .. "'"; + sql = sql .. "AND domain_setting_name = '" .. name .. "'"; + sql = sql .. "AND domain_setting_value is not null "; + sql = sql .. "ORDER BY domain_setting_category, domain_setting_subcategory ASC "; + + db:query(sql, function(row) + found = true; + self:set( + row.domain_setting_category, + row.domain_setting_subcategory, + row.domain_setting_name, + row.domain_setting_value + ) + end) + end + + if not found then + local sql = "SELECT default_setting_uuid,default_setting_category,default_setting_subcategory,default_setting_name,default_setting_value " + sql = sql .. "FROM v_default_settings "; + sql = sql .. "WHERE default_setting_enabled = 'true' "; + sql = sql .. "AND default_setting_category = '" .. category .."'"; + sql = sql .. "AND default_setting_subcategory = '" .. subcategory .. "'"; + sql = sql .. "AND default_setting_name = '" .. name .. "'"; + sql = sql .. "AND default_setting_value is not null "; + sql = sql .. "ORDER BY default_setting_category, default_setting_subcategory ASC"; + + db:query(sql, function(row) + found = true; + self:set( + row.default_setting_category, + row.default_setting_subcategory, + row.default_setting_name, + row.default_setting_value + ) + end) + end + + if not found then + self:set(category, subcategory, name, NONE) + end + + local a = self._array + local v = a[category] and a[category][subcategory] and a[category][subcategory][name] + + if cache.support() then + local key = self:_cache_key(category, subcategory, name) + local value = v + if v ~= NONE and name == 'array' then + value = table.concat(v, '/+/') + end + local exp = expire and expire["settings"] or 3600 + cache.set(key, value, exp) + end + + if type(self._db) == 'string' then + db:release() + end + + if v == NONE then return nil end + return v +end + +end +----------------------------------------------------------- + +return Settings diff --git a/resources/install/scripts/resources/functions/log.lua b/resources/install/scripts/resources/functions/log.lua index b6cb59652b..b88818447b 100644 --- a/resources/install/scripts/resources/functions/log.lua +++ b/resources/install/scripts/resources/functions/log.lua @@ -3,9 +3,14 @@ -- log.noticef("%s %s", "hello", "world") -- -- log if debug.SQL or debug.xml_handler.SQL then -- log.tracef("SQL", "SQL is %s", sql) - -local function log(name, level, msg) - freeswitch.consoleLog(level, "[" .. name .. "] " .. msg .. "\n") +local log if freeswitch then + log = function (name, level, msg) + freeswitch.consoleLog(level, "[" .. name .. "] " .. msg .. "\n") + end +else + log = function (name, level, msg) + print(os.date("%Y-%m-%d %X") .. '[' .. level:upper() .. '] [' .. name .. '] ' .. msg) + end end local function logf(name, level, ...) diff --git a/resources/install/scripts/resources/functions/sleep.lua b/resources/install/scripts/resources/functions/sleep.lua new file mode 100644 index 0000000000..e7ace57abd --- /dev/null +++ b/resources/install/scripts/resources/functions/sleep.lua @@ -0,0 +1,15 @@ +if freeswitch then + +function sleep(ms) + freeswitch.msleep(ms) +end + +else + +local socket = require "socket" + +function sleep(ms) + socket.sleep(ms/1000) +end + +end