00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <string.h>
00039 #include <errno.h>
00040 #include <mini_mpi.h>
00041 #include <mini_mpi-internal.h>
00042 #include "lwip.h"
00043
00044 extern int my_numprocs;
00045 extern int my_taskid;
00046 extern unsigned short my_init;
00047 extern struct sockaddr_in head_addr;
00048 extern socket_ptr_t my_socket;
00049 extern task_t *task_array;
00050
00051
00052
00053
00054 #define MY_NC ((struct netconn*)my_socket.nc)
00055
00056 static struct netconn *bcast_conn;
00057
00058
00059 int MPI_Init(int *argc, char ***argv)
00060 {
00061 int error = MPI_SUCCESS;
00062 int header, their_port = 0;
00063 struct netbuf *outbuf, *inbuf;
00064 struct ip_addr bcast_addr, *their_addr;
00065 char *table, msg_out[32], msg_in[MAX_MTU+1];
00066
00067 sprintf(msg_out, "%d", MINI_MPI_INIT);
00068 if ((outbuf = netbuf_new()) == NULL) {
00069 error = ENOMEM;
00070 goto end;
00071 }
00072 if ((bcast_conn = netconn_new(NETCONN_UDP)) == NULL) {
00073 error = ENOMEM;
00074 goto end_init;
00075 }
00076 IP4_ADDR(&bcast_addr, 255,255,255,255);
00077 if ((error = netconn_connect(bcast_conn, &bcast_addr,
00078 MPI_INIT_PORT)) < 0) {
00079 netconn_delete(bcast_conn);
00080 goto end_init;
00081 }
00082
00083 netbuf_ref(outbuf, msg_out, strlen(msg_out) + 1);
00084 if ((error = netconn_send(bcast_conn, outbuf)) < 0) {
00085 netconn_close(bcast_conn);
00086 netconn_delete(bcast_conn);
00087 goto end_init;
00088 }
00089
00090 await_reply:
00091 while ((inbuf = netconn_recv(bcast_conn)) != NULL) {
00092 int msg_len = sizeof(msg_in) - 1;
00093 int cur_len = 0;
00094 char indata[msg_len + 1];
00095
00096 do {
00097 char *fragdata;
00098 unsigned short len = 0;
00099 int left = msg_len - cur_len;
00100 netbuf_data(inbuf, (void**)&fragdata, &len);
00101 memcpy(msg_in + cur_len, fragdata,
00102 len > left ? left : len);
00103 cur_len += len;
00104 } while(netbuf_next(inbuf) >= 0);
00105 strncat(msg_in, indata, cur_len);
00106
00107 if (their_port == 0) {
00108 their_addr = netbuf_fromaddr(inbuf);
00109 memcpy(&head_addr.sin_addr, their_addr,
00110 sizeof(struct ip_addr));
00111 head_addr.sin_port = netbuf_fromport(inbuf);
00112 }
00113 free(inbuf);
00114 }
00115
00116 error = sscanf(msg_in, "%d::%d:%d::", &header, &my_numprocs,
00117 &my_taskid);
00118 if (error < 3 || header != MINI_MPI_HEAD) {
00119
00120 goto await_reply;
00121 }
00122
00124 if ((table = strstr(msg_in, "::::")) == NULL) {
00125 error = errno;
00126 netconn_close(bcast_conn);
00127 netconn_delete(bcast_conn);
00128 goto end_init;
00129 }
00130 table += 2;
00131 string_to_tasks(my_numprocs, table, task_array);
00132
00133 if ((my_socket.nc = netconn_new(NETCONN_TCP)) == NULL) {
00134 error = ENOMEM;
00135 netconn_close(bcast_conn);
00136 netconn_delete(bcast_conn);
00137 goto end_init;
00138 }
00139 if ((error = netconn_connect(MY_NC,
00140 (struct ip_addr*)&head_addr.sin_addr,
00141 MPI_PORT)) < 0) {
00142 netconn_delete(MY_NC);
00143 netconn_close(bcast_conn);
00144 netconn_delete(bcast_conn);
00145 goto end_init;
00146 }
00147
00148 #ifdef VIRT_TIME
00149
00150 #endif
00151 my_init = 1;
00152
00153 end_init:
00154 netbuf_delete(outbuf);
00155 end:
00156 return error;
00157 }
00158
00159
00160 int MPI_Comm_socket(MPI_Comm comm, int *sock)
00161 {
00162 if (comm != MPI_COMM_WORLD || my_init == 0)
00163 return EINVAL;
00164 *sock = (int)my_socket.nc;
00165 return MPI_SUCCESS;
00166 }
00167
00168
00169 int MPI_Abort(MPI_Comm comm, int errorcode)
00170 {
00171 int error;
00172 char msg[32];
00173 sprintf(msg, "%d", MINI_MPI_ABORT);
00174
00175 if (comm != MPI_COMM_WORLD || my_init == 0)
00176 return EINVAL;
00177
00178 if ((error = netconn_write(MY_NC, msg, strlen(msg),
00179 NETCONN_NOCOPY)) < 0)
00180 return error;
00181 return MPI_SUCCESS;
00182 }
00183
00184
00185 int MPI_Finalize(void)
00186 {
00187 int error;
00188 char msg[32];
00189 sprintf(msg, "%d", MINI_MPI_FINAL);
00190
00191 if (my_init == 0)
00192 return EINVAL;
00193
00194 if ((error = netconn_write(MY_NC, msg, strlen(msg),
00195 NETCONN_NOCOPY)) < 0)
00196 return error;
00197
00198 my_init = 0;
00199 free(task_array);
00200
00201 return MPI_SUCCESS;
00202 }
00203
00204
00205
00206 int MPI_Send(void *buf, int count, MPI_Datatype type, int dest,
00207 int tag, MPI_Comm comm)
00208 {
00209 int error = 0;
00210 struct netconn *sock = (struct netconn*)task_array[dest].sock.nc;
00211 int msg_len = (type_size(type) * count) + 1;
00212 char msg[msg_len];
00213
00214 if (comm != MPI_COMM_WORLD || my_init == 0 || sock == NULL)
00215 return EINVAL;
00216
00217 buffer_to_char(msg, msg_len, buf, count, type);
00218 #ifdef VIRT_TIME
00219 if ((error = vt_send(dest, sock, msg, strlen(msg))) < 0)
00220 return error;
00221 #else
00222 if ((error = netconn_write(sock, msg, strlen(msg),
00223 NETCONN_COPY)) < 0)
00224 return error;
00225 #endif
00226 #ifdef MINI_MPI_TRACK
00227 {
00228 char tracker[msg_len + 33];
00229
00230 sprintf(tracker, "%d:%s", MINI_MPI_SEND, msg);
00231 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00232 NETCONN_COPY)) < 0)
00233 return error;
00234 }
00235 #endif
00236 return MPI_SUCCESS;
00237 }
00238
00239 int MPI_Recv(void *buf, int count, MPI_Datatype type, int source,
00240 int tag, MPI_Comm comm, MPI_Status *status)
00241 {
00242 char *msg;
00243 int msg_len = 0;
00244 struct netconn *sock;
00245 struct netbuf *inbuf;
00246
00247 if (source == -1)
00248 sock = bcast_conn;
00249 else
00250 sock = (struct netconn*)task_array[source].sock.nc;
00251
00252 if (comm != MPI_COMM_WORLD || my_init == 0 || sock == NULL)
00253 return EINVAL;
00254
00255 #ifdef VIRT_TIME
00256 msg = vt_recv(sock, MPI_MSG_LEN);
00257 char_to_buffer(buf, count, type, msg);
00258 #else
00259 msg = malloc(MPI_MSG_LEN + 1);
00260 inbuf = netconn_recv(sock);
00261 if (sock->err != ERR_OK)
00262 return sock->err;
00263
00264 do {
00265 char *data;
00266 unsigned short data_len;
00267
00268 netbuf_data(inbuf, (void**)&data, &data_len);
00269 data[data_len] = '\0';
00270
00271 strncat(msg, data, MPI_MSG_LEN - msg_len);
00272 msg_len += data_len;
00273 } while (netbuf_next(inbuf) >= 0);
00274 char_to_buffer(buf, count, type, msg);
00275 #endif
00276 #ifdef MINI_MPI_TRACK
00277 {
00278 int error = 0;
00279 char tracker[msg_len + 33];
00280
00281 sprintf(tracker, "%d:%s", MINI_MPI_RECV, msg);
00282 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00283 NETCONN_COPY)) < 0)
00284 return error;
00285 }
00286 #endif
00287 #ifndef VIRT_TIME
00288 free(msg);
00289 #endif
00290 return MPI_SUCCESS;
00291 }
00292
00293
00294
00295 int MPI_Isend(void *buf, int count, MPI_Datatype type, int dest,
00296 int tag, MPI_Comm comm, MPI_Request *request)
00297 {
00298 int error = 0;
00299 struct netconn *sock = (struct netconn*)task_array[dest].sock.nc;
00300 int msg_len = (type_size(type) * count) + 1;
00301 char msg[msg_len];
00302
00303 if (comm != MPI_COMM_WORLD || my_init == 0 || sock == NULL)
00304 return EINVAL;
00305
00307
00308 buffer_to_char(msg, msg_len, buf, count, type);
00309 #ifdef VIRT_TIME
00310 if ((error = vt_send(sock, msg, strlen(msg))) < 0)
00311 return error;
00312 #else
00313 if ((error = netconn_write(sock, msg, strlen(msg),
00314 NETCONN_COPY)) < 0)
00315 return error;
00316 #endif
00317 #ifdef MINI_MPI_TRACK
00318 {
00319 char tracker[msg_len + 33];
00320
00321 sprintf(tracker, "%d:%s", MINI_MPI_SEND, msg);
00322 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00323 NETCONN_COPY)) < 0)
00324 return error;
00325 }
00326 #endif
00327 return MPI_SUCCESS;
00328 }
00329
00330 int MPI_Irecv(void *buf, int count, MPI_Datatype type, int source,
00331 int tag, MPI_Comm comm, MPI_Request *request)
00332 {
00333 char *msg;
00334 int msg_len = 0;
00335 struct netconn *sock;
00336 struct netbuf *inbuf;
00337
00338 if (source == -1)
00339 sock = bcast_conn;
00340 else
00341 sock = (struct netconn*)task_array[source].sock.nc;
00342
00343 if (comm != MPI_COMM_WORLD || my_init == 0 || sock == NULL)
00344 return EINVAL;
00345
00347 #ifdef VIRT_TIME
00348 msg = vt_recv(sock, MPI_MSG_LEN);
00349 char_to_buffer(buf, count, type, msg);
00350 #else
00351 msg = malloc(MPI_MSG_LEN + 1);
00352 inbuf = netconn_recv(sock);
00353 if (sock->err != ERR_OK)
00354 return sock->err;
00355
00356 do {
00357 char *data;
00358 unsigned short data_len;
00359
00360 netbuf_data(inbuf, (void**)&data, &data_len);
00361 data[data_len] = '\0';
00362
00363 strncat(msg, data, MPI_MSG_LEN - msg_len);
00364 msg_len += data_len;
00365 } while (netbuf_next(inbuf) >= 0);
00366 char_to_buffer(buf, count, type, msg);
00367 #endif
00368
00369 #ifdef MINI_MPI_TRACK
00370 {
00371 int error = 0;
00372 char tracker[msg_len + 33];
00373
00374 sprintf(tracker, "%d:%s", MINI_MPI_RECV, msg);
00375 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00376 NETCONN_COPY)) < 0)
00377 return error;
00378 }
00379 #endif
00380 #ifndef VIRT_TIME
00381 free(msg);
00382 #endif
00383 return MPI_SUCCESS;
00384 }
00385
00386 int MPI_Wait(MPI_Request *request, MPI_Status *status)
00387 {
00389
00390 return MPI_SUCCESS;
00391 }
00392
00393 int MPI_Waitall(int count, MPI_Request *request[], MPI_Status *status[])
00394 {
00395
00396 return MPI_SUCCESS;
00397 }
00398
00399
00400
00401 int MPI_Barrier(MPI_Comm comm)
00402 {
00403 struct netbuf *resume;
00404 int error;
00405 unsigned short msg_in_len = 0;
00406 char msg_out[32], *msg_in;
00407 sprintf(msg_out, "%d", MINI_MPI_BARRIER);
00408
00409 if (comm != MPI_COMM_WORLD || my_init == 0 || MY_NC == NULL)
00410 return EINVAL;
00411
00412 if ((error = netconn_write(MY_NC, msg_out, strlen(msg_out) + 1,
00413 NETCONN_NOCOPY)) < 0)
00414 return error;
00415
00416 await_resume:
00417 if ((resume = netconn_recv(MY_NC)) == NULL)
00418 return ENOTCONN;
00419 netbuf_data(resume, (void**)&msg_in, &msg_in_len);
00420
00421 if (atoi(msg_in) != MINI_MPI_RESUME) {
00422
00423 goto await_resume;
00424 }
00425 return MPI_SUCCESS;
00426 }
00427
00428 int MPI_Bcast(void *buf, int count, MPI_Datatype type,
00429 int root, MPI_Comm comm)
00430 {
00431 int error = 0;
00432 int msg_len = (type_size(type) * count) + 1;
00433 char msg[msg_len];
00434
00435 if (comm != MPI_COMM_WORLD || my_init == 0 || bcast_conn == NULL)
00436 return EINVAL;
00437
00438 buffer_to_char(msg, msg_len, buf, count, type);
00439 #ifdef VIRT_TIME
00440 if ((error = vt_send(bcast_conn, msg, strlen(msg))) < 0)
00441 return error;
00442 #else
00443 if ((error = netconn_write(bcast_conn, msg, strlen(msg),
00444 NETCONN_COPY)) < 0)
00445 return error;
00446 #endif
00447 return MPI_SUCCESS;
00448 }
00449
00450
00451 int MPI_Scatter(void *sendbuffer, int sendcount, MPI_Datatype sendtype,
00452 void *recvbuffer, int recvcount, MPI_Datatype recvtype,
00453 int root, MPI_Comm comm)
00454 {
00455 if (comm != MPI_COMM_WORLD || my_init == 0)
00456 return EINVAL;
00457
00459 #ifdef VIRT_TIME
00460
00461
00462 #else
00463
00464
00465 #endif
00466 #ifdef MINI_MPI_TRACK
00467 {
00468 int error = 0;
00469 char tracker[msg_len + 33];
00470
00471 sprintf(tracker, "%d:%s", MINI_MPI_SCATTER, msg);
00472 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00473 NETCONN_COPY)) < 0)
00474 return error;
00475 }
00476 #endif
00477 return MPI_SUCCESS;
00478 }
00479
00480 int MPI_Gather(void *sendbuffer, int sendcount, MPI_Datatype sendtype,
00481 void *recvbuffer, int recvcount, MPI_Datatype recvtype,
00482 int root, MPI_Comm comm)
00483 {
00484 if (comm != MPI_COMM_WORLD || my_init == 0)
00485 return EINVAL;
00486
00488 #ifdef VIRT_TIME
00489
00490 #else
00491
00492 #endif
00493 #ifdef MINI_MPI_TRACK
00494 {
00495 int error = 0;
00496 char tracker[msg_len + 33];
00497
00498 sprintf(tracker, "%d:%s", MINI_MPI_SEND, msg);
00499 if ((error = netconn_write(MY_NC, tracker, strlen(tracker) + 1,
00500 NETCONN_COPY)) < 0)
00501 return error;
00502 }
00503 #endif
00504 return MPI_SUCCESS;
00505 }