00001 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <unistd.h>
00039 #include <string.h>
00040 #include <errno.h>
00041 #include <fcntl.h>
00042 #include <netdb.h>
00043 #include <sys/types.h>
00044 #include <sys/socket.h>
00045 #include <netinet/tcp.h>
00046 #include <ctype.h>
00047 #include <pthread.h>
00048 
00049 #include <oq.h>
00050 #include <daemon.h>
00051 #include <configuration.h>
00052 #include "network_graph.h"
00053 #include "source_node_graph.h"
00054 #include "ksection.h"
00055 #include "mini_mpi_server.h"
00056 
00057 
/* Stream that receives every diagnostic message printed by this file. */
#define LOG_FILE        stderr
/* Not referenced in this file; presumably the simulation port -- TODO confirm. */
#define SIM_PORT        4955
/* TCP port on which get_configuration() listens for GUI connections. */
#define CFG_PORT        4799

/* Size of the receive buffer used for a single GUI command. */
#define MAXCMDSIZE      4096  
/* Pending-connection queue length passed to listen(). */
#define BACKLOG         10

/*
 * Held by get_configuration() until a configuration has been received;
 * _load_config() blocks on it to wait for that moment.
 */
pthread_mutex_t cfg_mutex;
00066 
00067 
00068 static int __parse_gui_cmd(char *string, Configuration *config)
00069 {
00070         int no_config = 1;
00071 
00072         if (strncmp(string, "open ", 5) == 0) {
00073                 char *file = string + 5;
00074                 if (config->modded())
00075                         config->clear();
00076                 no_config = config->open(file);
00077         } else if (strncmp(string, "save ", 5) == 0) {
00078                 char *file = string + 5;
00079                 no_config = config->save(file);
00080         } else if (strncmp(string, "quit", 4) == 0) {
00081                 daemon_quit();
00082         } else if ((strncmp(string, "halt", 4) == 0) ||
00083                    (strncmp(string, "shutdown", 8) == 0)) {
00084                 daemon_shutdown();
00085         } else if (strncmp(string, "<?xml", 5) == 0) {
00086                 if (config->modded())
00087                         config->clear();
00088                 config->xml_to_cfg(string);
00089                 no_config = 0;
00090         }
00091         return no_config;
00092 }
00093 
00094 
00095 static void __parse_gui_halt(char *string)
00096 {
00097         if (strncmp(string, "quit", 4) == 0) {
00098                 daemon_quit();
00099         } else if ((strncmp(string, "halt", 4) == 0) ||
00100                    (strncmp(string, "shutdown", 8) == 0)) {
00101                 daemon_shutdown();
00102         }
00103 }
00104 
00105 
00106 
00107 
00108 
00109 inline int cfg_is_locked(void)
00110 {
00111         if (pthread_mutex_trylock(&cfg_mutex) == EBUSY)
00112                 return 1;
00113 
00114         pthread_mutex_unlock(&cfg_mutex);
00115         return 0;
00116 }
00117 
00118 inline void cfg_lock(void)
00119 {
00120         pthread_mutex_lock(&cfg_mutex);
00121 }
00122 
00123 inline void cfg_release(void)
00124 {
00125         pthread_mutex_unlock(&cfg_mutex);
00126 }
00127 
00128 
00129 static void _load_config(void)
00130 {
00131         fprintf(LOG_FILE, "\t    Awaiting user configuration ... \n");
00132         fflush(LOG_FILE);
00133 
00134         pthread_mutex_lock(&cfg_mutex);
00135         pthread_mutex_unlock(&cfg_mutex);
00136 
00137         fprintf(LOG_FILE, "\t                  configuration loaded.\n");
00138         fflush(LOG_FILE);
00139 }
00140 
00141 
00142 
00143 
00144 
00145 void *get_configuration(void *cfg)
00146 {
00147         Configuration *config = (Configuration*)cfg;
00148         int listen_fd, handle_fd, configuring = 1;
00149         struct sockaddr_in my_addr;
00150         struct sockaddr_in their_addr;
00151         socklen_t sin_size;
00152         int yes = 1;
00153 
00154         if ((listen_fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
00155                 fprintf(LOG_FILE, "socket: %s\n", strerror(errno));
00156                 exit(-1);
00157         }
00158         if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,
00159                        &yes, sizeof(int)) == -1) {
00160                 fprintf(LOG_FILE, "setsockopt: %s\n", strerror(errno));
00161                 exit(-1);
00162         }
00163 
00164         my_addr.sin_family = AF_INET;
00165         my_addr.sin_port = htons(CFG_PORT);
00166         my_addr.sin_addr.s_addr = INADDR_ANY;
00167         memset(&(my_addr.sin_zero), '\0', 8);
00168 
00169         if (bind(listen_fd, (struct sockaddr *) &my_addr,
00170                  sizeof(struct sockaddr)) == -1) {
00171                 fprintf(LOG_FILE, "bind: %s\n", strerror(errno));
00172                 exit(-1);
00173         }
00174 
00175         sin_size = sizeof(struct sockaddr_in);
00176         if (listen(listen_fd, BACKLOG) == -1) {
00177                 fprintf(LOG_FILE, "listen: %s\n", strerror(errno));
00178                 exit(-1);
00179         }
00180 
00181         if (!cfg_is_locked())
00182                 cfg_lock();
00183 
00184         while (!daemon_shuttingdown()) {
00185                 int num_bytes, i;
00186                 char rcv_buf[MAXCMDSIZE];
00187 
00188                 if ((handle_fd = accept(listen_fd,
00189                                         (struct sockaddr *) &their_addr,
00190                                         &sin_size)) == -1) {
00191                         fprintf(LOG_FILE, "accept: %s\n", strerror(errno));
00192                         exit(-1);
00193                 }
00194                 if (setsockopt(handle_fd, IPPROTO_TCP, TCP_NODELAY,
00195                                &yes, sizeof(int)) == -1) {
00196                         fprintf(LOG_FILE, "setsockopt: %s\n", strerror(errno));
00197                         close(handle_fd);
00198                         exit(-1);
00199                 }
00200                 if (setsockopt(handle_fd, SOL_SOCKET, SO_REUSEADDR,
00201                                &yes, sizeof(int)) == -1) {
00202                         fprintf(LOG_FILE, "setsockopt: %s\n", strerror(errno));
00203                         close(handle_fd);
00204                         exit(-1);
00205                 }
00206 
00207                 while (configuring > 0) {
00208                         memset(rcv_buf, '\0', MAXCMDSIZE);
00209                         if ((num_bytes = recv(handle_fd, rcv_buf, 
00210                                               MAXCMDSIZE-1, 0)) < 0)
00211                                 fprintf(LOG_FILE, "recv: %s\n", strerror(errno));
00212 
00213                         for (i = strlen(rcv_buf); i >= 0; i--) {
00214                                 if (! isprint((unsigned char) rcv_buf[i]))
00215                                         rcv_buf[i] = '\0';
00216                                 else
00217                                         break;
00218                         }
00219 
00220                         if ((configuring = 
00221                              __parse_gui_cmd(rcv_buf, config)) < 0) {
00222                                 fprintf(LOG_FILE, "parsing command: %s\n",
00223                                         strerror(errno));
00224                         }
00225                 }
00226                 close(handle_fd);
00227                 cfg_release();
00228 
00229                 while (!daemon_quitting() || !daemon_shuttingdown()) {
00230                         if ((handle_fd = accept(listen_fd,
00231                                                 (struct sockaddr *) &their_addr,
00232                                                 &sin_size)) == -1) {
00233                                 fprintf(LOG_FILE, "accept: %s\n", 
00234                                         strerror(errno));
00235                                 exit(-1);
00236                         }
00237                         if (setsockopt(handle_fd, SOL_SOCKET, SO_REUSEADDR,
00238                                        &yes, sizeof(int)) == -1) {
00239                                 fprintf(LOG_FILE, "setsockopt: %s\n",
00240                                         strerror(errno));
00241                                 close(handle_fd);
00242                                 exit(-1);
00243                         }
00244                         fcntl(handle_fd, F_SETFL, O_NONBLOCK);
00245                         
00246                         memset(rcv_buf, '\0', MAXCMDSIZE);
00247                         if ((num_bytes = recv(handle_fd, rcv_buf, 
00248                                               MAXCMDSIZE-1, 0)) < 0)
00249                                 continue;
00250 
00251                         for (i = strlen(rcv_buf); i >= 0; i--) {
00252                                 if (! isprint((unsigned char) rcv_buf[i]))
00253                                         rcv_buf[i] = '\0';
00254                                 else
00255                                         break;
00256                         }
00257                         __parse_gui_halt(rcv_buf);
00258                 }
00259         }
00260         close(listen_fd);
00261         return NULL;
00262 }
00263 
00264 
00265 
00266 void *ctl_server(void *unused)
00267 {
00268         while (!daemon_shuttingdown()) {
00269                 pthread_t config_thread;
00270                 int i, rc, cfg_len, numprocs = 0;
00271                 char *config = NULL;
00272                 Ksection *assign;
00273                 Network_graph *network;
00274                 Configuration active_config;
00275 
00276                 pthread_mutex_init(&cfg_mutex, NULL);
00277 
00278                 if ((rc = pthread_create(&config_thread, NULL, 
00279                                          get_configuration, 
00280                                          &active_config)) != 0) {
00281                         fprintf(LOG_FILE, 
00282                                 "ERROR-- config server thread: %d\n", rc);
00283                         exit(-1);
00284                 }
00285 
00286                 
00287                 _load_config();
00288 
00289                 
00290                 network = new Network_graph(&active_config);
00291                 numprocs = get_cluster_size();  
00292                 if (numprocs <= 0) {
00293                         daemon_shutdown();
00294                         return NULL;
00295                 }                       
00296                 assign = new Ksection(network, numprocs);
00297 
00298                 
00299                 config = active_config.cfg_to_xml();
00300                 cfg_len = strlen(config);
00301 
00302                 
00303                 for (i = 0; i < numprocs; i++) {
00304                         int sock = get_task_socket(i);
00305                         int part_size = 0;
00306                         int numbytes = 0;
00307                         int assign_len = (numprocs * INTEGER_STRING) 
00308                                 + numprocs;
00309                         char assignmt[assign_len];
00310                         char len_str[INTEGER_STRING];
00311                         Partition_Node *pn = assign->array(i)->head;
00312                         memset(&assignmt, '\0', assign_len);
00313 
00314                         if (sock < 0)
00315                                 continue;
00316 
00317                         while (pn != NULL) {
00318                                 char node[INTEGER_STRING + 2];
00319                                 sprintf(node, "%d,", pn->id);
00320                                 strcat(assignmt, node);
00321                                 part_size++;
00322                                 pn = pn->next;
00323                         }
00324                         assignmt[strlen(assignmt)] = '\0';  
00325                         fprintf(LOG_FILE, 
00326                                 "  assignment %d : %s\n", i, assignmt);
00327 
00328                         sprintf(len_str, "%d", part_size);
00329                         send(sock, len_str, strlen(len_str), 0);
00330 
00331                         send(sock, assignmt, strlen(assignmt), 0);
00332 
00333                         sprintf(len_str, "%d", cfg_len);
00334                         send(sock, len_str, strlen(len_str), 0);
00335 
00336                         while (numbytes < cfg_len) {
00337                                 int sentbytes = send(sock, config + numbytes, 
00338                                                      cfg_len - numbytes, 0);
00339                                 numbytes += sentbytes;
00340                         }
00341                 }
00342 
00343                 while (!daemon_quitting() || !daemon_shuttingdown()) {
00345                 }
00346                 pthread_join(config_thread, NULL);
00347         }
00348         return NULL;
00349 }
00350 
00351 
00352 
00353 #ifdef OQ_TESTING
/*
 * Test stub (OQ_TESTING builds only): always reports a cluster of zero
 * processes.
 */
