/* based_mt_task.c -- A basic multi-threaded real-time task skeleton. * * This (by itself useless) task demos how to setup a multi-threaded LITMUS^RT * real-time task. Familiarity with the single threaded example (base_task.c) * is assumed. * * Currently, liblitmus still lacks automated support for real-time * tasks, but internaly it is thread-safe, and thus can be used together * with pthreads. */ #include #include #include #include #include /* Include gettid() */ #include /* Include threading support. */ #include /* Include the LITMUS^RT API.*/ #include "litmus.h" #include "spinlocks.h" #include "common.h" #define NS_PER_MS 1e6 #define MS_PER_S 1000 #define S_PER_MS 0.001 /* The information passed to each thread. Could be anything. */ struct thread_context { int processor; double cost; double period; double cs_length; request_type type; resource_mask_t resources; }; /* The real-time thread program. Doesn't have to be the same for * all threads. Here, we only have one that will invoke job(). */ void* rt_thread(void *tcontext); /* Declare the periodically invoked job. * Returns 1 -> task should exit. * 0 -> task should continue. */ static int job(struct thread_context *ctx, double program_end); double duration; rwrnlp rw_lock; /* Catch errors. */ #define CALL( exp ) do { \ int ret; \ ret = exp; \ if (ret != 0) \ fprintf(stderr, "%s failed: %m\n", #exp);\ else \ fprintf(stderr, "%s ok.\n", #exp); \ } while (0) /* * returns the character that made processing stop, newline or EOF */ static int skip_to_next_line(FILE *fstream) { int ch; for (ch = fgetc(fstream); ch != EOF && ch != '\n'; ch = fgetc(fstream)); return ch; } static void skip_comments(FILE *fstream) { int ch; for (ch = fgetc(fstream); ch == '#'; ch = fgetc(fstream)) skip_to_next_line(fstream); ungetc(ch, fstream); } struct thread_context* parse_csv(const char *file, int *num_tasks) { FILE *fstream; int cur_task, tmp; char ch; struct thread_context *ctx; *num_tasks = 0; fstream = fopen(file, "r"); if (!fstream){ fprintf(stderr,"could not open execution time file\n"); exit(EXIT_FAILURE); } /* figure out the number of tasks */ do { skip_comments(fstream); ch = skip_to_next_line(fstream); if (ch != EOF) ++(*num_tasks); } while (ch != EOF); if (-1 == fseek(fstream, 0L, SEEK_SET)){ fprintf(stderr, "rewinding file failed"); exit(EXIT_FAILURE); } /* allocate space for exec times */ ctx = malloc(*num_tasks * sizeof(struct thread_context)); if (!ctx){ fprintf(stderr, "couldn't allocate ctx memory!\n"); } for (cur_task = 0; cur_task < *num_tasks && !feof(fstream); ++cur_task) { skip_comments(fstream); if (1 != fscanf(fstream, "%d",&ctx[cur_task].processor)){ fprintf(stderr, "invalid processor near line %d\n", cur_task); exit(EXIT_FAILURE); } if (1 != fscanf(fstream, "%lf",&ctx[cur_task].cost)){ fprintf(stderr, "invalid cost near line %d\n", cur_task); exit(EXIT_FAILURE); } if (1 != fscanf(fstream, "%lf",&ctx[cur_task].period)){ fprintf(stderr, "invalid period near line %d\n", cur_task); exit(EXIT_FAILURE); } if (1 != fscanf(fstream, "%lf",&ctx[cur_task].cs_length)){ fprintf(stderr, "invalid cs_length near line %d\n", cur_task); exit(EXIT_FAILURE); } if (1 != fscanf(fstream, "%d",&tmp)){ fprintf(stderr, "invalid type near line %d\n", cur_task); exit(EXIT_FAILURE); } if(tmp == 0){ ctx[cur_task].type = read_req; } else { ctx[cur_task].type = write_req; } if (1 != fscanf(fstream, "%lu",&ctx[cur_task].resources)){ fprintf(stderr, "invalid resource mask near line %d\n", cur_task); exit(EXIT_FAILURE); } skip_to_next_line(fstream); } fclose(fstream); return ctx; } /* Basic setup is the same as in the single-threaded example. However, * we do some thread initiliazation first before invoking the job. */ int main(int argc, char** argv) { int i; int num_tasks; struct thread_context *ctx; pthread_t *task; if(argc < 2){ fprintf(stderr, "Usage: rwrnlp input.csv [length]\n"); exit(EXIT_FAILURE); } if(argc == 3){ duration = atoi(argv[2]); } else { duration = 10; } ctx = parse_csv(argv[1], &num_tasks); task = calloc(num_tasks, sizeof(pthread_t)); rwrnlp_init(&rw_lock); init_litmus(); for (i = 0; i < num_tasks; i++) { pthread_create(task + i, NULL, rt_thread, (void *) (ctx + i)); } for (i = 0; i < num_tasks; i++){ pthread_join(task[i], NULL); } return 0; } /* A real-time thread is very similar to the main function of a single-threaded * real-time app. Notice, that init_rt_thread() is called to initialized per-thread * data structures of the LITMUS^RT user space libary. */ void* rt_thread(void *tcontext) { struct thread_context *ctx = (struct thread_context *) tcontext; struct rt_task param; double start; /* Set up task parameters */ memset(¶m, 0, sizeof(param)); param.exec_cost = ctx->cost * NS_PER_MS; param.period = ctx->period * NS_PER_MS; param.relative_deadline = ctx->period * NS_PER_MS; /* What to do in the case of budget overruns? */ param.budget_policy = NO_ENFORCEMENT; /* The task class parameter is ignored by most plugins. */ param.cls = RT_CLASS_HARD; CALL( init_rt_thread() ); param.cpu = ctx->processor; be_migrate_to(ctx->processor); CALL( set_rt_task_param(gettid(), ¶m) ); fprintf(stderr, "Thread %d on processor %d\n", gettid(), ctx->processor); /***** * 2) Transition to real-time mode. */ CALL( task_mode(LITMUS_RT_TASK) ); /* The task is now executing as a real-time task if the call didn't fail. */ start = wctime(); while(job(ctx, start + duration)); CALL( task_mode(BACKGROUND_TASK) ); return NULL; } #define NUMS 4096 static int num[NUMS]; static int loop_once(void) { int i, j = 0; for (i = 0; i < NUMS; i++) j += num[i]++; return j; } static int loop_for(double exec_time, double emergency_exit) { double last_loop = 0, loop_start; int tmp = 0; double start = cputime(); double now = cputime(); while (now + last_loop < start + exec_time) { loop_start = now; tmp += loop_once(); now = cputime(); last_loop = now - loop_start; if (emergency_exit && wctime() > emergency_exit) { /* Oops --- this should only be possible if the execution time tracking * is broken in the LITMUS^RT kernel. */ fprintf(stderr, "!!! rtspin/%d emergency exit!\n", gettid()); fprintf(stderr, "Something is seriously wrong! Do not ignore this.\n"); break; } } return tmp; } static int job(struct thread_context *ctx, double program_end) { double ncs_length; long lock_overhead, unlock_overhead; if (wctime() > program_end){ printf("Terminating...\n"); return 0; } else { ncs_length = (ctx->cost-ctx->cs_length)/2*S_PER_MS; loop_for(ncs_length, program_end + 1); if(ctx->type == read_req){ //printf("%d:%d read locking...\n", __sync_fetch_and_add(&events,1), gettid()); lock_overhead = rwrnlp_read_lock(&rw_lock, ctx->resources, ctx->processor); //printf("%d:%d read CS...\n", __sync_fetch_and_add(&events, 1), gettid()); loop_for(ctx->cs_length*S_PER_MS, program_end + 1); //printf("%d:%d read unlocking...\n", __sync_fetch_and_add(&events,1), gettid()); unlock_overhead = rwrnlp_read_unlock(&rw_lock, ctx->processor); //printf("%d:%d ncs...\n", __sync_fetch_and_add(&events,1), gettid()); printf("read lock overhead: %ld\n", lock_overhead); printf("read unlock overhead: %ld\n", unlock_overhead); }else{ //printf("%d:%d write locking %lu\n", __sync_fetch_and_add(&events,1), gettid(), ctx->resources); lock_overhead = rwrnlp_write_lock(&rw_lock, ctx->resources, ctx->processor); //printf("%d:%d write CS...\n", __sync_fetch_and_add(&events,1), gettid()); loop_for(ctx->cs_length*S_PER_MS, program_end + 1); //printf("%d:%d write unlocking...\n", __sync_fetch_and_add(&events,1), gettid()); unlock_overhead = rwrnlp_write_unlock(&rw_lock, ctx->processor); //printf("%d:%d ncs...\n", __sync_fetch_and_add(&events,1), gettid()); printf("write lock overhead: %ld\n", lock_overhead); printf("write unlock overhead: %ld\n", unlock_overhead); } loop_for(ncs_length, program_end + 1); sleep_next_period(); return 1; } }