实现posix消息队列示例分享_C语言教程-查字典教程网
实现posix消息队列示例分享
实现posix消息队列示例分享
发布时间:2016-12-28 来源:查字典编辑
摘要:本文分享一个基于内存映射文件实现的 POSIX 消息队列示例,包括 mqueue.h 头文件、mqueue.c 实现,以及多进程/多线程创建同一队列、属性设置与获取、注册通知规则的测试代码。

mqueue.h

复制代码 代码如下:

//

// mqueue.h

// UNIX_C

//

// Created by 周凯 on 14-2-9.

// Copyright (c) 2014年 zk. All rights reserved.

//

#ifndef __PS_MQUEUE_H
#define __PS_MQUEUE_H

/*
 * Public interface of a user-space message queue that mirrors the POSIX
 * mq_* function names.
 *
 * struct mq_info and struct mq_attr are only named here, not defined; callers
 * treat mqd_t as an opaque handle.  mq_open() reports failure by returning
 * (mqd_t)-1, the other functions by returning -1.
 */

#include <unistd.h>
#include <sys/types.h>

/* Only a pointer to the notification request is needed by this interface, so
 * a forward declaration suffices; callers that build one include <signal.h>. */
struct sigevent;

/* Opaque queue descriptor. */
typedef struct mq_info *mqd_t;

/* Shorthand for the queue attribute structure. */
typedef struct mq_attr mq_attr;

#ifdef __cplusplus
extern "C" {
#endif

/* Open a queue.  When flag contains O_CREAT two extra arguments are read from
 * the variable argument list: the file mode and a `mq_attr *` (pass a typed
 * null pointer, not a bare 0, to request the defaults). */
mqd_t mq_open(const char *name, int flag, .../*mode_t mode, struct mq_attr *attr*/);

/* Release a descriptor obtained from mq_open(). */
int mq_close(mqd_t mqdes);

/* Remove the queue called name. */
int mq_unlink(const char *name);

/* Copy the queue attributes into *attr. */
int mq_getattr(mqd_t mqdes,mq_attr *attr);

/* Change the attributes; when old is not NULL the previous ones are stored
 * there first. */
int mq_setattr(mqd_t mqdes,const mq_attr *attr,mq_attr *old);

/* Register (notification != NULL) or cancel (notification == NULL) the
 * calling process's notification request on the queue. */
int mq_notify(mqd_t mqdes,const struct sigevent *notification);

/* Queue a message of len bytes with priority prio. */
int mq_send(mqd_t mqdes,const char *ptr,size_t len,unsigned int prio);

/* Fetch a message into ptr (capacity len); its priority goes to *priop. */
int mq_receive(mqd_t mqdes,char *ptr,size_t len,unsigned int *priop);

/* Debug helper: print the queue's layout to stderr. */
void mq_info_test(mqd_t mqdes);

#ifdef __cplusplus
}
#endif

#endif

多进程,多线程创建同一个队列测试

复制代码 代码如下:

#include <wrap_ext.h>

#include <mqueue.h>

void *create_mq(void *name){

mqd_t mq;

mq = mq_open("/tmp/mqfile", O_CREAT,FILE_MODE,0);

if (mq == (mqd_t) -1) {

err_ret(errno, "mq_open() error");

return 0;

}

mq_info_test(mq);

mq_close(mq);

return 0;

}

int main(){

mq_unlink("/tmp/mqfile");

if (Fork() == 0) {

create_mq("/tmp/mqfile");

exit(0);

}

Create_detach_thread(create_mq, "/tmp/mqfile");

Create_detach_thread(create_mq, "/tmp/mqfile");

sleep(50);

//mq_unlink("/tmp/mqfile");

return 0;

}

测试结果

复制代码 代码如下:

create,start create...

create,start init...

exists,wait get...

exists,wait get...

create,end init...

mq_hdr.mqh_free:116 bytes

msghdr size:268 bytesmap file size:3332 bytes

next msg offset and msg length:

[384,0];[652,0];[920,0];[1188,0];[1456,0];

[1724,0];[1992,0];[2260,0];[2528,0];exists,start get...

[2796,0];

