00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #define RAW_PACKETS
00036
00037
00038 #define _BSD_SOURCE
00039 #include <stdio.h>
00040 #include <stdlib.h>
00041 #include <string.h>
00042 #include <unistd.h>
00043 #include <sys/types.h>
00044 #include <sys/socket.h>
00045 #include <sys/time.h>
00046 #include <sys/select.h>
00047 #include <netinet/in.h>
00048 #include <arpa/inet.h>
00049 #include <syslog.h>
00050 #include <errno.h>
00051
00052 #ifdef RAW_PACKETS
00053 #include <netinet/in.h>
00054 #include <net/if.h>
00055 #include <net/if_arp.h>
00056 #include <asm/types.h>
00057 #include <linux/filter.h>
00058 #include <netinet/ip.h>
00059 #include <netinet/udp.h>
00060 #include <net/ethernet.h>
00061 #endif
00062
00063 #include "mesh_net.h"
00064 #include "mesh_protocol.h"
00065 #include "mesh_discovery.h"
00066 #include "mesh_link.h"
00067 #include "mesh_neighbor.h"
00068 #include "sensor_self.h"
00069
00070
00071
00072
00073
/*
 * Classic BPF program for the packet socket.  Offsets are counted from the
 * start of the Ethernet frame.  A frame is accepted only when it is IPv4,
 * carries UDP, is not a non-first fragment, and its UDP destination port
 * equals the constant in instruction 8.  That constant (67 here) is
 * overwritten by mesh_socket() before the filter is attached, so the array
 * must stay writable and the port comparison must stay at index 8.
 */
static struct sock_filter mesh_filter[] = {
    /* [0-1] EtherType at offset 12 must be IPv4, else reject. */
    BPF_STMT(BPF_LD | BPF_H | BPF_ABS, 12),
    BPF_JUMP(BPF_JMP | BPF_JEQ | BPF_K, ETHERTYPE_IP, 0, 8),

    /* [2-3] IP protocol byte (14 + 9) must be UDP, else reject. */
    BPF_STMT(BPF_LD | BPF_B | BPF_ABS, 23),
    BPF_JUMP(BPF_JMP | BPF_JEQ | BPF_K, IPPROTO_UDP, 0, 6),

    /* [4-5] Any fragment-offset bit set means a later fragment: reject. */
    BPF_STMT(BPF_LD | BPF_H | BPF_ABS, 20),
    BPF_JUMP(BPF_JMP | BPF_JSET | BPF_K, 0x1fff, 4, 0),

    /* [6] X = IP header length in bytes, read from the IHL nibble. */
    BPF_STMT(BPF_LDX | BPF_B | BPF_MSH, 14),

    /* [7-8] UDP destination port (X + 14 + 2) must match, else reject. */
    BPF_STMT(BPF_LD | BPF_H | BPF_IND, 16),
    BPF_JUMP(BPF_JMP | BPF_JEQ | BPF_K, 67, 0, 1),

    /* [9] Accept: hand the whole frame to user space. */
    BPF_STMT(BPF_RET | BPF_K, 0xffffffffU),

    /* [10] Reject: drop the frame. */
    BPF_STMT(BPF_RET | BPF_K, 0),
};
00100
00101
/**
 * Accumulate an Internet (ones'-complement) checksum over a byte buffer.
 *
 * Bytes are paired big-endian into 16-bit words and added to @p sum with
 * end-around carry.  A trailing odd byte is treated as the high byte of a
 * word whose low byte is zero.  The words are built from individual bytes
 * rather than through a casted 16-bit pointer, so @p buf may have any
 * alignment.
 *
 * @param buf    bytes to sum; not modified
 * @param nbytes number of bytes in @p buf
 * @param sum    running sum from a previous call, or a seed value
 * @return the updated running sum, always in the range 0..0xFFFF
 */
static u_int32_t checksum(const unsigned char *buf, unsigned nbytes,
                          u_int32_t sum)
{
    unsigned i;

    /* A seed may exceed 16 bits (callers seed with protocol + length). */
    while (sum > 0xFFFF)
        sum -= 0xFFFF;

    for (i = 0; i < (nbytes & ~1U); i += 2) {
        sum += ((u_int32_t)buf[i] << 8) | buf[i + 1];
        if (sum > 0xFFFF)
            sum -= 0xFFFF;
    }

    /* Odd length: the last byte is the high half of a zero-padded word. */
    if (i < nbytes) {
        sum += (u_int32_t)buf[i] << 8;
        if (sum > 0xFFFF)
            sum -= 0xFFFF;
    }
    return sum;
}
00123
00124
00125
/**
 * Finish a checksum: take the ones' complement of the low 16 bits of
 * @p sum and convert the result to network byte order.
 */
