Topic: lock-free await in p0114r0 and p0057r0 (was


Author: Lewis Baker <lewis.baker@maptek.com.au>
Date: Mon, 19 Oct 2015 14:33:27 +1030

I have some questions regarding the resumable expressions proposal
P0114R0 and how to use the 'break resumable' statement to safely suspend a
consumer function and atomically schedule it for resumption in a lock-free
way.

I've been playing around with writing some lock-free implementations of
future<T> and async_generator<T> on top of P0057R0, and I've needed to
handle the case where the producer coroutine publishing a result races
with the consumer coroutine's await, using only atomic instructions.

P0057R0's support for bool await_suspend(coroutine_handle<> handle) allows
me to run some code to schedule resumption of the coroutine after the
coroutine state has been saved and the coroutine is in a resumable state, but
before the suspend-point is reached and execution is yielded back to the
caller. This issue is discussed in the Defects section of P0054R0.

Under P0057R0 an await expression (either explicit await or implicit await
in yield_value/initial_suspend/final_suspend) of the form:

await <expr>;

gets translated by the compiler into something equivalent to this:

[](coroutine_handle<promise_type> awaiter, auto&& awaitable) -> decltype(auto)
{
  if (!awaitable.await_ready())
  {
    <save-state>
    if (awaitable.await_suspend(awaiter))
    {
      <suspend/resume-point>
    }
  }
  return awaitable.await_resume();
})(<coro-handle>, <expr>);
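Because the expansion above is pseudocode, the following plain C++ model (not a real coroutine) traces the same control flow, to show that an await_suspend() returning false skips the suspend point and falls straight through to await_resume(). The names fake_handle, simulate_await and toy_awaitable are hypothetical, and a std::function stands in for coroutine_handle<>.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for coroutine_handle<>: a callable that "resumes".
using fake_handle = std::function<void()>;

// Records which parts of the expansion ran, in order.
std::vector<std::string> trace;

// Mirrors the control flow of the P0057R0 await expansion. Returns true if
// the coroutine would reach <suspend/resume-point> and return to its caller,
// false if it continues directly to await_resume().
template <typename Awaitable>
bool simulate_await(Awaitable& awaitable, fake_handle self)
{
  if (!awaitable.await_ready())
  {
    trace.push_back("save-state");
    if (awaitable.await_suspend(self))
    {
      trace.push_back("suspend-point");
      return true;  // await_resume() would run later, on resumption
    }
  }
  trace.push_back("await_resume:" + std::to_string(awaitable.await_resume()));
  return false;
}

// A toy awaitable whose readiness and suspend decision are fixed up front.
struct toy_awaitable
{
  bool ready;
  bool suspend;
  int value;
  bool await_ready() const { return ready; }
  bool await_suspend(fake_handle) { return suspend; }
  int await_resume() const { return value; }
};
```

With ready == false and suspend == false, the trace is "save-state" followed immediately by "await_resume:...": the state was saved, but the coroutine never yielded to its caller.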


An example implementation of a basic lock-free future<T> class might have
the following:

Code run on the producer side:

template <typename T>
auto future<T>::promise_type::final_suspend()
{
  struct publish_result_and_suspend_if_future_exists
  {
    publish_result_and_suspend_if_future_exists(promise_type& p) : _p(p) {}

    bool await_ready()
    {
      // If the consumer has already gone away, don't suspend.
      return _p.state.load(std::memory_order_acquire) == CONSUMER_DESTROYED;
    }

    bool await_suspend(coroutine_handle<> producer)
    {
      // Stash a handle that the consumer can use to destroy the producer
      // when it's done.
      _p.producer = producer;

      // Atomically mark the future as completed, publishing the value, and
      // retrieve the state of the consumer.
      int oldState = _p.state.exchange(VALUE_READY, std::memory_order_acq_rel);

      // Resume the consumer if it had already suspended.
      if (oldState == CONSUMER_WAITING)
      {
        _p.consumer.resume();
      }

      // Return true to suspend and keep this coroutine alive
      // if a consumer still holds the future<T>.
      return oldState != CONSUMER_DESTROYED;
    }

    void await_resume() {}

    promise_type& _p;
  };

  return publish_result_and_suspend_if_future_exists{ *this };
}

Code run on the consumer side:

template <typename T>
bool future<T>::await_ready()
{
  return _promise->state.load(std::memory_order_acquire) == VALUE_READY;
}

template <typename T>
bool future<T>::await_suspend(coroutine_handle<> handle)
{
  _promise->consumer = handle;

  // Attempt to transition from PRODUCER_ACTIVE to CONSUMER_WAITING.
  // If this succeeds, we have suspended and the producer is responsible
  // for resuming us.
  // If this fails, the producer has already published the result and we
  // don't need to suspend.
  int oldState = PRODUCER_ACTIVE;
  return _promise->state.compare_exchange_strong(
      oldState, CONSUMER_WAITING,
      std::memory_order_release, std::memory_order_acquire);
}

template <typename T>
T&& future<T>::await_resume()
{
  return std::move(_promise->value);
}
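To check the state machine above in isolation, here is a minimal single-threaded model of the producer/consumer handshake. It assumes a hypothetical shared_state in place of promise_type, a std::function in place of coroutine_handle<>, and an int result; the values 0/1/2 follow the instruction listing at the end of this post, while CONSUMER_DESTROYED = 3 is an assumption.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// PRODUCER_ACTIVE/CONSUMER_WAITING/VALUE_READY as numbered in the post;
// CONSUMER_DESTROYED's value is assumed.
enum : int { PRODUCER_ACTIVE = 0, CONSUMER_WAITING = 1, VALUE_READY = 2, CONSUMER_DESTROYED = 3 };

// Hypothetical stand-in for future<T>::promise_type.
struct shared_state
{
  std::atomic<int> state{PRODUCER_ACTIVE};
  std::function<void()> consumer;  // stand-in for coroutine_handle<>
  int value = 0;
};

// Mirrors future<T>::await_suspend(): true means the consumer is now
// suspended and the producer is responsible for resuming it.
bool consumer_await_suspend(shared_state& s, std::function<void()> handle)
{
  s.consumer = std::move(handle);
  int expected = PRODUCER_ACTIVE;
  return s.state.compare_exchange_strong(expected, CONSUMER_WAITING,
                                         std::memory_order_release,
                                         std::memory_order_acquire);
}

// Mirrors the exchange in final_suspend()'s await_suspend(): true means the
// producer should stay suspended because a consumer still exists.
bool producer_publish(shared_state& s, int v)
{
  s.value = v;  // written before the release half of the exchange
  int old = s.state.exchange(VALUE_READY, std::memory_order_acq_rel);
  if (old == CONSUMER_WAITING)
    s.consumer();  // resume the waiting consumer
  return old != CONSUMER_DESTROYED;
}
```

Whichever side performs its atomic operation second observes the other's state, so the result is handed over exactly once in both orderings without a lock.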

Now consider the case where the thread running future<T>::await_suspend() is
context-switched out immediately after the atomic instruction has executed but
before await_suspend() has returned. The consumer coroutine handle has now been
published and can be resumed by the producer. Meanwhile, another thread runs
the producer to completion and executes the final_suspend() logic. It sees
that the consumer coroutine has suspended and is awaiting the result, so it
resumes the consumer coroutine at <suspend/resume-point>. Later, the original
thread returns from await_suspend(), sees that it returned true, and yields
execution back to its caller at <suspend/resume-point>. So in this case the
coroutine can be resumed at <suspend/resume-point> on another thread before
the suspending thread has actually reached <suspend/resume-point>.
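The interleaving just described can be replayed deterministically by injecting a hook between the compare-exchange and the return from await_suspend(), standing in for "this thread is preempted here and the producer runs elsewhere". This is a single-threaded sketch; race_state, the hook parameter and the event strings are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

enum : int { PRODUCER_ACTIVE = 0, CONSUMER_WAITING = 1, VALUE_READY = 2 };

std::vector<std::string> events;  // order in which things happened

struct race_state
{
  std::atomic<int> state{PRODUCER_ACTIVE};
  std::function<void()> consumer;  // stand-in for coroutine_handle<>
};

// Consumer's await_suspend() with a hook that runs right after the CAS.
bool await_suspend_with_preemption(race_state& s, std::function<void()> handle,
                                   const std::function<void()>& preempted)
{
  s.consumer = std::move(handle);
  int expected = PRODUCER_ACTIVE;
  bool suspended = s.state.compare_exchange_strong(
      expected, CONSUMER_WAITING, std::memory_order_release, std::memory_order_acquire);
  preempted();  // the producer runs to completion "on another thread"
  events.push_back("await_suspend returned");
  return suspended;
}

// Producer's completion logic from final_suspend().
void producer_complete(race_state& s)
{
  if (s.state.exchange(VALUE_READY, std::memory_order_acq_rel) == CONSUMER_WAITING)
    s.consumer();  // resumes the consumer before its await_suspend() returns
}
```

The event log ends up as "consumer resumed" followed by "await_suspend returned", even though await_suspend() still returns true. This is safe under P0057R0 only because the state was already saved before await_suspend() was called.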

The fact that the await_suspend() call occurs between <save-state> and
<suspend/resume-point> is key here: it means that it's safe for the
coroutine to be resumed before await_suspend() returns, while still
giving us an opportunity to run some code to schedule the resumption before
the coroutine yields execution back to the caller.

One problem that I foresee with P0114R0's 'break resumable' statement is
that it combines <save-state> and <suspend/resume-point> into an
indivisible operation, which means that you need some other means of
scheduling resumption of the coroutine after 'break resumable'.

The example implementation of await() given in section 12.5 of P0114R0
schedules resumption of the function before suspending it, and then
relies on a mutex held by the run() loop to ensure the function is not
resumed on another thread until the 'break resumable' statement has been hit
and execution has returned to the run() loop, which releases the mutex.

template<class T>
resumable T await(future<T> f)
{
  waiter* this_waiter = waiter::active();
  assert(this_waiter != nullptr);

  future<T> result;

  // Scheduling this_waiter for resumption here
  f.then([w = this_waiter->shared_from_this(), &result](auto f)
         {
           result = std::move(f);
           w->resume();
         });

  // but not suspending it until here
  this_waiter->suspend();
  return result.get();
}

I'd be interested to see whether someone can present a version of future<T>
and await() on top of P0114R0 that is lock-free, for comparison.

I suspect that making this lock-free would require registering a callback
with the top-level run() loop, which would act as a trampoline, calling your
callback on your behalf to reschedule you after you have suspended. That
top-level run() loop would need to use some kind of indirect call (e.g.
virtual dispatch) to invoke your callback, since it would need to handle
potentially many different await calls awaiting different things (e.g.
future, I/O, async_generator). This indirect call may incur some overhead on
suspension that is not present with P0057R0 suspension.
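A minimal sketch of the trampoline design suspected above, assuming a hypothetical after_suspend_callback interface and run_loop class (neither is part of P0114R0). The resumable function registers a callback just before 'break resumable'. The loop invokes the callback through a virtual call only after the step has returned, i.e. after the function has suspended, which is the point where a lock-free awaiter could safely publish its handle.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical interface an awaiter registers with the run() loop.
struct after_suspend_callback
{
  virtual ~after_suspend_callback() = default;
  virtual void on_suspended() = 0;  // the indirect call made by run()
};

// Hypothetical top-level loop driving one resumable function.
class run_loop
{
public:
  // Called by an awaiter immediately before 'break resumable'.
  void register_callback(after_suspend_callback* cb) { pending_ = cb; }

  // Runs the function until it suspends, then fires the trampoline.
  template <typename Step>
  void run_step(Step step)
  {
    step();  // returns once the function has hit 'break resumable'
    if (pending_ != nullptr)
    {
      after_suspend_callback* cb = pending_;
      pending_ = nullptr;
      cb->on_suspended();  // virtual dispatch: the extra cost in question
    }
  }

private:
  after_suspend_callback* pending_ = nullptr;
};

std::vector<std::string> events;

// Example callback: where a lock-free future would publish the waiter's
// handle (e.g. the CAS to CONSUMER_WAITING) now that suspension is complete.
struct publish_waiter : after_suspend_callback
{
  void on_suspended() override { events.push_back("handle published"); }
};
```

The ordering guarantee comes from the loop rather than a mutex, but every suspension pays for one indirect call that P0057R0's inlined await_suspend() avoids.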

As far as I can tell, the instructions generated in optimised builds using
P0057R0 for an await of the lock-free future<T> described above should be
fairly minimal, as the await_ready/await_suspend logic can be inlined into
the coroutine body, i.e.:
- load promise state
- compare to VALUE_READY(2)
- jump to <resume-point> if equal
- <instructions-to-save-coroutine-state> (should be equivalent for both proposals)
- store coroutine-handle to promise
- atomic cmpxchg state from PRODUCER_ACTIVE(0) -> CONSUMER_WAITING(1)
- jump to <resume-point> if cmpxchg failed
- return to caller
<resume-point>
- load address of return value from promise
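The listing above can be written out as one straight-line C++ function to make the inlined fast path concrete. This is a sketch under assumptions: promise_state and inlined_await are hypothetical names, a void* stands in for the coroutine handle, the result is an int, and the state-saving step is only a comment.

```cpp
#include <atomic>
#include <cassert>

// State values as given in the instruction listing.
constexpr int PRODUCER_ACTIVE = 0;
constexpr int CONSUMER_WAITING = 1;
constexpr int VALUE_READY = 2;

// Hypothetical promise layout: only the fields the fast path touches.
struct promise_state
{
  std::atomic<int> state{PRODUCER_ACTIVE};
  void* consumer = nullptr;  // stand-in for the coroutine handle
  int value = 0;
};

// Returns nullptr when the coroutine must return to its caller (suspended);
// otherwise returns the address of the result (the <resume-point> path).
const int* inlined_await(promise_state& p, void* self)
{
  if (p.state.load(std::memory_order_acquire) == VALUE_READY)  // load + compare
    return &p.value;                                            // jump to <resume-point>
  // <instructions-to-save-coroutine-state> would go here.
  p.consumer = self;                                            // store handle
  int expected = PRODUCER_ACTIVE;
  if (!p.state.compare_exchange_strong(expected, CONSUMER_WAITING,
                                       std::memory_order_release,
                                       std::memory_order_acquire))
    return &p.value;                                            // cmpxchg failed
  return nullptr;                                               // return to caller
}
```

Apart from saving state, the suspend path costs one atomic load, one store and one compare-exchange, with no indirect calls.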

--

---
You received this message because you are subscribed to the Google Groups "ISO C++ Standard - Future Proposals" group.
To unsubscribe from this group and stop receiving emails from it, send an email to std-proposals+unsubscribe@isocpp.org.
To post to this group, send email to std-proposals@isocpp.org.
Visit this group at http://groups.google.com/a/isocpp.org/group/std-proposals/.