[3064,0];[0,0];

end,start get...

exists,start get...

mq_hdr.mqh_free:116 bytes

msghdr size:268 bytesmap file size:3332 bytes

next msg offset and msg length:

[384,0];[652,0];[920,0];[1188,0];[1456,0];

[1724,0];[1992,0];[2260,0];[2528,0];[2796,0];

[3064,0];[0,0];

end,start get...

mq_hdr.mqh_free:116 bytes

msghdr size:268 bytesmap file size:3332 bytes

next msg offset and msg length:

[384,0];[652,0];[920,0];[1188,0];[1456,0];

[1724,0];[1992,0];[2260,0];[2528,0];[2796,0];

[3064,0];[0,0];

Program ended with exit code: 0

属性设置、获取测试

复制代码 代码如下:

#include <wrap_ext.h>

#include <mqueue.h>

/*
 * Print the four fields of *attr through err_msg().
 *
 * attr: attributes to print; must not be NULL (checked with assert).
 *
 * struct mq_attr is declared outside this file.  POSIX specifies its members
 * as long, so every value is converted explicitly and printed with a matching
 * length modifier; that keeps the call well defined whatever the exact member
 * types turn out to be.
 */
void print_attr(mq_attr *attr){
    assert(attr);

    err_msg(" mq_attr mq_flag:0x%lx"
            " mq_curmsgs:%ld"
            " mq_msgsize:%ld"
            " mq_maxmsg:%ld"
            ,(unsigned long)attr->mq_flags
            ,(long)attr->mq_curmsgs
            ,(long)attr->mq_msgsize
            ,(long)attr->mq_maxmsg);
}

void *create_mq(void *name){

pthread_t tid;

mq_attr attr,old;

mqd_t mq;

int flag;

flag = O_CREAT;

tid = pthread_self();

if ((long)tid % 2 != 0) {

flag = O_NONBLOCK;

}

mq = mq_open("/tmp/mqfile", flag | O_CREAT,FILE_MODE,0);

if (mq == (mqd_t) -1) {

err_ret(errno, "mq_open() error");

return 0;

}

if ((long)tid % 2 == 0) {

attr.mq_flags = O_NONBLOCK;

mq_setattr(mq, &attr, &old);

}

else

mq_getattr(mq, &old);

print_attr(&old);

//mq_info_test(mq);

mq_close(mq);

return 0;

}

/*
 * Attribute test driver.  Parent and child each run create_mq() three times:
 * once directly and twice in detached threads.
 */
int main(){
    pid_t pid;

    mq_unlink("/tmp/mqfile");

    pid = Fork();
    if (pid == 0) {
        /* child: direct call first, then the two threads */
        create_mq("/tmp/mqfile3");
        Create_detach_thread(create_mq, "/tmp/mqfile1");
        Create_detach_thread(create_mq, "/tmp/mqfile2");
        sleep(1);
        exit(0);
    }

    /* parent: threads first, then the direct call */
    Create_detach_thread(create_mq, "/tmp/mqfile1");
    Create_detach_thread(create_mq, "/tmp/mqfile2");
    create_mq("/tmp/mqfile3");

    /* reap the child, then leave the detached threads time to finish */
    wait(0);
    sleep(5);

    /* the queue file is intentionally left behind:
     * mq_unlink("/tmp/mqfile"); */
    return 0;
}

测试注册通知规则

复制代码 代码如下:

#include <wrap_ext.h>

#include <mqueue.h>

int main(){

pid_t pid;

Init_wait();

mqd_t mq;

sigevent_t sige;

mq_unlink("/tmp/mqfile");

mq = mq_open("/tmp/mqfile", O_CREAT,FILE_MODE,0);

Signal(SIGCHLD, SIG_DFL);

if (mq == (mqd_t) -1) {

err_sys(errno, "mq_open() error");

}

if ((pid=Fork()) == 0) {

if (mq_notify(mq, &sige) == -1)

err_ret(errno, "mq_notify() error");

Tell_parent();

Wait_parent();

End_wait();

sleep(1);

exit(0);

}

Wait_child();

/*子进程已注册,测试是否能注册、取消通知*/

if (mq_notify(mq, 0) == -1)

err_ret(errno, "mq_notify() error");

if (mq_notify(mq, &sige) == -1)

err_ret(errno, "mq_notify() error");

Tell_child(pid);

End_wait();

wait(0);

sleep(1);

/*子进程已结束,测试是否能注册通知*/

if (mq_notify(mq, &sige) == -1)

err_ret(errno, "mq_notify() error");

//mq_unlink("/tmp/mqfile");

return 0;

}

