N-sim
Emulation and simulation of
Wireless Sensor Networks



   Home


   Project Page


   Download


   CVS



   Installation


   Configuration


   Plug-ins




 Hosted by
SourceForge.net Logo

/home/brennan/n-sim/Vaike/linux/system-addons/networking/mesh_net.c

Go to the documentation of this file.
00001 
00014 /*
00015  * Copyright 2007. Los Alamos National Security, LLC. This material
00016  * was produced under U.S. Government contract DE-AC52-06NA25396 for
00017  * Los Alamos National Laboratory (LANL), which is operated by Los
00018  * Alamos National Security, LLC, for the Department of Energy. The
00019  * U.S. Government has rights to use, reproduce, and distribute this
00020  * software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY,
00021  * LLC, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL
00022  * LIABILITY FOR THE USE OF THIS SOFTWARE. If software is modified to
00023  * produce derivative works, such modified software should be clearly
00024  * marked, so as not to confuse it with the version available from LANL.
00025  *
00026  * Additionally, this program is free software; you can redistribute
00027  * it and/or modify it under the terms of the GNU General Public
00028  * License as published by the Free Software Foundation; version 2 of
00029  * the License. Accordingly, this program is distributed in the hope
00030  * it will be useful, but WITHOUT ANY WARRANTY; without even the
00031  * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
00032  * PURPOSE. See the GNU General Public License for more details.
00033  */
00034 
00035 #define RAW_PACKETS
00036 //#define RTSCTS
00037 
00038 #define _BSD_SOURCE
00039 #include <stdio.h>
00040 #include <stdlib.h>
00041 #include <string.h>
00042 #include <unistd.h>
00043 #include <sys/types.h>
00044 #include <sys/socket.h>
00045 #include <sys/time.h>
00046 #include <sys/select.h>
00047 #include <netinet/in.h>
00048 #include <arpa/inet.h>
00049 #include <syslog.h>
00050 #include <errno.h>
00051 
00052 #ifdef RAW_PACKETS
00053 #include <netinet/in.h>
00054 #include <net/if.h>
00055 #include <net/if_arp.h>
00056 #include <asm/types.h>
00057 #include <linux/filter.h>
00058 #include <netinet/ip.h>
00059 #include <netinet/udp.h>
00060 #include <net/ethernet.h>
00061 #endif
00062 
00063 #include "mesh_net.h"
00064 #include "mesh_protocol.h"
00065 #include "mesh_discovery.h"
00066 #include "mesh_link.h"
00067 #include "mesh_neighbor.h"
00068 #include "sensor_self.h"
00069 
00070 
00071 /**********************************************************/
00072 /***  stolen verbatim from the ISC DHCP implementation  ***/
00073 
/*
 * Classic BPF program attached to the raw packet socket.  It accepts only
 * unfragmented IPv4/UDP frames whose UDP destination port equals the constant
 * held in instruction 8, and rejects everything else.  Every failing test
 * jumps to the final "return 0" instruction (index 10).
 *
 * Instruction 8 carries the placeholder port 67; mesh_socket() overwrites
 * its k field before attaching the filter.
 */
static struct sock_filter mesh_filter[] = {
  /* [0-1] EtherType at frame offset 12 must be IPv4. */
  BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 12),
  BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, ETHERTYPE_IP, 0, 8),

  /* [2-3] IP protocol byte at frame offset 23 must be UDP. */
  BPF_STMT(BPF_LD + BPF_B + BPF_ABS, 23),
  BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, IPPROTO_UDP, 0, 6),

  /* [4-5] Any fragment-offset bit set means a fragment: reject. */
  BPF_STMT(BPF_LD + BPF_H + BPF_ABS, 20),
  BPF_JUMP(BPF_JMP + BPF_JSET + BPF_K, 0x1fff, 4, 0),

  /* [6] X <- IP header length in bytes (IP header starts at offset 14). */
  BPF_STMT(BPF_LDX + BPF_B + BPF_MSH, 14),

  /* [7-8] UDP destination port (X + 16) must match the patched value. */
  BPF_STMT(BPF_LD + BPF_H + BPF_IND, 16),
  BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K, 67, 0, 1),   /* k patched at runtime */

  /* [9] Every test passed: hand the whole packet to the socket. */
  BPF_STMT(BPF_RET + BPF_K, (unsigned int)-1),

  /* [10] A test failed: drop the packet. */
  BPF_STMT(BPF_RET + BPF_K, 0),
};
00100 
00101 
/*
 * Accumulate the 16-bit one's-complement sum of a byte buffer.
 *
 * buf    - bytes to sum, interpreted as big-endian 16-bit words
 * nbytes - number of bytes in buf; an odd trailing byte is treated as the
 *          high byte of a final word
 * sum    - running sum from a previous call (0 to start a new sum)
 *
 * Returns the updated running sum, always folded back into 16 bits so that
 * calls can be chained over several buffers.
 *
 * The words are assembled byte by byte rather than loaded through a cast
 * u_int16_t pointer: buf may start at any address, and a 16-bit load from an
 * odd address is undefined behaviour (and traps on strict-alignment CPUs).
 */
