00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <unistd.h>
00039 #include <string.h>
00040 #include <sys/types.h>
00041 #include <sys/socket.h>
00042 #include <arpa/inet.h>
00043 #include <netdb.h>
00044 #include <pthread.h>
00045 #include <daemon.h>
00046 #include <virtual_time.h>
00047 #include "mini_mpi_server.h"
00048
00049 #define GVT_INTERVAL 60 // in seconds
00050
00051
00052 extern pthread_cond_t mpi_conditional;
00053 extern pthread_mutex_t mpi_mutex;
00054
00055
00056 void *virtual_time_server(void *unused)
00057 {
00058 int error, bcast, broadcast = 1;
00059 char *lvt = "LVT";
00060 char gvt[6 + sizeof(VT_timestamp)];
00061 struct hostent *he;
00062 struct sockaddr_in their_addr;
00063 int num_nodes = 0;
00064
00065 if ((he = gethostbyname("255.255.255.255")) == NULL) {
00066 herror("gethostbyname");
00067 return NULL;
00068 }
00069 if ((bcast = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
00070 perror("socket");
00071 return NULL;
00072 }
00073 if ((error = setsockopt(bcast, SOL_SOCKET, SO_BROADCAST,
00074 &broadcast, sizeof(broadcast))) < 0) {
00075 perror("setsockopt (SO_BROADCAST)");
00076 return NULL;
00077 }
00078
00079 their_addr.sin_family = AF_INET;
00080 their_addr.sin_port = htons(VT_PORT);
00081 their_addr.sin_addr = *((struct in_addr *)he->h_addr);
00082 memset(their_addr.sin_zero, '\0', sizeof their_addr.sin_zero);
00083
00084 pthread_mutex_lock(&mpi_mutex);
00085 pthread_cond_wait(&mpi_conditional, &mpi_mutex);
00086 num_nodes = get_cluster_size();
00087
00088 while(!daemon_shuttingdown()) {
00089 int i, numbytes;
00090 char buf[sizeof(gvt)];
00091 VT_timestamp min, *LVTs =
00092 (VT_timestamp*) malloc(sizeof(VT_timestamp) * num_nodes);
00093
00094 sleep(GVT_INTERVAL);
00095 if ((numbytes = sendto(bcast, lvt, strlen(lvt), 0,
00096 (struct sockaddr *)&their_addr,
00097 sizeof(struct sockaddr))) < 0) {
00098 perror("sendto");
00099 continue;
00100 }
00101
00102 for (i = 0; i < num_nodes; i++) {
00103 char lvt_str[VT_timestamp::string_len()];
00104 if ((numbytes = recvfrom(bcast, buf, sizeof(buf), 0,
00105 NULL, NULL)) < 0) {
00106 perror("recvfrom");
00107 continue;
00108 }
00109 buf[numbytes] = '\0';
00110 sscanf(buf, "LVT=%s", lvt_str);
00111 LVTs[i] = VT_timestamp(lvt_str);
00112 }
00113
00114 VT_timestamp::min_time(LVTs, num_nodes, min);
00115 sprintf(gvt, "GVT=%s", min.to_string());
00116
00117 if ((numbytes = sendto(bcast, gvt, strlen(gvt), 0,
00118 (struct sockaddr *)&their_addr,
00119 sizeof(struct sockaddr))) < 0) {
00120 perror("sendto");
00121 continue;
00122 }
00123 free(LVTs);
00124 }
00125 return NULL;
00126 }
00127
00128
00129 #ifdef OQ_TESTING
00130 int main(int argc, char *argv[])
00131 {
00132 daemon_init();
00133 virtual_time_server(NULL);
00134 return 0;
00135 }
00136 #endif