00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <stdio.h>
00036 #include <stdlib.h>
00037 #include <unistd.h>
00038 #include <string.h>
00039 #include <sys/time.h>
00040 #include <signal.h>
00041 #include <wait.h>
00042 #include <limits.h>
00043 #include <stdint.h>
00044 #include <gps.h>
00045 #include <pthread.h>
00046 #include <minisoap.h>
00047 #include <ipc.h>
00048 #include "sensor_ctl.h"
00049
00050
00051 #define BATTERY_DEV "/dev/platx/batmon"
00052 #define CONFIG_PATH "/etc/sensord.conf"
00053 #define GPS_PORT "2947"
00054 #define IPC_PORT_BASE 3133
00055
00056
00057 struct {
00058
00059 sensor_t *sensors;
00060 app_t *applications;
00061
00062
00063 double fix_time;
00064 double satellites;
00065 double latitude;
00066 double longitude;
00067 double altitude;
00068
00069
00070 double voltage;
00071 int num_procs;
00072 app_t *processes;
00073 } status;
00074
00075
00076 static pthread_cond_t cmd_conditional;
00077 static pthread_mutex_t cmd_mutex;
00078 static SoapEvent *command;
00079
00080 static pthread_cond_t data_conditional;
00081 static pthread_mutex_t data_mutex;
00082 static SoapEvent *data;
00083
00084
00085
00086
00087 static void launch_process(app_t *proc)
00088 {
00089 char cmdline[512];
00090 sprintf(cmdline, "%s %u", proc->path, proc->ipc_port);
00091 if (!fork()) {
00092 system(cmdline);
00093 exit(1);
00094 }
00095 proc->running++;
00096 }
00097
00098
00099 static int query_gps(void)
00100 {
00101 struct gps_data_t *data = gps_open("127.0.0.1", GPS_PORT);
00102 if (data == NULL)
00103 return -1;
00104
00105 gps_poll(data);
00106 status.fix_time = data->fix.time;
00107 status.satellites = data->satellites_used;
00108
00109 if (data->fix.mode == MODE_2D || data->fix.mode == MODE_3D) {
00110 status.latitude = data->fix.latitude;
00111 status.longitude = data->fix.longitude;
00112 if (data->fix.mode == MODE_3D)
00113 status.altitude = data->fix.altitude;
00114 }
00115 gps_close(data);
00116 return 0;
00117 }
00118
00119
00120 static int query_battery(void)
00121 {
00122 char line[256];
00123 FILE *batmon = fopen(BATTERY_DEV, "r");
00124 if (fgets(line, 255, batmon) == NULL)
00125 return -1;
00126 if (sscanf(line, "%lfv", &status.voltage) < 1)
00127 return -1;
00128 fclose(batmon);
00129 return 0;
00130 }
00131
00132
00133 static int query_processes(void)
00134 {
00135 int i;
00136 for (i = 0; i < status.num_procs; i++) {
00137 int stat = 0;
00138 app_t *this_process = &status.processes[i];
00139 char search[256];
00140
00141 sprintf(search, "ps ax | grep %s | grep -v grep >/dev/null",
00142 this_process->path);
00143 stat = system(search);
00144 if (WEXITSTATUS(stat) < 0)
00145 launch_process(this_process);
00146 }
00147 return 0;
00148 }
00149
00150
00151
00152 static int status_response(char *server, int port)
00153 {
00154 char tmp[256];
00155 SoapEvent evt;
00156 sensor_t *sensor;
00157
00158 strcpy(evt.from, my_ip_addr);
00159 strcpy(evt.to, server);
00160 strcpy(evt.from_app, "sensord");
00161 strcpy(evt.to_app, "");
00162 evt.type = Event_Msg;
00163
00164 sprintf(evt.msg, "<status>\n");
00165 for (sensor = status.sensors; sensor != NULL; sensor = sensor->next) {
00166 if (strncmp(sensor->path+4, "gps", 3) == 0) {
00167 if (query_gps() == 0) {
00168 sprintf(tmp, "<gps>\n<latitude>%lf</latitude>\n"
00169 "<longitude>%lf</longitude>\n</gps>\n",
00170 status.latitude, status.longitude);
00171 strcat(evt.msg, tmp);
00172 }
00173 }
00174 if (strncmp(sensor->path+4, "battery", 7) == 0) {
00175 if (query_battery() == 0) {
00176 sprintf(tmp, "<battery>\n<volts>%lf</volts>\n"
00177 "</battery>\n", status.voltage);
00178 strcat(evt.msg, tmp);
00179 }
00180 }
00181 }
00182 if (query_processes() == 0) {
00183 app_t *proc;
00184 for (proc = status.processes; proc != NULL; proc = proc->next) {
00185 sprintf(tmp,
00186 "<process restarts=\"%d\">%s</process>\n",
00187 proc->running - 1, proc->path);
00188 strcat(evt.msg, tmp);
00189 }
00190 }
00191 strcat(evt.msg, "</status>");
00192
00193 return raise_soap_event(&evt, server, port);
00194 }
00195
00196
00197 static int provides_response(char *server, int port)
00198 {
00199 app_t *app;
00200 SoapEvent evt;
00201
00202 strcpy(evt.from, my_ip_addr);
00203 strcpy(evt.to, "server");
00204 strcpy(evt.from_app, "sensord");
00205 strcpy(evt.to_app, "");
00206 evt.type = Event_Msg;
00207
00208 sprintf(evt.msg, "<provides>\n");
00209 for (app = status.applications; app != NULL; app = app->next) {
00210 strcat(evt.msg, app->path);
00211 strncat(evt.msg, "\n", 1);
00212 }
00213 strcat(evt.msg, "</provides>");
00214
00215 return raise_soap_event(&evt, server, port);
00216 }
00217
00218
00219 static int data_register_handler(char *remainder)
00220 {
00221 app_t *proc;
00222 unsigned int port;
00223 char *appname = "";
00224 char *sink_ip = "";
00225 unsigned int found = 0;
00226
00227
00228 if (sscanf(remainder, "%s %s:%d", appname, sink_ip, &port) < 3) {
00229 perror("sscanf");
00230 return -1;
00231 }
00232
00233 for (proc = status.processes; proc != NULL; proc = proc->next) {
00234 if (strcmp(appname, proc->path) == 0) {
00235 consumer_t *sink =
00236 (consumer_t*) malloc(sizeof(consumer_t));
00237 strncpy(sink->host_ip, sink_ip, NET_ADDR_LEN);
00238 sink->port = port;
00239 sink->next = proc->sinks;
00240 proc->sinks = sink;
00241 found++;
00242 break;
00243 }
00244 }
00245 if (!found) {
00246 app_t *app;
00247 for (app = status.applications; app != NULL; app = app->next) {
00248 if (strcmp(appname, app->path) == 0) {
00249 consumer_t *sink;
00250 proc = (app_t*) malloc(sizeof(app_t));
00251 memcpy(proc, app, sizeof(app_t));
00252 proc->running = 0;
00253 proc->next = NULL;
00254 sink = (consumer_t*) malloc(sizeof(consumer_t));
00255 strncpy(sink->host_ip, sink_ip, NET_ADDR_LEN);
00256 sink->port = port;
00257 sink->next = proc->sinks;
00258 proc->sinks = sink;
00259 proc->next = status.processes;
00260 status.processes = proc;
00261 found++;
00262 break;
00263 }
00264 }
00265 }
00266
00267 if (!found)
00268 fprintf(stderr, "Attempt to register for %s - not found\n",
00269 appname);
00270
00271 if (!proc->running)
00272 launch_process(proc);
00273
00274 return 0;
00275 }
00276
00277
00278 #ifdef DEBUG
00279 static void print_config(FILE *out)
00280 {
00281 sensor_t *sensor;
00282 app_t *app;
00283
00284 fprintf(out, "# sensord.conf\n");
00285 fprintf(out, "\nipaddr:%s\n", my_ip_addr);
00286 for (sensor = status.sensors; sensor != NULL; sensor = sensor->next) {
00287 fprintf(out, "\nsensors:");
00288 if (sensor->next != NULL)
00289 fprintf(out, "%s[%u],", sensor->path+4,
00290 sensor->ipc_port);
00291 else
00292 fprintf(out, "%s[%u];\n", sensor->path+4,
00293 sensor->ipc_port);
00294 }
00295 for (app = status.applications; app != NULL; app = app->next) {
00296 app_t *req;
00297 int paren = (app->required_apps != NULL ? 1 : 0);
00298
00299 fprintf(out, "\nprovides:");
00300 fprintf(out, "%s[%u]", app->path, app->ipc_port);
00301
00302 if (paren)
00303 fprintf(out, "(");
00304 for (req = app->required_apps; req != NULL; req = req->next) {
00305 if (strcmp(req->host, "") == 0)
00306 fprintf(out, "%s[%u]", req->path,
00307 req->ipc_port);
00308 else
00309 fprintf(out, "%s:%s", req->path, req->host);
00310 }
00311 if (paren)
00312 fprintf(out, ")");
00313
00314 if(app->next != NULL)
00315 fprintf(out, ",");
00316 else
00317 fprintf(out, ";\n");
00318 }
00319 }
00320 #endif
00321
00322
00323
00324 void sensor_ctl(SoapEvent *env)
00325 {
00326 if (strncmp(env->to, my_ip_addr, NET_ADDR_LEN) != 0)
00327 return;
00328
00329 if (env->type == Event_Msg) {
00330 pthread_mutex_lock(&data_mutex);
00331 data = env;
00332 pthread_cond_signal(&data_conditional);
00333 pthread_mutex_unlock(&data_mutex);
00334 } else if (env->type == Command_Msg) {
00335 pthread_mutex_lock(&cmd_mutex);
00336 command = env;
00337 pthread_cond_signal(&cmd_conditional);
00338 pthread_mutex_unlock(&cmd_mutex);
00339 }
00340 usleep(100);
00341 }
00342
00343
00344
00345 static int msg_route(SoapEvent *env)
00346 {
00347 if (strcmp(env->to, "") == 0) {
00348
00349 } else if (strcmp(env->to, "127.0.0.1") == 0 ||
00350 strcmp(env->to, my_ip_addr) == 0) {
00351 app_t *proc;
00352 for (proc = status.processes; proc != NULL;
00353 proc = proc->next) {
00354 if (strcmp(proc->path, env->to_app) == 0) {
00355 ipc_data_send(env);
00356 break;
00357 }
00358 }
00359 } else {
00360 raise_soap_event(env, env->to, DEFAULT_MINISOAP_PORT);
00361 }
00362 return 0;
00363 }
00364
00365
00366 static void *data_listener(void *unused)
00367 {
00368 while(!quitting()) {
00369 pthread_mutex_lock(&data_mutex);
00370 pthread_cond_wait(&data_conditional, &data_mutex);
00371
00372 msg_route(data);
00373 free(data);
00374 pthread_mutex_unlock(&data_mutex);
00375 }
00376 return NULL;
00377 }
00378
00379
00380 static void *cmd_listener(void *unused)
00381 {
00382 while(!quitting()) {
00383 char *cmd = "";
00384 char *remainder = "";
00385
00386 pthread_mutex_lock(&cmd_mutex);
00387 pthread_cond_wait(&cmd_conditional, &cmd_mutex);
00388
00389 if (sscanf(command->msg, "%s %s", cmd, remainder) < 1) {
00390 perror("sscanf");
00391 continue;
00392 }
00393 if (strncmp(cmd, "status", 6) == 0)
00394 status_response(command->from, DEFAULT_MINISOAP_PORT);
00395 else if (strncmp(cmd, "provides", 8) == 0)
00396 provides_response(command->from, DEFAULT_MINISOAP_PORT);
00397 else if (strncmp(cmd, "register", 8) == 0)
00398 data_register_handler(remainder);
00399 else
00400 msg_route(command);
00401
00402 free(command);
00403 pthread_mutex_unlock(&cmd_mutex);
00404 }
00405 return NULL;
00406 }
00407
00408
00409
00410 int main(int argc, char *argv[])
00411 {
00412 int error = 0;
00413 char *config_name = CONFIG_PATH;
00414 FILE *config_file;
00415 int line_len = 255;
00416 char line[line_len + 1];
00417 unsigned int ports_count = IPC_PORT_BASE;
00418 unsigned int under_sensors = 0;
00419 unsigned int under_provides = 0;
00420 unsigned int bad_config = 0;
00421 app_t *app;
00422
00423 if (argc != 1) {
00424 fprintf(stderr, "Usage: %s\n", argv[0]);
00425 exit(-1);
00426 }
00427
00428 status.sensors = NULL;
00429 status.applications = NULL;
00430 status.processes = NULL;
00431
00432 pthread_mutex_init(&cmd_mutex, NULL);
00433 pthread_cond_init(&cmd_conditional, NULL);
00434
00435 if ((error = initialize_ipc(-1)) < 0)
00436 return error;
00437
00438
00439 config_file = fopen(config_name, "r");
00440 while (fgets(line, line_len, config_file) != NULL) {
00441 char *current = "";
00442 char *remainder = "";
00443 if (sscanf(line, "%*s:%s", remainder) < 1) {
00444 if (sscanf(line, "\t%s", remainder) < 1)
00445 continue;
00446 }
00447
00448 if (strncmp(line, "#", 1) == 0)
00449 continue;
00450 else if (strncasecmp(line, "ipaddr", 6) == 0)
00451 sscanf(remainder, "%s;", my_ip_addr);
00452 else if (strncasecmp(line, "sensors", 7) == 0 ||
00453 (strncmp(line, "\t", 1) == 0 && under_sensors)) {
00454 sensor_t *prev = status.sensors;
00455 while (sscanf(remainder, "%s, %s",
00456 current, remainder) > 0) {
00457 sensor_t *this_sensor =
00458 (sensor_t*) malloc(sizeof(sensor_t));
00459
00460 if (current[strlen(current)-1] == ';') {
00461 current[strlen(current)-1] = '\0';
00462 under_sensors = 0;
00463 } else
00464 under_sensors++;
00465
00466 strncpy(this_sensor->path, "raw-", 4);
00467 strncat(this_sensor->path, current,
00468 PATH_MAX-4);
00469 this_sensor->ipc_port = ports_count++;
00470 this_sensor->next = prev;
00471 prev = this_sensor;
00472 }
00473 status.sensors = prev;
00474 } else if (strncasecmp(line, "provides", 9) == 0 ||
00475 (strncmp(line, "\t", 1) == 0 && under_provides)) {
00476 app_t *prev_app = status.applications;
00477 while (sscanf(remainder, "%s, %s",
00478 current, remainder) > 0) {
00479 app_t *prev_required = NULL;
00480 app_t *this_app =
00481 (app_t*) malloc(sizeof(app_t));
00482 char *app = "";
00483 char *support = "";
00484 char *required = "";
00485
00486 if (current[strlen(current)-1] == ';') {
00487 current[strlen(current)-1] = '\0';
00488 under_provides = 0;
00489 } else
00490 under_provides++;
00491
00492 if (sscanf(current, "%s (%s)", app, support) < 2)
00493 app = current;
00494 strncpy(this_app->path, app, PATH_MAX);
00495 this_app->running = 0;
00496 memset(&this_app->host, '\0', NET_ADDR_LEN);
00497 this_app->ipc_port = ports_count++;
00498 this_app->sinks = NULL;
00499 this_app->next = prev_app;
00500 prev_app = this_app;
00501
00502 while (sscanf(support, "%s, %s",
00503 required, support) > 0) {
00504 char *host_string = "";
00505 app_t *a_new = (app_t*) malloc(sizeof(app_t));
00506 if (sscanf(required, "%s:%s",
00507 required, host_string) > 1)
00508 strncpy(a_new->host, host_string, NET_ADDR_LEN);
00509 else
00510 a_new->ipc_port = -1;
00511
00512 strncpy(a_new->path, required, PATH_MAX);
00513 a_new->running = 0;
00514 a_new->sinks = NULL;
00515 a_new->next = prev_required;
00516 prev_required = a_new;
00517 }
00518 }
00519 status.applications = prev_app;
00520 }
00521 }
00522 fclose(config_file);
00523
00524
00525 for (app = status.applications; app != NULL; app = app->next) {
00526 app_t *req_app;
00527 for (req_app = app->required_apps; req_app != NULL;
00528 req_app = req_app->next) {
00529 if (req_app->ipc_port < 0) {
00530 int found = 0;
00531 sensor_t *my_sens;
00532 app_t *my_apps;
00533 for (my_sens = status.sensors; my_sens != NULL;
00534 my_sens = my_sens->next) {
00535 if (strcmp(my_sens->path,
00536 req_app->path) == 0) {
00537 req_app->ipc_port =
00538 my_sens->ipc_port;
00539 found++;
00540 break;
00541 }
00542 }
00543 if (!found) {
00544 for (my_apps = status.applications;
00545 my_apps != NULL;
00546 my_apps = my_apps->next) {
00547 if (strcmp(my_apps->path,
00548 req_app->path) == 0) {
00549 req_app->ipc_port =
00550 my_apps->ipc_port;
00551 found++;
00552 break;
00553 }
00554 }
00555 }
00556 if (!found) {
00557 fprintf(stderr, "Error: %s requires "
00558 "%s which is missing from the "
00559 "configuration.\n",
00560 app->path, req_app->path);
00561 bad_config++;
00562 }
00563 }
00564 }
00565 }
00566 if (bad_config)
00567 exit(-1);
00568
00569 #ifdef DEBUG
00570 print_config(stderr);
00571 #endif
00572
00573 if (!fork()) {
00574 int error = 0;
00575 pthread_t net_data_thread, net_cmd_thread;
00576 setsid();
00577
00578 if (register_soap_handler(sensor_ctl) < 0) {
00579 fprintf(stderr, "Soap server initialization failed\n");
00580 exit(-1);
00581 }
00582
00583 if (register_data_handler(IPC_DATA_PORT, msg_route) < 0) {
00584 fprintf(stderr, "Data server initialization failed\n");
00585 exit(-1);
00586 }
00587
00588 if (register_cmd_handler(IPC_CMD_PORT, msg_route) < 0) {
00589 fprintf(stderr, "Command server initialization failed\n");
00590 exit(-1);
00591 }
00592
00593 if ((error = pthread_create(&net_data_thread, NULL,
00594 data_listener, NULL)) < 0) {
00595 fprintf(stderr, "Data client initialization failed\n");
00596 exit(-1);
00597 }
00598
00599 if ((error = pthread_create(&net_cmd_thread, NULL,
00600 cmd_listener, NULL)) < 0) {
00601 fprintf(stderr, "Command client initialization failed\n");
00602 exit(-1);
00603 }
00604
00605 pthread_join(soap_thread, NULL);
00606 pthread_join(net_data_thread, NULL);
00607 pthread_join(net_cmd_thread, NULL);
00608 pthread_join(data_thread, NULL);
00609 pthread_join(cmd_thread, NULL);
00610
00611 exit(0);
00612 }
00613 return 0;
00614 }