#!/usr/bin/env python
from odb import *
## -----------------------------------------------------------------------
## T E S T S
## -----------------------------------------------------------------------
import MySQLdb
import odb_mysql
import sqlite
import odb_sqlite
def TEST(output=log):
LOGGING_STATUS[DEV_SELECT] = 1
LOGGING_STATUS[DEV_UPDATE] = 1
print "------ TESTING MySQLdb ---------"
rdb = MySQLdb.connect(host = 'localhost',user='root', passwd = '', db='testdb')
ndb = MySQLdb.connect(host = 'localhost',user='trakken', passwd = 'trakpas', db='testdb')
cursor = rdb.cursor()
output("drop table agents")
try:
cursor.execute("drop table agents") # clean out the table
except:
pass
output("creating table")
SQL = """
create table agents (
agent_id integer not null primary key auto_increment,
login varchar(200) not null,
unique (login),
ext_email varchar(200) not null,
hashed_pw varchar(20) not null,
name varchar(200),
auth_level integer default 0,
ticket_count integer default 0)
"""
cursor.execute(SQL)
db = odb_mysql.Database(ndb)
TEST_DATABASE(rdb,db,output=output)
print "------ TESTING sqlite ----------"
rdb = sqlite.connect("/tmp/test.db",autocommit=1)
cursor = rdb.cursor()
try:
cursor.execute("drop table agents")
except:
pass
SQL = """
create table agents (
agent_id integer primary key,
login varchar(200) not null,
ext_email varchar(200) not null,
hashed_pw varchar(20),
name varchar(200),
auth_level integer default 0,
ticket_count integer default 0)"""
cursor.execute(SQL)
rdb = sqlite.connect("/tmp/test.db",autocommit=1)
ndb = sqlite.connect("/tmp/test.db",autocommit=1)
db = odb_sqlite.Database(ndb)
TEST_DATABASE(rdb,db,output=output,is_mysql=0)
def TEST_DATABASE(rdb,db,output=log,is_mysql=1):
cursor = rdb.cursor()
class AgentsTable(Table):
def _defineRows(self):
self.d_addColumn("agent_id",kInteger,None,primarykey = 1,autoincrement = 1)
self.d_addColumn("login",kVarString,200,notnull=1)
self.d_addColumn("ext_email",kVarString,200,notnull=1)
self.d_addColumn("hashed_pw",kVarString,20,notnull=1)
self.d_addColumn("name",kBigString,compress_ok=1)
self.d_addColumn("auth_level",kInteger,None)
self.d_addColumn("ticket_count",kIncInteger,None)
tbl = AgentsTable(db,"agents")
TEST_INSERT_COUNT = 5
# ---------------------------------------------------------------
# make sure we can catch a missing row
try:
a_row = tbl.fetchRow( ("agent_id", 1000) )
raise "test error"
except eNoMatchingRows:
pass
output("PASSED! fetch missing row test")
# --------------------------------------------------------------
# create new rows and insert them
for n in range(TEST_INSERT_COUNT):
new_id = n + 1
newrow = tbl.newRow()
newrow.name = "name #%d" % new_id
newrow.login = "name%d" % new_id
newrow.ext_email = "%d@name" % new_id
newrow.save()
if newrow.agent_id != new_id:
raise "new insert id (%s) does not match expected value (%d)" % (newrow.agent_id,new_id)
output("PASSED! autoinsert test")
# --------------------------------------------------------------
# fetch one row
a_row = tbl.fetchRow( ("agent_id", 1) )
if a_row.name != "name #1":
raise "row data incorrect"
output("PASSED! fetch one row test")
# ---------------------------------------------------------------
# don't change and save it
# (i.e. the "dummy cursor" string should never be called!)
#
try:
a_row.save(cursor = "dummy cursor")
except AttributeError, reason:
raise "row tried to access cursor on save() when no changes were made!"
output("PASSED! don't save when there are no changed")
# ---------------------------------------------------------------
# change, save, load, test
a_row.auth_level = 10
a_row.save()
b_row = tbl.fetchRow( ("agent_id", 1) )
if b_row.auth_level != 10:
log(repr(b_row))
raise "save and load failed"
output("PASSED! change, save, load")
# ---------------------------------------------------------------
# replace
repl_row = tbl.newRow(replace=1)
repl_row.agent_id = a_row.agent_id
repl_row.login = a_row.login + "-" + a_row.login
repl_row.ext_email = "foo"
repl_row.save()
b_row = tbl.fetchRow( ("agent_id", a_row.agent_id) )
if b_row.login != repl_row.login:
raise "replace failed"
output("PASSED! replace")
# --------------------------------------------------------------
# access unknown dict item
try:
a = a_row["UNKNOWN_ATTRIBUTE"]
raise "test error"
except KeyError, reason:
pass
try:
a_row["UNKNOWN_ATTRIBUTE"] = 1
raise "test error"
except KeyError, reason:
pass
output("PASSED! unknown dict item exception")
# --------------------------------------------------------------
# access unknown attribute
try:
a = a_row.UNKNOWN_ATTRIBUTE
raise "test error"
except AttributeError, reason:
pass
try:
a_row.UNKNOWN_ATTRIBUTE = 1
raise "test error"
except AttributeError, reason:
pass
output("PASSED! unknown attribute exception")
# --------------------------------------------------------------
# use wrong data for column type
try:
a_row.agent_id = "this is a string"
raise "test error"
except eInvalidData, reason:
pass
output("PASSED! invalid data for column type")
# --------------------------------------------------------------
# fetch 1 rows
rows = tbl.fetchRows( ('agent_id', 1) )
if len(rows) != 1:
raise "fetchRows() did not return 1 row!" % (TEST_INSERT_COUNT)
output("PASSED! fetch one row")
# --------------------------------------------------------------
# fetch All rows
rows = tbl.fetchAllRows()
if len(rows) != TEST_INSERT_COUNT:
for a_row in rows:
output(repr(a_row))
raise "fetchAllRows() did not return TEST_INSERT_COUNT(%d) rows!" % (TEST_INSERT_COUNT)
output("PASSED! fetchall rows")
# --------------------------------------------------------------
# delete row object
row = tbl.fetchRow( ('agent_id', 1) )
row.delete()
try:
row = tbl.fetchRow( ('agent_id', 1) )
raise "delete failed to delete row!"
except eNoMatchingRows:
pass
# --------------------------------------------------------------
# table deleteRow() call
row = tbl.fetchRow( ('agent_id',2) )
tbl.deleteRow( ('agent_id', 2) )
try:
row = tbl.fetchRow( ('agent_id',2) )
raise "table delete failed"
except eNoMatchingRows:
pass
# --------------------------------------------------------------
# table deleteRow() call
row = tbl.fetchRow( ('agent_id',3) )
if row.databaseSizeForColumn('name') != len(row.name):
raise "databaseSizeForColumn('name') failed"
# --------------------------------------------------------------
# test inc fields
row = tbl.newRow()
new_id = 1092
row.name = "name #%d" % new_id
row.login = "name%d" % new_id
row.ext_email = "%d@name" % new_id
row.inc('ticket_count')
row.save()
new_id = row.agent_id
trow = tbl.fetchRow( ('agent_id',new_id) )
if trow.ticket_count != 1:
raise "ticket_count didn't inc!"
row.inc('ticket_count', count=2)
row.save()
trow = tbl.fetchRow( ('agent_id',new_id) )
if trow.ticket_count != 3:
raise "ticket_count wrong, expected 3, got %d" % trow.ticket_count
trow.inc('ticket_count')
trow.save()
if trow.ticket_count != 4:
raise "ticket_count wrong, expected 4, got %d" % trow.ticket_count
output("\n==== ALL TESTS PASSED ====")
if __name__ == "__main__":
TEST()