summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lf_ring_buffer.c75
-rw-r--r--lf_ring_buffer.h4
-rw-r--r--lf_ring_buffer_test.c28
3 files changed, 76 insertions, 31 deletions
diff --git a/lf_ring_buffer.c b/lf_ring_buffer.c
index 225ef4d..75b2f17 100644
--- a/lf_ring_buffer.c
+++ b/lf_ring_buffer.c
@@ -31,18 +31,49 @@
#include <string.h>
#include <time.h>
+#if !defined LFRB_DATA_SIZE
+ #error "LFRB_DATA_SIZE is not defined"
+#endif
+#if !defined LFRB_BUFFER_TYPE
+ #error "LFRB_BUFFER_TAPE is not defined"
+#endif
+#if !defined LFRB_BUFFER_SIZE
+ #error "LFRB_BUFFER_SIZE is not defined"
+#endif
+#if !defined LFRB_IS_AVAILABLE
+ #error "LFRB_IS_AVAILABLE is not defined"
+#endif
+#if !defined LFRB_MARK_AS_FILLED
+ #error "LFRB_MARK_AS_FILLED is not defined"
+#endif
+#if !defined LFRB_MARK_AS_READ
+ #error "LFRB_MARK_AS_READ is not defined"
+#endif
+#if !defined LFRB_DATA_PTR
+ #error "LFRB_DATA_PTR is not defined"
+#endif
+
+//#define DEBUG_LFR_RING 1
+
+#ifdef DEBUG_LFR_RING
+ #include <stdio.h>
+ #define _LOG_( ... ) fprintf(stdout,__VA_ARGS__)
+#else
+ #define _LOG_( ... )
+#endif
+
/* initialize an empty lf_ring_buffer struct */
lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ) {
/* alloc ring_buffer struct */
lf_ring_buffer_t *r = malloc(sizeof(lf_ring_buffer_t));
if(r==NULL) return NULL;
/* */
- r->buffer = malloc(sizeof(lf_buffer_el_t)*n_buf);
+ r->buffer = malloc(LFRB_BUFFER_SIZE*n_buf);
if(r->buffer==NULL) {
free(r);
return NULL;
}
- memset(r->buffer,0,sizeof(lf_buffer_el_t)*r->n_buf);
+ memset(r->buffer,0,LFRB_BUFFER_SIZE*n_buf);
r->n_buf = n_buf;
r->read_from = -1;
r->write_to = 0;
@@ -58,28 +89,33 @@ void lf_ring_buffer_destroy( lf_ring_buffer_t *r ) {
}
/* write data into the ring buffer */
-int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
- int idx;
- int next;
+int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) {
+ int idx, next;
struct timespec st;
st.tv_sec=0;
/* reserve a buffer */
for(;;){
idx = r->write_to;
- if(IS_AVAILABLE(idx) ) {
+ if(LFRB_IS_AVAILABLE( r->buffer[idx] ) ) {
/* read_from==idx means that the buffer is full and that a writer thread which at first reserved this buffer
* hasn't had enough CPU cycles to call MARK_AS_FILLED
*/
- if(!r->read_from==idx) {
- next = (idx+1)%r->n_buf;
+ if(!(r->read_from==idx)) {
+ next = idx+1;
+ if (next==r->n_buf) next=0;
/* what might have happend between IS_AVAILABLE and now :
* - a writter has reserved this buffer => write_to has moved => CAS fails
* - a reader has consumed a buffer => read_from has moved => we've got more space
*/
+ _LOG_( "write: CAS %d %d %d\n", r->write_to, idx, next );
if( CompareAndSwapInt( &r->write_to, idx, next ) ) break;
+ } else {
+ /* TODO simply reloop, nothing to do ??? */
+ // TODO ERROR when all has been read
+ _LOG_("write: buffer full %d %d\n",r->read_from,idx);
}
- /* TODO simply reloop, nothing to do ??? */
} else {
+ _LOG_("write: not available\n");
if(IS_NOT_BLOCKING(flags)) return -1;
st.tv_nsec=r->write_delay;
nanosleep(&st,NULL);
@@ -94,24 +130,29 @@ int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
*/
if(r->read_from==-1) CompareAndSwapInt( &r->read_from, -1, idx );
/* fill this buffer and mark it as filled */
- memcpy( r->buffer[idx].data, data, RB_DATA_SIZE*sizeof(rb_data_t) );
- MARK_AS_FILLED( idx );
+ memcpy( r->buffer[idx].data, data, LFRB_DATA_SIZE );
+ LFRB_MARK_AS_FILLED( r->buffer[idx] );
if(r->write_delay>BACKOFF_DELAY_INIT) r->write_delay-=BACKOFF_DELAY_INC;
return 0;
}
/* read data from the ring buffer */
-int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
- int idx;
+int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ) {
+ int idx, next;
struct timespec st;
st.tv_sec=0;
if(r->read_from==-1) return -1;
for(;;) {
idx = r->read_from;
- if(!IS_AVAILABLE(idx)) {
- memcpy( data, r->buffer[idx].data, RB_DATA_SIZE*sizeof(rb_data_t) );
- if( CompareAndSwapInt( &r->read_from, idx, (idx+1)%r->n_buf ) ) break;
+ if( !(LFRB_IS_AVAILABLE( r->buffer[idx] )) ) {
+ next = idx+1;
+ if (next==r->n_buf) next=0;
+ /* will do bad things if data dst buffer is too small !! */
+ memcpy( data, r->buffer[idx].data, LFRB_DATA_SIZE );
+ _LOG_( "write: CAS %d %d %d\n", r->read_from, idx, next );
+ if( CompareAndSwapInt( &r->read_from, idx, next ) ) break;
} else {
+ _LOG_("read: not available\n");
if(IS_NOT_BLOCKING(flags)) return -1;
st.tv_nsec=r->read_delay;
nanosleep(&st,NULL);
@@ -119,7 +160,7 @@ int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
}
}
/* finish the read process */
- MARK_AS_READ( idx );
+ LFRB_MARK_AS_READ( r->buffer[idx] );
if(r->read_delay>BACKOFF_DELAY_INIT) r->read_delay-=BACKOFF_DELAY_INC;
return 0;
}
diff --git a/lf_ring_buffer.h b/lf_ring_buffer.h
index a7c7786..790cba9 100644
--- a/lf_ring_buffer.h
+++ b/lf_ring_buffer.h
@@ -38,8 +38,8 @@ extern "C" {
#define BACKOFF_DELAY_INC 3000
#define BACKOFF_DELAY_MAX 90000
-#define NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */
-#define IS_NOT_BLOCKING( flags ) ( (flags)&NO_BLOCK )
+#define LFRB_NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */
+#define IS_NOT_BLOCKING( flags ) ( (flags)&LFRB_NO_BLOCK )
typedef struct ring_buffer {
LFRB_BUFFER_TYPE *buffer; /* buffer data */
diff --git a/lf_ring_buffer_test.c b/lf_ring_buffer_test.c
index 31d4810..348d202 100644
--- a/lf_ring_buffer_test.c
+++ b/lf_ring_buffer_test.c
@@ -6,11 +6,10 @@
#include "lf_ring_buffer.h"
-#define BUFFER_LEN 5000000
+//#define BUFFER_LEN 5000000
+#define BUFFER_LEN 500
-static rb_data_t[BUFFER_LEN][RB_DATA_LEN]
-
-static const double secs_per_tick = 1.0 / CLOCKS_PER_SEC;
+static rb_data_t data[BUFFER_LEN][RB_DATA_LEN];
static int64_t time_diff(struct timespec *t0, struct timespec *t1)
{
@@ -23,7 +22,7 @@ static void print_now(char* s) {
}
static void report( int n, uint64_t dt ) {
- fprintf(stdout,"%d operations in %d [ms] => %d [us] => %d [ns]\t%d [ns/op]\n", n, (int)(dt/1000000), (int)(dt/1000), (int)dt, (int)(dt/n) );
+ fprintf(stdout,"%d operations in %d [ms] => %d [us] => %d [ns]\t >>> %d [ns/op]\n", n, (int)(dt/1000000), (int)(dt/1000), (int)dt, (int)(dt/n) );
}
static void feed_data( int n){
@@ -33,15 +32,16 @@ static void feed_data( int n){
}
}
-static uint64_t sequential_writes( lf_ring_buffer_t *ring, int n ) {
+static uint64_t sequential_writes( lf_ring_buffer_t *ring, int n, int flags ) {
int i;
+ rb_data_t data[RB_DATA_LEN];
struct timespec start, end;
print_now("sequential writes ... ");
clock_gettime(CLOCK_MONOTONIC, &start);
- for(i=0; i<n; i++) lf_ring_buffer_write( ring, data[i], 0 );
+ for(i=0; i<n; i++) lf_ring_buffer_write( ring, data, flags );
clock_gettime(CLOCK_MONOTONIC, &end);
printf("done.\n");
@@ -49,16 +49,16 @@ static uint64_t sequential_writes( lf_ring_buffer_t *ring, int n ) {
return time_diff( &start, &end );
}
-static uint64_t sequential_reads( lf_ring_buffer_t *ring, int n ) {
+static uint64_t sequential_reads( lf_ring_buffer_t *ring, int n, int flags ) {
int i;
- char data[20];
+ rb_data_t data[RB_DATA_LEN];
struct timespec start, end;
print_now("sequential reads ... ");
clock_gettime(CLOCK_MONOTONIC, &start);
- for(i=0; i<n; i++) lf_ring_buffer_read( ring, data, 0 );
+ for(i=0; i<n; i++) lf_ring_buffer_read( ring, data, flags );
clock_gettime(CLOCK_MONOTONIC, &end);
printf("done.\n");
@@ -77,12 +77,16 @@ int main( int argc, char** argv ) {
exit( EXIT_FAILURE );
}
+ /*
print_now("feed the data ... ");
feed_data(b_len);
printf("done.\n");
+ */
- report( b_len, sequential_writes( ring, b_len ) );
- report( sequential_reads( ring, b_len ) );
+ report( b_len, sequential_writes( ring, b_len, 0 ) );
+ report( b_len, sequential_reads( ring, b_len, 0 ) );
+ report( b_len, sequential_writes( ring, b_len, 0 ) );
+ report( b_len, sequential_reads( ring, b_len, 0 ) );
lf_ring_buffer_destroy( ring );