普通文本  |  624行  |  18.76 KB

"""Run all test cases.
"""

import sys
import os
import unittest
try:
    # For Pythons w/distutils pybsddb
    import bsddb3 as bsddb
except ImportError:
    # For Python 2.3
    import bsddb


if sys.version_info[0] >= 3 :
    charset = "iso8859-1"  # Full 8 bit

    class logcursor_py3k(object) :
        def __init__(self, env) :
            self._logcursor = env.log_cursor()

        def __getattr__(self, v) :
            return getattr(self._logcursor, v)

        def __next__(self) :
            v = getattr(self._logcursor, "next")()
            if v is not None :
                v = (v[0], v[1].decode(charset))
            return v

        next = __next__

        def first(self) :
            v = self._logcursor.first()
            if v is not None :
                v = (v[0], v[1].decode(charset))
            return v

        def last(self) :
            v = self._logcursor.last()
            if v is not None :
                v = (v[0], v[1].decode(charset))
            return v

        def prev(self) :
            v = self._logcursor.prev()
            if v is not None :
                v = (v[0], v[1].decode(charset))
            return v

        def current(self) :
            v = self._logcursor.current()
            if v is not None :
                v = (v[0], v[1].decode(charset))
            return v

        def set(self, lsn) :
            v = self._logcursor.set(lsn)
            if v is not None :
                v = (v[0], v[1].decode(charset))
            return v

    class cursor_py3k(object) :
        def __init__(self, db, *args, **kwargs) :
            self._dbcursor = db.cursor(*args, **kwargs)

        def __getattr__(self, v) :
            return getattr(self._dbcursor, v)

        def _fix(self, v) :
            if v is None : return None
            key, value = v
            if isinstance(key, bytes) :
                key = key.decode(charset)
            return (key, value.decode(charset))

        def __next__(self) :
            v = getattr(self._dbcursor, "next")()
            return self._fix(v)

        next = __next__

        def previous(self) :
            v = self._dbcursor.previous()
            return self._fix(v)

        def last(self) :
            v = self._dbcursor.last()
            return self._fix(v)

        def set(self, k) :
            if isinstance(k, str) :
                k = bytes(k, charset)
            v = self._dbcursor.set(k)
            return self._fix(v)

        def set_recno(self, num) :
            v = self._dbcursor.set_recno(num)
            return self._fix(v)

        def set_range(self, k, dlen=-1, doff=-1) :
            if isinstance(k, str) :
                k = bytes(k, charset)
            v = self._dbcursor.set_range(k, dlen=dlen, doff=doff)
            return self._fix(v)

        def dup(self, flags=0) :
            cursor = self._dbcursor.dup(flags)
            return dup_cursor_py3k(cursor)

        def next_dup(self) :
            v = self._dbcursor.next_dup()
            return self._fix(v)

        def next_nodup(self) :
            v = self._dbcursor.next_nodup()
            return self._fix(v)

        def put(self, key, data, flags=0, dlen=-1, doff=-1) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            if isinstance(data, str) :
                value = bytes(data, charset)
            return self._dbcursor.put(key, data, flags=flags, dlen=dlen,
                    doff=doff)

        def current(self, flags=0, dlen=-1, doff=-1) :
            v = self._dbcursor.current(flags=flags, dlen=dlen, doff=doff)
            return self._fix(v)

        def first(self) :
            v = self._dbcursor.first()
            return self._fix(v)

        def pget(self, key=None, data=None, flags=0) :
            # Incorrect because key can be a bare number,
            # but enough to pass testsuite
            if isinstance(key, int) and (data is None) and (flags == 0) :
                flags = key
                key = None
            if isinstance(key, str) :
                key = bytes(key, charset)
            if isinstance(data, int) and (flags==0) :
                flags = data
                data = None
            if isinstance(data, str) :
                data = bytes(data, charset)
            v=self._dbcursor.pget(key=key, data=data, flags=flags)
            if v is not None :
                v1, v2, v3 = v
                if isinstance(v1, bytes) :
                    v1 = v1.decode(charset)
                if isinstance(v2, bytes) :
                    v2 = v2.decode(charset)

                v = (v1, v2, v3.decode(charset))

            return v

        def join_item(self) :
            v = self._dbcursor.join_item()
            if v is not None :
                v = v.decode(charset)
            return v

        def get(self, *args, **kwargs) :
            l = len(args)
            if l == 2 :
                k, f = args
                if isinstance(k, str) :
                    k = bytes(k, "iso8859-1")
                args = (k, f)
            elif l == 3 :
                k, d, f = args
                if isinstance(k, str) :
                    k = bytes(k, charset)
                if isinstance(d, str) :
                    d = bytes(d, charset)
                args =(k, d, f)

            v = self._dbcursor.get(*args, **kwargs)
            if v is not None :
                k, v = v
                if isinstance(k, bytes) :
                    k = k.decode(charset)
                v = (k, v.decode(charset))
            return v

        def get_both(self, key, value) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            if isinstance(value, str) :
                value = bytes(value, charset)
            v=self._dbcursor.get_both(key, value)
            return self._fix(v)

    class dup_cursor_py3k(cursor_py3k) :
        def __init__(self, dbcursor) :
            self._dbcursor = dbcursor

    class DB_py3k(object) :
        def __init__(self, *args, **kwargs) :
            args2=[]
            for i in args :
                if isinstance(i, DBEnv_py3k) :
                    i = i._dbenv
                args2.append(i)
            args = tuple(args2)
            for k, v in kwargs.items() :
                if isinstance(v, DBEnv_py3k) :
                    kwargs[k] = v._dbenv

            self._db = bsddb._db.DB_orig(*args, **kwargs)

        def __contains__(self, k) :
            if isinstance(k, str) :
                k = bytes(k, charset)
            return getattr(self._db, "has_key")(k)

        def __getitem__(self, k) :
            if isinstance(k, str) :
                k = bytes(k, charset)
            v = self._db[k]
            if v is not None :
                v = v.decode(charset)
            return v

        def __setitem__(self, k, v) :
            if isinstance(k, str) :
                k = bytes(k, charset)
            if isinstance(v, str) :
                v = bytes(v, charset)
            self._db[k] = v

        def __delitem__(self, k) :
            if isinstance(k, str) :
                k = bytes(k, charset)
            del self._db[k]

        def __getattr__(self, v) :
            return getattr(self._db, v)

        def __len__(self) :
            return len(self._db)

        def has_key(self, k, txn=None) :
            if isinstance(k, str) :
                k = bytes(k, charset)
            return self._db.has_key(k, txn=txn)

        def set_re_delim(self, c) :
            if isinstance(c, str) :  # We can use a numeric value byte too
                c = bytes(c, charset)
            return self._db.set_re_delim(c)

        def set_re_pad(self, c) :
            if isinstance(c, str) :  # We can use a numeric value byte too
                c = bytes(c, charset)
            return self._db.set_re_pad(c)

        def get_re_source(self) :
            source = self._db.get_re_source()
            return source.decode(charset)

        def put(self, key, data, txn=None, flags=0, dlen=-1, doff=-1) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            if isinstance(data, str) :
                value = bytes(data, charset)
            return self._db.put(key, data, flags=flags, txn=txn, dlen=dlen,
                    doff=doff)

        def append(self, value, txn=None) :
            if isinstance(value, str) :
                value = bytes(value, charset)
            return self._db.append(value, txn=txn)

        def get_size(self, key) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            return self._db.get_size(key)

        def exists(self, key, *args, **kwargs) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            return self._db.exists(key, *args, **kwargs)

        def get(self, key, default="MagicCookie", txn=None, flags=0, dlen=-1, doff=-1) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            if default != "MagicCookie" :  # Magic for 'test_get_none.py'
                v=self._db.get(key, default=default, txn=txn, flags=flags,
                        dlen=dlen, doff=doff)
            else :
                v=self._db.get(key, txn=txn, flags=flags,
                        dlen=dlen, doff=doff)
            if (v is not None) and isinstance(v, bytes) :
                v = v.decode(charset)
            return v

        def pget(self, key, txn=None) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            v=self._db.pget(key, txn=txn)
            if v is not None :
                v1, v2 = v
                if isinstance(v1, bytes) :
                    v1 = v1.decode(charset)

                v = (v1, v2.decode(charset))
            return v

        def get_both(self, key, value, txn=None, flags=0) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            if isinstance(value, str) :
                value = bytes(value, charset)
            v=self._db.get_both(key, value, txn=txn, flags=flags)
            if v is not None :
                v = v.decode(charset)
            return v

        def delete(self, key, txn=None) :
            if isinstance(key, str) :
                key = bytes(key, charset)
            return self._db.delete(key, txn=txn)

        def keys(self) :
            k = self._db.keys()
            if len(k) and isinstance(k[0], bytes) :
                return [i.decode(charset) for i in self._db.keys()]
            else :
                return k

        def items(self) :
            data = self._db.items()
            if not len(data) : return data
            data2 = []
            for k, v in data :
                if isinstance(k, bytes) :
                    k = k.decode(charset)
                data2.append((k, v.decode(charset)))
            return data2

        def associate(self, secondarydb, callback, flags=0, txn=None) :
            class associate_callback(object) :
                def __init__(self, callback) :
                    self._callback = callback

                def callback(self, key, data) :
                    if isinstance(key, str) :
                        key = key.decode(charset)
                    data = data.decode(charset)
                    key = self._callback(key, data)
                    if (key != bsddb._db.DB_DONOTINDEX) :
                        if isinstance(key, str) :
                            key = bytes(key, charset)
                        elif isinstance(key, list) :
                            key2 = []
                            for i in key :
                                if isinstance(i, str) :
                                    i = bytes(i, charset)
                                key2.append(i)
                            key = key2
                    return key

            return self._db.associate(secondarydb._db,
                    associate_callback(callback).callback, flags=flags,
                    txn=txn)

        def cursor(self, txn=None, flags=0) :
            return cursor_py3k(self._db, txn=txn, flags=flags)

        def join(self, cursor_list) :
            cursor_list = [i._dbcursor for i in cursor_list]
            return dup_cursor_py3k(self._db.join(cursor_list))

    class DBEnv_py3k(object) :
        def __init__(self, *args, **kwargs) :
            self._dbenv = bsddb._db.DBEnv_orig(*args, **kwargs)

        def __getattr__(self, v) :
            return getattr(self._dbenv, v)

        def log_cursor(self, flags=0) :
            return logcursor_py3k(self._dbenv)

        def get_lg_dir(self) :
            return self._dbenv.get_lg_dir().decode(charset)

        def get_tmp_dir(self) :
            return self._dbenv.get_tmp_dir().decode(charset)

        def get_data_dirs(self) :
            return tuple(
                (i.decode(charset) for i in self._dbenv.get_data_dirs()))

    class DBSequence_py3k(object) :
        def __init__(self, db, *args, **kwargs) :
            self._db=db
            self._dbsequence = bsddb._db.DBSequence_orig(db._db, *args, **kwargs)

        def __getattr__(self, v) :
            return getattr(self._dbsequence, v)

        def open(self, key, *args, **kwargs) :
            return self._dbsequence.open(bytes(key, charset), *args, **kwargs)

        def get_key(self) :
            return  self._dbsequence.get_key().decode(charset)

        def get_dbp(self) :
            return self._db

    import string
    string.letters=[chr(i) for i in xrange(65,91)]

    bsddb._db.DBEnv_orig = bsddb._db.DBEnv
    bsddb._db.DB_orig = bsddb._db.DB
    if bsddb.db.version() <= (4, 3) :
        bsddb._db.DBSequence_orig = None
    else :
        bsddb._db.DBSequence_orig = bsddb._db.DBSequence

    def do_proxy_db_py3k(flag) :
        flag2 = do_proxy_db_py3k.flag
        do_proxy_db_py3k.flag = flag
        if flag :
            bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = DBEnv_py3k
            bsddb.DB = bsddb.db.DB = bsddb._db.DB = DB_py3k
            bsddb._db.DBSequence = DBSequence_py3k
        else :
            bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = bsddb._db.DBEnv_orig
            bsddb.DB = bsddb.db.DB = bsddb._db.DB = bsddb._db.DB_orig
            bsddb._db.DBSequence = bsddb._db.DBSequence_orig
        return flag2

    do_proxy_db_py3k.flag = False
    do_proxy_db_py3k(True)