static u_int32_t checksum(unsigned char *buf, unsigned nbytes, u_int32_t sum)
{
  unsigned i;
  unsigned paired = nbytes & ~1U;   /* largest even count <= nbytes */

  /* Checksum all the pairs of bytes first... */
  for (i = 0; i < paired; i += 2) {
    sum += ((u_int32_t)buf[i] << 8) | (u_int32_t)buf[i + 1];
    /* Add carry. */
    if (sum > 0xFFFF)
      sum -= 0xFFFF;
  }

  /* If there's a single byte left over, checksum it, too.  Network
     byte order is big-endian, so the remaining byte is the high byte. */
  if (i < nbytes) {
    sum += (u_int32_t)buf[i] << 8;
    /* Add carry. */
    if (sum > 0xFFFF)
      sum -= 0xFFFF;
  }
  return sum;
}
00123 
00124 
00125 /* Finish computing the checksum, and then put it into network byte order. */
/*
 * Turn a running sum from checksum() into the value stored in a header:
 * take the one's complement of the low 16 bits and convert the result to
 * network byte order.
 */
static u_int32_t wrapsum(u_int32_t sum)
{
  u_int32_t complemented = (sum ^ 0xFFFFFFFFU) & 0xFFFF;

  return htons(complemented);
}
00131 
00132 
00133 /**********************************************************/
00134 /*************************************************/
00135 /****  based on the ISC DHCP implementation  *****/
00136 
00137 
/*
 * Write an Ethernet header (EtherType IPv4) into buf at index *bufix and
 * advance *bufix by ETHER_HDR_LEN.
 *
 * from / to - hardware address buffers; the first byte is skipped and the
 *             following six bytes are used as the MAC address.  A NULL
 *             `to` selects the broadcast address ff:ff:ff:ff:ff:ff, a NULL
 *             `from` an all-zero source address.
 */
static void assemble_ethernet_header(unsigned char *buf, unsigned *bufix,
                                     unsigned char *from, unsigned char *to)
{
  struct ether_header hdr;
  unsigned char *out = buf + *bufix;

  hdr.ether_type = htons(ETHERTYPE_IP);

  /* Source address: skip the leading byte of the caller's buffer. */
  if (from == NULL)
    memset(hdr.ether_shost, 0x00, sizeof(hdr.ether_shost));
  else
    memcpy(hdr.ether_shost, from + 1, sizeof(hdr.ether_shost));

  /* Destination address: broadcast unless one was supplied. */
  if (to == NULL)
    memset(hdr.ether_dhost, 0xff, sizeof(hdr.ether_dhost));
  else
    memcpy(hdr.ether_dhost, to + 1, sizeof(hdr.ether_dhost));

  memcpy(out, &hdr, ETHER_HDR_LEN);
  *bufix += ETHER_HDR_LEN;
}
00158 
00159 
/*
 * Read the Ethernet header found at buf[bufix].
 *
 * from - optional output (at least 7 bytes): byte 0 receives ARPHRD_ETHER,
 *        bytes 1..6 the frame's source MAC address.  May be NULL.
 *
 * Returns ETHER_HDR_LEN, the number of bytes the caller must skip to reach
 * the next header.  Only when USERLAND_FILTER is defined does it return -1
 * for a frame whose EtherType is not IPv4.
 */
