1. Coroutines

The central concept of Shrapnel is the coroutine. You can think of a coroutine like it is a thread. When it runs out of work to do, it yields and allows other coroutines to run. Scheduling of coroutines is handled by the scheduler which runs an “event loop”.

1.1. Event Loop

The event loop is a loop that runs forever until the program ends. Every Shrapnel program needs to start the event loop as one of the first things it does. A typical example would be:

import coro

def main():
    print 'Hello world!'
    # This will cause the process to exit.
    coro.set_exit(0)

if __name__ == '__main__':
    coro.spawn(main)
    coro.event_loop()

1.2. Coroutines

Every coroutine thread is created with either the new() function (which does NOT automatically start the thread) or the spawn() function (which DOES automatically start it).

Every thread has a unique numeric ID. You may also set the name of the thread when you create it.

class coro.coro

Bases: object

coro(fun, args, kwargs, int id, name=None) The coroutine object.

Do not create this object directly. Use either new() or spawn() to create one.
dead

dead: ‘int’

getName(self)
get_frame(self)
get_name(self)

Get the name of this coroutine thread.

If no name has been specified, then a name is generated.

Returns:The coroutine name.
id

id: ‘int’

interrupt(self, value=None)
join(self)

Wait for thread termination.

This will wait for this thread to exit. If the thread has already exited, this will return immediately.

Warning: If a thread is created, but never started, this function will block forever.

name

name: object

raise_exception(self, the_exception, force=True, cancel_start=False)

Schedule this coroutine to resume with an exception.

Parameters:
  • the_exception – The exception to raise. May be an Exception class or instance.
  • force – If True, will force the exception to be raised, even if the coroutine is already scheduled. Defaults to True.
  • cancel_start – If True, will cancel the coroutine if it has not started, yet. If False, and the couroutine has not started, then it will rise NotStartedError. Defaults to False.
Raises:
  • DeadCoroutine – The coroutine is dead (it has already exited).
  • ScheduleError – The coroutine is already scheduled to run (and force was set to False).
  • ScheduleError – Attempted to raise an exception on the currently running coro.
  • NotStartedError – The coroutine has not started, yet.
resume_with_exc(self, exc_type, exc_value=None)
schedule(self, value=None)

Schedule this coroutine to run.

Parameters:

value – The value to resume the coroutine with. Defaults to None.

Raises:
  • DeadCoroutine – The coroutine is dead (it has already exited).
  • ScheduleError – The coroutine is already scheduled to run.
  • ScheduleError – Attempted to schedule the currently running coro.
scheduled

scheduled: ‘int’

setName(self, name)
set_max_selfish_acts(self, maximum)

Set the maximum number of selfish acts this coroutine is allowed to perform.

When a coroutine is created, it defaults to 4.

Parameters:maximum – The maximum number of selfish acts.
set_name(self, name)

Set the name of this coroutine thread.

Parameters:name – The name of the thread.
shutdown(self)

Shut down this coroutine.

This will raise the Shutdown exception on this thread.

This method will not fail. If the thread is already dead, then it is ignored. If the thread hasn’t started, then it is canceled.

start(self)

Start the coroutine for the first time.

Raises ScheduleError:
 The coro is already started.
started

started: ‘int’

thread_id(self)
value

value: object

1.3. Timeouts

The shrapnel timeout facility allows you to execute a function which will be interrupted if it does not finish within a specified period of time. The TimeoutError exception will be raised if the timeout expires. See the with_timeout() docstring for more detail.

If the event loop is not running (such as in a non-coro process), a custom version of with_timeout is installed that will operate using SIGALRM so that you may use with_timeout in code that needs to run in non-coro processes (though this is not recommended and should be avoided if possible).

coro.with_timeout(timeout, function, *args, **kwargs)

Call a function with a timeout.

This version supports running even if the coro event loop isn’t running by using SIGALRM.

See coro._coro.sched.with_timeout for more detail.

Parameters :
  • timeout: The number of seconds to wait before raising the timeout. May be a floating point number.
  • function: The function to call.
Return :

Returns the return value of the function.

Exceptions :
  • coro.TimeoutError: The timeout expired.

1.4. Parallel Execution

XXX

coro.in_parallel(fun_arg_list)

Execute several functions in parallel.

This will block until all functions have returned or raised an exception.

If one or more functions raises an exception, then the InParallelError exception will be raised.

Parameters:fun_arg_list – A list of (fun, args) tuples.
Returns:A list of return values from the functions.
Raises InParallelError:
 One or more of the functions raised an exception.
exception coro.InParallelError(result_list)

Bases: exceptions.Exception

An error occurred in the in_parallel() function.

Variables:result_list – A list of (status, result) tuples. status is either SUCCESS or FAILURE. For success, the result is the return value of the function. For failure, it is the output from sys.exc_info.

1.5. Thread Local Storage

