move example to different port
[multitaskhttpd.git] / multitask.py
1 ################################################################################
2 #
3 # Copyright (c) 2007 Christopher J. Stawarz
4 #
5 # Permission is hereby granted, free of charge, to any person
6 # obtaining a copy of this software and associated documentation files
7 # (the "Software"), to deal in the Software without restriction,
8 # including without limitation the rights to use, copy, modify, merge,
9 # publish, distribute, sublicense, and/or sell copies of the Software,
10 # and to permit persons to whom the Software is furnished to do so,
11 # subject to the following conditions:
12 #
13 # The above copyright notice and this permission notice shall be
14 # included in all copies or substantial portions of the Software.
15 #
16 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
20 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 # SOFTWARE.
24 #
25 ################################################################################
26
27
28
"""

Cooperative multitasking and asynchronous I/O using generators

multitask allows Python programs to use generators (a.k.a. coroutines)
to perform cooperative multitasking and asynchronous I/O.
Applications written using multitask consist of a set of cooperating
tasks that yield to a shared task manager whenever they perform a
(potentially) blocking operation, such as I/O on a socket or getting
data from a queue.  The task manager temporarily suspends the task
(allowing other tasks to run in the meantime) and then restarts it
when the blocking operation is complete.  Such an approach is suitable
for applications that would otherwise have to use select() and/or
multiple threads to achieve concurrency.

The functions and classes in the multitask module allow tasks to yield
for I/O operations on sockets and file descriptors, adding/removing
data to/from queues (Queue, or SmartQueue when a getter needs to wait
for an item matching a criteria function), or sleeping for a
specified interval.  When yielding, a task can also specify a timeout.
If the operation for which the task yielded has not completed after
the given number of seconds, the task is restarted, and a Timeout
exception is raised at the point of yielding.

The examples below use Python 2 syntax, matching the rest of this
module.

As a very simple example, here's how one could use multitask to allow
two unrelated tasks to run concurrently:

  >>> def printer(message):
  ...     while True:
  ...         print message
  ...         yield
  ...
  >>> multitask.add(printer('hello'))
  >>> multitask.add(printer('goodbye'))
  >>> multitask.run()
  hello
  goodbye
  hello
  goodbye
  hello
  goodbye
  [and so on ...]

For a more useful example, here's how one could implement a
multitasking server that can handle multiple concurrent client
connections:

  def listener(sock):
      while True:
          conn, address = (yield multitask.accept(sock))
          multitask.add(client_handler(conn))

  def client_handler(sock):
      while True:
          request = (yield multitask.recv(sock, 1024))
          if not request:
              break
          response = handle_request(request)
          yield multitask.send(sock, response)

  multitask.add(listener(sock))
  multitask.run()

Tasks can also yield other tasks, which allows for composition of
tasks and reuse of existing multitasking code.  A child task runs
until it either completes or raises an exception.  To return output to
its parent, a child task raises StopIteration, passing the output
value(s) to the StopIteration constructor.  An unhandled exception
raised within a child task is propagated to its parent.  For example:

  >>> def parent():
  ...     print (yield return_none())
  ...     print (yield return_one())
  ...     print (yield return_many())
  ...     try:
  ...         yield raise_exception()
  ...     except Exception, e:
  ...         print 'caught exception: %s' % e
  ...
  >>> def return_none():
  ...     yield
  ...     # do nothing
  ...     # or return
  ...     # or raise StopIteration
  ...     # or raise StopIteration(None)
  ...
  >>> def return_one():
  ...     yield
  ...     raise StopIteration(1)
  ...
  >>> def return_many():
  ...     yield
  ...     raise StopIteration(2, 3)  # or raise StopIteration((2, 3))
  ...
  >>> def raise_exception():
  ...     yield
  ...     raise RuntimeError('foo')
  ...
  >>> multitask.add(parent())
  >>> multitask.run()
  None
  1
  (2, 3)
  caught exception: foo

"""
134
135
136 import collections
137 import errno
138 from functools import partial
139 import heapq
140 import os
141 import select
142 import sys
143 import time
144 import types
145
146
__author__ = 'Christopher Stawarz <cstawarz@csail.mit.edu>'
__version__ = '0.2.0'
# Numeric revision taken from the SVN keyword string; the number is the
# second-to-last whitespace-separated field ('$Revision:', '5025', '$').
__revision__ = int('$Revision: 5025 $'.split()[-2])
150
151
152
153 ################################################################################
154 #
155 # Timeout exception type
156 #
157 ################################################################################
158
159
160
class Timeout(Exception):

    """Raised at the yield point of a task whose operation did not finish in time"""
164
165
166
167 ################################################################################
168 #
169 # _ChildTask class
170 #
171 ################################################################################
172
173
174
class _ChildTask(object):

    """

    Wrapper around a generator yielded by another task. It remembers
    the parent so the task manager can hand the child's result (or
    exception) back to it, and forwards send()/throw() to the child.

    """

    def __init__(self, parent, task):
        self.parent, self.task = parent, task

    def send(self, value):
        child = self.task
        return child.send(value)

    def throw(self, type, value=None, traceback=None):
        child = self.task
        return child.throw(type, value, traceback)