mqueue.c

复制代码 代码如下:

//

// mqueue.c

// UNIX_C

//

// Created by 周凯 on 14-2-9.

// Copyright (c) 2014年 zk. All rights reserved.

//

#include "mqueue.h"

#include <wrap_ext.h>

/*
 * va_mode_t is the type mq_open() hands to va_arg() when it fetches the
 * `mode` argument from its variable argument list: int when _LINUX_ is not
 * defined, mode_t when it is.
 * NOTE(review): presumably int is used because mode_t can be narrower than
 * int on some systems and is then promoted when passed through "..." - confirm.
 */
#if !defined(_LINUX_)

#define va_mode_t int

#else

#define va_mode_t mode_t

#endif

/* Short names for the structures defined below. */
typedef struct mq_info mq_info;

typedef struct mq_hdr mq_hdr;

/* The mq_attr typedef is already provided by mqueue.h, so it is not repeated here. */

typedef struct mq_msg mq_msg;

/*
 * Header stored at offset 0 of the mapped queue file.  It is followed by the
 * message slots; all positions are byte offsets from the start of the mapping.
 */
struct mq_hdr{

mq_attr mqh_attr; /* queue attributes; maxmsg and msgsize are set by __mq_init_mmap() */

long mqh_head; /* NOTE(review): never written in this file; presumably the offset of the first queued message - confirm */

long mqh_free; /* offset of the first free slot; starts at sizeof(mq_hdr) */

pthread_cond_t mqh_conn; /* condition variable, initialised as process-shared */

pthread_mutex_t mqh_mutex; /* process-shared mutex taken by the attribute and notify calls */

sigevent_t mqh_sigevent; /* copy of the notification request stored by mq_notify() */

pid_t mqh_pid; /* pid registered through mq_notify(); 0 means nobody is registered */

};

/*
 * Per-slot header.  The file-size formula in __mq_mmap_file() reserves
 * ALIGN_VAL(mq_msgsize, sizeof(long)) bytes for each slot in addition to this
 * header.
 */
struct mq_msg{

long msg_next;/* offset of the next message, counted from the start of the mapped memory */

ssize_t msg_len; /* NOTE(review): presumably the payload length; not written in this file - confirm */

int msg_prio; /* NOTE(review): presumably the message priority; not written in this file - confirm */

};

/*
 * Per-open descriptor: this is what an mqd_t points to.  It is allocated by
 * mq_open() and freed by mq_close().
 */
struct mq_info{

mq_hdr *mqi_hdr; /* start of the mapped queue file */

long long mqi_magic; /* MQ_MAGIC while the descriptor is valid; mq_close() resets it to 0 */

int mqi_flag; /* per-descriptor flags; only the O_NONBLOCK bit is kept */

};

/* Default number of message slots, used when mq_open() gets no attributes. */
#define MQ_MAXMSG 12

/* Default maximum message size in bytes, used when mq_open() gets no attributes. */
#define MQ_MSGSIZE 256

/* Value stored in mq_info.mqi_magic to mark a descriptor as valid. */
#define MQ_MAGIC 0x9235167840

/*
 * Guards against the following situation: one process or thread opens a
 * queue in create mode, and the CPU then switches to another process or
 * thread that opens the queue just created before it has been fully
 * initialised.  A record lock together with a thread lock is therefore used
 * for synchronisation.
 *
 * Note:
 * this implementation is not async-signal-safe, i.e. the queue open (create)
 * function must not be called from a signal handler.
 */

/* File on which the record lock is taken. */
#define MQ_LOCK_FILE "/tmp/mq_lock_file"

