N-sim
Emulation and simulation of
Wireless Sensor Networks



   Home


   Project Page


   Download


   CVS



   Installation


   Configuration


   Plug-ins




 Hosted by
SourceForge.net Logo

/home/brennan/n-sim/Vaike/linux/system-addons/system/control/sensor_ctl.c

Go to the documentation of this file.
00001 
00014 /*
00015  * Copyright 2007. Los Alamos National Security, LLC. This material
00016  * was produced under U.S. Government contract DE-AC52-06NA25396 for
00017  * Los Alamos National Laboratory (LANL), which is operated by Los
00018  * Alamos National Security, LLC, for the Department of Energy. The
00019  * U.S. Government has rights to use, reproduce, and distribute this
00020  * software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY,
00021  * LLC, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL
00022  * LIABILITY FOR THE USE OF THIS SOFTWARE. If software is modified to
00023  * produce derivative works, such modified software should be clearly
00024  * marked, so as not to confuse it with the version available from LANL.
00025  *
00026  * Additionally, this program is free software; you can redistribute
00027  * it and/or modify it under the terms of the GNU General Public
00028  * License as published by the Free Software Foundation; version 2 of
00029  * the License. Accordingly, this program is distributed in the hope
00030  * it will be useful, but WITHOUT ANY WARRANTY; without even the
00031  * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
00032  * PURPOSE. See the GNU General Public License for more details.
00033  */
00034 
00035 #include <stdio.h>
00036 #include <stdlib.h>
00037 #include <unistd.h>
00038 #include <string.h>
00039 #include <sys/time.h>
00040 #include <signal.h>
00041 #include <wait.h>
00042 #include <limits.h>
00043 #include <stdint.h>
00044 #include <gps.h>
00045 #include <pthread.h>
00046 #include <minisoap.h>
00047 #include <ipc.h>
00048 #include "sensor_ctl.h"
00049 
00050 
/* Device file read by query_battery() to obtain the battery voltage. */
#define BATTERY_DEV      "/dev/platx/batmon"

/* Configuration file parsed by main() at start-up. */
#define CONFIG_PATH      "/etc/sensord.conf"

/* Port string handed to gps_open() by query_gps(). */
#define GPS_PORT         "2947"

