Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

c++ - Proper cleanup with a suspended coroutine

I'm wondering what the best (cleanest, hardest to mess up) method for cleanup is in this situation.

void MyClass::do_stuff(boost::asio::yield_context context) {
  while (running_) {
    uint32_t data = async_buffer->Read(context);
    // do other stuff
  }
}

Read is a call which asynchronously waits until there is data to be read, then returns that data. If I want to delete this instance of MyClass, how can I make sure I do so properly? Let's say that the asynchronous wait here is performed via a deadline_timer's async_wait. If I cancel the event, I still have to wait for the thread to finish executing the "other stuff" before I know things are in a good state (I can't join the thread, as it's a thread that belongs to the io service that may also be handling other jobs). I could do something like this:

MyClass::~MyClass() {
  running_ = false;
  read_event->CancelEvent(); // some way to cancel the deadline_timer the Read is waiting on
  boost::mutex::scoped_lock lock(finished_mutex_);
  while (!finished_) { // loop, so a spurious wakeup cannot end the wait early
    cond_.wait(lock);
  }
  // any other cleanup
}

void MyClass::do_stuff(boost::asio::yield_context context) {
  while (running_) {
    uint32_t data = async_buffer->Read(context);
    // do other stuff
  }
  boost::mutex::scoped_lock lock(finished_mutex_);
  finished_ = true;
  cond_.notify_one(); // wake the destructor waiting on cond_
}

But I'm hoping to make these stackful coroutines as easy to use as possible, and it's not straightforward for people to recognize that this condition exists and what would need to be done to make sure things are cleaned up properly. Is there a better way? Is what I'm trying to do here wrong at a more fundamental level?

Also, for the event (what I have is basically the same as Tanner's answer here) I need to cancel it in a way that I'd have to keep some extra state (a true cancel vs. the normal cancel used to fire the event) -- which wouldn't be appropriate if there were multiple pieces of logic waiting on that same event. Would love to hear if there's a better way to model the asynchronous event to be used with a coroutine suspend/resume.

Thanks.

EDIT: Thanks @Sehe, took a shot at a working example, I think this illustrates what I'm getting at:

class AsyncBuffer {
public:
  AsyncBuffer(boost::asio::io_service& io_service) :
  write_event_(io_service) {
    write_event_.expires_at(boost::posix_time::pos_infin);
  }

  void Write(uint32_t data) {
    buffer_.push_back(data);
    write_event_.cancel();
  }

  uint32_t Read(boost::asio::yield_context context) {
    if (buffer_.empty()) {
      write_event_.async_wait(context);
    }
    uint32_t data = buffer_.front();
    buffer_.pop_front();
    return data;
  }

protected:
  boost::asio::deadline_timer write_event_;
  std::list<uint32_t> buffer_;
};

class MyClass {
public:
  MyClass(boost::asio::io_service& io_service) :
      running_(false), io_service_(io_service), buffer_(io_service) {
  }

  void Run(boost::asio::yield_context context) {
    while (running_) {
      boost::system::error_code ec;
      uint32_t data = buffer_.Read(context[ec]);
      // do something with data
    }
  }

  void Write(uint32_t data) {
    buffer_.Write(data);
  }

  void Start() {
    running_ = true;
    boost::asio::spawn(io_service_, boost::bind(&MyClass::Run, this, _1));
  }

protected:
  boost::atomic_bool running_;
  boost::asio::io_service& io_service_;
  AsyncBuffer buffer_;
};

So here, let's say that the buffer is empty and MyClass::Run is currently suspended in a call to Read, so there's a deadline_timer async_wait pending that will resume that context when the event fires. It's time to destroy this instance of MyClass, so how do we make sure that it gets done cleanly?


1 Reply


A more typical approach would be to use boost::enable_shared_from_this with MyClass, and run the methods as bound to the shared pointer.

Boost Bind supports binding to boost::shared_ptr<MyClass> transparently.

This way, you can automatically have the destructor run only when the last user disappears.
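
A minimal sketch of that lifetime guarantee, assuming std::shared_ptr and std::enable_shared_from_this in place of the Boost versions so that it compiles without Boost. Worker, make_handler, handler_keeps_worker_alive and g_destroyed are hypothetical names used only for illustration, and a std::function stands in for the handler that the IO service would hold while the coroutine is suspended:

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical flag so the example can observe when the destructor runs.
static bool g_destroyed = false;

class Worker : public std::enable_shared_from_this<Worker> {
public:
    ~Worker() { g_destroyed = true; }

    void Run() { ++runs_; }

    // Returns a callable that co-owns *this, like a handler bound to
    // shared_from_this() and stored by an IO service.
    std::function<void()> make_handler() {
        std::shared_ptr<Worker> self = shared_from_this();
        return [self] { self->Run(); };
    }

private:
    int runs_ = 0;
};

// Drops the caller's reference first, then the handler's. Returns true if
// the object stayed alive until the handler itself was released.
bool handler_keeps_worker_alive() {
    g_destroyed = false;
    std::shared_ptr<Worker> owner = std::make_shared<Worker>();
    std::function<void()> pending = owner->make_handler();

    owner.reset();                              // caller lets go
    const bool alive_while_pending = !g_destroyed;

    assert(static_cast<bool>(pending));         // handler is still set
    pending();                                  // safe: object is still alive
    pending = nullptr;                          // last owner gone, destructor runs
    return alive_while_pending && g_destroyed;
}
```

Calling handler_keeps_worker_alive() returns true: the destructor is deferred until the stored handler is destroyed, which is the property that makes an explicit "wait until finished" step in the destructor unnecessary.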


If you create a SSCCE, I'm happy to change it around, to show what I mean.


UPDATE

To the SSCCE, some remarks:

  • I imagined a pool of threads running the IO service
  • The way in which MyClass calls into AsyncBuffer member functions directly is not thread-safe. There is actually no thread-safe way to cancel the event outside the producer thread[1], since the producer already accesses the buffer for writing. This could be mitigated using a strand (in the current setup I don't see how MyClass could be thread-safe). Alternatively, look at the active object pattern (for which Tanner has an excellent answer[2] on SO).

    I chose the strand approach here, for simplicity, so we do:

    void MyClass::Write(uint32_t data) {
        strand_.post(boost::bind(&AsyncBuffer::Write, &buffer_, data));
    }
    
  • You ask

    Also, for the event (what I have is basically the same as Tanner's answer here) I need to cancel it in a way that I'd have to keep some extra state (a true cancel vs. the normal cancel used to fire the event)

    The most natural place for this state is the usual one for a deadline_timer: its deadline. Stopping the buffer is done by resetting the timer's deadline to a point in the past:

    void AsyncBuffer::Stop() { // not threadsafe!
        write_event_.expires_from_now(boost::posix_time::seconds(-1));
    }
    

    This cancels the pending wait at once, and the stop is detectable because the deadline is now in the past. A normal cancel from Write leaves the deadline at positive infinity.
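
The distinction between "woken because data arrived" and "woken because the buffer was stopped" can also be seen in a blocking analogue. The sketch below is not the Asio code: it swaps the deadline_timer for a std::condition_variable and an explicit stopped_ flag, and it blocks a thread instead of suspending a coroutine. BlockingBuffer and its members are hypothetical names chosen for illustration:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <mutex>

class BlockingBuffer {
public:
    void Write(std::uint32_t data) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            buffer_.push_back(data);
        }
        cond_.notify_one();   // wake-up reason: data is available
    }

    void Stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;  // the extra state lives inside the buffer
        }
        cond_.notify_all();   // wake-up reason: shutdown
    }

    // Returns true and fills `out` when data arrived. Returns false when
    // the buffer has been stopped (in this sketch, any unread data is
    // then left in the buffer).
    bool Read(std::uint32_t& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return stopped_ || !buffer_.empty(); });
        if (stopped_) {
            return false;
        }
        assert(!buffer_.empty());
        out = buffer_.front();
        buffer_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::list<std::uint32_t> buffer_;
    bool stopped_ = false;
};
```

A consumer loop would then read `while (buffer.Read(value)) { ... }` and fall out of the loop once Stop() is called. Because the flag belongs to the buffer rather than to a single waiter, several consumers can wait on the same buffer and all of them observe the stop.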

Here's a simple demo with a group of IO service threads, one "producer coroutine" that produces random numbers, and a "sniper thread" that snipes the MyClass::Run coroutine after 2 seconds. The sniper runs on its own detached thread; the main thread waits 3 seconds and then joins the service threads.

See it Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/async_result.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/atomic.hpp>
#include <cstdint>  // uint32_t
#include <cstdlib>  // rand
#include <list>
#include <iostream>

// for refcounting:
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>

namespace asio = boost::asio;

class AsyncBuffer {
    friend class MyClass;
protected:
    AsyncBuffer(boost::asio::io_service &io_service) : write_event_(io_service) {
        write_event_.expires_at(boost::posix_time::pos_infin);
    }

    void Write(uint32_t data) {
        buffer_.push_back(data);
        write_event_.cancel();
    }

    uint32_t Read(boost::asio::yield_context context) {
        if (buffer_.empty()) {
            boost::system::error_code ec;
            write_event_.async_wait(context[ec]);

            if (ec != boost::asio::error::operation_aborted || write_event_.expires_from_now().is_negative())
            {
                if (context.ec_)
                    *context.ec_ = boost::asio::error::operation_aborted;
                return 0;
            }
        }

        uint32_t data = buffer_.front();
        buffer_.pop_front();
        return data;
    }

    void Stop() {
        write_event_.expires_from_now(boost::posix_time::seconds(-1));
    }

private:
    boost::asio::deadline_timer write_event_;
    std::list<uint32_t> buffer_;
};

class MyClass : public boost::enable_shared_from_this<MyClass> {
    boost::atomic_bool stopped_;
public:
    MyClass(boost::asio::io_service &io_service) : stopped_(false), buffer_(io_service), strand_(io_service) {}

    void Run(boost::asio::yield_context context) {
        while (!stopped_) {
            boost::system::error_code ec;

            uint32_t data = buffer_.Read(context[ec]);

            if (ec == boost::asio::error::operation_aborted)
                break;

            // do something with data
            std::cout << data << " " << std::flush;
        }
        std::cout << "EOF\n";
    }

    bool Write(uint32_t data) { 
        if (!stopped_) {
            strand_.post(boost::bind(&AsyncBuffer::Write, &buffer_, data));
        }
        return !stopped_;
    }

    void Start() {
        if (!stopped_) {
            stopped_ = false;
            boost::asio::spawn(strand_, boost::bind(&MyClass::Run, shared_from_this(), _1));
        }
    }

    void Stop() {
        stopped_ = true;
        strand_.post(boost::bind(&AsyncBuffer::Stop, &buffer_));
    }

    ~MyClass() { 
        std::cout << "MyClass destructed because no coroutines hold a reference to it anymore\n";
    }

protected:
    AsyncBuffer buffer_;
    boost::asio::strand strand_;
};

int main()
{
    boost::thread_group tg;
    asio::io_service svc;

    {
        // Start the consumer:
        auto instance = boost::make_shared<MyClass>(svc); 
        instance->Start();

        // Sniper in 2 seconds :)
        boost::thread([instance]{ 
                boost::this_thread::sleep_for(boost::chrono::seconds(2));
                instance->Stop();
                }).detach();

        // Start the producer:
        auto producer_coro = [instance, &svc](asio::yield_context c) { // a bound function/function object in C++03
            asio::deadline_timer tim(svc);

            while (instance->Write(rand())) {
                tim.expires_from_now(boost::posix_time::milliseconds(200));
                tim.async_wait(c);
            }
        };

        asio::spawn(svc, producer_coro);

        // Start the service threads:
        for(size_t i=0; i < boost::thread::hardware_concurrency(); ++i)
            tg.create_thread(boost::bind(&asio::io_service::run, &svc));
    }

    // now `instance` is out of scope, it will selfdestruct after the snipe
    // completed
    boost::this_thread::sleep_for(boost::chrono::seconds(3)); // wait longer than the snipe
    std::cout << "This is the main thread _after_ MyClass self-destructed correctly\n";

    // cleanup service threads
    tg.join_all();
}

[1] logical thread, this could be a coroutine that gets resumed on different threads

[2] boost::asio and Active Object

