Merge pull request #1443 from moteus/database_backend

Database backend
This commit is contained in:
FusionPBX
2016-02-25 08:39:07 -07:00
7 changed files with 499 additions and 156 deletions

View File

@@ -1,188 +1,136 @@
---
-- @usage
-- -- Use default backend
-- dbh = Database.new("system")
-- .....
--
-- @usage
-- -- Use LuaSQL backend
-- dbh = Database.backend.luasql("system")
-- .....
require 'resources.functions.config'
local log = require "resources.functions.log".database
local BACKEND = xml_handler and xml_handler.db_backend or 'native'
local unpack = unpack or table.unpack
-----------------------------------------------------------
local OdbcDatabase = {} if not freeswitch then
OdbcDatabase.__index = OdbcDatabase
local installed_classes = {}
local default_backend = FsDatabase
local function new_database(backend)
local class = installed_classes[backend]
if class then return class end
local odbc = require "odbc.dba"
local Database = {} do
Database.__index = Database
Database.__base = backend or default_backend
Database = setmetatable(Database, Database.__base)
function OdbcDatabase.new(name)
local self = setmetatable({}, OdbcDatabase)
function Database.new(...)
local self = Database.__base.new(...)
setmetatable(self, Database)
return self
end
local connection_string = assert(database[name])
local typ, dsn, user, password = connection_string:match("^(.-)://(.-):(.-):(.-)$")
assert(typ == 'odbc', "unsupported connection string:" .. connection_string)
self._dbh = odbc.Connect(dsn, user, password)
return self
end
function OdbcDatabase:query(sql, fn)
self._rows_affected = nil
if fn then
return self._dbh:neach(sql, function(row)
local o = {}
for k, v in pairs(row) do
if v == odbc.NULL then
o[k] = nil
else
o[k] = tostring(v)
end
end
return fn(o)
function Database:first_row(sql)
local result
local ok, err = self:query(sql, function(row)
result = row
return 1
end)
if not ok then return nil, err end
return result
end
local ok, err = self._dbh:exec(sql)
if not ok then return nil, err end
self._rows_affected = ok
return self._rows_affected
end
function OdbcDatabase:affected_rows()
return self._rows_affected;
end
function OdbcDatabase:release()
if self._dbh then
self._dbh:destroy()
self._dbh = nil
function Database:first_value(sql)
local result, err = self:first_row(sql)
if not result then return nil, err end
local k, v = next(result)
return v
end
end
function OdbcDatabase:connected()
return self._dbh and self._dbh:connected()
end
end
-----------------------------------------------------------
-----------------------------------------------------------
local FsDatabase = {} if freeswitch then
require "resources.functions.file_exists"
require "resources.functions.database_handle"
FsDatabase.__index = FsDatabase
function FsDatabase.new(name)
local dbh = assert(name)
if type(name) == 'string' then
if name == 'switch' and file_exists(database_dir.."/core.db") then
dbh = freeswitch.Dbh("sqlite://"..database_dir.."/core.db")
else
dbh = database_handle(name)
function Database:first(sql, ...)
local result, err = self:first_row(sql)
if not result then return nil, err end
local t, n = {}, select('#', ...)
for i = 1, n do
t[i] = result[(select(i, ...))]
end
return unpack(t, 1, n)
end
assert(dbh:connected())
local self = setmetatable({
_dbh = dbh;
}, FsDatabase)
return self
end
function FsDatabase:query(sql, fn)
if fn then
return self._dbh:query(sql, fn)
function Database:fetch_all(sql)
local result = {}
local ok, err = self:query(sql, function(row)
result[#result + 1] = row
end)
if (not ok) and err then return nil, err end
return result
end
return self._dbh:query(sql)
end
function FsDatabase:affected_rows()
if self._dbh then
return self._dbh:affected_rows()
function Database.__self_test__(...)
log.info('self_test Database - ' .. Database._backend_name)
local db = Database.new(...)
assert(db:connected())
assert("1" == db:first_value("select 1 as v union all select 2 as v"))
local t = assert(db:first_row("select '1' as v union all select '2' as v"))
assert(t.v == "1")
t = assert(db:fetch_all("select '1' as v union all select '2' as v"))
assert(#t == 2)
assert(t[1].v == "1")
assert(t[2].v == "2")
local a, b = assert(db:first("select '1' as b, '2' as a", 'a', 'b'))
assert(a == "2")
assert(b == "1")
-- assert(nil == db:first_value("some non sql query"))
-- select NULL
local a = assert(db:first_value("select NULL as a"))
assert(a == "")
db:release()
assert(not db:connected())
log.info('self_test Database - pass')
end
end
function FsDatabase:release()
if self._dbh then
self._dbh:release()
self._dbh = nil
end
end
function FsDatabase:connected()
return self._dbh and self._dbh:connected()
end
installed_classes[backend] = Database
return Database
end
-----------------------------------------------------------
-----------------------------------------------------------
local Database = {} do
Database.__index = Database
Database.__base = freeswitch and FsDatabase or OdbcDatabase
Database = setmetatable(Database, Database.__base)
local Database = {} do
function Database.new(...)
local self = Database.__base.new(...)
setmetatable(self, Database)
return self
end
function Database:first_row(sql)
local result
local ok, err = self:query(sql, function(row)
result = row
return 1
end)
if not ok then return nil, err end
return result
end
function Database:first_value(sql)
local result, err = self:first_row(sql)
if not result then return nil, err end
local k, v = next(result)
return v
end
function Database:first(sql, ...)
local result, err = self:first_row(sql)
if not result then return nil, err end
local t, n = {}, select('#', ...)
for i = 1, n do
t[i] = result[(select(i, ...))]
local backend_loader = setmetatable({}, {__index = function(self, backend)
local class = require("resources.functions.database." .. backend)
local database = new_database(class)
self[backend] = function(...)
return database.new(...)
end
return unpack(t, 1, n)
end
return self[backend]
end})
function Database:fetch_all(sql)
local result = {}
local ok, err = self:query(sql, function(row)
result[#result + 1] = row
end)
if (not ok) and err then return nil, err end
return result
end
Database.backend = backend_loader
function Database.__self_test__(...)
local db = Database.new(...)
assert(db:connected())
Database.new = Database.backend[BACKEND]
assert("1" == db:first_value("select 1 as v union all select 2 as v"))
local t = assert(db:first_row("select '1' as v union all select '2' as v"))
assert(t.v == "1")
t = assert(db:fetch_all("select '1' as v union all select '2' as v"))
assert(#t == 2)
assert(t[1].v == "1")
assert(t[2].v == "2")
local a, b = assert(db:first("select '1' as b, '2' as a", 'a', 'b'))
assert(a == "2")
assert(b == "1")
-- assert(nil == db:first_value("some non sql query"))
db:release()
assert(not db:connected())
print(" * databse - OK!")
end
Database.__self_test__ = function(backends, ...)
for _, backend in ipairs(backends) do
local t = Database.backend[backend]
t(...).__self_test__(...)
end
end;
end
-----------------------------------------------------------

View File

@@ -0,0 +1,106 @@
--
-- LuaSQL backend to FusionPBX database class
--
require "resources.functions.split"
local log = require "resources.functions.log".database
local LuaSQLDatabase = {} do
LuaSQLDatabase.__index = LuaSQLDatabase
LuaSQLDatabase._backend_name = 'LuaSQL'
local map = {
pgsql = 'postgres';
}
local function apply_names(row, colnames, null_value)
for _, name in pairs(colnames) do
if row[name] == nil then
row[name] = null_value
else
row[name] = tostring(row[name])
end
end
return row
end
function LuaSQLDatabase.new(name)
local self = setmetatable({}, OdbcDatabase)
local connection_string = assert(database[name])
local typ, args = split_first(database[name], "://", true);
typ = map[typ] or typ
local luasql = require ("luasql." .. typ)
local env = assert (luasql[typ]())
local dbh = assert (env:connect( usplit(args, ':', true) ))
self._env, self._dbh = env, dbh
return self
end
function LuaSQLDatabase:query(sql, fn)
self._rows_affected = nil
if fn then
local cur, err = self._dbh:execute(sql)
if err and not cur then
log.errf("Can not execute sql: %s\n%s", tostring(err), sql)
end
local colnames = cur:getcolnames()
while true do
local row, err = cur:fetch({}, "a")
if not row then break end
local ok, ret = pcall(fn, apply_names(row, colnames, ""))
if (not ok) or (type(ret) == 'number' and ret > 0) then
break
end
end
cur:close()
return true
end
local ok, err = self._dbh:execute(sql)
if err and not ok then
log.errf("Can not execute sql: %s\n%s", tostring(err), sql)
end
if not ok then return nil, err end
if type(ok) ~= 'number' then
ok:close()
log.warning('SQL return recordset')
else
self._rows_affected = ok
end
self._rows_affected = ok
return self._rows_affected
end
function LuaSQLDatabase:affected_rows()
return self._rows_affected;
end
function LuaSQLDatabase:release()
if self._dbh then
self._dbh:close()
self._env:close()
self._env, self._dbh = nil
end
end
function LuaSQLDatabase:connected()
if not self._dbh then
return false
end
local str = tostring(self._dbh)
return not string.find(str, 'closed')
end
end
return LuaSQLDatabase

View File

@@ -0,0 +1,61 @@
--
-- Native backend to FusionPBX database class
--
local log = require "resources.functions.log".database
-----------------------------------------------------------
local FsDatabase = {} if freeswitch then
require "resources.functions.file_exists"
require "resources.functions.database_handle"
FsDatabase.__index = FsDatabase
FsDatabase._backend_name = 'native'
function FsDatabase.new(name)
local dbh = assert(name)
if type(name) == 'string' then
if name == 'switch' and file_exists(database_dir.."/core.db") then
dbh = freeswitch.Dbh("sqlite://"..database_dir.."/core.db")
else
dbh = database_handle(name)
end
end
assert(dbh:connected())
local self = setmetatable({
_dbh = dbh;
}, FsDatabase)
return self
end
function FsDatabase:query(sql, fn)
if fn then
return self._dbh:query(sql, fn)
end
return self._dbh:query(sql)
end
function FsDatabase:affected_rows()
if self._dbh then
return self._dbh:affected_rows()
end
end
function FsDatabase:release()
if self._dbh then
self._dbh:release()
self._dbh = nil
end
end
function FsDatabase:connected()
return self._dbh and self._dbh:connected()
end
end
-----------------------------------------------------------
return FsDatabase

View File

@@ -0,0 +1,67 @@
--
-- Lua-ODBC backend to FusionPBX database class
--
local log = require "resources.functions.log".database
local odbc = require "odbc.dba"
local function remove_null(row, null, null_value)
local o = {}
for k, v in pairs(row) do
if v == null then
o[k] = null_value
else
o[k] = tostring(v)
end
end
return o
end
local OdbcDatabase = {} do
OdbcDatabase.__index = OdbcDatabase
OdbcDatabase._backend_name = 'ODBC'
function OdbcDatabase.new(name)
local self = setmetatable({}, OdbcDatabase)
local connection_string = assert(database[name])
local typ, dsn, user, password = connection_string:match("^(.-)://(.-):(.-):(.-)$")
assert(typ == 'odbc', "unsupported connection string:" .. connection_string)
self._dbh = odbc.Connect(dsn, user, password)
return self
end
function OdbcDatabase:query(sql, fn)
self._rows_affected = nil
if fn then
return self._dbh:neach(sql, function(row)
return fn(remove_null(row, odbc.NULL, ""))
end)
end
local ok, err = self._dbh:exec(sql)
if not ok then return nil, err end
self._rows_affected = ok
return self._rows_affected
end
function OdbcDatabase:affected_rows()
return self._rows_affected;
end
function OdbcDatabase:release()
if self._dbh then
self._dbh:destroy()
self._dbh = nil
end
end
function OdbcDatabase:connected()
return self._dbh and self._dbh:connected()
end
end
return OdbcDatabase

View File

@@ -0,0 +1,82 @@
--
-- Lua-ODBC-Pool backend to FusionPBX database class
--
local log = require "resources.functions.log".database
local odbc = require "odbc.dba"
local odbcpool = require "odbc.dba.pool"
local function remove_null(row, null, null_value)
local o = {}
for k, v in pairs(row) do
if v == null then
o[k] = null_value
else
o[k] = tostring(v)
end
end
return o
end
-----------------------------------------------------------
local OdbcPoolDatabase = {} do
OdbcPoolDatabase.__index = OdbcPoolDatabase
OdbcPoolDatabase._backend_name = 'ODBC Pool'
function OdbcPoolDatabase.new(name)
local self = setmetatable({}, OdbcPoolDatabase)
self._cli = odbcpool.client(name)
self._timeout = 1000
self._rows_affected = nil
return self
end
function OdbcPoolDatabase:query(sql, fn)
self._rows_affected = nil
local cli = self._cli
local ok, err
if fn then
ok, err = cli:acquire(self._timeout, function(dbh)
local ok, err = dbh:neach(sql, function(row)
return fn(remove_null(row, odbc.NULL, ""))
end)
if err and not ok then
log.errf("Can not execute sql: %s\n%s", tostring(err), sql)
end
return not not dbh:connected(), true
end)
else
ok, err = cli:acquire(self._timeout, function(dbh)
local ok, err = dbh:exec(sql)
if err and not ok then
log.errf("Can not execute sql: %s\n%s", tostring(err), sql)
end
self._rows_affected = ok
return not not dbh:connected(), ok
end)
end
if err and not ok then
log.errf("Can not get database handle: %s", tostring(err))
end
return ok
end
function OdbcPoolDatabase:affected_rows()
return self._rows_affected;
end
function OdbcPoolDatabase:release()
self._cli = nil
end
function OdbcPoolDatabase:connected()
return not not self._cli
end
end
-----------------------------------------------------------
return OdbcPoolDatabase

View File

@@ -0,0 +1,67 @@
-- Start background service to support Lua-ODBC-Pool database backend
require "resources.functions.config"
require "resources.functions.file_exists"
local log = require "resources.functions.log".dbpool
local odbc = require "odbc"
local odbcpool = require "odbc.pool"
-- Configuration
local POLL_TIMEOUT = 5
local run_file = scripts_dir .. "/run/dbpool.tmp";
-- Pool ctor
local function run_odbc_pool(name, n)
local connection_string = assert(database[name])
local typ, dsn, user, password = connection_string:match("^(.-)://(.-):(.-):(.-)$")
assert(typ == 'odbc', "unsupported connection string:" .. connection_string)
local cli = odbcpool.client(name)
log.noticef("Starting reconnect thread[%s] ...", name)
local rthread = odbcpool.reconnect_thread(cli, dsn, user, password)
rthread:start()
log.noticef("Reconnect thread[%s] started", name)
local env = odbc.environment()
local connections = {}
for i = 1, (n or 10) do
local cnn = odbc.assert(env:connection())
connections[#connections+1] = cnn
cli:reconnect(cnn)
end
return {
name = name;
cli = cli;
cnn = connections;
thr = rthread;
}
end
local function stop_odbc_pool(ctx)
log.noticef("Stopping reconnect thread[%s] ...", ctx.name)
ctx.thr:stop()
log.noticef("Reconnect thread[%s] stopped", ctx.name)
end
local function main()
local system_pool = run_odbc_pool("system", 10)
local switch_pool = run_odbc_pool("switch", 10)
local file = assert(io.open(run_file, "w"));
file:write("remove this file to stop the script");
file:close()
while file_exists(run_file) do
freeswitch.msleep(POLL_TIMEOUT*1000)
end
stop_odbc_pool(system_pool)
stop_odbc_pool(switch_pool)
end
main()

View File

@@ -0,0 +1,12 @@
local Cache = require 'resources.functions.cache'
local Database = require 'resources.functions.database'
Database.__self_test__({
"native",
"luasql",
"odbc",
"odbcpool",
},
"system")
Cache._self_test()