root/common/trunk/src/mbus.c @ 219

Revision 219, 24.6 KB (checked in by ucaccsp, 15 years ago)

Add mbus_sent_all()

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:    mbus.c
3 * AUTHORS: Colin Perkins
4 *
5 * Copyright (c) 1997-99 University College London
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, is permitted provided that the following conditions
10 * are met:
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 * 3. All advertising materials mentioning features or use of this software
17 *    must display the following acknowledgement:
18 *      This product includes software developed by the Computer Science
19 *      Department at University College London
20 * 4. Neither the name of the University nor of the Department may be used
21 *    to endorse or promote products derived from this software without
22 *    specific prior written permission.
23 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
24 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
27 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 * SUCH DAMAGE.
34 */
35
36#include "config_unix.h"
37#include "config_win32.h"
38#include "debug.h"
39#include "memory.h"
40#include "net_udp.h"
41#include "hmac.h"
42#include "qfDES.h"
43#include "base64.h"
44#include "gettimeofday.h"
45#include "mbus.h"
46#include "mbus_config.h"
47
48#define MBUS_BUF_SIZE     1500
49#define MBUS_ACK_BUF_SIZE 1500
50#define MBUS_MAX_ADDR       10
51#define MBUS_MAX_PD         10
52#define MBUS_MAX_QLEN       50 /* Number of messages we can queue with mbus_qmsg() */
53
54#ifdef NEED_VSNPRINTF
55static int vsnprintf(char *s, int buf_size, const char *format, va_list ap)
56{
57        /* Quick hack replacement for vsnprintf... note that this */
58        /* doesn't check for buffer overflows, and so is open to  */
59        /* many really nasty attacks!                             */
60        UNUSED(buf_size);
61        return vsprintf(s,format,ap);
62}
63#endif
64
65struct mbus_msg {
66        struct mbus_msg *next;
67        struct timeval   time;  /* Time the message was sent, to trigger a retransmit */
68        struct timeval   ts;    /* Time the message was composed, the timestamp in the packet header */
69        char            *dest;
70        int              reliable;
71        int              complete;      /* Indicates that we've finished adding cmds to this message */
72        int              seqnum;
73        int              retransmit_count;
74        int              message_size;
75        int              num_cmds;
76        char            *cmd_list[MBUS_MAX_QLEN];
77        char            *arg_list[MBUS_MAX_QLEN];
78};
79
80struct mbus {
81        socket_udp               *s;
82        int                       num_addr;
83        char                    *addr[MBUS_MAX_ADDR];           /* Addresses we respond to.                                     */
84        int                       max_other_addr;
85        int                       num_other_addr;
86        char                    **other_addr;                   /* Addresses of other entities on the mbus.                     */
87        char                     *parse_buffer[MBUS_MAX_PD];    /* Temporary storage for parsing mbus commands                  */
88        char                     *parse_bufend[MBUS_MAX_PD];    /* End of space allocated for parsing, to check for overflows   */
89        int                       parse_depth;
90        int                       seqnum;
91        struct mbus_msg          *cmd_queue;                    /* Queue of messages waiting to be sent */
92        struct mbus_msg          *waiting_ack;                  /* The last reliable message sent, if we have not yet got the ACK */
93        char                     *hashkey;
94        int                       hashkeylen;
95        char                     *encrkey;
96        int                       encrkeylen;
97        struct timeval            last_heartbeat;               /* Last time we sent a heartbeat message */
98        struct mbus_config       *cfg;
99        void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat);
100        void (*err_handler)(int seqnum, int reason);
101};
102
103static int mbus_addr_match(char *a, char *b)
104{
105        /* Compare the addresses "a" and "b". These may optionally be */
106        /* surrounded by "(" and ")" and may have an arbitrary amount */
107        /* of white space between components of the addresses. There  */
108        /* is a match if every word of address b is in address a.     */
109        /* NOTE: The strings passed to this function are stored for   */
110        /* later use and MUST NOT be modified by this routine.        */
111        char    *y, c;
112
113        assert(a != NULL);
114        assert(b != NULL);
115
116        /* Skip leading whitespace and '('... */
117        while (isspace((unsigned char)*a) || (*a == '(')) a++;
118        while (isspace((unsigned char)*b) || (*b == '(')) b++;
119
120        while ((*b != '\0') && (*b != ')')) {
121                while (isspace((unsigned char)*b)) b++;
122                for (y = b; ((*y != ' ') && (*y != ')') && (*y != '\0')); y++) {
123                        /* do nothing */
124                }
125                c = *y;
126                *y = '\0';
127                if (strstr(a, b) == NULL) {
128                        /* ...this word not found */
129                        *y = c;
130                        return FALSE;
131                }
132                *y = c;
133                b = y;
134        }               
135        return TRUE;
136}
137
138static void store_other_addr(struct mbus *m, char *a)
139{
140        /* This takes the address a and ensures it is stored in the   */
141        /* m->other_addr field of the mbus structure. The other_addr  */
142        /* field should probably be a hash table, but for now we hope */
143        /* that there are not too many entities on the mbus, so the   */
144        /* list is small.                                             */
145        int     i;
146
147        for (i = 0; i < m->num_other_addr; i++) {
148                if (mbus_addr_match(m->other_addr[i], a)) {
149                        /* Already in the list... */
150                        return;
151                }
152        }
153
154        if (m->num_other_addr == m->max_other_addr) {
155                /* Expand the list... */
156                m->max_other_addr *= 2;
157                m->other_addr = (char **) xrealloc(m->other_addr, m->max_other_addr * sizeof(char *));
158        }
159        m->other_addr[m->num_other_addr++] = xstrdup(a);
160}
161
162int mbus_addr_valid(struct mbus *m, char *addr)
163{
164        int     i;
165
166        for (i = 0; i < m->num_other_addr; i++) {
167                if (mbus_addr_match(m->other_addr[i], addr)) {
168                        return TRUE;
169                }
170        }
171        return FALSE;
172}
173
174/* The tx_* functions are used to build an mbus message up in the */
175/* tx_buffer, and to add authentication and encryption before the */
176/* message is sent.                                               */
177static char      tx_cryptbuf[MBUS_BUF_SIZE];
178static char      tx_buffer[MBUS_BUF_SIZE];
179static char     *tx_bufpos;
180
181#define MBUS_AUTH_LEN 16
182
183static void tx_header(int seqnum, int ts, char reliable, char *src, char *dst, int ackseq)
184{
185        memset(tx_buffer,   0, MBUS_BUF_SIZE);
186        memset(tx_buffer, ' ', MBUS_AUTH_LEN);
187        tx_bufpos = tx_buffer + MBUS_AUTH_LEN;
188        sprintf(tx_bufpos, "\nmbus/1.0 %6d %9d %c (%s) %s ", seqnum, ts, reliable, src, dst);
189        tx_bufpos += 33 + strlen(src) + strlen(dst);
190        if (ackseq == -1) {
191                sprintf(tx_bufpos, "()\n");
192                tx_bufpos += 3;
193        } else {
194                sprintf(tx_bufpos, "(%6d)\n", ackseq);
195                tx_bufpos += 9;
196        }
197}
198
199static void tx_add_command(char *cmnd, char *args)
200{
201        sprintf(tx_bufpos, "%s (%s)\n", cmnd, args);
202        tx_bufpos += strlen(cmnd) + strlen(args) + 4;
203}
204
205static void tx_send(struct mbus *m)
206{
207        char            digest[16];
208        int             len;
209        unsigned char   initVec[8] = {0,0,0,0,0,0,0,0};
210
211        while (((tx_bufpos - tx_buffer) % 8) != 0) {
212                /* Pad to a multiple of 8 bytes, so the encryption can work... */
213                *(tx_bufpos++) = '\0';
214        }
215        *tx_bufpos = '\0';
216        len = tx_bufpos - tx_buffer;
217
218        if (m->hashkey != NULL) {
219                /* Authenticate... */
220                hmac_md5(tx_buffer + MBUS_AUTH_LEN+1, strlen(tx_buffer) - (MBUS_AUTH_LEN+1), m->hashkey, m->hashkeylen, digest);
221                base64encode(digest, 12, tx_buffer, MBUS_AUTH_LEN);
222        }
223        if (m->encrkey != NULL) {
224                /* Encrypt... */
225                memset(tx_cryptbuf, 0, MBUS_BUF_SIZE);
226                memcpy(tx_cryptbuf, tx_buffer, len);
227                assert((len % 8) == 0);
228                assert(len < MBUS_BUF_SIZE);
229                assert(m->encrkeylen == 8);
230                qfDES_CBC_e(m->encrkey, tx_cryptbuf, len, initVec);
231                memcpy(tx_buffer, tx_cryptbuf, len);
232        }
233        udp_send(m->s, tx_buffer, len);
234}
235
236static void resend(struct mbus *m, struct mbus_msg *curr)
237{
238        /* Don't need to check for buffer overflows: this was done in mbus_send() when */
239        /* this message was first transmitted. If it was okay then, it's okay now.     */
240        int      i;
241
242        tx_header(curr->seqnum, curr->ts.tv_sec, (char)(curr->reliable?'R':'U'), m->addr[0], curr->dest, -1);
243        for (i = 0; i < curr->num_cmds; i++) {
244                tx_add_command(curr->cmd_list[i], curr->arg_list[i]);
245        }
246        tx_send(m);
247        curr->retransmit_count++;
248}
249
250void mbus_retransmit(struct mbus *m)
251{
252        struct mbus_msg *curr = m->waiting_ack;
253        struct timeval  time;
254        long            diff;
255
256        if (!mbus_waiting_ack(m)) {
257                return;
258        }
259
260        gettimeofday(&time, NULL);
261
262        /* diff is time in milliseconds that the message has been awaiting an ACK */
263        diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->time.tv_sec * 1000) + (curr->time.tv_usec / 1000));
264        if (diff > 10000) {
265                debug_msg("Reliable mbus message failed!\n");
266                if (m->err_handler == NULL) {
267                        abort();
268                }
269                m->err_handler(curr->seqnum, MBUS_MESSAGE_LOST);
270                return;
271        }
272        /* Note: We only send one retransmission each time, to avoid
273         * overflowing the receiver with a burst of requests...
274         */
275        if ((diff > 750) && (curr->retransmit_count == 2)) {
276                resend(m, curr);
277                return;
278        }
279        if ((diff > 500) && (curr->retransmit_count == 1)) {
280                resend(m, curr);
281                return;
282        }
283        if ((diff > 250) && (curr->retransmit_count == 0)) {
284                resend(m, curr);
285                return;
286        }
287        curr = curr->next;
288}
289
290void mbus_heartbeat(struct mbus *m, int interval)
291{
292        struct timeval  curr_time;
293
294        gettimeofday(&curr_time, NULL);
295
296        if (curr_time.tv_sec - m->last_heartbeat.tv_sec > interval) {
297                mbus_qmsg(m, "()", "mbus.hello", "", FALSE);
298                m->last_heartbeat = curr_time;
299        }
300}
301
302int mbus_waiting_ack(struct mbus *m)
303{
304        return m->waiting_ack != NULL;
305}
306
307int mbus_sent_all(struct mbus *m)
308{
309        return (m->cmd_queue == NULL) && (m->waiting_ack == NULL);
310}
311
312struct mbus *mbus_init(void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat),
313                       void  (*err_handler)(int seqnum, int reason))
314{
315        struct mbus     *m;
316        struct mbus_key  k;
317        int              i;
318
319        m = (struct mbus *) xmalloc(sizeof(struct mbus));
320        if (m == NULL) {
321                debug_msg("Unable to allocate memory for mbus\n");
322                return NULL;
323        }
324
325        m->cfg = (struct mbus_config *) xmalloc(sizeof(struct mbus_config));
326        mbus_lock_config_file(m->cfg);
327        m->s              = udp_init("224.255.222.239", (u_int16) 47000, (u_int16) 47000, 0);
328        m->seqnum         = 0;
329        m->cmd_handler    = cmd_handler;
330        m->err_handler    = err_handler;
331        m->num_addr       = 0;
332        m->num_other_addr = 0;
333        m->max_other_addr = 10;
334        m->other_addr     = (char **) xmalloc(sizeof(char *) * 10);
335        m->parse_depth    = 0;
336        m->cmd_queue      = NULL;
337        m->waiting_ack    = NULL;
338
339        gettimeofday(&(m->last_heartbeat), NULL);
340
341        mbus_get_encrkey(m->cfg, &k);
342        m->encrkey    = k.key;
343        m->encrkeylen = k.key_len;
344
345        mbus_get_hashkey(m->cfg, &k);
346        m->hashkey    = k.key;
347        m->hashkeylen = k.key_len;
348
349        for (i = 0; i < MBUS_MAX_ADDR; i++) m->addr[i]         = NULL;
350        for (i = 0; i < MBUS_MAX_PD;   i++) m->parse_buffer[i] = NULL;
351        for (i = 0; i < MBUS_MAX_PD;   i++) m->parse_bufend[i] = NULL;
352        mbus_unlock_config_file(m->cfg);
353
354        return m;
355}
356
357void mbus_cmd_handler(struct mbus *m, void  (*cmd_handler)(char *src, char *cmd, char *arg, void *dat))
358{
359        m->cmd_handler = cmd_handler;
360}
361
362static void mbus_flush_msgs(struct mbus_msg *queue)
363{
364        struct mbus_msg *curr, *next;
365        int i;
366
367        curr = queue;
368        while(curr) {
369                next = curr->next;
370                xfree(curr->dest);
371                for(i = 0; i < curr->num_cmds; i++) {
372                        xfree(curr->cmd_list[i]);
373                        xfree(curr->arg_list[i]);
374                }
375                curr = next;
376        }
377}
378
379void mbus_exit(struct mbus *m)
380{
381        assert(m != NULL);
382
383        while(m->parse_depth--) {
384                xfree(m->parse_buffer[m->parse_depth]);
385                m->parse_buffer[m->parse_depth] = NULL;
386                m->parse_bufend[m->parse_depth] = NULL;
387        }
388
389        mbus_flush_msgs(m->cmd_queue);
390        mbus_flush_msgs(m->waiting_ack);
391
392        if (m->encrkey != NULL) {
393                xfree(m->encrkey);
394        }
395
396        udp_exit(m->s);
397
398        xfree(m->hashkey);
399        xfree(m->cfg);
400        xfree(m);
401}
402
403void mbus_addr(struct mbus *m, char *addr)
404{
405        assert(m->num_addr < MBUS_MAX_ADDR);
406        mbus_parse_init(m, xstrdup(addr));
407        if (mbus_parse_lst(m, &(m->addr[m->num_addr]))) {
408                m->num_addr++;
409        }
410        mbus_parse_done(m);
411}
412
413void mbus_send(struct mbus *m)
414{
415        /* Send one, or more, messages previosly queued with mbus_qmsg(). */
416        /* Messages for the same destination are batched together. Stops  */
417        /* when a reliable message is sent, until the ACK is received.    */
418        struct mbus_msg *curr = m->cmd_queue;
419        int              i;
420
421        if (m->waiting_ack != NULL) {
422                return;
423        }
424
425        while (curr != NULL) {
426                /* Create the message... */
427                tx_header(curr->seqnum, curr->ts.tv_sec, (char)(curr->reliable?'R':'U'), m->addr[0], curr->dest, -1);
428                for (i = 0; i < curr->num_cmds; i++) {
429                        tx_add_command(curr->cmd_list[i], curr->arg_list[i]);
430                }
431                tx_send(m);
432               
433                m->cmd_queue = curr->next;
434                if (curr->reliable) {
435                        /* Reliable message, wait for the ack... */
436                        gettimeofday(&(curr->time), NULL);
437                        m->waiting_ack = curr;
438                        return;
439                } else {
440                        while (curr->num_cmds > 0) {
441                                curr->num_cmds--;
442                                xfree(curr->cmd_list[curr->num_cmds]);
443                                xfree(curr->arg_list[curr->num_cmds]);
444                        }
445                        xfree(curr->dest);
446                        xfree(curr);
447                }
448                curr = m->cmd_queue;
449        }
450}
451
452void mbus_qmsg(struct mbus *m, char *dest, const char *cmnd, const char *args, int reliable)
453{
454        /* Queue up a message for sending. The message is not */
455        /* actually sent until mbus_send() is called.         */
456        struct mbus_msg *curr = m->cmd_queue;
457        struct mbus_msg *prev = NULL;
458        int              alen = strlen(cmnd) + strlen(args) + 4;
459
460        if (reliable && !mbus_addr_valid(m, dest)) {
461                debug_msg("Trying to send reliably to an unknown address...\n");
462#ifdef NDEF
463                if (m->err_handler == NULL) {
464                        abort();
465                }
466                m->err_handler(curr->seqnum, MBUS_DESTINATION_UNKNOWN);
467#endif
468        }
469
470        while (curr != NULL) {
471                if ((!curr->complete)
472                && mbus_addr_match(curr->dest, dest)
473                && (curr->num_cmds < MBUS_MAX_QLEN)
474                && ((curr->message_size + alen) < (MBUS_BUF_SIZE - 8))) {
475                        /* Slots message in if it fits, but this breaks ordering.  Msg
476                         * X+1 maybe shorter than X that is in next packet, so X+1 jumps
477                         * ahead.
478                         */
479                        curr->num_cmds++;
480                        curr->reliable |= reliable;
481                        curr->cmd_list[curr->num_cmds-1] = xstrdup(cmnd);
482                        curr->arg_list[curr->num_cmds-1] = xstrdup(args);
483                        curr->message_size += alen;
484                        return;
485                } else {
486                        curr->complete = TRUE;
487                }
488                prev = curr;
489                curr = curr->next;
490        }
491        curr = (struct mbus_msg *) xmalloc(sizeof(struct mbus_msg));
492        curr->next             = NULL;
493        curr->dest             = xstrdup(dest);
494        curr->retransmit_count = 0;
495        curr->message_size     = alen + 60 + strlen(dest) + strlen(m->addr[0]);
496        curr->seqnum           = m->seqnum++;
497        curr->reliable         = reliable;
498        curr->complete         = FALSE;
499        curr->num_cmds         = 1;
500        curr->cmd_list[0]      = xstrdup(cmnd);
501        curr->arg_list[0]      = xstrdup(args);
502        if (prev == NULL) {
503                m->cmd_queue = curr;
504        } else {
505                prev->next = curr;
506        }
507        gettimeofday(&(curr->time), NULL);
508        gettimeofday(&(curr->ts),   NULL);
509}
510
511void mbus_qmsgf(struct mbus *m, char *dest, int reliable, const char *cmnd, const char *format, ...)
512{
513        /* This is a wrapper around mbus_qmsg() which does a printf() style format into  */
514        /* a buffer. Saves the caller from having to a a malloc(), write the args string */
515        /* and then do a free(), and also saves worring about overflowing the buffer, so */
516        /* removing a common source of bugs!                                             */
517        char    buffer[MBUS_BUF_SIZE];
518        va_list ap;
519
520        va_start(ap, format);
521#ifdef WIN32
522        _vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
523#else
524        vsnprintf(buffer, MBUS_BUF_SIZE, format, ap);
525#endif
526        va_end(ap);
527        mbus_qmsg(m, dest, cmnd, buffer, reliable);
528}
529
530void mbus_parse_init(struct mbus *m, char *str)
531{
532        assert(m->parse_depth < (MBUS_MAX_PD - 1));
533        m->parse_depth++;
534        m->parse_buffer[m->parse_depth] = str;
535        m->parse_bufend[m->parse_depth] = str + strlen(str);
536}
537
538void mbus_parse_done(struct mbus *m)
539{
540        m->parse_buffer[m->parse_depth] = NULL;
541        m->parse_bufend[m->parse_depth] = NULL;
542        m->parse_depth--;
543        assert(m->parse_depth >= 0);
544}
545
546#define CHECK_OVERRUN if (m->parse_buffer[m->parse_depth] > m->parse_bufend[m->parse_depth]) {\
547        debug_msg("parse buffer overflow\n");\
548        return FALSE;\
549}
550
551int mbus_parse_lst(struct mbus *m, char **l)
552{
553        int instr = FALSE;
554        int inlst = FALSE;
555
556        *l = m->parse_buffer[m->parse_depth];
557        while (isspace((unsigned char)*m->parse_buffer[m->parse_depth])) {
558                m->parse_buffer[m->parse_depth]++;
559                CHECK_OVERRUN;
560        }
561        if (*m->parse_buffer[m->parse_depth] != '(') {
562                return FALSE;
563        }
564        *(m->parse_buffer[m->parse_depth]) = ' ';
565        while (*m->parse_buffer[m->parse_depth] != '\0') {
566                if ((*m->parse_buffer[m->parse_depth] == '"') && (*(m->parse_buffer[m->parse_depth]-1) != '\\')) {
567                        instr = !instr;
568                }
569                if ((*m->parse_buffer[m->parse_depth] == '(') && (*(m->parse_buffer[m->parse_depth]-1) != '\\') && !instr) {
570                        inlst = !inlst;
571                }
572                if ((*m->parse_buffer[m->parse_depth] == ')') && (*(m->parse_buffer[m->parse_depth]-1) != '\\') && !instr) {
573                        if (inlst) {
574                                inlst = !inlst;
575                        } else {
576                                *m->parse_buffer[m->parse_depth] = '\0';
577                                m->parse_buffer[m->parse_depth]++;
578                                CHECK_OVERRUN;
579                                return TRUE;
580                        }
581                }
582                m->parse_buffer[m->parse_depth]++;
583                CHECK_OVERRUN;
584        }
585        return FALSE;
586}
587
588int mbus_parse_str(struct mbus *m, char **s)
589{
590        while (isspace((unsigned char)*m->parse_buffer[m->parse_depth])) {
591                m->parse_buffer[m->parse_depth]++;
592                CHECK_OVERRUN;
593        }
594        if (*m->parse_buffer[m->parse_depth] != '"') {
595                return FALSE;
596        }
597        *s = m->parse_buffer[m->parse_depth]++;
598        while (*m->parse_buffer[m->parse_depth] != '\0') {
599                if ((*m->parse_buffer[m->parse_depth] == '"') && (*(m->parse_buffer[m->parse_depth]-1) != '\\')) {
600                        m->parse_buffer[m->parse_depth]++;
601                        *m->parse_buffer[m->parse_depth] = '\0';
602                        m->parse_buffer[m->parse_depth]++;
603                        return TRUE;
604                }
605                m->parse_buffer[m->parse_depth]++;
606                CHECK_OVERRUN;
607        }
608        return FALSE;
609}
610
611static int mbus_parse_sym(struct mbus *m, char **s)
612{
613        while (isspace((unsigned char)*m->parse_buffer[m->parse_depth])) {
614                m->parse_buffer[m->parse_depth]++;
615                CHECK_OVERRUN;
616        }
617        if (!isgraph((unsigned char)*m->parse_buffer[m->parse_depth])) {
618                return FALSE;
619        }
620        *s = m->parse_buffer[m->parse_depth]++;
621        while (!isspace((unsigned char)*m->parse_buffer[m->parse_depth]) && (*m->parse_buffer[m->parse_depth] != '\0')) {
622                m->parse_buffer[m->parse_depth]++;
623                CHECK_OVERRUN;
624        }
625        *m->parse_buffer[m->parse_depth] = '\0';
626        m->parse_buffer[m->parse_depth]++;
627        CHECK_OVERRUN;
628        return TRUE;
629}
630
631int mbus_parse_int(struct mbus *m, int *i)
632{
633        char    *p;
634
635        while (isspace((unsigned char)*m->parse_buffer[m->parse_depth])) {
636                m->parse_buffer[m->parse_depth]++;
637                CHECK_OVERRUN;
638        }
639
640        *i = strtol(m->parse_buffer[m->parse_depth], &p, 10);
641        if (((*i == LONG_MAX) || (*i == LONG_MIN)) && (errno == ERANGE)) {
642                debug_msg("integer out of range\n");
643                return FALSE;
644        }
645
646        if (p == m->parse_buffer[m->parse_depth]) {
647                return FALSE;
648        }
649        if (!isspace((unsigned char)*p) && (*p != '\0')) {
650                return FALSE;
651        }
652        m->parse_buffer[m->parse_depth] = p;
653        CHECK_OVERRUN;
654        return TRUE;
655}
656
657int mbus_parse_flt(struct mbus *m, double *d)
658{
659        char    *p;
660        while (isspace((unsigned char)*m->parse_buffer[m->parse_depth])) {
661                m->parse_buffer[m->parse_depth]++;
662                CHECK_OVERRUN;
663        }
664
665        *d = strtod(m->parse_buffer[m->parse_depth], &p);
666        if (errno == ERANGE) {
667                debug_msg("float out of range\n");
668                return FALSE;
669        }
670
671        if (p == m->parse_buffer[m->parse_depth]) {
672                return FALSE;
673        }
674        if (!isspace((unsigned char)*p) && (*p != '\0')) {
675                return FALSE;
676        }
677        m->parse_buffer[m->parse_depth] = p;
678        CHECK_OVERRUN;
679        return TRUE;
680}
681
682char *mbus_decode_str(char *s)
683{
684        int     l = strlen(s);
685        int     i, j;
686
687        /* Check that this an encoded string... */
688        assert(s[0]   == '\"');
689        assert(s[l-1] == '\"');
690
691        for (i=1,j=0; i < l - 1; i++,j++) {
692                if (s[i] == '\\') {
693                        i++;
694                }
695                s[j] = s[i];
696        }
697        s[j] = '\0';
698        return s;
699}
700
701char *mbus_encode_str(const char *s)
702{
703        int      i, j;
704        int      len = strlen(s);
705        char    *buf = (char *) xmalloc((len * 2) + 3);
706
707        for (i = 0, j = 1; i < len; i++,j++) {
708                if (s[i] == ' ') {
709                        buf[j] = '\\';
710                        buf[j+1] = ' ';
711                        j++;
712                } else if (s[i] == '\"') {
713                        buf[j] = '\\';
714                        buf[j+1] = '\"';
715                        j++;
716                } else {
717                        buf[j] = s[i];
718                }
719        }
720        buf[0]   = '\"';
721        buf[j]   = '\"';
722        buf[j+1] = '\0';
723        return buf;
724}
725
726int mbus_recv(struct mbus *m, void *data, struct timeval *timeout)
727{
728        char            *auth, *ver, *src, *dst, *ack, *r, *cmd, *param, *npos;
729        char            buffer[MBUS_BUF_SIZE];
730        int             buffer_len, seq, i, a, rx, ts, authlen;
731        char            ackbuf[MBUS_ACK_BUF_SIZE];
732        char            digest[16];
733        unsigned char   initVec[8] = {0,0,0,0,0,0,0,0};
734
735        rx = FALSE;
736        while (1) {
737                memset(buffer, 0, MBUS_BUF_SIZE);
738                assert(m->s != NULL);
739                udp_fd_zero();
740                udp_fd_set(m->s);
741                if ((udp_select(timeout) > 0) && udp_fd_isset(m->s)) {
742                        buffer_len = udp_recv(m->s, buffer, MBUS_BUF_SIZE);
743                        if (buffer_len > 0) {
744                                rx = TRUE;
745                        } else {
746                                return rx;
747                        }
748                } else {
749                        return FALSE;
750                }
751
752                if (m->encrkey != NULL) {
753                        /* Decrypt the message... */
754                        if ((buffer_len % 8) != 0) {
755                                debug_msg("Encrypted message not a multiple of 8 bytes in length\n");
756                                continue;
757                        }
758                        memcpy(tx_cryptbuf, buffer, buffer_len);
759                        memset(initVec, 0, 8);
760                        qfDES_CBC_d(m->encrkey, tx_cryptbuf, buffer_len, initVec);
761                        memcpy(buffer, tx_cryptbuf, buffer_len);
762                }
763
764                /* Sanity check that this is a vaguely sensible format message... Should prevent */
765                /* problems if we're fed complete garbage, but won't prevent determined hackers. */
766                if (strncmp(buffer + MBUS_AUTH_LEN + 1, "mbus/1.0", 8) != 0) {
767                        debug_msg("Message did not correctly decrypt...\n");
768                        continue;
769                }
770
771                mbus_parse_init(m, buffer);
772                /* remove trailing 0 bytes */
773                npos=strchr(buffer,'\0');
774                if(npos!=NULL) {
775                        buffer_len=npos-buffer;
776                }
777                /* Parse the authentication header */
778                if (!mbus_parse_sym(m, &auth)) {
779                        debug_msg("Failed to parse authentication header\n");
780                        mbus_parse_done(m);
781                        continue;
782                }
783
784                /* Check that the packet authenticates correctly... */
785                authlen = strlen(auth);
786                hmac_md5(buffer + authlen + 1, buffer_len - authlen - 1, m->hashkey, m->hashkeylen, digest);
787                base64encode(digest, 12, ackbuf, 16);
788                if ((strlen(auth) != 16) || (strncmp(auth, ackbuf, 16) != 0)) {
789                        debug_msg("Failed to authenticate message...\n");
790                        mbus_parse_done(m);
791                        continue;
792                }
793
794                /* Parse the header */
795                if (!mbus_parse_sym(m, &ver)) {
796                        mbus_parse_done(m);
797                        debug_msg("Parser failed version (1): %s\n",ver);
798                        continue;
799                }
800                if (strcmp(ver, "mbus/1.0") != 0) {
801                        mbus_parse_done(m);
802                        debug_msg("Parser failed version (2): %s\n",ver);
803                        continue;
804                }
805                if (!mbus_parse_int(m, &seq)) {
806                        mbus_parse_done(m);
807                        debug_msg("Parser failed seq\n");
808                        continue;
809                }
810                if (!mbus_parse_int(m, &ts)) {
811                        mbus_parse_done(m);
812                        debug_msg("Parser failed ts\n");
813                        continue;
814                }
815                if (!mbus_parse_sym(m, &r)) {
816                        mbus_parse_done(m);
817                        debug_msg("Parser failed reliable\n");
818                        continue;
819                }
820                if (!mbus_parse_lst(m, &src)) {
821                        mbus_parse_done(m);
822                        debug_msg("Parser failed src\n");
823                        continue;
824                }
825                if (!mbus_parse_lst(m, &dst)) {
826                        mbus_parse_done(m);
827                        debug_msg("Parser failed dst\n");
828                        continue;
829                }
830                if (!mbus_parse_lst(m, &ack)) {
831                        mbus_parse_done(m);
832                        debug_msg("Parser failed ack\n");
833                        continue;
834                }
835
836                store_other_addr(m, src);
837
838                /* Check if the message was addressed to us... */
839                for (i = 0; i < m->num_addr; i++) {
840                        if (mbus_addr_match(m->addr[i], dst)) {
841                                /* ...if so, process any ACKs received... */
842                                mbus_parse_init(m, ack);
843                                while (mbus_parse_int(m, &a)) {
844                                        if (mbus_waiting_ack(m) && (m->waiting_ack->seqnum == a)) {
845                                                while (m->waiting_ack->num_cmds > 0) {
846                                                        m->waiting_ack->num_cmds--;
847                                                        xfree(m->waiting_ack->cmd_list[m->waiting_ack->num_cmds]);
848                                                        xfree(m->waiting_ack->arg_list[m->waiting_ack->num_cmds]);
849                                                }
850                                                xfree(m->waiting_ack->dest);
851                                                xfree(m->waiting_ack);
852                                                m->waiting_ack = NULL;
853                                        }
854                                }
855                                mbus_parse_done(m);
856                                /* ...if an ACK was requested, send one... */
857                                if (strcmp(r, "R") == 0) {
858                                        char            *newsrc = (char *) xmalloc(strlen(src) + 3);
859                                        struct timeval   t;
860
861                                        sprintf(newsrc, "(%s)", src);   /* Yes, this is a kludge. */
862                                        gettimeofday(&t, NULL);
863                                        tx_header(++m->seqnum, (int) t.tv_sec, 'U', m->addr[0], newsrc, seq);
864                                        tx_send(m);
865                                        xfree(newsrc);
866                                }
867                                /* ...and process the commands contained in the message */
868                                while (mbus_parse_sym(m, &cmd)) {
869                                        if (mbus_parse_lst(m, &param)) {
870                                                char            *newsrc = (char *) xmalloc(strlen(src) + 3);
871                                                sprintf(newsrc, "(%s)", src);   /* Yes, this is a kludge. */
872                                                m->cmd_handler(newsrc, cmd, param, data);
873                                                xfree(newsrc);
874                                        } else {
875                                                debug_msg("Unable to parse mbus command:\n");
876                                                debug_msg("cmd = %s\n", cmd);
877                                                debug_msg("arg = %s\n", param);
878                                                break;
879                                        }
880                                }
881                        }
882                }
883                mbus_parse_done(m);
884        }
885}
Note: See TracBrowser for help on using the browser.