diff options
author | Jérémy Zurcher <jeremy@asynk.ch> | 2010-01-09 12:29:25 +0100 |
---|---|---|
committer | Jérémy Zurcher <jeremy@asynk.ch> | 2010-01-09 12:29:25 +0100 |
commit | 3584c739ff527592fb10d5f9dfe7000edbd9fdf3 (patch) | |
tree | 553edb6e455b93851e9381bf1100cbb4530b8891 /lf_ring_buffer_test.c | |
parent | b51a125c586f517761bae6eaff8ff8a08a1c1e09 (diff) | |
download | lock_free-3584c739ff527592fb10d5f9dfe7000edbd9fdf3.zip lock_free-3584c739ff527592fb10d5f9dfe7000edbd9fdf3.tar.gz |
add parallel read and write to lf_ring_buffer_test
Diffstat (limited to 'lf_ring_buffer_test.c')
-rw-r--r-- | lf_ring_buffer_test.c | 168 |
1 files changed, 35 insertions, 133 deletions
diff --git a/lf_ring_buffer_test.c b/lf_ring_buffer_test.c index 8baf232..b131487 100644 --- a/lf_ring_buffer_test.c +++ b/lf_ring_buffer_test.c @@ -78,7 +78,14 @@ void* writer_thread( void* param ) { return NULL; } -static void parallel_writes( int nt, lf_ring_buffer_t *ring, int n, int flags ) { +void* reader_thread( void* param ) { + struct thread_params *params = (struct thread_params*)param; + //report( "read ", params->n, sequential_reads( params->ring, params->n, params->flags ) ); + sequential_reads( params->ring, params->n, params->flags ); + return NULL; +} + +static void parallel_op( int op, int nt, lf_ring_buffer_t *ring, int n, int flags ) { int i; pthread_t *threads = malloc( sizeof(pthread_t)*nt); @@ -88,9 +95,26 @@ static void parallel_writes( int nt, lf_ring_buffer_t *ring, int n, int flags ) params[i].ring = ring; params[i].n = n/nt; params[i].flags = flags; - if (pthread_create( &threads[i], NULL, writer_thread, ¶ms[i])) { - fprintf(stderr,"Failed to create reader thread[%d]\n",i); - exit(1); + if(op==0) { + if (pthread_create( &threads[i], NULL, writer_thread, ¶ms[i])) { + fprintf(stderr,"Failed to create writer thread[%d]\n",i); + exit(1); + } + } else if(op==1) { + if (pthread_create( &threads[i], NULL, reader_thread, ¶ms[i])) { + fprintf(stderr,"Failed to create reader thread[%d]\n",i); + exit(1); + } + } else { + params[i].n /=2; + if (pthread_create( &threads[i], NULL, writer_thread, ¶ms[i])) { + fprintf(stderr,"Failed to create writer thread[%d]\n",i); + exit(1); + } + if (pthread_create( &threads[i], NULL, reader_thread, ¶ms[i])) { + fprintf(stderr,"Failed to create reader thread[%d]\n",i); + exit(1); + } } } for(i=0; i<nt; i++) { @@ -134,146 +158,24 @@ int main( int argc, char** argv ) { for(i=5; i<=100;i*=2) { printf("%d parallel blocking with backoff inc write operations .... \n",i); - parallel_writes( i, ring, b_len, 0 ); + parallel_op( 0, i, ring, b_len, 0 ); printf("parallel blocking read operations ...\n"); sequential_reads( ring, b_len, 0 ); } for(i=5; i<=100;i*=2) { printf("%d parallel non blocking write operations .... \n",i); - parallel_writes( i, ring, b_len, LFRB_NO_BLOCK ); + parallel_op( 0, i, ring, b_len, LFRB_NO_BLOCK ); printf("non blocking read operations ...\n"); sequential_reads( ring, b_len, LFRB_NO_BLOCK ); } - lf_ring_buffer_destroy( ring ); - - return EXIT_SUCCESS; -} - -/* -#define ARRAY_SIZE 2 -#define MAX_VALUE 0x10000 - -jack_ringbuffer_t *rb; -volatile int flowing = 0; - -static int -fill_int_array (int *array, int start, int count) -{ - int i, j = start; - for (i = 0; i < count; i++) - { - array[i] = j; - j = (j + 1) % MAX_VALUE; - } - return j; -} - -static int -cmp_array (int *array1, int *array2, int count) -{ - int i; - for (i = 0; i < count; i++) - if (array1[i] != array2[i]) - { - printf("%d != %d at offset %d\n", array1[i], array2[i], i); - fflush(stdout); - return 0; + for(i=10; i<=100;i*=2) { + printf("%d parallel blocking write and read operations .... \n",i*2); + parallel_op( 3, i, ring, b_len, 0 ); } - return 1; -} - -static void * -reader_start (void * arg) -{ - int i = 0, a[ARRAY_SIZE], b[ARRAY_SIZE]; - unsigned long j = 0, nfailures = 0; - printf("[reader started] "); - fflush(stdout); - i = fill_int_array (a, i, ARRAY_SIZE); - while (1) - { - if (j == 100 && !flowing) - { - printf("[flowing] "); - fflush(stdout); - flowing = 1; - } - - if (jack_ringbuffer_read_space (rb) >= (int) ARRAY_SIZE * (int) sizeof (int)) - { - if (jack_ringbuffer_read (rb, (char *) b, ARRAY_SIZE * sizeof (int))) - { - if (!cmp_array (a, b, ARRAY_SIZE)) - { - nfailures++; - // - //printf("failure in chunk %lu - probability: %lu/%lu = %.3f per million\n", - // j, nfailures, j, (float) nfailures / (j + 1) * 1000000); - //i = (b[0] + ARRAY_SIZE) % MAX_VALUE; - // - printf("FAILURE in chunk %lu\n", j); - fflush(stdout); - exit(1); - } - i = fill_int_array (a, i, ARRAY_SIZE); - j++; - } - } - } - - return NULL; -} - -static void * -writer_start (void * arg) -{ - int i = 0, a[ARRAY_SIZE]; - printf("[writer started] "); - fflush(stdout); - - i = fill_int_array (a, i, ARRAY_SIZE); - - while (1) - { - if (jack_ringbuffer_write_space (rb) >= (int) ARRAY_SIZE * (int) sizeof (int)) - { - if (jack_ringbuffer_write (rb, (char *) a, ARRAY_SIZE * sizeof (int))) - { - i = fill_int_array (a, i, ARRAY_SIZE); - } - } - } + lf_ring_buffer_destroy( ring ); - return NULL; + return EXIT_SUCCESS; } -int main(int argc, char *argv[]) -{ - int size; - sscanf(argv[1], "%d", &size); - printf("starting (120s max) - array/buffer size: %d/%d\n", - (int) sizeof(int) * ARRAY_SIZE, size); - fflush(stdout); - rb = jack_ringbuffer_create(size); - pthread_t reader_thread, writer_thread; - if (pthread_create (&reader_thread, NULL, reader_start, NULL)) - { - printf("Failed to create reader thread\n"); - exit(1); - } - if (pthread_create (&writer_thread, NULL, writer_start, NULL)) - { - printf("Failed to create writer thread\n"); - exit(1); - } - sleep(120); - if (flowing) - printf("SUCCESS\n"); - else - printf("FAILURE: data did not flow\n"); - fflush(stdout); - return 0; -} -*/ |