Welcome 微信登录

首页 / 操作系统 / Linux / 并发无锁环形队列的实现

前面在《Linux内核数据结构kfifo详解》一文中详细解析了 Linux  内核并发无锁环形队列kfifo的原理和实现,kfifo鬼斧神工,博大精深,让人叹为观止,但遗憾的是kfifo为内核提供服务,并未开放出来。剑不试则利钝暗,弓不试则劲挠诬,鹰不试则巧拙惑,马不试则良驽疑,光说不练是不能学到精髓的,下面就动手实现自己的并发无锁队列UnlockQueue(单生产者单消费者)。

一、UnlockQueue声明

1: #ifndef _UNLOCK_QUEUE_H 2: #define _UNLOCK_QUEUE_H 3: 4: class UnlockQueue 5: { 6: public: 7:   UnlockQueue(int nSize); 8:   virtual ~UnlockQueue(); 9:  10:   bool Initialize();11:  12:   unsigned int Put(const unsigned char *pBuffer, unsigned int nLen);13:   unsigned int Get(unsigned char *pBuffer, unsigned int nLen);14:  15:   inline void Clean() { m_nIn = m_nOut = 0; }16:   inline unsigned int GetDataLen() const { return  m_nIn - m_nOut; }17:  18: private:19:   inline bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); };20:   inline unsigned long roundup_power_of_two(unsigned long val);21:  22: private:23:   unsigned char *m_pBuffer;    /* the buffer holding the data */24:   unsigned int m_nSize;        /* the size of the allocated buffer */25:   unsigned int m_nIn;        /* data is added at offset (in % size) */26:   unsigned int m_nOut;        /* data is extracted from off. (out % size) */27: };28:  29: #endifUnlockQueue与kfifo 结构相同相同,也是由一下变量组成:
UnlockQueuekfifo作用
m_pBufferbuffer用于存放数据的缓存
m_nSizesize缓冲区空间的大小,圆整为2的次幂
m_nInin指向buffer中队头
m_nOutout指向buffer中的队尾
UnlockQueue的设计是用在单生产者单消费者情况下,所以不需要锁lock如果使用不能保证任何时间最多只有一个读线程和写线程,必须使用该lock实施同步。

二、UnlockQueue构造函数和初始化

