普通文本  |  297行  |  8.14 KB

#!/usr/bin/env python

from odb import *

## -----------------------------------------------------------------------
##                            T  E  S  T S
## -----------------------------------------------------------------------

import MySQLdb
import odb_mysql
import sqlite
import odb_sqlite
        
def TEST(output=log):
    """Run the odb self-test against a MySQL backend, then a sqlite backend.

    For each backend the ``agents`` table is dropped (if it exists) and
    re-created through a raw DB-API connection, an odb ``Database`` wrapper
    is built on a second connection, and both are passed to
    ``TEST_DATABASE``.

    output -- callable taking one string; receives progress messages and is
              forwarded to ``TEST_DATABASE``.

    The MySQL half connects to ``testdb`` on localhost with the credentials
    hard-coded below; the sqlite half uses the file /tmp/test.db.  Any
    exception raised while creating the tables or by ``TEST_DATABASE``
    propagates to the caller.
    """
    # NOTE(review): presumably switches on odb's logging of select/update
    # statements for the duration of the run -- confirm against odb.
    LOGGING_STATUS[DEV_SELECT] = 1
    LOGGING_STATUS[DEV_UPDATE] = 1

    # Single-argument print(...) emits the same text whether print is a
    # statement or a function.
    print("------ TESTING MySQLdb ---------")
    rdb = MySQLdb.connect(host = 'localhost',user='root', passwd = '', db='testdb')
    ndb = MySQLdb.connect(host = 'localhost',user='trakken', passwd = 'trakpas', db='testdb')
    cursor = rdb.cursor()

    output("drop table agents")
    try:
        cursor.execute("drop table agents")   # clean out the table
    except Exception:
        # Best effort: the drop fails when the table does not exist yet.
        # Only ordinary errors are ignored; Ctrl-C / SystemExit still abort.
        pass
    output("creating table")

    SQL = """

    create table agents (
       agent_id integer not null primary key auto_increment,
       login varchar(200) not null,
       unique (login),
       ext_email varchar(200) not null,
       hashed_pw varchar(20) not null,
       name varchar(200),
       auth_level integer default 0,
       ticket_count integer default 0)
       """

    cursor.execute(SQL)
    db = odb_mysql.Database(ndb)
    TEST_DATABASE(rdb,db,output=output)

    print("------ TESTING sqlite ----------")
    # A dedicated connection is used only to (re)create the schema.
    setup_db = sqlite.connect("/tmp/test.db",autocommit=1)
    cursor = setup_db.cursor()
    try:
        cursor.execute("drop table agents")
    except Exception:
        # Best effort, as above: a fresh database file has no agents table.
        pass
    SQL = """
    create table agents (
       agent_id integer primary key,
       login varchar(200) not null,
       ext_email varchar(200) not null,
       hashed_pw varchar(20),
       name varchar(200),
       auth_level integer default 0,
       ticket_count integer default 0)"""
    cursor.execute(SQL)

    # The schema is in place; release the setup connection explicitly
    # instead of leaving it open behind the connections opened below.
    setup_db.close()

    rdb = sqlite.connect("/tmp/test.db",autocommit=1)
    ndb = sqlite.connect("/tmp/test.db",autocommit=1)

    db = odb_sqlite.Database(ndb)
    TEST_DATABASE(rdb,db,output=output,is_mysql=0)
    
    


