root/vic/branches/cc/rtp/session.cpp @ 4246

Revision 4246, 31.1 KB (checked in by soohyunc, 6 years ago)

o TfwcSndr? recv method (on-going)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the University of
16 *      California, Berkeley and the Network Research Group at
17 *      Lawrence Berkeley Laboratory.
18 * 4. Neither the name of the University nor of the Laboratory may be used
19 *    to endorse or promote products derived from this software without
20 *    specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
23 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
26 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 *
34 * $Id$
35 */
36static const char rcsid[] =
37    "@(#) $Header$ (LBL)";
38
39#include "config.h"
40#include <math.h>
41#include <errno.h>
42#include <string.h>
43#ifdef WIN32
44extern "C" int getpid();
45#endif
46#include "source.h"
47#include "vic_tcl.h"
48#include "media-timer.h"
49#include "crypt.h"
50#include "timer.h"
51#include "ntp-time.h"
52#include "session.h"
53#include "cc/tfwc_sndr.h"
54#include "cc/tfwc_rcvr.h"
55
56/* added to support the mbus
57#include "mbus_handler.h"*/
58
59
60static class SessionMatcher : public Matcher {
61    public:
62                SessionMatcher() : Matcher("session") {}
63                TclObject* match(const char* id) {
64                        if (strcmp(id, "audio/rtp") == 0)
65                                return (new AudioSessionManager);
66                        else if (strcmp(id, "video/rtp") == 0)
67                                return (new VideoSessionManager);
68                        return (0);
69                }
70} session_matcher;
71
72int VideoSessionManager::check_format(int fmt) const
73{
74        switch(fmt) {
75                case RTP_PT_RAW:
76                case RTP_PT_CELLB:
77                case RTP_PT_JPEG:
78                case RTP_PT_CUSEEME:
79                case RTP_PT_NV:
80                case RTP_PT_CPV:
81                case RTP_PT_H261:
82                case RTP_PT_BVC:
83                case RTP_PT_H261_COMPAT:/*XXX*/
84                case RTP_PT_H263:
85                case RTP_PT_MPEG4:
86                case RTP_PT_H264:
87                case RTP_PT_H263P:
88                case RTP_PT_LDCT:
89                case RTP_PT_PVH:
90                case RTP_PT_DV:
91#ifdef USE_H261AS
92                case RTP_PT_H261AS:
93#endif 
94                return (1);
95        }
96        return (0);
97}
98
99int AudioSessionManager::check_format(int fmt) const
100{
101        switch (fmt) {
102        case RTP_PT_PCMU:
103        case RTP_PT_CELP:
104        case RTP_PT_GSM:
105        case RTP_PT_DVI:
106        case RTP_PT_LPC:
107                return (1);
108        }
109        return (0);
110}
111
112
113static SessionManager* manager;
114
115void
116adios()
117{
118        if (SourceManager::instance().localsrc() != 0)
119                manager->send_bye();
120        exit(0);
121}
122
123/*void ReportTimer::timeout()
124{
125sm_.send_report();
126}*/
127
128void DataHandler::dispatch(int)
129{
130        sm_->recv(this);
131}
132
133void CtrlHandler::dispatch(int)
134{
135        sm_->recv(this);
136}
137
138CtrlHandler::CtrlHandler()
139: ctrl_inv_bw_(0.),
140ctrl_avg_size_(128.),
141rint_(0.0) //SV-XXX: Debian
142{
143}
144
145inline void CtrlHandler::schedule_timer()
146{
147        msched(int(fmod(double(random()), rint_) + rint_ * .5 + .5));
148}
149
150void CtrlHandler::net(Network* n)
151{
152        DataHandler::net(n);
153        cancel();
154        if (n != 0) {
155        /*
156        * schedule a timer for our first report using half the
157        * min ctrl interval.  This gives us some time before
158        * our first report to learn about other sources so our
159        * next report interval will account for them.  The avg
160        * ctrl size was initialized to 128 bytes which is
161        * conservative (it assumes everyone else is generating
162        * SRs instead of RRs).
163                */
164                double rint = ctrl_avg_size_ * ctrl_inv_bw_;
165                if (rint < CTRL_MIN_RPT_TIME / 2. * 1000.)
166                        rint = CTRL_MIN_RPT_TIME / 2. * 1000.;
167                rint_ = rint;
168                schedule_timer();
169        }
170}
171
172void CtrlHandler::sample_size(int cc)
173{
174        ctrl_avg_size_ += CTRL_SIZE_GAIN * (double(cc + 28) - ctrl_avg_size_);
175}
176
177void CtrlHandler::adapt(int nsrc, int nrr, int we_sent)
178{
179/*
180* compute the time to the next report.  we do this here
181* because we need to know if there were any active sources
182* during the last report period (nrr above) & if we were
183* a source.  The bandwidth limit for ctrl traffic was set
184* on startup from the session bandwidth.  It is the inverse
185* of bandwidth (ie., ms/byte) to avoid a divide below.
186        */
187        double ibw = ctrl_inv_bw_;
188        if (nrr) {
189                /* there were active sources */
190                if (we_sent) {
191                        ibw *= 1./CTRL_SENDER_BW_FRACTION;
192                        nsrc = nrr;
193                } else {
194                        ibw *= 1./CTRL_RECEIVER_BW_FRACTION;
195                        nsrc -= nrr;
196                }
197        }
198        double rint = ctrl_avg_size_ * double(nsrc) * ibw;
199        if (rint < CTRL_MIN_RPT_TIME * 1000.)
200                rint = CTRL_MIN_RPT_TIME * 1000.;
201        rint_ = rint;
202}
203
204void CtrlHandler::timeout()
205{
206        sm_->announce(this);
207        schedule_timer();
208}
209
210//SV-XXX: rearranged initialization order to shut up gcc4
211SessionManager::SessionManager()
212//      : dh_(*this), ch_(*this), rt_(*this),
213: mb_(mbus_handler_engine, NULL),
214lipSyncEnabled_(0),
215badversion_(0),
216badoptions_(0),
217badfmt_(0),
218badext_(0),
219nrunt_(0),
220last_np_(0),
221sdes_seq_(0),
222rtcp_inv_bw_(0.),
223rtcp_avg_size_(128.),
224confid_(-1),
225seqno_(0),              // RTP packet sequence number (from RTP header)
226lastseq_(0),    // last received packet's seqno
227ackvec_(0)              // bit vector (AckVec)
228{
229        /*XXX For adios() to send bye*/
230        manager = this;
231       
232        for (int i = 0; i < NLAYER; ++i) {
233                dh_[i].manager(this);
234                ch_[i].manager(this);
235        }
236       
237        /*XXX*/
238        pktbuf_ = new u_char[2 * RTP_MTU];
239        pool_ = new BufferPool;
240       
241        /*
242        * schedule a timer for our first report using half the
243        * min rtcp interval.  This gives us some time before
244        * our first report to learn about other sources so our
245        * next report interval will account for them.  The avg
246        * rtcp size was initialized to 128 bytes which is
247        * conservative (it assumes everyone else is generating
248        * SRs instead of RRs).
249        */
250        double rint = rtcp_avg_size_ * rtcp_inv_bw_;
251        if (rint < RTCP_MIN_RPT_TIME / 2. * 1000.)
252                rint = RTCP_MIN_RPT_TIME / 2. * 1000.;
253        rint_ = rint;
254        //rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
255}
256
257SessionManager::~SessionManager()
258{
259        if (pktbuf_)
260                delete[] pktbuf_;
261       
262        delete pool_;
263}
264
265u_int32_t SessionManager::alloc_srcid(Address & addr) const
266{
267        timeval tv;
268        ::gettimeofday(&tv, 0);
269        u_int32_t srcid = u_int32_t(tv.tv_sec + tv.tv_usec);
270        srcid += (u_int32_t)getuid();
271        srcid += (u_int32_t)getpid();
272/* __IPv6 changed srcid computation */
273        for(unsigned int i = 0; i < (addr.length() % sizeof(u_int32_t)); i++) {
274                srcid += ((u_int32_t*)((const void*)addr))[i];
275        }
276        return (srcid);
277}
278
279extern char* onestat(char* cp, const char* name, u_long v);
280
281char* SessionManager::stats(char* cp) const
282{
283        cp = onestat(cp, "Bad-RTP-version", badversion_);
284        cp = onestat(cp, "Bad-RTPv1-options", badoptions_);
285        cp = onestat(cp, "Bad-Payload-Format", badfmt_);
286        cp = onestat(cp, "Bad-RTP-Extension", badext_);
287        cp = onestat(cp, "Runts", nrunt_);
288        Crypt* p = dh_[0].net()->crypt();
289        if (p != 0) {
290                cp = onestat(cp, "Crypt-Bad-Length", p->badpktlen());
291                cp = onestat(cp, "Crypt-Bad-P-Bit", p->badpbit());
292        }
293        /*XXX*/
294        if (ch_[0].net() != 0) {
295                Crypt* p = ch_[0].net()->crypt();
296                if (p != 0) {
297                        cp = onestat(cp, "Crypt-Ctrl-Bad-Length", p->badpktlen());
298                        cp = onestat(cp, "Crypt-Ctrl-Bad-P-Bit", p->badpbit());
299                }
300        }
301        *--cp = 0;
302        return (cp);
303}
304
305int SessionManager::command(int argc, const char*const* argv)
306{
307        Tcl& tcl = Tcl::instance();
308        char* cp = tcl.buffer();
309        if (argc == 2) {
310                if (strcmp(argv[1], "active") == 0) {
311                        SourceManager::instance().sortactive(cp);
312                        tcl.result(cp);
313                        return (TCL_OK);
314                }
315                if (strcmp(argv[1], "local-addr-heuristic") == 0) {
316                        strcpy(cp, intoa(LookupLocalAddr()));
317                        tcl.result(cp);
318                        return (TCL_OK);
319                }
320                if (strcmp(argv[1], "stats") == 0) {
321                        stats(cp);
322                        tcl.result(cp);
323                        return (TCL_OK);
324                }
325                if (strcmp(argv[1], "nb") == 0) {
326                        sprintf(cp, "%u", 8 * nb_);
327                        tcl.result(cp);
328                        return (TCL_OK);
329                }
330                if (strcmp(argv[1], "nf") == 0) {
331                        sprintf(cp, "%u", nf_);
332                        tcl.result(cp);
333                        return (TCL_OK);
334                }
335                if (strcmp(argv[1], "np") == 0 ||
336                    strcmp(argv[1], "ns") == 0) {
337                        sprintf(cp, "%u", np_);
338                        tcl.result(cp);
339                        return (TCL_OK);
340                }
341                if (strcmp(argv[1], "lip-sync") == 0) {
342                        sprintf(cp, "%u", lipSyncEnabled_);
343                        tcl.result(cp);
344                        return (TCL_OK);
345                }
346
347        } else if (argc == 3) {
348                if (strcmp(argv[1], "sm") == 0) {
349                        sm_ = (SourceManager*)TclObject::lookup(argv[2]);
350                        return (TCL_OK);
351                }
352                if (strcmp(argv[1], "name") == 0) {
353                        Source* s = SourceManager::instance().localsrc();
354                        s->sdes(RTCP_SDES_NAME, argv[2]);
355                        return (TCL_OK);
356                }
357                if (strcmp(argv[1], "email") == 0) {
358                        Source* s = SourceManager::instance().localsrc();
359                        s->sdes(RTCP_SDES_EMAIL, argv[2]);
360                        return (TCL_OK);
361                }
362                if (strcmp(argv[1], "random-srcid") == 0) {
363                        Address * addrp;
364                        if ((addrp = (dh_[0].net())->alloc(argv[2])) !=0 ) { //SV-XXX: placed ()'s against truth check == NULL
365                          sprintf(cp, "%u", alloc_srcid(*addrp));
366                          delete addrp;
367                        }
368                        tcl.result(cp);
369                        return (TCL_OK);
370                }
371                if (strcmp(argv[1], "data-net") == 0) {
372                        dh_[0].net((Network*)TclObject::lookup(argv[2]));
373                        return (TCL_OK);
374                }
375                if (strcmp(argv[1], "ctrl-net") == 0) {
376                        ch_[0].net((Network*)TclObject::lookup(argv[2]));
377                        return (TCL_OK);
378                }
379                if (strcmp(argv[1], "max-bandwidth") == 0) {
380                        double bw = atof(argv[2]) / 8.;
381                        rtcp_inv_bw_ = 1. / (bw * RTCP_SESSION_BW_FRACTION);
382                        return (TCL_OK);
383                }
384                if (strcmp(argv[1], "confid") == 0) {
385                        confid_ = atoi(argv[2]);
386                        return (TCL_OK);
387                }
388                if (strcmp(argv[1], "data-bandwidth") == 0) {
389                        /*XXX assume value in range */
390                        bps(atoi(argv[2]));
391                        return (TCL_OK);
392                }
393                if (strcmp(argv[1], "mtu") == 0) {
394                        mtu_ = atoi(argv[2]);
395                        return (TCL_OK);
396                }
397                if (strcmp(argv[1], "loopback") == 0) {
398                        loopback_ = atoi(argv[2]);
399                        return (TCL_OK);
400                }
401                if (strcmp(argv[1], "lip-sync") == 0) {
402                        lipSyncEnabled_ = atoi(argv[2]);
403                        return (TCL_OK);
404                }
405
406        }  else if (argc == 4) {
407                if (strcmp(argv[1], "data-net") == 0) {
408                        u_int layer = atoi(argv[3]);
409                        if (layer >= NLAYER)
410                                abort();
411                        if (*argv[2] == 0) {
412                                dh_[layer].net(0);
413                                return (TCL_OK);
414                        }
415                        Network* net = (Network*)TclObject::lookup(argv[2]);
416                        if (net == 0) {
417                                tcl.resultf("no network %s", argv[2]);
418                                return (TCL_ERROR);
419                        }
420                        if (net->rchannel() < 0) {
421                                tcl.resultf("network not open");
422                                return (TCL_ERROR);
423                        }
424                        dh_[layer].net(net);
425                        return (TCL_OK);
426                }
427                if (strcmp(argv[1], "ctrl-net") == 0) {
428                        u_int layer = atoi(argv[3]);
429                        if (layer >= NLAYER)
430                                abort();
431                        if (*argv[2] == 0) {
432                                ch_[layer].net(0);
433                                return (TCL_OK);
434                        }
435                        Network* net = (Network*)TclObject::lookup(argv[2]);
436                        if (net == 0) {
437                                tcl.resultf("no network %s", argv[2]);
438                                return (TCL_ERROR);
439                        }
440                        if (net->rchannel() < 0) {
441                                tcl.resultf("network not open");
442                                return (TCL_ERROR);
443                        }
444                        ch_[layer].net(net);
445                        return (TCL_OK);
446                }
447                return (Transmitter::command(argc, argv));
448        }
449        //should not be reached
450        return (TCL_ERROR);
451}
452
453void SessionManager::transmit(pktbuf* pb)
454{
455        //mh_.msg_iov = pb->iov;
456        //      dh_[.net()->send(mh_);
457                //debug_msg("L %d,",pb->layer);
458        // Using loop_layer for now to restrict transmission as well
459        if (pb->layer < loop_layer_) {
460        //      if ( pb->layer <0 ) exit(1);
461                Network* n = dh_[pb->layer].net();
462                if (n != 0)
463                        n->send(pb);
464        }
465}
466
467u_char* SessionManager::build_sdes_item(u_char* p, int code, Source& s)
468{
469        const char* value = s.sdes(code);
470        if (value != 0) {
471                int len = strlen(value);
472                *p++ = code;
473                *p++ = len;
474                memcpy(p, value, len);
475                p += len;
476        }
477        return (p);
478}
479
480int SessionManager::build_sdes(rtcphdr* rh, Source& ls)
481{
482        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_SDES;
483        rh->rh_flags = htons(flags);
484        rh->rh_ssrc = ls.srcid();
485        u_char* p = (u_char*)(rh + 1);
486        p = build_sdes_item(p, RTCP_SDES_CNAME, ls);
487
488        /*
489         * We always send a cname plus one other sdes
490         * There's a schedule for what we send sequenced by sdes_seq_:
491         *   - send 'email' every 0th & 4th packet
492         *   - send 'note' every 2nd packet
493         *   - send 'tool' every 6th packet
494         *   - send 'name' in all the odd slots
495         * (if 'note' is not the empty string, we switch the roles
496         *  of name & note)
497         */
498        int nameslot, noteslot;
499        const char* note = ls.sdes(RTCP_SDES_NOTE);
500        if (note) {
501                if (*note) {
502                        nameslot = RTCP_SDES_NOTE;
503                        noteslot = RTCP_SDES_NAME;
504                } else {
505                        nameslot = RTCP_SDES_NAME;
506                        noteslot = RTCP_SDES_NOTE;
507                }
508        } else {
509                nameslot = RTCP_SDES_NAME;
510                noteslot = RTCP_SDES_NAME;
511        }
512        u_int seq = (++sdes_seq_) & 0x7;
513        switch (seq) {
514
515        case 0case 4:
516                p = build_sdes_item(p, RTCP_SDES_EMAIL, ls);
517                break;
518
519        case 2:
520                p = build_sdes_item(p, noteslot, ls);
521                break;
522        case 6:
523                p = build_sdes_item(p, RTCP_SDES_TOOL, ls);
524                break;
525        default:
526                p = build_sdes_item(p, nameslot, ls);
527        }
528        int len = p - (u_char*)rh;
529        int pad = 4 - (len & 3);
530        len += pad;
531        rh->rh_len = htons((len >> 2) - 1);
532        while (--pad >= 0)
533                *p++ = 0;
534
535        return (len);
536}
537
538int SessionManager::build_app(rtcphdr* rh, Source& ls, const char *name,
539                void *data, int datalen)
540{
541  int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_APP;
542  rh->rh_flags = htons(flags);
543  rh->rh_ssrc = ls.srcid();
544  u_char* p = (u_char*)(rh + 1);
545    int len;
546   
547    // write the name field
548    len = strlen(name);
549    if( len < 4 ) {
550        memcpy(p,name,len);
551        p += len;
552       
553        // pad short names
554        while( p - (u_char*)(rh+1) < 4 )
555            *p++ = 0;
556    }
557    else {
558        // use first four chars of given name
559        memcpy(p,name,4);
560        p += 4;
561    }
562   
563    // write the app data
564    memcpy(p,data,datalen);
565    p += datalen;
566   
567    // pad as needed
568    len = p - (u_char*)rh;
569    int pad = 4 - (len & 3);
570    while( --pad >= 0 )
571        *p++ = 0;
572  len = p - (u_char*)rh;
573
574    // set the length now that it's known
575  rh->rh_len = htons((len >> 2) - 1);
576
577  return (len);
578}
579
580/*void SessionManager::send_report()
581{
582        send_report(0);
583}
584
585  void SessionManager::send_bye()
586  {
587  send_report(1);
588}*/
589
590// SessionManager is no longer used as Timer - Each
591// CtrlHandler has its own Timer which calls this;
592void SessionManager::announce(CtrlHandler* ch)
593{
594        send_report(ch, 0);
595}
596
597/*XXX check for buffer overflow*/
598/*
599* Send an RTPv2 report packet.
600*/
601void SessionManager::send_report(CtrlHandler* ch, int bye, int app)
602{
603        UNUSED(app); //SV-XXX: unused
604
605        SourceManager& sm = SourceManager::instance();
606        Source& s = *sm.localsrc();
607        rtcphdr* rh = (rtcphdr*)pktbuf_;
608        rh->rh_ssrc = s.srcid();
609        int flags = RTP_VERSION << 14;
610        int layer = ch - ch_; //LLL
611        Source::Layer& sl = s.layer(layer);
612        timeval now = unixtime();
613        sl.lts_ctrl(now);
614        int we_sent = 0;
615        rtcp_rr* rr;
616        rtcp_xr* xr;    // extended report
617        Tcl& tcl = Tcl::instance();
618
619        /*
620         * If we've sent data since our last sender report send a
621         * new report.  The MediaTimer check is to make sure we still
622         * have a grabber -- if not, we won't be able to interpret the
623         * media timestamps so there's no point in sending an SR.
624         */
625        MediaTimer* mt = MediaTimer::instance();
626        if (sl.np() != last_np_ && mt) {
627                last_np_ = sl.np();
628                we_sent = 1;
629                flags |= RTCP_PT_SR;
630                rtcp_sr* sr = (rtcp_sr*)(rh + 1);
631                sr->sr_ntp = ntp64time(now);
632                HTONL(sr->sr_ntp.upper);
633                HTONL(sr->sr_ntp.lower);
634                sr->sr_ts = htonl(mt->ref_ts());
635                sr->sr_np = htonl(sl.np());
636                sr->sr_nb = htonl(sl.nb());
637                rr = (rtcp_rr*)(sr + 1);
638        } else {
639                flags |= RTCP_PT_RR;
640                rr = (rtcp_rr*)(rh + 1);
641        }
642
643        // if CC is turned on, we need XR report
644        if (is_cc_on()) {
645                flags |= RTCP_PT_XR;            // setting flags to XR
646                xr = (rtcp_xr*)(rh + 1);        // extended report
647                xr->xr_begin_seq = lastseq_;// this will be used for ackofack
648                xr->xr_end_seq = seqno_ + 1;// as defined in RFC3611 section 4.1
649                xr->xr_ackvec = get_ackvec();   // ackvec
650        }
651
652        int nrr = 0;
653        int nsrc = 0;
654        /*
655        * we don't want to inflate report interval if user has set
656        * the flag that causes all sources to be 'kept' so we
657        * consider sources 'inactive' if we haven't heard a control
658        * msg from them for ~32 reporting intervals.
659        */
660        //LLL   u_int inactive = u_int(rint_ * (32./1000.));
661        u_int inactive = u_int(ch->rint() * (32./1000.));
662        if (inactive < 2)
663                inactive = 2;
664        for (Source* sp = sm.sources(); sp != 0; sp = sp->next_) {
665                ++nsrc;
666                Source::Layer& sl = sp->layer(layer);
667                //              int received = sp->np() - sp->snp();
668                int received = sl.np() - sl.snp();
669                if (received == 0) {
670                        //              if (u_int(now.tv_sec - sp->lts_ctrl().tv_sec) > inactive)
671                        if (u_int(now.tv_sec - sl.lts_ctrl().tv_sec) > inactive)
672                                --nsrc;
673                        continue;
674                }
675                //              sp->snp(sp->np());
676                sl.snp(sl.np());
677                rr->rr_srcid = sp->srcid();
678                //              int expected = sp->ns() - sp->sns();
679                int expected = sl.ns() - sl.sns();
680                //              sp->sns(sp->ns());
681                sl.sns(sl.ns());
682                u_int32_t v;
683                int lost = expected - received;
684                if (lost <= 0)
685                        v = 0;
686                else
687                        /* expected != 0 if lost > 0 */
688                        v = ((lost << 8) / expected) << 24;
689                /* XXX should saturate on over/underflow */
690                //              v |= (sp->ns() - sp->np()) & 0xffffff;
691                v |= (sl.ns() - sl.np()) & 0xffffff;
692                rr->rr_loss = htonl(v);
693                //              rr->rr_ehsr = htonl(sp->ehs());
694                rr->rr_ehsr = htonl(sl.ehs());
695                rr->rr_dv = (sp->handler() != 0) ? sp->handler()->delvar() : 0;
696                //              rr->rr_lsr = htonl(sp->sts_ctrl());
697                rr->rr_lsr = htonl(sl.sts_ctrl());
698                //              if (sp->lts_ctrl().tv_sec == 0)
699                if (sl.lts_ctrl().tv_sec == 0)
700                        rr->rr_dlsr = 0;
701                else {
702                        u_int32_t ntp_now = ntptime(now);
703                        //                      u_int32_t ntp_then = ntptime(sp->lts_ctrl());
704                        u_int32_t ntp_then = ntptime(sl.lts_ctrl());
705                        rr->rr_dlsr = htonl(ntp_now - ntp_then);
706                }
707                ++rr;
708                if (++nrr >= 31)
709                        break;
710        }
711        flags |= nrr << 8;
712        rh->rh_flags = htons(flags);
713        int len = (u_char*)rr - pktbuf_;
714        rh->rh_len = htons((len >> 2) - 1);
715
716        if (bye)
717                len += build_bye((rtcphdr*)rr, s);
718        else
719                len += build_sdes((rtcphdr*)rr, s);
720       
721        // build "site" app data if specified
722        const char *data = tcl.attr("site");
723        if(data)
724        {
725            rr = (rtcp_rr*)(((u_char*)rh) + len);
726            len += build_app((rtcphdr*)rr, s, "site", (void *)data, strlen(data));
727        }
728        //LLL   ch_.send(pktbuf_, len);
729        ch->send(pktbuf_, len);
730       
731        /*
732      rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(len + 28) - rtcp_avg_size_);
733         
734          // compute the time to the next report.  we do this here
735          // because we need to know if there were any active sources
736          // during the last report period (nrr above) & if we were
737          // a source.  The bandwidth limit for rtcp traffic was set
738          // on startup from the session bandwidth.  It is the inverse
739          // of bandwidth (ie., ms/byte) to avoid a divide below.
740       
741        //      double ibw = rtcp_inv_bw_;
742        if (nrr) {
743        // there were active sources
744        //              if (we_sent) {
745        ibw *= 1./RTCP_SENDER_BW_FRACTION;
746        nsrc = nrr;
747        } else {
748        ibw *= 1./RTCP_RECEIVER_BW_FRACTION;
749        nsrc -= nrr;
750        }
751        }
752        double rint = rtcp_avg_size_ * double(nsrc) * ibw;
753        if (rint < RTCP_MIN_RPT_TIME * 1000.)
754                rint = RTCP_MIN_RPT_TIME * 1000.;
755        rint_ = rint;
756        rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
757        */
758       
759        // Call timer adaption for each layer
760        ch->adapt(nsrc, nrr, we_sent);
761        ch->sample_size(len);
762       
763        //      sm.CheckActiveSources(rint);
764        if (layer == 0)
765                sm.CheckActiveSources(ch->rint());
766       
767}
768
769int SessionManager::build_bye(rtcphdr* rh, Source& ls)
770{
771        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_BYE;
772        rh->rh_flags = ntohs(flags);
773        rh->rh_len = htons(1);
774        rh->rh_ssrc = ls.srcid();
775        return (8);
776}
777
778/*
779 * receive an RTP data packet
780 */
781void SessionManager::recv(DataHandler* dh)
782{
783        int layer = dh - dh_;
784        pktbuf* pb = pool_->alloc(layer);
785        Address * addrp;
786        /* leave room in case we need to expand rtpv1 into an rtpv2 header */
787        /* XXX the free mem routine didn't like it ... */
788        //u_char* bp = &pktbuf_[4];
789        //u_char* bp = pktbuf_;
790       
791        int cc = dh->recv(pb->data, sizeof(pb->data), addrp);
792        //int cc = dh->recv(bp, 2 * RTP_MTU - 4, addrp);
793        if (cc <= 0) {
794                pb->release();
795                return;
796        }
797
798        rtphdr* rh = (rtphdr*)pb->data;
799        seqno_ = ntohs(rh->rh_seqno);   // get received packet seqno
800
801    // Ignore loopback packets
802        if (!loopback_) {
803                //rtphdr* rh = (rtphdr*)pb->data;
804                SourceManager& sm = SourceManager::instance();
805                if (rh->rh_ssrc == (*sm.localsrc()).srcid()) {
806                        debug_msg("(loopback) seqno:    %d\n", seqno_);
807                        pb->release();  // releasing loopback packet
808                        return;
809                }
810        } // now, loopback packets ignored (if disabled)
811
812        // set received seqno - passing seqno to TfwcRcvr
813        set_received_seqno(seqno_, lastseq_);
814        lastseq_ = seqno_;      // set last seqno
815
816        int version = pb->data[0] >> 6;
817        //int version = *(u_char*)rh >> 6;
818        if (version != 2) {
819                ++badversion_;
820                pb->release();
821                return;
822        }
823        if (cc < (int)sizeof(rtphdr)) {
824                ++nrunt_;
825                pb->release();
826                return;
827        }
828        pb->len = cc;
829       
830        //bp += sizeof(*rh);
831        //cc -= sizeof(*rh);
832        demux(pb, *addrp);
833}
834
835void SessionManager::demux(pktbuf* pb, Address & addr)
836{
837        rtphdr* rh = (rtphdr*)pb->data;
838        u_int32_t srcid = rh->rh_ssrc;
839        int flags = ntohs(rh->rh_flags);
840        // for LIP SYNC
841        //SV-XXX: unused: u_char *pkt = pb->data - sizeof(*rh);
842
843        if ((flags & RTP_X) != 0) {
844        /*
845        * the minimal-control audio/video profile
846        * explicitly forbids extensions
847                */
848                ++badext_;
849                pb->release();
850                return;
851        }
852
853        /*
854         * Check for illegal payload types.  Most likely this is
855         * a session packet arriving on the data port.
856         */
857        int fmt = flags & 0x7f;
858        if (!check_format(fmt)) {
859                ++badfmt_;
860                pb->release();
861                return;
862        }
863
864        SourceManager& sm = SourceManager::instance();
865        u_int16_t seqno = ntohs(rh->rh_seqno);
866        Source* s = sm.demux(srcid, addr, seqno, pb->layer);
867        if (s == 0) {
868        /*
869        * Takes a pair of validated packets before we will
870        * believe the source.  This prevents a runaway
871        * allocation of Source data structures for a
872        * stream of garbage packets.
873                */
874                pb->release();
875                return;
876        }
877        /* inform this source of the mbus */
878        s->mbus(&mb_);
879       
880        Source::Layer& sl = s->layer(pb->layer);
881        timeval now = unixtime();
882        //      s->lts_data(now);
883        sl.lts_data(now);
884        //      s->sts_data(rh->rh_ts);
885        sl.sts_data(rh->rh_ts);
886       
887        // extract CSRC count (CC field); increment pb->dp data pointer & adjust length accordingly
888        int cnt = (flags >> 8) & 0xf;
889        if (cnt > 0) {
890                //u_char* nh = (u_char*)rh + (cnt << 2);
891                rtphdr hdr = *rh;
892                pb->dp += (cnt << 2);
893                pb->len -= (cnt << 2);
894                u_int32_t* bp = (u_int32_t*)(rh + 1);
895                while (--cnt >= 0) {
896                        u_int32_t csrc = *(u_int32_t*)bp;
897                        bp += 4;
898                        Source* cs = sm.lookup(csrc, srcid, addr);
899                        //                      cs->lts_data(now);
900                        cs->layer(pb->layer).lts_data(now);
901                        cs->action();
902                }
903                //              rtphdr hdr = *rh;
904                //              rh = (rtphdr*)nh;
905                /*XXX move header up so it's contiguous with data*/
906                rh = (rtphdr*)pb->dp;
907                *rh = hdr;
908        } else
909                s->action();
910
911        if (s->sync() && lipSyncEnabled()) { 
912                /*
913                 * Synchronisation is enabled on this source;
914                 * video packets have to
915                 * be buffered, their playout point scheduled, and the
916                 * playout delays communicated with the audio tool ...
917                 */ 
918
919                /*
920                * XXX bit rate doesn't include rtpv1 options;
921                * but v1 is going away anyway.
922                */
923                //              int dup = s->cs(seqno);
924                //              s->np(1);
925                //              s->nb(cc + sizeof(*rh));
926                int dup = sl.cs(seqno, s);
927                sl.np(1);
928                sl.nb(pb->len);
929                if (dup) {
930                        pb->release();
931                        return;
932                }
933                if (flags & RTP_M) // check if reach frame boundaries
934                        //                      s->nf(1);
935                        sl.nf(1);
936               
937                //s->recv(pkt, rh, pb, pb->len); // this should invoke Source::recv and buffer
938                //s->recv(bp); // this should invoke Source::recv and buffer
939               
940                pktbuf_ = (u_char*)new_blk();
941        } /* synced */
942
943        else { /* ... playout video packets as they arrive */
944                /*
945                 * This is a data packet.  If the source needs activation,
946                 * or the packet format has changed, deal with this.
947                 * Then, hand the packet off to the packet handler.
948                 * XXX might want to be careful about flip-flopping
949                 * here when format changes due to misordered packets
950                 * (easy solution -- keep rtp seqno of last fmt change).
951                 */
952                PacketHandler* h = s->handler();
953                if (h == 0)
954                        h = s->activate(fmt);
955                else if (s->format() != fmt)
956                        h = s->change_format(fmt);
957               
958                        /*
959                        * XXX bit rate doesn't include rtpv1 options;
960                        * but v1 is going away anyway.
961                */
962                //              int dup = s->cs(seqno);
963                //              s->np(1);
964                //              s->nb(cc + sizeof(*rh));
965                int dup = sl.cs(seqno, s);
966                sl.np(1);
967                sl.nb(pb->len);
968                if (dup){
969                        pb->release();
970                        return;
971                }
972                if (flags & RTP_M)
973                        //                  s->nf(1);
974                        sl.nf(1);
975#ifdef notdef
976        /* This should move to the handler */
977        /*XXX could get rid of hdrlen and move run check into recv method*/
978               
979                int hlen = h->hdrlen();
980                cc -= hlen;
981                if (cc < 0) {
982                        //                  s->runt(1);
983                        sl.runt(1);
984                        pb->release();
985                        return;
986                }
987#endif
988                if (s->mute()) {
989                        pb->release();
990                        return;
991                }
992                //h->recv(rh, bp + hlen, cc);
993                h->recv(pb);
994        } /* not sync-ed */
995
996}
997
998void SessionManager::parse_rr_records(u_int32_t, rtcp_rr*, int,
999                                      const u_char*, Address &)
1000{
1001}
1002                                     
1003
1004void SessionManager::parse_sr(rtcphdr* rh, int flags, u_char*ep,
1005                                                          Source* ps, Address & addr, int layer)
1006{
1007        rtcp_sr* sr = (rtcp_sr*)(rh + 1);
1008        Source* s;
1009        u_int32_t ssrc = rh->rh_ssrc;
1010        if (ps->srcid() != ssrc)
1011                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1012        else
1013                s = ps;
1014       
1015        Source::Layer& sl = s->layer(layer);
1016       
1017        timeval now = unixtime();
1018
1019        sl.lts_ctrl(now);
1020        sl.sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1021                ntohl(sr->sr_ntp.lower) >> 16);
1022       
1023        //int cnt = flags >> 8 & 0x1f;
1024        //parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1025       
1026        /*s->lts_ctrl(now);
1027        s->sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1028        ntohl(sr->sr_ntp.lower) >> 16);*/
1029       
1030        s->rtp_ctrl(ntohl(sr->sr_ts));
1031        u_int32_t t = ntptime(now);
1032        s->map_ntp_time(t);
1033        s->map_rtp_time(s->convert_time(t));
1034        s->rtp2ntp(1);
1035        //printf("Got SR\n");
1036
1037        int cnt = flags >> 8 & 0x1f;
1038        parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1039}
1040
1041void SessionManager::parse_rr(rtcphdr* rh, int flags, u_char* ep,
1042                                                          Source* ps, Address & addr, int layer)
1043{
1044        Source* s;
1045        u_int32_t ssrc = rh->rh_ssrc;
1046        if (ps->srcid() != ssrc)
1047                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1048        else
1049                s = ps;
1050       
1051        s->layer(layer).lts_ctrl(unixtime());
1052        int cnt = flags >> 8 & 0x1f;
1053        parse_rr_records(ssrc, (rtcp_rr*)(rh + 1), cnt, ep, addr);
1054}
1055
1056void SessionManager::parse_xr(rtcphdr* rh, int flags, u_char* ep,
1057                                                          Source* ps, Address & addr, int layer)
1058{
1059
1060        Source* s;
1061        u_int32_t ssrc = rh->rh_ssrc;
1062        if (ps->srcid() != ssrc)
1063                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1064        else
1065                s = ps;
1066
1067        s->layer(layer).lts_ctrl(unixtime());
1068        int cnt = flags >> 8 & 0x1f;
1069        parse_xr_records(ssrc, (rtcp_xr*)(rh + 1), cnt, ep, addr);
1070}
1071
1072void SessionManager::parse_xr_records(u_int32_t ssrc, rtcp_xr* r, int cnt,
1073                                      const u_char* ep, Address & addr)
1074{
1075        debug_msg("XXX parse_xr_records\n");
1076        UNUSED(cnt);
1077        UNUSED(ep);
1078        UNUSED(addr);
1079
1080        /*
1081         * if AoA is received, then first trim ackvec and send a new ackvec
1082         * if AckVec is received, then parse it to TfwcSndr
1083         */
1084        ackvec_ = r->xr_ackvec;
1085        ackofack_ = r->xr_begin_seq;
1086        tfwc_sndr_recv(ackvec_);        // parse AckVec
1087}
1088
1089int SessionManager::sdesbody(u_int32_t* p, u_char* ep, Source* ps,
1090                                                Address & addr, u_int32_t ssrc, int layer)
1091{
1092        Source* s;
1093        u_int32_t srcid = *p;
1094        if (ps->srcid() != srcid)
1095                s = SourceManager::instance().lookup(srcid, ssrc, addr);
1096        else
1097                s = ps;
1098        if (s == 0)
1099                return (0);
1100                /*
1101                * Note ctrl packet since we will never see any direct ctrl packets
1102                * from a source through a mixer (and we don't want the source to
1103                * time out).
1104        */
1105        s->layer(layer).lts_ctrl(unixtime());
1106       
1107        u_char* cp = (u_char*)(p + 1);
1108        while (cp < ep) {
1109                char buf[256];
1110
1111                u_int type = cp[0];
1112                if (type == 0) {
1113                        /* end of chunk */
1114                        return (((cp - (u_char*)p) >> 2) + 1);
1115                }
1116                u_int len = cp[1];
1117                u_char* eopt = cp + len + 2;
1118                if (eopt > ep)
1119                        return (0);
1120
1121                if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) {
1122                        memcpy(buf, (char*)&cp[2], len);
1123                        buf[len] = 0;
1124                        s->sdes(type, buf);
1125                } // else
1126                        /*XXX*/;
1127
1128                cp = eopt;
1129        }
1130        return (0);
1131}
1132
1133void SessionManager::parse_sdes(rtcphdr* rh, int flags, u_char* ep, Source* ps,
1134                                                                Address & addr, u_int32_t ssrc, int layer)
1135{
1136        int cnt = flags >> 8 & 0x1f;
1137        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1138        while (--cnt >= 0 && (u_char*)p < ep) {
1139                int n = sdesbody(p, ep, ps, addr, ssrc, layer);
1140                if (n == 0)
1141                        break;
1142                p += n;
1143        }
1144        if (cnt >= 0)
1145                ps->badsdes(1);
1146}
1147
1148void SessionManager::parse_bye(rtcphdr* rh, int flags, u_char* ep, Source* ps)
1149{
1150        int cnt = flags >> 8 & 0x1f;
1151        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1152
1153        while (--cnt >= 0) {
1154                if (p >= (u_int32_t*)ep) {
1155                        ps->badbye(1);
1156                        return;
1157                }
1158                Source* s;
1159                if (ps->srcid() != rh->rh_ssrc)
1160                        s = SourceManager::instance().consult(*p);
1161                else
1162                        s = ps;
1163                if (s != 0)
1164                        s->lts_done(unixtime());
1165                ++p;
1166        }
1167}
1168
1169/*
1170 * Receive an rtcp packet (from the control port).
1171 */
1172void SessionManager::recv(CtrlHandler* ch)
1173{
1174        Address * srcp;
1175        int cc = ch->recv(pktbuf_, 2 * RTP_MTU, srcp);
1176        if (cc <= 0)
1177                return;
1178
1179        rtcphdr* rh = (rtcphdr*)pktbuf_;
1180
1181    // Ignore loopback packets
1182        if (!loopback_) {
1183                SourceManager& sm = SourceManager::instance();
1184                if (rh->rh_ssrc == (*sm.localsrc()).srcid())
1185                        return;
1186        }
1187
1188        if (cc < int(sizeof(*rh))) {
1189                ++nrunt_;
1190                return;
1191        }
1192        /*
1193         * try to filter out junk: first thing in packet must be
1194         * sr, rr or bye & version number must be correct.
1195         */
1196        switch(ntohs(rh->rh_flags) & 0xc0ff) {
1197        case RTP_VERSION << 14 | RTCP_PT_SR:
1198        case RTP_VERSION << 14 | RTCP_PT_RR:
1199        case RTP_VERSION << 14 | RTCP_PT_XR:
1200        case RTP_VERSION << 14 | RTCP_PT_BYE:
1201                break;
1202        default:
1203                /*
1204                 * XXX should further categorize this error -- it is
1205                 * likely that people mis-implement applications that
1206                 * don't put something other than SR,RR,BYE first.
1207                 */
1208                ++badversion_;
1209                return;
1210        }
1211        /*
1212         * at this point we think the packet's valid.  Update our average
1213         * size estimator.  Also, there's valid ssrc so charge errors to it
1214         */
1215        rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(cc + 28) - rtcp_avg_size_);
1216        Address & addr = *srcp;
1217
1218        /*
1219         * First record in compount packet must be the ssrc of the
1220         * sender of the packet.  Pull it out here so we can use
1221         * it in the sdes parsing, since the sdes record doesn't
1222         * contain the ssrc of the sender (in the case of mixers).
1223         */
1224        u_int32_t ssrc = rh->rh_ssrc;
1225        Source* ps = SourceManager::instance().lookup(ssrc, ssrc, addr);
1226        if (ps == 0)
1227                return;
1228       
1229        int layer = ch - ch_;
1230                /*
1231                * Outer loop parses multiple RTCP records of a "compound packet".
1232                * There is no framing between records.  Boundaries are implicit
1233                * and the overall length comes from UDP.
1234        */
1235        u_char* epack = (u_char*)rh + cc;
1236        while ((u_char*)rh < epack) {
1237                u_int len = (ntohs(rh->rh_len) << 2) + 4;
1238                u_char* ep = (u_char*)rh + len;
1239                if (ep > epack) {
1240                        ps->badsesslen(1);
1241                        return;
1242                }
1243                u_int flags = ntohs(rh->rh_flags);
1244                if (flags >> 14 != RTP_VERSION) {
1245                        ps->badsessver(1);
1246                        return;
1247                }
1248                switch (flags & 0xff) {
1249
1250                case RTCP_PT_SR:
1251                        parse_sr(rh, flags, ep, ps, addr, layer);
1252                        break;
1253
1254                case RTCP_PT_RR:
1255                        parse_rr(rh, flags, ep, ps, addr, layer);
1256                        break;
1257
1258                case RTCP_PT_XR:
1259                        parse_xr(rh, flags, ep, ps, addr, layer);
1260                        break;
1261
1262                case RTCP_PT_SDES:
1263                        parse_sdes(rh, flags, ep, ps, addr, ssrc, layer);
1264                        break;
1265
1266                case RTCP_PT_BYE:
1267                        parse_bye(rh, flags, ep, ps);
1268                        break;
1269
1270                default:
1271                        ps->badsessopt(1);
1272                        break;
1273                }
1274                rh = (rtcphdr*)ep;
1275        }
1276        return;
1277}
Note: See TracBrowser for help on using the browser.