/* First IPC port number assigned to configured sensors and applications. */
#define IPC_PORT_BASE    3133
00055 
00056 
00057 struct {
00058         /* capabilities */
00059         sensor_t *sensors;
00060         app_t *applications;
00061 
00062         /* gps data */
00063         double fix_time;  /* in seconds since epoch */
00064         double satellites;
00065         double latitude;
00066         double longitude;
00067         double altitude;
00068 
00069         /* health data */
00070         double voltage;
00071         int num_procs;
00072         app_t *processes;
00073 } status;
00074 
00075 
00076 static pthread_cond_t cmd_conditional;
00077 static pthread_mutex_t cmd_mutex;
00078 static SoapEvent *command;
00079 
00080 static pthread_cond_t data_conditional;
00081 static pthread_mutex_t data_mutex;
00082 static SoapEvent *data;
00083 
00084 
00085 
00086 
00087 static void launch_process(app_t *proc)
00088 {
00089         char cmdline[512];
00090         sprintf(cmdline, "%s %u", proc->path, proc->ipc_port);
00091         if (!fork()) {
00092                 system(cmdline);
00093                 exit(1);
00094         }
00095         proc->running++;
00096 }
00097 
00098 
00099 static int query_gps(void)
00100 {
00101         struct gps_data_t *data = gps_open("127.0.0.1", GPS_PORT);
00102         if (data == NULL)
00103                 return -1;
00104 
00105         gps_poll(data);
00106         status.fix_time = data->fix.time;
00107         status.satellites = data->satellites_used;
00108 
00109         if (data->fix.mode == MODE_2D || data->fix.mode == MODE_3D) {
00110                 status.latitude = data->fix.latitude;
00111                 status.longitude = data->fix.longitude;
00112                 if (data->fix.mode == MODE_3D)
00113                         status.altitude = data->fix.altitude;
00114         }
00115         gps_close(data);
00116         return 0;
00117 }
00118 
00119 
00120 static int query_battery(void)
00121 {
00122         char line[256];
00123         FILE *batmon = fopen(BATTERY_DEV, "r");
00124         if (fgets(line, 255, batmon) == NULL)
00125                 return -1;
00126         if (sscanf(line, "%lfv", &status.voltage) < 1)
00127                 return -1;
00128         fclose(batmon);
00129         return 0;
00130 }
00131 
00132 
00133 static int query_processes(void)
00134 {
00135         int i;
00136         for (i = 0; i < status.num_procs; i++) {
00137                 int stat = 0;
00138                 app_t *this_process = &status.processes[i];
00139                 char search[256];
00140 
00141                 sprintf(search, "ps ax | grep %s | grep -v grep >/dev/null",
00142                         this_process->path);
00143                 stat = system(search);
00144                 if (WEXITSTATUS(stat) < 0)  /* process not listed in results */
00145                         launch_process(this_process);
00146         }
00147         return 0;
00148 }
00149 
00150 
00151 
00152 static int status_response(char *server, int port)
00153 {
00154         char tmp[256];
00155         SoapEvent evt;
00156         sensor_t *sensor;
00157 
00158         strcpy(evt.from, my_ip_addr);
00159         strcpy(evt.to, server);
00160         strcpy(evt.from_app, "sensord");
00161         strcpy(evt.to_app, "");
00162         evt.type = Event_Msg;
00163 
00164         sprintf(evt.msg, "<status>\n");
00165         for (sensor = status.sensors; sensor != NULL; sensor = sensor->next) {
00166                 if (strncmp(sensor->path+4, "gps", 3) == 0) {
00167                         if (query_gps() == 0) {
00168                                 sprintf(tmp, "<gps>\n<latitude>%lf</latitude>\n"
00169                                         "<longitude>%lf</longitude>\n</gps>\n",
00170                                         status.latitude, status.longitude);
00171                                 strcat(evt.msg, tmp);
00172                         }
00173                 }
00174                 if (strncmp(sensor->path+4, "battery", 7) == 0) {
00175                         if (query_battery() == 0) {
00176                                 sprintf(tmp, "<battery>\n<volts>%lf</volts>\n"
00177                                         "</battery>\n", status.voltage);
00178                                 strcat(evt.msg, tmp);
00179                         }
00180                 }
00181         }
00182         if (query_processes() == 0) {
00183                 app_t *proc;
00184                 for (proc = status.processes; proc != NULL; proc = proc->next) {
00185                         sprintf(tmp, 
00186                                 "<process restarts=\"%d\">%s</process>\n",
00187                                 proc->running - 1, proc->path);
00188                         strcat(evt.msg, tmp);
00189                 }
00190         }
00191         strcat(evt.msg, "</status>");
00192 
00193         return raise_soap_event(&evt, server, port);
00194 }
00195 
00196 
00197 static int provides_response(char *server, int port)
00198 {
00199         app_t *app;
00200         SoapEvent evt;
00201 
00202         strcpy(evt.from, my_ip_addr);
00203         strcpy(evt.to, "server");
00204         strcpy(evt.from_app, "sensord");
00205         strcpy(evt.to_app, "");
00206         evt.type = Event_Msg;
00207 
00208         sprintf(evt.msg, "<provides>\n");
00209         for (app = status.applications; app != NULL; app = app->next) {
00210                 strcat(evt.msg, app->path);
00211                 strncat(evt.msg, "\n", 1);
00212         }
00213         strcat(evt.msg, "</provides>");
00214 
00215         return raise_soap_event(&evt, server, port);
00216 }
00217 
00218 
00219 static int data_register_handler(char *remainder)
00220 {
00221         app_t *proc;
00222         unsigned int port;
00223         char *appname = "";
00224         char *sink_ip = "";
00225         unsigned int found = 0;
00226 
00227         // FIXME: format is app:id or just app; sink_ip is evt.from
00228         if (sscanf(remainder, "%s %s:%d", appname, sink_ip, &port) < 3) {
00229                 perror("sscanf");
00230                 return -1;
00231         }
00232 
00233         for (proc = status.processes; proc != NULL; proc = proc->next) {
00234                 if (strcmp(appname, proc->path) == 0) {
00235                         consumer_t *sink =
00236                                 (consumer_t*) malloc(sizeof(consumer_t));
00237                         strncpy(sink->host_ip, sink_ip, NET_ADDR_LEN);
00238                         sink->port = port;
00239                         sink->next = proc->sinks;
00240                         proc->sinks = sink;
00241                         found++;
00242                         break;
00243                 }
00244         }
00245         if (!found) {
00246                 app_t *app;
00247                 for (app = status.applications; app != NULL; app = app->next) {
00248                         if (strcmp(appname, app->path) == 0) {
00249                                 consumer_t *sink;
00250                                 proc = (app_t*) malloc(sizeof(app_t));
00251                                 memcpy(proc, app, sizeof(app_t));
00252                                 proc->running = 0;
00253                                 proc->next = NULL;
00254                                 sink = (consumer_t*) malloc(sizeof(consumer_t));
00255                                 strncpy(sink->host_ip, sink_ip, NET_ADDR_LEN);
00256                                 sink->port = port;
00257                                 sink->next = proc->sinks;
00258                                 proc->sinks = sink;
00259                                 proc->next = status.processes;
00260                                 status.processes = proc;
00261                                 found++;
00262                                 break;
00263                         }
00264                 }
00265         }
00266 
00267         if (!found)
00268                 fprintf(stderr, "Attempt to register for %s - not found\n",
00269                         appname);
00270 
00271         if (!proc->running)
00272                 launch_process(proc);
00273 
00274         return 0;
00275 }
00276 
00277 
#ifdef DEBUG
/*
 * Dump the parsed configuration to `out` in sensord.conf-like syntax:
 * the node address, one "sensors:" entry per sensor and one "provides:"
 * entry per application, the latter followed by its required
 * applications in parentheses.  Entries end with ',' except the last of
 * each list, which ends with ';' and a newline.
 */
