Package coro

Coroutine threading library.

Introduction

Shrapnel is a cooperative threading library.

Getting Started

When your process starts up, you must spawn a thread to do some work, and then start the event loop. The event loop runs forever processing events until the process exits. An example:

import coro

def main():
    print('Hello world!')
    # This will cause the process to exit.
    coro.set_exit(0)

coro.spawn(main)
coro.event_loop()

Coroutines

Every coroutine thread is created with either the new function (which does NOT automatically start the thread) or the spawn function (which DOES automatically start it).

Every thread has a unique numeric ID. You may also set the name of the thread when you create it.
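
The difference between new and spawn can be pictured with a small stand-in built on the standard threading module rather than on coro itself. The names new_thread and spawn_thread are hypothetical, and real coro threads are cooperative rather than preemptive, so this is only a sketch of the "create" versus "create and start" distinction.

```python
import threading

def new_thread(fun, *args, **kwargs):
    """Create a thread but do not start it (analogous to coro.new)."""
    # thread_name defaults to the name of the function, as documented for coro.
    name = kwargs.pop('thread_name', fun.__name__)
    return threading.Thread(target=fun, name=name, args=args, kwargs=kwargs)

def spawn_thread(fun, *args, **kwargs):
    """Create a thread and start it immediately (analogous to coro.spawn)."""
    t = new_thread(fun, *args, **kwargs)
    t.start()
    return t

results = []
idle = new_thread(results.append, 'from new')          # created, not running
running = spawn_thread(results.append, 'from spawn')   # already started
running.join()
started_before = list(results)   # only the spawned thread has run so far
idle.start()                     # the caller must start it explicitly
idle.join()
```

With coro, the equivalent of idle.start() is calling the start method on the object returned by new.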

Timeouts

The shrapnel timeout facility allows you to execute a function which will be interrupted if it does not finish within a specified period of time. The coro.TimeoutError exception will be raised if the timeout expires. See the with_timeout docstring for more detail.

If the event loop is not running (such as in a non-coro process), a custom version of with_timeout is installed that will operate using SIGALRM so that you may use with_timeout in code that needs to run in non-coro processes (though this is not recommended and should be avoided if possible).
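
To give a feel for how a SIGALRM-based fallback can work, here is a minimal sketch using only the standard signal module. It is not shrapnel's implementation: alarm_with_timeout and AlarmTimeout are hypothetical names, and the sketch assumes a Unix system and that it runs in the main thread (Python only allows signal handlers to be installed there).

```python
import signal
import time

class AlarmTimeout(Exception):
    """Stand-in for coro.TimeoutError in this sketch."""

def alarm_with_timeout(delta, function, *args, **kwargs):
    """Call function(*args, **kwargs); raise AlarmTimeout after delta seconds."""
    def on_alarm(signum, frame):
        raise AlarmTimeout(delta)

    # An interval of 0 would disarm the timer, so clamp to a tiny positive value.
    delta = max(delta, 1e-6)
    previous = signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, delta)
    try:
        return function(*args, **kwargs)
    finally:
        # Always disarm the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)

fast = alarm_with_timeout(1.0, lambda: 42)      # returns before the alarm

try:
    alarm_with_timeout(0.05, time.sleep, 1.0)   # interrupted by the alarm
    timed_out = False
except AlarmTimeout:
    timed_out = True
```

Because the alarm is process-wide and interrupts whatever is running, this style of timeout shares the drawbacks that make the non-coro path something to avoid where possible.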

Thread Local Storage

There is a thread-local storage interface available for storing global data that is thread-specific. You instantiate a ThreadLocal instance and assign attributes to it that will be specific to that thread. See the ThreadLocal docs for more detail.
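
The idea is the same as with the standard library's threading.local, which is used below as a stand-in because the exact ThreadLocal interface is not shown on this page. Each thread that assigns an attribute sees only its own value.

```python
import threading

local = threading.local()   # stand-in for a coro ThreadLocal instance
seen = {}

def worker(label):
    local.label = label          # visible only to the current thread
    seen[label] = local.label

threads = [threading.Thread(target=worker, args=(n,)) for n in ('a', 'b')]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never assigned local.label, so it has no such attribute.
main_has_label = hasattr(local, 'label')
```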

Signal Handlers

By default when you start the event loop, two signal handlers are installed (for SIGTERM and SIGINT). The default signal handler will exit the event loop. You can change this behavior by setting install_signal_handlers to False before starting the event loop.

See coro.signal_handler for more detail on setting coro signal handlers.

Selfishness

Certain socket operations are allowed to try to execute without blocking if they are able to (such as sending or receiving data on a local socket or on a high-speed network). However, there is a limit to the number of times a thread is allowed to do this. The default is 4. The default may be changed (set_selfishness), and the value may be changed on a per-thread basis (coro.coro.set_max_selfish_acts).
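
One way to picture the limit is as a small per-thread budget. The sketch below is an assumption about the general shape of such a counter, not shrapnel's actual bookkeeping; SelfishBudget and try_selfish are hypothetical names, and when exactly the real counter is refilled is not stated on this page.

```python
class SelfishBudget(object):
    """Counts how many times a thread may skip yielding to the event loop."""

    def __init__(self, limit=4):          # 4 is the documented default
        self.limit = limit
        self.remaining = limit

    def try_selfish(self):
        """Return True if the operation may proceed without yielding."""
        if self.remaining > 0:
            self.remaining -= 1
            return True
        # Budget exhausted: the thread has to yield; refill for next time.
        self.remaining = self.limit
        return False

budget = SelfishBudget()
decisions = [budget.try_selfish() for _ in range(6)]
```

In this model the first four attempts proceed immediately, the fifth forces a yield, and the count starts over afterwards.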

Time

Shrapnel uses the tsc_time module for handling time. It uses the TSC value for a stable and high-resolution unit of time. See that module's documentation for more detail.

When you start the event loop, a thread is always created that resynchronizes the TSC relationship to accommodate any clock drift (see tick_updater and tsc_time.update_time_relation).
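
The relationship between TSC ticks and POSIX time can be illustrated with plain arithmetic. The constants below are the snapshot values listed under Variables on this page. The conversion function ticks_to_usec is a hypothetical helper, and the linear formula is an assumption about how such a relation is typically applied, not a copy of tsc_time's code.

```python
# Snapshot values from the Variables table of this page.
now = 6412134603272295          # TSC reading at the last synchronization
now_usec = 1329444398561186     # POSIX time in microseconds at that moment
ticks_per_sec = 2659636680
microseconds = 1000000

def ticks_to_usec(tsc):
    """Convert a TSC reading to POSIX microseconds using the saved relation."""
    elapsed_ticks = tsc - now
    return now_usec + elapsed_ticks * microseconds // ticks_per_sec

one_second_later = ticks_to_usec(now + ticks_per_sec)
```

If ticks_per_sec is slightly off, the error grows with the distance from the synchronization point, which is why the pair (now, now_usec) is refreshed periodically.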

Exception Notifier

When a thread exits due to an exception, by default a stack trace is printed to stderr. You may install your own callback to handle this situation. See the set_exception_notifier function for more detail.

Debug Output

The shrapnel library provides a mechanism for printing debug information to stderr. The print_stderr function will print a string with a timestamp and the thread number. The write_stderr function writes the string verbatim.

Shrapnel keeps a reference to the "real" stderr (in saved_stderr), and the print_stderr and write_stderr functions always use the real stderr value. A particular reason for doing this is that the backdoor module replaces sys.stderr and sys.stdout, but we do not want debug output to go to the interactive session.
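
The effect of saving the stream up front can be shown with a small stand-alone sketch. The function names mirror the ones described above, but the bodies, the timestamp format, and the use of threading.get_ident() for the thread number are assumptions made for illustration. An in-memory stream plays the role of the real stderr so the result can be inspected.

```python
import io
import sys
import threading
import time

real = io.StringIO()     # plays the role of the process's real stderr
saved_stderr = real      # captured once, before anything replaces sys.stderr

def write_stderr(msg):
    """Write msg verbatim to the saved stream."""
    saved_stderr.write(msg)

def print_stderr(msg):
    """Write msg prefixed with a timestamp and the current thread id."""
    stamp = time.strftime('%Y-%m-%d %H:%M:%S')   # format is an assumption
    write_stderr('%s %d: %s\n' % (stamp, threading.get_ident(), msg))

session = io.StringIO()                  # e.g. an interactive session's stream
original, sys.stderr = sys.stderr, session
try:
    print_stderr('hello')                # still goes to the saved stream
finally:
    sys.stderr = original
```

Even though sys.stderr was swapped out while print_stderr ran, the message lands in the saved stream and the session stream stays empty.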

Profiling

Shrapnel has its own profiler that is coro-aware. See coro.profiler for details on how to run the profiler.

Submodules

Classes
  InParallelError
An error occurred in the in_parallel function.
  event_queue
  fifo
First-in First-Out container.
  rusage
Functions
  _in_parallel_wrap(result_list, i, sem, (fun, args))
  _original_event_loop(...)
Start the event loop.
  _original_sleep_relative(...)
Sleep for a period of time.
  _original_with_timeout(...)
Call a function with a timeout.
  coro_is_running()
Determine if the coro event loop is running.
  ctime_ticks(t)
This is for backwards compatibility and should not be used.
  ctime_usec(u)
This is for backwards compatibility and should not be used.
  default_exception_notifier()
  event_loop(timeout=30)
Start the event loop.
  get_thread_by_id(thread_id)
Get a coro thread by ID.
  get_usec()
This is for backwards compatibility and should not be used.
  in_parallel(fun_arg_list)
Execute several functions in parallel.
  install_thread_emulation()
Install Python threading emulation.
  new(fun, *args, **kwargs)
Create a new coroutine object.
  set_handler(...)
Add a kevent handler.
  set_latency_warning(...)
Set the latency warning threshold multiplier.
  sigterm_handler(*_unused_args)
  sleep_absolute(...)
This is an alias for the sleep method.
  spawn(fun, *args, **kwargs)
Spawn a new coroutine.
  tick_updater()
Updates TSC<->POSIX relation.
  wait_for(...)
Wait for an event.
  waitpid(pid)
Wait for a process to exit.
  where(co)
Return a string indicating where the given coroutine thread is currently running.
  where_all()
Get a dictionary of where all coroutines are currently executing.
Variables
  FAILURE = 'failure'
  SUCCESS = 'success'
  UNAME = 'FreeBSD'
  __package__ = 'coro'
  all_threads = {}
A dictionary of all live coroutine objects.
  event_loop_is_running = False
  event_map = {}
  install_signal_handlers = True
  microseconds = 1000000
  now = 6412134603272295
  now_usec = 1329444398561186
  saved_stderr = <epydoc.docintrospecter._DevNull instance at 0x...
The actual stderr object for the process.
  tick_update_interval = 3600
  ticks_per_sec = 2659636680
  ticks_per_usec = 2659
Function Details

_original_event_loop(...)

Start the event loop.
Parameters:
  • timeout: The amount of time to wait for kevent to return events. You should probably not set this value. Defaults to 30 seconds.

_original_sleep_relative(...)

Sleep for a period of time.

If a thread is interrupted at the exact same time the sleep is finished, it is not defined whether the interrupt or the sleep "wins". Your thread may continue running (with the interrupt rescheduled to try again later), or it may be interrupted.

Parameters:
  • delta: The number of seconds to sleep.

_original_with_timeout(...)

Call a function with a timeout.

Additional arguments and keyword arguments provided are passed to the function. This will re-raise any exceptions raised by the function.

If a timeout expires, but the function returns before the next pass in the event loop, then the timeout will be defused.

If a coroutine is already scheduled to run (such as if it received a kevent), and the timeout expires, the timeout will be put on "hold" to let the coroutine run and process the data. If the function returns, then the timeout will be defused, otherwise the timeout will be given another chance to fire during the next pass through the event loop. One should note that due to this behavior, if a coroutine is continually receiving kevents, the timeout will never fire until the kevents stop.

Nested timeouts will be handled correctly. If an outer timeout fires first, then only the outer except TimeoutError exception handler will catch it. Any exception handlers on the inside will be skipped because the actual exception is the Interrupted exception until it gets to the original with_timeout frame.

If nested timeouts are set to fire at the exact same time, it is not defined which one will fire first.

Care must be taken to never catch the Interrupted exception within code that is wrapped with a timeout.

Returns:
Returns the return value of the function.
Raises:
  • TimeoutError: The function did not return within the specified timeout.

Parameters:

  • delta: The number of seconds to wait before raising a timeout.

    Should be >= 0. Negative value will be treated as 0.

  • function: The function to call.

coro_is_running()

Determine if the coro event loop is running.
Returns:
Returns True if the event loop is running, otherwise False.

event_loop(timeout=30)

Start the event loop.
Parameters:
  • timeout: The amount of time to wait for kevent to return events. You should probably not set this value.

get_thread_by_id(thread_id)

Get a coro thread by ID.
Parameters:
  • thread_id: The thread ID.
Returns:
Returns the coroutine object.
Raises:
  • KeyError: The coroutine does not exist.

in_parallel(fun_arg_list)

Execute several functions in parallel.

This will block until all functions have returned or raised an exception.

If one or more functions raises an exception, then the InParallelError exception will be raised.

Parameters:
  • fun_arg_list: A list of (fun, args) tuples.
Returns:
Returns a list of return values from the functions.
Raises:
  • InParallelError: One or more of the functions raised an exception.
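
The behavior described above can be sketched with ordinary threads. This is an illustration only: shrapnel runs the functions as coroutines, and the signature of _in_parallel_wrap suggests it also uses a semaphore. The names in_parallel_sketch, InParallelErrorSketch and _wrap are hypothetical, and what the real InParallelError carries is an assumption.

```python
import sys
import threading

SUCCESS = 'success'
FAILURE = 'failure'

class InParallelErrorSketch(Exception):
    """Raised when one or more of the functions failed."""
    def __init__(self, result_list):
        Exception.__init__(self, 'one or more functions raised an exception')
        self.result_list = result_list

def _wrap(result_list, i, fun, args):
    # Record the outcome in slot i so results keep the order of the input.
    try:
        result_list[i] = (SUCCESS, fun(*args))
    except Exception:
        result_list[i] = (FAILURE, sys.exc_info()[1])

def in_parallel_sketch(fun_arg_list):
    result_list = [None] * len(fun_arg_list)
    workers = [threading.Thread(target=_wrap, args=(result_list, i, fun, args))
               for i, (fun, args) in enumerate(fun_arg_list)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()          # block until every function has finished
    if any(status == FAILURE for status, _ in result_list):
        raise InParallelErrorSketch(result_list)
    return [value for _, value in result_list]

values = in_parallel_sketch([(pow, (2, 10)), (len, ('abc',)), (max, (3, 7))])

try:
    in_parallel_sketch([(int, ('1',)), (int, ('not a number',))])
    failed = None
except InParallelErrorSketch as e:
    failed = [status for status, _ in e.result_list]
```

Note that every function is allowed to finish before the error is raised, which matches the statement that the call blocks until all functions have returned or raised.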

install_thread_emulation()

Install Python threading emulation.

It is recommended that you call this at the very beginning of the main script of your application before importing anything else. This will cause the following modules to be emulated:

  • thread
  • threading
  • socket

At this time, no other blocking operations are supported.

new(fun, *args, **kwargs)

Create a new coroutine object.

Additional arguments and keyword arguments will be passed to the given function.

This will not start the coroutine. Call the start method on the coroutine to schedule it to run.

Parameters:
  • fun: The function to call when the coroutine starts.
  • thread_name: The name of the thread. Defaults to the name of the function.
Returns:
Returns the new coroutine object.

set_handler(...)

Add a kevent handler.

This is a low-level interface to register a kevent handler.

Parameters:
  • event: A tuple of (ident, filter) of the kevent to handle.
  • handler: The handler to use, a callable object which will be called with one argument, a py_kevent object.
  • flags: Kevent flags to use. Defaults to EV_ADD|EV_ONESHOT.
  • fflags: Kevent filter flags to use. Defaults to 0.
Raises:

set_latency_warning(...)

Set the latency warning threshold multiplier.

The default latency warning threshold is 0.2 seconds. This will allow you to change the threshold by multiplying the 0.2 value.

Parameters:
  • factor: The latency threshold multiplier. May be a number from 0 to 300. A value of 0 disables latency warnings.
Returns:
Returns the old multiplier factor.
Raises:
  • ValueError: The factor is too small or too large.
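
As a worked example of the multiplier arithmetic, the following stand-alone sketch models the documented contract: a base threshold of 0.2 seconds, a factor between 0 and 300, 0 meaning disabled, and the old factor returned on each call. The names set_latency_warning_sketch and effective_threshold are hypothetical, and the initial factor of 1 is an assumption.

```python
BASE_THRESHOLD = 0.2      # seconds, the documented default threshold
_factor = 1               # assumed initial multiplier

def set_latency_warning_sketch(factor):
    """Set the multiplier and return the previous one."""
    global _factor
    if not 0 <= factor <= 300:
        raise ValueError('factor must be between 0 and 300')
    old, _factor = _factor, factor
    return old

def effective_threshold():
    """Seconds of latency that trigger a warning, or None when disabled."""
    return None if _factor == 0 else BASE_THRESHOLD * _factor

old = set_latency_warning_sketch(5)     # threshold becomes 0.2 * 5 seconds
threshold = effective_threshold()
previous = set_latency_warning_sketch(0)   # 0 disables warnings
disabled = effective_threshold()
```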

spawn(fun, *args, **kwargs)

Spawn a new coroutine.

Additional arguments and keyword arguments will be passed to the given function.

Parameters:
  • fun: The function to call when the coroutine starts.
  • thread_name: The name of the thread. Defaults to the name of the function.
Returns:
Returns the new coroutine object.

tick_updater()

Updates TSC<->POSIX relation.

This is a thread that runs forever. Once an hour it updates the now and now_usec variables, which takes care of any clock drift that arises because our ticks_per_sec variable might be slightly off.

wait_for(...)

Wait for an event.
Parameters:
  • ident: The kevent identifier (depends on the filter type, but is often a file descriptor).
  • filter: The kevent filter (see EVFILT).
  • fflags: Filter flags (defaults to 0).
Returns:
Returns a py_kevent instance that indicates the event that fired.
Raises:

waitpid(pid)

Wait for a process to exit.
Parameters:
  • pid: The process ID to wait for.
Returns:
Returns a tuple (pid, status) of the process.
Raises:

where(co)

Return a string indicating where the given coroutine thread is currently running.
Parameters:
  • co: The coroutine object.
Returns:
Returns a string displaying where the coro thread is currently executing.

where_all()

Get a dictionary of where all coroutines are currently executing.
Returns:
Returns a dictionary mapping the coroutine ID to a tuple of (name, coro, where), in which where is a string representing where the coroutine is currently running.

Variables Details

all_threads

A dictionary of all live coroutine objects. The key is the coroutine ID, and the value is the coroutine object.
Value:
{}

saved_stderr

The actual stderr object for the process. This normally should not be used. It exists because, for example, the backdoor replaces sys.stderr while executing user code.
Value:
<epydoc.docintrospecter._DevNull instance at 0x80272ac20>