diff --git a/.travis.yml b/.travis.yml
index eda4af49..326616b9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -30,6 +30,8 @@
 install:
 - pip install docutils
 - pip install numba
 - pip install tables
+- pip install multiprocessing
+- pip install joblib
 
 script:
diff --git a/setup.py b/setup.py
index 5dcda235..f57bda02 100644
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@
 setup(
     name = 'spotpy',
-    version = '1.5.14',
+    version = '1.5.15a1',
     description = 'A Statistical Parameter Optimization Tool',
     long_description=open(os.path.join(os.path.dirname(__file__), "README.rst")).read(),
diff --git a/spotpy/__init__.py b/spotpy/__init__.py
index ac788820..7d1b6cd7 100644
--- a/spotpy/__init__.py
+++ b/spotpy/__init__.py
@@ -41,4 +41,4 @@
 from . import describe  # Contains some helper functions to describe samplers and set-ups
 from .hydrology import signatures  # Quantifies goodness of fit between simulation and evaluation data with hydrological signatures
 
-__version__ = '1.5.14'
\ No newline at end of file
+__version__ = '1.5.15a1'
\ No newline at end of file
diff --git a/spotpy/algorithms/_algorithm.py b/spotpy/algorithms/_algorithm.py
index 3e029785..8139b24a 100644
--- a/spotpy/algorithms/_algorithm.py
+++ b/spotpy/algorithms/_algorithm.py
@@ -271,17 +271,10 @@ def __init__(self, spot_setup, dbname=None, dbformat=None, dbinit=True,
         elif parallel == 'mpi':
             from spotpy.parallel.mpi import ForEach
 
-        # MPC is based on pathos mutiprocessing and uses ordered map, so results are given back in the order
+        # mpc and umpc are based on multiprocessing; umpc yields results unordered, mpc in the same order
         # as the parameters are
-        elif parallel == 'mpc':
+        elif parallel in ('mpc', 'umpc'):
             from spotpy.parallel.mproc import ForEach
-
-        # UMPC is based on pathos mutiprocessing and uses unordered map, so results are given back in the order
-        # as the subprocesses are finished which may speed up the whole simulation process but is not recommended if
-        # objective functions do their calculation based on the order of the data because the order of the result is chaotic
-        # and randomized
-        elif parallel == 'umpc':
-            from spotpy.parallel.umproc import ForEach
         else:
             raise ValueError(
                 "'%s' is not a valid keyword for parallel processing" % parallel)
@@ -292,6 +285,8 @@ def __init__(self, spot_setup, dbname=None, dbformat=None, dbinit=True,
         # to other functions. This is introduced for sceua to differentiate between burn in and
         # the normal work on the chains
         self.repeat = ForEach(self.simulate)
+        if parallel == 'umpc':
+            self.repeat.unordered = True
 
         # method "save" needs to know whether objective function result is list or float, default is float
         self.like_struct_typ = type(1.1)
diff --git a/spotpy/analyser.py b/spotpy/analyser.py
index aedafdfc..bfbc9782 100644
--- a/spotpy/analyser.py
+++ b/spotpy/analyser.py
@@ -803,8 +803,8 @@ def plot_bestmodelrun(results,evaluation,fig_name ='Best_model_run.png'):
             evaluation[i] = np.nan
     plt.plot(evaluation,'ro',markersize=1, label='Observation data')
     simulation_fields = get_simulation_fields(results)
-    bestindex,bestobjf = get_maxlikeindex(results,verbose=False)
-    plt.plot(list(results[simulation_fields][bestindex][0]),'b-',label='Obj='+str(round(bestobjf,2)))
+    bestindex,bestobjf = get_minlikeindex(results)
+    plt.plot(list(results[simulation_fields][bestindex]),'b',label='Obj='+str(round(bestobjf,2)))
     plt.xlabel('Number of Observation Points')
     plt.ylabel ('Simulated value')
     plt.legend(loc='upper right')
diff --git a/spotpy/database/__init__.py b/spotpy/database/__init__.py
index 8c539ba2..c59ea4f8 100644
--- a/spotpy/database/__init__.py
+++ b/spotpy/database/__init__.py
@@ -18,7 +18,6 @@ def __dir__():
 
 def __getattr__(name):
     names = __dir__()
-    print(names)
     if name in names:
         try:
             db_module = import_module('.' + name, __name__)
diff --git a/spotpy/database/csv.py b/spotpy/database/csv.py
index 9fc7c596..1aa27866 100644
--- a/spotpy/database/csv.py
+++ b/spotpy/database/csv.py
@@ -23,18 +23,15 @@ def __init__(self, *args, **kwargs):
         if kwargs.get('dbappend', False) is False:
             print("* Database file '{}.csv' created.".format(self.dbname))
             # Create a open file, which needs to be closed after the sampling
-            mode = 'w'
-            if sys.version_info.major < 3:
-                mode += 'b'
+            mode = 'w+'
             self.db = io.open(self.dbname + '.csv', mode)
             # write header line
-            self.db.write(unicode(','.join(self.header) + '\n'))
+            self.db.write((','.join(self.header) + '\n'))
         else:
             print("* Appending to database file '{}.csv'.".format(self.dbname))
             # Continues writing file
             mode = 'a'
-            if sys.version_info.major < 3:
-                mode += 'b'
+
             self.db = io.open(self.dbname + '.csv', mode)
 
     def save(self, objectivefunction, parameterlist, simulations=None, chains=1):
diff --git a/spotpy/database/ram.py b/spotpy/database/ram.py
index e5652c70..f5baff16 100644
--- a/spotpy/database/ram.py
+++ b/spotpy/database/ram.py
@@ -47,7 +47,7 @@ def finalize(self):
         the same structure as a csv database.
""" dt = {'names': self.header, - 'formats': [np.float] * len(self.header)} + 'formats': [float] * len(self.header)} i = 0 Y = np.zeros(len(self.ram), dtype=dt) diff --git a/spotpy/examples/tutorial_dream_hymod.py b/spotpy/examples/tutorial_dream_hymod.py index ada56504..a1d2b830 100644 --- a/spotpy/examples/tutorial_dream_hymod.py +++ b/spotpy/examples/tutorial_dream_hymod.py @@ -17,7 +17,7 @@ from spotpy.analyser import plot_parameter_trace from spotpy.analyser import plot_posterior_parameter_histogram if __name__ == "__main__": - parallel ='seq' + parallel ='mpc' # Initialize the Hymod example (will only work on Windows systems) #spot_setup=spot_setup(parallel=parallel) spot_setup=spot_setup(GausianLike) @@ -48,6 +48,7 @@ # Load the results gained with the dream sampler, stored in DREAM_hymod.csv results = spotpy.analyser.load_csv_results('DREAM_hymod') + # Get fields with simulation data fields=[word for word in results.dtype.names if word.startswith('sim')] diff --git a/spotpy/examples/tutorial_parallel_computing_hymod.py b/spotpy/examples/tutorial_parallel_computing_hymod.py index 553cd3bb..b8107478 100644 --- a/spotpy/examples/tutorial_parallel_computing_hymod.py +++ b/spotpy/examples/tutorial_parallel_computing_hymod.py @@ -20,34 +20,35 @@ # When you are using parallel = 'mpc' you need to have #the if __name__ == "__main__": line in your script if __name__ == "__main__": - rep = 200 - #If you are working on a windows computer, this will be True - if 'win' in sys.platform: - parallel ='mpc' - from spotpy.examples.spot_setup_hymod_exe import spot_setup + rep = 2000 + #If you are working on a windows computer, this will be True + if 'win' in sys.platform: + parallel ='mpc' + #from spotpy.examples.spot_setup_rosenbrock import spot_setup + from spotpy.examples.spot_setup_hymod_python import spot_setup - # If not you probably have a Mac or Unix system. Then save this file and start it - # from your terminal with "mpirun -c 20 python your_script.py" - else: - parallel = 'mpi' - from spotpy.examples.spot_setup_hymod_unix import spot_setup + # If not you probably have a Mac or Unix system. Then save this file and start it + # from your terminal with "mpirun -c 20 python your_script.py" + else: + parallel = 'mpi' + from spotpy.examples.spot_setup_hymod_unix import spot_setup - # Initialize the Hymod example (this runs on Windows systems only) - # Checkout the spot_setup_hymod_exe.py to see how you need to adopt - # your spot_setup class to run in parallel - # If your model in def simulation reads any files, make sure, they are - # unique for each cpu. Otherwise things get messed up... - spot_setup=spot_setup(parallel=parallel) + # Initialize the Hymod example (this runs on Windows systems only) + # Checkout the spot_setup_hymod_exe.py to see how you need to adopt + # your spot_setup class to run in parallel + # If your model in def simulation reads any files, make sure, they are + # unique for each cpu. Otherwise things get messed up... 
+    spot_setup=spot_setup()
 
-    # Initialize a sampler that is suited for parallel computing (all except MLE, MCMC and SA)
-    sampler=spotpy.algorithms.mc(spot_setup, dbname='Parallel_hymod', dbformat='csv',
+    # Initialize a sampler that is suited for parallel computing (all except MLE, MCMC and SA)
+    sampler=spotpy.algorithms.dream(spot_setup, dbname='Parallel_hymod3', dbformat='csv',
                                  parallel=parallel)
 
-    # Sample in parlallel
-    sampler.sample(rep)
+    # Sample in parallel
+    sampler.sample(rep)
 
-    # Load results from file
-    results = spotpy.analyser.load_csv_results('Parallel_hymod')
-    print(len(results))
-    # Plot best model run
-    #spotpy.analyser.plot_bestmodelrun(results,spot_setup.evaluation())
\ No newline at end of file
+    # Load results from file
+    results = spotpy.analyser.load_csv_results('Parallel_hymod3')
+    print('File contains', len(results), 'runs')
+    # Plot best model run
+    spotpy.analyser.plot_bestmodelrun(results, spot_setup.evaluation())
diff --git a/spotpy/examples/tutorial_sceua_hymod.py b/spotpy/examples/tutorial_sceua_hymod.py
index abd2b380..ca3f1588 100644
--- a/spotpy/examples/tutorial_sceua_hymod.py
+++ b/spotpy/examples/tutorial_sceua_hymod.py
@@ -25,7 +25,7 @@
     #Select number of maximum allowed repetitions
     rep=5000
-    sampler=spotpy.algorithms.sceua(spot_setup, dbname='SCEUA_hymod', dbformat='csv')
+    sampler=spotpy.algorithms.sceua(spot_setup, dbname='SCEUA_hymod', dbformat='csv', breakpoint='read', backup_every_rep=1000)
 
     #Start the sampler, one can specify ngs, kstop, peps and pcento id desired
     sampler.sample(rep, ngs=7, kstop=3, peps=0.1, pcento=0.1)
diff --git a/spotpy/parallel/mproc.py b/spotpy/parallel/mproc.py
index 21094b59..dd4fb2fb 100644
--- a/spotpy/parallel/mproc.py
+++ b/spotpy/parallel/mproc.py
@@ -9,18 +9,12 @@
 from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
-
-import pathos.multiprocessing as mp
+from joblib import Parallel, delayed
+import multiprocessing as mp
 
 process_count = None
 
-class PhaseChange(object):
-    """
-    Object to identify a change of a simulation phase
-    """
-    def __init__(self, phase):
-        self.phase = phase
-
+
 class ForEach(object):
     """
     ForEach is a classes for multiprocessed work based on a generater object which is given if __call__ is called
@@ -30,8 +24,7 @@ class ForEach(object):
     def __init__(self, process):
         self.size = process_count or mp.cpu_count()
         self.process = process
-        self.phase=None
-        self.pool = mp.ProcessingPool(self.size)
+        self.phase = None
 
     def is_idle(self):
         return False
@@ -50,6 +43,6 @@ def f(self, job):
         return data
 
     def __call__(self, jobs):
-        results = self.pool.map(self.f, jobs)
-        for i in results:
-            yield i
+        results = Parallel(n_jobs=self.size)(delayed(self.f)(job) for job in jobs)
+        for res in results:
+            yield res
diff --git a/spotpy/parallel/umproc.py b/spotpy/parallel/umproc.py
deleted file mode 100644
index e16bbed8..00000000
--- a/spotpy/parallel/umproc.py
+++ /dev/null
@@ -1,58 +0,0 @@
-'''
-Copyright (c) 2018 by Tobias Houska
-
-This file is part of Statistical Parameter Estimation Tool (SPOTPY).
-
-:author: Benjamin Manns
-'''
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import pathos.multiprocessing as mp
-
-process_count = None
-
-class PhaseChange(object):
-    """
-    Object to identify a change of a simulation phase
-    """
-    def __init__(self, phase):
-        self.phase = phase
-
-class ForEach(object):
-    """
-    ForEach is a classes for multiprocessed work based on a generater object which is given if __call__ is called
-    We using the pathos multiprocessing module and the unordered map function where results are yield back while some
-    processes are still running.
-    """
-    def __init__(self, process):
-        self.size = process_count or mp.cpu_count()
-        self.process = process
-        self.phase = None
-        self.pool = mp.ProcessingPool(self.size)
-
-    def is_idle(self):
-        return False
-
-    def terminate(self):
-        pass
-
-    def start(self):
-        pass
-
-    def setphase(self,phasename):
-        self.phase=phasename
-
-
-    def f(self, job):
-        data = self.process(job)
-        return data
-
-    def __call__(self,jobs):
-        results = self.pool.uimap(self.f, jobs)
-        for i in results:
-            yield i
-
-
diff --git a/tests/test_parallel.py b/tests/test_parallel.py
index 58868643..b0cf8c83 100644
--- a/tests/test_parallel.py
+++ b/tests/test_parallel.py
@@ -15,31 +15,33 @@
 class TestParallel(unittest.TestCase):
     @classmethod
     def setUpClass(self):
-        # How many digits to match in case of floating point answers
-        self.tolerance = 7
-        #Create samplers for every algorithm:
-        self.rep = 21
-        self.timeout = 10 #Given in Seconds
+        # Set number of repetitions (not too small)
+        self.rep = 200
+        self.timeout = 10 #Given in Seconds
 
         self.dbformat = "ram"
 
     def test_seq(self):
+        print('spotpy', spotpy.__version__, 'parallel=seq')
         sampler=spotpy.algorithms.mc(spot_setup(),parallel='seq', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
         sampler.sample(self.rep)
         results = sampler.getdata()
         self.assertEqual(len(results), self.rep)
 
     def test_mpc(self):
-        sampler=spotpy.algorithms.mc(spot_setup(),parallel='mpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
+        print('spotpy', spotpy.__version__, 'parallel=mpc')
+        sampler = spotpy.algorithms.mc(spot_setup(),parallel='mpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
         sampler.sample(self.rep)
         results = sampler.getdata()
         self.assertEqual(len(results), self.rep)
 
     def test_umpc(self):
+        print('spotpy', spotpy.__version__, 'parallel=umpc')
         sampler=spotpy.algorithms.mc(spot_setup(),parallel='umpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
        sampler.sample(self.rep)
        results = sampler.getdata()
        self.assertEqual(len(results), self.rep)
 
+
 if __name__ == '__main__':
     unittest.main(exit=False)
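
For quick reference, below is a minimal, self-contained sketch of the pattern the reworked `spotpy/parallel/mproc.py` now relies on: jobs are fanned out with joblib's `Parallel`/`delayed`, and the worker count comes from `multiprocessing.cpu_count()`. The `square` job function and the example values are placeholders (they stand in for `_algorithm.simulate` and the sampler's parameter sets); everything else mirrors calls that appear in the diff. Note that joblib hands results back in input order, so with this backend `parallel='umpc'` only signals to the caller, via `repeat.unordered`, that order does not have to be preserved.

```python
# Sketch of the joblib-backed ForEach used for parallel='mpc'/'umpc'.
# 'square' and the job range are illustrative stand-ins for model runs.
import multiprocessing as mp

from joblib import Parallel, delayed


def square(job):
    # Stand-in for one model run with a given parameter set.
    return job * job


class ForEach(object):
    def __init__(self, process):
        self.size = mp.cpu_count()  # number of worker processes
        self.process = process      # function executed for every job
        self.unordered = False      # set to True by _algorithm when parallel='umpc'

    def f(self, job):
        return self.process(job)

    def __call__(self, jobs):
        # joblib evaluates the jobs in separate processes and returns the
        # results in input order; they are then yielded one by one.
        results = Parallel(n_jobs=self.size)(delayed(self.f)(job) for job in jobs)
        for res in results:
            yield res


if __name__ == "__main__":
    repeat = ForEach(square)
    print(list(repeat(range(8))))  # [0, 1, 4, 9, 16, 25, 36, 49]
```

Under this sketch, dropping `umproc.py` is consistent: both keywords now resolve to the same ordered map, and only the `unordered` flag distinguishes them.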