boost库asio详解3——io_service作为work pool

发布时间:2017-3-24 4:11:03 编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"boost库asio详解3——io_service作为work pool",主要涉及到boost库asio详解3——io_service作为work pool方面的内容,对于boost库asio详解3——io_service作为work pool感兴趣的同学可以参考一下。

无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。 使用io_service作为处理工作的work pool,可以看到,就是通过io_service.post投递一个Handler到io_service的队列,Handler在这个io_service.run内部得到执行,有可能你会发现,io_services.dispatch的接口也和io_service.post一样,但不同的是它是直接调用而不是经过push到队列然后在io_services.run中执行,而在这个示例当中,显然我们需要把工作交到另一个线程去完成,这样才不会影响网络接收线程池的工作以达到高效率的接收数据,这种设计与前面的netsever其实相同,这就是典型的Half Sync/Half Async。二者的区别就是netsever自己实现了工作队列,而不是直接使用io_service,这种设计实际上在win下是使用了iocp作为工作队列。 不过我更倾向于前一种设计,因为那样做,代码一切都在自己的掌握中,而io_service则是经过许多封装代码,并且本身设计只是用于处理网络完成事件的。 无论如何使用,都能感觉到使用boost.asio实现服务器,不仅是一件非常轻松的事,而且代码很漂亮,逻辑也相当清晰,这点上很不同于ACE。 [cpp] view plaincopyprint? #include <stdio.h>    #include <cstdlib>    #include <iostream>    #include <boost/thread.hpp>    #include <boost/aligned_storage.hpp>    #include <boost/array.hpp>    #include <boost/bind.hpp>    #include <boost/enable_shared_from_this.hpp>    #include <boost/noncopyable.hpp>    #include <boost/shared_ptr.hpp>    #include <boost/asio.hpp>       using boost::asio::ip::tcp;       class handler_allocator        : private boost::noncopyable    {    public:        handler_allocator()            : in_use_(false)        {        }           void* allocate(std::size_t size)        {            if (!in_use_ && size < storage_.size)            {                in_use_ = true;                return storage_.address();            }            else            {                return ::operator new(size);            }        }           void deallocate(void* pointer)        {            if (pointer == storage_.address())            {                in_use_ = false;            }            else            {                ::operator delete(pointer);            }        }       private:        // Storage space used for handler-based custom memory allocation.        boost::aligned_storage<1024> storage_;           // Whether the handler-based custom allocation storage has been used.        bool in_use_;    };       template <typename Handler>    class custom_alloc_handler    {    public:        custom_alloc_handler(handler_allocator& a, Handler h)            : allocator_(a),            handler_(h)        {        }           template <typename Arg1>        void operator()(Arg1 arg1)        {            handler_(arg1);        }           template <typename Arg1, typename Arg2>        void operator()(Arg1 arg1, Arg2 arg2)        {            handler_(arg1, arg2);        }           friend void* asio_handler_allocate(std::size_t size,            custom_alloc_handler<Handler>* this_handler)        {            return this_handler->allocator_.allocate(size);        }           friend void asio_handler_deallocate(void* pointer, std::size_t /*size*/,            custom_alloc_handler<Handler>* this_handler)        {            this_handler->allocator_.deallocate(pointer);        }       private:        handler_allocator& allocator_;        Handler handler_;    };       // Helper function to wrap a handler object to add custom allocation.    template <typename Handler>    inline custom_alloc_handler<Handler> make_custom_alloc_handler(        handler_allocator& a, Handler h)    {        return custom_alloc_handler<Handler>(a, h);    }       /// A pool of io_service objects.    class io_service_pool        : private boost::noncopyable    {    public:        /// Construct the io_service pool.        explicit io_service_pool(std::size_t pool_size) : next_io_service_(0)        {            if (pool_size == 0)                throw std::runtime_error("io_service_pool size is 0");               // Give all the io_services work to do so that their run() functions will not            // exit until they are explicitly stopped.            for (std::size_t i = 0; i < pool_size; ++i)            {                io_service_ptr io_service(new boost::asio::io_service);                work_ptr work(new boost::asio::io_service::work(*io_service));                io_services_.push_back(io_service);                work_.push_back(work);            }        }           // Run all io_service objects in the pool.        void run()        {            // Create a pool of threads to run all of the io_services.            std::vector<boost::shared_ptr<boost::thread> > threads;            for (std::size_t i = 0; i < io_services_.size(); ++i)            {                boost::shared_ptr<boost::thread> thread(new boost::thread(                    boost::bind(&boost::asio::io_service::run, io_services_[i])));                threads.push_back(thread);            }               // Wait for all threads in the pool to exit.            for (std::size_t i = 0; i < threads.size(); ++i)                threads[i]->join();        }           // Stop all io_service objects in the pool.        void stop()        {            // Explicitly stop all io_services.            for (std::size_t i = 0; i < io_services_.size(); ++i)                io_services_[i]->stop();        }           // Get an io_service to use.        boost::asio::io_service& get_io_service()        {            // Use a round-robin scheme to choose the next io_service to use.            boost::asio::io_service& io_service = *io_services_[next_io_service_];            ++next_io_service_;            if (next_io_service_ == io_services_.size())                next_io_service_ = 0;            return io_service;        }       private:        typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;        typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;           /// The pool of io_services.        std::vector<io_service_ptr> io_services_;           /// The work that keeps the io_services running.        std::vector<work_ptr> work_;           /// The next io_service to use for a connection.        std::size_t next_io_service_;    };       class session        : public boost::enable_shared_from_this<session>    {    public:        session(boost::asio::io_service& work_service           , boost::asio::io_service& io_service)            : socket_(io_service)            , io_work_service(work_service)        {        }           tcp::socket& socket()        {            return socket_;        }           void start()        {            socket_.async_read_some(boost::asio::buffer(data_),                make_custom_alloc_handler(allocator_,                boost::bind(&session::handle_read,                shared_from_this(),                boost::asio::placeholders::error,                boost::asio::placeholders::bytes_transferred)));        }           void handle_read(const boost::system::error_code& error,            size_t bytes_transferred)        {            if (!error)            {                boost::shared_ptr<std::vector<char> > buf(new std::vector<char>);                   buf->resize(bytes_transferred);                std::copy(data_.begin(), data_.begin() + bytes_transferred, buf->begin());                io_work_service.post(boost::bind(&session::on_receive                   , shared_from_this(), buf, bytes_transferred));                   socket_.async_read_some(boost::asio::buffer(data_),                    make_custom_alloc_handler(allocator_,                    boost::bind(&session::handle_read,                    shared_from_this(),                    boost::asio::placeholders::error,                    boost::asio::placeholders::bytes_transferred)));            }        }           void handle_write(const boost::system::error_code& error)        {            if (!error)            {            }        }           void on_receive(boost::shared_ptr<std::vector<char> > buffers           , size_t bytes_transferred)        {            char* data_stream = &(*buffers->begin());            // in here finish the work.            std::cout << "receive :" << bytes_transferred << " bytes." <<                "message :" << data_stream << std::endl;        }       private:        // The io_service used to finish the work.        boost::asio::io_service& io_work_service;           // The socket used to communicate with the client.        tcp::socket socket_;           // Buffer used to store data received from the client.        boost::array<char, 1024> data_;           // The allocator to use for handler-based custom memory allocation.        handler_allocator allocator_;    };       typedef boost::shared_ptr<session> session_ptr;       class server    {    public:        server(short port, std::size_t io_service_pool_size)            : io_service_pool_(io_service_pool_size)            , io_service_work_pool_(io_service_pool_size)            , acceptor_(io_service_pool_.get_io_service(), tcp::endpoint(tcp::v4(), port))        {            session_ptr new_session(new session(io_service_work_pool_.get_io_service()               , io_service_pool_.get_io_service()));            acceptor_.async_accept(new_session->socket(),                boost::bind(&server::handle_accept, this, new_session,                boost::asio::placeholders::error));        }           void handle_accept(session_ptr new_session,            const boost::system::error_code& error)        {            if (!error)            {                new_session->start();                new_session.reset(new session(io_service_work_pool_.get_io_service()                   , io_service_pool_.get_io_service()));                acceptor_.async_accept(new_session->socket(),                    boost::bind(&server::handle_accept, this, new_session,                    boost::asio::placeholders::error));            }        }           void run()        {            io_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run               , &io_service_pool_)));            work_thread_.reset(new boost::thread(boost::bind(&io_service_pool::run               , &io_service_work_pool_)));        }           void stop()        {            io_service_pool_.stop();            io_service_work_pool_.stop();               io_thread_->join();            work_thread_->join();        }       private:        boost::shared_ptr<boost::thread> io_thread_;        boost::shared_ptr<boost::thread> work_thread_;        io_service_pool io_service_pool_;        io_service_pool io_service_work_pool_;        tcp::acceptor acceptor_;    };       int main(int argc, char* argv[])    {        try        {            if (argc != 2)            {                std::cerr << "Usage: server <port>/n";                return 1;            }               using namespace std; // For atoi.            server s(atoi(argv[1]), 10);               s.run();               getchar();               s.stop();        }        catch (std::exception& e)        {            std::cerr << "Exception: " << e.what() << "/n";        }           return 0;    }   

上一篇:[ACM]Elevator
下一篇:boost库asio详解4——deadline_timer使用说明

相关文章

相关评论

本站评论功能暂时取消,后续此功能例行通知。

一、不得利用本站危害国家安全、泄露国家秘密,不得侵犯国家社会集体的和公民的合法权益,不得利用本站制作、复制和传播不法有害信息!

二、互相尊重,对自己的言论和行为负责。

好贷网好贷款