00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <netdb.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <mini_mpi.h>
#include <daemon.h>
#include "mini_mpi_server.h"
00051
#ifdef OQ_TESTING
#define TIMEOUT 2
#else
#define TIMEOUT 180 // in seconds
#endif
#define BACKLOG 10


/*
 * Accept-window bookkeeping, read and written under task_mutex by
 * reset_timeout() and timedout(): _beginning_ holds the start time of the
 * current window; a non-zero __first__ makes the next timedout() call start
 * a new window.
 */
struct timeval _beginning_;
volatile unsigned int __first__;
/* Number of tasks registered so far in the current session. */
volatile unsigned int _proc_count_;

/*
 * Nothing in this file calls pthread_cond_init() on this condition variable,
 * so it must be statically initialized to be usable by pthread_cond_signal().
 */
pthread_cond_t mpi_conditional = PTHREAD_COND_INITIALIZER;
/* Taken while walking the task list; initialized in mini_mpi_server(). */
pthread_mutex_t mpi_mutex;
/* Guards the timeout state and task id assignment; initialized in mini_mpi_server(). */
pthread_mutex_t task_mutex;
00066 pthread_mutex_t task_mutex;
00067
/*
 * One connected task. Nodes are chained through `next` from tasks_head to
 * tasks_tail; mini_mpi_server() appends a node per accepted connection.
 */
struct task_handle {
	pthread_t thread;           /* handler thread serving this task */
	int id;                     /* id assigned in connection order */
	struct sockaddr_in ip_addr; /* peer address returned by accept() */
	int fd;                     /* connected socket descriptor */
	short flag;                 /* per-task flag, see change_task_flag() */
	struct task_handle *list;   /* copy of tasks_head when the task was registered */
	struct task_handle *next;   /* following task, or NULL at the tail */
};
typedef struct task_handle task_t;

