source: trunk/test/core/colin/test_parallel.py @ 1657

Last change on this file since 1657 was 1657, checked in by wehart, 11 years ago

Removing coopr.opt-depricated, and
renaming coopr.core to coopr.opt.

Unfortunately, plugins are still not being properly loaded, so
some of the tests are failing.

File size: 12.7 KB
Line 
1#
2# Unit Tests for coopr.opt.parallel (using the COLIN optimizers)
3#
4#
5
6import os
7import sys
8from os.path import abspath, dirname
# Directory that holds this test module (no trailing separator yet).
currdir = dirname(abspath(__file__))
# Three more levels up from that directory, i.e. four levels above this
# file.  It is placed at the head of sys.path before the coopr imports
# below run (presumably so the in-tree packages are found first).
cooprdir = dirname(dirname(dirname(currdir)))
sys.path.insert(0, cooprdir)
# Both names end with the path separator so that file names can be
# appended with plain string concatenation.
cooprdir += os.sep
currdir += os.sep
13
14import unittest
15from nose.tools import nottest
16import pyutilib_th
17import coopr.opt
18import xml
19from coopr.opt import ResultsFormat, ProblemFormat
20import pyutilib
21
22
class TestProblem1(coopr.opt.colin.MixedIntOptProblem):
    """
    Small optimization problem with four real variables, used as the
    fixture problem by the tests in this module.
    """

    def __init__(self):
        """
        Set the per-variable bounds and the number of real variables.
        """
        coopr.opt.colin.MixedIntOptProblem.__init__(self)
        # One entry per real variable.
        # NOTE(review): None presumably means "no bound on that side" --
        # confirm against MixedIntOptProblem.
        self.real_lower = [0.0, -1.0, 1.0, None]
        self.real_upper = [None, 0.0, 2.0, -1.0]
        self.nreal = 4

    def function_value(self, point):
        """
        Validate the point and return the objective value

            x0 - x1 + (x2 - 1.5)**2 + (x3 + 2)**4

        where x0..x3 are the entries of point.reals.
        """
        self.validate(point)
        x = point.reals
        linear_part = x[0] - x[1]
        quadratic_part = (x[2] - 1.5) ** 2
        quartic_part = (x[3] + 2) ** 4
        return linear_part + quadratic_part + quartic_part
34
35
class TestSolverManager(coopr.opt.parallel.AsynchronousSolverManager):
    """
    Minimal solver manager that the tests in this module register under
    the name "smtest" to exercise the solver manager factory.
    """

    def __init__(self, **kwds):
        """
        Forward the keyword arguments to the base class, always forcing
        the 'type' and 'doc' entries to this manager's fixed values.
        """
        kwds.update(type='smtest_type', doc='TestASM Documentation')
        coopr.opt.parallel.AsynchronousSolverManager.__init__(self, **kwds)

    def enabled(self):
        """
        Always report this manager as not enabled.
        """
        return False
45
46
class SolverManager_DelayedSerial(coopr.opt.parallel.AsynchronousSolverManager):
    """
    Mock solver manager used to test the wait*() methods.

    The solver is executed immediately when an action is queued, but the
    stored result is only handed back by every 'delay'-th call to
    _perform_wait_any().  The _force_error attribute selects a failure
    mode of _perform_wait_any():

        0 - normal (delayed) operation
        1 - return an ActionHandle that indicates an error
        2 - return a queued ActionHandle without publishing its results
    """

    def clear(self):
        """
        Clear manager state
        """
        coopr.opt.parallel.AsynchronousSolverManager.clear(self)
        # A result is released when the call counter is a multiple of this
        self.delay=5
        # Handles that have been solved but not yet returned by wait_any
        self._ah_list = []
        # Solver given by the most recent 'opt' keyword
        self._opt = None
        # Solver results keyed by ActionHandle id
        self._my_results = {}
        # Number of _perform_wait_any() calls made in normal mode (starts at 1)
        self._ctr = 1
        # Failure mode selector (see the class documentation)
        self._force_error = 0

    def _perform_queue(self, ah, *args, **kwds):
        """
        Perform the queue operation.  This method returns the ActionHandle,
        and the ActionHandle status indicates whether the queue was successful.

        The solver is taken from the 'opt' keyword; if it is omitted, the
        solver given to an earlier call is reused.  An ActionManagerError
        is raised if no solver has ever been specified.
        """
        if 'opt' in kwds:
            self._opt = kwds.pop('opt')
        if self._opt is None:
            # Use the fully qualified exception class: the bare name
            # ActionManagerError is not defined in this module.
            raise coopr.opt.parallel.manager.ActionManagerError("Undefined solver")
        # The solve happens right away; only the delivery of the results
        # is delayed.
        # NOTE(review): the remaining keyword arguments (e.g. logfile) are
        # not forwarded to the solver -- confirm that this is intended.
        self._my_results[ah.id] = self._opt.solve(*args)
        self._ah_list.append(ah)
        return ah

    def _perform_wait_any(self):
        """
        Perform the wait_any operation.  This method returns an
        ActionHandle with the results of waiting.  If None is returned
        then the ActionManager assumes that it can call this method again.
        Note that an ActionHandle can be returned with a dummy value,
        to indicate an error.
        """
        if self._force_error == 0:
            self._ctr += 1
            if self._ctr % self.delay != 0:
                # Simulate a solve that has not finished yet
                return None
            if len(self._ah_list) > 0:
                # Release the most recently queued action
                ah = self._ah_list.pop()
                ah.status = coopr.opt.parallel.manager.ActionStatus.done
                self.results[ah.id] = self._my_results[ah.id]
                return ah
            return coopr.opt.parallel.manager.ActionHandle(error=True, explanation="No queued evaluations available in the 'local' solver manager, which only executes solvers synchronously")
        elif self._force_error == 1:
            #
            # Wait Any returns an ActionHandle that indicates an error
            #
            return coopr.opt.parallel.manager.ActionHandle(error=True, explanation="Forced failure")
        elif self._force_error == 2:
            #
            # Wait Any returns the correct ActionHandle, but no results are
            # available.
            #
            return self._ah_list.pop()