try:
    # For Pythons w/distutils pybsddb
    from bsddb3 import db, dbtables, dbutils, dbshelve, \
            hashopen, btopen, rnopen, dbobj
except ImportError:
    # For Python 2.3
    from bsddb import db, dbtables, dbutils, dbshelve, \
            hashopen, btopen, rnopen, dbobj

try:
    from bsddb3 import test_support
except ImportError:
    if sys.version_info[0] < 3 :
        from test import test_support
    else :
        from test import support as test_support


try:
    if sys.version_info[0] < 3 :
        from threading import Thread, currentThread
        del Thread, currentThread
    else :
        from threading import Thread, current_thread
        del Thread, current_thread
    have_threads = True
except ImportError:
    have_threads = False

verbose = 0
if 'verbose' in sys.argv:
    verbose = 1
    sys.argv.remove('verbose')

if 'silent' in sys.argv:  # take care of old flag, just in case
    verbose = 0
    sys.argv.remove('silent')


def print_versions():
    print
    print '-=' * 38
    print db.DB_VERSION_STRING
    print 'bsddb.db.version():   %s' % (db.version(), )
    if db.version() >= (5, 0) :
        print 'bsddb.db.full_version(): %s' %repr(db.full_version())
    print 'bsddb.db.__version__: %s' % db.__version__
    print 'bsddb.db.cvsid:       %s' % db.cvsid

    # Workaround for allowing generating an EGGs as a ZIP files.
    suffix="__"
    print 'py module:            %s' % getattr(bsddb, "__file"+suffix)
    print 'extension module:     %s' % getattr(bsddb, "__file"+suffix)

    print 'python version:       %s' % sys.version
    print 'My pid:               %s' % os.getpid()
    print '-=' * 38


