root/common/trunk/src/mbus.c @ 475

Revision 475, 29.1 KB (checked in by ucacoxh, 14 years ago)

- Make waiting_ack->next null when putting message on ack list, otherwise

flush messages can attempt to delete the same message twice.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:     mbus.c
3 * AUTHOR:   Colin Perkins
4 * MODIFIED: Orion Hodson
5 *           Markus Germeier
6 *
7 * Copyright (c) 1997-2000 University College London
8 * All rights reserved.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, is permitted provided that the following conditions
12 * are met:
13 * 1. Redistributions of source code must retain the above copyright
14 *    notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 *    notice, this list of conditions and the following disclaimer in the
17 *    documentation and/or other materials provided with the distribution.
18 * 3. All advertising materials mentioning features or use of this software
19 *    must display the following acknowledgement:
20 *      This product includes software developed by the Computer Science
21 *      Department at University College London
22 * 4. Neither the name of the University nor of the Department may be used
23 *    to endorse or promote products derived from this software without
24 *    specific prior written permission.
25 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
26 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
29 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
31 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
32 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
34 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * SUCH DAMAGE.
36 */
37
38#include "config_unix.h"
39#include "config_win32.h"
40#include "debug.h"
41#include "memory.h"
42#include "net_udp.h"
43#include "hmac.h"
44#include "qfDES.h"
45#include "base64.h"
46#include "gettimeofday.h"
47#include "mbus.h"
48#include "mbus_config.h"
49#include "mbus_parser.h"
50#include "mbus_addr.h"
51
52#define MBUS_BUF_SIZE     1500
53#define MBUS_ACK_BUF_SIZE 1500
54#define MBUS_MAX_ADDR       10
55#define MBUS_MAX_QLEN       50 /* Number of messages we can queue with mbus_qmsg() */
56
57#define MBUS_MAGIC      0x87654321
58#define MBUS_MSG_MAGIC  0x12345678
59
60#ifdef NEED_VSNPRINTF
61static int vsnprintf(char *s, size_t buf_size, const char *format, va_list ap)
62{
63        /* Quick hack replacement for vsnprintf... note that this */
64        /* doesn't check for buffer overflows, and so is open to  */
65        /* many really nasty attacks!                             */
66        UNUSED(buf_size);
67        return vsprintf(s,format,ap);
68}
69#endif
70
71struct mbus_msg {
72        struct mbus_msg *next;
73        struct timeval   send_time;     /* Time the message was sent, to trigger a retransmit */
74        struct timeval   comp_time;     /* Time the message was composed, the timestamp in the packet header */
75        char            *dest;
76        int              reliable;
77        int              complete;      /* Indicates that we've finished adding cmds to this message */
78        int              seqnum;
79        int              retransmit_count;
80        int              message_size;
81        int              num_cmds;
82        char            *cmd_list[MBUS_MAX_QLEN];
83        char            *arg_list[MBUS_MAX_QLEN];
84        uint32_t         idx_list[MBUS_MAX_QLEN];
85        uint32_t         magic;         /* For debugging... */
86};
87
88struct mbus {
89        socket_udp               *s;
90        char                     *addr;                         /* Addresses we respond to.                                     */
91        int                       max_other_addr;
92        int                       num_other_addr;
93        char                    **other_addr;                   /* Addresses of other entities on the mbus.                     */
94        struct timeval          **other_hello;                  /* Time of last mbus.hello we received from other entities      */
95        int                       seqnum;
96        struct mbus_msg          *cmd_queue;                    /* Queue of messages waiting to be sent */
97        struct mbus_msg          *waiting_ack;                  /* The last reliable message sent, if we have not yet got the ACK */
98        char                     *hashkey;
99        int                       hashkeylen;
100        char                     *encrkey;
101        int                       encrkeylen;
102        struct timeval            last_heartbeat;               /* Last time we sent a heartbeat message */
103        struct mbus_config       *cfg;
104        void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat);
105        void (*err_handler)(int seqnum, int reason);
106        uint32_t                  magic;                        /* For debugging...                                             */
107        uint32_t                  index;
108        uint32_t                  index_sent;
109};
110
111static void mbus_validate(struct mbus *m)
112{
113#ifdef DEBUG
114        int     i;
115
116        assert(m->num_other_addr <= m->max_other_addr);
117        assert(m->num_other_addr >= 0);
118        for (i = 0; i < m->num_other_addr; i++) {
119                assert(m->other_addr[i]  != NULL);
120                assert(m->other_hello[i] != NULL);
121        }
122        for (i = m->num_other_addr + 1; i < m->max_other_addr; i++) {
123                assert(m->other_addr[i]  == NULL);
124                assert(m->other_hello[i] == NULL);
125        }
126#endif
127        assert(m->magic == MBUS_MAGIC);
128}
129
130static void mbus_msg_validate(struct mbus_msg *m)
131{
132#ifdef DEBUG
133        int     i;
134
135        assert((m->num_cmds < MBUS_MAX_QLEN) && (m->num_cmds >= 0));
136        for (i = 0; i < m->num_cmds; i++) {
137                assert(m->cmd_list[i] != NULL);
138                assert(m->arg_list[i] != NULL);
139                if (i > 0) {
140                        assert(m->idx_list[i] > m->idx_list[i-1]);
141                }
142        }
143        for (i = m->num_cmds + 1; i < MBUS_MAX_QLEN; i++) {
144                assert(m->cmd_list[i] == NULL);
145                assert(m->arg_list[i] == NULL);
146        }       
147        assert(m->dest != NULL);
148#endif
149        assert(m->magic == MBUS_MSG_MAGIC);
150}
151
152static void store_other_addr(struct mbus *m, char *a)
153{
154        /* This takes the address a and ensures it is stored in the   */
155        /* m->other_addr field of the mbus structure. The other_addr  */
156        /* field should probably be a hash table, but for now we hope */
157        /* that there are not too many entities on the mbus, so the   */
158        /* list is small.                                             */
159        int     i;
160
161        mbus_validate(m);
162
163        for (i = 0; i < m->num_other_addr; i++) {
164                if (mbus_addr_match(m->other_addr[i], a)) {
165                        /* Already in the list... */
166                        gettimeofday(m->other_hello[i],NULL);
167                        return;
168                }
169        }
170
171        if (m->num_other_addr == m->max_other_addr) {
172                /* Expand the list... */
173                m->max_other_addr *= 2;
174                m->other_addr = (char **) xrealloc(m->other_addr, m->max_other_addr * sizeof(char *));
175                m->other_hello = (struct timeval **) xrealloc(m->other_hello, m->max_other_addr * sizeof(struct timeval *));
176        }
177        m->other_hello[m->num_other_addr]=(struct timeval *)xmalloc(sizeof(struct timeval));
178        gettimeofday(m->other_hello[m->num_other_addr],NULL);
179        m->other_addr[m->num_other_addr++] = xstrdup(a);
180}
181
182static void remove_other_addr(struct mbus *m, char *a)
183{
184        /* Removes the address a from the m->other_addr field of the */
185        /* mbus structure.                                           */
186        int     i, j;
187
188        mbus_validate(m);
189
190        for (i = 0; i < m->num_other_addr; i++) {
191                if (mbus_addr_match(m->other_addr[i], a)) {
192                        xfree(m->other_addr[i]);
193                        xfree(m->other_hello[i]);
194                        for (j = i+1; j < m->num_other_addr; j++) {
195                                m->other_addr[j-1] = m->other_addr[j];
196                                m->other_hello[j-1] = m->other_hello[j];
197                        }
198                        m->other_addr[m->num_other_addr  - 1] = NULL;
199                        m->other_hello[m->num_other_addr - 1] = NULL;
200                        m->num_other_addr--;
201                }
202        }
203}
204
205static void remove_inactiv_other_addr(struct mbus *m, struct timeval t, int interval){
206        /* Remove addresses we haven't heard from for about 5 * interval */
207        /* Count backwards so it is safe to remove entries               */
208        int i;
209   
210        mbus_validate(m);
211
212        for (i=m->num_other_addr-1; i>=0; i--){
213                if ((t.tv_sec-(m->other_hello[i]->tv_sec)) > 5 * interval) {
214                        debug_msg("remove dead entity (%s)\n", m->other_addr[i]);
215                        remove_other_addr(m, m->other_addr[i]);
216                }
217        }
218}
219
220int mbus_addr_valid(struct mbus *m, char *addr)
221{
222        int     i;
223
224        mbus_validate(m);
225
226        for (i = 0; i < m->num_other_addr; i++) {
227                if (mbus_addr_match(m->other_addr[i], addr)) {
228                        return TRUE;
229                }
230        }
231        return FALSE;
232}
233
234static int mbus_addr_unique(struct mbus *m, char *addr)
235{
236        int     i, n = 0;
237
238        mbus_validate(m);
239
240        for (i = 0; i < m->num_other_addr; i++) {
241                if (mbus_addr_match(m->other_addr[i], addr)) {
242                        n++;
243                }
244        }
245        return n==1;
246}
247
248/* The mb_* functions are used to build an mbus message up in the */
249/* mb_buffer, and to add authentication and encryption before the */
250/* message is sent.                                               */
251char     mb_cryptbuf[MBUS_BUF_SIZE];
252char    *mb_buffer;
253char    *mb_bufpos;
254
255#define MBUS_AUTH_LEN 16
256
257static void mb_header(int seqnum, int ts, char reliable, const char *src, const char *dst, int ackseq)
258{
259        xmemchk();
260        mb_buffer   = (char *) xmalloc(MBUS_BUF_SIZE + 1);
261        memset(mb_buffer,   0, MBUS_BUF_SIZE);
262        memset(mb_buffer, ' ', MBUS_AUTH_LEN);
263        mb_bufpos = mb_buffer + MBUS_AUTH_LEN;
264        sprintf(mb_bufpos, "\nmbus/1.0 %6d %9d %c (%s) %s ", seqnum, ts, reliable, src, dst);
265        mb_bufpos += 33 + strlen(src) + strlen(dst);
266        if (ackseq == -1) {
267                sprintf(mb_bufpos, "()\n");
268                mb_bufpos += 3;
269        } else {
270                sprintf(mb_bufpos, "(%6d)\n", ackseq);
271                mb_bufpos += 9;
272        }
273}
274
275static void mb_add_command(const char *cmnd, const char *args)
276{
277        int offset = strlen(cmnd) + strlen(args) + 5;
278
279        assert((mb_bufpos + offset - mb_buffer) < MBUS_BUF_SIZE);
280
281        sprintf(mb_bufpos, "%s (%s)\n", cmnd, args);
282        mb_bufpos += offset - 1; /* The -1 in offset means we're not NUL terminated - fix in mb_send */
283}
284
285static void mb_send(struct mbus *m)
286{
287        char            digest[16];
288        int             len;
289        unsigned char   initVec[8] = {0,0,0,0,0,0,0,0};
290 
291        mbus_validate(m);
292
293        *(mb_bufpos++) = '\0';
294        assert((mb_bufpos - mb_buffer) < MBUS_BUF_SIZE);
295        assert(strlen(mb_buffer) < MBUS_BUF_SIZE);
296
297        /* Pad to a multiple of 8 bytes, so the encryption can work... */
298        while (((mb_bufpos - mb_buffer) % 8) != 0) {
299                *(mb_bufpos++) = '\0';
300        }
301        len = mb_bufpos - mb_buffer;
302        assert(len < MBUS_BUF_SIZE);
303        assert(strlen(mb_buffer) < MBUS_BUF_SIZE);
304
305        if (m->hashkey != NULL) {
306                /* Authenticate... */
307                hmac_md5(mb_buffer + MBUS_AUTH_LEN+1, strlen(mb_buffer) - (MBUS_AUTH_LEN+1), m->hashkey, m->hashkeylen, digest);
308                base64encode(digest, 12, mb_buffer, MBUS_AUTH_LEN);
309        }
310        if (m->encrkey != NULL) {
311                /* Encrypt... */
312                memset(mb_cryptbuf, 0, MBUS_BUF_SIZE);
313                memcpy(mb_cryptbuf, mb_buffer, len);
314                assert((len % 8) == 0);
315                assert(len < MBUS_BUF_SIZE);
316                assert(m->encrkeylen == 8);
317                qfDES_CBC_e(m->encrkey, mb_cryptbuf, len, initVec);
318                memcpy(mb_buffer, mb_cryptbuf, len);
319        }
320        udp_send(m->s, mb_buffer, len);
321        xfree(mb_buffer);
322}
323
324static void resend(struct mbus *m, struct mbus_msg *curr)
325{
326        /* Don't need to check for buffer overflows: this was done in mbus_send() when */
327        /* this message was first transmitted. If it was okay then, it's okay now.     */
328        int      i;
329
330        mbus_validate(m);
331
332        mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
333        for (i = 0; i < curr->num_cmds; i++) {
334                mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
335        }
336        mb_send(m);
337        curr->retransmit_count++;
338}
339
340void mbus_retransmit(struct mbus *m)
341{
342        struct mbus_msg *curr = m->waiting_ack;
343        struct timeval  time;
344        long            diff;
345
346        mbus_validate(m);
347
348        if (!mbus_waiting_ack(m)) {
349                return;
350        }
351
352        mbus_msg_validate(curr);
353
354        gettimeofday(&time, NULL);
355
356        /* diff is time in milliseconds that the message has been awaiting an ACK */
357        diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->send_time.tv_sec * 1000) + (curr->send_time.tv_usec / 1000));
358        if (diff > 10000) {
359                debug_msg("Reliable mbus message failed!\n");
360                if (m->err_handler == NULL) {
361                        abort();
362                }
363                m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST);
364                /* if we don't delete this failed message, the error handler
365                   gets triggered every time we call mbus_retransmit */
366                while (m->waiting_ack->num_cmds > 0) {
367                    m->waiting_ack->num_cmds--;
368                    xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
369                    xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
370                }
371                xfree(m->waiting_ack->dest);
372                xfree(m->waiting_ack);
373                m->waiting_ack = NULL;
374                return;
375        }
376        /* Note: We only send one retransmission each time, to avoid
377         * overflowing the receiver with a burst of requests...
378         */
379        if ((diff > 750) && (curr->retransmit_count == 2)) {
380                resend(m, curr);
381                return;
382        }
383        if ((diff > 500) && (curr->retransmit_count == 1)) {
384                resend(m, curr);
385                return;
386        }
387        if ((diff > 250) && (curr->retransmit_count == 0)) {
388                resend(m, curr);
389                return;
390        }
391}
392
393void mbus_heartbeat(struct mbus *m, int interval)
394{
395        struct timeval  curr_time;
396        char    *a = (char *) xmalloc(3);
397        sprintf(a, "()");
398
399        mbus_validate(m);
400
401        gettimeofday(&curr_time, NULL);
402        if (curr_time.tv_sec - m->last_heartbeat.tv_sec >= interval) {
403                mb_header(++m->seqnum, (int) curr_time.tv_sec, 'U', m->addr, "()", -1);
404                mb_add_command("mbus.hello", "");
405                mb_send(m);
406
407                m->last_heartbeat = curr_time;
408                /* Remove dead sources */
409                remove_inactiv_other_addr(m, curr_time, interval);
410        }
411        xfree(a);
412}
413
414int mbus_waiting_ack(struct mbus *m)
415{
416        mbus_validate(m);
417        return m->waiting_ack != NULL;
418}
419
420int mbus_sent_all(struct mbus *m)
421{
422        mbus_validate(m);
423        return (m->cmd_queue == NULL) && (m->waiting_ack == NULL);
424}
425
426struct mbus *mbus_init(void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat),
427                       void  (*err_handler)(int seqnum, int reason),
428                       char  *addr)
429{
430        struct mbus             *m;
431        struct mbus_key          k;
432        struct mbus_parser      *mp;
433        int                      i;
434        char                    *net_addr, *tmp;
435        uint16_t                 net_port;
436        int                      net_scope;
437
438        m = (struct mbus *) xmalloc(sizeof(struct mbus));
439        if (m == NULL) {
440                debug_msg("Unable to allocate memory for mbus\n");
441                return NULL;
442        }
443
444        m->cfg = mbus_create_config();
445        mbus_lock_config_file(m->cfg);
446        net_addr = (char *) xmalloc(20);
447        mbus_get_net_addr(m->cfg, net_addr, &net_port, &net_scope);
448        m->s              = udp_init(net_addr, net_port, net_port, net_scope); 
449        m->seqnum         = 0;
450        m->cmd_handler    = cmd_handler;
451        m->err_handler    = err_handler;
452        m->num_other_addr = 0;
453        m->max_other_addr = 10;
454        m->other_addr     = (char **) xmalloc(sizeof(char *) * 10);
455        m->other_hello    = (struct timeval **) xmalloc(sizeof(struct timeval *) * 10);
456        for (i = 0; i < 10; i++) {
457                m->other_addr[i]  = NULL;
458                m->other_hello[i] = NULL;
459        }
460        m->cmd_queue      = NULL;
461        m->waiting_ack    = NULL;
462        m->magic          = MBUS_MAGIC;
463        m->index          = 0;
464        m->index_sent     = 0;
465
466        mp = mbus_parse_init(xstrdup(addr));
467        if (!mbus_parse_lst(mp, &tmp)) {
468                debug_msg("Invalid mbus address\n");
469                abort();
470        }
471        m->addr = xstrdup(tmp);
472        mbus_parse_done(mp);
473        assert(m->addr != NULL);
474
475        gettimeofday(&(m->last_heartbeat), NULL);
476
477        mbus_get_encrkey(m->cfg, &k);
478        m->encrkey    = k.key;
479        m->encrkeylen = k.key_len;
480
481        mbus_get_hashkey(m->cfg, &k);
482        m->hashkey    = k.key;
483        m->hashkeylen = k.key_len;
484
485        mbus_unlock_config_file(m->cfg);
486
487        xfree(net_addr);
488
489        return m;
490}
491
492void mbus_cmd_handler(struct mbus *m, void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat))
493{
494        mbus_validate(m);
495        m->cmd_handler = cmd_handler;
496}
497
498static void mbus_flush_msgs(struct mbus_msg **queue)
499{
500        struct mbus_msg *curr, *next;
501        int i;
502       
503        curr = *queue;
504        while(curr) {
505                next = curr->next;
506                xfree(curr->dest);
507                for(i = 0; i < curr->num_cmds; i++) {
508                        xfree(curr->cmd_list[i]);
509                        xfree(curr->arg_list[i]);
510                }
511                xfree(curr);
512                curr = next;
513        }
514        *queue = NULL;
515}
516
517void mbus_exit(struct mbus *m)
518{
519        int i;
520
521        assert(m != NULL);
522        mbus_validate(m);
523
524        mbus_qmsg(m, "()", "mbus.bye", "", FALSE);
525        mbus_send(m);
526
527        /* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */
528        /*        We will need an mbus_flush() call first though, to ensure nothing is waiting.          */
529        mbus_flush_msgs(&m->cmd_queue);
530        mbus_flush_msgs(&m->waiting_ack);
531
532        if (m->encrkey != NULL) {
533                xfree(m->encrkey);
534        }
535        if (m->hashkey != NULL) {
536                xfree(m->hashkey);
537        }
538
539        udp_exit(m->s);
540
541        /* Clean up other_* */
542        for (i=m->num_other_addr-1; i>=0; i--){
543            remove_other_addr(m, m->other_addr[i]);
544        }
545
546        xfree(m->addr);
547        xfree(m->other_addr);
548        xfree(m->other_hello);
549        xfree(m->cfg);
550        xfree(m);
551}
552
553void mbus_send(struct mbus *m)
554{
555        /* Send one, or more, messages previosly queued with mbus_qmsg(). */
556        /* Messages for the same destination are batched together. Stops  */
557        /* when a reliable message is sent, until the ACK is received.    */
558        struct mbus_msg *curr = m->cmd_queue;
559        int              i;
560
561        mbus_validate(m);
562        if (m->waiting_ack != NULL) {
563                return;
564        }
565
566        while (curr != NULL) {
567                mbus_msg_validate(curr);
568                /* It's okay for us to send messages which haven't been marked as complete - */
569                /* that just means we're sending something which has the potential to have   */
570                /* more data piggybacked. However, if it's not complete it MUST be the last  */
571                /* in the list, or something has been reordered - which is bad.              */
572                if (!curr->complete) {
573                        assert(curr->next == NULL);
574                }
575
576                if (curr->reliable) {
577                        if (!mbus_addr_valid(m, curr->dest)) {
578                            debug_msg("Trying to send reliably to an unknown address...\n");
579                            if (m->err_handler == NULL) {
580                                abort();
581                            }
582                            m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);
583                        }
584                        if (!mbus_addr_unique(m, curr->dest)) {
585                            debug_msg("Trying to send reliably but address is not unique...\n");
586                            if (m->err_handler == NULL) {
587                                abort();
588                            }
589                            m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE);
590                        }
591                }
592                /* Create the message... */
593                mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
594                for (i = 0; i < curr->num_cmds; i++) {
595                        assert(m->index_sent == (curr->idx_list[i] - 1));
596                        m->index_sent = curr->idx_list[i];
597                        mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
598                }
599                mb_send(m);
600               
601                m->cmd_queue = curr->next;
602                if (curr->reliable) {
603                        /* Reliable message, wait for the ack... */
604                        gettimeofday(&(curr->send_time), NULL);
605                        m->waiting_ack = curr;
606                        curr->next = NULL;
607                        return;
608                } else {
609                        while (curr->num_cmds > 0) {
610                                curr->num_cmds--;
611                                xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL;
612                                xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL;
613                        }
614                        xfree(curr->dest);
615                        xfree(curr);
616                }
617                curr = m->cmd_queue;
618        }
619}
620
621void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable)
622{
623        /* Queue up a message for sending. The message is not */
624        /* actually sent until mbus_send() is called.         */
625        struct mbus_msg *curr = m->cmd_queue;
626        struct mbus_msg *prev = NULL;
627        int              alen = strlen(cmnd) + strlen(args) + 4;
628        int              i;
629
630        mbus_validate(m);
631        while (curr != NULL) {
632                mbus_msg_validate(curr);
633                if (!curr->complete) {
634                        /* This message is still open for new commands. It MUST be the last in the */
635                        /* cmd_queue, else commands will be reordered.                             */
636                        assert(curr->next == NULL);
637                        if (mbus_addr_identical(curr->dest, dest) &&
638                            (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) {
639                                curr->num_cmds++;
640                                curr->reliable |= reliable;
641                                curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);
642                                curr->arg_list[curr->num_cmds-1] = xstrdup(args);
643                                curr->idx_list[curr->num_cmds-1] = ++(m->index);
644                                curr->message_size += alen;
645                                mbus_msg_validate(curr);
646                                return;
647                        } else {
648                                curr->complete = TRUE;
649                        }
650                }
651                prev = curr;
652                curr = curr->next;
653        }
654        /* If we get here, we've not found an open message in the cmd_queue.  We */
655        /* have to create a new message, and add it to the end of the cmd_queue. */
656        curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));
657        curr->magic            = MBUS_MSG_MAGIC;
658        curr->next             = NULL;
659        curr->dest             = xstrdup(dest);
660        curr->retransmit_count = 0;
661        curr->message_size     = alen + 60 + strlen(dest) + strlen(m->addr);
662        curr->seqnum           = m->seqnum++;
663        curr->reliable         = reliable;
664        curr->complete         = FALSE;
665        curr->num_cmds         = 1;
666        curr->cmd_list[0]      = xstrdup(cmnd);
667        curr->arg_list[0]      = xstrdup(args);
668        curr->idx_list[curr->num_cmds-1] = ++(m->index);
669        for (i = 1; i < MBUS_MAX_QLEN; i++) {
670                curr->cmd_list[i] = NULL;
671                curr->arg_list[i] = NULL;
672        }
673        if (prev == NULL) {
674                m->cmd_queue = curr;
675        } else {
676                prev->next = curr;
677        }
678        gettimeofday(&(curr->send_time), NULL);
679        gettimeofday(&(curr->comp_time), NULL);
680        mbus_msg_validate(curr);
681}
682
683void mbus_qmsgf(struct mbus *m, const char *dest, int reliable, const char *cmnd, const char *format, ...)
684{
685        /* This is a wrapper around mbus_qmsg() which does a printf() style format into  */
686        /* a buffer. Saves the caller from having to a a malloc(), write the args string */
687        /* and then do a free(), and also saves worring about overflowing the buffer, so */
688        /* removing a common source of bugs!                                             */
689        char    buffer[MBUS_BUF_SIZE];
690        va_list ap;
691
692        mbus_validate(m);
693        va_start(ap, format);
694#ifdef WIN32
695        _vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
696#else
697        vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
698#endif
699        va_end(ap);
700        mbus_qmsg(m, dest, cmnd, buffer, reliable);
701}
702
703int mbus_recv(struct mbus *m, void *data, struct timeval *timeout)
704{
705        char                    *auth, *ver, *src, *dst, *ack, *r, *cmd, *param, *npos;
706        char                     buffer[MBUS_BUF_SIZE];
707        int                      buffer_len, seq, a, rx, ts, authlen, loop_count;
708        char                     ackbuf[MBUS_ACK_BUF_SIZE];
709        char                     digest[16];
710        unsigned char            initVec[8] = {0,0,0,0,0,0,0,0};
711        struct timeval           t;
712        struct mbus_parser      *mp, *mp2;
713
714        mbus_validate(m);
715
716        rx = FALSE;
717        loop_count = 0;
718        while (loop_count++ < 10) {
719                memset(buffer, 0, MBUS_BUF_SIZE);
720                assert(m->s != NULL);
721                udp_fd_zero();
722                udp_fd_set(m->s);
723                t.tv_sec  = timeout->tv_sec;
724                t.tv_usec = timeout->tv_usec;
725                if ((udp_select(&t) > 0) && udp_fd_isset(m->s)) {
726                        buffer_len = udp_recv(m->s, buffer, MBUS_BUF_SIZE);
727                        if (buffer_len > 0) {
728                                rx = TRUE;
729                        } else {
730                                return rx;
731                        }
732                } else {
733                        return FALSE;
734                }
735
736                if (m->encrkey != NULL) {
737                        /* Decrypt the message... */
738                        if ((buffer_len % 8) != 0) {
739                                debug_msg("Encrypted message not a multiple of 8 bytes in length\n");
740                                continue;
741                        }
742                        memcpy(mb_cryptbuf, buffer, buffer_len);
743                        memset(initVec, 0, 8);
744                        qfDES_CBC_d(m->encrkey, mb_cryptbuf, buffer_len, initVec);
745                        memcpy(buffer, mb_cryptbuf, buffer_len);
746                }
747
748                /* Sanity check that this is a vaguely sensible format message... Should prevent */
749                /* problems if we're fed complete garbage, but won't prevent determined hackers. */
750                if (strncmp(buffer + MBUS_AUTH_LEN + 1, "mbus/1.0", 8) != 0) {
751                        continue;
752                }
753
754                mp = mbus_parse_init(buffer);
755                /* remove trailing 0 bytes */
756                npos = (char *) strchr(buffer,'\0');
757                if(npos!=NULL) {
758                        buffer_len=npos-buffer;
759                }
760                /* Parse the authentication header */
761                if (!mbus_parse_sym(mp, &auth)) {
762                        debug_msg("Failed to parse authentication header\n");
763                        mbus_parse_done(mp);
764                        continue;
765                }
766
767                /* Check that the packet authenticates correctly... */
768                authlen = strlen(auth);
769                hmac_md5(buffer + authlen + 1, buffer_len - authlen - 1, m->hashkey, m->hashkeylen, digest);
770                base64encode(digest, 12, ackbuf, 16);
771                if ((strlen(auth) != 16) || (strncmp(auth, ackbuf, 16) != 0)) {
772                        debug_msg("Failed to authenticate message...\n");
773                        mbus_parse_done(mp);
774                        continue;
775                }
776
777                /* Parse the header */
778                if (!mbus_parse_sym(mp, &ver)) {
779                        mbus_parse_done(mp);
780                        debug_msg("Parser failed version (1): %s\n",ver);
781                        continue;
782                }
783                if (strcmp(ver, "mbus/1.0") != 0) {
784                        mbus_parse_done(mp);
785                        debug_msg("Parser failed version (2): %s\n",ver);
786                        continue;
787                }
788                if (!mbus_parse_int(mp, &seq)) {
789                        mbus_parse_done(mp);
790                        debug_msg("Parser failed seq\n");
791                        continue;
792                }
793                if (!mbus_parse_int(mp, &ts)) {
794                        mbus_parse_done(mp);
795                        debug_msg("Parser failed ts\n");
796                        continue;
797                }
798                if (!mbus_parse_sym(mp, &r)) {
799                        mbus_parse_done(mp);
800                        debug_msg("Parser failed reliable\n");
801                        continue;
802                }
803                if (!mbus_parse_lst(mp, &src)) {
804                        mbus_parse_done(mp);
805                        debug_msg("Parser failed src\n");
806                        continue;
807                }
808                if (!mbus_parse_lst(mp, &dst)) {
809                        mbus_parse_done(mp);
810                        debug_msg("Parser failed dst\n");
811                        continue;
812                }
813                if (!mbus_parse_lst(mp, &ack)) {
814                        mbus_parse_done(mp);
815                        debug_msg("Parser failed ack\n");
816                        continue;
817                }
818
819                store_other_addr(m, src);
820
821                /* Check if the message was addressed to us... */
822                if (mbus_addr_match(m->addr, dst)) {
823                        /* ...if so, process any ACKs received... */
824                        mp2 = mbus_parse_init(ack);
825                        while (mbus_parse_int(mp2, &a)) {
826                                if (mbus_waiting_ack(m)) {
827                                        if (m->waiting_ack->seqnum == a) {
828                                                while (m->waiting_ack->num_cmds > 0) {
829                                                        m->waiting_ack->num_cmds--;
830                                                        xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
831                                                        xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
832                                                }
833                                                xfree(m->waiting_ack->dest);
834                                                xfree(m->waiting_ack);
835                                                m->waiting_ack = NULL;
836                                        } else {
837                                                debug_msg("Got ACK %d but wanted %d\n", a, m->waiting_ack->seqnum);
838                                        }
839                                } else {
840                                        debug_msg("Got ACK %d but wasn't expecting it\n", a);
841                                }
842                        }
843                        mbus_parse_done(mp2);
844                        /* ...if an ACK was requested, send one... */
845                        if (strcmp(r, "R") == 0) {
846                                char            *newsrc = (char *) xmalloc(strlen(src) + 3);
847                                struct timeval   t;
848
849                                sprintf(newsrc, "(%s)", src);   /* Yes, this is a kludge. */
850                                gettimeofday(&t, NULL);
851                                mb_header(++m->seqnum, (int) t.tv_sec, 'U', m->addr, newsrc, seq);
852                                mb_send(m);
853                                xfree(newsrc);
854                        } else if (strcmp(r, "U") == 0) {
855                                /* Unreliable message.... not need to do anything */
856                        } else {
857                                debug_msg("Message with invalid reliability field \"%s\" ignored\n", r);
858                        }
859                        /* ...and process the commands contained in the message */
860                        while (mbus_parse_sym(mp, &cmd)) {
861                                if (mbus_parse_lst(mp, &param)) {
862                                        char            *newsrc = (char *) xmalloc(strlen(src) + 3);
863                                        sprintf(newsrc, "(%s)", src);   /* Yes, this is a kludge. */
864                                        /* Finally, we snoop on the message we just passed to the application, */
865                                        /* to do housekeeping of our list of known mbus sources...             */
866                                        if (strcmp(cmd, "mbus.bye") == 0) {
867                                                remove_other_addr(m, newsrc);
868                                        }
869                                        if (strcmp(cmd, "mbus.hello") == 0) {
870                                                /* Mark this source as activ. We remove dead sources in mbus_heartbeat */
871                                                store_other_addr(m, newsrc);
872                                        }
873                                        m->cmd_handler(newsrc, cmd, param, data);
874                                        xfree(newsrc);
875                                } else {
876                                        debug_msg("Unable to parse mbus command:\n");
877                                        debug_msg("cmd = %s\n", cmd);
878                                        debug_msg("arg = %s\n", param);
879                                        break;
880                                }
881                        }
882                }
883                mbus_parse_done(mp);
884        }
885        return rx;
886}
887
888#define RZ_HANDLE_WAITING 1
889#define RZ_HANDLE_GO      2
890
891struct mbus_rz {
892        char            *peer;
893        char            *token;
894        struct mbus     *m;
895        void            *data;
896        int              mode;
897        void (*cmd_handler)(char *src, char *cmd, char *args, void *data);
898};
899
900static void rz_handler(char *src, char *cmd, char *args, void *data)
901{
902        struct mbus_rz          *r = (struct mbus_rz *) data;
903        struct mbus_parser      *mp;
904
905        if ((r->mode == RZ_HANDLE_WAITING) && (strcmp(cmd, "mbus.waiting") == 0)) {
906                char    *t;
907
908                mp = mbus_parse_init(args);
909                mbus_parse_str(mp, &t);
910                if (strcmp(mbus_decode_str(t), r->token) == 0) {
911                        if (r->peer != NULL) xfree(r->peer);
912                        r->peer = xstrdup(src);
913                }
914                mbus_parse_done(mp);
915        } else if ((r->mode == RZ_HANDLE_GO) && (strcmp(cmd, "mbus.go") == 0)) {
916                char    *t;
917
918                mp = mbus_parse_init(args);
919                mbus_parse_str(mp, &t);
920                if (strcmp(mbus_decode_str(t), r->token) == 0) {
921                        if (r->peer != NULL) xfree(r->peer);
922                        r->peer = xstrdup(src);
923                }
924                mbus_parse_done(mp);
925        } else {
926                r->cmd_handler(src, cmd, args, r->data);
927        }
928}
929
930char *mbus_rendezvous_waiting(struct mbus *m, char *addr, char *token, void *data)
931{
932        /* Loop, sending mbus.waiting(token) to "addr", until we get mbus.go(token) */
933        /* back from our peer. Any other mbus commands received whilst waiting are  */
934        /* processed in the normal manner, as if mbus_recv() had been called.       */
935        char            *token_e, *peer;
936        struct timeval   timeout;
937        struct mbus_rz  *r;
938
939        mbus_validate(m);
940
941        r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));
942        r->peer        = NULL;
943        r->token       = token;
944        r->m           = m;
945        r->data        = data;
946        r->mode        = RZ_HANDLE_GO;
947        r->cmd_handler = m->cmd_handler;
948        m->cmd_handler = rz_handler;
949        token_e        = mbus_encode_str(token);
950        while (r->peer == NULL) {
951                timeout.tv_sec  = 0;
952                timeout.tv_usec = 100000;
953                mbus_heartbeat(m, 1);
954                mbus_qmsgf(m, addr, FALSE, "mbus.waiting", "%s", token_e);
955                mbus_send(m);
956                mbus_recv(m, r, &timeout);
957                mbus_retransmit(m);
958        }
959        m->cmd_handler = r->cmd_handler;
960        peer = r->peer;
961        xfree(r);
962        xfree(token_e);
963        return peer;
964}
965
966char *mbus_rendezvous_go(struct mbus *m, char *token, void *data)
967{
968        /* Wait until we receive mbus.waiting(token), then send mbus.go(token) back to   */
969        /* the sender of that message. Whilst waiting, other mbus commands are processed */
970        /* in the normal manner as if mbus_recv() had been called.                       */
971        char            *token_e, *peer;
972        struct timeval   timeout;
973        struct mbus_rz  *r;
974
975        mbus_validate(m);
976
977        r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));
978        r->peer        = NULL;
979        r->token       = token;
980        r->m           = m;
981        r->data        = data;
982        r->mode        = RZ_HANDLE_WAITING;
983        r->cmd_handler = m->cmd_handler;
984        m->cmd_handler = rz_handler;
985        token_e        = mbus_encode_str(token);
986        while (r->peer == NULL) {
987                timeout.tv_sec  = 0;
988                timeout.tv_usec = 100000;
989                mbus_heartbeat(m, 1);
990                mbus_send(m);
991                mbus_recv(m, r, &timeout);
992                mbus_retransmit(m);
993        }
994
995        mbus_qmsgf(m, r->peer, TRUE, "mbus.go", "%s", token_e);
996        do {
997                mbus_heartbeat(m, 1);
998                mbus_retransmit(m);
999                mbus_send(m);
1000                timeout.tv_sec  = 0;
1001                timeout.tv_usec = 100000;
1002                mbus_recv(m, r, &timeout);
1003        } while (!mbus_sent_all(m));
1004
1005        m->cmd_handler = r->cmd_handler;
1006        peer = r->peer;
1007        xfree(r);
1008        xfree(token_e);
1009        return peer;
1010}
Note: See TracBrowser for help on using the browser.