/* Attributes used when the caller supplies none.
 * NOTE(review): positional initialiser - assumes the members of struct mq_attr
 * are ordered mq_flags, mq_maxmsg, mq_msgsize, mq_curmsgs; confirm against its
 * definition. */
static struct mq_attr def_attr = {0,MQ_MAXMSG,MQ_MSGSIZE,0};

/* Makes __mq_once_init() run exactly once (see Pthread_once() in mq_open()). */
static pthread_once_t __mq_once = PTHREAD_ONCE_INIT;

/* Thread lock; initialised as a recursive mutex by __mq_once_init(). */
static pthread_mutex_t __mq_lock;

/* Thread-specific key under which each thread keeps its lock-file descriptor. */
static pthread_key_t __mq_key;

/* Forward declarations of the file-local helpers. */
static void __mq_once_init();

static int __mq_get_filelock();

static void *__mq_mmap_file(int fd,mq_attr *attr);

static int __mq_init_mmap(void *ptr,mq_attr *attr);

static void __mq_unmap(const char *name,void *ptr);

/*
 * One-time module set-up: creates the thread-specific key __mq_key (no
 * destructor) and initialises __mq_lock as a recursive mutex.
 */
static void __mq_once_init(){
    pthread_mutexattr_t recursive;

    /* key under which each thread stores its lock-file descriptor */
    Pthread_key_create(&__mq_key, 0);

    /* recursive, so a thread already holding the lock may take it again */
    Pthread_mutexattr_init(&recursive);
    Pthread_mutexattr_settype(&recursive, PTHREAD_MUTEX_RECURSIVE);
    Pthread_mutex_init(&__mq_lock, &recursive);
    Pthread_mutexattr_destroy(&recursive);
}

/*
 * Return the calling thread's descriptor for MQ_LOCK_FILE, opening (and if
 * necessary creating) the file the first time the thread asks for it.
 *
 * The descriptor is cached in thread-specific storage under __mq_key.  A
 * stored value of 0 means "nothing cached yet", so descriptor 0 itself is
 * never handed out.
 *
 * Returns the descriptor.
 */
static int __mq_get_filelock(){
    int fd;

    Pthread_mutex_lock(&__mq_lock);

    /* Go through long so the pointer <-> int round trip is not a direct
     * conversion between types of different width. */
    fd = (int)(long)Pthread_getspecific(__mq_key);
    if (fd == 0) {
        fd = open(MQ_LOCK_FILE, O_CREAT | O_EXCL | O_WRONLY, FILE_MODE);
        if (fd == -1) {
            if (errno != EEXIST)
                err_sys(errno, "mq_open(),__mq_get_filelock() error");
            else
                /* Someone else created the file first: just open it.  The file
                 * is opened a second time only in this case, so a descriptor
                 * obtained by the create above is no longer leaked. */
                fd = Open(MQ_LOCK_FILE, O_WRONLY, 0);
        }

        if (fd == 0) {
            /* 0 cannot be cached (it means "unset"): take another descriptor
             * while 0 is still occupied, then give 0 back. */
            int other = Open(MQ_LOCK_FILE, O_WRONLY, 0);

            close(fd);
            fd = other;
        }

        Pthread_setspecific(__mq_key, (void*)(long)fd);
    }

    Pthread_mutex_unlock(&__mq_lock);
    return fd;
}

/*
 * Grow the queue file open on fd to its full size and map it read/write and
 * shared.
 *
 * fd:   descriptor of the queue file.
 * attr: requested attributes; NULL selects def_attr.
 *
 * Returns the mapping address, or MAP_FAILED.  errno is set to EINVAL here
 * when mq_maxmsg or mq_msgsize is not positive.
 */