186
187
188
189 ################################################################################
190 #
191 # YieldCondition class
192 #
193 ################################################################################
194
195
196
class YieldCondition(object):

    """

    Common base for the objects a task yields to the task manager to
    describe when it should be resumed.  Application code uses the
    concrete subclasses; this class on its own is not useful.

    """

    def __init__(self, timeout=None):
        """

        A timeout of None means the task waits for the condition with
        no time limit.  Any other value is a number of seconds after
        which, if the condition is still unmet, a Timeout exception is
        raised in the yielding task.

        """

        # Filled in by the task manager when the condition is yielded
        self.task = None
        # Callback the task manager installs for cleanup on expiry
        self.handle_expiration = None
        # Absolute expiry time, or None for "never"
        self.expiration = (None if timeout is None
                           else time.time() + float(timeout))

    def _expires(self):
        return self.expiration is not None
228
229
230
231 ################################################################################
232 #
233 # _SleepDelay class and related functions
234 #
235 ################################################################################
236
237
238
class _SleepDelay(YieldCondition):

    """Condition that resumes the yielding task after a fixed, positive delay"""

    def __init__(self, seconds):
        delay = float(seconds)
        if delay <= 0.0:
            raise ValueError("'seconds' must be greater than 0")
        # The delay is passed as the base class's timeout; the task
        # manager wakes the task when that expiration is reached.
        super(_SleepDelay, self).__init__(delay)
246
247
def sleep(seconds):
    """

    Return a condition that suspends the yielding task for the given
    (positive) number of seconds.  For example:

      while too_early():
          yield sleep(5)    # Sleep for five seconds
      do_something()        # Done sleeping; get back to work

    """

    delay = _SleepDelay(seconds)
    return delay
261
262
263
264 ################################################################################
265 #
266 # FDReady class and related functions
267 #
268 ################################################################################
269
270
271
class FDReady(YieldCondition):

    """

    Condition that suspends the yielding task until a given file
    descriptor is ready for I/O.

    """

    def __init__(self, fd, read=False, write=False, exc=False, timeout=None):
        """

        Wake the yielding task once fd is readable (read=True),
        writable (write=True) and/or has an "exceptional" condition
        pending (exc=True); at least one of the three must be
        requested.  fd is either an integer descriptor or an object
        whose fileno() method returns one, i.e. anything select.select()
        accepts.  If select() raises an error because of fd, that error
        is raised in the yielding task.

        When timeout is not None and fd is still not ready after that
        many seconds, a Timeout exception is raised in the yielding
        task.

        """

        super(FDReady, self).__init__(timeout)

        if _is_file_descriptor(fd):
            self.fd = fd
        else:
            self.fd = fd.fileno()

        if not any((read, write, exc)):
            raise ValueError("'read', 'write', and 'exc' cannot all be false")
        self.read, self.write, self.exc = read, write, exc

    def fileno(self):
        'Return the file descriptor this condition is waiting on'
        return self.fd

    def _add_to_fdsets(self, read_fds, write_fds, exc_fds):
        # Register this condition in each wait set it asked for
        if self.read:
            read_fds.add(self)
        if self.write:
            write_fds.add(self)
        if self.exc:
            exc_fds.add(self)

    def _remove_from_fdsets(self, read_fds, write_fds, exc_fds):
        # Unregister from every wait set; absent entries are ignored
        read_fds.discard(self)
        write_fds.discard(self)
        exc_fds.discard(self)
321
322
def _is_file_descriptor(fd):
    # A raw descriptor is a plain (Python 2) int or long
    return isinstance(fd, int) or isinstance(fd, long)
325
326
def readable(fd, timeout=None):
    """

    Return a condition that resumes the yielding task once fd can be
    read.  With a non-None timeout, a Timeout exception is raised in
    the task if fd is still not readable after that many seconds.
    For example:

      try:
          yield readable(sock, timeout=5)
          data = sock.recv(1024)
      except Timeout:
          # No data after 5 seconds

    """

    condition = FDReady(fd, read=True, timeout=timeout)
    return condition
344
345
def writable(fd, timeout=None):
    """

    Return a condition that resumes the yielding task once fd can be
    written.  With a non-None timeout, a Timeout exception is raised in
    the task if fd is still not writable after that many seconds.
    For example:

      try:
          yield writable(sock, timeout=5)
          nsent = sock.send(data)
      except Timeout:
          # Can't send after 5 seconds

    """

    condition = FDReady(fd, write=True, timeout=timeout)
    return condition
