00001 #include "ab.h"
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00024
/* Debug hook: DEBUG_AB(x) expands to x when DEBUG is non-zero and to nothing
 * otherwise, so debug statements vanish from release builds. */
#if DEBUG
#define DEBUG_AB(debug_instr) debug_instr
#else
#define DEBUG_AB(debug_instr)
#endif

#define TRUE 1
#define FALSE 0

#define INCOMING 0
#define OUTGOING 1

#define ERR_AB_WRITE 10

/*
 * Accessors for the first three bytes of a packet buffer.
 *   byte 0: flag byte (the ack and alternating-bit flags live here)
 *   byte 1: id of the sending station
 *   byte 2: id of the destination station
 * Each expansion is a fully parenthesized lvalue, so the macros can be used on
 * either side of an assignment and with arbitrary pointer expressions such as
 * MSG_HEAD(buf + off).
 */
#define MSG_HEAD(msg)   ((msg)[0])
#define MSG_SOURCE(msg) ((msg)[1])
#define MSG_DEST(msg)   ((msg)[2])
00041
00043 sem_t sem_ab_write,sem_ab_portHandler,sem_todo_i,sem_todo_o;
00044
00046 site_s * s_head;
00047
00049 msg_s * m_head, * m_tail;
00050
00052 unsigned char _id;
00053
00056 void (*user_portHandler_8) (const unsigned char *buf, unsigned char len);
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
/*
 * Flag byte helpers (operate on MSG_HEAD of a packet buffer).
 *   bit 1 (0x2): packet is an acknowledgement
 *   bit 0 (0x1): alternating sequence bit
 * Every expansion is a single parenthesized expression without a trailing
 * semicolon, so the macros behave like ordinary function calls inside
 * if/else statements and larger expressions.
 */
#define set_ack(msg,val) (MSG_HEAD(msg) = ((val) ? (MSG_HEAD(msg)|0x2) : (MSG_HEAD(msg)&0xFD)))
#define set_ack_1(msg) (MSG_HEAD(msg) |= 0x2)
#define set_ack_0(msg) (MSG_HEAD(msg) &= 0xFD)
#define is_ack(msg) (MSG_HEAD(msg)&0x2)

#define set_bit(msg,val) (MSG_HEAD(msg) = ((val) ? (MSG_HEAD(msg)|0x1) : (MSG_HEAD(msg)&0xFE)))
#define set_bit_1(msg) (MSG_HEAD(msg) |= 0x1)
#define set_bit_0(msg) (MSG_HEAD(msg) &= 0xFE)
#define get_bit(msg) (MSG_HEAD(msg)&0x1)

/* Allocation helpers. A message is allocated as one chunk: the msg_s
 * bookkeeping structure followed by HEADER_SZ + length bytes of packet data. */
#define alloc_site() ((site_s*) malloc(sizeof(site_s)))
#define alloc_mess(length) ((msg_s*) malloc(sizeof(msg_s)+HEADER_SZ+(length)))
#define free_mess(mess) free (mess)

/*
 * Per-site status byte helpers.
 *   bit 0 (0x1): sequence bit used for sending      (flip_sb / get_sb)
 *   bit 1 (0x2): sequence bit expected on reception (flip_rb / get_rb)
 * get_sb and get_rb both yield 0 or 1.
 */