def get_new_path(name) :
    get_new_path.mutex.acquire()
    try :
        import os
        path=os.path.join(get_new_path.prefix,
                name+"_"+str(os.getpid())+"_"+str(get_new_path.num))
        get_new_path.num+=1
    finally :
        get_new_path.mutex.release()
    return path

def get_new_environment_path() :
    path=get_new_path("environment")
    import os
    try:
        os.makedirs(path,mode=0700)
    except os.error:
        test_support.rmtree(path)
        os.makedirs(path)
    return path

def get_new_database_path() :
    path=get_new_path("database")
    import os
    if os.path.exists(path) :
        os.remove(path)
    return path


# This path can be overriden via "set_test_path_prefix()".
import os, os.path
get_new_path.prefix=os.path.join(os.environ.get("TMPDIR",
    os.path.join(os.sep,"tmp")), "z-Berkeley_DB")
get_new_path.num=0

def get_test_path_prefix() :
    return get_new_path.prefix

def set_test_path_prefix(path) :
    get_new_path.prefix=path

def remove_test_path_directory() :
    test_support.rmtree(get_new_path.prefix)

if have_threads :
    import threading
    get_new_path.mutex=threading.Lock()
    del threading
else :
    class Lock(object) :
        def acquire(self) :
            pass
        def release(self) :
            pass
    get_new_path.mutex=Lock()
    del Lock