363
364
365
366 ################################################################################
367 #
368 # FDAction class and related functions
369 #
370 ################################################################################
371
372
373
class FDAction(FDReady):

    """

    A task that yields an instance of this class will be suspended
    until an I/O operation on a specified file descriptor is complete.

    """

    def __init__(self, fd, func, args=(), kwargs=None, read=False,
                 write=False, exc=False):
        """

        Resume the yielding task when fd is ready for reading,
        writing, and/or "exceptional" condition handling.  fd can be
        any object accepted by select.select() (meaning an integer or
        an object with a fileno() method that returns an integer).
        Any exception raised by select() due to fd will be re-raised
        in the yielding task.

        The value of the yield expression will be the result of
        calling func with the specified args and kwargs (which
        presumably performs a read, write, or other I/O operation on
        fd).  If func raises an exception, it will be re-raised in the
        yielding task.  Thus, FDAction is really just a convenient
        subclass of FDReady that requests that the task manager
        perform an I/O operation on the calling task's behalf.

        If kwargs contains a timeout argument that is not None, a
        Timeout exception will be raised in the yielding task if fd is
        not ready after timeout seconds have elapsed.  The timeout
        entry is not passed on to func.  kwargs is copied, so the
        caller's dict is never modified.

        """

        # Work on a private copy: popping 'timeout' must not mutate the
        # caller's dict, and instances must not share one default dict.
        kwargs = dict(kwargs) if kwargs else {}
        timeout = kwargs.pop('timeout', None)
        super(FDAction, self).__init__(fd, read, write, exc, timeout)

        self.func = func
        self.args = args
        self.kwargs = kwargs

    def _eval(self):
        'Perform the deferred I/O call and return its result'
        return self.func(*self.args, **self.kwargs)
417
418
def read(fd, *args, **kwargs):
    """

    Return a condition that resumes the yielding task once fd is
    readable; the yield expression evaluates to the data read from fd.
    An optional timeout keyword (if not None) makes the task receive a
    Timeout exception when fd is not readable in time.  Remaining
    arguments go to the read call: os.read() for an integer fd,
    fd.read() otherwise.  For example:

      try:
          data = (yield read(fd, 1024, timeout=5))
      except Timeout:
          # No data after 5 seconds

    """

    if _is_file_descriptor(fd):
        reader = partial(os.read, fd)
    else:
        reader = fd.read
    return FDAction(fd, reader, args, kwargs, read=True)
439
440
def readline(fd, *args, **kwargs):
    """

    Return a condition that resumes the yielding task once fd is
    readable; the yield expression evaluates to the line read from fd.
    An optional timeout keyword (if not None) makes the task receive a
    Timeout exception when fd is not readable in time.  Remaining
    arguments go to fd.readline().  For example:

      try:
          data = (yield readline(fd, timeout=5))
      except Timeout:
          # No data after 5 seconds

    """

    reader = fd.readline
    return FDAction(fd, reader, args, kwargs, read=True)
460
461
def write(fd, *args, **kwargs):
    """

    Return a condition that resumes the yielding task once fd is
    writable; the yield expression evaluates to the result of the
    write.  An optional timeout keyword (if not None) makes the task
    receive a Timeout exception when fd is not writable in time.
    Remaining arguments go to the write call: os.write() for an
    integer fd, fd.write() otherwise.  For example:

      try:
          nbytes = (yield write(fd, data, timeout=5))
      except Timeout:
          # Can't write after 5 seconds

    """

    if _is_file_descriptor(fd):
        writer = partial(os.write, fd)
    else:
        writer = fd.write
    return FDAction(fd, writer, args, kwargs, write=True)
482
483
def accept(sock, *args, **kwargs):
    """

    Return a condition that resumes the yielding task once sock is
    readable; the yield expression evaluates to the result of
    sock.accept().  An optional timeout keyword (if not None) makes the
    task receive a Timeout exception when sock is not readable in time.
    Remaining arguments go to sock.accept().  For example:

      try:
          conn, address = (yield accept(sock, timeout=5))
      except Timeout:
          # No connections after 5 seconds

    """

    acceptor = sock.accept
    return FDAction(sock, acceptor, args, kwargs, read=True)
503
504
def recv(sock, *args, **kwargs):
    """

    Return a condition that resumes the yielding task once sock is
    readable; the yield expression evaluates to the result of
    sock.recv().  An optional timeout keyword (if not None) makes the
    task receive a Timeout exception when sock is not readable in time.
    Remaining arguments go to sock.recv().  For example:

      try:
          data = (yield recv(sock, 1024, timeout=5))
      except Timeout:
          # No data after 5 seconds

    """

    receiver = sock.recv
    return FDAction(sock, receiver, args, kwargs, read=True)
524
525
def recvfrom(sock, *args, **kwargs):
    """

    Return a condition that resumes the yielding task once sock is
    readable; the yield expression evaluates to the result of
    sock.recvfrom().  An optional timeout keyword (if not None) makes
    the task receive a Timeout exception when sock is not readable in
    time.  Remaining arguments go to sock.recvfrom().  For example:

      try:
          data, address = (yield recvfrom(sock, 1024, timeout=5))
      except Timeout:
          # No data after 5 seconds

    """

    receiver = sock.recvfrom
    return FDAction(sock, receiver, args, kwargs, read=True)
545
546
def send(sock, *args, **kwargs):
    """

    Return a condition that resumes the yielding task once sock is
    writable; the yield expression evaluates to the result of
    sock.send().  An optional timeout keyword (if not None) makes the
    task receive a Timeout exception when sock is not writable in time.
    Remaining arguments go to sock.send().  For example:

      try:
          nsent = (yield send(sock, data, timeout=5))
      except Timeout:
          # Can't send after 5 seconds

    """

    sender = sock.send
    return FDAction(sock, sender, args, kwargs, write=True)
