Package coro

Source Code for Package coro

# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# $Header: //prod/main/ap/shrapnel/coro/__init__.py#31 $

"""Coroutine threading library.

Introduction
============
Shrapnel is a cooperative threading library.

Getting Started
===============
When your process starts up, you must spawn a thread to do some work, and then
start the event loop.  The event loop runs forever processing events until the
process exits.  An example::

    import coro

    def main():
        print 'Hello world!'
        # This will cause the process to exit.
        coro.set_exit(0)

    coro.spawn(main)
    coro.event_loop()

Coroutines
==========
Every coroutine thread is created with either the `new` function (which does
NOT automatically start the thread) or the `spawn` function (which DOES
automatically start it).

Every thread has a unique numeric ID.  You may also set the name of the thread
when you create it.

Timeouts
========
The shrapnel timeout facility allows you to execute a function which will be
interrupted if it does not finish within a specified period of time.  The
`coro.TimeoutError` exception will be raised if the timeout expires.  See the
`with_timeout` docstring for more detail.

If the event loop is not running (such as in a non-coro process), a custom
version of `with_timeout` is installed that will operate using SIGALRM so that
you may use `with_timeout` in code that needs to run in non-coro processes
(though this is not recommended and should be avoided if possible).

Thread Local Storage
====================
There is a thread-local storage interface available for storing global data
that is thread-specific.  You instantiate a `ThreadLocal` instance and you can
assign attributes to it that will be specific to that thread.  See the
`ThreadLocal` docs for more detail.

Signal Handlers
===============
By default when you start the event loop, two signal handlers are installed
(for SIGTERM and SIGINT). The default signal handler will exit the event loop.
You can change this behavior by setting `install_signal_handlers` to False
before starting the event loop.

See `coro.signal_handler` for more detail on setting coro signal handlers.

Selfishness
===========
Certain socket operations are allowed to try to execute without blocking if
they are able to (such as sending/receiving data on a local socket or on a
high-speed network). However, there is a limit to the number of times a thread
is allowed to do this. The default is 4.  The default may be changed
(`set_selfishness`) and the value may be changed on a per-thread basis
(`coro.coro.set_max_selfish_acts`).

Time
====
Shrapnel uses the `tsc_time` module for handling time.  It uses the TSC
value for a stable and high-resolution unit of time.  See that module's
documentation for more detail.

When you start the event loop, a thread is always created that will
resynchronize the TSC relationship to accommodate any clock drift (see
`tick_updater` and `tsc_time.update_time_relation`).

Exception Notifier
==================
When a thread exits due to an exception, by default a stack trace is printed to
stderr.  You may install your own callback to handle this situation.  See the
`set_exception_notifier` function for more detail.

Debug Output
============
The shrapnel library provides a mechanism for printing debug information to
stderr.  The `print_stderr` function will print a string with a timestamp
and the thread number.  The `write_stderr` function writes the string verbatim.

Shrapnel keeps a reference to the "real" stderr (in `saved_stderr`) and the
`print_stderr` and `write_stderr` functions always use the real stderr value. A
particular reason for doing this is that the backdoor module replaces sys.stderr
and sys.stdout, but we do not want debug output to go to the interactive
session.

Profiling
=========
Shrapnel has its own profiler that is coro-aware.  See `coro.profiler` for
details on how to run the profiler.

:Variables:
    - `all_threads`: A dictionary of all live coroutine objects.  The key is
      the coroutine ID, and the value is the coroutine object.
    - `saved_stderr`: The actual stderr object for the process.  This normally
      should not be used.  It exists because, for example, the backdoor
      replaces sys.stderr while executing user code.
"""

__all__ = [
    '_coro',
    'aio',
    'backdoor',
    'event_queue',
    'fifo',
    'ironport',
    'lio',
    'local',
    'optional',
    'oserrors',
    'poller',
    'print_profile',
    'profile',
    'profiler',
    'rusage',
    'signal_handler',
    'socket',
    'sync',
    'tb',
]

from coro._coro import *
from coro._coro import _yield
from coro import signal_handler
from coro import optional
from coro import tb

import signal
import sys
import time
import os

UNAME = os.uname()[0]

# ============================================================================
#                        tracebacks/exceptions
# ============================================================================

compact_traceback = tb.traceback_string

# This overrides the one in <_coro>
def default_exception_notifier():
    me = current()
    print_stderr (
        'thread %d (%s): error %r\n' % (
            me.id,
            me.name,
            compact_traceback(),
        )
    )

set_exception_notifier (default_exception_notifier)

# ============================================================================
#                        parallel execution
# ============================================================================

class InParallelError (Exception):

    """An error occurred in the `in_parallel` function.

    :IVariables:
        - `result_list`: A list of ``(status, result)`` tuples. ``status`` is
          either `SUCCESS` or `FAILURE`.  For success, the result is the return
          value of the function.  For failure, it is the output from
          `sys.exc_info`.
    """

    def __init__(self, result_list):
        self.result_list = result_list
        Exception.__init__(self, result_list)

SUCCESS = 'success'
FAILURE = 'failure'

def _in_parallel_wrap (result_list, i, sem, (fun, args)):
    try:
        result = (SUCCESS, fun (*args))
    except:
        result = (FAILURE, sys.exc_info())
    result_list[i] = result
    sem.release(1)

def in_parallel (fun_arg_list):
    """Execute several functions in parallel.

    This will block until all functions have returned or raised an exception.

    If one or more functions raises an exception, then the `InParallelError`
    exception will be raised.

    :Parameters:
        - `fun_arg_list`: A list of ``(fun, args)`` tuples.

    :Return:
        Returns a list of return values from the functions.

    :Exceptions:
        - `InParallelError`: One or more of the functions raised an exception.
    """
    # InParallelError, [(SUCCESS, result0), (FAILURE, exc_info1), ...]

    n = len(fun_arg_list)
    if n==0:
        return []
    result_list = [None] * n
    sem = inverted_semaphore(n)
    for i in xrange (n):
        spawn (
            _in_parallel_wrap,
            result_list,
            i,
            sem,
            fun_arg_list[i]
        )
    sem.block_till_zero()
    for i in xrange (n):
        if result_list[i][0] is FAILURE:
            raise InParallelError, result_list
    # no errors, convert to a simple result list
    return [x[1] for x in result_list]

# ============================================================================
#                        time
# ============================================================================

# Every hour
tick_update_interval = 3600

def tick_updater():
    """Updates TSC<->POSIX relation.

    This is a thread that runs forever.  It is responsible for updating the now
    and now_usec variables every hour.  This will take care of any clock drift
    because our ticks_per_sec variable might be slightly off.

    This runs once an hour.
    """
    global tick_update_interval
    while 1:
        sleep_relative(tick_update_interval)
        tsc_time.update_time_relation()

# ============================================================================
#                        waitpid
# ============================================================================

def waitpid (pid):
    """Wait for a process to exit.

    :Parameters:
        - `pid`: The process ID to wait for.

    :Return:
        Returns a tuple ``(pid, status)`` of the process.

    :Exceptions:
        - `SimultaneousError`: Something is already waiting for this process
          ID.
    """
    if UNAME == "Linux":
        # XXX Replace this sleep crap with netlink and epoll
        status = 0
        while status == 0:
            returned_pid, status = os.waitpid (pid, os.WNOHANG)
            sleep_relative(4)

        return returned_pid, status
    else:
        me = current()
        wait_for(pid, EVFILT.PROC, fflags=NOTE.EXIT)
        # this should always succeed immediately.
        # XXX too bad kqueue NOTE_EXIT doesn't have the status.
        return os.waitpid (pid, 0)


# ============================================================================
#                        misc
# ============================================================================

def get_thread_by_id (thread_id):
    """Get a coro thread by ID.

    :Parameters:
        - `thread_id`: The thread ID.

    :Return:
        Returns the coroutine object.

    :Exceptions:
        - `KeyError`: The coroutine does not exist.
    """
    return all_threads[thread_id]

def where (co):
    """Return a string indicating where the given coroutine thread is currently
    running.

    :Parameters:
        - `co`: The coroutine object.

    :Return:
        Returns a string displaying where the coro thread is currently
        executing.
    """
    f = co.get_frame()
    return tb.stack_string(f)

def where_all():
    """Get a dictionary of where all coroutines are currently executing.

    :Return:
        Returns a dictionary mapping the coroutine ID to a tuple of ``(name,
        coro, where)`` where ``where`` is a string representing where the
        coroutine is currently running.
    """
    output = {}
    for id, c in all_threads.items():
        output[id] = (c.get_name(), c, where(c))
    return output

# ============================================================================
#                        spawn/new wrappers
# ============================================================================

_original_spawn = spawn

def spawn (fun, *args, **kwargs):
    """Spawn a new coroutine.

    Additional arguments and keyword arguments will be passed to the given function.

    :Parameters:
        - `fun`: The function to call when the coroutine starts.
        - `thread_name`: The name of the thread.  Defaults to the name of the
          function.

    :Return:
        Returns the new coroutine object.
    """
    if kwargs.has_key('thread_name'):
        thread_name = kwargs['thread_name']
        del kwargs['thread_name']
    else:
        thread_name = '%s' % (fun,)
    return _original_spawn (fun, *args, **kwargs).set_name (thread_name)

_original_new = new
def new (fun, *args, **kwargs):
    """Create a new coroutine object.

    Additional arguments and keyword arguments will be passed to the given
    function.

    This will not start the coroutine.  Call the ``start`` method on the
    coroutine to schedule it to run.

    :Parameters:
        - `fun`: The function to call when the coroutine starts.
        - `thread_name`: The name of the thread.  Defaults to the name of the
          function.

    :Return:
        Returns the new coroutine object.
    """
    if kwargs.has_key('thread_name'):
        thread_name = kwargs['thread_name']
        del kwargs['thread_name']
    else:
        thread_name = '%s' % (fun,)
    return _original_new (fun, *args, **kwargs).set_name (thread_name)

# ============================================================================
#                        time backwards compatibility
# ============================================================================
import coro.clocks.tsc_time as tsc_time

ticks_per_sec = tsc_time.ticks_per_sec
ticks_per_usec = tsc_time.ticks_per_usec
microseconds = 1000000

absolute_time_to_ticks = tsc_time.usec_to_ticks
ticks_to_absolute_time = tsc_time.ticks_to_usec
absolute_time_to_ticks_safe = tsc_time.usec_to_ticks_safe
ticks_to_absolute_time_safe = tsc_time.ticks_to_usec_safe
absolute_secs_to_ticks = tsc_time.sec_to_ticks
ticks_to_absolute_secs = tsc_time.ticks_to_sec
get_now = tsc_time.rdtsc
update_time_relation = tsc_time.update_time_relation

def get_usec():
    """This is for backwards compatibility and should not be used."""
    return tsc_time.ticks_to_usec(get_now())

def ctime_ticks(t):
    """This is for backwards compatibility and should not be used."""
    return tsc_time.TSC_from_ticks(t).ctime()

def ctime_usec(u):
    """This is for backwards compatibility and should not be used."""
    return tsc_time.TSC_from_posix_usec(u).ctime()

now = get_now()
now_usec = get_usec()


# ============================================================================
#                        non-coro compatibility
# ============================================================================

_original_with_timeout = with_timeout
with_timeout = optional.with_timeout
_original_sleep_relative = sleep_relative
sleep_relative = optional.sleep_relative

# ============================================================================
#                        Python compatibility
# ============================================================================

def install_thread_emulation():
    """Install Python threading emulation.

    It is recommended that you call this at the very beginning of the main
    script of your application before importing anything else.  This will
    cause the following modules to be emulated:

    - thread
    - threading
    - socket

    At this time, no other blocking operations are supported.
    """
    import coro.emulation.socket
    import coro.emulation.thread
    import coro.emulation.threading
    sys.modules['thread'] = coro.emulation.thread
    sys.modules['threading'] = coro.emulation.threading
    sys.modules['socket'] = coro.emulation.socket

# ============================================================================
#                        event loop
# ============================================================================

_original_event_loop = event_loop
install_signal_handlers = True
event_loop_is_running = False

def coro_is_running():
    """Determine if the coro event loop is running.

    :Return:
        Returns True if the event loop is running, otherwise False.
    """
    return event_loop_is_running

def sigterm_handler (*_unused_args):
    _coro.set_exit()

def event_loop (timeout=30):
    """Start the event loop.

    :Parameters:
        - `timeout`: The amount of time to wait for kevent to return
          events.  You should probably *not* set this value.
    """
    global event_loop_is_running, with_timeout, sleep_relative
    # replace time.time with our tsc-based version
    time.time, time.original_time = tsc_time.now_raw_posix_fsec, time.time
    with_timeout = _original_with_timeout
    sleep_relative = _original_sleep_relative
    if install_signal_handlers:
        signal_handler.register(signal.SIGTERM, sigterm_handler)
        signal_handler.register(signal.SIGINT, sigterm_handler)
    spawn (tick_updater).set_name ('tick_updater')
    try:
        event_loop_is_running = True
        _original_event_loop (timeout)
    finally:
        event_loop_is_running = False
        # put it back
        time.time = time.original_time
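
The `in_parallel` docstring in this listing describes a contract: results come back in input order, and any failure raises `InParallelError` carrying a list of ``(status, result)`` tuples. The following is a minimal, self-contained sketch of that contract and of how a caller might unpack the error. It is not shrapnel code: it is written for Python 3, it assumes standard-library OS threads as a stand-in for coro threads, and the names `in_parallel_sketch` and `collect` are hypothetical.

```python
import sys
import threading

SUCCESS = 'success'
FAILURE = 'failure'


class InParallelError(Exception):
    """Mirrors the listing: carries the full (status, result) list."""

    def __init__(self, result_list):
        self.result_list = result_list
        Exception.__init__(self, result_list)


def in_parallel_sketch(fun_arg_list):
    """Hypothetical stand-in for in_parallel, using OS threads (assumption)."""
    result_list = [None] * len(fun_arg_list)

    def wrap(i, fun, args):
        # Record either the return value or the exception info in slot i.
        try:
            result_list[i] = (SUCCESS, fun(*args))
        except Exception:
            result_list[i] = (FAILURE, sys.exc_info())

    threads = [
        threading.Thread(target=wrap, args=(i, fun, args))
        for i, (fun, args) in enumerate(fun_arg_list)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # block until every function has returned or raised
    if any(status is FAILURE for status, _ in result_list):
        raise InParallelError(result_list)
    # No errors: convert to a simple list of return values.
    return [value for _, value in result_list]


def collect(fun_arg_list):
    """Hypothetical helper: return (values, errors) instead of raising.

    Slots whose function failed hold None in ``values``; ``errors`` holds
    the exception instances taken from the stored exc_info tuples.
    """
    try:
        return in_parallel_sketch(fun_arg_list), []
    except InParallelError as e:
        values = [r if s is SUCCESS else None for s, r in e.result_list]
        errors = [r[1] for s, r in e.result_list if s is FAILURE]
        return values, errors
```

A caller that wants partial results can use a wrapper like `collect`. A caller that treats any failure as fatal can let `InParallelError` propagate and inspect `result_list` only for logging.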