summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lf_ring_buffer.c48
-rw-r--r--lf_ring_buffer.h3
-rw-r--r--lf_ring_buffer_data.h2
-rw-r--r--lf_ring_buffer_test.c18
4 files changed, 49 insertions, 22 deletions
diff --git a/lf_ring_buffer.c b/lf_ring_buffer.c
index ec071ec..70074bc 100644
--- a/lf_ring_buffer.c
+++ b/lf_ring_buffer.c
@@ -31,16 +31,28 @@
#include <string.h>
#include <time.h>
-//#define DEBUG_LFR_RING 1
+//#define DEBUG_LFRB_KO 1
+//#define DEBUG_LFRB_CAS 1
+//#define DEBUG_LFRB 1
-#ifdef DEBUG_LFR_RING
+#ifdef DEBUG_LFRB
#include <stdio.h>
- #define _LOG_( ... ) fprintf(stdout,__VA_ARGS__)
+ #define _LOG_KO_( ... ) fprintf(stdout,__VA_ARGS__)
+ #define _LOG_CAS_( ... ) fprintf(stdout,__VA_ARGS__)
+#elif defined DEBUG_LFRB_CAS
+ #include <stdio.h>
+ #define _LOG_KO_( ... )
+ #define _LOG_CAS_( ... ) fprintf(stdout,__VA_ARGS__)
+#elif defined DEBUG_LFRB_KO
+ #include <stdio.h>
+ #define _LOG_KO_( ... ) fprintf(stdout,__VA_ARGS__)
+ #define _LOG_CAS_( ... )
#else
- #define _LOG_( ... )
+ #define _LOG_KO_( ... )
+ #define _LOG_CAS_( ... )
#endif
-#define BACKOFF_NANO_SLEEP 100
+#define BACKOFF_NANO_SLEEP 100000
/* initialize an empty lf_ring_buffer struct */
lf_ring_buffer_t* lf_ring_buffer_create( size_t n_buf ) {
@@ -66,10 +78,14 @@ void lf_ring_buffer_destroy( lf_ring_buffer_t *r ) {
free(r);
}
+/* return 1 if is empty */
+int lf_ring_buffer_empty( lf_ring_buffer_t *r ) { return r->read_from==-1; }
+
/* write data into the ring buffer */
int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) {
int idx, next;
struct timespec backoff;
+ int backoff_time = BACKOFF_NANO_SLEEP;
/* reserve a buffer */
for(;;){
idx = r->write_to;
@@ -84,7 +100,7 @@ int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) {
* - a writter has reserved this buffer => write_to has moved => CAS fails
* - a reader has consumed a buffer => read_from has moved => we've got more space
*/
- _LOG_( "write: CAS %d %d %d\n", r->write_to, idx, next );
+ _LOG_CAS_( "write: CAS %d %d %d\n", r->write_to, idx, next );
if( CompareAndSwapInt( &r->write_to, idx, next ) ) {
/* !!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!
* - if the ring is empty before this write operation (r->read_from==-1 or latest idx)
@@ -98,19 +114,20 @@ int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) {
break;
}
} else {
- _LOG_("write: ring full %d %d\n",r->read_from,idx);
+ _LOG_KO_("write: ring full %d %d\n",r->read_from,idx);
if(IS_NOT_BLOCKING(flags)) return -1;
backoff.tv_sec = 0;
- backoff.tv_nsec = BACKOFF_NANO_SLEEP;
+ backoff.tv_nsec = backoff_time;
nanosleep(&backoff,NULL);
}
} else {
- _LOG_("write: buffer not available\n");
+ _LOG_KO_("write: buffer not available\n");
if(IS_NOT_BLOCKING(flags)) return -1;
backoff.tv_sec = 0;
- backoff.tv_nsec = BACKOFF_NANO_SLEEP;
+ backoff.tv_nsec = backoff_time;
nanosleep(&backoff,NULL);
}
+ backoff_time += BACKOFF_NANO_SLEEP;
}
/* fill this buffer and mark it as filled */
memcpy( LFRB_DATA_PTR(r->buffer[idx]), data, LFRB_DATA_SIZE );
@@ -122,6 +139,7 @@ int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) {
int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ) {
int idx, next;
struct timespec backoff;
+ int backoff_time = BACKOFF_NANO_SLEEP;
for(;;) {
idx = r->read_from;
if( !(LFRB_IS_AVAILABLE( r->buffer[idx] )) && r->read_from!=-1 ) {
@@ -129,21 +147,23 @@ int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ) {
if (next==r->n_buf) next=0;
/* will do bad things if data dst buffer is too small !! */
memcpy( data, LFRB_DATA_PTR(r->buffer[idx]), LFRB_DATA_SIZE );
- _LOG_( "read: CAS %d %d %d\n", r->read_from, idx, next );
+ _LOG_CAS_( "read: CAS %d %d %d\n", r->read_from, idx, next );
if( CompareAndSwapInt( &r->read_from, idx, next ) ) {
if(r->read_from==r->write_to) {
/* the buffer is empty but writers will see it as full */
- _LOG_( "read: empty CAS %d %d %d\n", r->read_from, next, -1 );
+ _LOG_CAS_( "read: empty CAS %d %d %d\n", r->read_from, next, -1 );
CompareAndSwapInt( &r->read_from, next, -1 );
}
break;
}
}
- _LOG_("read: ring empty\n");
+
+ _LOG_KO_("read: ring empty\n");
if(IS_NOT_BLOCKING(flags)) return -1;
backoff.tv_sec = 0;
- backoff.tv_nsec = BACKOFF_NANO_SLEEP;
+ backoff.tv_nsec = backoff_time;
nanosleep(&backoff,NULL);
+ backoff_time += BACKOFF_NANO_SLEEP;
}
/* finish the read process */
LFRB_MARK_AS_READ( r->buffer[idx] );
diff --git a/lf_ring_buffer.h b/lf_ring_buffer.h
index 64bed4c..14a1aec 100644
--- a/lf_ring_buffer.h
+++ b/lf_ring_buffer.h
@@ -51,6 +51,9 @@ lf_ring_buffer_t* lf_ring_buffer_create( size_t n_buf );
/* destroy an lf_ring_buffer_t struct */
void lf_ring_buffer_destroy( lf_ring_buffer_t *r );
+/* return 1 if is empty */
+int lf_ring_buffer_empty( lf_ring_buffer_t *r );
+
/* write data into the ring buffer
* return 0 on success
* return -1 if IS_NOT_BLOCKING and buffer is full
diff --git a/lf_ring_buffer_data.h b/lf_ring_buffer_data.h
index 6a1290f..80092d5 100644
--- a/lf_ring_buffer_data.h
+++ b/lf_ring_buffer_data.h
@@ -1,7 +1,7 @@
typedef char rb_data_t;
-#define RB_DATA_LEN 63
+#define RB_DATA_LEN 24
typedef struct buffer {
char status;
diff --git a/lf_ring_buffer_test.c b/lf_ring_buffer_test.c
index b131487..dd56ca2 100644
--- a/lf_ring_buffer_test.c
+++ b/lf_ring_buffer_test.c
@@ -63,8 +63,12 @@ static uint64_t sequential_reads( lf_ring_buffer_t *ring, int n, int flags ) {
rb_data_t data[RB_DATA_LEN];
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);
- for(i=0; i<n;) {
- if(lf_ring_buffer_read( ring, data, flags )==0) { i++; } else { redo+=1; }
+ if(flags==0) {
+ for(i=0; i<n; i++) lf_ring_buffer_read( ring, data, flags );
+ } else {
+ for(i=0; i<n;) {
+ if(lf_ring_buffer_read( ring, data, flags )==0) { i++; } else { redo+=1; }
+ }
}
clock_gettime(CLOCK_MONOTONIC, &end);
report( "read ", n, time_diff( &start, &end ), redo );
@@ -144,34 +148,34 @@ int main( int argc, char** argv ) {
*/
printf("sequential non blocking write operations ...\n");
- //report( "write", b_len, sequential_writes( ring, b_len, 0 ) );
sequential_writes( ring, b_len, 0 );
printf("sequential non blocking read operations ...\n");
- //report( "read ", b_len, sequential_reads( ring, b_len, 0 ) );
sequential_reads( ring, b_len, 0 );
+ if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); }
printf("sequential blocking write operations ...\n");
- //report( "write", b_len, sequential_writes( ring, b_len, 0 ) );
sequential_writes( ring, b_len, LFRB_NO_BLOCK );
printf("sequential blocking read operations ...\n");
- //report( "read ", b_len, sequential_reads( ring, b_len, 0 ) );
sequential_reads( ring, b_len, LFRB_NO_BLOCK );
+ if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); }
for(i=5; i<=100;i*=2) {
printf("%d parallel blocking with backoff inc write operations .... \n",i);
parallel_op( 0, i, ring, b_len, 0 );
printf("parallel blocking read operations ...\n");
sequential_reads( ring, b_len, 0 );
+ if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); }
}
for(i=5; i<=100;i*=2) {
printf("%d parallel non blocking write operations .... \n",i);
parallel_op( 0, i, ring, b_len, LFRB_NO_BLOCK );
printf("non blocking read operations ...\n");
sequential_reads( ring, b_len, LFRB_NO_BLOCK );
+ if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); }
}
-
for(i=10; i<=100;i*=2) {
printf("%d parallel blocking write and read operations .... \n",i*2);
parallel_op( 3, i, ring, b_len, 0 );
+ if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); }
}
lf_ring_buffer_destroy( ring );