Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
476 views
in Technique[技术] by (71.8m points)

iterator - How do I iterate over a Vec of functions returning Futures in Rust?

Is it possible to loop over a Vec, calling a method that returns a Future on each, and build a chain of Futures, to be evaluated (eventually) by the consumer? Whether to execute the later Futures would depend on the outcome of the earlier Futures in the Vec.

To clarify:

I'm working on an application that can fetch data from an arbitrary set of upstream sources.

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried, and so on.

Each source should be tried exactly once, and no source should be tried until all of the sources before have returned their results. Errors are logged, but otherwise ignored, passing the query to the next upstream data source.

I have some working code that does this for fetching metadata:

/// Attempts to read/write data to various external sources. These are
/// nested types, because a data source may exist as both a reader and a writer
struct StoreManager {
    /// Upstream data sources
    readers: Vec<Rc<RefCell<StoreRead>>>,
    /// Downstream data sinks
    writers: Vec<Rc<RefCell<StoreWrite>>>,
}

impl StoreRead for StoreManager {
    fn metadata(self: &Self, id: &Identifier) -> Box<Future<Option<Metadata>, Error>> {
       Box::new(ok(self.readers
            .iter()
            .map(|store| {
                executor::block_on(store.borrow().metadata(id)).unwrap_or_else(|err| {
                    error!("Error on metadata(): {:?}", err);
                    None
                })
            })
            .find(Option::is_some)
            .unwrap_or(None)))
    }
}

Aside from my unhappiness with all of the Box and Rc/RefCell nonsense, my real concern is with the executor::block_on() call. It blocks, waiting for each Future to return a result, before continuing to the next.

Given that it's possible to call fn_returning_future().or_else(|_| other_fn()) and so on, is it possible to build up a dynamic chain like this? Or is it a requirement to fully evaluate each Future in the iterator before moving to the next?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

You can use stream::unfold to convert a single value into a stream. In this case, we can use the IntoIter iterator as that single value.

use futures::{executor, stream, Stream, TryStreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<i32> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    Ok(val * 100)
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<i32>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    let s = requests_in_sequence(vec![1, 2, 3]);
    executor::block_on(async {
        s.try_for_each(|v| async move {
            println!("-> {}", v);
            Ok(())
        })
        .await
        .expect("An error occurred");
    });
}
Resolving 1 at Instant { tv_sec: 6223328, tv_nsec: 294631597 }
-> 100
Resolving 2 at Instant { tv_sec: 6223329, tv_nsec: 310839993 }
-> 200
Resolving 3 at Instant { tv_sec: 6223330, tv_nsec: 311005834 }
-> 300

To ignore Err and None, you have to shuttle the Error over to the Item, making the Item type a Result<Option<T>, Error>:

use futures::{executor, stream, Stream, StreamExt}; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

fn requests_in_sequence(vals: Vec<i32>) -> impl Stream<Item = Result<Option<i32>>> {
    stream::unfold(vals.into_iter(), |mut vals| async {
        let val = vals.next()?;
        let response = network_request(val).await;
        Some((response, vals))
    })
}

fn main() {
    executor::block_on(async {
        let s = requests_in_sequence(vec![1, 2, 3]);

        let s = s.filter_map(|v| async move { v.ok() });
        let s = s.filter_map(|v| async move { v });
        let mut s = s.boxed_local();

        match s.next().await {
            Some(v) => println!("First success: {}", v),
            None => println!("No successful requests"),
        }
    });
}
Resolving 1 at Instant { tv_sec: 6224229, tv_nsec: 727216392 }
Resolving 2 at Instant { tv_sec: 6224230, tv_nsec: 727404752 }
Resolving 3 at Instant { tv_sec: 6224231, tv_nsec: 727593740 }
First success: 300

is it possible to build up a dynamic chain like this

Yes, by leveraging async functions:

use futures::executor; // 0.3.4

type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;

async fn network_request(val: i32) -> Result<Option<i32>> {
    // Just for demonstration, don't do this in a real program
    use std::{
        thread,
        time::{Duration, Instant},
    };
    thread::sleep(Duration::from_secs(1));
    println!("Resolving {} at {:?}", val, Instant::now());

    match val {
        1 => Err("boom".into()),  // An error
        2 => Ok(None),            // No data
        _ => Ok(Some(val * 100)), // Success
    }
}

async fn requests_in_sequence(vals: Vec<i32>) -> Result<i32> {
    let mut vals = vals.into_iter().peekable();

    while let Some(v) = vals.next() {
        match network_request(v).await {
            Ok(Some(v)) => return Ok(v),
            Err(e) if vals.peek().is_none() => return Err(e),
            Ok(None) | Err(_) => { /* Do nothing and try the next source */ }
        }
    }

    Err("Ran out of sources".into())
}

fn main() {
    executor::block_on(async {
        match requests_in_sequence(vec![1, 2, 3]).await {
            Ok(v) => println!("First success: {}", v),
            Err(e) => println!("No successful requests: {}", e),
        }
    });
}

See also:


is it a requirement to fully evaluate each Future in the iterator before moving to the next

Isn't that part of your own requirements? Emphasis mine:

Requesting data would check with each of the sources, in turn. If the first source had an error (Err), or did not have the data available (None), then the second source would be tried


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...