参考Linux内核中的kfifo.h文件,进行简单修改以便使用。使用两个线程,其中一个线程(如Pthread_Write)进行写操作,另一个线程(如Pthread_Read)进行读操作;写入的是连续数字,读出的数据可以写入到一个文件中,但是读出的数据必须是连续的。数据个数为60亿,期间不使用加锁机制;
写入时间,读取时间要统计出来显示,使用clock_gettime函数统计60亿的数据写完和读出所需要的时间。
ring_buffer.h文件:
#ifndef KFIFO_HEADER_H
#define KFIFO_HEADER_H

/*
 * Byte FIFO modelled on the Linux kernel's kfifo, intended for lock-free use
 * by exactly ONE producer thread (calling the *_put functions) and ONE
 * consumer thread (calling the *_get functions).
 *
 * The producer is the only writer of `in`, the consumer the only writer of
 * `out`.  Both indices are free-running 32-bit counters that are masked with
 * (size - 1) on access, which is why `size` must be a power of two.
 *
 * NOTE: the functions below are ordinary (non-static) definitions, so this
 * header must be included from a single translation unit only.
 */

#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>   /* pthread_mutex_t in ring_buffer_init()'s signature */
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* True when x is a non-zero power of two. */
#define is_power_of_2(x) ((x) != 0 && (((x) & ((x) - 1)) == 0))
/* Smaller of a and b (arguments are evaluated more than once). */
#define min(a, b) (((a) < (b)) ? (a) : (b))

struct ring_buffer {
    void *buffer;          /* backing storage, `size` bytes                    */
    uint32_t size;         /* capacity in bytes, always a power of two         */
    _Atomic uint32_t in;   /* total bytes ever written; stored by the producer */
    _Atomic uint32_t out;  /* total bytes ever read; stored by the consumer    */
};

/*
 * Create a ring buffer on top of caller-allocated storage.
 *
 * buffer  storage of at least `size` bytes; ownership passes to the ring
 *         buffer on success (ring_buffer_free() calls free() on it).
 * size    capacity in bytes, must be a power of two.
 * f_lock  unused; kept so existing callers keep compiling.
 *
 * Returns the new ring buffer, or NULL if `size` is not a power of two or
 * the control structure cannot be allocated (in both cases `buffer` is left
 * untouched and still belongs to the caller).
 */
struct ring_buffer *ring_buffer_init(void *buffer, uint32_t size, pthread_mutex_t *f_lock)
{
    (void)f_lock;
    assert(buffer);

    if (!is_power_of_2(size)) {
        fprintf(stderr, "size must be power of 2.\n");
        return NULL;
    }

    struct ring_buffer *ring_buf = malloc(sizeof *ring_buf);
    if (!ring_buf) {
        int saved_errno = errno;
        fprintf(stderr, "Failed to malloc memory,errno:%d,reason:%s\n",
                saved_errno, strerror(saved_errno));
        return NULL;
    }

    ring_buf->buffer = buffer;
    ring_buf->size = size;
    atomic_init(&ring_buf->in, 0);
    atomic_init(&ring_buf->out, 0);
    return ring_buf;
}

/*
 * Release a ring buffer together with its backing storage.
 * Passing NULL is allowed and does nothing.
 */
void ring_buffer_free(struct ring_buffer *ring_buf)
{
    if (!ring_buf) {
        return;
    }
    free(ring_buf->buffer);
    ring_buf->buffer = NULL;
    free(ring_buf);
}

/* Number of bytes currently stored (in - out, modulo 2^32). */
uint32_t __ring_buffer_len(const struct ring_buffer *ring_buf)
{
    /* Plain reads of _Atomic objects are sequentially consistent loads. */
    uint32_t out = ring_buf->out;
    uint32_t in = ring_buf->in;
    return in - out;
}

/*
 * Consumer side: copy up to `size` bytes out of the FIFO into `buffer`.
 * Returns the number of bytes actually copied (0 when the FIFO is empty).
 */
