root/vic/branches/mpeg4/rtp/session.cpp @ 4130

Revision 4130, 29.3 KB (checked in by douglask, 6 years ago)

Silence mismatched delete / delete [] messages from valgrind

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the University of
16 *      California, Berkeley and the Network Research Group at
17 *      Lawrence Berkeley Laboratory.
18 * 4. Neither the name of the University nor of the Laboratory may be used
19 *    to endorse or promote products derived from this software without
20 *    specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
23 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
26 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 */
34static const char rcsid[] =
35    "@(#) $Header$ (LBL)";
36
37#include "config.h"
38#include <math.h>
39#include <errno.h>
40#include <string.h>
41#ifdef WIN32
42extern "C" int getpid();
43#endif
44#include "source.h"
45#include "vic_tcl.h"
46#include "media-timer.h"
47#include "crypt.h"
48#include "timer.h"
49#include "ntp-time.h"
50#include "session.h"
51
52/* added to support the mbus
53#include "mbus_handler.h"*/
54
55
56static class SessionMatcher : public Matcher {
57    public:
58                SessionMatcher() : Matcher("session") {}
59                TclObject* match(const char* id) {
60                        if (strcmp(id, "audio/rtp") == 0)
61                                return (new AudioSessionManager);
62                        else if (strcmp(id, "video/rtp") == 0)
63                                return (new VideoSessionManager);
64                        return (0);
65                }
66} session_matcher;
67
68int VideoSessionManager::check_format(int fmt) const
69{
70        switch(fmt) {
71                case RTP_PT_RAW:
72                case RTP_PT_CELLB:
73                case RTP_PT_JPEG:
74                case RTP_PT_CUSEEME:
75                case RTP_PT_NV:
76                case RTP_PT_CPV:
77                case RTP_PT_H261:
78                case RTP_PT_BVC:
79                case RTP_PT_H261_COMPAT:/*XXX*/
80                case RTP_PT_H263:
81                case RTP_PT_MPEG4:
82                case RTP_PT_H264:
83                case RTP_PT_H263P:
84                case RTP_PT_LDCT:
85                case RTP_PT_PVH:
86                case RTP_PT_DV:
87#ifdef USE_H261AS
88                case RTP_PT_H261AS:
89#endif 
90                return (1);
91        }
92        return (0);
93}
94
95int AudioSessionManager::check_format(int fmt) const
96{
97        switch (fmt) {
98        case RTP_PT_PCMU:
99        case RTP_PT_CELP:
100        case RTP_PT_GSM:
101        case RTP_PT_DVI:
102        case RTP_PT_LPC:
103                return (1);
104        }
105        return (0);
106}
107
108
109static SessionManager* manager;
110
111void
112adios()
113{
114        if (SourceManager::instance().localsrc() != 0)
115                manager->send_bye();
116        exit(0);
117}
118
119/*void ReportTimer::timeout()
120{
121sm_.send_report();
122}*/
123
124void DataHandler::dispatch(int)
125{
126        sm_->recv(this);
127}
128
129void CtrlHandler::dispatch(int)
130{
131        sm_->recv(this);
132}
133
134CtrlHandler::CtrlHandler()
135: ctrl_inv_bw_(0.),
136ctrl_avg_size_(128.),
137rint_(0.0) //SV-XXX: Debian
138{
139}
140
141inline void CtrlHandler::schedule_timer()
142{
143        msched(int(fmod(double(random()), rint_) + rint_ * .5 + .5));
144}
145
146void CtrlHandler::net(Network* n)
147{
148        DataHandler::net(n);
149        cancel();
150        if (n != 0) {
151        /*
152        * schedule a timer for our first report using half the
153        * min ctrl interval.  This gives us some time before
154        * our first report to learn about other sources so our
155        * next report interval will account for them.  The avg
156        * ctrl size was initialized to 128 bytes which is
157        * conservative (it assumes everyone else is generating
158        * SRs instead of RRs).
159                */
160                double rint = ctrl_avg_size_ * ctrl_inv_bw_;
161                if (rint < CTRL_MIN_RPT_TIME / 2. * 1000.)
162                        rint = CTRL_MIN_RPT_TIME / 2. * 1000.;
163                rint_ = rint;
164                schedule_timer();
165        }
166}
167
168void CtrlHandler::sample_size(int cc)
169{
170        ctrl_avg_size_ += CTRL_SIZE_GAIN * (double(cc + 28) - ctrl_avg_size_);
171}
172
173void CtrlHandler::adapt(int nsrc, int nrr, int we_sent)
174{
175/*
176* compute the time to the next report.  we do this here
177* because we need to know if there were any active sources
178* during the last report period (nrr above) & if we were
179* a source.  The bandwidth limit for ctrl traffic was set
180* on startup from the session bandwidth.  It is the inverse
181* of bandwidth (ie., ms/byte) to avoid a divide below.
182        */
183        double ibw = ctrl_inv_bw_;
184        if (nrr) {
185                /* there were active sources */
186                if (we_sent) {
187                        ibw *= 1./CTRL_SENDER_BW_FRACTION;
188                        nsrc = nrr;
189                } else {
190                        ibw *= 1./CTRL_RECEIVER_BW_FRACTION;
191                        nsrc -= nrr;
192                }
193        }
194        double rint = ctrl_avg_size_ * double(nsrc) * ibw;
195        if (rint < CTRL_MIN_RPT_TIME * 1000.)
196                rint = CTRL_MIN_RPT_TIME * 1000.;
197        rint_ = rint;
198}
199
200void CtrlHandler::timeout()
201{
202        sm_->announce(this);
203        schedule_timer();
204}
205
206//SV-XXX: rearranged initialization order to shut up gcc4
207SessionManager::SessionManager()
208//      : dh_(*this), ch_(*this), rt_(*this),
209: mb_(mbus_handler_engine, NULL),
210lipSyncEnabled_(0),
211badversion_(0),
212badoptions_(0),
213badfmt_(0),
214badext_(0),
215nrunt_(0),
216last_np_(0),
217sdes_seq_(0),
218rtcp_inv_bw_(0.),
219rtcp_avg_size_(128.),
220confid_(-1)
221{
222        /*XXX For adios() to send bye*/
223        manager = this;
224       
225        for (int i = 0; i < NLAYER; ++i) {
226                dh_[i].manager(this);
227                ch_[i].manager(this);
228        }
229       
230        /*XXX*/
231        pktbuf_ = new u_char[2 * RTP_MTU];
232        pool_ = new BufferPool;
233       
234        /*
235        * schedule a timer for our first report using half the
236        * min rtcp interval.  This gives us some time before
237        * our first report to learn about other sources so our
238        * next report interval will account for them.  The avg
239        * rtcp size was initialized to 128 bytes which is
240        * conservative (it assumes everyone else is generating
241        * SRs instead of RRs).
242        */
243        double rint = rtcp_avg_size_ * rtcp_inv_bw_;
244        if (rint < RTCP_MIN_RPT_TIME / 2. * 1000.)
245                rint = RTCP_MIN_RPT_TIME / 2. * 1000.;
246        rint_ = rint;
247        //rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
248}
249
250SessionManager::~SessionManager()
251{
252        if (pktbuf_)
253                delete[] pktbuf_;
254       
255        delete pool_;
256}
257
258u_int32_t SessionManager::alloc_srcid(Address & addr) const
259{
260        timeval tv;
261        ::gettimeofday(&tv, 0);
262        u_int32_t srcid = u_int32_t(tv.tv_sec + tv.tv_usec);
263        srcid += (u_int32_t)getuid();
264        srcid += (u_int32_t)getpid();
265/* __IPv6 changed srcid computation */
266        for(unsigned int i = 0; i < (addr.length() % sizeof(u_int32_t)); i++) {
267                srcid += ((u_int32_t*)((const void*)addr))[i];
268        }
269        return (srcid);
270}
271
272extern char* onestat(char* cp, const char* name, u_long v);
273
274char* SessionManager::stats(char* cp) const
275{
276        cp = onestat(cp, "Bad-RTP-version", badversion_);
277        cp = onestat(cp, "Bad-RTPv1-options", badoptions_);
278        cp = onestat(cp, "Bad-Payload-Format", badfmt_);
279        cp = onestat(cp, "Bad-RTP-Extension", badext_);
280        cp = onestat(cp, "Runts", nrunt_);
281        Crypt* p = dh_[0].net()->crypt();
282        if (p != 0) {
283                cp = onestat(cp, "Crypt-Bad-Length", p->badpktlen());
284                cp = onestat(cp, "Crypt-Bad-P-Bit", p->badpbit());
285        }
286        /*XXX*/
287        if (ch_[0].net() != 0) {
288                Crypt* p = ch_[0].net()->crypt();
289                if (p != 0) {
290                        cp = onestat(cp, "Crypt-Ctrl-Bad-Length", p->badpktlen());
291                        cp = onestat(cp, "Crypt-Ctrl-Bad-P-Bit", p->badpbit());
292                }
293        }
294        *--cp = 0;
295        return (cp);
296}
297
298int SessionManager::command(int argc, const char*const* argv)
299{
300        Tcl& tcl = Tcl::instance();
301        char* cp = tcl.buffer();
302        if (argc == 2) {
303                if (strcmp(argv[1], "active") == 0) {
304                        SourceManager::instance().sortactive(cp);
305                        tcl.result(cp);
306                        return (TCL_OK);
307                }
308                if (strcmp(argv[1], "local-addr-heuristic") == 0) {
309                        strcpy(cp, intoa(LookupLocalAddr()));
310                        tcl.result(cp);
311                        return (TCL_OK);
312                }
313                if (strcmp(argv[1], "stats") == 0) {
314                        stats(cp);
315                        tcl.result(cp);
316                        return (TCL_OK);
317                }
318                if (strcmp(argv[1], "nb") == 0) {
319                        sprintf(cp, "%u", 8 * nb_);
320                        tcl.result(cp);
321                        return (TCL_OK);
322                }
323                if (strcmp(argv[1], "nf") == 0) {
324                        sprintf(cp, "%u", nf_);
325                        tcl.result(cp);
326                        return (TCL_OK);
327                }
328                if (strcmp(argv[1], "np") == 0 ||
329                    strcmp(argv[1], "ns") == 0) {
330                        sprintf(cp, "%u", np_);
331                        tcl.result(cp);
332                        return (TCL_OK);
333                }
334                if (strcmp(argv[1], "lip-sync") == 0) {
335                        sprintf(cp, "%u", lipSyncEnabled_);
336                        tcl.result(cp);
337                        return (TCL_OK);
338                }
339
340        } else if (argc == 3) {
341                if (strcmp(argv[1], "sm") == 0) {
342                        sm_ = (SourceManager*)TclObject::lookup(argv[2]);
343                        return (TCL_OK);
344                }
345                if (strcmp(argv[1], "name") == 0) {
346                        Source* s = SourceManager::instance().localsrc();
347                        s->sdes(RTCP_SDES_NAME, argv[2]);
348                        return (TCL_OK);
349                }
350                if (strcmp(argv[1], "email") == 0) {
351                        Source* s = SourceManager::instance().localsrc();
352                        s->sdes(RTCP_SDES_EMAIL, argv[2]);
353                        return (TCL_OK);
354                }
355                if (strcmp(argv[1], "random-srcid") == 0) {
356                        Address * addrp;
357                        if ((addrp = (dh_[0].net())->alloc(argv[2])) !=0 ) { //SV-XXX: placed ()'s against truth check == NULL
358                          sprintf(cp, "%u", alloc_srcid(*addrp));
359                          delete addrp;
360                        }
361                        tcl.result(cp);
362                        return (TCL_OK);
363                }
364                if (strcmp(argv[1], "data-net") == 0) {
365                        dh_[0].net((Network*)TclObject::lookup(argv[2]));
366                        return (TCL_OK);
367                }
368                if (strcmp(argv[1], "ctrl-net") == 0) {
369                        ch_[0].net((Network*)TclObject::lookup(argv[2]));
370                        return (TCL_OK);
371                }
372                if (strcmp(argv[1], "max-bandwidth") == 0) {
373                        double bw = atof(argv[2]) / 8.;
374                        rtcp_inv_bw_ = 1. / (bw * RTCP_SESSION_BW_FRACTION);
375                        return (TCL_OK);
376                }
377                if (strcmp(argv[1], "confid") == 0) {
378                        confid_ = atoi(argv[2]);
379                        return (TCL_OK);
380                }
381                if (strcmp(argv[1], "data-bandwidth") == 0) {
382                        /*XXX assume value in range */
383                        bps(atoi(argv[2]));
384                        return (TCL_OK);
385                }
386                if (strcmp(argv[1], "mtu") == 0) {
387                        mtu_ = atoi(argv[2]);
388                        return (TCL_OK);
389                }
390                if (strcmp(argv[1], "loopback") == 0) {
391                        loopback_ = atoi(argv[2]);
392                        return (TCL_OK);
393                }
394                if (strcmp(argv[1], "lip-sync") == 0) {
395                        lipSyncEnabled_ = atoi(argv[2]);
396                        return (TCL_OK);
397                }
398
399        }  else if (argc == 4) {
400                if (strcmp(argv[1], "data-net") == 0) {
401                        u_int layer = atoi(argv[3]);
402                        if (layer >= NLAYER)
403                                abort();
404                        if (*argv[2] == 0) {
405                                dh_[layer].net(0);
406                                return (TCL_OK);
407                        }
408                        Network* net = (Network*)TclObject::lookup(argv[2]);
409                        if (net == 0) {
410                                tcl.resultf("no network %s", argv[2]);
411                                return (TCL_ERROR);
412                        }
413                        if (net->rchannel() < 0) {
414                                tcl.resultf("network not open");
415                                return (TCL_ERROR);
416                        }
417                        dh_[layer].net(net);
418                        return (TCL_OK);
419                }
420                if (strcmp(argv[1], "ctrl-net") == 0) {
421                        u_int layer = atoi(argv[3]);
422                        if (layer >= NLAYER)
423                                abort();
424                        if (*argv[2] == 0) {
425                                ch_[layer].net(0);
426                                return (TCL_OK);
427                        }
428                        Network* net = (Network*)TclObject::lookup(argv[2]);
429                        if (net == 0) {
430                                tcl.resultf("no network %s", argv[2]);
431                                return (TCL_ERROR);
432                        }
433                        if (net->rchannel() < 0) {
434                                tcl.resultf("network not open");
435                                return (TCL_ERROR);
436                        }
437                        ch_[layer].net(net);
438                        return (TCL_OK);
439                }
440                return (Transmitter::command(argc, argv));
441        }
442        //should not be reached
443        return (TCL_ERROR);
444}
445
446void SessionManager::transmit(pktbuf* pb)
447{
448        //mh_.msg_iov = pb->iov;
449        //      dh_[.net()->send(mh_);
450                //debug_msg("L %d,",pb->layer);
451        // Using loop_layer for now to restrict transmission as well
452        if (pb->layer < loop_layer_) {
453        //      if ( pb->layer <0 ) exit(1);
454                Network* n = dh_[pb->layer].net();
455                if (n != 0)
456                        n->send(pb);
457        }
458}
459
460u_char* SessionManager::build_sdes_item(u_char* p, int code, Source& s)
461{
462        const char* value = s.sdes(code);
463        if (value != 0) {
464                int len = strlen(value);
465                *p++ = code;
466                *p++ = len;
467                memcpy(p, value, len);
468                p += len;
469        }
470        return (p);
471}
472
473int SessionManager::build_sdes(rtcphdr* rh, Source& ls)
474{
475        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_SDES;
476        rh->rh_flags = htons(flags);
477        rh->rh_ssrc = ls.srcid();
478        u_char* p = (u_char*)(rh + 1);
479        p = build_sdes_item(p, RTCP_SDES_CNAME, ls);
480
481        /*
482         * We always send a cname plus one other sdes
483         * There's a schedule for what we send sequenced by sdes_seq_:
484         *   - send 'email' every 0th & 4th packet
485         *   - send 'note' every 2nd packet
486         *   - send 'tool' every 6th packet
487         *   - send 'name' in all the odd slots
488         * (if 'note' is not the empty string, we switch the roles
489         *  of name & note)
490         */
491        int nameslot, noteslot;
492        const char* note = ls.sdes(RTCP_SDES_NOTE);
493        if (note) {
494                if (*note) {
495                        nameslot = RTCP_SDES_NOTE;
496                        noteslot = RTCP_SDES_NAME;
497                } else {
498                        nameslot = RTCP_SDES_NAME;
499                        noteslot = RTCP_SDES_NOTE;
500                }
501        } else {
502                nameslot = RTCP_SDES_NAME;
503                noteslot = RTCP_SDES_NAME;
504        }
505        u_int seq = (++sdes_seq_) & 0x7;
506        switch (seq) {
507
508        case 0case 4:
509                p = build_sdes_item(p, RTCP_SDES_EMAIL, ls);
510                break;
511
512        case 2:
513                p = build_sdes_item(p, noteslot, ls);
514                break;
515        case 6:
516                p = build_sdes_item(p, RTCP_SDES_TOOL, ls);
517                break;
518        default:
519                p = build_sdes_item(p, nameslot, ls);
520        }
521        int len = p - (u_char*)rh;
522        int pad = 4 - (len & 3);
523        len += pad;
524        rh->rh_len = htons((len >> 2) - 1);
525        while (--pad >= 0)
526                *p++ = 0;
527
528        return (len);
529}
530
531int SessionManager::build_app(rtcphdr* rh, Source& ls, char *name, void *data, int datalen)
532{
533  int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_APP;
534  rh->rh_flags = htons(flags);
535  rh->rh_ssrc = ls.srcid();
536  u_char* p = (u_char*)(rh + 1);
537    int len;
538   
539    // write the name field
540    len = strlen(name);
541    if( len < 4 ) {
542        memcpy(p,name,len);
543        p += len;
544       
545        // pad short names
546        while( p - (u_char*)(rh+1) < 4 )
547            *p++ = 0;
548    }
549    else {
550        // use first four chars of given name
551        memcpy(p,name,4);
552        p += 4;
553    }
554   
555    // write the app data
556    memcpy(p,data,datalen);
557    p += datalen;
558   
559    // pad as needed
560    len = p - (u_char*)rh;
561    int pad = 4 - (len & 3);
562    while( --pad >= 0 )
563        *p++ = 0;
564  len = p - (u_char*)rh;
565
566    // set the length now that it's known
567  rh->rh_len = htons((len >> 2) - 1);
568
569  return (len);
570}
571
572/*void SessionManager::send_report()
573{
574        send_report(0);
575}
576
577  void SessionManager::send_bye()
578  {
579  send_report(1);
580}*/
581
582// SessionManager is no longer used as Timer - Each
583// CtrlHandler has its own Timer which calls this;
584void SessionManager::announce(CtrlHandler* ch)
585{
586        send_report(ch, 0);
587}
588
589/*XXX check for buffer overflow*/
590/*
591* Send an RTPv2 report packet.
592*/
593//void SessionManager::send_report(int bye)
594
595void SessionManager::send_report(CtrlHandler* ch, int bye, int app)
596{
597        UNUSED(app); //SV-XXX: unused
598
599        SourceManager& sm = SourceManager::instance();
600        Source& s = *sm.localsrc();
601        rtcphdr* rh = (rtcphdr*)pktbuf_;
602        rh->rh_ssrc = s.srcid();
603        int flags = RTP_VERSION << 14;
604        int layer = ch - ch_; //LLL
605        Source::Layer& sl = s.layer(layer);
606        timeval now = unixtime();
607        sl.lts_ctrl(now);
608        int we_sent = 0;
609        rtcp_rr* rr;
610        Tcl& tcl = Tcl::instance();
611
612        /*
613         * If we've sent data since our last sender report send a
614         * new report.  The MediaTimer check is to make sure we still
615         * have a grabber -- if not, we won't be able to interpret the
616         * media timestamps so there's no point in sending an SR.
617         */
618        MediaTimer* mt = MediaTimer::instance();
619        if (sl.np() != last_np_ && mt) {
620                last_np_ = sl.np();
621                we_sent = 1;
622                flags |= RTCP_PT_SR;
623                rtcp_sr* sr = (rtcp_sr*)(rh + 1);
624                sr->sr_ntp = ntp64time(now);
625                HTONL(sr->sr_ntp.upper);
626                HTONL(sr->sr_ntp.lower);
627                sr->sr_ts = htonl(mt->ref_ts());
628                sr->sr_np = htonl(sl.np());
629                sr->sr_nb = htonl(sl.nb());
630                rr = (rtcp_rr*)(sr + 1);
631        } else {
632                flags |= RTCP_PT_RR;
633                rr = (rtcp_rr*)(rh + 1);
634        }
635        int nrr = 0;
636        int nsrc = 0;
637        /*
638        * we don't want to inflate report interval if user has set
639        * the flag that causes all sources to be 'kept' so we
640        * consider sources 'inactive' if we haven't heard a control
641        * msg from them for ~32 reporting intervals.
642        */
643        //LLL   u_int inactive = u_int(rint_ * (32./1000.));
644        u_int inactive = u_int(ch->rint() * (32./1000.));
645        if (inactive < 2)
646                inactive = 2;
647        for (Source* sp = sm.sources(); sp != 0; sp = sp->next_) {
648                ++nsrc;
649                Source::Layer& sl = sp->layer(layer);
650                //              int received = sp->np() - sp->snp();
651                int received = sl.np() - sl.snp();
652                if (received == 0) {
653                        //                      if (u_int(now.tv_sec - sp->lts_ctrl().tv_sec) > inactive)
654                        if (u_int(now.tv_sec - sl.lts_ctrl().tv_sec) > inactive)
655                                --nsrc;
656                        continue;
657                }
658                //              sp->snp(sp->np());
659                sl.snp(sl.np());
660                rr->rr_srcid = sp->srcid();
661                //              int expected = sp->ns() - sp->sns();
662                int expected = sl.ns() - sl.sns();
663                //              sp->sns(sp->ns());
664                sl.sns(sl.ns());
665                u_int32_t v;
666                int lost = expected - received;
667                if (lost <= 0)
668                        v = 0;
669                else
670                        /* expected != 0 if lost > 0 */
671                        v = ((lost << 8) / expected) << 24;
672                /* XXX should saturate on over/underflow */
673                //              v |= (sp->ns() - sp->np()) & 0xffffff;
674                v |= (sl.ns() - sl.np()) & 0xffffff;
675                rr->rr_loss = htonl(v);
676                //              rr->rr_ehsr = htonl(sp->ehs());
677                rr->rr_ehsr = htonl(sl.ehs());
678                rr->rr_dv = (sp->handler() != 0) ? sp->handler()->delvar() : 0;
679                //              rr->rr_lsr = htonl(sp->sts_ctrl());
680                rr->rr_lsr = htonl(sl.sts_ctrl());
681                //              if (sp->lts_ctrl().tv_sec == 0)
682                if (sl.lts_ctrl().tv_sec == 0)
683                        rr->rr_dlsr = 0;
684                else {
685                        u_int32_t ntp_now = ntptime(now);
686                        //                      u_int32_t ntp_then = ntptime(sp->lts_ctrl());
687                        u_int32_t ntp_then = ntptime(sl.lts_ctrl());
688                        rr->rr_dlsr = htonl(ntp_now - ntp_then);
689                }
690                ++rr;
691                if (++nrr >= 31)
692                        break;
693        }
694        flags |= nrr << 8;
695        rh->rh_flags = htons(flags);
696        int len = (u_char*)rr - pktbuf_;
697        rh->rh_len = htons((len >> 2) - 1);
698
699        if (bye)
700                len += build_bye((rtcphdr*)rr, s);
701        else
702                len += build_sdes((rtcphdr*)rr, s);
703       
704        // build "site" app data if specified
705        const char *data = tcl.attr("site");
706        if(data)
707        {
708            rr = (rtcp_rr*)(((u_char*)rh) + len);
709            len += build_app((rtcphdr*)rr, s, "site", (void *)data, strlen(data));
710        }
711        //LLL   ch_.send(pktbuf_, len);
712        ch->send(pktbuf_, len);
713       
714        /*
715      rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(len + 28) - rtcp_avg_size_);
716         
717          // compute the time to the next report.  we do this here
718          // because we need to know if there were any active sources
719          // during the last report period (nrr above) & if we were
720          // a source.  The bandwidth limit for rtcp traffic was set
721          // on startup from the session bandwidth.  It is the inverse
722          // of bandwidth (ie., ms/byte) to avoid a divide below.
723       
724        //      double ibw = rtcp_inv_bw_;
725        if (nrr) {
726        // there were active sources
727        //              if (we_sent) {
728        ibw *= 1./RTCP_SENDER_BW_FRACTION;
729        nsrc = nrr;
730        } else {
731        ibw *= 1./RTCP_RECEIVER_BW_FRACTION;
732        nsrc -= nrr;
733        }
734        }
735        double rint = rtcp_avg_size_ * double(nsrc) * ibw;
736        if (rint < RTCP_MIN_RPT_TIME * 1000.)
737                rint = RTCP_MIN_RPT_TIME * 1000.;
738        rint_ = rint;
739        rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
740        */
741       
742        // Call timer adaption for each layer
743        ch->adapt(nsrc, nrr, we_sent);
744        ch->sample_size(len);
745       
746        //      sm.CheckActiveSources(rint);
747        if (layer == 0)
748                sm.CheckActiveSources(ch->rint());
749       
750}
751
752int SessionManager::build_bye(rtcphdr* rh, Source& ls)
753{
754        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_BYE;
755        rh->rh_flags = ntohs(flags);
756        rh->rh_len = htons(1);
757        rh->rh_ssrc = ls.srcid();
758        return (8);
759}
760
761void SessionManager::recv(DataHandler* dh)
762{
763        int layer = dh - dh_;
764        pktbuf* pb = pool_->alloc(layer);
765        Address * addrp;
766        /* leave room in case we need to expand rtpv1 into an rtpv2 header */
767        /* XXX the free mem routine didn't like it ... */
768        //u_char* bp = &pktbuf_[4];
769        //u_char* bp = pktbuf_;
770       
771        int cc = dh->recv(pb->data, sizeof(pb->data), addrp);
772        //int cc = dh->recv(bp, 2 * RTP_MTU - 4, addrp);
773        if (cc <= 0) {
774                pb->release();
775                return;
776        }
777
778    // Ignore loopback packets
779        if (!loopback_) {
780                rtphdr* rh = (rtphdr*)pb->data;
781                SourceManager& sm = SourceManager::instance();
782                if (rh->rh_ssrc == (*sm.localsrc()).srcid()) {
783                        pb->release();
784                        return;
785                }
786        }
787
788        //rtphdr* rh = (rtphdr*)pb->data;
789        int version = pb->data[0] >> 6;
790        //int version = *(u_char*)rh >> 6;
791        if (version != 2) {
792                ++badversion_;
793                pb->release();
794                return;
795        }
796        if (cc < (int)sizeof(rtphdr)) {
797                ++nrunt_;
798                pb->release();
799                return;
800        }
801        pb->len = cc;
802       
803        //bp += sizeof(*rh);
804        //cc -= sizeof(*rh);
805        demux(pb, *addrp);
806}
807
808void SessionManager::demux(pktbuf* pb, Address & addr)
809{
810        rtphdr* rh = (rtphdr*)pb->data;
811        u_int32_t srcid = rh->rh_ssrc;
812        int flags = ntohs(rh->rh_flags);
813        // for LIP SYNC
814        //SV-XXX: unused: u_char *pkt = pb->data - sizeof(*rh);
815
816        if ((flags & RTP_X) != 0) {
817        /*
818        * the minimal-control audio/video profile
819        * explicitly forbids extensions
820                */
821                ++badext_;
822                pb->release();
823                return;
824        }
825
826        /*
827         * Check for illegal payload types.  Most likely this is
828         * a session packet arriving on the data port.
829         */
830        int fmt = flags & 0x7f;
831        if (!check_format(fmt)) {
832                ++badfmt_;
833                pb->release();
834                return;
835        }
836
837        SourceManager& sm = SourceManager::instance();
838        u_int16_t seqno = ntohs(rh->rh_seqno);
839        Source* s = sm.demux(srcid, addr, seqno, pb->layer);
840        if (s == 0) {
841        /*
842        * Takes a pair of validated packets before we will
843        * believe the source.  This prevents a runaway
844        * allocation of Source data structures for a
845        * stream of garbage packets.
846                */
847                pb->release();
848                return;
849        }
850        /* inform this source of the mbus */
851        s->mbus(&mb_);
852       
853        Source::Layer& sl = s->layer(pb->layer);
854        timeval now = unixtime();
855        //      s->lts_data(now);
856        sl.lts_data(now);
857        //      s->sts_data(rh->rh_ts);
858        sl.sts_data(rh->rh_ts);
859       
860        // extract CSRC count (CC field); increment pb->dp data pointer & adjust length accordingly
861        int cnt = (flags >> 8) & 0xf;
862        if (cnt > 0) {
863                //u_char* nh = (u_char*)rh + (cnt << 2);
864                rtphdr hdr = *rh;
865                pb->dp += (cnt << 2);
866                pb->len -= (cnt << 2);
867                u_int32_t* bp = (u_int32_t*)(rh + 1);
868                while (--cnt >= 0) {
869                        u_int32_t csrc = *(u_int32_t*)bp;
870                        bp += 4;
871                        Source* cs = sm.lookup(csrc, srcid, addr);
872                        //                      cs->lts_data(now);
873                        cs->layer(pb->layer).lts_data(now);
874                        cs->action();
875                }
876                //              rtphdr hdr = *rh;
877                //              rh = (rtphdr*)nh;
878                /*XXX move header up so it's contiguous with data*/
879                rh = (rtphdr*)pb->dp;
880                *rh = hdr;
881        } else
882                s->action();
883
884        if (s->sync() && lipSyncEnabled()) { 
885                /*
886                 * Synchronisation is enabled on this source;
887                 * video packets have to
888                 * be buffered, their playout point scheduled, and the
889                 * playout delays communicated with the audio tool ...
890                 */ 
891
892                /*
893                * XXX bit rate doesn't include rtpv1 options;
894                * but v1 is going away anyway.
895                */
896                //              int dup = s->cs(seqno);
897                //              s->np(1);
898                //              s->nb(cc + sizeof(*rh));
899                int dup = sl.cs(seqno, s);
900                sl.np(1);
901                sl.nb(pb->len);
902                if (dup) {
903                        pb->release();
904                        return;
905                }
906                if (flags & RTP_M) // check if reach frame boundaries
907                        //                      s->nf(1);
908                        sl.nf(1);
909               
910                //s->recv(pkt, rh, pb, pb->len); // this should invoke Source::recv and buffer
911                //s->recv(bp); // this should invoke Source::recv and buffer
912               
913                pktbuf_ = (u_char*)new_blk();
914        } /* synced */
915
916        else { /* ... playout video packets as they arrive */
917                /*
918                 * This is a data packet.  If the source needs activation,
919                 * or the packet format has changed, deal with this.
920                 * Then, hand the packet off to the packet handler.
921                 * XXX might want to be careful about flip-flopping
922                 * here when format changes due to misordered packets
923                 * (easy solution -- keep rtp seqno of last fmt change).
924                 */
925                PacketHandler* h = s->handler();
926                if (h == 0)
927                        h = s->activate(fmt);
928                else if (s->format() != fmt)
929                        h = s->change_format(fmt);
930               
931                        /*
932                        * XXX bit rate doesn't include rtpv1 options;
933                        * but v1 is going away anyway.
934                */
935                //              int dup = s->cs(seqno);
936                //              s->np(1);
937                //              s->nb(cc + sizeof(*rh));
938                int dup = sl.cs(seqno, s);
939                sl.np(1);
940                sl.nb(pb->len);
941                if (dup){
942                        pb->release();
943                        return;
944                }
945                if (flags & RTP_M)
946                        //                  s->nf(1);
947                        sl.nf(1);
948#ifdef notdef
949        /* This should move to the handler */
950        /*XXX could get rid of hdrlen and move run check into recv method*/
951               
952                int hlen = h->hdrlen();
953                cc -= hlen;
954                if (cc < 0) {
955                        //                  s->runt(1);
956                        sl.runt(1);
957                        pb->release();
958                        return;
959                }
960#endif
961                if (s->mute()) {
962                        pb->release();
963                        return;
964                }
965                //h->recv(rh, bp + hlen, cc);
966                h->recv(pb);
967        } /* not sync-ed */
968
969}
970
971void SessionManager::parse_rr_records(u_int32_t, rtcp_rr*, int,
972                                      const u_char*, Address &)
973{
974}
975                                     
976
977void SessionManager::parse_sr(rtcphdr* rh, int flags, u_char*ep,
978                                                          Source* ps, Address & addr, int layer)
979{
980        rtcp_sr* sr = (rtcp_sr*)(rh + 1);
981        Source* s;
982        u_int32_t ssrc = rh->rh_ssrc;
983        if (ps->srcid() != ssrc)
984                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
985        else
986                s = ps;
987       
988        Source::Layer& sl = s->layer(layer);
989       
990        timeval now = unixtime();
991
992        sl.lts_ctrl(now);
993        sl.sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
994                ntohl(sr->sr_ntp.lower) >> 16);
995       
996        //int cnt = flags >> 8 & 0x1f;
997        //parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
998       
999        /*s->lts_ctrl(now);
1000        s->sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1001        ntohl(sr->sr_ntp.lower) >> 16);*/
1002       
1003        s->rtp_ctrl(ntohl(sr->sr_ts));
1004        u_int32_t t = ntptime(now);
1005        s->map_ntp_time(t);
1006        s->map_rtp_time(s->convert_time(t));
1007        s->rtp2ntp(1);
1008        //printf("Got SR\n");
1009
1010        int cnt = flags >> 8 & 0x1f;
1011        parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1012}
1013
1014void SessionManager::parse_rr(rtcphdr* rh, int flags, u_char* ep,
1015                                                          Source* ps, Address & addr, int layer)
1016{
1017        Source* s;
1018        u_int32_t ssrc = rh->rh_ssrc;
1019        if (ps->srcid() != ssrc)
1020                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1021        else
1022                s = ps;
1023       
1024        s->layer(layer).lts_ctrl(unixtime());
1025        int cnt = flags >> 8 & 0x1f;
1026        parse_rr_records(ssrc, (rtcp_rr*)(rh + 1), cnt, ep, addr);
1027}
1028
1029int SessionManager::sdesbody(u_int32_t* p, u_char* ep, Source* ps,
1030                                                Address & addr, u_int32_t ssrc, int layer)
1031{
1032        Source* s;
1033        u_int32_t srcid = *p;
1034        if (ps->srcid() != srcid)
1035                s = SourceManager::instance().lookup(srcid, ssrc, addr);
1036        else
1037                s = ps;
1038        if (s == 0)
1039                return (0);
1040                /*
1041                * Note ctrl packet since we will never see any direct ctrl packets
1042                * from a source through a mixer (and we don't want the source to
1043                * time out).
1044        */
1045        s->layer(layer).lts_ctrl(unixtime());
1046       
1047        u_char* cp = (u_char*)(p + 1);
1048        while (cp < ep) {
1049                char buf[256];
1050
1051                u_int type = cp[0];
1052                if (type == 0) {
1053                        /* end of chunk */
1054                        return (((cp - (u_char*)p) >> 2) + 1);
1055                }
1056                u_int len = cp[1];
1057                u_char* eopt = cp + len + 2;
1058                if (eopt > ep)
1059                        return (0);
1060
1061                if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) {
1062                        memcpy(buf, (char*)&cp[2], len);
1063                        buf[len] = 0;
1064                        s->sdes(type, buf);
1065                } else
1066                        /*XXX*/;
1067
1068                cp = eopt;
1069        }
1070        return (0);
1071}
1072
1073void SessionManager::parse_sdes(rtcphdr* rh, int flags, u_char* ep, Source* ps,
1074                                                                Address & addr, u_int32_t ssrc, int layer)
1075{
1076        int cnt = flags >> 8 & 0x1f;
1077        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1078        while (--cnt >= 0 && (u_char*)p < ep) {
1079                int n = sdesbody(p, ep, ps, addr, ssrc, layer);
1080                if (n == 0)
1081                        break;
1082                p += n;
1083        }
1084        if (cnt >= 0)
1085                ps->badsdes(1);
1086}
1087
1088void SessionManager::parse_bye(rtcphdr* rh, int flags, u_char* ep, Source* ps)
1089{
1090        int cnt = flags >> 8 & 0x1f;
1091        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1092
1093        while (--cnt >= 0) {
1094                if (p >= (u_int32_t*)ep) {
1095                        ps->badbye(1);
1096                        return;
1097                }
1098                Source* s;
1099                if (ps->srcid() != rh->rh_ssrc)
1100                        s = SourceManager::instance().consult(*p);
1101                else
1102                        s = ps;
1103                if (s != 0)
1104                        s->lts_done(unixtime());
1105                ++p;
1106        }
1107}
1108
1109/*
1110 * Receive an rtcp packet (from the control port).
1111 */
1112void SessionManager::recv(CtrlHandler* ch)
1113{
1114        Address * srcp;
1115        int cc = ch->recv(pktbuf_, 2 * RTP_MTU, srcp);
1116        if (cc <= 0)
1117                return;
1118
1119        rtcphdr* rh = (rtcphdr*)pktbuf_;
1120
1121    // Ignore loopback packets
1122        if (!loopback_) {
1123                SourceManager& sm = SourceManager::instance();
1124                if (rh->rh_ssrc == (*sm.localsrc()).srcid())
1125                        return;
1126        }
1127
1128        if (cc < int(sizeof(*rh))) {
1129                ++nrunt_;
1130                return;
1131        }
1132        /*
1133         * try to filter out junk: first thing in packet must be
1134         * sr, rr or bye & version number must be correct.
1135         */
1136        switch(ntohs(rh->rh_flags) & 0xc0ff) {
1137        case RTP_VERSION << 14 | RTCP_PT_SR:
1138        case RTP_VERSION << 14 | RTCP_PT_RR:
1139        case RTP_VERSION << 14 | RTCP_PT_BYE:
1140                break;
1141        default:
1142                /*
1143                 * XXX should further categorize this error -- it is
1144                 * likely that people mis-implement applications that
1145                 * don't put something other than SR,RR,BYE first.
1146                 */
1147                ++badversion_;
1148                return;
1149        }
1150        /*
1151         * at this point we think the packet's valid.  Update our average
1152         * size estimator.  Also, there's valid ssrc so charge errors to it
1153         */
1154        rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(cc + 28) - rtcp_avg_size_);
1155        Address & addr = *srcp;
1156
1157        /*
1158         * First record in compount packet must be the ssrc of the
1159         * sender of the packet.  Pull it out here so we can use
1160         * it in the sdes parsing, since the sdes record doesn't
1161         * contain the ssrc of the sender (in the case of mixers).
1162         */
1163        u_int32_t ssrc = rh->rh_ssrc;
1164        Source* ps = SourceManager::instance().lookup(ssrc, ssrc, addr);
1165        if (ps == 0)
1166                return;
1167       
1168        int layer = ch - ch_;
1169                /*
1170                * Outer loop parses multiple RTCP records of a "compound packet".
1171                * There is no framing between records.  Boundaries are implicit
1172                * and the overall length comes from UDP.
1173        */
1174        u_char* epack = (u_char*)rh + cc;
1175        while ((u_char*)rh < epack) {
1176                u_int len = (ntohs(rh->rh_len) << 2) + 4;
1177                u_char* ep = (u_char*)rh + len;
1178                if (ep > epack) {
1179                        ps->badsesslen(1);
1180                        return;
1181                }
1182                u_int flags = ntohs(rh->rh_flags);
1183                if (flags >> 14 != RTP_VERSION) {
1184                        ps->badsessver(1);
1185                        return;
1186                }
1187                switch (flags & 0xff) {
1188
1189                case RTCP_PT_SR:
1190                        parse_sr(rh, flags, ep, ps, addr, layer);
1191                        break;
1192
1193                case RTCP_PT_RR:
1194                        parse_rr(rh, flags, ep, ps, addr, layer);
1195                        break;
1196
1197                case RTCP_PT_SDES:
1198                        parse_sdes(rh, flags, ep, ps, addr, ssrc, layer);
1199                        break;
1200
1201                case RTCP_PT_BYE:
1202                        parse_bye(rh, flags, ep, ps);
1203                        break;
1204
1205                default:
1206                        ps->badsessopt(1);
1207                        break;
1208                }
1209                rh = (rtcphdr*)ep;
1210        }
1211}
Note: See TracBrowser for help on using the browser.