1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 """Coroutine threading library.
24
25 Introduction
26 ============
27 Shrapnel is a cooperative threading library.
28
29 Getting Started
30 ===============
31 When your process starts up, you must spawn a thread to do some work, and then
32 start the event loop. The event loop runs forever processing events until the
33 process exits. An example::
34
35 import coro
36
37 def main():
38 print 'Hello world!'
39 # This will cause the process to exit.
40 coro.set_exit(0)
41
42 coro.spawn(main)
43 coro.event_loop()
44
45 Coroutines
46 ==========
47 Every coroutine thread is created with either the `new` function (which does
48 NOT automatically start the thread) or the `spawn` function (which DOES
49 automatically start it).
50
51 Every thread has a unique numeric ID. You may also set the name of the thread
52 when you create it.
53
54 Timeouts
55 ========
56 The shrapnel timeout facility allows you to execute a function which will be
57 interrupted if it does finish within a specified period of time. The
58 `coro.TimeoutError` exception will be raised if the timeout expires. See the
59 `with_timeout` docstring for more detail.
60
61 If the event loop is not running (such as in a non-coro process), a custom
62 version of `with_timeout` is installed that will operate using SIGALRM so that
63 you may use `with_timeout` in code that needs to run in non-coro processes
64 (though this is not recommended and should be avoided if possible).
65
66 Thread Local Storage
67 ====================
68 There is a tread-local storage interface available for storing global data this
69 is thread-specific. You instantiate a `ThreadLocal` instance and you can
70 assign attributes to it that will be specific to that thread. See the
71 `ThreadLocal` docs for more detail.
72
73 Signal Handlers
74 ===============
75 By default when you start the event loop, two signal handlers are installed
76 (for SIGTERM and SIGINT). The default signal handler will exit the event loop.
77 You can change this behavior by setting `install_signal_handlers` to False
78 before starting the event loop.
79
80 See `coro.signal_handler` for more detail on setting coro signal handlers.
81
82 Selfishness
83 ===========
84 Certain socket operations are allowed to try to execute without blocking if
85 they are able to (such as send/receiving data on a local socket or on a
86 high-speed network). However, there is a limit to the number of times a thread
87 is allowed to do this. The default is 4. The default may be changed
88 (`set_selfishness`) and the value on a per-thread may be changed
89 (`coro.coro.set_max_selfish_acts`).
90
91 Time
92 ====
93 Shrapnel uses the `tsc_time` module for handling time. It uses the TSC
94 value for a stable and high-resolution unit of time. See that module's
95 documentation for more detail.
96
97 A thread is always created when you start the event loop that will
98 resynchronize the TSC relationship to accomodate any clock drift (see
99 `tick_updater` and `tsc_time.update_time_relation`).
100
101 Exception Notifier
102 ==================
103 When a thread exits due to an exception, by default a stack trace is printed to
104 stderr. You may install your own callback to handle this situation. See the
105 `set_exception_notifier` function for more detail.
106
107 Debug Output
108 ============
109 The shrapnel library provides a mechanism for printing debug information to
110 stderr. The `print_stderr` function will print a string with a timestamp
111 and the thread number. The `write_stderr` function writes the string verbatim.
112
113 Shrapnel keeps a reference to the "real" stderr (in `saved_stderr`) and the
114 `print_stderr` and `write_stderr` functions always use the real stderr value. A
115 particular reason for doing this is the backdoor module replaces sys.stderr and
116 sys.stdout, but we do not want debug output to go to the interactive session.
117
118 Profiling
119 =========
120 Shrapnel has its own profiler that is coro-aware. See `coro.profiler` for
121 details on how to run the profiler.
122
123 :Variables:
124 - `all_threads`: A dictionary of all live coroutine objects. The key is
125 the coroutine ID, and the value is the coroutine object.
126 - `saved_stderr`: The actual stderr object for the process. This normally
127 should not be used. An example of why this exists is because the
128 backdoor replaces sys.stderr while executing user code.
129 """
130
131 __all__ = [
132 '_coro',
133 'aio',
134 'backdoor',
135 'event_queue',
136 'fifo',
137 'ironport',
138 'lio',
139 'local',
140 'optional',
141 'oserrors',
142 'poller',
143 'print_profile',
144 'profile',
145 'profiler',
146 'rusage',
147 'signal_handler',
148 'socket',
149 'sync',
150 'tb',
151 ]
152
153 from coro._coro import *
154 from coro._coro import _yield
155 from coro import signal_handler
156 from coro import optional
157 from coro import tb
158
159 import signal
160 import sys
161 import time
162 import os
163
164 UNAME = os.uname()[0]
165
166
167
168
169
170 compact_traceback = tb.traceback_string
171
172
174 me = current()
175 print_stderr (
176 'thread %d (%s): error %r\n' % (
177 me.id,
178 me.name,
179 compact_traceback(),
180 )
181 )
182
183 set_exception_notifier (default_exception_notifier)
184
185
186
187
188
190
191 """An error occurred in the `in_parallel` function.
192
193 :IVariables:
194 - `result_list`: A list of ``(status, result)`` tuples. ``status`` is
195 either `SUCCESS` or `FAILURE`. For success, the result is the return
196 value of the function. For failure, it is the output from
197 `sys.exc_info`.
198 """
199
201 self.result_list = result_list
202 Exception.__init__(self, result_list)
203
204 SUCCESS = 'success'
205 FAILURE = 'failure'
206
208 try:
209 result = (SUCCESS, fun (*args))
210 except:
211 result = (FAILURE, sys.exc_info())
212 result_list[i] = result
213 sem.release(1)
214
216 """Execute several functions in parallel.
217
218 This will block until all functions have returned or raised an exception.
219
220 If one or more functions raises an exception, then the `InParallelError`
221 exception will be raised.
222
223 :Parameters:
224 - `fun_arg_list`: A list of ``(fun, args)`` tuples.
225
226 :Return:
227 Returns a list of return values from the functions.
228
229 :Exceptions:
230 - `InParallelError`: One or more of the functions raised an exception.
231 """
232
233
234 n = len(fun_arg_list)
235 if n==0:
236 return []
237 result_list = [None] * n
238 sem = inverted_semaphore(n)
239 for i in xrange (n):
240 spawn (
241 _in_parallel_wrap,
242 result_list,
243 i,
244 sem,
245 fun_arg_list[i]
246 )
247 sem.block_till_zero()
248 for i in xrange (n):
249 if result_list[i][0] is FAILURE:
250 raise InParallelError, result_list
251
252 return [x[1] for x in result_list]
253
254
255
256
257
258
259 tick_update_interval = 3600
260
262 """Updates TSC<->POSIX relation.
263
264 This is a thread that runs forever. It is responsible for updating the now
265 and now_usec variables every hour. This will take care of any clock drift
266 because our ticks_per_sec variable might be slightly off.
267
268 This runs once an hour.
269 """
270 global tick_update_interval
271 while 1:
272 sleep_relative(tick_update_interval)
273 tsc_time.update_time_relation()
274
275
276
277
278
280 """Wait for a process to exit.
281
282 :Parameters:
283 - `pid`: The process ID to wait for.
284
285 :Return:
286 Returns a tuple ``(pid, status)`` of the process.
287
288 :Exceptions:
289 - `SimultaneousError`: Something is already waiting for this process
290 ID.
291 """
292 if UNAME == "Linux":
293
294 status = 0
295 while status == 0:
296 returned_pid, status = os.waitpid (pid, os.WNOHANG)
297 sleep_relative(4)
298
299 return returned_pid, status
300 else:
301 me = current()
302 wait_for(pid, EVFILT.PROC, fflags=NOTE.EXIT)
303
304
305 return os.waitpid (pid, 0)
306
307
308
309
310
311
313 """Get a coro thread by ID.
314
315 :Parameters:
316 - `thread_id`: The thread ID.
317
318 :Return:
319 Returns the coroutine object.
320
321 :Exceptions:
322 - `KeyError`: The coroutine does not exist.
323 """
324 return all_threads[thread_id]
325
327 """Return a string indicating where the given coroutine thread is currently
328 running.
329
330 :Parameters:
331 - `co`: The coroutine object.
332
333 :Return:
334 Returns a string displaying where the coro thread is currently
335 executing.
336 """
337 f = co.get_frame()
338 return tb.stack_string(f)
339
341 """Get a dictionary of where all coroutines are currently executing.
342
343 :Return:
344 Returns a dictionary mapping the coroutine ID to a tuple of ``(name,
345 coro, where)`` where ``where`` is a string representing where the
346 coroutine is currently running.
347 """
348 output = {}
349 for id, c in all_threads.items():
350 output[id] = (c.get_name(), c, where(c))
351 return output
352
353
354
355
356
357 _original_spawn = spawn
358
359 -def spawn (fun, *args, **kwargs):
360 """Spawn a new coroutine.
361
362 Additional arguments and keyword arguments will be passed to the given function.
363
364 :Parameters:
365 - `fun`: The function to call when the coroutine starts.
366 - `thread_name`: The name of the thread. Defaults to the name of the
367 function.
368
369 :Return:
370 Returns the new coroutine object.
371 """
372 if kwargs.has_key('thread_name'):
373 thread_name = kwargs['thread_name']
374 del kwargs['thread_name']
375 else:
376 thread_name = '%s' % (fun,)
377 return _original_spawn (fun, *args, **kwargs).set_name (thread_name)
378
379 _original_new = new
380 -def new (fun, *args, **kwargs):
381 """Create a new coroutine object.
382
383 Additional arguments and keyword arguments will be passed to the given
384 function.
385
386 This will not start the coroutine. Call the ``start`` method on the
387 coroutine to schedule it to run.
388
389 :Parameters:
390 - `fun`: The function to call when the coroutine starts.
391 - `thread_name`: The name of the thread. Defaults to the name of the
392 function.
393
394 :Return:
395 Returns the new coroutine object.
396 """
397 if kwargs.has_key('thread_name'):
398 thread_name = kwargs['thread_name']
399 del kwargs['thread_name']
400 else:
401 thread_name = '%s' % (fun,)
402 return _original_new (fun, *args, **kwargs).set_name (thread_name)
403
404
405
406
407 import coro.clocks.tsc_time as tsc_time
408
409 ticks_per_sec = tsc_time.ticks_per_sec
410 ticks_per_usec = tsc_time.ticks_per_usec
411 microseconds = 1000000
412
413 absolute_time_to_ticks = tsc_time.usec_to_ticks
414 ticks_to_absolute_time = tsc_time.ticks_to_usec
415 absolute_time_to_ticks_safe = tsc_time.usec_to_ticks_safe
416 ticks_to_absolute_time_safe = tsc_time.ticks_to_usec_safe
417 absolute_secs_to_ticks = tsc_time.sec_to_ticks
418 ticks_to_absolute_secs = tsc_time.ticks_to_sec
419 get_now = tsc_time.rdtsc
420 update_time_relation = tsc_time.update_time_relation
421
423 """This is for backwards compatibility and should not be used."""
424 return tsc_time.ticks_to_usec(get_now())
425
429
433
434 now = get_now()
435 now_usec = get_usec()
436
437
438
439
440
441
442 _original_with_timeout = with_timeout
443 with_timeout = optional.with_timeout
444 _original_sleep_relative = sleep_relative
445 sleep_relative = optional.sleep_relative
446
447
448
449
450
452 """Install Python threading emulation.
453
454 It is recommended that you call this at the very beginning of the main
455 script of your application before importing anything else. This will
456 cause the following modules to be emulated:
457
458 - thread
459 - threading
460 - socket
461
462 At this time, no other blocking operations are supported.
463 """
464 import coro.emulation.socket
465 import coro.emulation.thread
466 import coro.emulation.threading
467 sys.modules['thread'] = coro.emulation.thread
468 sys.modules['threading'] = coro.emulation.threading
469 sys.modules['socket'] = coro.emulation.socket
470
471
472
473
474
475 _original_event_loop = event_loop
476 install_signal_handlers = True
477 event_loop_is_running = False
478
480 """Determine if the coro event loop is running.
481
482 :Return:
483 Returns True if the event loop is running, otherwise False.
484 """
485 return event_loop_is_running
486
489
513