summaryrefslogtreecommitdiffstats
path: root/lf_ring_buffer.c
blob: 225ef4de6685ef6f0ca95b79a274decee5c01be9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
 * File     : lf_ring_buffer.c
 * Author   : Jérémy Zurcher  <jeremy@asynk.ch>
 * Date     : 05/01/010
 * License  :
 *
 *  Permission is hereby granted, free of charge, to any person obtaining
 *  a copy of this software and associated documentation files (the
 *  "Software"), to deal in the Software without restriction, including
 *  without limitation the rights to use, copy, modify, merge, publish,
 *  distribute, sublicense, and/or sell copies of the Software, and to
 *  permit persons to whom the Software is furnished to do so, subject to
 *  the following conditions:
 *  
 *  The above copyright notice and this permission notice shall be
 *  included in all copies or substantial portions of the Software.
 *  
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 */

#include "lf_ring_buffer.h"
#include "lf_portable_cas.h"
#include <stdlib.h>
#include <string.h>
#include <time.h>

/*
 * Allocate and initialize an empty lf_ring_buffer struct.
 *
 * n_buf : number of elements in the ring, must be greater than 0
 *
 * returns the new ring buffer, or NULL if n_buf is not positive or if an
 *         allocation failed. Release it with lf_ring_buffer_destroy().
 */
lf_ring_buffer_t* lf_ring_buffer_create( int n_buf ) {
    lf_ring_buffer_t *r;
    /* an empty ring cannot hold data and would make the (idx+1)%n_buf
     * index arithmetic of the read/write functions divide by zero
     */
    if(n_buf<=0) return NULL;
    /* alloc ring_buffer struct */
    r = malloc(sizeof(lf_ring_buffer_t));
    if(r==NULL) return NULL;
    /* alloc the elements already zeroed; calloc also checks n_buf*size for
     * overflow. The size must come from the n_buf argument: r->n_buf is not
     * set yet at this point.
     */
    r->buffer = calloc((size_t)n_buf,sizeof(lf_buffer_el_t));
    if(r->buffer==NULL) {
        free(r);
        return NULL;
    }
    r->n_buf = n_buf;
    /* -1 means no buffer has been written yet, the first writer sets it */
    r->read_from = -1;
    r->write_to = 0;
    r->write_delay=BACKOFF_DELAY_INIT;
    r->read_delay=BACKOFF_DELAY_INIT;
    return r;
}

/*
 * Destroy an lf_ring_buffer structure.
 *
 * r : ring buffer returned by lf_ring_buffer_create(), may be NULL
 *
 * Frees the element array and then the struct itself. A NULL r is ignored,
 * like free(NULL), so the result of a failed create can be passed as is.
 */
void lf_ring_buffer_destroy( lf_ring_buffer_t *r ) {
    if(r==NULL) return;
    free(r->buffer);
    free(r);
}

/*
 * Write one element into the ring buffer.
 *
 * r     : ring buffer to write to
 * data  : source, RB_DATA_SIZE*sizeof(rb_data_t) bytes are copied from it
 * flags : tested with IS_NOT_BLOCKING(); when that test is true the call
 *         returns instead of sleeping while the buffer at write_to is not available
 *
 * returns 0 once the data has been copied and the buffer marked as filled,
 *         -1 if the buffer at write_to is not available and IS_NOT_BLOCKING(flags) is true.
 */