static ssize_t decode_ethernet_header(unsigned char *buf, unsigned bufix,
                                      unsigned char *from)
{
  struct ether_header hdr;

  memcpy(&hdr, &buf[bufix], ETHER_HDR_LEN);

#ifdef USERLAND_FILTER
  if (ntohs(hdr.ether_type) != ETHERTYPE_IP)
    return -1;
#endif

  if (from == NULL)
    return ETHER_HDR_LEN;

  from[0] = ARPHRD_ETHER;
  memcpy(from + 1, hdr.ether_shost, sizeof(hdr.ether_shost));
  return ETHER_HDR_LEN;
}
00177 
00178 
00179 static void assemble_udp_ip_header (unsigned char *buf, unsigned *bufix,
00180                                     u_int32_t from, u_int32_t to, u_int32_t port,
00181                                     unsigned char *data, unsigned len)
00182 {
00183   struct ip ip;
00184   struct udphdr udp;
00185 
00186   /* Fill out the IP header */
00187   ip.ip_v = 4;
00188   ip.ip_hl = 20;
00189   ip.ip_tos = IPTOS_LOWDELAY;
00190   ip.ip_len = htons(sizeof(ip) + sizeof(udp) + len);
00191   ip.ip_id = 0;
00192   ip.ip_off = 0;
00193   ip.ip_ttl = IPDEFTTL;
00194   ip.ip_p = IPPROTO_UDP;
00195   ip.ip_sum = 0;
00196   ip.ip_src.s_addr = from;
00197   ip.ip_dst.s_addr = to;
00198 
00199   /* Checksum the IP header... */
00200   ip.ip_sum = wrapsum (checksum ((unsigned char *)&ip, sizeof ip, 0));
00201         
00202   /* Copy the ip header into the buffer... */
00203   memcpy (&buf [*bufix], &ip, sizeof ip);
00204   *bufix += sizeof ip;
00205 
00206   /* Fill out the UDP header */
00207   udp.uh_sport = MESH_DISCOVERY_PORT;   /* XXX */
00208   udp.uh_dport = port;                  /* XXX */
00209   udp.uh_ulen = htons(sizeof(udp) + len);
00210   memset (&udp.uh_sum, 0, sizeof udp.uh_sum);
00211 
00212   /* Compute UDP checksums, including the ``pseudo-header'', the UDP
00213      header and the data. */
00214   udp.uh_sum = wrapsum (checksum ((unsigned char *)&udp, sizeof udp,
00215                                   checksum (data, len, 
00216                                             checksum ((unsigned char *)
00217                                                       &ip.ip_src,
00218                                                       2 * sizeof ip.ip_src,
00219                                                       IPPROTO_UDP +
00220                                                       (u_int32_t)
00221                                                       ntohs (udp.uh_ulen)))));
00222 
00223   /* Copy the udp header into the buffer... */
00224   memcpy (&buf [*bufix], &udp, sizeof udp);
00225   *bufix += sizeof udp;
00226 }
00227 
00228 
00229 static ssize_t decode_udp_ip_header(unsigned char *buf, unsigned bufix,
00230                                     struct sockaddr_in *from, unsigned buflen,
00231                                     unsigned *rbuflen)
00232 {
00233   unsigned char *data;
00234   struct ip ip;
00235   struct udphdr udp;
00236   unsigned char *upp, *endbuf;
00237   u_int32_t ip_len, ulen, pkt_len;
00238   u_int32_t sum, usum;
00239   static int ip_packets_seen;
00240   static int ip_packets_bad_checksum;
00241   static int udp_packets_seen;
00242   static int udp_packets_bad_checksum;
00243   static int udp_packets_length_checked;
00244   static int udp_packets_length_overflow;
00245   unsigned len;
00246 
00247   /* Designate the end of the input buffer for bounds checks. */
00248   endbuf = buf + bufix + buflen;
00249 
00250   /* Assure there is at least an IP header there. */
00251   if ((buf + bufix + sizeof(ip)) > endbuf)
00252     return -1;
00253 
00254   /* Copy the IP header into a stack aligned structure for inspection.
00255    * There may be bits in the IP header that we're not decoding, so we
00256    * copy out the bits we grok and skip ahead by ip.ip_hl * 4.
00257    */
00258   upp = buf + bufix;
00259   memcpy(&ip, upp, sizeof(ip));
00260   ip_len = (*upp & 0x0f) << 2;
00261   upp += ip_len;
00262 
00263   /* Check the IP packet length. */
00264   pkt_len = ntohs(ip.ip_len);
00265   if (pkt_len > buflen)
00266     return -1;
00267 
00268   /* Assure after ip_len bytes that there is enough room for a UDP header. */
00269   if ((upp + sizeof(udp)) > endbuf)
00270     return -1;
00271 
00272   /* Copy the UDP header into a stack aligned structure for inspection. */
00273   memcpy(&udp, upp, sizeof(udp));
00274 
00275 #ifdef USERLAND_FILTER
00276   /* Is it a UDP packet? */
00277   if (ip.ip_p != IPPROTO_UDP)
00278     return -1;
00279 
00280   /* Is it to the port we're serving? */
00281   if (udp.uh_dport != MESH_DISCOVERY_PORT)
00282     return -1;
00283 #endif /* USERLAND_FILTER */
00284 
00285   ulen = ntohs(udp.uh_ulen);
00286   if (ulen < sizeof(udp))
00287     return -1;
00288 
00289   udp_packets_length_checked++;
00290   if ((upp + ulen) > endbuf) {
00291     udp_packets_length_overflow++;
00292     if ((udp_packets_length_checked > 4) &&
00293         ((udp_packets_length_checked / udp_packets_length_overflow) < 2)) {
00294       syslog(LOG_INFO, "%d udp packets in %d too long - dropped",
00295              udp_packets_length_overflow, udp_packets_length_checked);
00296       udp_packets_length_overflow = 0;
00297       udp_packets_length_checked = 0;
00298     }
00299     return -1;
00300   }
00301 
00302   if ((ulen < sizeof(udp)) || ((upp + ulen) > endbuf))
00303     return -1;
00304 
00305   /* Check the IP header checksum - it should be zero. */
00306   ++ip_packets_seen;
00307   if (wrapsum (checksum (buf + bufix, ip_len, 0))) {
00308     ++ip_packets_bad_checksum;
00309     if (ip_packets_seen > 4 &&
00310         (ip_packets_seen / ip_packets_bad_checksum) < 2) {
00311       syslog(LOG_INFO, "%d bad IP checksums seen in %d packets",
00312                 ip_packets_bad_checksum, ip_packets_seen);
00313       ip_packets_seen = ip_packets_bad_checksum = 0;
00314     }
00315     return -1;
00316   }
00317 
00318   /* Copy out the IP source address... */
00319   memcpy(&from->sin_addr, &ip.ip_src, 4);
00320 
00321   /* Compute UDP checksums, including the ``pseudo-header'', the UDP
00322      header and the data.   If the UDP checksum field is zero, we're
00323      not supposed to do a checksum. */
00324   data = upp + sizeof(udp);
00325   len = ulen - sizeof(udp);
00326   usum = udp.uh_sum;
00327   udp.uh_sum = 0;
00328 
00329   /* XXX: We have to pass &udp, because we have to zero the checksum
00330    * field before calculating the sum...'upp' isn't zeroed.
00331    */
00332   sum = wrapsum(checksum((unsigned char *)&udp, sizeof(udp),
00333                          checksum(data, len,
00334                                   checksum((unsigned char *)&ip.ip_src,
00335                                            8, IPPROTO_UDP + ulen))));
00336 
00337   udp_packets_seen++;
00338   if (usum && usum != sum) {
00339     udp_packets_bad_checksum++;
00340     if (udp_packets_seen > 4 &&
00341         (udp_packets_seen / udp_packets_bad_checksum) < 2) {
00342       syslog(LOG_INFO, "%d bad udp checksums in %d packets",
00343              udp_packets_bad_checksum, udp_packets_seen);
00344       udp_packets_seen = udp_packets_bad_checksum = 0;
00345     }
00346     return -1;
00347   }
00348 
00349   /* Copy out the port... */
00350   memcpy (&from -> sin_port, &udp.uh_sport, sizeof udp.uh_sport);
00351 
00352   /* Save the length of the UDP payload. */
00353   if (rbuflen != NULL)
00354     *rbuflen = len;
00355 
00356   /* Return the index to the UDP payload. */
00357   return ip_len + sizeof udp;
00358 }
00359 
00360 
00361 /*************************************************/
00362 
00363 
00364 
/*
 * Create the socket used for mesh traffic.
 *
 * With RAW_PACKETS defined: opens a packet socket, binds it to the interface
 * named `ifname` and attaches mesh_filter with its port instruction patched
 * from `port`.  Otherwise: opens a UDP socket bound to INADDR_ANY:`port`.
 *
 * Returns the socket descriptor, or a negative value on failure (errno is
 * left as set by the failing call and the error is logged via syslog).
 * The descriptor is closed on every failure path so that a failed setup
 * does not leak it.
 *
 * NOTE(review): the raw branch patches the filter with ntohs(port) while the
 * UDP branch uses htons(port), i.e. treats `port` as host order.  The filter
 * compares against a host-order value, so one of the two conversions is
 * probably wrong on little-endian hosts.  Confirm which byte order callers
 * pass before changing it.
 */