#define flip_sb(rs) ((rs)^=0x1)
#define flip_rb(rs) ((rs)^=0x2)
#define get_sb(rs) ((rs)&0x1)
#define get_rb(rs) (((rs)&0x2)>>1)
00101
00102
00107 static void erase(site_s * site ) {
00108 msg_s * msg;
00109 msg = site->head;
00110 if (msg) {
00111 site->head=msg->next;
00112 }
00113 sem_trywait(&sem_todo_o);
00114 }
00115
00123 static msg_s * new_msg(const unsigned char * header, const unsigned char* msg,
00124 unsigned char length){
00125 msg_s * m;
00126
00127 m = alloc_mess(length);
00128 if (!m) return NULL;
00129
00130
00131
00132 m->txt = ((char*)m)+sizeof(msg_s);
00133 m->len=length + HEADER_SZ;
00134 sem_init(&m->sem_ack, 1, 0);
00135 memcpy(m->txt, header, HEADER_SZ);
00136 memcpy(m->txt+HEADER_SZ, msg, length);
00137
00138 m->next=NULL;
00139
00140 return m;
00141 }
00142
00147 static site_s * new_site(unsigned char id){
00148 site_s *s;
00149 if ((s = alloc_site()) == NULL ) return NULL;
00150 s->id=id;
00151 s->head=0;
00152 s->status=0;
00153 s->timeout_wait=0;
00154 return s;
00155 }
00156
00162 static void add( site_s *site, msg_s *m ) {
00163 msg_s *i=site->head;
00164 if (m) {
00165 if(!i) site->head=m;
00166 else {
00167 while(i->next) i=i->next;
00168 i->next=m;
00169 }
00170 }
00171 }
00172
00178 static site_s * search_and_add(unsigned char id) {
00179 site_s* i;
00180 for (i=s_head; i; i=i->next) if (i->id == id) break;
00181 if (!i) {
00182 i = s_head;
00183 s_head = new_site(id);
00184 if (s_head) s_head->next = i;
00185 i = s_head;
00186 }
00187 return i;
00188 }
00189
00196 static void AB_portHandler(const unsigned char *message, unsigned char length){
00197 msg_s *curr;
00198
00199 if(length>=HEADER_SZ){
00200 if(MSG_DEST(message) == _id) {
00201 if(sem_trywait(&sem_ab_portHandler)==0) {
00202 if ((curr = new_msg(message,
00203 message+HEADER_SZ,
00204 length-HEADER_SZ)) != NULL) {
00205 if (m_tail) m_tail->next = curr;
00206 else m_head = curr;
00207 m_tail = curr;
00208 sem_post(&sem_todo_i);
00209 }
00210 sem_post(&sem_ab_portHandler);
00211 }
00212 }
00213 }
00214 }
00215
00219 static int thread_AB_I () {
00220 site_s * site;
00221 msg_s * msg, * msg_temp;
00222 unsigned char* _txt;
00223 char ack[HEADER_SZ];
00224
00225 while(1) {
00226
00227 sem_wait(&sem_todo_i);
00228 sem_post(&sem_todo_i);
00229
00230
00231 sem_wait(&sem_ab_portHandler);
00232 msg=m_head;
00233 m_tail = NULL;
00234 sem_post(&sem_ab_portHandler);
00235
00236 while(msg != NULL) {
00237 _txt = msg->txt;
00238 sem_trywait(&sem_todo_i);
00239
00240 if(is_ack(_txt)) {
00241 DEBUG_AB(cputs("ack"));
00242
00243 sem_wait(&sem_ab_write);
00244 site=search_and_add(MSG_SOURCE(_txt));
00245 if(site && site->head) {
00246 if(get_sb(site->status)==get_bit(_txt) ) {
00247 sem_t *t = &site->head->sem_ack;
00248 erase(site);
00249 sem_post(t);
00250 site->timeout_wait = 0;
00251 flip_sb(site->status);
00252 }
00253 }
00254 sem_post(&sem_ab_write);
00255 } else {
00256
00257 MSG_SOURCE(ack) = MSG_DEST (_txt);
00258 ab_source = MSG_DEST (ack) = MSG_SOURCE(_txt);
00259 MSG_HEAD (ack) = 0;
00260 set_ack_1 (ack);
00261 set_bit (ack,get_bit(_txt));
00262
00263 sem_wait(&sem_ab_write);
00264
00265 lnp_integrity_write(ack, HEADER_SZ);
00266 msleep(0);
00267
00268
00269 site=search_and_add(MSG_SOURCE(_txt));
00270
00271
00272 if(site) {
00273 if(get_rb(site->status) == get_bit(_txt)) {
00274 user_portHandler_8(_txt+HEADER_SZ,
00275 msg->len-HEADER_SZ);
00276 flip_rb(site->status);
00277 }
00278 }
00279 sem_post(&sem_ab_write);
00280 }
00281
00282
00283 msg_temp=msg;
00284 msg=msg->next;
00285 free_mess(msg_temp);
00286 }
00287
00288 }
00289 }
00290
00294 static int thread_AB_O () {
00295 site_s * site;
00296 unsigned char* _txt;
00297
00298 while(1) {
00299 msleep (WRITE_WAIT_TIME);
00300 sem_wait(&sem_todo_o);
00301 sem_post(&sem_todo_o);
00302
00303 sem_wait(&sem_ab_write);
00304 site=s_head;
00305 while(site) {
00306
00307
00308 if(site->head!=NULL && site->timeout_wait < get_system_up_time()) {
00309
00310
00311 _txt=site->head->txt;
00312
00313 if (MSG_DEST(_txt) == _id) {
00314
00315
00316
00317 user_portHandler_8(_txt+HEADER_SZ,
00318 site->head->len-HEADER_SZ);
00319 erase (site);
00320 } else {
00321
00322
00323
00324
00325 MSG_HEAD(_txt)=0;
00326 set_bit(_txt,get_sb(site->status));
00327
00328
00329 lnp_integrity_write(_txt,site->head->len);
00330 site->timeout_wait = get_system_up_time() + WRITE_TIMEOUT +
00331 get_system_up_time()%WRITE_TIMEOUT + site->head->len;
00332 msleep(0);
00333 }
00334 }
00335 site=site->next;
00336 }
00337 sem_post(&sem_ab_write);
00338 }
00339 return 0;
00340 }
00342
00343
00346
00353 int init_ab_protocol_8(void (*u_portHandler) (const unsigned char *message, unsigned char length),
00354 unsigned char id) {
00355 unsigned char * msg;
00356 s_head=NULL;
00357 m_head=NULL;
00358 m_tail=NULL;
00359 _id = id;
00360 sem_init(&sem_ab_portHandler,1,1);
00361 sem_init(&sem_ab_write,1,1);
00362 sem_init(&sem_todo_i,1,0);
00363 sem_init(&sem_todo_o,1,0);
00364 user_portHandler_8 = u_portHandler;
00365 lnp_integrity_set_handler(AB_portHandler);
00366
00367 if (execi(thread_AB_I, 0,0,PRIO_NORMAL, DEFAULT_STACK_SIZE*2) == -1) return 0xfe ;
00368 if (execi(thread_AB_O, 0,0,PRIO_NORMAL, DEFAULT_STACK_SIZE) == -1) return 0xff;
00369
00370 return 0;
00371 }
00372
00373 static msg_s * ab_write(const unsigned char* message, unsigned char length, unsigned char destination) {
00374 char msg_header[HEADER_SZ];
00375 site_s* site;
00376 msg_s* mess = NULL;
00377
00378 MSG_HEAD(msg_header) = 0;
00379 MSG_SOURCE(msg_header) = _id;
00380 MSG_DEST(msg_header) = destination;
00381
00382 sem_wait(&sem_ab_write);
00383 site=search_and_add(destination);
00384 sem_post(&sem_ab_write);
00385
00386 if (site) {
00387 mess = new_msg(msg_header, message, length);
00388 if (mess) {
00389 sem_wait(&sem_ab_write);
00390 add(site, mess);
00391 sem_post(&sem_ab_write);
00392 sem_post(&sem_todo_o);
00393 }
00394 }
00395
00396 return mess;
00397 }
00398
00399
00400
00410 int ab_awrite(const unsigned char* message, unsigned char length, unsigned char destination) {
00411 msg_s * msg = ab_write(message, length, destination);
00412 if (msg) free_mess(msg);
00413 return msg == NULL;
00414 }
00415
00425 int ab_swrite(const unsigned char* message, unsigned char length,
00426 unsigned char destination) {
00427 msg_s * msg = ab_write(message, length, destination);
00428 if (msg) {
00429 sem_wait(&msg->sem_ack);
00430 free_mess(msg);
00431 }
00432 return msg == NULL;
00433 }
00434
00435
00437