00001
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <string.h>
00039 #include <avl.h>
00040 #include <daemon.h>
00041 #include <mini_mpi.h>
00042 #include <orbisquartus.h>
00043 #include <sensor_node.h>
00044 #include <mutex.h>
00045 #include "virtual_time.h"
00046
00047 #define BIT_RATE 400 // FIXME: make this Sensor_node specific
00048
00049 OrbisQuartus *oq;
00050 mutex_t vt_mutex;
00051 mutex_t vt_sync;
00052
00053 AVL_tree<VT_timestamp> send_queue;
00054 AVL_tree<VT_timestamp> rcv_queue;
00055 AVL_tree<VT_timestamp> state_queue;
00056
00057 extern int my_taskid;
00058
00059 char *VT_timestamp::_format = "%lu,%lu";
00060 VT_timestamp VT_timestamp::_local_virtual_time = VT_timestamp(0L, 0L);
00061 VT_timestamp VT_timestamp::_global_virtual_time = VT_timestamp(0L, 0L);
00062
00063
00064 VT_message::VT_message(unsigned int to_whom, char *buf) :
00065 Comparable<VT_timestamp>(VT_timestamp())
00066 {
00067 msg_queue = OUT;
00068 sender = my_taskid;
00069 _svt = key = VT_timestamp();
00070 rcvr = to_whom;
00071 _rvt = VT_timestamp() + static_cast<int>(strlen(buf) * BIT_RATE);
00072 msg_sign = POSITIVE;
00073 content = buf;
00074 }
00075
00076
00077 VT_message::VT_message(char *string) :
00078 Comparable<VT_timestamp>(VT_timestamp())
00079 {
00080 int sign;
00081 char st[VT_timestamp::_strlen + 1];
00082 char rt[VT_timestamp::_strlen + 1];
00083 sscanf(string, "%s::%u::%s::%u::%d::%s",
00084 st, &sender, rt, &rcvr, &sign, content);
00085
00086 _svt = VT_timestamp(st);
00087 _rvt = key = VT_timestamp(rt);
00088 msg_queue = IN;
00089 if (sign == POSITIVE)
00090 msg_sign = POSITIVE;
00091 else
00092 msg_sign = NEGATIVE;
00093 }
00094
00095
00096
00097 int checkpoint(unsigned long node_id)
00098 {
00099 int error = 0;
00100
00101 Sensor_node *node = dynamic_cast<Sensor_node*>
00102 (oq->cfg.get_nodes()->find(node_id)->elt());
00103
00104 if ((error = node->pause()) < 0)
00105 return error;
00106 if ((error = node->save()) < 0)
00107 return error;
00108
00111
00112 error = node->unpause();
00113
00114 return error;
00115 }
00116
00117
00118 int chkpt_all(void)
00119 {
00120 int i, count, error = 0;
00121 unsigned long *assign = 0;
00122 oq->get_assignment(assign, &count);
00123
00124 if (assign == 0)
00125 return -1;
00126
00127 for (i = 0; i < count; i ++) {
00128 if ((error = checkpoint(assign[i])) < 0)
00129 break;
00130 }
00131 return error;
00132 }
00133
00134
00135 int restore(unsigned long node_id)
00136 {
00137 int error = 0;
00138
00139
00140
00141
00143
00144 return error;
00145 }
00146
00147
00148 void virtual_time_iterate(void)
00149 {
00150 VT_timestamp::tick();
00151 }