566
567
def sendto(sock, *args, **kwargs):
    """

    Return a condition that resumes the yielding task once sock is
    writable; the yield expression evaluates to the result of
    sock.sendto().  An optional timeout keyword (if not None) makes the
    task receive a Timeout exception when sock is not writable in time.
    Remaining arguments go to sock.sendto().  For example:

      try:
          nsent = (yield sendto(sock, data, address, timeout=5))
      except Timeout:
          # Can't send after 5 seconds

    """

    sender = sock.sendto
    return FDAction(sock, sender, args, kwargs, write=True)
587
588
589
590 ################################################################################
591 #
592 # Queue and _QueueAction classes
593 #
594 ################################################################################
595
596
597
class Queue(object):

    """

    FIFO queue, in the spirit of Queue.Queue, through which tasks can
    hand data to one another.  Any number of tasks may put and get.

    """

    def __init__(self, contents=(), maxsize=0):
        """

        Build a queue that initially holds the items of the sequence
        contents (empty by default).  A positive maxsize caps the
        number of items; once the cap is reached, put() waits for room.

        """

        self.maxsize = int(maxsize)
        self._queue = collections.deque(contents)

    def __len__(self):
        'Return how many items are currently queued'
        return len(self._queue)

    def _get(self):
        # Oldest item comes out first
        return self._queue.popleft()

    def _put(self, item):
        self._queue.append(item)

    def empty(self):
        'Return True if the queue holds no items, False otherwise'
        return not len(self)

    def full(self):
        'Return True if the queue is at its maxsize, False otherwise'
        # A non-positive maxsize means "unbounded", which is never full
        return self.maxsize > 0 and len(self) >= self.maxsize

    def get(self, timeout=None):
        """

        Return a condition that resumes the yielding task once an item
        can be taken; the yield expression evaluates to that item.
        With a non-None timeout, a Timeout exception is raised in the
        task if no item arrives within that many seconds.  For example:

          try:
              item = (yield queue.get(timeout=5))
          except Timeout:
              # No item available after 5 seconds

        """

        action = _QueueAction(self, timeout=timeout)
        return action

    def put(self, item, timeout=None):
        """

        Return a condition that resumes the yielding task once item has
        been added.  With a non-None timeout, a Timeout exception is
        raised in the task if no room frees up within that many
        seconds.  For example:

          try:
              yield queue.put(item, timeout=5)
          except Timeout:
              # No space available after 5 seconds

        """

        action = _QueueAction(self, item, timeout=timeout)
        return action
675
676
class _QueueAction(YieldCondition):

    """A pending get (item left as NO_ITEM) or put (item given) on a Queue"""

    # Sentinel meaning "this is a get"; distinct from every real item
    NO_ITEM = object()

    def __init__(self, queue, item=NO_ITEM, timeout=None):
        super(_QueueAction, self).__init__(timeout)
        if not isinstance(queue, Queue):
            raise TypeError("'queue' must be a Queue instance")
        self.queue, self.item = queue, item
687
688
689 ################################################################################
690 #
691 # SmartQueue and _SmartQueueAction classes
692 #
693 ################################################################################
694
695
696
class SmartQueue(object):

    """

    A multi-producer, multi-consumer FIFO queue (similar to
    Queue.Queue) that can be used for exchanging data between tasks.
    The difference with Queue is that this implements filtering criteria
    on get and allows multiple get to be signalled for the same put.
    On the downside, items are kept in a list rather than a deque, so
    removing an item is O(n).

    """

    def __init__(self, contents=(), maxsize=0):
        """

        Create a new SmartQueue instance.  contents is a sequence
        (empty by default) containing the initial contents of the
        queue.  If maxsize is greater than 0, the queue will hold a
        maximum of maxsize items, and put() will block until space is
        available in the queue.

        """

        self.maxsize = int(maxsize)
        self._pending = list(contents)

    def __len__(self):
        'Return the number of items in the queue'
        return len(self._pending)

    def _get(self, criteria=None):
        """

        Remove and return the oldest item for which criteria(item) is
        true, or the oldest item overall when criteria is None.  Return
        None when no item qualifies (including when the queue is empty).

        """

        if not criteria:
            return self._pending.pop(0) if self._pending else None
        # Stop at the first match and remove it by position.  The old
        # filter()+list.remove() approach scanned every item and removed
        # by equality, which could discard a different, equal-comparing
        # item that did not match criteria.
        for index, item in enumerate(self._pending):
            if criteria(item):
                return self._pending.pop(index)
        return None

    def _put(self, item):
        self._pending.append(item)

    def empty(self):
        'Return True if the queue is empty, False otherwise'
        return (len(self) == 0)

    def full(self):
        'Return True if the queue is full, False otherwise'
        return ((len(self) >= self.maxsize) if (self.maxsize > 0) else False)

    def get(self, timeout=None, criteria=None):
        """

        A task that yields the result of this method will be resumed
        when an item is available in the queue and the item matches the
        given criteria (a function, usually a lambda), and the value of
        the yield expression will be the item.  If timeout is not None,
        a Timeout exception will be raised in the yielding task if an
        item is not available after timeout seconds have elapsed.  For
        example:

          try:
              item = (yield queue.get(timeout=5,
                                      criteria=lambda x: x.name == 'kundan'))
          except Timeout:
              # No item available after 5 seconds

        """

        return _SmartQueueAction(self, timeout=timeout, criteria=criteria)

    def put(self, item, timeout=None):
        """

        A task that yields the result of this method will be resumed
        when item has been added to the queue.  If timeout is not
        None, a Timeout exception will be raised in the yielding task
        if no space is available after timeout seconds have elapsed.
        TODO: Otherwise if space is available, the timeout specifies how
        long to keep the item in the queue before discarding it if it
        is not fetched in a get.  In this case it does not throw an
        exception.  For example:

          try:
              yield queue.put(item, timeout=5)
          except Timeout:
              # No space available after 5 seconds

        """

        return _SmartQueueAction(self, item, timeout=timeout)
