00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #define _ipc_impl_
00036
00037 #include <stdio.h>
00038 #include <stdlib.h>
00039 #include <unistd.h>
00040 #include <errno.h>
00041 #include <string.h>
00042 #include <sys/time.h>
00043 #include <sys/types.h>
00044 #include <sys/socket.h>
00045 #include <netinet/in.h>
00046 #include <netinet/sctp.h>
00047 #include <arpa/inet.h>
00048 #include <netdb.h>
00049 #include <limits.h>
00050 #include <signal.h>
00051 #include <pthread.h>
00052 #include "minisoap.h"
00053 #include "sensor_ipc.h"
00054
00055
00056 enum msgtype { DATA, CMD };
00057
00058 pthread_t data_thread, cmd_thread;
00059
00060 static int data_send_fd, data_recv_fd;
00061 struct sockaddr_in data_send_addr;
00062 static unsigned int data_recv_port;
00063
00064 static int cmd_send_fd, cmd_recv_fd;
00065 struct sockaddr_in cmd_send_addr;
00066 static unsigned int cmd_recv_port = 0;
00067 static unsigned int initialized = 0;
00068 volatile static unsigned int __shutdown__;
00069
00070
00071
00072 static void sigint_handler(int s) {
00073 __shutdown__ = 1;
00074 }
00075
00076
00077 int quitting(void)
00078 {
00079 return __shutdown__ >= 1;
00080 }
00081
00082
00083
00084 long long get_time(void)
00085 {
00086 struct timeval now;
00087 gettimeofday(&now, NULL);
00088 return (now.tv_sec * 1000000) + now.tv_usec;
00089 }
00090
00091
00092 int initialize_ipc(int port)
00093 {
00094 int error, yes = 1;
00095 struct hostent *he;
00096
00097 if (!initialized) {
00098 __shutdown__ = 0;
00099 signal(SIGINT, sigint_handler);
00100 initialized = 1;
00101 }
00102
00103
00104 if ((data_send_fd = socket(PF_INET, SOCK_RDM, IPPROTO_SCTP)) < 0) {
00105 perror("IPC: socket");
00106 return data_send_fd;
00107 }
00108 if ((error = setsockopt(data_send_fd, SOL_SOCKET, SO_REUSEADDR,
00109 &yes, sizeof(int))) < 0) {
00110 perror("IPC: setsockopt (SO_REUSEADDR)");
00111 return error;
00112 }
00113
00114 if ((he = gethostbyname("127.0.0.1")) == NULL) {
00115 herror("gethostbyname");
00116 return -1;
00117 }
00118
00119 data_send_addr.sin_family = AF_INET;
00120 data_send_addr.sin_port = htons(IPC_DATA_PORT);
00121 data_send_addr.sin_addr = *((struct in_addr *)he->h_addr);
00122 memset(data_send_addr.sin_zero, '\0', sizeof(data_send_addr.sin_zero));
00123
00124 if ((error = bind(data_send_fd, (struct sockaddr *)&data_send_addr,
00125 sizeof(data_send_addr))) < 0) {
00126 perror("IPC: bind");
00127 return error;
00128 }
00129
00130
00131 if ((cmd_send_fd = socket(PF_INET, SOCK_RDM, IPPROTO_SCTP)) < 0) {
00132 perror("IPC: socket");
00133 return cmd_send_fd;
00134 }
00135 if ((error = setsockopt(cmd_send_fd, SOL_SOCKET, SO_REUSEADDR,
00136 &yes, sizeof(int))) < 0) {
00137 perror("IPC: setsockopt (SO_REUSEADDR)");
00138 return error;
00139 }
00140
00141 cmd_send_addr.sin_family = AF_INET;
00142 cmd_send_addr.sin_port = htons(IPC_CMD_PORT);
00143 cmd_send_addr.sin_addr = *((struct in_addr *)he->h_addr);
00144 memset(cmd_send_addr.sin_zero, '\0', sizeof(cmd_send_addr.sin_zero));
00145
00146 if ((error = bind(cmd_send_fd, (struct sockaddr *)&cmd_send_addr,
00147 sizeof(cmd_send_addr))) < 0) {
00148 perror("IPC: bind");
00149 return error;
00150 }
00151
00152 if (port > 0) {
00153 data_recv_port = port;
00154
00155
00156 if ((data_recv_fd = socket(PF_INET, SOCK_RDM,
00157 IPPROTO_SCTP)) < 0) {
00158 perror("IPC: socket");
00159 return data_recv_fd;
00160 }
00161 }
00162
00163
00164 return 0;
00165 }
00166
00167
00168
00169 static void disconnect(int fd)
00170 {
00171 struct sockaddr_in unspec;
00172 unspec.sin_family = AF_UNSPEC;
00173 unspec.sin_port = 0;
00174 unspec.sin_addr.s_addr = INADDR_ANY;
00175 memset(unspec.sin_zero, '\0', sizeof(unspec.sin_zero));
00176
00177 connect(fd, (struct sockaddr*)&unspec, sizeof(unspec));
00178 }
00179
00180
00181
00182 static int ipc_send(enum msgtype t, SoapEvent *evt)
00183 {
00184 int pkt_len;
00185 int error, total = 0;
00186 int fd, remaining = pkt_len;
00187 struct sockaddr_in their_addr;
00188 char buf[MAX_EVENT_LEN + 1];
00189
00190 if (evt == NULL)
00191 return -1;
00192
00193 SoapEvent_string(evt, buf, MAX_EVENT_LEN);
00194 pkt_len = strlen(buf);
00195
00196 if (t == DATA) {
00197 fd = data_send_fd;
00198 memcpy(&their_addr, &data_send_addr,
00199 sizeof(struct sockaddr_in));
00200 } else if (t == CMD) {
00201 fd = cmd_send_fd;
00202 memcpy(&their_addr, &cmd_send_addr,
00203 sizeof(struct sockaddr_in));
00204 } else
00205 return -1;
00206
00207 if ((error = connect(fd, (struct sockaddr*)&their_addr,
00208 sizeof(their_addr))) < 0)
00209 return error;
00210
00211 if ((error = send(fd, &pkt_len, sizeof(int), 0)) < 0)
00212 return error;
00213
00214 while (total < pkt_len) {
00215 if ((error = send(fd, buf + total, remaining, 0)) < 0)
00216 break;
00217 total += error;
00218 remaining -= error;
00219 }
00220
00221 disconnect(fd);
00222 return error;
00223 }
00224
00225
00226 inline int ipc_data_send(SoapEvent *evt)
00227 {
00228 return ipc_send(DATA, evt);
00229 }
00230
00231
00232 inline int ipc_cmd_send(SoapEvent *evt)
00233 {
00234 return ipc_send(CMD, evt);
00235 }
00236
00237
00238
00239
00240 static int ipc_recv(enum msgtype t, SoapEvent *evt)
00241 {
00242 int len = MAX_EVENT_LEN + 1;
00243 char *buf = (char*) malloc(len);
00244 int fd, buf_len = len;
00245 int pkt_len = len;
00246 int error, total = 0;
00247 int remaining = pkt_len;
00248 struct sockaddr_in their_addr;
00249 socklen_t addr_len = sizeof(struct sockaddr);
00250
00251 memset(buf, '\0', pkt_len);
00252 len = 0;
00253 if (t == DATA)
00254 fd = data_recv_fd;
00255 else if (t == CMD)
00256 fd = cmd_recv_fd;
00257 else
00258 return -1;
00259
00260 if (fd == 0 || evt == NULL)
00261 return -1;
00262
00263 if ((error = recvfrom(fd, &pkt_len, sizeof(int), 0,
00264 (struct sockaddr*)&their_addr, &addr_len)) < 0)
00265 return error;
00266
00267
00268 if ((error = connect(fd, (struct sockaddr*)&their_addr,
00269 sizeof(their_addr))) < 0)
00270 return error;
00271
00272 if (pkt_len >= buf_len) {
00273 if ((buf = realloc(buf, pkt_len + 1)) == NULL)
00274 return -1;
00275 memset(buf, '\0', pkt_len + 1);
00276 }
00277 remaining = pkt_len;
00278
00279 while (total < pkt_len) {
00280 if ((error = recv(fd, buf + total, remaining, 0)) <= 0)
00281 break;
00282 total += error;
00283 remaining -= error;
00284 }
00285 buf[total] = '\0';
00286 len = total + 1;
00287
00288 string_SoapEvent(buf, evt);
00289
00290 disconnect(fd);
00291 return error;
00292 }
00293
00294
00295 inline int ipc_data_recv(SoapEvent *evt)
00296 {
00297 return ipc_recv(DATA, evt);
00298 }
00299
00300
00301 inline int ipc_cmd_recv(SoapEvent *evt)
00302 {
00303 return ipc_recv(CMD, evt);
00304 }
00305
00306
00307
00308
00309 static void *cmd_listener(void *callback)
00310 {
00311 while (1) {
00312 SoapEvent evt;
00313 ipc_recv(CMD, &evt);
00314 ((ipc_recv_t) callback)(&evt);
00315 }
00316 return NULL;
00317 }
00318
00319
00320 int register_cmd_handler(int port, ipc_recv_t handler)
00321 {
00322 cmd_recv_port = port;
00323
00324
00325 if ((cmd_recv_fd = socket(PF_INET, SOCK_RDM, IPPROTO_SCTP)) < 0) {
00326 perror("IPC: socket");
00327 return cmd_recv_fd;
00328 }
00329
00330 return pthread_create(&cmd_thread, NULL, cmd_listener,
00331 (void*)handler);
00332 }
00333
00334
00335
00336
00337 static void *data_listener(void *callback)
00338 {
00339 while (1) {
00340 SoapEvent evt;
00341 ipc_recv(DATA, &evt);
00342 ((ipc_recv_t) callback)(&evt);
00343 }
00344 return NULL;
00345 }
00346
00347
00348 int register_data_handler(int port, ipc_recv_t handler)
00349 {
00350 data_recv_port = port;
00351
00352
00353 if ((data_recv_fd = socket(PF_INET, SOCK_RDM, IPPROTO_SCTP)) < 0) {
00354 perror("IPC: socket");
00355 return cmd_recv_fd;
00356 }
00357
00358 return pthread_create(&data_thread, NULL, data_listener,
00359 (void*)handler);
00360 }