#!/usr/bin/python

"""Test script for dohdbm.  Derived from tests for dumbdbm."""

import sys
import pprint
import random
import unittest

import dohdbm

try:
    import compressed_file_mod
except ImportError:
    HAVE_COMPRESSED_FILE_MOD = False
else:
    HAVE_COMPRESSED_FILE_MOD = True

_FNAME = '@testfile'


def _delete_files():
    """Clean up files created by these tests."""
    return
#    for ext in [ "", ".temp" ]:
#        try:
#            os.unlink(_FNAME + ext)
#        except OSError:
#            pass


def make_used(var):
    """Convince pyflakes that var is used."""
    assert True or var


class DohDBMTestCase(unittest.TestCase):
    """Tests for dohdbm."""

    # pylint: disable=R0904
    # R0904: We're a test class - of course we have lots of public methods
    _dict = {
        dohdbm.string_to_binary('0'): dohdbm.string_to_binary(''),
        dohdbm.string_to_binary('a'): dohdbm.string_to_binary('Python:'),
        dohdbm.string_to_binary('b'): dohdbm.string_to_binary('Programming'),
        dohdbm.string_to_binary('c'): dohdbm.string_to_binary('the'),
        dohdbm.string_to_binary('d'): dohdbm.string_to_binary('way'),
        dohdbm.string_to_binary('f'): dohdbm.string_to_binary('Guido'),
        dohdbm.string_to_binary('g'): dohdbm.string_to_binary('intended')
        }

    def __init__(self, *args):
        """Initialize."""
        unittest.TestCase.__init__(self, *args)

    def test_creation(self):
        """Test creating a database."""
        self.set_up()
        file_ = dohdbm.open(_FNAME, 'n')
        self.assertEqual(list(file_.keys()), [])
        for key in self._dict:
            file_[key] = self._dict[key]
        self.read_helper(file_)
        file_.close()
        self.tear_down()

    def test_close_twice(self):
        """Test closing the database twice."""
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME, 'w')
        file_[dohdbm.string_to_binary('a')] = dohdbm.string_to_binary('b')
        self.assertEqual(file_[dohdbm.string_to_binary('a')], dohdbm.string_to_binary('b'))
        file_.close()
        file_.close()
        self.tear_down()

    def test_modification(self):
        """Test modifying a database."""
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME, 'w')
        self._dict[dohdbm.string_to_binary('g')] = \
            file_[dohdbm.string_to_binary('g')] = \
            dohdbm.string_to_binary("indented")
        self.read_helper(file_)
        file_.close()
        self.tear_down()

    def test_modification_of_ro_db(self):
        """Test modifying a database of a readonly database: should raise an exception."""
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME, 'r')
        with self.assertRaises(dohdbm.error):
            self._dict[dohdbm.string_to_binary('g')] = \
                file_[dohdbm.string_to_binary('g')] = \
                dohdbm.string_to_binary("indented")
        file_.close()
        self.tear_down()

    def test_del_from_ro_db(self):
        """Test modifying a database of a readonly database: should raise an exception."""
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME, 'r')
        with self.assertRaises(dohdbm.error):
            del self._dict[dohdbm.string_to_binary('g')]
            del file_[dohdbm.string_to_binary('g')]
        file_.close()
        self.tear_down()

    def test_read(self):
        """Test reading."""
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME, 'r')
        self.read_helper(file_)
        file_.close()
        self.tear_down()

    def test_keys(self):
        """Test getting the keys."""
        self.set_up()
        self.init_db()
        file_ = dohdbm.open(_FNAME)
        _unused = self.keys_helper(file_)
        make_used(_unused)
        file_.close()
        self.tear_down()

    def test_write_write_read(self):
        """Test for bug #482460."""
        self.set_up()
        file_ = dohdbm.open(_FNAME, 'n')
        file_[dohdbm.string_to_binary('1')] = dohdbm.string_to_binary('hello')
        file_[dohdbm.string_to_binary('1')] = dohdbm.string_to_binary('hello2')
        file_.close()
        file_ = dohdbm.open(_FNAME, 'w')
        self.assertEqual(file_[dohdbm.string_to_binary('1')], dohdbm.string_to_binary('hello2'))
        file_.close()
        self.tear_down()

    def read_helper(self, file_):
        """Help testing - read helper."""
        _unused = self.keys_helper(file_)
        make_used(_unused)
        for key in self._dict:
            self.assertEqual(self._dict[key], file_[key])

    def init_db(self):
        """Initialize a database with some test content."""
        file_ = dohdbm.open(_FNAME, 'n')
        for k in self._dict:
            file_[k] = self._dict[k]
        file_.close()

    def keys_helper(self, file_):
        """Get the keys, make sure they look reasonable - just a helper function."""
        keys = list(file_.keys())
        keys.sort()
        dkeys = list(self._dict.keys())
        dkeys.sort()
        self.assertEqual(keys, dkeys)
        return keys

    def test_compressed(self):
        """Test writing to and then reading from a compressed database."""
        if HAVE_COMPRESSED_FILE_MOD:
            cdb_writer = dohdbm.open('compressed.doh', 'n', backend_open=compressed_file_mod.Compressed_file)
            for k in self._dict:
                cdb_writer[k] = self._dict[k]
            cdb_writer.close()

            cdb_reader = dohdbm.open('compressed.doh', 'r', backend_open=compressed_file_mod.Compressed_file)
            assert len(self._dict.keys()) == len(cdb_reader.keys())
            for k in cdb_reader.keys():
                if cdb_reader[k] != self._dict[k]:
                    raise AssertionError('test_compressed: Key %s does not match' % k)
            cdb_reader.close()

    def test_random(self):
        """Perform randomized operations.  This doesn't make assumptions about what *might* fail."""
        self.set_up()
        dict_ = {}  # mirror the database
        for i in range(5):
            if i == 0:
                database = dohdbm.open(_FNAME, 'n')
            else:
                database = dohdbm.open(_FNAME, 'w')
            for _unused in range(100):
                k = random.choice('abcdefghijklm')
                if random.random() < 0.2:
                    if k in dict_:
                        del dict_[k]
                        del database[k]
                else:
                    value = random.choice('abc') * random.randrange(10)
                    bytes_key = dohdbm.string_to_binary(k)
                    bytes_value = dohdbm.string_to_binary(value)
                    dict_[bytes_key] = bytes_value
                    database[bytes_key] = bytes_value
                    self.assertEqual(database[bytes_key], bytes_value)
            database.close()

            database = dohdbm.open(_FNAME)
            expected = list(dict_.items())
            expected.sort()
            actual = list(database.items())
            actual.sort()
            if expected != actual:
                sys.stderr.write('Mismatch between dictionary and database:\n')
                sys.stderr.write('Expected:\n')
                sys.stderr.write('\n'.join('\t' + line for line in pprint.pformat(expected).split('\n')) + '\n')
                sys.stderr.write('Actual:\n')
                sys.stderr.write('\n'.join('\t' + line for line in pprint.pformat(actual).split('\n')) + '\n')
                raise AssertionError('test_random: expected != actual')
            database.close()
        self.tear_down()

    def tear_down(self):
        """Shut down our test info."""
        make_used(self)
        _delete_files()

    def set_up(self):
        """Set up our test info."""
        _delete_files()
        self.init_db()


def main():
    """Launch tests."""
    try:
        unittest.main()
    finally:
        _delete_files()


if __name__ == "__main__":
    main()