source: coopr.opt/stable/2.1/coopr/opt/parallel/manager.py @ 1973

Last change on this file since 1973 was 1973, checked in by wehart, 11 years ago

Merged revisions 1949-1972 via svnmerge from
https://software.sandia.gov/svn/public/coopr/coopr.opt/trunk

........

r1955 | jwatson | 2009-12-02 17:55:38 -0700 (Wed, 02 Dec 2009) | 3 lines


Fixed missing StringIO module import.

........

r1960 | jwatson | 2009-12-04 20:43:24 -0700 (Fri, 04 Dec 2009) | 5 lines


Fixed a major memory leak in the solver manager. When get_results() was previously invoked, the action handle was referenced as a key in the map, and the corresponding results object was deleted. However, it wasn't cleaned up in the solver manager results map. Consequently, the solver manager kept a copy of every result object ever created, which reaches GB size for long PH runs.


This *might* result in a slight change in semantics, but I don't think so: once a solver result is returned from the manager, whoever got it "owns it". In other words, you can't grab the result multiple times.

........

File size: 6.3 KB
Line 
1#  _________________________________________________________________________
2#
3#  Coopr: A COmmon Optimization Python Repository
4#  Copyright (c) 2008 Sandia Corporation.
5#  This software is distributed under the BSD License.
6#  Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7#  the U.S. Government retains certain rights in this software.
8#  For more information, see the Coopr README.txt file.
9#  _________________________________________________________________________
10
11
12__all__ = ['ActionManagerError', 'ActionHandle', 'AsynchronousActionManager', 'ActionStatus', 'FailedActionHandle']
13
14from pyutilib.enum import Enum
15
16
17ActionStatus = Enum('done', 'error', 'queued', 'executing', 'unknown')
18
19
class ActionManagerError(Exception):
    """
    Raised to report a failure inside an ActionManager.

    Behaves exactly like the built-in Exception: every constructor
    argument is forwarded unchanged to the base class.
    """

    def __init__(self, *args, **kargs):
        # Explicit pass-through so the class keeps its own constructor.
        super(ActionManagerError, self).__init__(*args, **kargs)      #pragma:nocover
27
28
class ActionHandle(object):
    """
    A ticket identifying one action handled by an action manager.

    Attributes:
        id: Integer identifier.  Regular handles take consecutive values
            from the class-wide ``id_counter``; every handle created with
            ``error=True`` uses -1.
        status: An ActionStatus value.  It starts out as
            ``ActionStatus.error`` and is overwritten by whoever manages
            the handle (e.g. AsynchronousActionManager.queue()).
        explanation: Free-form text supplied by the creator, used in
            failure messages.

    Handles hash and compare by ``id`` only, so two handle objects with the
    same id are interchangeable as dictionary keys and set members.
    """

    # Next identifier to hand out; shared by all regular handles.
    id_counter = 0

    def __init__(self, error=False, explanation=""):
        """
        Create a handle.

        Arguments:
            error: If true, build a failure marker with id -1 instead of
                consuming a fresh identifier.
            explanation: Text describing the handle (typically why an
                action failed).
        """
        if error:
            self.id = -1
        else:
            self.id = ActionHandle.id_counter
            ActionHandle.id_counter += 1
        # Set the status on *every* handle.  Previously it was assigned only
        # in the non-error branch, so error handles had no 'status'
        # attribute and reading it raised AttributeError.
        self.status = ActionStatus.error
        self.explanation = explanation

    def update(self, ah):
        """
        Copy the id and status of the provided ActionHandle into this one.

        The explanation of ``ah`` is not copied.
        """
        self.id = ah.id
        self.status = ah.status

    def __lt__(self, other):
        """Order handles by their id."""
        return self.id < other.id

    def __hash__(self):
        """Hash on the id, consistent with __eq__."""
        return hash(self.id)

    def __eq__(self, other):
        """
        Two handles are equal when their ids are equal.

        Objects that are not ActionHandles are never equal to a handle;
        NotImplemented is returned so Python applies its default fallback
        instead of this method failing on a missing 'id' attribute.
        """
        if not isinstance(other, ActionHandle):
            return NotImplemented
        return self.id == other.id

    def __ne__(self, other):
        """Inverse of __eq__, propagating NotImplemented untouched."""
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    def __str__(self):
        """Return the id as a string."""
        return str(self.id)