There is a thread-local storage interface available for storing global data that is thread-specific. You instantiate a ThreadLocal instance and you can assign attributes to it that will be specific to that thread. From a design perspective, it is generally discouraged to use thread-local storage. But nonetheless, it can be useful at times.

class coro.ThreadLocal

Bases: object

Thread Local Storage.

This class implements a thread-local storage facility. You create an instance of ThreadLocal. You can get and set arbitrary attributes on that instance, and those attributes will be thread-local. For example:

>>> local = coro.ThreadLocal()
>>> local.foo = 1
>>> local.foo
1

Now, any code that references this local object can set and get any variable on that object, and the value will be local to that thread. Imagine this running in another thread:

>>> local.foo
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "local.pyx", line 35, in coro._coro.ThreadLocal.__getattr__
AttributeError: foo
>>> local.foo = 2
>>> local.foo
2

Now, in the original thread in the first example, imagine doing this:

>>> local.foo
1

Notice how the attribute stays the same value for that thread.

Tip: You can subclass ThreadLocal to add any logic you wish.

Note: This API is very similar to the one in Python (threading.local) with 1 important difference: __slots__ are not supported. Python’s implementation allows you to add attributes that are not thread-local by defining __slots__. This is not supported at this time.

1.6. Functions

The coro module defines the following functions:

coro.get_thread_by_id(thread_id)

Get a coro thread by ID.

Parameters:thread_id – The thread ID.
Returns:The coroutine object.
Raises KeyError:
 The coroutine does not exist.
coro.coro_is_running()

Determine if the coro event loop is running.

Returns:True if the event loop is running, otherwise False.
coro.event_loop(timeout=30)

Start the event loop.

Parameters:timeout – The amount of time to wait for kevent to return events. You should probably not set this value.
coro.new(fun, *args, **kwargs)

Create a new coroutine object.

Additional arguments and keyword arguments will be passed to the given function.

This will not start the coroutine. Call the start method on the coroutine to schedule it to run.

Parameters:
  • fun – The function to call when the coroutine starts.
  • thread_name – The name of the thread. Defaults to the name of the function.
Returns:

The new coroutine object.

coro.spawn(fun, *args, **kwargs)

Spawn a new coroutine.

Additional arguments and keyword arguments will be passed to the given function.

Parameters:
  • fun – The function to call when the coroutine starts.
  • thread_name – The name of the thread. Defaults to the name of the function.
Returns:

The new coroutine object.

coro.waitpid(pid)

Wait for a process to exit.

Parameters:pid – The process ID to wait for.
Returns:A tuple (pid, status) of the process.
Raises SimultaneousError:
 Something is already waiting for this process ID.
coro.yield_slice()

Yield to allow other threads to run.

This will yield to allow other threads to run. The coroutine will be rescheduled to run during the next pass through the event loop.

coro.schedule(coro co, value=None)

Schedule a coroutine to run.

See coro.schedule() for more detail.

Parameters:
  • co – The coroutine to schedule.
  • value – The value to resume the coroutine with. Defaults to None.
coro.current()

Return the current coroutine object.

coro.set_exit(exit_code=0)

Indicate that the event loop should exit.

Note that if any other coroutines are scheduled to run, they will be given a chance to run before exiting.

Parameters:exit_code – The exit code of the process. Defaults to 0.
coro.set_print_exit_string(val)

Set whether or not “Exiting” should be printed when the event loop exits.

By default, the string will be printed.

Parameters:val – Whether or not “Exiting” should be printed when the event loop exits.
coro.sleep_absolute(self, uint64_t when)

Sleep until a specific point in time.

Parameters:when – The TSC value when you want the coroutine to wake up.
coro.sleep_relative(delta)

Sleep for a period of time.

Parameters :
  • delta: The number of seconds to sleep.

1.7. Variables

coro.all_threads

A dictionary of all live coroutine objects. The key is the coroutine ID, and the value is the coroutine object.

1.8. Exceptions

The coro module defines the following exceptions:

exception coro.ScheduleError

Bases: exceptions.Exception

attempt to schedule an already-scheduled coroutine

exception coro.DeadCoroutine

Bases: coro._coro.ScheduleError

attempt to resume a dead coroutine

exception coro.ClosedError

Bases: exceptions.Exception

Another thread closed this descriptor.

exception coro.NotStartedError

Bases: coro._coro.ScheduleError

Attempted to interrupt a thread before it has started.

exception coro.TimeoutError

Bases: coro._coro.Interrupted

A call to with_timeout() has expired

exception coro.SimultaneousError

Bases: exceptions.Exception

Two threads attempted a conflicting blocking operation (e.g., read() on the same descriptor).

Variables:
  • co – The coroutine that is trying to block on an event.
  • other – The coroutine or function that is already waiting on the event.
  • event – The event that it is trying to block on. For kqueue, this is normally a kevent_key object.
exception coro.Shutdown

Bases: coro._coro.Interrupted

The thread is shutting down.

exception coro.WakeUp

Bases: exceptions.Exception

A convenience exception used to wake up a sleeping thread.