791
792
class _SmartQueueAction(YieldCondition):

    """

    A pending get (item left as NO_ITEM, optionally filtered by
    criteria) or put (item given) on a SmartQueue.

    """

    # Sentinel meaning "this is a get"; distinct from every real item
    NO_ITEM = object()

    def __init__(self, queue, item=NO_ITEM, timeout=None, criteria=None):
        super(_SmartQueueAction, self).__init__(timeout)
        if not isinstance(queue, SmartQueue):
            raise TypeError("'queue' must be a SmartQueue instance")
        self.queue = queue
        self.item = item
        self.criteria = criteria
        # Absolute expiry time, or 0 when there is no timeout.  Reuse the
        # expiration the base class already computed from float(timeout)
        # rather than the "cond and a or b" idiom on the raw timeout,
        # which failed for timeouts the base class accepts (e.g. '5').
        self.expires = self.expiration if self.expiration is not None else 0
805
806
807 ################################################################################
808 #
809 # TaskManager class
810 #
811 ################################################################################
812
813
814
815 class TaskManager(object):
816
817 """
818
819 Engine for running a set of cooperatively-multitasking tasks
820 within a single Python thread
821
822 """
823
def __init__(self):
    """

    Set up an empty task manager.  A process normally needs only one;
    to drive two existing managers together, merge() them and then run
    either one.

    """

    # Runnable entries, each a (task, input, exc_info) triple
    self._queue = collections.deque()
    # FDReady conditions handed to select(), one set per readiness kind
    self._read_waits = set()
    self._write_waits = set()
    self._exc_waits = set()
    # Per-queue (get_waits, put_waits) pair, created on first access
    self._queue_waits = collections.defaultdict(self._double_deque)
    # Heap of (expiration, condition) pairs
    self._timeouts = []
840
841 @staticmethod
842 def _double_deque():
843 return (collections.deque(), collections.deque())
844
def merge(self, other):
    """

    Merge this TaskManager with another.  After the merge, the two
    objects share the same (merged) internal data structures, so
    either can be used to manage the combined task set.  Merging a
    manager with itself is a no-op.

    Raises TypeError if other is not a TaskManager.

    """

    if not isinstance(other, TaskManager):
        raise TypeError("'other' must be a TaskManager instance")
    if other is self:
        # Extending the run queue and timeout heap with themselves would
        # schedule every task and timeout twice.
        return

    # Merge the data structures
    self._queue.extend(other._queue)
    self._read_waits |= other._read_waits
    self._write_waits |= other._write_waits
    self._exc_waits |= other._exc_waits
    # NOTE(review): dict.update() replaces this manager's (get, put)
    # waiter deques for any queue both managers are waiting on, so those
    # waiters are lost.  Combining the deques instead would also require
    # updating the expiry handlers that close over them — confirm before
    # changing.
    self._queue_waits.update(other._queue_waits)
    self._timeouts.extend(other._timeouts)
    heapq.heapify(self._timeouts)

    # Make other reference the merged data structures.  This is
    # necessary because other's tasks may reference and use other
    # (e.g. to add a new task in response to an event).
    other._queue = self._queue
    other._read_waits = self._read_waits
    other._write_waits = self._write_waits
    other._exc_waits = self._exc_waits
    other._queue_waits = self._queue_waits
    other._timeouts = self._timeouts
875
def add(self, task):
    'Schedule a new task (a generator instance) to run'

    if isinstance(task, types.GeneratorType):
        self._enqueue(task)
        return
    raise TypeError("'task' must be a generator")
882
def _enqueue(self, task, input=None, exc_info=()):
    # Run-queue entry: the task, the value to send it, and the exc_info
    # to throw into it instead (empty tuple means "send")
    entry = (task, input, exc_info)
    self._queue.append(entry)
885
def run(self):
    """

    Keep calling run_next() for as long as any task is runnable,
    waiting on I/O, or waiting on a timeout.  This can block
    indefinitely (for example, when only I/O waits without timeouts
    remain); call run_next() directly if that is not acceptable.

    """

    while True:
        pending = (self.has_runnable() or self.has_io_waits() or
                   self.has_timeouts())
        if not pending:
            break
        self.run_next()
