#!/usr/local/cpython-3.7/bin/python3 """An experiment with a simple kind of file lock.""" from __future__ import print_function import os import sys import time import fslock_mod def serial_test(): """Test serial use.""" fslock_mod.force_release_lock('serial') lock1 = fslock_mod.Fslock('serial') lock2 = fslock_mod.Fslock('serial') lock1.acquire() assert not lock2.acquire(max_attempts=5) lock1.release() return True def get_counter(): """Get the counter.""" with open('counter.txt', 'r') as counter_in: prior_value = int(counter_in.readline().rstrip()) return prior_value def put_counter(value): """Put the counter.""" with open('counter.txt', 'w') as counter_out: counter_out.write('%d\n' % value) def parallel_test(): """Test with parallelism - much harder than serial.""" fslock_mod.force_release_lock('parallel') lock_just_for_cleaning = fslock_mod.Fslock('parallel') lock_just_for_cleaning.force_release() # Count to num_processes*num_reps the hard way, to test our locking num_processes = 20 num_reps = 20 put_counter(0) for i in range(num_processes): pid = os.fork() if pid == 0: # child for repno in range(num_reps): lock = fslock_mod.Fslock('parallel') lock.acquire() put_counter(get_counter() + 1) lock.release(force=True) sys.exit(0) else: # parent - do nothing pass # Wait for the counting processes to all exit for processno in range(num_processes): os.wait() # Verify that we've counted to 100 final_value = get_counter() if final_value == num_processes * num_reps: return True else: sys.stderr.write('Final value is %d != %d\n' % (final_value, num_processes)) return False def context_manager_test(): """Test as a context manager.""" fslock_mod.force_release_lock('context-manager') with fslock_mod.Fslock('context-manager'): time.sleep(0.001) return True def lock_time_test(): """Test as a context manager.""" fslock_mod.force_release_lock('time') with fslock_mod.Fslock('time') as fslock: time.sleep(0.001) ctime = fslock.get_creation_time() if abs(time.time() - ctime) < 2.0: return True else: return False def is_locked_locked_test(): """Test if a locked lock looks locked according to fslock.is_locked_by_me().""" with fslock_mod.Fslock('is_locked_locked') as locked_fslock: if locked_fslock.is_locked_by_me(): all_good = True else: all_good = False sys.stderr.write('locked lock does not look locked.\n') return all_good def is_locked_not_locked_test(): """Test if a not-locked lock looks locked according to fslock.is_locked_by_anyone().""" not_locked_fslock = fslock_mod.Fslock('is_locked_not_locked') if not_locked_fslock.is_locked_by_anyone(): all_good = False sys.stderr.write('not_locked_fslock looks locked.\n') else: all_good = True return all_good def main(): """Test the experiment - main function.""" all_good = True all_good &= context_manager_test() all_good &= lock_time_test() all_good &= serial_test() # This one messes up pudb... For debugging try commenting it out. all_good &= parallel_test() # These two are actually only half of the picture, but I'm starting to have doubts about the approach here. all_good &= is_locked_locked_test() all_good &= is_locked_not_locked_test() if all_good: sys.stderr.write('%s: All tests passed\n' % sys.argv[0]) else: sys.stderr.write('%s: One or more tests failed\n' % sys.argv[0]) sys.exit(1) main()