int mesh_socket(char *ifname, unsigned int port)
{
  int sd, saved_errno;

#ifdef RAW_PACKETS
  int yes = 1;
  struct sock_fprog filter;
  struct sockaddr my_addr;

  if (ifname == NULL) {
    errno = EINVAL;
    return -1;
  }

  if ((sd = socket(PF_PACKET, SOCK_PACKET, htons(ETH_P_ALL))) < 0) {
    syslog(LOG_ERR, "socket: %m");
    return sd;
  }

  //TODO: SO_LINGER = 0 ???

  /* setup signal/noise gathering */
  if (setsockopt(sd, SOL_IP, IP_PKTINFO, &yes, sizeof(int)) < 0) {
    syslog(LOG_ERR, "setsockopt (IP_PKTINFO) [%s]: %m", "RF aware routing");
    goto fail;
  }

  /* The interface name travels in sa_data.  The buffer is zeroed first and
     the copy leaves the last byte alone, so the name is always terminated
     even when ifname is too long. */
  memset(&my_addr, 0, sizeof(my_addr));
  my_addr.sa_family = AF_PACKET;
  strncpy(my_addr.sa_data, ifname, sizeof(my_addr.sa_data) - 1);

  if (bind(sd, (struct sockaddr*)&my_addr, sizeof(my_addr)) < 0) {
    syslog(LOG_ERR, "bind: %m");
    goto fail;
  }

  /* Instruction 8 of mesh_filter holds the UDP destination port to match. */
  filter.len = sizeof(mesh_filter)/sizeof(struct sock_filter);
  filter.filter = mesh_filter;
  mesh_filter[8].k = ntohs((short)port);

  if (setsockopt(sd, SOL_SOCKET, SO_ATTACH_FILTER,
                 &filter, sizeof(filter)) < 0) {
    syslog(LOG_ERR, "setsockopt [ATTACH_FILTER] %m");
    goto fail;
  }
#else
  struct sockaddr_in my_addr;

  if ((sd = socket(PF_INET, SOCK_DGRAM, 0)) < 0) {
    syslog(LOG_ERR, "socket: %m");
    return sd;
  }

  my_addr.sin_family = AF_INET;
  my_addr.sin_port = htons(port);
  my_addr.sin_addr.s_addr = INADDR_ANY;
  memset(my_addr.sin_zero, 0, sizeof(my_addr.sin_zero));

  if (bind(sd, (struct sockaddr*)&my_addr,
           sizeof(struct sockaddr)) < 0) {
    syslog(LOG_ERR, "bind: %m");
    goto fail;
  }
#endif
  return sd;

 fail:
  /* close() may overwrite errno; keep the value from the failing call. */
  saved_errno = errno;
  close(sd);
  errno = saved_errno;
  return -1;
}
00425 
00426 
00427 
00428 #ifdef RTSCTS
/*
 * RTS/CTS (request-to-send / clear-to-send) support.  This section is only
 * compiled when RTSCTS is defined, and the #define near the top of the file
 * is commented out.
 *
 * NOTE(review): as written this is an unfinished sketch and would not
 * compile if enabled: a struct member cannot carry an initializer, the
 * rtscts struct has no rts_time member although one is assigned below, the
 * sendto() call does not match the socket API, and hdr is tested without
 * ever being filled (the "listen" step exists only as a comment).
 */