static u_int32_t wrapsum(u_int32_t sum)
{
    const u_int32_t complemented = ~sum & 0xFFFF;

    return htons(complemented);
}
00131
00132
00133
00134
00135
00136
00137
/**
 * Write an Ethernet header with EtherType IPv4 into @p buf at @p *bufix and
 * advance @p *bufix past it.
 *
 * @param buf   output buffer
 * @param bufix in/out byte offset into @p buf
 * @param from  source hardware address; byte 0 is skipped and bytes 1..6
 *              are used.  NULL writes an all-zero source address.
 * @param to    destination hardware address in the same layout.  NULL
 *              writes the all-ones (broadcast) address.
 */
static void assemble_ethernet_header(unsigned char *buf, unsigned *bufix,
                                     unsigned char *from, unsigned char *to)
{
    struct ether_header hdr;

    if (to == NULL)
        memset(hdr.ether_dhost, 0xff, sizeof hdr.ether_dhost);
    else
        memcpy(hdr.ether_dhost, to + 1, sizeof hdr.ether_dhost);

    if (from == NULL)
        memset(hdr.ether_shost, 0x00, sizeof hdr.ether_shost);
    else
        memcpy(hdr.ether_shost, from + 1, sizeof hdr.ether_shost);

    hdr.ether_type = htons(ETHERTYPE_IP);

    memcpy(buf + *bufix, &hdr, ETHER_HDR_LEN);
    *bufix += ETHER_HDR_LEN;
}
00158
00159
/**
 * Parse the Ethernet header found at @p buf + @p bufix.
 *
 * @param buf   buffer holding the received frame
 * @param bufix byte offset of the Ethernet header inside @p buf
 * @param from  optional output of at least 7 bytes: byte 0 receives
 *              ARPHRD_ETHER and bytes 1..6 the source MAC address
 * @return ETHER_HDR_LEN, or -1 when USERLAND_FILTER is defined and the
 *         EtherType is not IPv4
 *
 * The function receives no length, so the caller must guarantee that
 * ETHER_HDR_LEN bytes are readable at the given offset.
 */
static ssize_t decode_ethernet_header(unsigned char *buf, unsigned bufix,
                                      unsigned char *from)
{
    struct ether_header hdr;

    memcpy(&hdr, &buf[bufix], ETHER_HDR_LEN);

#ifdef USERLAND_FILTER
    if (ntohs(hdr.ether_type) != ETHERTYPE_IP)
        return -1;
#endif

    if (from != NULL) {
        from[0] = ARPHRD_ETHER;
        memcpy(from + 1, hdr.ether_shost, sizeof hdr.ether_shost);
    }

    return ETHER_HDR_LEN;
}
00177
00178
00179 static void assemble_udp_ip_header (unsigned char *buf, unsigned *bufix,
00180 u_int32_t from, u_int32_t to, u_int32_t port,
00181 unsigned char *data, unsigned len)
00182 {
00183 struct ip ip;
00184 struct udphdr udp;
00185
00186
00187 ip.ip_v = 4;
00188 ip.ip_hl = 20;
00189 ip.ip_tos = IPTOS_LOWDELAY;
00190 ip.ip_len = htons(sizeof(ip) + sizeof(udp) + len);
00191 ip.ip_id = 0;
00192 ip.ip_off = 0;
00193 ip.ip_ttl = IPDEFTTL;
00194 ip.ip_p = IPPROTO_UDP;
00195 ip.ip_sum = 0;
00196 ip.ip_src.s_addr = from;
00197 ip.ip_dst.s_addr = to;
00198
00199
00200 ip.ip_sum = wrapsum (checksum ((unsigned char *)&ip, sizeof ip, 0));
00201
00202
00203 memcpy (&buf [*bufix], &ip, sizeof ip);
00204 *bufix += sizeof ip;
00205
00206
00207 udp.uh_sport = MESH_DISCOVERY_PORT;
00208 udp.uh_dport = port;
00209 udp.uh_ulen = htons(sizeof(udp) + len);
00210 memset (&udp.uh_sum, 0, sizeof udp.uh_sum);
00211
00212
00213
00214 udp.uh_sum = wrapsum (checksum ((unsigned char *)&udp, sizeof udp,
00215 checksum (data, len,
00216 checksum ((unsigned char *)
00217 &ip.ip_src,
00218 2 * sizeof ip.ip_src,
00219 IPPROTO_UDP +
00220 (u_int32_t)
00221 ntohs (udp.uh_ulen)))));
00222
00223
00224 memcpy (&buf [*bufix], &udp, sizeof udp);
00225 *bufix += sizeof udp;
00226 }
00227
00228
/**
 * Validate the IPv4 and UDP headers found at @p buf + @p bufix and report
 * the sender and the payload size.
 *
 * @param buf     buffer holding the received frame
 * @param bufix   byte offset of the IP header inside @p buf
 * @param from    receives the source address and source port
 * @param buflen  number of valid bytes available starting at @p bufix
 * @param rbuflen optional; receives the UDP payload length
 * @return number of header bytes (IP header + UDP header) to skip to reach
 *         the payload, or -1 if the packet is malformed, truncated or fails
 *         a checksum
 *
 * Bounds are checked as lengths, so no out-of-range pointer is formed.
 * A header length field below the 20-byte minimum is rejected.
 */
