From 1d4bbfc8cdbb1a2ff70837a5083f49d79c5322da Mon Sep 17 00:00:00 2001
From: Philipp Kraft
Date: Wed, 14 Apr 2021 12:03:16 +0200
Subject: [PATCH 1/6] Changed format function in ram database from np.float to float to avoid a deprecation warning in Numpy >= 1.20

---
 spotpy/database/ram.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spotpy/database/ram.py b/spotpy/database/ram.py
index e5652c70..f5baff16 100644
--- a/spotpy/database/ram.py
+++ b/spotpy/database/ram.py
@@ -47,7 +47,7 @@ def finalize(self):
         the same structure as a csv database.
         """
         dt = {'names': self.header,
-              'formats': [np.float] * len(self.header)}
+              'formats': [float] * len(self.header)}
 
         i = 0
         Y = np.zeros(len(self.ram), dtype=dt)

From 3638b7af1a30ea4a8612a472bf2fda46b5a6e776 Mon Sep 17 00:00:00 2001
From: Philipp Kraft
Date: Wed, 14 Apr 2021 12:04:12 +0200
Subject: [PATCH 2/6] Removed a debugging print from the database import helper

---
 spotpy/database/__init__.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/spotpy/database/__init__.py b/spotpy/database/__init__.py
index 8c539ba2..c59ea4f8 100644
--- a/spotpy/database/__init__.py
+++ b/spotpy/database/__init__.py
@@ -18,7 +18,6 @@ def __dir__():
 
 def __getattr__(name):
     names = __dir__()
-    print(names)
     if name in names:
         try:
             db_module = import_module('.' + name, __name__)

From 191ebf5186ccddef357372c7d175958476419978 Mon Sep 17 00:00:00 2001
From: Philipp Kraft
Date: Wed, 14 Apr 2021 12:04:42 +0200
Subject: [PATCH 3/6] Pushed version to 1.5.15a1

---
 setup.py           | 2 +-
 spotpy/__init__.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index 5dcda235..f57bda02 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@
 
 setup(
   name = 'spotpy',
-  version = '1.5.14',
+  version = '1.5.15a1',
   description = 'A Statistical Parameter Optimization Tool',
   long_description=open(os.path.join(os.path.dirname(__file__), "README.rst")).read(),
 
diff --git a/spotpy/__init__.py b/spotpy/__init__.py
index ac788820..7d1b6cd7 100644
--- a/spotpy/__init__.py
+++ b/spotpy/__init__.py
@@ -41,4 +41,4 @@
 from . import describe # Contains some helper functions to describe samplers and set-ups
 from .hydrology import signatures # Quantifies goodness of fit between simulation and evaluation data with hydrological signatures
 
-__version__ = '1.5.14'
\ No newline at end of file
+__version__ = '1.5.15a1'
\ No newline at end of file
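
A note on the np.float change in PATCH 1/6: NumPy 1.20 deprecates the np.float alias (it was only ever an alias for the builtin float, which becomes a 64-bit field in the structured dtype), so passing the builtin is the forward-compatible spelling. Below is a minimal standalone sketch of the same structured-dtype pattern; the column names are hypothetical, not spotpy's real header.

    import numpy as np

    header = ['like1', 'parx', 'simulation1']      # hypothetical column names
    dt = {'names': header,
          'formats': [float] * len(header)}        # builtin float -> float64, no DeprecationWarning
    Y = np.zeros(3, dtype=dt)                      # empty table with one row per model run
    Y[0] = (0.5, 1.2, 3.4)                         # fill the first row
    print(Y.dtype)                                 # [('like1', '<f8'), ('parx', '<f8'), ('simulation1', '<f8')]
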
From 6a8a2a784f52c1b43e9d3371da37987093ade26c Mon Sep 17 00:00:00 2001
From: Philipp Kraft
Date: Wed, 14 Apr 2021 12:06:37 +0200
Subject: [PATCH 4/6] Removed pathos dependency for mproc by using standard multiprocessing. Unified code for ordered and unordered processing

---
 spotpy/algorithms/_algorithm.py | 13 +++-----
 spotpy/parallel/mproc.py        | 31 ++++++++----------
 spotpy/parallel/umproc.py       | 58 ---------------------------------
 3 files changed, 18 insertions(+), 84 deletions(-)
 delete mode 100644 spotpy/parallel/umproc.py

diff --git a/spotpy/algorithms/_algorithm.py b/spotpy/algorithms/_algorithm.py
index 3e029785..8139b24a 100644
--- a/spotpy/algorithms/_algorithm.py
+++ b/spotpy/algorithms/_algorithm.py
@@ -271,17 +271,10 @@ def __init__(self, spot_setup, dbname=None, dbformat=None, dbinit=True,
         elif parallel == 'mpi':
             from spotpy.parallel.mpi import ForEach
 
-        # MPC is based on pathos mutiprocessing and uses ordered map, so results are given back in the order
+        # MPC is based on multiprocessing. mpc returns the results in order, umpc unordered, as the results are created
         # as the parameters are
-        elif parallel == 'mpc':
+        elif parallel in ('mpc', 'umpc'):
             from spotpy.parallel.mproc import ForEach
-
-        # UMPC is based on pathos mutiprocessing and uses unordered map, so results are given back in the order
-        # as the subprocesses are finished which may speed up the whole simulation process but is not recommended if
-        # objective functions do their calculation based on the order of the data because the order of the result is chaotic
-        # and randomized
-        elif parallel == 'umpc':
-            from spotpy.parallel.umproc import ForEach
         else:
             raise ValueError(
                 "'%s' is not a valid keyword for parallel processing" % parallel)
@@ -292,6 +285,8 @@ def __init__(self, spot_setup, dbname=None, dbformat=None, dbinit=True,
         # to other functions. This is introduced for sceua to differentiate between burn in and
         # the normal work on the chains
         self.repeat = ForEach(self.simulate)
+        if parallel == 'umpc':
+            self.repeat.unordered = True
 
         # method "save" needs to know whether objective function result is list or float, default is float
         self.like_struct_typ = type(1.1)
diff --git a/spotpy/parallel/mproc.py b/spotpy/parallel/mproc.py
index 7594e9df..572602e5 100644
--- a/spotpy/parallel/mproc.py
+++ b/spotpy/parallel/mproc.py
@@ -5,22 +5,13 @@
 
 :author: Philipp Kraft
 '''
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-import pathos.multiprocessing as mp
+
+import multiprocessing as mp
 
 process_count = None
 
 
-class PhaseChange(object):
-    """
-    Object to identify a change of a simulation phase
-    """
-    def __init__(self, phase):
-        self.phase = phase
-
+
 class ForEach(object):
     """
     ForEach is a classes for multiprocessed work based on a generater object which is given if __call__ is called
@@ -30,8 +21,8 @@ class ForEach(object):
     def __init__(self, process):
         self.size = process_count or mp.cpu_count()
         self.process = process
-        self.phase=None
-        self.pool = mp.ProcessingPool(self.size)
+        self.phase = None
+        self.unordered = False
 
     def is_idle(self):
         return False
@@ -50,6 +41,12 @@ def f(self, job):
         return data
 
     def __call__(self, jobs):
-        results = self.pool.imap(self.f, jobs)
-        for i in results:
-            yield i
+        with mp.Pool(self.size) as pool:
+
+            if self.unordered:
+                results = pool.imap_unordered(self.f, jobs)
+            else:
+                results = pool.imap(self.f, jobs)
+
+            for i in results:
+                yield i
diff --git a/spotpy/parallel/umproc.py b/spotpy/parallel/umproc.py
deleted file mode 100644
index e16bbed8..00000000
--- a/spotpy/parallel/umproc.py
+++ /dev/null
@@ -1,58 +0,0 @@
-'''
-Copyright (c) 2018 by Tobias Houska
-
-This file is part of Statistical Parameter Estimation Tool (SPOTPY).
-
-:author: Benjamin Manns
-'''
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import pathos.multiprocessing as mp
-
-process_count = None
-
-class PhaseChange(object):
-    """
-    Object to identify a change of a simulation phase
-    """
-    def __init__(self, phase):
-        self.phase = phase
-
-class ForEach(object):
-    """
-    ForEach is a classes for multiprocessed work based on a generater object which is given if __call__ is called
-    We using the pathos multiprocessing module and the unordered map function where results are yield back while some
-    processes are still running.
-    """
-    def __init__(self, process):
-        self.size = process_count or mp.cpu_count()
-        self.process = process
-        self.phase = None
-        self.pool = mp.ProcessingPool(self.size)
-
-    def is_idle(self):
-        return False
-
-    def terminate(self):
-        pass
-
-    def start(self):
-        pass
-
-    def setphase(self,phasename):
-        self.phase=phasename
-
-
-    def f(self, job):
-        data = self.process(job)
-        return data
-
-    def __call__(self,jobs):
-        results = self.pool.uimap(self.f, jobs)
-        for i in results:
-            yield i
-
-
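
PATCH 4/6 folds the former umproc module into mproc: one standard-library Pool, with pool.imap for 'mpc' (results in job order) and pool.imap_unordered for 'umpc' (results as workers finish). A standalone sketch of that split follows; it is not spotpy code itself, and simulate stands in for the wrapped spot_setup simulation (it must live at module level so it can be pickled).

    import multiprocessing as mp

    def simulate(job):
        # stand-in for ForEach.f / self.process(job); must be picklable (module level)
        return job, job ** 2

    def run(jobs, unordered=False, size=None):
        # mirrors the ordered/unordered branch introduced in mproc.ForEach.__call__
        with mp.Pool(size or mp.cpu_count()) as pool:
            mapper = pool.imap_unordered if unordered else pool.imap
            for result in mapper(simulate, jobs):
                yield result

    if __name__ == '__main__':                      # required on Windows, where workers are spawned
        print(list(run(range(5))))                  # ordered, like parallel='mpc'
        print(list(run(range(5), unordered=True)))  # arrival order, like parallel='umpc'
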
From f3345d8174a18c252dc51a41c41cf395d45c3295 Mon Sep 17 00:00:00 2001
From: Philipp Kraft
Date: Wed, 14 Apr 2021 12:17:27 +0200
Subject: [PATCH 5/6] Some changes with the parallel test

---
 tests/test_parallel.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/tests/test_parallel.py b/tests/test_parallel.py
index 58868643..b0cf8c83 100644
--- a/tests/test_parallel.py
+++ b/tests/test_parallel.py
@@ -15,31 +15,33 @@ class TestParallel(unittest.TestCase):
     @classmethod
     def setUpClass(self):
-        # How many digits to match in case of floating point answers
-        self.tolerance = 7
-        #Create samplers for every algorithm:
-        self.rep = 21
-        self.timeout = 10 #Given in Seconds
+        # Set number of repetitions (not too small)
+        self.rep = 200
+        self.timeout = 10  #Given in Seconds
 
         self.dbformat = "ram"
 
     def test_seq(self):
+        print('spotpy', spotpy.__version__, 'parallel=seq')
         sampler=spotpy.algorithms.mc(spot_setup(),parallel='seq', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
         sampler.sample(self.rep)
         results = sampler.getdata()
         self.assertEqual(len(results), self.rep)
 
     def test_mpc(self):
-        sampler=spotpy.algorithms.mc(spot_setup(),parallel='mpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
+        print('spotpy', spotpy.__version__, 'parallel=mpc')
+        sampler = spotpy.algorithms.mc(spot_setup(),parallel='mpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
         sampler.sample(self.rep)
         results = sampler.getdata()
         self.assertEqual(len(results), self.rep)
 
     def test_umpc(self):
+        print('spotpy', spotpy.__version__, 'parallel=umpc')
         sampler=spotpy.algorithms.mc(spot_setup(),parallel='umpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
         sampler.sample(self.rep)
         results = sampler.getdata()
         self.assertEqual(len(results), self.rep)
 
+
 if __name__ == '__main__':
     unittest.main(exit=False)
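
The same call pattern can be exercised outside the unittest harness, roughly as sketched below (assuming the Rosenbrock spot_setup shipped with spotpy; any setup class with the usual parameters/simulation/evaluation/objectivefunction interface would do), keeping the __main__ guard that 'mpc'/'umpc' need on Windows:

    import spotpy
    from spotpy.examples.spot_setup_rosenbrock import spot_setup

    if __name__ == '__main__':
        # same call as test_mpc above, just outside unittest
        sampler = spotpy.algorithms.mc(spot_setup(), parallel='mpc',
                                       dbname='Rosen', dbformat='ram', sim_timeout=10)
        sampler.sample(200)
        print(len(sampler.getdata()), 'runs sampled')
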
From 3dedc781cb086ad7a803763f3feeb2a50a3febed Mon Sep 17 00:00:00 2001
From: thouska
Date: Tue, 15 Jun 2021 15:04:58 +0200
Subject: [PATCH 6/6] Restructure to use joblib instead of pathos for mpc

---
 .travis.yml                              |  2 +
 spotpy/analyser.py                       |  4 +-
 spotpy/database/csv.py                   |  9 ++--
 spotpy/examples/tutorial_dream_hymod.py  |  3 +-
 .../tutorial_parallel_computing_hymod.py | 52 ++++++++++---------
 spotpy/examples/tutorial_sceua_hymod.py  |  2 +-
 spotpy/parallel/mproc.py                 | 20 +++----
 7 files changed, 45 insertions(+), 47 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index eda4af49..326616b9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -30,6 +30,8 @@ install:
 - pip install docutils
 - pip install numba
 - pip install tables
+ pip install multiprocessing
+ pip install joblib
 
 
 script:
diff --git a/spotpy/analyser.py b/spotpy/analyser.py
index aedafdfc..bfbc9782 100644
--- a/spotpy/analyser.py
+++ b/spotpy/analyser.py
@@ -803,8 +803,8 @@ def plot_bestmodelrun(results,evaluation,fig_name ='Best_model_run.png'):
             evaluation[i] = np.nan
     plt.plot(evaluation,'ro',markersize=1, label='Observation data')
     simulation_fields = get_simulation_fields(results)
-    bestindex,bestobjf = get_maxlikeindex(results,verbose=False)
-    plt.plot(list(results[simulation_fields][bestindex][0]),'b-',label='Obj='+str(round(bestobjf,2)))
+    bestindex,bestobjf = get_minlikeindex(results)
+    plt.plot(list(results[simulation_fields][bestindex]),'b',label='Obj='+str(round(bestobjf,2)))
     plt.xlabel('Number of Observation Points')
     plt.ylabel ('Simulated value')
     plt.legend(loc='upper right')
diff --git a/spotpy/database/csv.py b/spotpy/database/csv.py
index 9fc7c596..1aa27866 100644
--- a/spotpy/database/csv.py
+++ b/spotpy/database/csv.py
@@ -23,18 +23,15 @@ def __init__(self, *args, **kwargs):
         if kwargs.get('dbappend', False) is False:
             print("* Database file '{}.csv' created.".format(self.dbname))
             # Create a open file, which needs to be closed after the sampling
-            mode = 'w'
-            if sys.version_info.major < 3:
-                mode += 'b'
+            mode = 'w+'
             self.db = io.open(self.dbname + '.csv', mode)
             # write header line
-            self.db.write(unicode(','.join(self.header) + '\n'))
+            self.db.write((','.join(self.header) + '\n'))
         else:
             print("* Appending to database file '{}.csv'.".format(self.dbname))
             # Continues writing file
             mode = 'a'
-            if sys.version_info.major < 3:
-                mode += 'b'
+
             self.db = io.open(self.dbname + '.csv', mode)
 
     def save(self, objectivefunction, parameterlist, simulations=None, chains=1):
diff --git a/spotpy/examples/tutorial_dream_hymod.py b/spotpy/examples/tutorial_dream_hymod.py
index ada56504..a1d2b830 100644
--- a/spotpy/examples/tutorial_dream_hymod.py
+++ b/spotpy/examples/tutorial_dream_hymod.py
@@ -17,7 +17,7 @@ from spotpy.analyser import plot_parameter_trace
 from spotpy.analyser import plot_posterior_parameter_histogram
 
 if __name__ == "__main__":
-    parallel ='seq'
+    parallel ='mpc'
     # Initialize the Hymod example (will only work on Windows systems)
     #spot_setup=spot_setup(parallel=parallel)
     spot_setup=spot_setup(GausianLike)
@@ -48,6 +48,7 @@
 
     # Load the results gained with the dream sampler, stored in DREAM_hymod.csv
     results = spotpy.analyser.load_csv_results('DREAM_hymod')
+
 
     # Get fields with simulation data
     fields=[word for word in results.dtype.names if word.startswith('sim')]
diff --git a/spotpy/examples/tutorial_parallel_computing_hymod.py b/spotpy/examples/tutorial_parallel_computing_hymod.py
index 5fe05ba7..a7a3e9a6 100644
--- a/spotpy/examples/tutorial_parallel_computing_hymod.py
+++ b/spotpy/examples/tutorial_parallel_computing_hymod.py
@@ -20,34 +20,36 @@
 
 # When you are using parallel = 'mpc' you need to have
 #the if __name__ == "__main__": line in your script
 if __name__ == "__main__":
-    rep = 200
-    #If you are working on a windows computer, this will be True
-    if 'win' in sys.platform:
-        parallel ='mpc'
-        from spotpy.examples.spot_setup_hymod_exe import spot_setup
+    rep = 2000
+    #If you are working on a windows computer, this will be True
+    if 'win' in sys.platform:
+        parallel ='mpc'
+        #from spotpy.examples.spot_setup_rosenbrock import spot_setup
+        from spotpy.examples.spot_setup_hymod_python import spot_setup
 
-    # If not you probably have a Mac or Unix system. Then save this file and start it
-    # from your terminal with "mpirun -c 20 python your_script.py"
-    else:
-        parallel = 'mpi'
-        from spotpy.examples.spot_setup_hymod_unix import spot_setup
+    # If not you probably have a Mac or Unix system. Then save this file and start it
+    # from your terminal with "mpirun -c 20 python your_script.py"
+    else:
+        parallel = 'mpi'
+        from spotpy.examples.spot_setup_hymod_unix import spot_setup
 
-    # Initialize the Hymod example (this runs on Windows systems only)
-    # Checkout the spot_setup_hymod_exe.py to see how you need to adopt
-    # your spot_setup class to run in parallel
-    # If your model in def simulation reads any files, make sure, they are
-    # unique for each cpu. Otherwise things get messed up...
-    spot_setup=spot_setup(parallel=parallel)
+    # Initialize the Hymod example (this runs on Windows systems only)
+    # Checkout the spot_setup_hymod_exe.py to see how you need to adopt
+    # your spot_setup class to run in parallel
+    # If your model in def simulation reads any files, make sure, they are
+    # unique for each cpu. Otherwise things get messed up...
+    spot_setup=spot_setup()
 
-    # Initialize a sampler that is suited for parallel computing (all except MLE, MCMC and SA)
-    sampler=spotpy.algorithms.mc(spot_setup, dbname='Parallel_hymod', dbformat='csv',
+    #Initialize a sampler that is suited for parallel computing (all except MLE, MCMC and SA)
+    sampler=spotpy.algorithms.dream(spot_setup, dbname='Parallel_hymod3', dbformat='csv',
                                  parallel=parallel)
-    # Sample in parlallel
-    sampler.sample(rep)
+    # Sample in parlallel
+    sampler.sample(rep)
 
-    # Load results from file
-    #results = spotpy.analyser.load_csv_results('Parallel_hymod')
-
-    # Plot best model run
-    #spotpy.analyser.plot_bestmodelrun(results,spot_setup.evaluation())
\ No newline at end of file
+
+    # Load results from file
+    results = spotpy.analyser.load_csv_results('Parallel_hymod3')
+    print('File contains', len(results), 'runs')
+    # Plot best model run
+    spotpy.analyser.plot_bestmodelrun(results, spot_setup.evaluation())
diff --git a/spotpy/examples/tutorial_sceua_hymod.py b/spotpy/examples/tutorial_sceua_hymod.py
index abd2b380..ca3f1588 100644
--- a/spotpy/examples/tutorial_sceua_hymod.py
+++ b/spotpy/examples/tutorial_sceua_hymod.py
@@ -25,7 +25,7 @@
 
     #Select number of maximum allowed repetitions
     rep=5000
-    sampler=spotpy.algorithms.sceua(spot_setup, dbname='SCEUA_hymod', dbformat='csv')
+    sampler=spotpy.algorithms.sceua(spot_setup, dbname='SCEUA_hymod', dbformat='csv', breakpoint='read', backup_every_rep=1000)
 
     #Start the sampler, one can specify ngs, kstop, peps and pcento id desired
     sampler.sample(rep, ngs=7, kstop=3, peps=0.1, pcento=0.1)
diff --git a/spotpy/parallel/mproc.py b/spotpy/parallel/mproc.py
index 572602e5..dd4fb2fb 100644
--- a/spotpy/parallel/mproc.py
+++ b/spotpy/parallel/mproc.py
@@ -5,8 +5,11 @@
 
 :author: Philipp Kraft
 '''
-
-
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+from joblib import Parallel, delayed
 import multiprocessing as mp
 
 process_count = None
@@ -22,7 +25,6 @@ def __init__(self, process):
         self.size = process_count or mp.cpu_count()
         self.process = process
         self.phase = None
-        self.unordered = False
 
     def is_idle(self):
         return False
@@ -41,12 +43,6 @@ def f(self, job):
         return data
 
     def __call__(self, jobs):
-        with mp.Pool(self.size) as pool:
-
-            if self.unordered:
-                results = pool.imap_unordered(self.f, jobs)
-            else:
-                results = pool.imap(self.f, jobs)
-
-            for i in results:
-                yield i
+        results = Parallel(n_jobs=self.size)(delayed(self.f)(job) for job in jobs)
+        for res in results:
+            yield res
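
For reference, the pattern PATCH 6/6 moves mproc.py to: joblib's Parallel/delayed replaces the hand-rolled Pool, joblib manages the worker processes and the serialization of jobs, and results are handed back in the order the jobs were submitted, which is presumably why the separate unordered code path is dropped. A standalone sketch under those assumptions (simulate again stands in for the wrapped simulation function):

    import multiprocessing as mp
    from joblib import Parallel, delayed

    def simulate(job):
        # stand-in for ForEach.f / the wrapped spot_setup simulation
        return job, job ** 2

    if __name__ == '__main__':
        size = mp.cpu_count()              # same default worker count as mproc.ForEach
        # joblib collects all results and returns them in submission order
        results = Parallel(n_jobs=size)(delayed(simulate)(job) for job in range(10))
        for res in results:
            print(res)
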