root/vic/trunk/mbus.cc @ 734

Revision 734, 15.2 KB (checked in by ucackha, 16 years ago)

Added registry code from RAT + tidying up. Kris

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:    mbus.c
3 * AUTHORS: Colin Perkins
4 * Modified by Dimitrios Miras
5 *
6 * Copyright (c) 1997,1998 University College London
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, is permitted, for non-commercial use only, provided
11 * that the following conditions are met:
12 * 1. Redistributions of source code must retain the above copyright
13 *    notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 *    notice, this list of conditions and the following disclaimer in the
16 *    documentation and/or other materials provided with the distribution.
17 * 3. All advertising materials mentioning features or use of this software
18 *    must display the following acknowledgement:
19 *      This product includes software developed by the Computer Science
20 *      Department at University College London
21 * 4. Neither the name of the University nor of the Department may be used
22 *    to endorse or promote products derived from this software without
23 *    specific prior written permission.
24 * Use of this software for commercial purposes is explicitly forbidden
25 * unless prior written permission is obtained from the authors.
26 *
27 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
28 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
30 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
31 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
32 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
33 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
34 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
35 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
36 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
37 * SUCH DAMAGE.
38 */
39
40#ifndef WIN32
41#include <strings.h>
42#include <ctype.h>
43#include <stdio.h>
44#include <stdlib.h>
45#include <string.h>
46#include <sys/time.h>
47#include <sys/types.h>
48#include <sys/socket.h>
49#include <netinet/in.h>
50#include <arpa/inet.h>
51#include <netdb.h>
52#include <unistd.h>
53#endif
54
55#include <ctype.h>
56#include <stdio.h>
57#include <stdlib.h>
58#include <string.h>
59#include <sys/types.h>
60#include <assert.h>
61#include "mbus.h"
62
63#define MBUS_ADDR       0xe0ffdeef      /* 224.255.222.239 */
64#define MBUS_PORT       47000
65#define MBUS_BUF_SIZE   1024
66#define MBUS_MAX_ADDR   10
67#define MBUS_MAX_PD     10
68
69
70int MBusHandler::mbus_addr_match(char *a, char *b)
71{
72        while ((*a != '\0') && (*b != '\0')) {
73                while (isspace(*a)) a++;
74                while (isspace(*b)) b++;
75                if (*a == '*') {
76                        a++;
77                        if ((*a != '\0') && !isspace(*a)) {
78                                return 0;
79                        }
80                        while(!isspace(*b) && (*b != '\0')) b++;
81                }
82                if (*b == '*') {
83                        b++;
84                        if ((*b != '\0') && !isspace(*b)) {
85                                return 0;
86                        }
87                        while(!isspace(*a) && (*a != '\0')) a++;
88                }
89                if (*a != *b) {
90                        return 0;
91                }
92                a++;
93                b++;
94        }
95        return 1;
96}
97
98
99
100void MBusHandler::mbus_ack_list_insert(char *srce, char *dest, char *cmnd, char *args, int seqnum)
101{
102        struct mbus_ack *curr = (struct mbus_ack *) malloc(sizeof(struct mbus_ack));
103
104        assert(srce != NULL);
105        assert(dest != NULL);
106        assert(cmnd != NULL);
107        assert(args != NULL);
108
109        mbus_parse_init(strdup(dest));
110        mbus_parse_lst(&(curr->dest));
111        mbus_parse_done();
112
113        curr->next = m_->ack_list;
114        curr->prev = NULL;
115        curr->cmnd = strdup(cmnd);
116        curr->args = strdup(args);
117        curr->seqn = seqnum;
118        gettimeofday(&(curr->time), NULL);
119
120        if (m_->ack_list != NULL) {
121                m_->ack_list->prev = curr;
122        }
123        m_->ack_list = curr;
124}
125
126void MBusHandler::mbus_ack_list_remove(char *srce, char *dest, int seqnum)
127{
128        /* This would be much more efficient if it scanned from last to first, since     */
129        /* we're most likely to receive ACKs in LIFO order, and this assumes FIFO...     */
130        /* We hope that the number of outstanding ACKs is small, so this doesn't matter. */
131        struct mbus_ack *curr = m_->ack_list;
132
133        while (curr != NULL) {
134                if (mbus_addr_match(curr->srce, dest) && mbus_addr_match(curr->dest, srce) && (curr->seqn == seqnum)) {
135                        free(curr->srce);
136                        free(curr->dest);
137                        free(curr->cmnd);
138                        free(curr->args);
139                        if (curr->next != NULL) curr->next->prev = curr->prev;
140                        if (curr->prev != NULL) curr->prev->next = curr->next;
141                        if (m_->ack_list == curr) m_->ack_list = curr->next;
142                        free(curr);
143                        return;
144                }
145                curr = curr->next;
146        }
147        /* If we get here, it's an ACK for something that's not in the ACK
148         * list. That's not necessarily a problem, could just be a duplicate
149         * ACK for a retransmission... We ignore it for now...
150         */
151}
152
153
154void MBusHandler::mbus_send_ack(char *dest, int seqnum)
155{
156        char                    buffer[80];
157        struct sockaddr_in      saddr;
158        u_long                  addr = MBUS_ADDR;
159
160        memcpy((char *) &saddr.sin_addr.s_addr, (char *) &addr, sizeof(addr));
161        saddr.sin_family = AF_INET;
162        saddr.sin_port   = htons(MBUS_PORT+m_->channel);
163        sprintf(buffer, "mbus/1.0 %d U (%s) (%s) (%d)\n", ++m_->seqnum, m_->addr[0], dest, seqnum);
164        if ((sendto(m_->fd, buffer, strlen(buffer), 0, (struct sockaddr *) &saddr, sizeof(saddr))) < 0) {
165                perror("mbus_send: sendto");
166        }
167}
168
169void  MBusHandler::mbus_retransmit()
170{
171        struct mbus_ack         *curr = m_->ack_list;
172        struct timeval           time;
173        long                     diff;
174        char                    *b;
175        struct sockaddr_in       saddr;
176        u_long                   addr = MBUS_ADDR;
177
178        gettimeofday(&time, NULL);
179
180        while (curr != NULL) {
181                /* diff is time in milliseconds that the message has been awaiting an ACK */
182                diff = ((time.tv_sec * 1000) + (time.tv_usec / 1000)) - ((curr->time.tv_sec * 1000) + (curr->time.tv_usec / 1000));
183                if (diff > 10000) {
184                        printf("Reliable mbus message failed! (wait=%ld)\n", diff);
185                        printf(">>>\n");
186                        printf("   mbus/1.0 %d R (%s) %s ()\n   %s (%s)\n", curr->seqn, curr->srce, curr->dest, curr->cmnd, curr->args);
187                        printf("<<<\n");
188                        if (m_->err_handler != NULL) {
189                                m_->err_handler(curr->seqn);
190                        } else {
191                                abort();
192                        }
193                        abort();
194                }
195                if (diff > 2000) {
196                        memcpy((char *) &saddr.sin_addr.s_addr, (char *) &addr, sizeof(addr));
197                        saddr.sin_family = AF_INET;
198                        saddr.sin_port   = htons(MBUS_PORT+m_->channel);
199                        b                = (char *)malloc(strlen(curr->dest)+strlen(curr->cmnd)+strlen(curr->args)+strlen(curr->srce)+80);
200                        sprintf(b, "mbus/1.0 %d R (%s) %s ()\n%s (%s)\n", curr->seqn, curr->srce, curr->dest, curr->cmnd, curr->args);
201                        if ((sendto(m_->fd, b, strlen(b), 0, (struct sockaddr *) &saddr, sizeof(saddr))) < 0) {
202                                perror("mbus_send: sendto");
203                        }
204                        free(b);
205                }
206                curr = curr->next;
207        }
208}
209
210int MBusHandler::mbus_socket_init(int channel)
211{
212        struct sockaddr_in sinme;
213        struct ip_mreq     imr;
214#ifdef WIN32
215        int                                ttl   =  0;
216#else
217        char               ttl   =  0;
218#endif
219        int                reuse =  1;
220        char               loop  =  1;
221        int                fd    = -1;
222
223        if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
224                perror("mbus: socket");
225                return -1;
226        }
227
228        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(reuse)) < 0) {
229                perror("mbus: setsockopt SO_REUSEADDR");
230                return -1;
231        }
232#ifdef SO_REUSEPORT
233        if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (char *) &reuse, sizeof(reuse)) < 0) {
234                perror("mbus: setsockopt SO_REUSEPORT");
235                return -1;
236        }
237#endif
238
239        sinme.sin_family      = AF_INET;
240        sinme.sin_addr.s_addr = htonl(INADDR_ANY);
241        sinme.sin_port        = htons(MBUS_PORT+channel);
242        if (bind(fd, (struct sockaddr *) & sinme, sizeof(sinme)) < 0) {
243                perror("mbus: bind");
244                return -1;
245        }
246
247        imr.imr_multiaddr.s_addr = MBUS_ADDR;
248        imr.imr_interface.s_addr = INADDR_ANY;
249        if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &imr, sizeof(struct ip_mreq)) < 0) {
250                perror("mbus: setsockopt IP_ADD_MEMBERSHIP");
251                return -1;
252        }
253
254#ifndef WIN32
255        if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) < 0) {
256                perror("mbus: setsockopt IP_MULTICAST_LOOP");
257                return -1;
258        }
259#endif
260
261        if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, (char*)&ttl, sizeof(ttl)) < 0) {
262                perror("mbus: setsockopt IP_MULTICAST_TTL");
263                return -1;
264        }
265        assert(fd != -1);
266        return fd;
267}
268
269
270MBusHandler::MBusHandler(int  channel,
271                       void (*cmd_handler)(char *src, char *cmd, char *arg, void *dat),
272                       void (*err_handler)(int seqnum))
273        : msgs(0)
274{
275        int i;
276
277        m_ = (struct mbus *) malloc(sizeof(struct mbus));
278        m_->fd = mbus_socket_init(channel);
279        unlink();
280        link(m_->fd, TK_READABLE);
281        m_->channel = channel;
282        m_->seqnum = 0;
283        m_->ack_list = NULL;
284        m_->cmd_handler = cmd_handler;
285        m_->err_handler = err_handler;
286        m_->num_addr = 0;
287        m_->parse_depth = 0;
288        for (i=0; i<MBUS_MAX_ADDR; i++)
289                m_->addr[i] = NULL;
290        for (i=0; i<MBUS_MAX_PD; i++)
291                m_->parse_buffer[i] = NULL;
292        /* XXX 0 should eventually be replaced by pid */
293        mbus_addr("(video engine vic 0)");
294        mbus_audio_addr = strdup("(audio engine * *)");
295}
296
297
298MBusHandler::~MBusHandler()
299{
300        int i;
301
302        struct mbus_ack *tmp;   
303        if (m_) {
304#ifndef WIN32
305                close(m_->fd);
306#endif
307                if (mbus_audio_addr)
308                        free(mbus_audio_addr);
309                for (i=0; i<MBUS_MAX_ADDR; i++)
310                        if (m_->addr[i])
311                                free(m_->addr[i]);
312                for (i=0; i<MBUS_MAX_PD; i++)
313                        if (m_->parse_buffer[i])
314                                free(m_->parse_buffer[i]);
315                while (m_->ack_list) {
316                        tmp = m_->ack_list;
317                        m_->ack_list = m_->ack_list->next;
318                        free(tmp);
319                }
320                free(m_);
321        }
322}
323
324
325void MBusHandler::dispatch(int mask)
326{
327        mbus_recv(this);
328}
329
330
331
332void MBusHandler::mbus_addr(char *addr)
333{
334        assert(m_->num_addr < MBUS_MAX_ADDR);
335        mbus_parse_init(strdup(addr));
336        if (mbus_parse_lst(&(m_->addr[m_->num_addr]))) {
337                m_->num_addr++;
338        }
339        mbus_parse_done();
340}
341
342
343
344int MBusHandler::mbus_send(char *dest, char *cmnd, char *args, int reliable)
345{
346        char                    *buffer;
347        struct sockaddr_in       saddr;
348        u_long                   addr = MBUS_ADDR;
349
350        assert(dest != NULL);
351        assert(cmnd != NULL);
352        assert(args != NULL);
353        assert(strlen(cmnd) != 0);
354
355        m_->seqnum++;
356
357        if (reliable) {
358                mbus_ack_list_insert(m_->addr[0], dest, cmnd, args, m_->seqnum);
359        }
360
361        memcpy((char *) &saddr.sin_addr.s_addr, (char *) &addr, sizeof(addr));
362        saddr.sin_family = AF_INET;
363        saddr.sin_port = htons(MBUS_PORT+m_->channel);
364        buffer = (char *) malloc(strlen(dest) + strlen(cmnd) + strlen(args) + strlen(m_->addr[0]) + 80);
365        sprintf(buffer, "mbus/1.0 %d %c (%s) %s ()\n%s (%s)\n", m_->seqnum, reliable?'R':'U', m_->addr[0], dest, cmnd, args);
366        if ((sendto(m_->fd, buffer, strlen(buffer), 0, (struct sockaddr *) &saddr, sizeof(saddr))) < 0) {
367                perror("mbus_send: sendto");
368        }
369        free(buffer);
370        return m_->seqnum;
371}
372
373void MBusHandler::mbus_parse_init(char *str)
374{
375        assert(m_->parse_depth < (MBUS_MAX_PD - 1));
376        m_->parse_buffer[++m_->parse_depth] = str;
377}
378
379void MBusHandler::mbus_parse_done()
380{
381        m_->parse_depth--;
382        assert(m_->parse_depth >= 0);
383}
384
385int MBusHandler::mbus_parse_lst(char **l)
386{
387        int instr = 0;
388        int inlst = 0;
389
390        *l = m_->parse_buffer[m_->parse_depth];
391        while (isspace(*m_->parse_buffer[m_->parse_depth])) {
392                m_->parse_buffer[m_->parse_depth]++;
393        }
394        if (*m_->parse_buffer[m_->parse_depth] != '(') {
395                return 0;
396        }
397        *(m_->parse_buffer[m_->parse_depth]) = ' ';
398        while (*m_->parse_buffer[m_->parse_depth] != '\0') {
399                if ((*m_->parse_buffer[m_->parse_depth] == '"') && (*(m_->parse_buffer[m_->parse_depth]-1) != '\\')) {
400                        instr = !instr;
401                }
402                if ((*m_->parse_buffer[m_->parse_depth] == '(') && (*(m_->parse_buffer[m_->parse_depth]-1) != '\\') && !instr) {
403                        inlst = !inlst;
404                }
405                if ((*m_->parse_buffer[m_->parse_depth] == ')') && (*(m_->parse_buffer[m_->parse_depth]-1) != '\\') && !instr) {
406                        if (inlst) {
407                                inlst = !inlst;
408                        } else {
409                                *m_->parse_buffer[m_->parse_depth] = '\0';
410                                m_->parse_buffer[m_->parse_depth]++;
411                                return 1;
412                        }
413                }
414                m_->parse_buffer[m_->parse_depth]++;
415        }
416        return 0;
417}
418
419int MBusHandler::mbus_parse_str(char **s)
420{
421        while (isspace(*m_->parse_buffer[m_->parse_depth])) {
422                m_->parse_buffer[m_->parse_depth]++;
423        }
424        if (*m_->parse_buffer[m_->parse_depth] != '"') {
425                return 0;
426        }
427        *s = m_->parse_buffer[m_->parse_depth]++;
428        while (*m_->parse_buffer[m_->parse_depth] != '\0') {
429                if ((*m_->parse_buffer[m_->parse_depth] == '"') && (*(m_->parse_buffer[m_->parse_depth]-1) != '\\')) {
430                        m_->parse_buffer[m_->parse_depth]++;
431                        *m_->parse_buffer[m_->parse_depth] = '\0';
432                        m_->parse_buffer[m_->parse_depth]++;
433                        return 1;
434                }
435                m_->parse_buffer[m_->parse_depth]++;
436        }
437        return 0;
438}
439
440int MBusHandler::mbus_parse_sym(char **s)
441{
442        while (isspace(*m_->parse_buffer[m_->parse_depth])) {
443                m_->parse_buffer[m_->parse_depth]++;
444        }
445        if (!isalpha(*m_->parse_buffer[m_->parse_depth])) {
446                return 0;
447        }
448        *s = m_->parse_buffer[m_->parse_depth]++;
449        while (!isspace(*m_->parse_buffer[m_->parse_depth]) && (*m_->parse_buffer[m_->parse_depth] != '\0')) {
450                m_->parse_buffer[m_->parse_depth]++;
451        }
452        *m_->parse_buffer[m_->parse_depth] = '\0';
453        m_->parse_buffer[m_->parse_depth]++;
454        return 1;
455}
456
457int MBusHandler::mbus_parse_int(int *i)
458{
459        char    *p;
460        *i = strtol(m_->parse_buffer[m_->parse_depth], &p, 10);
461
462        if (p == m_->parse_buffer[m_->parse_depth]) {
463                return 0;
464        }
465        if (!isspace(*p) && (*p != '\0')) {
466                return 0;
467        }
468        m_->parse_buffer[m_->parse_depth] = p;
469        return 1;
470}
471
472int MBusHandler::mbus_parse_flt(double *d)
473{
474        char    *p;
475        *d = strtod(m_->parse_buffer[m_->parse_depth], &p);
476
477        if (p == m_->parse_buffer[m_->parse_depth]) {
478                return 0;
479        }
480        if (!isspace(*p) && (*p != '\0')) {
481                return 0;
482        }
483        m_->parse_buffer[m_->parse_depth] = p;
484        return 1;
485}
486
487char *MBusHandler::mbus_decode_str(char *s)
488{
489        int   l = strlen(s), i, j;
490
491        /* Check that this an encoded string... */
492        assert(s[0]   == '\"');
493        assert(s[l-1] == '\"');
494
495        for (i=1, j=0; i< l-1; j++, i++) {
496                if (s[i] == '\\') {
497                        i++;
498                }
499                s[j] = s[i];
500        }
501
502        s[l-2] = '\0';
503        return s;
504}
505
506char *MBusHandler::mbus_encode_str(char *s)
507{
508        static char     *encode_buffer = NULL;
509        static int       encode_buflen = 0;
510
511        int l = strlen(s);
512        if (encode_buflen < l) {
513                if (encode_buffer != NULL) {
514                        free(encode_buffer);
515                }
516                encode_buflen = l+3;
517                encode_buffer = (char *) malloc(encode_buflen);
518        }
519        strcpy(encode_buffer+1, s);
520        encode_buffer[0]   = '\"';
521        encode_buffer[l+1] = '\"';
522        encode_buffer[l+2] = '\0';
523        return encode_buffer;
524}
525
526void MBusHandler::mbus_recv(void *data)
527{
528        char    *ver, *src, *dst, *ack, *r, *cmd, *param;
529        char     buffer[MBUS_BUF_SIZE];
530        int      buffer_len, seq, i, a;
531
532        memset(buffer, 0, MBUS_BUF_SIZE);
533        if ((buffer_len = recvfrom(m_->fd, buffer, MBUS_BUF_SIZE, 0, NULL, NULL)) <= 0) {
534                return;
535        }
536        msgs++;
537        mbus_parse_init(buffer);
538        /* Parse the header */
539        if (!mbus_parse_sym(&ver) || (strcmp(ver, "mbus/1.0") != 0)) {
540                mbus_parse_done();
541                return;
542        }
543        if (!mbus_parse_int(&seq)) {
544                mbus_parse_done();
545                return;
546        }
547        if (!mbus_parse_sym(&r)) {
548                mbus_parse_done();
549                return;
550        }
551        if (!mbus_parse_lst(&src)) {
552                mbus_parse_done();
553                return;
554        }
555        if (!mbus_parse_lst(&dst)) {
556                mbus_parse_done();
557                return;
558        }
559        if (!mbus_parse_lst(&ack)) {
560                mbus_parse_done();
561                return;
562        }
563        /* Check if the message was addressed to us... */
564        for (i = 0; i < m_->num_addr; i++) {
565                if (mbus_addr_match(m_->addr[i], dst)) {
566                        /* ...if so, process any ACKs received... */
567                        mbus_parse_init(ack);
568                        while (mbus_parse_int(&i)) {
569                                mbus_ack_list_remove(src, dst, i);
570                        }
571                        mbus_parse_done();
572                        /* ...if an ACK was requested, send one... */
573                        if (strcmp(r, "R") == 0) {
574                                mbus_send_ack(src, seq);
575                        }
576                        /* ...and process the commands contained in the message */
577                        while (mbus_parse_sym(&cmd)) {
578                                if (mbus_parse_lst(&param) == 0) {
579                                        break;
580                                }
581                                m_->cmd_handler(src, cmd, param, data);
582                        }
583                }
584        }
585        mbus_parse_done();
586}
Note: See TracBrowser for help on using the browser.