diff options
author | Jérémy Zurcher <jeremy@asynk.ch> | 2010-01-10 17:06:55 +0100 |
---|---|---|
committer | Jérémy Zurcher <jeremy@asynk.ch> | 2010-01-10 17:06:55 +0100 |
commit | 3a331b078b7833b6af67ce86f5641869088d82eb (patch) | |
tree | 10c0bad7332080814814bed394cd55b7a369491b | |
parent | bb68ccdcc1dab0491edeb89c1382e95ebd16bf7a (diff) | |
download | lock_free-3a331b078b7833b6af67ce86f5641869088d82eb.zip lock_free-3a331b078b7833b6af67ce86f5641869088d82eb.tar.gz |
second ring buffer implementation
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Makefile | 7 | ||||
-rw-r--r-- | lf_ring_buffer2.c | 168 | ||||
-rw-r--r-- | lf_ring_buffer2.h | 72 | ||||
-rw-r--r-- | lf_ring_buffer_test2.c | 185 |
5 files changed, 432 insertions, 1 deletions
@@ -4,4 +4,5 @@ cas container_of lf_fifo_test lf_ring_buffer_test +lf_ring_buffer_test2 lock_free_queue_test @@ -3,7 +3,7 @@ CC = gcc STD = _GNU_SOURCE CFLAGS = -DDEBUG -BIN = cas container_of lock_free_queue_test lf_fifo_test lf_ring_buffer_test +BIN = cas container_of lock_free_queue_test lf_fifo_test lf_ring_buffer_test lf_ring_buffer_test2 .c.o: $(CC) -march=i686 -O2 -c -Wall -I. $(CFLAGS) -D$(STD) $< @@ -35,5 +35,10 @@ lf_ring_buffer_test.o: lf_ring_buffer_test.c lf_ring_buffer_test: lf_ring_buffer.o lf_ring_buffer_test.o $(CC) -lrt lf_ring_buffer.o lf_ring_buffer_test.o -o lf_ring_buffer_test +lf_ring_buffer2.o: lf_ring_buffer2.h lf_ring_buffer2.c lf_ring_buffer_data.h lf_portable_cas.h +lf_ring_buffer_test2.o: lf_ring_buffer_test2.c +lf_ring_buffer_test2: lf_ring_buffer2.o lf_ring_buffer_test2.o + $(CC) -lrt lf_ring_buffer2.o lf_ring_buffer_test2.o -o lf_ring_buffer_test2 + clean: rm -f *~ *.o *.s core $(BIN) diff --git a/lf_ring_buffer2.c b/lf_ring_buffer2.c new file mode 100644 index 0000000..d4ff4d8 --- /dev/null +++ b/lf_ring_buffer2.c @@ -0,0 +1,168 @@ +/* + * File : lf_ring_buffer.c + * Author : Jérémy Zurcher <jeremy@asynk.ch> + * Date : 05/01/010 + * License : + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#include "lf_ring_buffer2.h" +#include "lf_portable_cas.h" +#include <stdlib.h> +#include <string.h> +#include <time.h> + +//#define DEBUG_LFRB_KO 1 +//#define DEBUG_LFRB_CAS 1 +//#define DEBUG_LFRB 1 + +#ifdef DEBUG_LFRB + #include <stdio.h> + #define _LOG_KO_( ... ) fprintf(stdout,__VA_ARGS__) + #define _LOG_CAS_( ... ) fprintf(stdout,__VA_ARGS__) +#elif defined DEBUG_LFRB_CAS + #include <stdio.h> + #define _LOG_KO_( ... ) + #define _LOG_CAS_( ... ) fprintf(stdout,__VA_ARGS__) +#elif defined DEBUG_LFRB_KO + #include <stdio.h> + #define _LOG_KO_( ... ) fprintf(stdout,__VA_ARGS__) + #define _LOG_CAS_( ... ) +#else + #define _LOG_KO_( ... ) + #define _LOG_CAS_( ... ) +#endif + +#define BACKOFF_NANO_SLEEP 100000 + +#define USHORTMAX 0xffff + +/* initialize an empty lf_ring_buffer struct */ +lf_ring_buffer_t* lf_ring_buffer_create( size_t n_buf ) { + if(n_buf>=USHORTMAX) { + return NULL; + } + /* alloc ring_buffer struct */ + lf_ring_buffer_t *r = malloc(sizeof(lf_ring_buffer_t)); + if(r==NULL) return NULL; + /* */ + r->buffer = malloc(LFRB_BUFFER_SIZE*n_buf); + if(r->buffer==NULL) { + free(r); + return NULL; + } + memset(r->buffer,0,LFRB_BUFFER_SIZE*n_buf); + r->n_buf = n_buf; + r->indexes = (unsigned int)(USHORTMAX<<16) | (unsigned int)0; + return r; +} + +/* destroy an lf_ring_buffer strcture */ +void lf_ring_buffer_destroy( lf_ring_buffer_t *r ) { + free(r->buffer); + free(r); +} + +/* return 1 if is empty */ +int lf_ring_buffer_empty( lf_ring_buffer_t *r ) { return (r->indexes>>16)==USHORTMAX; } + +/* write data into the ring buffer */ +int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) { + unsigned int current, next; + unsigned int write_to, read_from; + struct timespec backoff; + int backoff_time = BACKOFF_NANO_SLEEP; + /* reserve a buffer */ + for(;;){ + /* copy indexes and split it */ + current = r->indexes; + write_to = current&0xffff; + read_from = current>>16; + /* + * check if the buffer is available, + * if it is but read_from==write, + * it means that the buffer is full and that a writer thread which at first reserved this buffer + * hasn't had enough CPU cycles to call MARK_AS_FILLED + */ + if( LFRB_IS_AVAILABLE( r->buffer[write_to] ) && read_from!=write_to ) { + next = write_to+1; + if (next==r->n_buf) next=0; + /* set read_from to write_to if needed */ + if (read_from==USHORTMAX) { + next |= write_to<<16; + } else { + next |= read_from<<16; + } + /* try to update indexes */ + _LOG_CAS_( "write: CAS %u %u %u\n", r->indexes, current, next ); + if( CompareAndSwapInt( &r->indexes, current, next ) ) break; + } else { + _LOG_KO_("write: fail : %d %d %d\n",LFRB_IS_AVAILABLE(r->buffer[write_to]),write_to,read_from); + if(IS_NOT_BLOCKING(flags)) return -1; + } + backoff.tv_sec = 0; + backoff.tv_nsec = backoff_time; + nanosleep(&backoff,NULL); + backoff_time += BACKOFF_NANO_SLEEP; + } + /* fill this buffer and mark it as filled */ + memcpy( LFRB_DATA_PTR(r->buffer[write_to]), data, LFRB_DATA_SIZE ); + LFRB_MARK_AS_FILLED( r->buffer[write_to] ); + return 0; +} + +/* read data from the ring buffer */ +int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ) { + unsigned int current, next; + unsigned int write_to, read_from, tmp; + struct timespec backoff; + int backoff_time = BACKOFF_NANO_SLEEP; + for(;;) { + current = r->indexes; + write_to = current&0xffff; + read_from = current>>16; + if( !(LFRB_IS_AVAILABLE( r->buffer[read_from] )) && read_from!=USHORTMAX ) { + tmp = read_from +1; + if (tmp==r->n_buf) tmp=0; + /* is the buffer empty */ + if ( tmp==write_to) { + tmp = USHORTMAX; + } + next = tmp<<16 | write_to; + _LOG_CAS_( "read: CAS %u %u %u\n", r->indexes, current, next ); + if( CompareAndSwapInt( &r->indexes, current , next ) ) break; + } else { + _LOG_KO_("read: ring empty\n"); + if(IS_NOT_BLOCKING(flags)) return -1; + } + backoff.tv_sec = 0; + backoff.tv_nsec = backoff_time; + nanosleep(&backoff,NULL); + backoff_time += BACKOFF_NANO_SLEEP; + } + /* will do bad things if data dst buffer is too small !! */ + memcpy( data, LFRB_DATA_PTR(r->buffer[read_from]), LFRB_DATA_SIZE ); + /* finish the read process */ + LFRB_MARK_AS_READ( r->buffer[read_from] ); + return 0; +} + diff --git a/lf_ring_buffer2.h b/lf_ring_buffer2.h new file mode 100644 index 0000000..6f0cee1 --- /dev/null +++ b/lf_ring_buffer2.h @@ -0,0 +1,72 @@ +/* + * File : lf_ring_buffer.h + * Author : Jérémy Zurcher <jeremy@asynk.ch> + * Date : 05/01/010 + * License : + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#ifndef _LF_RING_BUFFER_H_ +#define _LF_RING_BUFFER_H_ + +# ifdef __cplusplus +extern "C" { +# endif /* __cplusplus */ + +#include <sys/types.h> +#include "lf_ring_buffer_data.h" + +#define LFRB_NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */ +#define IS_NOT_BLOCKING( flags ) ( (flags)&LFRB_NO_BLOCK ) + +typedef struct ring_buffer { + LFRB_BUFFER_TYPE *buffer; /* buffer data */ + size_t n_buf; /* number of buffers */ + unsigned int indexes; /* indexes where to read from and write data to */ +} lf_ring_buffer_t; + +/* return an initialized lf_ring_buffer_t struct */ +lf_ring_buffer_t* lf_ring_buffer_create( size_t n_buf ); + +/* destroy an lf_ring_buffer_t struct */ +void lf_ring_buffer_destroy( lf_ring_buffer_t *r ); + +/* return 1 if is empty */ +int lf_ring_buffer_empty( lf_ring_buffer_t *r ); + +/* write data into the ring buffer + * return 0 on success + * return -1 if IS_NOT_BLOCKING and buffer is full + */ +int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ); + +/* read data from the ring buffer + * return 0 on success + * return -1 if IS_NOT_BLOCKING and buffer is empty + */ +int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ); + +# ifdef __cplusplus +} +# endif /* __cplusplus */ + +# endif /* _LF_RING_BUFFER_H_ */ diff --git a/lf_ring_buffer_test2.c b/lf_ring_buffer_test2.c new file mode 100644 index 0000000..ecceede --- /dev/null +++ b/lf_ring_buffer_test2.c @@ -0,0 +1,185 @@ +#include <pthread.h> +#include <stdio.h> +#include <stdlib.h> +#include <stdint.h> + +#include "lf_ring_buffer.h" + +#define BUFFER_LEN 65000 + +//static rb_data_t data[BUFFER_LEN][RB_DATA_LEN]; + +static int64_t time_diff(struct timespec *t0, struct timespec *t1) +{ + return ((t1->tv_sec * 1000000000) + t1->tv_nsec) - ((t0->tv_sec * 1000000000) + t0->tv_nsec); +} + +/* +static void print_now(char* s) { + fprintf(stdout,s); + fflush(stdout); +} +*/ + +static void report( char* op, int n, uint64_t dt, int redo ) { + fprintf(stdout,"\t%9d %s operations in %4d [ms] => %7d [us] => %10d [ns]\t >>> %6d [ns/op]\t%4d redone operations\n", + n, op, (int)(dt/1000000), (int)(dt/1000), (int)dt, (int)(dt/n), redo ); +} + +struct thread_params { + lf_ring_buffer_t *ring; + int n; + int flags; +}; + +/* +static void feed_data( int n){ + int i; + for(i=0; i<n; i++){ + sprintf(data[i],"hello world %04d\n",i); + } +} +*/ + +static uint64_t sequential_writes( lf_ring_buffer_t *ring, int n, int flags ) { + int i, redo=0; + rb_data_t data[RB_DATA_LEN]; + struct timespec start, end; + clock_gettime(CLOCK_MONOTONIC, &start); + if(flags==0) { + for(i=0; i<n; i++) lf_ring_buffer_write( ring, data, flags ); + } else { + for(i=0; i<n;) { + if(lf_ring_buffer_write( ring, data, flags )==0) { i++; } else { redo+=1; } + } + } + clock_gettime(CLOCK_MONOTONIC, &end); + report( "write", n, time_diff( &start, &end ), redo ); + return time_diff( &start, &end ); +} + +static uint64_t sequential_reads( lf_ring_buffer_t *ring, int n, int flags ) { + int i,redo=0; + rb_data_t data[RB_DATA_LEN]; + struct timespec start, end; + clock_gettime(CLOCK_MONOTONIC, &start); + if(flags==0) { + for(i=0; i<n; i++) lf_ring_buffer_read( ring, data, flags ); + } else { + for(i=0; i<n;) { + if(lf_ring_buffer_read( ring, data, flags )==0) { i++; } else { redo+=1; } + } + } + clock_gettime(CLOCK_MONOTONIC, &end); + report( "read ", n, time_diff( &start, &end ), redo ); + return time_diff( &start, &end ); +} + +void* writer_thread( void* param ) { + struct thread_params *params = (struct thread_params*)param; + //report( "write", params->n, sequential_writes( params->ring, params->n, params->flags ) ); + sequential_writes( params->ring, params->n, params->flags ); + return NULL; +} + +void* reader_thread( void* param ) { + struct thread_params *params = (struct thread_params*)param; + //report( "read ", params->n, sequential_reads( params->ring, params->n, params->flags ) ); + sequential_reads( params->ring, params->n, params->flags ); + return NULL; +} + +static void parallel_op( int op, int nt, lf_ring_buffer_t *ring, int n, int flags ) { + int i; + + pthread_t *threads = malloc( sizeof(pthread_t)*nt); + struct thread_params *params = malloc( sizeof(struct thread_params)*nt); + + for(i=0; i<nt; i++) { + params[i].ring = ring; + params[i].n = n/nt; + params[i].flags = flags; + if(op==0) { + if (pthread_create( &threads[i], NULL, writer_thread, ¶ms[i])) { + fprintf(stderr,"Failed to create writer thread[%d]\n",i); + exit(1); + } + } else if(op==1) { + if (pthread_create( &threads[i], NULL, reader_thread, ¶ms[i])) { + fprintf(stderr,"Failed to create reader thread[%d]\n",i); + exit(1); + } + } else { + params[i].n /=2; + if (pthread_create( &threads[i], NULL, writer_thread, ¶ms[i])) { + fprintf(stderr,"Failed to create writer thread[%d]\n",i); + exit(1); + } + if (pthread_create( &threads[i], NULL, reader_thread, ¶ms[i])) { + fprintf(stderr,"Failed to create reader thread[%d]\n",i); + exit(1); + } + } + } + for(i=0; i<nt; i++) { + pthread_join( threads[i], NULL ); + } + /* empty the ring */ + free(threads); + free(params); +} + +int main( int argc, char** argv ) { + + int i; + int b_len = BUFFER_LEN; + lf_ring_buffer_t *ring; + + ring = lf_ring_buffer_create( b_len ); + if(ring==NULL){ + fprintf(stderr,"ERROR : lf_ring_buffer_create( %d );\n",b_len); + exit( EXIT_FAILURE ); + } + + /* + print_now("feed the data ... "); + feed_data(b_len); + printf("done.\n"); + */ + + printf("sequential non blocking write operations ...\n"); + sequential_writes( ring, b_len, 0 ); + printf("sequential non blocking read operations ...\n"); + sequential_reads( ring, b_len, 0 ); + if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); } + printf("sequential blocking write operations ...\n"); + sequential_writes( ring, b_len, LFRB_NO_BLOCK ); + printf("sequential blocking read operations ...\n"); + sequential_reads( ring, b_len, LFRB_NO_BLOCK ); + if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); } + + for(i=5; i<=50;i*=2) { + printf("%d parallel blocking with backoff inc write operations .... \n",i); + parallel_op( 0, i, ring, b_len, 0 ); + printf("parallel blocking read operations ...\n"); + sequential_reads( ring, b_len, 0 ); + if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); } + } + for(i=5; i<=50;i*=2) { + printf("%d parallel non blocking write operations .... \n",i); + parallel_op( 0, i, ring, b_len, LFRB_NO_BLOCK ); + printf("non blocking read operations ...\n"); + sequential_reads( ring, b_len, LFRB_NO_BLOCK ); + if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); } + } + for(i=10; i<=50;i*=2) { + printf("%d parallel blocking write and read operations .... \n",i*2); + parallel_op( 3, i, ring, b_len, 0 ); + if(!lf_ring_buffer_empty(ring)) { fprintf(stderr,"ring should be empty but is not\n"); exit( EXIT_FAILURE ); } + } + + lf_ring_buffer_destroy( ring ); + + return EXIT_SUCCESS; +} + |