summaryrefslogtreecommitdiffstats
path: root/lf_ring_buffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'lf_ring_buffer.c')
-rw-r--r--lf_ring_buffer.c75
1 files changed, 58 insertions, 17 deletions
diff --git a/lf_ring_buffer.c b/lf_ring_buffer.c
index 225ef4d..75b2f17 100644
--- a/lf_ring_buffer.c
+++ b/lf_ring_buffer.c
@@ -31,18 +31,49 @@
#include <string.h>
#include <time.h>
+#if !defined LFRB_DATA_SIZE
+ #error "LFRB_DATA_SIZE is not defined"
+#endif
+#if !defined LFRB_BUFFER_TYPE
+ #error "LFRB_BUFFER_TAPE is not defined"
+#endif
+#if !defined LFRB_BUFFER_SIZE
+ #error "LFRB_BUFFER_SIZE is not defined"
+#endif
+#if !defined LFRB_IS_AVAILABLE
+ #error "LFRB_IS_AVAILABLE is not defined"
+#endif
+#if !defined LFRB_MARK_AS_FILLED
+ #error "LFRB_MARK_AS_FILLED is not defined"
+#endif
+#if !defined LFRB_MARK_AS_READ
+ #error "LFRB_MARK_AS_READ is not defined"
+#endif
+#if !defined LFRB_DATA_PTR
+ #error "LFRB_DATA_PTR is not defined"
+#endif
+
+//#define DEBUG_LFR_RING 1
+
+#ifdef DEBUG_LFR_RING
+ #include <stdio.h>
+ #define _LOG_( ... ) fprintf(stdout,__VA_ARGS__)
+#else
+ #define _LOG_( ... )
+#endif
+
/* initialize an empty lf_ring_buffer struct */
lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ) {
/* alloc ring_buffer struct */
lf_ring_buffer_t *r = malloc(sizeof(lf_ring_buffer_t));
if(r==NULL) return NULL;
/* */
- r->buffer = malloc(sizeof(lf_buffer_el_t)*n_buf);
+ r->buffer = malloc(LFRB_BUFFER_SIZE*n_buf);
if(r->buffer==NULL) {
free(r);
return NULL;
}
- memset(r->buffer,0,sizeof(lf_buffer_el_t)*r->n_buf);
+ memset(r->buffer,0,LFRB_BUFFER_SIZE*n_buf);
r->n_buf = n_buf;
r->read_from = -1;
r->write_to = 0;
@@ -58,28 +89,33 @@ void lf_ring_buffer_destroy( lf_ring_buffer_t *r ) {
}
/* write data into the ring buffer */
-int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
- int idx;
- int next;
+int lf_ring_buffer_write( lf_ring_buffer_t *r, void *data, int flags ) {
+ int idx, next;
struct timespec st;
st.tv_sec=0;
/* reserve a buffer */
for(;;){
idx = r->write_to;
- if(IS_AVAILABLE(idx) ) {
+ if(LFRB_IS_AVAILABLE( r->buffer[idx] ) ) {
/* read_from==idx means that the buffer is full and that a writer thread which at first reserved this buffer
* hasn't had enough CPU cycles to call MARK_AS_FILLED
*/
- if(!r->read_from==idx) {
- next = (idx+1)%r->n_buf;
+ if(!(r->read_from==idx)) {
+ next = idx+1;
+ if (next==r->n_buf) next=0;
/* what might have happend between IS_AVAILABLE and now :
* - a writter has reserved this buffer => write_to has moved => CAS fails
* - a reader has consumed a buffer => read_from has moved => we've got more space
*/
+ _LOG_( "write: CAS %d %d %d\n", r->write_to, idx, next );
if( CompareAndSwapInt( &r->write_to, idx, next ) ) break;
+ } else {
+ /* TODO simply reloop, nothing to do ??? */
+ // TODO ERROR when all has been read
+ _LOG_("write: buffer full %d %d\n",r->read_from,idx);
}
- /* TODO simply reloop, nothing to do ??? */
} else {
+ _LOG_("write: not available\n");
if(IS_NOT_BLOCKING(flags)) return -1;
st.tv_nsec=r->write_delay;
nanosleep(&st,NULL);
@@ -94,24 +130,29 @@ int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
*/
if(r->read_from==-1) CompareAndSwapInt( &r->read_from, -1, idx );
/* fill this buffer and mark it as filled */
- memcpy( r->buffer[idx].data, data, RB_DATA_SIZE*sizeof(rb_data_t) );
- MARK_AS_FILLED( idx );
+ memcpy( r->buffer[idx].data, data, LFRB_DATA_SIZE );
+ LFRB_MARK_AS_FILLED( r->buffer[idx] );
if(r->write_delay>BACKOFF_DELAY_INIT) r->write_delay-=BACKOFF_DELAY_INC;
return 0;
}
/* read data from the ring buffer */
-int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
- int idx;
+int lf_ring_buffer_read( lf_ring_buffer_t *r, void *data, int flags ) {
+ int idx, next;
struct timespec st;
st.tv_sec=0;
if(r->read_from==-1) return -1;
for(;;) {
idx = r->read_from;
- if(!IS_AVAILABLE(idx)) {
- memcpy( data, r->buffer[idx].data, RB_DATA_SIZE*sizeof(rb_data_t) );
- if( CompareAndSwapInt( &r->read_from, idx, (idx+1)%r->n_buf ) ) break;
+ if( !(LFRB_IS_AVAILABLE( r->buffer[idx] )) ) {
+ next = idx+1;
+ if (next==r->n_buf) next=0;
+ /* will do bad things if data dst buffer is too small !! */
+ memcpy( data, r->buffer[idx].data, LFRB_DATA_SIZE );
+ _LOG_( "write: CAS %d %d %d\n", r->read_from, idx, next );
+ if( CompareAndSwapInt( &r->read_from, idx, next ) ) break;
} else {
+ _LOG_("read: not available\n");
if(IS_NOT_BLOCKING(flags)) return -1;
st.tv_nsec=r->read_delay;
nanosleep(&st,NULL);
@@ -119,7 +160,7 @@ int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
}
}
/* finish the read process */
- MARK_AS_READ( idx );
+ LFRB_MARK_AS_READ( r->buffer[idx] );
if(r->read_delay>BACKOFF_DELAY_INIT) r->read_delay-=BACKOFF_DELAY_INC;
return 0;
}