class PrintInfoFakeTest(unittest.TestCase):
    def testPrintVersions(self):
        print_versions()


# This little hack is for when this module is run as main and all the
# other modules import it so they will still be able to get the right
# verbose setting.  It's confusing but it works.
if sys.version_info[0] < 3 :
    import test_all
    test_all.verbose = verbose
else :
    import sys
    print >>sys.stderr, "Work to do!"


def suite(module_prefix='', timing_check=None):
    test_modules = [
        'test_associate',
        'test_basics',
        'test_dbenv',
        'test_db',
        'test_compare',
        'test_compat',
        'test_cursor_pget_bug',
        'test_dbobj',
        'test_dbshelve',
        'test_dbtables',
        'test_distributed_transactions',
        'test_early_close',
        'test_fileid',
        'test_get_none',
        'test_join',
        'test_lock',
        'test_misc',
        'test_pickle',
        'test_queue',
        'test_recno',
        'test_replication',
        'test_sequence',
        'test_thread',
        ]

    alltests = unittest.TestSuite()
    for name in test_modules:
        #module = __import__(name)
        # Do it this way so that suite may be called externally via
        # python's Lib/test/test_bsddb3.
        module = __import__(module_prefix+name, globals(), locals(), name)

        alltests.addTest(module.test_suite())
        if timing_check:
            alltests.addTest(unittest.makeSuite(timing_check))
    return alltests


def test_suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(PrintInfoFakeTest))
    return suite


if __name__ == '__main__':
    print_versions()
    unittest.main(defaultTest='suite')