static void *__mq_mmap_file(int fd,mq_attr *attr){
    const mq_attr *use = (attr != 0) ? attr : &def_attr;
    size_t slot, total;

    if (!(use->mq_maxmsg > 0 && use->mq_msgsize > 0)) {
        errno = EINVAL;
        return MAP_FAILED;
    }

    /* header + one (slot header + aligned payload) per message */
    slot = sizeof(mq_msg) + ALIGN_VAL(use->mq_msgsize, sizeof(long));
    total = sizeof(mq_hdr) + slot * use->mq_maxmsg;

    /* the file must be as long as the mapping: write its very last byte */
    if (lseek(fd, total - 1, SEEK_SET) < 0 || write(fd, "", 1) != 1)
        return MAP_FAILED;

    return mmap(NULL, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
}

/*
 * Undo a queue creation: remove the file called name and, when ptr is a real
 * mapping, unmap it using the file's current size as the length.
 *
 * Nothing at all is done when the file cannot be stat()ed.
 */
static void __mq_unmap(const char *name,void *ptr){
    stat_t info;

    assert(name);

    if (stat(name, &info) != -1) {
        size_t maplen = (size_t)info.st_size;

        unlink(name);
        if (ptr != MAP_FAILED)
            munmap(ptr, maplen);
    }
}

/* Initialise *cond as a process-shared condition variable.
 * Returns 0 or a pthread error number. */
static int mqh_init_cond(pthread_cond_t *cond){
    pthread_condattr_t cattr;
    int rc, rc_destroy;

    rc = pthread_condattr_init(&cattr);
    if (rc)
        return rc;

    rc = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
    if (rc == 0)
        rc = pthread_cond_init(cond, &cattr);

    /* the attribute object is released on every path */
    rc_destroy = pthread_condattr_destroy(&cattr);
    return rc ? rc : rc_destroy;
}

/* Initialise *mutex as a process-shared mutex.
 * Returns 0 or a pthread error number. */
static int mqh_init_mutex(pthread_mutex_t *mutex){
    pthread_mutexattr_t mattr;
    int rc, rc_destroy;

    rc = pthread_mutexattr_init(&mattr);
    if (rc)
        return rc;

    rc = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
    if (rc == 0)
        rc = pthread_mutex_init(mutex, &mattr);

    /* the attribute object is released on every path */
    rc_destroy = pthread_mutexattr_destroy(&mattr);
    return rc ? rc : rc_destroy;
}

/*
 * Fill a freshly mapped queue file with its initial contents.
 *
 * ptr:  start of the mapping (must not be NULL).
 * attr: requested attributes; NULL selects def_attr.
 *
 * Layout written:
 *   - the header at offset 0, with a process-shared condition variable and
 *     mutex and nobody registered for notification;
 *   - mq_maxmsg slots starting at sizeof(mq_hdr), each
 *     sizeof(mq_msg) + ALIGN_VAL(mq_msgsize, sizeof(long)) bytes long and
 *     chained into the free list through msg_next (0 ends the list).
 *
 * Returns 0 on success; -1 with errno set on failure (EINVAL for
 * non-positive mq_maxmsg / mq_msgsize, otherwise the pthread error number).
 */
static int __mq_init_mmap(void *ptr,mq_attr *attr){
    char *base;
    size_t slot, offset;
    long i;
    int flag;
    mq_hdr *mqhdr;
    mq_msg *mqmsg;

    assert(ptr);

    if (attr == 0) {
        attr = &def_attr;
    }
    if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
        errno = EINVAL;
        return -1;
    }

    base = ptr;
    mqhdr = (mq_hdr*)base;

    mqhdr->mqh_attr.mq_flags = 0;
    mqhdr->mqh_attr.mq_curmsgs = 0;
    mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
    mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;

    /* no queued message and no notification registered yet */
    mqhdr->mqh_head = 0;
    mqhdr->mqh_pid = 0;

    flag = mqh_init_cond(&mqhdr->mqh_conn);
    if (flag) {
        errno = flag;
        return -1;
    }

    flag = mqh_init_mutex(&mqhdr->mqh_mutex);
    if (flag) {
        errno = flag;
        return -1;
    }

    /*
     * Chain all slots into the free list.  Every slot header is written at
     * the very offset its predecessor's msg_next points to, i.e. one full
     * slot (header + aligned payload) after the previous one - stepping by
     * sizeof(mq_msg) alone would put the headers somewhere the stored offsets
     * do not lead to.
     */
    slot = sizeof(mq_msg) + ALIGN_VAL(attr->mq_msgsize, sizeof(long));
    offset = sizeof(mq_hdr);
    mqhdr->mqh_free = (long)offset;

    for (i = 0; i < attr->mq_maxmsg; i++) {
        mqmsg = (mq_msg*)(base + offset);
        offset += slot;
        mqmsg->msg_next = (i + 1 < attr->mq_maxmsg) ? (long)offset : 0;
    }

    return 0;
}

