root/vic/branches/cc/rtp/session.cpp @ 4184

Revision 4184, 29.3 KB (checked in by soohyunc, 6 years ago)

fix warning message:
warning: deprecated conversion from string constant to ‘char*’

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the University of
16 *      California, Berkeley and the Network Research Group at
17 *      Lawrence Berkeley Laboratory.
18 * 4. Neither the name of the University nor of the Laboratory may be used
19 *    to endorse or promote products derived from this software without
20 *    specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
23 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
26 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 */
34static const char rcsid[] =
35    "@(#) $Header$ (LBL)";
36
37#include "config.h"
38#include <math.h>
39#include <errno.h>
40#include <string.h>
41#ifdef WIN32
42extern "C" int getpid();
43#endif
44#include "source.h"
45#include "vic_tcl.h"
46#include "media-timer.h"
47#include "crypt.h"
48#include "timer.h"
49#include "ntp-time.h"
50#include "session.h"
51
52/* added to support the mbus
53#include "mbus_handler.h"*/
54
55
56static class SessionMatcher : public Matcher {
57    public:
58                SessionMatcher() : Matcher("session") {}
59                TclObject* match(const char* id) {
60                        if (strcmp(id, "audio/rtp") == 0)
61                                return (new AudioSessionManager);
62                        else if (strcmp(id, "video/rtp") == 0)
63                                return (new VideoSessionManager);
64                        return (0);
65                }
66} session_matcher;
67
68int VideoSessionManager::check_format(int fmt) const
69{
70        switch(fmt) {
71                case RTP_PT_RAW:
72                case RTP_PT_CELLB:
73                case RTP_PT_JPEG:
74                case RTP_PT_CUSEEME:
75                case RTP_PT_NV:
76                case RTP_PT_CPV:
77                case RTP_PT_H261:
78                case RTP_PT_BVC:
79                case RTP_PT_H261_COMPAT:/*XXX*/
80                case RTP_PT_H263:
81                case RTP_PT_MPEG4:
82                case RTP_PT_H264:
83                case RTP_PT_H263P:
84                case RTP_PT_LDCT:
85                case RTP_PT_PVH:
86                case RTP_PT_DV:
87#ifdef USE_H261AS
88                case RTP_PT_H261AS:
89#endif 
90                return (1);
91        }
92        return (0);
93}
94
95int AudioSessionManager::check_format(int fmt) const
96{
97        switch (fmt) {
98        case RTP_PT_PCMU:
99        case RTP_PT_CELP:
100        case RTP_PT_GSM:
101        case RTP_PT_DVI:
102        case RTP_PT_LPC:
103                return (1);
104        }
105        return (0);
106}
107
108
109static SessionManager* manager;
110
111void
112adios()
113{
114        if (SourceManager::instance().localsrc() != 0)
115                manager->send_bye();
116        exit(0);
117}
118
119/*void ReportTimer::timeout()
120{
121sm_.send_report();
122}*/
123
124void DataHandler::dispatch(int)
125{
126        sm_->recv(this);
127}
128
129void CtrlHandler::dispatch(int)
130{
131        sm_->recv(this);
132}
133
134CtrlHandler::CtrlHandler()
135: ctrl_inv_bw_(0.),
136ctrl_avg_size_(128.),
137rint_(0.0) //SV-XXX: Debian
138{
139}
140
141inline void CtrlHandler::schedule_timer()
142{
143        msched(int(fmod(double(random()), rint_) + rint_ * .5 + .5));
144}
145
146void CtrlHandler::net(Network* n)
147{
148        DataHandler::net(n);
149        cancel();
150        if (n != 0) {
151        /*
152        * schedule a timer for our first report using half the
153        * min ctrl interval.  This gives us some time before
154        * our first report to learn about other sources so our
155        * next report interval will account for them.  The avg
156        * ctrl size was initialized to 128 bytes which is
157        * conservative (it assumes everyone else is generating
158        * SRs instead of RRs).
159                */
160                double rint = ctrl_avg_size_ * ctrl_inv_bw_;
161                if (rint < CTRL_MIN_RPT_TIME / 2. * 1000.)
162                        rint = CTRL_MIN_RPT_TIME / 2. * 1000.;
163                rint_ = rint;
164                schedule_timer();
165        }
166}
167
168void CtrlHandler::sample_size(int cc)
169{
170        ctrl_avg_size_ += CTRL_SIZE_GAIN * (double(cc + 28) - ctrl_avg_size_);
171}
172
173void CtrlHandler::adapt(int nsrc, int nrr, int we_sent)
174{
175/*
176* compute the time to the next report.  we do this here
177* because we need to know if there were any active sources
178* during the last report period (nrr above) & if we were
179* a source.  The bandwidth limit for ctrl traffic was set
180* on startup from the session bandwidth.  It is the inverse
181* of bandwidth (ie., ms/byte) to avoid a divide below.
182        */
183        double ibw = ctrl_inv_bw_;
184        if (nrr) {
185                /* there were active sources */
186                if (we_sent) {
187                        ibw *= 1./CTRL_SENDER_BW_FRACTION;
188                        nsrc = nrr;
189                } else {
190                        ibw *= 1./CTRL_RECEIVER_BW_FRACTION;
191                        nsrc -= nrr;
192                }
193        }
194        double rint = ctrl_avg_size_ * double(nsrc) * ibw;
195        if (rint < CTRL_MIN_RPT_TIME * 1000.)
196                rint = CTRL_MIN_RPT_TIME * 1000.;
197        rint_ = rint;
198}
199
200void CtrlHandler::timeout()
201{
202        sm_->announce(this);
203        schedule_timer();
204}
205
206//SV-XXX: rearranged initialization order to shut up gcc4
207SessionManager::SessionManager()
208//      : dh_(*this), ch_(*this), rt_(*this),
209: mb_(mbus_handler_engine, NULL),
210lipSyncEnabled_(0),
211badversion_(0),
212badoptions_(0),
213badfmt_(0),
214badext_(0),
215nrunt_(0),
216last_np_(0),
217sdes_seq_(0),
218rtcp_inv_bw_(0.),
219rtcp_avg_size_(128.),
220confid_(-1)
221{
222        /*XXX For adios() to send bye*/
223        manager = this;
224       
225        for (int i = 0; i < NLAYER; ++i) {
226                dh_[i].manager(this);
227                ch_[i].manager(this);
228        }
229       
230        /*XXX*/
231        pktbuf_ = new u_char[2 * RTP_MTU];
232        pool_ = new BufferPool;
233       
234        /*
235        * schedule a timer for our first report using half the
236        * min rtcp interval.  This gives us some time before
237        * our first report to learn about other sources so our
238        * next report interval will account for them.  The avg
239        * rtcp size was initialized to 128 bytes which is
240        * conservative (it assumes everyone else is generating
241        * SRs instead of RRs).
242        */
243        double rint = rtcp_avg_size_ * rtcp_inv_bw_;
244        if (rint < RTCP_MIN_RPT_TIME / 2. * 1000.)
245                rint = RTCP_MIN_RPT_TIME / 2. * 1000.;
246        rint_ = rint;
247        //rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
248}
249
250SessionManager::~SessionManager()
251{
252        if (pktbuf_)
253                delete[] pktbuf_;
254       
255        delete pool_;
256}
257
258u_int32_t SessionManager::alloc_srcid(Address & addr) const
259{
260        timeval tv;
261        ::gettimeofday(&tv, 0);
262        u_int32_t srcid = u_int32_t(tv.tv_sec + tv.tv_usec);
263        srcid += (u_int32_t)getuid();
264        srcid += (u_int32_t)getpid();
265/* __IPv6 changed srcid computation */
266        for(unsigned int i = 0; i < (addr.length() % sizeof(u_int32_t)); i++) {
267                srcid += ((u_int32_t*)((const void*)addr))[i];
268        }
269        return (srcid);
270}
271
272extern char* onestat(char* cp, const char* name, u_long v);
273
274char* SessionManager::stats(char* cp) const
275{
276        cp = onestat(cp, "Bad-RTP-version", badversion_);
277        cp = onestat(cp, "Bad-RTPv1-options", badoptions_);
278        cp = onestat(cp, "Bad-Payload-Format", badfmt_);
279        cp = onestat(cp, "Bad-RTP-Extension", badext_);
280        cp = onestat(cp, "Runts", nrunt_);
281        Crypt* p = dh_[0].net()->crypt();
282        if (p != 0) {
283                cp = onestat(cp, "Crypt-Bad-Length", p->badpktlen());
284                cp = onestat(cp, "Crypt-Bad-P-Bit", p->badpbit());
285        }
286        /*XXX*/
287        if (ch_[0].net() != 0) {
288                Crypt* p = ch_[0].net()->crypt();
289                if (p != 0) {
290                        cp = onestat(cp, "Crypt-Ctrl-Bad-Length", p->badpktlen());
291                        cp = onestat(cp, "Crypt-Ctrl-Bad-P-Bit", p->badpbit());
292                }
293        }
294        *--cp = 0;
295        return (cp);
296}
297
298int SessionManager::command(int argc, const char*const* argv)
299{
300        Tcl& tcl = Tcl::instance();
301        char* cp = tcl.buffer();
302        if (argc == 2) {
303                if (strcmp(argv[1], "active") == 0) {
304                        SourceManager::instance().sortactive(cp);
305                        tcl.result(cp);
306                        return (TCL_OK);
307                }
308                if (strcmp(argv[1], "local-addr-heuristic") == 0) {
309                        strcpy(cp, intoa(LookupLocalAddr()));
310                        tcl.result(cp);
311                        return (TCL_OK);
312                }
313                if (strcmp(argv[1], "stats") == 0) {
314                        stats(cp);
315                        tcl.result(cp);
316                        return (TCL_OK);
317                }
318                if (strcmp(argv[1], "nb") == 0) {
319                        sprintf(cp, "%u", 8 * nb_);
320                        tcl.result(cp);
321                        return (TCL_OK);
322                }
323                if (strcmp(argv[1], "nf") == 0) {
324                        sprintf(cp, "%u", nf_);
325                        tcl.result(cp);
326                        return (TCL_OK);
327                }
328                if (strcmp(argv[1], "np") == 0 ||
329                    strcmp(argv[1], "ns") == 0) {
330                        sprintf(cp, "%u", np_);
331                        tcl.result(cp);
332                        return (TCL_OK);
333                }
334                if (strcmp(argv[1], "lip-sync") == 0) {
335                        sprintf(cp, "%u", lipSyncEnabled_);
336                        tcl.result(cp);
337                        return (TCL_OK);
338                }
339
340        } else if (argc == 3) {
341                if (strcmp(argv[1], "sm") == 0) {
342                        sm_ = (SourceManager*)TclObject::lookup(argv[2]);
343                        return (TCL_OK);
344                }
345                if (strcmp(argv[1], "name") == 0) {
346                        Source* s = SourceManager::instance().localsrc();
347                        s->sdes(RTCP_SDES_NAME, argv[2]);
348                        return (TCL_OK);
349                }
350                if (strcmp(argv[1], "email") == 0) {
351                        Source* s = SourceManager::instance().localsrc();
352                        s->sdes(RTCP_SDES_EMAIL, argv[2]);
353                        return (TCL_OK);
354                }
355                if (strcmp(argv[1], "random-srcid") == 0) {
356                        Address * addrp;
357                        if ((addrp = (dh_[0].net())->alloc(argv[2])) !=0 ) { //SV-XXX: placed ()'s against truth check == NULL
358                          sprintf(cp, "%u", alloc_srcid(*addrp));
359                          delete addrp;
360                        }
361                        tcl.result(cp);
362                        return (TCL_OK);
363                }
364                if (strcmp(argv[1], "data-net") == 0) {
365                        dh_[0].net((Network*)TclObject::lookup(argv[2]));
366                        return (TCL_OK);
367                }
368                if (strcmp(argv[1], "ctrl-net") == 0) {
369                        ch_[0].net((Network*)TclObject::lookup(argv[2]));
370                        return (TCL_OK);
371                }
372                if (strcmp(argv[1], "max-bandwidth") == 0) {
373                        double bw = atof(argv[2]) / 8.;
374                        rtcp_inv_bw_ = 1. / (bw * RTCP_SESSION_BW_FRACTION);
375                        return (TCL_OK);
376                }
377                if (strcmp(argv[1], "confid") == 0) {
378                        confid_ = atoi(argv[2]);
379                        return (TCL_OK);
380                }
381                if (strcmp(argv[1], "data-bandwidth") == 0) {
382                        /*XXX assume value in range */
383                        bps(atoi(argv[2]));
384                        return (TCL_OK);
385                }
386                if (strcmp(argv[1], "mtu") == 0) {
387                        mtu_ = atoi(argv[2]);
388                        return (TCL_OK);
389                }
390                if (strcmp(argv[1], "loopback") == 0) {
391                        loopback_ = atoi(argv[2]);
392                        return (TCL_OK);
393                }
394                if (strcmp(argv[1], "lip-sync") == 0) {
395                        lipSyncEnabled_ = atoi(argv[2]);
396                        return (TCL_OK);
397                }
398
399        }  else if (argc == 4) {
400                if (strcmp(argv[1], "data-net") == 0) {
401                        u_int layer = atoi(argv[3]);
402                        if (layer >= NLAYER)
403                                abort();
404                        if (*argv[2] == 0) {
405                                dh_[layer].net(0);
406                                return (TCL_OK);
407                        }
408                        Network* net = (Network*)TclObject::lookup(argv[2]);
409                        if (net == 0) {
410                                tcl.resultf("no network %s", argv[2]);
411                                return (TCL_ERROR);
412                        }
413                        if (net->rchannel() < 0) {
414                                tcl.resultf("network not open");
415                                return (TCL_ERROR);
416                        }
417                        dh_[layer].net(net);
418                        return (TCL_OK);
419                }
420                if (strcmp(argv[1], "ctrl-net") == 0) {
421                        u_int layer = atoi(argv[3]);
422                        if (layer >= NLAYER)
423                                abort();
424                        if (*argv[2] == 0) {
425                                ch_[layer].net(0);
426                                return (TCL_OK);
427                        }
428                        Network* net = (Network*)TclObject::lookup(argv[2]);
429                        if (net == 0) {
430                                tcl.resultf("no network %s", argv[2]);
431                                return (TCL_ERROR);
432                        }
433                        if (net->rchannel() < 0) {
434                                tcl.resultf("network not open");
435                                return (TCL_ERROR);
436                        }
437                        ch_[layer].net(net);
438                        return (TCL_OK);
439                }
440                return (Transmitter::command(argc, argv));
441        }
442        //should not be reached
443        return (TCL_ERROR);
444}
445
446void SessionManager::transmit(pktbuf* pb)
447{
448        //mh_.msg_iov = pb->iov;
449        //      dh_[.net()->send(mh_);
450                //debug_msg("L %d,",pb->layer);
451        // Using loop_layer for now to restrict transmission as well
452        if (pb->layer < loop_layer_) {
453        //      if ( pb->layer <0 ) exit(1);
454                Network* n = dh_[pb->layer].net();
455                if (n != 0)
456                        n->send(pb);
457        }
458}
459
460u_char* SessionManager::build_sdes_item(u_char* p, int code, Source& s)
461{
462        const char* value = s.sdes(code);
463        if (value != 0) {
464                int len = strlen(value);
465                *p++ = code;
466                *p++ = len;
467                memcpy(p, value, len);
468                p += len;
469        }
470        return (p);
471}
472
473int SessionManager::build_sdes(rtcphdr* rh, Source& ls)
474{
475        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_SDES;
476        rh->rh_flags = htons(flags);
477        rh->rh_ssrc = ls.srcid();
478        u_char* p = (u_char*)(rh + 1);
479        p = build_sdes_item(p, RTCP_SDES_CNAME, ls);
480
481        /*
482         * We always send a cname plus one other sdes
483         * There's a schedule for what we send sequenced by sdes_seq_:
484         *   - send 'email' every 0th & 4th packet
485         *   - send 'note' every 2nd packet
486         *   - send 'tool' every 6th packet
487         *   - send 'name' in all the odd slots
488         * (if 'note' is not the empty string, we switch the roles
489         *  of name & note)
490         */
491        int nameslot, noteslot;
492        const char* note = ls.sdes(RTCP_SDES_NOTE);
493        if (note) {
494                if (*note) {
495                        nameslot = RTCP_SDES_NOTE;
496                        noteslot = RTCP_SDES_NAME;
497                } else {
498                        nameslot = RTCP_SDES_NAME;
499                        noteslot = RTCP_SDES_NOTE;
500                }
501        } else {
502                nameslot = RTCP_SDES_NAME;
503                noteslot = RTCP_SDES_NAME;
504        }
505        u_int seq = (++sdes_seq_) & 0x7;
506        switch (seq) {
507
508        case 0case 4:
509                p = build_sdes_item(p, RTCP_SDES_EMAIL, ls);
510                break;
511
512        case 2:
513                p = build_sdes_item(p, noteslot, ls);
514                break;
515        case 6:
516                p = build_sdes_item(p, RTCP_SDES_TOOL, ls);
517                break;
518        default:
519                p = build_sdes_item(p, nameslot, ls);
520        }
521        int len = p - (u_char*)rh;
522        int pad = 4 - (len & 3);
523        len += pad;
524        rh->rh_len = htons((len >> 2) - 1);
525        while (--pad >= 0)
526                *p++ = 0;
527
528        return (len);
529}
530
531int SessionManager::build_app(rtcphdr* rh, Source& ls, const char *name,
532                void *data, int datalen)
533{
534  int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_APP;
535  rh->rh_flags = htons(flags);
536  rh->rh_ssrc = ls.srcid();
537  u_char* p = (u_char*)(rh + 1);
538    int len;
539   
540    // write the name field
541    len = strlen(name);
542    if( len < 4 ) {
543        memcpy(p,name,len);
544        p += len;
545       
546        // pad short names
547        while( p - (u_char*)(rh+1) < 4 )
548            *p++ = 0;
549    }
550    else {
551        // use first four chars of given name
552        memcpy(p,name,4);
553        p += 4;
554    }
555   
556    // write the app data
557    memcpy(p,data,datalen);
558    p += datalen;
559   
560    // pad as needed
561    len = p - (u_char*)rh;
562    int pad = 4 - (len & 3);
563    while( --pad >= 0 )
564        *p++ = 0;
565  len = p - (u_char*)rh;
566
567    // set the length now that it's known
568  rh->rh_len = htons((len >> 2) - 1);
569
570  return (len);
571}
572
573/*void SessionManager::send_report()
574{
575        send_report(0);
576}
577
578  void SessionManager::send_bye()
579  {
580  send_report(1);
581}*/
582
583// SessionManager is no longer used as Timer - Each
584// CtrlHandler has its own Timer which calls this;
585void SessionManager::announce(CtrlHandler* ch)
586{
587        send_report(ch, 0);
588}
589
590/*XXX check for buffer overflow*/
591/*
592* Send an RTPv2 report packet.
593*/
594//void SessionManager::send_report(int bye)
595
596void SessionManager::send_report(CtrlHandler* ch, int bye, int app)
597{
598        UNUSED(app); //SV-XXX: unused
599
600        SourceManager& sm = SourceManager::instance();
601        Source& s = *sm.localsrc();
602        rtcphdr* rh = (rtcphdr*)pktbuf_;
603        rh->rh_ssrc = s.srcid();
604        int flags = RTP_VERSION << 14;
605        int layer = ch - ch_; //LLL
606        Source::Layer& sl = s.layer(layer);
607        timeval now = unixtime();
608        sl.lts_ctrl(now);
609        int we_sent = 0;
610        rtcp_rr* rr;
611        Tcl& tcl = Tcl::instance();
612
613        /*
614         * If we've sent data since our last sender report send a
615         * new report.  The MediaTimer check is to make sure we still
616         * have a grabber -- if not, we won't be able to interpret the
617         * media timestamps so there's no point in sending an SR.
618         */
619        MediaTimer* mt = MediaTimer::instance();
620        if (sl.np() != last_np_ && mt) {
621                last_np_ = sl.np();
622                we_sent = 1;
623                flags |= RTCP_PT_SR;
624                rtcp_sr* sr = (rtcp_sr*)(rh + 1);
625                sr->sr_ntp = ntp64time(now);
626                HTONL(sr->sr_ntp.upper);
627                HTONL(sr->sr_ntp.lower);
628                sr->sr_ts = htonl(mt->ref_ts());
629                sr->sr_np = htonl(sl.np());
630                sr->sr_nb = htonl(sl.nb());
631                rr = (rtcp_rr*)(sr + 1);
632        } else {
633                flags |= RTCP_PT_RR;
634                rr = (rtcp_rr*)(rh + 1);
635        }
636        int nrr = 0;
637        int nsrc = 0;
638        /*
639        * we don't want to inflate report interval if user has set
640        * the flag that causes all sources to be 'kept' so we
641        * consider sources 'inactive' if we haven't heard a control
642        * msg from them for ~32 reporting intervals.
643        */
644        //LLL   u_int inactive = u_int(rint_ * (32./1000.));
645        u_int inactive = u_int(ch->rint() * (32./1000.));
646        if (inactive < 2)
647                inactive = 2;
648        for (Source* sp = sm.sources(); sp != 0; sp = sp->next_) {
649                ++nsrc;
650                Source::Layer& sl = sp->layer(layer);
651                //              int received = sp->np() - sp->snp();
652                int received = sl.np() - sl.snp();
653                if (received == 0) {
654                        //                      if (u_int(now.tv_sec - sp->lts_ctrl().tv_sec) > inactive)
655                        if (u_int(now.tv_sec - sl.lts_ctrl().tv_sec) > inactive)
656                                --nsrc;
657                        continue;
658                }
659                //              sp->snp(sp->np());
660                sl.snp(sl.np());
661                rr->rr_srcid = sp->srcid();
662                //              int expected = sp->ns() - sp->sns();
663                int expected = sl.ns() - sl.sns();
664                //              sp->sns(sp->ns());
665                sl.sns(sl.ns());
666                u_int32_t v;
667                int lost = expected - received;
668                if (lost <= 0)
669                        v = 0;
670                else
671                        /* expected != 0 if lost > 0 */
672                        v = ((lost << 8) / expected) << 24;
673                /* XXX should saturate on over/underflow */
674                //              v |= (sp->ns() - sp->np()) & 0xffffff;
675                v |= (sl.ns() - sl.np()) & 0xffffff;
676                rr->rr_loss = htonl(v);
677                //              rr->rr_ehsr = htonl(sp->ehs());
678                rr->rr_ehsr = htonl(sl.ehs());
679                rr->rr_dv = (sp->handler() != 0) ? sp->handler()->delvar() : 0;
680                //              rr->rr_lsr = htonl(sp->sts_ctrl());
681                rr->rr_lsr = htonl(sl.sts_ctrl());
682                //              if (sp->lts_ctrl().tv_sec == 0)
683                if (sl.lts_ctrl().tv_sec == 0)
684                        rr->rr_dlsr = 0;
685                else {
686                        u_int32_t ntp_now = ntptime(now);
687                        //                      u_int32_t ntp_then = ntptime(sp->lts_ctrl());
688                        u_int32_t ntp_then = ntptime(sl.lts_ctrl());
689                        rr->rr_dlsr = htonl(ntp_now - ntp_then);
690                }
691                ++rr;
692                if (++nrr >= 31)
693                        break;
694        }
695        flags |= nrr << 8;
696        rh->rh_flags = htons(flags);
697        int len = (u_char*)rr - pktbuf_;
698        rh->rh_len = htons((len >> 2) - 1);
699
700        if (bye)
701                len += build_bye((rtcphdr*)rr, s);
702        else
703                len += build_sdes((rtcphdr*)rr, s);
704       
705        // build "site" app data if specified
706        const char *data = tcl.attr("site");
707        if(data)
708        {
709            rr = (rtcp_rr*)(((u_char*)rh) + len);
710            len += build_app((rtcphdr*)rr, s, "site", (void *)data, strlen(data));
711        }
712        //LLL   ch_.send(pktbuf_, len);
713        ch->send(pktbuf_, len);
714       
715        /*
716      rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(len + 28) - rtcp_avg_size_);
717         
718          // compute the time to the next report.  we do this here
719          // because we need to know if there were any active sources
720          // during the last report period (nrr above) & if we were
721          // a source.  The bandwidth limit for rtcp traffic was set
722          // on startup from the session bandwidth.  It is the inverse
723          // of bandwidth (ie., ms/byte) to avoid a divide below.
724       
725        //      double ibw = rtcp_inv_bw_;
726        if (nrr) {
727        // there were active sources
728        //              if (we_sent) {
729        ibw *= 1./RTCP_SENDER_BW_FRACTION;
730        nsrc = nrr;
731        } else {
732        ibw *= 1./RTCP_RECEIVER_BW_FRACTION;
733        nsrc -= nrr;
734        }
735        }
736        double rint = rtcp_avg_size_ * double(nsrc) * ibw;
737        if (rint < RTCP_MIN_RPT_TIME * 1000.)
738                rint = RTCP_MIN_RPT_TIME * 1000.;
739        rint_ = rint;
740        rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
741        */
742       
743        // Call timer adaption for each layer
744        ch->adapt(nsrc, nrr, we_sent);
745        ch->sample_size(len);
746       
747        //      sm.CheckActiveSources(rint);
748        if (layer == 0)
749                sm.CheckActiveSources(ch->rint());
750       
751}
752
753int SessionManager::build_bye(rtcphdr* rh, Source& ls)
754{
755        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_BYE;
756        rh->rh_flags = ntohs(flags);
757        rh->rh_len = htons(1);
758        rh->rh_ssrc = ls.srcid();
759        return (8);
760}
761
762void SessionManager::recv(DataHandler* dh)
763{
764        int layer = dh - dh_;
765        pktbuf* pb = pool_->alloc(layer);
766        Address * addrp;
767        /* leave room in case we need to expand rtpv1 into an rtpv2 header */
768        /* XXX the free mem routine didn't like it ... */
769        //u_char* bp = &pktbuf_[4];
770        //u_char* bp = pktbuf_;
771       
772        int cc = dh->recv(pb->data, sizeof(pb->data), addrp);
773        //int cc = dh->recv(bp, 2 * RTP_MTU - 4, addrp);
774        if (cc <= 0) {
775                pb->release();
776                return;
777        }
778
779    // Ignore loopback packets
780        if (!loopback_) {
781                rtphdr* rh = (rtphdr*)pb->data;
782                SourceManager& sm = SourceManager::instance();
783                if (rh->rh_ssrc == (*sm.localsrc()).srcid()) {
784                        pb->release();
785                        return;
786                }
787        }
788
789        //rtphdr* rh = (rtphdr*)pb->data;
790        int version = pb->data[0] >> 6;
791        //int version = *(u_char*)rh >> 6;
792        if (version != 2) {
793                ++badversion_;
794                pb->release();
795                return;
796        }
797        if (cc < (int)sizeof(rtphdr)) {
798                ++nrunt_;
799                pb->release();
800                return;
801        }
802        pb->len = cc;
803       
804        //bp += sizeof(*rh);
805        //cc -= sizeof(*rh);
806        demux(pb, *addrp);
807}
808
809void SessionManager::demux(pktbuf* pb, Address & addr)
810{
811        rtphdr* rh = (rtphdr*)pb->data;
812        u_int32_t srcid = rh->rh_ssrc;
813        int flags = ntohs(rh->rh_flags);
814        // for LIP SYNC
815        //SV-XXX: unused: u_char *pkt = pb->data - sizeof(*rh);
816
817        if ((flags & RTP_X) != 0) {
818        /*
819        * the minimal-control audio/video profile
820        * explicitly forbids extensions
821                */
822                ++badext_;
823                pb->release();
824                return;
825        }
826
827        /*
828         * Check for illegal payload types.  Most likely this is
829         * a session packet arriving on the data port.
830         */
831        int fmt = flags & 0x7f;
832        if (!check_format(fmt)) {
833                ++badfmt_;
834                pb->release();
835                return;
836        }
837
838        SourceManager& sm = SourceManager::instance();
839        u_int16_t seqno = ntohs(rh->rh_seqno);
840        Source* s = sm.demux(srcid, addr, seqno, pb->layer);
841        if (s == 0) {
842        /*
843        * Takes a pair of validated packets before we will
844        * believe the source.  This prevents a runaway
845        * allocation of Source data structures for a
846        * stream of garbage packets.
847                */
848                pb->release();
849                return;
850        }
851        /* inform this source of the mbus */
852        s->mbus(&mb_);
853       
854        Source::Layer& sl = s->layer(pb->layer);
855        timeval now = unixtime();
856        //      s->lts_data(now);
857        sl.lts_data(now);
858        //      s->sts_data(rh->rh_ts);
859        sl.sts_data(rh->rh_ts);
860       
861        // extract CSRC count (CC field); increment pb->dp data pointer & adjust length accordingly
862        int cnt = (flags >> 8) & 0xf;
863        if (cnt > 0) {
864                //u_char* nh = (u_char*)rh + (cnt << 2);
865                rtphdr hdr = *rh;
866                pb->dp += (cnt << 2);
867                pb->len -= (cnt << 2);
868                u_int32_t* bp = (u_int32_t*)(rh + 1);
869                while (--cnt >= 0) {
870                        u_int32_t csrc = *(u_int32_t*)bp;
871                        bp += 4;
872                        Source* cs = sm.lookup(csrc, srcid, addr);
873                        //                      cs->lts_data(now);
874                        cs->layer(pb->layer).lts_data(now);
875                        cs->action();
876                }
877                //              rtphdr hdr = *rh;
878                //              rh = (rtphdr*)nh;
879                /*XXX move header up so it's contiguous with data*/
880                rh = (rtphdr*)pb->dp;
881                *rh = hdr;
882        } else
883                s->action();
884
885        if (s->sync() && lipSyncEnabled()) { 
886                /*
887                 * Synchronisation is enabled on this source;
888                 * video packets have to
889                 * be buffered, their playout point scheduled, and the
890                 * playout delays communicated with the audio tool ...
891                 */ 
892
893                /*
894                * XXX bit rate doesn't include rtpv1 options;
895                * but v1 is going away anyway.
896                */
897                //              int dup = s->cs(seqno);
898                //              s->np(1);
899                //              s->nb(cc + sizeof(*rh));
900                int dup = sl.cs(seqno, s);
901                sl.np(1);
902                sl.nb(pb->len);
903                if (dup) {
904                        pb->release();
905                        return;
906                }
907                if (flags & RTP_M) // check if reach frame boundaries
908                        //                      s->nf(1);
909                        sl.nf(1);
910               
911                //s->recv(pkt, rh, pb, pb->len); // this should invoke Source::recv and buffer
912                //s->recv(bp); // this should invoke Source::recv and buffer
913               
914                pktbuf_ = (u_char*)new_blk();
915        } /* synced */
916
917        else { /* ... playout video packets as they arrive */
918                /*
919                 * This is a data packet.  If the source needs activation,
920                 * or the packet format has changed, deal with this.
921                 * Then, hand the packet off to the packet handler.
922                 * XXX might want to be careful about flip-flopping
923                 * here when format changes due to misordered packets
924                 * (easy solution -- keep rtp seqno of last fmt change).
925                 */
926                PacketHandler* h = s->handler();
927                if (h == 0)
928                        h = s->activate(fmt);
929                else if (s->format() != fmt)
930                        h = s->change_format(fmt);
931               
932                        /*
933                        * XXX bit rate doesn't include rtpv1 options;
934                        * but v1 is going away anyway.
935                */
936                //              int dup = s->cs(seqno);
937                //              s->np(1);
938                //              s->nb(cc + sizeof(*rh));
939                int dup = sl.cs(seqno, s);
940                sl.np(1);
941                sl.nb(pb->len);
942                if (dup){
943                        pb->release();
944                        return;
945                }
946                if (flags & RTP_M)
947                        //                  s->nf(1);
948                        sl.nf(1);
949#ifdef notdef
950        /* This should move to the handler */
951        /*XXX could get rid of hdrlen and move run check into recv method*/
952               
953                int hlen = h->hdrlen();
954                cc -= hlen;
955                if (cc < 0) {
956                        //                  s->runt(1);
957                        sl.runt(1);
958                        pb->release();
959                        return;
960                }
961#endif
962                if (s->mute()) {
963                        pb->release();
964                        return;
965                }
966                //h->recv(rh, bp + hlen, cc);
967                h->recv(pb);
968        } /* not sync-ed */
969
970}
971
972void SessionManager::parse_rr_records(u_int32_t, rtcp_rr*, int,
973                                      const u_char*, Address &)
974{
975}
976                                     
977
978void SessionManager::parse_sr(rtcphdr* rh, int flags, u_char*ep,
979                                                          Source* ps, Address & addr, int layer)
980{
981        rtcp_sr* sr = (rtcp_sr*)(rh + 1);
982        Source* s;
983        u_int32_t ssrc = rh->rh_ssrc;
984        if (ps->srcid() != ssrc)
985                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
986        else
987                s = ps;
988       
989        Source::Layer& sl = s->layer(layer);
990       
991        timeval now = unixtime();
992
993        sl.lts_ctrl(now);
994        sl.sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
995                ntohl(sr->sr_ntp.lower) >> 16);
996       
997        //int cnt = flags >> 8 & 0x1f;
998        //parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
999       
1000        /*s->lts_ctrl(now);
1001        s->sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1002        ntohl(sr->sr_ntp.lower) >> 16);*/
1003       
1004        s->rtp_ctrl(ntohl(sr->sr_ts));
1005        u_int32_t t = ntptime(now);
1006        s->map_ntp_time(t);
1007        s->map_rtp_time(s->convert_time(t));
1008        s->rtp2ntp(1);
1009        //printf("Got SR\n");
1010
1011        int cnt = flags >> 8 & 0x1f;
1012        parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1013}
1014
1015void SessionManager::parse_rr(rtcphdr* rh, int flags, u_char* ep,
1016                                                          Source* ps, Address & addr, int layer)
1017{
1018        Source* s;
1019        u_int32_t ssrc = rh->rh_ssrc;
1020        if (ps->srcid() != ssrc)
1021                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1022        else
1023                s = ps;
1024       
1025        s->layer(layer).lts_ctrl(unixtime());
1026        int cnt = flags >> 8 & 0x1f;
1027        parse_rr_records(ssrc, (rtcp_rr*)(rh + 1), cnt, ep, addr);
1028}
1029
1030int SessionManager::sdesbody(u_int32_t* p, u_char* ep, Source* ps,
1031                                                Address & addr, u_int32_t ssrc, int layer)
1032{
1033        Source* s;
1034        u_int32_t srcid = *p;
1035        if (ps->srcid() != srcid)
1036                s = SourceManager::instance().lookup(srcid, ssrc, addr);
1037        else
1038                s = ps;
1039        if (s == 0)
1040                return (0);
1041                /*
1042                * Note ctrl packet since we will never see any direct ctrl packets
1043                * from a source through a mixer (and we don't want the source to
1044                * time out).
1045        */
1046        s->layer(layer).lts_ctrl(unixtime());
1047       
1048        u_char* cp = (u_char*)(p + 1);
1049        while (cp < ep) {
1050                char buf[256];
1051
1052                u_int type = cp[0];
1053                if (type == 0) {
1054                        /* end of chunk */
1055                        return (((cp - (u_char*)p) >> 2) + 1);
1056                }
1057                u_int len = cp[1];
1058                u_char* eopt = cp + len + 2;
1059                if (eopt > ep)
1060                        return (0);
1061
1062                if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) {
1063                        memcpy(buf, (char*)&cp[2], len);
1064                        buf[len] = 0;
1065                        s->sdes(type, buf);
1066                } else
1067                        /*XXX*/;
1068
1069                cp = eopt;
1070        }
1071        return (0);
1072}
1073
1074void SessionManager::parse_sdes(rtcphdr* rh, int flags, u_char* ep, Source* ps,
1075                                                                Address & addr, u_int32_t ssrc, int layer)
1076{
1077        int cnt = flags >> 8 & 0x1f;
1078        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1079        while (--cnt >= 0 && (u_char*)p < ep) {
1080                int n = sdesbody(p, ep, ps, addr, ssrc, layer);
1081                if (n == 0)
1082                        break;
1083                p += n;
1084        }
1085        if (cnt >= 0)
1086                ps->badsdes(1);
1087}
1088
1089void SessionManager::parse_bye(rtcphdr* rh, int flags, u_char* ep, Source* ps)
1090{
1091        int cnt = flags >> 8 & 0x1f;
1092        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1093
1094        while (--cnt >= 0) {
1095                if (p >= (u_int32_t*)ep) {
1096                        ps->badbye(1);
1097                        return;
1098                }
1099                Source* s;
1100                if (ps->srcid() != rh->rh_ssrc)
1101                        s = SourceManager::instance().consult(*p);
1102                else
1103                        s = ps;
1104                if (s != 0)
1105                        s->lts_done(unixtime());
1106                ++p;
1107        }
1108}
1109
1110/*
1111 * Receive an rtcp packet (from the control port).
1112 */
1113void SessionManager::recv(CtrlHandler* ch)
1114{
1115        Address * srcp;
1116        int cc = ch->recv(pktbuf_, 2 * RTP_MTU, srcp);
1117        if (cc <= 0)
1118                return;
1119
1120        rtcphdr* rh = (rtcphdr*)pktbuf_;
1121
1122    // Ignore loopback packets
1123        if (!loopback_) {
1124                SourceManager& sm = SourceManager::instance();
1125                if (rh->rh_ssrc == (*sm.localsrc()).srcid())
1126                        return;
1127        }
1128
1129        if (cc < int(sizeof(*rh))) {
1130                ++nrunt_;
1131                return;
1132        }
1133        /*
1134         * try to filter out junk: first thing in packet must be
1135         * sr, rr or bye & version number must be correct.
1136         */
1137        switch(ntohs(rh->rh_flags) & 0xc0ff) {
1138        case RTP_VERSION << 14 | RTCP_PT_SR:
1139        case RTP_VERSION << 14 | RTCP_PT_RR:
1140        case RTP_VERSION << 14 | RTCP_PT_BYE:
1141                break;
1142        default:
1143                /*
1144                 * XXX should further categorize this error -- it is
1145                 * likely that people mis-implement applications that
1146                 * don't put something other than SR,RR,BYE first.
1147                 */
1148                ++badversion_;
1149                return;
1150        }
1151        /*
1152         * at this point we think the packet's valid.  Update our average
1153         * size estimator.  Also, there's valid ssrc so charge errors to it
1154         */
1155        rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(cc + 28) - rtcp_avg_size_);
1156        Address & addr = *srcp;
1157
1158        /*
1159         * First record in compount packet must be the ssrc of the
1160         * sender of the packet.  Pull it out here so we can use
1161         * it in the sdes parsing, since the sdes record doesn't
1162         * contain the ssrc of the sender (in the case of mixers).
1163         */
1164        u_int32_t ssrc = rh->rh_ssrc;
1165        Source* ps = SourceManager::instance().lookup(ssrc, ssrc, addr);
1166        if (ps == 0)
1167                return;
1168       
1169        int layer = ch - ch_;
1170                /*
1171                * Outer loop parses multiple RTCP records of a "compound packet".
1172                * There is no framing between records.  Boundaries are implicit
1173                * and the overall length comes from UDP.
1174        */
1175        u_char* epack = (u_char*)rh + cc;
1176        while ((u_char*)rh < epack) {
1177                u_int len = (ntohs(rh->rh_len) << 2) + 4;
1178                u_char* ep = (u_char*)rh + len;
1179                if (ep > epack) {
1180                        ps->badsesslen(1);
1181                        return;
1182                }
1183                u_int flags = ntohs(rh->rh_flags);
1184                if (flags >> 14 != RTP_VERSION) {
1185                        ps->badsessver(1);
1186                        return;
1187                }
1188                switch (flags & 0xff) {
1189
1190                case RTCP_PT_SR:
1191                        parse_sr(rh, flags, ep, ps, addr, layer);
1192                        break;
1193
1194                case RTCP_PT_RR:
1195                        parse_rr(rh, flags, ep, ps, addr, layer);
1196                        break;
1197
1198                case RTCP_PT_SDES:
1199                        parse_sdes(rh, flags, ep, ps, addr, ssrc, layer);
1200                        break;
1201
1202                case RTCP_PT_BYE:
1203                        parse_bye(rh, flags, ep, ps);
1204                        break;
1205
1206                default:
1207                        ps->badsessopt(1);
1208                        break;
1209                }
1210                rh = (rtcphdr*)ep;
1211        }
1212}
Note: See TracBrowser for help on using the browser.