Main Page | Modules | Data Structures | File List | Data Fields

ab.c

00001 #include "ab.h"
00002 /*
00003   Implementation of the alternating bit protocol on top of LNP for 
00004   LegOS/BrickOS
00005   Copyright (C) 2004  Bram De Wachter
00006 
00007   This program is free software; you can redistribute it and/or
00008   modify it under the terms of the GNU General Public License
00009   as published by the Free Software Foundation; either version 2
00010   of the License, or (at your option) any later version.
00011   
00012   This program is distributed in the hope that it will be useful,
00013   but WITHOUT ANY WARRANTY; without even the implied warranty of
00014   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015   GNU General Public License for more details.
00016   
00017   You should have received a copy of the GNU General Public License
00018   along with this program; if not, write to the Free Software
00019   Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
00020 */
00021 
00024 
00025 #if DEBUG
00026 #define DEBUG_AB(debug_instr)  debug_instr
00027 #else
00028 #define DEBUG_AB(debug_instr)
00029 #endif
00030 
00031 #define TRUE                        1
00032 #define FALSE                       0
00033 
00034 #define INCOMING                    0
00035 #define OUTGOING                    1
00036 
00037 #define ERR_AB_WRITE               10
00038 #define MSG_HEAD(msg)          msg[0]
00039 #define MSG_SOURCE(msg)        msg[1]
00040 #define MSG_DEST(msg)          msg[2]
00041 
00043 sem_t sem_ab_write,sem_ab_portHandler,sem_todo_i,sem_todo_o;
00044 
00046 site_s * s_head;     
00047 
00049 msg_s *  m_head, * m_tail;     
00050 
00052 unsigned char _id;
00053 
00056 void (*user_portHandler_8)  (const unsigned char *buf, unsigned char len);
00057 
00058 
00059 
00060 /*
00061    Message :
00062 
00063       0          1           2      3 ... payload ... N
00064    MSG_HEAD  MSG_SOURCE  MSG_DEST  
00065 
00066    MSG_HEAD : 
00067    7 6 5 4 3 2 1 0  (8 bits)
00068   MSB           LSB
00069    
00070    0 : bit for abp
00071    1 : is_ack
00072 
00073    => HEAD = 0 : message with bit = 0
00074              1 : message with bit = 1
00075              2 : ack with bit     = 0 
00076              3 : ack with bit     = 1
00077 */   
00078 #define set_ack(msg,val)   MSG_HEAD(msg) = (val ? (MSG_HEAD(msg)|0x2) : (MSG_HEAD(msg)&0xFD))
00079 #define set_ack_1(msg)     MSG_HEAD(msg) |= 0x2;
00080 #define set_ack_0(msg)     MSG_HEAD(msg) &= 0xFD;
00081 #define is_ack(msg)        (MSG_HEAD(msg)&0x2)
00082 
00083 #define set_bit(msg,val)   MSG_HEAD(msg) = (val ? (MSG_HEAD(msg)|0x1) : (MSG_HEAD(msg)&0xFE))
00084 #define set_bit_1(msg)     MSG_HEAD(msg) |= 0x1;
00085 #define set_bit_0(msg)     MSG_HEAD(msg) &= 0xFE;
00086 #define get_bit(msg)       (MSG_HEAD(msg)&0x1)
00087 
00088 
00089 #define alloc_site()       ((site_s*) malloc(sizeof(site_s)))
00090 #define alloc_mess(length) ((msg_s*)  malloc(sizeof(msg_s)+HEADER_SZ+length))
00091 #define free_mess(mess)    free (mess)
00092 
00093 /*****************************************************************************/
00095 /*****************************************************************************/
00096 
00097 #define flip_sb(rs)    rs^=0x1
00098 #define flip_rb(rs)    rs^=0x2
00099 #define get_sb(rs)     (rs&0x1)
00100 #define get_rb(rs)     ((rs&0x2)>>1)
00101 
00102 
00107 static void erase(site_s * site ) {
00108   msg_s * msg;
00109   msg = site->head;
00110   if (msg) {
00111     site->head=msg->next;
00112   }
00113   sem_trywait(&sem_todo_o);
00114 }
00115 
00123 static msg_s * new_msg(const unsigned char * header, const unsigned char* msg, 
00124                        unsigned char length){
00125   msg_s * m;
00126 
00127   m = alloc_mess(length);
00128   if (!m) return NULL;
00129 
00130   // Combine the two mallocs to only one, 
00131   // make m->txt point to the area after the msg_s structure
00132   m->txt = ((char*)m)+sizeof(msg_s);
00133   m->len=length + HEADER_SZ;
00134   sem_init(&m->sem_ack, 1, 0);
00135   memcpy(m->txt, header, HEADER_SZ);
00136   memcpy(m->txt+HEADER_SZ, msg, length);
00137 
00138   m->next=NULL;
00139 
00140   return m;
00141 }
00142 
00147 static site_s * new_site(unsigned char id){
00148   site_s *s;
00149   if ((s = alloc_site()) == NULL ) return NULL;
00150   s->id=id;
00151   s->head=0;
00152   s->status=0;
00153   s->timeout_wait=0;
00154   return s;
00155 }
00156 
00162 static void add( site_s *site, msg_s *m ) {
00163   msg_s *i=site->head;
00164   if (m) {
00165     if(!i) site->head=m;
00166     else {
00167       while(i->next) i=i->next;
00168       i->next=m;
00169     }
00170   }
00171 }
00172 
00178 static site_s * search_and_add(unsigned char id) {
00179   site_s* i;
00180   for (i=s_head; i; i=i->next) if (i->id == id) break;
00181   if (!i) {
00182     i = s_head;
00183     s_head = new_site(id);
00184     if (s_head) s_head->next = i;        
00185     i = s_head;
00186   }
00187   return i;
00188 }
00189 
00196 static void AB_portHandler(const unsigned char *message, unsigned char length){
00197   msg_s *curr;
00198 
00199   if(length>=HEADER_SZ){
00200           if(MSG_DEST(message) == _id) {
00201                   if(sem_trywait(&sem_ab_portHandler)==0) {/*list not in use in thread AB*/
00202                           if ((curr = new_msg(message,
00203                                               message+HEADER_SZ,
00204                                               length-HEADER_SZ)) != NULL) {
00205                                   if (m_tail) m_tail->next = curr;
00206                                   else m_head = curr;
00207                                   m_tail = curr;
00208                                   sem_post(&sem_todo_i);
00209                           } 
00210                           sem_post(&sem_ab_portHandler);
00211                   }
00212           }
00213   }
00214 }
00215 
00219 static int thread_AB_I () {
00220         site_s * site;
00221         msg_s  * msg, * msg_temp;
00222         unsigned char* _txt;
00223         char ack[HEADER_SZ];
00224 
00225         while(1) {
00226 
00227                 sem_wait(&sem_todo_i); 
00228                 sem_post(&sem_todo_i); 
00229 
00230                 //treatment of received messages
00231                 sem_wait(&sem_ab_portHandler);
00232                 msg=m_head;
00233                 m_tail = NULL;
00234                 sem_post(&sem_ab_portHandler);
00235 
00236                 while(msg != NULL) {
00237                         _txt = msg->txt;
00238                         sem_trywait(&sem_todo_i);
00239 
00240                         if(is_ack(_txt)) { // ACK RECEIVED
00241                                 DEBUG_AB(cputs("ack"));
00242 
00243                                 sem_wait(&sem_ab_write);
00244                                 site=search_and_add(MSG_SOURCE(_txt));
00245                                 if(site && site->head) {
00246                                         if(get_sb(site->status)==get_bit(_txt) ) {
00247                                                 sem_t *t = &site->head->sem_ack;
00248                                                 erase(site);
00249                                                 sem_post(t);
00250                                                 site->timeout_wait = 0; // Reset timeout
00251                                                 flip_sb(site->status);
00252                                         }
00253                                 }
00254                                 sem_post(&sem_ab_write);
00255                         } else {        // Message received,
00256                                 // answer with an ack containing same bit
00257                                 MSG_SOURCE(ack) = MSG_DEST  (_txt);
00258                                 ab_source       = MSG_DEST  (ack) = MSG_SOURCE(_txt);
00259                                 MSG_HEAD  (ack) = 0;
00260                                 set_ack_1 (ack);
00261                                 set_bit   (ack,get_bit(_txt));  
00262 
00263                                 sem_wait(&sem_ab_write);        
00264                                 // Send ack
00265                                 lnp_integrity_write(ack, HEADER_SZ);
00266                                 msleep(0);
00267 
00268                                 // Find the site that sent this message
00269                                 site=search_and_add(MSG_SOURCE(_txt));
00270 
00271                                 // Site found, if bit corresponds, call user
00272                                 if(site) {
00273                                         if(get_rb(site->status) == get_bit(_txt)) {
00274                                                 user_portHandler_8(_txt+HEADER_SZ,
00275                                                                 msg->len-HEADER_SZ);
00276                                                 flip_rb(site->status); //switch BIT of site
00277                                         }
00278                                 }
00279                                 sem_post(&sem_ab_write);
00280                         }
00281 
00282                         // Message treated, destroy it
00283                         msg_temp=msg;
00284                         msg=msg->next;
00285                         free_mess(msg_temp);
00286                 } // Loop on messages
00287 
00288         } // Loop while(1)
00289 }
00290 
00294 static int thread_AB_O () {
00295         site_s * site;            
00296         unsigned char* _txt;
00297 
00298         while(1) {
00299                 msleep (WRITE_WAIT_TIME);
00300                 sem_wait(&sem_todo_o);
00301                 sem_post(&sem_todo_o);
00302 
00303                 sem_wait(&sem_ab_write);
00304                 site=s_head;
00305                 while(site) {
00306 
00307                         // Check if there is a site whose timeout expired
00308                         if(site->head!=NULL && site->timeout_wait < get_system_up_time()) {
00309                                 // This site is ready for sending !
00310                                 // Take out the message to send
00311                                 _txt=site->head->txt;
00312 
00313                                 if (MSG_DEST(_txt) == _id) {
00314                                         // Sending to ourselfs ... hmmm ... call the handler immediately
00315                                         // Of course, there will be no ack, and delivery is certain,
00316                                         // so remove the message from the queue
00317                                         user_portHandler_8(_txt+HEADER_SZ,
00318                                                         site->head->len-HEADER_SZ);
00319                                         erase (site);
00320                                 } else {
00321                                         // Sending to a distant site ...
00322 
00323                                         // Clear the status. Make it a message status
00324                                         // With the corresponding bit
00325                                         MSG_HEAD(_txt)=0;
00326                                         set_bit(_txt,get_sb(site->status));
00327 
00328                                         // Send the message, and start the timeout timer
00329                                         lnp_integrity_write(_txt,site->head->len);
00330                                         site->timeout_wait = get_system_up_time() + WRITE_TIMEOUT + 
00331                                                              get_system_up_time()%WRITE_TIMEOUT + site->head->len;
00332                                         msleep(0);
00333                                 }
00334                         } 
00335                         site=site->next;
00336                 }
00337                 sem_post(&sem_ab_write);
00338         }
00339         return 0;
00340 }
00342 
00343 
00346 
00353 int init_ab_protocol_8(void (*u_portHandler) (const unsigned char *message, unsigned char length),  
00354                 unsigned char id) {
00355         unsigned char * msg;
00356         s_head=NULL;
00357         m_head=NULL;
00358         m_tail=NULL;
00359         _id = id;
00360         sem_init(&sem_ab_portHandler,1,1);
00361         sem_init(&sem_ab_write,1,1);
00362         sem_init(&sem_todo_i,1,0);
00363         sem_init(&sem_todo_o,1,0);
00364         user_portHandler_8 = u_portHandler; 
00365         lnp_integrity_set_handler(AB_portHandler);
00366         
00367         if (execi(thread_AB_I, 0,0,PRIO_NORMAL, DEFAULT_STACK_SIZE*2) == -1) return 0xfe ;
00368         if (execi(thread_AB_O, 0,0,PRIO_NORMAL, DEFAULT_STACK_SIZE) == -1) return 0xff;
00369 
00370         return 0;
00371 }
00372 
00373 static msg_s * ab_write(const unsigned char* message, unsigned char length, unsigned char destination) {
00374         char msg_header[HEADER_SZ];
00375         site_s* site;
00376         msg_s* mess = NULL;
00377 
00378         MSG_HEAD(msg_header)   = 0;
00379         MSG_SOURCE(msg_header) = _id;
00380         MSG_DEST(msg_header)   = destination;
00381 
00382         sem_wait(&sem_ab_write);
00383         site=search_and_add(destination);
00384         sem_post(&sem_ab_write);
00385 
00386         if (site) {
00387                 mess = new_msg(msg_header, message, length);
00388                 if (mess)  {
00389                         sem_wait(&sem_ab_write);
00390                         add(site, mess);
00391                         sem_post(&sem_ab_write);
00392                         sem_post(&sem_todo_o);
00393                 }
00394         }
00395 
00396         return mess;
00397 }
00398 
00399 
00400 
00410 int ab_awrite(const unsigned char* message, unsigned char length, unsigned char destination) {
00411         msg_s * msg = ab_write(message, length, destination);   
00412         if (msg) free_mess(msg);
00413         return msg == NULL;
00414 }
00415 
00425 int ab_swrite(const unsigned char* message, unsigned char length,
00426                  unsigned char destination) {
00427   msg_s * msg = ab_write(message, length, destination);
00428   if (msg) {
00429       sem_wait(&msg->sem_ack);
00430       free_mess(msg);
00431   }
00432   return msg == NULL;
00433 }
00434 
00435 
00437 

Generated on Sat Mar 12 16:12:58 2005 for Alternating Bit by doxygen 1.3.3