00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <unistd.h>
00039 #include <string.h>
00040 #include <errno.h>
00041 #include <fcntl.h>
00042 #include <netdb.h>
00043 #include <sys/types.h>
00044 #include <sys/socket.h>
00045 #include <netinet/tcp.h>
00046 #include <ctype.h>
00047 #include <pthread.h>
00048
00049 #include <oq.h>
00050 #include <daemon.h>
00051 #include <configuration.h>
00052 #include "network_graph.h"
00053 #include "source_node_graph.h"
00054 #include "ksection.h"
00055 #include "mini_mpi_server.h"
00056
00057
/* Stream that every diagnostic message in this file is written to. */
#define LOG_FILE    stderr

/* TCP ports.  CFG_PORT is the port the configuration server binds to. */
#define CFG_PORT    4799
#define SIM_PORT    4955

/* Size of the receive buffer used for a single GUI command. */
#define MAXCMDSIZE  4096
/* Pending-connection queue length passed to listen(). */
#define BACKLOG     10

/*
 * Held by the configuration thread until a configuration has been received;
 * _load_config() blocks on it to wait for that moment.
 */
pthread_mutex_t cfg_mutex;
00066
00067
00068 static int __parse_gui_cmd(char *string, Configuration *config)
00069 {
00070 int no_config = 1;
00071
00072 if (strncmp(string, "open ", 5) == 0) {
00073 char *file = string + 5;
00074 if (config->modded())
00075 config->clear();
00076 no_config = config->open(file);
00077 } else if (strncmp(string, "save ", 5) == 0) {
00078 char *file = string + 5;
00079 no_config = config->save(file);
00080 } else if (strncmp(string, "quit", 4) == 0) {
00081 daemon_quit();
00082 } else if ((strncmp(string, "halt", 4) == 0) ||
00083 (strncmp(string, "shutdown", 8) == 0)) {
00084 daemon_shutdown();
00085 } else if (strncmp(string, "<?xml", 5) == 0) {
00086 if (config->modded())
00087 config->clear();
00088 config->xml_to_cfg(string);
00089 no_config = 0;
00090 }
00091 return no_config;
00092 }
00093
00094
00095 static void __parse_gui_halt(char *string)
00096 {
00097 if (strncmp(string, "quit", 4) == 0) {
00098 daemon_quit();
00099 } else if ((strncmp(string, "halt", 4) == 0) ||
00100 (strncmp(string, "shutdown", 8) == 0)) {
00101 daemon_shutdown();
00102 }
00103 }
00104
00105
00106
00107
00108
00109 inline int cfg_is_locked(void)
00110 {
00111 if (pthread_mutex_trylock(&cfg_mutex) == EBUSY)
00112 return 1;
00113
00114 pthread_mutex_unlock(&cfg_mutex);
00115 return 0;
00116 }
00117
00118 inline void cfg_lock(void)
00119 {
00120 pthread_mutex_lock(&cfg_mutex);
00121 }
00122
00123 inline void cfg_release(void)
00124 {
00125 pthread_mutex_unlock(&cfg_mutex);
00126 }
00127
00128
00129 static void _load_config(void)
00130 {
00131 fprintf(LOG_FILE, "\t Awaiting user configuration ... \n");
00132 fflush(LOG_FILE);
00133
00134 pthread_mutex_lock(&cfg_mutex);
00135 pthread_mutex_unlock(&cfg_mutex);
00136
00137 fprintf(LOG_FILE, "\t configuration loaded.\n");
00138 fflush(LOG_FILE);
00139 }
00140
00141
00142
00143
00144
00145 void *get_configuration(void *cfg)
00146 {
00147 Configuration *config = (Configuration*)cfg;
00148 int listen_fd, handle_fd, configuring = 1;
00149 struct sockaddr_in my_addr;
00150 struct sockaddr_in their_addr;
00151 socklen_t sin_size;
00152 int yes = 1;
00153
00154 if ((listen_fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
00155 fprintf(LOG_FILE, "socket: %s\n", strerror(errno));
00156 exit(-1);
00157 }
00158 if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,
00159 &yes, sizeof(int)) == -1) {
00160 fprintf(LOG_FILE, "setsockopt: %s\n", strerror(errno));
00161 exit(-1);
00162 }
00163
00164 my_addr.sin_family = AF_INET;
00165 my_addr.sin_port = htons(CFG_PORT);
00166 my_addr.sin_addr.s_addr = INADDR_ANY;
00167 memset(&(my_addr.sin_zero), '\0', 8);
00168
00169 if (bind(listen_fd, (struct sockaddr *) &my_addr,
00170 sizeof(struct sockaddr)) == -1) {
00171 fprintf(LOG_FILE, "bind: %s\n", strerror(errno));
00172 exit(-1);
00173 }
00174
00175 sin_size = sizeof(struct sockaddr_in);
00176 if (listen(listen_fd, BACKLOG) == -1) {
00177 fprintf(LOG_FILE, "listen: %s\n", strerror(errno));
00178 exit(-1);
00179 }
00180
00181 if (!cfg_is_locked())
00182 cfg_lock();
00183
00184 while (!daemon_shuttingdown()) {
00185 int num_bytes, i;
00186 char rcv_buf[MAXCMDSIZE];
00187
00188 if ((handle_fd = accept(listen_fd,
00189 (struct sockaddr *) &their_addr,
00190 &sin_size)) == -1) {
00191 fprintf(LOG_FILE, "accept: %s\n", strerror(errno));
00192 exit(-1);
00193 }
00194 if (setsockopt(handle_fd, IPPROTO_TCP, TCP_NODELAY,
00195 &yes, sizeof(int)) == -1) {
00196 fprintf(LOG_FILE, "setsockopt: %s\n", strerror(errno));
00197 close(handle_fd);
00198 exit(-1);
00199 }
00200 if (setsockopt(handle_fd, SOL_SOCKET, SO_REUSEADDR,
00201 &yes, sizeof(int)) == -1) {
00202 fprintf(LOG_FILE, "setsockopt: %s\n", strerror(errno));
00203 close(handle_fd);
00204 exit(-1);
00205 }
00206
00207 while (configuring > 0) {
00208 memset(rcv_buf, '\0', MAXCMDSIZE);
00209 if ((num_bytes = recv(handle_fd, rcv_buf,
00210 MAXCMDSIZE-1, 0)) < 0)
00211 fprintf(LOG_FILE, "recv: %s\n", strerror(errno));
00212
00213 for (i = strlen(rcv_buf); i >= 0; i--) {
00214 if (! isprint((unsigned char) rcv_buf[i]))
00215 rcv_buf[i] = '\0';
00216 else
00217 break;
00218 }
00219
00220 if ((configuring =
00221 __parse_gui_cmd(rcv_buf, config)) < 0) {
00222 fprintf(LOG_FILE, "parsing command: %s\n",
00223 strerror(errno));
00224 }
00225 }
00226 close(handle_fd);
00227 cfg_release();
00228
00229 while (!daemon_quitting() || !daemon_shuttingdown()) {
00230 if ((handle_fd = accept(listen_fd,
00231 (struct sockaddr *) &their_addr,
00232 &sin_size)) == -1) {
00233 fprintf(LOG_FILE, "accept: %s\n",
00234 strerror(errno));
00235 exit(-1);
00236 }
00237 if (setsockopt(handle_fd, SOL_SOCKET, SO_REUSEADDR,
00238 &yes, sizeof(int)) == -1) {
00239 fprintf(LOG_FILE, "setsockopt: %s\n",
00240 strerror(errno));
00241 close(handle_fd);
00242 exit(-1);
00243 }
00244 fcntl(handle_fd, F_SETFL, O_NONBLOCK);
00245
00246 memset(rcv_buf, '\0', MAXCMDSIZE);
00247 if ((num_bytes = recv(handle_fd, rcv_buf,
00248 MAXCMDSIZE-1, 0)) < 0)
00249 continue;
00250
00251 for (i = strlen(rcv_buf); i >= 0; i--) {
00252 if (! isprint((unsigned char) rcv_buf[i]))
00253 rcv_buf[i] = '\0';
00254 else
00255 break;
00256 }
00257 __parse_gui_halt(rcv_buf);
00258 }
00259 }
00260 close(listen_fd);
00261 return NULL;
00262 }
00263
00264
00265
00266 void *ctl_server(void *unused)
00267 {
00268 while (!daemon_shuttingdown()) {
00269 pthread_t config_thread;
00270 int i, rc, cfg_len, numprocs = 0;
00271 char *config = NULL;
00272 Ksection *assign;
00273 Network_graph *network;
00274 Configuration active_config;
00275
00276 pthread_mutex_init(&cfg_mutex, NULL);
00277
00278 if ((rc = pthread_create(&config_thread, NULL,
00279 get_configuration,
00280 &active_config)) != 0) {
00281 fprintf(LOG_FILE,
00282 "ERROR-- config server thread: %d\n", rc);
00283 exit(-1);
00284 }
00285
00286
00287 _load_config();
00288
00289
00290 network = new Network_graph(&active_config);
00291 numprocs = get_cluster_size();
00292 if (numprocs <= 0) {
00293 daemon_shutdown();
00294 return NULL;
00295 }
00296 assign = new Ksection(network, numprocs);
00297
00298
00299 config = active_config.cfg_to_xml();
00300 cfg_len = strlen(config);
00301
00302
00303 for (i = 0; i < numprocs; i++) {
00304 int sock = get_task_socket(i);
00305 int part_size = 0;
00306 int numbytes = 0;
00307 int assign_len = (numprocs * INTEGER_STRING)
00308 + numprocs;
00309 char assignmt[assign_len];
00310 char len_str[INTEGER_STRING];
00311 Partition_Node *pn = assign->array(i)->head;
00312 memset(&assignmt, '\0', assign_len);
00313
00314 if (sock < 0)
00315 continue;
00316
00317 while (pn != NULL) {
00318 char node[INTEGER_STRING + 2];
00319 sprintf(node, "%d,", pn->id);
00320 strcat(assignmt, node);
00321 part_size++;
00322 pn = pn->next;
00323 }
00324 assignmt[strlen(assignmt)] = '\0';
00325 fprintf(LOG_FILE,
00326 " assignment %d : %s\n", i, assignmt);
00327
00328 sprintf(len_str, "%d", part_size);
00329 send(sock, len_str, strlen(len_str), 0);
00330
00331 send(sock, assignmt, strlen(assignmt), 0);
00332
00333 sprintf(len_str, "%d", cfg_len);
00334 send(sock, len_str, strlen(len_str), 0);
00335
00336 while (numbytes < cfg_len) {
00337 int sentbytes = send(sock, config + numbytes,
00338 cfg_len - numbytes, 0);
00339 numbytes += sentbytes;
00340 }
00341 }
00342
00343 while (!daemon_quitting() || !daemon_shuttingdown()) {
00345 }
00346 pthread_join(config_thread, NULL);
00347 }
00348 return NULL;
00349 }
00350
00351
00352
00353 #ifdef OQ_TESTING
/*
 * get_cluster_size - test stub (OQ_TESTING builds only).
 * Always reports an empty cluster.
 */