int get_cluster_size(void)
{
        const int no_processes = 0;

        return no_processes;
}
00358 
/*
 * Test stub (OQ_TESTING builds only): no task has a socket, so -1 is
 * returned whatever index is passed in.
 */
int get_task_socket(int unused)
{
        const int no_socket = -1;

        (void) unused;
        return no_socket;
}
00363 
00364 int cfg_test()
00365 {
00366         int sock, error = 0;
00367         struct sockaddr_in their_addr;
00368         struct hostent *he;
00369         Configuration config;
00370         char *xml;
00371 
00372         if ((error = config.open("../shared/test.cfg")) < 0) {
00373                 printf("  Failed to load test configuration.\n");
00374                 return error;
00375         }
00376         xml = config.cfg_to_xml();
00377 
00378         if ((he = gethostbyname("localhost")) == NULL) {
00379                 herror("gethostbyname");
00380                 return h_errno;
00381         }
00382 
00383         if ((sock = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
00384                 fprintf(LOG_FILE, "socket: %s\n", strerror(errno));
00385                 return sock;
00386         }
00387 
00388         their_addr.sin_family = AF_INET;
00389         their_addr.sin_port = htons(CFG_PORT);
00390         their_addr.sin_addr = *((struct in_addr *)he->h_addr);
00391         memset(their_addr.sin_zero, '\0', sizeof(their_addr.sin_zero));
00392 
00393         sleep(4);  
00394         if ((error = connect(sock, (struct sockaddr*)&their_addr, 
00395                     sizeof(struct sockaddr))) == -1) {
00396                 fprintf(LOG_FILE, "connect: %s\n", strerror(errno));
00397                 close(sock);
00398                 return error;
00399         }
00400 
00401         if ((error = send(sock, xml, strlen(xml), 0)) == -1) {
00402                 fprintf(LOG_FILE, "send: %s\n", strerror(errno));
00403                 close(sock);
00404                 return error;
00405         }
00406 
00407         close(sock);
00408         return 0;
00409 }
00410 
00411 
00412 int main(int argc, char *argv[])
00413 {
00414         int error, rc = 0;
00415         pthread_t ctl;
00416 
00417         daemon_init();
00418 
00419         if ((rc = pthread_create(&ctl, NULL, ctl_server, NULL)) != 0) {
00420                 fprintf(LOG_FILE, "ERROR-- control server thread: %d\n", rc);
00421                 exit(-1);
00422         }
00423 
00424         error = cfg_test();
00425 
00426         daemon_shutdown();
00427         pthread_join(ctl, NULL);
00428         return error;
00429 }
00430 #endif