Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solving #266 with a new multiprocessing design #268

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ install:
- pip install docutils
- pip install numba
- pip install tables
pip install multiprocessing
pip install joblib


script:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setup(
name = 'spotpy',
version = '1.5.14',
version = '1.5.15a1',
description = 'A Statistical Parameter Optimization Tool',
long_description=open(os.path.join(os.path.dirname(__file__),
"README.rst")).read(),
Expand Down
2 changes: 1 addition & 1 deletion spotpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@
from . import describe # Contains some helper functions to describe samplers and set-ups
from .hydrology import signatures # Quantifies goodness of fit between simulation and evaluation data with hydrological signatures

__version__ = '1.5.14'
__version__ = '1.5.15a1'
13 changes: 4 additions & 9 deletions spotpy/algorithms/_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,10 @@ def __init__(self, spot_setup, dbname=None, dbformat=None, dbinit=True,
elif parallel == 'mpi':
from spotpy.parallel.mpi import ForEach

# MPC is based on pathos multiprocessing and uses ordered map, so results are given back in the order
# MPC is based on multiprocessing. mpc returns the results in order, umpc unordered, as the results are created
# as the parameters are
elif parallel == 'mpc':
elif parallel in ('mpc', 'umpc'):
from spotpy.parallel.mproc import ForEach

# UMPC is based on pathos multiprocessing and uses unordered map, so results are given back in the order
# as the subprocesses are finished which may speed up the whole simulation process but is not recommended if
# objective functions do their calculation based on the order of the data because the order of the result is chaotic
# and randomized
elif parallel == 'umpc':
from spotpy.parallel.umproc import ForEach
else:
raise ValueError(
"'%s' is not a valid keyword for parallel processing" % parallel)
Expand All @@ -292,6 +285,8 @@ def __init__(self, spot_setup, dbname=None, dbformat=None, dbinit=True,
# to other functions. This is introduced for sceua to differentiate between burn in and
# the normal work on the chains
self.repeat = ForEach(self.simulate)
if parallel == 'umpc':
self.repeat.unordered = True

# method "save" needs to know whether objective function result is list or float, default is float
self.like_struct_typ = type(1.1)
Expand Down
4 changes: 2 additions & 2 deletions spotpy/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,8 +803,8 @@ def plot_bestmodelrun(results,evaluation,fig_name ='Best_model_run.png'):
evaluation[i] = np.nan
plt.plot(evaluation,'ro',markersize=1, label='Observation data')
simulation_fields = get_simulation_fields(results)
bestindex,bestobjf = get_maxlikeindex(results,verbose=False)
plt.plot(list(results[simulation_fields][bestindex][0]),'b-',label='Obj='+str(round(bestobjf,2)))
bestindex,bestobjf = get_minlikeindex(results)
plt.plot(list(results[simulation_fields][bestindex]),'b',label='Obj='+str(round(bestobjf,2)))
plt.xlabel('Number of Observation Points')
plt.ylabel ('Simulated value')
plt.legend(loc='upper right')
Expand Down
1 change: 0 additions & 1 deletion spotpy/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def __dir__():

def __getattr__(name):
names = __dir__()
print(names)
if name in names:
try:
db_module = import_module('.' + name, __name__)
Expand Down
9 changes: 3 additions & 6 deletions spotpy/database/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,15 @@ def __init__(self, *args, **kwargs):
if kwargs.get('dbappend', False) is False:
print("* Database file '{}.csv' created.".format(self.dbname))
# Create a open file, which needs to be closed after the sampling
mode = 'w'
if sys.version_info.major < 3:
mode += 'b'
mode = 'w+'
self.db = io.open(self.dbname + '.csv', mode)
# write header line
self.db.write(unicode(','.join(self.header) + '\n'))
self.db.write((','.join(self.header) + '\n'))
else:
print("* Appending to database file '{}.csv'.".format(self.dbname))
# Continues writing file
mode = 'a'
if sys.version_info.major < 3:
mode += 'b'

self.db = io.open(self.dbname + '.csv', mode)

def save(self, objectivefunction, parameterlist, simulations=None, chains=1):
Expand Down
2 changes: 1 addition & 1 deletion spotpy/database/ram.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def finalize(self):
the same structure as a csv database.
"""
dt = {'names': self.header,
'formats': [np.float] * len(self.header)}
'formats': [float] * len(self.header)}
i = 0
Y = np.zeros(len(self.ram), dtype=dt)

Expand Down
3 changes: 2 additions & 1 deletion spotpy/examples/tutorial_dream_hymod.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from spotpy.analyser import plot_parameter_trace
from spotpy.analyser import plot_posterior_parameter_histogram
if __name__ == "__main__":
parallel ='seq'
parallel ='mpc'
# Initialize the Hymod example (will only work on Windows systems)
#spot_setup=spot_setup(parallel=parallel)
spot_setup=spot_setup(GausianLike)
Expand Down Expand Up @@ -48,6 +48,7 @@

# Load the results gained with the dream sampler, stored in DREAM_hymod.csv
results = spotpy.analyser.load_csv_results('DREAM_hymod')

# Get fields with simulation data
fields=[word for word in results.dtype.names if word.startswith('sim')]

Expand Down
51 changes: 26 additions & 25 deletions spotpy/examples/tutorial_parallel_computing_hymod.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,35 @@
# When you are using parallel = 'mpc' you need to have
#the if __name__ == "__main__": line in your script
if __name__ == "__main__":
rep = 200
#If you are working on a windows computer, this will be True
if 'win' in sys.platform:
parallel ='mpc'
from spotpy.examples.spot_setup_hymod_exe import spot_setup
rep = 2000
#If you are working on a windows computer, this will be True
if 'win' in sys.platform:
parallel ='mpc'
#from spotpy.examples.spot_setup_rosenbrock import spot_setup
from spotpy.examples.spot_setup_hymod_python import spot_setup


# If not you probably have a Mac or Unix system. Then save this file and start it
# from your terminal with "mpirun -c 20 python your_script.py"
else:
parallel = 'mpi'
from spotpy.examples.spot_setup_hymod_unix import spot_setup
# If not you probably have a Mac or Unix system. Then save this file and start it
# from your terminal with "mpirun -c 20 python your_script.py"
else:
parallel = 'mpi'
from spotpy.examples.spot_setup_hymod_unix import spot_setup

# Initialize the Hymod example (this runs on Windows systems only)
# Checkout the spot_setup_hymod_exe.py to see how you need to adopt
# your spot_setup class to run in parallel
# If your model in def simulation reads any files, make sure, they are
# unique for each cpu. Otherwise things get messed up...
spot_setup=spot_setup(parallel=parallel)
# Initialize the Hymod example (this runs on Windows systems only)
# Check out spot_setup_hymod_exe.py to see how you need to adapt
# your spot_setup class to run in parallel
# If your model in def simulation reads any files, make sure, they are
# unique for each cpu. Otherwise things get messed up...
spot_setup=spot_setup()

# Initialize a sampler that is suited for parallel computing (all except MLE, MCMC and SA)
sampler=spotpy.algorithms.mc(spot_setup, dbname='Parallel_hymod', dbformat='csv',
#Initialize a sampler that is suited for parallel computing (all except MLE, MCMC and SA)
sampler=spotpy.algorithms.dream(spot_setup, dbname='Parallel_hymod3', dbformat='csv',
parallel=parallel)
# Sample in parallel
sampler.sample(rep)
# Sample in parallel
sampler.sample(rep)

# Load results from file
results = spotpy.analyser.load_csv_results('Parallel_hymod')
print(len(results))
# Plot best model run
#spotpy.analyser.plot_bestmodelrun(results,spot_setup.evaluation())
# Load results from file
results = spotpy.analyser.load_csv_results('Parallel_hymod3')
print('File contains', len(results), 'runs')
# Plot best model run
spotpy.analyser.plot_bestmodelrun(results, spot_setup.evaluation())
2 changes: 1 addition & 1 deletion spotpy/examples/tutorial_sceua_hymod.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

#Select number of maximum allowed repetitions
rep=5000
sampler=spotpy.algorithms.sceua(spot_setup, dbname='SCEUA_hymod', dbformat='csv')
sampler=spotpy.algorithms.sceua(spot_setup, dbname='SCEUA_hymod', dbformat='csv', breakpoint='read', backup_every_rep=1000)

#Start the sampler, one can specify ngs, kstop, peps and pcento if desired
sampler.sample(rep, ngs=7, kstop=3, peps=0.1, pcento=0.1)
Expand Down
21 changes: 7 additions & 14 deletions spotpy/parallel/mproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,12 @@
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import pathos.multiprocessing as mp
from joblib import Parallel, delayed
import multiprocessing as mp

process_count = None

class PhaseChange(object):
"""
Object to identify a change of a simulation phase
"""
def __init__(self, phase):
self.phase = phase


class ForEach(object):
"""
ForEach is a class for multiprocessed work based on a generator object which is given if __call__ is called
Expand All @@ -30,8 +24,7 @@ class ForEach(object):
def __init__(self, process):
self.size = process_count or mp.cpu_count()
self.process = process
self.phase=None
self.pool = mp.ProcessingPool(self.size)
self.phase = None

def is_idle(self):
return False
Expand All @@ -50,6 +43,6 @@ def f(self, job):
return data

def __call__(self, jobs):
results = self.pool.map(self.f, jobs)
for i in results:
yield i
results = Parallel(n_jobs=self.size)(delayed(self.f)(job) for job in jobs)
for res in results:
yield res
58 changes: 0 additions & 58 deletions spotpy/parallel/umproc.py

This file was deleted.

14 changes: 8 additions & 6 deletions tests/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,33 @@
class TestParallel(unittest.TestCase):
@classmethod
def setUpClass(self):
# How many digits to match in case of floating point answers
self.tolerance = 7
#Create samplers for every algorithm:
self.rep = 21
self.timeout = 10 #Given in Seconds

# Set number of repetitions (not too small)
self.rep = 200
self.timeout = 10 #Given in Seconds
self.dbformat = "ram"

def test_seq(self):
print('spotpy', spotpy.__version__, 'parallel=seq')
sampler=spotpy.algorithms.mc(spot_setup(),parallel='seq', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
sampler.sample(self.rep)
results = sampler.getdata()
self.assertEqual(len(results), self.rep)

def test_mpc(self):
sampler=spotpy.algorithms.mc(spot_setup(),parallel='mpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
print('spotpy', spotpy.__version__, 'parallel=mpc')
sampler = spotpy.algorithms.mc(spot_setup(),parallel='mpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
sampler.sample(self.rep)
results = sampler.getdata()
self.assertEqual(len(results), self.rep)

def test_umpc(self):
print('spotpy', spotpy.__version__, 'parallel=umpc')
sampler=spotpy.algorithms.mc(spot_setup(),parallel='umpc', dbname='Rosen', dbformat=self.dbformat, sim_timeout=self.timeout)
sampler.sample(self.rep)
results = sampler.getdata()
self.assertEqual(len(results), self.rep)


if __name__ == '__main__':
unittest.main(exit=False)