#!/usr/bin/python

"""Test script for dohdbm.  Derived from tests for dumbdbm"""

#mport os
import sys
import pprint
import random
#mport shutil
import unittest

import dohdbm
import python2x3

try:
    import compressed_file_mod
except ImportError:
    HAVE_COMPRESSED_FILE_MOD = False
else:
    HAVE_COMPRESSED_FILE_MOD = True

_FNAME = '@testfile'

def _delete_files():
    '''Clean up files created by these tests'''
    return
#    for ext in [ "", ".temp" ]:
#        try:
#            os.unlink(_FNAME + ext)
#        except OSError:
#            pass

class DohDBMTestCase(unittest.TestCase):
    '''Tests for dohdbm'''
    # pylint: disable=R0904
    # R0904: We're a test class - of course we have lots of public methods
    _dict = {
        python2x3.string_to_binary('0'): python2x3.string_to_binary(''),
        python2x3.string_to_binary('a'): python2x3.string_to_binary('Python:'),
        python2x3.string_to_binary('b'): python2x3.string_to_binary('Programming'),
        python2x3.string_to_binary('c'): python2x3.string_to_binary('the'),
        python2x3.string_to_binary('d'): python2x3.string_to_binary('way'),
        python2x3.string_to_binary('f'): python2x3.string_to_binary('Guido'),
        python2x3.string_to_binary('g'): python2x3.string_to_binary('intended')
        }

    def __init__(self, *args):
        unittest.TestCase.__init__(self, *args)

    def test_creation(self):
        '''Test creating a database'''
        self.set_up()
        file_ = dohdbm.open(_FNAME, 'n')
        self.assertEqual(list(file_.keys()), [])
        for key in self._dict:
            file_[key] = self._dict[key]
        self.read_helper(file_)
        file_.close()
        self.tear_down()

    def test_close_twice(self):
        '''Test closing the database twice'''
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME, 'w')
        file_[python2x3.string_to_binary('a')] = python2x3.string_to_binary('b')
        self.assertEqual(file_[python2x3.string_to_binary('a')], python2x3.string_to_binary('b'))
        file_.close()
        file_.close()
        self.tear_down()

    def test_modification(self):
        '''Test modifying a database'''
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME, 'w')
        self._dict[python2x3.string_to_binary('g')] = file_[python2x3.string_to_binary('g')] = python2x3.string_to_binary("indented")
        self.read_helper(file_)
        file_.close()
        self.tear_down()

    def test_read(self):
        '''Test reading'''
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME, 'r')
        self.read_helper(file_)
        file_.close()
        self.tear_down()

    def test_keys(self):
        '''Test getting the keys'''
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME)
        dummy = self.keys_helper(file_)
        file_.close()
        self.tear_down()

    def test_write_write_read(self):
        '''Test for bug #482460'''
        self.set_up()
        file_ = dohdbm.open(_FNAME, 'n')
        file_[python2x3.string_to_binary('1')] = python2x3.string_to_binary('hello')
        file_[python2x3.string_to_binary('1')] = python2x3.string_to_binary('hello2')
        file_.close()
        file_ = dohdbm.open(_FNAME, 'w')
        self.assertEqual(file_[python2x3.string_to_binary('1')], python2x3.string_to_binary('hello2'))
        file_.close()
        self.tear_down()

    def read_helper(self, file_):
        '''Help testing - read helper'''
        dummy = self.keys_helper(file_)
        for key in self._dict:
            self.assertEqual(self._dict[key], file_[key])

    def init_db(self):
        '''Initialize a database with some test content'''
        file_ = dohdbm.open(_FNAME, 'n')
        for k in self._dict:
            file_[k] = self._dict[k]
        file_.close()

    def keys_helper(self, file_):
        '''Get the keys, make sure they look reasonable - just a helper function'''
        keys = list(file_.keys())
        keys.sort()
        dkeys = list(self._dict.keys())
        dkeys.sort()
        self.assertEqual(keys, dkeys)
        return keys

    def test_compressed(self):
        '''Test writing to and then reading from a compressed database'''
        if HAVE_COMPRESSED_FILE_MOD:
            cdb_writer = dohdbm.open('compressed.doh', 'n', backend_open=compressed_file_mod.Compressed_file)
            for k in self._dict:
                cdb_writer[k] = self._dict[k]
            cdb_writer.close()

            cdb_reader = dohdbm.open('compressed.doh', 'r', backend_open=compressed_file_mod.Compressed_file)
            assert len(self._dict.keys()) == len(cdb_reader.keys())
            for k in cdb_reader.keys():
                if cdb_reader[k] != self._dict[k]:
                    raise AssertionError('test_compressed: Key %s does not match' % k)
            cdb_reader.close()

    def test_random(self):
        '''
        Perform randomized operations.  This doesn't make assumptions about
        what *might* fail.
        '''
        self.set_up()
        dict_ = {}  # mirror the database
        for i in range(5):
            if i == 0:
                database = dohdbm.open(_FNAME, 'n')
            else:
                database = dohdbm.open(_FNAME, 'w')
            for dummy in range(100):
                k = random.choice('abcdefghijklm')
                if random.random() < 0.2:
                    if k in dict_:
                        del dict_[k]
                        del database[k]
                else:
                    value = random.choice('abc') * random.randrange(10)
                    bytes_key = python2x3.string_to_binary(k)
                    bytes_value = python2x3.string_to_binary(value)
                    dict_[bytes_key] = bytes_value
                    database[bytes_key] = bytes_value
                    self.assertEqual(database[bytes_key], bytes_value)
            database.close()

            database = dohdbm.open(_FNAME)
            expected = list(dict_.items())
            expected.sort()
            actual = list(database.items())
            actual.sort()
            if expected != actual:
                sys.stderr.write('Mismatch between dictionary and database:\n')
                sys.stderr.write('Expected:\n')
                sys.stderr.write('\n'.join('\t' + line for line in pprint.pformat(expected).split('\n')) + '\n')
                sys.stderr.write('Actual:\n')
                sys.stderr.write('\n'.join('\t' + line for line in pprint.pformat(actual).split('\n')) + '\n')
                raise AssertionError('test_random: expected != actual')
            database.close()
        self.tear_down()

    def tear_down(self):
        '''Shut down our test info'''
        dummy = self
        _delete_files()

    def set_up(self):
        '''Set up our test info'''
        _delete_files()
        self.init_db()

def main():
    '''Main function'''
    try:
        unittest.main()
    finally:
        _delete_files()

if __name__ == "__main__":
    main()