static ssize_t decode_udp_ip_header(unsigned char *buf, unsigned bufix,
                                    struct sockaddr_in *from, unsigned buflen,
                                    unsigned *rbuflen)
{
    /* Running totals, used only to rate-limit the syslog reports below. */
    static int ip_packets_seen;
    static int ip_packets_bad_checksum;
    static int udp_packets_seen;
    static int udp_packets_bad_checksum;
    static int udp_packets_length_checked;
    static int udp_packets_length_overflow;
    unsigned char *pkt = buf + bufix;
    unsigned char *upp;
    unsigned char *data;
    struct ip ip;
    struct udphdr udp;
    u_int32_t ip_len, ulen, pkt_len;
    u_int32_t sum, usum;
    unsigned len;

    /* A minimal IP header must fit. */
    if (buflen < sizeof(ip))
        return -1;

    memcpy(&ip, pkt, sizeof(ip));

    /* The IHL nibble counts 32-bit words; anything below 20 bytes is bogus. */
    ip_len = (u_int32_t)(pkt[0] & 0x0f) << 2;
    if (ip_len < sizeof(ip))
        return -1;

    /* The datagram must not claim more bytes than were captured. */
    pkt_len = ntohs(ip.ip_len);
    if (pkt_len > buflen)
        return -1;

    /* The UDP header must fit behind the (possibly option-bearing) IP header. */
    if (ip_len + sizeof(udp) > buflen)
        return -1;

    upp = pkt + ip_len;
    memcpy(&udp, upp, sizeof(udp));

#ifdef USERLAND_FILTER
    if (ip.ip_p != IPPROTO_UDP)
        return -1;

    if (udp.uh_dport != MESH_DISCOVERY_PORT)
        return -1;
#endif

    ulen = ntohs(udp.uh_ulen);
    if (ulen < sizeof(udp))
        return -1;

    udp_packets_length_checked++;
    if (ip_len + ulen > buflen) {
        udp_packets_length_overflow++;
        if ((udp_packets_length_checked > 4) &&
            ((udp_packets_length_checked / udp_packets_length_overflow) < 2)) {
            syslog(LOG_INFO, "%d udp packets in %d too long - dropped",
                   udp_packets_length_overflow, udp_packets_length_checked);
            udp_packets_length_overflow = 0;
            udp_packets_length_checked = 0;
        }
        return -1;
    }

    /* Summing a valid IP header, checksum field included, gives zero. */
    ++ip_packets_seen;
    if (wrapsum(checksum(pkt, ip_len, 0))) {
        ++ip_packets_bad_checksum;
        if (ip_packets_seen > 4 &&
            (ip_packets_seen / ip_packets_bad_checksum) < 2) {
            syslog(LOG_INFO, "%d bad IP checksums seen in %d packets",
                   ip_packets_bad_checksum, ip_packets_seen);
            ip_packets_seen = ip_packets_bad_checksum = 0;
        }
        return -1;
    }

    memcpy(&from->sin_addr, &ip.ip_src, 4);

    /* Recompute the UDP checksum with the checksum field zeroed. */
    data = upp + sizeof(udp);
    len = ulen - sizeof(udp);
    usum = udp.uh_sum;
    udp.uh_sum = 0;

    sum = wrapsum(checksum((unsigned char *)&udp, sizeof(udp),
                           checksum(data, len,
                                    checksum((unsigned char *)&ip.ip_src,
                                             2 * sizeof ip.ip_src,
                                             IPPROTO_UDP + ulen))));

    /* A transmitted checksum of zero means the sender did not compute one. */
    udp_packets_seen++;
    if (usum && usum != sum) {
        udp_packets_bad_checksum++;
        if (udp_packets_seen > 4 &&
            (udp_packets_seen / udp_packets_bad_checksum) < 2) {
            syslog(LOG_INFO, "%d bad udp checksums in %d packets",
                   udp_packets_bad_checksum, udp_packets_seen);
            udp_packets_seen = udp_packets_bad_checksum = 0;
        }
        return -1;
    }

    memcpy(&from->sin_port, &udp.uh_sport, sizeof udp.uh_sport);

    if (rbuflen != NULL)
        *rbuflen = len;

    return ip_len + sizeof udp;
}
00359
00360
00361
00362
00363
00364
/**
 * Create the socket used for mesh traffic.
 *
 * With RAW_PACKETS defined this is a packet socket bound to @p ifname with
 * the mesh_filter BPF program attached.  Otherwise it is a UDP socket bound
 * to INADDR_ANY on @p port.
 *
 * @param ifname interface name (used only with RAW_PACKETS)
 * @param port   UDP port in host byte order
 * @return the socket descriptor, or a negative value on failure.  The
 *         failing call is reported through syslog, errno is that call's
 *         errno, and no descriptor is leaked.
 */
