利用Linux内核KFIFO代码,实现大量数据的写入与读出

xiaoxiao2021-02-28  56

参考Linux内核中的kfifo.h文件,进行简单修改以便使用。使用两个线程,其中一个线程(如Pthread_Write)进行写操作,另一个线程(如Pthread_Read)进行读操作;其中写入连续数字,读出的数据可以写入到一个文件中,但是读出的数据必须是连续的,数据个数为60亿,期间不使用加锁机制;

写入时间,读取时间要统计出来显示,使用clock_gettime函数统计60亿的数据写完和读出所需要的时间。

ring_buffer.h文件:

#ifndef KFIFO_HEADER_H
#define KFIFO_HEADER_H

/*
 * Byte-oriented ring buffer modelled on the Linux kernel kfifo.
 *
 * Concurrency contract: at most ONE thread may call the put functions and at
 * most ONE (other) thread may call the get functions. Under that contract no
 * lock is required:
 *   - `in`  is written only by the producer, `out` only by the consumer;
 *   - each side publishes its index with release ordering after it has
 *     finished copying, and reads the other side's index with acquire
 *     ordering before it starts copying.
 * The indices are free-running 32-bit counters; they are reduced modulo
 * `size` (a power of two) only when used as offsets, so unsigned wrap-around
 * is harmless and `in - out` is always the number of buffered bytes.
 *
 * NOTE: the `__ring_buffer_*` names are kept for source compatibility even
 * though identifiers containing a double underscore are reserved in ISO C.
 */

#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* True when x is a non-zero power of two. */
#ifndef is_power_of_2
#define is_power_of_2(x) ((x) != 0 && (((x) & ((x) - 1)) == 0))
#endif

/* Smaller of a and b. Evaluates its arguments more than once. */
#ifndef min
#define min(a, b) (((a) < (b)) ? (a) : (b))
#endif

struct ring_buffer {
    void             *buffer;  /* backing storage, `size` bytes */
    uint32_t          size;    /* capacity in bytes, always a power of two */
    _Atomic uint32_t  in;      /* total bytes ever written (producer-owned) */
    _Atomic uint32_t  out;     /* total bytes ever read (consumer-owned) */
};

/* Single-evaluation minimum, used instead of min() on snapshot values. */
static inline uint32_t ring_buffer_min_u32(uint32_t a, uint32_t b)
{
    return a < b ? a : b;
}

/*
 * Create a ring buffer that uses `buffer` (`size` bytes) as its storage.
 *
 * @param buffer  storage for the data; must not be NULL. Ownership passes to
 *                the ring buffer: ring_buffer_free() calls free() on it, so it
 *                must come from malloc()/calloc()/realloc().
 * @param size    capacity in bytes; must be a power of two.
 * @param f_lock  unused; kept so existing callers keep compiling.
 * @return the new ring buffer, or NULL if `size` is not a power of two or the
 *         control structure cannot be allocated (a message is printed to
 *         stderr in both cases). On failure `buffer` is NOT freed.
 */
static inline struct ring_buffer *ring_buffer_init(void *buffer, uint32_t size, pthread_mutex_t *f_lock)
{
    (void)f_lock;
    assert(buffer);

    if (!is_power_of_2(size)) {
        fprintf(stderr,"size must be power of 2.\n");
        return NULL;
    }

    struct ring_buffer *ring_buf = malloc(sizeof *ring_buf);
    if (!ring_buf) {
        /* errno is an int, so it is printed with %d. */
        fprintf(stderr,"Failed to malloc memory,errno:%d,reason:%s\n",
                errno, strerror(errno));
        return NULL;
    }

    ring_buf->buffer = buffer;
    ring_buf->size = size;
    atomic_init(&ring_buf->in, 0);
    atomic_init(&ring_buf->out, 0);
    return ring_buf;
}

/*
 * Release the ring buffer AND the storage passed to ring_buffer_init().
 * Passing NULL is allowed and does nothing.
 */
static inline void ring_buffer_free(struct ring_buffer *ring_buf)
{
    if (!ring_buf) {
        return;
    }
    free(ring_buf->buffer);
    ring_buf->buffer = NULL;
    free(ring_buf);
}

