00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <l4/thread/thread.h>
00037 #include <virtual_time.h>
00038 #include <virtual_time-internal.h>
00039 #include <mutex.h>
00040 #include <avl.h>
00041 #include <orbisquartus.h>
00042 #include <mini_mpi.h>
00043 #include "lwip.h"
00044
00045
00046 extern OrbisQuartus *oq;
00047 extern mutex_t vt_mutex;
00048 extern mutex_t vt_sync;
00049
00050 extern AVL_tree<VT_timestamp> send_queue;
00051 extern AVL_tree<VT_timestamp> rcv_queue;
00052 extern AVL_tree<VT_timestamp> state_queue;
00053
00054 char *vt_recv(struct netconn *sock, int len)
00055 {
00056 struct netbuf *inbuf;
00057 int msg_len = len + VT_message::string_len();
00058 char msg[msg_len + 1];
00059 VT_message *vm;
00060
00061 inbuf = netconn_recv(sock);
00062 do {
00063 char *data;
00064 unsigned short data_len;
00065
00066 netbuf_data(inbuf, (void**)&data, &data_len);
00067 data[data_len] = '\0';
00068
00069 strncat(msg, data, MPI_MSG_LEN - msg_len);
00070 msg_len += data_len;
00071 } while (netbuf_next(inbuf) >= 0);
00072
00073 vm = new VT_message(msg);
00074 if (vm->rvt() >= global_virtual_time()) {
00075 if (vm->rvt() < local_virtual_time()) {
00076 AVL_node<VT_timestamp> *n;
00077 for (n = send_queue.find(vm->compare());
00078 n != NULL; n = n->get_next()) {
00079 VT_message *m =
00080 dynamic_cast<VT_message *>(n->elt());
00081 if (m != NULL) {
00082 vt_send(m->sender, sock, m->to_string(),
00083 strlen(m->to_string()));
00084 send_queue.delete_elt(m);
00085 }
00086 }
00087
00088 return NULL;
00089 } else {
00090 VT_message *other;
00091 if ((other = dynamic_cast<VT_message*>
00092 (rcv_queue.retrieve(vm->compare()))) != NULL) {
00093 if (other->msg_sign != vm->msg_sign)
00094 rcv_queue.delete_elt(vm);
00095 } else
00096 rcv_queue.insert_elt(vm);
00097 }
00098 }
00099 return vm->content;
00100 }
00101
00102
00103 int vt_send(int dest, struct netconn *sock, char *buf, int len)
00104 {
00105 int error = 0;
00106 VT_message *vm = new VT_message(dest, buf);
00107
00108 if ((error = netconn_write(sock, vm->to_string(),
00109 strlen(vm->to_string()), NETCONN_COPY)) < 0)
00110 return error;
00111
00112 vm->msg_sign = NEGATIVE;
00113 send_queue.insert_elt(vm);
00114 return error;
00115 }
00116
00117
00118
00119 static void virtual_time_client(void *unused)
00120 {
00121 while (!daemon_shuttingdown()) {
00122 while (!daemon_quitting()) {
00124 }
00125
00126 }
00127 }
00128
00129
00130 void virtual_time_init(OrbisQuartus *orbis)
00131 {
00132 oq = orbis;
00133
00134 if ((oq->vt_thread = l4thread_create(virtual_time_client, NULL,
00135 L4THREAD_CREATE_ASYNC)) < 0) {
00136 fprintf(stderr, "ERROR-- virtual time thread: %d\n",
00137 (int)oq->vt_thread);
00138 exit(-1);
00139 }
00140 }