/* First and last nodes of the current session's task list. */
task_t *tasks_head;
task_t *tasks_tail;
00079
00080
00081 int get_cluster_size(void)
00082 {
00083 int size = 0;
00084 pthread_mutex_lock(&mpi_mutex);
00085 size = _proc_count_;
00086 pthread_mutex_lock(&mpi_mutex);
00087 return size;
00088 }
00089
00090 int get_task_socket(int i)
00091 {
00092 task_t *t;
00093 int sock = -1;
00094
00095 pthread_mutex_lock(&mpi_mutex);
00096 t = tasks_head;
00097 while (t != NULL) {
00098 if (t->id == i) {
00099 sock = t->fd;
00100 break;
00101 }
00102 t = t->next;
00103 }
00104 pthread_mutex_unlock(&mpi_mutex);
00105 return sock;
00106 }
00107
00108 void change_task_flag(int i, short f)
00109 {
00110 task_t *t;
00111
00112 pthread_mutex_lock(&mpi_mutex);
00113 t = tasks_head;
00114 while (t != NULL) {
00115 if (t->id == i) {
00116 t->flag = f;
00117 break;
00118 }
00119 t = t->next;
00120 }
00121 pthread_mutex_unlock(&mpi_mutex);
00122 }
00123
00124
00125 int tasks_to_string(int count, task_t *head, char *string)
00126 {
00127 int i, size = 0;
00128 int string_size = (sizeof(string) / sizeof(char)) - 1;
00129 task_t *t = head;
00130
00131 for (i = 0; i < count; i++) {
00132 char tmp[65];
00133 sprintf(tmp, "::%d:%s", t->id,
00134 inet_ntoa(t->ip_addr.sin_addr));
00135 size += strlen(tmp);
00136 strncat(string, tmp, string_size);
00137 string_size -= strlen(tmp);
00138 t = t->next;
00139 }
00140 return size;
00141 }
00142
00143
00144
00145 static void reset_timeout(void)
00146 {
00147 pthread_mutex_lock(&task_mutex);
00148 __first__ = 1;
00149 pthread_mutex_unlock(&task_mutex);
00150 }
00151
00152
00153 static int timedout(void)
00154 {
00155 int value = 0;
00156 pthread_mutex_lock(&task_mutex);
00157 if (__first__) {
00158 gettimeofday(&_beginning_, NULL);
00159 __first__ = 0;
00160 } else {
00161 struct timeval now;
00162 gettimeofday(&now, NULL);
00163 if (now.tv_sec - _beginning_.tv_sec >= TIMEOUT &&
00164 now.tv_usec - _beginning_.tv_usec >= 0)
00165 value = 1;
00166 }
00167 pthread_mutex_unlock(&task_mutex);
00168 return value;
00169 }
00170
00171
00172
00173
00174
00175 void *mpi_handler(void *t) {
00176 task_t *task = (task_t*)t;
00177 unsigned short done = 0;
00178 int task_strlen;
00179
00180 pthread_mutex_lock(&mpi_mutex);
00181
00182
00183
00184 pthread_cond_signal(&mpi_conditional);
00185 pthread_mutex_unlock(&mpi_mutex);
00186
00187 task_strlen = tasks_to_string(_proc_count_, tasks_head, NULL);
00188 char msg[task_strlen + 128];
00189 char tasks[task_strlen + 1];
00190 tasks_to_string(_proc_count_, tasks_head, tasks);
00191 sprintf(msg, "%d::%d:%d::%s", MINI_MPI_HEAD,
00192 _proc_count_, task->id, tasks);
00193 send(task->fd, msg, strlen(msg), 0);
00194
00195 while (!done > 0 && !daemon_shuttingdown()) {
00196 int req_id = 0;
00197
00198
00199
00200 switch (req_id) {
00201 case MINI_MPI_INIT:
00202 pthread_mutex_lock(&task_mutex);
00203
00204 pthread_mutex_unlock(&task_mutex);
00205 break;
00206 case MINI_MPI_ABORT:
00207
00208 break;
00209 case MINI_MPI_BARRIER:
00210 pthread_mutex_lock(&task_mutex);
00211
00212 pthread_mutex_unlock(&task_mutex);
00213
00214 break;
00215 case MINI_MPI_SEND:
00216 case MINI_MPI_RECV:
00217 case MINI_MPI_SCATTER:
00218 case MINI_MPI_GATHER:
00219 pthread_mutex_lock(&task_mutex);
00220
00221 pthread_mutex_unlock(&task_mutex);
00222 break;
00223 case MINI_MPI_FINAL:
00224 done = 1;
00225 break;
00226 default:
00227 pthread_mutex_lock(&task_mutex);
00228
00229 pthread_mutex_unlock(&task_mutex);
00230 break;
00231 }
00232 }
00233 close(task->fd);
00234 return NULL;
00235 }
00236
00237
00238
00239 void *mini_mpi_server(void *arg_struct) {
00240 int argc = 0;
00241 char **argv;
00242
00243 if (arg_struct != NULL) {
00244 argc = ((struct arguments *)arg_struct)->argc;
00245 argv = ((struct arguments *)arg_struct)->argv;
00246 }
00247
00248 pthread_mutex_init(&mpi_mutex, NULL);
00249 pthread_mutex_init(&task_mutex, NULL);
00250
00251 while (!daemon_shuttingdown()) {
00252 task_t *h;
00253 int rc, listen_sd, handle_sd, yes = 1;
00254 struct sockaddr_in my_addr, their_addr;
00255 socklen_t sin_size = sizeof(struct sockaddr_in);
00256
00257 tasks_head = tasks_tail = NULL;
00258 pthread_mutex_lock(&mpi_mutex);
00259 _proc_count_ = 0;
00260 if ((listen_sd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
00261 perror("socket");
00262 exit(-1);
00263 }
00264 if (setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &yes,
00265 sizeof(int)) == -1) {
00266 perror("setsockopt");
00267 exit(-1);
00268 }
00269
00270 my_addr.sin_family = AF_INET;
00271 my_addr.sin_port = htons(MPI_PORT);
00272 my_addr.sin_addr.s_addr = INADDR_ANY;
00273 memset(&(my_addr.sin_zero), '\0', 8);
00274
00275 if (bind(listen_sd, (struct sockaddr*) &my_addr,
00276 sizeof(struct sockaddr)) == -1) {
00277 perror("bind");
00278 exit(-1);
00279 }
00280 if (listen(listen_sd, BACKLOG) == -1) {
00281 perror("listen");
00282 exit(-1);
00283 }
00284
00285 while (!timedout() || !daemon_quitting() ||
00286 !daemon_shuttingdown()) {
00287 task_t *tmp;
00288 if ((handle_sd = accept(listen_sd,
00289 (struct sockaddr*) &their_addr,
00290 &sin_size)) == -1) {
00291 perror("accept");
00292 exit(-1);
00293 }
00294
00295 tmp = tasks_tail;
00296 tasks_tail = (task_t*)malloc(sizeof(task_t));
00297 pthread_mutex_lock(&task_mutex);
00298 tasks_tail->id = _proc_count_++;
00299 pthread_mutex_unlock(&task_mutex);
00300 tasks_tail->ip_addr = their_addr;
00301 tasks_tail->flag = 0;
00302 tasks_tail->fd = handle_sd;
00303 tasks_tail->next = NULL;
00304 if (tmp != NULL)
00305 tmp->next = tasks_tail;
00306 if (tasks_head == NULL)
00307 tasks_head = tasks_tail;
00308 tasks_tail->list = tasks_head;
00309
00310 if ((rc = pthread_create(&tasks_tail->thread, NULL,
00311 mpi_handler,
00312 tasks_tail)) != 0) {
00313 fprintf(stderr,
00314 "ERROR-- MPI handler thread: %d\n",
00315 rc);
00316 exit(-1);
00317 }
00318 }
00319 pthread_mutex_unlock(&mpi_mutex);
00320
00321 h = tasks_head;
00322 while (h != NULL) {
00323 task_t *tmp = h->next;
00324 pthread_join(h->thread, NULL);
00325 free(h);
00326 h = tmp;
00327 }
00328 close(listen_sd);
00329 }
00330 return NULL;
00331 }
00332
00333
00334
00335
00336 #ifdef OQ_TESTING
00337 #include <unistd.h>
00338
/*
 * Smoke test: call MPI_Init() and check that MPI_Initialized() reports the
 * library as initialized. Returns 0 on success; otherwise prints a message
 * to stderr and returns -1.
 */
int mpi_test()
{
	int initialized = 0;

	MPI_Init(0, NULL);
	MPI_Initialized(&initialized);
	if (initialized != 0)
		return 0;

	fprintf(stderr, "MPI initialization failed\n");
	return -1;
}
00351
00352
00353 int main(int argc, char *argv[])
00354 {
00355 int error, rc = 0;
00356 pthread_t mpi;
00357 struct arguments args;
00358 args.argc = argc;
00359 args.argv = argv;
00360
00361 daemon_init();
00362 reset_timeout();
00363
00364 if ((rc = pthread_create(&mpi, NULL, mini_mpi_server, NULL)) != 0) {
00365 fprintf(stderr, "ERROR-- mini MPI server thread: %d\n", rc);
00366 exit(-1);
00367 }
00368
00369 error = mpi_test();
00370
00371 daemon_shutdown();
00372 pthread_join(mpi, NULL);
00373 return error;
00374 }
00375 #endif