uint32_t __ring_buffer_get(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
{
    assert(ring_buf && buffer);

    unsigned char *dst = buffer;
    const unsigned char *base = ring_buf->buffer;

    /* `out` is only ever stored by this (consumer) thread. */
    uint32_t out = atomic_load_explicit(&ring_buf->out, memory_order_relaxed);
    /* Acquire pairs with the producer's release store of `in`, so the
     * payload bytes are visible before we copy them. */
    uint32_t in = atomic_load_explicit(&ring_buf->in, memory_order_acquire);

    size = min(size, in - out);

    /* First the part from `out` up to the end of the storage ... */
    uint32_t off = out & (ring_buf->size - 1);
    uint32_t len = min(size, ring_buf->size - off);
    memcpy(dst, base + off, len);
    /* ... then whatever wrapped around to the beginning. */
    memcpy(dst + len, base, size - len);

    /* Release: the copy above must finish before the producer may reuse
     * the space. */
    atomic_store_explicit(&ring_buf->out, out + size, memory_order_release);
    return size;
}

/*
 * Producer side: copy up to `size` bytes from `buffer` into the FIFO.
 * Returns the number of bytes actually stored (0 when the FIFO is full).
 */
uint32_t __ring_buffer_put(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
{
    assert(ring_buf && buffer);

    const unsigned char *src = buffer;
    unsigned char *base = ring_buf->buffer;

    /* `in` is only ever stored by this (producer) thread. */
    uint32_t in = atomic_load_explicit(&ring_buf->in, memory_order_relaxed);
    /* Acquire pairs with the consumer's release store of `out`, so we never
     * overwrite bytes that are still being read. */
    uint32_t out = atomic_load_explicit(&ring_buf->out, memory_order_acquire);

    size = min(size, ring_buf->size - (in - out));

    /* First the part from `in` up to the end of the storage ... */
    uint32_t off = in & (ring_buf->size - 1);
    uint32_t len = min(size, ring_buf->size - off);
    memcpy(base + off, src, len);
    /* ... then the rest at the beginning. */
    memcpy(base, src + len, size - len);

    /* Release: publish the payload before the new `in` becomes visible. */
    atomic_store_explicit(&ring_buf->in, in + size, memory_order_release);
    return size;
}

/* Public wrapper: number of bytes currently stored. */
uint32_t ring_buffer_len(const struct ring_buffer *ring_buf)
{
    return __ring_buffer_len(ring_buf);
}

/*
 * Public wrapper around __ring_buffer_get().
 *
 * The indices are deliberately NOT reset to zero when the FIFO drains:
 * doing so would make the consumer write the producer-owned `in` index,
 * which races with a concurrent put and can lose or duplicate data.  The
 * free-running counters wrap correctly on their own.
 */
uint32_t ring_buffer_get(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
{
    return __ring_buffer_get(ring_buf, buffer, size);
}

/* Public wrapper around __ring_buffer_put(). */
uint32_t ring_buffer_put(struct ring_buffer *ring_buf, void *buffer, uint32_t size)
{
    return __ring_buffer_put(ring_buf, buffer, size);
}

#endif
----------------------------------------------------------------------------------------------------
****.c文件
/* Must precede every system header so that clock_gettime() and
 * CLOCK_MONOTONIC are declared under strict ISO C compiler modes. */
#define _POSIX_C_SOURCE 200809L

/*
 * Lock-free producer/consumer benchmark: one thread writes the consecutive
 * values 0 .. MAX_VALUE-1 into a ring buffer, a second thread reads them
 * back, checks that they are consecutive, and dumps a small window of them
 * to data.txt.  Both threads report how long their side took.
 */

#include <errno.h>
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

/* Included after <pthread.h> because its API mentions pthread_mutex_t. */
#include "ring_buffer.h"

#define BUFFER_SIZE        (1024 * 1024)   /* ring capacity in bytes (power of two) */
#define MAX_VALUE          6000000000LL    /* number of values to transfer          */
#define SECOND_TO_NSECOND  1000000000L     /* nanoseconds per second                */
#define DUMP_ABOVE         1000000000LL    /* values strictly between DUMP_ABOVE    */
#define DUMP_BELOW         1000100000LL    /* and DUMP_BELOW go to data.txt         */

long long i = 0;        /* unused; retained so the file's global symbols are unchanged */
long long dataVal = 0;  /* next value the producer will write                          */

/* Thread ids filled in by Pthread_Read()/Pthread_Write() and joined in main(). */
static pthread_t reader_tid;
static pthread_t writer_tid;

/* Print "<label> Data Cost Time:<s>s <ns>ns" for the interval start..stop. */
static void print_elapsed(const char *label,
                          const struct timespec *start,
                          const struct timespec *stop)
{
    long long sec = (long long)(stop->tv_sec - start->tv_sec);
    long nsec = stop->tv_nsec - start->tv_nsec;

    if (nsec < 0) {             /* borrow one second */
        sec -= 1;
        nsec += SECOND_TO_NSECOND;
    }
    printf("%s Data Cost Time:%llds %ldns\n\n", label, sec, nsec);
}

/*
 * Consumer thread body.  `arg` is the struct ring_buffer to read from.
 *
 * Reads one long long at a time until MAX_VALUE-1 has been received.  A get
 * that returns 0 means the FIFO was empty, so the thread yields and retries
 * instead of re-processing the previous value.  Every put and get moves
 * sizeof(long long) bytes and BUFFER_SIZE is a multiple of that, so a get
 * never returns a partial value.
 *
 * Returns `arg`.
 */
void *ReadData(void *arg)
{
    struct ring_buffer *ring_buf = arg;
    struct timespec start = {0};
    struct timespec stop = {0};
    long long value = 0;
    long long expected = 0;         /* next value a gap-free stream must deliver */
    unsigned long long breaks = 0;  /* number of sequence violations seen        */

    FILE *fp = fopen("data.txt", "w");
    if (!fp) {
        /* Keep draining the FIFO so the producer is not blocked forever;
         * only the file dump is skipped. */
        fprintf(stderr, "Failed to Open the File!!\n");
    }

    for (;;) {
        if (ring_buffer_get(ring_buf, &value, sizeof value) == 0) {
            sched_yield();          /* empty: let the producer run */
            continue;
        }

        if (value == 0) {           /* timing starts with the first value */
            clock_gettime(CLOCK_MONOTONIC, &start);
        }

        if (value != expected) {
            if (breaks++ == 0) {    /* report only the first break in detail */
                fprintf(stderr, "Sequence break: expected %lld, got %lld\n",
                        expected, value);
            }
        }
        expected = value + 1;

        if (fp && value > DUMP_ABOVE && value < DUMP_BELOW) {
            fprintf(fp, "%lld ", value);
        }

        if (value == MAX_VALUE - 1) {   /* last value: stop the clock */
            clock_gettime(CLOCK_MONOTONIC, &stop);
            break;
        }
    }

    printf("Last Get Data: %lld\n", value);
    printf("Get Data Complete!\n");
    if (breaks != 0) {
        fprintf(stderr, "Get Data: %llu sequence break(s) detected\n", breaks);
    }
    print_elapsed("Get", &start, &stop);

    if (fp && fclose(fp) != 0) {
        fprintf(stderr, "Failed to close data.txt: %s\n", strerror(errno));
    }
    return ring_buf;
}

/*
 * Producer thread body.  `arg` is the struct ring_buffer to write to.
 *
 * Writes dataVal, dataVal+1, ... MAX_VALUE-1.  A put that returns 0 means
 * the FIFO was full, so the thread yields and retries the same value
 * instead of silently dropping it.
 *
 * Returns `arg`.
 */
void *WriteData(void *arg)
{
    struct ring_buffer *ring_buf = arg;
    struct timespec start = {0};
    struct timespec stop = {0};

    clock_gettime(CLOCK_MONOTONIC, &start);
    while (dataVal < MAX_VALUE) {
        while (ring_buffer_put(ring_buf, &dataVal, sizeof dataVal) == 0) {
            sched_yield();          /* full: let the consumer run */
        }
        dataVal++;
    }
    clock_gettime(CLOCK_MONOTONIC, &stop);

    printf("Last Put Data: %lld\n", dataVal - 1);
    printf("Put Data Complete!\n");
    print_elapsed("Put", &start, &stop);
    return ring_buf;
}

/*
 * Start the consumer thread on ring buffer `arg`.
 * Returns 0 on success, -1 on failure.  The thread id is kept in
 * reader_tid; it is not returned because pthread_t does not fit in an int
 * on every platform.
 */
int Pthread_Read(void *arg)
{
    int err = pthread_create(&reader_tid, NULL, ReadData, arg);
    if (err != 0) {
        /* pthread_create reports failure through its return value, not errno. */
        fprintf(stderr, "Failed to create consumer thread.errno:%d, reason:%s\n",
                err, strerror(err));
        return -1;
    }
    return 0;
}

/*
 * Start the producer thread on ring buffer `arg`.
 * Returns 0 on success, -1 on failure.  The thread id is kept in writer_tid.
 */
int Pthread_Write(void *arg)
{
    int err = pthread_create(&writer_tid, NULL, WriteData, arg);
    if (err != 0) {
        fprintf(stderr, "Failed to create producer thread.errno:%d, reason:%s\n",
                err, strerror(err));
        return -1;
    }
    return 0;
}

/* Set up the ring buffer, run both threads to completion, clean up.
 * Returns 0 on success and -1 on any set-up failure. */
int main(void)
{
    int rc = -1;
    int err;
    void *buffer = NULL;
    struct ring_buffer *ring_buf = NULL;

    /* The mutex is never locked; it only exists because ring_buffer_init()
     * takes one. */
    pthread_mutex_t *f_lock = malloc(sizeof *f_lock);
    if (!f_lock) {
        fprintf(stderr, "Failed to malloc memory.\n");
        return -1;
    }
    err = pthread_mutex_init(f_lock, NULL);
    if (err != 0) {
        fprintf(stderr, "Failed init mutex,errno:%d,reason:%s\n", err, strerror(err));
        free(f_lock);
        return -1;
    }

    buffer = malloc(BUFFER_SIZE);
    if (!buffer) {
        fprintf(stderr, "Failed to malloc memory.\n");
        goto out_lock;
    }

    ring_buf = ring_buffer_init(buffer, BUFFER_SIZE, f_lock);
    if (!ring_buf) {
        fprintf(stderr, "Failed to init ring buffer.\n");
        free(buffer);               /* still ours: init failed */
        goto out_lock;
    }

    printf("Pthread_Write is running.......\n");
    printf("Pthread_Read is running.......\n\n");

    if (Pthread_Write(ring_buf) != 0) {
        goto out_ring;
    }
    if (Pthread_Read(ring_buf) != 0) {
        /* The producer is already running and would block on a full FIFO
         * forever; leave without freeing memory it is still using and let
         * process exit reclaim everything. */
        return -1;
    }

    pthread_join(writer_tid, NULL);
    pthread_join(reader_tid, NULL);
    rc = 0;

out_ring:
    ring_buffer_free(ring_buf);     /* also frees `buffer` */
out_lock:
    pthread_mutex_destroy(f_lock);
    free(f_lock);
    return rc;
}