source: coopr.colin/trunk/coopr/colin/tests/core/test_parallel.py @ 2201

Last change on this file since 2201 was 2201, checked in by wehart, 10 years ago

Update to Coopr to account for changes in PyUtilib? package names.

File size: 12.8 KB
Line 
1#
2# Unit Tests for coopr.opt.parallel (using the COLIN optimizers)
3#
4#
5
6import os
7import sys
8from os.path import abspath, dirname
9cooprdir = dirname(dirname(dirname(dirname(abspath(__file__)))))
10sys.path.insert(0, cooprdir)
11cooprdir += os.sep
12currdir = dirname(abspath(__file__))+os.sep
13
14import unittest
15from nose.tools import nottest
16import xml
17import coopr.opt
18import coopr.colin
19from coopr.opt import ResultsFormat, ProblemFormat
20import pyutilib.th
21import pyutilib.component.core
22import pyutilib.services
23
24
25class TestProblem1(coopr.colin.MixedIntOptProblem):
26
27    def __init__(self):
28        coopr.colin.MixedIntOptProblem.__init__(self)
29        self.real_lower=[0.0, -1.0, 1.0, None]
30        self.real_upper=[None, 0.0, 2.0, -1.0]
31        self.nreal=4
32
33    def function_value(self, point):
34        self.validate(point)
35        return point.reals[0] - point.reals[1] + (point.reals[2]-1.5)**2 + (point.reals[3]+2)**4
36
37
38class TestSolverManager(coopr.opt.parallel.AsynchronousSolverManager):
39
40    def __init__(self, **kwds):
41        kwds['type'] = 'smtest_type'
42        kwds['doc'] = 'TestASM Documentation'
43        coopr.opt.parallel.AsynchronousSolverManager.__init__(self,**kwds)
44
45    def enabled(self):
46        return False
47
48
49class SolverManager_DelayedSerial(coopr.opt.parallel.AsynchronousSolverManager):
50
51    def clear(self):
52        """
53        Clear manager state
54        """
55        coopr.opt.parallel.AsynchronousSolverManager.clear(self)
56        self.delay=5
57        self._ah_list = []
58        self._opt = None
59        self._my_results = {}
60        self._ctr = 1
61        self._force_error = 0
62
63    def _perform_queue(self, ah, *args, **kwds):
64        """
65        Perform the queue operation.  This method returns the ActionHandle,
66        and the ActionHandle status indicates whether the queue was successful.
67        """
68        if 'opt' in kwds:
69            self._opt = kwds['opt']
70            del kwds['opt']
71        if self._opt is None:
72            raise ActionManagerError, "Undefined solver"
73        self._my_results[ah.id] = self._opt.solve(*args)
74        self._ah_list.append(ah)
75        return ah
76
77    def _perform_wait_any(self):
78        """
79        Perform the wait_any operation.  This method returns an
80        ActionHandle with the results of waiting.  If None is returned
81        then the ActionManager assumes that it can call this method again.
82        Note that an ActionHandle can be returned with a dummy value,
83        to indicate an error.
84        """
85        if self._force_error == 0:
86            self._ctr += 1
87            if self._ctr % self.delay != 0:
88                return None
89            if len(self._ah_list) > 0:
90                ah = self._ah_list.pop()
91                ah.status = coopr.opt.parallel.manager.ActionStatus.done
92                self.results[ah.id] = self._my_results[ah.id]
93                return ah
94            return coopr.opt.parallel.manager.ActionHandle(error=True, explanation="No queued evaluations available in the 'local' solver manager, which only executes solvers synchronously") 
95        elif self._force_error == 1:
96            #
97            # Wait Any returns an ActionHandle that indicates an error
98            #
99            return coopr.opt.parallel.manager.ActionHandle(error=True, explanation="Forced failure")
100        elif self._force_error == 2:
101            #
102            # Wait Any returns the correct ActionHandle, but no results are
103            # available.
104            #
105            return self._ah_list.pop()
106
107
108class Test(pyutilib.th.TestCase):
109
110
111    def run(self, result=None):
112        self.smtest_plugin = coopr.opt.SolverManagerRegistration("smtest", TestSolverManager)
113        unittest.TestCase.run(self,result)
114        self.smtest_plugin.deactivate()
115
116    def setUp(self):
117        self.do_setup(False)
118        pyutilib.services.TempfileManager.tempdir = currdir
119
120    def do_setup(self,flag):
121        pyutilib.services.TempfileManager.tempdir = currdir
122        self.ps = coopr.colin.solver.PatternSearch()
123
124    def tearDown(self):
125        pyutilib.services.TempfileManager.clear_tempfiles()
126
127    def test_solve1(self):
128        """ Test PatternSearch - TestProblem1 """
129        problem=TestProblem1()
130        self.ps.problem=problem
131        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
132        self.ps.reset()
133        results = self.ps.solve(logfile=currdir+"test_solve1.log")
134        results.write(filename=currdir+"test_solve1.txt",times=False)
135        self.failUnlessFileEqualsBaseline(currdir+"test_solve1.txt", currdir+"test1_ps.txt")
136        if os.path.exists(currdir+"test_solve1.log"):
137            os.remove(currdir+"test_solve1.log")
138
139    def test_serial1(self):
140        """ Test Serial EvalManager - TestProblem1 """
141        problem=TestProblem1()
142        self.ps.problem=problem
143        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
144        self.ps.reset()
145        mngr = coopr.opt.parallel.SolverManagerFactory("serial")
146        results = mngr.solve(opt=self.ps, logfile=currdir+"test_solve2.log")
147        results.write(filename=currdir+"test_solve2.txt",times=False)
148        self.failUnlessFileEqualsBaseline(currdir+"test_solve2.txt", currdir+"test1_ps.txt")
149        if os.path.exists(currdir+"test_solve2.log"):
150            os.remove(currdir+"test_solve2.log")
151
152    def test_serial_error1(self):
153        """ Test Serial SolverManager - Error with no optimizer"""
154        problem=TestProblem1()
155        self.ps.problem=problem
156        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
157        self.ps.reset()
158        mngr = coopr.opt.parallel.SolverManagerFactory("serial")
159        try:
160            results = mngr.solve(logfile=currdir+"test_solve3.log")
161            self.fail("Expected error")
162        except coopr.opt.parallel.manager.ActionManagerError:
163            pass
164
165    def test_serial_error2(self):
166        """ Test Serial SolverManager - Error with no queue solves"""
167        problem=TestProblem1()
168        self.ps.problem=problem
169        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
170        self.ps.reset()
171        mngr = coopr.opt.parallel.SolverManagerFactory("serial")
172        results = mngr.solve(opt=self.ps, logfile=currdir+"test_solve3.log")
173        if mngr.wait_any() != coopr.opt.parallel.manager.FailedActionHandle:
174            self.fail("Expected a failed action")
175        if os.path.exists(currdir+"test_solve2.log"):
176            os.remove(currdir+"test_solve2.log")
177
178    def test_solver_manager_factory(self):
179        """
180        Testing the coopr.opt solver factory
181        """
182        ans = coopr.opt.SolverManagerFactory()
183        ans.sort()
184        tmp = ["smtest"]
185        tmp.sort()
186        self.failUnless(set(tmp) <= set(ans))
187
188    def test_solver_manager_instance(self):
189        """
190        Testing that we get a specific solver instance
191        """
192        ans = coopr.opt.SolverManagerFactory("none")
193        self.failUnlessEqual(ans, None)
194        ans = coopr.opt.SolverManagerFactory("smtest")
195        self.failUnlessEqual(type(ans), TestSolverManager)
196        ans = coopr.opt.SolverManagerFactory("smtest", "mymock")
197        self.failUnlessEqual(type(ans), TestSolverManager)
198        self.failUnlessEqual(ans.name,  "mymock")
199
200    def test_solver_manager_registration(self):
201        """
202        Testing methods in the solverwriter factory registration process
203        """
204        ep = pyutilib.component.core.ExtensionPoint(coopr.opt.parallel.solver.ISolverManagerRegistration)
205        service = ep.service("smtest")
206        self.failUnlessEqual(service.type(), "smtest")
207
208    def test_delayed_serial1(self):
209        """
210        Use a solver manager that delays the evaluation of responses,
211        and thus allows a mock testing of the wait*() methods.
212        """
213        problem=TestProblem1()
214        self.ps.problem=problem
215        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
216        self.ps.reset()
217        mngr = SolverManager_DelayedSerial()
218        results = mngr.solve(opt=self.ps, logfile=currdir+"test_solve4.log")
219        results.write(filename=currdir+"test_solve4.txt",times=False)
220        self.failUnlessFileEqualsBaseline(currdir+"test_solve4.txt", currdir+
221"test1_ps.txt")
222        if os.path.exists(currdir+"test_solve4.log"):
223            os.remove(currdir+"test_solve4.log")
224
225    def test_delayed_serial2(self):
226        """
227        Use a solver manager that delays the evaluation of responses,
228        and _perform_wait_any() returns a failed action handle.
229        """
230        problem=TestProblem1()
231        self.ps.problem=problem
232        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
233        self.ps.reset()
234        mngr = SolverManager_DelayedSerial()
235        mngr._force_error = 1
236        try:
237            results = mngr.solve(opt=self.ps, logfile=currdir+"test_solve5.log")
238            self.fail("Expected error")
239        except coopr.opt.parallel.manager.ActionManagerError:
240            pass
241        if os.path.exists(currdir+"test_solve5.log"):
242            os.remove(currdir+"test_solve5.log")
243
244    def test_delayed_serial3(self):
245        """
246        Use a solver manager that delays the evaluation of responses,
247        and _perform_wait_any() returns a action handle, but no results are available.
248        """
249        problem=TestProblem1()
250        self.ps.problem=problem
251        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
252        self.ps.reset()
253        mngr = SolverManager_DelayedSerial()
254        mngr._force_error = 2
255        try:
256            results = mngr.solve(opt=self.ps, logfile=currdir+"test_solve6.log")
257            self.fail("Expected error")
258        except coopr.opt.parallel.manager.ActionManagerError:
259            pass
260        if os.path.exists(currdir+"test_solve6.log"):
261            os.remove(currdir+"test_solve6.log")
262
263    def test_delayed_serial4(self):
264        """
265        Use a solver manager that delays the evaluation of responses,
266        and verify that queue-ing multiple solves works.
267        """
268        problem=TestProblem1()
269        self.ps.problem=problem
270        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
271        self.ps.reset()
272        mngr = SolverManager_DelayedSerial()
273        ah_a = mngr.queue(opt=self.ps, logfile=currdir+"test_solve7a.log")
274        ah_b = mngr.queue(opt=self.ps, logfile=currdir+"test_solve7b.log")
275        ah_c = mngr.queue(opt=self.ps, logfile=currdir+"test_solve7c.log")
276
277        mngr.wait_all()
278
279        self.failUnlessEqual(ah_c.status, coopr.opt.parallel.manager.ActionStatus.done)
280        if os.path.exists(currdir+"test_solve7a.log"):
281            os.remove(currdir+"test_solve7a.log")
282        if os.path.exists(currdir+"test_solve7b.log"):
283            os.remove(currdir+"test_solve7b.log")
284        if os.path.exists(currdir+"test_solve7c.log"):
285            os.remove(currdir+"test_solve7c.log")
286
287    def test_delayed_serial5(self):
288        """
289        Use a solver manager that delays the evaluation of responses,
290        and verify that queue-ing multiple solves works.
291        """
292        problem=TestProblem1()
293        self.ps.problem=problem
294        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
295        self.ps.reset()
296        mngr = SolverManager_DelayedSerial()
297        ah_a = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8a.log")
298        ah_b = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8b.log")
299        ah_c = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8c.log")
300
301        mngr.wait_all(ah_b)
302
303        self.failUnlessEqual(ah_b.status, coopr.opt.parallel.manager.ActionStatus.done)
304        self.failUnlessEqual(ah_a.status, coopr.opt.parallel.manager.ActionStatus.queued)
305        if os.path.exists(currdir+"test_solve8a.log"):
306            os.remove(currdir+"test_solve8a.log")
307        if os.path.exists(currdir+"test_solve8b.log"):
308            os.remove(currdir+"test_solve8b.log")
309        if os.path.exists(currdir+"test_solve8c.log"):
310            os.remove(currdir+"test_solve8c.log")
311
312    def test_delayed_serial6(self):
313        """
314        Use a solver manager that delays the evaluation of responses,
315        and verify that queue-ing multiple solves works.
316        """
317        problem=TestProblem1()
318        self.ps.problem=problem
319        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
320        self.ps.reset()
321        mngr = SolverManager_DelayedSerial()
322        ah_a = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8a.log")
323        ah_b = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8b.log")
324        ah_c = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8c.log")
325
326        self.failUnlessEqual( mngr.num_queued(), 3)
327        mngr.wait_all( [ah_b] )
328
329        self.failUnlessEqual(mngr.get_status(ah_b), coopr.opt.parallel.manager.ActionStatus.done)
330        self.failUnlessEqual(mngr.get_status(ah_a), coopr.opt.parallel.manager.ActionStatus.queued)
331
332        if os.path.exists(currdir+"test_solve8a.log"):
333            os.remove(currdir+"test_solve8a.log")
334        if os.path.exists(currdir+"test_solve8b.log"):
335            os.remove(currdir+"test_solve8b.log")
336        if os.path.exists(currdir+"test_solve8c.log"):
337            os.remove(currdir+"test_solve8c.log")
338
339if __name__ == "__main__":
340    unittest.main()
341
Note: See TracBrowser for help on using the repository browser.