00429 struct {
00430   unsigned int port = 4938;
00431   u_int64_t cts_time;
00432 } rtscts;
00433 
/* Message kinds carried in rtscts_hdr_t.rtscts. */
00434 enum rc { RTS, CTS};
00435 
/* One-byte header of an RTS/CTS message. */
00436 typedef struct {
00437   u_int8_t rtscts;
00438 } rtscts_hdr_t;
00439 
00440 /* thread */
/*
 * Intended loop body: on RTS, skip the request if a CTS was recorded less
 * than THRESHOLD_A ago, otherwise record the time and broadcast a CTS; on
 * CTS, record the time in rtscts.cts_time.
 */
00441 int request_to_send_clear_to_send(protocol_t *proto)
00442 {
00443   while (1) {
00444     rtscts_hdr_t hdr;
00445     //listen - not from myself
00446     if (hdr.rtscts == RTS) {
00447       if (get_current_time() - rtscts.cts_time < THRESHOLD_A)
00448         continue;
00449       else {
00450         rtscts.rts_time = get_current_time();
00451         sendto(CTS, bcast, rtscts.port);
00452       }
00453     } else if (hdr.rtscts == CTS) {
00454       rtscts.cts_time = get_current_time();
00455       // release mutex
00456     }
00457   }
00458   return 0;
00459 }
00460 #endif
00461 
00462 
/*
 * Send `len` bytes from `buf` to the address in `to`.
 *
 * With RAW_PACKETS defined the payload is wrapped in hand-built Ethernet,
 * IP and UDP headers and written to the packet socket `sd` through the
 * interface `ifname`.  The frame is sent to the broadcast MAC address with
 * `ifaddr` (a hardware address whose first byte is skipped) as source MAC
 * and 0 as source IP address.  Otherwise the payload is sent with a plain
 * sendto() on the UDP socket `sd`.
 *
 * Returns the result of sendto(): the number of bytes written (in raw mode
 * this includes the headers) or -1 with errno set.
 */
