Chaining Python futures
Update: I fixed a problem with the decorator that led to errors when calling a decorated function several times or in a nested way. I also shortened it a bit.
One of Python 3.2’s new modules is concurrent.futures. With its executor and future abstractions, asynchronous, multi-threaded programming becomes less of a headache:
import time
from concurrent.futures import ThreadPoolExecutor

def read_and_block_for_a_while():
    pass

def sleeps_a_bit(sleep_time):
    time.sleep(sleep_time)

with ThreadPoolExecutor(max_workers=4) as executor:
    future1 = executor.submit(read_and_block_for_a_while)
    future2 = executor.submit(sleeps_a_bit, 1.5)

    # Now, we can continue doing other stuff ...

    # But at some point we need the result and we will block until the result
    # is delivered.
    data = future1.result()
As you can see, a future combines a thread of execution, the thread’s current state and methods to query and wait for results. You can pass it around as if it were a value, and you can add callbacks to be notified when a result is ready – without ever missing the result, as can happen in message-based systems.
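For example, instead of blocking on result(), you can register a completion callback with add_done_callback(). Here is a minimal sketch; the compute() function and its sleep time are made up for illustration:

import time
from concurrent.futures import ThreadPoolExecutor

def compute():
    time.sleep(0.5)
    return 42

def print_result(future):
    # Invoked as soon as compute() has finished.
    print("result is", future.result())

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(compute)
    future.add_done_callback(print_result)
    # We can keep doing other work here; the callback fires on completion.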
Now, passing futures around sounds like fun but quickly becomes awkward. This is because functions have to expect future arguments and call the result() method whenever they need an argument’s value. Most of the time, though, we want to write “normal” functions that don’t have to care about futures at all. Unfortunately, the futures API is modeled a little too closely after Java’s futures, which is why we have to get our hands a little dirty with this promise decorator:
from concurrent.futures import Future, ThreadPoolExecutor

class promise(object):
    executor = ThreadPoolExecutor(max_workers=100)

    def __init__(self, func):
        self.func = func

    def resolve(self, *args, **kwargs):
        resolved_args = []
        resolved_kwargs = {}
        for arg in args:
            if isinstance(arg, Future):
                resolved_args.append(arg.result())
            else:
                resolved_args.append(arg)
        for kw, arg in kwargs.items():
            if isinstance(arg, Future):
                resolved_kwargs[kw] = arg.result()
            else:
                resolved_kwargs[kw] = arg
        return self.func(*resolved_args, **resolved_kwargs)

    def __call__(self, *args, **kwargs):
        return self.executor.submit(self.resolve, *args, **kwargs)
The decorator creates a new future that calls resolve(). This method will call result() on all positional and keyword arguments that are of future type [1]. When done, the wrapped function is called with the actual values. Because C++’s std::promise works in a similar way, I also called the decorator promise. The following example shows how it is used for chaining:
@promise
def initial(x):
    time.sleep(1.0)
    return x

@promise
def add(x, y):
    return x + y

future = add(1, initial(2))
# do something else ...
print(future.result())
Unfortunately, there are three caveats with this approach. First, the decorator class keeps a reference to the executor, which makes it a bit uncomfortable to use with the with statement:
with ThreadPoolExecutor(max_workers=2) as e:
    promise.executor = e
    ...
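One way to soften this – purely a sketch, not part of the decorator above – is to hand the executor to the decorator explicitly, so that its lifetime is controlled by the surrounding with block. The names promise_with and double are made up for illustration:

from concurrent.futures import Future, ThreadPoolExecutor

class promise_with(object):
    """Hypothetical variant of promise that takes the executor explicitly."""

    def __init__(self, executor):
        self.executor = executor

    def __call__(self, func):
        def submit(*args, **kwargs):
            return self.executor.submit(self._resolve, func, *args, **kwargs)
        return submit

    @staticmethod
    def _resolve(func, *args, **kwargs):
        # Wait for any future arguments, then call the wrapped function.
        args = [a.result() if isinstance(a, Future) else a for a in args]
        kwargs = {k: v.result() if isinstance(v, Future) else v
                  for k, v in kwargs.items()}
        return func(*args, **kwargs)

with ThreadPoolExecutor(max_workers=2) as e:
    @promise_with(e)
    def double(x):
        return 2 * x

    print(double(double(3)).result())  # prints 12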
Second, the promise decorator only works with a ThreadPoolExecutor. As stated in the documentation:

Calling Executor or Future methods from a callable submitted to a ProcessPoolExecutor will result in deadlock.

… which is exactly what we do: resolve() is submitted to the executor and then calls result() on its future arguments. This is a little unfortunate, because the Executor interface does not restrict this kind of usage, and I would expect all implementations of an interface to behave the same.
Last but not least, we serialize the calls by waiting for all input futures to finish. That means you won’t gain much from deeply nested calls, because the levels of the chain don’t run in parallel. On the other hand, sibling calls do run concurrently, so you will see no difference in run-time whether you compute

print(initial(1).result())

or

print(add(add(initial(1), initial(2)), initial(3)).result())

Both take about one second.
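To check this, one can simply time both calls. The following sketch reuses initial and add from above; the exact numbers will vary a little:

import time

start = time.time()
print(initial(1).result())
print("single call: %.2f s" % (time.time() - start))

start = time.time()
print(add(add(initial(1), initial(2)), initial(3)).result())
print("nested call: %.2f s" % (time.time() - start))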
[1] I know that checking argument types with isinstance() is not Pythonic, but there is no other way to make sure that we only call result() on actual futures.