root/vic/branches/cc/rtp/session.cpp @ 4249

Revision 4249, 31.7 KB (checked in by soohyunc, 6 years ago)

(on-going) parse_xr_records()

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
RevLine 
[900]1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the University of
16 *      California, Berkeley and the Network Research Group at
17 *      Lawrence Berkeley Laboratory.
18 * 4. Neither the name of the University nor of the Laboratory may be used
19 *    to endorse or promote products derived from this software without
20 *    specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
23 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
26 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
[4233]33 *
34 * $Id$
[900]35 */
36static const char rcsid[] =
37    "@(#) $Header$ (LBL)";
38
39#include "config.h"
40#include <math.h>
41#include <errno.h>
42#include <string.h>
43#ifdef WIN32
44extern "C" int getpid();
45#endif
46#include "source.h"
47#include "vic_tcl.h"
48#include "media-timer.h"
49#include "crypt.h"
50#include "timer.h"
51#include "ntp-time.h"
52#include "session.h"
[4234]53#include "cc/tfwc_sndr.h"
[4243]54#include "cc/tfwc_rcvr.h"
[900]55
56/* added to support the mbus
57#include "mbus_handler.h"*/
58
59
60static class SessionMatcher : public Matcher {
61    public:
[956]62                SessionMatcher() : Matcher("session") {}
63                TclObject* match(const char* id) {
64                        if (strcmp(id, "audio/rtp") == 0)
65                                return (new AudioSessionManager);
66                        else if (strcmp(id, "video/rtp") == 0)
67                                return (new VideoSessionManager);
68                        return (0);
69                }
[900]70} session_matcher;
71
72int VideoSessionManager::check_format(int fmt) const
73{
74        switch(fmt) {
[956]75                case RTP_PT_RAW:
76                case RTP_PT_CELLB:
77                case RTP_PT_JPEG:
78                case RTP_PT_CUSEEME:
79                case RTP_PT_NV:
80                case RTP_PT_CPV:
81                case RTP_PT_H261:
82                case RTP_PT_BVC:
83                case RTP_PT_H261_COMPAT:/*XXX*/
84                case RTP_PT_H263:
[3885]85                case RTP_PT_MPEG4:
86                case RTP_PT_H264:
[956]87                case RTP_PT_H263P:
88                case RTP_PT_LDCT:
89                case RTP_PT_PVH:
[3805]90                case RTP_PT_DV:
[3795]91#ifdef USE_H261AS
92                case RTP_PT_H261AS:
93#endif 
[900]94                return (1);
95        }
96        return (0);
97}
98
99int AudioSessionManager::check_format(int fmt) const
100{
101        switch (fmt) {
102        case RTP_PT_PCMU:
103        case RTP_PT_CELP:
104        case RTP_PT_GSM:
105        case RTP_PT_DVI:
106        case RTP_PT_LPC:
107                return (1);
108        }
109        return (0);
110}
111
112
113static SessionManager* manager;
114
115void
116adios()
117{
118        if (SourceManager::instance().localsrc() != 0)
119                manager->send_bye();
120        exit(0);
121}
122
[956]123/*void ReportTimer::timeout()
[900]124{
[956]125sm_.send_report();
126}*/
[900]127
128void DataHandler::dispatch(int)
129{
[956]130        sm_->recv(this);
[900]131}
132
133void CtrlHandler::dispatch(int)
134{
[956]135        sm_->recv(this);
[900]136}
137
[956]138CtrlHandler::CtrlHandler()
139: ctrl_inv_bw_(0.),
[3746]140ctrl_avg_size_(128.),
141rint_(0.0) //SV-XXX: Debian
[956]142{
143}
144
145inline void CtrlHandler::schedule_timer()
146{
147        msched(int(fmod(double(random()), rint_) + rint_ * .5 + .5));
148}
149
150void CtrlHandler::net(Network* n)
151{
152        DataHandler::net(n);
153        cancel();
154        if (n != 0) {
155        /*
156        * schedule a timer for our first report using half the
157        * min ctrl interval.  This gives us some time before
158        * our first report to learn about other sources so our
159        * next report interval will account for them.  The avg
160        * ctrl size was initialized to 128 bytes which is
161        * conservative (it assumes everyone else is generating
162        * SRs instead of RRs).
163                */
164                double rint = ctrl_avg_size_ * ctrl_inv_bw_;
165                if (rint < CTRL_MIN_RPT_TIME / 2. * 1000.)
166                        rint = CTRL_MIN_RPT_TIME / 2. * 1000.;
167                rint_ = rint;
168                schedule_timer();
169        }
170}
171
172void CtrlHandler::sample_size(int cc)
173{
174        ctrl_avg_size_ += CTRL_SIZE_GAIN * (double(cc + 28) - ctrl_avg_size_);
175}
176
177void CtrlHandler::adapt(int nsrc, int nrr, int we_sent)
178{
179/*
180* compute the time to the next report.  we do this here
181* because we need to know if there were any active sources
182* during the last report period (nrr above) & if we were
183* a source.  The bandwidth limit for ctrl traffic was set
184* on startup from the session bandwidth.  It is the inverse
185* of bandwidth (ie., ms/byte) to avoid a divide below.
186        */
187        double ibw = ctrl_inv_bw_;
188        if (nrr) {
189                /* there were active sources */
190                if (we_sent) {
191                        ibw *= 1./CTRL_SENDER_BW_FRACTION;
192                        nsrc = nrr;
193                } else {
194                        ibw *= 1./CTRL_RECEIVER_BW_FRACTION;
195                        nsrc -= nrr;
196                }
197        }
198        double rint = ctrl_avg_size_ * double(nsrc) * ibw;
199        if (rint < CTRL_MIN_RPT_TIME * 1000.)
200                rint = CTRL_MIN_RPT_TIME * 1000.;
201        rint_ = rint;
202}
203
204void CtrlHandler::timeout()
205{
206        sm_->announce(this);
207        schedule_timer();
208}
209
[3702]210//SV-XXX: rearranged initialization order to shut up gcc4
[900]211SessionManager::SessionManager()
[956]212//      : dh_(*this), ch_(*this), rt_(*this),
[3702]213: mb_(mbus_handler_engine, NULL),
214lipSyncEnabled_(0),
215badversion_(0),
216badoptions_(0),
217badfmt_(0),
218badext_(0),
219nrunt_(0),
220last_np_(0),
[956]221sdes_seq_(0),
222rtcp_inv_bw_(0.),
[3702]223rtcp_avg_size_(128.),
[4233]224confid_(-1),
225seqno_(0),              // RTP packet sequence number (from RTP header)
226lastseq_(0),    // last received packet's seqno
227ackvec_(0)              // bit vector (AckVec)
[900]228{
[956]229        /*XXX For adios() to send bye*/
[900]230        manager = this;
[956]231       
232        for (int i = 0; i < NLAYER; ++i) {
233                dh_[i].manager(this);
234                ch_[i].manager(this);
235        }
236       
[900]237        /*XXX*/
238        pktbuf_ = new u_char[2 * RTP_MTU];
[956]239        pool_ = new BufferPool;
240       
[900]241        /*
[956]242        * schedule a timer for our first report using half the
243        * min rtcp interval.  This gives us some time before
244        * our first report to learn about other sources so our
245        * next report interval will account for them.  The avg
246        * rtcp size was initialized to 128 bytes which is
247        * conservative (it assumes everyone else is generating
248        * SRs instead of RRs).
249        */
[900]250        double rint = rtcp_avg_size_ * rtcp_inv_bw_;
251        if (rint < RTCP_MIN_RPT_TIME / 2. * 1000.)
252                rint = RTCP_MIN_RPT_TIME / 2. * 1000.;
253        rint_ = rint;
[956]254        //rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
[900]255}
256
257SessionManager::~SessionManager()
258{
259        if (pktbuf_)
[4130]260                delete[] pktbuf_;
[956]261       
262        delete pool_;
[900]263}
264
265u_int32_t SessionManager::alloc_srcid(Address & addr) const
266{
267        timeval tv;
268        ::gettimeofday(&tv, 0);
269        u_int32_t srcid = u_int32_t(tv.tv_sec + tv.tv_usec);
270        srcid += (u_int32_t)getuid();
271        srcid += (u_int32_t)getpid();
272/* __IPv6 changed srcid computation */
[3798]273        for(unsigned int i = 0; i < (addr.length() % sizeof(u_int32_t)); i++) {
[956]274                srcid += ((u_int32_t*)((const void*)addr))[i];
[900]275        }
276        return (srcid);
277}
278
279extern char* onestat(char* cp, const char* name, u_long v);
280
281char* SessionManager::stats(char* cp) const
282{
283        cp = onestat(cp, "Bad-RTP-version", badversion_);
284        cp = onestat(cp, "Bad-RTPv1-options", badoptions_);
285        cp = onestat(cp, "Bad-Payload-Format", badfmt_);
286        cp = onestat(cp, "Bad-RTP-Extension", badext_);
287        cp = onestat(cp, "Runts", nrunt_);
[956]288        Crypt* p = dh_[0].net()->crypt();
[900]289        if (p != 0) {
290                cp = onestat(cp, "Crypt-Bad-Length", p->badpktlen());
291                cp = onestat(cp, "Crypt-Bad-P-Bit", p->badpbit());
292        }
293        /*XXX*/
[956]294        if (ch_[0].net() != 0) {
295                Crypt* p = ch_[0].net()->crypt();
[900]296                if (p != 0) {
297                        cp = onestat(cp, "Crypt-Ctrl-Bad-Length", p->badpktlen());
298                        cp = onestat(cp, "Crypt-Ctrl-Bad-P-Bit", p->badpbit());
299                }
300        }
301        *--cp = 0;
302        return (cp);
303}
304
305int SessionManager::command(int argc, const char*const* argv)
306{
307        Tcl& tcl = Tcl::instance();
308        char* cp = tcl.buffer();
309        if (argc == 2) {
310                if (strcmp(argv[1], "active") == 0) {
311                        SourceManager::instance().sortactive(cp);
312                        tcl.result(cp);
313                        return (TCL_OK);
314                }
315                if (strcmp(argv[1], "local-addr-heuristic") == 0) {
316                        strcpy(cp, intoa(LookupLocalAddr()));
317                        tcl.result(cp);
318                        return (TCL_OK);
319                }
320                if (strcmp(argv[1], "stats") == 0) {
321                        stats(cp);
322                        tcl.result(cp);
323                        return (TCL_OK);
324                }
325                if (strcmp(argv[1], "nb") == 0) {
326                        sprintf(cp, "%u", 8 * nb_);
327                        tcl.result(cp);
328                        return (TCL_OK);
329                }
330                if (strcmp(argv[1], "nf") == 0) {
331                        sprintf(cp, "%u", nf_);
332                        tcl.result(cp);
333                        return (TCL_OK);
334                }
335                if (strcmp(argv[1], "np") == 0 ||
336                    strcmp(argv[1], "ns") == 0) {
337                        sprintf(cp, "%u", np_);
338                        tcl.result(cp);
339                        return (TCL_OK);
340                }
341                if (strcmp(argv[1], "lip-sync") == 0) {
342                        sprintf(cp, "%u", lipSyncEnabled_);
343                        tcl.result(cp);
344                        return (TCL_OK);
345                }
346
347        } else if (argc == 3) {
[956]348                if (strcmp(argv[1], "sm") == 0) {
349                        sm_ = (SourceManager*)TclObject::lookup(argv[2]);
350                        return (TCL_OK);
351                }
[900]352                if (strcmp(argv[1], "name") == 0) {
353                        Source* s = SourceManager::instance().localsrc();
354                        s->sdes(RTCP_SDES_NAME, argv[2]);
355                        return (TCL_OK);
356                }
357                if (strcmp(argv[1], "email") == 0) {
358                        Source* s = SourceManager::instance().localsrc();
359                        s->sdes(RTCP_SDES_EMAIL, argv[2]);
360                        return (TCL_OK);
361                }
362                if (strcmp(argv[1], "random-srcid") == 0) {
363                        Address * addrp;
[3753]364                        if ((addrp = (dh_[0].net())->alloc(argv[2])) !=0 ) { //SV-XXX: placed ()'s against truth check == NULL
[900]365                          sprintf(cp, "%u", alloc_srcid(*addrp));
366                          delete addrp;
367                        }
368                        tcl.result(cp);
369                        return (TCL_OK);
370                }
371                if (strcmp(argv[1], "data-net") == 0) {
[956]372                        dh_[0].net((Network*)TclObject::lookup(argv[2]));
[900]373                        return (TCL_OK);
374                }
375                if (strcmp(argv[1], "ctrl-net") == 0) {
[956]376                        ch_[0].net((Network*)TclObject::lookup(argv[2]));
[900]377                        return (TCL_OK);
378                }
379                if (strcmp(argv[1], "max-bandwidth") == 0) {
380                        double bw = atof(argv[2]) / 8.;
381                        rtcp_inv_bw_ = 1. / (bw * RTCP_SESSION_BW_FRACTION);
382                        return (TCL_OK);
383                }
384                if (strcmp(argv[1], "confid") == 0) {
385                        confid_ = atoi(argv[2]);
386                        return (TCL_OK);
387                }
388                if (strcmp(argv[1], "data-bandwidth") == 0) {
389                        /*XXX assume value in range */
390                        bps(atoi(argv[2]));
391                        return (TCL_OK);
392                }
393                if (strcmp(argv[1], "mtu") == 0) {
394                        mtu_ = atoi(argv[2]);
395                        return (TCL_OK);
396                }
397                if (strcmp(argv[1], "loopback") == 0) {
398                        loopback_ = atoi(argv[2]);
399                        return (TCL_OK);
400                }
401                if (strcmp(argv[1], "lip-sync") == 0) {
402                        lipSyncEnabled_ = atoi(argv[2]);
403                        return (TCL_OK);
404                }
[956]405
406        }  else if (argc == 4) {
407                if (strcmp(argv[1], "data-net") == 0) {
408                        u_int layer = atoi(argv[3]);
409                        if (layer >= NLAYER)
410                                abort();
411                        if (*argv[2] == 0) {
412                                dh_[layer].net(0);
413                                return (TCL_OK);
414                        }
415                        Network* net = (Network*)TclObject::lookup(argv[2]);
416                        if (net == 0) {
417                                tcl.resultf("no network %s", argv[2]);
418                                return (TCL_ERROR);
419                        }
420                        if (net->rchannel() < 0) {
421                                tcl.resultf("network not open");
422                                return (TCL_ERROR);
423                        }
424                        dh_[layer].net(net);
425                        return (TCL_OK);
426                }
427                if (strcmp(argv[1], "ctrl-net") == 0) {
428                        u_int layer = atoi(argv[3]);
429                        if (layer >= NLAYER)
430                                abort();
431                        if (*argv[2] == 0) {
432                                ch_[layer].net(0);
433                                return (TCL_OK);
434                        }
435                        Network* net = (Network*)TclObject::lookup(argv[2]);
436                        if (net == 0) {
437                                tcl.resultf("no network %s", argv[2]);
438                                return (TCL_ERROR);
439                        }
440                        if (net->rchannel() < 0) {
441                                tcl.resultf("network not open");
442                                return (TCL_ERROR);
443                        }
444                        ch_[layer].net(net);
445                        return (TCL_OK);
446                }
447                return (Transmitter::command(argc, argv));
[900]448        }
[3773]449        //should not be reached
450        return (TCL_ERROR);
[900]451}
452
453void SessionManager::transmit(pktbuf* pb)
454{
[956]455        //mh_.msg_iov = pb->iov;
456        //      dh_[.net()->send(mh_);
457                //debug_msg("L %d,",pb->layer);
458        // Using loop_layer for now to restrict transmission as well
459        if (pb->layer < loop_layer_) {
460        //      if ( pb->layer <0 ) exit(1);
461                Network* n = dh_[pb->layer].net();
462                if (n != 0)
463                        n->send(pb);
464        }
[900]465}
466
467u_char* SessionManager::build_sdes_item(u_char* p, int code, Source& s)
468{
469        const char* value = s.sdes(code);
470        if (value != 0) {
471                int len = strlen(value);
472                *p++ = code;
473                *p++ = len;
474                memcpy(p, value, len);
475                p += len;
476        }
477        return (p);
478}
479
480int SessionManager::build_sdes(rtcphdr* rh, Source& ls)
481{
482        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_SDES;
483        rh->rh_flags = htons(flags);
484        rh->rh_ssrc = ls.srcid();
485        u_char* p = (u_char*)(rh + 1);
486        p = build_sdes_item(p, RTCP_SDES_CNAME, ls);
487
488        /*
489         * We always send a cname plus one other sdes
490         * There's a schedule for what we send sequenced by sdes_seq_:
491         *   - send 'email' every 0th & 4th packet
492         *   - send 'note' every 2nd packet
493         *   - send 'tool' every 6th packet
494         *   - send 'name' in all the odd slots
495         * (if 'note' is not the empty string, we switch the roles
496         *  of name & note)
497         */
498        int nameslot, noteslot;
499        const char* note = ls.sdes(RTCP_SDES_NOTE);
500        if (note) {
501                if (*note) {
502                        nameslot = RTCP_SDES_NOTE;
503                        noteslot = RTCP_SDES_NAME;
504                } else {
505                        nameslot = RTCP_SDES_NAME;
506                        noteslot = RTCP_SDES_NOTE;
507                }
508        } else {
509                nameslot = RTCP_SDES_NAME;
510                noteslot = RTCP_SDES_NAME;
511        }
512        u_int seq = (++sdes_seq_) & 0x7;
513        switch (seq) {
514
515        case 0case 4:
516                p = build_sdes_item(p, RTCP_SDES_EMAIL, ls);
517                break;
518
519        case 2:
520                p = build_sdes_item(p, noteslot, ls);
521                break;
522        case 6:
523                p = build_sdes_item(p, RTCP_SDES_TOOL, ls);
524                break;
525        default:
526                p = build_sdes_item(p, nameslot, ls);
527        }
528        int len = p - (u_char*)rh;
529        int pad = 4 - (len & 3);
530        len += pad;
531        rh->rh_len = htons((len >> 2) - 1);
532        while (--pad >= 0)
533                *p++ = 0;
534
535        return (len);
536}
537
[4184]538int SessionManager::build_app(rtcphdr* rh, Source& ls, const char *name,
539                void *data, int datalen)
[3995]540{
541  int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_APP;
542  rh->rh_flags = htons(flags);
543  rh->rh_ssrc = ls.srcid();
544  u_char* p = (u_char*)(rh + 1);
545    int len;
546   
547    // write the name field
548    len = strlen(name);
549    if( len < 4 ) {
550        memcpy(p,name,len);
551        p += len;
552       
553        // pad short names
554        while( p - (u_char*)(rh+1) < 4 )
555            *p++ = 0;
556    }
557    else {
558        // use first four chars of given name
559        memcpy(p,name,4);
560        p += 4;
561    }
562   
563    // write the app data
564    memcpy(p,data,datalen);
565    p += datalen;
566   
567    // pad as needed
568    len = p - (u_char*)rh;
569    int pad = 4 - (len & 3);
570    while( --pad >= 0 )
571        *p++ = 0;
572  len = p - (u_char*)rh;
573
574    // set the length now that it's known
575  rh->rh_len = htons((len >> 2) - 1);
576
577  return (len);
578}
579
[956]580/*void SessionManager::send_report()
[900]581{
582        send_report(0);
583}
584
[956]585  void SessionManager::send_bye()
586  {
587  send_report(1);
588}*/
589
590// SessionManager is no longer used as Timer - Each
591// CtrlHandler has its own Timer which calls this;
592void SessionManager::announce(CtrlHandler* ch)
[900]593{
[956]594        send_report(ch, 0);
[900]595}
596
597/*XXX check for buffer overflow*/
598/*
[956]599* Send an RTPv2 report packet.
600*/
601void SessionManager::send_report(CtrlHandler* ch, int bye, int app)
[900]602{
[3702]603        UNUSED(app); //SV-XXX: unused
604
[900]605        SourceManager& sm = SourceManager::instance();
606        Source& s = *sm.localsrc();
607        rtcphdr* rh = (rtcphdr*)pktbuf_;
608        rh->rh_ssrc = s.srcid();
609        int flags = RTP_VERSION << 14;
[956]610        int layer = ch - ch_; //LLL
611        Source::Layer& sl = s.layer(layer);
[900]612        timeval now = unixtime();
[956]613        sl.lts_ctrl(now);
[900]614        int we_sent = 0;
615        rtcp_rr* rr;
[4248]616        rtcp_xr_hdr* xrh;       // extended report header
617        rtcp_xr_blk* xrb;       // extended report block
[3995]618        Tcl& tcl = Tcl::instance();
619
[900]620        /*
621         * If we've sent data since our last sender report send a
622         * new report.  The MediaTimer check is to make sure we still
623         * have a grabber -- if not, we won't be able to interpret the
624         * media timestamps so there's no point in sending an SR.
625         */
626        MediaTimer* mt = MediaTimer::instance();
[956]627        if (sl.np() != last_np_ && mt) {
628                last_np_ = sl.np();
[900]629                we_sent = 1;
630                flags |= RTCP_PT_SR;
631                rtcp_sr* sr = (rtcp_sr*)(rh + 1);
632                sr->sr_ntp = ntp64time(now);
633                HTONL(sr->sr_ntp.upper);
634                HTONL(sr->sr_ntp.lower);
635                sr->sr_ts = htonl(mt->ref_ts());
[956]636                sr->sr_np = htonl(sl.np());
637                sr->sr_nb = htonl(sl.nb());
[900]638                rr = (rtcp_rr*)(sr + 1);
639        } else {
640                flags |= RTCP_PT_RR;
641                rr = (rtcp_rr*)(rh + 1);
642        }
[4239]643
644        // if CC is turned on, we need XR report
645        if (is_cc_on()) {
[4242]646                flags |= RTCP_PT_XR;            // setting flags to XR
[4248]647                xrh = (rtcp_xr_hdr*)(rh + 1);   // XR header
[4249]648                int xrlen = (xrh->xr_flags << 16) >> 16; // XR length
[4248]649                xrb = (rtcp_xr_blk*)(xrh + xrlen + 1);  // XR block
650                xrb->begin_seq = lastseq_;// this will be used for ackofack
651                xrb->end_seq = seqno_ + 1;// as defined in RFC3611 section 4.1
652                xrb->chunk = get_ackvec();      // ackvec
[4239]653        }
654
[900]655        int nrr = 0;
656        int nsrc = 0;
657        /*
[956]658        * we don't want to inflate report interval if user has set
659        * the flag that causes all sources to be 'kept' so we
660        * consider sources 'inactive' if we haven't heard a control
661        * msg from them for ~32 reporting intervals.
662        */
663        //LLL   u_int inactive = u_int(rint_ * (32./1000.));
664        u_int inactive = u_int(ch->rint() * (32./1000.));
[900]665        if (inactive < 2)
666                inactive = 2;
667        for (Source* sp = sm.sources(); sp != 0; sp = sp->next_) {
668                ++nsrc;
[956]669                Source::Layer& sl = sp->layer(layer);
670                //              int received = sp->np() - sp->snp();
671                int received = sl.np() - sl.snp();
[900]672                if (received == 0) {
[4234]673                        //              if (u_int(now.tv_sec - sp->lts_ctrl().tv_sec) > inactive)
[956]674                        if (u_int(now.tv_sec - sl.lts_ctrl().tv_sec) > inactive)
[900]675                                --nsrc;
676                        continue;
677                }
[956]678                //              sp->snp(sp->np());
679                sl.snp(sl.np());
[900]680                rr->rr_srcid = sp->srcid();
[956]681                //              int expected = sp->ns() - sp->sns();
682                int expected = sl.ns() - sl.sns();
683                //              sp->sns(sp->ns());
684                sl.sns(sl.ns());
[900]685                u_int32_t v;
686                int lost = expected - received;
687                if (lost <= 0)
688                        v = 0;
689                else
690                        /* expected != 0 if lost > 0 */
691                        v = ((lost << 8) / expected) << 24;
692                /* XXX should saturate on over/underflow */
[956]693                //              v |= (sp->ns() - sp->np()) & 0xffffff;
694                v |= (sl.ns() - sl.np()) & 0xffffff;
[900]695                rr->rr_loss = htonl(v);
[956]696                //              rr->rr_ehsr = htonl(sp->ehs());
697                rr->rr_ehsr = htonl(sl.ehs());
[900]698                rr->rr_dv = (sp->handler() != 0) ? sp->handler()->delvar() : 0;
[956]699                //              rr->rr_lsr = htonl(sp->sts_ctrl());
700                rr->rr_lsr = htonl(sl.sts_ctrl());
701                //              if (sp->lts_ctrl().tv_sec == 0)
702                if (sl.lts_ctrl().tv_sec == 0)
[900]703                        rr->rr_dlsr = 0;
704                else {
705                        u_int32_t ntp_now = ntptime(now);
[956]706                        //                      u_int32_t ntp_then = ntptime(sp->lts_ctrl());
707                        u_int32_t ntp_then = ntptime(sl.lts_ctrl());
[900]708                        rr->rr_dlsr = htonl(ntp_now - ntp_then);
709                }
710                ++rr;
711                if (++nrr >= 31)
712                        break;
713        }
714        flags |= nrr << 8;
715        rh->rh_flags = htons(flags);
716        int len = (u_char*)rr - pktbuf_;
717        rh->rh_len = htons((len >> 2) - 1);
718
719        if (bye)
720                len += build_bye((rtcphdr*)rr, s);
721        else
722                len += build_sdes((rtcphdr*)rr, s);
[956]723       
[3995]724        // build "site" app data if specified
725        const char *data = tcl.attr("site");
726        if(data)
727        {
728            rr = (rtcp_rr*)(((u_char*)rh) + len);
729            len += build_app((rtcphdr*)rr, s, "site", (void *)data, strlen(data));
730        }
[956]731        //LLL   ch_.send(pktbuf_, len);
732        ch->send(pktbuf_, len);
733       
[3995]734        /*
735      rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(len + 28) - rtcp_avg_size_);
[3702]736         
737          // compute the time to the next report.  we do this here
738          // because we need to know if there were any active sources
739          // during the last report period (nrr above) & if we were
740          // a source.  The bandwidth limit for rtcp traffic was set
741          // on startup from the session bandwidth.  It is the inverse
742          // of bandwidth (ie., ms/byte) to avoid a divide below.
743       
744        //      double ibw = rtcp_inv_bw_;
[900]745        if (nrr) {
[3702]746        // there were active sources
747        //              if (we_sent) {
[956]748        ibw *= 1./RTCP_SENDER_BW_FRACTION;
749        nsrc = nrr;
750        } else {
751        ibw *= 1./RTCP_RECEIVER_BW_FRACTION;
752        nsrc -= nrr;
[900]753        }
[956]754        }
[900]755        double rint = rtcp_avg_size_ * double(nsrc) * ibw;
756        if (rint < RTCP_MIN_RPT_TIME * 1000.)
757                rint = RTCP_MIN_RPT_TIME * 1000.;
758        rint_ = rint;
759        rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
[956]760        */
761       
762        // Call timer adaption for each layer
763        ch->adapt(nsrc, nrr, we_sent);
764        ch->sample_size(len);
765       
766        //      sm.CheckActiveSources(rint);
767        if (layer == 0)
768                sm.CheckActiveSources(ch->rint());
769       
[900]770}
771
772int SessionManager::build_bye(rtcphdr* rh, Source& ls)
773{
774        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_BYE;
775        rh->rh_flags = ntohs(flags);
776        rh->rh_len = htons(1);
777        rh->rh_ssrc = ls.srcid();
778        return (8);
779}
780
[4234]781/*
[4242]782 * receive an RTP data packet
[4234]783 */
[900]784void SessionManager::recv(DataHandler* dh)
785{
[956]786        int layer = dh - dh_;
787        pktbuf* pb = pool_->alloc(layer);
[900]788        Address * addrp;
789        /* leave room in case we need to expand rtpv1 into an rtpv2 header */
790        /* XXX the free mem routine didn't like it ... */
791        //u_char* bp = &pktbuf_[4];
[956]792        //u_char* bp = pktbuf_;
793       
794        int cc = dh->recv(pb->data, sizeof(pb->data), addrp);
795        //int cc = dh->recv(bp, 2 * RTP_MTU - 4, addrp);
796        if (cc <= 0) {
797                pb->release();
[900]798                return;
[956]799        }
[1032]800
[4233]801        rtphdr* rh = (rtphdr*)pb->data;
[4243]802        seqno_ = ntohs(rh->rh_seqno);   // get received packet seqno
[4229]803
[1032]804    // Ignore loopback packets
805        if (!loopback_) {
[4233]806                //rtphdr* rh = (rtphdr*)pb->data;
[1032]807                SourceManager& sm = SourceManager::instance();
808                if (rh->rh_ssrc == (*sm.localsrc()).srcid()) {
[4233]809                        debug_msg("(loopback) seqno:    %d\n", seqno_);
810                        pb->release();  // releasing loopback packet
[1032]811                        return;
812                }
[4243]813        } // now, loopback packets ignored (if disabled)
814
[4245]815        // set received seqno - passing seqno to TfwcRcvr
816        set_received_seqno(seqno_, lastseq_);
817        lastseq_ = seqno_;      // set last seqno
[1032]818
[956]819        int version = pb->data[0] >> 6;
820        //int version = *(u_char*)rh >> 6;
[900]821        if (version != 2) {
822                ++badversion_;
[956]823                pb->release();
[900]824                return;
825        }
[3798]826        if (cc < (int)sizeof(rtphdr)) {
[956]827                ++nrunt_;
828                pb->release();
829                return;
830        }
831        pb->len = cc;
832       
833        //bp += sizeof(*rh);
834        //cc -= sizeof(*rh);
835        demux(pb, *addrp);
[900]836}
837
[956]838void SessionManager::demux(pktbuf* pb, Address & addr)
[900]839{
[956]840        rtphdr* rh = (rtphdr*)pb->data;
[900]841        u_int32_t srcid = rh->rh_ssrc;
842        int flags = ntohs(rh->rh_flags);
[956]843        // for LIP SYNC
[3702]844        //SV-XXX: unused: u_char *pkt = pb->data - sizeof(*rh);
[956]845
[900]846        if ((flags & RTP_X) != 0) {
[956]847        /*
848        * the minimal-control audio/video profile
849        * explicitly forbids extensions
850                */
[900]851                ++badext_;
[956]852                pb->release();
[900]853                return;
854        }
855
856        /*
857         * Check for illegal payload types.  Most likely this is
858         * a session packet arriving on the data port.
859         */
860        int fmt = flags & 0x7f;
861        if (!check_format(fmt)) {
862                ++badfmt_;
[956]863                pb->release();
[900]864                return;
865        }
866
867        SourceManager& sm = SourceManager::instance();
868        u_int16_t seqno = ntohs(rh->rh_seqno);
[956]869        Source* s = sm.demux(srcid, addr, seqno, pb->layer);
870        if (s == 0) {
871        /*
872        * Takes a pair of validated packets before we will
873        * believe the source.  This prevents a runaway
874        * allocation of Source data structures for a
875        * stream of garbage packets.
876                */
877                pb->release();
[900]878                return;
[956]879        }
[900]880        /* inform this source of the mbus */
881        s->mbus(&mb_);
882       
[956]883        Source::Layer& sl = s->layer(pb->layer);
[900]884        timeval now = unixtime();
[956]885        //      s->lts_data(now);
886        sl.lts_data(now);
887        //      s->sts_data(rh->rh_ts);
888        sl.sts_data(rh->rh_ts);
889       
890        // extract CSRC count (CC field); increment pb->dp data pointer & adjust length accordingly
[900]891        int cnt = (flags >> 8) & 0xf;
892        if (cnt > 0) {
[956]893                //u_char* nh = (u_char*)rh + (cnt << 2);
894                rtphdr hdr = *rh;
895                pb->dp += (cnt << 2);
896                pb->len -= (cnt << 2);
897                u_int32_t* bp = (u_int32_t*)(rh + 1);
[900]898                while (--cnt >= 0) {
899                        u_int32_t csrc = *(u_int32_t*)bp;
900                        bp += 4;
901                        Source* cs = sm.lookup(csrc, srcid, addr);
[956]902                        //                      cs->lts_data(now);
903                        cs->layer(pb->layer).lts_data(now);
[900]904                        cs->action();
905                }
[956]906                //              rtphdr hdr = *rh;
907                //              rh = (rtphdr*)nh;
[900]908                /*XXX move header up so it's contiguous with data*/
[956]909                rh = (rtphdr*)pb->dp;
[900]910                *rh = hdr;
911        } else
912                s->action();
913
914        if (s->sync() && lipSyncEnabled()) { 
915                /*
916                 * Synchronisation is enabled on this source;
917                 * video packets have to
918                 * be buffered, their playout point scheduled, and the
919                 * playout delays communicated with the audio tool ...
920                 */ 
921
922                /*
[956]923                * XXX bit rate doesn't include rtpv1 options;
924                * but v1 is going away anyway.
925                */
926                //              int dup = s->cs(seqno);
927                //              s->np(1);
928                //              s->nb(cc + sizeof(*rh));
929                int dup = sl.cs(seqno, s);
930                sl.np(1);
931                sl.nb(pb->len);
932                if (dup) {
933                        pb->release();
[900]934                        return;
[956]935                }
936                if (flags & RTP_M) // check if reach frame boundaries
937                        //                      s->nf(1);
938                        sl.nf(1);
939               
940                //s->recv(pkt, rh, pb, pb->len); // this should invoke Source::recv and buffer
941                //s->recv(bp); // this should invoke Source::recv and buffer
942               
[900]943                pktbuf_ = (u_char*)new_blk();
944        } /* synced */
945
946        else { /* ... playout video packets as they arrive */
947                /*
948                 * This is a data packet.  If the source needs activation,
949                 * or the packet format has changed, deal with this.
950                 * Then, hand the packet off to the packet handler.
951                 * XXX might want to be careful about flip-flopping
952                 * here when format changes due to misordered packets
953                 * (easy solution -- keep rtp seqno of last fmt change).
954                 */
955                PacketHandler* h = s->handler();
[956]956                if (h == 0)
957                        h = s->activate(fmt);
958                else if (s->format() != fmt)
959                        h = s->change_format(fmt);
960               
961                        /*
962                        * XXX bit rate doesn't include rtpv1 options;
963                        * but v1 is going away anyway.
964                */
965                //              int dup = s->cs(seqno);
966                //              s->np(1);
967                //              s->nb(cc + sizeof(*rh));
968                int dup = sl.cs(seqno, s);
969                sl.np(1);
970                sl.nb(pb->len);
971                if (dup){
972                        pb->release();
973                        return;
974                }
975                if (flags & RTP_M)
976                        //                  s->nf(1);
977                        sl.nf(1);
978#ifdef notdef
979        /* This should move to the handler */
980        /*XXX could get rid of hdrlen and move run check into recv method*/
981               
982                int hlen = h->hdrlen();
983                cc -= hlen;
984                if (cc < 0) {
985                        //                  s->runt(1);
986                        sl.runt(1);
987                        pb->release();
988                        return;
989                }
990#endif
991                if (s->mute()) {
992                        pb->release();
993                        return;
994                }
995                //h->recv(rh, bp + hlen, cc);
996                h->recv(pb);
[900]997        } /* not sync-ed */
998
999}
1000
1001void SessionManager::parse_rr_records(u_int32_t, rtcp_rr*, int,
1002                                      const u_char*, Address &)
1003{
1004}
1005                                     
1006
1007void SessionManager::parse_sr(rtcphdr* rh, int flags, u_char*ep,
[956]1008                                                          Source* ps, Address & addr, int layer)
[900]1009{
1010        rtcp_sr* sr = (rtcp_sr*)(rh + 1);
1011        Source* s;
1012        u_int32_t ssrc = rh->rh_ssrc;
1013        if (ps->srcid() != ssrc)
1014                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1015        else
1016                s = ps;
[956]1017       
1018        Source::Layer& sl = s->layer(layer);
1019       
[900]1020        timeval now = unixtime();
[956]1021
1022        sl.lts_ctrl(now);
1023        sl.sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1024                ntohl(sr->sr_ntp.lower) >> 16);
1025       
1026        //int cnt = flags >> 8 & 0x1f;
1027        //parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1028       
1029        /*s->lts_ctrl(now);
[900]1030        s->sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
[956]1031        ntohl(sr->sr_ntp.lower) >> 16);*/
1032       
[900]1033        s->rtp_ctrl(ntohl(sr->sr_ts));
1034        u_int32_t t = ntptime(now);
1035        s->map_ntp_time(t);
1036        s->map_rtp_time(s->convert_time(t));
1037        s->rtp2ntp(1);
1038        //printf("Got SR\n");
1039
1040        int cnt = flags >> 8 & 0x1f;
1041        parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1042}
1043
1044void SessionManager::parse_rr(rtcphdr* rh, int flags, u_char* ep,
[956]1045                                                          Source* ps, Address & addr, int layer)
[900]1046{
1047        Source* s;
1048        u_int32_t ssrc = rh->rh_ssrc;
1049        if (ps->srcid() != ssrc)
1050                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1051        else
1052                s = ps;
[956]1053       
1054        s->layer(layer).lts_ctrl(unixtime());
[900]1055        int cnt = flags >> 8 & 0x1f;
1056        parse_rr_records(ssrc, (rtcp_rr*)(rh + 1), cnt, ep, addr);
1057}
1058
[4234]1059void SessionManager::parse_xr(rtcphdr* rh, int flags, u_char* ep,
1060                                                          Source* ps, Address & addr, int layer)
1061{
1062
1063        Source* s;
1064        u_int32_t ssrc = rh->rh_ssrc;
1065        if (ps->srcid() != ssrc)
1066                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1067        else
1068                s = ps;
1069
1070        s->layer(layer).lts_ctrl(unixtime());
1071        int cnt = flags >> 8 & 0x1f;
[4248]1072        parse_xr_records(ssrc, (rtcp_xr_hdr*)(rh + 1), cnt, ep, addr);
[4234]1073}
1074
[4248]1075void SessionManager::parse_xr_records(u_int32_t ssrc, rtcp_xr_hdr* xrh, int cnt,
[4234]1076                                      const u_char* ep, Address & addr)
1077{
[4240]1078        debug_msg("XXX parse_xr_records\n");
[4237]1079        UNUSED(cnt);
1080        UNUSED(ep);
1081        UNUSED(addr);
1082
[4249]1083        int xrlen = (xrh->xr_flags << 16) >> 16;
1084        rtcp_xr_blk* xrb = (rtcp_xr_blk*)(xrh + xrlen + 1);
[4236]1085        /*
[4248]1086         * if AoA is received, trim ackvec and send a new ackvec
[4236]1087         * if AckVec is received, then parse it to TfwcSndr
1088         */
[4249]1089        if (xrb->begin_seq == xrb->end_seq) {
1090                // we just received ackofack, so do receiver stuffs here
1091               
1092                //trim_vec(xrb->chunk); // chunk in xrb is ackvec
1093                ch_[0].send(build_ackvpkt(xrh), xrlen);
1094        } else {
1095                ackvec_ = xrb->chunk;
1096                ackofack_ = xrb->begin_seq;
1097                // time stamp update comes to here
1098                tfwc_sndr_recv(ackvec_);        // parse AckVec
1099        }
[4234]1100}
1101
[4249]1102u_char* SessionManager::build_ackvpkt(rtcp_xr_hdr* xrh)
1103{
1104        u_char* p = (u_char*) xrh;
1105        return (p);
1106}
1107
[900]1108int SessionManager::sdesbody(u_int32_t* p, u_char* ep, Source* ps,
[956]1109                                                Address & addr, u_int32_t ssrc, int layer)
[900]1110{
1111        Source* s;
1112        u_int32_t srcid = *p;
1113        if (ps->srcid() != srcid)
1114                s = SourceManager::instance().lookup(srcid, ssrc, addr);
1115        else
1116                s = ps;
1117        if (s == 0)
1118                return (0);
[956]1119                /*
1120                * Note ctrl packet since we will never see any direct ctrl packets
1121                * from a source through a mixer (and we don't want the source to
1122                * time out).
1123        */
1124        s->layer(layer).lts_ctrl(unixtime());
1125       
[900]1126        u_char* cp = (u_char*)(p + 1);
1127        while (cp < ep) {
1128                char buf[256];
1129
1130                u_int type = cp[0];
1131                if (type == 0) {
1132                        /* end of chunk */
1133                        return (((cp - (u_char*)p) >> 2) + 1);
1134                }
1135                u_int len = cp[1];
1136                u_char* eopt = cp + len + 2;
1137                if (eopt > ep)
1138                        return (0);
1139
1140                if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) {
1141                        memcpy(buf, (char*)&cp[2], len);
1142                        buf[len] = 0;
1143                        s->sdes(type, buf);
[4233]1144                } // else
[900]1145                        /*XXX*/;
1146
1147                cp = eopt;
1148        }
1149        return (0);
1150}
1151
1152void SessionManager::parse_sdes(rtcphdr* rh, int flags, u_char* ep, Source* ps,
[956]1153                                                                Address & addr, u_int32_t ssrc, int layer)
[900]1154{
1155        int cnt = flags >> 8 & 0x1f;
1156        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
[956]1157        while (--cnt >= 0 && (u_char*)p < ep) {
1158                int n = sdesbody(p, ep, ps, addr, ssrc, layer);
[900]1159                if (n == 0)
1160                        break;
1161                p += n;
1162        }
1163        if (cnt >= 0)
1164                ps->badsdes(1);
1165}
1166
1167void SessionManager::parse_bye(rtcphdr* rh, int flags, u_char* ep, Source* ps)
1168{
1169        int cnt = flags >> 8 & 0x1f;
1170        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1171
1172        while (--cnt >= 0) {
1173                if (p >= (u_int32_t*)ep) {
1174                        ps->badbye(1);
1175                        return;
1176                }
1177                Source* s;
1178                if (ps->srcid() != rh->rh_ssrc)
1179                        s = SourceManager::instance().consult(*p);
1180                else
1181                        s = ps;
1182                if (s != 0)
1183                        s->lts_done(unixtime());
1184                ++p;
1185        }
1186}
1187
1188/*
1189 * Receive an rtcp packet (from the control port).
1190 */
1191void SessionManager::recv(CtrlHandler* ch)
1192{
1193        Address * srcp;
1194        int cc = ch->recv(pktbuf_, 2 * RTP_MTU, srcp);
1195        if (cc <= 0)
1196                return;
1197
1198        rtcphdr* rh = (rtcphdr*)pktbuf_;
[1032]1199
1200    // Ignore loopback packets
1201        if (!loopback_) {
1202                SourceManager& sm = SourceManager::instance();
1203                if (rh->rh_ssrc == (*sm.localsrc()).srcid())
1204                        return;
1205        }
1206
[900]1207        if (cc < int(sizeof(*rh))) {
1208                ++nrunt_;
1209                return;
1210        }
1211        /*
1212         * try to filter out junk: first thing in packet must be
1213         * sr, rr or bye & version number must be correct.
1214         */
1215        switch(ntohs(rh->rh_flags) & 0xc0ff) {
1216        case RTP_VERSION << 14 | RTCP_PT_SR:
1217        case RTP_VERSION << 14 | RTCP_PT_RR:
[4234]1218        case RTP_VERSION << 14 | RTCP_PT_XR:
[900]1219        case RTP_VERSION << 14 | RTCP_PT_BYE:
1220                break;
1221        default:
1222                /*
1223                 * XXX should further categorize this error -- it is
1224                 * likely that people mis-implement applications that
1225                 * don't put something other than SR,RR,BYE first.
1226                 */
1227                ++badversion_;
1228                return;
1229        }
1230        /*
1231         * at this point we think the packet's valid.  Update our average
1232         * size estimator.  Also, there's valid ssrc so charge errors to it
1233         */
1234        rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(cc + 28) - rtcp_avg_size_);
1235        Address & addr = *srcp;
1236
1237        /*
1238         * First record in compount packet must be the ssrc of the
1239         * sender of the packet.  Pull it out here so we can use
1240         * it in the sdes parsing, since the sdes record doesn't
1241         * contain the ssrc of the sender (in the case of mixers).
1242         */
1243        u_int32_t ssrc = rh->rh_ssrc;
1244        Source* ps = SourceManager::instance().lookup(ssrc, ssrc, addr);
1245        if (ps == 0)
1246                return;
[956]1247       
1248        int layer = ch - ch_;
1249                /*
1250                * Outer loop parses multiple RTCP records of a "compound packet".
1251                * There is no framing between records.  Boundaries are implicit
1252                * and the overall length comes from UDP.
1253        */
[900]1254        u_char* epack = (u_char*)rh + cc;
1255        while ((u_char*)rh < epack) {
1256                u_int len = (ntohs(rh->rh_len) << 2) + 4;
1257                u_char* ep = (u_char*)rh + len;
1258                if (ep > epack) {
1259                        ps->badsesslen(1);
1260                        return;
1261                }
1262                u_int flags = ntohs(rh->rh_flags);
1263                if (flags >> 14 != RTP_VERSION) {
1264                        ps->badsessver(1);
1265                        return;
1266                }
1267                switch (flags & 0xff) {
1268
1269                case RTCP_PT_SR:
[956]1270                        parse_sr(rh, flags, ep, ps, addr, layer);
[900]1271                        break;
1272
1273                case RTCP_PT_RR:
[956]1274                        parse_rr(rh, flags, ep, ps, addr, layer);
[900]1275                        break;
1276
[4234]1277                case RTCP_PT_XR:
1278                        parse_xr(rh, flags, ep, ps, addr, layer);
1279                        break;
1280
[900]1281                case RTCP_PT_SDES:
[956]1282                        parse_sdes(rh, flags, ep, ps, addr, ssrc, layer);
[900]1283                        break;
1284
1285                case RTCP_PT_BYE:
1286                        parse_bye(rh, flags, ep, ps);
1287                        break;
1288
1289                default:
1290                        ps->badsessopt(1);
1291                        break;
1292                }
1293                rh = (rtcphdr*)ep;
1294        }
[4234]1295        return;
[900]1296}
Note: See TracBrowser for help on using the browser.