ssize_t mesh_sendmsg(char *ifname, char *ifaddr,
                    int sd, void *buf, size_t len, struct sockaddr_in *to)
{
#ifdef RAW_PACKETS
  double hh[16];                    /* aligned scratch for the link header */
  unsigned hh_len = 0, ih_len = 0;
  unsigned alignment;
  struct sockaddr sa;
  unsigned char data[len + 1536];   /* payload plus room for all headers */
#endif

#ifdef RTSCTS
  u_int64_t hesitate;
  rtscts_hdr_t hdr;

 top:
// TODO: implement rts/cts for packet sockets
  if ((hesitate = get_current_time() - rtscts.cts_time) < THRESHOLD_A)
    usleep((unsigned long)hesitate * 1000);
  else {
    hdr.rtscts = RTS;
    sendto(&hdr, sizeof(rtscts_hdr_t), bcast, rtscts.port);
  }
  //obtain cts_mutex
#endif

#ifdef RAW_PACKETS
  /* Build the Ethernet header separately, then place it `alignment` bytes
     into `data` so that the IP header behind it starts on a 4-byte
     boundary. */
  assemble_ethernet_header((unsigned char*)hh, &hh_len,
                           (unsigned char*)ifaddr, NULL);
  alignment = hh_len % 4;
  memcpy(data+alignment, (unsigned char*)hh, hh_len);
  ih_len = hh_len + alignment;
  assemble_udp_ip_header(data, &ih_len, 0, to->sin_addr.s_addr, to->sin_port,
                         (unsigned char*)buf, len);
  memcpy(data+ih_len, buf, len);

  /* sa_data carries the interface name; zero-fill and leave the last byte
     untouched so the name stays NUL-terminated. */
  memset(&sa, 0, sizeof(struct sockaddr));
  sa.sa_family = AF_PACKET;
  strncpy(sa.sa_data, ifname, sizeof(sa.sa_data) - 1);

  /* The frame begins at data+alignment, not at data[0], and is exactly
     headers + payload long.  Sending from data[0] for the full buffer size
     would put the alignment padding in front of the Ethernet header and
     append uninitialised stack bytes after the payload. */
  return sendto(sd, data + alignment, (ih_len - alignment) + len, 0,
                &sa, sizeof(struct sockaddr));
#else
  return sendto(sd, buf, len, 0,
                (struct sockaddr*)to, sizeof(struct sockaddr));
#endif
}
00513 
00514 
00515 
00516 ssize_t mesh_recvmsg(int sd, void *buf, size_t len, struct sockaddr_in *from,
00517                      struct msghdr *msg_header, size_t ctl_msg_size)
00518 {
00519   struct iovec io_vector;
00520   int nbytes;
00521   char ctl_msg[ctl_msg_size];
00522 
00523 #ifdef RAW_PACKETS
00524   int offset = 0;
00525   unsigned buf_idx = 0;
00526   ssize_t len_rcvd;
00527   int reply_size = len + 1536;
00528   unsigned char reply[reply_size];
00529   unsigned char hfrom[8];
00530 #else
00531   int reply_size = len;
00532   void *reply = buf;
00533 #endif
00534 
00535   memset(reply, 0, reply_size);
00536   memset(msg_header, 0, sizeof(struct msghdr));
00537 
00538   io_vector.iov_base = reply;
00539   io_vector.iov_len = reply_size;
00540 
00541   msg_header->msg_name = from;
00542   msg_header->msg_namelen = sizeof(struct sockaddr_in);
00543   msg_header->msg_iov = &io_vector;
00544   msg_header->msg_iovlen = 1;
00545   msg_header->msg_control = ctl_msg;
00546   msg_header->msg_controllen = ctl_msg_size;
00547   msg_header->msg_flags = 0;
00548 
00549   if ((nbytes = recvmsg(sd, msg_header, 0)) < 0)
00550     return nbytes;
00551 
00552 #ifdef RAW_PACKETS
00553   if ((offset = decode_ethernet_header(reply, buf_idx, hfrom)) < 0)
00554     return 0;
00555   buf_idx += offset;
00556   nbytes -= offset;
00557   if ((offset = decode_udp_ip_header(reply, buf_idx, from, nbytes, &len_rcvd)) < 0)
00558     return 0;
00559 
00560   buf_idx += offset;
00561   nbytes -= offset;
00562   if (nbytes != reply_size) {
00563     errno = EMSGSIZE;
00564     return -1;
00565   }
00566   memcpy(buf, &reply[buf_idx], len);
00567 #else
00568   if (nbytes != reply_size) {
00569     errno = EMSGSIZE;
00570     return -1;
00571   }
00572 #endif
00573   analyze(self, msg_header);  // FIXME: instead use registered packet analysis
00574 
00575   return nbytes;
00576 }
00577 
00578 
/*
 * Shared implementation of timed_recv() and timed_recv_peek().
 *
 * Waits up to `timeout` seconds for `sd` to become readable, then receives
 * one message with the given recvfrom() `flags`.
 *
 * Returns the number of payload bytes stored in `buf`; -2 with errno set to
 * ETIMEDOUT if nothing arrived in time; -1 on error; and (raw mode only) 0
 * when the frame could not be decoded.
 *
 * In raw mode the socket delivers whole Ethernet frames, so the frame is
 * received into a scratch buffer, the headers are decoded, and only the UDP
 * payload is copied to `buf`.  The frame has to be read into the same buffer
 * that is decoded, and the payload must fit `buf`, otherwise EMSGSIZE is
 * reported.
 */
static int timed_recv_flags(int sd, void *buf, int bytes,
                            struct sockaddr_in *sa, int timeout, int flags)
{
  int ready, num_bytes;
  fd_set fds;
  struct timeval tv;
  socklen_t addr_len = sizeof(struct sockaddr);

  FD_ZERO(&fds);
  FD_SET(sd, &fds);
  tv.tv_sec = timeout;
  tv.tv_usec = 0;

