root/vic/branches/cc/rtp/session.cpp @ 4248

Revision 4248, 31.4 KB (checked in by soohyunc, 6 years ago)

XR header and block contents separated. this is because we want to use "chunks"
as AckVec? and the time stamp depending upon the block type (BT).

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the University of
16 *      California, Berkeley and the Network Research Group at
17 *      Lawrence Berkeley Laboratory.
18 * 4. Neither the name of the University nor of the Laboratory may be used
19 *    to endorse or promote products derived from this software without
20 *    specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
23 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
26 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 *
34 * $Id$
35 */
36static const char rcsid[] =
37    "@(#) $Header$ (LBL)";
38
39#include "config.h"
40#include <math.h>
41#include <errno.h>
42#include <string.h>
43#ifdef WIN32
44extern "C" int getpid();
45#endif
46#include "source.h"
47#include "vic_tcl.h"
48#include "media-timer.h"
49#include "crypt.h"
50#include "timer.h"
51#include "ntp-time.h"
52#include "session.h"
53#include "cc/tfwc_sndr.h"
54#include "cc/tfwc_rcvr.h"
55
56/* added to support the mbus
57#include "mbus_handler.h"*/
58
59
60static class SessionMatcher : public Matcher {
61    public:
62                SessionMatcher() : Matcher("session") {}
63                TclObject* match(const char* id) {
64                        if (strcmp(id, "audio/rtp") == 0)
65                                return (new AudioSessionManager);
66                        else if (strcmp(id, "video/rtp") == 0)
67                                return (new VideoSessionManager);
68                        return (0);
69                }
70} session_matcher;
71
72int VideoSessionManager::check_format(int fmt) const
73{
74        switch(fmt) {
75                case RTP_PT_RAW:
76                case RTP_PT_CELLB:
77                case RTP_PT_JPEG:
78                case RTP_PT_CUSEEME:
79                case RTP_PT_NV:
80                case RTP_PT_CPV:
81                case RTP_PT_H261:
82                case RTP_PT_BVC:
83                case RTP_PT_H261_COMPAT:/*XXX*/
84                case RTP_PT_H263:
85                case RTP_PT_MPEG4:
86                case RTP_PT_H264:
87                case RTP_PT_H263P:
88                case RTP_PT_LDCT:
89                case RTP_PT_PVH:
90                case RTP_PT_DV:
91#ifdef USE_H261AS
92                case RTP_PT_H261AS:
93#endif 
94                return (1);
95        }
96        return (0);
97}
98
99int AudioSessionManager::check_format(int fmt) const
100{
101        switch (fmt) {
102        case RTP_PT_PCMU:
103        case RTP_PT_CELP:
104        case RTP_PT_GSM:
105        case RTP_PT_DVI:
106        case RTP_PT_LPC:
107                return (1);
108        }
109        return (0);
110}
111
112
113static SessionManager* manager;
114
115void
116adios()
117{
118        if (SourceManager::instance().localsrc() != 0)
119                manager->send_bye();
120        exit(0);
121}
122
123/*void ReportTimer::timeout()
124{
125sm_.send_report();
126}*/
127
128void DataHandler::dispatch(int)
129{
130        sm_->recv(this);
131}
132
133void CtrlHandler::dispatch(int)
134{
135        sm_->recv(this);
136}
137
138CtrlHandler::CtrlHandler()
139: ctrl_inv_bw_(0.),
140ctrl_avg_size_(128.),
141rint_(0.0) //SV-XXX: Debian
142{
143}
144
145inline void CtrlHandler::schedule_timer()
146{
147        msched(int(fmod(double(random()), rint_) + rint_ * .5 + .5));
148}
149
150void CtrlHandler::net(Network* n)
151{
152        DataHandler::net(n);
153        cancel();
154        if (n != 0) {
155        /*
156        * schedule a timer for our first report using half the
157        * min ctrl interval.  This gives us some time before
158        * our first report to learn about other sources so our
159        * next report interval will account for them.  The avg
160        * ctrl size was initialized to 128 bytes which is
161        * conservative (it assumes everyone else is generating
162        * SRs instead of RRs).
163                */
164                double rint = ctrl_avg_size_ * ctrl_inv_bw_;
165                if (rint < CTRL_MIN_RPT_TIME / 2. * 1000.)
166                        rint = CTRL_MIN_RPT_TIME / 2. * 1000.;
167                rint_ = rint;
168                schedule_timer();
169        }
170}
171
172void CtrlHandler::sample_size(int cc)
173{
174        ctrl_avg_size_ += CTRL_SIZE_GAIN * (double(cc + 28) - ctrl_avg_size_);
175}
176
177void CtrlHandler::adapt(int nsrc, int nrr, int we_sent)
178{
179/*
180* compute the time to the next report.  we do this here
181* because we need to know if there were any active sources
182* during the last report period (nrr above) & if we were
183* a source.  The bandwidth limit for ctrl traffic was set
184* on startup from the session bandwidth.  It is the inverse
185* of bandwidth (ie., ms/byte) to avoid a divide below.
186        */
187        double ibw = ctrl_inv_bw_;
188        if (nrr) {
189                /* there were active sources */
190                if (we_sent) {
191                        ibw *= 1./CTRL_SENDER_BW_FRACTION;
192                        nsrc = nrr;
193                } else {
194                        ibw *= 1./CTRL_RECEIVER_BW_FRACTION;
195                        nsrc -= nrr;
196                }
197        }
198        double rint = ctrl_avg_size_ * double(nsrc) * ibw;
199        if (rint < CTRL_MIN_RPT_TIME * 1000.)
200                rint = CTRL_MIN_RPT_TIME * 1000.;
201        rint_ = rint;
202}
203
204void CtrlHandler::timeout()
205{
206        sm_->announce(this);
207        schedule_timer();
208}
209
210//SV-XXX: rearranged initialization order to shut up gcc4
211SessionManager::SessionManager()
212//      : dh_(*this), ch_(*this), rt_(*this),
213: mb_(mbus_handler_engine, NULL),
214lipSyncEnabled_(0),
215badversion_(0),
216badoptions_(0),
217badfmt_(0),
218badext_(0),
219nrunt_(0),
220last_np_(0),
221sdes_seq_(0),
222rtcp_inv_bw_(0.),
223rtcp_avg_size_(128.),
224confid_(-1),
225seqno_(0),              // RTP packet sequence number (from RTP header)
226lastseq_(0),    // last received packet's seqno
227ackvec_(0)              // bit vector (AckVec)
228{
229        /*XXX For adios() to send bye*/
230        manager = this;
231       
232        for (int i = 0; i < NLAYER; ++i) {
233                dh_[i].manager(this);
234                ch_[i].manager(this);
235        }
236       
237        /*XXX*/
238        pktbuf_ = new u_char[2 * RTP_MTU];
239        pool_ = new BufferPool;
240       
241        /*
242        * schedule a timer for our first report using half the
243        * min rtcp interval.  This gives us some time before
244        * our first report to learn about other sources so our
245        * next report interval will account for them.  The avg
246        * rtcp size was initialized to 128 bytes which is
247        * conservative (it assumes everyone else is generating
248        * SRs instead of RRs).
249        */
250        double rint = rtcp_avg_size_ * rtcp_inv_bw_;
251        if (rint < RTCP_MIN_RPT_TIME / 2. * 1000.)
252                rint = RTCP_MIN_RPT_TIME / 2. * 1000.;
253        rint_ = rint;
254        //rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
255}
256
257SessionManager::~SessionManager()
258{
259        if (pktbuf_)
260                delete[] pktbuf_;
261       
262        delete pool_;
263}
264
265u_int32_t SessionManager::alloc_srcid(Address & addr) const
266{
267        timeval tv;
268        ::gettimeofday(&tv, 0);
269        u_int32_t srcid = u_int32_t(tv.tv_sec + tv.tv_usec);
270        srcid += (u_int32_t)getuid();
271        srcid += (u_int32_t)getpid();
272/* __IPv6 changed srcid computation */
273        for(unsigned int i = 0; i < (addr.length() % sizeof(u_int32_t)); i++) {
274                srcid += ((u_int32_t*)((const void*)addr))[i];
275        }
276        return (srcid);
277}
278
279extern char* onestat(char* cp, const char* name, u_long v);
280
281char* SessionManager::stats(char* cp) const
282{
283        cp = onestat(cp, "Bad-RTP-version", badversion_);
284        cp = onestat(cp, "Bad-RTPv1-options", badoptions_);
285        cp = onestat(cp, "Bad-Payload-Format", badfmt_);
286        cp = onestat(cp, "Bad-RTP-Extension", badext_);
287        cp = onestat(cp, "Runts", nrunt_);
288        Crypt* p = dh_[0].net()->crypt();
289        if (p != 0) {
290                cp = onestat(cp, "Crypt-Bad-Length", p->badpktlen());
291                cp = onestat(cp, "Crypt-Bad-P-Bit", p->badpbit());
292        }
293        /*XXX*/
294        if (ch_[0].net() != 0) {
295                Crypt* p = ch_[0].net()->crypt();
296                if (p != 0) {
297                        cp = onestat(cp, "Crypt-Ctrl-Bad-Length", p->badpktlen());
298                        cp = onestat(cp, "Crypt-Ctrl-Bad-P-Bit", p->badpbit());
299                }
300        }
301        *--cp = 0;
302        return (cp);
303}
304
305int SessionManager::command(int argc, const char*const* argv)
306{
307        Tcl& tcl = Tcl::instance();
308        char* cp = tcl.buffer();
309        if (argc == 2) {
310                if (strcmp(argv[1], "active") == 0) {
311                        SourceManager::instance().sortactive(cp);
312                        tcl.result(cp);
313                        return (TCL_OK);
314                }
315                if (strcmp(argv[1], "local-addr-heuristic") == 0) {
316                        strcpy(cp, intoa(LookupLocalAddr()));
317                        tcl.result(cp);
318                        return (TCL_OK);
319                }
320                if (strcmp(argv[1], "stats") == 0) {
321                        stats(cp);
322                        tcl.result(cp);
323                        return (TCL_OK);
324                }
325                if (strcmp(argv[1], "nb") == 0) {
326                        sprintf(cp, "%u", 8 * nb_);
327                        tcl.result(cp);
328                        return (TCL_OK);
329                }
330                if (strcmp(argv[1], "nf") == 0) {
331                        sprintf(cp, "%u", nf_);
332                        tcl.result(cp);
333                        return (TCL_OK);
334                }
335                if (strcmp(argv[1], "np") == 0 ||
336                    strcmp(argv[1], "ns") == 0) {
337                        sprintf(cp, "%u", np_);
338                        tcl.result(cp);
339                        return (TCL_OK);
340                }
341                if (strcmp(argv[1], "lip-sync") == 0) {
342                        sprintf(cp, "%u", lipSyncEnabled_);
343                        tcl.result(cp);
344                        return (TCL_OK);
345                }
346
347        } else if (argc == 3) {
348                if (strcmp(argv[1], "sm") == 0) {
349                        sm_ = (SourceManager*)TclObject::lookup(argv[2]);
350                        return (TCL_OK);
351                }
352                if (strcmp(argv[1], "name") == 0) {
353                        Source* s = SourceManager::instance().localsrc();
354                        s->sdes(RTCP_SDES_NAME, argv[2]);
355                        return (TCL_OK);
356                }
357                if (strcmp(argv[1], "email") == 0) {
358                        Source* s = SourceManager::instance().localsrc();
359                        s->sdes(RTCP_SDES_EMAIL, argv[2]);
360                        return (TCL_OK);
361                }
362                if (strcmp(argv[1], "random-srcid") == 0) {
363                        Address * addrp;
364                        if ((addrp = (dh_[0].net())->alloc(argv[2])) !=0 ) { //SV-XXX: placed ()'s against truth check == NULL
365                          sprintf(cp, "%u", alloc_srcid(*addrp));
366                          delete addrp;
367                        }
368                        tcl.result(cp);
369                        return (TCL_OK);
370                }
371                if (strcmp(argv[1], "data-net") == 0) {
372                        dh_[0].net((Network*)TclObject::lookup(argv[2]));
373                        return (TCL_OK);
374                }
375                if (strcmp(argv[1], "ctrl-net") == 0) {
376                        ch_[0].net((Network*)TclObject::lookup(argv[2]));
377                        return (TCL_OK);
378                }
379                if (strcmp(argv[1], "max-bandwidth") == 0) {
380                        double bw = atof(argv[2]) / 8.;
381                        rtcp_inv_bw_ = 1. / (bw * RTCP_SESSION_BW_FRACTION);
382                        return (TCL_OK);
383                }
384                if (strcmp(argv[1], "confid") == 0) {
385                        confid_ = atoi(argv[2]);
386                        return (TCL_OK);
387                }
388                if (strcmp(argv[1], "data-bandwidth") == 0) {
389                        /*XXX assume value in range */
390                        bps(atoi(argv[2]));
391                        return (TCL_OK);
392                }
393                if (strcmp(argv[1], "mtu") == 0) {
394                        mtu_ = atoi(argv[2]);
395                        return (TCL_OK);
396                }
397                if (strcmp(argv[1], "loopback") == 0) {
398                        loopback_ = atoi(argv[2]);
399                        return (TCL_OK);
400                }
401                if (strcmp(argv[1], "lip-sync") == 0) {
402                        lipSyncEnabled_ = atoi(argv[2]);
403                        return (TCL_OK);
404                }
405
406        }  else if (argc == 4) {
407                if (strcmp(argv[1], "data-net") == 0) {
408                        u_int layer = atoi(argv[3]);
409                        if (layer >= NLAYER)
410                                abort();
411                        if (*argv[2] == 0) {
412                                dh_[layer].net(0);
413                                return (TCL_OK);
414                        }
415                        Network* net = (Network*)TclObject::lookup(argv[2]);
416                        if (net == 0) {
417                                tcl.resultf("no network %s", argv[2]);
418                                return (TCL_ERROR);
419                        }
420                        if (net->rchannel() < 0) {
421                                tcl.resultf("network not open");
422                                return (TCL_ERROR);
423                        }
424                        dh_[layer].net(net);
425                        return (TCL_OK);
426                }
427                if (strcmp(argv[1], "ctrl-net") == 0) {
428                        u_int layer = atoi(argv[3]);
429                        if (layer >= NLAYER)
430                                abort();
431                        if (*argv[2] == 0) {
432                                ch_[layer].net(0);
433                                return (TCL_OK);
434                        }
435                        Network* net = (Network*)TclObject::lookup(argv[2]);
436                        if (net == 0) {
437                                tcl.resultf("no network %s", argv[2]);
438                                return (TCL_ERROR);
439                        }
440                        if (net->rchannel() < 0) {
441                                tcl.resultf("network not open");
442                                return (TCL_ERROR);
443                        }
444                        ch_[layer].net(net);
445                        return (TCL_OK);
446                }
447                return (Transmitter::command(argc, argv));
448        }
449        //should not be reached
450        return (TCL_ERROR);
451}
452
453void SessionManager::transmit(pktbuf* pb)
454{
455        //mh_.msg_iov = pb->iov;
456        //      dh_[.net()->send(mh_);
457                //debug_msg("L %d,",pb->layer);
458        // Using loop_layer for now to restrict transmission as well
459        if (pb->layer < loop_layer_) {
460        //      if ( pb->layer <0 ) exit(1);
461                Network* n = dh_[pb->layer].net();
462                if (n != 0)
463                        n->send(pb);
464        }
465}
466
467u_char* SessionManager::build_sdes_item(u_char* p, int code, Source& s)
468{
469        const char* value = s.sdes(code);
470        if (value != 0) {
471                int len = strlen(value);
472                *p++ = code;
473                *p++ = len;
474                memcpy(p, value, len);
475                p += len;
476        }
477        return (p);
478}
479
480int SessionManager::build_sdes(rtcphdr* rh, Source& ls)
481{
482        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_SDES;
483        rh->rh_flags = htons(flags);
484        rh->rh_ssrc = ls.srcid();
485        u_char* p = (u_char*)(rh + 1);
486        p = build_sdes_item(p, RTCP_SDES_CNAME, ls);
487
488        /*
489         * We always send a cname plus one other sdes
490         * There's a schedule for what we send sequenced by sdes_seq_:
491         *   - send 'email' every 0th & 4th packet
492         *   - send 'note' every 2nd packet
493         *   - send 'tool' every 6th packet
494         *   - send 'name' in all the odd slots
495         * (if 'note' is not the empty string, we switch the roles
496         *  of name & note)
497         */
498        int nameslot, noteslot;
499        const char* note = ls.sdes(RTCP_SDES_NOTE);
500        if (note) {
501                if (*note) {
502                        nameslot = RTCP_SDES_NOTE;
503                        noteslot = RTCP_SDES_NAME;
504                } else {
505                        nameslot = RTCP_SDES_NAME;
506                        noteslot = RTCP_SDES_NOTE;
507                }
508        } else {
509                nameslot = RTCP_SDES_NAME;
510                noteslot = RTCP_SDES_NAME;
511        }
512        u_int seq = (++sdes_seq_) & 0x7;
513        switch (seq) {
514
515        case 0case 4:
516                p = build_sdes_item(p, RTCP_SDES_EMAIL, ls);
517                break;
518
519        case 2:
520                p = build_sdes_item(p, noteslot, ls);
521                break;
522        case 6:
523                p = build_sdes_item(p, RTCP_SDES_TOOL, ls);
524                break;
525        default:
526                p = build_sdes_item(p, nameslot, ls);
527        }
528        int len = p - (u_char*)rh;
529        int pad = 4 - (len & 3);
530        len += pad;
531        rh->rh_len = htons((len >> 2) - 1);
532        while (--pad >= 0)
533                *p++ = 0;
534
535        return (len);
536}
537
538int SessionManager::build_app(rtcphdr* rh, Source& ls, const char *name,
539                void *data, int datalen)
540{
541  int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_APP;
542  rh->rh_flags = htons(flags);
543  rh->rh_ssrc = ls.srcid();
544  u_char* p = (u_char*)(rh + 1);
545    int len;
546   
547    // write the name field
548    len = strlen(name);
549    if( len < 4 ) {
550        memcpy(p,name,len);
551        p += len;
552       
553        // pad short names
554        while( p - (u_char*)(rh+1) < 4 )
555            *p++ = 0;
556    }
557    else {
558        // use first four chars of given name
559        memcpy(p,name,4);
560        p += 4;
561    }
562   
563    // write the app data
564    memcpy(p,data,datalen);
565    p += datalen;
566   
567    // pad as needed
568    len = p - (u_char*)rh;
569    int pad = 4 - (len & 3);
570    while( --pad >= 0 )
571        *p++ = 0;
572  len = p - (u_char*)rh;
573
574    // set the length now that it's known
575  rh->rh_len = htons((len >> 2) - 1);
576
577  return (len);
578}
579
580/*void SessionManager::send_report()
581{
582        send_report(0);
583}
584
585  void SessionManager::send_bye()
586  {
587  send_report(1);
588}*/
589
590// SessionManager is no longer used as Timer - Each
591// CtrlHandler has its own Timer which calls this;
592void SessionManager::announce(CtrlHandler* ch)
593{
594        send_report(ch, 0);
595}
596
597/*XXX check for buffer overflow*/
598/*
599* Send an RTPv2 report packet.
600*/
601void SessionManager::send_report(CtrlHandler* ch, int bye, int app)
602{
603        UNUSED(app); //SV-XXX: unused
604
605        SourceManager& sm = SourceManager::instance();
606        Source& s = *sm.localsrc();
607        rtcphdr* rh = (rtcphdr*)pktbuf_;
608        rh->rh_ssrc = s.srcid();
609        int flags = RTP_VERSION << 14;
610        int layer = ch - ch_; //LLL
611        Source::Layer& sl = s.layer(layer);
612        timeval now = unixtime();
613        sl.lts_ctrl(now);
614        int we_sent = 0;
615        rtcp_rr* rr;
616        rtcp_xr_hdr* xrh;       // extended report header
617        rtcp_xr_blk* xrb;       // extended report block
618        Tcl& tcl = Tcl::instance();
619
620        /*
621         * If we've sent data since our last sender report send a
622         * new report.  The MediaTimer check is to make sure we still
623         * have a grabber -- if not, we won't be able to interpret the
624         * media timestamps so there's no point in sending an SR.
625         */
626        MediaTimer* mt = MediaTimer::instance();
627        if (sl.np() != last_np_ && mt) {
628                last_np_ = sl.np();
629                we_sent = 1;
630                flags |= RTCP_PT_SR;
631                rtcp_sr* sr = (rtcp_sr*)(rh + 1);
632                sr->sr_ntp = ntp64time(now);
633                HTONL(sr->sr_ntp.upper);
634                HTONL(sr->sr_ntp.lower);
635                sr->sr_ts = htonl(mt->ref_ts());
636                sr->sr_np = htonl(sl.np());
637                sr->sr_nb = htonl(sl.nb());
638                rr = (rtcp_rr*)(sr + 1);
639        } else {
640                flags |= RTCP_PT_RR;
641                rr = (rtcp_rr*)(rh + 1);
642        }
643
644        // if CC is turned on, we need XR report
645        if (is_cc_on()) {
646                flags |= RTCP_PT_XR;            // setting flags to XR
647                xrh = (rtcp_xr_hdr*)(rh + 1);   // XR header
648                int xrlen = xrh->xr_flags << 16 >> 16;  // XR length
649                xrb = (rtcp_xr_blk*)(xrh + xrlen + 1);  // XR block
650                xrb->begin_seq = lastseq_;// this will be used for ackofack
651                xrb->end_seq = seqno_ + 1;// as defined in RFC3611 section 4.1
652                xrb->chunk = get_ackvec();      // ackvec
653        }
654
655        int nrr = 0;
656        int nsrc = 0;
657        /*
658        * we don't want to inflate report interval if user has set
659        * the flag that causes all sources to be 'kept' so we
660        * consider sources 'inactive' if we haven't heard a control
661        * msg from them for ~32 reporting intervals.
662        */
663        //LLL   u_int inactive = u_int(rint_ * (32./1000.));
664        u_int inactive = u_int(ch->rint() * (32./1000.));
665        if (inactive < 2)
666                inactive = 2;
667        for (Source* sp = sm.sources(); sp != 0; sp = sp->next_) {
668                ++nsrc;
669                Source::Layer& sl = sp->layer(layer);
670                //              int received = sp->np() - sp->snp();
671                int received = sl.np() - sl.snp();
672                if (received == 0) {
673                        //              if (u_int(now.tv_sec - sp->lts_ctrl().tv_sec) > inactive)
674                        if (u_int(now.tv_sec - sl.lts_ctrl().tv_sec) > inactive)
675                                --nsrc;
676                        continue;
677                }
678                //              sp->snp(sp->np());
679                sl.snp(sl.np());
680                rr->rr_srcid = sp->srcid();
681                //              int expected = sp->ns() - sp->sns();
682                int expected = sl.ns() - sl.sns();
683                //              sp->sns(sp->ns());
684                sl.sns(sl.ns());
685                u_int32_t v;
686                int lost = expected - received;
687                if (lost <= 0)
688                        v = 0;
689                else
690                        /* expected != 0 if lost > 0 */
691                        v = ((lost << 8) / expected) << 24;
692                /* XXX should saturate on over/underflow */
693                //              v |= (sp->ns() - sp->np()) & 0xffffff;
694                v |= (sl.ns() - sl.np()) & 0xffffff;
695                rr->rr_loss = htonl(v);
696                //              rr->rr_ehsr = htonl(sp->ehs());
697                rr->rr_ehsr = htonl(sl.ehs());
698                rr->rr_dv = (sp->handler() != 0) ? sp->handler()->delvar() : 0;
699                //              rr->rr_lsr = htonl(sp->sts_ctrl());
700                rr->rr_lsr = htonl(sl.sts_ctrl());
701                //              if (sp->lts_ctrl().tv_sec == 0)
702                if (sl.lts_ctrl().tv_sec == 0)
703                        rr->rr_dlsr = 0;
704                else {
705                        u_int32_t ntp_now = ntptime(now);
706                        //                      u_int32_t ntp_then = ntptime(sp->lts_ctrl());
707                        u_int32_t ntp_then = ntptime(sl.lts_ctrl());
708                        rr->rr_dlsr = htonl(ntp_now - ntp_then);
709                }
710                ++rr;
711                if (++nrr >= 31)
712                        break;
713        }
714        flags |= nrr << 8;
715        rh->rh_flags = htons(flags);
716        int len = (u_char*)rr - pktbuf_;
717        rh->rh_len = htons((len >> 2) - 1);
718
719        if (bye)
720                len += build_bye((rtcphdr*)rr, s);
721        else
722                len += build_sdes((rtcphdr*)rr, s);
723       
724        // build "site" app data if specified
725        const char *data = tcl.attr("site");
726        if(data)
727        {
728            rr = (rtcp_rr*)(((u_char*)rh) + len);
729            len += build_app((rtcphdr*)rr, s, "site", (void *)data, strlen(data));
730        }
731        //LLL   ch_.send(pktbuf_, len);
732        ch->send(pktbuf_, len);
733       
734        /*
735      rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(len + 28) - rtcp_avg_size_);
736         
737          // compute the time to the next report.  we do this here
738          // because we need to know if there were any active sources
739          // during the last report period (nrr above) & if we were
740          // a source.  The bandwidth limit for rtcp traffic was set
741          // on startup from the session bandwidth.  It is the inverse
742          // of bandwidth (ie., ms/byte) to avoid a divide below.
743       
744        //      double ibw = rtcp_inv_bw_;
745        if (nrr) {
746        // there were active sources
747        //              if (we_sent) {
748        ibw *= 1./RTCP_SENDER_BW_FRACTION;
749        nsrc = nrr;
750        } else {
751        ibw *= 1./RTCP_RECEIVER_BW_FRACTION;
752        nsrc -= nrr;
753        }
754        }
755        double rint = rtcp_avg_size_ * double(nsrc) * ibw;
756        if (rint < RTCP_MIN_RPT_TIME * 1000.)
757                rint = RTCP_MIN_RPT_TIME * 1000.;
758        rint_ = rint;
759        rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
760        */
761       
762        // Call timer adaption for each layer
763        ch->adapt(nsrc, nrr, we_sent);
764        ch->sample_size(len);
765       
766        //      sm.CheckActiveSources(rint);
767        if (layer == 0)
768                sm.CheckActiveSources(ch->rint());
769       
770}
771
772int SessionManager::build_bye(rtcphdr* rh, Source& ls)
773{
774        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_BYE;
775        rh->rh_flags = ntohs(flags);
776        rh->rh_len = htons(1);
777        rh->rh_ssrc = ls.srcid();
778        return (8);
779}
780
781/*
782 * receive an RTP data packet
783 */
784void SessionManager::recv(DataHandler* dh)
785{
786        int layer = dh - dh_;
787        pktbuf* pb = pool_->alloc(layer);
788        Address * addrp;
789        /* leave room in case we need to expand rtpv1 into an rtpv2 header */
790        /* XXX the free mem routine didn't like it ... */
791        //u_char* bp = &pktbuf_[4];
792        //u_char* bp = pktbuf_;
793       
794        int cc = dh->recv(pb->data, sizeof(pb->data), addrp);
795        //int cc = dh->recv(bp, 2 * RTP_MTU - 4, addrp);
796        if (cc <= 0) {
797                pb->release();
798                return;
799        }
800
801        rtphdr* rh = (rtphdr*)pb->data;
802        seqno_ = ntohs(rh->rh_seqno);   // get received packet seqno
803
804    // Ignore loopback packets
805        if (!loopback_) {
806                //rtphdr* rh = (rtphdr*)pb->data;
807                SourceManager& sm = SourceManager::instance();
808                if (rh->rh_ssrc == (*sm.localsrc()).srcid()) {
809                        debug_msg("(loopback) seqno:    %d\n", seqno_);
810                        pb->release();  // releasing loopback packet
811                        return;
812                }
813        } // now, loopback packets ignored (if disabled)
814
815        // set received seqno - passing seqno to TfwcRcvr
816        set_received_seqno(seqno_, lastseq_);
817        lastseq_ = seqno_;      // set last seqno
818
819        int version = pb->data[0] >> 6;
820        //int version = *(u_char*)rh >> 6;
821        if (version != 2) {
822                ++badversion_;
823                pb->release();
824                return;
825        }
826        if (cc < (int)sizeof(rtphdr)) {
827                ++nrunt_;
828                pb->release();
829                return;
830        }
831        pb->len = cc;
832       
833        //bp += sizeof(*rh);
834        //cc -= sizeof(*rh);
835        demux(pb, *addrp);
836}
837
838void SessionManager::demux(pktbuf* pb, Address & addr)
839{
840        rtphdr* rh = (rtphdr*)pb->data;
841        u_int32_t srcid = rh->rh_ssrc;
842        int flags = ntohs(rh->rh_flags);
843        // for LIP SYNC
844        //SV-XXX: unused: u_char *pkt = pb->data - sizeof(*rh);
845
846        if ((flags & RTP_X) != 0) {
847        /*
848        * the minimal-control audio/video profile
849        * explicitly forbids extensions
850                */
851                ++badext_;
852                pb->release();
853                return;
854        }
855
856        /*
857         * Check for illegal payload types.  Most likely this is
858         * a session packet arriving on the data port.
859         */
860        int fmt = flags & 0x7f;
861        if (!check_format(fmt)) {
862                ++badfmt_;
863                pb->release();
864                return;
865        }
866
867        SourceManager& sm = SourceManager::instance();
868        u_int16_t seqno = ntohs(rh->rh_seqno);
869        Source* s = sm.demux(srcid, addr, seqno, pb->layer);
870        if (s == 0) {
871        /*
872        * Takes a pair of validated packets before we will
873        * believe the source.  This prevents a runaway
874        * allocation of Source data structures for a
875        * stream of garbage packets.
876                */
877                pb->release();
878                return;
879        }
880        /* inform this source of the mbus */
881        s->mbus(&mb_);
882       
883        Source::Layer& sl = s->layer(pb->layer);
884        timeval now = unixtime();
885        //      s->lts_data(now);
886        sl.lts_data(now);
887        //      s->sts_data(rh->rh_ts);
888        sl.sts_data(rh->rh_ts);
889       
890        // extract CSRC count (CC field); increment pb->dp data pointer & adjust length accordingly
891        int cnt = (flags >> 8) & 0xf;
892        if (cnt > 0) {
893                //u_char* nh = (u_char*)rh + (cnt << 2);
894                rtphdr hdr = *rh;
895                pb->dp += (cnt << 2);
896                pb->len -= (cnt << 2);
897                u_int32_t* bp = (u_int32_t*)(rh + 1);
898                while (--cnt >= 0) {
899                        u_int32_t csrc = *(u_int32_t*)bp;
900                        bp += 4;
901                        Source* cs = sm.lookup(csrc, srcid, addr);
902                        //                      cs->lts_data(now);
903                        cs->layer(pb->layer).lts_data(now);
904                        cs->action();
905                }
906                //              rtphdr hdr = *rh;
907                //              rh = (rtphdr*)nh;
908                /*XXX move header up so it's contiguous with data*/
909                rh = (rtphdr*)pb->dp;
910                *rh = hdr;
911        } else
912                s->action();
913
914        if (s->sync() && lipSyncEnabled()) { 
915                /*
916                 * Synchronisation is enabled on this source;
917                 * video packets have to
918                 * be buffered, their playout point scheduled, and the
919                 * playout delays communicated with the audio tool ...
920                 */ 
921
922                /*
923                * XXX bit rate doesn't include rtpv1 options;
924                * but v1 is going away anyway.
925                */
926                //              int dup = s->cs(seqno);
927                //              s->np(1);
928                //              s->nb(cc + sizeof(*rh));
929                int dup = sl.cs(seqno, s);
930                sl.np(1);
931                sl.nb(pb->len);
932                if (dup) {
933                        pb->release();
934                        return;
935                }
936                if (flags & RTP_M) // check if reach frame boundaries
937                        //                      s->nf(1);
938                        sl.nf(1);
939               
940                //s->recv(pkt, rh, pb, pb->len); // this should invoke Source::recv and buffer
941                //s->recv(bp); // this should invoke Source::recv and buffer
942               
943                pktbuf_ = (u_char*)new_blk();
944        } /* synced */
945
946        else { /* ... playout video packets as they arrive */
947                /*
948                 * This is a data packet.  If the source needs activation,
949                 * or the packet format has changed, deal with this.
950                 * Then, hand the packet off to the packet handler.
951                 * XXX might want to be careful about flip-flopping
952                 * here when format changes due to misordered packets
953                 * (easy solution -- keep rtp seqno of last fmt change).
954                 */
955                PacketHandler* h = s->handler();
956                if (h == 0)
957                        h = s->activate(fmt);
958                else if (s->format() != fmt)
959                        h = s->change_format(fmt);
960               
961                        /*
962                        * XXX bit rate doesn't include rtpv1 options;
963                        * but v1 is going away anyway.
964                */
965                //              int dup = s->cs(seqno);
966                //              s->np(1);
967                //              s->nb(cc + sizeof(*rh));
968                int dup = sl.cs(seqno, s);
969                sl.np(1);
970                sl.nb(pb->len);
971                if (dup){
972                        pb->release();
973                        return;
974                }
975                if (flags & RTP_M)
976                        //                  s->nf(1);
977                        sl.nf(1);
978#ifdef notdef
979        /* This should move to the handler */
980        /*XXX could get rid of hdrlen and move run check into recv method*/
981               
982                int hlen = h->hdrlen();
983                cc -= hlen;
984                if (cc < 0) {
985                        //                  s->runt(1);
986                        sl.runt(1);
987                        pb->release();
988                        return;
989                }
990#endif
991                if (s->mute()) {
992                        pb->release();
993                        return;
994                }
995                //h->recv(rh, bp + hlen, cc);
996                h->recv(pb);
997        } /* not sync-ed */
998
999}
1000
1001void SessionManager::parse_rr_records(u_int32_t, rtcp_rr*, int,
1002                                      const u_char*, Address &)
1003{
1004}
1005                                     
1006
1007void SessionManager::parse_sr(rtcphdr* rh, int flags, u_char*ep,
1008                                                          Source* ps, Address & addr, int layer)
1009{
1010        rtcp_sr* sr = (rtcp_sr*)(rh + 1);
1011        Source* s;
1012        u_int32_t ssrc = rh->rh_ssrc;
1013        if (ps->srcid() != ssrc)
1014                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1015        else
1016                s = ps;
1017       
1018        Source::Layer& sl = s->layer(layer);
1019       
1020        timeval now = unixtime();
1021
1022        sl.lts_ctrl(now);
1023        sl.sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1024                ntohl(sr->sr_ntp.lower) >> 16);
1025       
1026        //int cnt = flags >> 8 & 0x1f;
1027        //parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1028       
1029        /*s->lts_ctrl(now);
1030        s->sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1031        ntohl(sr->sr_ntp.lower) >> 16);*/
1032       
1033        s->rtp_ctrl(ntohl(sr->sr_ts));
1034        u_int32_t t = ntptime(now);
1035        s->map_ntp_time(t);
1036        s->map_rtp_time(s->convert_time(t));
1037        s->rtp2ntp(1);
1038        //printf("Got SR\n");
1039
1040        int cnt = flags >> 8 & 0x1f;
1041        parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1042}
1043
1044void SessionManager::parse_rr(rtcphdr* rh, int flags, u_char* ep,
1045                                                          Source* ps, Address & addr, int layer)
1046{
1047        Source* s;
1048        u_int32_t ssrc = rh->rh_ssrc;
1049        if (ps->srcid() != ssrc)
1050                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1051        else
1052                s = ps;
1053       
1054        s->layer(layer).lts_ctrl(unixtime());
1055        int cnt = flags >> 8 & 0x1f;
1056        parse_rr_records(ssrc, (rtcp_rr*)(rh + 1), cnt, ep, addr);
1057}
1058
1059void SessionManager::parse_xr(rtcphdr* rh, int flags, u_char* ep,
1060                                                          Source* ps, Address & addr, int layer)
1061{
1062
1063        Source* s;
1064        u_int32_t ssrc = rh->rh_ssrc;
1065        if (ps->srcid() != ssrc)
1066                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1067        else
1068                s = ps;
1069
1070        s->layer(layer).lts_ctrl(unixtime());
1071        int cnt = flags >> 8 & 0x1f;
1072        parse_xr_records(ssrc, (rtcp_xr_hdr*)(rh + 1), cnt, ep, addr);
1073}
1074
1075void SessionManager::parse_xr_records(u_int32_t ssrc, rtcp_xr_hdr* xrh, int cnt,
1076                                      const u_char* ep, Address & addr)
1077{
1078        debug_msg("XXX parse_xr_records\n");
1079        UNUSED(cnt);
1080        UNUSED(ep);
1081        UNUSED(addr);
1082
1083        rtcp_xr_blk* xrb;
1084        int xrlen = xrh->xr_flags << 16 >> 16;
1085        xrb = (rtcp_xr_blk*)(xrh + xrlen + 1);
1086        /*
1087         * if AoA is received, trim ackvec and send a new ackvec
1088         * if AckVec is received, then parse it to TfwcSndr
1089         */
1090        ackvec_ = xrb->chunk;
1091        ackofack_ = xrb->begin_seq;
1092        tfwc_sndr_recv(ackvec_);        // parse AckVec
1093}
1094
1095int SessionManager::sdesbody(u_int32_t* p, u_char* ep, Source* ps,
1096                                                Address & addr, u_int32_t ssrc, int layer)
1097{
1098        Source* s;
1099        u_int32_t srcid = *p;
1100        if (ps->srcid() != srcid)
1101                s = SourceManager::instance().lookup(srcid, ssrc, addr);
1102        else
1103                s = ps;
1104        if (s == 0)
1105                return (0);
1106                /*
1107                * Note ctrl packet since we will never see any direct ctrl packets
1108                * from a source through a mixer (and we don't want the source to
1109                * time out).
1110        */
1111        s->layer(layer).lts_ctrl(unixtime());
1112       
1113        u_char* cp = (u_char*)(p + 1);
1114        while (cp < ep) {
1115                char buf[256];
1116
1117                u_int type = cp[0];
1118                if (type == 0) {
1119                        /* end of chunk */
1120                        return (((cp - (u_char*)p) >> 2) + 1);
1121                }
1122                u_int len = cp[1];
1123                u_char* eopt = cp + len + 2;
1124                if (eopt > ep)
1125                        return (0);
1126
1127                if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) {
1128                        memcpy(buf, (char*)&cp[2], len);
1129                        buf[len] = 0;
1130                        s->sdes(type, buf);
1131                } // else
1132                        /*XXX*/;
1133
1134                cp = eopt;
1135        }
1136        return (0);
1137}
1138
1139void SessionManager::parse_sdes(rtcphdr* rh, int flags, u_char* ep, Source* ps,
1140                                                                Address & addr, u_int32_t ssrc, int layer)
1141{
1142        int cnt = flags >> 8 & 0x1f;
1143        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1144        while (--cnt >= 0 && (u_char*)p < ep) {
1145                int n = sdesbody(p, ep, ps, addr, ssrc, layer);
1146                if (n == 0)
1147                        break;
1148                p += n;
1149        }
1150        if (cnt >= 0)
1151                ps->badsdes(1);
1152}
1153
1154void SessionManager::parse_bye(rtcphdr* rh, int flags, u_char* ep, Source* ps)
1155{
1156        int cnt = flags >> 8 & 0x1f;
1157        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1158
1159        while (--cnt >= 0) {
1160                if (p >= (u_int32_t*)ep) {
1161                        ps->badbye(1);
1162                        return;
1163                }
1164                Source* s;
1165                if (ps->srcid() != rh->rh_ssrc)
1166                        s = SourceManager::instance().consult(*p);
1167                else
1168                        s = ps;
1169                if (s != 0)
1170                        s->lts_done(unixtime());
1171                ++p;
1172        }
1173}
1174
1175/*
1176 * Receive an rtcp packet (from the control port).
1177 */
1178void SessionManager::recv(CtrlHandler* ch)
1179{
1180        Address * srcp;
1181        int cc = ch->recv(pktbuf_, 2 * RTP_MTU, srcp);
1182        if (cc <= 0)
1183                return;
1184
1185        rtcphdr* rh = (rtcphdr*)pktbuf_;
1186
1187    // Ignore loopback packets
1188        if (!loopback_) {
1189                SourceManager& sm = SourceManager::instance();
1190                if (rh->rh_ssrc == (*sm.localsrc()).srcid())
1191                        return;
1192        }
1193
1194        if (cc < int(sizeof(*rh))) {
1195                ++nrunt_;
1196                return;
1197        }
1198        /*
1199         * try to filter out junk: first thing in packet must be
1200         * sr, rr or bye & version number must be correct.
1201         */
1202        switch(ntohs(rh->rh_flags) & 0xc0ff) {
1203        case RTP_VERSION << 14 | RTCP_PT_SR:
1204        case RTP_VERSION << 14 | RTCP_PT_RR:
1205        case RTP_VERSION << 14 | RTCP_PT_XR:
1206        case RTP_VERSION << 14 | RTCP_PT_BYE:
1207                break;
1208        default:
1209                /*
1210                 * XXX should further categorize this error -- it is
1211                 * likely that people mis-implement applications that
1212                 * don't put something other than SR,RR,BYE first.
1213                 */
1214                ++badversion_;
1215                return;
1216        }
1217        /*
1218         * at this point we think the packet's valid.  Update our average
1219         * size estimator.  Also, there's valid ssrc so charge errors to it
1220         */
1221        rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(cc + 28) - rtcp_avg_size_);
1222        Address & addr = *srcp;
1223
1224        /*
1225         * First record in compount packet must be the ssrc of the
1226         * sender of the packet.  Pull it out here so we can use
1227         * it in the sdes parsing, since the sdes record doesn't
1228         * contain the ssrc of the sender (in the case of mixers).
1229         */
1230        u_int32_t ssrc = rh->rh_ssrc;
1231        Source* ps = SourceManager::instance().lookup(ssrc, ssrc, addr);
1232        if (ps == 0)
1233                return;
1234       
1235        int layer = ch - ch_;
1236                /*
1237                * Outer loop parses multiple RTCP records of a "compound packet".
1238                * There is no framing between records.  Boundaries are implicit
1239                * and the overall length comes from UDP.
1240        */
1241        u_char* epack = (u_char*)rh + cc;
1242        while ((u_char*)rh < epack) {
1243                u_int len = (ntohs(rh->rh_len) << 2) + 4;
1244                u_char* ep = (u_char*)rh + len;
1245                if (ep > epack) {
1246                        ps->badsesslen(1);
1247                        return;
1248                }
1249                u_int flags = ntohs(rh->rh_flags);
1250                if (flags >> 14 != RTP_VERSION) {
1251                        ps->badsessver(1);
1252                        return;
1253                }
1254                switch (flags & 0xff) {
1255
1256                case RTCP_PT_SR:
1257                        parse_sr(rh, flags, ep, ps, addr, layer);
1258                        break;
1259
1260                case RTCP_PT_RR:
1261                        parse_rr(rh, flags, ep, ps, addr, layer);
1262                        break;
1263
1264                case RTCP_PT_XR:
1265                        parse_xr(rh, flags, ep, ps, addr, layer);
1266                        break;
1267
1268                case RTCP_PT_SDES:
1269                        parse_sdes(rh, flags, ep, ps, addr, ssrc, layer);
1270                        break;
1271
1272                case RTCP_PT_BYE:
1273                        parse_bye(rh, flags, ep, ps);
1274                        break;
1275
1276                default:
1277                        ps->badsessopt(1);
1278                        break;
1279                }
1280                rh = (rtcphdr*)ep;
1281        }
1282        return;
1283}
Note: See TracBrowser for help on using the browser.