首页 > 程序人生 > 再谈“事件驱动”

再谈“事件驱动”

2011年10月16日 发表评论 阅读评论

之前也写过一些关于事件驱动模型的文章,事隔一年多,再谈起事件驱动,又是一种新的感触,借此文来分享一下,希望能够帮助到在工作、学习中在这方面遇到困惑的朋友。

通常,我们会把轮询和事件驱动拿来做对比,关于它们两者,我在《浅淡“事件驱动”》一文中有举过一个传统邮件和电子邮件的例子,可以帮助大家来理解。轮询的最大缺点是盲目的做,CPU浪费很严重;事件驱动解决了浪费CPU的问题,它只在需要做的时候做,但是在实现上相对要复杂一些。不过总的来说,再复杂的模型,只要理解了它的原理,都阻挡不了聪明的人类使用它的步伐。

工作的两三年的时间里,陆陆续续也做过不少项目,写过不少程序。每次写新的程序,或者是看别人写的程序,都多多少少对会反思自己以前写的程序中使用过的一些模型,想着如何来改进它,使得程序的可扩展性,可复用性等越来越好。通过这样的一些过程,自己也确实发现了变化,也积累了一些经验,总结一下,分享给大家。接下来我将通过一些实例,来介绍事件驱动模型在实现的过程中需要注意的一些点。

介绍例子之前,先明确几个概念,这个在事件驱动模型中用得相对比较多:
(1)Task: 是指对由一系列动作串起来以完成一件事情的抽象,比如对下载一个网页这样的事情,可以抽象出一个PageCrawlTask;
(2)Event: 是对某一时刻发生的事情的抽象,它的主要目的是为了通知Task,该时刻发生了什么,让Task根据所发生的做出相应的变化。比如收到网络包,可以抽象为RecvPacketEvent,PageCrawlTask收到这个事件后,就会根据当前自己的状态,处理收到的包的情况。

