#!/usr/bin/python

"""Test script for dohdbm.  Derived from tests for dumbdbm"""

# import os  # disabled: only the commented-out cleanup in _delete_files() uses it
import sys
import pprint
# import shutil  # disabled: not referenced below
import unittest

import dohdbm
import python2x3

# compressed_file_mod is an optional dependency: record whether the import
# succeeded so test_compressed can check the flag before using the module.
try:
	import compressed_file_mod
except ImportError:
	HAVE_COMPRESSED_FILE_MOD = False
else:
	HAVE_COMPRESSED_FILE_MOD = True

# Filename handed to dohdbm.open() by most of the tests below.
_FNAME = '@testfile'

def _delete_files():
	'''Clean up files created by these tests'''
	return
#	for ext in [ "", ".temp" ]:
#		try:
#			os.unlink(_FNAME + ext)
#		except OSError:
#			pass

class DohDBMTestCase(unittest.TestCase):
	'''
	Tests for dohdbm.

	Most tests open the database file named by _FNAME through dohdbm.open()
	and compare what they read back against an in-memory dictionary.
	'''
	# pylint: disable=R0904
	# R0904: We're a test class - of course we have lots of public methods

	# Reference content written to, and expected back from, the database.
	# Keys and values all go through python2x3.string_to_binary().
	_dict = {
		python2x3.string_to_binary('0'): python2x3.string_to_binary(''),
		python2x3.string_to_binary('a'): python2x3.string_to_binary('Python:'),
		python2x3.string_to_binary('b'): python2x3.string_to_binary('Programming'),
		python2x3.string_to_binary('c'): python2x3.string_to_binary('the'),
		python2x3.string_to_binary('d'): python2x3.string_to_binary('way'),
		python2x3.string_to_binary('f'): python2x3.string_to_binary('Guido'),
		python2x3.string_to_binary('g'): python2x3.string_to_binary('intended')
		}

	def __init__(self, *args, **kwargs):
		'''
		Create the test case, forwarding all arguments to unittest.TestCase.

		Gives this instance its own copy of the reference dictionary, so a
		test that edits it (test_modification) cannot change what the other
		tests expect.
		'''
		unittest.TestCase.__init__(self, *args, **kwargs)
		# Shadow the class attribute with a per-instance shallow copy.
		self._dict = dict(self._dict)

	def test_creation(self):
		'''Test creating a database'''
		self.set_up()
		file_ = dohdbm.open(_FNAME, 'n')
		# A database opened with 'n' is expected to start out empty.
		self.assertEqual(list(file_.keys()), [])
		for key in self._dict:
			file_[key] = self._dict[key]
		self.read_helper(file_)
		file_.close()
		self.tear_down()

	def test_close_twice(self):
		'''Test closing the database twice'''
		self.set_up()
		self.init_db()
		file_ = dohdbm.open(_FNAME, 'w')
		key = python2x3.string_to_binary('a')
		value = python2x3.string_to_binary('b')
		file_[key] = value
		self.assertEqual(file_[key], value)
		file_.close()
		# The second close is the point of the test: it must not raise.
		file_.close()
		self.tear_down()

	def test_modification(self):
		'''Test modifying a database'''
		self.set_up()
		self.init_db()
		file_ = dohdbm.open(_FNAME, 'w')
		key = python2x3.string_to_binary('g')
		new_value = python2x3.string_to_binary("indented")
		# Apply the same change to the database and to this instance's
		# reference dictionary, then check that the two still agree.
		file_[key] = new_value
		self._dict[key] = new_value
		self.read_helper(file_)
		file_.close()
		self.tear_down()

	def test_read(self):
		'''Test reading'''
		self.set_up()
		self.init_db()
		file_ = dohdbm.open(_FNAME, 'r')
		self.read_helper(file_)
		file_.close()
		self.tear_down()

	def test_keys(self):
		'''Test getting the keys'''
		self.set_up()
		self.init_db()
		file_ = dohdbm.open(_FNAME)
		self.keys_helper(file_)
		file_.close()
		self.tear_down()

	def test_write_write_read(self):
		'''Test for bug #482460'''
		self.set_up()
		key = python2x3.string_to_binary('1')
		file_ = dohdbm.open(_FNAME, 'n')
		# Write the same key twice; the second value must be the one that
		# is read back after reopening.
		file_[key] = python2x3.string_to_binary('hello')
		file_[key] = python2x3.string_to_binary('hello2')
		file_.close()
		file_ = dohdbm.open(_FNAME, 'w')
		self.assertEqual(file_[key], python2x3.string_to_binary('hello2'))
		file_.close()
		self.tear_down()

	def read_helper(self, file_):
		'''
		Help testing - read helper.

		Checks that file_ has exactly the reference keys and that every
		reference value can be read back from it.
		'''
		self.keys_helper(file_)
		for key in self._dict:
			self.assertEqual(self._dict[key], file_[key])

	def init_db(self):
		'''Initialize a database with some test content'''
		file_ = dohdbm.open(_FNAME, 'n')
		for k in self._dict:
			file_[k] = self._dict[k]
		file_.close()

	def keys_helper(self, file_):
		'''
		Get the keys, make sure they look reasonable - just a helper function.

		Returns the sorted list of keys found in file_.
		'''
		keys = sorted(file_.keys())
		self.assertEqual(keys, sorted(self._dict.keys()))
		return keys

	def test_compressed(self):
		'''Test writing to and then reading from a compressed database'''
		if not HAVE_COMPRESSED_FILE_MOD:
			# Optional dependency missing: nothing can be exercised here.
			return

		cdb_writer = dohdbm.open('compressed.doh', 'n', backend_open=compressed_file_mod.Compressed_file)
		for k in self._dict:
			cdb_writer[k] = self._dict[k]
		cdb_writer.close()

		cdb_reader = dohdbm.open('compressed.doh', 'r', backend_open=compressed_file_mod.Compressed_file)
		# unittest assertions rather than bare assert, so the check is not
		# stripped under -O.  keys() is wrapped in list() as elsewhere in
		# this class.
		self.assertEqual(len(list(cdb_reader.keys())), len(self._dict))
		for k in cdb_reader.keys():
			if cdb_reader[k] != self._dict[k]:
				self.fail('test_compressed: Key %s does not match' % k)
		cdb_reader.close()

	def test_random(self):
		'''
		Perform randomized operations.  This doesn't make assumptions about
		what *might* fail.
		'''
		import random
		self.set_up()
		dict_ = {}  # mirror the database
		for i in range(5):
			# First round creates the database; later rounds reopen it.
			if i == 0:
				database = dohdbm.open(_FNAME, 'n')
			else:
				database = dohdbm.open(_FNAME, 'w')
			for dummy in range(100):
				# Convert the key before branching: dict_ is keyed by the
				# converted key, so testing the raw str never matches where
				# string_to_binary returns a distinct type (presumably bytes
				# on Python 3) and the delete path would never run.
				bytes_key = python2x3.string_to_binary(random.choice('abcdefghijklm'))
				if random.random() < 0.2:
					if bytes_key in dict_:
						del dict_[bytes_key]
						del database[bytes_key]
				else:
					value = random.choice('abc') * random.randrange(10)
					bytes_value = python2x3.string_to_binary(value)
					dict_[bytes_key] = bytes_value
					database[bytes_key] = bytes_value
					self.assertEqual(database[bytes_key], bytes_value)
			database.close()

			# Reopen and compare the full contents against the mirror.
			database = dohdbm.open(_FNAME)
			expected = sorted(dict_.items())
			actual = sorted(database.items())
			if expected != actual:
				sys.stderr.write('Mismatch between dictionary and database:\n')
				for label, items in (('Expected', expected), ('Actual', actual)):
					sys.stderr.write(label + ':\n')
					sys.stderr.write(''.join('\t' + line + '\n' for line in pprint.pformat(items).split('\n')))
				self.fail('test_random: expected != actual')
			database.close()
		self.tear_down()

	def tear_down(self):
		'''Shut down our test info'''
		_delete_files()

	def set_up(self):
		'''Set up our test info'''
		_delete_files()
		self.init_db()

def main():
	'''Main function: run the unittest runner, then call _delete_files()'''
	try:
		unittest.main()
	finally:
		# In a finally clause so the cleanup hook runs even when
		# unittest.main() raises (it normally ends by raising SystemExit).
		_delete_files()

# Run the tests only when executed as a script, not when imported.
if __name__ == "__main__":
	main()