N-sim
Emulation and simulation of
Wireless Sensor Networks



   Home


   Project Page


   Download


   CVS



   Installation


   Configuration


   Plug-ins




 Hosted by
SourceForge.net Logo

/home/brennan/n-sim/OrbisQuartus/control/mini_mpi_server.c

Go to the documentation of this file.
00001 
00014 /*
00015  * Copyright 2007. Los Alamos National Security, LLC. This material
00016  * was produced under U.S. Government contract DE-AC52-06NA25396 for
00017  * Los Alamos National Laboratory (LANL), which is operated by Los
00018  * Alamos National Security, LLC, for the Department of Energy. The
00019  * U.S. Government has rights to use, reproduce, and distribute this
00020  * software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY,
00021  * LLC, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL
00022  * LIABILITY FOR THE USE OF THIS SOFTWARE. If software is modified to
00023  * produce derivative works, such modified software should be clearly
00024  * marked, so as not to confuse it with the version available from LANL.
00025  *
00026  * Additionally, this program is free software; you can redistribute
00027  * it and/or modify it under the terms of the GNU General Public
00028  * License as published by the Free Software Foundation; version 2 of
00029  * the License. Accordingly, this program is distributed in the hope
00030  * it will be useful, but WITHOUT ANY WARRANTY; without even the
00031  * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
00032  * PURPOSE. See the GNU General Public License for more details.
00033  */
00034 
00035 
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <unistd.h>
00039 #include <string.h>
00040 #include <errno.h>
00041 #include <netdb.h>
00042 #include <sys/types.h>
00043 #include <sys/socket.h>
00044 #include <sys/time.h>
00045 #include <netinet/in.h>
00046 #include <arpa/inet.h>
00047 #include <pthread.h>
00048 #include <mini_mpi.h>
00049 #include <daemon.h>
00050 #include "mini_mpi_server.h"
00051 
00052 #ifdef OQ_TESTING
00053 #define TIMEOUT  2
00054 #else
00055 #define TIMEOUT  180  // in seconds
00056 #endif
00057 #define BACKLOG  10
00058 
00059 
00060 struct timeval _beginning_;
00061 volatile unsigned int __first__;
00062 volatile unsigned int _proc_count_;
00063 
00064 pthread_cond_t mpi_conditional;
00065 pthread_mutex_t mpi_mutex;
00066 pthread_mutex_t task_mutex;
00067 
00068 typedef struct task_handle {
00069         pthread_t thread;
00070         int id;
00071         struct sockaddr_in ip_addr;
00072         int fd;
00073         short flag;
00074         struct task_handle *list;
00075         struct task_handle *next;
00076 } task_t;
00077 
00078 task_t *tasks_head, *tasks_tail;
00079 
00080 
00081 int get_cluster_size(void)
00082 {
00083         int size = 0;
00084         pthread_mutex_lock(&mpi_mutex);
00085         size = _proc_count_;
00086         pthread_mutex_lock(&mpi_mutex);
00087         return size;
00088 }
00089 
00090 int get_task_socket(int i)
00091 {
00092         task_t *t;
00093         int sock = -1;
00094 
00095         pthread_mutex_lock(&mpi_mutex);
00096         t = tasks_head;
00097         while (t != NULL) {
00098                 if (t->id == i) {
00099                         sock = t->fd;
00100                         break;
00101                 }
00102                 t = t->next;
00103         }
00104         pthread_mutex_unlock(&mpi_mutex);
00105         return sock;
00106 }
00107 
00108 void change_task_flag(int i, short f)
00109 {
00110         task_t *t;
00111 
00112         pthread_mutex_lock(&mpi_mutex);
00113         t = tasks_head;
00114         while (t != NULL) {
00115                 if (t->id == i) {
00116                         t->flag = f;
00117                         break;
00118                 }
00119                 t = t->next;
00120         }
00121         pthread_mutex_unlock(&mpi_mutex);
00122 }
00123 
00124 
00125 int tasks_to_string(int count, task_t *head, char *string)
00126 {
00127         int i, size = 0;
00128         int string_size = (sizeof(string) / sizeof(char)) - 1;
00129         task_t *t = head;
00130 
00131         for (i = 0; i < count; i++) {
00132                 char tmp[65];
00133                 sprintf(tmp, "::%d:%s", t->id, 
00134                         inet_ntoa(t->ip_addr.sin_addr));
00135                 size += strlen(tmp);
00136                 strncat(string, tmp, string_size);
00137                 string_size -= strlen(tmp);
00138                 t = t->next;
00139         }
00140         return size;
00141 }
00142 
00143 
00144 
00145 static void reset_timeout(void)
00146 {
00147         pthread_mutex_lock(&task_mutex);
00148         __first__ = 1;
00149         pthread_mutex_unlock(&task_mutex);
00150 }
00151 
00152 
00153 static int timedout(void)
00154 {
00155         int value = 0;
00156         pthread_mutex_lock(&task_mutex);
00157         if (__first__) {
00158                 gettimeofday(&_beginning_, NULL);
00159                 __first__ = 0;
00160         } else {
00161                 struct timeval now;
00162                 gettimeofday(&now, NULL);
00163                 if (now.tv_sec - _beginning_.tv_sec >= TIMEOUT &&
00164                     now.tv_usec - _beginning_.tv_usec >= 0)
00165                         value = 1;
00166         }
00167         pthread_mutex_unlock(&task_mutex);
00168         return value;
00169 }
00170 
00171 
00172 
00173 
00174 
00175 void *mpi_handler(void *t) {
00176         task_t *task = (task_t*)t;
00177         unsigned short done = 0;
00178         int task_strlen;
00179 
00180         pthread_mutex_lock(&mpi_mutex);
00181         // check for MINI_MPI_INIT request
00182         // 
00183         // wait for timeout
00184         pthread_cond_signal(&mpi_conditional);
00185         pthread_mutex_unlock(&mpi_mutex);
00186 
00187         task_strlen = tasks_to_string(_proc_count_, tasks_head, NULL);
00188         char msg[task_strlen + 128];
00189         char tasks[task_strlen + 1];
00190         tasks_to_string(_proc_count_, tasks_head, tasks);
00191         sprintf(msg, "%d::%d:%d::%s", MINI_MPI_HEAD, 
00192                 _proc_count_, task->id, tasks);
00193         send(task->fd, msg, strlen(msg), 0);
00194 
00195         while (!done > 0 && !daemon_shuttingdown()) {
00196                         int req_id = 0;
00197 
00198                         // recv requests:
00199 
00200                         switch (req_id) {
00201                         case MINI_MPI_INIT:
00202                                 pthread_mutex_lock(&task_mutex);
00203                                 // log error
00204                                 pthread_mutex_unlock(&task_mutex);
00205                                 break;
00206                         case MINI_MPI_ABORT:
00207                                 // broadcast MINI_MPI_ABORT
00208                                 break;
00209                         case MINI_MPI_BARRIER:
00210                                 pthread_mutex_lock(&task_mutex);
00211                                 // wait for all                         
00212                                 pthread_mutex_unlock(&task_mutex);
00213                                 // broadcast MINI_MPI_RESUME
00214                                 break;
00215                         case MINI_MPI_SEND:
00216                         case MINI_MPI_RECV:
00217                         case MINI_MPI_SCATTER:
00218                         case MINI_MPI_GATHER:
00219                                 pthread_mutex_lock(&task_mutex);
00220                                 // log msg to file
00221                                 pthread_mutex_unlock(&task_mutex);
00222                                 break;
00223                         case MINI_MPI_FINAL:
00224                                 done = 1;
00225                                 break;
00226                         default:
00227                                 pthread_mutex_lock(&task_mutex);
00228                                 // log error
00229                                 pthread_mutex_unlock(&task_mutex);
00230                                 break;
00231                         }
00232         }
00233         close(task->fd);
00234         return NULL;
00235 }
00236 
00237 
00238 
00239 void *mini_mpi_server(void *arg_struct) {
00240         int argc = 0;
00241         char **argv;
00242 
00243         if (arg_struct != NULL) {
00244                 argc = ((struct arguments *)arg_struct)->argc;
00245                 argv = ((struct arguments *)arg_struct)->argv;
00246         }
00247 
00248         pthread_mutex_init(&mpi_mutex, NULL);
00249         pthread_mutex_init(&task_mutex, NULL);
00250 
00251         while (!daemon_shuttingdown()) {
00252                 task_t *h;
00253                 int rc, listen_sd, handle_sd, yes = 1;
00254                 struct sockaddr_in my_addr, their_addr;
00255                 socklen_t sin_size = sizeof(struct sockaddr_in);
00256 
00257                 tasks_head = tasks_tail = NULL;
00258                 pthread_mutex_lock(&mpi_mutex);
00259                 _proc_count_ = 0;
00260                 if ((listen_sd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
00261                         perror("socket");
00262                         exit(-1);
00263                 }
00264                 if (setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &yes, 
00265                                sizeof(int)) == -1) {
00266                         perror("setsockopt");
00267                         exit(-1);
00268                 }
00269 
00270                 my_addr.sin_family = AF_INET;
00271                 my_addr.sin_port = htons(MPI_PORT);
00272                 my_addr.sin_addr.s_addr = INADDR_ANY;
00273                 memset(&(my_addr.sin_zero), '\0', 8);
00274 
00275                 if (bind(listen_sd, (struct sockaddr*) &my_addr, 
00276                          sizeof(struct sockaddr)) == -1) {
00277                         perror("bind");
00278                         exit(-1);
00279                 }
00280                 if (listen(listen_sd, BACKLOG) == -1) {
00281                         perror("listen");
00282                         exit(-1);
00283                 }
00284 
00285                 while (!timedout() || !daemon_quitting() || 
00286                        !daemon_shuttingdown()) {
00287                         task_t *tmp;
00288                         if ((handle_sd = accept(listen_sd, 
00289                                                 (struct sockaddr*) &their_addr,
00290                                                 &sin_size)) == -1) {
00291                                 perror("accept");
00292                                 exit(-1);
00293                         }
00294 
00295                         tmp = tasks_tail;
00296                         tasks_tail = (task_t*)malloc(sizeof(task_t));
00297                         pthread_mutex_lock(&task_mutex);
00298                         tasks_tail->id = _proc_count_++;
00299                         pthread_mutex_unlock(&task_mutex);
00300                         tasks_tail->ip_addr = their_addr;
00301                         tasks_tail->flag = 0;
00302                         tasks_tail->fd = handle_sd;
00303                         tasks_tail->next = NULL;
00304                         if (tmp != NULL)
00305                                 tmp->next = tasks_tail;
00306                         if (tasks_head == NULL)
00307                                 tasks_head = tasks_tail;
00308                         tasks_tail->list = tasks_head;
00309 
00310                         if ((rc = pthread_create(&tasks_tail->thread, NULL, 
00311                                                  mpi_handler,
00312                                                  tasks_tail)) != 0) {
00313                                 fprintf(stderr, 
00314                                         "ERROR-- MPI handler thread: %d\n", 
00315                                         rc);
00316                                 exit(-1);
00317                         }
00318                 }
00319                 pthread_mutex_unlock(&mpi_mutex);
00320 
00321                 h = tasks_head;
00322                 while (h != NULL) {
00323                         task_t *tmp = h->next;
00324                         pthread_join(h->thread, NULL);
00325                         free(h);
00326                         h = tmp;
00327                 }
00328                 close(listen_sd);
00329         }
00330         return NULL;
00331 }
00332 
00333 
00334 
00335 
00336 #ifdef OQ_TESTING
00337 #include <unistd.h>
00338 
00339 int mpi_test()
00340 {
00341         int error = 0;
00342         int flag = 0;
00343         MPI_Init(0, NULL);
00344         MPI_Initialized(&flag);
00345         if (flag == 0) {
00346                 fprintf(stderr, "MPI initialization failed\n");
00347                 error = -1;
00348         }
00349         return error;
00350 }
00351 
00352 
00353 int main(int argc, char *argv[])
00354 {
00355         int error, rc = 0;
00356         pthread_t mpi;
00357         struct arguments args;
00358         args.argc = argc;
00359         args.argv = argv;
00360 
00361         daemon_init();
00362         reset_timeout();
00363 
00364         if ((rc = pthread_create(&mpi, NULL, mini_mpi_server, NULL)) != 0) {
00365                 fprintf(stderr, "ERROR-- mini MPI server thread: %d\n", rc);
00366                 exit(-1);
00367         }
00368 
00369         error = mpi_test();
00370 
00371         daemon_shutdown();
00372         pthread_join(mpi, NULL);
00373         return error;
00374 }
00375 #endif


© 2007, Los Alamos National Security, LLC.