From ce1dcb6ed32396910e3400e3a816b76d22d5a0de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Zurcher?= Date: Wed, 6 Jan 2010 23:12:15 +0100 Subject: ring buffer is better ... one more crash --- lf_ring_buffer.c | 75 +++++++++++++++++++++++++++++++++++++++------------ lf_ring_buffer.h | 4 +-- lf_ring_buffer_test.c | 28 ++++++++++--------- 3 files changed, 76 insertions(+), 31 deletions(-) diff --git a/lf_ring_buffer.c b/lf_ring_buffer.c index 225ef4d..75b2f17 100644 --- a/lf_ring_buffer.c +++ b/lf_ring_buffer.c @@ -31,18 +31,49 @@ #include #include +#if !defined LFRB_DATA_SIZE + #error "LFRB_DATA_SIZE is not defined" +#endif +#if !defined LFRB_BUFFER_TYPE + #error "LFRB_BUFFER_TAPE is not defined" +#endif +#if !defined LFRB_BUFFER_SIZE + #error "LFRB_BUFFER_SIZE is not defined" +#endif +#if !defined LFRB_IS_AVAILABLE + #error "LFRB_IS_AVAILABLE is not defined" +#endif +#if !defined LFRB_MARK_AS_FILLED + #error "LFRB_MARK_AS_FILLED is not defined" +#endif +#if !defined LFRB_MARK_AS_READ + #error "LFRB_MARK_AS_READ is not defined" +#endif +#if !defined LFRB_DATA_PTR + #error "LFRB_DATA_PTR is not defined" +#endif + +//#define DEBUG_LFR_RING 1 + +#ifdef DEBUG_LFR_RING + #include + #define _LOG_( ... ) fprintf(stdout,__VA_ARGS__) +#else + #define _LOG_( ... ) +#endif + /* initialize an empty lf_ring_buffer struct */ lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ) { /* alloc ring_buffer struct */ lf_ring_buffer_t *r = malloc(sizeof(lf_ring_buffer_t)); if(r==NULL) return NULL; /* */ - r->buffer = malloc(sizeof(lf_buffer_el_t)*n_buf); + r->buffer = malloc(LFRB_BUFFER_SIZE*n_buf); if(r->buffer==NULL) { free(r); return NULL; } - memset(r->buffer,0,sizeof(lf_buffer_el_t)*r->n_buf); + memset(r->buffer,0,LFRB_BUFFER_SIZE*n_buf); r->n_buf = n_buf; r->read_from = -1; r->write_to = 0; @@ -58,28 +89,33 @@ void lf_ring_buffer_destroy( lf_ring_buffer_t *r ) { } /* write data into the ring buffer */ -int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ) { - int idx; - int next; +int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) { + int idx, next; struct timespec st; st.tv_sec=0; /* reserve a buffer */ for(;;){ idx = r->write_to; - if(IS_AVAILABLE(idx) ) { + if(LFRB_IS_AVAILABLE( r->buffer[idx] ) ) { /* read_from==idx means that the buffer is full and that a writer thread which at first reserved this buffer * hasn't had enough CPU cycles to call MARK_AS_FILLED */ - if(!r->read_from==idx) { - next = (idx+1)%r->n_buf; + if(!(r->read_from==idx)) { + next = idx+1; + if (next==r->n_buf) next=0; /* what might have happend between IS_AVAILABLE and now : * - a writter has reserved this buffer => write_to has moved => CAS fails * - a reader has consumed a buffer => read_from has moved => we've got more space */ + _LOG_( "write: CAS %d %d %d\n", r->write_to, idx, next ); if( CompareAndSwapInt( &r->write_to, idx, next ) ) break; + } else { + /* TODO simply reloop, nothing to do ??? */ + // TODO ERROR when all has been read + _LOG_("write: buffer full %d %d\n",r->read_from,idx); } - /* TODO simply reloop, nothing to do ??? */ } else { + _LOG_("write: not available\n"); if(IS_NOT_BLOCKING(flags)) return -1; st.tv_nsec=r->write_delay; nanosleep(&st,NULL); @@ -94,24 +130,29 @@ int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ) { */ if(r->read_from==-1) CompareAndSwapInt( &r->read_from, -1, idx ); /* fill this buffer and mark it as filled */ - memcpy( r->buffer[idx].data, data, RB_DATA_SIZE*sizeof(rb_data_t) ); - MARK_AS_FILLED( idx ); + memcpy( r->buffer[idx].data, data, LFRB_DATA_SIZE ); + LFRB_MARK_AS_FILLED( r->buffer[idx] ); if(r->write_delay>BACKOFF_DELAY_INIT) r->write_delay-=BACKOFF_DELAY_INC; return 0; } /* read data from the ring buffer */ -int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ) { - int idx; +int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ) { + int idx, next; struct timespec st; st.tv_sec=0; if(r->read_from==-1) return -1; for(;;) { idx = r->read_from; - if(!IS_AVAILABLE(idx)) { - memcpy( data, r->buffer[idx].data, RB_DATA_SIZE*sizeof(rb_data_t) ); - if( CompareAndSwapInt( &r->read_from, idx, (idx+1)%r->n_buf ) ) break; + if( !(LFRB_IS_AVAILABLE( r->buffer[idx] )) ) { + next = idx+1; + if (next==r->n_buf) next=0; + /* will do bad things if data dst buffer is too small !! */ + memcpy( data, r->buffer[idx].data, LFRB_DATA_SIZE ); + _LOG_( "write: CAS %d %d %d\n", r->read_from, idx, next ); + if( CompareAndSwapInt( &r->read_from, idx, next ) ) break; } else { + _LOG_("read: not available\n"); if(IS_NOT_BLOCKING(flags)) return -1; st.tv_nsec=r->read_delay; nanosleep(&st,NULL); @@ -119,7 +160,7 @@ int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ) { } } /* finish the read process */ - MARK_AS_READ( idx ); + LFRB_MARK_AS_READ( r->buffer[idx] ); if(r->read_delay>BACKOFF_DELAY_INIT) r->read_delay-=BACKOFF_DELAY_INC; return 0; } diff --git a/lf_ring_buffer.h b/lf_ring_buffer.h index a7c7786..790cba9 100644 --- a/lf_ring_buffer.h +++ b/lf_ring_buffer.h @@ -38,8 +38,8 @@ extern "C" { #define BACKOFF_DELAY_INC 3000 #define BACKOFF_DELAY_MAX 90000 -#define NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */ -#define IS_NOT_BLOCKING( flags ) ( (flags)&NO_BLOCK ) +#define LFRB_NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */ +#define IS_NOT_BLOCKING( flags ) ( (flags)&LFRB_NO_BLOCK ) typedef struct ring_buffer { LFRB_BUFFER_TYPE *buffer; /* buffer data */ diff --git a/lf_ring_buffer_test.c b/lf_ring_buffer_test.c index 31d4810..348d202 100644 --- a/lf_ring_buffer_test.c +++ b/lf_ring_buffer_test.c @@ -6,11 +6,10 @@ #include "lf_ring_buffer.h" -#define BUFFER_LEN 5000000 +//#define BUFFER_LEN 5000000 +#define BUFFER_LEN 500 -static rb_data_t[BUFFER_LEN][RB_DATA_LEN] - -static const double secs_per_tick = 1.0 / CLOCKS_PER_SEC; +static rb_data_t data[BUFFER_LEN][RB_DATA_LEN]; static int64_t time_diff(struct timespec *t0, struct timespec *t1) { @@ -23,7 +22,7 @@ static void print_now(char* s) { } static void report( int n, uint64_t dt ) { - fprintf(stdout,"%d operations in %d [ms] => %d [us] => %d [ns]\t%d [ns/op]\n", n, (int)(dt/1000000), (int)(dt/1000), (int)dt, (int)(dt/n) ); + fprintf(stdout,"%d operations in %d [ms] => %d [us] => %d [ns]\t >>> %d [ns/op]\n", n, (int)(dt/1000000), (int)(dt/1000), (int)dt, (int)(dt/n) ); } static void feed_data( int n){ @@ -33,15 +32,16 @@ static void feed_data( int n){ } } -static uint64_t sequential_writes( lf_ring_buffer_t *ring, int n ) { +static uint64_t sequential_writes( lf_ring_buffer_t *ring, int n, int flags ) { int i; + rb_data_t data[RB_DATA_LEN]; struct timespec start, end; print_now("sequential writes ... "); clock_gettime(CLOCK_MONOTONIC, &start); - for(i=0; i