def TEST_DATABASE(rdb,db,output=log,is_mysql=1):
    """Exercise an odb ``Database`` through a table bound to ``agents``.

    The checks run in order and build on each other's rows: fetching a
    missing row, inserts with generated ids, fetch, no-op save,
    change/save/reload, replace, unknown key and attribute access, invalid
    column data, fetchRows / fetchAllRows, row and table deletes,
    databaseSizeForColumn, and incrementing a counter column.  The id
    checks expect the ``agents`` table to be empty on entry.

    rdb      -- raw DB-API connection; only a cursor is opened on it here.
    db       -- odb Database object the AgentsTable is built on.
    output   -- callable taking one string; receives progress messages.
    is_mysql -- accepted for callers that pass it; not consulted here.

    Raises AssertionError with a descriptive message on the first failed
    check.  Exceptions raised by odb itself propagate unchanged.
    """

    # Not used by any check below; kept so the call on rdb still happens.
    cursor = rdb.cursor()

    class AgentsTable(Table):
        def _defineRows(self):
            self.d_addColumn("agent_id",kInteger,None,primarykey = 1,autoincrement = 1)
            self.d_addColumn("login",kVarString,200,notnull=1)
            self.d_addColumn("ext_email",kVarString,200,notnull=1)
            self.d_addColumn("hashed_pw",kVarString,20,notnull=1)
            self.d_addColumn("name",kBigString,compress_ok=1)
            self.d_addColumn("auth_level",kInteger,None)
            self.d_addColumn("ticket_count",kIncInteger,None)

    tbl = AgentsTable(db,"agents")

    TEST_INSERT_COUNT = 5

    # ---------------------------------------------------------------
    # make sure we can catch a missing row

    try:
        a_row = tbl.fetchRow( ("agent_id", 1000) )
    except eNoMatchingRows:
        pass
    else:
        # The fetch must not succeed: no row with this id has been inserted.
        raise AssertionError("test error")

    output("PASSED! fetch missing row test")

    # --------------------------------------------------------------
    # create new rows and insert them

    for n in range(TEST_INSERT_COUNT):
        new_id = n + 1

        newrow = tbl.newRow()
        newrow.name = "name #%d" % new_id
        newrow.login = "name%d" % new_id
        newrow.ext_email = "%d@name" % new_id
        newrow.save()
        # agent_id is not assigned above; after save() it must hold the
        # generated id, counting up from 1.
        if newrow.agent_id != new_id:
            raise AssertionError("new insert id (%s) does not match expected value (%d)" % (newrow.agent_id,new_id))

    output("PASSED! autoinsert test")

    # --------------------------------------------------------------
    # fetch one row
    a_row = tbl.fetchRow( ("agent_id", 1) )

    if a_row.name != "name #1":
        raise AssertionError("row data incorrect")

    output("PASSED! fetch one row test")

    # ---------------------------------------------------------------
    # don't change and save it
    # (i.e. the "dummy cursor" string should never be called!)
    #
    try:
        a_row.save(cursor = "dummy cursor")
    except AttributeError:
        # A plain string has no cursor methods, so an AttributeError here
        # means save() tried to use the cursor for an unchanged row.
        raise AssertionError("row tried to access cursor on save() when no changes were made!")

    output("PASSED! don't save when there are no changed")

    # ---------------------------------------------------------------
    # change, save, load, test

    a_row.auth_level = 10
    a_row.save()
    b_row = tbl.fetchRow( ("agent_id", 1) )
    if b_row.auth_level != 10:
        # Dump the reloaded row through the same channel as every other
        # message so a custom output callback sees it too.
        output(repr(b_row))
        raise AssertionError("save and load failed")

    output("PASSED! change, save, load")

    # ---------------------------------------------------------------
    # replace

    repl_row = tbl.newRow(replace=1)
    repl_row.agent_id = a_row.agent_id
    repl_row.login = a_row.login + "-" + a_row.login
    repl_row.ext_email = "foo"
    repl_row.save()

    b_row = tbl.fetchRow( ("agent_id", a_row.agent_id) )
    if b_row.login != repl_row.login:
        raise AssertionError("replace failed")
    output("PASSED! replace")

    # --------------------------------------------------------------
    # access unknown dict item

    try:
        a = a_row["UNKNOWN_ATTRIBUTE"]
    except KeyError:
        pass
    else:
        raise AssertionError("test error")

    try:
        a_row["UNKNOWN_ATTRIBUTE"] = 1
    except KeyError:
        pass
    else:
        raise AssertionError("test error")

    output("PASSED! unknown dict item exception")

    # --------------------------------------------------------------
    # access unknown attribute
    try:
        a = a_row.UNKNOWN_ATTRIBUTE
    except AttributeError:
        pass
    else:
        raise AssertionError("test error")

    try:
        a_row.UNKNOWN_ATTRIBUTE = 1
    except AttributeError:
        pass
    else:
        raise AssertionError("test error")

    output("PASSED! unknown attribute exception")

    # --------------------------------------------------------------
    # use wrong data for column type

    try:
        a_row.agent_id = "this is a string"
    except eInvalidData:
        pass
    else:
        raise AssertionError("test error")

    output("PASSED! invalid data for column type")

    # --------------------------------------------------------------
    # fetch 1 rows

    rows = tbl.fetchRows( ('agent_id', 1) )
    if len(rows) != 1:
        # The message has no format specifier, so it is raised as-is;
        # applying % to it would raise TypeError and hide the real failure.
        raise AssertionError("fetchRows() did not return 1 row!")

    output("PASSED! fetch one row")

    # --------------------------------------------------------------
    # fetch All rows

    rows = tbl.fetchAllRows()
    if len(rows) != TEST_INSERT_COUNT:
        # Show what actually came back before failing.
        for fetched_row in rows:
            output(repr(fetched_row))
        raise AssertionError("fetchAllRows() did not return TEST_INSERT_COUNT(%d) rows!" % (TEST_INSERT_COUNT))

    output("PASSED! fetchall rows")

    # --------------------------------------------------------------
    # delete row object

    row = tbl.fetchRow( ('agent_id', 1) )
    row.delete()
    try:
        row = tbl.fetchRow( ('agent_id', 1) )
    except eNoMatchingRows:
        pass
    else:
        raise AssertionError("delete failed to delete row!")

    # --------------------------------------------------------------
    # table deleteRow() call

    # This first fetch raises if row 2 is not there to be deleted.
    row = tbl.fetchRow( ('agent_id',2) )
    tbl.deleteRow( ('agent_id', 2) )
    try:
        row = tbl.fetchRow( ('agent_id',2) )
    except eNoMatchingRows:
        pass
    else:
        raise AssertionError("table delete failed")

    # --------------------------------------------------------------
    # databaseSizeForColumn() call

    row = tbl.fetchRow( ('agent_id',3) )
    if row.databaseSizeForColumn('name') != len(row.name):
        raise AssertionError("databaseSizeForColumn('name') failed")

    # --------------------------------------------------------------
    # test inc fields
    row = tbl.newRow()
    new_id = 1092
    row.name = "name #%d" % new_id
    row.login = "name%d" % new_id
    row.ext_email = "%d@name" % new_id
    row.inc('ticket_count')
    row.save()
    # 1092 only made the name/login/email values unique; from here on use
    # the id the row holds after save().
    new_id = row.agent_id

    trow = tbl.fetchRow( ('agent_id',new_id) )
    if trow.ticket_count != 1:
        raise AssertionError("ticket_count didn't inc!")

    row.inc('ticket_count', count=2)
    row.save()
    trow = tbl.fetchRow( ('agent_id',new_id) )
    if trow.ticket_count != 3:
        raise AssertionError("ticket_count wrong, expected 3, got %d" % trow.ticket_count)

    # This last check reads the in-memory row after save(), without
    # fetching it again.
    trow.inc('ticket_count')
    trow.save()
    if trow.ticket_count != 4:
        raise AssertionError("ticket_count wrong, expected 4, got %d" % trow.ticket_count)

    output("\n==== ALL TESTS PASSED ====")
    

# Run the MySQL and sqlite self-tests when this file is executed as a script.
if __name__ == "__main__":
    TEST()