root/vic/branches/cc/rtp/session.cpp @ 4235

Revision 4235, 30.9 KB (checked in by soohyunc, 6 years ago)

o parse ackvec information

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the University of
16 *      California, Berkeley and the Network Research Group at
17 *      Lawrence Berkeley Laboratory.
18 * 4. Neither the name of the University nor of the Laboratory may be used
19 *    to endorse or promote products derived from this software without
20 *    specific prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
23 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
26 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 *
34 * $Id$
35 */
36static const char rcsid[] =
37    "@(#) $Header$ (LBL)";
38
39#include "config.h"
40#include <math.h>
41#include <errno.h>
42#include <string.h>
43#ifdef WIN32
44extern "C" int getpid();
45#endif
46#include "source.h"
47#include "vic_tcl.h"
48#include "media-timer.h"
49#include "crypt.h"
50#include "timer.h"
51#include "ntp-time.h"
52#include "session.h"
53#include "cc/tfwc_sndr.h"
54
55/* added to support the mbus
56#include "mbus_handler.h"*/
57
58
59static class SessionMatcher : public Matcher {
60    public:
61                SessionMatcher() : Matcher("session") {}
62                TclObject* match(const char* id) {
63                        if (strcmp(id, "audio/rtp") == 0)
64                                return (new AudioSessionManager);
65                        else if (strcmp(id, "video/rtp") == 0)
66                                return (new VideoSessionManager);
67                        return (0);
68                }
69} session_matcher;
70
71int VideoSessionManager::check_format(int fmt) const
72{
73        switch(fmt) {
74                case RTP_PT_RAW:
75                case RTP_PT_CELLB:
76                case RTP_PT_JPEG:
77                case RTP_PT_CUSEEME:
78                case RTP_PT_NV:
79                case RTP_PT_CPV:
80                case RTP_PT_H261:
81                case RTP_PT_BVC:
82                case RTP_PT_H261_COMPAT:/*XXX*/
83                case RTP_PT_H263:
84                case RTP_PT_MPEG4:
85                case RTP_PT_H264:
86                case RTP_PT_H263P:
87                case RTP_PT_LDCT:
88                case RTP_PT_PVH:
89                case RTP_PT_DV:
90#ifdef USE_H261AS
91                case RTP_PT_H261AS:
92#endif 
93                return (1);
94        }
95        return (0);
96}
97
98int AudioSessionManager::check_format(int fmt) const
99{
100        switch (fmt) {
101        case RTP_PT_PCMU:
102        case RTP_PT_CELP:
103        case RTP_PT_GSM:
104        case RTP_PT_DVI:
105        case RTP_PT_LPC:
106                return (1);
107        }
108        return (0);
109}
110
111
112static SessionManager* manager;
113
114void
115adios()
116{
117        if (SourceManager::instance().localsrc() != 0)
118                manager->send_bye();
119        exit(0);
120}
121
122/*void ReportTimer::timeout()
123{
124sm_.send_report();
125}*/
126
127void DataHandler::dispatch(int)
128{
129        sm_->recv(this);
130}
131
132void CtrlHandler::dispatch(int)
133{
134        sm_->recv(this);
135}
136
137CtrlHandler::CtrlHandler()
138: ctrl_inv_bw_(0.),
139ctrl_avg_size_(128.),
140rint_(0.0) //SV-XXX: Debian
141{
142}
143
144inline void CtrlHandler::schedule_timer()
145{
146        msched(int(fmod(double(random()), rint_) + rint_ * .5 + .5));
147}
148
149void CtrlHandler::net(Network* n)
150{
151        DataHandler::net(n);
152        cancel();
153        if (n != 0) {
154        /*
155        * schedule a timer for our first report using half the
156        * min ctrl interval.  This gives us some time before
157        * our first report to learn about other sources so our
158        * next report interval will account for them.  The avg
159        * ctrl size was initialized to 128 bytes which is
160        * conservative (it assumes everyone else is generating
161        * SRs instead of RRs).
162                */
163                double rint = ctrl_avg_size_ * ctrl_inv_bw_;
164                if (rint < CTRL_MIN_RPT_TIME / 2. * 1000.)
165                        rint = CTRL_MIN_RPT_TIME / 2. * 1000.;
166                rint_ = rint;
167                schedule_timer();
168        }
169}
170
171void CtrlHandler::sample_size(int cc)
172{
173        ctrl_avg_size_ += CTRL_SIZE_GAIN * (double(cc + 28) - ctrl_avg_size_);
174}
175
176void CtrlHandler::adapt(int nsrc, int nrr, int we_sent)
177{
178/*
179* compute the time to the next report.  we do this here
180* because we need to know if there were any active sources
181* during the last report period (nrr above) & if we were
182* a source.  The bandwidth limit for ctrl traffic was set
183* on startup from the session bandwidth.  It is the inverse
184* of bandwidth (ie., ms/byte) to avoid a divide below.
185        */
186        double ibw = ctrl_inv_bw_;
187        if (nrr) {
188                /* there were active sources */
189                if (we_sent) {
190                        ibw *= 1./CTRL_SENDER_BW_FRACTION;
191                        nsrc = nrr;
192                } else {
193                        ibw *= 1./CTRL_RECEIVER_BW_FRACTION;
194                        nsrc -= nrr;
195                }
196        }
197        double rint = ctrl_avg_size_ * double(nsrc) * ibw;
198        if (rint < CTRL_MIN_RPT_TIME * 1000.)
199                rint = CTRL_MIN_RPT_TIME * 1000.;
200        rint_ = rint;
201}
202
203void CtrlHandler::timeout()
204{
205        sm_->announce(this);
206        schedule_timer();
207}
208
209//SV-XXX: rearranged initialization order to shut up gcc4
210SessionManager::SessionManager()
211//      : dh_(*this), ch_(*this), rt_(*this),
212: mb_(mbus_handler_engine, NULL),
213lipSyncEnabled_(0),
214badversion_(0),
215badoptions_(0),
216badfmt_(0),
217badext_(0),
218nrunt_(0),
219last_np_(0),
220sdes_seq_(0),
221rtcp_inv_bw_(0.),
222rtcp_avg_size_(128.),
223confid_(-1),
224seqno_(0),              // RTP packet sequence number (from RTP header)
225lastseq_(0),    // last received packet's seqno
226ackvec_(0)              // bit vector (AckVec)
227{
228        /*XXX For adios() to send bye*/
229        manager = this;
230       
231        for (int i = 0; i < NLAYER; ++i) {
232                dh_[i].manager(this);
233                ch_[i].manager(this);
234        }
235       
236        /*XXX*/
237        pktbuf_ = new u_char[2 * RTP_MTU];
238        pool_ = new BufferPool;
239       
240        /*
241        * schedule a timer for our first report using half the
242        * min rtcp interval.  This gives us some time before
243        * our first report to learn about other sources so our
244        * next report interval will account for them.  The avg
245        * rtcp size was initialized to 128 bytes which is
246        * conservative (it assumes everyone else is generating
247        * SRs instead of RRs).
248        */
249        double rint = rtcp_avg_size_ * rtcp_inv_bw_;
250        if (rint < RTCP_MIN_RPT_TIME / 2. * 1000.)
251                rint = RTCP_MIN_RPT_TIME / 2. * 1000.;
252        rint_ = rint;
253        //rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
254}
255
256SessionManager::~SessionManager()
257{
258        if (pktbuf_)
259                delete[] pktbuf_;
260       
261        delete pool_;
262}
263
264u_int32_t SessionManager::alloc_srcid(Address & addr) const
265{
266        timeval tv;
267        ::gettimeofday(&tv, 0);
268        u_int32_t srcid = u_int32_t(tv.tv_sec + tv.tv_usec);
269        srcid += (u_int32_t)getuid();
270        srcid += (u_int32_t)getpid();
271/* __IPv6 changed srcid computation */
272        for(unsigned int i = 0; i < (addr.length() % sizeof(u_int32_t)); i++) {
273                srcid += ((u_int32_t*)((const void*)addr))[i];
274        }
275        return (srcid);
276}
277
278extern char* onestat(char* cp, const char* name, u_long v);
279
280char* SessionManager::stats(char* cp) const
281{
282        cp = onestat(cp, "Bad-RTP-version", badversion_);
283        cp = onestat(cp, "Bad-RTPv1-options", badoptions_);
284        cp = onestat(cp, "Bad-Payload-Format", badfmt_);
285        cp = onestat(cp, "Bad-RTP-Extension", badext_);
286        cp = onestat(cp, "Runts", nrunt_);
287        Crypt* p = dh_[0].net()->crypt();
288        if (p != 0) {
289                cp = onestat(cp, "Crypt-Bad-Length", p->badpktlen());
290                cp = onestat(cp, "Crypt-Bad-P-Bit", p->badpbit());
291        }
292        /*XXX*/
293        if (ch_[0].net() != 0) {
294                Crypt* p = ch_[0].net()->crypt();
295                if (p != 0) {
296                        cp = onestat(cp, "Crypt-Ctrl-Bad-Length", p->badpktlen());
297                        cp = onestat(cp, "Crypt-Ctrl-Bad-P-Bit", p->badpbit());
298                }
299        }
300        *--cp = 0;
301        return (cp);
302}
303
304int SessionManager::command(int argc, const char*const* argv)
305{
306        Tcl& tcl = Tcl::instance();
307        char* cp = tcl.buffer();
308        if (argc == 2) {
309                if (strcmp(argv[1], "active") == 0) {
310                        SourceManager::instance().sortactive(cp);
311                        tcl.result(cp);
312                        return (TCL_OK);
313                }
314                if (strcmp(argv[1], "local-addr-heuristic") == 0) {
315                        strcpy(cp, intoa(LookupLocalAddr()));
316                        tcl.result(cp);
317                        return (TCL_OK);
318                }
319                if (strcmp(argv[1], "stats") == 0) {
320                        stats(cp);
321                        tcl.result(cp);
322                        return (TCL_OK);
323                }
324                if (strcmp(argv[1], "nb") == 0) {
325                        sprintf(cp, "%u", 8 * nb_);
326                        tcl.result(cp);
327                        return (TCL_OK);
328                }
329                if (strcmp(argv[1], "nf") == 0) {
330                        sprintf(cp, "%u", nf_);
331                        tcl.result(cp);
332                        return (TCL_OK);
333                }
334                if (strcmp(argv[1], "np") == 0 ||
335                    strcmp(argv[1], "ns") == 0) {
336                        sprintf(cp, "%u", np_);
337                        tcl.result(cp);
338                        return (TCL_OK);
339                }
340                if (strcmp(argv[1], "lip-sync") == 0) {
341                        sprintf(cp, "%u", lipSyncEnabled_);
342                        tcl.result(cp);
343                        return (TCL_OK);
344                }
345
346        } else if (argc == 3) {
347                if (strcmp(argv[1], "sm") == 0) {
348                        sm_ = (SourceManager*)TclObject::lookup(argv[2]);
349                        return (TCL_OK);
350                }
351                if (strcmp(argv[1], "name") == 0) {
352                        Source* s = SourceManager::instance().localsrc();
353                        s->sdes(RTCP_SDES_NAME, argv[2]);
354                        return (TCL_OK);
355                }
356                if (strcmp(argv[1], "email") == 0) {
357                        Source* s = SourceManager::instance().localsrc();
358                        s->sdes(RTCP_SDES_EMAIL, argv[2]);
359                        return (TCL_OK);
360                }
361                if (strcmp(argv[1], "random-srcid") == 0) {
362                        Address * addrp;
363                        if ((addrp = (dh_[0].net())->alloc(argv[2])) !=0 ) { //SV-XXX: placed ()'s against truth check == NULL
364                          sprintf(cp, "%u", alloc_srcid(*addrp));
365                          delete addrp;
366                        }
367                        tcl.result(cp);
368                        return (TCL_OK);
369                }
370                if (strcmp(argv[1], "data-net") == 0) {
371                        dh_[0].net((Network*)TclObject::lookup(argv[2]));
372                        return (TCL_OK);
373                }
374                if (strcmp(argv[1], "ctrl-net") == 0) {
375                        ch_[0].net((Network*)TclObject::lookup(argv[2]));
376                        return (TCL_OK);
377                }
378                if (strcmp(argv[1], "max-bandwidth") == 0) {
379                        double bw = atof(argv[2]) / 8.;
380                        rtcp_inv_bw_ = 1. / (bw * RTCP_SESSION_BW_FRACTION);
381                        return (TCL_OK);
382                }
383                if (strcmp(argv[1], "confid") == 0) {
384                        confid_ = atoi(argv[2]);
385                        return (TCL_OK);
386                }
387                if (strcmp(argv[1], "data-bandwidth") == 0) {
388                        /*XXX assume value in range */
389                        bps(atoi(argv[2]));
390                        return (TCL_OK);
391                }
392                if (strcmp(argv[1], "mtu") == 0) {
393                        mtu_ = atoi(argv[2]);
394                        return (TCL_OK);
395                }
396                if (strcmp(argv[1], "loopback") == 0) {
397                        loopback_ = atoi(argv[2]);
398                        return (TCL_OK);
399                }
400                if (strcmp(argv[1], "lip-sync") == 0) {
401                        lipSyncEnabled_ = atoi(argv[2]);
402                        return (TCL_OK);
403                }
404
405        }  else if (argc == 4) {
406                if (strcmp(argv[1], "data-net") == 0) {
407                        u_int layer = atoi(argv[3]);
408                        if (layer >= NLAYER)
409                                abort();
410                        if (*argv[2] == 0) {
411                                dh_[layer].net(0);
412                                return (TCL_OK);
413                        }
414                        Network* net = (Network*)TclObject::lookup(argv[2]);
415                        if (net == 0) {
416                                tcl.resultf("no network %s", argv[2]);
417                                return (TCL_ERROR);
418                        }
419                        if (net->rchannel() < 0) {
420                                tcl.resultf("network not open");
421                                return (TCL_ERROR);
422                        }
423                        dh_[layer].net(net);
424                        return (TCL_OK);
425                }
426                if (strcmp(argv[1], "ctrl-net") == 0) {
427                        u_int layer = atoi(argv[3]);
428                        if (layer >= NLAYER)
429                                abort();
430                        if (*argv[2] == 0) {
431                                ch_[layer].net(0);
432                                return (TCL_OK);
433                        }
434                        Network* net = (Network*)TclObject::lookup(argv[2]);
435                        if (net == 0) {
436                                tcl.resultf("no network %s", argv[2]);
437                                return (TCL_ERROR);
438                        }
439                        if (net->rchannel() < 0) {
440                                tcl.resultf("network not open");
441                                return (TCL_ERROR);
442                        }
443                        ch_[layer].net(net);
444                        return (TCL_OK);
445                }
446                return (Transmitter::command(argc, argv));
447        }
448        //should not be reached
449        return (TCL_ERROR);
450}
451
452void SessionManager::transmit(pktbuf* pb)
453{
454        //mh_.msg_iov = pb->iov;
455        //      dh_[.net()->send(mh_);
456                //debug_msg("L %d,",pb->layer);
457        // Using loop_layer for now to restrict transmission as well
458        if (pb->layer < loop_layer_) {
459        //      if ( pb->layer <0 ) exit(1);
460                Network* n = dh_[pb->layer].net();
461                if (n != 0)
462                        n->send(pb);
463        }
464}
465
466u_char* SessionManager::build_sdes_item(u_char* p, int code, Source& s)
467{
468        const char* value = s.sdes(code);
469        if (value != 0) {
470                int len = strlen(value);
471                *p++ = code;
472                *p++ = len;
473                memcpy(p, value, len);
474                p += len;
475        }
476        return (p);
477}
478
479int SessionManager::build_sdes(rtcphdr* rh, Source& ls)
480{
481        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_SDES;
482        rh->rh_flags = htons(flags);
483        rh->rh_ssrc = ls.srcid();
484        u_char* p = (u_char*)(rh + 1);
485        p = build_sdes_item(p, RTCP_SDES_CNAME, ls);
486
487        /*
488         * We always send a cname plus one other sdes
489         * There's a schedule for what we send sequenced by sdes_seq_:
490         *   - send 'email' every 0th & 4th packet
491         *   - send 'note' every 2nd packet
492         *   - send 'tool' every 6th packet
493         *   - send 'name' in all the odd slots
494         * (if 'note' is not the empty string, we switch the roles
495         *  of name & note)
496         */
497        int nameslot, noteslot;
498        const char* note = ls.sdes(RTCP_SDES_NOTE);
499        if (note) {
500                if (*note) {
501                        nameslot = RTCP_SDES_NOTE;
502                        noteslot = RTCP_SDES_NAME;
503                } else {
504                        nameslot = RTCP_SDES_NAME;
505                        noteslot = RTCP_SDES_NOTE;
506                }
507        } else {
508                nameslot = RTCP_SDES_NAME;
509                noteslot = RTCP_SDES_NAME;
510        }
511        u_int seq = (++sdes_seq_) & 0x7;
512        switch (seq) {
513
514        case 0case 4:
515                p = build_sdes_item(p, RTCP_SDES_EMAIL, ls);
516                break;
517
518        case 2:
519                p = build_sdes_item(p, noteslot, ls);
520                break;
521        case 6:
522                p = build_sdes_item(p, RTCP_SDES_TOOL, ls);
523                break;
524        default:
525                p = build_sdes_item(p, nameslot, ls);
526        }
527        int len = p - (u_char*)rh;
528        int pad = 4 - (len & 3);
529        len += pad;
530        rh->rh_len = htons((len >> 2) - 1);
531        while (--pad >= 0)
532                *p++ = 0;
533
534        return (len);
535}
536
537int SessionManager::build_app(rtcphdr* rh, Source& ls, const char *name,
538                void *data, int datalen)
539{
540  int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_APP;
541  rh->rh_flags = htons(flags);
542  rh->rh_ssrc = ls.srcid();
543  u_char* p = (u_char*)(rh + 1);
544    int len;
545   
546    // write the name field
547    len = strlen(name);
548    if( len < 4 ) {
549        memcpy(p,name,len);
550        p += len;
551       
552        // pad short names
553        while( p - (u_char*)(rh+1) < 4 )
554            *p++ = 0;
555    }
556    else {
557        // use first four chars of given name
558        memcpy(p,name,4);
559        p += 4;
560    }
561   
562    // write the app data
563    memcpy(p,data,datalen);
564    p += datalen;
565   
566    // pad as needed
567    len = p - (u_char*)rh;
568    int pad = 4 - (len & 3);
569    while( --pad >= 0 )
570        *p++ = 0;
571  len = p - (u_char*)rh;
572
573    // set the length now that it's known
574  rh->rh_len = htons((len >> 2) - 1);
575
576  return (len);
577}
578
579/*void SessionManager::send_report()
580{
581        send_report(0);
582}
583
584  void SessionManager::send_bye()
585  {
586  send_report(1);
587}*/
588
589// SessionManager is no longer used as Timer - Each
590// CtrlHandler has its own Timer which calls this;
591void SessionManager::announce(CtrlHandler* ch)
592{
593        send_report(ch, 0);
594}
595
596/*XXX check for buffer overflow*/
597/*
598* Send an RTPv2 report packet.
599*/
600//void SessionManager::send_report(int bye)
601
602void SessionManager::send_report(CtrlHandler* ch, int bye, int app)
603{
604        UNUSED(app); //SV-XXX: unused
605
606        SourceManager& sm = SourceManager::instance();
607        Source& s = *sm.localsrc();
608        rtcphdr* rh = (rtcphdr*)pktbuf_;
609        rh->rh_ssrc = s.srcid();
610        int flags = RTP_VERSION << 14;
611        int layer = ch - ch_; //LLL
612        Source::Layer& sl = s.layer(layer);
613        timeval now = unixtime();
614        sl.lts_ctrl(now);
615        int we_sent = 0;
616        rtcp_rr* rr;
617        rtcp_xr* xr;    // extended report
618        Tcl& tcl = Tcl::instance();
619
620        /*
621         * If we've sent data since our last sender report send a
622         * new report.  The MediaTimer check is to make sure we still
623         * have a grabber -- if not, we won't be able to interpret the
624         * media timestamps so there's no point in sending an SR.
625         */
626        MediaTimer* mt = MediaTimer::instance();
627        if (sl.np() != last_np_ && mt) {
628                last_np_ = sl.np();
629                we_sent = 1;
630                flags |= RTCP_PT_SR;
631                rtcp_sr* sr = (rtcp_sr*)(rh + 1);
632                sr->sr_ntp = ntp64time(now);
633                HTONL(sr->sr_ntp.upper);
634                HTONL(sr->sr_ntp.lower);
635                sr->sr_ts = htonl(mt->ref_ts());
636                sr->sr_np = htonl(sl.np());
637                sr->sr_nb = htonl(sl.nb());
638                rr = (rtcp_rr*)(sr + 1);
639                xr = (rtcp_xr*)(rr + 1);
640        } else {
641                flags |= RTCP_PT_RR;
642                rr = (rtcp_rr*)(rh + 1);
643                xr = (rtcp_xr*)(rr + 1);        // extended report
644        }
645        int nrr = 0;
646        int nsrc = 0;
647        /*
648        * we don't want to inflate report interval if user has set
649        * the flag that causes all sources to be 'kept' so we
650        * consider sources 'inactive' if we haven't heard a control
651        * msg from them for ~32 reporting intervals.
652        */
653        //LLL   u_int inactive = u_int(rint_ * (32./1000.));
654        u_int inactive = u_int(ch->rint() * (32./1000.));
655        if (inactive < 2)
656                inactive = 2;
657        for (Source* sp = sm.sources(); sp != 0; sp = sp->next_) {
658                ++nsrc;
659                Source::Layer& sl = sp->layer(layer);
660                //              int received = sp->np() - sp->snp();
661                int received = sl.np() - sl.snp();
662                if (received == 0) {
663                        //              if (u_int(now.tv_sec - sp->lts_ctrl().tv_sec) > inactive)
664                        if (u_int(now.tv_sec - sl.lts_ctrl().tv_sec) > inactive)
665                                --nsrc;
666                        continue;
667                }
668                //              sp->snp(sp->np());
669                sl.snp(sl.np());
670                rr->rr_srcid = sp->srcid();
671                //              int expected = sp->ns() - sp->sns();
672                int expected = sl.ns() - sl.sns();
673                //              sp->sns(sp->ns());
674                sl.sns(sl.ns());
675                u_int32_t v;
676                int lost = expected - received;
677                if (lost <= 0)
678                        v = 0;
679                else
680                        /* expected != 0 if lost > 0 */
681                        v = ((lost << 8) / expected) << 24;
682                /* XXX should saturate on over/underflow */
683                //              v |= (sp->ns() - sp->np()) & 0xffffff;
684                v |= (sl.ns() - sl.np()) & 0xffffff;
685                rr->rr_loss = htonl(v);
686                //              rr->rr_ehsr = htonl(sp->ehs());
687                rr->rr_ehsr = htonl(sl.ehs());
688                rr->rr_dv = (sp->handler() != 0) ? sp->handler()->delvar() : 0;
689                //              rr->rr_lsr = htonl(sp->sts_ctrl());
690                rr->rr_lsr = htonl(sl.sts_ctrl());
691                //              if (sp->lts_ctrl().tv_sec == 0)
692                if (sl.lts_ctrl().tv_sec == 0)
693                        rr->rr_dlsr = 0;
694                else {
695                        u_int32_t ntp_now = ntptime(now);
696                        //                      u_int32_t ntp_then = ntptime(sp->lts_ctrl());
697                        u_int32_t ntp_then = ntptime(sl.lts_ctrl());
698                        rr->rr_dlsr = htonl(ntp_now - ntp_then);
699                }
700                ++rr;
701                if (++nrr >= 31)
702                        break;
703        }
704        flags |= nrr << 8;
705        rh->rh_flags = htons(flags);
706        int len = (u_char*)rr - pktbuf_;
707        rh->rh_len = htons((len >> 2) - 1);
708
709        if (bye)
710                len += build_bye((rtcphdr*)rr, s);
711        else
712                len += build_sdes((rtcphdr*)rr, s);
713       
714        // build "site" app data if specified
715        const char *data = tcl.attr("site");
716        if(data)
717        {
718            rr = (rtcp_rr*)(((u_char*)rh) + len);
719            len += build_app((rtcphdr*)rr, s, "site", (void *)data, strlen(data));
720        }
721        //LLL   ch_.send(pktbuf_, len);
722        ch->send(pktbuf_, len);
723       
724        /*
725      rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(len + 28) - rtcp_avg_size_);
726         
727          // compute the time to the next report.  we do this here
728          // because we need to know if there were any active sources
729          // during the last report period (nrr above) & if we were
730          // a source.  The bandwidth limit for rtcp traffic was set
731          // on startup from the session bandwidth.  It is the inverse
732          // of bandwidth (ie., ms/byte) to avoid a divide below.
733       
734        //      double ibw = rtcp_inv_bw_;
735        if (nrr) {
736        // there were active sources
737        //              if (we_sent) {
738        ibw *= 1./RTCP_SENDER_BW_FRACTION;
739        nsrc = nrr;
740        } else {
741        ibw *= 1./RTCP_RECEIVER_BW_FRACTION;
742        nsrc -= nrr;
743        }
744        }
745        double rint = rtcp_avg_size_ * double(nsrc) * ibw;
746        if (rint < RTCP_MIN_RPT_TIME * 1000.)
747                rint = RTCP_MIN_RPT_TIME * 1000.;
748        rint_ = rint;
749        rt_.msched(int(fmod(double(random()), rint) + rint * .5 + .5));
750        */
751       
752        // Call timer adaption for each layer
753        ch->adapt(nsrc, nrr, we_sent);
754        ch->sample_size(len);
755       
756        //      sm.CheckActiveSources(rint);
757        if (layer == 0)
758                sm.CheckActiveSources(ch->rint());
759       
760}
761
762int SessionManager::build_bye(rtcphdr* rh, Source& ls)
763{
764        int flags = RTP_VERSION << 14 | 1 << 8 | RTCP_PT_BYE;
765        rh->rh_flags = ntohs(flags);
766        rh->rh_len = htons(1);
767        rh->rh_ssrc = ls.srcid();
768        return (8);
769}
770
771/*
772 * receive an RTP packet
773 */
774void SessionManager::recv(DataHandler* dh)
775{
776        int layer = dh - dh_;
777        pktbuf* pb = pool_->alloc(layer);
778        Address * addrp;
779        /* leave room in case we need to expand rtpv1 into an rtpv2 header */
780        /* XXX the free mem routine didn't like it ... */
781        //u_char* bp = &pktbuf_[4];
782        //u_char* bp = pktbuf_;
783       
784        int cc = dh->recv(pb->data, sizeof(pb->data), addrp);
785        //int cc = dh->recv(bp, 2 * RTP_MTU - 4, addrp);
786        if (cc <= 0) {
787                pb->release();
788                return;
789        }
790
791        rtphdr* rh = (rtphdr*)pb->data;
792        seqno_ = ntohs(rh->rh_seqno);   // received packet seqno
793        debug_msg("received seqno:      %d\n", seqno_);
794
795    // Ignore loopback packets
796        if (!loopback_) {
797                //rtphdr* rh = (rtphdr*)pb->data;
798                SourceManager& sm = SourceManager::instance();
799                if (rh->rh_ssrc == (*sm.localsrc()).srcid()) {
800                        debug_msg("(loopback) seqno:    %d\n", seqno_);
801                        pb->release();  // releasing loopback packet
802                        return;
803                }
804        } // now, loopback packets ignored (if disabled)
805
806        // set bit vector
807        for (int i = lastseq_+1; i <= seqno_; i++) {
808                        SET_BIT_VEC (ackvec_, 1);
809        }
810
811        // printing bit vector
812        bool isThere;
813        debug_msg("XXX received ackvec:");
814        for (int i = lastseq_+1; i <= seqno_; i++) {
815                isThere = SEE_BIT_VEC (ackvec_, i, seqno_);
816                printf(" %d... %s ", seqno_, isThere ? "Ok" : "NOk");
817        }
818        printf("\n");
819        lastseq_ = seqno_;
820
821        int version = pb->data[0] >> 6;
822        //int version = *(u_char*)rh >> 6;
823        if (version != 2) {
824                ++badversion_;
825                pb->release();
826                return;
827        }
828        if (cc < (int)sizeof(rtphdr)) {
829                ++nrunt_;
830                pb->release();
831                return;
832        }
833        pb->len = cc;
834       
835        //bp += sizeof(*rh);
836        //cc -= sizeof(*rh);
837        demux(pb, *addrp);
838}
839
840void SessionManager::demux(pktbuf* pb, Address & addr)
841{
842        rtphdr* rh = (rtphdr*)pb->data;
843        u_int32_t srcid = rh->rh_ssrc;
844        int flags = ntohs(rh->rh_flags);
845        // for LIP SYNC
846        //SV-XXX: unused: u_char *pkt = pb->data - sizeof(*rh);
847
848        if ((flags & RTP_X) != 0) {
849        /*
850        * the minimal-control audio/video profile
851        * explicitly forbids extensions
852                */
853                ++badext_;
854                pb->release();
855                return;
856        }
857
858        /*
859         * Check for illegal payload types.  Most likely this is
860         * a session packet arriving on the data port.
861         */
862        int fmt = flags & 0x7f;
863        if (!check_format(fmt)) {
864                ++badfmt_;
865                pb->release();
866                return;
867        }
868
869        SourceManager& sm = SourceManager::instance();
870        u_int16_t seqno = ntohs(rh->rh_seqno);
871        Source* s = sm.demux(srcid, addr, seqno, pb->layer);
872        if (s == 0) {
873        /*
874        * Takes a pair of validated packets before we will
875        * believe the source.  This prevents a runaway
876        * allocation of Source data structures for a
877        * stream of garbage packets.
878                */
879                pb->release();
880                return;
881        }
882        /* inform this source of the mbus */
883        s->mbus(&mb_);
884       
885        Source::Layer& sl = s->layer(pb->layer);
886        timeval now = unixtime();
887        //      s->lts_data(now);
888        sl.lts_data(now);
889        //      s->sts_data(rh->rh_ts);
890        sl.sts_data(rh->rh_ts);
891       
892        // extract CSRC count (CC field); increment pb->dp data pointer & adjust length accordingly
893        int cnt = (flags >> 8) & 0xf;
894        if (cnt > 0) {
895                //u_char* nh = (u_char*)rh + (cnt << 2);
896                rtphdr hdr = *rh;
897                pb->dp += (cnt << 2);
898                pb->len -= (cnt << 2);
899                u_int32_t* bp = (u_int32_t*)(rh + 1);
900                while (--cnt >= 0) {
901                        u_int32_t csrc = *(u_int32_t*)bp;
902                        bp += 4;
903                        Source* cs = sm.lookup(csrc, srcid, addr);
904                        //                      cs->lts_data(now);
905                        cs->layer(pb->layer).lts_data(now);
906                        cs->action();
907                }
908                //              rtphdr hdr = *rh;
909                //              rh = (rtphdr*)nh;
910                /*XXX move header up so it's contiguous with data*/
911                rh = (rtphdr*)pb->dp;
912                *rh = hdr;
913        } else
914                s->action();
915
916        if (s->sync() && lipSyncEnabled()) { 
917                /*
918                 * Synchronisation is enabled on this source;
919                 * video packets have to
920                 * be buffered, their playout point scheduled, and the
921                 * playout delays communicated with the audio tool ...
922                 */ 
923
924                /*
925                * XXX bit rate doesn't include rtpv1 options;
926                * but v1 is going away anyway.
927                */
928                //              int dup = s->cs(seqno);
929                //              s->np(1);
930                //              s->nb(cc + sizeof(*rh));
931                int dup = sl.cs(seqno, s);
932                sl.np(1);
933                sl.nb(pb->len);
934                if (dup) {
935                        pb->release();
936                        return;
937                }
938                if (flags & RTP_M) // check if reach frame boundaries
939                        //                      s->nf(1);
940                        sl.nf(1);
941               
942                //s->recv(pkt, rh, pb, pb->len); // this should invoke Source::recv and buffer
943                //s->recv(bp); // this should invoke Source::recv and buffer
944               
945                pktbuf_ = (u_char*)new_blk();
946        } /* synced */
947
948        else { /* ... playout video packets as they arrive */
949                /*
950                 * This is a data packet.  If the source needs activation,
951                 * or the packet format has changed, deal with this.
952                 * Then, hand the packet off to the packet handler.
953                 * XXX might want to be careful about flip-flopping
954                 * here when format changes due to misordered packets
955                 * (easy solution -- keep rtp seqno of last fmt change).
956                 */
957                PacketHandler* h = s->handler();
958                if (h == 0)
959                        h = s->activate(fmt);
960                else if (s->format() != fmt)
961                        h = s->change_format(fmt);
962               
963                        /*
964                        * XXX bit rate doesn't include rtpv1 options;
965                        * but v1 is going away anyway.
966                */
967                //              int dup = s->cs(seqno);
968                //              s->np(1);
969                //              s->nb(cc + sizeof(*rh));
970                int dup = sl.cs(seqno, s);
971                sl.np(1);
972                sl.nb(pb->len);
973                if (dup){
974                        pb->release();
975                        return;
976                }
977                if (flags & RTP_M)
978                        //                  s->nf(1);
979                        sl.nf(1);
980#ifdef notdef
981        /* This should move to the handler */
982        /*XXX could get rid of hdrlen and move run check into recv method*/
983               
984                int hlen = h->hdrlen();
985                cc -= hlen;
986                if (cc < 0) {
987                        //                  s->runt(1);
988                        sl.runt(1);
989                        pb->release();
990                        return;
991                }
992#endif
993                if (s->mute()) {
994                        pb->release();
995                        return;
996                }
997                //h->recv(rh, bp + hlen, cc);
998                h->recv(pb);
999        } /* not sync-ed */
1000
1001}
1002
1003void SessionManager::parse_rr_records(u_int32_t, rtcp_rr*, int,
1004                                      const u_char*, Address &)
1005{
1006}
1007                                     
1008
1009void SessionManager::parse_sr(rtcphdr* rh, int flags, u_char*ep,
1010                                                          Source* ps, Address & addr, int layer)
1011{
1012        rtcp_sr* sr = (rtcp_sr*)(rh + 1);
1013        Source* s;
1014        u_int32_t ssrc = rh->rh_ssrc;
1015        if (ps->srcid() != ssrc)
1016                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1017        else
1018                s = ps;
1019       
1020        Source::Layer& sl = s->layer(layer);
1021       
1022        timeval now = unixtime();
1023
1024        sl.lts_ctrl(now);
1025        sl.sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1026                ntohl(sr->sr_ntp.lower) >> 16);
1027       
1028        //int cnt = flags >> 8 & 0x1f;
1029        //parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1030       
1031        /*s->lts_ctrl(now);
1032        s->sts_ctrl(ntohl(sr->sr_ntp.upper) << 16 |
1033        ntohl(sr->sr_ntp.lower) >> 16);*/
1034       
1035        s->rtp_ctrl(ntohl(sr->sr_ts));
1036        u_int32_t t = ntptime(now);
1037        s->map_ntp_time(t);
1038        s->map_rtp_time(s->convert_time(t));
1039        s->rtp2ntp(1);
1040        //printf("Got SR\n");
1041
1042        int cnt = flags >> 8 & 0x1f;
1043        parse_rr_records(ssrc, (rtcp_rr*)(sr + 1), cnt, ep, addr);
1044}
1045
1046void SessionManager::parse_rr(rtcphdr* rh, int flags, u_char* ep,
1047                                                          Source* ps, Address & addr, int layer)
1048{
1049        Source* s;
1050        u_int32_t ssrc = rh->rh_ssrc;
1051        if (ps->srcid() != ssrc)
1052                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1053        else
1054                s = ps;
1055       
1056        s->layer(layer).lts_ctrl(unixtime());
1057        int cnt = flags >> 8 & 0x1f;
1058        parse_rr_records(ssrc, (rtcp_rr*)(rh + 1), cnt, ep, addr);
1059}
1060
1061void SessionManager::parse_xr(rtcphdr* rh, int flags, u_char* ep,
1062                                                          Source* ps, Address & addr, int layer)
1063{
1064        UNUSED(flags);
1065        UNUSED(ep);
1066        UNUSED(ps);
1067        UNUSED(addr);
1068        UNUSED(layer);
1069
1070        Source* s;
1071        u_int32_t ssrc = rh->rh_ssrc;
1072        if (ps->srcid() != ssrc)
1073                s = SourceManager::instance().lookup(ssrc, ssrc, addr);
1074        else
1075                s = ps;
1076
1077        s->layer(layer).lts_ctrl(unixtime());
1078        int cnt = flags >> 8 & 0x1f;
1079        parse_xr_records(ssrc, (rtcp_xr*)(rh + 1), cnt, ep, addr);
1080}
1081
1082void SessionManager::parse_xr_records(u_int32_t ssrc, rtcp_xr* r, int cnt,
1083                                      const u_char* ep, Address & addr)
1084{
1085        r->xr_ackvec = ackvec_;
1086}
1087
1088int SessionManager::sdesbody(u_int32_t* p, u_char* ep, Source* ps,
1089                                                Address & addr, u_int32_t ssrc, int layer)
1090{
1091        Source* s;
1092        u_int32_t srcid = *p;
1093        if (ps->srcid() != srcid)
1094                s = SourceManager::instance().lookup(srcid, ssrc, addr);
1095        else
1096                s = ps;
1097        if (s == 0)
1098                return (0);
1099                /*
1100                * Note ctrl packet since we will never see any direct ctrl packets
1101                * from a source through a mixer (and we don't want the source to
1102                * time out).
1103        */
1104        s->layer(layer).lts_ctrl(unixtime());
1105       
1106        u_char* cp = (u_char*)(p + 1);
1107        while (cp < ep) {
1108                char buf[256];
1109
1110                u_int type = cp[0];
1111                if (type == 0) {
1112                        /* end of chunk */
1113                        return (((cp - (u_char*)p) >> 2) + 1);
1114                }
1115                u_int len = cp[1];
1116                u_char* eopt = cp + len + 2;
1117                if (eopt > ep)
1118                        return (0);
1119
1120                if (type >= RTCP_SDES_MIN && type <= RTCP_SDES_MAX) {
1121                        memcpy(buf, (char*)&cp[2], len);
1122                        buf[len] = 0;
1123                        s->sdes(type, buf);
1124                } // else
1125                        /*XXX*/;
1126
1127                cp = eopt;
1128        }
1129        return (0);
1130}
1131
1132void SessionManager::parse_sdes(rtcphdr* rh, int flags, u_char* ep, Source* ps,
1133                                                                Address & addr, u_int32_t ssrc, int layer)
1134{
1135        int cnt = flags >> 8 & 0x1f;
1136        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1137        while (--cnt >= 0 && (u_char*)p < ep) {
1138                int n = sdesbody(p, ep, ps, addr, ssrc, layer);
1139                if (n == 0)
1140                        break;
1141                p += n;
1142        }
1143        if (cnt >= 0)
1144                ps->badsdes(1);
1145}
1146
1147void SessionManager::parse_bye(rtcphdr* rh, int flags, u_char* ep, Source* ps)
1148{
1149        int cnt = flags >> 8 & 0x1f;
1150        u_int32_t* p = (u_int32_t*)&rh->rh_ssrc;
1151
1152        while (--cnt >= 0) {
1153                if (p >= (u_int32_t*)ep) {
1154                        ps->badbye(1);
1155                        return;
1156                }
1157                Source* s;
1158                if (ps->srcid() != rh->rh_ssrc)
1159                        s = SourceManager::instance().consult(*p);
1160                else
1161                        s = ps;
1162                if (s != 0)
1163                        s->lts_done(unixtime());
1164                ++p;
1165        }
1166}
1167
1168/*
1169 * Receive an rtcp packet (from the control port).
1170 */
1171void SessionManager::recv(CtrlHandler* ch)
1172{
1173        Address * srcp;
1174        int cc = ch->recv(pktbuf_, 2 * RTP_MTU, srcp);
1175        if (cc <= 0)
1176                return;
1177
1178        rtcphdr* rh = (rtcphdr*)pktbuf_;
1179
1180    // Ignore loopback packets
1181        if (!loopback_) {
1182                SourceManager& sm = SourceManager::instance();
1183                if (rh->rh_ssrc == (*sm.localsrc()).srcid())
1184                        return;
1185        }
1186
1187        if (cc < int(sizeof(*rh))) {
1188                ++nrunt_;
1189                return;
1190        }
1191        /*
1192         * try to filter out junk: first thing in packet must be
1193         * sr, rr or bye & version number must be correct.
1194         */
1195        switch(ntohs(rh->rh_flags) & 0xc0ff) {
1196        case RTP_VERSION << 14 | RTCP_PT_SR:
1197        case RTP_VERSION << 14 | RTCP_PT_RR:
1198        case RTP_VERSION << 14 | RTCP_PT_XR:
1199        case RTP_VERSION << 14 | RTCP_PT_BYE:
1200                break;
1201        default:
1202                /*
1203                 * XXX should further categorize this error -- it is
1204                 * likely that people mis-implement applications that
1205                 * don't put something other than SR,RR,BYE first.
1206                 */
1207                ++badversion_;
1208                return;
1209        }
1210        /*
1211         * at this point we think the packet's valid.  Update our average
1212         * size estimator.  Also, there's valid ssrc so charge errors to it
1213         */
1214        rtcp_avg_size_ += RTCP_SIZE_GAIN * (double(cc + 28) - rtcp_avg_size_);
1215        Address & addr = *srcp;
1216
1217        /*
1218         * First record in compount packet must be the ssrc of the
1219         * sender of the packet.  Pull it out here so we can use
1220         * it in the sdes parsing, since the sdes record doesn't
1221         * contain the ssrc of the sender (in the case of mixers).
1222         */
1223        u_int32_t ssrc = rh->rh_ssrc;
1224        Source* ps = SourceManager::instance().lookup(ssrc, ssrc, addr);
1225        if (ps == 0)
1226                return;
1227       
1228        int layer = ch - ch_;
1229                /*
1230                * Outer loop parses multiple RTCP records of a "compound packet".
1231                * There is no framing between records.  Boundaries are implicit
1232                * and the overall length comes from UDP.
1233        */
1234        u_char* epack = (u_char*)rh + cc;
1235        while ((u_char*)rh < epack) {
1236                u_int len = (ntohs(rh->rh_len) << 2) + 4;
1237                u_char* ep = (u_char*)rh + len;
1238                if (ep > epack) {
1239                        ps->badsesslen(1);
1240                        return;
1241                }
1242                u_int flags = ntohs(rh->rh_flags);
1243                if (flags >> 14 != RTP_VERSION) {
1244                        ps->badsessver(1);
1245                        return;
1246                }
1247                switch (flags & 0xff) {
1248
1249                case RTCP_PT_SR:
1250                        parse_sr(rh, flags, ep, ps, addr, layer);
1251                        break;
1252
1253                case RTCP_PT_RR:
1254                        parse_rr(rh, flags, ep, ps, addr, layer);
1255                        break;
1256
1257                case RTCP_PT_XR:
1258                        parse_xr(rh, flags, ep, ps, addr, layer);
1259                        break;
1260
1261                case RTCP_PT_SDES:
1262                        parse_sdes(rh, flags, ep, ps, addr, ssrc, layer);
1263                        break;
1264
1265                case RTCP_PT_BYE:
1266                        parse_bye(rh, flags, ep, ps);
1267                        break;
1268
1269                default:
1270                        ps->badsessopt(1);
1271                        break;
1272                }
1273                rh = (rtcphdr*)ep;
1274        }
1275        return;
1276}
Note: See TracBrowser for help on using the browser.