Bloerg
       

Chaining Python futures

Update: I fixed a problem with the decorator that caused errors when a decorated function was called several times or in a nested way. I also shortened it a bit.

One of Python 3.2’s new modules is concurrent.futures. With its executor and future abstractions, asynchronous, multi-threaded programming becomes less of a headache:

import time
from concurrent.futures import ThreadPoolExecutor

def read_and_block_for_a_while():
    pass

def sleeps_a_bit(sleep_time):
    time.sleep(sleep_time)

with ThreadPoolExecutor(max_workers=4) as executor:
    future1 = executor.submit(read_and_block_for_a_while)
    future2 = executor.submit(sleeps_a_bit, 1.5)

    # Now, we can continue doing other stuff ...
    
    # But at some point we need the result and we will block until the result is
    # delivered.
    data = future1.result()

As you can see, a future combines a thread of execution, the thread’s current state and methods to query and wait for results. You can pass it around as if it were a value and you can add callbacks to be notified when a result is ready – without ever missing the result as in message-based systems.
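To illustrate the callback part, here is a minimal sketch using add_done_callback(). The names slow_square and on_done are made up for this example. It also shows why a result cannot be missed: a callback attached after the future has finished is simply invoked right away.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    # Hypothetical worker that takes a moment to produce its value.
    time.sleep(0.1)
    return x * x

results = []

def on_done(future):
    # The callback receives the finished future itself.
    results.append(future.result())

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_square, 7)
    future.add_done_callback(on_done)

# The executor has shut down, so the future is done and the callback has run.
# Attaching the callback again now calls it immediately.
future.add_done_callback(on_done)

print(results)  # [49, 49]
```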

Passing futures around sounds like fun but quickly becomes awkward, because functions have to expect future arguments and call the result() method whenever they need an argument’s value. Most of the time we want to write “normal” functions that do not have to care about futures. Unfortunately, the futures API is modeled a little too closely on Java’s futures, which is why we have to get our hands a little dirty with this promise decorator:

from concurrent.futures import Future, ThreadPoolExecutor


class promise(object):

    executor = ThreadPoolExecutor(max_workers=100)

    def __init__(self, func):
        self.func = func

    def resolve(self, *args, **kwargs):
        resolved_args = []
        resolved_kwargs = {}

        for arg in args:
            if isinstance(arg, Future):
                resolved_args.append(arg.result())
            else:
                resolved_args.append(arg)

        for kw, arg in kwargs.items():
            if isinstance(arg, Future):
                resolved_kwargs[kw] = arg.result()
            else:
                resolved_kwargs[kw] = arg

        return self.func(*resolved_args, **resolved_kwargs)

    def __call__(self, *args, **kwargs):
        return self.executor.submit(self.resolve, *args, **kwargs)

The decorator creates a new future that calls resolve(). This method calls .result() on all positional and keyword arguments that are futures (see note 1). Once all of them are available, the wrapped function is called with the actual values.

Because C++’s std::promise works in a similar way, I called the decorator promise as well. The following example shows how it is used for chaining:

@promise
def initial(x):
    time.sleep(1.0)
    return x

@promise
def add(x, y):
    return x + y

future = add(1, initial(2))
# do something else ...
print(future.result())

Unfortunately, there are three caveats with this approach. First, the decorator class keeps a reference to the executor. This makes it a bit uncomfortable when using the with statement:

with ThreadPoolExecutor(max_workers=2) as e:
    promise.executor = e
    ...
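One way to make this less awkward could be a small context manager that installs a temporary executor and restores the previous one afterwards. This is only a sketch: promise_executor and double are hypothetical names, and the promise class below is a condensed stand-in for the decorator above, with no default executor.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager

def _unwrap(value):
    # Return a future's result, or the value itself if it is not a future.
    return value.result() if isinstance(value, Future) else value

class promise:
    # Condensed stand-in for the decorator above; no default executor here.
    executor = None

    def __init__(self, func):
        self.func = func

    def resolve(self, *args, **kwargs):
        return self.func(*[_unwrap(a) for a in args],
                         **{k: _unwrap(v) for k, v in kwargs.items()})

    def __call__(self, *args, **kwargs):
        return self.executor.submit(self.resolve, *args, **kwargs)

@contextmanager
def promise_executor(max_workers):
    # Hypothetical helper: swap in a temporary executor, then restore the old one.
    previous = promise.executor
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        promise.executor = executor
        try:
            yield executor
        finally:
            promise.executor = previous

@promise
def double(x):
    return 2 * x

with promise_executor(max_workers=2):
    result = double(double(3)).result()

print(result)  # 12
```

The executor is still shared global state, so this only tidies up the bookkeeping; it does not make the decorator safe to use with several executors at once.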

Second, the promise decorator only works with a ThreadPoolExecutor. As stated in the documentation:

Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor will result in deadlock.

… which is exactly what we do. This is a little bit unfortunate, because the Executor interface does not restrict this kind of usage and I would expect all implementations of an interface to behave the same.

Last but not least, we are serializing the calls by waiting for all input futures to be finished. That means you won’t gain much from deep call chains, because each call has to wait for the calls it depends on. On the other hand, independent inputs still run concurrently, so you will see no difference in run time if you compute

print(initial(1).result())

or

print(add(add(initial(1), initial(2)), initial(3)).result())

Both will take about one second, because all three initial() calls sleep at the same time.
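The timing claim is easy to check with a rough measurement. The sketch below uses a condensed restatement of the decorator so that it runs on its own; the timed helper is a made-up name, and the exact numbers will vary with machine load.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

def _unwrap(value):
    # Return a future's result, or the value itself if it is not a future.
    return value.result() if isinstance(value, Future) else value

class promise:
    # Condensed restatement of the decorator from this post.
    executor = ThreadPoolExecutor(max_workers=100)

    def __init__(self, func):
        self.func = func

    def resolve(self, *args, **kwargs):
        return self.func(*[_unwrap(a) for a in args],
                         **{k: _unwrap(v) for k, v in kwargs.items()})

    def __call__(self, *args, **kwargs):
        return self.executor.submit(self.resolve, *args, **kwargs)

@promise
def initial(x):
    time.sleep(1.0)
    return x

@promise
def add(x, y):
    return x + y

def timed(make_future):
    # Hypothetical helper: build a future, wait for it, return (value, seconds).
    start = time.perf_counter()
    value = make_future().result()
    return value, time.perf_counter() - start

single = timed(lambda: initial(1))
nested = timed(lambda: add(add(initial(1), initial(2)), initial(3)))

print("single: value=%d, %.2f s" % single)
print("nested: value=%d, %.2f s" % nested)

promise.executor.shutdown()
```

Both measurements should come out close to one second, since the three sleeping calls overlap and the additions themselves take next to no time.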

  1. I know that checking the type with isinstance() is not Pythonic, but there is no other way to ensure that we are calling a future’s result() method.

Discussion

Sergey
Thu, Feb 06 2014

Thanks for your promise observation!

You have a little mistake here:

future = add(1, initial(/*some int should be there*/))

Matthias
Thu, Feb 06 2014

Thanks, and yes you are right. I will fix that right away.

Daniel Dotsenko
Mon, Feb 08 2016

This article comes up when I search for “Python Futures chaining”, but what I was looking for is a “thenable” implementation on top of concurrent.futures.Future in line with JavaScript’s Promises/A+ standard. Specifically, I was interested in code that auto-resolves recursive Promise chains.

Seeing that you went a different way, and not finding a “thenable” on top of the PEP Future anywhere else, I unfortunately had to extend Future myself. Since you obviously spent a good amount of time on your solution and might be able to spot issues in others, I was wondering what you think: concurrent.futures.Future extended to support Promises/A+ .then(success, error) API

Matthias
Mon, Feb 08 2016

Hi Daniel, thanks for the pointer. Your implementation looks pretty clean and the API itself useful. It’s actually useful enough to be part of the standard Future interface, so maybe you could convince the Python developers to consider it being officially supported?