898
def has_runnable(self):
    """

    Return True if any task is queued and ready to run, False
    otherwise

    """
    return len(self._queue) > 0
907
def has_io_waits(self):
    """

    Return True if any task is waiting for a file descriptor to become
    ready, False otherwise

    """
    return any((self._read_waits, self._write_waits, self._exc_waits))
916
def has_timeouts(self):
    """

    Return True if any task has a pending timeout, False otherwise

    """
    return len(self._timeouts) != 0
925
926 def run_next(self, timeout=None):
927 """
928
929 Perform one iteration of the run cycle: check whether any
930 pending I/O operations can be performed, check whether any
931 timeouts have expired, then run all currently runnable tasks.
932
933 The timeout argument specifies the maximum time to wait for
934 some task to become runnable. If timeout is None and there
935 are no currently runnable tasks, but there are tasks waiting
936 to perform I/O or time out, then this method will block until
937 at least one of the waiting tasks becomes runnable. To
938 prevent this method from blocking indefinitely, use timeout to
939 specify the maximum number of seconds to wait.
940
941 If there are runnable tasks in the queue when run_next() is
942 called, then it will check for I/O readiness using a
943 non-blocking call to select() (i.e. a poll), and only
944 already-expired timeouts will be handled. This ensures both
945 that the task manager is never idle when tasks can be run and
946 that tasks waiting for I/O never starve.
947
948 """
949
950 if self.has_io_waits():
951 self._handle_io_waits(self._fix_run_timeout(timeout))
952
953 if self.has_timeouts():
954 self._handle_timeouts(self._fix_run_timeout(timeout))
955
956 # Run all tasks currently in the queue
957 for dummy in xrange(len(self._queue)):
958 task, input, exc_info = self._queue.popleft()
959 try:
960 if exc_info:
961 output = task.throw(*exc_info)
962 else:
963 output = task.send(input)
964 except StopIteration, e:
965 if isinstance(task, _ChildTask):
966 if not e.args:
967 output = None
968 elif len(e.args) == 1:
969 output = e.args[0]
970 else:
971 output = e.args
972 self._enqueue(task.parent, input=output)
973 except:
974 if isinstance(task, _ChildTask):
975 # Propagate exception to parent
976 self._enqueue(task.parent, exc_info=sys.exc_info())
977 else:
978 # No parent task, so just die
979 raise
980 else:
981 self._handle_task_output(task, output)
982
def _fix_run_timeout(self, timeout):
    # Clamp a requested wait: never block while work is ready, and never
    # wait past the earliest pending expiration.
    if self.has_runnable():
        return 0.0
    if not self.has_timeouts():
        return timeout
    until_first = max(0.0, self._timeouts[0][0] - time.time())
    if timeout is None or timeout > until_first:
        return until_first
    return timeout
993
994 def _handle_io_waits(self, timeout):
995 # The error handling here is (mostly) borrowed from Twisted
996 try:
997 read_ready, write_ready, exc_ready = \
998 select.select(self._read_waits,
999 self._write_waits,
1000 self._exc_waits,
1001 timeout)
1002 except (TypeError, ValueError):
1003 self._remove_bad_file_descriptors()
1004 except (select.error, IOError), err:
1005 if err[0] == errno.EINTR:
1006 pass
1007 elif ((err[0] == errno.EBADF) or
1008 ((sys.platform == 'win32') and
1009 (err[0] == 10038))): # WSAENOTSOCK
1010 self._remove_bad_file_descriptors()
1011 else:
1012 # Not an error we can handle, so die
1013 raise
1014 else:
1015 for fd in set(read_ready + write_ready + exc_ready):
1016 try:
1017 input = (fd._eval() if isinstance(fd, FDAction) else None)
1018 self._enqueue(fd.task, input=input)
1019 except:
1020 self._enqueue(fd.task, exc_info=sys.exc_info())
1021 fd._remove_from_fdsets(self._read_waits,
1022 self._write_waits,
1023 self._exc_waits)
1024 if fd._expires():
1025 self._remove_timeout(fd)
1026
def _remove_bad_file_descriptors(self):
    # Probe each watched condition alone with a zero-timeout select().
    # Any that select() rejects gets the error thrown into its task and
    # is no longer watched (nor timed).
    candidates = self._read_waits | self._write_waits | self._exc_waits
    for waiter in candidates:
        try:
            select.select([waiter], [waiter], [waiter], 0.0)
        except:
            self._enqueue(waiter.task, exc_info=sys.exc_info())
            waiter._remove_from_fdsets(self._read_waits,
                                       self._write_waits,
                                       self._exc_waits)
            if waiter._expires():
                self._remove_timeout(waiter)
1038
def _add_timeout(self, item, handler):
    # Remember what to do on expiry, then file the item in the heap
    # keyed by its absolute expiration time.
    item.handle_expiration = handler
    entry = (item.expiration, item)
    heapq.heappush(self._timeouts, entry)