int lf_ring_buffer_write( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
    int idx;
    int next;
    struct timespec st;
    st.tv_sec=0;
    /* reserve a buffer */
    for(;;){
        idx = r->write_to;
        if(IS_AVAILABLE(idx) ) {
            /* read_from==idx means that the buffer is full and that a writer thread which at first reserved this buffer
             * hasn't had enough CPU cycles to call MARK_AS_FILLED
             *
             * the comparison has to be spelled != : '!r->read_from==idx' parses as '(!r->read_from)==idx',
             * which only ever lets idx 0 or 1 through.
             *
             * NOTE(review): read_from==write_to presumably also holds once the readers have drained the ring;
             * in that case this branch keeps relooping too - confirm against IS_AVAILABLE/MARK_AS_READ in the header.
             */
            if(r->read_from!=idx) {
                next = (idx+1)%r->n_buf;
                /* what might have happened between IS_AVAILABLE and now :
                 * - a writer has reserved this buffer => write_to has moved => CAS fails
                 * - a reader has consumed a buffer => read_from has moved => we've got more space
                 */
                if( CompareAndSwapInt( &r->write_to, idx, next ) ) break;
            }
            /* TODO simply reloop, nothing to do ??? */
        } else {
            if(IS_NOT_BLOCKING(flags)) return -1;
            /* sleep write_delay nanoseconds, then grow the delay up to BACKOFF_DELAY_MAX */
            st.tv_nsec=r->write_delay;
            nanosleep(&st,NULL);
            if(r->write_delay<BACKOFF_DELAY_MAX) r->write_delay+=BACKOFF_DELAY_INC;
        }
    }
    /* try to set read_from on idx if it has not been initialized yet
     * !!!!!!!!!!!!!! WARNING !!!!!!!!!!!!!!!!!
     * there is a really tiny chance if the ring is too small and there is a lot of writers that,
     * another thread might have reserved this same buffer before this
     * which means this thread and the other will write data to this current buffer !!!!
     */
    if(r->read_from==-1) CompareAndSwapInt( &r->read_from, -1, idx );
    /* fill this buffer and mark it as filled */
    memcpy( r->buffer[idx].data, data, RB_DATA_SIZE*sizeof(rb_data_t) );
    MARK_AS_FILLED( idx );
    /* a write went through, shrink the delay back towards BACKOFF_DELAY_INIT */
    if(r->write_delay>BACKOFF_DELAY_INIT) r->write_delay-=BACKOFF_DELAY_INC;
    return 0;
}

/*
 * Read one element from the ring buffer.
 *
 * r     : ring buffer to read from
 * data  : destination, RB_DATA_SIZE*sizeof(rb_data_t) bytes are copied into it
 * flags : tested with IS_NOT_BLOCKING(); when that test is true the call
 *         returns instead of sleeping while the buffer at read_from is still available
 *
 * returns 0 after one buffer has been copied out and marked as read,
 *         -1 if read_from is still -1 (its initial value, replaced by lf_ring_buffer_write),
 *         or if the buffer at read_from is available and IS_NOT_BLOCKING(flags) is true.
 */
int lf_ring_buffer_read( lf_ring_buffer_t *r, rb_data_t *data, int flags ) {
    int idx;
    struct timespec st;
    st.tv_sec=0;
    /* read_from has not been set by a write yet */
    if(r->read_from==-1) return -1;
    for(;;) {
        idx = r->read_from;
        /* presumably a buffer that is not available is one a writer has filled - confirm in the header */
        if(!IS_AVAILABLE(idx)) {
            /* copy first, then try to move read_from past idx;
             * presumably the CAS fails when read_from has moved in between, the loop then retries from the new read_from
             */
            memcpy( data, r->buffer[idx].data, RB_DATA_SIZE*sizeof(rb_data_t) );
            if( CompareAndSwapInt( &r->read_from, idx, (idx+1)%r->n_buf ) ) break;
        } else { 
            if(IS_NOT_BLOCKING(flags)) return -1;
            /* sleep read_delay nanoseconds, then grow the delay up to BACKOFF_DELAY_MAX */
            st.tv_nsec=r->read_delay;
            nanosleep(&st,NULL);
            if(r->read_delay<BACKOFF_DELAY_MAX) r->read_delay+=BACKOFF_DELAY_INC;
        }
    }
    /* finish the read process */
    MARK_AS_READ( idx );
    /* a read went through, shrink the delay back towards BACKOFF_DELAY_INIT */
    if(r->read_delay>BACKOFF_DELAY_INIT) r->read_delay-=BACKOFF_DELAY_INC;
    return 0;
}