Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
274 views
in Technique[技术] by (71.8m points)

c++11 - How to read data from Internet using muli-threading with connecting only once?

I'm building a tiny muti-threading download program using boost::asio::ip::tcp. I need each thread to deal with a part of data. I know it can solve the problem by adding "Range:bytes:xx-xx" to the request header. But I don't want to let the program connect to the server so many times. Is there any solution?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Just read it and dispatch to the worker threads when appropriate.

Having no clue what kind of chunks you want to separately handle, let's assume you read all of the prime numbers from https://www.mathsisfun.com/includes/primes-to-100k.zip, read them in chunks, then do some work on all the primes on separate threads.

What Is The Work?

Here's some lazy prime job:

void handle_batch(std::vector<size_t> params) {
    if (!params.empty()) {
        std::cout
            << "Batch n:" << params.size()
            << "Range [" << params.front() << ".." << params.back() << "]"
            << "Sum:" << std::accumulate(begin(params), end(params), 0ull)
            << std::endl;
    }
}

Yeah, we just print a description of the job params and their sum. We can doodle a bit on it to make it more lifelike, like making it take some time, and being aware that we are on worker threads, so we want to synchronize access to the console.

void handle_batch(std::vector<size_t> params) {
    std::mutex s_mx;

    if (!params.empty()) {
        // emulate some work, because I'm lazy
        auto sum = std::accumulate(begin(params), end(params), 0ull);
        // then wait some 100..200ms
        {
            using namespace std::chrono_literals;
            std::mt19937 prng(std::random_device{}());
            std::this_thread::sleep_for(
                std::uniform_real_distribution<>(100,200)(prng)*1ms);
        }

        // simple thread id (thread::id displays ugly)
        auto tid = std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100;

        // report results to stdout
        std::lock_guard lk(s_mx); // make sure the output doesn't intermix
        std::cout
            << "Thread #" << std::setw(2) << std::setfill('0') << tid
            << " Batch n:" << params.size()
            << "Range [" << params.front() << ".." << params.back() << "]"
            << "Sum:" << sum
            << std::endl;
    }
}

Okay, that's enough gore for the unimportant bits.

The Plan

Well, there's a slight complication with my chosen approach, because not only is that site using https (ugh), also it is serving up ZIP files (ugh). And we're using C++ (ugh?).