/* Number of bytes currently stored in the buffer. */
static inline uint32_t __ring_buffer_len(const struct ring_buffer *ring_buf)
{
    /*
     * Read `out` before `in`: `out` never exceeds `in`, so an older `out`
     * combined with a newer `in` cannot make the subtraction underflow.
     */
    const uint32_t out = ring_buf->out;
    const uint32_t in = ring_buf->in;
    return in - out;
}

/*
 * Copy up to `size` bytes out of the ring into `buffer` (consumer side).
 *
 * @return the number of bytes actually copied, which is smaller than `size`
 *         (possibly 0) when fewer bytes are available.
 */
static inline uint32_t __ring_buffer_get(struct ring_buffer *ring_buf, void * buffer, uint32_t size)
{
    assert(ring_buf && buffer);

    unsigned char *dst = buffer;
    const unsigned char *src = ring_buf->buffer;

    const uint32_t out = atomic_load_explicit(&ring_buf->out, memory_order_relaxed);
    /* Acquire pairs with the producer's release store of `in`, so the bytes
     * it copied are visible before we read them. */
    const uint32_t in = atomic_load_explicit(&ring_buf->in, memory_order_acquire);

    size = ring_buffer_min_u32(size, in - out);

    /* First the part that runs from `out` up to the end of the storage... */
    const uint32_t offset = out & (ring_buf->size - 1);
    const uint32_t first = ring_buffer_min_u32(size, ring_buf->size - offset);
    memcpy(dst, src + offset, first);
    /* ...then whatever wrapped around to the beginning (may be 0 bytes). */
    memcpy(dst + first, src, size - first);

    /* Release so the producer only reuses the space after we are done. */
    atomic_store_explicit(&ring_buf->out, out + size, memory_order_release);
    return size;
}

/*
 * Copy up to `size` bytes from `buffer` into the ring (producer side).
 *
 * @return the number of bytes actually stored, which is smaller than `size`
 *         (possibly 0) when there is not enough free space.
 */