1042
def _remove_timeout(self, item):
    # Drop the item's heap entry, then restore the heap invariant
    heap = self._timeouts
    heap.remove((item.expiration, item))
    heapq.heapify(heap)
1046
def _handle_timeouts(self, timeout):
    """Sleep while idle for up to *timeout* seconds, then fire expired timeouts.

    timeout: seconds to sleep when no task is runnable.  None or a value
    <= 0.0 means do not sleep.

    Every heap entry whose expiration is <= now is popped.  A _SleepDelay
    resumes its task normally.  Any other item resumes its task with a
    Timeout exception and has its handle_expiration callback (installed by
    _add_timeout) invoked.
    """
    # Explicit None check: "None > 0.0" only evaluates (to False) under
    # Python 2's cross-type ordering and raises TypeError on Python 3.
    if ((not self.has_runnable()) and (timeout is not None) and
            (timeout > 0.0)):
        time.sleep(timeout)

    current_time = time.time()

    while self._timeouts and (self._timeouts[0][0] <= current_time):
        item = heapq.heappop(self._timeouts)[1]
        if isinstance(item, _SleepDelay):
            self._enqueue(item.task)
        else:
            self._enqueue(item.task, exc_info=(Timeout,))
            item.handle_expiration()
1060
def _handle_task_output(self, task, output):
    """Route a value yielded by *task* to the matching handler.

    - A generator is wrapped in a _ChildTask together with *task* and
      enqueued.
    - A YieldCondition is tagged with its owning task (output.task) and
      dispatched on its concrete type.  A YieldCondition matching none of
      the listed types is tagged but otherwise left alone, and *task* is
      not re-enqueued.
    - Any other value is sent straight back to *task* as the result of its
      yield, and *task* goes to the end of the queue.
    """
    if isinstance(output, types.GeneratorType):
        self._enqueue(_ChildTask(task, output))
        return

    if not isinstance(output, YieldCondition):
        # Return any other output as input and send task to end of queue
        self._enqueue(task, input=output)
        return

    output.task = task
    if isinstance(output, _SleepDelay):
        self._add_timeout(output, None)
    elif isinstance(output, FDReady):
        self._handle_fdready(task, output)
    elif isinstance(output, _QueueAction):
        self._handle_queue_action(task, output)
    elif isinstance(output, _SmartQueueAction):
        # NOTE(review): if _SmartQueueAction subclasses _QueueAction, the
        # branch above wins and this one is unreachable -- confirm hierarchy.
        self._handle_smart_queue_action(task, output)
1078
def _handle_fdready(self, task, output):
    """Park an FDReady condition in the manager's select() wait sets.

    *task* is not used here; the caller has already stored it on
    output.task.  If the condition expires, a timeout is registered whose
    handler takes the condition back out of the wait sets.
    """
    output._add_to_fdsets(self._read_waits, self._write_waits,
                          self._exc_waits)
    if not output._expires():
        return

    def withdraw():
        # Reads the wait-set attributes when the timeout fires, not now
        output._remove_from_fdsets(self._read_waits, self._write_waits,
                                   self._exc_waits)

    self._add_timeout(output, withdraw)
1089
def _handle_queue_action(self, task, output):
    """Perform a get or put on a plain Queue on behalf of *task*.

    output.item is output.NO_ITEM for a get; otherwise it is the item to
    put.  A get on an empty queue, or a put on a full one, parks *output*
    in the queue's get/put wait list.  If the action expires, a timeout is
    also registered whose handler withdraws it again.  A successful get
    lets one blocked putter deposit its item; a successful put satisfies
    one blocked getter.  The woken waiter's timeout is cancelled.
    """
    queue = output.queue
    getters, putters = self._queue_waits[queue]

    if output.item is not output.NO_ITEM:
        # Put: block while the queue is full
        if queue.full():
            putters.append(output)
            if output._expires():
                self._add_timeout(output, lambda: putters.remove(output))
            return
        queue._put(output.item)
        self._enqueue(task)
        if not getters:
            return
        woken = getters.popleft()
        self._enqueue(woken.task, input=queue._get())
    else:
        # Get: block while the queue is empty
        if queue.empty():
            getters.append(output)
            if output._expires():
                self._add_timeout(output, lambda: getters.remove(output))
            return
        self._enqueue(task, input=queue._get())
        if not putters:
            return
        woken = putters.popleft()
        queue._put(woken.item)
        self._enqueue(woken.task)

    # The woken waiter no longer needs its deadline
    if woken._expires():
        self._remove_timeout(woken)