int mesh_socket(char *ifname, unsigned int port)
{
    int sd;
    int saved_errno;

#ifdef RAW_PACKETS
    struct sock_fprog filter;
    struct sockaddr my_addr;
    int yes = 1;

    if (ifname == NULL) {
        errno = EINVAL;
        return -1;
    }

    if ((sd = socket(PF_PACKET, SOCK_PACKET, htons(ETH_P_ALL))) < 0) {
        syslog(LOG_ERR, "socket: %m");
        return sd;
    }

    /*
     * NOTE(review): IP_PKTINFO is an IP-level option; confirm that a packet
     * socket accepts it, otherwise this call makes socket creation fail.
     */
    if (setsockopt(sd, SOL_IP, IP_PKTINFO, &yes, sizeof(int)) < 0) {
        saved_errno = errno;
        syslog(LOG_ERR, "setsockopt (IP_PKTINFO) [%s]: %m", "RF aware routing");
        goto fail;
    }

    /* Zero first and copy one byte less, so sa_data is always terminated. */
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sa_family = AF_PACKET;
    strncpy(my_addr.sa_data, ifname, sizeof(my_addr.sa_data) - 1);

    if (bind(sd, &my_addr, sizeof(my_addr)) < 0) {
        saved_errno = errno;
        syslog(LOG_ERR, "bind: %m");
        goto fail;
    }

    filter.len = sizeof(mesh_filter) / sizeof(struct sock_filter);
    filter.filter = mesh_filter;
    /*
     * Instruction 8 compares the UDP destination port.  BPF half-word loads
     * yield host-order values, so the constant must be the host-order port;
     * byte-swapping it would make the filter match the wrong port on
     * little-endian machines.
     */
    mesh_filter[8].k = (u_int16_t)port;

    if (setsockopt(sd, SOL_SOCKET, SO_ATTACH_FILTER,
                   &filter, sizeof(filter)) < 0) {
        saved_errno = errno;
        syslog(LOG_ERR, "setsockopt [ATTACH_FILTER] %m");
        goto fail;
    }
#else
    struct sockaddr_in my_addr;

    if ((sd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
        syslog(LOG_ERR, "socket: %m");
        return sd;
    }

    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(sd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0) {
        saved_errno = errno;
        syslog(LOG_ERR, "bind: %m");
        goto fail;
    }
#endif
    return sd;

fail:
    /* Release the descriptor, but report the errno of the call that failed. */
    close(sd);
    errno = saved_errno;
    return -1;
}
00425
00426
00427
#ifdef RTSCTS
/*
 * RTS/CTS channel arbitration, compiled only when RTSCTS is defined.
 *
 * NOTE(review): this section is still a sketch and does not build as is.
 * The sendto() call does not match the sendto(2) prototype; `bcast`,
 * THRESHOLD_A and get_current_time() are not defined in this file; and
 * nothing ever receives a header into `hdr` before it is examined.  Only
 * the two plain C errors in the state struct are corrected here: the
 * in-struct initializer, and the missing rts_time member that the function
 * assigns.
 */
struct {
    unsigned int port;      /* port argument of the control-frame sends */
    u_int64_t rts_time;     /* time of the last CTS reply sent for an RTS */
    u_int64_t cts_time;     /* time the last CTS was seen */
} rtscts = { .port = 4938 };

enum rc { RTS, CTS };

typedef struct {
    u_int8_t rtscts;        /* RTS or CTS */
} rtscts_hdr_t;

int request_to_send_clear_to_send(protocol_t *proto)
{
    while (1) {
        /* NOTE(review): hdr is read below without ever being filled in. */
        rtscts_hdr_t hdr;

        if (hdr.rtscts == RTS) {
            /* Stay silent while a recent CTS still reserves the channel. */
            if (get_current_time() - rtscts.cts_time < THRESHOLD_A)
                continue;

            rtscts.rts_time = get_current_time();
            sendto(CTS, bcast, rtscts.port);
        } else if (hdr.rtscts == CTS) {
            rtscts.cts_time = get_current_time();
        }
    }
    return 0;
}
#endif
00461
00462
/**
 * Transmit one datagram to @p to.
 *
 * With RAW_PACKETS defined, the payload is wrapped in Ethernet, IPv4 and
 * UDP headers built here and written to the packet socket as a broadcast
 * frame on @p ifname.  Otherwise the payload is sent with a plain sendto().
 *
 * @param ifname interface to transmit on (RAW_PACKETS only)
 * @param ifaddr local hardware address, laid out as
 *               assemble_ethernet_header() expects (RAW_PACKETS only)
 * @param sd     socket obtained from mesh_socket()
 * @param buf    payload
 * @param len    payload length in bytes
 * @param to     destination address and port
 * @return the result of sendto(): bytes written, or -1 with errno set.  In
 *         raw mode the count includes the headers built here.
 */
ssize_t mesh_sendmsg(char *ifname, char *ifaddr,
                     int sd, void *buf, size_t len, struct sockaddr_in *to)
{
#ifdef RAW_PACKETS
    double hh[16];          /* scratch for the link-layer header */
    unsigned hh_len = 0, ih_len = 0;
    unsigned alignment;
    struct sockaddr sa;
    /* 1536 bytes of headroom; large payloads make this a large stack array. */
    unsigned char data[len + 1536];
#endif

#ifdef RTSCTS
    u_int64_t hesitate;
    rtscts_hdr_t hdr;

    /*
     * NOTE(review): sketch only.  The sendto() call below does not match
     * the sendto(2) prototype, and `bcast` is not defined in this file.
     */
    if ((hesitate = get_current_time() - rtscts.cts_time) < THRESHOLD_A)
        usleep((unsigned long)hesitate * 1000);
    else {
        hdr.rtscts = RTS;
        sendto(&hdr, sizeof(rtscts_hdr_t), bcast, rtscts.port);
    }
#endif

#ifdef RAW_PACKETS
    assemble_ethernet_header((unsigned char *)hh, &hh_len,
                             (unsigned char *)ifaddr, NULL);

    /*
     * Start the frame `alignment` bytes into data so that the IP header,
     * which follows the link-layer header, lands on a 4-byte offset.
     */
    alignment = hh_len % 4;
    memcpy(data + alignment, hh, hh_len);
    ih_len = hh_len + alignment;
    assemble_udp_ip_header(data, &ih_len, 0, to->sin_addr.s_addr, to->sin_port,
                           (unsigned char *)buf, len);
    memcpy(data + ih_len, buf, len);

    /* Zero first and copy one byte less, so sa_data is always terminated. */
    memset(&sa, 0, sizeof(sa));
    sa.sa_family = AF_PACKET;
    strncpy(sa.sa_data, ifname, sizeof(sa.sa_data) - 1);

    /*
     * Send exactly the assembled frame.  Skip the alignment pad at the
     * front and stop after the payload, so neither the pad nor the unused
     * tail of data (uninitialized stack) goes onto the wire.
     */
    return sendto(sd, data + alignment, ih_len + len - alignment, 0,
                  &sa, sizeof(sa));
#else
    return sendto(sd, buf, len, 0,
                  (struct sockaddr *)to, sizeof(struct sockaddr));
#endif
}
00513
00514
00515
00516 ssize_t mesh_recvmsg(int sd, void *buf, size_t len, struct sockaddr_in *from,
00517 struct msghdr *msg_header, size_t ctl_msg_size)
00518 {
00519 struct iovec io_vector;
00520 int nbytes;
00521 char ctl_msg[ctl_msg_size];
00522
00523 #ifdef RAW_PACKETS
00524 int offset = 0;
00525 unsigned buf_idx = 0;
00526 ssize_t len_rcvd;
00527 int reply_size = len + 1536;
00528 unsigned char reply[reply_size];
00529 unsigned char hfrom[8];
00530 #else
00531 int reply_size = len;
00532 void *reply = buf;
00533 #endif
00534
00535 memset(reply, 0, reply_size);
00536 memset(msg_header, 0, sizeof(struct msghdr));
00537
00538 io_vector.iov_base = reply;
00539 io_vector.iov_len = reply_size;
00540
00541 msg_header->msg_name = from;
00542 msg_header->msg_namelen = sizeof(struct sockaddr_in);
00543 msg_header->msg_iov = &io_vector;
00544 msg_header->msg_iovlen = 1;
00545 msg_header->msg_control = ctl_msg;
00546 msg_header->msg_controllen = ctl_msg_size;
00547 msg_header->msg_flags = 0;
00548
00549 if ((nbytes = recvmsg(sd, msg_header, 0)) < 0)
00550 return nbytes;
00551
00552 #ifdef RAW_PACKETS
00553 if ((offset = decode_ethernet_header(reply, buf_idx, hfrom)) < 0)
00554 return 0;
00555 buf_idx += offset;
00556 nbytes -= offset;
00557 if ((offset = decode_udp_ip_header(reply, buf_idx, from, nbytes, &len_rcvd)) < 0)
00558 return 0;
00559
00560 buf_idx += offset;
00561 nbytes -= offset;
00562 if (nbytes != reply_size) {
00563 errno = EMSGSIZE;
00564 return -1;
00565 }
00566 memcpy(buf, &reply[buf_idx], len);
00567 #else
00568 if (nbytes != reply_size) {
00569 errno = EMSGSIZE;
00570 return -1;
00571 }
00572 #endif
00573 analyze(self, msg_header);
00574
00575 return nbytes;
00576 }
00577
00578
/*
 * Shared implementation of timed_recv() and timed_recv_peek(): wait up to
 * `timeout` seconds for sd to become readable, then receive one datagram
 * with the given recvfrom() flags.
 *
 * With RAW_PACKETS defined, the frame is received into a local buffer and
 * its Ethernet, IP and UDP headers are decoded.  Only the UDP payload is
 * copied to buf, and sa is filled from the decoded headers.
 */
static int timed_recv_common(int sd, void *buf, int bytes,
                             struct sockaddr_in *sa, int timeout, int flags)
{
    fd_set fds;
    struct timeval tv;
    socklen_t addr_len = sizeof(struct sockaddr);
    int ready, num_bytes;

    if (bytes < 0) {
        errno = EINVAL;
        return -1;
    }

    FD_ZERO(&fds);
    FD_SET(sd, &fds);
    tv.tv_sec = timeout;
    tv.tv_usec = 0;

    ready = select(sd + 1, &fds, NULL, NULL, &tv);
    if (ready == 0) {
        errno = ETIMEDOUT;
        return -2;
    }
    if (ready < 0)
        return -1;

#ifdef RAW_PACKETS
    {
        unsigned char reply[bytes + 1536];  /* headroom for the headers */
        unsigned buf_idx = 0;
        unsigned length = 0;
        ssize_t offset;

        /* Receive the frame into the buffer that is decoded below. */
        num_bytes = recvfrom(sd, reply, sizeof reply, flags,
                             (struct sockaddr *)sa, &addr_len);
        if (num_bytes < 0)
            return -1;
        if (num_bytes < ETHER_HDR_LEN)
            return 0;

        if ((offset = decode_ethernet_header(reply, buf_idx, NULL)) < 0)
            return 0;
        buf_idx += offset;
        num_bytes -= offset;

        /* Bound the decoder by the bytes actually received. */
        if ((offset = decode_udp_ip_header(reply, buf_idx, sa, num_bytes,
                                           &length)) < 0)
            return 0;
        buf_idx += offset;

        /* Never copy more than the caller's buffer can hold. */
        if (length > (unsigned)bytes) {
            errno = EMSGSIZE;
            return -1;
        }
        memcpy(buf, &reply[buf_idx], length);
        num_bytes = length;
    }
#else
    num_bytes = recvfrom(sd, buf, bytes, flags,
                         (struct sockaddr *)sa, &addr_len);
#endif
    return num_bytes;
}

/**
 * Receive one datagram, waiting at most @p timeout seconds.
 *
 * @param sd      socket to read from
 * @param buf     receives the payload
 * @param bytes   capacity of @p buf
 * @param sa      receives the sender's address
 * @param timeout seconds to wait for data
 * @return payload bytes received; 0 if a raw frame was dropped as invalid;
 *         -1 on error with errno set; -2 on timeout with errno = ETIMEDOUT
 */
int timed_recv(int sd, void *buf, int bytes,
               struct sockaddr_in *sa, int timeout)
{
    return timed_recv_common(sd, buf, bytes, sa, timeout, 0);
}

/**
 * Like timed_recv(), but the datagram is left queued on the socket
 * (MSG_PEEK | MSG_WAITALL), so the next receive returns it again.
 */
int timed_recv_peek(int sd, void *buf, int bytes,
                    struct sockaddr_in *sa, int timeout)
{
    return timed_recv_common(sd, buf, bytes, sa, timeout,
                             MSG_PEEK | MSG_WAITALL);
}
00676
00677
00678
00679 static void pack_mesh_hdr(struct mesh_hdr *mh)
00680 {
00681 if (mh == NULL)
00682 return;
00683
00684 mh->protocol_id = htons(mh->protocol_id);
00685 mh->seq_num = htonl(mh->seq_num);
00686 }
00687
00688
00689 static void unpack_mesh_hdr(struct mesh_hdr *mh)
00690 {
00691 if (mh == NULL)
00692 return;
00693
00694 mh->protocol_id = ntohs(mh->protocol_id);
00695 mh->seq_num = ntohl(mh->seq_num);
00696 }
00697
00698
00699
00700 ssize_t mesh_raw_send(protocol_t *proto, int sd, void *buf, size_t len,
00701 struct sockaddr_in *to)
00702 {
00703 struct mesh_hdr mh;
00704 size_t offset = sizeof(struct mesh_hdr);
00705 size_t data_len = len + offset;
00706 unsigned char data[data_len];
00707
00708 mh.raw = 1;
00709 mh.nack = 0;
00710 mh.protocol_id = proto->unique_id;
00711 mh.seq_num = 0;
00712 pack_mesh_hdr(&mh);
00713
00714 memcpy(data, (unsigned char *)&mh, offset);
00715 memcpy(data + offset, buf, len);
00716
00717 return mesh_sendmsg(proto->sensor->interface, proto->sensor->mac_addr,
00718 sd, data, data_len, to);
00719 }
00720
00721
00722 ssize_t mesh_send(protocol_t *proto, int sd, void *buf, size_t len,
00723 struct sockaddr_in *to)
00724 {
00725 size_t offset = sizeof(struct mesh_hdr);
00726 size_t data_len = len + offset;
00727 unsigned char data[data_len];
00728 link_entry_t *entry = retrieve_entry(proto, to->sin_addr.s_addr);
00729 struct mesh_hdr mh;
00730 msg_queue_t *mq;
00731
00732 if (entry == NULL) {
00733 syslog(LOG_ERR, "no route to %s", inet_ntoa(to->sin_addr));
00734 return -1;
00735 }
00736
00737 mh.raw = 0;
00738 mh.nack = 0;
00739 mh.protocol_id = proto->unique_id;
00740 mh.seq_num = entry->send_seq_num;
00741 pack_mesh_hdr(&mh);
00742
00743 memcpy(data, (unsigned char *)&mh, offset);
00744 memcpy(data + offset, buf, len);
00745
00746 mq = malloc(offset);
00747 mq->data = malloc(data_len);
00748 memcpy(mq->data, data, data_len);
00749 mq->len = data_len;
00750 mq->port = to->sin_port;
00751 mq->next = NULL;
00752 add_msg(entry, mh.seq_num, mq);
00753
00754 entry->send_seq_num++;
00755 if (entry->send_seq_num > MAX_SEQNUM)
00756 entry->send_seq_num = 1;
00757
00758 return mesh_sendmsg(proto->sensor->interface, proto->sensor->mac_addr,
00759 sd, data, data_len, to);
00760 }
00761
00762
00763 ssize_t mesh_recv(self_t *self, int sd, void *buf, size_t len,
00764 struct sockaddr_in *from)
00765 {
00766 ssize_t nbytes = 0;
00767 size_t offset = sizeof(struct mesh_hdr);
00768 size_t data_len = len + offset;
00769 unsigned char data[data_len];
00770 seqnum_t diff;
00771 link_entry_t *entry;
00772 struct mesh_hdr mh;
00773 struct msghdr msghdr;
00774 protocol_t *proto = NULL;
00775
00776 recvr:
00777 if ((nbytes = mesh_recvmsg(sd, data, data_len, from,
00778 &msghdr, max_ctl_size(self))) < 0)
00779 return nbytes;
00780
00781 memcpy(&mh, data, offset);
00782 unpack_mesh_hdr(&mh);
00783
00784 if (mh.raw == 1)
00785 return nbytes;
00786
00787 proto = protocol_by_id(self, mh.protocol_id);
00788
00789 if (mh.nack == 1) {
00790 link_entry_t *link = retrieve_entry(proto, from->sin_addr.s_addr);
00791 msg_queue_t *msg = retrieve_msg(link, mh.seq_num);
00792 from->sin_port = msg->port;
00793 mesh_sendmsg(self->interface, self->mac_addr,
00794 proto->queue_sd, msg->data, msg->len, from);
00795 goto recvr;
00796 }
00797
00798 if ((entry = retrieve_entry(proto, from->sin_addr.s_addr)) == NULL) {
00799 syslog(LOG_ERR, "no route to %s: sequencing unhandled",
00800 inet_ntoa(from->sin_addr));
00801 return nbytes;
00802 }
00803
00804 if (entry->recv_seq_num > MAX_SEQNUM)
00805 entry->recv_seq_num = 0;
00806 if ((diff = mh.seq_num - entry->recv_seq_num) > 1) {
00807 int i;
00808 for (i = 0; i < diff; i++) {
00809 struct mesh_hdr mhdr;
00810 mhdr.raw = 0;
00811 mhdr.nack = 1;
00812 mhdr.protocol_id = proto->unique_id;
00813 mhdr.seq_num = entry->recv_seq_num + i;
00814 pack_mesh_hdr(&mhdr);
00815 mesh_sendmsg(proto->sensor->interface, proto->sensor->mac_addr,
00816 sd, &mhdr, offset, from);
00817 }
00818 }
00819 entry->recv_seq_num += diff;
00820 memcpy(buf, data + offset, len);
00821
00822 return nbytes;
00823 }
00824
00825
00826
00827 ssize_t mesh_send_nextdoor(self_t *self, int sd, void *buf, size_t len,
00828 double angle, double dist)
00829 {
00830 neighbor_entry_t *nbr = neighbor_by_position(self, angle, dist);
00831
00832 if (nbr == NULL) {
00833 errno = -EINVAL;
00834 return -1;
00835 }
00836 return sendto(sd, buf, len, 0,
00837 (struct sockaddr*)&nbr->ip, sizeof(struct sockaddr_in));
00838 }
00839
00840
00841 ssize_t mesh_recv_nextdoor(self_t *self, int sd, void *buf, size_t len,
00842 double *angle, double *dist)
00843 {
00844 neighbor_entry_t *nbr = NULL;
00845 ssize_t nbytes = 0;
00846 struct sockaddr_in from;
00847 socklen_t addrlen = sizeof(struct sockaddr_in);
00848
00849 nbytes = recvfrom(sd, buf, len, 0, (struct sockaddr*)&from, &addrlen);
00850 if ((nbr = neighbor_by_ip(self, &from)) == NULL)
00851 return -1;
00852
00853 *angle = nbr->bearing;
00854 *dist = nbr->distance;
00855 return nbytes;
00856 }
00857
00858
00859
00860 int reliable_net(self_t *self)
00861 {
00862 while (1) {
00863 u_int64_t min = UINT64_MAX;
00864 protocol_t *proto = self->protocols;
00865 while (proto != NULL) {
00866 link_entry_t *link = NULL;
00867 proto->iterator = NULL;
00868
00869 while ((link = iterate_entries(proto)) != NULL) {
00870 msg_queue_t *msg;
00871 loop:
00872 msg = (msg_queue_t*) avl_find_min(&link->send_queue)->element;
00873
00874 if (msg->timeout < get_current_time()) {
00875 remove_msg(link, msg->id, msg);
00876 free(msg);
00877 goto loop;
00878 } else if (msg->timeout < min)
00879 min = msg->timeout;
00880 }
00881 proto = proto->next;
00882 }
00883 sleep(min / 1000);
00884 }
00885 return -1;
00886 }