At least, we can do the the whole SSL connect business synchronously in not too much code, we want the reading to be asynchronously, though, because that way we can demonstrate that

  • you can do a lot of intermixed IO on just the main thread using Boost Asio
  • same goes for Boost Process to launch zcat as a child process to unzip the primes content (we'll assume UNIX-like system with zcat installed)
  • which means we'll be asynchronously writing to that child process stdin
  • and also asynchronously reading from its stdout
  • spawning off batch jobs along the way as soon as they're ready

This should be pretty good model for your workload, because the workers take more time than the IO, however, we do many IO tasks on a single thread without blocking.

Let's Get The Data

As said, we will use a single thread for IO, and a thread pool for the batch workers:

int main() {
    net::io_context io; // main thread does all io
    net::thread_pool pool(6); // worker threads

There. That's a start. Now, we want to have a SSL connection, and request that ZIP. Here it is:

http::response_parser<http::buffer_body> res_reader;
beast::flat_buffer lookahead; // for the res_reader
std::array<char,512> buf{0}; // for download content
auto ctx = ssl_context();
ssl::stream<tcp::socket> s(io, ctx);

{   // synchronously write request
    std::string host = "www.mathsisfun.com";
    connect_https(s, host, tcp::resolver{io}.resolve(host, "https"));
    http::write(s, get_request(host, "/includes/primes-to-100k.zip"));

    http::read_header(s, lookahead, res_reader);
    //std::cerr << "Headers: " << res_reader.get().base() << std::endl;
}

Yup, that already did the reading of the response headers1. Of course we cheated because we need three helpers:

  1. making an ssl context

    auto ssl_context() {
        ssl::context ctx{ssl::context::sslv23};
        ctx.set_default_verify_paths();
        ctx.set_verify_mode(ssl::verify_peer);
        return ctx;
    }
    
  2. connecting over SSL

    void connect_https(stream& s, std::string const& host, tcp::resolver::iterator eps) {
        net::connect(s.lowest_layer(), eps);
        s.lowest_layer().set_option(tcp::no_delay(true));
    
        if (!SSL_set_tlsext_host_name(s.native_handle(), host.c_str())) {
            throw system_error{ { (int)::ERR_get_error(), net::error::get_ssl_category() } };
        }
        s.handshake(stream::handshake_type::client);
    }
    
  3. making the HTTP request

    auto get_request(std::string const& host, std::string const& path) {
        using namespace http;
        request<string_body> req;
        req.version(11);
        req.method(verb::get);
        req.target("https://" + host + path);
        req.set(field::user_agent, "test");
        req.set(field::host, host);
    
        std::cerr << req << std::endl;
        return req;
    }
    

Not bad, for C++.

Pipe It Into zcat

Now we start with the asynchrony: let's have a "pump" or "loop" that sends all the response data into a pipe:

// now, asynchoronusly read contents
process::async_pipe pipe_to_zcat(io);

std::function<void(error_code, size_t)> receive_zip;

receive_zip is what we call our loop. It's a self-chaining asynchronous operation. So, everytime it is called, it will pump some data into the pipe, and call one more async_read for the HTTP response:

receive_zip = [&s, &response_reader, &pipe_to_zcat, &buf, &lookahead, &receive_zip]
    (error_code ec, size_t /*ignore_this*/)
{
    auto& res = response_reader.get();
    auto& body = res.body();
    if (body.data) {
        auto n = sizeof(buf) - body.size;
        net::write(pipe_to_zcat, net::buffer(buf, n));
    }

    bool done = ec && !(ec == http::error::need_buffer);
    done += response_reader.is_done();

    if (done) {
        std::cerr << "receive_zip: " << ec.message() << std::endl;
        pipe_to_zcat.close();
    } else {
        body.data = buf.data();
        body.size = buf.size();

        http::async_read(s, lookahead, response_reader, receive_zip);
    }
};

This slightly complicated looking reading of a buffered response is almost literally from the documentation here.

Now, all we have to do is prime the pump:

// kick off receive loop
receive_zip(error_code{}, 0);

Intermezzo, Unzip

This is not the interesting part, let's go: We are launching a subprocess zcat and want a second pipe to read the output from:

process::async_pipe zcat_output(io);
process::child zcat(
   process::search_path("zcat"),
   process::std_in < pipe_to_zcat,
   process::std_out > zcat_output,
   process::on_exit([](int exitcode, std::error_code ec) {
        std::cerr << "Child process exited with " << exitcode << " (" << ec.message() << ")
";
   }), io);

End of intermission :)

(We even threw in error reporting because, why not?)

Ah, The Good Stuff: Primes On Tap!

Now, we have another async read loop, this time to read back the uncompressed primes. This is where we will assemble batch jobs to be handled on the worker pool.

std::function<void(error_code, size_t)> receive_primes;
net::streambuf sb;

Like receive_zip before, receive_primes is our loop driver, the sb buffer is just a buffer which makes it easy to read using std::istream as you'd normally do from std::cin.

receive_primes = [&zcat_output, &sb, &receive_primes, &pool](error_code ec, size_t /*transferred*/) {
    {
        std::istream is(&sb);

        size_t n = std::count(net::buffers_begin(sb.data()), net::buffers_end(sb.data()), '
');
        std::vector<size_t> batch(n);
        std::copy_n(std::istream_iterator<size_t>(is), n, batch.begin());
        is.ignore(1, '
'); // we know a newline is pending, eat it to keep invariant

        post(pool, std::bind(handle_batch, std::move(batch)));
    }

    if (ec) {
        std::cerr << "receive_primes: " << ec.message() << std::endl;
        zcat_output.close();
    } else {
        net::async_read_until(zcat_output, sb, "
", receive_primes);
    }
};

Because async_read_until may read partial lines, we count the number (n) of full lines in the buffer and pack them into a vector. After we make sure that we eat the impending newline, we ... post the batch job, finally:

 post(pool, std::bind(handle_batch, std::move(batch)));

We move ownership to the task because it will run on a separate thread, and the best way to handle concurrency is to minimize sharing.

Again, priming the pump:

// kick off handler loop as well:
receive_primes(error_code{}, 0);

PUTTING IT ALL TOGETHER

Well. Prepare for the anticlimax. With all the async chains setup, all we need to do is... wait.

    io.run();
    pool.join();
} // end of main

The io.run() keeps running both pumps and awaits the child process, all on the main thread, as we like.

The pool.join() waits for all batch jobs to be completed, before stopping the thread pool. If you leave out that line, you might not run all the tasks, because the destructor of thread_pool calls stop() before it calls join().

Toy around with the buffer size (512 bytes in my example) to see how large batches become. Note that 512 bytes is compressed bytes.

"UNLIVE" DEMO

Sadly no online compiler that I know of supports external network access, so you'll have to run this one yourself. For convenience, here's a full listing, and sample output from a run on my computer:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/beast/http.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <iomanip>
#include <iostream>

void handle_batch(std:

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

Just Browsing Browsing

1.4m articles

1.4m replys

5 comments

56.9k users

...