N-sim
Emulation and simulation of
Wireless Sensor Networks



   Home


   Project Page


   Download


   CVS



   Installation


   Configuration


   Plug-ins




 Hosted by
SourceForge.net Logo

/home/brennan/n-sim/OrbisQuartus/server/l4/mini_mpi-l4.c

Go to the documentation of this file.
00001 
00014 /*
00015  * Copyright 2007. Los Alamos National Security, LLC. This material
00016  * was produced under U.S. Government contract DE-AC52-06NA25396 for
00017  * Los Alamos National Laboratory (LANL), which is operated by Los
00018  * Alamos National Security, LLC, for the Department of Energy. The
00019  * U.S. Government has rights to use, reproduce, and distribute this
00020  * software. NEITHER THE GOVERNMENT NOR LOS ALAMOS NATIONAL SECURITY,
00021  * LLC, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL
00022  * LIABILITY FOR THE USE OF THIS SOFTWARE. If software is modified to
00023  * produce derivative works, such modified software should be clearly
00024  * marked, so as not to confuse it with the version available from LANL.
00025  *
00026  * Additionally, this program is free software; you can redistribute
00027  * it and/or modify it under the terms of the GNU General Public
00028  * License as published by the Free Software Foundation; version 2 of
00029  * the License. Accordingly, this program is distributed in the hope
00030  * it will be useful, but WITHOUT ANY WARRANTY; without even the
00031  * implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
00032  * PURPOSE. See the GNU General Public License for more details.
00033  */
00034 
00035 
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <string.h>
00039 #include <errno.h>
00040 #include <mini_mpi.h>
00041 #include <mini_mpi-internal.h>
00042 #include "lwip.h"
00043 
00044 extern int my_numprocs;
00045 extern int my_taskid;
00046 extern unsigned short my_init;
00047 extern struct sockaddr_in head_addr;
00048 extern socket_ptr_t my_socket;
00049 extern task_t *task_array;
00050 
00051 //#define VIRT_TIME
00052 //#define MINI_MPI_TRACK
00053 
00054 #define MY_NC ((struct netconn*)my_socket.nc)
00055 
00056 static struct netconn *bcast_conn;
00057 
00058 
00059 int MPI_Init(int *argc, char ***argv)
00060 {
00061         int error = MPI_SUCCESS;
00062         int header, their_port = 0;
00063         struct netbuf *outbuf, *inbuf;
00064         struct ip_addr bcast_addr, *their_addr;
00065         char *table, msg_out[32], msg_in[MAX_MTU+1];
00066 
00067         sprintf(msg_out, "%d", MINI_MPI_INIT);
00068         if ((outbuf = netbuf_new()) == NULL) {
00069                 error = ENOMEM;
00070                 goto end;
00071         }
00072         if ((bcast_conn = netconn_new(NETCONN_UDP)) == NULL) {
00073                 error = ENOMEM;
00074                 goto end_init;
00075         }
00076         IP4_ADDR(&bcast_addr, 255,255,255,255);
00077         if ((error = netconn_connect(bcast_conn, &bcast_addr,
00078                                      MPI_INIT_PORT)) < 0) {
00079                 netconn_delete(bcast_conn);
00080                 goto end_init;
00081         }
00082 
00083         netbuf_ref(outbuf, msg_out, strlen(msg_out) + 1);
00084         if ((error = netconn_send(bcast_conn, outbuf)) < 0) {
00085                 netconn_close(bcast_conn);
00086                 netconn_delete(bcast_conn);
00087                 goto end_init;
00088         }
00089 
00090  await_reply:
00091         while ((inbuf = netconn_recv(bcast_conn)) != NULL) {
00092                 int msg_len = sizeof(msg_in) - 1;
00093                 int cur_len = 0;
00094                 char indata[msg_len + 1];
00095 
00096                 do {
00097                         char *fragdata;
00098                         unsigned short len = 0;
00099                         int left = msg_len - cur_len;
00100                         netbuf_data(inbuf, (void**)&fragdata, &len);
00101                         memcpy(msg_in + cur_len, fragdata,
00102                                len > left ? left : len);
00103                         cur_len += len;
00104                 } while(netbuf_next(inbuf) >= 0);
00105                 strncat(msg_in, indata, cur_len);
00106 
00107                 if (their_port == 0) {
00108                         their_addr = netbuf_fromaddr(inbuf);
00109                         memcpy(&head_addr.sin_addr, their_addr,
00110                                sizeof(struct ip_addr));
00111                         head_addr.sin_port = netbuf_fromport(inbuf);
00112                 }
00113                 free(inbuf);
00114         }
00115 
00116         error = sscanf(msg_in, "%d::%d:%d::", &header, &my_numprocs, 
00117                        &my_taskid);
00118         if (error < 3 || header != MINI_MPI_HEAD) {
00119                 // log error
00120                 goto await_reply;
00121         }
00122 
00124         if ((table = strstr(msg_in, "::::")) == NULL) {
00125                 error = errno;
00126                 netconn_close(bcast_conn);
00127                 netconn_delete(bcast_conn);
00128                 goto end_init;
00129         }
00130         table += 2;
00131         string_to_tasks(my_numprocs, table, task_array);
00132 
00133         if ((my_socket.nc = netconn_new(NETCONN_TCP)) == NULL) {
00134                 error = ENOMEM;
00135                 netconn_close(bcast_conn);
00136                 netconn_delete(bcast_conn);
00137                 goto end_init;
00138         }
00139         if ((error = netconn_connect(MY_NC,
00140                                      (struct ip_addr*)&head_addr.sin_addr,
00141                                      MPI_PORT)) < 0) {
00142                 netconn_delete(MY_NC);
00143                 netconn_close(bcast_conn);
00144                 netconn_delete(bcast_conn);
00145                 goto end_init;
00146         }
00147 
00148 #ifdef VIRT_TIME
00149         // initialize virtual_time
00150 #endif
00151         my_init = 1;
00152 
00153  end_init:
00154         netbuf_delete(outbuf);
00155  end:
00156         return error;
00157 }
00158 
00159 
00160 int MPI_Comm_socket(MPI_Comm comm, int *sock)
00161 {
00162         if (comm != MPI_COMM_WORLD || my_init == 0)
00163                 return EINVAL;
00164         *sock = (int)my_socket.nc;
00165         return MPI_SUCCESS;
00166 }
00167 
00168 
00169 int MPI_Abort(MPI_Comm comm, int errorcode)
00170 {
00171         int error;
00172         char msg[32];
00173         sprintf(msg, "%d", MINI_MPI_ABORT);
00174 
00175         if (comm != MPI_COMM_WORLD || my_init == 0)
00176                 return EINVAL;
00177 
00178         if ((error = netconn_write(MY_NC, msg, strlen(msg),
00179                                    NETCONN_NOCOPY)) < 0)
00180                 return error;
00181         return MPI_SUCCESS;
00182 }
00183 
00184 
00185 int MPI_Finalize(void)
00186 {
00187         int error;
00188         char msg[32];
00189         sprintf(msg, "%d", MINI_MPI_FINAL);
00190 
00191         if (my_init == 0)
00192                 return EINVAL;
00193 
00194         if ((error = netconn_write(MY_NC, msg, strlen(msg),
00195                                    NETCONN_NOCOPY)) < 0)
00196                 return error;
00197 
00198         my_init = 0;
00199         free(task_array);
00200 
00201         return MPI_SUCCESS;
00202 }
00203 
00204 /******************************/
00205 
00206 int MPI_Send(void *buf, int count, MPI_Datatype type, int dest,
00207              int tag, MPI_Comm comm)
00208 {
00209         int error = 0;
00210         struct netconn *sock = (struct netconn*)task_array[dest].sock.nc;
00211         int msg_len = (type_size(type) * count) + 1;
00212         char msg[msg_len];
00213 
00214         if (comm != MPI_COMM_WORLD || my_init == 0 || sock == NULL)
00215                 return EINVAL;
00216 
00217         buffer_to_char(msg, msg_len, buf, count, type);
00218 #ifdef VIRT_TIME
00219         if ((error = vt_send(dest, sock, msg, strlen(msg))) < 0)
00220                 return error;
00221 #else
00222         if ((error = netconn_write(sock, msg, strlen(msg),
00223                                    NETCONN_COPY)) < 0)
00224                 return error;
00225 #endif
00226 #ifdef MINI_MPI_TRACK
00227         {
00228                 char tracker[msg_len + 33];
00229 
00230                 sprintf(tracker, "%d:%s", MINI_MPI_SEND, msg);
00231                 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00232                                            NETCONN_COPY)) < 0)
00233                         return error;
00234         }
00235 #endif
00236         return MPI_SUCCESS;
00237 }
00238 
00239 int MPI_Recv(void *buf, int count, MPI_Datatype type, int source,
00240              int tag, MPI_Comm comm, MPI_Status *status)
00241 {
00242         char *msg;
00243         int msg_len = 0;
00244         struct netconn *sock;
00245         struct netbuf *inbuf;
00246 
00247         if (source == -1)  
00248                 sock = bcast_conn;
00249         else
00250                 sock = (struct netconn*)task_array[source].sock.nc;
00251 
00252         if (comm != MPI_COMM_WORLD || my_init == 0 || sock == NULL)
00253                 return EINVAL;
00254 
00255 #ifdef VIRT_TIME
00256         msg = vt_recv(sock, MPI_MSG_LEN);
00257         char_to_buffer(buf, count, type, msg);
00258 #else
00259         msg = malloc(MPI_MSG_LEN + 1);
00260         inbuf = netconn_recv(sock);
00261         if (sock->err != ERR_OK)
00262                 return sock->err;
00263 
00264         do {
00265                 char *data;
00266                 unsigned short data_len;
00267 
00268                 netbuf_data(inbuf, (void**)&data, &data_len);
00269                 data[data_len] = '\0';
00270 
00271                 strncat(msg, data, MPI_MSG_LEN - msg_len);
00272                 msg_len += data_len;
00273         } while (netbuf_next(inbuf) >= 0);
00274         char_to_buffer(buf, count, type, msg);
00275 #endif
00276 #ifdef MINI_MPI_TRACK
00277         {
00278                 int error = 0;
00279                 char tracker[msg_len + 33];
00280 
00281                 sprintf(tracker, "%d:%s", MINI_MPI_RECV, msg);
00282                 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00283                                            NETCONN_COPY)) < 0)
00284                         return error;
00285         }
00286 #endif
00287 #ifndef VIRT_TIME
00288         free(msg);
00289 #endif
00290         return MPI_SUCCESS;
00291 }
00292 
00293 /******************************/
00294 
00295 int MPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
00296              int tag, MPI_Comm comm, MPI_Request *request)
00297 {
00298         int error = 0;
00299         struct netconn *sock = (struct netconn*)task_array[dest].sock.nc;
00300         int msg_len = (type_size(type) * count) + 1;
00301         char msg[msg_len];
00302 
00303         if (comm != MPI_COMM_WORLD || my_init == 0 || sock == NULL)
00304                 return EINVAL;
00305 
00307 
00308         buffer_to_char(msg, msg_len, buf, count, type);
00309 #ifdef VIRT_TIME
00310         if ((error = vt_send(sock, msg, strlen(msg))) < 0)
00311                 return error;
00312 #else
00313         if ((error = netconn_write(sock, msg, strlen(msg),
00314                                    NETCONN_COPY)) < 0)
00315                 return error;
00316 #endif
00317 #ifdef MINI_MPI_TRACK
00318         {
00319                 char tracker[msg_len + 33];
00320 
00321                 sprintf(tracker, "%d:%s", MINI_MPI_SEND, msg);
00322                 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00323                                            NETCONN_COPY)) < 0)
00324                         return error;
00325         }
00326 #endif
00327         return MPI_SUCCESS;
00328 }
00329 
00330 int MPI_Irecv(void *buf, int count, MPI_Datatype type, int source,
00331              int tag, MPI_Comm comm, MPI_Request *request)
00332 {
00333         char *msg;
00334         int msg_len = 0;
00335         struct netconn *sock;
00336         struct netbuf *inbuf;
00337         
00338         if (source == -1)  
00339                 sock = bcast_conn;
00340         else
00341                 sock = (struct netconn*)task_array[source].sock.nc;
00342 
00343         if (comm != MPI_COMM_WORLD || my_init == 0 || sock == NULL)
00344                 return EINVAL;
00345 
00347 #ifdef VIRT_TIME
00348         msg = vt_recv(sock, MPI_MSG_LEN);
00349         char_to_buffer(buf, count, type, msg);
00350 #else
00351         msg = malloc(MPI_MSG_LEN + 1);
00352         inbuf = netconn_recv(sock);
00353         if (sock->err != ERR_OK)
00354                 return sock->err;
00355 
00356         do {
00357                 char *data;
00358                 unsigned short data_len;
00359 
00360                 netbuf_data(inbuf, (void**)&data, &data_len);
00361                 data[data_len] = '\0';
00362 
00363                 strncat(msg, data, MPI_MSG_LEN - msg_len);
00364                 msg_len += data_len;
00365         } while (netbuf_next(inbuf) >= 0);
00366         char_to_buffer(buf, count, type, msg);
00367 #endif
00368 
00369 #ifdef MINI_MPI_TRACK
00370         {
00371                 int error = 0;
00372                 char tracker[msg_len + 33];
00373 
00374                 sprintf(tracker, "%d:%s", MINI_MPI_RECV, msg);
00375                 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00376                                            NETCONN_COPY)) < 0)
00377                         return error;
00378         }
00379 #endif
00380 #ifndef VIRT_TIME
00381         free(msg);
00382 #endif
00383         return MPI_SUCCESS;
00384 }
00385 
00386 int MPI_Wait(MPI_Request *request, MPI_Status *status)
00387 {
00389         // select?
00390         return MPI_SUCCESS;
00391 }
00392 
00393 int MPI_Waitall(int count, MPI_Request *request[], MPI_Status *status[])
00394 {
00395 
00396         return MPI_SUCCESS;
00397 }
00398 
00399 /******************************/
00400 
00401 int MPI_Barrier(MPI_Comm comm)
00402 {
00403         struct netbuf *resume;
00404         int error;
00405         unsigned short msg_in_len = 0;
00406         char msg_out[32], *msg_in;
00407         sprintf(msg_out, "%d", MINI_MPI_BARRIER);
00408 
00409         if (comm != MPI_COMM_WORLD || my_init == 0 || MY_NC == NULL)
00410                 return EINVAL;
00411 
00412         if ((error = netconn_write(MY_NC, msg_out, strlen(msg_out) + 1,
00413                                    NETCONN_NOCOPY)) < 0)
00414                 return error;
00415 
00416  await_resume:
00417         if ((resume = netconn_recv(MY_NC)) == NULL)
00418                 return ENOTCONN;
00419         netbuf_data(resume, (void**)&msg_in, &msg_in_len);
00420 
00421         if (atoi(msg_in) != MINI_MPI_RESUME) {
00422                 // log error
00423                 goto await_resume;
00424         }
00425         return MPI_SUCCESS;
00426 }
00427 
00428 int MPI_Bcast(void *buf, int count, MPI_Datatype type,
00429               int root, MPI_Comm comm)
00430 {
00431         int error = 0;
00432         int msg_len = (type_size(type) * count) + 1;
00433         char msg[msg_len];
00434 
00435         if (comm != MPI_COMM_WORLD || my_init == 0 || bcast_conn == NULL)
00436                 return EINVAL;
00437 
00438         buffer_to_char(msg, msg_len, buf, count, type);
00439 #ifdef VIRT_TIME
00440         if ((error = vt_send(bcast_conn, msg, strlen(msg))) < 0)
00441                 return error;
00442 #else
00443         if ((error = netconn_write(bcast_conn, msg, strlen(msg),
00444                                    NETCONN_COPY)) < 0)
00445                 return error;
00446 #endif
00447         return MPI_SUCCESS;
00448 }
00449 
00450 
00451 int MPI_Scatter(void *sendbuffer, int sendcount, MPI_Datatype sendtype,
00452                 void *recvbuffer, int recvcount, MPI_Datatype recvtype,
00453                 int root, MPI_Comm comm)
00454 {
00455         if (comm != MPI_COMM_WORLD || my_init == 0)
00456                 return EINVAL;
00457 
00459 #ifdef VIRT_TIME
00460 //      if ((error = vt_send(MY_NC, msg, strlen(msg))) < 0)
00461 //              return error;
00462 #else
00463 //      if ((error = netconn_write(MY_NC, msg, strlen(msg), NETCONN_COPY)) < 0)
00464 //              return error;
00465 #endif
00466 #ifdef MINI_MPI_TRACK
00467         {
00468                 int error = 0;
00469                 char tracker[msg_len + 33];
00470 
00471                 sprintf(tracker, "%d:%s", MINI_MPI_SCATTER, msg);
00472                 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00473                                            NETCONN_COPY)) < 0)
00474                         return error;
00475         }
00476 #endif
00477         return MPI_SUCCESS;
00478 }
00479 
00480 int MPI_Gather(void *sendbuffer, int sendcount, MPI_Datatype sendtype,
00481                 void *recvbuffer, int recvcount, MPI_Datatype recvtype,
00482                 int root, MPI_Comm comm)
00483 {
00484         if (comm != MPI_COMM_WORLD || my_init == 0)
00485                 return EINVAL;
00486 
00488 #ifdef VIRT_TIME
00489         // recv ??
00490 #else
00491         
00492 #endif
00493 #ifdef MINI_MPI_TRACK
00494         {
00495                 int error = 0;
00496                 char tracker[msg_len + 33];
00497 
00498                 sprintf(tracker, "%d:%s", MINI_MPI_SEND, msg);
00499                 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00500                                            NETCONN_COPY)) < 0)
00501                         return error;
00502         }
00503 #endif
00504         return MPI_SUCCESS;
00505 }


© 2007, Los Alamos National Security, LLC.