root/vic/branches/cc/rtp/session.cpp @ 4238

Revision 4238, 31.6 KB (checked in by soohyunc, 6 years ago)

(temporary commit)
building a separate RTP extended send method
- the original send_report() only specifies RR and SR

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the University of
16 *      California, Berkeley and the Network Research Group at
17 *      Lawrence Berkeley Laboratory.
18 * 4. Neither the name of the University nor of the Laboratory may be used
19 *    to endorse or promote products derived from this software without
20 *    specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
23 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
26 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 *
34 * $Id$
35 */
36static const char rcsid[] =
37    "@(#) $Header$ (LBL)";
38
39#include "config.h"
40#include <math.h>
41#include <errno.h>
42#include <string.h>
43#ifdef WIN32
44extern "C" int getpid();
45#endif
46#include "source.h"
47#include "vic_tcl.h"
48#include "media-timer.h"
49#include "crypt.h"
50#include "timer.h"
51#include "ntp-time.h"
52#include "session.h"
53#include "cc/tfwc_sndr.h"
54
55/* added to support the mbus
56#include "mbus_handler.h"*/
57
58
59static class SessionMatcher : public Matcher {
60    public:
61                SessionMatcher() : Matcher("session") {}
62                TclObject* match(const char* id) {
63                        if (strcmp(id, "audio/rtp") == 0)
64                                return (new AudioSessionManager);
65                        else if (strcmp(id, "video/rtp") == 0)
66                                return (new VideoSessionManager);
67                        return (0);
68                }
69} session_matcher;
70
71int VideoSessionManager::check_format(int fmt) const
72{
73        switch(fmt) {
74                case RTP_PT_RAW:
75                case RTP_PT_CELLB:
76                case RTP_PT_JPEG:
77                case RTP_PT_CUSEEME:
78                case RTP_PT_NV:
79                case RTP_PT_CPV:
80                case RTP_PT_H261:
81                case RTP_PT_BVC:
82                case RTP_PT_H261_COMPAT:/*XXX*/
83                case RTP_PT_H263:
84                case RTP_PT_MPEG4:
85                case RTP_PT_H264:
86                case RTP_PT_H263P:
87                case RTP_PT_LDCT:
88                case RTP_PT_PVH:
89                case RTP_PT_DV:
90#ifdef USE_H261AS
91                case RTP_PT_H261AS:
92#endif 
93                return (1);
94        }
95        return (0);
96}
97
98int AudioSessionManager::check_format(int fmt) const
99{
100        switch (fmt) {
101        case RTP_PT_PCMU:
102        case RTP_PT_CELP:
103        case RTP_PT_GSM:
104        case RTP_PT_DVI:
105        case RTP_PT_LPC:
106                return (1);
107        }
108        return (0);
109}
110
111
112static SessionManager* manager;
113
114void
115adios()
116{
117        if (SourceManager::instance().localsrc() != 0)
118                manager->send_bye();
119        exit(0);
120}
121
122/*void ReportTimer::timeout()
123{
124sm_.send_report();
125}*/
126
127void DataHandler::dispatch(int)
128{
129        sm_->recv(this);
130}
131
132void CtrlHandler::dispatch(int)
133{
134        sm_->recv(this);
135}
136
137CtrlHandler::CtrlHandler()
138: ctrl_inv_bw_(0.),
139ctrl_avg_size_(128.),
140rint_(0.0) //SV-XXX: Debian
141{
142}
143
144inline void CtrlHandler::schedule_timer()
145{
146        msched(int(fmod(double(random()), rint_) + rint_ * .5 + .5));
147}
148
149void CtrlHandler::net(Network* n)
150{
151        DataHandler::net(n);
152        cancel();
153        if (n != 0) {
154        /*
155        * schedule a timer for our first report using half the
156        * min ctrl interval.  This gives us some time before
157        * our first report to learn about other sources so our
158        * next report interval will account for them.  The avg
159        * ctrl size was initialized to 128 bytes which is
160        * conservative (it assumes everyone else is generating
161        * SRs instead of RRs).
162                */
163                double rint = ctrl_avg_size_ * ctrl_inv_bw_;
164                if (rint < CTRL_MIN_RPT_TIME / 2. * 1000.)
165                        rint = CTRL_MIN_RPT_TIME / 2. * 1000.;
166                rint_ = rint;
167                schedule_timer();
168        }
169}
170
171void CtrlHandler::sample_size(int cc)
172{
173        ctrl_avg_size_ += CTRL_SIZE_GAIN * (double(cc + 28) - ctrl_avg_size_);
174}
175
176void CtrlHandler::adapt(int nsrc, int nrr, int we_sent)
177{
178/*
179* compute the time to the next report.  we do this here
180* because we need to know if there were any active sources
181* during the last report period (nrr above) & if we were
182* a source.  The bandwidth limit for ctrl traffic was set
183* on startup from the session bandwidth.  It is the inverse
184* of bandwidth (ie., ms/byte) to avoid a divide below.
185        */
186        double ibw = ctrl_inv_bw_;
187        if (nrr) {
188                /* there were active sources */
189                if (we_sent) {
190                        ibw *= 1./CTRL_SENDER_BW_FRACTION;
191                        nsrc = nrr;
192                } else {
193                        ibw *= 1./CTRL_RECEIVER_BW_FRACTION;
194                        nsrc -= nrr;
195                }
196        }
197        double rint = ctrl_avg_size_ * double(nsrc) * ibw;
198        if (rint < CTRL_MIN_RPT_TIME * 1000.)
199                rint = CTRL_MIN_RPT_TIME * 1000.;
200        rint_ = rint;
201}
202
203void CtrlHandler::timeout()
204{
205        sm_->announce(this);
206        schedule_timer();
207}
208
209//SV-XXX: rearranged initialization order to shut up gcc4
210SessionManager::SessionManager()
211//      : dh_(*this), ch_(*this), rt_(*this),
212: mb_(mbus_handler_engine, NULL),
213lipSyncEnabled_(0),
214badversion_(0),
215badoptions_(0),
216badfmt_(0),
217badext_(0),
218nrunt_(0),
219last_np_(0),
220sdes_seq_(0),
221rtcp_inv_bw_(0.),
222rtcp_avg_size_(128.),
223confid_(-1),
224seqno_(0),              // RTP packet sequence number (from RTP header)
225lastseq_(0),    // last received packet's seqno
226ackvec_(0)              // bit vector (AckVec)
227{
228        /*XXX For adios() to send bye*/
229        manager = this;
230       
231        for (int i = 0; i < NLAYER; ++i) {
232                dh_[i].manager(this);
233                ch_[i].manager(this);
234        }
235       
236        /*XXX*/
237        pktbuf_ = new u_char[2 * RTP_MTU];
238        pool_ = new BufferPool;
239       
240        /*
241        * schedule a timer for our first report using half the
242        * min rtcp interval.  This gives us some time before
243        * our first report to learn about other sources so our
244        * next report interval will account for them.  The avg
245        * rtcp size was initialized to 128 bytes which is
246        * conservative (it assumes everyone else is generating
247        * SRs instead of RRs).
248        */
249        double rint = rtcp_avg_size_ * rtcp_inv_bw_;
250        if (rint < RTCP_MIN_RPT_TIME / 2. * 1000.)
251                rint = RTCP_MIN_RPT_TIME / 2. * 1000.;
252        rint_ = rint;
253        //rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
254}
255
256SessionManager::~SessionManager()
257{
258        if (pktbuf_)
259                delete[] pktbuf_;
260       
261        delete pool_;
262}
263
264u_int32_t SessionManager::alloc_srcid(Address & addr) const
265{
266        timeval tv;
267        ::gettimeofday(&tv, 0);
268        u_int32_t srcid = u_int32_t(tv.tv_sec + tv.tv_usec);
269        srcid += (u_int32_t)getuid();
270        srcid += (u_int32_t)getpid();
271/* __IPv6 changed srcid computation */
272        for(unsigned int i = 0; i < (addr.length() % sizeof(u_int32_t)); i++) {
273                srcid += ((u_int32_t*)((const void*)addr))[i];
274        }
275        return (srcid);
276}
277
278extern char* onestat(char* cp, const char* name, u_long v);
279
280char* SessionManager::stats(char* cp) const
281{
282        cp = onestat(cp, "Bad-RTP-version", badversion_);
283        cp = onestat(cp, "Bad-RTPv1-options", badoptions_);
284        cp = onestat(cp, "Bad-Payload-Format", badfmt_);
285        cp = onestat(cp, "Bad-RTP-Extension", badext_);
286        cp = onestat(cp, "Runts", nrunt_);
287        Crypt* p = dh_[0].net()->crypt();
288        if (p != 0) {
289                cp = onestat(cp, "Crypt-Bad-Length", p->badpktlen());
290                cp = onestat(cp, "Crypt-Bad-P-Bit", p->badpbit());
291        }
292        /*XXX*/
293        if (ch_[0].net() != 0) {
294                Crypt* p = ch_[0].net()->crypt();
295                if (p != 0) {
296                        cp = onestat(cp, "Crypt-Ctrl-Bad-Length", p->badpktlen());
297                        cp = onestat(cp, "Crypt-Ctrl-Bad-P-Bit", p->badpbit());
298                }
299        }
300        *--cp = 0;
301        return (cp);
302}
303
304int SessionManager::command(int argc, const char*const* argv)
305{
306        Tcl& tcl = Tcl::instance();
307        char* cp = tcl.buffer();
308        if (argc == 2) {
309                if (strcmp(argv[1], "active") == 0) {
310                        SourceManager::instance().sortactive(cp);
311                        tcl.result(cp);
312                        return (TCL_OK);
313                }
314                if (strcmp(argv[1], "local-addr-heuristic") == 0) {
315                        strcpy(cp, intoa(LookupLocalAddr()));
316                        tcl.result(cp);
317                        return (TCL_OK);
318                }
319                if (strcmp(argv[1], "stats") == 0) {
320                        stats(cp);
321                        tcl.result(cp);
322                        return (TCL_OK);
323                }
324                if (strcmp(argv[1], "nb") == 0) {
325                        sprintf(cp, "%u", 8 * nb_);
326                        tcl.result(cp);
327                        return (TCL_OK);
328                }
329                if (strcmp(argv[1], "nf") == 0) {
330                        sprintf(cp, "%u", nf_);
331                        tcl.result(cp);
332                        return (TCL_OK);
333                }
334                if (strcmp(argv[1], "np") == 0 ||
335                    strcmp(argv[1], "ns") == 0) {
336                        sprintf(cp, "%u", np_);
337                        tcl.result(cp);
338                        return (TCL_OK);
339                }
340                if (strcmp(argv[1], "lip-sync") == 0) {
341                        sprintf(cp, "%u", lipSyncEnabled_);
342                        tcl.result(cp);
343                        return (TCL_OK);
344                }
345
346        } else if (argc == 3) {
347                if (strcmp(argv[1], "sm") == 0) {
348                        sm_ = (SourceManager*)TclObject::lookup(argv[2]);
349                        return (TCL_OK);
350                }
351                if (strcmp(argv[1], "name") == 0) {
352                        Source* s = SourceManager::instance().localsrc();
353                        s->sdes(RTCP_SDES_NAME, argv[2]);
354                        return (TCL_OK);
355                }
356                if (strcmp(argv[1], "email") == 0) {
357                        Source* s = SourceManager::instance().localsrc();
358                        s->sdes(RTCP_SDES_EMAIL, argv[2]);
359                        return (TCL_OK);
360                }
361                if (strcmp(argv[1], "random-srcid") == 0) {
362                        Address * addrp;
363                        if ((addrp = (dh_[0].net())->alloc(argv[2])) !=0 ) { //SV-XXX: placed ()'s against truth check == NULL
364                          sprintf(cp, "%u", alloc_srcid(*addrp));
365                          delete addrp;
366                        }
367                        tcl.result(cp);
368                        return (TCL_OK);
369                }
370                if (strcmp(argv[1], "data-net") == 0) {
371                        dh_[0].net((Network*)TclObject::lookup(argv[2]));
372                        return (TCL_OK);
373                }
374                if (strcmp(argv[1], "ctrl-net") == 0) {
375                        ch_[0].net((Network*)TclObject::lookup(argv[2]));
376                        return (TCL_OK);
377                }
378                if (strcmp(argv[1], "max-bandwidth") == 0) {
379                        double bw = atof(argv[2]) / 8.;
380                        rtcp_inv_bw_ = 1. / (bw * RTCP_SESSION_BW_FRACTION);
381                        return (TCL_OK);
382                }
383                if (strcmp(argv[1], "confid") == 0) {
384                        confid_ = atoi(argv[2]);
385                        return (TCL_OK);
386                }
387                if (strcmp(argv[1], "data-bandwidth") == 0) {
388                        /*XXX assume value in range */
389                        bps(atoi(argv[2]));
390                        return (TCL_OK);
391                }
392                if (strcmp(argv[1], "mtu") == 0) {
393                        mtu_ = atoi(argv[2]);
394                        return (TCL_OK);
395                }
396                if (strcmp(argv[1], "loopback") == 0) {
397                        loopback_ = atoi(argv[2]);
398                        return (TCL_OK);
399                }
400                if (strcmp(argv[1], "lip-sync") == 0) {
401                        lipSyncEnabled_ = atoi(argv[2]);
402                        return (TCL_OK);
403                }
404
405        }  else if (argc == 4) {
406                if (strcmp(argv[1], "data-net") == 0) {
407                        u_int layer = atoi(argv[3]);
408                        if (layer >= NLAYER)
409                                abort();
410                        if (*argv[2] == 0) {
411                                dh_[layer].net(0);
412                                return (TCL_OK);
413                        }
414                        Network* net = (Network*)TclObject::lookup(argv[2]);
415                        if (net == 0) {
416                                tcl.resultf("no network %s", argv[2]);
417                                return (TCL_ERROR);
418                        }
419                        if (net->rchannel() < 0) {
420                                tcl.resultf("network not open");
421                                return (TCL_ERROR);
422                        }
423                        dh_[layer].net(net);
424                        return (TCL_OK);
425                }
426                if (strcmp(argv[1], "ctrl-net") == 0) {
427                        u_int layer = atoi(argv[3]);
428                        if (layer >= NLAYER)
429                                abort();
430                        if (*argv[2] == 0) {
431                                ch_[layer].net(0);
432                                return (TCL_OK);
433                        }
434                        Network* net = (Network*)TclObject::lookup(argv[2]);
435                        if (net == 0) {
436                                tcl.resultf("no network %s", argv[2]);
437                                return (TCL_ERROR);
438                        }
439                        if (net->rchannel() < 0) {
440                                tcl.resultf("network not open");
441                                return (TCL_ERROR);
442                        }
443                        ch_[layer].net(net);
444                        return (TCL_OK);
445                }
446                return (Transmitter::command(argc, argv));
447        }
448        //should not be reached
449        return (TCL_ERROR);
450}
451
452void SessionManager::transmit(pktbuf* pb)
453{
454        //mh_.msg_iov = pb->iov;
455        //      dh_[.net()->send(mh_);
456                //debug_msg("L %d,",pb->layer);
457        // Using loop_layer for now to restrict transmission as well
458        if (pb->layer < loop_layer_) {
459        //      if ( pb->layer <0 ) exit(1);
460                Network* n = dh_[pb->layer].net();
461                if (n != 0)
462                        n->send(pb);
463        }
464}
465
466u_char* SessionManager::build_sdes_item(u_char* p, int code, Source& s)
467{
468        const char* value = s.sdes(code);
469        if (value != 0) {
470                int len = strlen(value);
471                *p++ = code;
472                *p++ = len;
473                memcpy(p, value, len);
474                p += len;
475        }
476        return (p);
477}
478
479int SessionManager::build_sdes(rtcphdr* rh, Source& ls)
480{
481        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_SDES;
482        rh->rh_flags = htons(flags);
483        rh->rh_ssrc = ls.srcid();
484        u_char* p = (u_char*)(rh + 1);
485        p = build_sdes_item(p, RTCP_SDES_CNAME, ls);
486
487        /*
488         * We always send a cname plus one other sdes
489         * There's a schedule for what we send sequenced by sdes_seq_:
490         *   - send 'email' every 0th & 4th packet
491         *   - send 'note' every 2nd packet
492         *   - send 'tool' every 6th packet
493         *   - send 'name' in all the odd slots
494         * (if 'note' is not the empty string, we switch the roles
495         *  of name & note)
496         */
497        int nameslot, noteslot;
498        const char* note = ls.sdes(RTCP_SDES_NOTE);
499        if (note) {
500                if (*note) {
501                        nameslot = RTCP_SDES_NOTE;
502                        noteslot = RTCP_SDES_NAME;
503                } else {
504                        nameslot = RTCP_SDES_NAME;
505                        noteslot = RTCP_SDES_NOTE;
506                }
507        } else {
508                nameslot = RTCP_SDES_NAME;
509                noteslot = RTCP_SDES_NAME;
510        }
511        u_int seq = (++sdes_seq_) & 0x7;
512        switch (seq) {
513
514        case 0case 4:
515                p = build_sdes_item(p, RTCP_SDES_EMAIL, ls);
516                break;
517
518        case 2:
519                p = build_sdes_item(p, noteslot, ls);
520                break;
521        case 6:
522                p = build_sdes_item(p, RTCP_SDES_TOOL, ls);
523                break;
524        default:
525                p = build_sdes_item(p, nameslot, ls);
526        }
527        int len = p - (u_char*)rh;
528        int pad = 4 - (len & 3);
529        len += pad;
530        rh->rh_len = htons((len >> 2) - 1);
531        while (--pad >= 0)
532                *p++ = 0;
533
534        return (len);
535}
536
537int SessionManager::build_app(rtcphdr* rh, Source& ls, const char *name,
538                void *data, int datalen)
539{
540  int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_APP;
541  rh->rh_flags = htons(flags);
542  rh->rh_ssrc = ls.srcid();
543  u_char* p = (u_char*)(rh + 1);
544    int len;
545   
546    // write the name field
547    len = strlen(name);
548    if( len < 4 ) {
549        memcpy(p,name,len);
550        p += len;
551       
552        // pad short names
553        while( p - (u_char*)(rh+1) < 4 )
554            *p++ = 0;
555    }
556    else {
557        // use first four chars of given name
558        memcpy(p,name,4);
559        p += 4;
560    }
561   
562    // write the app data
563    memcpy(p,data,datalen);
564    p += datalen;
565   
566    // pad as needed
567    len = p - (u_char*)rh;
568    int pad = 4 - (len & 3);
569    while( --pad >= 0 )
570        *p++ = 0;
571  len = p - (u_char*)rh;
572
573    // set the length now that it's known
574  rh->rh_len = htons((len >> 2) - 1);
575
576  return (len);
577}
578
579/*void SessionManager::send_report()
580{
581        send_report(0);
582}
583
584  void SessionManager::send_bye()
585  {
586  send_report(1);
587}*/
588
589// SessionManager is no longer used as Timer - Each
590// CtrlHandler has its own Timer which calls this;
591void SessionManager::announce(CtrlHandler* ch)
592{
593        send_report(ch, 0);
594        send_xreport(ch, 0);
595}
596
597/*
598 * send RTP extended report.
599 */
600void SessionManager::send_xreport(CtrlHandler* ch, int bye, int app)
601{
602        UNUSED(app);
603
604        SourceManager& sm = SourceManager::instance();
605        Source& s = *sm.localsrc();     // local source
606        rtcphdr* rh = (rtcphdr*)pktbuf_;
607        rh->rh_ssrc = s.srcid();
608
609        int flags = RTP_VERSION << 14;
610        int layer = ch- ch_;
611        Source:: Layer& sl = s.layer(layer);
612        timeval now = unixtime();
613        sl.lts_ctrl(now);
614
615        rtcp_xr* xr;    // extended report
616
617        flags |= RTCP_PT_XR;
618        xr = (rtcp_xr*)(rh + 1);
619}
620
621
622
623/*XXX check for buffer overflow*/
624/*
625* Send an RTPv2 report packet.
626*/
627//void SessionManager::send_report(int bye)
628
629void SessionManager::send_report(CtrlHandler* ch, int bye, int app)
630{
631        UNUSED(app); //SV-XXX: unused
632
633        SourceManager& sm = SourceManager::instance();
634        Source& s = *sm.localsrc();
635        rtcphdr* rh = (rtcphdr*)pktbuf_;
636        rh->rh_ssrc = s.srcid();
637        int flags = RTP_VERSION << 14;
638        int layer = ch - ch_; //LLL
639        Source::Layer& sl = s.layer(layer);
640        timeval now = unixtime();
641        sl.lts_ctrl(now);
642        int we_sent = 0;
643        rtcp_rr* rr;
644        rtcp_xr* xr;    // extended report
645        Tcl& tcl = Tcl::instance();
646
647        /*
648         * If we've sent data since our last sender report send a
649         * new report.  The MediaTimer check is to make sure we still
650         * have a grabber -- if not, we won't be able to interpret the
651         * media timestamps so there's no point in sending an SR.
652         */
653        MediaTimer* mt = MediaTimer::instance();
654        if (sl.np() != last_np_ && mt) {
655                last_np_ = sl.np();
656                we_sent = 1;
657                flags |= RTCP_PT_SR;
658                rtcp_sr* sr = (rtcp_sr*)(rh + 1);
659                sr->sr_ntp = ntp64time(now);
660                HTONL(sr->sr_ntp.upper);
661                HTONL(sr->sr_ntp.lower);
662                sr->sr_ts = htonl(mt->ref_ts());
663                sr->sr_np = htonl(sl.np());
664                sr->sr_nb = htonl(sl.nb());
665                rr = (rtcp_rr*)(sr + 1);
666                xr = (rtcp_xr*)(rr + 1);        // extended report
667        } else {
668                flags |= RTCP_PT_RR;
669                rr = (rtcp_rr*)(rh + 1);
670                xr = (rtcp_xr*)(rr + 1);        // extended report
671        }
672        int nrr = 0;
673        int nsrc = 0;
674        /*
675        * we don't want to inflate report interval if user has set
676        * the flag that causes all sources to be 'kept' so we
677        * consider sources 'inactive' if we haven't heard a control
678        * msg from them for ~32 reporting intervals.
679        */
680        //LLL   u_int inactive = u_int(rint_ * (32./1000.));
681        u_int inactive = u_int(ch->rint() * (32./1000.));
682        if (inactive < 2)
683                inactive = 2;
684        for (Source* sp = sm.sources(); sp != 0; sp = sp->next_) {
685                ++nsrc;
686                Source::Layer& sl = sp->layer(layer);
687                //              int received = sp->np() - sp->snp();
688                int received = sl.np() - sl.snp();
689                if (received == 0) {
690                        //              if (u_int(now.tv_sec - sp->lts_ctrl().tv_sec) > inactive)
691                        if (u_int(now.tv_sec - sl.lts_ctrl().tv_sec) > inactive)
692                                --nsrc;
693                        continue;
694                }
695                //              sp->snp(sp->np());
696                sl.snp(sl.np());
697                rr->rr_srcid = sp->srcid();
698                //              int expected = sp->ns() - sp->sns();
699                int expected = sl.ns() - sl.sns();
700                //              sp->sns(sp->ns());
701                sl.sns(sl.ns());
702                u_int32_t v;
703                int lost = expected - received;
704                if (lost <= 0)
705                        v = 0;
706                else
707                        /* expected != 0 if lost > 0 */
708                        v = ((lost << 8) / expected) << 24;
709                /* XXX should saturate on over/underflow */
710                //              v |= (sp->ns() - sp->np()) & 0xffffff;
711                v |= (sl.ns() - sl.np()) & 0xffffff;
712                rr->rr_loss = htonl(v);
713                //              rr->rr_ehsr = htonl(sp->ehs());
714                rr->rr_ehsr = htonl(sl.ehs());
715                rr->rr_dv = (sp->handler() != 0) ? sp->handler()->delvar() : 0;
716                //              rr->rr_lsr = htonl(sp->sts_ctrl());
717                rr->rr_lsr = htonl(sl.sts_ctrl());
718                //              if (sp->lts_ctrl().tv_sec == 0)
719                if (sl.lts_ctrl().tv_sec == 0)
720                        rr->rr_dlsr = 0;
721                else {
722                        u_int32_t ntp_now = ntptime(now);
723                        //                      u_int32_t ntp_then = ntptime(sp->lts_ctrl());
724                        u_int32_t ntp_then = ntptime(sl.lts_ctrl());
725                        rr->rr_dlsr = htonl(ntp_now - ntp_then);
726                }
727                ++rr;
728                if (++nrr >= 31)
729                        break;
730        }
731        flags |= nrr << 8;
732        rh->rh_flags = htons(flags);
733        int len = (u_char*)rr - pktbuf_;
734        rh->rh_len = htons((len >> 2) - 1);
735
736        if (bye)
737                len += build_bye((rtcphdr*)rr, s);
738        else
739                len += build_sdes((rtcphdr*)rr, s);
740       
741        // build "site" app data if specified
742        const char *data = tcl.attr("site");
743        if(data)
744        {
745            rr = (rtcp_rr*)(((u_char*)rh) + len);
746            len += build_app((rtcphdr*)rr, s, "site", (void *)data, strlen(data));
747        }
748        //LLL   ch_.send(pktbuf_, len);
749        ch->send(pktbuf_, len);
750       
751        /*
752      rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(len + 28) - rtcp_avg_size_);
753         
754          // compute the time to the next report.  we do this here
755          // because we need to know if there were any active sources
756          // during the last report period (nrr above) & if we were
757          // a source.  The bandwidth limit for rtcp traffic was set
758          // on startup from the session bandwidth.  It is the inverse
759          // of bandwidth (ie., ms/byte) to avoid a divide below.
760       
761        //      double ibw = rtcp_inv_bw_;
762        if (nrr) {
763        // there were active sources
764        //              if (we_sent) {
765        ibw *= 1./RTCP_SENDER_BW_FRACTION;
766        nsrc = nrr;
767        } else {
768        ibw *= 1./RTCP_RECEIVER_BW_FRACTION;
769        nsrc -= nrr;
770        }
771        }
772        double rint = rtcp_avg_size_ * double(nsrc) * ibw;
773        if (rint < RTCP_MIN_RPT_TIME * 1000.)
774                rint = RTCP_MIN_RPT_TIME * 1000.;
775        rint_ = rint;
776        rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
777        */
778       
779        // Call timer adaption for each layer
780        ch->adapt(nsrc, nrr, we_sent);
781        ch->sample_size(len);
782       
783        //      sm.CheckActiveSources(rint);
784        if (layer == 0)
785                sm.CheckActiveSources(ch->rint());
786       
787}
788
789int SessionManager::build_bye(rtcphdr* rh, Source& ls)
790{
791        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_BYE;
792        rh->rh_flags = ntohs(flags);
793        rh->rh_len = htons(1);
794        rh->rh_ssrc = ls.srcid();
795        return (8);
796}
797
798/*
799 * receive an RTP packet
800 */
801void SessionManager::recv(DataHandler* dh)
802{
803        int layer = dh - dh_;
804        pktbuf* pb = pool_->alloc(layer);
805        Address * addrp;
806        /* leave room in case we need to expand rtpv1 into an rtpv2 header */
807        /* XXX the free mem routine didn't like it ... */
808        //u_char* bp = &pktbuf_[4];
809        //u_char* bp = pktbuf_;
810       
811        int cc = dh->recv(pb->data, sizeof(pb->data), addrp);
812        //int cc = dh->recv(bp, 2 * RTP_MTU - 4, addrp);
813        if (cc <= 0) {
814                pb->release();
815                return;
816        }
817
818        rtphdr* rh = (rtphdr*)pb->data;
819        seqno_ = ntohs(rh->rh_seqno);   // received packet seqno
820        debug_msg("received seqno:      %d\n", seqno_);
821
822    // Ignore loopback packets
823        if (!loopback_) {
824                //rtphdr* rh = (rtphdr*)pb->data;
825                SourceManager& sm = SourceManager::instance();
826                if (rh->rh_ssrc == (*sm.localsrc()).srcid()) {
827                        debug_msg("(loopback) seqno:    %d\n", seqno_);
828                        pb->release();  // releasing loopback packet
829                        return;
830                }
831        } // now, loopback packets ignored (if disabled)
832
833        // set bit vector
834        for (int i = lastseq_+1; i <= seqno_; i++) {
835                        SET_BIT_VEC (ackvec_, 1);
836        }
837
838        // printing bit vector
839        bool isThere;
840        debug_msg("XXX received ackvec:");
841        for (int i = lastseq_+1; i <= seqno_; i++) {
842                isThere = SEE_BIT_VEC (ackvec_, i, seqno_);
843                printf(" %d... %s ", seqno_, isThere ? "Ok" : "NOk");
844        }
845        printf("\n");
846        lastseq_ = seqno_;
847
848        int version = pb->data[0] >> 6;
849        //int version = *(u_char*)rh >> 6;
850        if (version != 2) {
851                ++badversion_;
852                pb->release();
853                return;
854        }
855        if (cc < (int)sizeof(rtphdr)) {
856                ++nrunt_;
857                pb->release();
858                return;
859        }
860        pb->len = cc;
861       
862        //bp += sizeof(*rh);
863        //cc -= sizeof(*rh);
864        demux(pb, *addrp);
865}
866
867void SessionManager::demux(pktbuf* pb, Address & addr)
868{
869        rtphdr* rh = (rtphdr*)pb->data;
870        u_int32_t srcid = rh->rh_ssrc;
871        int flags = ntohs(rh->rh_flags);
872        // for LIP SYNC
873        //SV-XXX: unused: u_char *pkt = pb->data - sizeof(*rh);
874
875        if ((flags & RTP_X) != 0) {
876        /*
877        * the minimal-control audio/video profile
878        * explicitly forbids extensions
879                */
880                ++badext_;
881                pb->release();
882                return;
883        }
884
885        /*
886         * Check for illegal payload types.  Most likely this is
887         * a session packet arriving on the data port.
888         */
889        int fmt = flags & 0x7f;
890        if (!check_format(fmt)) {
891                ++badfmt_;
892                pb->release();
893                return;
894        }
895
896        SourceManager& sm = SourceManager::instance();
897        u_int16_t seqno = ntohs(rh->rh_seqno);
898        Source* s = sm.demux(srcid, addr, seqno, pb->layer);
899        if (s == 0) {
900        /*
901        * Takes a pair of validated packets before we will
902        * believe the source.  This prevents a runaway
903        * allocation of Source data structures for a
904        * stream of garbage packets.
905                */
906                pb->release();
907                return;
908        }
909        /* inform this source of the mbus */
910        s->mbus(&mb_);
911       
912        Source::Layer& sl = s->layer(pb->layer);
913        timeval now = unixtime();
914        //      s->lts_data(now);
915        sl.lts_data(now);
916        //      s->sts_data(rh->rh_ts);
917        sl.sts_data(rh->rh_ts);
918       
919        // extract CSRC count (CC field); increment pb->dp data pointer & adjust length accordingly
920        int cnt = (flags >> 8) & 0xf;
921        if (cnt > 0) {
922                //u_char* nh = (u_char*)rh + (cnt << 2);
923                rtphdr hdr = *rh;
924                pb->dp += (cnt << 2);
925                pb->len -= (cnt << 2);
926                u_int32_t* bp = (u_int32_t*)(rh + 1);
927                while (--cnt >= 0) {
928                        u_int32_t csrc = *(u_int32_t*)bp;
929                        bp += 4;
930                        Source* cs = sm.lookup(csrc, srcid, addr);
931                        //                      cs->lts_data(now);
932                        cs->layer(pb->layer).lts_data(now);
933                        cs->action();
934                }
935                //              rtphdr hdr = *rh;
936                //              rh = (rtphdr*)nh;
937                /*XXX move header up so it's contiguous with data*/
938                rh = (rtphdr*)pb->dp;
939                *rh = hdr;
940        } else
941                s->action();
942
943        if (s->sync() && lipSyncEnabled()) { 
944                /*
945                 * Synchronisation is enabled on this source;
946                 * video packets have to
947                 * be buffered, their playout point scheduled, and the
948                 * playout delays communicated with the audio tool ...
949                 */ 
950
951                /*
952                * XXX bit rate doesn't include rtpv1 options;
953                * but v1 is going away anyway.
954                */
955                //              int dup = s->cs(seqno);
956                //              s->np(1);
957                //              s->nb(cc + sizeof(*rh));
958                int dup = sl.cs(seqno, s);
959                sl.np(1);
960                sl.nb(pb->len);
961                if (dup) {
962                        pb->release();
963                        return;
964                }
965                if (flags & RTP_M) // check if reach frame boundaries
966                        //                      s->nf(1);
967                        sl.nf(1);
968               
969                //s->recv(pkt, rh, pb, pb->len); // this should invoke Source::recv and buffer
970                //s->recv(bp); // this should invoke Source::recv and buffer
971               
972                pktbuf_ = (u_char*)new_blk();
973        } /* synced */
974
975        else { /* ... playout video packets as they arrive */
976                /*
977                 * This is a data packet.  If the source needs activation,
978                 * or the packet format has changed, deal with this.
979                 * Then, hand the packet off to the packet handler.
980                 * XXX might want to be careful about flip-flopping
981                 * here when format changes due to misordered packets
982                 * (easy solution -- keep rtp seqno of last fmt change).
983                 */
984                PacketHandler* h = s->handler();
985                if (h == 0)
986                        h = s->activate(fmt);
987                else if (s->format() != fmt)
988                        h = s->change_format(fmt);
989               
990                        /*
991                        * XXX bit rate doesn't include rtpv1 options;
992                        * but v1 is going away anyway.
993                */
994                //              int dup = s->cs(seqno);
995                //              s->np(1);
996                //              s->nb(cc + sizeof(*rh));
997                int dup = sl.cs(seqno, s);
998                sl.np(1);
999                sl.nb(pb->len);
1000                if (dup){
1001                        pb->release();
1002                        return;
1003                }
1004                if (flags & RTP_M)
1005                        //                  s->nf(1);
1006                        sl.nf(1);
1007#ifdef notdef
1008        /* This should move to the handler */
1009        /*XXX could get rid of hdrlen and move run check into recv method*/
1010               
1011                int hlen = h->hdrlen();
1012                cc -= hlen;
1013                if (cc < 0) {
1014                        //                  s->runt(1);
1015                        sl.runt(1);
1016                        pb->release();
1017                        return;
1018                }
1019#endif
1020                if (s->mute()) {
1021                        pb->release();
1022                        return;
1023                }
1024                //h->recv(rh, bp + hlen, cc);
1025                h->recv(pb);
1026        } /* not sync-ed */
1027
1028}
1029
1030void SessionManager::parse_rr_records(u_int32_t, rtcp_rr*, int,
1031                                      const u_char*, Address &)
1032{
1033}
1034                                     
1035
1036void SessionManager::parse_sr(rtcphdr* rh, int flags, u_char*ep,
1037                                                          Source* ps, Address & addr, int layer)
1038{
1039        rtcp_sr* sr = (rtcp_sr*)(rh + 1);
1040        Source* s;
1041        u_int32_t ssrc = rh->rh_ssrc;
1042        if (ps->srcid() != ssrc)
1043                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1044        else
1045                s = ps;
1046       
1047        Source::Layer& sl = s->layer(layer);
1048       
1049        timeval now = unixtime();
1050
1051        sl.lts_ctrl(now);
1052        sl.sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1053                ntohl(sr->sr_ntp.lower) >> 16);
1054       
1055        //int cnt = flags >> 8 & 0x1f;
1056        //parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1057       
1058        /*s->lts_ctrl(now);
1059        s->sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1060        ntohl(sr->sr_ntp.lower) >> 16);*/
1061       
1062        s->rtp_ctrl(ntohl(sr->sr_ts));
1063        u_int32_t t = ntptime(now);
1064        s->map_ntp_time(t);
1065        s->map_rtp_time(s->convert_time(t));
1066        s->rtp2ntp(1);
1067        //printf("Got SR\n");
1068
1069        int cnt = flags >> 8 & 0x1f;
1070        parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1071}
1072
1073void SessionManager::parse_rr(rtcphdr* rh, int flags, u_char* ep,
1074                                                          Source* ps, Address & addr, int layer)
1075{
1076        Source* s;
1077        u_int32_t ssrc = rh->rh_ssrc;
1078        if (ps->srcid() != ssrc)
1079                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1080        else
1081                s = ps;
1082       
1083        s->layer(layer).lts_ctrl(unixtime());
1084        int cnt = flags >> 8 & 0x1f;
1085        parse_rr_records(ssrc, (rtcp_rr*)(rh + 1), cnt, ep, addr);
1086}
1087
1088void SessionManager::parse_xr(rtcphdr* rh, int flags, u_char* ep,
1089                                                          Source* ps, Address & addr, int layer)
1090{
1091
1092        Source* s;
1093        u_int32_t ssrc = rh->rh_ssrc;
1094        if (ps->srcid() != ssrc)
1095                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1096        else
1097                s = ps;
1098
1099        s->layer(layer).lts_ctrl(unixtime());
1100        int cnt = flags >> 8 & 0x1f;
1101        parse_xr_records(ssrc, (rtcp_xr*)(rh + 1), cnt, ep, addr);
1102}
1103
1104void SessionManager::parse_xr_records(u_int32_t ssrc, rtcp_xr* r, int cnt,
1105                                      const u_char* ep, Address & addr)
1106{
1107        debug_msg("XXX parse_xr_records\n");
1108        UNUSED(cnt);
1109        UNUSED(ep);
1110        UNUSED(addr);
1111
1112        ackvec_ = r->xr_ackvec;
1113        /*
1114         * if AoA is received, then first trim ackvec and send a new ackvec
1115         * if AckVec is received, then parse it to TfwcSndr
1116         */
1117}
1118
1119int SessionManager::sdesbody(u_int32_t* p, u_char* ep, Source* ps,
1120                                                Address & addr, u_int32_t ssrc, int layer)
1121{
1122        Source* s;
1123        u_int32_t srcid = *p;
1124        if (ps->srcid() != srcid)
1125                s = SourceManager::instance().lookup(srcid, ssrc, addr);
1126        else
1127                s = ps;
1128        if (s == 0)
1129                return (0);
1130                /*
1131                * Note ctrl packet since we will never see any direct ctrl packets
1132                * from a source through a mixer (and we don't want the source to
1133                * time out).
1134        */
1135        s->layer(layer).lts_ctrl(unixtime());
1136       
1137        u_char* cp = (u_char*)(p + 1);
1138        while (cp < ep) {
1139                char buf[256];
1140
1141                u_int type = cp[0];
1142                if (type == 0) {
1143                        /* end of chunk */
1144                        return (((cp - (u_char*)p) >> 2) + 1);
1145                }
1146                u_int len = cp[1];
1147                u_char* eopt = cp + len + 2;
1148                if (eopt > ep)
1149                        return (0);
1150
1151                if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) {
1152                        memcpy(buf, (char*)&cp[2], len);
1153                        buf[len] = 0;
1154                        s->sdes(type, buf);
1155                } // else
1156                        /*XXX*/;
1157
1158                cp = eopt;
1159        }
1160        return (0);
1161}
1162
1163void SessionManager::parse_sdes(rtcphdr* rh, int flags, u_char* ep, Source* ps,
1164                                                                Address & addr, u_int32_t ssrc, int layer)
1165{
1166        int cnt = flags >> 8 & 0x1f;
1167        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1168        while (--cnt >= 0 && (u_char*)p < ep) {
1169                int n = sdesbody(p, ep, ps, addr, ssrc, layer);
1170                if (n == 0)
1171                        break;
1172                p += n;
1173        }
1174        if (cnt >= 0)
1175                ps->badsdes(1);
1176}
1177
1178void SessionManager::parse_bye(rtcphdr* rh, int flags, u_char* ep, Source* ps)
1179{
1180        int cnt = flags >> 8 & 0x1f;
1181        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1182
1183        while (--cnt >= 0) {
1184                if (p >= (u_int32_t*)ep) {
1185                        ps->badbye(1);
1186                        return;
1187                }
1188                Source* s;
1189                if (ps->srcid() != rh->rh_ssrc)
1190                        s = SourceManager::instance().consult(*p);
1191                else
1192                        s = ps;
1193                if (s != 0)
1194                        s->lts_done(unixtime());
1195                ++p;
1196        }
1197}
1198
1199/*
1200 * Receive an rtcp packet (from the control port).
1201 */
1202void SessionManager::recv(CtrlHandler* ch)
1203{
1204        Address * srcp;
1205        int cc = ch->recv(pktbuf_, 2 * RTP_MTU, srcp);
1206        if (cc <= 0)
1207                return;
1208
1209        rtcphdr* rh = (rtcphdr*)pktbuf_;
1210
1211    // Ignore loopback packets
1212        if (!loopback_) {
1213                SourceManager& sm = SourceManager::instance();
1214                if (rh->rh_ssrc == (*sm.localsrc()).srcid())
1215                        return;
1216        }
1217
1218        if (cc < int(sizeof(*rh))) {
1219                ++nrunt_;
1220                return;
1221        }
1222        /*
1223         * try to filter out junk: first thing in packet must be
1224         * sr, rr or bye & version number must be correct.
1225         */
1226        switch(ntohs(rh->rh_flags) & 0xc0ff) {
1227        case RTP_VERSION << 14 | RTCP_PT_SR:
1228        case RTP_VERSION << 14 | RTCP_PT_RR:
1229        case RTP_VERSION << 14 | RTCP_PT_XR:
1230        case RTP_VERSION << 14 | RTCP_PT_BYE:
1231                break;
1232        default:
1233                /*
1234                 * XXX should further categorize this error -- it is
1235                 * likely that people mis-implement applications that
1236                 * don't put something other than SR,RR,BYE first.
1237                 */
1238                ++badversion_;
1239                return;
1240        }
1241        /*
1242         * at this point we think the packet's valid.  Update our average
1243         * size estimator.  Also, there's valid ssrc so charge errors to it
1244         */
1245        rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(cc + 28) - rtcp_avg_size_);
1246        Address & addr = *srcp;
1247
1248        /*
1249         * First record in compount packet must be the ssrc of the
1250         * sender of the packet.  Pull it out here so we can use
1251         * it in the sdes parsing, since the sdes record doesn't
1252         * contain the ssrc of the sender (in the case of mixers).
1253         */
1254        u_int32_t ssrc = rh->rh_ssrc;
1255        Source* ps = SourceManager::instance().lookup(ssrc, ssrc, addr);
1256        if (ps == 0)
1257                return;
1258       
1259        int layer = ch - ch_;
1260                /*
1261                * Outer loop parses multiple RTCP records of a "compound packet".
1262                * There is no framing between records.  Boundaries are implicit
1263                * and the overall length comes from UDP.
1264        */
1265        u_char* epack = (u_char*)rh + cc;
1266        while ((u_char*)rh < epack) {
1267                u_int len = (ntohs(rh->rh_len) << 2) + 4;
1268                u_char* ep = (u_char*)rh + len;
1269                if (ep > epack) {
1270                        ps->badsesslen(1);
1271                        return;
1272                }
1273                u_int flags = ntohs(rh->rh_flags);
1274                if (flags >> 14 != RTP_VERSION) {
1275                        ps->badsessver(1);
1276                        return;
1277                }
1278                switch (flags & 0xff) {
1279
1280                case RTCP_PT_SR:
1281                        parse_sr(rh, flags, ep, ps, addr, layer);
1282                        break;
1283
1284                case RTCP_PT_RR:
1285                        parse_rr(rh, flags, ep, ps, addr, layer);
1286                        break;
1287
1288                case RTCP_PT_XR:
1289                        parse_xr(rh, flags, ep, ps, addr, layer);
1290                        break;
1291
1292                case RTCP_PT_SDES:
1293                        parse_sdes(rh, flags, ep, ps, addr, ssrc, layer);
1294                        break;
1295
1296                case RTCP_PT_BYE:
1297                        parse_bye(rh, flags, ep, ps);
1298                        break;
1299
1300                default:
1301                        ps->badsessopt(1);
1302                        break;
1303                }
1304                rh = (rtcphdr*)ep;
1305        }
1306        return;
1307}
Note: See TracBrowser for help on using the browser.