Chaining Python futures

Update: I fixed a bug in the decorator that caused problems when a decorated function was called several times or in a nested way. I also shortened it a bit.

One of Python 3.2’s new modules is concurrent.futures. With its executor and future abstractions, asynchronous, multi-threaded programming becomes less of a headache:

import time
from concurrent.futures import ThreadPoolExecutor

def read_and_block_for_a_while():
    pass

def sleeps_a_bit(sleep_time):
    time.sleep(sleep_time)

with ThreadPoolExecutor(max_workers=4) as executor:
    future1 = executor.submit(read_and_block_for_a_while)
    future2 = executor.submit(sleeps_a_bit, 1.5)

    # Now, we can continue doing other stuff ...
    
    # But at some point we need the result and we will block until the result is
    # delivered.
    data = future1.result()

As you can see, a future combines a thread of execution, the thread’s current state and methods to query and wait for results. You can pass it around as if it were a value, and you can add callbacks to be notified when a result is ready. Unlike in message-based systems, you can never miss the result, because the future keeps it once it is done.
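To illustrate the callback part, here is a minimal sketch using Future.add_done_callback(). The names square, on_done and results are made up for this example:

```python
from concurrent.futures import ThreadPoolExecutor

results = []


def square(x):
    return x * x


def on_done(future):
    # The callback receives the finished future, so result() does not block.
    results.append(future.result())


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 7)
    future.add_done_callback(on_done)

# Leaving the with block waits for the submitted work, so the callback has
# already run at this point.
print(results)
```

If the future is already finished when the callback is registered, the callback is invoked immediately, so the result cannot slip through.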

Now, passing futures around sounds like fun but quickly becomes awkward, because functions have to expect future arguments and call the result() method whenever they need an argument’s value. Most of the time we want to write “normal” functions that don’t have to care about futures at all. Unfortunately, the futures API is modeled a little too closely on Java’s futures, which is why we have to get our hands a little bit dirty with this promise decorator:

from concurrent.futures import Future, ThreadPoolExecutor


class promise(object):

    executor = ThreadPoolExecutor(max_workers=100)

    def __init__(self, func):
        self.func = func

    def resolve(self, *args, **kwargs):
        resolved_args = []
        resolved_kwargs = {}

        for arg in args:
            if isinstance(arg, Future):
                resolved_args.append(arg.result())
            else:
                resolved_args.append(arg)

        for kw, arg in kwargs.items():
            if isinstance(arg, Future):
                resolved_kwargs[kw] = arg.result()
            else:
                resolved_kwargs[kw] = arg

        return self.func(*resolved_args, **resolved_kwargs)

    def __call__(self, *args, **kwargs):
        return self.executor.submit(self.resolve, *args, **kwargs)

Calling a decorated function submits resolve() to the executor and returns the resulting future. resolve() calls .result() on all positional and keyword arguments that are futures [1]. Once every argument is resolved, the wrapped function is called with the actual values.

Because C++’s std::promise works in a similar way, I called the decorator promise as well. The following example shows how it is used for chaining:

import time


@promise
def initial(x):
    time.sleep(1.0)
    return x

@promise
def add(x, y):
    return x + y

future = add(1, initial(2))
# do something else ...
print(future.result())

Unfortunately, there are three caveats with this approach. First, the decorator class keeps a reference to the executor as a class attribute. This makes it a bit awkward to use with the with statement, because the executor has to be swapped in by hand:

with ThreadPoolExecutor(max_workers=2) as e:
    promise.executor = e
    ...
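One possible way around this, sketched here under the assumption that the decorated functions can be defined inside the with block, is a decorator factory that takes the executor as an argument. The name promise_on is hypothetical and not part of the decorator above:

```python
import functools
from concurrent.futures import Future, ThreadPoolExecutor


def promise_on(executor):
    """Hypothetical variant of the promise decorator bound to one executor."""
    def decorator(func):
        def resolve(*args, **kwargs):
            # Replace every future argument by its result before calling func.
            args = [a.result() if isinstance(a, Future) else a for a in args]
            kwargs = {k: v.result() if isinstance(v, Future) else v
                      for k, v in kwargs.items()}
            return func(*args, **kwargs)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return executor.submit(resolve, *args, **kwargs)

        return wrapper
    return decorator


with ThreadPoolExecutor(max_workers=2) as e:

    @promise_on(e)
    def add(x, y):
        return x + y

    total = add(1, add(2, 3)).result()

print(total)
```

The price is that the functions are tied to the executor they were decorated with, so this trades one kind of inconvenience for another.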

Second, the promise decorator only works with a ThreadPoolExecutor. As stated in the documentation:

Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor will result in deadlock.

… which is exactly what we do. This is a little bit unfortunate, because the Executor interface does not restrict this kind of usage and I would expect all implementations of an interface to behave the same.

Last but not least, each call waits for all of its input futures to finish before it runs. That means you won’t gain much from deep call chains, because their dependent stages cannot run in parallel. On the other hand, independent arguments are still computed concurrently, so you will see no difference in run time if you compute

print(initial(1).result())

or

print(add(add(initial(1), initial(2)), initial(3)).result())

Both will take roughly one second.
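If you want to check this yourself, the following sketch measures both expressions. It repeats a condensed version of the decorator from above so that it runs on its own; the timed helper is made up for this example, and the exact timings will vary from machine to machine:

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


class promise:
    # Condensed restatement of the decorator shown earlier.
    executor = ThreadPoolExecutor(max_workers=100)

    def __init__(self, func):
        self.func = func

    def resolve(self, *args, **kwargs):
        args = [a.result() if isinstance(a, Future) else a for a in args]
        kwargs = {k: v.result() if isinstance(v, Future) else v
                  for k, v in kwargs.items()}
        return self.func(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        return self.executor.submit(self.resolve, *args, **kwargs)


@promise
def initial(x):
    time.sleep(1.0)
    return x


@promise
def add(x, y):
    return x + y


def timed(make_future):
    # Hypothetical helper: returns (result, elapsed seconds).
    start = time.perf_counter()
    value = make_future().result()
    return value, time.perf_counter() - start


single = timed(lambda: initial(1))
nested = timed(lambda: add(add(initial(1), initial(2)), initial(3)))
print(single, nested)
```

All three initial() calls are submitted before any add() starts waiting, so they sleep side by side and the nested expression is barely slower than the single call.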

[1] I know that this type check is not Pythonic, but there is no other way to ensure that we are calling a future’s result() method.