介绍完上面的概念,下面我们通过实例来看:

  • 1. task->event->worker,显式的通过task状态选择相应的处理函数
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    
    class PageCrawlTask;
     
    /// 事件类型
    class Event
    {
    public:
        /// 事件类型
        enum Type
        {
            kEventTypeRecvDNSResponse = 1, ///< 收到dns应答的事件
            kEventTypeRecvContent  = 2,    ///< 收到下载内容的事件
        };
     
        PageCrawlTask* m_task;        ///< 于当前event所关联的task
        int            m_dispatch_id; ///< 用来分发事件对对应处理线程的id
        int            m_event_type;  ///< 事件类型
        void*          m_cookie;      ///< 事件所带的cookie信息
    };
     
    /// 页面下载的task类型
    class PageCrawlTask
    {
    public:
        /// task状态
        enum State
        {
            kInit            = 0, ///< 初始状态
            kWaitDNSResponse = 1, ///< 发出异步dns请求,等待应答
            kWaitContent     = 2, ///< 收到下载的内容
            kFinish          = 3  ///< 下载完成
        };
     
        State GetState() const
        {
            return m_state;
        }
     
        /// 启动任务->发起dns请求
        void Start() {}
     
        /// 收到dns请求->发起下载请求
        void ProcessDNSResponse(DNSResponse* response) {}
     
        /// 收到下载的内容->下载完成
        void ProcessContent(Content* content) {}
     
        /// 任务结束,清理资源
        void Finish() {}
     
    private:
        State m_state; ///< 任务当前的状态
    };
     
    /// 用来驱动事件的工作线程
    class Worker
    {
    public:
        /// 新加一个事件到当前工作线程
        void AddEvent(Event* event)
        {
            m_event_queue.PushBack(event);
        }
     
        /// 线程入口
        void Entry()
        {
            while (!IsStop())
            {
                Event* event = GetNextEvent(100);
                if (event != NULL)
                {
                    ProcessEvent(event);
                    delete event;
                }
            }
        }
     
    private:
        ///  获取下一个事件
        Event* GetNextEvent(int timeout)
        {
            Event* event = m_event_queue.TimedPopFront(timeout);
            return event;
        }
     
        /// 处理事件
        void ProcessEvent(Event* event)
        {
            PageCrawlTask* task = event->m_task; 
            switch (task->GetState())
            {
            case kInit:
                task->Start();
                break;
            case kWaitDNSResponse:
                task->ProcessDNSResponse(static_cast< DNSResponse*>(event->m_cookie));
                break;
            case kWaitContent:
                task->ProcessContent(static_cast< Content*>(event->m_cookie));
                break;
            case kFinish:
                task->Finish();
                break;
            }
        }
     
    private:
        CondQueue< Event*>  m_event_queue; ///< 事件队列(条件队列)
    };

    通过上面的例子,我们可以看出,事件的处理都是在Worker::ProcessEvent里进行的,在这里会根据每个任务当前的状态,选择其对应的处理函数。需要注意的是,整体PageCrawlTask的状态机是Start->WaitDNS->WaitContent->Finish,每个状态都有唯一的处理,如果不是的话,就需要加额外的判断逻辑了,这里请大家用的时候小心一些。

  • 2. task->event->worker,通过event的多态来选择对应的处理函数
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    
    class PageCrawlTask;
     
    /// 事件类型
    class Event
    {
    public:
        virtual void Process() = 0;
     
        PageCrawlTask* m_task;        ///< 于当前event所关联的task
        int            m_dispatch_id; ///< 用来分发事件对对应处理线程的id
        void*          m_cookie;      ///< 事件所带的cookie信息
    };
     
    /// 启动任务的事件
    class StartEvent : public Event
    {
    public:
        virtual void Process()
        {
            m_task->Start(this);
        }
    };
     
    /// 收到dns应答的事件
    class RecvDNSResponseEvent : public Event
    {
    public:
        virtual void Process()
        {
            m_task->ProcessDNSResponse(this);
        }
    };
     
    /// 收到下载内容的事件
    class RecvContentEvent : public Event
    {
    public:
        virtual void Process()
        {
            m_task->ProcessContent(this);
        }
    };
     
    /// 结束任务的事件
    class FinishEvent : public Event
    {
    public:
        virtual void Process()
        {
            m_task->Finish(this);
        }
    };
     
    /// 页面下载的task类型
    class PageCrawlTask
    {
    public:
        /// 启动任务->发起dns请求
        void Start(StartEvent* event) {}
     
        /// 收到dns请求->发起下载请求
        void ProcessDNSResponse(RecvDNSResponseEvent* event) {}
     
        /// 收到下载的内容->下载完成
        void ProcessContent(RecvContentEvent* event) {}
     
        /// 任务结束,清理资源
        void Finish(FinishEvent* event) {}
     
    private:
        State m_state; ///< 任务当前的状态
    };
     
    /// 用来驱动事件的工作线程
    class Worker
    {
    public:
        /// 新加一个事件到当前工作线程
        void AddEvent(Event* event)
        {
            m_event_queue.PushBack(event);
        }
     
        /// 线程入口
        void Entry()
        {
            while (!IsStop())
            {
                Event* event = GetNextEvent(100);
                if (event != NULL)
                {
                    event->Process(); ///< 处理事件
                    delete event;
                }
            }
        }
     
    private:
        ///  获取下一个事件
        Event* GetNextEvent(int timeout)
        {
            Event* event = m_event_queue.TimedPopFront(timeout);
            return event;
        }
     
    private:
        CondQueue< Event*>  m_event_queue; ///< 事件队列(条件队列)
    };

    上面的例子,本质上与1中的例子是一样的,只不过这里用到了C++的多态,通过Event的多态来分派事件处理,这样做的好处一是Worker简单了,二是程序扩展性好了。增加新的事件,只需要增加新的事件类,和其对应的处理函数,不需要修改原有的代码。相比而言,1中如果要增加新的事件,需要修改event, task, worker三个地方,尤其是要修改Worker的ProcessEvent函数。不过因为上面的方法用到了虚函数,相比1中的实现,有一些小小的额外开销,但这个的影响相对于其优点,基本可以忽略不计。

  • 3. task, 没有显式的event和worker, 通过回调函数来通知事件, 通过网络框架的thread和timer的thread来驱动
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    
    /// 页面下载的task类型
    class PageCrawlTask
    {
    public:
        /// 回调函数(用了c++0x的function)
        typedef std::function< void(int)> Callback;
     
        /// 启动任务
        void Start()
        {
            Callback callback = std::bind(&PageCrawlTask::DNSRequestCallback,
                this, &m_dns_response, _1);
            SendDNSRequest(callback);
        }
     
        /// 任务结束, 通过callback通知上层应用
        void Complete(int error_code)
        {
            m_callback(error_code);
        }
     
    private:
        /// 发送dns请求, 注册收到应答时的callback
        void SendDNSRequest(Callback callback) {}
     
        /// dns收到应答的callback
        void DNSRequestCallback(DNSResponse* response, int error_code)
        {
            Callback callback = std::bind(&PageCrawlTask::DonwloadPageRequestCallback,
                this, &m_page_content, _1);
            SendDownloadPageRequest(callback);
        }
     
        /// 发送下载请求, 注册下载完成时的callback
        void SendDownloadPageRequest(Callback callback) {}
     
        /// 收到下载结果的callback
        void DonwloadPageRequestCallback(PageContent* content, int error_code)
        {
            /// TODO 处理下载结果
            Complete(error_code); ///< 通知上层应用, 下载完成
        }
     
    private:
        Callback     m_callback;      ///< 上层应用在启动任务时注册的回调函数
        DNSResponse  m_dns_response;  ///< dns应答
        PageContent  m_page_content;  ///< 页面下载结果
    };

    上面的例子,十分简洁,所有的过程都绑定了回调函数,这样每个过程的下一个过程都是十分明确直接的。上面的例子与1、2中最大的区别是,没有了显式的驱动线程,通过网络框架、timer等的线程来驱动。这里需要注意一下,对于比较慢的事情,可能会阻塞网络或者timer的线程,可以考虑增加一个线程池,把这样的事件丢到线程池里去处理,处理完也同样通过回调函数来通知。

    上面三个例子,各有优劣,都是很经典的模型,感兴趣的朋友可以试着在实际的工作、学习中都试用一下,掌握了上面的模型,基本上对后台Server的开发没有什么大的问题了。如果对于上面的代码有什么疑问的话,可以在下面留言,欢迎交流、沟通,能够帮助别人是笔者感到非常荣幸的事情。

    1. 漂石
      2014年12月27日13:06 | #1

      第三个例子看不懂。没有说明网络框架、timer等的线程如何驱动,也没有说明DNSResponse和PageContent定义。void SendDNSRequest(Callback callback) {}这是函数为空,就是什么都不做,怎么会执行回调函数?

    1. 本文目前尚无任何 trackbacks 和 pingbacks.
    您必须在 登录 后才能发布评论.