root/common/trunk/src/mbus.c @ 502

Revision 502, 29.4 KB (checked in by ucaccsp, 14 years ago)

Fix sequence number generation: it was possible for a message to have the
same sequence number as the previous ACK.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:     mbus.c
3 * AUTHOR:   Colin Perkins
4 * MODIFIED: Orion Hodson
5 *           Markus Germeier
6 *
7 * Copyright (c) 1997-2000 University College London
8 * All rights reserved.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, is permitted provided that the following conditions
12 * are met:
13 * 1. Redistributions of source code must retain the above copyright
14 *    notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 *    notice, this list of conditions and the following disclaimer in the
17 *    documentation and/or other materials provided with the distribution.
18 * 3. All advertising materials mentioning features or use of this software
19 *    must display the following acknowledgement:
20 *      This product includes software developed by the Computer Science
21 *      Department at University College London
22 * 4. Neither the name of the University nor of the Department may be used
23 *    to endorse or promote products derived from this software without
24 *    specific prior written permission.
25 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
26 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
29 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
31 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
32 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
34 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * SUCH DAMAGE.
36 */
37
38#include "config_unix.h"
39#include "config_win32.h"
40#include "debug.h"
41#include "memory.h"
42#include "net_udp.h"
43#include "hmac.h"
44#include "qfDES.h"
45#include "base64.h"
46#include "gettimeofday.h"
47#include "mbus.h"
48#include "mbus_config.h"
49#include "mbus_parser.h"
50#include "mbus_addr.h"
51
52#define MBUS_BUF_SIZE     1500
53#define MBUS_ACK_BUF_SIZE 1500
54#define MBUS_MAX_ADDR       10
55#define MBUS_MAX_QLEN       50 /* Number of messages we can queue with mbus_qmsg() */
56
57#define MBUS_MAGIC      0x87654321
58#define MBUS_MSG_MAGIC  0x12345678
59
60#ifdef NEED_VSNPRINTF
61static int vsnprintf(char *s, size_t buf_size, const char *format, va_list ap)
62{
63        /* Quick hack replacement for vsnprintf... note that this */
64        /* doesn't check for buffer overflows, and so is open to  */
65        /* many really nasty attacks!                             */
66        UNUSED(buf_size);
67        return vsprintf(s,format,ap);
68}
69#endif
70
71struct mbus_msg {
72        struct mbus_msg *next;
73        struct timeval   send_time;     /* Time the message was sent, to trigger a retransmit */
74        struct timeval   comp_time;     /* Time the message was composed, the timestamp in the packet header */
75        char            *dest;
76        int              reliable;
77        int              complete;      /* Indicates that we've finished adding cmds to this message */
78        int              seqnum;
79        int              retransmit_count;
80        int              message_size;
81        int              num_cmds;
82        char            *cmd_list[MBUS_MAX_QLEN];
83        char            *arg_list[MBUS_MAX_QLEN];
84        uint32_t         idx_list[MBUS_MAX_QLEN];
85        uint32_t         magic;         /* For debugging... */
86};
87
88struct mbus {
89        socket_udp               *s;
90        char                     *addr;                         /* Addresses we respond to.                                     */
91        int                       max_other_addr;
92        int                       num_other_addr;
93        char                    **other_addr;                   /* Addresses of other entities on the mbus.                     */
94        struct timeval          **other_hello;                  /* Time of last mbus.hello we received from other entities      */
95        int                       seqnum;
96        struct mbus_msg          *cmd_queue;                    /* Queue of messages waiting to be sent */
97        struct mbus_msg          *waiting_ack;                  /* The last reliable message sent, if we have not yet got the ACK */
98        char                     *hashkey;
99        int                       hashkeylen;
100        char                     *encrkey;
101        int                       encrkeylen;
102        struct timeval            last_heartbeat;               /* Last time we sent a heartbeat message */
103        struct mbus_config       *cfg;
104        void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat);
105        void (*err_handler)(int seqnum, int reason);
106        uint32_t                  magic;                        /* For debugging...                                             */
107        uint32_t                  index;
108        uint32_t                  index_sent;
109};
110
111static void mbus_validate(struct mbus *m)
112{
113#ifdef DEBUG
114        int     i;
115
116        assert(m->num_other_addr <= m->max_other_addr);
117        assert(m->num_other_addr >= 0);
118        for (i = 0; i < m->num_other_addr; i++) {
119                assert(m->other_addr[i]  != NULL);
120                assert(m->other_hello[i] != NULL);
121        }
122        for (i = m->num_other_addr + 1; i < m->max_other_addr; i++) {
123                assert(m->other_addr[i]  == NULL);
124                assert(m->other_hello[i] == NULL);
125        }
126#endif
127        assert(m->magic == MBUS_MAGIC);
128        xmemchk();
129}
130
131static void mbus_msg_validate(struct mbus_msg *m)
132{
133#ifdef DEBUG
134        int     i;
135
136        assert((m->num_cmds < MBUS_MAX_QLEN) && (m->num_cmds >= 0));
137        for (i = 0; i < m->num_cmds; i++) {
138                assert(m->cmd_list[i] != NULL);
139                assert(m->arg_list[i] != NULL);
140                if (i > 0) {
141                        assert(m->idx_list[i] > m->idx_list[i-1]);
142                }
143        }
144        for (i = m->num_cmds + 1; i < MBUS_MAX_QLEN; i++) {
145                assert(m->cmd_list[i] == NULL);
146                assert(m->arg_list[i] == NULL);
147        }       
148        assert(m->dest != NULL);
149#endif
150        assert(m->magic == MBUS_MSG_MAGIC);
151}
152
153static void store_other_addr(struct mbus *m, char *a)
154{
155        /* This takes the address a and ensures it is stored in the   */
156        /* m->other_addr field of the mbus structure. The other_addr  */
157        /* field should probably be a hash table, but for now we hope */
158        /* that there are not too many entities on the mbus, so the   */
159        /* list is small.                                             */
160        int     i;
161
162        mbus_validate(m);
163
164        for (i = 0; i < m->num_other_addr; i++) {
165                if (mbus_addr_match(m->other_addr[i], a)) {
166                        /* Already in the list... */
167                        gettimeofday(m->other_hello[i],NULL);
168                        return;
169                }
170        }
171
172        if (m->num_other_addr == m->max_other_addr) {
173                /* Expand the list... */
174                m->max_other_addr *= 2;
175                m->other_addr = (char **) xrealloc(m->other_addr, m->max_other_addr * sizeof(char *));
176                m->other_hello = (struct timeval **) xrealloc(m->other_hello, m->max_other_addr * sizeof(struct timeval *));
177        }
178        m->other_hello[m->num_other_addr]=(struct timeval *)xmalloc(sizeof(struct timeval));
179        gettimeofday(m->other_hello[m->num_other_addr],NULL);
180        m->other_addr[m->num_other_addr++] = xstrdup(a);
181}
182
183static void remove_other_addr(struct mbus *m, char *a)
184{
185        /* Removes the address a from the m->other_addr field of the */
186        /* mbus structure.                                           */
187        int     i, j;
188
189        mbus_validate(m);
190
191        for (i = 0; i < m->num_other_addr; i++) {
192                if (mbus_addr_match(m->other_addr[i], a)) {
193                        xfree(m->other_addr[i]);
194                        xfree(m->other_hello[i]);
195                        for (j = i+1; j < m->num_other_addr; j++) {
196                                m->other_addr[j-1] = m->other_addr[j];
197                                m->other_hello[j-1] = m->other_hello[j];
198                        }
199                        m->other_addr[m->num_other_addr  - 1] = NULL;
200                        m->other_hello[m->num_other_addr - 1] = NULL;
201                        m->num_other_addr--;
202                }
203        }
204}
205
206static void remove_inactiv_other_addr(struct mbus *m, struct timeval t, int interval){
207        /* Remove addresses we haven't heard from for about 5 * interval */
208        /* Count backwards so it is safe to remove entries               */
209        int i;
210   
211        mbus_validate(m);
212
213        for (i=m->num_other_addr-1; i>=0; i--){
214                if ((t.tv_sec-(m->other_hello[i]->tv_sec)) > 5 * interval) {
215                        debug_msg("remove dead entity (%s)\n", m->other_addr[i]);
216                        remove_other_addr(m, m->other_addr[i]);
217                }
218        }
219}
220
221int mbus_addr_valid(struct mbus *m, char *addr)
222{
223        int     i;
224
225        mbus_validate(m);
226
227        for (i = 0; i < m->num_other_addr; i++) {
228                if (mbus_addr_match(m->other_addr[i], addr)) {
229                        return TRUE;
230                }
231        }
232        return FALSE;
233}
234
235static int mbus_addr_unique(struct mbus *m, char *addr)
236{
237        int     i, n = 0;
238
239        mbus_validate(m);
240
241        for (i = 0; i < m->num_other_addr; i++) {
242                if (mbus_addr_match(m->other_addr[i], addr)) {
243                        n++;
244                }
245        }
246        return n==1;
247}
248
249/* The mb_* functions are used to build an mbus message up in the */
250/* mb_buffer, and to add authentication and encryption before the */
251/* message is sent.                                               */
252char     mb_cryptbuf[MBUS_BUF_SIZE];
253char    *mb_buffer;
254char    *mb_bufpos;
255
256#define MBUS_AUTH_LEN 16
257
258static void mb_header(int seqnum, int ts, char reliable, const char *src, const char *dst, int ackseq)
259{
260        xmemchk();
261        mb_buffer   = (char *) xmalloc(MBUS_BUF_SIZE + 1);
262        memset(mb_buffer,   0, MBUS_BUF_SIZE);
263        memset(mb_buffer, ' ', MBUS_AUTH_LEN);
264        mb_bufpos = mb_buffer + MBUS_AUTH_LEN;
265        sprintf(mb_bufpos, "\nmbus/1.0 %6d %9d %c (%s) %s ", seqnum, ts, reliable, src, dst);
266        mb_bufpos += 33 + strlen(src) + strlen(dst);
267        if (ackseq == -1) {
268                sprintf(mb_bufpos, "()\n");
269                mb_bufpos += 3;
270        } else {
271                sprintf(mb_bufpos, "(%6d)\n", ackseq);
272                mb_bufpos += 9;
273        }
274}
275
276static void mb_add_command(const char *cmnd, const char *args)
277{
278        int offset = strlen(cmnd) + strlen(args) + 5;
279
280        assert((mb_bufpos + offset - mb_buffer) < MBUS_BUF_SIZE);
281
282        sprintf(mb_bufpos, "%s (%s)\n", cmnd, args);
283        mb_bufpos += offset - 1; /* The -1 in offset means we're not NUL terminated - fix in mb_send */
284}
285
286static void mb_send(struct mbus *m)
287{
288        char            digest[16];
289        int             len;
290        unsigned char   initVec[8] = {0,0,0,0,0,0,0,0};
291 
292        mbus_validate(m);
293
294        *(mb_bufpos++) = '\0';
295        assert((mb_bufpos - mb_buffer) < MBUS_BUF_SIZE);
296        assert(strlen(mb_buffer) < MBUS_BUF_SIZE);
297
298        /* Pad to a multiple of 8 bytes, so the encryption can work... */
299        while (((mb_bufpos - mb_buffer) % 8) != 0) {
300                *(mb_bufpos++) = '\0';
301        }
302        len = mb_bufpos - mb_buffer;
303        assert(len < MBUS_BUF_SIZE);
304        assert(strlen(mb_buffer) < MBUS_BUF_SIZE);
305
306        xmemchk();
307        if (m->hashkey != NULL) {
308                /* Authenticate... */
309                hmac_md5(mb_buffer + MBUS_AUTH_LEN+1, strlen(mb_buffer) - (MBUS_AUTH_LEN+1), m->hashkey, m->hashkeylen, digest);
310                base64encode(digest, 12, mb_buffer, MBUS_AUTH_LEN);
311        }
312        xmemchk();
313        if (m->encrkey != NULL) {
314                /* Encrypt... */
315                memset(mb_cryptbuf, 0, MBUS_BUF_SIZE);
316                memcpy(mb_cryptbuf, mb_buffer, len);
317                assert((len % 8) == 0);
318                assert(len < MBUS_BUF_SIZE);
319                assert(m->encrkeylen == 8);
320                xmemchk();
321                qfDES_CBC_e(m->encrkey, mb_cryptbuf, len, initVec);
322                xmemchk();
323                memcpy(mb_buffer, mb_cryptbuf, len);
324        }
325        xmemchk();
326        udp_send(m->s, mb_buffer, len);
327        xfree(mb_buffer);
328}
329
330static void resend(struct mbus *m, struct mbus_msg *curr)
331{
332        /* Don't need to check for buffer overflows: this was done in mbus_send() when */
333        /* this message was first transmitted. If it was okay then, it's okay now.     */
334        int      i;
335
336        mbus_validate(m);
337
338        mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
339        for (i = 0; i < curr->num_cmds; i++) {
340                mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
341        }
342        mb_send(m);
343        curr->retransmit_count++;
344}
345
346void mbus_retransmit(struct mbus *m)
347{
348        struct mbus_msg *curr = m->waiting_ack;
349        struct timeval  time;
350        long            diff;
351
352        mbus_validate(m);
353
354        if (!mbus_waiting_ack(m)) {
355                return;
356        }
357
358        mbus_msg_validate(curr);
359
360        gettimeofday(&time, NULL);
361
362        /* diff is time in milliseconds that the message has been awaiting an ACK */
363        diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->send_time.tv_sec * 1000) + (curr->send_time.tv_usec / 1000));
364        if (diff > 10000) {
365                debug_msg("Reliable mbus message failed!\n");
366                if (m->err_handler == NULL) {
367                        abort();
368                }
369                m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST);
370                /* if we don't delete this failed message, the error handler
371                   gets triggered every time we call mbus_retransmit */
372                while (m->waiting_ack->num_cmds > 0) {
373                    m->waiting_ack->num_cmds--;
374                    xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
375                    xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
376                }
377                xfree(m->waiting_ack->dest);
378                xfree(m->waiting_ack);
379                m->waiting_ack = NULL;
380                return;
381        }
382        /* Note: We only send one retransmission each time, to avoid
383         * overflowing the receiver with a burst of requests...
384         */
385        if ((diff > 750) && (curr->retransmit_count == 2)) {
386                resend(m, curr);
387                return;
388        }
389        if ((diff > 500) && (curr->retransmit_count == 1)) {
390                resend(m, curr);
391                return;
392        }
393        if ((diff > 250) && (curr->retransmit_count == 0)) {
394                resend(m, curr);
395                return;
396        }
397}
398
399void mbus_heartbeat(struct mbus *m, int interval)
400{
401        struct timeval  curr_time;
402        char    *a = (char *) xmalloc(3);
403        sprintf(a, "()");
404
405        mbus_validate(m);
406
407        gettimeofday(&curr_time, NULL);
408        if (curr_time.tv_sec - m->last_heartbeat.tv_sec >= interval) {
409                mb_header(++m->seqnum, (int) curr_time.tv_sec, 'U', m->addr, "()", -1);
410                mb_add_command("mbus.hello", "");
411                mb_send(m);
412
413                m->last_heartbeat = curr_time;
414                /* Remove dead sources */
415                remove_inactiv_other_addr(m, curr_time, interval);
416        }
417        xfree(a);
418}
419
420int mbus_waiting_ack(struct mbus *m)
421{
422        mbus_validate(m);
423        return m->waiting_ack != NULL;
424}
425
426int mbus_sent_all(struct mbus *m)
427{
428        mbus_validate(m);
429        return (m->cmd_queue == NULL) && (m->waiting_ack == NULL);
430}
431
432struct mbus *mbus_init(void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat),
433                       void  (*err_handler)(int seqnum, int reason),
434                       char  *addr)
435{
436        struct mbus             *m;
437        struct mbus_key          k;
438        struct mbus_parser      *mp;
439        int                      i;
440        char                    *net_addr, *tmp;
441        uint16_t                 net_port;
442        int                      net_scope;
443
444        m = (struct mbus *) xmalloc(sizeof(struct mbus));
445        if (m == NULL) {
446                debug_msg("Unable to allocate memory for mbus\n");
447                return NULL;
448        }
449
450        m->cfg = mbus_create_config();
451        mbus_lock_config_file(m->cfg);
452        net_addr = (char *) xmalloc(20);
453        mbus_get_net_addr(m->cfg, net_addr, &net_port, &net_scope);
454        m->s              = udp_init(net_addr, net_port, net_port, net_scope);
455        if (m->s == NULL) {
456                debug_msg("Unable to initialize mbus address\n");
457                xfree(m);
458                return NULL;
459        }
460        m->seqnum         = 0;
461        m->cmd_handler    = cmd_handler;
462        m->err_handler    = err_handler;
463        m->num_other_addr = 0;
464        m->max_other_addr = 10;
465        m->other_addr     = (char **) xmalloc(sizeof(char *) * 10);
466        m->other_hello    = (struct timeval **) xmalloc(sizeof(struct timeval *) * 10);
467        for (i = 0; i < 10; i++) {
468                m->other_addr[i]  = NULL;
469                m->other_hello[i] = NULL;
470        }
471        m->cmd_queue      = NULL;
472        m->waiting_ack    = NULL;
473        m->magic          = MBUS_MAGIC;
474        m->index          = 0;
475        m->index_sent     = 0;
476
477        mp = mbus_parse_init(xstrdup(addr));
478        if (!mbus_parse_lst(mp, &tmp)) {
479                debug_msg("Invalid mbus address\n");
480                abort();
481        }
482        m->addr = xstrdup(tmp);
483        mbus_parse_done(mp);
484        assert(m->addr != NULL);
485
486        gettimeofday(&(m->last_heartbeat), NULL);
487
488        mbus_get_encrkey(m->cfg, &k);
489        m->encrkey    = k.key;
490        m->encrkeylen = k.key_len;
491
492        mbus_get_hashkey(m->cfg, &k);
493        m->hashkey    = k.key;
494        m->hashkeylen = k.key_len;
495
496        mbus_unlock_config_file(m->cfg);
497
498        xfree(net_addr);
499
500        return m;
501}
502
503void mbus_cmd_handler(struct mbus *m, void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat))
504{
505        mbus_validate(m);
506        m->cmd_handler = cmd_handler;
507}
508
509static void mbus_flush_msgs(struct mbus_msg **queue)
510{
511        struct mbus_msg *curr, *next;
512        int i;
513       
514        curr = *queue;
515        while(curr) {
516                next = curr->next;
517                xfree(curr->dest);
518                for(i = 0; i < curr->num_cmds; i++) {
519                        xfree(curr->cmd_list[i]);
520                        xfree(curr->arg_list[i]);
521                }
522                xfree(curr);
523                curr = next;
524        }
525        *queue = NULL;
526}
527
528void mbus_exit(struct mbus *m)
529{
530        int i;
531
532        assert(m != NULL);
533        mbus_validate(m);
534
535        mbus_qmsg(m, "()", "mbus.bye", "", FALSE);
536        mbus_send(m);
537
538        /* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */
539        /*        We will need an mbus_flush() call first though, to ensure nothing is waiting.          */
540        mbus_flush_msgs(&m->cmd_queue);
541        mbus_flush_msgs(&m->waiting_ack);
542
543        if (m->encrkey != NULL) {
544                xfree(m->encrkey);
545        }
546        if (m->hashkey != NULL) {
547                xfree(m->hashkey);
548        }
549
550        udp_exit(m->s);
551
552        /* Clean up other_* */
553        for (i=m->num_other_addr-1; i>=0; i--){
554            remove_other_addr(m, m->other_addr[i]);
555        }
556
557        xfree(m->addr);
558        xfree(m->other_addr);
559        xfree(m->other_hello);
560        xfree(m->cfg);
561        xfree(m);
562}
563
564void mbus_send(struct mbus *m)
565{
566        /* Send one, or more, messages previosly queued with mbus_qmsg(). */
567        /* Messages for the same destination are batched together. Stops  */
568        /* when a reliable message is sent, until the ACK is received.    */
569        struct mbus_msg *curr = m->cmd_queue;
570        int              i;
571
572        mbus_validate(m);
573        if (m->waiting_ack != NULL) {
574                return;
575        }
576
577        while (curr != NULL) {
578                mbus_msg_validate(curr);
579                /* It's okay for us to send messages which haven't been marked as complete - */
580                /* that just means we're sending something which has the potential to have   */
581                /* more data piggybacked. However, if it's not complete it MUST be the last  */
582                /* in the list, or something has been reordered - which is bad.              */
583                if (!curr->complete) {
584                        assert(curr->next == NULL);
585                }
586
587                if (curr->reliable) {
588                        if (!mbus_addr_valid(m, curr->dest)) {
589                            debug_msg("Trying to send reliably to an unknown address...\n");
590                            if (m->err_handler == NULL) {
591                                abort();
592                            }
593                            m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);
594                        }
595                        if (!mbus_addr_unique(m, curr->dest)) {
596                            debug_msg("Trying to send reliably but address is not unique...\n");
597                            if (m->err_handler == NULL) {
598                                abort();
599                            }
600                            m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE);
601                        }
602                }
603                /* Create the message... */
604                mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
605                for (i = 0; i < curr->num_cmds; i++) {
606                        assert(m->index_sent == (curr->idx_list[i] - 1));
607                        m->index_sent = curr->idx_list[i];
608                        mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
609                }
610                mb_send(m);
611               
612                m->cmd_queue = curr->next;
613                if (curr->reliable) {
614                        /* Reliable message, wait for the ack... */
615                        gettimeofday(&(curr->send_time), NULL);
616                        m->waiting_ack = curr;
617                        curr->next = NULL;
618                        return;
619                } else {
620                        while (curr->num_cmds > 0) {
621                                curr->num_cmds--;
622                                xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL;
623                                xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL;
624                        }
625                        xfree(curr->dest);
626                        xfree(curr);
627                }
628                curr = m->cmd_queue;
629        }
630}
631
632void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable)
633{
634        /* Queue up a message for sending. The message is not */
635        /* actually sent until mbus_send() is called.         */
636        struct mbus_msg *curr = m->cmd_queue;
637        struct mbus_msg *prev = NULL;
638        int              alen = strlen(cmnd) + strlen(args) + 4;
639        int              i;
640
641        mbus_validate(m);
642        while (curr != NULL) {
643                mbus_msg_validate(curr);
644                if (!curr->complete) {
645                        /* This message is still open for new commands. It MUST be the last in the */
646                        /* cmd_queue, else commands will be reordered.                             */
647                        assert(curr->next == NULL);
648                        if (mbus_addr_identical(curr->dest, dest) &&
649                            (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) {
650                                curr->num_cmds++;
651                                curr->reliable |= reliable;
652                                curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);
653                                curr->arg_list[curr->num_cmds-1] = xstrdup(args);
654                                curr->idx_list[curr->num_cmds-1] = ++(m->index);
655                                curr->message_size += alen;
656                                mbus_msg_validate(curr);
657                                return;
658                        } else {
659                                curr->complete = TRUE;
660                        }
661                }
662                prev = curr;
663                curr = curr->next;
664        }
665        /* If we get here, we've not found an open message in the cmd_queue.  We */
666        /* have to create a new message, and add it to the end of the cmd_queue. */
667        curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));
668        curr->magic            = MBUS_MSG_MAGIC;
669        curr->next             = NULL;
670        curr->dest             = xstrdup(dest);
671        curr->retransmit_count = 0;
672        curr->message_size     = alen + 60 + strlen(dest) + strlen(m->addr);
673        curr->seqnum           = ++m->seqnum;
674        curr->reliable         = reliable;
675        curr->complete         = FALSE;
676        curr->num_cmds         = 1;
677        curr->cmd_list[0]      = xstrdup(cmnd);
678        curr->arg_list[0]      = xstrdup(args);
679        curr->idx_list[curr->num_cmds-1] = ++(m->index);
680        for (i = 1; i < MBUS_MAX_QLEN; i++) {
681                curr->cmd_list[i] = NULL;
682                curr->arg_list[i] = NULL;
683        }
684        if (prev == NULL) {
685                m->cmd_queue = curr;
686        } else {
687                prev->next = curr;
688        }
689        gettimeofday(&(curr->send_time), NULL);
690        gettimeofday(&(curr->comp_time), NULL);
691        mbus_msg_validate(curr);
692}
693
694void mbus_qmsgf(struct mbus *m, const char *dest, int reliable, const char *cmnd, const char *format, ...)
695{
696        /* This is a wrapper around mbus_qmsg() which does a printf() style format into  */
697        /* a buffer. Saves the caller from having to a a malloc(), write the args string */
698        /* and then do a free(), and also saves worring about overflowing the buffer, so */
699        /* removing a common source of bugs!                                             */
700        char    buffer[MBUS_BUF_SIZE];
701        va_list ap;
702
703        mbus_validate(m);
704        va_start(ap, format);
705#ifdef WIN32
706        _vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
707#else
708        vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
709#endif
710        va_end(ap);
711        mbus_qmsg(m, dest, cmnd, buffer, reliable);
712}
713
714int mbus_recv(struct mbus *m, void *data, struct timeval *timeout)
715{
716        char                    *auth, *ver, *src, *dst, *ack, *r, *cmd, *param, *npos;
717        char                     buffer[MBUS_BUF_SIZE];
718        int                      buffer_len, seq, a, rx, ts, authlen, loop_count;
719        char                     ackbuf[MBUS_ACK_BUF_SIZE];
720        char                     digest[16];
721        unsigned char            initVec[8] = {0,0,0,0,0,0,0,0};
722        struct timeval           t;
723        struct mbus_parser      *mp, *mp2;
724
725        mbus_validate(m);
726
727        rx = FALSE;
728        loop_count = 0;
729        while (loop_count++ < 10) {
730                memset(buffer, 0, MBUS_BUF_SIZE);
731                assert(m->s != NULL);
732                udp_fd_zero();
733                udp_fd_set(m->s);
734                t.tv_sec  = timeout->tv_sec;
735                t.tv_usec = timeout->tv_usec;
736                if ((udp_select(&t) > 0) && udp_fd_isset(m->s)) {
737                        buffer_len = udp_recv(m->s, buffer, MBUS_BUF_SIZE);
738                        if (buffer_len > 0) {
739                                rx = TRUE;
740                        } else {
741                                return rx;
742                        }
743                } else {
744                        return FALSE;
745                }
746
747                if (m->encrkey != NULL) {
748                        /* Decrypt the message... */
749                        if ((buffer_len % 8) != 0) {
750                                debug_msg("Encrypted message not a multiple of 8 bytes in length\n");
751                                continue;
752                        }
753                        memcpy(mb_cryptbuf, buffer, buffer_len);
754                        memset(initVec, 0, 8);
755                        qfDES_CBC_d(m->encrkey, mb_cryptbuf, buffer_len, initVec);
756                        memcpy(buffer, mb_cryptbuf, buffer_len);
757                }
758
759                /* Sanity check that this is a vaguely sensible format message... Should prevent */
760                /* problems if we're fed complete garbage, but won't prevent determined hackers. */
761                if (strncmp(buffer + MBUS_AUTH_LEN + 1, "mbus/1.0", 8) != 0) {
762                        continue;
763                }
764
765                mp = mbus_parse_init(buffer);
766                /* remove trailing 0 bytes */
767                npos = (char *) strchr(buffer,'\0');
768                if(npos!=NULL) {
769                        buffer_len=npos-buffer;
770                }
771                /* Parse the authentication header */
772                if (!mbus_parse_sym(mp, &auth)) {
773                        debug_msg("Failed to parse authentication header\n");
774                        mbus_parse_done(mp);
775                        continue;
776                }
777
778                /* Check that the packet authenticates correctly... */
779                authlen = strlen(auth);
780                hmac_md5(buffer + authlen + 1, buffer_len - authlen - 1, m->hashkey, m->hashkeylen, digest);
781                base64encode(digest, 12, ackbuf, 16);
782                if ((strlen(auth) != 16) || (strncmp(auth, ackbuf, 16) != 0)) {
783                        debug_msg("Failed to authenticate message...\n");
784                        mbus_parse_done(mp);
785                        continue;
786                }
787
788                /* Parse the header */
789                if (!mbus_parse_sym(mp, &ver)) {
790                        mbus_parse_done(mp);
791                        debug_msg("Parser failed version (1): %s\n",ver);
792                        continue;
793                }
794                if (strcmp(ver, "mbus/1.0") != 0) {
795                        mbus_parse_done(mp);
796                        debug_msg("Parser failed version (2): %s\n",ver);
797                        continue;
798                }
799                if (!mbus_parse_int(mp, &seq)) {
800                        mbus_parse_done(mp);
801                        debug_msg("Parser failed seq\n");
802                        continue;
803                }
804                if (!mbus_parse_int(mp, &ts)) {
805                        mbus_parse_done(mp);
806                        debug_msg("Parser failed ts\n");
807                        continue;
808                }
809                if (!mbus_parse_sym(mp, &r)) {
810                        mbus_parse_done(mp);
811                        debug_msg("Parser failed reliable\n");
812                        continue;
813                }
814                if (!mbus_parse_lst(mp, &src)) {
815                        mbus_parse_done(mp);
816                        debug_msg("Parser failed src\n");
817                        continue;
818                }
819                if (!mbus_parse_lst(mp, &dst)) {
820                        mbus_parse_done(mp);
821                        debug_msg("Parser failed dst\n");
822                        continue;
823                }
824                if (!mbus_parse_lst(mp, &ack)) {
825                        mbus_parse_done(mp);
826                        debug_msg("Parser failed ack\n");
827                        continue;
828                }
829
830                store_other_addr(m, src);
831
832                /* Check if the message was addressed to us... */
833                if (mbus_addr_match(m->addr, dst)) {
834                        /* ...if so, process any ACKs received... */
835                        mp2 = mbus_parse_init(ack);
836                        while (mbus_parse_int(mp2, &a)) {
837                                if (mbus_waiting_ack(m)) {
838                                        if (m->waiting_ack->seqnum == a) {
839                                                while (m->waiting_ack->num_cmds > 0) {
840                                                        m->waiting_ack->num_cmds--;
841                                                        xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
842                                                        xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
843                                                }
844                                                xfree(m->waiting_ack->dest);
845                                                xfree(m->waiting_ack);
846                                                m->waiting_ack = NULL;
847                                        } else {
848                                                debug_msg("Got ACK %d but wanted %d\n", a, m->waiting_ack->seqnum);
849                                        }
850                                } else {
851                                        debug_msg("Got ACK %d but wasn't expecting it\n", a);
852                                }
853                        }
854                        mbus_parse_done(mp2);
855                        /* ...if an ACK was requested, send one... */
856                        if (strcmp(r, "R") == 0) {
857                                char            *newsrc = (char *) xmalloc(strlen(src) + 3);
858                                struct timeval   t;
859
860                                sprintf(newsrc, "(%s)", src);   /* Yes, this is a kludge. */
861                                gettimeofday(&t, NULL);
862                                mb_header(++m->seqnum, (int) t.tv_sec, 'U', m->addr, newsrc, seq);
863                                mb_send(m);
864                                xfree(newsrc);
865                        } else if (strcmp(r, "U") == 0) {
866                                /* Unreliable message.... not need to do anything */
867                        } else {
868                                debug_msg("Message with invalid reliability field \"%s\" ignored\n", r);
869                        }
870                        /* ...and process the commands contained in the message */
871                        while (mbus_parse_sym(mp, &cmd)) {
872                                if (mbus_parse_lst(mp, &param)) {
873                                        char            *newsrc = (char *) xmalloc(strlen(src) + 3);
874                                        sprintf(newsrc, "(%s)", src);   /* Yes, this is a kludge. */
875                                        /* Finally, we snoop on the message we just passed to the application, */
876                                        /* to do housekeeping of our list of known mbus sources...             */
877                                        if (strcmp(cmd, "mbus.bye") == 0) {
878                                                remove_other_addr(m, newsrc);
879                                        }
880                                        if (strcmp(cmd, "mbus.hello") == 0) {
881                                                /* Mark this source as activ. We remove dead sources in mbus_heartbeat */
882                                                store_other_addr(m, newsrc);
883                                        }
884                                        m->cmd_handler(newsrc, cmd, param, data);
885                                        xfree(newsrc);
886                                } else {
887                                        debug_msg("Unable to parse mbus command:\n");
888                                        debug_msg("cmd = %s\n", cmd);
889                                        debug_msg("arg = %s\n", param);
890                                        break;
891                                }
892                        }
893                }
894                mbus_parse_done(mp);
895        }
896        return rx;
897}
898
899#define RZ_HANDLE_WAITING 1
900#define RZ_HANDLE_GO      2
901
902struct mbus_rz {
903        char            *peer;
904        char            *token;
905        struct mbus     *m;
906        void            *data;
907        int              mode;
908        void (*cmd_handler)(char *src, char *cmd, char *args, void *data);
909};
910
911static void rz_handler(char *src, char *cmd, char *args, void *data)
912{
913        struct mbus_rz          *r = (struct mbus_rz *) data;
914        struct mbus_parser      *mp;
915
916        if ((r->mode == RZ_HANDLE_WAITING) && (strcmp(cmd, "mbus.waiting") == 0)) {
917                char    *t;
918
919                mp = mbus_parse_init(args);
920                mbus_parse_str(mp, &t);
921                if (strcmp(mbus_decode_str(t), r->token) == 0) {
922                        if (r->peer != NULL) xfree(r->peer);
923                        r->peer = xstrdup(src);
924                }
925                mbus_parse_done(mp);
926        } else if ((r->mode == RZ_HANDLE_GO) && (strcmp(cmd, "mbus.go") == 0)) {
927                char    *t;
928
929                mp = mbus_parse_init(args);
930                mbus_parse_str(mp, &t);
931                if (strcmp(mbus_decode_str(t), r->token) == 0) {
932                        if (r->peer != NULL) xfree(r->peer);
933                        r->peer = xstrdup(src);
934                }
935                mbus_parse_done(mp);
936        } else {
937                r->cmd_handler(src, cmd, args, r->data);
938        }
939}
940
941char *mbus_rendezvous_waiting(struct mbus *m, char *addr, char *token, void *data)
942{
943        /* Loop, sending mbus.waiting(token) to "addr", until we get mbus.go(token) */
944        /* back from our peer. Any other mbus commands received whilst waiting are  */
945        /* processed in the normal manner, as if mbus_recv() had been called.       */
946        char            *token_e, *peer;
947        struct timeval   timeout;
948        struct mbus_rz  *r;
949
950        mbus_validate(m);
951
952        r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));
953        r->peer        = NULL;
954        r->token       = token;
955        r->m           = m;
956        r->data        = data;
957        r->mode        = RZ_HANDLE_GO;
958        r->cmd_handler = m->cmd_handler;
959        m->cmd_handler = rz_handler;
960        token_e        = mbus_encode_str(token);
961        while (r->peer == NULL) {
962                timeout.tv_sec  = 0;
963                timeout.tv_usec = 100000;
964                mbus_heartbeat(m, 1);
965                mbus_qmsgf(m, addr, FALSE, "mbus.waiting", "%s", token_e);
966                mbus_send(m);
967                mbus_recv(m, r, &timeout);
968                mbus_retransmit(m);
969        }
970        m->cmd_handler = r->cmd_handler;
971        peer = r->peer;
972        xfree(r);
973        xfree(token_e);
974        return peer;
975}
976
977char *mbus_rendezvous_go(struct mbus *m, char *token, void *data)
978{
979        /* Wait until we receive mbus.waiting(token), then send mbus.go(token) back to   */
980        /* the sender of that message. Whilst waiting, other mbus commands are processed */
981        /* in the normal manner as if mbus_recv() had been called.                       */
982        char            *token_e, *peer;
983        struct timeval   timeout;
984        struct mbus_rz  *r;
985
986        mbus_validate(m);
987
988        r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));
989        r->peer        = NULL;
990        r->token       = token;
991        r->m           = m;
992        r->data        = data;
993        r->mode        = RZ_HANDLE_WAITING;
994        r->cmd_handler = m->cmd_handler;
995        m->cmd_handler = rz_handler;
996        token_e        = mbus_encode_str(token);
997        while (r->peer == NULL) {
998                timeout.tv_sec  = 0;
999                timeout.tv_usec = 100000;
1000                mbus_heartbeat(m, 1);
1001                mbus_send(m);
1002                mbus_recv(m, r, &timeout);
1003                mbus_retransmit(m);
1004        }
1005
1006        mbus_qmsgf(m, r->peer, TRUE, "mbus.go", "%s", token_e);
1007        do {
1008                mbus_heartbeat(m, 1);
1009                mbus_retransmit(m);
1010                mbus_send(m);
1011                timeout.tv_sec  = 0;
1012                timeout.tv_usec = 100000;
1013                mbus_recv(m, r, &timeout);
1014        } while (!mbus_sent_all(m));
1015
1016        m->cmd_handler = r->cmd_handler;
1017        peer = r->peer;
1018        xfree(r);
1019        xfree(token_e);
1020        return peer;
1021}
Note: See TracBrowser for help on using the browser.