From a42b187447f583595092dd04277ccd4bde802678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Zurcher?= Date: Tue, 5 Jan 2010 23:23:42 +0100 Subject: first lock free ring buffer implementation --- Makefile | 4 +- lf_portable_cas.h | 44 +++++++++++++++++++ lf_ring_buffer.c | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ lf_ring_buffer.h | 85 +++++++++++++++++++++++++++++++++++++ 4 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 lf_portable_cas.h create mode 100644 lf_ring_buffer.c create mode 100644 lf_ring_buffer.h diff --git a/Makefile b/Makefile index a12803a..5f766d3 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ CC = gcc STD = _GNU_SOURCE CFLAGS = -DDEBUG -BIN = cas container_of lock_free_queue_test lf_fifo_test +BIN = cas container_of lock_free_queue_test lf_fifo_test lf_ring_buffer.o .c.o: $(CC) -c -Wall -I. $(CFLAGS) -D$(STD) $< @@ -26,5 +26,7 @@ lf_fifo.o: lf_fifo.h lf_cas.h lf_fifo_test: lf_fifo.o lf_fifo_test.o $(CC) lf_fifo.o lf_fifo_test.o -o lf_fifo_test +lf_ring_buffer.o: lf_ring_buffer.h lf_portable_cas.h + clean: rm -f *~ *.o *.s core $(BIN) diff --git a/lf_portable_cas.h b/lf_portable_cas.h new file mode 100644 index 0000000..8e482fe --- /dev/null +++ b/lf_portable_cas.h @@ -0,0 +1,44 @@ +/* + * File : portable_cas.h + * Author : Jérémy Zurcher + * Date : 05/01/10 + * License : + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#ifndef LF_PORTABLE_CAS_H_ +#define LF_PORTABLE_CAS_H_ + +#if __ENVIRONMENT_MAC_OS_X_VERSION_MIN_REQUIRED__ >= 1050 + #define CompareAndSwapInt(ptr,old_value, new_value) OSAtomicCompareAndSwapInt(old_value, new_value, ptr) + #define CompareAndSwapPointer(ptr,new_value,old_value) OSAtomicCompareAndSwapPtr (old_value, new_value, ptr) +#elif defined(_MSC_VER) + #define CompareAndSwapInt(ptr,old_value,new_value) InterlockedCompareExchange(ptr, new_value, old_value) + #define CompareAndSwapPointer(ptr,new_value,old_value) InterlockedCompareExchange(ptr, new_value, old_value) +#elif (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) > 40100 + #define CompareAndSwapInt(ptr,old_value,new_value) __sync_bool_compare_and_swap(ptr, old_value, new_value) + #define CompareAndSwapPointer(ptr,new_value,old_value) __sync_val_compare_and_swap(ptr, old_value, new_value) +#else +# error No implementation +#endif + +# endif /* LF_PORTABLE_CAS_H_ */ diff --git a/lf_ring_buffer.c b/lf_ring_buffer.c new file mode 100644 index 0000000..7796206 --- /dev/null +++ b/lf_ring_buffer.c @@ -0,0 +1,125 @@ +/* + * File : lf_ring_buffer.h + * Author : Jérémy Zurcher + * Date : 05/01/010 + * License : + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#include "lf_ring_buffer.h" +#include "lf_portable_cas.h" +#include +#include +#include + +/* initialize an empty lf_ring_buffer struct */ +lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ) { + /* alloc ring_buffer struct */ + lf_ring_buffer_t *r = malloc(sizeof(lf_ring_buffer_t)); + if(r==NULL) return NULL; + /* */ + r->buffer = malloc(sizeof(lf_buffer_el_t)*n_buf); + if(r->buffer==NULL) { + free(r); + return NULL; + } + memset(r->buffer,0,sizeof(lf_buffer_el_t)*r->n_buf); + r->n_buf = n_buf; + r->read_from = -1; + r->write_to = 0; + r->write_delay=BACKOFF_DELAY_INIT; + return r; +} + +/* destroy an lf_ring_buffer strcture */ +void lf_ring_buffer_destroy( lf_ring_buffer_t *r ) { + free(r->buffer); + free(r); +} + +/* write data into the ring buffer */ +int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ) { + int idx; + int next; + struct timespec st; + st.tv_sec=0; + /* reserve a buffer */ + for(;;){ + idx = r->write_to; + if(IS_AVAILABLE(idx) ) { + /* read_from==idx means that the buffer is full and that a writer thread which at first reserved this buffer + * hasn't had enough CPU cycles to call MARK_AS_FILLED + */ + if(!r->read_from==idx) { + next = (idx+1)%r->n_buf; + /* what might have happend between IS_AVAILABLE and now : + * - a writter has reserved this buffer => write_to has moved => CAS fails + * - a reader has consumed a buffer => read_from has moved => we've got more space + */ + if( CompareAndSwapInt( &r->write_to, idx, next ) ) break; + } + /* TODO simply reloop, nothing to do ??? */ + } else { + if(IS_NOT_BLOCKING(flags)) return -1; + st.tv_nsec=r->write_delay; + nanosleep(&st,NULL); + if(r->write_delaywrite_delay+=BACKOFF_DELAY_INC; + } + } + /* try to set read_from on idx if it has not been initialized yet + * !!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!! + * there is a really tiny chance if the ring is too small and there is a lot of writers that, + * another thread might have reserved this same buffer before this + * which means this thread and the other will write data to this current buffer !!!! + */ + if(r->read_from==-1) CompareAndSwapInt( &r->read_from, -1, idx ); + /* fill this buffer and mark it as filled */ + memcpy( r->buffer[idx].data, data, RB_DATA_SIZE*sizeof(rb_data_t) ); + MARK_AS_FILLED( idx ); + if(r->write_delay>BACKOFF_DELAY_INIT) r->write_delay-=BACKOFF_DELAY_INC; + return 0; +} + +/* read data from the ring buffer */ +int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ) { + int idx; + struct timespec st; + st.tv_sec=0; + if(r->read_from==-1) return -1; + for(;;) { + idx = r->read_from; + if(!IS_AVAILABLE(idx)) { + memcpy( data, r->buffer[idx].data, RB_DATA_SIZE*sizeof(rb_data_t) ); + if( CompareAndSwapInt( &r->read_from, idx, (idx+1)%r->n_buf ) ) break; + } else { + if(IS_NOT_BLOCKING(flags)) return -1; + st.tv_nsec=r->read_delay; + nanosleep(&st,NULL); + if(r->read_delayread_delay+=BACKOFF_DELAY_INC; + } + } + /* finish the read process */ + MARK_AS_READ( idx ); + if(r->read_delay>BACKOFF_DELAY_INIT) r->read_delay-=BACKOFF_DELAY_INC; + return 0; +} + diff --git a/lf_ring_buffer.h b/lf_ring_buffer.h new file mode 100644 index 0000000..7c237ab --- /dev/null +++ b/lf_ring_buffer.h @@ -0,0 +1,85 @@ +/* + * File : lf_ring_buffer.h + * Author : Jérémy Zurcher + * Date : 05/01/010 + * License : + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#ifndef _LF_RING_BUFFER_H_ +#define _LF_RING_BUFFER_H_ + +# ifdef __cplusplus +extern "C" { +# endif /* __cplusplus */ + +#define BACKOFF_DELAY_INIT 1000 +#define BACKOFF_DELAY_INC 3000 +#define BACKOFF_DELAY_MAX 90000 + +#define NO_BLOCK 1 /* if buffer is full, leave instead of try again and again */ +#define IS_NOT_BLOCKING( flags ) ( (flags)&NO_BLOCK ) + +#define RB_DATA_SIZE 63 +typedef char rb_data_t; + +typedef struct buffer_el { + char status; + rb_data_t data[RB_DATA_SIZE]; +} lf_buffer_el_t; + +typedef struct ring_buffer { + lf_buffer_el_t *buffer; /* buffer data */ + int n_buf; /* number of buffers */ + int read_from; /* index where to read data from */ + int write_to; /* index where to write data to */ + int write_delay; /* backoff nanosleep to reduce fast looping when writing */ + int read_delay; /* backoff nanosleep to reduce fast looping when reading */ +} lf_ring_buffer_t; + +#define IS_AVAILABLE( idx ) (r->buffer[(idx)].status==0) +#define MARK_AS_FILLED( idx ) { r->buffer[(idx)].status=1; } +#define MARK_AS_READ( idx ) { r->buffer[(idx)].status=0; } + +/* initialize an empty lf_ring_buffer struct */ +lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ); + +/* destroy an lf_ring_buffer strcture */ +void lf_ring_buffer_destroy( lf_ring_buffer_t *r ); + +/* write data into the ring buffer + * return 0 on success + * return -1 if IS_NOT_BLOCKING and buffer is full + */ +int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ); + +/* read data from the ring buffer + * return 0 on success + * return -1 if IS_NOT_BLOCKING and buffer is empty + */ +int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ); + +# ifdef __cplusplus +} +# endif /* __cplusplus */ + +# endif /* _LF_RING_BUFFER_H_ */ -- cgit v1.1-2-g2b99