62
63
64FailedActionHandle = ActionHandle(error=True)
65
66
class AsynchronousActionManager(object):
    """
    Base class for managers that queue actions and collect their outcome
    asynchronously.

    Subclasses provide the real work by overriding ``_perform_queue`` and
    ``_perform_wait_any``; the versions defined here only raise
    ActionManagerError.

    Attributes:
        event_handle: dict mapping a handle id to the ActionHandle created
            by queue().
        results: dict mapping a handle id to its result.  This class only
            reads and removes entries (see get_results); presumably
            subclasses store the results - confirm against the subclass.
        queued_action_counter: number of actions queued and not yet
            reported by wait_any().
    """

    def __init__(self):
        """Constructor"""
        self.clear()

    def clear(self):
        """
        Clear manager state
        """
        self.event_handle = {}
        self.results = {}
        self.queued_action_counter = 0

    def execute(self, *args, **kwds):
        """
        Synchronously execute an action.

        The arguments are passed to queue(); the call then blocks in
        wait_for() until that action completes and returns its results.

        Raises ActionManagerError if no results are available.
        """
        ah = self.queue(*args, **kwds)
        results = self.wait_for(ah)
        if results is None:
            raise ActionManagerError("Problem executing an event.  No results are available.")
        return results

    def queue(self, *args, **kwds):
        """
        Queue an action, returning an ActionHandle object.

        A new handle is registered in ``event_handle``, marked as queued
        and handed to _perform_queue() together with the arguments; the
        return value of _perform_queue() is returned.
        """
        ah = ActionHandle()
        self.event_handle[ah.id] = ah
        ah.status = ActionStatus.queued
        self.queued_action_counter += 1
        return self._perform_queue(ah, *args, **kwds)

    def wait_all(self, *args):
        """
        Wait for all actions to complete.  The arguments to this method
        are expected to be ActionHandle objects, or lists, tuples or sets
        of ActionHandle objects.  If no arguments are provided, then this
        method will terminate after all actions registered with this
        manager are complete.

        Raises ActionManagerError if an argument is of any other type.
        """
        #
        # Collect event handlers from the arguments
        #
        ahs = set()
        if not args:
            # NOTE(review): event_handle is never pruned in this class, so
            # this also includes handles that wait_any() already reported -
            # confirm that subclasses cope with waiting on them again.
            ahs.update(self.event_handle.values())
        else:
            for item in args:
                if isinstance(item, ActionHandle):
                    ahs.add(item)
                elif isinstance(item, (list, tuple, set, frozenset)):
                    for ah in item:
                        if not isinstance(ah, ActionHandle):     #pragma:nocover
                            raise ActionManagerError("Bad argument type %s" % str(ah))
                        ahs.add(ah)
                else:                       #pragma:nocover
                    # Report the offending argument itself; the loop
                    # variable 'ah' is not defined on this path.
                    raise ActionManagerError("Bad argument type %s" % str(item))
        #
        # Iterate until all ah's have completed
        #
        while ahs:
            ahs.discard(self.wait_any())

    def wait_any(self):
        """
        Wait for any action to complete, and return the
        corresponding ActionHandle.

        _perform_wait_any() is called repeatedly until it returns something
        other than None.  A FailedActionHandle is returned as-is, without
        touching the bookkeeping.  Otherwise the handle registered under
        the same id is updated from the returned one and returned.
        """
        ah = None
        while ah is None:
            ah = self._perform_wait_any()
        if ah == FailedActionHandle:
            return ah
        self.queued_action_counter -= 1
        registered = self.event_handle[ah.id]
        registered.update(ah)
        return registered

    def wait_for(self, ah):
        """
        Wait for the specified action to complete.

        Returns whatever get_results() yields for ``ah`` (possibly None).

        Raises ActionManagerError if wait_any() reports a
        FailedActionHandle before ``ah`` completes.
        """
        while True:
            tmp = self.wait_any()
            if tmp == ah:
                break
            # Check *every* completion for failure, including the first
            # one; previously an initial FailedActionHandle was silently
            # discarded and the loop simply waited again.
            if tmp == FailedActionHandle:
                raise ActionManagerError("Action %s failed: %s" % (ah, tmp.explanation))
        return self.get_results(ah)

    def num_queued(self):
        """
        Return the number of queued actions
        """
        return self.queued_action_counter

    def get_status(self, ah):
        """
        Return the status of the ActionHandle.
        """
        return ah.status

    def get_results(self, ah):
        """
        Return solver results.  If solver results are not available,
        return None.

        The results are handed over to the caller: they are removed from
        the manager, so a second call for the same handle returns None.
        """
        # extremely important - clean up the map before returning the result.
        # otherwise, it will be around forever, acting as a memory leak (because
        # the solver manager will always retain a reference).
        return self.results.pop(ah.id, None)

    def _perform_queue(self, ah, *args, **kwds):
        """
        Perform the queue operation.  This method returns the ActionHandle,
        and the ActionHandle status indicates whether the queue was successful.

        Must be overridden; this base version always raises
        ActionManagerError.
        """
        raise ActionManagerError("The _perform_queue method is not defined")     #pragma:nocover

    def _perform_wait_any(self):
        """
        Perform the wait_any operation.  This method returns an
        ActionHandle with the results of waiting.  If None is returned
        then the ActionManager assumes that it can call this method again.
        Note that an ActionHandle can be returned with a dummy value,
        to indicate an error.

        Must be overridden; this base version always raises
        ActionManagerError.
        """
        raise ActionManagerError("The _perform_wait_any method is not defined")      #pragma:nocover
200
201
Note: See TracBrowser for help on using the repository browser.