1: UnlockQueue::UnlockQueue(int nSize) 2: :m_pBuffer(NULL) 3: ,m_nSize(nSize) 4: ,m_nIn(0) 5: ,m_nOut(0) 6: { 7:   //round up to the next power of 2 8:   if (!is_power_of_2(nSize)) 9:   {10:       m_nSize = roundup_power_of_two(nSize);11:   }12: }13:  14: UnlockQueue::~UnlockQueue()15: {16:   if(NULL != m_pBuffer)17:   {18:       delete[] m_pBuffer;19:       m_pBuffer = NULL;20:   }21: }22:  23: bool UnlockQueue::Initialize()24: {25:   m_pBuffer = new unsigned char[m_nSize];26:   if (!m_pBuffer)27:   {28:       return false;29:   }30:  31:   m_nIn = m_nOut = 0;32:  33:   return true;34: }35:  36: unsigned long UnlockQueue::roundup_power_of_two(unsigned long val)37: {38:   if((val & (val-1)) == 0)39:       return val;40:  41:   unsigned long maxulong = (unsigned long)((unsigned long)~0);42:   unsigned long andv = ~(maxulong&(maxulong>>1));43:   while((andv & val) == 0)44:       andv = andv>>1;45:  46:   return andv<<1;47: }1.在构造函数中,对传入的size进行2的次幂圆整,圆整的好处是可以将m_nIn % m_nSize 可以转化为 m_nIn  & (m_nSize – 1),取模运算”的效率并没有 “位运算” 的效率高。2.在构造函数中,未给buffer分配内存,而在Initialize中分配,这样做的原因是:我们知道在new UnlockQueue的时候有两步操作,第一步分配内存,第二步调用构造函数,如果将buffer的分配放在构造函数中,那么就可能 buffer 就可能分配失败,而后面用到buffer,还需要判空。

三、UnlockQueue入队和出队操作

1: unsigned int UnlockQueue::Put(const unsigned char *buffer, unsigned int len) 2: { 3:   unsigned int l; 4: 5:   len = std::min(len, m_nSize - m_nIn + m_nOut); 6: 7:   /* 8:    * Ensure that we sample the m_nOut index -before- we 9:    * start putting bytes into the UnlockQueue.10:    */11:   __sync_synchronize();12:  13:   /* first put the data starting from fifo->in to buffer end */14:   l = std::min(len, m_nSize - (m_nIn  & (m_nSize - 1)));15:   memcpy(m_pBuffer + (m_nIn & (m_nSize - 1)), buffer, l);16:  17:   /* then put the rest (if any) at the beginning of the buffer */18:   memcpy(m_pBuffer, buffer + l, len - l);19:  20:   /*21:    * Ensure that we add the bytes to the kfifo -before-22:    * we update the fifo->in index.23:    */24:   __sync_synchronize();25:  26:   m_nIn += len;27:  28:   return len;29: }30:  31: unsigned int UnlockQueue::Get(unsigned char *buffer, unsigned int len)32: {33:   unsigned int l;34:  35:   len = std::min(len, m_nIn - m_nOut);36:  37:   /*38:    * Ensure that we sample the fifo->in index -before- we39:    * start removing bytes from the kfifo.40:    */41:   __sync_synchronize();42:  43:   /* first get the data from fifo->out until the end of the buffer */44:   l = std::min(len, m_nSize - (m_nOut & (m_nSize - 1)));45:   memcpy(buffer, m_pBuffer + (m_nOut & (m_nSize - 1)), l);46:  47:   /* then get the rest (if any) from the beginning of the buffer */48:   memcpy(buffer + l, m_pBuffer, len - l);49:  50:   /*51:    * Ensure that we remove the bytes from the kfifo -before-52:    * we update the fifo->out index.53:    */54:   __sync_synchronize();55:  56:   m_nOut += len;57:  58:   return len;59: }      入队和出队操作与kfifo相同,用到的技巧也完全相同,有不理解的童鞋可以参考前面一篇文章《Linux内核数据结构kfifo详解》。这里需要指出的是__sync_synchronize()函数,由于linux并未开房出内存屏障函数,而在gcc4.2以上版本提供This builtin issues a full memory barrier,有兴趣同学可以参考Built-in functions for atomic memory access。

四、测试程序

如图所示,我们设计了两个线程,一个生产者随机生成学生信息放入队列,一个消费者从队列中取出学生信息并打印,可以看到整个代码是无锁的。 1: #include "UnlockQueue.h" 2: #include <iostream> 3: #include <algorithm> 4: #include <pthread.h> 5: #include <time.h> 6: #include <stdio.h> 7: #include <errno.h> 8: #include <string.h> 9:  10: struct student_info11: {12:    long stu_id;13:    unsigned int age;14:    unsigned int score;15: };16:  17: void print_student_info(const student_info *stu_info)18: {19:   if(NULL == stu_info)20:       return;21:  22:   printf("id:%ld ",stu_info->stu_id);23:   printf("age:%u ",stu_info->age);24:   printf("score:%u ",stu_info->score);25: }26:  27: student_info * get_student_info(time_t timer)28: {29:      student_info *stu_info = (student_info *)malloc(sizeof(student_info));30:      if (!stu_info)31:      {32:       fprintf(stderr, "Failed to malloc memory. ");33:       return NULL;34:      }35:      srand(timer);36:      stu_info->stu_id = 10000 + rand() % 9999;37:      stu_info->age = rand() % 30;38:      stu_info->score = rand() % 101;39:      //print_student_info(stu_info);40:      return stu_info;41: }42:  43: void * consumer_proc(void *arg)44: {45:      UnlockQueue* queue = (UnlockQueue *)arg;46:      student_info stu_info;47:      while(1)48:      {49:          sleep(1);50:          unsigned int len = queue->Get((unsigned char *)&stu_info, sizeof(student_info));51:          if(len > 0)52:          {53:              printf("------------------------------------------ ");54:              printf("UnlockQueue length: %u ", queue->GetDataLen());55:              printf("Get a student ");56:              print_student_info(&stu_info);57:              printf("------------------------------------------ ");58:          }59:      }60:      return (void *)queue;61: }62:  63: void * producer_proc(void *arg)64:  {65:     time_t cur_time;66:     UnlockQueue *queue = (UnlockQueue*)arg;67:     while(1)68:     {69:         time(&cur_time);70:         srand(cur_time);71:         int seed = rand() % 11111;72:         printf("****************************************** ");73:         student_info *stu_info = get_student_info(cur_time + seed);74:         printf("put a student info to queue. ");75:         queue->Put( (unsigned char *)stu_info, sizeof(student_info));76:         free(stu_info);77:         printf("UnlockQueue length: %u ", queue->GetDataLen());78:         printf("****************************************** ");79:         sleep(1);80:     }81:      return (void *)queue;82: }83:  84:  85: int main()86: {87:   UnlockQueue unlockQueue(1024);88:   if(!unlockQueue.Initialize())89:   {90:       return -1;91:   }92:  93:   pthread_t consumer_tid, producer_tid;94:  95:   printf("multi thread test....... ");96:  97:   if(0 != pthread_create(&producer_tid, NULL, producer_proc, (void*)&unlockQueue))98:   {99:          fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s ", 100:                  errno, strerror(errno)); 101:          return -1; 102:   } 103: 104:   if(0 != pthread_create(&consumer_tid, NULL, consumer_proc, (void*)&unlockQueue)) 105:   { 106:            fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s ", 107:                    errno, strerror(errno)); 108:            return -1; 109:   } 110: 111:   pthread_join(producer_tid, NULL); 112:   pthread_join(consumer_tid, NULL); 113: 114:   return 0; 115:  }运行结果:本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-12/137937.htm