104
105
class OptPatternSearchDebug(pyutilib_th.TestCase):
    """
    Tests for the coopr.opt.parallel solver managers, driven by the
    COLIN PatternSearch optimizer applied to TestProblem1.
    """

    def run(self, result=None):
        """
        Run the test with the "smtest" solver manager registered.

        The registration is deactivated again even if the run is
        interrupted by an exception, and the value returned by the base
        class run() is passed back to the caller.
        """
        self.smtest_plugin = coopr.opt.SolverManagerRegistration("smtest", TestSolverManager)
        try:
            return unittest.TestCase.run(self, result)
        finally:
            self.smtest_plugin.deactivate()

    def setUp(self):
        """
        Create the optimizer and direct temporary files to this directory.
        """
        self.do_setup(False)
        pyutilib.TempfileManager.tempdir = currdir

    def do_setup(self,flag):
        """
        Create the PatternSearch optimizer used by the tests.

        The flag argument is not used here.
        """
        pyutilib.TempfileManager.tempdir = currdir
        self.ps = coopr.opt.colin.PatternSearch()

    def tearDown(self):
        """
        Remove the temporary files created during the test.
        """
        pyutilib.TempfileManager.clear_tempfiles()

    def _prepare_solver(self):
        """
        Attach a fresh TestProblem1 and the standard initial point to the
        optimizer, then reset it.
        """
        self.ps.problem = TestProblem1()
        self.ps.initial_point = [1.0, -0.5, 2.0, -1.0]
        self.ps.reset()

    def _remove_logs(self, *names):
        """
        Remove each of the named files from the test directory, if present.
        """
        for name in names:
            path = currdir + name
            if os.path.exists(path):
                os.remove(path)

    def test_solve1(self):
        """ Test PatternSearch - TestProblem1 """
        self._prepare_solver()
        results = self.ps.solve(logfile=currdir+"test_solve1.log")
        results.write(currdir+"test_solve1.txt",times=False)
        self.failUnlessFileEqualsBaseline(currdir+"test_solve1.txt", currdir+"test1_ps.txt")
        self._remove_logs("test_solve1.log")

    def test_serial1(self):
        """ Test Serial EvalManager - TestProblem1 """
        self._prepare_solver()
        mngr = coopr.opt.parallel.SolverManagerFactory("serial")
        results = mngr.solve(opt=self.ps, logfile=currdir+"test_solve2.log")
        results.write(currdir+"test_solve2.txt",times=False)
        self.failUnlessFileEqualsBaseline(currdir+"test_solve2.txt", currdir+"test1_ps.txt")
        self._remove_logs("test_solve2.log")

    def test_serial_error1(self):
        """ Test Serial SolverManager - Error with no optimizer"""
        self._prepare_solver()
        mngr = coopr.opt.parallel.SolverManagerFactory("serial")
        # No 'opt' keyword is given, so the manager has no solver to run
        self.assertRaises(coopr.opt.parallel.manager.ActionManagerError,
                          mngr.solve, logfile=currdir+"test_solve3.log")

    def test_serial_error2(self):
        """ Test Serial SolverManager - Error with no queue solves"""
        self._prepare_solver()
        mngr = coopr.opt.parallel.SolverManagerFactory("serial")
        mngr.solve(opt=self.ps, logfile=currdir+"test_solve3.log")
        # Nothing is queued any more, so waiting must report a failure
        if mngr.wait_any() != coopr.opt.parallel.manager.FailedActionHandle:
            self.fail("Expected a failed action")
        # Clean up the log file that was requested above
        self._remove_logs("test_solve3.log")

    def test_solver_manager_factory(self):
        """
        Testing the coopr.opt solver factory
        """
        ans = coopr.opt.SolverManagerFactory()
        # Only membership matters, so a set comparison is sufficient
        self.assertTrue(set(["smtest"]) <= set(ans))

    def test_solver_manager_instance(self):
        """
        Testing that we get a specific solver instance
        """
        ans = coopr.opt.SolverManagerFactory("none")
        self.assertEqual(ans, None)
        ans = coopr.opt.SolverManagerFactory("smtest")
        self.assertEqual(type(ans), TestSolverManager)
        ans = coopr.opt.SolverManagerFactory("smtest", "mymock")
        self.assertEqual(type(ans), TestSolverManager)
        self.assertEqual(ans.name,  "mymock")

    def test_solver_manager_registration(self):
        """
        Testing methods in the solverwriter factory registration process
        """
        ep = pyutilib.plugin.ExtensionPoint(coopr.opt.parallel.solver.ISolverManagerRegistration)
        service = ep.service("smtest")
        self.assertEqual(service.type(), "smtest")

    def test_delayed_serial1(self):
        """
        Use a solver manager that delays the evaluation of responses,
        and thus allows a mock testing of the wait*() methods.
        """
        self._prepare_solver()
        mngr = SolverManager_DelayedSerial()
        results = mngr.solve(opt=self.ps, logfile=currdir+"test_solve4.log")
        results.write(currdir+"test_solve4.txt",times=False)
        self.failUnlessFileEqualsBaseline(currdir+"test_solve4.txt", currdir+"test1_ps.txt")
        self._remove_logs("test_solve4.log")

    def test_delayed_serial2(self):
        """
        Use a solver manager that delays the evaluation of responses,
        and _perform_wait_any() returns a failed action handle.
        """
        self._prepare_solver()
        mngr = SolverManager_DelayedSerial()
        mngr._force_error = 1
        self.assertRaises(coopr.opt.parallel.manager.ActionManagerError,
                          mngr.solve, opt=self.ps, logfile=currdir+"test_solve5.log")
        self._remove_logs("test_solve5.log")

    def test_delayed_serial3(self):
        """
        Use a solver manager that delays the evaluation of responses,
        and _perform_wait_any() returns a action handle, but no results are available.
        """
        self._prepare_solver()
        mngr = SolverManager_DelayedSerial()
        mngr._force_error = 2
        self.assertRaises(coopr.opt.parallel.manager.ActionManagerError,
                          mngr.solve, opt=self.ps, logfile=currdir+"test_solve6.log")
        self._remove_logs("test_solve6.log")

    def test_delayed_serial4(self):
        """
        Use a solver manager that delays the evaluation of responses,
        and verify that queue-ing multiple solves works.
        """
        self._prepare_solver()
        mngr = SolverManager_DelayedSerial()
        ah_a = mngr.queue(opt=self.ps, logfile=currdir+"test_solve7a.log")
        ah_b = mngr.queue(opt=self.ps, logfile=currdir+"test_solve7b.log")
        ah_c = mngr.queue(opt=self.ps, logfile=currdir+"test_solve7c.log")

        mngr.wait_all()

        self.assertEqual(ah_c.status, coopr.opt.parallel.manager.ActionStatus.done)
        self._remove_logs("test_solve7a.log", "test_solve7b.log", "test_solve7c.log")

    def test_delayed_serial5(self):
        """
        Use a solver manager that delays the evaluation of responses,
        and verify that waiting on a single action handle leaves the
        first queued action in the queued state.
        """
        self._prepare_solver()
        mngr = SolverManager_DelayedSerial()
        ah_a = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8a.log")
        ah_b = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8b.log")
        ah_c = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8c.log")

        mngr.wait_all(ah_b)

        self.assertEqual(ah_b.status, coopr.opt.parallel.manager.ActionStatus.done)
        self.assertEqual(ah_a.status, coopr.opt.parallel.manager.ActionStatus.queued)
        self._remove_logs("test_solve8a.log", "test_solve8b.log", "test_solve8c.log")

    def test_delayed_serial6(self):
        """
        Use a solver manager that delays the evaluation of responses,
        and verify num_queued(), waiting on a list of action handles,
        and get_status().
        """
        self._prepare_solver()
        mngr = SolverManager_DelayedSerial()
        ah_a = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8a.log")
        ah_b = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8b.log")
        ah_c = mngr.queue(opt=self.ps, logfile=currdir+"test_solve8c.log")

        self.assertEqual( mngr.num_queued(), 3)
        mngr.wait_all( [ah_b] )

        self.assertEqual(mngr.get_status(ah_b), coopr.opt.parallel.manager.ActionStatus.done)
        self.assertEqual(mngr.get_status(ah_a), coopr.opt.parallel.manager.ActionStatus.queued)

        self._remove_logs("test_solve8a.log", "test_solve8b.log", "test_solve8c.log")
336
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
339
Note: See TracBrowser for help on using the repository browser.