mqd_t mq_open(const char *name,int flag,...){

int fd, nonblock, lockfile_fd, err;

void *ptr;

mq_attr *mqattr;

mqd_t mqdesc;

stat_t filestat;

debug_assert("Invalid pointer", "mq_open()", name);

Pthread_once(&__mq_once, __mq_once_init);

nonblock = flag & O_NONBLOCK;

mqattr = NULL;

mqdesc = NULL;

ptr = MAP_FAILED;

__again:

if (flag & O_CREAT) {

va_list vp;

mode_t mode;

/*分析可变参数*/

va_start(vp, flag);

mode = va_arg(vp, va_mode_t);

mqattr = va_arg(vp, mq_attr *);

va_end(vp);

Pthread_mutex_lock(&__mq_lock);

lockfile_fd = __mq_get_filelock();

write_lock_wait(lockfile_fd, SEEK_SET, 0, 0);

fd = open(name, flag | O_CREAT | O_EXCL | O_RDWR, mode);

if (fd < 0) {

/*如果指定了O_EXCL,并且文件已存在,则等待其他进程或线程完成初始化*/

if (errno == EEXIST && (flag & O_EXCL) == 1) {

return (mqd_t)-1;

}

goto __exists_wait_init;

}

/*初始化内存映射文件*/

err_msg("create,start init...");

/*初始化映射文件大小(注意必须使文件长度达到映射的大小),且映射文件到内存*/

ptr = __mq_mmap_file(fd, mqattr);

//sleep(1);

if (ptr == MAP_FAILED) {

goto __err;

}

/*初始化映射内存的内容*/

if (__mq_init_mmap(ptr, mqattr) < 0) {

goto __err;

}

mqdesc = (mqd_t)calloc(1, sizeof(mq_hdr));

if (mqdesc == 0) {

goto __err;

}

mqdesc->mqi_hdr = (mq_hdr*)ptr;

mqdesc->mqi_flag = nonblock;

mqdesc->mqi_magic = MQ_MAGIC;

err_msg("create,end init...");

file_unlock(lockfile_fd, SEEK_SET, 0, 0);

Pthread_mutex_unlock(&__mq_lock);

return mqdesc;

}

__exists_wait_init:

fd = open(name, O_RDWR, 0);

if (fd < 0 ) {

if (errno == ENOENT && (flag & O_CREAT)) {

goto __again;

}

goto __err;

}

err_msg("exists,start get...");

if (stat(name, &filestat) == -1) {

if (errno == ENOENT && (flag & O_CREAT)) {

goto __again;

}

goto __err;

}

ptr = mmap(0, (size_t)filestat.st_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);

if (ptr == MAP_FAILED) {

goto __err;

}

mqdesc = (mqd_t)calloc(1, sizeof(mq_hdr));

if (mqdesc == 0) {

goto __err;

}

mqdesc->mqi_hdr = (mq_hdr*)ptr;

mqdesc->mqi_flag = nonblock;

mqdesc->mqi_magic = MQ_MAGIC;

close(fd);

file_unlock(lockfile_fd, SEEK_SET, 0, 0);

Pthread_mutex_unlock(&__mq_lock);

err_msg("end,start get...");

return mqdesc;

__err:

file_unlock(lockfile_fd, SEEK_SET, 0, 0);

Pthread_mutex_unlock(&__mq_lock);

err = errno;

__mq_unmap(name, ptr);

close(fd);

if (mqdesc)

free(mqdesc);

errno = err;

return (mqd_t)-1;

}

/*
 * Close a descriptor returned by mq_open(): unmap the queue file and free
 * the descriptor.
 *
 * mqdes: descriptor to close (must not be NULL).
 *
 * Returns the result of munmap(), or -1 with errno = EBADF when mqdes does
 * not carry the magic number of a valid descriptor.
 */
