基于条件变量的消息队列 说明介绍
基于条件变量的消息队列 说明介绍
发布时间:2016-12-28 来源:查字典编辑
摘要:条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件...

条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生。这样大大减少了锁竞争引起的线程调度和线程等待。

消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错。博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!基于三缓冲的队列,虽然最大限度上解除了线程竞争,但是在玩家很少,消息很小的时候,需要添加一些buff去填充数据,这大概也是其一个缺陷吧!

消息队列在服务器开发过程中主要用于什么对象呢?

1: 我想大概就是通信层和逻辑层之间的交互,通信层接受到的网络数据,验证封包之后,通过消息队列传递给逻辑层,逻辑层将处理结果封包再传递给通信层!

2:逻辑线程和数据库IO线程的分离;数据库IO线程负责对数据库的读写更新,逻辑层对数据库的操作,封装成消息去请求数据库IO线程,数据库IO线程处理完之后,再交回给逻辑层。

3:日志;处理模式与方式2 类似。不过日志大概是不需要返回的!

给出源代码:

BlockingQueue.h文件

复制代码 代码如下:

/*

* BlockingQueue.h

*

* Created on: Apr 19, 2013

* Author: archy_yu

*/

#ifndef BLOCKINGQUEUE_H_

#define BLOCKINGQUEUE_H_

#include <queue>

#include <pthread.h>

typedef void* CommonItem;

class BlockingQueue

{

public:

BlockingQueue();

virtual ~BlockingQueue();

int peek(CommonItem &item);

int append(CommonItem item);

private:

pthread_mutex_t _mutex;

pthread_cond_t _cond;

std::queue<CommonItem> _read_queue;

std::queue<CommonItem> _write_queue;

};

#endif /* BLOCKINGQUEUE_H_ */

BlockingQueue.cpp 文件代码

复制代码 代码如下:

/*

* BlockingQueue.cpp

*

* Created on: Apr 19, 2013

* Author: archy_yu

*/

#include "BlockingQueue.h"

BlockingQueue::BlockingQueue()

{

pthread_mutex_init(&this->_mutex,NULL);

pthread_cond_init(&this->_cond,NULL);

}

BlockingQueue::~BlockingQueue()

{

pthread_mutex_destroy(&this->_mutex);

pthread_cond_destroy(&this->_cond);

}

int BlockingQueue::peek(CommonItem &item)

{

if( !this->_read_queue.empty() )

{

item = this->_read_queue.front();

this->_read_queue.pop();

}

else

{

pthread_mutex_lock(&this->_mutex);

while(this->_write_queue.empty())

{

pthread_cond_wait(&this->_cond,&this->_mutex);

}

while(!this->_write_queue.empty())

{

this->_read_queue.push(this->_write_queue.front());

this->_write_queue.pop();

}

pthread_mutex_unlock(&this->_mutex);

}

return 0;

}

int BlockingQueue::append(CommonItem item)

{

pthread_mutex_lock(&this->_mutex);

this->_write_queue.push(item);

pthread_cond_signal(&this->_cond);

pthread_mutex_unlock(&this->_mutex);

return 0;

}

测试代码:

复制代码 代码如下:

BlockingQueue _queue;

void* process(void* arg)

{

int i=0;

while(true)

{

int *j = new int();

*j = i;

_queue.append((void *)j);

i ++;

}

return NULL;

}

int main(int argc,char** argv)

{

pthread_t pid;

pthread_create(&pid,0,process,0);

long long int start = get_os_system_time();

int i = 0;

while(true)

{

int* j = NULL;

_queue.peek((void* &)j);

i ++;

if(j != NULL && (*j) == 100000)

{

long long int end = get_os_system_time();

printf("consume %dn",end - start);

break;

}

}

return 0;

}

推荐文章
猜你喜欢
附近的人在看
推荐阅读
拓展阅读
相关阅读
网友关注
最新C语言学习
热门C语言学习
编程开发子分类