  ready = select(sd+1, &fds, NULL, NULL, &tv);
  if (ready == 0) {
    errno = ETIMEDOUT;
    return -2;
  }
  if (ready < 0)
    return -1;

#ifdef RAW_PACKETS
  if (bytes < 0) {
    errno = EINVAL;
    return -1;
  }
  {
    unsigned char reply[bytes + 1536];  /* payload plus room for headers */
    unsigned buf_idx = 0;
    unsigned length = 0;
    int offset;
    unsigned char hfrom[8];

    num_bytes = recvfrom(sd, reply, sizeof(reply), flags,
                         (struct sockaddr*)sa, &addr_len);
    if (num_bytes < 0)
      return -1;
    if (num_bytes < ETHER_HDR_LEN)
      return 0;

    if ((offset = decode_ethernet_header(reply, buf_idx, hfrom)) < 0)
      return 0;
    buf_idx += offset;
    num_bytes -= offset;

    /* Bound the decoder by the bytes actually received. */
    if ((offset = decode_udp_ip_header(reply, buf_idx, sa, num_bytes,
                                       &length)) < 0)
      return 0;
    buf_idx += offset;

    /* Never write more than the caller's buffer can hold. */
    if (length > (unsigned)bytes) {
      errno = EMSGSIZE;
      return -1;
    }
    memcpy(buf, &reply[buf_idx], length);
    return (int)length;
  }
#else
  num_bytes = recvfrom(sd, buf, bytes, flags, (struct sockaddr*)sa, &addr_len);
  return num_bytes;
#endif
}


/*
 * Receive one message from `sd`, waiting at most `timeout` seconds.
 * The sender's address is stored in *sa.  See timed_recv_flags() for the
 * return values.
 */
int timed_recv(int sd, void *buf, int bytes,
               struct sockaddr_in *sa, int timeout)
{
  return timed_recv_flags(sd, buf, bytes, sa, timeout, 0);
}


/*
 * Like timed_recv(), but uses MSG_PEEK|MSG_WAITALL so the message stays
 * queued on the socket and is returned again by the next receive call.
 */