int mq_close(mqd_t mqdes){
    size_t filesize;
    mq_attr *mattr;
    int flag;

    assert(mqdes);

    if (mqdes->mqi_magic != MQ_MAGIC) {
        errno = EBADF;
        return -1;
    }

    mattr = &mqdes->mqi_hdr->mqh_attr;

    /* Same formula as used when the file was sized: the header plus one
     * (slot header + aligned payload) per message.  The slot size is a sum;
     * multiplying the two terms would ask munmap() to unmap far more than
     * was ever mapped. */
    filesize = sizeof(mq_hdr)
             + mattr->mq_maxmsg * (sizeof(mq_msg) + ALIGN_VAL(mattr->mq_msgsize, sizeof(long)));

    flag = munmap((void*)mqdes->mqi_hdr, filesize);

    /* invalidate before freeing so a stale copy of the pointer is more likely
     * to be caught by the magic check */
    mqdes->mqi_magic = 0;
    free(mqdes);

    return flag;
}

int mq_unlink(char const *name){

assert(name);

return unlink(name);

}

/*
 * Copy the queue attributes into *attr.
 *
 * mqdes: open descriptor (must not be NULL).
 * attr:  receives the attributes (must not be NULL); its mq_flags member is
 *        taken from the descriptor, not from the shared header.
 *
 * Returns 0 on success.  Returns -1 with errno = EBADF for an invalid
 * descriptor, or with errno set to the error number reported by the mutex
 * operations.
 */
int mq_getattr(mqd_t mqdes,mq_attr *attr){
    mq_hdr *hdr;
    int rc;

    assert(mqdes);
    assert(attr);

    if (mqdes->mqi_magic != MQ_MAGIC) {
        errno = EBADF;
        return -1;
    }

    hdr = mqdes->mqi_hdr;

    /* keep other processes or threads from changing the attributes meanwhile */
    rc = pthread_mutex_lock(&hdr->mqh_mutex);
    if (rc > 0) {
        errno = rc;
        return -1;
    }

    *attr = hdr->mqh_attr;
    attr->mq_flags = mqdes->mqi_flag;

    rc = pthread_mutex_unlock(&hdr->mqh_mutex);
    if (rc > 0) {
        errno = rc;
        return -1;
    }

    return 0;
}

/*
 * Change the attributes of an open queue descriptor.
 *
 * mqdes: open descriptor (must not be NULL).
 * attr:  new attributes (must not be NULL); only the O_NONBLOCK bit of
 *        mq_flags is honoured, everything else is fixed at creation time.
 * old:   when not NULL, receives the attributes in force before the change.
 *
 * Returns 0 on success.  Returns -1 with errno = EBADF for an invalid
 * descriptor, or with errno set to the error number reported by the mutex
 * operations.
 */
int mq_setattr(mqd_t mqdes,const mq_attr *attr,mq_attr *old){
    mq_hdr *hdr;
    int rc;

    assert(mqdes);
    assert(attr);

    if (mqdes->mqi_magic != MQ_MAGIC) {
        errno = EBADF;
        return -1;
    }

    hdr = mqdes->mqi_hdr;

    /* keep other processes or threads from reading the attributes meanwhile */
    rc = pthread_mutex_lock(&hdr->mqh_mutex);
    if (rc > 0) {
        errno = rc;
        return -1;
    }

    if (old != NULL) {
        *old = hdr->mqh_attr;
        old->mq_flags = mqdes->mqi_flag;
    }

    /* after creation only the flags may change, and of those only O_NONBLOCK
     * is stored */
    if ((attr->mq_flags & O_NONBLOCK) != 0)
        mqdes->mqi_flag |= O_NONBLOCK;
    else
        mqdes->mqi_flag &= ~O_NONBLOCK;

    rc = pthread_mutex_unlock(&hdr->mqh_mutex);
    if (rc > 0) {
        errno = rc;
        return -1;
    }

    return 0;
}

