diff options
author | Jérémy Zurcher <jeremy@asynk.ch> | 2010-01-06 22:36:29 +0100 |
---|---|---|
committer | Jérémy Zurcher <jeremy@asynk.ch> | 2010-01-06 22:36:29 +0100 |
commit | 174afdb2c0eb0e418980c1fa1c0afe41277b5b4d (patch) | |
tree | 55a2901196983c31017dac6e4fa0f35e43dadd5e | |
parent | 80b8ee53cbf9e69f6831124f469358aa7d4e97d4 (diff) | |
download | lock_free-174afdb2c0eb0e418980c1fa1c0afe41277b5b4d.zip lock_free-174afdb2c0eb0e418980c1fa1c0afe41277b5b4d.tar.gz |
lf_ring_buffer is almost good + tests
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | Makefile | 17 | ||||
-rw-r--r-- | lf_ring_buffer.h | 30 | ||||
-rw-r--r-- | lf_ring_buffer_data.h | 21 | ||||
-rw-r--r-- | lf_ring_buffer_test.c | 218 |
5 files changed, 262 insertions, 26 deletions
@@ -3,5 +3,5 @@ cas container_of lf_fifo_test +lf_ring_buffer_test lock_free_queue_test -lock_free_ring_buffer_test @@ -3,30 +3,37 @@ CC = gcc STD = _GNU_SOURCE CFLAGS = -DDEBUG -BIN = cas container_of lock_free_queue_test lf_fifo_test lf_ring_buffer.o +BIN = cas container_of lock_free_queue_test lf_fifo_test lf_ring_buffer_test .c.o: - $(CC) -c -Wall -I. $(CFLAGS) -D$(STD) $< + $(CC) -march=i686 -O2 -c -Wall -I. $(CFLAGS) -D$(STD) $< all: $(BIN) +cas.o: cas.c cas: cas.o $(CC) -S cas.c $(CC) cas.o -o cas +container_of.o: container_of.c container_of: container_of.o $(CC) -S container_of.c $(CC) container_of.o -o container_of +lock_free_queue.o: lock_free_queue.h lock_free_queue.c +lock_free_queue_test.o: lock_free_queue_test.c lock_free_queue_test: lock_free_queue.o lock_free_queue_test.o $(CC) lock_free_queue.o lock_free_queue_test.o -o lock_free_queue_test -lf_fifo.o: lf_fifo.h lf_cas.h - +lf_fifo.o: lf_fifo.h lf_fifo.c lf_cas.h +lf_fifo_test.o: lf_fifo_test.c lf_fifo_test: lf_fifo.o lf_fifo_test.o $(CC) lf_fifo.o lf_fifo_test.o -o lf_fifo_test -lf_ring_buffer.o: lf_ring_buffer.h lf_portable_cas.h +lf_ring_buffer.o: lf_ring_buffer.h lf_ring_buffer.c lf_portable_cas.h +lf_ring_buffer_test.o: lf_ring_buffer_test.c +lf_ring_buffer_test: lf_ring_buffer.o lf_ring_buffer_test.o + $(CC) -lrt lf_ring_buffer.o lf_ring_buffer_test.o -o lf_ring_buffer_test clean: rm -f *~ *.o *.s core $(BIN) diff --git a/lf_ring_buffer.h b/lf_ring_buffer.h index e9a5727..a7c7786 100644 --- a/lf_ring_buffer.h +++ b/lf_ring_buffer.h @@ -32,6 +32,8 @@ extern "C" { # endif /* __cplusplus */ +#include "lf_ring_buffer_data.h" + #define BACKOFF_DELAY_INIT 1000 #define BACKOFF_DELAY_INC 3000 #define BACKOFF_DELAY_MAX 90000 @@ -39,27 +41,15 @@ extern "C" { #define NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */ #define IS_NOT_BLOCKING( flags ) ( (flags)&NO_BLOCK ) -#define RB_DATA_SIZE 63 -typedef char rb_data_t; - -typedef struct buffer_el { - char status; - rb_data_t data[RB_DATA_SIZE]; -} lf_buffer_el_t; - typedef struct ring_buffer { - lf_buffer_el_t *buffer; /* buffer data */ - int n_buf; /* number of buffers */ - int read_from; /* index where to read data from */ - int write_to; /* index where to write data to */ - int write_delay; /* backoff nanosleep to reduce fast looping when writing */ - int read_delay; /* backoff nanosleep to reduce fast looping when reading */ + LFRB_BUFFER_TYPE *buffer; /* buffer data */ + int n_buf; /* number of buffers */ + int read_from; /* index where to read data from */ + int write_to; /* index where to write data to */ + int write_delay; /* backoff nanosleep to reduce fast looping when writing */ + int read_delay; /* backoff nanosleep to reduce fast looping when reading */ } lf_ring_buffer_t; -#define IS_AVAILABLE( idx ) (r->buffer[(idx)].status==0) -#define MARK_AS_FILLED( idx ) { r->buffer[(idx)].status=1; } -#define MARK_AS_READ( idx ) { r->buffer[(idx)].status=0; } - /* return an initialized lf_ring_buffer_t struct */ lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ); @@ -70,13 +60,13 @@ void lf_ring_buffer_destroy( lf_ring_buffer_t *r ); * return 0 on success * return -1 if IS_NOT_BLOCKING and buffer is full */ -int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ); +int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ); /* read data from the ring buffer * return 0 on success * return -1 if IS_NOT_BLOCKING and buffer is empty */ -int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ); +int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ); # ifdef __cplusplus } diff --git a/lf_ring_buffer_data.h b/lf_ring_buffer_data.h new file mode 100644 index 0000000..cf6e9c9 --- /dev/null +++ b/lf_ring_buffer_data.h @@ -0,0 +1,21 @@ + +typedef char rb_data_t; + +#define RB_DATA_LEN 63 + +typedef struct buffer { + char status; + rb_data_t data[RB_DATA_LEN]; +} rb_buffer_t; + +#define LFRB_DATA_SIZE ( sizeof(rb_data_t)*RB_DATA_LEN ) + +#define LFRB_BUFFER_TYPE rb_buffer_t +#define LFRB_BUFFER_SIZE ( sizeof(rb_buffer_t) ) + +#define LFRB_IS_AVAILABLE( el ) (el.status==0) +#define LFRB_MARK_AS_FILLED( el ) { (el).status=1; } +#define LFRB_MARK_AS_READ( el ) { (el).status=0; } +#define LFRB_DATA_PTR( el ) (el).data + + diff --git a/lf_ring_buffer_test.c b/lf_ring_buffer_test.c new file mode 100644 index 0000000..31d4810 --- /dev/null +++ b/lf_ring_buffer_test.c @@ -0,0 +1,218 @@ +#include <unistd.h> +#include <pthread.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> + +#include "lf_ring_buffer.h" + +#define BUFFER_LEN 5000000 + +static rb_data_t[BUFFER_LEN][RB_DATA_LEN] + +static const double secs_per_tick = 1.0 / CLOCKS_PER_SEC; + +static int64_t time_diff(struct timespec *t0, struct timespec *t1) +{ + return ((t1->tv_sec * 1000000000) + t1->tv_nsec) - ((t0->tv_sec * 1000000000) + t0->tv_nsec); +} + +static void print_now(char* s) { + fprintf(stdout,s); + fflush(stdout); +} + +static void report( int n, uint64_t dt ) { + fprintf(stdout,"%d operations in %d [ms] => %d [us] => %d [ns]\t%d [ns/op]\n", n, (int)(dt/1000000), (int)(dt/1000), (int)dt, (int)(dt/n) ); +} + +static void feed_data( int n){ + int i; + for(i=0; i<n; i++){ + sprintf(data[i],"hello world %04d\n",i); + } +} + +static uint64_t sequential_writes( lf_ring_buffer_t *ring, int n ) { + int i; + struct timespec start, end; + + print_now("sequential writes ... "); + + clock_gettime(CLOCK_MONOTONIC, &start); + + for(i=0; i<n; i++) lf_ring_buffer_write( ring, data[i], 0 ); + + clock_gettime(CLOCK_MONOTONIC, &end); + printf("done.\n"); + + return time_diff( &start, &end ); +} + +static uint64_t sequential_reads( lf_ring_buffer_t *ring, int n ) { + int i; + char data[20]; + struct timespec start, end; + + print_now("sequential reads ... "); + + clock_gettime(CLOCK_MONOTONIC, &start); + + for(i=0; i<n; i++) lf_ring_buffer_read( ring, data, 0 ); + + clock_gettime(CLOCK_MONOTONIC, &end); + printf("done.\n"); + + return time_diff( &start, &end ); +} + +int main( int argc, char** argv ) { + + int b_len = BUFFER_LEN; + lf_ring_buffer_t *ring; + + ring = lf_ring_buffer_create( b_len ); + if(ring==NULL){ + fprintf(stderr,"ERROR : lf_ring_buffer_create( %d );\n",b_len); + exit( EXIT_FAILURE ); + } + + print_now("feed the data ... "); + feed_data(b_len); + printf("done.\n"); + + report( b_len, sequential_writes( ring, b_len ) ); + report( sequential_reads( ring, b_len ) ); + + lf_ring_buffer_destroy( ring ); + + return EXIT_SUCCESS; +} + +/* +#define ARRAY_SIZE 2 +#define MAX_VALUE 0x10000 + +jack_ringbuffer_t *rb; +volatile int flowing = 0; + +static int +fill_int_array (int *array, int start, int count) +{ + int i, j = start; + for (i = 0; i < count; i++) + { + array[i] = j; + j = (j + 1) % MAX_VALUE; + } + return j; +} + +static int +cmp_array (int *array1, int *array2, int count) +{ + int i; + for (i = 0; i < count; i++) + if (array1[i] != array2[i]) + { + printf("%d != %d at offset %d\n", array1[i], array2[i], i); + fflush(stdout); + return 0; + } + + return 1; +} + +static void * +reader_start (void * arg) +{ + int i = 0, a[ARRAY_SIZE], b[ARRAY_SIZE]; + unsigned long j = 0, nfailures = 0; + printf("[reader started] "); + fflush(stdout); + i = fill_int_array (a, i, ARRAY_SIZE); + while (1) + { + if (j == 100 && !flowing) + { + printf("[flowing] "); + fflush(stdout); + flowing = 1; + } + + if (jack_ringbuffer_read_space (rb) >= (int) ARRAY_SIZE * (int) sizeof (int)) + { + if (jack_ringbuffer_read (rb, (char *) b, ARRAY_SIZE * sizeof (int))) + { + if (!cmp_array (a, b, ARRAY_SIZE)) + { + nfailures++; + // + //printf("failure in chunk %lu - probability: %lu/%lu = %.3f per million\n", + // j, nfailures, j, (float) nfailures / (j + 1) * 1000000); + //i = (b[0] + ARRAY_SIZE) % MAX_VALUE; + // + printf("FAILURE in chunk %lu\n", j); + fflush(stdout); + exit(1); + } + i = fill_int_array (a, i, ARRAY_SIZE); + j++; + } + } + } + + return NULL; +} + +static void * +writer_start (void * arg) +{ + int i = 0, a[ARRAY_SIZE]; + printf("[writer started] "); + fflush(stdout); + + i = fill_int_array (a, i, ARRAY_SIZE); + + while (1) + { + if (jack_ringbuffer_write_space (rb) >= (int) ARRAY_SIZE * (int) sizeof (int)) + { + if (jack_ringbuffer_write (rb, (char *) a, ARRAY_SIZE * sizeof (int))) + { + i = fill_int_array (a, i, ARRAY_SIZE); + } + } + } + + return NULL; +} + +int main(int argc, char *argv[]) +{ + int size; + sscanf(argv[1], "%d", &size); + printf("starting (120s max) - array/buffer size: %d/%d\n", + (int) sizeof(int) * ARRAY_SIZE, size); + fflush(stdout); + rb = jack_ringbuffer_create(size); + pthread_t reader_thread, writer_thread; + if (pthread_create (&reader_thread, NULL, reader_start, NULL)) + { + printf("Failed to create reader thread\n"); + exit(1); + } + if (pthread_create (&writer_thread, NULL, writer_start, NULL)) + { + printf("Failed to create writer thread\n"); + exit(1); + } + sleep(120); + if (flowing) + printf("SUCCESS\n"); + else + printf("FAILURE: data did not flow\n"); + fflush(stdout); + return 0; +} +*/ |