static void print_config(FILE *out)
{
        sensor_t *sens;
        app_t *entry;

        fputs("# sensord.conf\n", out);
        fprintf(out, "\nipaddr:%s\n", my_ip_addr);

        sens = status.sensors;
        while (sens != NULL) {
                /* +4 skips the "raw-" prefix added when parsing */
                fprintf(out, "\nsensors:%s[%u]%s", sens->path+4,
                        sens->ipc_port, sens->next != NULL ? "," : ";\n");
                sens = sens->next;
        }

        entry = status.applications;
        while (entry != NULL) {
                app_t *dep = entry->required_apps;
                int has_deps = (dep != NULL);

                fprintf(out, "\nprovides:%s[%u]", entry->path,
                        entry->ipc_port);

                if (has_deps)
                        fputs("(", out);
                while (dep != NULL) {
                        /* an empty host marks a locally provided app */
                        if (strcmp(dep->host, "") != 0)
                                fprintf(out, "%s:%s", dep->path, dep->host);
                        else
                                fprintf(out, "%s[%u]", dep->path,
                                        dep->ipc_port);
                        dep = dep->next;
                }
                if (has_deps)
                        fputs(")", out);

                fputs(entry->next != NULL ? "," : ";\n", out);
                entry = entry->next;
        }
}
#endif
00321 
00322 
00323 /* soap message callback */
00324 void sensor_ctl(SoapEvent *env)
00325 {
00326         if (strncmp(env->to, my_ip_addr, NET_ADDR_LEN) != 0)
00327                 return;
00328 
00329         if (env->type == Event_Msg) {
00330                 pthread_mutex_lock(&data_mutex);
00331                 data = env;
00332                 pthread_cond_signal(&data_conditional);
00333                 pthread_mutex_unlock(&data_mutex);
00334         } else if (env->type == Command_Msg) {
00335                 pthread_mutex_lock(&cmd_mutex);
00336                 command = env;
00337                 pthread_cond_signal(&cmd_conditional);
00338                 pthread_mutex_unlock(&cmd_mutex);
00339         }
00340         usleep(100);  /* for mutex settling */
00341 }
00342 
00343 
00344 
00345 static int msg_route(SoapEvent *env)
00346 {
00347         if (strcmp(env->to, "") == 0) {
00348                 // FIXME
00349         } else if (strcmp(env->to, "127.0.0.1") == 0 ||
00350                    strcmp(env->to, my_ip_addr) == 0) {  /* local */
00351                 app_t *proc;
00352                 for (proc = status.processes; proc != NULL;
00353                      proc = proc->next) {
00354                         if (strcmp(proc->path, env->to_app) == 0) {
00355                                 ipc_data_send(env);
00356                                 break;
00357                         }
00358                 }
00359         } else {  /* remote */
00360                 raise_soap_event(env, env->to, DEFAULT_MINISOAP_PORT);
00361         }
00362         return 0;
00363 }
00364 
00365 
00366 static void *data_listener(void *unused)
00367 {
00368         while(!quitting()) {
00369                 pthread_mutex_lock(&data_mutex);
00370                 pthread_cond_wait(&data_conditional, &data_mutex);
00371 
00372                 msg_route(data);
00373                 free(data);
00374                 pthread_mutex_unlock(&data_mutex);
00375         }
00376         return NULL;
00377 }
00378 
00379 
00380 static void *cmd_listener(void *unused)
00381 {
00382         while(!quitting()) {
00383                 char *cmd = "";
00384                 char *remainder = "";
00385 
00386                 pthread_mutex_lock(&cmd_mutex);
00387                 pthread_cond_wait(&cmd_conditional, &cmd_mutex);
00388 
00389                 if (sscanf(command->msg, "%s %s", cmd, remainder) < 1) {
00390                         perror("sscanf");
00391                         continue;
00392                 }
00393                 if (strncmp(cmd, "status", 6) == 0)
00394                         status_response(command->from, DEFAULT_MINISOAP_PORT);
00395                 else if (strncmp(cmd, "provides", 8) == 0)
00396                         provides_response(command->from, DEFAULT_MINISOAP_PORT);
00397                 else if (strncmp(cmd, "register", 8) == 0)
00398                         data_register_handler(remainder);
00399                 else
00400                         msg_route(command);
00401 
00402                 free(command);
00403                 pthread_mutex_unlock(&cmd_mutex);
00404         }
00405         return NULL;
00406 }
00407 
00408 
00409 
00410 int main(int argc, char *argv[])
00411 {
00412         int error = 0;
00413         char *config_name = CONFIG_PATH;
00414         FILE *config_file;
00415         int line_len = 255;
00416         char line[line_len + 1];
00417         unsigned int ports_count = IPC_PORT_BASE;
00418         unsigned int under_sensors = 0;
00419         unsigned int under_provides = 0;
00420         unsigned int bad_config = 0;
00421         app_t *app;
00422 
00423         if (argc != 1) {
00424                 fprintf(stderr, "Usage: %s\n", argv[0]);
00425                 exit(-1);
00426         }
00427 
00428         status.sensors = NULL;
00429         status.applications = NULL;
00430         status.processes = NULL;
00431 
00432         pthread_mutex_init(&cmd_mutex, NULL);
00433         pthread_cond_init(&cmd_conditional, NULL);
00434 
00435         if ((error = initialize_ipc(-1)) < 0)
00436                 return error;
00437 
00438         /* parse config file */
00439         config_file = fopen(config_name, "r");
00440         while (fgets(line, line_len, config_file) != NULL) {
00441                 char *current = "";
00442                 char *remainder = "";
00443                 if (sscanf(line, "%*s:%s", remainder) < 1) {
00444                         if (sscanf(line, "\t%s", remainder) < 1)
00445                                 continue;
00446                 }
00447 
00448                 if (strncmp(line, "#", 1) == 0)  /* a comment line */
00449                         continue;
00450                 else if (strncasecmp(line, "ipaddr", 6) == 0)
00451                         sscanf(remainder, "%s;", my_ip_addr);
00452                 else if (strncasecmp(line, "sensors", 7) == 0 ||
00453                          (strncmp(line, "\t", 1) == 0 && under_sensors)) {
00454                         sensor_t *prev = status.sensors;
00455                         while (sscanf(remainder, "%s, %s",
00456                                       current, remainder) > 0) {
00457                                 sensor_t *this_sensor =
00458                                         (sensor_t*) malloc(sizeof(sensor_t));
00459 
00460                                 if (current[strlen(current)-1] == ';') {
00461                                         current[strlen(current)-1] = '\0';
00462                                         under_sensors = 0;
00463                                 } else
00464                                         under_sensors++;
00465 
00466                                 strncpy(this_sensor->path, "raw-", 4);
00467                                 strncat(this_sensor->path, current,
00468                                         PATH_MAX-4);
00469                                 this_sensor->ipc_port = ports_count++;
00470                                 this_sensor->next = prev;
00471                                 prev = this_sensor;
00472                         }
00473                         status.sensors = prev;
00474                 } else if (strncasecmp(line, "provides", 9) == 0 ||
00475                            (strncmp(line, "\t", 1) == 0 && under_provides)) {
00476                         app_t *prev_app = status.applications;
00477                         while (sscanf(remainder, "%s, %s",
00478                                       current, remainder) > 0) {
00479                                 app_t *prev_required = NULL;
00480                                 app_t *this_app =
00481                                         (app_t*) malloc(sizeof(app_t));
00482                                 char *app = "";
00483                                 char *support = "";
00484                                 char *required = "";
00485 
00486                                 if (current[strlen(current)-1] == ';') {
00487                                         current[strlen(current)-1] = '\0';
00488                                         under_provides = 0;
00489                                 } else
00490                                         under_provides++;
00491 
00492                                 if (sscanf(current, "%s (%s)", app, support) < 2)
00493                                         app = current;
00494                                 strncpy(this_app->path, app, PATH_MAX);
00495                                 this_app->running = 0;
00496                                 memset(&this_app->host, '\0', NET_ADDR_LEN);
00497                                 this_app->ipc_port = ports_count++;
00498                                 this_app->sinks = NULL;
00499                                 this_app->next = prev_app;
00500                                 prev_app = this_app;
00501 
00502                                 while (sscanf(support, "%s, %s",
00503                                               required, support) > 0) {
00504                                         char *host_string = "";
00505                                         app_t *a_new = (app_t*) malloc(sizeof(app_t));
00506                                         if (sscanf(required, "%s:%s",
00507                                                    required, host_string) > 1)
00508                                                 strncpy(a_new->host, host_string, NET_ADDR_LEN);
00509                                         else
00510                                                 a_new->ipc_port = -1;
00511 
00512                                         strncpy(a_new->path, required, PATH_MAX);
00513                                         a_new->running = 0;
00514                                         a_new->sinks = NULL;
00515                                         a_new->next = prev_required;
00516                                         prev_required = a_new;
00517                                 }
00518                         }
00519                         status.applications = prev_app;
00520                 }
00521         }
00522         fclose(config_file);
00523 
00524         /* fill in ipc_port of required apps */
00525         for (app = status.applications; app != NULL; app = app->next) {
00526                 app_t *req_app;
00527                 for (req_app = app->required_apps; req_app != NULL;
00528                      req_app = req_app->next) {
00529                         if (req_app->ipc_port < 0) {
00530                                 int found = 0;
00531                                 sensor_t *my_sens;
00532                                 app_t *my_apps;
00533                                 for (my_sens = status.sensors; my_sens != NULL;
00534                                      my_sens = my_sens->next) {
00535                                         if (strcmp(my_sens->path,
00536                                                    req_app->path) == 0) {
00537                                                 req_app->ipc_port =
00538                                                         my_sens->ipc_port;
00539                                                 found++;
00540                                                 break;
00541                                         }
00542                                 }
00543                                 if (!found) {
00544                                         for (my_apps = status.applications;
00545                                              my_apps != NULL;
00546                                              my_apps = my_apps->next) {
00547                                                 if (strcmp(my_apps->path,
00548                                                            req_app->path) == 0) {
00549                                                         req_app->ipc_port =
00550                                                                 my_apps->ipc_port;
00551                                                         found++;
00552                                                         break;
00553                                                 }
00554                                         }
00555                                 }
00556                                 if (!found) {
00557                                         fprintf(stderr, "Error: %s requires "
00558                                                 "%s which is missing from the "
00559                                                 "configuration.\n",
00560                                                 app->path, req_app->path);
00561                                         bad_config++;
00562                                 }
00563                         }
00564                 }
00565         }
00566         if (bad_config)
00567                 exit(-1);
00568 
00569 #ifdef DEBUG
00570         print_config(stderr);
00571 #endif
00572 
00573         if (!fork()) {  /* daemon mode */
00574                 int error = 0;
00575                 pthread_t net_data_thread, net_cmd_thread;
00576                 setsid();
00577 
00578                 if (register_soap_handler(sensor_ctl) < 0) {
00579                         fprintf(stderr, "Soap server initialization failed\n");
00580                         exit(-1);
00581                 }
00582 
00583                 if (register_data_handler(IPC_DATA_PORT, msg_route) < 0) {
00584                         fprintf(stderr, "Data server initialization failed\n");
00585                         exit(-1);
00586                 }
00587 
00588                 if (register_cmd_handler(IPC_CMD_PORT, msg_route) < 0) {
00589                         fprintf(stderr, "Command server initialization failed\n");
00590                         exit(-1);
00591                 }
00592 
00593                 if ((error = pthread_create(&net_data_thread, NULL,
00594                                             data_listener, NULL)) < 0) {
00595                         fprintf(stderr, "Data client initialization failed\n");
00596                         exit(-1);
00597                 }                       
00598                 
00599                 if ((error = pthread_create(&net_cmd_thread, NULL,
00600                                             cmd_listener, NULL)) < 0) {
00601                         fprintf(stderr, "Command client initialization failed\n");
00602                         exit(-1);
00603                 }                       
00604                 
00605                 pthread_join(soap_thread, NULL);
00606                 pthread_join(net_data_thread, NULL);
00607                 pthread_join(net_cmd_thread, NULL);
00608                 pthread_join(data_thread, NULL);
00609                 pthread_join(cmd_thread, NULL);
00610 
00611                 exit(0);
00612         }
00613         return 0;
00614 }


© 2007, Los Alamos National Security, LLC.