普通文本  |  1159行  |  35.38 KB

"""
Basic TestCases for BTree and hash DBs, with and without a DBEnv, with
various DB flags, etc.
"""

import os
import errno
import string
from pprint import pprint
import unittest
import time
import sys

from test_all import db, test_support, verbose, get_new_environment_path, \
        get_new_database_path

DASH = '-'


#----------------------------------------------------------------------

class VersionTestCase(unittest.TestCase):
    def test00_version(self):
        info = db.version()
        if verbose:
            print '\n', '-=' * 20
            print 'bsddb.db.version(): %s' % (info, )
            print db.DB_VERSION_STRING
            print '-=' * 20
        self.assertEqual(info, (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR,
                        db.DB_VERSION_PATCH))

#----------------------------------------------------------------------

class BasicTestCase(unittest.TestCase):
    dbtype       = db.DB_UNKNOWN  # must be set in derived class
    cachesize    = (0, 1024*1024, 1)
    dbopenflags  = 0
    dbsetflags   = 0
    dbmode       = 0660
    dbname       = None
    useEnv       = 0
    envflags     = 0
    envsetflags  = 0

    _numKeys      = 1002    # PRIVATE.  NOTE: must be an even value

    def setUp(self):
        if self.useEnv:
            self.homeDir=get_new_environment_path()
            try:
                self.env = db.DBEnv()
                self.env.set_lg_max(1024*1024)
                self.env.set_tx_max(30)
                self._t = int(time.time())
                self.env.set_tx_timestamp(self._t)
                self.env.set_flags(self.envsetflags, 1)
                self.env.open(self.homeDir, self.envflags | db.DB_CREATE)
                self.filename = "test"
            # Yes, a bare except is intended, since we're re-raising the exc.
            except:
                test_support.rmtree(self.homeDir)
                raise
        else:
            self.env = None
            self.filename = get_new_database_path()

        # create and open the DB
        self.d = db.DB(self.env)
        if not self.useEnv :
            self.d.set_cachesize(*self.cachesize)
            cachesize = self.d.get_cachesize()
            self.assertEqual(cachesize[0], self.cachesize[0])
            self.assertEqual(cachesize[2], self.cachesize[2])
            # Berkeley DB expands the cache 25% accounting overhead,
            # if the cache is small.
            self.assertEqual(125, int(100.0*cachesize[1]/self.cachesize[1]))
        self.d.set_flags(self.dbsetflags)
        if self.dbname:
            self.d.open(self.filename, self.dbname, self.dbtype,
                        self.dbopenflags|db.DB_CREATE, self.dbmode)
        else:
            self.d.open(self.filename,   # try out keyword args
                        mode = self.dbmode,
                        dbtype = self.dbtype,
                        flags = self.dbopenflags|db.DB_CREATE)

        if not self.useEnv:
            self.assertRaises(db.DBInvalidArgError,
                    self.d.set_cachesize, *self.cachesize)

        self.populateDB()


    def tearDown(self):
        self.d.close()
        if self.env is not None:
            self.env.close()
            test_support.rmtree(self.homeDir)
        else:
            os.remove(self.filename)



    def populateDB(self, _txn=None):
        d = self.d

        for x in range(self._numKeys//2):
            key = '%04d' % (self._numKeys - x)  # insert keys in reverse order
            data = self.makeData(key)
            d.put(key, data, _txn)

        d.put('empty value', '', _txn)

        for x in range(self._numKeys//2-1):
            key = '%04d' % x  # and now some in forward order
            data = self.makeData(key)
            d.put(key, data, _txn)

        if _txn:
            _txn.commit()

        num = len(d)
        if verbose:
            print "created %d records" % num


    def makeData(self, key):
        return DASH.join([key] * 5)



    #----------------------------------------

    def test01_GetsAndPuts(self):
        d = self.d

        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_GetsAndPuts..." % self.__class__.__name__

        for key in ['0001', '0100', '0400', '0700', '0999']:
            data = d.get(key)
            if verbose:
                print data

        self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321')

        # By default non-existent keys return None...
        self.assertEqual(d.get('abcd'), None)

        # ...but they raise exceptions in other situations.  Call
        # set_get_returns_none() to change it.
        try:
            d.delete('abcd')
        except db.DBNotFoundError, val:
            if sys.version_info < (2, 6) :
                self.assertEqual(val[0], db.DB_NOTFOUND)
            else :
                self.assertEqual(val.args[0], db.DB_NOTFOUND)
            if verbose: print val
        else:
            self.fail("expected exception")


        d.put('abcd', 'a new record')
        self.assertEqual(d.get('abcd'), 'a new record')

        d.put('abcd', 'same key')
        if self.dbsetflags & db.DB_DUP:
            self.assertEqual(d.get('abcd'), 'a new record')
        else:
            self.assertEqual(d.get('abcd'), 'same key')


        try:
            d.put('abcd', 'this should fail', flags=db.DB_NOOVERWRITE)
        except db.DBKeyExistError, val:
            if sys.version_info < (2, 6) :
                self.assertEqual(val[0], db.DB_KEYEXIST)
            else :
                self.assertEqual(val.args[0], db.DB_KEYEXIST)
            if verbose: print val
        else:
            self.fail("expected exception")

        if self.dbsetflags & db.DB_DUP:
            self.assertEqual(d.get('abcd'), 'a new record')
        else:
            self.assertEqual(d.get('abcd'), 'same key')


        d.sync()
        d.close()
        del d

        self.d = db.DB(self.env)
        if self.dbname:
            self.d.open(self.filename, self.dbname)
        else:
            self.d.open(self.filename)
        d = self.d

        self.assertEqual(d.get('0321'), '0321-0321-0321-0321-0321')
        if self.dbsetflags & db.DB_DUP:
            self.assertEqual(d.get('abcd'), 'a new record')
        else:
            self.assertEqual(d.get('abcd'), 'same key')

        rec = d.get_both('0555', '0555-0555-0555-0555-0555')
        if verbose:
            print rec

        self.assertEqual(d.get_both('0555', 'bad data'), None)

        # test default value
        data = d.get('bad key', 'bad data')
        self.assertEqual(data, 'bad data')

        # any object can pass through
        data = d.get('bad key', self)
        self.assertEqual(data, self)

        s = d.stat()
        self.assertEqual(type(s), type({}))
        if verbose:
            print 'd.stat() returned this dictionary:'
            pprint(s)


    #----------------------------------------

    def test02_DictionaryMethods(self):
        d = self.d

        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test02_DictionaryMethods..." % \
                  self.__class__.__name__

        for key in ['0002', '0101', '0401', '0701', '0998']:
            data = d[key]
            self.assertEqual(data, self.makeData(key))
            if verbose:
                print data

        self.assertEqual(len(d), self._numKeys)
        keys = d.keys()
        self.assertEqual(len(keys), self._numKeys)
        self.assertEqual(type(keys), type([]))

        d['new record'] = 'a new record'
        self.assertEqual(len(d), self._numKeys+1)
        keys = d.keys()
        self.assertEqual(len(keys), self._numKeys+1)

        d['new record'] = 'a replacement record'
        self.assertEqual(len(d), self._numKeys+1)
        keys = d.keys()
        self.assertEqual(len(keys), self._numKeys+1)

        if verbose:
            print "the first 10 keys are:"
            pprint(keys[:10])

        self.assertEqual(d['new record'], 'a replacement record')

# We check also the positional parameter
        self.assertEqual(d.has_key('0001', None), 1)
# We check also the keyword parameter
        self.assertEqual(d.has_key('spam', txn=None), 0)

        items = d.items()
        self.assertEqual(len(items), self._numKeys+1)
        self.assertEqual(type(items), type([]))
        self.assertEqual(type(items[0]), type(()))
        self.assertEqual(len(items[0]), 2)

        if verbose:
            print "the first 10 items are:"
            pprint(items[:10])

        values = d.values()
        self.assertEqual(len(values), self._numKeys+1)
        self.assertEqual(type(values), type([]))

        if verbose:
            print "the first 10 values are:"
            pprint(values[:10])


    #----------------------------------------

    def test02b_SequenceMethods(self):
        d = self.d

        for key in ['0002', '0101', '0401', '0701', '0998']:
            data = d[key]
            self.assertEqual(data, self.makeData(key))
            if verbose:
                print data

        self.assertTrue(hasattr(d, "__contains__"))
        self.assertTrue("0401" in d)
        self.assertFalse("1234" in d)


    #----------------------------------------

    def test03_SimpleCursorStuff(self, get_raises_error=0, set_raises_error=0):
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03_SimpleCursorStuff (get_error %s, set_error %s)..." % \
                  (self.__class__.__name__, get_raises_error, set_raises_error)

        if self.env and self.dbopenflags & db.DB_AUTO_COMMIT:
            txn = self.env.txn_begin()
        else:
            txn = None
        c = self.d.cursor(txn=txn)

        rec = c.first()
        count = 0
        while rec is not None:
            count = count + 1
            if verbose and count % 100 == 0:
                print rec
            try:
                rec = c.next()
            except db.DBNotFoundError, val:
                if get_raises_error:
                    if sys.version_info < (2, 6) :
                        self.assertEqual(val[0], db.DB_NOTFOUND)
                    else :
                        self.assertEqual(val.args[0], db.DB_NOTFOUND)
                    if verbose: print val
                    rec = None
                else:
                    self.fail("unexpected DBNotFoundError")
            self.assertEqual(c.get_current_size(), len(c.current()[1]),
                    "%s != len(%r)" % (c.get_current_size(), c.current()[1]))

        self.assertEqual(count, self._numKeys)


        rec = c.last()
        count = 0
        while rec is not None:
            count = count + 1
            if verbose and count % 100 == 0:
                print rec
            try:
                rec = c.prev()
            except db.DBNotFoundError, val:
                if get_raises_error:
                    if sys.version_info < (2, 6) :
                        self.assertEqual(val[0], db.DB_NOTFOUND)
                    else :
                        self.assertEqual(val.args[0], db.DB_NOTFOUND)
                    if verbose: print val
                    rec = None
                else:
                    self.fail("unexpected DBNotFoundError")

        self.assertEqual(count, self._numKeys)

        rec = c.set('0505')
        rec2 = c.current()
        self.assertEqual(rec, rec2)
        self.assertEqual(rec[0], '0505')
        self.assertEqual(rec[1], self.makeData('0505'))
        self.assertEqual(c.get_current_size(), len(rec[1]))

        # make sure we get empty values properly
        rec = c.set('empty value')
        self.assertEqual(rec[1], '')
        self.assertEqual(c.get_current_size(), 0)

        try:
            n = c.set('bad key')
        except db.DBNotFoundError, val:
            if sys.version_info < (2, 6) :
                self.assertEqual(val[0], db.DB_NOTFOUND)
            else :
                self.assertEqual(val.args[0], db.DB_NOTFOUND)
            if verbose: print val
        else:
            if set_raises_error:
                self.fail("expected exception")
            if n is not None:
                self.fail("expected None: %r" % (n,))

        rec = c.get_both('0404', self.makeData('0404'))
        self.assertEqual(rec, ('0404', self.makeData('0404')))

        try:
            n = c.get_both('0404', 'bad data')
        except db.DBNotFoundError, val:
            if sys.version_info < (2, 6) :
                self.assertEqual(val[0], db.DB_NOTFOUND)
            else :
                self.assertEqual(val.args[0], db.DB_NOTFOUND)
            if verbose: print val
        else:
            if get_raises_error:
                self.fail("expected exception")
            if n is not None:
                self.fail("expected None: %r" % (n,))

        if self.d.get_type() == db.DB_BTREE:
            rec = c.set_range('011')
            if verbose:
                print "searched for '011', found: ", rec

            rec = c.set_range('011',dlen=0,doff=0)
            if verbose:
                print "searched (partial) for '011', found: ", rec
            if rec[1] != '': self.fail('expected empty data portion')

            ev = c.set_range('empty value')
            if verbose:
                print "search for 'empty value' returned", ev
            if ev[1] != '': self.fail('empty value lookup failed')

        c.set('0499')
        c.delete()
        try:
            rec = c.current()
        except db.DBKeyEmptyError, val:
            if get_raises_error:
                if sys.version_info < (2, 6) :
                    self.assertEqual(val[0], db.DB_KEYEMPTY)
                else :
                    self.assertEqual(val.args[0], db.DB_KEYEMPTY)
                if verbose: print val
            else:
                self.fail("unexpected DBKeyEmptyError")
        else:
            if get_raises_error:
                self.fail('DBKeyEmptyError exception expected')

        c.next()
        c2 = c.dup(db.DB_POSITION)
        self.assertEqual(c.current(), c2.current())

        c2.put('', 'a new value', db.DB_CURRENT)
        self.assertEqual(c.current(), c2.current())
        self.assertEqual(c.current()[1], 'a new value')

        c2.put('', 'er', db.DB_CURRENT, dlen=0, doff=5)
        self.assertEqual(c2.current()[1], 'a newer value')

        c.close()
        c2.close()
        if txn:
            txn.commit()

        # time to abuse the closed cursors and hope we don't crash
        methods_to_test = {
            'current': (),
            'delete': (),
            'dup': (db.DB_POSITION,),
            'first': (),
            'get': (0,),
            'next': (),
            'prev': (),
            'last': (),
            'put':('', 'spam', db.DB_CURRENT),
            'set': ("0505",),
        }
        for method, args in methods_to_test.items():
            try:
                if verbose:
                    print "attempting to use a closed cursor's %s method" % \
                          method
                # a bug may cause a NULL pointer dereference...
                getattr(c, method)(*args)
            except db.DBError, val:
                if sys.version_info < (2, 6) :
                    self.assertEqual(val[0], 0)
                else :
                    self.assertEqual(val.args[0], 0)
                if verbose: print val
            else:
                self.fail("no exception raised when using a buggy cursor's"
                          "%s method" % method)

        #
        # free cursor referencing a closed database, it should not barf:
        #
        oldcursor = self.d.cursor(txn=txn)
        self.d.close()

        # this would originally cause a segfault when the cursor for a
        # closed database was cleaned up.  it should not anymore.
        # SF pybsddb bug id 667343
        del oldcursor

    def test03b_SimpleCursorWithoutGetReturnsNone0(self):
        # same test but raise exceptions instead of returning None
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \
                  self.__class__.__name__

        old = self.d.set_get_returns_none(0)
        self.assertEqual(old, 2)
        self.test03_SimpleCursorStuff(get_raises_error=1, set_raises_error=1)

    def test03b_SimpleCursorWithGetReturnsNone1(self):
        # same test but raise exceptions instead of returning None
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03b_SimpleCursorStuffWithoutGetReturnsNone..." % \
                  self.__class__.__name__

        old = self.d.set_get_returns_none(1)
        self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=1)


    def test03c_SimpleCursorGetReturnsNone2(self):
        # same test but raise exceptions instead of returning None
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test03c_SimpleCursorStuffWithoutSetReturnsNone..." % \
                  self.__class__.__name__

        old = self.d.set_get_returns_none(1)
        self.assertEqual(old, 2)
        old = self.d.set_get_returns_none(2)
        self.assertEqual(old, 1)
        self.test03_SimpleCursorStuff(get_raises_error=0, set_raises_error=0)

    if db.version() >= (4, 6):
        def test03d_SimpleCursorPriority(self) :
            c = self.d.cursor()
            c.set_priority(db.DB_PRIORITY_VERY_LOW)  # Positional
            self.assertEqual(db.DB_PRIORITY_VERY_LOW, c.get_priority())
            c.set_priority(priority=db.DB_PRIORITY_HIGH)  # Keyword
            self.assertEqual(db.DB_PRIORITY_HIGH, c.get_priority())
            c.close()

    #----------------------------------------

    def test04_PartialGetAndPut(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test04_PartialGetAndPut..." % \
                  self.__class__.__name__

        key = "partialTest"
        data = "1" * 1000 + "2" * 1000
        d.put(key, data)
        self.assertEqual(d.get(key), data)
        self.assertEqual(d.get(key, dlen=20, doff=990),
                ("1" * 10) + ("2" * 10))

        d.put("partialtest2", ("1" * 30000) + "robin" )
        self.assertEqual(d.get("partialtest2", dlen=5, doff=30000), "robin")

        # There seems to be a bug in DB here...  Commented out the test for
        # now.
        ##self.assertEqual(d.get("partialtest2", dlen=5, doff=30010), "")

        if self.dbsetflags != db.DB_DUP:
            # Partial put with duplicate records requires a cursor
            d.put(key, "0000", dlen=2000, doff=0)
            self.assertEqual(d.get(key), "0000")

            d.put(key, "1111", dlen=1, doff=2)
            self.assertEqual(d.get(key), "0011110")

    #----------------------------------------

    def test05_GetSize(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test05_GetSize..." % self.__class__.__name__

        for i in range(1, 50000, 500):
            key = "size%s" % i
            #print "before ", i,
            d.put(key, "1" * i)
            #print "after",
            self.assertEqual(d.get_size(key), i)
            #print "done"

    #----------------------------------------

    def test06_Truncate(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test06_Truncate..." % self.__class__.__name__

        d.put("abcde", "ABCDE");
        num = d.truncate()
        self.assertTrue(num >= 1, "truncate returned <= 0 on non-empty database")
        num = d.truncate()
        self.assertEqual(num, 0,
                "truncate on empty DB returned nonzero (%r)" % (num,))

    #----------------------------------------

    def test07_verify(self):
        # Verify bug solved in 4.7.3pre8
        self.d.close()
        d = db.DB(self.env)
        d.verify(self.filename)


    #----------------------------------------

    if db.version() >= (4, 6):
        def test08_exists(self) :
            self.d.put("abcde", "ABCDE")
            self.assertTrue(self.d.exists("abcde") == True,
                    "DB->exists() returns wrong value")
            self.assertTrue(self.d.exists("x") == False,
                    "DB->exists() returns wrong value")

    #----------------------------------------

    if db.version() >= (4, 7):
        def test_compact(self) :
            d = self.d
            self.assertEqual(0, d.compact(flags=db.DB_FREELIST_ONLY))
            self.assertEqual(0, d.compact(flags=db.DB_FREELIST_ONLY))
            d.put("abcde", "ABCDE");
            d.put("bcde", "BCDE");
            d.put("abc", "ABC");
            d.put("monty", "python");
            d.delete("abc")
            d.delete("bcde")
            d.compact(start='abcde', stop='monty', txn=None,
                    compact_fillpercent=42, compact_pages=1,
                    compact_timeout=50000000,
                    flags=db.DB_FREELIST_ONLY|db.DB_FREE_SPACE)

    #----------------------------------------

#----------------------------------------------------------------------


class BasicBTreeTestCase(BasicTestCase):
    dbtype = db.DB_BTREE


class BasicHashTestCase(BasicTestCase):
    dbtype = db.DB_HASH


class BasicBTreeWithThreadFlagTestCase(BasicTestCase):
    dbtype = db.DB_BTREE
    dbopenflags = db.DB_THREAD


class BasicHashWithThreadFlagTestCase(BasicTestCase):
    dbtype = db.DB_HASH
    dbopenflags = db.DB_THREAD


class BasicWithEnvTestCase(BasicTestCase):
    dbopenflags = db.DB_THREAD
    useEnv = 1
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK

    #----------------------------------------

    def test09_EnvRemoveAndRename(self):
        if not self.env:
            return

        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test09_EnvRemoveAndRename..." % self.__class__.__name__

        # can't rename or remove an open DB
        self.d.close()

        newname = self.filename + '.renamed'
        self.env.dbrename(self.filename, None, newname)
        self.env.dbremove(newname)

    #----------------------------------------

class BasicBTreeWithEnvTestCase(BasicWithEnvTestCase):
    dbtype = db.DB_BTREE


class BasicHashWithEnvTestCase(BasicWithEnvTestCase):
    dbtype = db.DB_HASH


#----------------------------------------------------------------------

class BasicTransactionTestCase(BasicTestCase):
    if (sys.version_info < (2, 7)) or ((sys.version_info >= (3, 0)) and
            (sys.version_info < (3, 2))) :
        def assertIn(self, a, b, msg=None) :
            return self.assertTrue(a in b, msg=msg)

    dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
    useEnv = 1
    envflags = (db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK |
                db.DB_INIT_TXN)
    envsetflags = db.DB_AUTO_COMMIT


    def tearDown(self):
        self.txn.commit()
        BasicTestCase.tearDown(self)


    def populateDB(self):
        txn = self.env.txn_begin()
        BasicTestCase.populateDB(self, _txn=txn)

        self.txn = self.env.txn_begin()


    def test06_Transactions(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test06_Transactions..." % self.__class__.__name__

        self.assertEqual(d.get('new rec', txn=self.txn), None)
        d.put('new rec', 'this is a new record', self.txn)
        self.assertEqual(d.get('new rec', txn=self.txn),
                'this is a new record')
        self.txn.abort()
        self.assertEqual(d.get('new rec'), None)

        self.txn = self.env.txn_begin()

        self.assertEqual(d.get('new rec', txn=self.txn), None)
        d.put('new rec', 'this is a new record', self.txn)
        self.assertEqual(d.get('new rec', txn=self.txn),
                'this is a new record')
        self.txn.commit()
        self.assertEqual(d.get('new rec'), 'this is a new record')

        self.txn = self.env.txn_begin()
        c = d.cursor(self.txn)
        rec = c.first()
        count = 0
        while rec is not None:
            count = count + 1
            if verbose and count % 100 == 0:
                print rec
            rec = c.next()
        self.assertEqual(count, self._numKeys+1)

        c.close()                # Cursors *MUST* be closed before commit!
        self.txn.commit()

        # flush pending updates
        self.env.txn_checkpoint (0, 0, 0)

        statDict = self.env.log_stat(0);
        self.assertIn('magic', statDict)
        self.assertIn('version', statDict)
        self.assertIn('cur_file', statDict)
        self.assertIn('region_nowait', statDict)

        # must have at least one log file present:
        logs = self.env.log_archive(db.DB_ARCH_ABS | db.DB_ARCH_LOG)
        self.assertNotEqual(logs, None)
        for log in logs:
            if verbose:
                print 'log file: ' + log
            logs = self.env.log_archive(db.DB_ARCH_REMOVE)
            self.assertTrue(not logs)

        self.txn = self.env.txn_begin()

    #----------------------------------------

    if db.version() >= (4, 6):
        def test08_exists(self) :
            txn = self.env.txn_begin()
            self.d.put("abcde", "ABCDE", txn=txn)
            txn.commit()
            txn = self.env.txn_begin()
            self.assertTrue(self.d.exists("abcde", txn=txn) == True,
                    "DB->exists() returns wrong value")
            self.assertTrue(self.d.exists("x", txn=txn) == False,
                    "DB->exists() returns wrong value")
            txn.abort()

    #----------------------------------------

    def test09_TxnTruncate(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test09_TxnTruncate..." % self.__class__.__name__

        d.put("abcde", "ABCDE");
        txn = self.env.txn_begin()
        num = d.truncate(txn)
        self.assertTrue(num >= 1, "truncate returned <= 0 on non-empty database")
        num = d.truncate(txn)
        self.assertEqual(num, 0,
                "truncate on empty DB returned nonzero (%r)" % (num,))
        txn.commit()

    #----------------------------------------

    def test10_TxnLateUse(self):
        txn = self.env.txn_begin()
        txn.abort()
        try:
            txn.abort()
        except db.DBError, e:
            pass
        else:
            raise RuntimeError, "DBTxn.abort() called after DB_TXN no longer valid w/o an exception"

        txn = self.env.txn_begin()
        txn.commit()
        try:
            txn.commit()
        except db.DBError, e:
            pass
        else:
            raise RuntimeError, "DBTxn.commit() called after DB_TXN no longer valid w/o an exception"


    #----------------------------------------


    if db.version() >= (4, 4):
        def test_txn_name(self) :
            txn=self.env.txn_begin()
            self.assertEqual(txn.get_name(), "")
            txn.set_name("XXYY")
            self.assertEqual(txn.get_name(), "XXYY")
            txn.set_name("")
            self.assertEqual(txn.get_name(), "")
            txn.abort()

    #----------------------------------------


        def test_txn_set_timeout(self) :
            txn=self.env.txn_begin()
            txn.set_timeout(1234567, db.DB_SET_LOCK_TIMEOUT)
            txn.set_timeout(2345678, flags=db.DB_SET_TXN_TIMEOUT)
            txn.abort()

    #----------------------------------------

        def test_get_tx_max(self) :
            self.assertEqual(self.env.get_tx_max(), 30)

        def test_get_tx_timestamp(self) :
            self.assertEqual(self.env.get_tx_timestamp(), self._t)



class BTreeTransactionTestCase(BasicTransactionTestCase):
    dbtype = db.DB_BTREE

class HashTransactionTestCase(BasicTransactionTestCase):
    dbtype = db.DB_HASH



#----------------------------------------------------------------------

class BTreeRecnoTestCase(BasicTestCase):
    dbtype     = db.DB_BTREE
    dbsetflags = db.DB_RECNUM

    def test09_RecnoInBTree(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test09_RecnoInBTree..." % self.__class__.__name__

        rec = d.get(200)
        self.assertEqual(type(rec), type(()))
        self.assertEqual(len(rec), 2)
        if verbose:
            print "Record #200 is ", rec

        c = d.cursor()
        c.set('0200')
        num = c.get_recno()
        self.assertEqual(type(num), type(1))
        if verbose:
            print "recno of d['0200'] is ", num

        rec = c.current()
        self.assertEqual(c.set_recno(num), rec)

        c.close()



class BTreeRecnoWithThreadFlagTestCase(BTreeRecnoTestCase):
    dbopenflags = db.DB_THREAD

#----------------------------------------------------------------------

class BasicDUPTestCase(BasicTestCase):
    dbsetflags = db.DB_DUP

    def test10_DuplicateKeys(self):
        d = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test10_DuplicateKeys..." % \
                  self.__class__.__name__

        d.put("dup0", "before")
        for x in "The quick brown fox jumped over the lazy dog.".split():
            d.put("dup1", x)
        d.put("dup2", "after")

        data = d.get("dup1")
        self.assertEqual(data, "The")
        if verbose:
            print data

        c = d.cursor()
        rec = c.set("dup1")
        self.assertEqual(rec, ('dup1', 'The'))

        next_reg = c.next()
        self.assertEqual(next_reg, ('dup1', 'quick'))

        rec = c.set("dup1")
        count = c.count()
        self.assertEqual(count, 9)

        next_dup = c.next_dup()
        self.assertEqual(next_dup, ('dup1', 'quick'))

        rec = c.set('dup1')
        while rec is not None:
            if verbose:
                print rec
            rec = c.next_dup()

        c.set('dup1')
        rec = c.next_nodup()
        self.assertNotEqual(rec[0], 'dup1')
        if verbose:
            print rec

        c.close()



class BTreeDUPTestCase(BasicDUPTestCase):
    dbtype = db.DB_BTREE

class HashDUPTestCase(BasicDUPTestCase):
    dbtype = db.DB_HASH

class BTreeDUPWithThreadTestCase(BasicDUPTestCase):
    dbtype = db.DB_BTREE
    dbopenflags = db.DB_THREAD

class HashDUPWithThreadTestCase(BasicDUPTestCase):
    dbtype = db.DB_HASH
    dbopenflags = db.DB_THREAD


#----------------------------------------------------------------------

class BasicMultiDBTestCase(BasicTestCase):
    dbname = 'first'

    def otherType(self):
        if self.dbtype == db.DB_BTREE:
            return db.DB_HASH
        else:
            return db.DB_BTREE

    def test11_MultiDB(self):
        d1 = self.d
        if verbose:
            print '\n', '-=' * 30
            print "Running %s.test11_MultiDB..." % self.__class__.__name__

        d2 = db.DB(self.env)
        d2.open(self.filename, "second", self.dbtype,
                self.dbopenflags|db.DB_CREATE)
        d3 = db.DB(self.env)
        d3.open(self.filename, "third", self.otherType(),
                self.dbopenflags|db.DB_CREATE)

        for x in "The quick brown fox jumped over the lazy dog".split():
            d2.put(x, self.makeData(x))

        for x in string.letters:
            d3.put(x, x*70)

        d1.sync()
        d2.sync()
        d3.sync()
        d1.close()
        d2.close()
        d3.close()

        self.d = d1 = d2 = d3 = None

        self.d = d1 = db.DB(self.env)
        d1.open(self.filename, self.dbname, flags = self.dbopenflags)
        d2 = db.DB(self.env)
        d2.open(self.filename, "second",  flags = self.dbopenflags)
        d3 = db.DB(self.env)
        d3.open(self.filename, "third", flags = self.dbopenflags)

        c1 = d1.cursor()
        c2 = d2.cursor()
        c3 = d3.cursor()

        count = 0
        rec = c1.first()
        while rec is not None:
            count = count + 1
            if verbose and (count % 50) == 0:
                print rec
            rec = c1.next()
        self.assertEqual(count, self._numKeys)

        count = 0
        rec = c2.first()
        while rec is not None:
            count = count + 1
            if verbose:
                print rec
            rec = c2.next()
        self.assertEqual(count, 9)

        count = 0
        rec = c3.first()
        while rec is not None:
            count = count + 1
            if verbose:
                print rec
            rec = c3.next()
        self.assertEqual(count, len(string.letters))


        c1.close()
        c2.close()
        c3.close()

        d2.close()
        d3.close()



# Strange things happen if you try to use Multiple DBs per file without a
# DBEnv with MPOOL and LOCKing...

class BTreeMultiDBTestCase(BasicMultiDBTestCase):
    dbtype = db.DB_BTREE
    dbopenflags = db.DB_THREAD
    useEnv = 1
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK

class HashMultiDBTestCase(BasicMultiDBTestCase):
    dbtype = db.DB_HASH
    dbopenflags = db.DB_THREAD
    useEnv = 1
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK


class PrivateObject(unittest.TestCase) :
    def tearDown(self) :
        del self.obj

    def test01_DefaultIsNone(self) :
        self.assertEqual(self.obj.get_private(), None)

    def test02_assignment(self) :
        a = "example of private object"
        self.obj.set_private(a)
        b = self.obj.get_private()
        self.assertTrue(a is b)  # Object identity

    def test03_leak_assignment(self) :
        a = "example of private object"
        refcount = sys.getrefcount(a)
        self.obj.set_private(a)
        self.assertEqual(refcount+1, sys.getrefcount(a))
        self.obj.set_private(None)
        self.assertEqual(refcount, sys.getrefcount(a))

    def test04_leak_GC(self) :
        a = "example of private object"
        refcount = sys.getrefcount(a)
        self.obj.set_private(a)
        self.obj = None
        self.assertEqual(refcount, sys.getrefcount(a))

class DBEnvPrivateObject(PrivateObject) :
    def setUp(self) :
        self.obj = db.DBEnv()

class DBPrivateObject(PrivateObject) :
    def setUp(self) :
        self.obj = db.DB()

class CrashAndBurn(unittest.TestCase) :
    #def test01_OpenCrash(self) :
    #    # See http://bugs.python.org/issue3307
    #    self.assertRaises(db.DBInvalidArgError, db.DB, None, 65535)

    if db.version() < (4, 8) :
        def test02_DBEnv_dealloc(self):
            # http://bugs.python.org/issue3885
            import gc
            self.assertRaises(db.DBInvalidArgError, db.DBEnv, ~db.DB_RPCCLIENT)
            gc.collect()


#----------------------------------------------------------------------
#----------------------------------------------------------------------

def test_suite():
    suite = unittest.TestSuite()

    suite.addTest(unittest.makeSuite(VersionTestCase))
    suite.addTest(unittest.makeSuite(BasicBTreeTestCase))
    suite.addTest(unittest.makeSuite(BasicHashTestCase))
    suite.addTest(unittest.makeSuite(BasicBTreeWithThreadFlagTestCase))
    suite.addTest(unittest.makeSuite(BasicHashWithThreadFlagTestCase))
    suite.addTest(unittest.makeSuite(BasicBTreeWithEnvTestCase))
    suite.addTest(unittest.makeSuite(BasicHashWithEnvTestCase))
    suite.addTest(unittest.makeSuite(BTreeTransactionTestCase))
    suite.addTest(unittest.makeSuite(HashTransactionTestCase))
    suite.addTest(unittest.makeSuite(BTreeRecnoTestCase))
    suite.addTest(unittest.makeSuite(BTreeRecnoWithThreadFlagTestCase))
    suite.addTest(unittest.makeSuite(BTreeDUPTestCase))
    suite.addTest(unittest.makeSuite(HashDUPTestCase))
    suite.addTest(unittest.makeSuite(BTreeDUPWithThreadTestCase))
    suite.addTest(unittest.makeSuite(HashDUPWithThreadTestCase))
    suite.addTest(unittest.makeSuite(BTreeMultiDBTestCase))
    suite.addTest(unittest.makeSuite(HashMultiDBTestCase))
    suite.addTest(unittest.makeSuite(DBEnvPrivateObject))
    suite.addTest(unittest.makeSuite(DBPrivateObject))
    suite.addTest(unittest.makeSuite(CrashAndBurn))

    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')