summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lf_ring_buffer.c22
-rw-r--r--lf_ring_buffer.h16
-rw-r--r--lf_ring_buffer_test.c112
3 files changed, 112 insertions, 38 deletions
diff --git a/lf_ring_buffer.c b/lf_ring_buffer.c
index ad866e0..30532fc 100644
--- a/lf_ring_buffer.c
+++ b/lf_ring_buffer.c
@@ -63,7 +63,7 @@
#endif
/* initialize an empty lf_ring_buffer struct */
-lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ) {
+lf_ring_buffer_t* lf_ring_buffer_create( size_t n_buf ) {
/* alloc ring_buffer struct */
lf_ring_buffer_t *r = malloc(sizeof(lf_ring_buffer_t));
if(r==NULL) return NULL;
@@ -77,8 +77,8 @@ lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ) {
r->n_buf = n_buf;
r->read_from = -1;
r->write_to = 0;
- r->write_delay=BACKOFF_DELAY_INIT;
- r->read_delay=BACKOFF_DELAY_INIT;
+ r->write_delay = BACKOFF_DELAY_INIT;
+ r->read_delay = BACKOFF_DELAY_INIT;
return r;
}
@@ -119,7 +119,9 @@ int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) {
if(IS_NOT_BLOCKING(flags)) return -1;
st.tv_nsec=r->write_delay;
nanosleep(&st,NULL);
- if(r->write_delay<BACKOFF_DELAY_MAX) r->write_delay+=BACKOFF_DELAY_INC;
+ if( !(BACKOFF_INC_NOT(flags)) ) {
+ if(r->write_delay<BACKOFF_DELAY_MAX) r->write_delay+=BACKOFF_DELAY_INC;
+ }
}
}
/* try to set read_from on idx if it has not been initialized yet
@@ -132,7 +134,9 @@ int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) {
/* fill this buffer and mark it as filled */
memcpy( r->buffer[idx].data, data, LFRB_DATA_SIZE );
LFRB_MARK_AS_FILLED( r->buffer[idx] );
- if(r->write_delay>BACKOFF_DELAY_INIT) r->write_delay-=BACKOFF_DELAY_INC;
+ if( !(BACKOFF_INC_NOT(flags)) ) {
+ if(r->write_delay>BACKOFF_DELAY_INIT) r->write_delay-=BACKOFF_DELAY_INC;
+ }
return 0;
}
@@ -163,12 +167,16 @@ int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ) {
if(IS_NOT_BLOCKING(flags)) return -1;
st.tv_nsec=r->read_delay;
nanosleep(&st,NULL);
- if(r->read_delay<BACKOFF_DELAY_MAX) r->read_delay+=BACKOFF_DELAY_INC;
+ if( !(BACKOFF_INC_NOT(flags)) ) {
+ if(r->read_delay<BACKOFF_DELAY_MAX) r->read_delay+=BACKOFF_DELAY_INC;
+ }
}
}
/* finish the read process */
LFRB_MARK_AS_READ( r->buffer[idx] );
- if(r->read_delay>BACKOFF_DELAY_INIT) r->read_delay-=BACKOFF_DELAY_INC;
+ if( !(BACKOFF_INC_NOT(flags)) ) {
+ if(r->read_delay>BACKOFF_DELAY_INIT) r->read_delay-=BACKOFF_DELAY_INC;
+ }
return 0;
}
diff --git a/lf_ring_buffer.h b/lf_ring_buffer.h
index 790cba9..51d29e5 100644
--- a/lf_ring_buffer.h
+++ b/lf_ring_buffer.h
@@ -32,18 +32,22 @@
extern "C" {
# endif /* __cplusplus */
+#include <sys/types.h>
#include "lf_ring_buffer_data.h"
-#define BACKOFF_DELAY_INIT 1000
-#define BACKOFF_DELAY_INC 3000
-#define BACKOFF_DELAY_MAX 90000
+#define BACKOFF_DELAY_INIT 100
+#define BACKOFF_DELAY_INC 100
+#define BACKOFF_DELAY_MAX 1000
-#define LFRB_NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */
+#define LFRB_NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */
#define IS_NOT_BLOCKING( flags ) ( (flags)&LFRB_NO_BLOCK )
+#define LFRB_NO_BACKOFF_INC 2 /* do not use BACKOFF_DELAY_X */
+#define BACKOFF_INC_NOT( flags ) ( (flags)&LFRB_NO_BACKOFF_INC )
+
typedef struct ring_buffer {
LFRB_BUFFER_TYPE *buffer; /* buffer data */
- int n_buf; /* number of buffers */
+ size_t n_buf; /* number of buffers */
int read_from; /* index where to read data from */
int write_to; /* index where to write data to */
int write_delay; /* backoff nanosleep to reduce fast looping when writing */
@@ -51,7 +55,7 @@ typedef struct ring_buffer {
} lf_ring_buffer_t;
/* return an initialized lf_ring_buffer_t struct */
-lf_ring_buffer_t* lf_ring_buffer_create( int n_buf );
+lf_ring_buffer_t* lf_ring_buffer_create( size_t n_buf );
/* destroy an lf_ring_buffer_t struct */
void lf_ring_buffer_destroy( lf_ring_buffer_t *r );
diff --git a/lf_ring_buffer_test.c b/lf_ring_buffer_test.c
index e9e199a..b3ec016 100644
--- a/lf_ring_buffer_test.c
+++ b/lf_ring_buffer_test.c
@@ -1,4 +1,3 @@
-#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
@@ -8,22 +7,31 @@
#define BUFFER_LEN 5000000
-static rb_data_t data[BUFFER_LEN][RB_DATA_LEN];
+//static rb_data_t data[BUFFER_LEN][RB_DATA_LEN];
static int64_t time_diff(struct timespec *t0, struct timespec *t1)
{
return ((t1->tv_sec * 1000000000) + t1->tv_nsec) - ((t0->tv_sec * 1000000000) + t0->tv_nsec);
}
+/*
static void print_now(char* s) {
fprintf(stdout,s);
fflush(stdout);
}
+*/
-static void report( int n, uint64_t dt ) {
- fprintf(stdout,"%d operations in %d [ms] => %d [us] => %d [ns]\t >>> %d [ns/op]\n", n, (int)(dt/1000000), (int)(dt/1000), (int)dt, (int)(dt/n) );
+static void report( char* op, int n, uint64_t dt, int redo ) {
+ fprintf(stdout,"\t%9d %s operations in %4d [ms] => %7d [us] => %10d [ns]\t >>> %6d [ns/op]\t%4d redone operations\n",
+ n, op, (int)(dt/1000000), (int)(dt/1000), (int)dt, (int)(dt/n), redo );
}
+struct thread_params {
+ lf_ring_buffer_t *ring;
+ int n;
+ int flags;
+};
+
/*
static void feed_data( int n){
int i;
@@ -34,41 +42,68 @@ static void feed_data( int n){
*/
static uint64_t sequential_writes( lf_ring_buffer_t *ring, int n, int flags ) {
- int i;
+ int i, redo=0;
rb_data_t data[RB_DATA_LEN];
struct timespec start, end;
-
- print_now("sequential writes ... ");
-
clock_gettime(CLOCK_MONOTONIC, &start);
-
- for(i=0; i<n; i++) lf_ring_buffer_write( ring, data, flags );
-
+ if(flags==0) {
+ for(i=0; i<n; i++) lf_ring_buffer_write( ring, data, flags );
+ } else {
+ for(i=0; i<n;) {
+ if(lf_ring_buffer_write( ring, data, flags )==0) { i++; } else { redo+=1; }
+ }
+ }
clock_gettime(CLOCK_MONOTONIC, &end);
- printf("done.\n");
-
+ report( "write", n, time_diff( &start, &end ), redo );
return time_diff( &start, &end );
}
static uint64_t sequential_reads( lf_ring_buffer_t *ring, int n, int flags ) {
- int i;
+ int i,redo=0;
rb_data_t data[RB_DATA_LEN];
struct timespec start, end;
-
- print_now("sequential reads ... ");
-
clock_gettime(CLOCK_MONOTONIC, &start);
+ for(i=0; i<n;) {
+ if(lf_ring_buffer_read( ring, data, flags )==0) { i++; } else { redo+=1; }
+ }
+ clock_gettime(CLOCK_MONOTONIC, &end);
+ report( "read ", n, time_diff( &start, &end ), redo );
+ return time_diff( &start, &end );
+}
- for(i=0; i<n; i++) lf_ring_buffer_read( ring, data, flags );
+void* writer_thread( void* param ) {
+ struct thread_params *params = (struct thread_params*)param;
+ //report( "write", params->n, sequential_writes( params->ring, params->n, params->flags ) );
+ sequential_writes( params->ring, params->n, params->flags );
+ return NULL;
+}
- clock_gettime(CLOCK_MONOTONIC, &end);
- printf("done.\n");
+static void parallel_writes( int nt, lf_ring_buffer_t *ring, int n, int flags ) {
+ int i;
- return time_diff( &start, &end );
+ pthread_t *threads = malloc( sizeof(pthread_t)*nt);
+ struct thread_params *params = malloc( sizeof(struct thread_params)*nt);
+
+ for(i=0; i<nt; i++) {
+ params[i].ring = ring;
+ params[i].n = n/nt;
+ params[i].flags = flags;
+ if (pthread_create( &threads[i], NULL, writer_thread, &params[i])) {
+ fprintf(stderr,"Failed to create reader thread[%d]\n",i);
+ exit(1);
+ }
+ }
+ for(i=0; i<nt; i++) {
+ pthread_join( threads[i], NULL );
+ }
+ /* empty the ring */
+ free(threads);
+ free(params);
}
int main( int argc, char** argv ) {
+ int i;
int b_len = BUFFER_LEN;
lf_ring_buffer_t *ring;
@@ -84,10 +119,37 @@ int main( int argc, char** argv ) {
printf("done.\n");
*/
- report( b_len, sequential_writes( ring, b_len, 0 ) );
- report( b_len, sequential_reads( ring, b_len, 0 ) );
- report( b_len, sequential_writes( ring, b_len, 0 ) );
- report( b_len, sequential_reads( ring, b_len, 0 ) );
+ printf("sequential non blocking write operations ...\n");
+ //report( "write", b_len, sequential_writes( ring, b_len, 0 ) );
+ sequential_writes( ring, b_len, 0 );
+ printf("sequential non blocking read operations ...\n");
+ //report( "read ", b_len, sequential_reads( ring, b_len, 0 ) );
+ sequential_reads( ring, b_len, 0 );
+ printf("sequential blocking write operations ...\n");
+ //report( "write", b_len, sequential_writes( ring, b_len, 0 ) );
+ sequential_writes( ring, b_len, LFRB_NO_BLOCK );
+ printf("sequential blocking read operations ...\n");
+ //report( "read ", b_len, sequential_reads( ring, b_len, 0 ) );
+ sequential_reads( ring, b_len, LFRB_NO_BLOCK );
+
+ for(i=5; i<=100;i*=22) {
+ printf("%d parallel blocking with backoff inc write operations .... \n",i);
+ parallel_writes( i, ring, b_len, 0 );
+ printf("parallel blocking read operations ...\n");
+ sequential_reads( ring, b_len, 0 );
+ }
+ for(i=5; i<=100;i*=22) {
+ printf("%d parallel blocking with no backoff inc write operations .... \n",i);
+ parallel_writes( i, ring, b_len, LFRB_NO_BACKOFF_INC );
+ printf("parallel blocking read operations ...\n");
+ sequential_reads( ring, b_len, LFRB_NO_BACKOFF_INC );
+ }
+ for(i=5; i<=100;i*=22) {
+ printf("%d parallel non blocking write operations .... \n",i);
+ parallel_writes( i, ring, b_len, LFRB_NO_BLOCK );
+ printf("non blocking read operations ...\n");
+ sequential_reads( ring, b_len, LFRB_NO_BLOCK );
+ }
lf_ring_buffer_destroy( ring );