Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


asynchronous - How to implement a Future or Stream that polls an async fn?

I have a struct Test for which I want to implement std::future::Future so that polling it polls function:

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

struct Test;

impl Test {
    async fn function(&mut self) {}
}

impl Future for Test {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.function() {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => Poll::Ready(()),
        }
    }
}

That didn't work:

error[E0308]: mismatched types
  --> src/lib.rs:17:13
   |
10 |     async fn function(&mut self) {}
   |                                  - the `Output` of this `async fn`'s expected opaque type
...
17 |             Poll::Pending => Poll::Pending,
   |             ^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
   |
   = note: expected opaque type `impl Future`
                     found enum `Poll<_>`

error[E0308]: mismatched types
  --> src/lib.rs:18:13
   |
10 |     async fn function(&mut self) {}
   |                                  - the `Output` of this `async fn`'s expected opaque type
...
18 |             Poll::Ready(_) => Poll::Ready(()),
   |             ^^^^^^^^^^^^^^ expected opaque type, found enum `Poll`
   |
   = note: expected opaque type `impl Future`
                     found enum `Poll<_>`

I understand that function must be called once, the returned Future must be stored somewhere in the struct, and then the saved future must be polled. I tried this:

struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);

impl Test {
    async fn function(&mut self) {}
    fn new() -> Self {
        let mut s = Self(None);
        s.0 = Some(Box::pin(s.function()));
        s
    }
}

That also didn't work:

error[E0277]: the size for values of type `(dyn Future<Output = ()> + 'static)` cannot be known at compilation time
   --> src/lib.rs:7:13
    |
7   | struct Test(Option<Box<Pin<dyn Future<Output = ()>>>>);
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
    |
    = help: the trait `Sized` is not implemented for `(dyn Future<Output = ()> + 'static)`

Calling function() takes a &mut reference to Test. While that borrow is alive I can't modify the Test variable, so I can't store the returned Future inside Test.

I did find a solution that uses unsafe (inspired by this):

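// `BoxFuture`, `.boxed()` and `.poll_unpin()` come from the `futures` crate.
use futures::future::{BoxFuture, FutureExt};

struct Test<'a>(Option<BoxFuture<'a, ()>>);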

impl Test<'_> {
    async fn function(&mut self) {
        println!("I'm alive!");
    }

    fn new() -> Self {
        let mut s = Self(None);
        // Unsound: the future keeps a pointer to the local `s`, which is moved when `new` returns.
        s.0 = Some(unsafe { &mut *(&mut s as *mut Self) }.function().boxed());
        s
    }
}

impl Future for Test<'_> {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.0.as_mut().unwrap().poll_unpin(cx)
    }
}

I hope that there is another way.


1 Reply


There are times when you may want to do something similar to what you are trying here, but they are rare. Most people reading this, perhaps even OP, would be better off restructuring so that the struct's state and the data used for a single async execution are separate objects.
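As a rough illustration of that restructuring, here is a minimal std-only sketch. It assumes the per-run data can be moved into the async fn by value. The names Worker, NoopWaker and block_on are hypothetical and exist only for this example; block_on is a naive busy-polling helper, not something a real executor would do.

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

// Hypothetical per-run data, kept separate from the wrapper that stores the future.
struct Worker {
    buffer: Vec<u8>,
}

impl Worker {
    // Takes `self` by value, so the returned future owns its data and borrows nothing.
    async fn function(mut self) -> Vec<u8> {
        for i in 0..4u8 {
            self.buffer.push(i);
        }
        self.buffer
    }
}

// The wrapper only stores the future. Note the order: `Pin<Box<dyn Future>>` is sized.
struct Test(Pin<Box<dyn Future<Output = Vec<u8>>>>);

impl Test {
    fn new() -> Self {
        let worker = Worker { buffer: Vec::new() };
        // `function` is called exactly once; its future is saved for later polling.
        Test(Box::pin(worker.function()))
    }
}

impl Future for Test {
    type Output = Vec<u8>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Test` is `Unpin` (it only holds a pinned box), so the field can be
        // reached through the pin and the poll delegated to the stored future.
        self.0.as_mut().poll(cx)
    }
}

// Waker that does nothing; enough for the busy-polling helper below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Naive helper that polls in a loop until the future is ready (illustration only).
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let out = block_on(Test::new());
    assert_eq!(out, vec![0, 1, 2, 3]);
}
```

Because the future owns the Worker, nothing in Test refers back to Test itself, so neither unsafe nor Arc and Mutex are needed in this arrangement.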

To answer your question: yes, it is possible, with caveats. Unless you are willing to resort to unsafe code, you will need Mutex and Arc. Every field you want to modify inside the async fn has to be wrapped in a Mutex, and the function itself has to take Arc<Self>.

I must stress, however, that this is not a pretty solution, and you probably don't want to do this. The right approach depends on your specific case, but my guess is that what OP is trying to accomplish with Streams would be better solved by something similar to this gist that I wrote.

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
};

struct Test {
    state: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
    // if available use your async library's Mutex to `.await` locks on `buffer` instead
    buffer: Mutex<Vec<u8>>,
}

impl Test {
    async fn function(self: Arc<Self>) {
        for i in 0..16u8 {
            let data: Vec<u8> = vec![i]; // = fs::read(&format!("file-{}.txt", i)).await.unwrap();
            let mut buflock = self.buffer.lock().unwrap();
            buflock.extend_from_slice(&data);
        }
    }
    pub fn new() -> Arc<Self> {
        let s = Arc::new(Self {
            state: Default::default(),
            buffer: Default::default(),
        });

        {
            // start by trying to acquire a lock on the Mutex around the boxed future
            let mut lock = s.state.lock().unwrap();
            // create boxed future
            let b = Box::pin(s.clone().function());
            // insert value into the mutex
            *lock = Some(b);
        } // block causes the lock to be released

        s
    }
}

impl Future for Test {
    type Output = ();
    fn poll(
        self: std::pin::Pin<&mut Self>,
        ctx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<<Self as std::future::Future>::Output> {
        let mut lock = self.state.lock().unwrap();
        let fut: &mut Pin<Box<dyn Future<Output = ()>>> = lock.as_mut().unwrap();
        Future::poll(fut.as_mut(), ctx)
    }
}
