root/common/trunk/src/mbus.c @ 474

Revision 474, 29.1 KB (checked in by ucacoxh, 14 years ago)

- Reverse out last change (message flushing). All I want is for Win32

to release the device cleanly otherwise it breaks some drivers. I
guess we put a device reconfigure on reception of bye and exit.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:     mbus.c
3 * AUTHOR:   Colin Perkins
4 * MODIFIED: Orion Hodson
5 *           Markus Germeier
6 *
7 * Copyright (c) 1997-2000 University College London
8 * All rights reserved.
9 *
10 * Redistribution and use in source and binary forms, with or without
11 * modification, is permitted provided that the following conditions
12 * are met:
13 * 1. Redistributions of source code must retain the above copyright
14 *    notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 *    notice, this list of conditions and the following disclaimer in the
17 *    documentation and/or other materials provided with the distribution.
18 * 3. All advertising materials mentioning features or use of this software
19 *    must display the following acknowledgement:
20 *      This product includes software developed by the Computer Science
21 *      Department at University College London
22 * 4. Neither the name of the University nor of the Department may be used
23 *    to endorse or promote products derived from this software without
24 *    specific prior written permission.
25 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
26 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
29 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
31 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
32 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
34 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * SUCH DAMAGE.
36 */
37
38#include "config_unix.h"
39#include "config_win32.h"
40#include "debug.h"
41#include "memory.h"
42#include "net_udp.h"
43#include "hmac.h"
44#include "qfDES.h"
45#include "base64.h"
46#include "gettimeofday.h"
47#include "mbus.h"
48#include "mbus_config.h"
49#include "mbus_parser.h"
50#include "mbus_addr.h"
51
52#define MBUS_BUF_SIZE     1500
53#define MBUS_ACK_BUF_SIZE 1500
54#define MBUS_MAX_ADDR       10
55#define MBUS_MAX_QLEN       50 /* Number of messages we can queue with mbus_qmsg() */
56
57#define MBUS_MAGIC      0x87654321
58#define MBUS_MSG_MAGIC  0x12345678
59
60#ifdef NEED_VSNPRINTF
61static int vsnprintf(char *s, size_t buf_size, const char *format, va_list ap)
62{
63        /* Quick hack replacement for vsnprintf... note that this */
64        /* doesn't check for buffer overflows, and so is open to  */
65        /* many really nasty attacks!                             */
66        UNUSED(buf_size);
67        return vsprintf(s,format,ap);
68}
69#endif
70
71struct mbus_msg {
72        struct mbus_msg *next;
73        struct timeval   send_time;     /* Time the message was sent, to trigger a retransmit */
74        struct timeval   comp_time;     /* Time the message was composed, the timestamp in the packet header */
75        char            *dest;
76        int              reliable;
77        int              complete;      /* Indicates that we've finished adding cmds to this message */
78        int              seqnum;
79        int              retransmit_count;
80        int              message_size;
81        int              num_cmds;
82        char            *cmd_list[MBUS_MAX_QLEN];
83        char            *arg_list[MBUS_MAX_QLEN];
84        uint32_t         idx_list[MBUS_MAX_QLEN];
85        uint32_t         magic;         /* For debugging... */
86};
87
88struct mbus {
89        socket_udp               *s;
90        char                     *addr;                         /* Addresses we respond to.                                     */
91        int                       max_other_addr;
92        int                       num_other_addr;
93        char                    **other_addr;                   /* Addresses of other entities on the mbus.                     */
94        struct timeval          **other_hello;                  /* Time of last mbus.hello we received from other entities      */
95        int                       seqnum;
96        struct mbus_msg          *cmd_queue;                    /* Queue of messages waiting to be sent */
97        struct mbus_msg          *waiting_ack;                  /* The last reliable message sent, if we have not yet got the ACK */
98        char                     *hashkey;
99        int                       hashkeylen;
100        char                     *encrkey;
101        int                       encrkeylen;
102        struct timeval            last_heartbeat;               /* Last time we sent a heartbeat message */
103        struct mbus_config       *cfg;
104        void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat);
105        void (*err_handler)(int seqnum, int reason);
106        uint32_t                  magic;                        /* For debugging...                                             */
107        uint32_t                  index;
108        uint32_t                  index_sent;
109};
110
111static void mbus_validate(struct mbus *m)
112{
113#ifdef DEBUG
114        int     i;
115
116        assert(m->num_other_addr <= m->max_other_addr);
117        assert(m->num_other_addr >= 0);
118        for (i = 0; i < m->num_other_addr; i++) {
119                assert(m->other_addr[i]  != NULL);
120                assert(m->other_hello[i] != NULL);
121        }
122        for (i = m->num_other_addr + 1; i < m->max_other_addr; i++) {
123                assert(m->other_addr[i]  == NULL);
124                assert(m->other_hello[i] == NULL);
125        }
126#endif
127        assert(m->magic == MBUS_MAGIC);
128}
129
130static void mbus_msg_validate(struct mbus_msg *m)
131{
132#ifdef DEBUG
133        int     i;
134
135        assert((m->num_cmds < MBUS_MAX_QLEN) && (m->num_cmds >= 0));
136        for (i = 0; i < m->num_cmds; i++) {
137                assert(m->cmd_list[i] != NULL);
138                assert(m->arg_list[i] != NULL);
139                if (i > 0) {
140                        assert(m->idx_list[i] > m->idx_list[i-1]);
141                }
142        }
143        for (i = m->num_cmds + 1; i < MBUS_MAX_QLEN; i++) {
144                assert(m->cmd_list[i] == NULL);
145                assert(m->arg_list[i] == NULL);
146        }       
147        assert(m->dest != NULL);
148#endif
149        assert(m->magic == MBUS_MSG_MAGIC);
150}
151
152static void store_other_addr(struct mbus *m, char *a)
153{
154        /* This takes the address a and ensures it is stored in the   */
155        /* m->other_addr field of the mbus structure. The other_addr  */
156        /* field should probably be a hash table, but for now we hope */
157        /* that there are not too many entities on the mbus, so the   */
158        /* list is small.                                             */
159        int     i;
160
161        mbus_validate(m);
162
163        for (i = 0; i < m->num_other_addr; i++) {
164                if (mbus_addr_match(m->other_addr[i], a)) {
165                        /* Already in the list... */
166                        gettimeofday(m->other_hello[i],NULL);
167                        return;
168                }
169        }
170
171        if (m->num_other_addr == m->max_other_addr) {
172                /* Expand the list... */
173                m->max_other_addr *= 2;
174                m->other_addr = (char **) xrealloc(m->other_addr, m->max_other_addr * sizeof(char *));
175                m->other_hello = (struct timeval **) xrealloc(m->other_hello, m->max_other_addr * sizeof(struct timeval *));
176        }
177        m->other_hello[m->num_other_addr]=(struct timeval *)xmalloc(sizeof(struct timeval));
178        gettimeofday(m->other_hello[m->num_other_addr],NULL);
179        m->other_addr[m->num_other_addr++] = xstrdup(a);
180}
181
182static void remove_other_addr(struct mbus *m, char *a)
183{
184        /* Removes the address a from the m->other_addr field of the */
185        /* mbus structure.                                           */
186        int     i, j;
187
188        mbus_validate(m);
189
190        for (i = 0; i < m->num_other_addr; i++) {
191                if (mbus_addr_match(m->other_addr[i], a)) {
192                        xfree(m->other_addr[i]);
193                        xfree(m->other_hello[i]);
194                        for (j = i+1; j < m->num_other_addr; j++) {
195                                m->other_addr[j-1] = m->other_addr[j];
196                                m->other_hello[j-1] = m->other_hello[j];
197                        }
198                        m->other_addr[m->num_other_addr  - 1] = NULL;
199                        m->other_hello[m->num_other_addr - 1] = NULL;
200                        m->num_other_addr--;
201                }
202        }
203}
204
205static void remove_inactiv_other_addr(struct mbus *m, struct timeval t, int interval){
206        /* Remove addresses we haven't heard from for about 5 * interval */
207        /* Count backwards so it is safe to remove entries               */
208        int i;
209   
210        mbus_validate(m);
211
212        for (i=m->num_other_addr-1; i>=0; i--){
213                if ((t.tv_sec-(m->other_hello[i]->tv_sec)) > 5 * interval) {
214                        debug_msg("remove dead entity (%s)\n", m->other_addr[i]);
215                        remove_other_addr(m, m->other_addr[i]);
216                }
217        }
218}
219
220int mbus_addr_valid(struct mbus *m, char *addr)
221{
222        int     i;
223
224        mbus_validate(m);
225
226        for (i = 0; i < m->num_other_addr; i++) {
227                if (mbus_addr_match(m->other_addr[i], addr)) {
228                        return TRUE;
229                }
230        }
231        return FALSE;
232}
233
234static int mbus_addr_unique(struct mbus *m, char *addr)
235{
236        int     i, n = 0;
237
238        mbus_validate(m);
239
240        for (i = 0; i < m->num_other_addr; i++) {
241                if (mbus_addr_match(m->other_addr[i], addr)) {
242                        n++;
243                }
244        }
245        return n==1;
246}
247
248/* The mb_* functions are used to build an mbus message up in the */
249/* mb_buffer, and to add authentication and encryption before the */
250/* message is sent.                                               */
251char     mb_cryptbuf[MBUS_BUF_SIZE];
252char    *mb_buffer;
253char    *mb_bufpos;
254
255#define MBUS_AUTH_LEN 16
256
257static void mb_header(int seqnum, int ts, char reliable, const char *src, const char *dst, int ackseq)
258{
259        xmemchk();
260        mb_buffer   = (char *) xmalloc(MBUS_BUF_SIZE + 1);
261        memset(mb_buffer,   0, MBUS_BUF_SIZE);
262        memset(mb_buffer, ' ', MBUS_AUTH_LEN);
263        mb_bufpos = mb_buffer + MBUS_AUTH_LEN;
264        sprintf(mb_bufpos, "\nmbus/1.0 %6d %9d %c (%s) %s ", seqnum, ts, reliable, src, dst);
265        mb_bufpos += 33 + strlen(src) + strlen(dst);
266        if (ackseq == -1) {
267                sprintf(mb_bufpos, "()\n");
268                mb_bufpos += 3;
269        } else {
270                sprintf(mb_bufpos, "(%6d)\n", ackseq);
271                mb_bufpos += 9;
272        }
273}
274
275static void mb_add_command(const char *cmnd, const char *args)
276{
277        int offset = strlen(cmnd) + strlen(args) + 5;
278
279        assert((mb_bufpos + offset - mb_buffer) < MBUS_BUF_SIZE);
280
281        sprintf(mb_bufpos, "%s (%s)\n", cmnd, args);
282        mb_bufpos += offset - 1; /* The -1 in offset means we're not NUL terminated - fix in mb_send */
283}
284
285static void mb_send(struct mbus *m)
286{
287        char            digest[16];
288        int             len;
289        unsigned char   initVec[8] = {0,0,0,0,0,0,0,0};
290 
291        mbus_validate(m);
292
293        *(mb_bufpos++) = '\0';
294        assert((mb_bufpos - mb_buffer) < MBUS_BUF_SIZE);
295        assert(strlen(mb_buffer) < MBUS_BUF_SIZE);
296
297        /* Pad to a multiple of 8 bytes, so the encryption can work... */
298        while (((mb_bufpos - mb_buffer) % 8) != 0) {
299                *(mb_bufpos++) = '\0';
300        }
301        len = mb_bufpos - mb_buffer;
302        assert(len < MBUS_BUF_SIZE);
303        assert(strlen(mb_buffer) < MBUS_BUF_SIZE);
304
305        if (m->hashkey != NULL) {
306                /* Authenticate... */
307                hmac_md5(mb_buffer + MBUS_AUTH_LEN+1, strlen(mb_buffer) - (MBUS_AUTH_LEN+1), m->hashkey, m->hashkeylen, digest);
308                base64encode(digest, 12, mb_buffer, MBUS_AUTH_LEN);
309        }
310        if (m->encrkey != NULL) {
311                /* Encrypt... */
312                memset(mb_cryptbuf, 0, MBUS_BUF_SIZE);
313                memcpy(mb_cryptbuf, mb_buffer, len);
314                assert((len % 8) == 0);
315                assert(len < MBUS_BUF_SIZE);
316                assert(m->encrkeylen == 8);
317                qfDES_CBC_e(m->encrkey, mb_cryptbuf, len, initVec);
318                memcpy(mb_buffer, mb_cryptbuf, len);
319        }
320        udp_send(m->s, mb_buffer, len);
321        xfree(mb_buffer);
322}
323
324static void resend(struct mbus *m, struct mbus_msg *curr)
325{
326        /* Don't need to check for buffer overflows: this was done in mbus_send() when */
327        /* this message was first transmitted. If it was okay then, it's okay now.     */
328        int      i;
329
330        mbus_validate(m);
331
332        mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
333        for (i = 0; i < curr->num_cmds; i++) {
334                mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
335        }
336        mb_send(m);
337        curr->retransmit_count++;
338}
339
340void mbus_retransmit(struct mbus *m)
341{
342        struct mbus_msg *curr = m->waiting_ack;
343        struct timeval  time;
344        long            diff;
345
346        mbus_validate(m);
347
348        if (!mbus_waiting_ack(m)) {
349                return;
350        }
351
352        mbus_msg_validate(curr);
353
354        gettimeofday(&time, NULL);
355
356        /* diff is time in milliseconds that the message has been awaiting an ACK */
357        diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->send_time.tv_sec * 1000) + (curr->send_time.tv_usec / 1000));
358        if (diff > 10000) {
359                debug_msg("Reliable mbus message failed!\n");
360                if (m->err_handler == NULL) {
361                        abort();
362                }
363                m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST);
364                /* if we don't delete this failed message, the error handler
365                   gets triggered every time we call mbus_retransmit */
366                while (m->waiting_ack->num_cmds > 0) {
367                    m->waiting_ack->num_cmds--;
368                    xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
369                    xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
370                }
371                xfree(m->waiting_ack->dest);
372                xfree(m->waiting_ack);
373                m->waiting_ack = NULL;
374                return;
375        }
376        /* Note: We only send one retransmission each time, to avoid
377         * overflowing the receiver with a burst of requests...
378         */
379        if ((diff > 750) && (curr->retransmit_count == 2)) {
380                resend(m, curr);
381                return;
382        }
383        if ((diff > 500) && (curr->retransmit_count == 1)) {
384                resend(m, curr);
385                return;
386        }
387        if ((diff > 250) && (curr->retransmit_count == 0)) {
388                resend(m, curr);
389                return;
390        }
391}
392
393void mbus_heartbeat(struct mbus *m, int interval)
394{
395        struct timeval  curr_time;
396        char    *a = (char *) xmalloc(3);
397        sprintf(a, "()");
398
399        mbus_validate(m);
400
401        gettimeofday(&curr_time, NULL);
402        if (curr_time.tv_sec - m->last_heartbeat.tv_sec >= interval) {
403                mb_header(++m->seqnum, (int) curr_time.tv_sec, 'U', m->addr, "()", -1);
404                mb_add_command("mbus.hello", "");
405                mb_send(m);
406
407                m->last_heartbeat = curr_time;
408                /* Remove dead sources */
409                remove_inactiv_other_addr(m, curr_time, interval);
410        }
411        xfree(a);
412}
413
414int mbus_waiting_ack(struct mbus *m)
415{
416        mbus_validate(m);
417        return m->waiting_ack != NULL;
418}
419
420int mbus_sent_all(struct mbus *m)
421{
422        mbus_validate(m);
423        return (m->cmd_queue == NULL) && (m->waiting_ack == NULL);
424}
425
426struct mbus *mbus_init(void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat),
427                       void  (*err_handler)(int seqnum, int reason),
428                       char  *addr)
429{
430        struct mbus             *m;
431        struct mbus_key          k;
432        struct mbus_parser      *mp;
433        int                      i;
434        char                    *net_addr, *tmp;
435        uint16_t                 net_port;
436        int                      net_scope;
437
438        m = (struct mbus *) xmalloc(sizeof(struct mbus));
439        if (m == NULL) {
440                debug_msg("Unable to allocate memory for mbus\n");
441                return NULL;
442        }
443
444        m->cfg = mbus_create_config();
445        mbus_lock_config_file(m->cfg);
446        net_addr = (char *) xmalloc(20);
447        mbus_get_net_addr(m->cfg, net_addr, &net_port, &net_scope);
448        m->s              = udp_init(net_addr, net_port, net_port, net_scope); 
449        m->seqnum         = 0;
450        m->cmd_handler    = cmd_handler;
451        m->err_handler    = err_handler;
452        m->num_other_addr = 0;
453        m->max_other_addr = 10;
454        m->other_addr     = (char **) xmalloc(sizeof(char *) * 10);
455        m->other_hello    = (struct timeval **) xmalloc(sizeof(struct timeval *) * 10);
456        for (i = 0; i < 10; i++) {
457                m->other_addr[i]  = NULL;
458                m->other_hello[i] = NULL;
459        }
460        m->cmd_queue      = NULL;
461        m->waiting_ack    = NULL;
462        m->magic          = MBUS_MAGIC;
463        m->index          = 0;
464        m->index_sent     = 0;
465
466        mp = mbus_parse_init(xstrdup(addr));
467        if (!mbus_parse_lst(mp, &tmp)) {
468                debug_msg("Invalid mbus address\n");
469                abort();
470        }
471        m->addr = xstrdup(tmp);
472        mbus_parse_done(mp);
473        assert(m->addr != NULL);
474
475        gettimeofday(&(m->last_heartbeat), NULL);
476
477        mbus_get_encrkey(m->cfg, &k);
478        m->encrkey    = k.key;
479        m->encrkeylen = k.key_len;
480
481        mbus_get_hashkey(m->cfg, &k);
482        m->hashkey    = k.key;
483        m->hashkeylen = k.key_len;
484
485        mbus_unlock_config_file(m->cfg);
486
487        xfree(net_addr);
488
489        return m;
490}
491
492void mbus_cmd_handler(struct mbus *m, void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat))
493{
494        mbus_validate(m);
495        m->cmd_handler = cmd_handler;
496}
497
498static void mbus_flush_msgs(struct mbus_msg *queue)
499{
500        struct mbus_msg *curr, *next;
501        int i;
502
503        curr = queue;
504        while(curr) {
505                next = curr->next;
506                xfree(curr->dest);
507                for(i = 0; i < curr->num_cmds; i++) {
508                        xfree(curr->cmd_list[i]);
509                        xfree(curr->arg_list[i]);
510                }
511                curr = next;
512        }
513}
514
515void mbus_exit(struct mbus *m)
516{
517        int i;
518
519        assert(m != NULL);
520        mbus_validate(m);
521
522        mbus_qmsg(m, "()", "mbus.bye", "", FALSE);
523        mbus_send(m);
524
525        /* FIXME: It should be a fatal error to call mbus_exit() if some messages are still outstanding. */
526        /*        We will need an mbus_flush() call first though, to ensure nothing is waiting.          */
527        mbus_flush_msgs(m->cmd_queue);
528        mbus_flush_msgs(m->waiting_ack);
529
530        if (m->encrkey != NULL) {
531                xfree(m->encrkey);
532        }
533        if (m->hashkey != NULL) {
534                xfree(m->hashkey);
535        }
536
537        udp_exit(m->s);
538
539        /* Clean up other_* */
540        for (i=m->num_other_addr-1; i>=0; i--){
541            remove_other_addr(m, m->other_addr[i]);
542        }
543
544        xfree(m->addr);
545        xfree(m->other_addr);
546        xfree(m->other_hello);
547        xfree(m->cfg);
548        xfree(m);
549}
550
551void mbus_send(struct mbus *m)
552{
553        /* Send one, or more, messages previosly queued with mbus_qmsg(). */
554        /* Messages for the same destination are batched together. Stops  */
555        /* when a reliable message is sent, until the ACK is received.    */
556        struct mbus_msg *curr = m->cmd_queue;
557        int              i;
558
559        mbus_validate(m);
560        if (m->waiting_ack != NULL) {
561                return;
562        }
563
564        while (curr != NULL) {
565                mbus_msg_validate(curr);
566                /* It's okay for us to send messages which haven't been marked as complete - */
567                /* that just means we're sending something which has the potential to have   */
568                /* more data piggybacked. However, if it's not complete it MUST be the last  */
569                /* in the list, or something has been reordered - which is bad.              */
570                if (!curr->complete) {
571                        assert(curr->next == NULL);
572                }
573
574                if (curr->reliable) {
575                        if (!mbus_addr_valid(m, curr->dest)) {
576                            debug_msg("Trying to send reliably to an unknown address...\n");
577                            if (m->err_handler == NULL) {
578                                abort();
579                            }
580                            m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);
581                        }
582                        if (!mbus_addr_unique(m, curr->dest)) {
583                            debug_msg("Trying to send reliably but address is not unique...\n");
584                            if (m->err_handler == NULL) {
585                                abort();
586                            }
587                            m->err_handler(curr->seqnum, MBUS_DESTINATION_NOT_UNIQUE);
588                        }
589                }
590                /* Create the message... */
591                mb_header(curr->seqnum, curr->comp_time.tv_sec, (char)(curr->reliable?'R':'U'), m->addr, curr->dest, -1);
592                for (i = 0; i < curr->num_cmds; i++) {
593                        assert(m->index_sent == (curr->idx_list[i] - 1));
594                        m->index_sent = curr->idx_list[i];
595                        mb_add_command(curr->cmd_list[i], curr->arg_list[i]);
596                }
597                mb_send(m);
598               
599                m->cmd_queue = curr->next;
600                if (curr->reliable) {
601                        /* Reliable message, wait for the ack... */
602                        gettimeofday(&(curr->send_time), NULL);
603                        m->waiting_ack = curr;
604                        return;
605                } else {
606                        while (curr->num_cmds > 0) {
607                                curr->num_cmds--;
608                                xfree(curr->cmd_list[curr->num_cmds]); curr->cmd_list[curr->num_cmds] = NULL;
609                                xfree(curr->arg_list[curr->num_cmds]); curr->arg_list[curr->num_cmds] = NULL;
610                        }
611                        xfree(curr->dest);
612                        xfree(curr);
613                }
614                curr = m->cmd_queue;
615        }
616}
617
618void mbus_qmsg(struct mbus *m, const char *dest, const char *cmnd, const char *args, int reliable)
619{
620        /* Queue up a message for sending. The message is not */
621        /* actually sent until mbus_send() is called.         */
622        struct mbus_msg *curr = m->cmd_queue;
623        struct mbus_msg *prev = NULL;
624        int              alen = strlen(cmnd) + strlen(args) + 4;
625        int              i;
626
627        mbus_validate(m);
628        while (curr != NULL) {
629                mbus_msg_validate(curr);
630                if (!curr->complete) {
631                        /* This message is still open for new commands. It MUST be the last in the */
632                        /* cmd_queue, else commands will be reordered.                             */
633                        assert(curr->next == NULL);
634                        if (mbus_addr_identical(curr->dest, dest) &&
635                            (curr->num_cmds < MBUS_MAX_QLEN) && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 500))) {
636                                curr->num_cmds++;
637                                curr->reliable |= reliable;
638                                curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);
639                                curr->arg_list[curr->num_cmds-1] = xstrdup(args);
640                                curr->idx_list[curr->num_cmds-1] = ++(m->index);
641                                curr->message_size += alen;
642                                mbus_msg_validate(curr);
643                                return;
644                        } else {
645                                curr->complete = TRUE;
646                        }
647                }
648                prev = curr;
649                curr = curr->next;
650        }
651        /* If we get here, we've not found an open message in the cmd_queue.  We */
652        /* have to create a new message, and add it to the end of the cmd_queue. */
653        curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));
654        curr->magic            = MBUS_MSG_MAGIC;
655        curr->next             = NULL;
656        curr->dest             = xstrdup(dest);
657        curr->retransmit_count = 0;
658        curr->message_size     = alen + 60 + strlen(dest) + strlen(m->addr);
659        curr->seqnum           = m->seqnum++;
660        curr->reliable         = reliable;
661        curr->complete         = FALSE;
662        curr->num_cmds         = 1;
663        curr->cmd_list[0]      = xstrdup(cmnd);
664        curr->arg_list[0]      = xstrdup(args);
665        curr->idx_list[curr->num_cmds-1] = ++(m->index);
666        for (i = 1; i < MBUS_MAX_QLEN; i++) {
667                curr->cmd_list[i] = NULL;
668                curr->arg_list[i] = NULL;
669        }
670        if (prev == NULL) {
671                m->cmd_queue = curr;
672        } else {
673                prev->next = curr;
674        }
675        gettimeofday(&(curr->send_time), NULL);
676        gettimeofday(&(curr->comp_time), NULL);
677        mbus_msg_validate(curr);
678}
679
680void mbus_qmsgf(struct mbus *m, const char *dest, int reliable, const char *cmnd, const char *format, ...)
681{
682        /* This is a wrapper around mbus_qmsg() which does a printf() style format into  */
683        /* a buffer. Saves the caller from having to a a malloc(), write the args string */
684        /* and then do a free(), and also saves worring about overflowing the buffer, so */
685        /* removing a common source of bugs!                                             */
686        char    buffer[MBUS_BUF_SIZE];
687        va_list ap;
688
689        mbus_validate(m);
690        va_start(ap, format);
691#ifdef WIN32
692        _vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
693#else
694        vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
695#endif
696        va_end(ap);
697        mbus_qmsg(m, dest, cmnd, buffer, reliable);
698}
699
700int mbus_recv(struct mbus *m, void *data, struct timeval *timeout)
701{
702        char                    *auth, *ver, *src, *dst, *ack, *r, *cmd, *param, *npos;
703        char                     buffer[MBUS_BUF_SIZE];
704        int                      buffer_len, seq, a, rx, ts, authlen, loop_count;
705        char                     ackbuf[MBUS_ACK_BUF_SIZE];
706        char                     digest[16];
707        unsigned char            initVec[8] = {0,0,0,0,0,0,0,0};
708        struct timeval           t;
709        struct mbus_parser      *mp, *mp2;
710
711        mbus_validate(m);
712
713        rx = FALSE;
714        loop_count = 0;
715        while (loop_count++ < 10) {
716                memset(buffer, 0, MBUS_BUF_SIZE);
717                assert(m->s != NULL);
718                udp_fd_zero();
719                udp_fd_set(m->s);
720                t.tv_sec  = timeout->tv_sec;
721                t.tv_usec = timeout->tv_usec;
722                if ((udp_select(&t) > 0) && udp_fd_isset(m->s)) {
723                        buffer_len = udp_recv(m->s, buffer, MBUS_BUF_SIZE);
724                        if (buffer_len > 0) {
725                                rx = TRUE;
726                        } else {
727                                return rx;
728                        }
729                } else {
730                        return FALSE;
731                }
732
733                if (m->encrkey != NULL) {
734                        /* Decrypt the message... */
735                        if ((buffer_len % 8) != 0) {
736                                debug_msg("Encrypted message not a multiple of 8 bytes in length\n");
737                                continue;
738                        }
739                        memcpy(mb_cryptbuf, buffer, buffer_len);
740                        memset(initVec, 0, 8);
741                        qfDES_CBC_d(m->encrkey, mb_cryptbuf, buffer_len, initVec);
742                        memcpy(buffer, mb_cryptbuf, buffer_len);
743                }
744
745                /* Sanity check that this is a vaguely sensible format message... Should prevent */
746                /* problems if we're fed complete garbage, but won't prevent determined hackers. */
747                if (strncmp(buffer + MBUS_AUTH_LEN + 1, "mbus/1.0", 8) != 0) {
748                        continue;
749                }
750
751                mp = mbus_parse_init(buffer);
752                /* remove trailing 0 bytes */
753                npos = (char *) strchr(buffer,'\0');
754                if(npos!=NULL) {
755                        buffer_len=npos-buffer;
756                }
757                /* Parse the authentication header */
758                if (!mbus_parse_sym(mp, &auth)) {
759                        debug_msg("Failed to parse authentication header\n");
760                        mbus_parse_done(mp);
761                        continue;
762                }
763
764                /* Check that the packet authenticates correctly... */
765                authlen = strlen(auth);
766                hmac_md5(buffer + authlen + 1, buffer_len - authlen - 1, m->hashkey, m->hashkeylen, digest);
767                base64encode(digest, 12, ackbuf, 16);
768                if ((strlen(auth) != 16) || (strncmp(auth, ackbuf, 16) != 0)) {
769                        debug_msg("Failed to authenticate message...\n");
770                        mbus_parse_done(mp);
771                        continue;
772                }
773
774                /* Parse the header */
775                if (!mbus_parse_sym(mp, &ver)) {
776                        mbus_parse_done(mp);
777                        debug_msg("Parser failed version (1): %s\n",ver);
778                        continue;
779                }
780                if (strcmp(ver, "mbus/1.0") != 0) {
781                        mbus_parse_done(mp);
782                        debug_msg("Parser failed version (2): %s\n",ver);
783                        continue;
784                }
785                if (!mbus_parse_int(mp, &seq)) {
786                        mbus_parse_done(mp);
787                        debug_msg("Parser failed seq\n");
788                        continue;
789                }
790                if (!mbus_parse_int(mp, &ts)) {
791                        mbus_parse_done(mp);
792                        debug_msg("Parser failed ts\n");
793                        continue;
794                }
795                if (!mbus_parse_sym(mp, &r)) {
796                        mbus_parse_done(mp);
797                        debug_msg("Parser failed reliable\n");
798                        continue;
799                }
800                if (!mbus_parse_lst(mp, &src)) {
801                        mbus_parse_done(mp);
802                        debug_msg("Parser failed src\n");
803                        continue;
804                }
805                if (!mbus_parse_lst(mp, &dst)) {
806                        mbus_parse_done(mp);
807                        debug_msg("Parser failed dst\n");
808                        continue;
809                }
810                if (!mbus_parse_lst(mp, &ack)) {
811                        mbus_parse_done(mp);
812                        debug_msg("Parser failed ack\n");
813                        continue;
814                }
815
816                store_other_addr(m, src);
817
818                /* Check if the message was addressed to us... */
819                if (mbus_addr_match(m->addr, dst)) {
820                        /* ...if so, process any ACKs received... */
821                        mp2 = mbus_parse_init(ack);
822                        while (mbus_parse_int(mp2, &a)) {
823                                if (mbus_waiting_ack(m)) {
824                                        if (m->waiting_ack->seqnum == a) {
825                                                while (m->waiting_ack->num_cmds > 0) {
826                                                        m->waiting_ack->num_cmds--;
827                                                        xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
828                                                        xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
829                                                }
830                                                xfree(m->waiting_ack->dest);
831                                                xfree(m->waiting_ack);
832                                                m->waiting_ack = NULL;
833                                        } else {
834                                                debug_msg("Got ACK %d but wanted %d\n", a, m->waiting_ack->seqnum);
835                                        }
836                                } else {
837                                        debug_msg("Got ACK %d but wasn't expecting it\n", a);
838                                }
839                        }
840                        mbus_parse_done(mp2);
841                        /* ...if an ACK was requested, send one... */
842                        if (strcmp(r, "R") == 0) {
843                                char            *newsrc = (char *) xmalloc(strlen(src) + 3);
844                                struct timeval   t;
845
846                                sprintf(newsrc, "(%s)", src);   /* Yes, this is a kludge. */
847                                gettimeofday(&t, NULL);
848                                mb_header(++m->seqnum, (int) t.tv_sec, 'U', m->addr, newsrc, seq);
849                                mb_send(m);
850                                xfree(newsrc);
851                        } else if (strcmp(r, "U") == 0) {
852                                /* Unreliable message.... not need to do anything */
853                        } else {
854                                debug_msg("Message with invalid reliability field \"%s\" ignored\n", r);
855                        }
856                        /* ...and process the commands contained in the message */
857                        while (mbus_parse_sym(mp, &cmd)) {
858                                if (mbus_parse_lst(mp, &param)) {
859                                        char            *newsrc = (char *) xmalloc(strlen(src) + 3);
860                                        sprintf(newsrc, "(%s)", src);   /* Yes, this is a kludge. */
861                                        /* Finally, we snoop on the message we just passed to the application, */
862                                        /* to do housekeeping of our list of known mbus sources...             */
863                                        if (strcmp(cmd, "mbus.bye") == 0) {
864                                                remove_other_addr(m, newsrc);
865                                        }
866                                        if (strcmp(cmd, "mbus.hello") == 0) {
867                                                /* Mark this source as activ. We remove dead sources in mbus_heartbeat */
868                                                store_other_addr(m, newsrc);
869                                        }
870                                        m->cmd_handler(newsrc, cmd, param, data);
871                                        xfree(newsrc);
872                                } else {
873                                        debug_msg("Unable to parse mbus command:\n");
874                                        debug_msg("cmd = %s\n", cmd);
875                                        debug_msg("arg = %s\n", param);
876                                        break;
877                                }
878                        }
879                }
880                mbus_parse_done(mp);
881        }
882        return rx;
883}
884
885#define RZ_HANDLE_WAITING 1
886#define RZ_HANDLE_GO      2
887
888struct mbus_rz {
889        char            *peer;
890        char            *token;
891        struct mbus     *m;
892        void            *data;
893        int              mode;
894        void (*cmd_handler)(char *src, char *cmd, char *args, void *data);
895};
896
897static void rz_handler(char *src, char *cmd, char *args, void *data)
898{
899        struct mbus_rz          *r = (struct mbus_rz *) data;
900        struct mbus_parser      *mp;
901
902        if ((r->mode == RZ_HANDLE_WAITING) && (strcmp(cmd, "mbus.waiting") == 0)) {
903                char    *t;
904
905                mp = mbus_parse_init(args);
906                mbus_parse_str(mp, &t);
907                if (strcmp(mbus_decode_str(t), r->token) == 0) {
908                        if (r->peer != NULL) xfree(r->peer);
909                        r->peer = xstrdup(src);
910                }
911                mbus_parse_done(mp);
912        } else if ((r->mode == RZ_HANDLE_GO) && (strcmp(cmd, "mbus.go") == 0)) {
913                char    *t;
914
915                mp = mbus_parse_init(args);
916                mbus_parse_str(mp, &t);
917                if (strcmp(mbus_decode_str(t), r->token) == 0) {
918                        if (r->peer != NULL) xfree(r->peer);
919                        r->peer = xstrdup(src);
920                }
921                mbus_parse_done(mp);
922        } else {
923                r->cmd_handler(src, cmd, args, r->data);
924        }
925}
926
927char *mbus_rendezvous_waiting(struct mbus *m, char *addr, char *token, void *data)
928{
929        /* Loop, sending mbus.waiting(token) to "addr", until we get mbus.go(token) */
930        /* back from our peer. Any other mbus commands received whilst waiting are  */
931        /* processed in the normal manner, as if mbus_recv() had been called.       */
932        char            *token_e, *peer;
933        struct timeval   timeout;
934        struct mbus_rz  *r;
935
936        mbus_validate(m);
937
938        r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));
939        r->peer        = NULL;
940        r->token       = token;
941        r->m           = m;
942        r->data        = data;
943        r->mode        = RZ_HANDLE_GO;
944        r->cmd_handler = m->cmd_handler;
945        m->cmd_handler = rz_handler;
946        token_e        = mbus_encode_str(token);
947        while (r->peer == NULL) {
948                timeout.tv_sec  = 0;
949                timeout.tv_usec = 100000;
950                mbus_heartbeat(m, 1);
951                mbus_qmsgf(m, addr, FALSE, "mbus.waiting", "%s", token_e);
952                mbus_send(m);
953                mbus_recv(m, r, &timeout);
954                mbus_retransmit(m);
955        }
956        m->cmd_handler = r->cmd_handler;
957        peer = r->peer;
958        xfree(r);
959        xfree(token_e);
960        return peer;
961}
962
963char *mbus_rendezvous_go(struct mbus *m, char *token, void *data)
964{
965        /* Wait until we receive mbus.waiting(token), then send mbus.go(token) back to   */
966        /* the sender of that message. Whilst waiting, other mbus commands are processed */
967        /* in the normal manner as if mbus_recv() had been called.                       */
968        char            *token_e, *peer;
969        struct timeval   timeout;
970        struct mbus_rz  *r;
971
972        mbus_validate(m);
973
974        r = (struct mbus_rz *) xmalloc(sizeof(struct mbus_rz));
975        r->peer        = NULL;
976        r->token       = token;
977        r->m           = m;
978        r->data        = data;
979        r->mode        = RZ_HANDLE_WAITING;
980        r->cmd_handler = m->cmd_handler;
981        m->cmd_handler = rz_handler;
982        token_e        = mbus_encode_str(token);
983        while (r->peer == NULL) {
984                timeout.tv_sec  = 0;
985                timeout.tv_usec = 100000;
986                mbus_heartbeat(m, 1);
987                mbus_send(m);
988                mbus_recv(m, r, &timeout);
989                mbus_retransmit(m);
990        }
991
992        mbus_qmsgf(m, r->peer, TRUE, "mbus.go", "%s", token_e);
993        do {
994                mbus_heartbeat(m, 1);
995                mbus_retransmit(m);
996                mbus_send(m);
997                timeout.tv_sec  = 0;
998                timeout.tv_usec = 100000;
999                mbus_recv(m, r, &timeout);
1000        } while (!mbus_sent_all(m));
1001
1002        m->cmd_handler = r->cmd_handler;
1003        peer = r->peer;
1004        xfree(r);
1005        xfree(token_e);
1006        return peer;
1007}
Note: See TracBrowser for help on using the browser.