Skip to content

Commit

Permalink
add file locking to results file
Browse files Browse the repository at this point in the history
  • Loading branch information
enarjord committed Oct 5, 2024
1 parent 3a267a9 commit 58b05d7
Showing 1 changed file with 40 additions and 2 deletions.
42 changes: 40 additions & 2 deletions src/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
from deap import base, creator, tools, algorithms
from contextlib import contextmanager
import tempfile
import time
import fcntl


def create_shared_memory_file(hlcvs):
Expand Down Expand Up @@ -181,6 +183,26 @@ def __init__(self, shared_memory_file, hlcvs_shape, hlcvs_dtype, config, mss):
self.shared_hlcvs_np = self.mmap_context.__enter__()
self.config = config
_, self.exchange_params, self.backtest_params = prep_backtest_args(config, mss)
self.lock_filepath = self.config["lock_filepath"]

@contextmanager
def file_lock(self, timeout=1):
start_time = time.time()
while True:
try:
with open(self.lock_filepath, "w") as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
try:
yield
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
break
except IOError:
if time.time() - start_time > timeout:
logging.warning(f"Timeout while waiting for lock. Proceeding without lock.")
yield
break
time.sleep(0.1)

def evaluate(self, individual):
config = individual_to_config(individual, template=self.config)
Expand All @@ -197,10 +219,18 @@ def evaluate(self, individual):
)
w_0, w_1 = self.calc_fitness(analysis)
analysis.update({"w_0": w_0, "w_1": w_1})
with open(self.config["results_filename"], "a") as f:
f.write(json.dumps(denumpyize({"analysis": analysis, "config": config})) + "\n")
self.write_results({"analysis": analysis, "config": config})
return w_0, w_1

def write_results(self, data):
with self.file_lock():
try:
with open(self.config["results_filename"], "a") as f:
json.dump(denumpyize(data), f)
f.write("\n")
except Exception as e:
logging.error(f"Error writing results: {e}")

def calc_fitness(self, analysis):
modifier = 0.0
for i, key in [
Expand Down Expand Up @@ -320,6 +350,8 @@ async def main():
config["results_filename"] = make_get_filepath(
f"optimize_results/{date_fname}_{coins_fname}_{hash_snippet}_all_results.txt"
)
config["lock_filepath"] = config["results_filename"] + ".lock"

try:
shared_memory_file = create_shared_memory_file(hlcvs)
evaluator = Evaluator(shared_memory_file, hlcvs.shape, hlcvs.dtype, config, mss)
Expand Down Expand Up @@ -436,6 +468,12 @@ def create_individual():
logging.error(f"An error occurred: {e}")
traceback.print_exc()
finally:
if os.path.exists(config["lock_filepath"]):
try:
os.remove(config["lock_filepath"])
logging.info(f"Removed lock file")
except Exception as e1:
logging.error(f"Failed to remove lock file {e1}")
if "pool" in locals():
logging.info("Closing and terminating the process pool...")
pool.close()
Expand Down

0 comments on commit 58b05d7

Please sign in to comment.