source: trunk/coopr/opt/parallel/manager.py @ 1768

Last change on this file since 1768 was 1768, checked in by wehart, 11 years ago

Rework of Coopr to use the new PyUtilib? package decomposition.

NOTE: to use Coopr with this update, we need to work with a new version of coopr_install.

File size: 6.0 KB
Line 
1#  _________________________________________________________________________
2#
3#  Coopr: A COmmon Optimization Python Repository
4#  Copyright (c) 2008 Sandia Corporation.
5#  This software is distributed under the BSD License.
6#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7#  the U.S. Government retains certain rights in this software.
8#  For more information, see the Coopr README.txt file.
9#  _________________________________________________________________________
10
11
12__all__ = ['ActionManagerError', 'ActionHandle', 'AsynchronousActionManager', 'ActionStatus', 'FailedActionHandle']
13
14from pyutilib.enum import Enum
15
16
17ActionStatus = Enum('done', 'error', 'queued', 'executing', 'unknown')
18
19
20class ActionManagerError(Exception):
21    """
22    An exception used when an error occurs within an ActionManager.
23    """
24
25    def __init__(self,*args,**kargs):
26        Exception.__init__(self,*args,**kargs)      #pragma:nocover
27
28
29class ActionHandle(object):
30
31    id_counter = 0
32
33    def __init__(self, error=False, explanation=""):
34        """Constructor"""
35        if error:
36            self.id = -1
37        else:
38            self.id = ActionHandle.id_counter
39            ActionHandle.id_counter += 1
40            self.status = ActionStatus.error
41        self.explanation = explanation
42
43    def update(self, ah):
44        """ Update the contents of the provided ActionHandle """
45        self.id = ah.id
46        self.status = ah.status
47
48    def __lt__(self, other):
49        return self.id < other.id
50
51    def __hash__(self):
52        return self.id.__hash__()
53
54    def __eq__(self, other):
55        return (self.id.__hash__() == other.__hash__()) and (self.id == other.id)
56
57    def __ne__(self, other):
58        return not self.__eq__(other)
59
60    def __str__(self):
61        return str(self.id)
62
63
64FailedActionHandle = ActionHandle(error=True)
65
66
67class AsynchronousActionManager(object):
68
69    def __init__(self):
70        """Constructor"""
71        self.clear()
72
73    def clear(self):
74        """
75        Clear manager state
76        """
77        self.event_handle = {}
78        self.results = {}
79        self.queued_action_counter = 0
80
81    def execute(self, *args, **kwds):
82        """
83        Synchronously execute an action.
84        """
85        ah = self.queue(*args, **kwds)
86        results = self.wait_for(ah)
87        if results is None:
88            raise ActionManagerError, "Problem executing an event.  No results are available."
89        return results
90
91    def queue(self, *args, **kwds):
92        """
93        Queue an action, returning an ActionHandle object.
94        """
95        ah = ActionHandle()
96        self.event_handle[ah.id] = ah
97        ah.status = ActionStatus.queued
98        self.queued_action_counter += 1
99        return self._perform_queue(ah, *args, **kwds)
100
101    def wait_all(self, *args):
102        """
103        Wait for all actions to complete.  The arguments to this method
104        are expected to be ActionHandle objects or iterators that return
105        ActionHandle objects.  If no arguments are provided, then this
106        method will terminate after all queued actions are
107        """
108        #
109        # Collect event handlers from the arguments
110        #
111        ahs = set()
112        if len(args) == 0:
113            for key in self.event_handle:
114                ahs.add( self.event_handle[key] )
115        else:
116            for item in args:
117                if type(item) is ActionHandle:
118                    ahs.add(item)
119                elif type(item) in [list, tuple]:
120                    for ah in item:
121                        if type(ah) is not ActionHandle:     #pragma:nocover
122                            raise ActionManagerError, "Bad argument type %s" % str(ah)
123                        ahs.add(ah)
124                else:                       #pragma:nocover
125                    raise ActionManagerError, "Bad argument type %s" % str(ah)
126        #
127        # Iterate until all ah's have completed
128        #
129        while len(ahs) > 0:
130            ah = self.wait_any()
131            ahs.discard(ah)
132       
133    def wait_any(self):
134        """
135        Wait for any action to complete, and return the
136        corresponding ActionHandle.
137        """
138        ah = None
139        while ah is None:
140            ah = self._perform_wait_any()
141        if ah == FailedActionHandle:
142            return ah
143        self.queued_action_counter -= 1
144        self.event_handle[ah.id].update(ah)
145        return self.event_handle[ah.id]
146
147    def wait_for(self, ah):
148        """
149        Wait for the specified action to complete.
150        """
151        tmp = self.wait_any()
152        while tmp != ah:
153            tmp = self.wait_any()
154            if tmp == FailedActionHandle:
155                raise ActionManagerError, "Action %s failed: %s" % (ah, tmp.explanation)
156        return self.get_results(ah)
157       
158    def num_queued(self):
159        """
160        Return the number of queued actions
161        """
162        return self.queued_action_counter
163
164    def get_status(self, ah):
165        """
166        Return the status of the ActionHandle.
167        """
168        return ah.status
169
170    def get_results(self, ah):
171        """
172        Return solver results.  If solver results are not available,
173        return None.
174        """
175        if ah.id in self.results:
176            return self.results[ah.id]
177        return None
178
179    def _perform_queue(self, ah, *args, **kwds):
180        """
181        Perform the queue operation.  This method returns the ActionHandle,
182        and the ActionHandle status indicates whether the queue was successful.
183        """
184        raise ActionManagerError, "The _perform_queue method is not defined"     #pragma:nocover
185       
186    def _perform_wait_any(self):
187        """
188        Perform the wait_any operation.  This method returns an
189        ActionHandle with the results of waiting.  If None is returned
190        then the ActionManager assumes that it can call this method again.
191        Note that an ActionHandle can be returned with a dummy value,
192        to indicate an error.
193        """
194        raise ActionManagerError, "The _perform_wait_any method is not defined"      #pragma:nocover
195
196
Note: See TracBrowser for help on using the repository browser.