1125
1126
def _handle_smart_queue_action(self, task, output):
    """Perform a get or put on a criteria-matching queue on behalf of *task*.

    output.item is output.NO_ITEM for a get; otherwise it is the item to
    put.  A get asks queue._get(criteria=output.criteria) for a matching
    item; a None result means "no match", and the request waits in
    get_waits.  A put waits in put_waits while queue.full() is true.  A
    waiter that expires also gets a timeout whose handler removes it from
    its wait list.

    Whenever an item enters the queue -- from this put, or from a blocked
    putter admitted because this get freed a slot -- every waiting getter
    is re-checked against its criteria.  Getters served that way free
    slots, so blocked putters are admitted again, and so on until nothing
    changes.  Each woken waiter's timeout is cancelled.

    NOTE(review): a queued item that is literally None cannot be told apart
    from "no match" -- confirm the queue never stores None.
    """
    queue = output.queue
    get_waits, put_waits = self._queue_waits[queue]

    if output.item is output.NO_ITEM:
        # Action is a get
        item = queue._get(criteria=output.criteria)
        if item is None:
            get_waits.append(output)
            if output._expires():
                self._add_timeout(output,
                                  (lambda: get_waits.remove(output)))
            return
        self._enqueue(task, input=item)
        # Taking an item out cannot create a match for a waiting getter
        # (assuming criteria are pure), so getters only need re-checking
        # once a blocked putter's item has been admitted below.
        item_added = False
    else:
        # Action is a put
        if queue.full():
            put_waits.append(output)
            if output._expires():
                self._add_timeout(output,
                                  (lambda: put_waits.remove(output)))
            return
        queue._put(output.item)
        self._enqueue(task)
        item_added = True

    while True:
        # Admit blocked putters while the queue has room
        while put_waits and not queue.full():
            action = put_waits.popleft()
            queue._put(action.item)
            self._enqueue(action.task)
            if action._expires():
                self._remove_timeout(action)
            item_added = True
        if not item_added:
            return
        item_added = False

        # Offer the queue's contents to every waiting getter.  Items admitted
        # from blocked putters must be offered too; otherwise a getter whose
        # criteria they satisfy would sleep until some unrelated later put.
        served = []
        for action in get_waits:
            match = queue._get(criteria=action.criteria)
            if match is not None:
                served.append((action, match))
        if not served:
            return
        for action, match in served:
            get_waits.remove(action)
            self._enqueue(action.task, input=match)
            if action._expires():
                self._remove_timeout(action)
        # Served getters freed slots: loop to admit more blocked putters.
        # Terminates because get_waits strictly shrinks on every pass.
1167
1168
1169
1170 ################################################################################
1171 #
1172 # Default TaskManager instance
1173 #
1174 ################################################################################
1175
1176
1177
# Shared TaskManager; created on the first call to get_default_task_manager()
_default_task_manager = None


def get_default_task_manager():
    """Return the module-wide TaskManager, creating it on first use."""
    global _default_task_manager
    manager = _default_task_manager
    if manager is None:
        manager = _default_task_manager = TaskManager()
    return manager
1187
1188
def add(task):
    """Schedule *task* on the default TaskManager, creating it if needed."""
    manager = get_default_task_manager()
    manager.add(task)
1192
1193
def run():
    """Run the default TaskManager, creating it first if necessary."""
    manager = get_default_task_manager()
    manager.run()
1197
1198
1199
1200 ################################################################################
1201 #
1202 # Test routine
1203 #
1204 ################################################################################
1205
1206
1207
1208 if __name__ == '__main__':
1209 if sys.platform == 'win32':
1210 # Make sure WSAStartup() is called
1211 import socket
1212
1213 def printer(name):
1214 for i in xrange(1, 4):
1215 print '%s:\t%d' % (name, i)
1216 yield
1217
1218 t = TaskManager()
1219 t.add(printer('first'))
1220 t.add(printer('second'))
1221 t.add(printer('third'))
1222
1223 queue = Queue()
1224
1225 def receiver():
1226 print 'receiver started'
1227 print 'receiver received: %s' % (yield queue.get())
1228 print 'receiver finished'
1229
1230 def sender():
1231 print 'sender started'
1232 yield queue.put('from sender')
1233 print 'sender finished'
1234
1235 def bad_descriptor():
1236 print 'bad_descriptor running'
1237 try:
1238 yield readable(12)
1239 except:
1240 print 'exception in bad_descriptor:', sys.exc_info()[1]
1241
1242 def sleeper():
1243 print 'sleeper started'
1244 yield sleep(1)
1245 print 'sleeper finished'
1246
1247 def timeout_immediately():
1248 print 'timeout_immediately running'
1249 try:
1250 yield Queue().get(timeout=0)
1251 except Timeout:
1252 print 'timeout_immediately timed out'
1253
1254 t2 = TaskManager()
1255 t2.add(receiver())
1256 t2.add(bad_descriptor())
1257 t2.add(sender())
1258 t2.add(sleeper())
1259 t2.add(timeout_immediately())
1260
1261 def parent():
1262 print 'child returned: %s' % ((yield child()),)
1263 try:
1264 yield child(raise_exc=True)
1265 except:
1266 print 'exception in child:', sys.exc_info()[1]
1267
1268 def child(raise_exc=False):
1269 yield
1270 if raise_exc:
1271 raise RuntimeError('foo')
1272 raise StopIteration(1, 2, 3)
1273
1274 t3 = TaskManager()
1275 t3.add(parent())
1276
1277 t.merge(t2)
1278 t.merge(t3)
1279 t.run()
1280
1281 assert not(t.has_runnable() or t.has_io_waits() or t.has_timeouts())