int timed_recv_peek(int sd, void *buf, int bytes,
                    struct sockaddr_in *sa, int timeout)
{
  return timed_recv_flags(sd, buf, bytes, sa, timeout, MSG_PEEK|MSG_WAITALL);
}
00676 
00677 
00678 
00679 static void pack_mesh_hdr(struct mesh_hdr *mh)
00680 {
00681   if (mh == NULL)
00682     return;
00683 
00684   mh->protocol_id = htons(mh->protocol_id);
00685   mh->seq_num = htonl(mh->seq_num);
00686 }
00687 
00688 
00689 static void unpack_mesh_hdr(struct mesh_hdr *mh)
00690 {
00691   if (mh == NULL)
00692     return;
00693 
00694   mh->protocol_id = ntohs(mh->protocol_id);
00695   mh->seq_num = ntohl(mh->seq_num);
00696 }
00697 
00698 
00699 
00700 ssize_t mesh_raw_send(protocol_t *proto, int sd, void *buf, size_t len,
00701                         struct sockaddr_in *to)
00702 {
00703   struct mesh_hdr mh;
00704   size_t offset = sizeof(struct mesh_hdr);
00705   size_t data_len = len + offset;
00706   unsigned char data[data_len];
00707 
00708   mh.raw = 1;
00709   mh.nack = 0;
00710   mh.protocol_id = proto->unique_id;
00711   mh.seq_num = 0;
00712   pack_mesh_hdr(&mh);
00713 
00714   memcpy(data, (unsigned char *)&mh, offset);
00715   memcpy(data + offset, buf, len);
00716 
00717   return mesh_sendmsg(proto->sensor->interface, proto->sensor->mac_addr,
00718                       sd, data, data_len, to);
00719 }
00720 
00721 
00722 ssize_t mesh_send(protocol_t *proto, int sd, void *buf, size_t len,
00723                   struct sockaddr_in *to)
00724 {
00725   size_t offset = sizeof(struct mesh_hdr);
00726   size_t data_len = len + offset;
00727   unsigned char data[data_len];
00728   link_entry_t *entry = retrieve_entry(proto, to->sin_addr.s_addr);
00729   struct mesh_hdr mh;
00730   msg_queue_t *mq;
00731 
00732   if (entry == NULL) {
00733     syslog(LOG_ERR, "no route to %s", inet_ntoa(to->sin_addr));
00734     return -1;
00735   }
00736 
00737   mh.raw = 0;
00738   mh.nack = 0;
00739   mh.protocol_id = proto->unique_id;
00740   mh.seq_num = entry->send_seq_num;
00741   pack_mesh_hdr(&mh);
00742 
00743   memcpy(data, (unsigned char *)&mh, offset);
00744   memcpy(data + offset, buf, len);
00745 
00746   mq = malloc(offset);
00747   mq->data = malloc(data_len);
00748   memcpy(mq->data, data, data_len);
00749   mq->len = data_len;
00750   mq->port = to->sin_port;
00751   mq->next = NULL;
00752   add_msg(entry, mh.seq_num, mq);
00753 
00754   entry->send_seq_num++;
00755   if (entry->send_seq_num > MAX_SEQNUM)
00756     entry->send_seq_num = 1;
00757 
00758   return mesh_sendmsg(proto->sensor->interface, proto->sensor->mac_addr,
00759                       sd, data, data_len, to);
00760 }
00761 
00762 
00763 ssize_t mesh_recv(self_t *self, int sd, void *buf, size_t len,
00764                   struct sockaddr_in *from)
00765 {
00766   ssize_t nbytes = 0;
00767   size_t offset = sizeof(struct mesh_hdr);
00768   size_t data_len = len + offset;
00769   unsigned char data[data_len];
00770   seqnum_t diff;
00771   link_entry_t *entry;
00772   struct mesh_hdr mh;
00773   struct msghdr msghdr;
00774   protocol_t *proto = NULL;
00775 
00776  recvr:
00777   if ((nbytes = mesh_recvmsg(sd, data, data_len, from,
00778                              &msghdr, max_ctl_size(self))) < 0)
00779     return nbytes;
00780 
00781   memcpy(&mh, data, offset);
00782   unpack_mesh_hdr(&mh);
00783 
00784   if (mh.raw == 1)
00785     return nbytes;
00786 
00787   proto = protocol_by_id(self, mh.protocol_id);
00788 
00789   if (mh.nack == 1) {
00790     link_entry_t *link = retrieve_entry(proto, from->sin_addr.s_addr);
00791     msg_queue_t *msg = retrieve_msg(link, mh.seq_num);
00792     from->sin_port = msg->port;
00793     mesh_sendmsg(self->interface, self->mac_addr,
00794                    proto->queue_sd, msg->data, msg->len, from);
00795     goto recvr;
00796   }
00797 
00798   if ((entry = retrieve_entry(proto, from->sin_addr.s_addr)) == NULL) {
00799     syslog(LOG_ERR, "no route to %s: sequencing unhandled",
00800            inet_ntoa(from->sin_addr));
00801     return nbytes;
00802   }
00803 
00804   if (entry->recv_seq_num > MAX_SEQNUM)
00805     entry->recv_seq_num = 0;
00806   if ((diff = mh.seq_num - entry->recv_seq_num) > 1) {
00807     int i;
00808     for (i = 0; i < diff; i++) {
00809       struct mesh_hdr mhdr;
00810       mhdr.raw = 0;
00811       mhdr.nack = 1;
00812       mhdr.protocol_id = proto->unique_id;
00813       mhdr.seq_num = entry->recv_seq_num + i;
00814       pack_mesh_hdr(&mhdr);
00815       mesh_sendmsg(proto->sensor->interface, proto->sensor->mac_addr,
00816                    sd, &mhdr, offset, from);
00817     }
00818   }
00819   entry->recv_seq_num += diff;
00820   memcpy(buf, data + offset, len);
00821 
00822   return nbytes;
00823 }
00824 
00825 
00826 
00827 ssize_t mesh_send_nextdoor(self_t *self, int sd, void *buf, size_t len,
00828                            double angle, double dist)
00829 {
00830   neighbor_entry_t *nbr = neighbor_by_position(self, angle, dist);
00831 
00832   if (nbr == NULL) {
00833     errno = -EINVAL;
00834     return -1;
00835   }
00836   return sendto(sd, buf, len, 0,
00837                 (struct sockaddr*)&nbr->ip, sizeof(struct sockaddr_in));
00838 }
00839 
00840 
00841 ssize_t mesh_recv_nextdoor(self_t *self, int sd, void *buf, size_t len,
00842                            double *angle, double *dist)
00843 {
00844   neighbor_entry_t *nbr = NULL;
00845   ssize_t nbytes = 0;
00846   struct sockaddr_in from;
00847   socklen_t addrlen = sizeof(struct sockaddr_in);
00848 
00849   nbytes = recvfrom(sd, buf, len, 0, (struct sockaddr*)&from, &addrlen);
00850   if ((nbr = neighbor_by_ip(self, &from)) == NULL)
00851     return -1;
00852 
00853   *angle = nbr->bearing;
00854   *dist = nbr->distance;
00855   return nbytes;
00856 }
00857 
00858 
00859 /* thread */
00860 int reliable_net(self_t *self)
00861 {
00862   while (1) {
00863     u_int64_t min = UINT64_MAX;
00864     protocol_t *proto = self->protocols;
00865     while (proto != NULL) {
00866       link_entry_t *link = NULL;
00867       proto->iterator = NULL;
00868 
00869       while ((link = iterate_entries(proto)) != NULL) {
00870         msg_queue_t *msg;
00871       loop:
00872         msg = (msg_queue_t*) avl_find_min(&link->send_queue)->element;
00873 
00874         if (msg->timeout < get_current_time()) {
00875           remove_msg(link, msg->id, msg);
00876           free(msg);
00877           goto loop;
00878         } else if (msg->timeout < min)
00879           min = msg->timeout;
00880       }
00881       proto = proto->next;
00882     }
00883     sleep(min / 1000);
00884   }
00885   return -1;
00886 }


© 2007, Los Alamos National Security, LLC.