信号量同步共享内存读写实例
前言在前几篇文章讲述了共享内存和信号量在共享内存的文章中两个进程不是并发运行的而是线性运行等一个进程写完了退出了再启动另一个进程进行读操作因为他们的操作不是原子的是不安全的。而前面又讲到了信号量可以进行进程间的同步所以把信号量结合到共享内存中就能在多进程进行共享内存的同步操作原文链接https://kidwjb.top/archives/215目标目前使用一个生产者和一个消费者的简单模型同时读写一个100字节大小的共享内存数据区不考虑环形缓冲区读写完即止消费者作为父进程创建生产者子进程生产者完成所有写操作之后退出。两个进程同时进行对共享内存的读写代码实现共享内存数据结构定义笔者使用的是无名信号量所以是需要共享内存支持才能在进程之间作用所以把sem_t也封装到共享内存结构体里面typedef struct shareData { sem_t sem; //64位下32字节内部对齐是8的倍数 uint8_t data[DATA_SIZE]; uint32_t write_ptr 0; uint32_t read_ptr 0; uint32_t sem_init_flag 0; uint8_t reserved[112]; //作为字节对齐保留 }shareData_t; //总计256字节根据sem_t的定义#if __WORDSIZE 64 # define __SIZEOF_SEM_T 32 #else # define __SIZEOF_SEM_T 16 #endif /* Value returned if sem_open failed. */ #define SEM_FAILED ((sem_t *) 0) typedef union { char __size[__SIZEOF_SEM_T]; long int __align; } sem_t;笔者在64位机做实验这里对应__SIZEOF_SEM_T值为32然后由于sem_t联合体里面有long int的数据类型那么它的内存对齐就是8个字节倍数所以综合考虑最终共享内存结构体大小定义为256字节sem_init_flag用于判断是否已经初始化信号量因为无名信号量的初始化只需要一次共享内存类实现为了减少主函数的代码量和可移植性笔者将对共享内存的所有操作封装成一个类两个进程只需要调用类的接口即可这样看起来更直观清晰class sharedMem { private: shareData_t *shm_ptr; char *shm_name; public: sharedMem(/* args */); ~sharedMem(); int init(char *name); void deinit(); void destroy(); int write(uint8_t *data,size_t size); int read(uint8_t *data,size_t size); };类的私有成员一个是指向共享内存地址的指针还有一个是共享内存的名称这里使用的是POSIX的共享内存API整个类的接口有初始化去初始化销毁共享内存写共享内存读共享内存几种初始化共享内存的初始化主要是分为两步一步是创建共享内存如果没有并映射其地址fd shm_open(shm_name,O_CREAT | O_RDWR,0640); ftruncate(fd,sizeof(shareData_t)); shm_ptr (shareData_t *)mmap(NULL,sizeof(shareData_t),PROT_READ | PROT_WRITE,MAP_SHARED,fd,0);第二部是判断信号量是否已经初始化如果没有初始化就需要进行初始化初始化后其他进程避免初始化在对信号量的初始化判断的时候使用到了文件锁进行加锁操作避免其他进程同时进行判断从而可能存在竞争。if(flock(fd,LOCK_EX) 0) //加文件锁排他锁查看是否已经初始化sem { if(shm_ptr-sem_init_flag ! SEM_INIT_FLAG) { ret sem_init(shm_ptr-sem,1,1); if(ret ! 0) { perror(sem_init failed:); return -1; } shm_ptr-sem_init_flag SEM_INIT_FLAG; SHM_PRINT(sem_init succ); } else SHM_PRINT(sem inited); flock(fd,LOCK_UN);//释放锁 }去初始化去初始化的作用就是接触共享内存的映射munmap(shm_ptr,sizeof(shareData_t));销毁共享内存销毁共享内存只需要调用一次一般由管理这个共享内存的主进程调用销毁shm_unlink(shm_name);写共享内存读写共享内存时需要加信号量操作就如同多线程读写加互斥锁操作一样这里笔者的写完成判断是写指针达到data数组的大小在这之后生产者进程直接退出sem_wait(shm_ptr-sem); if(sizeof(shm_ptr-data) - shm_ptr-write_ptr size) { memcpy(shm_ptr-data shm_ptr-write_ptr,data,size); shm_ptr-write_ptr size; sem_post(shm_ptr-sem); return size; } else { sem_post(shm_ptr-sem); SHM_PRINT(shared Mem full); return -1; }读共享内存读共享内存也和写操作一样,有时候可能读操作会比写操作快那么就需要重新进行等待sem_wait(shm_ptr-sem); if(shm_ptr-read_ptr sizeof(shm_ptr-data)) { sem_post(shm_ptr-sem); return -1; } if(shm_ptr-write_ptr - shm_ptr-read_ptr size) { memcpy(data,shm_ptr-data shm_ptr-read_ptr,size); shm_ptr-read_ptr size; sem_post(shm_ptr-sem); return size; } else { sem_post(shm_ptr-sem); SHM_PRINT(read faster than write); return 0; }主进程消费者这里笔者将消费者进程作为主进程在进行信号量的初始化之后会去fork子进程子进程调用execve函数执行生产者内容int main(int argc,char **argv) { uint8_t buff[4]; int ret 0; pid_t child_pid 0; sharedMem g_shareMem; int status; ret g_shareMem.init(SHM_NAME); if((child_pid fork()) 0) { char *argv[] { ./sharedMemWrite, NULL }; char *envp[] { NULL }; execve(./sharedMemWrite,argv,envp); } while(1) { sleep(1); ret g_shareMem.read(buff,sizeof(buff)); if(ret 0) { break; } else if(ret 0) { continue; } SHM_PRINT(read buff: %u %u %u %u,buff[0],buff[1],buff[2],buff[3]); } g_shareMem.deinit(); waitpid(child_pid,status,0); SHM_PRINT(child proc exit); g_shareMem.destroy(); return 0; }子进程生产者子进程代码就比较简单了就直接映射共享内存然后写就行了int main(int argc,char **agrv) { uint8_t buff[4]; int ret 0; uint8_t i 0; uint8_t j 0; sharedMem g_shareMem; ret g_shareMem.init(SHM_NAME); while(1) { sleep(1); for(i 0;i sizeof(buff);i) { buff[i] i j; } ret g_shareMem.write(buff,sizeof(buff)); if(ret 0) break; j j 4; } g_shareMem.deinit(); return 0; }结果现象执行sharedMemRead程序结果如下jhliutest:~/OS_code/sharedMemSync$ ls shareData.h sharedMem.cpp sharedMem.h sharedMemRead sharedMemRead.cpp sharedMemWrite sharedMemWrite.cpp jhliutest:~/OS_code/sharedMemSync$ ./sharedMemRead PID615102: shm_open succ PID615102: shm st_size:256 PID615102: mmap succ PID615102: sem_init succ PID615104: shm_open succ PID615104: shm st_size:256 PID615104: mmap succ PID615104: sem inited PID615102: read faster than write PID615102: read buff: 0 1 2 3 PID615102: read buff: 4 5 6 7 PID615102: read buff: 8 9 10 11 PID615102: read buff: 12 13 14 15 PID615102: read buff: 16 17 18 19 PID615102: read buff: 20 21 22 23 PID615102: read buff: 24 25 26 27 PID615102: read buff: 28 29 30 31 PID615102: read buff: 32 33 34 35 PID615102: read buff: 36 37 38 39 PID615102: read buff: 40 41 42 43 PID615102: read buff: 44 45 46 47 PID615102: read buff: 48 49 50 51 PID615102: read buff: 52 53 54 55 PID615102: read buff: 56 57 58 59 PID615102: read buff: 60 61 62 63 PID615102: read buff: 64 65 66 67 PID615102: read buff: 68 69 70 71 PID615102: read buff: 72 73 74 75 PID615102: read buff: 76 77 78 79 PID615102: read buff: 80 81 82 83 PID615102: read buff: 84 85 86 87 PID615102: read buff: 88 89 90 91 PID615102: read buff: 92 93 94 95 PID615102: read buff: 96 97 98 99 PID615104: shared Mem full PID615104: munmap succ PID615102: munmap succ PID615102: child proc exit PID615102: shm_unlink succ可以看到主进程615102先创建共享内存然后映射到自己的虚拟内存中并且初始化了信号量然后启动子进程615104子进程只需要映射共享内存并判断信号量已经初始化。接下来生产者和消费者两个进程就轮流读写。最后子进程615104退出然后主进程615102检测到子进程退出也销毁共享内存上面生产者和消费者的读写速度都是1s一次下面笔者改了生产者的速度为2s一次jhliutest:~/OS_code/sharedMemSync$ ./sharedMemRead PID615614: shm_open succ PID615614: shm st_size:256 PID615614: mmap succ PID615614: sem_init succ PID615615: shm_open succ PID615615: shm st_size:256 PID615615: mmap succ PID615615: sem inited PID615614: read faster than write PID615614: read faster than write PID615614: read buff: 0 1 2 3 PID615614: read faster than write PID615614: read buff: 4 5 6 7 PID615614: read faster than write PID615614: read buff: 8 9 10 11 PID615614: read faster than write PID615614: read buff: 12 13 14 15 PID615614: read faster than write PID615614: read buff: 16 17 18 19 PID615614: read faster than write PID615614: read buff: 20 21 22 23 PID615614: read faster than write PID615614: read buff: 24 25 26 27 PID615614: read faster than write PID615614: read buff: 28 29 30 31 PID615614: read faster than write PID615614: read buff: 32 33 34 35 PID615614: read buff: 36 37 38 39 PID615614: read faster than write PID615614: read buff: 40 41 42 43 PID615614: read faster than write PID615614: read buff: 44 45 46 47 PID615614: read faster than write PID615614: read buff: 48 49 50 51 PID615614: read faster than write PID615614: read buff: 52 53 54 55 PID615614: read faster than write PID615614: read buff: 56 57 58 59 PID615614: read faster than write PID615614: read buff: 60 61 62 63 PID615614: read faster than write PID615614: read buff: 64 65 66 67 PID615614: read faster than write PID615614: read buff: 68 69 70 71 PID615614: read faster than write PID615614: read buff: 72 73 74 75 PID615614: read faster than write PID615614: read buff: 76 77 78 79 PID615614: read faster than write PID615614: read buff: 80 81 82 83 PID615614: read faster than write PID615614: read buff: 84 85 86 87 PID615614: read faster than write PID615614: read buff: 88 89 90 91 PID615614: read faster than write PID615614: read buff: 92 93 94 95 PID615614: read faster than write PID615614: read buff: 96 97 98 99 PID615614: munmap succ PID615615: shared Mem full PID615615: munmap succ PID615614: child proc exit PID615614: shm_unlink succ完整源码shareData.h#ifndef __SHAREDATA__H #define __SHAREDATA__H #include semaphore.h #include stdint.h #include stdio.h #include unistd.h #define SHM_NAME /wjbtest #define SHM_PRINT(fmt, ...) printf(PID%d: fmt \r\n, getpid(), ##__VA_ARGS__) #define DATA_SIZE 100 #define SEM_INIT_FLAG 0x12345678 typedef struct shareData { sem_t sem; //64位下32字节内部对齐是8的倍数 uint8_t data[DATA_SIZE]; uint32_t write_ptr 0; uint32_t read_ptr 0; uint32_t sem_init_flag 0; uint8_t reserved[112]; //作为字节对齐保留 }shareData_t; //总计256字节 #endifsharedMem.h:#ifndef __SHAREDMEM_H #define __SHAREDMEM_H #include shareData.h class sharedMem { private: shareData_t *shm_ptr; char *shm_name; public: sharedMem(/* args */); ~sharedMem(); int init(char *name); void deinit(); void destroy(); int write(uint8_t *data,size_t size); int read(uint8_t *data,size_t size); }; #endifsharedMem.cpp:#include cstddef #include sys/mman.h #include sys/stat.h /* For mode constants */ #include fcntl.h /* For O_* constants */ #include stdio.h #include unistd.h #include string.h #include sys/file.h #include sharedMem.h sharedMem::sharedMem(/* args */) { } sharedMem::~sharedMem() { } int sharedMem::init(char *name) { int fd 0; int ret 0; shm_name name; fd shm_open(shm_name,O_CREAT | O_RDWR,0640); if(fd -1) { perror(shm_open failed:); return -1; } SHM_PRINT(shm_open succ); if(ftruncate(fd,sizeof(shareData_t)) ! 0) { perror(ftruncate failed:); return -1; } struct stat filestate; fstat(fd,filestate); SHM_PRINT(shm st_size:%ld,filestate.st_size); shm_ptr (shareData_t *)mmap(NULL,sizeof(shareData_t),PROT_READ | PROT_WRITE,MAP_SHARED,fd,0); if(shm_ptr NULL) { perror(mmap failed:); return -1; } SHM_PRINT(mmap succ); if(flock(fd,LOCK_EX) 0) //加文件锁排他锁查看是否已经初始化sem { if(shm_ptr-sem_init_flag ! SEM_INIT_FLAG) { ret sem_init(shm_ptr-sem,1,1); if(ret ! 0) { perror(sem_init failed:); return -1; } shm_ptr-sem_init_flag SEM_INIT_FLAG; SHM_PRINT(sem_init succ); } else SHM_PRINT(sem inited); flock(fd,LOCK_UN);//释放锁 } close(fd); return 0; } void sharedMem::deinit() { munmap(shm_ptr,sizeof(shareData_t)); SHM_PRINT(munmap succ); } void sharedMem::destroy() { shm_unlink(shm_name); SHM_PRINT(shm_unlink succ); } int sharedMem::write(uint8_t *data,size_t size) { sem_wait(shm_ptr-sem); if(sizeof(shm_ptr-data) - shm_ptr-write_ptr size) { memcpy(shm_ptr-data shm_ptr-write_ptr,data,size); shm_ptr-write_ptr size; sem_post(shm_ptr-sem); return size; } else { sem_post(shm_ptr-sem); SHM_PRINT(shared Mem full); return -1; } } int sharedMem::read(uint8_t *data,size_t size) { sem_wait(shm_ptr-sem); if(shm_ptr-read_ptr sizeof(shm_ptr-data)) { sem_post(shm_ptr-sem); return -1; } if(shm_ptr-write_ptr - shm_ptr-read_ptr size) { memcpy(data,shm_ptr-data shm_ptr-read_ptr,size); shm_ptr-read_ptr size; sem_post(shm_ptr-sem); return size; } else { sem_post(shm_ptr-sem); SHM_PRINT(read faster than write); return 0; } }sharedMemRead.cpp:#include cstddef #include sys/mman.h #include sys/stat.h /* For mode constants */ #include fcntl.h /* For O_* constants */ #include stdio.h #include unistd.h #include string.h #include sys/wait.h #include sharedMem.h int main(int argc,char **argv) { uint8_t buff[4]; int ret 0; pid_t child_pid 0; sharedMem g_shareMem; int status; ret g_shareMem.init(SHM_NAME); if((child_pid fork()) 0) { char *argv[] { ./sharedMemWrite, NULL }; char *envp[] { NULL }; execve(./sharedMemWrite,argv,envp); } while(1) { sleep(1); ret g_shareMem.read(buff,sizeof(buff)); if(ret 0) { break; } else if(ret 0) { continue; } SHM_PRINT(read buff: %u %u %u %u,buff[0],buff[1],buff[2],buff[3]); } g_shareMem.deinit(); waitpid(child_pid,status,0); SHM_PRINT(child proc exit); g_shareMem.destroy(); return 0; }sharedMemWrite.cpp:#include cstddef #include sys/mman.h #include sys/stat.h /* For mode constants */ #include fcntl.h /* For O_* constants */ #include stdio.h #include unistd.h #include string.h #include sharedMem.h int main(int argc,char **agrv) { uint8_t buff[4]; int ret 0; uint8_t i 0; uint8_t j 0; sharedMem g_shareMem; ret g_shareMem.init(SHM_NAME); while(1) { sleep(2); for(i 0;i sizeof(buff);i) { buff[i] i j; } ret g_shareMem.write(buff,sizeof(buff)); if(ret 0) break; j j 4; } g_shareMem.deinit(); return 0; }