/*
 * Register or cancel the notification request of the calling process.
 *
 * mqdes:        open descriptor (must not be NULL).
 * notification: request to register, or NULL to cancel.
 *
 * Rules:
 *   - While a live process is registered, nobody can register (EBUSY) and
 *     only that process can cancel (EPERM for everybody else).
 *   - When nobody is registered, or the registered process no longer exists,
 *     a request is stored for the caller; a cancel clears the slot.
 *
 * Returns 0 on success; -1 with errno = EBADF, EBUSY, EPERM or the error
 * number reported by pthread_mutex_lock().
 */
int mq_notify(mqd_t mqdes,const struct sigevent *notification){
    mq_hdr *hdr;
    pid_t pid;
    int flag;

    assert(mqdes);

    if (mqdes->mqi_magic != MQ_MAGIC) {
        errno = EBADF;
        return -1;
    }

    hdr = mqdes->mqi_hdr;

    flag = pthread_mutex_lock(&hdr->mqh_mutex);
    if (flag > 0) {
        errno = flag;
        return -1;
    }

    pid = hdr->mqh_pid;

    /*
     * Probe the registered process with signal 0.  It counts as alive when
     * the signal could be sent, or when sending failed for any reason other
     * than "no such process" (for instance missing permission).
     */
    if (pid != 0 && (kill(pid, 0) != -1 || errno != ESRCH)) {
        if (notification != 0) {
            errno = EBUSY;
            flag = -1;
        }
        else if (pid != getpid()) {
            errno = EPERM;
            flag = -1;
        }
        else {
            hdr->mqh_pid = 0;
            flag = 0;
        }
    }
    else {
        if (notification != 0) {
            hdr->mqh_pid = getpid();
            bcopy(notification, &hdr->mqh_sigevent, sizeof(sigevent_t));
        }
        else {
            /* Drop the pid of a vanished process too: left in place it could
             * later be mistaken for a live registration once the system
             * reuses that pid. */
            hdr->mqh_pid = 0;
        }
        flag = 0;
    }

    pthread_mutex_unlock(&hdr->mqh_mutex);
    return flag;
}

/*
 * Debug helper: print the layout of an open queue.
 *
 * Reports the free-list head, the size of one slot and the size of the whole
 * mapped file through err_msg(), then writes the [msg_next,msg_len] pair of
 * every slot to stderr, five pairs per line.
 *
 * mqdes: open descriptor (must not be NULL).
 */
void mq_info_test(mqd_t mqdes){
    size_t i,msgsize,index,maxmsg;
    char *base;
    mq_msg *msg;
    mq_attr *mattr;

    assert(mqdes);

    mattr = &mqdes->mqi_hdr->mqh_attr;
    base = (char*)mqdes->mqi_hdr;
    maxmsg = (size_t)mattr->mq_maxmsg;

    /* one slot = slot header + payload rounded with ALIGN_VAL */
    msgsize = sizeof(mq_msg) + ALIGN_VAL(mattr->mq_msgsize, sizeof(long));
    index = mqdes->mqi_hdr->mqh_free;

    /* the values are size_t: convert them so they match the specifiers */
    err_msg("mq_hdr.mqh_free:%ld bytes\n"
            "msghdr size:%lu bytes"
            "map file size:%lu bytes"
            , (long)index
            , (unsigned long)msgsize
            , (unsigned long)(sizeof(mq_hdr) + maxmsg * msgsize));

    err_msg("next msg offset and msg length:");

    for (i = 0; i < maxmsg; i++) {
        /* slots follow the header back to back, msgsize bytes apart - the
         * same spacing the msg_next offsets are expressed in */
        msg = (mq_msg*)(base + sizeof(mq_hdr) + i * msgsize);

        fprintf(stderr, "[%ld,%ld];", msg->msg_next, (long)msg->msg_len);
        if ((i+1)%5 == 0) {
            fprintf(stderr,"\n");
        }
    }

    /* finish the last line unless the loop has just done so */
    if (i%5 != 0) {
        fprintf(stderr,"\n");
    }

    return;
}

相关阅读
推荐文章
猜你喜欢
附近的人在看
推荐阅读
拓展阅读
  • 大家都在看
  • 小编推荐
  • 猜你喜欢
  • 最新C语言学习
    热门C语言学习
    编程开发子分类