static inline uint32_t __ring_buffer_put(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
{
    assert(ring_buf && buffer);

    const unsigned char *src = buffer;
    unsigned char *dst = ring_buf->buffer;

    const uint32_t in = atomic_load_explicit(&ring_buf->in, memory_order_relaxed);
    /* Acquire pairs with the consumer's release store of `out`, so we never
     * overwrite bytes it is still reading. */
    const uint32_t out = atomic_load_explicit(&ring_buf->out, memory_order_acquire);

    size = ring_buffer_min_u32(size, ring_buf->size - (in - out));

    /* First fill from `in` up to the end of the storage... */
    const uint32_t offset = in & (ring_buf->size - 1);
    const uint32_t first = ring_buffer_min_u32(size, ring_buf->size - offset);
    memcpy(dst + offset, src, first);
    /* ...then put the rest (if any) at the beginning. */
    memcpy(dst, src + first, size - first);

    /* Release so the consumer sees the data before it sees the new `in`. */
    atomic_store_explicit(&ring_buf->in, in + size, memory_order_release);
    return size;
}

/* Public wrapper: number of bytes currently stored in the buffer. */
static inline uint32_t ring_buffer_len(const struct ring_buffer *ring_buf)
{
    return __ring_buffer_len(ring_buf);
}

/*
 * Public wrapper around __ring_buffer_get().
 *
 * The indices are deliberately NOT rewound to 0 when the buffer becomes
 * empty: `in` belongs to the producer, and a consumer-side reset would race
 * with a concurrent put and could lose data. Rewinding is unnecessary anyway
 * because the counters are free-running.
 */
static inline uint32_t ring_buffer_get(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
{
    return __ring_buffer_get(ring_buf, buffer, size);
}

/* Public wrapper around __ring_buffer_put(). */
static inline uint32_t ring_buffer_put(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
{
    return __ring_buffer_put(ring_buf, buffer, size);
}

#endif

----------------------------------------------------------------------------------------------------

****.c文件

/*
 * Demo: one producer thread pushes the consecutive numbers 0 .. MAX_VALUE-1
 * through a lock-free ring buffer and one consumer thread pulls them out
 * again. Each thread prints how long its side took, measured with
 * clock_gettime().
 */

/* Ask for the POSIX.1-2008 declarations (clock_gettime, nanosleep) even when
 * the compiler is in strict ISO C mode; must come before every #include. */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif

#include <errno.h>
#include <pthread.h>   /* before ring_buffer.h, which mentions pthread_mutex_t */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "ring_buffer.h"

#define  BUFFER_SIZE         (1024 * 1024)   /* ring capacity in bytes (power of two) */
#define  MAX_VALUE           6000000000LL    /* number of values transferred */
#define  SECOND_TO_NSECOND   1000000000L     /* nanoseconds per second */

/* Only values strictly between these bounds are written to data.txt. */
#define  DUMP_ABOVE          1000000000LL
#define  DUMP_BELOW          1000100000LL

/*
 * Thread handles filled in by Pthread_Write()/Pthread_Read(). They are kept
 * here because a pthread_t cannot be returned through the functions' `int`
 * return type without truncation.
 */
static pthread_t g_write_tid;
static pthread_t g_read_tid;

/* Back off for about a microsecond while waiting for the other thread. */
static void brief_pause(void)
{
    const struct timespec one_microsecond = { .tv_sec = 0, .tv_nsec = 1000 };
    /* An early wake-up is harmless: the caller re-checks its condition. */
    (void)nanosleep(&one_microsecond, NULL);
}

/* Return stop - start as a normalised timespec (0 <= tv_nsec < 1e9). */
static struct timespec elapsed_between(const struct timespec *start, const struct timespec *stop)
{
    struct timespec diff;

    diff.tv_sec = stop->tv_sec - start->tv_sec;
    diff.tv_nsec = stop->tv_nsec - start->tv_nsec;
    if (diff.tv_nsec < 0) {
        /* Borrow one second when the nanosecond part went negative. */
        diff.tv_sec -= 1;
        diff.tv_nsec += SECOND_TO_NSECOND;
    }
    return diff;
}

/* Block until one complete value is available, then remove and return it. */
static long long take_value(struct ring_buffer *ring_buf)
{
    long long value = 0;

    /* Only this thread removes data, so once enough bytes are present they
     * stay present until we take them. */
    while (ring_buffer_len(ring_buf) < sizeof value) {
        brief_pause();
    }
    if (ring_buffer_get(ring_buf, &value, sizeof value) != sizeof value) {
        fprintf(stderr, "ring buffer returned a partial value\n");
        exit(EXIT_FAILURE);
    }
    return value;
}

/* Block until there is room for one complete value, then store it. */
static void put_value(struct ring_buffer *ring_buf, long long value)
{
    /* Only this thread adds data, so free space can only grow while we wait. */
    while (ring_buf->size - ring_buffer_len(ring_buf) < sizeof value) {
        brief_pause();
    }
    if (ring_buffer_put(ring_buf, &value, sizeof value) != sizeof value) {
        fprintf(stderr, "ring buffer accepted a partial value\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Consumer thread body.
 *
 * Reads values until MAX_VALUE-1 arrives, writes the values inside the dump
 * window to data.txt, and prints the time between receiving value 0 and
 * receiving the last value.
 *
 * @param arg  the struct ring_buffer to read from.
 * @return `arg`.
 */
void * ReadData(void *arg)
{
    struct ring_buffer *ring_buf = arg;
    struct timespec get_start_time = {0};
    struct timespec get_stop_time = {0};
    long long value = 0;

    FILE *fp = fopen("data.txt", "w");
    if (!fp) {
        /* Keep draining the ring anyway so the producer is not stalled;
         * only the file output is skipped. */
        fprintf(stderr,"Failed to Open the File!!");
    }

    for (;;) {
        value = take_value(ring_buf);

        if (value == 0) {
            /* Timing starts when the first value is received. A monotonic
             * clock is used so wall-clock adjustments cannot skew it. */
            clock_gettime(CLOCK_MONOTONIC, &get_start_time);
        }
        if (fp && value > DUMP_ABOVE && value < DUMP_BELOW) {
            fprintf(fp,"%lld  ",value);
        }
        if (value == MAX_VALUE - 1) {
            clock_gettime(CLOCK_MONOTONIC, &get_stop_time);
            break;
        }
    }

    const struct timespec cost = elapsed_between(&get_start_time, &get_stop_time);
    printf("Last Get Data: %lld\n", value);
    printf("Get Data Complete!\n");
    printf("Get Data Cost Time:%ds %dns\n\n", (int)cost.tv_sec, (int)cost.tv_nsec);

    if (fp && fclose(fp) != 0) {
        fprintf(stderr, "Failed to close data.txt: %s\n", strerror(errno));
    }
    return ring_buf;
}

/*
 * Producer thread body.
 *
 * Writes 0 .. MAX_VALUE-1 into the ring, waiting whenever it is full so that
 * no value is dropped, and prints the time between storing value 0 and
 * storing the last value.
 *
 * @param arg  the struct ring_buffer to write to.
 * @return `arg`.
 */
void * WriteData(void *arg)
{
    struct ring_buffer *ring_buf = arg;
    struct timespec put_start_time = {0};
    struct timespec put_stop_time = {0};

    for (long long value = 0; value < MAX_VALUE; value++) {
        put_value(ring_buf, value);
        if (value == 0) {
            /* Timing starts once the first value has been stored. */
            clock_gettime(CLOCK_MONOTONIC, &put_start_time);
        }
    }
    clock_gettime(CLOCK_MONOTONIC, &put_stop_time);

    const struct timespec cost = elapsed_between(&put_start_time, &put_stop_time);
    printf("Last Put Data: %lld\n", MAX_VALUE - 1);
    printf("Put Data Complete!\n");
    printf("Put Data Cost Time:%ds %dns\n\n", (int)cost.tv_sec, (int)cost.tv_nsec);

    return ring_buf;
}

/*
 * Start the consumer thread on the ring buffer `arg`.
 *
 * @return 0 on success (the handle is stored in g_read_tid), -1 on failure.
 */
int Pthread_Read(void *arg)
{
    /* pthread_create() reports failure through its return value, not errno. */
    const int err = pthread_create(&g_read_tid, NULL, ReadData, arg);

    if (err != 0) {
        fprintf(stderr, "Failed to create consumer thread.errno:%d, reason:%s\n",
                err, strerror(err));
        return -1;
    }
    return 0;
}

/*
 * Start the producer thread on the ring buffer `arg`.
 *
 * @return 0 on success (the handle is stored in g_write_tid), -1 on failure.
 */
int Pthread_Write(void *arg)
{
    const int err = pthread_create(&g_write_tid, NULL, WriteData, arg);

    if (err != 0) {
        fprintf(stderr, "Failed to create producer thread.errno:%d, reason:%s\n",
                err, strerror(err));
        return -1;
    }
    return 0;
}

/*
 * Allocate the ring, run the producer and the consumer to completion, and
 * release everything. No mutex is created: the transfer is lock-free, and
 * ring_buffer_init() ignores its lock argument.
 */
int main(void)
{
    void *buffer = malloc(BUFFER_SIZE);
    if (!buffer) {
        fprintf(stderr, "Failed to malloc memory.\n");
        return -1;
    }

    struct ring_buffer *ring_buf = ring_buffer_init(buffer, BUFFER_SIZE, NULL);
    if (!ring_buf) {
        fprintf(stderr, "Failed to init ring buffer.\n");
        free(buffer);   /* init does not take ownership when it fails */
        return -1;
    }

    printf("Pthread_Write is running.......\n");
    printf("Pthread_Read  is running.......\n\n");

    if (Pthread_Write(ring_buf) != 0) {
        ring_buffer_free(ring_buf);
        return -1;
    }
    if (Pthread_Read(ring_buf) != 0) {
        /* The producer is already running and would wait forever on a full
         * ring; returning from main ends the whole process, producer
         * included, so the ring is intentionally not freed under it. */
        return -1;
    }

    /* Wait for both threads to finish. */
    pthread_join(g_write_tid, NULL);
    pthread_join(g_read_tid, NULL);

    /* Also frees `buffer`, which the ring owns. */
    ring_buffer_free(ring_buf);
    return 0;
}

转载请注明原文地址: https://www.6miu.com/read-61005.html

最新回复(0)