int get_cluster_size(void)
{
    const int no_processes = 0;

    return no_processes;
}
00358
/*
 * get_task_socket - test stub (OQ_TESTING builds only).
 * Always returns -1, whatever task index is passed.
 */
int get_task_socket(int unused)
{
    (void) unused;

    return -1;
}
00363
00364 int cfg_test()
00365 {
00366 int sock, error = 0;
00367 struct sockaddr_in their_addr;
00368 struct hostent *he;
00369 Configuration config;
00370 char *xml;
00371
00372 if ((error = config.open("../shared/test.cfg")) < 0) {
00373 printf(" Failed to load test configuration.\n");
00374 return error;
00375 }
00376 xml = config.cfg_to_xml();
00377
00378 if ((he = gethostbyname("localhost")) == NULL) {
00379 herror("gethostbyname");
00380 return h_errno;
00381 }
00382
00383 if ((sock = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
00384 fprintf(LOG_FILE, "socket: %s\n", strerror(errno));
00385 return sock;
00386 }
00387
00388 their_addr.sin_family = AF_INET;
00389 their_addr.sin_port = htons(CFG_PORT);
00390 their_addr.sin_addr = *((struct in_addr *)he->h_addr);
00391 memset(their_addr.sin_zero, '\0', sizeof(their_addr.sin_zero));
00392
00393 sleep(4);
00394 if ((error = connect(sock, (struct sockaddr*)&their_addr,
00395 sizeof(struct sockaddr))) == -1) {
00396 fprintf(LOG_FILE, "connect: %s\n", strerror(errno));
00397 close(sock);
00398 return error;
00399 }
00400
00401 if ((error = send(sock, xml, strlen(xml), 0)) == -1) {
00402 fprintf(LOG_FILE, "send: %s\n", strerror(errno));
00403 close(sock);
00404 return error;
00405 }
00406
00407 close(sock);
00408 return 0;
00409 }
00410
00411
00412 int main(int argc, char *argv[])
00413 {
00414 int error, rc = 0;
00415 pthread_t ctl;
00416
00417 daemon_init();
00418
00419 if ((rc = pthread_create(&ctl, NULL, ctl_server, NULL)) != 0) {
00420 fprintf(LOG_FILE, "ERROR-- control server thread: %d\n", rc);
00421 exit(-1);
00422 }
00423
00424 error = cfg_test();
00425
00426 daemon_shutdown();
00427 pthread_join(ctl, NULL);
00428 return error;
00429 }
00430 #endif