Copyright (C) 1993 by Sun Microsystems, Inc. All rights reserved. This file is a product of SunSoft, Inc. and is provided for unrestricted use provided that this legend is included on all media and as a part of the software program in whole or part. Users may copy, modify or distribute this file at will. THIS FILE IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE. This file is provided with no support and without any obligation on the part of SunSoft, Inc. to assist in its use, correction, modification or enhancement. SUNSOFT AND SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY THIS FILE OR ANY PART THEREOF. IN NO EVENT WILL SUNSOFT OR SUN MICROSYSTEMS, INC. BE LIABLE FOR ANY LOST REVENUE OR PROFITS OR OTHER SPECIAL, INDIRECT AND CONSEQUENTIAL DAMAGES, EVEN IF THEY HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. SunSoft, Inc. 2550 Garcia Avenue Mountain View, California 94043 Jan 13 11:35 1994 barrier.c Page 1 /* barrier.c * * Copyright (C) 1993 by Sun Microsystems, Inc. * All rights reserved. * * Program to show how to coordinate a whole group of threads. * The main() thread has a chunk of work to do, in this * case an array of floats to operate upon. Each thread * is given a unique index into the array, and a unique * operation. Threads print their results and wait on the * of work, and then call barrier_wait() to synchronize. */ #include #include #include #include #include #include #define TRUE 1 #define FALSE 0 #define NUM_THREADS 16 typedef struct { int init_count; int current_count; mutex_t b_lock; cond_t b_cond; char state; } barrier_t; typedef struct { barrier_t *barrier; int index; void *the_work; } worksheet_t; /* barrier_init() - initialize the barrier; foo parameter for compatibility * with proposed UI interface */ int barrier_init(barrier_t *brr, const int count, const int type, void *foo) { int ret; ret = mutex_init(&brr->b_lock, type, 0); ret |= cond_init(&brr->b_cond, type, 0); brr->init_count = count; brr->current_count = brr->init_count; brr->state = 0; /* arbitrary */ return(ret); } Jan 13 11:35 1994 barrier.c Page 2 /* barrier_wait() - wait on the barrier */ int barrier_wait(barrier_t *brr) { int current_state; int ret; ret = mutex_lock(&brr->b_lock); current_state = brr->state; if (--brr->current_count == 0) { brr->state = !current_state; brr->current_count = brr->init_count; ret |= cond_broadcast(&brr->b_cond); } else while (brr->state == current_state) ret |= cond_wait(&brr->b_cond, &brr->b_lock); ret |= mutex_unlock(&brr->b_lock); return (ret); } /* barrier_destroy() - destroy state associated with the barrier */ int barrier_destroy(barrier_t *brr) { int ret; ret = mutex_destroy(&brr->b_lock); ret |= cond_destroy(&brr->b_cond); return(ret); } /* work_func1() - do a lot-o-square root'n on my index in the work_chunk */ int work_func1(const int *index, const void *work_chunk) { extern double sqrt(); double *wc; double result; int i; wc = (double *)(work_chunk); for (i = 0; i < 400000; i++) result = sqrt(*(wc+*index)); printf("index: %d\tvalue:%f\tsqrt:%f\n", *index, *(wc+*index), result); return(0); } /* work_func2() - do a lot-o-cube root'n on my index in the work_chunk */ int Jan 13 11:35 1994 barrier.c Page 3 work_func2(const int *index, const void *work_chunk) { extern double cbrt(); double *wc; double result, source; int i; wc = (double *)(work_chunk); for (i = 0; i < 400000; i++) result = cbrt(*(wc+*index)); printf("index: %d\tvalue:%f\tcbrt:%f\n", *index, *(wc+*index), result); return(0); } /* worker() - execute the functions and synchronize with the others */ void * worker(void *foo) { worksheet_t *my_work; int i; my_work = (worksheet_t *)foo; /* for clarity */ work_func1(&my_work->index, my_work->the_work); barrier_wait(my_work->barrier); work_func2(&my_work->index, my_work->the_work); barrier_wait(my_work->barrier); printf("thread %d shutting down...\n", thr_self()); thr_exit(0); } /* * create a bunch of threads and give them work to do. Each thread's worksheet_t * contains the index of which member of bunch_o_nums to work on. Synchronize * with the group via barrier_wait(); */ main() { thread_t new_thread; worksheet_t *instructions, /* description of overall task */ *this_wksh; barrier_t *our_barrier; /* barrier for this group */ double *bunch_o_nums, *this_num; int ret, i; long num_cpus; struct timeval my_time1; struct timeval my_time2; int sec_diff; Jan 13 11:35 1994 barrier.c Page 4 /* constructor */ gettimeofday(&my_time1); our_barrier = (barrier_t *)malloc(sizeof(barrier_t)); bunch_o_nums = (double *)malloc(sizeof(double) * NUM_THREADS); instructions = (worksheet_t *)malloc(sizeof(worksheet_t) * NUM_THREADS); if (!(our_barrier && bunch_o_nums && instructions)) { perror("malloc"); exit(-1); } /* this process is CPU bound */ num_cpus = sysconf(_SC_NPROCESSORS_ONLN); printf("\n\nthere are %d cpus online...\n\n", num_cpus); thr_setconcurrency((int)num_cpus); if (ret = barrier_init(our_barrier, (NUM_THREADS + 1), USYNC_THREAD, (void *)NULL)) { perror("barrier"); exit(-1); } /* initialize the work and the worksheets */ for (i = 0, this_num = bunch_o_nums, this_wksh = instructions; i < NUM_THREADS; i++, this_wksh++, this_num++) { *this_num = (double)i; this_wksh->barrier = our_barrier; this_wksh->the_work = bunch_o_nums; this_wksh->index = i; thr_create(NULL, NULL, worker, this_wksh, THR_DETACHED, &new_thread); } printf("the threads are doing square roots now...\n"); barrier_wait(our_barrier); printf("the threads are doing cube roots now...\n"); barrier_wait(our_barrier); printf("the threads are shutting down...\n"); /* give some time for thread printf()'s to flush */ sleep(1); /* destructor */ gettimeofday(&my_time2); sec_diff = (int)my_time2.tv_sec - (int)my_time1.tv_sec; printf("\nelapsed seconds: %d\n", (sec_diff-1)); /* for the sleep */ free(our_barrier); free(bunch_o_nums); free(instructions); printf("main exiting\n"); } Jan 13 11:32 1994 down_counter.c Page 1 /* Copyright 1993, SunSoft Inc. */ /* down_counter.c : methods for using down counter */ #include "my_syncs.h" /* * A down counter * has a group and a single coordinator, * and is initialized to the number of members in the group. * * Group members call the "group wait" * function, dc_gwait() and the coordinator calls the coordinator * wait function, dc_cwait(). When all members of the group have called * dc_gwait(), control is returned to the coordinator. The coordinator * can release the group and wait for the next synchroniztion point * by 'advancing' the group, via dc_advance(), at which point all group * members are known to be waiting. The coordinator can also "disband" * the group by releasing the group and telling them not to wait * at the next synchonization point via dc_disband(). The coordinator * wakes up from dc_disband() once all group members have called into * dc_gwait() again. * * * .... sleeping * ---- running * * "coordinator" thread * *|---dc_init()---thr_create()---dc_cwait()...|->dc_advance().............|-> * | ^ ^ * | | | | * | return control | return control * | | | | * V | | | * create group | | | * | start group | * | | | * group thread1 | V | * |----->dc_gwait().....................|---->dc_gwait() * ^ * thread2 | * |--------------->dc_gwait()...........|---->dc_gwait() * * threadn * |----->dc_gwait().....................|---->dc_gwait() * * ***************************************************************************** * * * .... sleeping * ---- running * * "coordinator" thread * Jan 13 11:32 1994 down_counter.c Page 2 *.....|->dc_advance()...............|->dc_disband()..............|---------> * | ^ * | | * "group thread1" V | * |----->dc_gwait().............|---->dc_gwait()---------> * * "group thread2" * |----->dc_gwait().............|---->dc_gwait()---------> * * "group threadn" * |----->dc_gwait().............|---->dc_gwait()---------> * * */ /* dc_init() - initialize down_counter and return with lock */ int dc_init(dcount_t *dc, const int flag, const int count, const int cstate) { int ret; ret = mutex_init(&dc->dc_lock, flag, 0); ret |= cond_init(&dc->group_cond, flag, 0); ret |= cond_init(&dc->coord_cond, flag, 0); ret |= mutex_lock(&dc->dc_lock); dc->init_count = count; dc->current_count = count; dc->group_state = WSTATE1; /* arbitrary */ dc->coord_state = cstate; return(ret); } /* dc_destroy() - destroy all state associated with down_counter */ int dc_destroy(dcount_t *dc) { int ret; ret = mutex_destroy(&dc->dc_lock); ret |= cond_destroy(&dc->group_cond); ret |= cond_destroy(&dc->coord_cond); } /* * dc_gwait() - down_counter group wait - decrement down counter * and wait for broadcast from coordinator. Send signal to coordinator * when group count is zero. */ int dc_gwait(dcount_t *dc) { int ret, current_work_state; Jan 13 11:32 1994 down_counter.c Page 3 ret = mutex_lock(&dc->dc_lock); dc->current_count--; if (!dc->current_count) { dc->coord_state = NOT_WAITING; ret |= cond_signal(&dc->coord_cond); } current_work_state = dc->group_state; if (current_work_state != DISBAND) { while (dc->group_state == current_work_state) ret |= cond_wait(&dc->group_cond, &dc->dc_lock); if (!ret) if(dc->group_state == DISBAND) ret = DISBAND; } else if (!ret) ret = DISBAND; mutex_unlock(&dc->dc_lock); return(ret); } /* * dc_checkin() - same as dc_gwait() except just don't wait */ int dc_checkin(dcount_t *dc) { int ret, current_work_state; ret = mutex_lock(&dc->dc_lock); dc->current_count--; if (!dc->current_count) { dc->coord_state = NOT_WAITING; ret |= cond_signal(&dc->coord_cond); } mutex_unlock(&dc->dc_lock); return(ret); } /* dc_cwait() - down_counter coordinator wait on already working group */ dc_cwait(dcount_t *dc) { while (dc->coord_state == WAITING) cond_wait(&dc->coord_cond, &dc->dc_lock); } /* * dc_advance() - Issue broadcast and wait for entire group to call dc_gwait() * *NOTE* - it is assumed that this is only called by a thread which has * previously called dc_init() and therefore already has the counter's * mutex lock! * Jan 13 11:32 1994 down_counter.c Page 4 */ void dc_advance(dcount_t *dc) { if (dc->group_state == WSTATE1) dc->group_state = WSTATE2; else dc->group_state = WSTATE1; dc->current_count = dc->init_count; cond_broadcast(&dc->group_cond); dc->coord_state = WAITING; dc_cwait(dc); } /* dc_disband - broadcast 'go' and wait for entire group to call dc_gwait(). * When group members' call dc_gwait() again they will immediately return * with DC_DISBAND value. * *NOTE* - it is assumed that this is only called by a thread which has * previously called dc_init() and therefore already has the counter's * mutex lock! */ void dc_disband(dcount_t *dc) { dc->group_state = DISBAND; dc->current_count = dc->init_count; cond_broadcast(&dc->group_cond); dc->coord_state = WAITING; dc_cwait(dc); } Jan 13 11:36 1994 example1.c Page 1 #define NUM_THREADS 16 #include "my_syncs.h" #include #include /* * * Copyright (C) 1993 by Sun Microsystems, Inc. * All rights reserved. * * create a bunch of threads from main() and give them work to do. * Each thread's worksheet_t * contains the index of which one of the bunch_o_nums to work on. Give workers * different functions between synchronizations. */ main() { thread_t new_thread; worksheet_t *instructions, *this_wksh; double *bunch_o_nums, *this_num; int pos; dcount_t *group_dc; /* down counter for this group */ long num_cpus; struct timeval my_time1; struct timeval my_time2; int sec_diff; extern void work_func1(); extern void work_func2(); extern void work_func3(); extern void *worker1(); /* constructor */ gettimeofday(&my_time1); group_dc = (dcount_t *)(malloc(sizeof(dcount_t))); bunch_o_nums = (double *)malloc(sizeof(double) * NUM_THREADS); instructions = (worksheet_t *)malloc(sizeof(worksheet_t) * NUM_THREADS); if (!(group_dc && bunch_o_nums && instructions)) { perror("malloc"); exit(-1); } /* for now assume process is CPU bound operation */ num_cpus = sysconf(_SC_NPROCESSORS_ONLN); printf("\n\nthere are %d cpus online...\n", num_cpus); thr_setconcurrency((int)num_cpus); /* the dc_init() routine will return with the counter lock held. * The worker threads main() will create immediately start * executing, and will call dc_gwait(). * Initialize coordinator state to WAITING so that main() * will get a signal when they all are created and in the "while()" Jan 13 11:36 1994 example1.c Page 2 * loop. Main() won't miss the signal since nobody else gets * the counter's mutex until main releases it, via dc_cwait(). */ dc_init(group_dc, USYNC_THREAD, NUM_THREADS, WAITING); /* initialize bunch_o_nums, give the workers unique index, * a pointer to group's work, and first work function */ for (pos = 0, this_num = bunch_o_nums, this_wksh = instructions; pos < NUM_THREADS; pos++, this_wksh++, this_num++) { /* work assignment proc */ *this_num = (double)pos; this_wksh->counter = group_dc; this_wksh->input = bunch_o_nums; this_wksh->index = pos; this_wksh->my_func = work_func1; /* 'your resource * intensive task here' */ if (thr_create(NULL, NULL, worker1, this_wksh, THR_DETACHED|THR_SUSPENDED, &new_thread)) printf("can't create thread!!\n"); this_wksh->my_threadid = new_thread; thr_continue(new_thread); } /* wait for all threads to get created and enter inner loop */ dc_cwait(group_dc); /* workers now all waiting for 'go' */ printf("\nGO FORTH AND CALCULATE SQUARE ROOTS!\n"); dc_advance(group_dc); /* all workers now have completed one pass and are waiting */ gettimeofday(&my_time2); sec_diff = (int)my_time2.tv_sec - (int)my_time1.tv_sec; printf("\nelapsed time so far: %d secs\n\n", sec_diff); for (pos = 0, this_num = bunch_o_nums, this_wksh = instructions; pos < NUM_THREADS; pos++, this_wksh++) { /* this is where you do load balancing based on past * performance and anticipated demands of the * work_func, etc. */ this_wksh->my_func = work_func2; /* mix and match! */ } printf("\nGO FORTH AND CALCULATE CUBE ROOTS!\n"); dc_advance(group_dc); gettimeofday(&my_time2); sec_diff = (int)my_time2.tv_sec - (int)my_time1.tv_sec; Jan 13 11:36 1994 example1.c Page 3 printf("\nelapsed time so far: %d secs\n\n", sec_diff); for (pos = 0, this_num = bunch_o_nums, this_wksh = instructions; pos < NUM_THREADS; pos++, this_wksh++) { this_wksh->my_func = work_func3; } /* break out workers from loop */ dc_disband(group_dc); /* threads all completed loop and shutting down on their own now */ free(bunch_o_nums); free(instructions); free(group_dc); gettimeofday(&my_time2); sec_diff = (int)my_time2.tv_sec - (int)my_time1.tv_sec; /* wait for thread printf()'s to flush */ sleep(1); printf("\ntotal elapsed seconds: %d\n", sec_diff); printf("main exiting\n"); } Jan 13 11:36 1994 example2.c Page 1 /* Copyright 1993 SunSoft Inc. */ #define NUM_WORKERS 4 #define NUM_MGRS 2 #include "my_syncs.h" #include #include /* * * Copyright (C) 1993 by Sun Microsystems, Inc. * All rights reserved. * * create a bunch of threads and give them work to do. Manage the threads * with managers for each different type operation. Each thread's worksheet_t * contains the index of which one of the bunch_o_nums to work on. Give workers * different functions between synchronizations. */ main() { thread_t new_thread; manager_work_t *instructions, *this_mgr_wksh; double *bunch_o_nums, *this_num; int pos; dcount_t *group_dc; /* down counter for this group */ long num_cpus; struct timeval my_time1; struct timeval my_time2; int sec_diff; extern void *manager_func1(); extern void work_func1(); extern void work_func2(); extern void work_func3(); extern void work_func4(); /* constructor */ gettimeofday(&my_time1); group_dc = (dcount_t *)(malloc(sizeof(dcount_t))); bunch_o_nums = (double *)malloc(sizeof(double) * NUM_WORKERS); instructions = (manager_work_t *)malloc(sizeof(manager_work_t) * NUM_MGRS); if (!(group_dc && bunch_o_nums && instructions)) { perror("malloc"); exit(-1); } /* for now assume process is CPU bound operation */ num_cpus = sysconf(_SC_NPROCESSORS_ONLN); printf("\n\nthere are %d cpus online...\n", num_cpus); thr_setconcurrency((int)num_cpus); /* the dc_init() routine will return with the counter lock held. Jan 13 11:36 1994 example2.c Page 2 * The worker threads main() will create immediately start * executing, and will call dc_gwait(). * Initialize coordinator state to WAITING so that main() * will get a signal when they all are created and in the "while()" * loop. Main() won't miss the signal since nobody else gets * the counter's mutex until main releases it, via dc_cwait(). */ dc_init(group_dc, USYNC_THREAD, NUM_MGRS, WAITING); /* initialize bunch_o_nums, give the workers unique index, * a pointer to group's work, and first work function */ /* work assignment proc */ for (pos = 0, this_num = bunch_o_nums; pos < NUM_WORKERS; this_num++, pos++) *this_num = (double)pos; this_mgr_wksh = instructions; for(pos = 0, this_mgr_wksh = instructions; pos < NUM_MGRS; pos++, this_mgr_wksh++) { this_mgr_wksh->num_children = NUM_WORKERS; this_mgr_wksh->input = bunch_o_nums; this_mgr_wksh->counter = group_dc; if (thr_create(NULL, NULL, manager_func1, (void *)this_mgr_wksh, THR_DETACHED|THR_SUSPENDED, &new_thread)) printf("can't create thread!!\n"); this_mgr_wksh->my_threadid = new_thread; thr_continue(new_thread); } dc_cwait(group_dc); printf("main: managers all ready\n"); /* manager AND workers now all waiting for 'go' */ this_mgr_wksh = instructions; this_mgr_wksh->child_func = work_func1; this_mgr_wksh++; this_mgr_wksh->child_func = work_func2; printf("main: issuing 1st 'go' to managers\n\n"); dc_advance(group_dc); printf("main: updating 2nd 'go' to managers\n\n"); this_mgr_wksh = instructions; this_mgr_wksh->child_func = work_func3; this_mgr_wksh++; this_mgr_wksh->child_func = work_func4; printf("main: issuing 2nd 'go' to managers\n\n"); dc_advance(group_dc); printf("main: disbanding heirarchy\n\n"); Jan 13 11:36 1994 example2.c Page 3 dc_disband(group_dc); /* allow printf() to flush */ sleep(1); printf("main destroying counter state\n"); dc_destroy(group_dc); gettimeofday(&my_time2); sec_diff = (int)my_time2.tv_sec - (int)my_time1.tv_sec; printf("\nelapsed time so far: %d secs\n\n", sec_diff); sec_diff = (int)my_time2.tv_sec - (int)my_time1.tv_sec; /* wait for thread printf()'s to flush */ sleep(1); printf("\ntotal elapsed seconds: %d\n", sec_diff); printf("main exiting\n"); } Jan 13 11:36 1994 example3.c Page 1 /* Copyright 1993 SunSoft Inc. */ #define NUM_WORKERS 4 #define NUM_MGRS 2 #include "my_syncs.h" #include #include /* * * Copyright (C) 1993 by Sun Microsystems, Inc. * All rights reserved. * * Connect working groups together with memory recirculation systems. */ main() { thread_t new_thread; manager_work_t *instructions, *this_mgr_wksh; double *bunch_o_nums, *this_num; int pos; dcount_t *group_dc; /* down counter for this group */ long num_cpus; struct timeval my_time1; struct timeval my_time2; int sec_diff; rcsystem_t *team_rc; /* recirculation system for effort */ extern void *manager_func1(); extern void work_func3(); extern void work_func4(); extern void work_func5(); extern void work_func6(); extern void work_func7(); extern void *printf_thread(); /* constructor */ gettimeofday(&my_time1); group_dc = (dcount_t *)(malloc(sizeof(dcount_t))); team_rc = (rcsystem_t *)(malloc(sizeof(rcsystem_t))); instructions = (manager_work_t *)malloc(sizeof(manager_work_t) * NUM_MGRS); if (!(group_dc && instructions && team_rc)) { perror("malloc"); exit(-1); } /* for now assume process is CPU bound operation */ num_cpus = sysconf(_SC_NPROCESSORS_ONLN); printf("\n\nthere are %d cpus online...\n", num_cpus); thr_setconcurrency((int)num_cpus); Jan 13 11:36 1994 example3.c Page 2 /* the dc_init() routine will return with the counter lock held. * The worker threads main() will create immediately start * executing, and will call dc_gwait(). * Initialize coordinator state to WAITING so that main() * will get a signal when they all are created and in the "while()" * loop. Main() won't miss the signal since nobody else gets * the counter's mutex until main releases it, via dc_cwait(). */ dc_init(group_dc, USYNC_THREAD, NUM_MGRS, WAITING); rcsystem_init(team_rc, (NUM_MGRS*NUM_WORKERS), sizeof(results_buf_t), work_func5, USYNC_THREAD); /* fire up printf_thread */ printf("main: creating printf() service thread...\n"); thr_create(NULL, NULL, printf_thread, (void *)team_rc, THR_BOUND, &new_thread); this_mgr_wksh = instructions; printf("main: creating manager threads...\n"); for(pos = 0, this_mgr_wksh = instructions; pos < NUM_MGRS; pos++, this_mgr_wksh++) { this_mgr_wksh->num_children = NUM_WORKERS; /* this is where 'recirculator' comes from */ this_mgr_wksh->input = team_rc; this_mgr_wksh->output = team_rc; this_mgr_wksh->counter = group_dc; if (thr_create(NULL, NULL, manager_func1, (void *)this_mgr_wksh, THR_DETACHED|THR_SUSPENDED, &new_thread)) printf("can't create thread!!\n"); this_mgr_wksh->my_threadid = new_thread; thr_continue(new_thread); } dc_cwait(group_dc); printf("main: managers all ready\n"); /* manager AND workers now all waiting for 'go' */ this_mgr_wksh = instructions; this_mgr_wksh->child_func = work_func6; this_mgr_wksh++; this_mgr_wksh->child_func = work_func7; printf("main: issuing 1st 'go' to managers\n\n"); dc_advance(group_dc); this_mgr_wksh = instructions; this_mgr_wksh->child_func = work_func3; this_mgr_wksh++; Jan 13 11:36 1994 example3.c Page 3 this_mgr_wksh->child_func = work_func4; printf("main: disbanding heirarchy\n\n\n"); dc_disband(group_dc); printf("main destroying counter state\n"); dc_destroy(group_dc); gettimeofday(&my_time2); sec_diff = (int)my_time2.tv_sec - (int)my_time1.tv_sec; printf("\nelapsed time so far: %d secs\n\n", sec_diff); sec_diff = (int)my_time2.tv_sec - (int)my_time1.tv_sec; /* wait for thread printf()'s to flush */ sleep(2); destroy_rcsystem(team_rc); free(team_rc); free(group_dc); printf("\ntotal elapsed seconds: %d\n", sec_diff); printf("main exiting\n"); } Jan 13 11:36 1994 rcsystem.c Page 1 #include "my_syncs.h" /* rcsystem.c - recirculation system routines. * * * Copyright (C) 1993 by Sun Microsystems, Inc. * All rights reserved. * * A "recirculation" system consists of a an integer stack and queue, * which coordinate access to a chunk of memory by index tokens. * An rcsystem is initialized with count number of entries and a chunk size; the * queue and stack will contain count entries, and the central memory will * be count times chunk_size bytes. * * Initially, typically, threads "check out" an index into the * central memory area, perform their operation, and add their * index to the queue. Another thread drains the queue, (typically * an inherently MT-unsafe thread, like X-windows related items) * performs its operations with the results, and checks the index token * back into the stack. * * Additionally, a "work assignment function" is called on check-out * to assign unique pieces of work to calling threads. Threads using this * technique are said to be "self-scheduling" if they check out new work * assignments as they complete current task. */ /* rcsystem_init() - initialize a recirculating system to contain * num_elements of mcsize memory chunk size of certain type */ int rcsystem_init(rcsystem_t *rcsyst, int num_elements, int mcsize, void (*asmnt_func)(), int type) { int ret; rcsyst->chunk_size = mcsize; rcsyst->asmnt_func = asmnt_func; if ((rcsyst->stack = (int_stack_t *)(malloc(sizeof(int_stack_t)))) == 0) return(-1); if ((rcsyst->queue = (int_q_t *)(malloc(sizeof(int_q_t)))) == 0) return(-1); /* align to double for buffers */ if ((rcsyst->mem = (char *)memalign(sizeof(double), num_elements * mcsize)) == 0) return(-1); ret = init_istack(rcsyst->stack, num_elements, type); ret |= init_iq(rcsyst->queue, num_elements, type); return(ret); } /* destroy_rcsystem() - free memory and destroy associated state of recirculating * system */ Jan 13 11:36 1994 rcsystem.c Page 2 int destroy_rcsystem(rcsystem_t *rcsyst) { int ret; if (!free(rcsyst->mem)) return(-1); ret = destroy_istack(rcsyst->stack); ret |= destroy_iq(rcsyst->queue); free(rcsyst->stack); free(rcsyst->queue); rcsyst->chunk_size = 0; } /* rcsys_checkout() - get a unique index off free stack and call work assignment * function. Return index of area stuffed with work_todo. */ int rcsys_checkout(rcsystem_t *rcsyst) { int new_index; char *dest; new_index = pop_istack(rcsyst->stack); dest = rcsys_memoffset(rcsyst, new_index); /* have assignment func place new work into dest location */ (*rcsyst->asmnt_func)(dest); return(new_index); } Jan 13 11:36 1994 stack-n-queue.c Page 1 #include "my_syncs.h" /* stack-n-queue.c - integer stack and queue routines * * Copyright (C) 1993 by Sun Microsystems, Inc. * All rights reserved. * */ int init_iq(int_q_t *buf_q, int num_e, int type) { int ret; buf_q->num_elements = num_e; if (((buf_q->the_q = (int *)(malloc(sizeof(int)*num_e)))) == 0) return(-1); buf_q->front = buf_q->end = 0; ret = mutex_init(&buf_q->q_lock, type, 0); ret |= cond_init(&buf_q->q_cond, type, 0); buf_q->q_state = QUEUE_EMPTY; } int init_istack(int_stack_t *buf_stk, int num_e, const int type) { int ret, i; buf_stk->num_elements = num_e; if ((( buf_stk->free_slots = (int *)(malloc(sizeof(int)*num_e)))) == 0) return(-1); for (i=0; ifree_slots[i]=i; buf_stk->top = num_e - 1; buf_stk->buff_state = BUFF_AVAIL; mutex_init(&buf_stk->stack_lock, type, 0); cond_init(&buf_stk->stack_cond, type, 0); } int destroy_iq(int_q_t *buf_q) { int ret; if (!free(buf_q->the_q)) return(-1); buf_q->num_elements = 0; ret = mutex_destroy(&buf_q->q_lock); ret |= cond_destroy(&buf_q->q_cond); buf_q->q_state = QUEUE_EMPTY; } int destroy_istack(int_stack_t *buf_stk) { Jan 13 11:36 1994 stack-n-queue.c Page 2 int ret; if (!free(buf_stk->free_slots)) return(-1); buf_stk->num_elements = 0; ret = mutex_destroy(&buf_stk->stack_lock); ret |= cond_destroy(&buf_stk->stack_cond); return(ret); } void add2iq(int_q_t *buf_q, int buf_num) { mutex_lock(&buf_q->q_lock); if(buf_q->q_state == QUEUE_EMPTY) { buf_q->q_state = QUEUE_NOT_EMPTY; cond_broadcast(&buf_q->q_cond); } else { if ((buf_q->end != (buf_q->num_elements - 1))) buf_q->end++; else buf_q->end = 0; } buf_q->the_q[buf_q->end] = buf_num; mutex_unlock(&buf_q->q_lock); } int drain_iq(int_q_t *buf_q) { int ret; mutex_lock(&buf_q->q_lock); while(buf_q->q_state == QUEUE_EMPTY) cond_wait(&buf_q->q_cond, &buf_q->q_lock); ret = buf_q->the_q[buf_q->front]; if (buf_q->front != buf_q->end) { if (buf_q->front != (buf_q->num_elements - 1)) buf_q->front++; else buf_q->front = 0; } else buf_q->q_state = QUEUE_EMPTY; mutex_unlock(&buf_q->q_lock); return(ret); } int pop_istack(int_stack_t *buf_stk) { int buf_num; mutex_lock(&buf_stk->stack_lock); if(buf_stk->top < 0) { while (buf_stk->buff_state == BUFF_UNAVAIL) cond_wait(&buf_stk->stack_cond, &buf_stk->stack_lock); } buf_num = buf_stk->free_slots[buf_stk->top]; Jan 13 11:36 1994 stack-n-queue.c Page 3 --buf_stk->top; if (buf_stk->top < 0) buf_stk->buff_state = BUFF_UNAVAIL; mutex_unlock(&buf_stk->stack_lock); return(buf_num); } void push_istack(int_stack_t *istk, const int buf_num) { mutex_lock(&istk->stack_lock); ++istk->top; istk->free_slots[istk->top] = buf_num; if (istk->top == 0) { /* build in statistics gathering? XXX */ istk->buff_state = BUFF_AVAIL; cond_broadcast(&istk->stack_cond); } mutex_unlock(&istk->stack_lock); } Jan 13 11:36 1994 th_sig.c Page 1 /* Copyright 1993 SunSoft Inc. */ /* th_sig.c - demo program to show thread-specific signal * handling. Main() creates two threads, one of which outputs * '.' characters, and the other which outputs '~' characters. * A generic signal handler is established, which fetches a * thread-specific function pointer for the "real" handler. * This is useful for inter-thread handling of asynchronous * events. Also of interest is the behavior of this program on * a uni versus a multiprocessor machine. * * Copyright (C) 1993 by Sun Microsystems, Inc. * All rights reserved. * */ #include #include #include #include #include thread_key_t sig_handler_key; sigset_t n; struct sigaction my_sigaction; void generic_sig_handler() { void (*my_handler)(); write(1, "generic handler\n", 16); if (thr_getspecific(sig_handler_key, (void **) &my_handler)) { perror("generic_sig_handler:"); exit(1); } (*my_handler)(); } void thr_sig_handler1() { struct sigaction tmp; write(1, "sig_handler1\n", 13); /* clean up - release all locks before exiting!!*/ thr_exit(0); } void thr_sig_handler2() { write(1, "sig_handler2\n", 13); } Jan 13 11:36 1994 th_sig.c Page 2 void * thread_type1(void *a) { sigset_t n; static int i = 0; struct sigaction my_sigaction; write(1, "thread1 starting\n\n", 18); (void) sigemptyset(&n); (void) sigaddset(&n, SIGINT); /* block signal until handler installed */ thr_setspecific(sig_handler_key, (void *) thr_sig_handler1); thr_sigsetmask(SIG_UNBLOCK, &n, 0); { int i; for(i = 0; ; i++) { if (!(i%360)) write(1,".",1); if (!(i % (360*75))) write(1,"\n",1); } } } void * thread_type2(void *a) { sigset_t n; static int i = 0; struct sigaction my_sigaction; write(1, "thread2 starting\n\n", 18); (void) sigemptyset(&n); (void) sigaddset(&n, SIGINT); thr_setspecific(sig_handler_key, (void *) thr_sig_handler2); thr_sigsetmask(SIG_UNBLOCK, &n, 0); { int i; for(i = 0; ; i++) { if (!(i%360)) write(1,"~",1); if (!(i % (360*75))) write(1,"\n", 1); } } } main() { thread_t thread1, thread2; int ret; Jan 13 11:36 1994 th_sig.c Page 3 ret = thr_keycreate(&sig_handler_key, NULL); (void) sigemptyset(&n); (void) sigaddset(&n, SIGINT); my_sigaction.sa_handler = generic_sig_handler; my_sigaction.sa_flags = 0; sigaction(SIGINT, &my_sigaction, NULL); thr_sigsetmask(SIG_BLOCK, &n, 0); write(1, "main: starting\n\n", 16); thr_setconcurrency(3); /* main, thread1, and thread2 */ write(1, "creating thread 1\n\n", 18); thr_create(NULL, NULL, thread_type1, NULL, THR_BOUND, &thread1); write(1, "creating thread 2\n", 17); thr_create(NULL, NULL, thread_type2, NULL, THR_BOUND, &thread2); /* note - SIGALRM gets deliverd to the right thread! */ sleep(5); write(1, "signalling thread 1\n", 19); thr_kill(thread1, SIGINT); sleep(5); write(1, "signalling thread 2\n", 19); thr_kill(thread2, SIGINT); sleep(3); write(1, "signalling thread 2\n", 19); thr_kill(thread2, SIGINT); sleep(1); exit(0); } Jan 13 11:37 1994 worker.c Page 1 #include "my_syncs.h" /* generic manager and worker functions. * * Copyright (C) 1993 by Sun Microsystems, Inc. * All rights reserved. * */ /* * worker1() - generic synchronized worker */ void * worker1(void *foo) { worksheet_t *my_work; int ret; my_work = (worksheet_t *)foo; /* for clarity */ my_work->my_threadid = thr_self(); while (dc_gwait(my_work->counter) == DC_CONTINUE) { if (my_work->my_func) (*my_work->my_func)(&my_work->index, my_work->input, my_work->output); } dc_checkin(my_work->counter); printf("thread %d exiting...\n", thr_self()); thr_exit(0); } /* manager_func1() - generic middle manager. Create num_children * workers with unique indexes and same work function. Pass along * same work function and pointer to overall work to all children. * Update all children and advance at every advancement this thread. */ void * manager_func1(void *foo) { manager_work_t *my_group_job; int i; worksheet_t *instructions, *this_wksh; thread_t new_thread; int ret; my_group_job = (manager_work_t *)(foo); /* for clarity */ instructions = (worksheet_t *)malloc(sizeof(worksheet_t) * my_group_job->num_children); dc_init(&my_group_job->child_dc, USYNC_THREAD, my_group_job->num_children, WAITING); printf("manager thread %d creating team...\n", thr_self()); Jan 13 11:37 1994 worker.c Page 2 for (i = 0, this_wksh = instructions; i < my_group_job->num_children; i++, this_wksh++) { this_wksh->counter = &my_group_job->child_dc; this_wksh->index = i; this_wksh->my_func = my_group_job->child_func; this_wksh->input = my_group_job->input; this_wksh->output = my_group_job->output; if (thr_create(NULL, NULL, worker1, this_wksh, THR_DETACHED|THR_SUSPENDED, &new_thread)) printf("can't create thread!!\n"); this_wksh->my_threadid = new_thread; thr_continue(new_thread); } /* wait for all threads to get created and enter inner loop */ dc_cwait(&my_group_job->child_dc); printf("manager: created workers; entering loop\n"); while ((ret = dc_gwait(my_group_job->counter)) == DC_CONTINUE) { if (my_group_job->child_func && my_group_job->input) { for (i = 0, this_wksh = instructions; i < my_group_job->num_children; i++, this_wksh++){ this_wksh->my_func = my_group_job->child_func; this_wksh->input = my_group_job->input; } } printf("manager thread %d starting team!\n\n", thr_self()); dc_advance(&my_group_job->child_dc); printf("manager: team done...wait for new team function...\n"); } printf("manager disbanding team...\n"); dc_disband(&my_group_job->child_dc); printf("manager destroying counter state..\n"); dc_destroy(&my_group_job->child_dc); free(instructions); dc_checkin(my_group_job->counter); } /* work_func1() - do a lot-o-square root'n on my index in the input * let libc (printf) deal with serialization */ void work_func1(const int *index1, const void *input, void *output) { extern double sqrt(); double *wc; double result; int i; wc = (double *)(input); /* could use register and/or make copies, but why not exercise the memory for fun? */ for (i = 0; i < 400000; i++) { result = sqrt(*(wc+*index1)); Jan 13 11:37 1994 worker.c Page 3 } printf("index: %d\tvalue:%f\tsqrt:%f\n", *index1, *(wc+*index1), result); } /* work_func2() - do a lot-o-cube root'n on my index in the input * let libc (printf) deal with serialization */ void work_func2(const int *index2, const void *input, void *output) { extern double cbrt(); double *wc; register double result, source; register int i; wc = (double *)(input); source = (*(wc+*index2)); for (i = 0; i < 400000; i++) result = cbrt(source); printf("\t\tindex: %d\tvalue:%f\tcbrt:%f\n", *index2, *(wc+*index2), result); } /* work_func3() */ void work_func3(const int *index3, const void *input, void *output) { printf("thread %d says: 'hello from work_func3'\n", thr_self()); } /* work_func4() */ void work_func4(const int *index3, const void *input, void *output) { printf("thread %d says: 'hello from work_func4'\n", thr_self()); } /* work_func5() - simplistic work assignment function: place unique double * in dest */ void work_func5(char *dest) { static double start_val = 0; results_buf_t *ddest; ddest = (results_buf_t *)dest; ddest->value = ++start_val; } /* work_func6() - get work and post sqrt results to Jan 13 11:37 1994 worker.c Page 4 * recirculation system * */ void work_func6(const int *index1, const void *input, void *output) { rcsystem_t *rcsysin; rcsystem_t *rcsysout; results_buf_t *buff; /* my unique working memory */ int i, my_index; double *my_work, result; extern double sqrt(); rcsysin = (rcsystem_t *)(input); /* output could be a queue or another rcsystem 'downstream' */ rcsysout = (rcsystem_t *)(output); /* get unique piece of memory with worked stuffed. */ my_index = rcsys_checkout(rcsysin); buff = (results_buf_t *)rcsys_memoffset(rcsysin, my_index); /* use same memory for getting/posting work */ for (i = 0; i < 400000; i++) { result = sqrt(buff->value); } /* place my results into the buffer */ buff->thread_num = thr_self(); buff->results = result; buff->op = SQUAREROOT; /* check in the buffer */ rcsys_add2q(rcsysout, my_index); } /* work_func7() - get work from and post cbrt results to * recirculation system * */ void work_func7(const int *index1, const void *input, void *output) { rcsystem_t *rcsysin; rcsystem_t *rcsysout; results_buf_t *buff; /* my unique working memory */ int i, my_index; double *my_work, result; extern double cbrt(); rcsysin = (rcsystem_t *)(input); rcsysout = (rcsystem_t *)(output); Jan 13 11:37 1994 worker.c Page 5 my_index = rcsys_checkout(rcsysin); buff = (results_buf_t *)rcsys_memoffset(rcsysin, my_index); for (i = 0; i < 400000; i++) { result = cbrt(buff->value); } buff->thread_num = thr_self(); buff->results = result; buff->op = CUBEROOT; rcsys_add2q(rcsysout, my_index); } void * printf_thread(void *foo) { results_buf_t *buff; int buffnum_to_print; rcsystem_t *rcsys; rcsys = (rcsystem_t *)(foo); while(1) { buffnum_to_print = rcsys_drainq(rcsys); buff = (results_buf_t *)rcsys_memoffset(rcsys, buffnum_to_print); if (buff->op == SQUAREROOT) printf("printf: buffer %d\tvalue: %f\tsqrt: %f\tsrc thread: %d\n", buffnum_to_print, buff->value, buff->results, buff->thread_num); else printf("\tprintf: buffer %d\tvalue: %f\tcbrt: %f\tsrc thread: %d\n", buffnum_to_print, buff->value, buff->results, buff->thread_num); rcsys_checkin(rcsys, buffnum_to_print); } } Jan 13 11:32 1994 Makefile Page 1 test: test.c cc -g -xsb -o test -I/usr/openwin/include -L/usr/openwin/lib -lX11 -lXt -lXol test.c example1: down_counter.o example1.o worker.o rcsystem.o cc -g -xsb -o example1 down_counter.o worker.o example1.o rcsystem.o -lthread -lm example2: down_counter.o example2.o worker.o rcsystem.o cc -g -xsb -o example2 down_counter.o worker.o rcsystem.o example2.o -lthread -lm example3: down_counter.o rcsystem.o example3.o worker.o cc -dalign -g -xsb -o example3 down_counter.o worker.o rcsystem.o example3.o stack-n-queue.o -lthread -lm worker.o down_counter.o example1.o: my_syncs.h clean: rm *.o rcsystem.o: rcsystem.c stack-n-queue.o cc -g -xsb -c rcsystem.c down_counter.o: down_counter.c cc -g -xsb -c down_counter.c worker.o: worker.c cc -g -xsb -c worker.c example1.o: example1.c cc -g -xsb -c example1.c example2.o: example2.c cc -g -xsb -c example2.c stack-n-queue.o: cc -g -xsb -c stack-n-queue.c example3.o: example3.c cc -dalign -g -xsb -c example3.c test5.o: test5.c cc -g -xsb -c test5.c -I$(OPENWINHOME)/include newmtg_ui.o: newmtg_ui.c newmtg_stubs.c newmtg_ui.h cc -g -xsb -c newmtg_ui.c -I$(OPENWINHOME)/include newmtg_stubs.o: newmtg_stubs.c cc -DMAIN -g -xsb -c newmtg_stubs.c -I$(OPENWINHOME)/include mtgui: newmtg_ui.o newmtg_stubs.o test5.o cc -g -xsb -DMAIN -o mtgui newmtg_ui.o newmtg_stubs.o test5.o \ -I$(OPENWINHOME)/include -L$(OPENWINHOME)/lib \ -lX11 -lxview -lsocket -lolgx th_sig: th_sig.c cc -g -xsb -o th_sig th_sig.c -lthread