
CopernicaMarketingSoftware/AMQP-CPP: C++ library for asynchronous non-blocking c ...


Open source software name:

CopernicaMarketingSoftware/AMQP-CPP

Open source software URL:

https://github.com/CopernicaMarketingSoftware/AMQP-CPP

Programming language:

C++ 98.0%

Introduction:

AMQP-CPP


Are you upgrading from AMQP-CPP 3 to AMQP-CPP 4? Please read the upgrade instructions

AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker. The library can be used to parse incoming data from a RabbitMQ server, and to generate frames that can be sent to a RabbitMQ server.

This library has a layered architecture, and allows you - if you like - to completely take care of the network layer. If you want to set up and manage the network connections yourself, the AMQP-CPP library will not make a connection to RabbitMQ by itself, nor will it create sockets and/or perform IO operations. As a user of this library, you create the socket connection and implement a certain interface that you pass to the AMQP-CPP library and that the library will use for IO operations.

Intercepting this network layer is, however, optional: the AMQP-CPP library also comes with a predefined TCP and TLS module that can be used if you trust the AMQP-CPP library to take care of the network (and optional TLS) handling. In that case, the AMQP-CPP library does all the system and library calls to set up network connections and send and receive the (possibly encrypted) data.

This layered architecture makes the library extremely flexible and portable: it does not necessarily rely on operating system specific IO calls, and can be easily integrated into any kind of event loop. If you want to implement the AMQP protocol on top of some unusual other communication layer, this library can be used for that - but if you want to use it with regular TCP connections, setting it up is just as easy.

AMQP-CPP is fully asynchronous and does not do any blocking (system) calls, so it can be used in high performance applications without the need for threads.

The AMQP-CPP library uses C++11 features, so if you intend to use it, please make sure that your compiler is up-to-date and supports C++11.

Note for the reader: This readme file has a peculiar structure. We start by explaining the pure, hard-core, low-level interface, in which you have to take care of opening socket connections yourself. In reality, you probably want to use the simpler TCP interface that is described later on.

ABOUT

This library is created and maintained by Copernica (www.copernica.com), and is used inside the MailerQ (www.mailerq.com) and Yothalot (www.yothalot.com) applications. MailerQ is a tool for sending large volumes of email, using AMQP message queues, and Yothalot is a big data processing map/reduce framework.

Do you appreciate our work and are you looking for high quality email solutions? Then check out our other commercial and open source solutions:

INSTALLING

AMQP-CPP comes with an optional Linux-only TCP module that takes care of the network part required for the AMQP-CPP core library. If you use this module, you are required to link with pthread and dl.

There are two methods to compile AMQP-CPP: CMake and Make. CMake is platform portable and works on all systems, while the Makefile only works on Linux. The two methods create both a shared and a static version of the AMQP-CPP library. Building of a shared library is currently not supported on Windows.

After building there are two relevant files to #include when you use the library.

File                  Include when?
amqpcpp.h             Always needed for the core features
amqpcpp/linux_tcp.h   If using the Linux-only TCP module

On Windows you are required to define NOMINMAX when compiling code that includes public AMQP-CPP header files.

Using cmake

The CMake file supports both building and installing. You can choose not to use the install functionality, and instead manually use the build output at build/bin/. Keep in mind that the TCP module is only supported for Linux. An example install method would be:

mkdir build
cd build
cmake .. [-DAMQP-CPP_BUILD_SHARED=ON] [-DAMQP-CPP_LINUX_TCP=ON]
cmake --build . --target install

Option                  Default   Meaning
AMQP-CPP_BUILD_SHARED   OFF       Shared lib (ON) or static lib (OFF)? Shared is not supported on Windows.
AMQP-CPP_LINUX_TCP      OFF       Should the Linux-only TCP module be built?

Using make

Compiling and installing AMQP-CPP with make is as easy as running make and then make install. This will install the full version of AMQP-CPP, including the system specific TCP module. If you do not need the additional TCP module (because you take care of handling the network stuff yourself), you can also compile a pure form of the library. Use make pure and make install for that.

When you compile an application that uses the AMQP-CPP library, do not forget to link with the library. For gcc and clang the linker flag is -lamqpcpp. If you use the full-blown version of AMQP-CPP (with the TCP module), you also need to pass the -lpthread and -ldl linker flags. The TCP module uses a thread to run an asynchronous, non-blocking DNS hostname lookup, and it must be linked with the "dl" library to allow dynamic lookups of functions from the openssl library if a secure connection to RabbitMQ has to be set up.

HOW TO USE AMQP-CPP

As we mentioned above, the library can be used in a network-agnostic fashion. It then does not do any IO by itself, and you need to pass an object to the library that the library can use for IO. So, before you start using the library, you first need to create a class that extends from the ConnectionHandler base class. This is a class with a number of methods that are called by the library every time it wants to send out data, or when it needs to inform you that an error occurred.

#include <amqpcpp.h>

class MyConnectionHandler : public AMQP::ConnectionHandler
{
    /**
     *  Method that is called by the AMQP library every time it has data
     *  available that should be sent to RabbitMQ.
     *  @param  connection  pointer to the main connection object
     *  @param  data        memory buffer with the data that should be sent to RabbitMQ
     *  @param  size        size of the buffer
     */
    virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
    {
        // @todo
        //  Add your own implementation, for example by doing a call to the
        //  send() system call. But be aware that the send() call may not
        //  send all data at once, so you also need to take care of buffering
        //  the bytes that could not immediately be sent, and try to send
        //  them again when the socket becomes writable again
    }

    /**
     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this method has been called, the connection is ready
     *  to use.
     *  @param  connection      The connection that can now be used
     */
    virtual void onReady(AMQP::Connection *connection)
    {
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming
    }

    /**
     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized.
     *  @param  connection      The connection on which the error occurred
     *  @param  message         A human readable error message
     */
    virtual void onError(AMQP::Connection *connection, const char *message)
    {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program, log the error, and destruct the
        //  connection object because it is no longer in a usable state
    }

    /**
     *  Method that is called when the connection was closed. This is the
     *  counter part of a call to Connection::close() and it confirms that the
     *  AMQP connection was correctly closed.
     *
     *  @param  connection      The connection that was closed and that is now unusable
     */
    virtual void onClosed(AMQP::Connection *connection) 
    {
        // @todo
        //  add your own implementation, for example by closing down the
        //  underlying TCP connection too
    }


};
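
The onData() comment above warns that send() may accept only part of the data. One way to handle this is sketched below. It assumes a hypothetical OutputBuffer helper (not part of AMQP-CPP) and a writer callback that behaves like send(), returning the number of bytes it accepted.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper (not part of AMQP-CPP): queues bytes that could not be
// written immediately and retries them when the socket is writable again.
class OutputBuffer
{
public:
    // Writer mimics send(): it returns the number of bytes accepted, or a
    // negative value when nothing could be written right now.
    using Writer = std::function<long(const char *data, std::size_t size)>;

    explicit OutputBuffer(Writer writer) : _writer(std::move(writer)) {}

    // Call this from onData(): try to send right away, keep the remainder.
    void write(const char *data, std::size_t size)
    {
        // older bytes are still waiting, so append to preserve ordering
        if (!_pending.empty()) { _pending.append(data, size); return; }

        long sent = _writer(data, size);
        std::size_t done = sent > 0 ? static_cast<std::size_t>(sent) : 0;
        if (done < size) _pending.append(data + done, size - done);
    }

    // Call this when the event loop reports that the socket is writable.
    void flush()
    {
        if (_pending.empty()) return;
        long sent = _writer(_pending.data(), _pending.size());
        if (sent > 0) _pending.erase(0, static_cast<std::size_t>(sent));
    }

    // Number of bytes still waiting to be sent.
    std::size_t pending() const { return _pending.size(); }

private:
    Writer _writer;
    std::string _pending;
};
```

In a real handler the writer would typically wrap a send() call on your own socket, onData() would forward to write(), and your event loop would call flush() whenever the socket becomes writable while pending() is non-zero.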

After you've implemented the ConnectionHandler class (which is entirely up to you), you can start using the library by creating a Connection object, and one or more Channel objects:

// create an instance of your own connection handler
MyConnectionHandler myHandler;

// create an AMQP connection object
AMQP::Connection connection(&myHandler, AMQP::Login("guest","guest"), "/");

// and create a channel
AMQP::Channel channel(&connection);

// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout);
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue", "my-routing-key");

A number of remarks about the example above. First you may have noticed that we've created all objects on the stack. You are of course also free to create them on the heap with the C++ operator 'new'. That works just as well, and is in real life code probably more useful as you normally want to keep your handlers, connection and channel objects around for a longer time.

But more importantly, you can see in the example above that we instantiated the channel object directly after we made the connection object, and we also started declaring exchanges and queues right away. However, under the hood, a handshake protocol is executed between the server and the client when the Connection object is first created. During this handshake procedure it is not permitted to send other instructions (like opening a channel or declaring a queue). It would therefore have been better if we had first waited for the connection to be ready (implement the MyConnectionHandler::onReady() method), and create the channel object only then. But this is not strictly necessary. The methods that are called before the handshake is completed are cached by the AMQP library and will be executed the moment the handshake is completed and the connection becomes ready for use.
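
The caching behaviour described above can be pictured with a small stand-alone sketch. The DeferredOperations class below is hypothetical and is not how AMQP-CPP is implemented internally. It only illustrates the idea that work submitted before the handshake completes is stored and then replayed in order.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical illustration, not the library's actual implementation:
// operations requested before the handshake completes are stored and
// replayed in order once the connection reports that it is ready.
class DeferredOperations
{
public:
    // Run the operation now if ready, otherwise remember it for later.
    void submit(std::function<void()> operation)
    {
        if (_ready) operation();
        else _queued.push_back(std::move(operation));
    }

    // Call when the handshake has completed (compare onReady()).
    void markReady()
    {
        _ready = true;

        // move the queue out first so that operations may submit new work
        std::vector<std::function<void()>> queued;
        queued.swap(_queued);
        for (auto &operation : queued) operation();
    }

    // Has the handshake completed?
    bool ready() const { return _ready; }

private:
    bool _ready = false;
    std::vector<std::function<void()>> _queued;
};
```

With AMQP-CPP you do not need such a class yourself. The sketch only shows why the declareExchange() and declareQueue() calls in the example are safe even though the connection is not yet ready.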

PARSING INCOMING DATA

The ConnectionHandler class has a method onData() that is called by the library every time that it wants to send out data. We've explained that it is up to you to implement that method. Inside your ConnectionHandler::onData() method, you can for example call the "send()" or "write()" system call to send out the data to the RabbitMQ server. But what about data in the other direction? How does the library receive data back from RabbitMQ?

In this raw setup, the AMQP-CPP library does not do any IO by itself and it is therefore also not possible for the library to receive data from a socket. It is again up to you to do this. If, for example, you notice in your event loop that the socket that is connected with the RabbitMQ server becomes readable, you should read out that socket (for example by using the recv() system call), and pass the received bytes to the AMQP-CPP library. This is done by calling the parse() method in the Connection object.

The Connection::parse() method gets two parameters, a pointer to a buffer of data that you just read from the socket, and a parameter that holds the size of this buffer. The code snippet below comes from the Connection.h C++ header file.

/**
 *  Parse data that was received from RabbitMQ
 *
 *  Every time that data comes in from RabbitMQ, you should call this method to parse
 *  the incoming data, and have it handled by the AMQP-CPP library. This method returns
 *  the number of bytes that were processed.
 *
 *  If not all bytes could be processed because it only contained a partial frame,
 *  you should call this same method later on when more data is available. The
 *  AMQP-CPP library does not do any buffering, so it is up to the caller to ensure
 *  that the old data is also passed in that later call.
 *
 *  @param  buffer      buffer to decode
 *  @param  size        size of the buffer to decode
 *  @return             number of bytes that were processed
 */
size_t parse(char *buffer, size_t size)
{
    return _implementation.parse(buffer, size);
}

You should do all the bookkeeping for the buffer yourself. If you for example call the Connection::parse() method with a buffer of 100 bytes, and the method returns that only 60 bytes were processed, you should later call the method again, with a buffer filled with the remaining 40 bytes. If the method returns 0, you should make a new call to parse() when more data is available, with a buffer that contains both the old data and the new data.
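
The bookkeeping described above could look roughly like the sketch below. InputBuffer is a hypothetical helper, not part of AMQP-CPP. It takes any callable with the same shape as Connection::parse() (a buffer and a size in, the number of processed bytes out), so the sketch can be compiled without the library.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper (not part of AMQP-CPP): keeps the bytes that a parser
// did not consume, so they can be offered again together with new data.
class InputBuffer
{
public:
    // Parser has the same shape as Connection::parse(): it receives a buffer
    // and returns how many bytes it processed (possibly 0).
    using Parser = std::function<std::size_t(const char *buffer, std::size_t size)>;

    explicit InputBuffer(Parser parser) : _parser(std::move(parser)) {}

    // Call this with the bytes that were just read from the socket.
    void feed(const char *data, std::size_t size)
    {
        _data.append(data, size);

        // keep parsing for as long as the parser makes progress
        while (!_data.empty())
        {
            std::size_t processed = _parser(_data.data(), _data.size());
            if (processed == 0) break;
            _data.erase(0, processed);
        }
    }

    // Number of bytes that are kept for the next call.
    std::size_t leftover() const { return _data.size(); }

private:
    Parser _parser;
    std::string _data;
};
```

In an application the parser callable would wrap a call to parse() on your connection object, and feed() would be called with whatever recv() returned. Erasing from the front of a std::string is simple but not the fastest option; a ring buffer would avoid the copying.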

To optimize your calls to the parse() method, you could use the Connection::expected() and Connection::maxFrame() methods. The expected() method returns the number of bytes that the library prefers to receive next. It is pointless to call the parse() method with a smaller buffer, and it is best to call the method with a buffer of exactly this size. The maxFrame() method returns the maximum frame size for AMQP messages. If you read your messages into a reusable buffer, you could allocate this buffer up to this size, so that you will never have to reallocate.

TCP CONNECTIONS

Although the AMQP-CPP library gives you extreme flexibility by letting you set up your own network connections, the reality is that most if not all AMQP connections use the TCP protocol. To help you out, the library therefore also comes with a TCP module that takes care of setting up the network connections, and sending and receiving the data.

If you want to use this TCP module, you should not use the AMQP::Connection and AMQP::Channel classes that you saw above, but the alternative AMQP::TcpConnection and AMQP::TcpChannel classes instead. You also do not have to create your own class that implements the "AMQP::ConnectionHandler" interface, but a class that inherits from "AMQP::TcpHandler" instead. This AMQP::TcpHandler class contains a set of methods that you can override to intercept all sorts of events that occur during the TCP and AMQP connection lifetime. Overriding these methods is mostly optional, because almost all have a default implementation. But you do need to implement the "monitor()" method, as that is needed by the AMQP-CPP library to interact with the main event loop:

#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>

class MyTcpHandler : public AMQP::TcpHandler
{
    /**
     *  Method that is called by the AMQP library when a new connection
     *  is associated with the handler. This is the first call to your handler
     *  @param  connection      The connection that is attached to the handler
     */
    virtual void onAttached(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation, for example initialize things
        //  to handle the connection.
    }

    /**
     *  Method that is called by the AMQP library when the TCP connection 
     *  has been established. After this method has been called, the library
     *  still has to take care of setting up the optional TLS layer and of
     *  setting up the AMQP connection on top of the TCP layer. This method
     *  is always paired with a later call to onLost().
     *  @param  connection      The connection that can now be used
     */
    virtual void onConnected(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation (probably not needed)
    }

    /**
     *  Method that is called when the secure TLS connection has been established. 
     *  This is only called for amqps:// connections. It allows you to inspect
     *  whether the connection is secure enough for your liking (you can
     *  for example check the server certificate). The AMQP protocol still has
     *  to be started.
     *  @param  connection      The connection that has been secured
     *  @param  ssl             SSL structure from openssl library
     *  @return bool            True if connection can be used
     */
    virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
    {
        // @todo
        //  add your own implementation, for example by reading out the
        //  certificate and checking if it is indeed yours
        return true;
    }

    /**
     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this the connection is ready to use.
     *  @param  connection      The connection that can now be used
     */
    virtual void onReady(AMQP::TcpConnection *connection) override
    {
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming
    }

    /**
     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized, or the underlying connection is lost. This
     *  call is normally followed by a call to onLost() (if the error occurred
     *  after the TCP connection was established) and onDetached().
     *  @param  connection      The connection on which the error occurred
     *  @param  message         A human readable error message
     */
    virtual void onError(AMQP::TcpConnection *connection, const char *message) override
    {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program and logging the error
    }

    /**
     *  Method that is called when the AMQP protocol is ended. This is the
     *  counterpart of a call to connection.close() to gracefully shut down
     *  the connection. Note that the TCP connection is at this time still
     *  active, and you will also receive calls to onLost() and onDetached()
     *  @param  connection      The connection over which the AMQP protocol ended
     */
    virtual void onClosed(AMQP::TcpConnection *connection) override 
    {
        // @todo
        //  add your own implementation (probably not necessary, but it could
        //  be useful if you want to do something immediately after the
        //  amqp connection is over, but do not want to wait for the tcp
        //  connection to shut down)
    }

    /**
     *  Method that is called when the TCP connection was closed or lost.
     *  This method is always called if there was also a call to onConnected()
     *  @param  connection      The connection that was closed and that is now unusable
     */
    virtual void onLost(AMQP::TcpConnection *connection) override 
    {
        // @todo
        //  add your own implementation (probably not necessary)
    }

    /**
     *  Final method that is called. This signals that no further calls to your
     *  handler will be made about the connection.
     *  @param  connection      The connection that can be destructed
     */
    virtual void onDetached(AMQP::TcpConnection *connection) override 
    {
        // @todo
        //  add your own implementation, like cleaning up resources or exiting the application
    } 

    /**
     *  Method that is called by the AMQP-CPP library when it wants to interact
     *  with the main event loop. The AMQP-CPP library is completely non-blocking,
     *  and only makes "write()" or "read()" system calls when it knows in advance
     *  that these calls will not block. To register a filedescriptor in the
     *  event loop, it calls this "monitor()" method with a filedescriptor and
     *  flags telling whether the filedescriptor should be checked for readability
     *  or writability.
     *
     *  @param  connection      The connection that wants to interact with the event loop
     *  @param  fd              The filedescriptor that should be checked
     *  @param  flags           Bitwise or of AMQP::readable and/or AMQP::writable
     */
    virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
    {
        // @todo
        //  add your own implementation, for example by adding the file
        //  descriptor to the main application event loop (like the select() or
        //  poll() loop). When the event loop reports that the descriptor becomes
        //  readable and/or writable, it is up to you to inform the AMQP-CPP
        //  library that the filedescriptor is active by calling the
        //  connection->process(fd, flags) method.
    }
};

You see that there are many methods in TcpHandler that you can implement. The most important one is "monitor()". This method is used to integrate the AMQP filedescriptors in your application's event loop. For some popular event loops (libev, libuv, libevent), we have already added example handler objects (see the next section for that). All the other methods are optional to override. It is often a good idea to override the onError() method to log or report errors and onDetached() for cleaning up resources. AMQP-CPP has its own buffers if you send instructions prematurely, but if you intend to send a lot of data over the connection, it is also a good idea to implement the onReady() method and delay your calls until the AMQP connection has been fully set up.
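
For a hand-written poll() loop, the state behind monitor() could be kept in a small registry like the one sketched below. Everything here is hypothetical: PollRegistry is not part of AMQP-CPP, kReadable and kWritable merely stand in for AMQP::readable and AMQP::writable, and treating flags of 0 as "stop watching this descriptor" is an assumption of the sketch.

```cpp
#include <cassert>
#include <map>
#include <poll.h>
#include <vector>

// Stand-ins for AMQP::readable and AMQP::writable; the numeric values are
// assumptions made for this sketch only.
constexpr int kReadable = 1;
constexpr int kWritable = 2;

// Hypothetical helper: remembers which descriptors monitor() asked about
// and turns them into the array that poll() expects.
class PollRegistry
{
public:
    // Call from monitor(); flags of 0 are treated as "stop watching",
    // which is an assumption of this sketch.
    void update(int fd, int flags)
    {
        if (flags == 0) _watched.erase(fd);
        else _watched[fd] = flags;
    }

    // Build the pollfd array for the next poll() call.
    std::vector<pollfd> pollfds() const
    {
        std::vector<pollfd> result;
        for (const auto &entry : _watched)
        {
            pollfd item{};
            item.fd = entry.first;
            if (entry.second & kReadable) item.events |= POLLIN;
            if (entry.second & kWritable) item.events |= POLLOUT;
            result.push_back(item);
        }
        return result;
    }

    // Translate the events reported by poll() back into sketch flags.
    static int flagsFrom(short revents)
    {
        int flags = 0;
        if (revents & POLLIN) flags |= kReadable;
        if (revents & POLLOUT) flags |= kWritable;
        return flags;
    }

private:
    std::map<int, int> _watched;
};
```

Your monitor() override would call update(fd, flags). After poll() returns, each entry with non-zero revents is translated with flagsFrom() and handed back to the library through connection->process(fd, flags), as the comment in the handler above describes.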

Using the TCP module of the AMQP-CPP library is easier than using the raw AMQP::Connection and AMQP::Channel objects, because you do not have to create the sockets and connections yourself, and you also do not have to take care of buffering network data. The example that we gave above looks slightly different if you make use of the TCP module:

// create an instance of your own tcp handler
MyTcpHandler myHandler;

// address of the server
AMQP::Address address("amqp://guest:guest@localhost/vhost");

// create an AMQP connection object
AMQP::TcpConnection connection(&myHandler, address);

// and create a channel
AMQP::TcpChannel channel(&connection);

// use the channel object to call the AMQP method you like
channel.declareExchange("my-exchange", AMQP::fanout);
channel.declareQueue("my-queue");
channel.bindQueue("my-exchange", "my-queue", "my-routing-key");
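
The address string in the example packs the login, host and vhost into a single URL. The sketch below shows how such a string breaks down into its parts. It is a hypothetical illustration and not the parser that AMQP-CPP uses: the AddressParts struct and splitAddress() function are invented for this example, IPv6 literals are not handled, and the default ports (5672 for amqp://, 5671 for amqps://) are the standard AMQP ports rather than something taken from this readme.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical illustration of what an address string contains; this is
// not the parser that AMQP-CPP uses.
struct AddressParts
{
    bool secure = false;
    std::string user;
    std::string password;
    std::string host;
    std::uint16_t port = 0;
    std::string vhost;
};

// Split "amqp[s]://[user[:password]@]host[:port][/vhost]" into components.
// Returns false when the scheme is not amqp:// or amqps://, or when the
// host is empty. A non-numeric port makes std::stoul throw.
inline bool splitAddress(const std::string &input, AddressParts &out)
{
    std::string rest;
    if (input.compare(0, 7, "amqp://") == 0) { out.secure = false; rest = input.substr(7); }
    else if (input.compare(0, 8, "amqps://") == 0) { out.secure = true; rest = input.substr(8); }
    else return false;

    // everything after the first slash is the vhost
    std::size_t slash = rest.find('/');
    std::string authority = rest.substr(0, slash);
    out.vhost = (slash == std::string::npos) ? std::string() : rest.substr(slash + 1);

    // optional "user:password@" prefix
    out.user.clear();
    out.password.clear();
    std::size_t at = authority.rfind('@');
    if (at != std::string::npos)
    {
        std::string login = authority.substr(0, at);
        std::size_t separator = login.find(':');
        out.user = login.substr(0, separator);
        if (separator != std::string::npos) out.password = login.substr(separator + 1);
        authority = authority.substr(at + 1);
    }

    // optional ":port" suffix, otherwise the standard AMQP ports
    std::size_t colon = authority.find(':');
    out.host = authority.substr(0, colon);
    if (colon == std::string::npos) out.port = out.secure ? 5671 : 5672;
    else out.port = static_cast<std::uint16_t>(std::stoul(authority.substr(colon + 1)));

    return !out.host.empty();
}
```

With the library itself you simply pass the string to AMQP::Address as shown above. The sketch is only meant to make the individual parts of the URL visible.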

SECURE CONNECTIONS

The TCP module of AMQP-CPP also supports setting up secure connections. If your RabbitMQ server accepts SSL connections, you can specify the address to your server using the amqps:// protocol:

// init the SSL library (this works for openssl 1.1, for openssl 1.0 use SSL_library_init())
OPENSSL_init_ssl(0, NULL);

// address of the server (secure!)
AMQP::Address address("amqps://guest:guest@localhost/vhost");

// create an AMQP connection object
AMQP::TcpConnection connection(&myHandler, address);

There are two things to take care of if you want to create a secure connection: (1) you must link your application with the -lssl flag (or use dlopen()), and (2) you must initialize the openssl library by calling OPENSSL_init_ssl(). This initialization must take place before you let your application connect to RabbitMQ. This is necessary because AMQP-CPP needs access to the openssl library to set up secure connections. It can only access this library if you have linked your application with this library, or if you have loaded this library at runtime using dlopen().

Linking openssl is the normal thing to do. You just have to add the -lssl flag to your linker. If you however do not want to link your application with openssl, you can also load the openssl library at runtime, and pass in the pointer to the handle to AMQP-CPP:

// dynamically open the openssl library
void *handle = dlopen("/path/to/openssl.so", RTLD_LAZY);

// tell AMQP-CPP library where the handle to openssl can be found
AMQP::openssl(handle);

// @todo call functions to initialize openssl, and create the AMQP connection
// (see example above)

By itself, AMQP-CPP does not check whether the created TLS connection is sufficiently secure. Whether the certificate is expired, self-signed, missing or invalid does not matter to AMQP-CPP: the connection is simply permitted. If you want to be more strict (for example, if you want to verify the server's certificate), you must do this yourself by implementing the "onSecured()" method in your handler object:

#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>

class MyTcpHandler : public AMQP::TcpHandler
{
    /**
     *  Method that is called right after the TLS connection has been created.
     *  In this method you can check the connection properties (like the certificate)
     *  and return false if you find it not secure enough
     *  @param  connection      the connection that has just completed the tls handshake
     *  @param  ssl             SSL structure from the openssl library
     *  @return bool            true if connection is secure enough to start the AMQP protocol
     */
    virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
    {
        // @todo call functions from the openssl library to check the certificate,
        // like SSL_get_peer_certificate() or SSL_get_verify_result().
        // For now we always allow the connection to proceed
        return true;
    }
    
    /**
     *  All other methods (like onConnected(), onError(), etc) are left out of this
     *  example, but would be here if this was an actual user space handler class.
     */
};

The SSL pointer that is passed to the onSecured() method refers to the "SSL" structure from the openssl library.

EXISTING EVENT LOOPS

Both the pure AMQP::Connection as well as the easier AMQP::TcpConnection class allow you to integrate AMQP-CPP in your own event loop. Whether you take care of running the event loop yourself (for example by using the select() system call), or if you use an existing library for it (like libevent, libev or libuv), you can implement the "monitor()" method to watch the file descriptors and hand over control back to AMQP-CPP when one of the sockets becomes active.

For libev, libuv and libevent users, we have even added an example implementation, so that you do not even have to do this. Instead of implementing the monitor() method yourself, you can use the AMQP::LibEvHandler, AMQP::LibUvHandler or AMQP::LibEventHandler classes:

#include <iostream>
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>

int main()
{
    // access to the event loop
    auto *loop = EV_DEFAULT;

    // handler for libev (so we don't have to implement AMQP::TcpHandler!)
    AMQP::LibEvHandler handler(loop);

    // make a connection
    AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://localhost/"));

    // we need a channel too
    AMQP::TcpChannel channel(&connection);

    // create a temporary queue
    channel.declareQueue(AMQP::exclusive).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) {

        // report the name of the temporary queue
        std::cout << "declared queue " << name << std::endl;

        // now we can close the connection
        connection.close();
    });

    // run the loop
    ev_run(loop, 0);

    // done
    return 0;
}

The AMQP::LibEvHandler and AMQP::LibEventHandler classes are extended AMQP::TcpHandler classes, with an implementation of the monitor() method that simply adds the filedescriptor to the event loop. If you use this class, it is recommended not to instantiate i

