root/vic/branches/cc/rtp/transmitter.cpp @ 4614

Revision 4614, 9.6 KB (checked in by soohyunc, 4 years ago)

sync'ed time measurement method among grabber/session manager/transmitter/codec

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the Network Research
16 *      Group at Lawrence Berkeley Laboratory.
17 * 4. Neither the name of the University nor of the Laboratory may be used
18 *    to endorse or promote products derived from this software without
19 *    specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
22 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
25 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
26 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
27 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
30 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 * SUCH DAMAGE.
32 *
33 * $Id$
34 */
35static const char rcsid[] =
36    "@(#) $Header$ (LBL)";
37
38#include <stdio.h>
39#include <stdlib.h>
40#ifndef WIN32
41#include <unistd.h>
42#endif
43#include <errno.h>
44#include <string.h>
45#ifdef WIN32
46//#include <winsock.h>
47#include <io.h>
48#include <sys/stat.h>
49#else
50#include <sys/param.h>
51#include <sys/uio.h>
52#include <netinet/in.h>
53#include <sys/file.h>
54#include <sys/stat.h>
55#endif
56#include "ntp-time.h"
57#include "pktbuf-rtp.h"
58#include "transmitter.h"
59#include "net.h"
60#include "source.h"
61#include "decoder.h"
62#include "vic_tcl.h"
63
64#if defined(sun) && !defined(__svr4__) || (defined(_AIX) && !defined(_AIX41))
65extern "C" writev(int, iovec*, int);
66#endif
67
68//Transmitter::pktbuf* Transmitter::freehdrs_;
69//Transmitter::buffer* Transmitter::freebufs_;
70int Transmitter::nbufs_;
71int Transmitter::nhdrs_;
72
73/*
74 * Sequence number is static so when we change the encoding (which causes
75 * new encoder to be allocated) we don't reset the sequence counter.
76 * Otherwise, receivers will get confused, reset their stats, and generate
77 * odd looking streams of reception reports (i.e., the packet counts will
78 * drop back to 0).
79 */
80u_int16_t Transmitter::seqno_ = 1;
81
82Transmitter::Transmitter() :
83        mtu_(1024),
84        nf_(0),
85        nb_(0),
86        np_(0),
87        kbps_(128),
88        nextpkttime_(0.),
89        busy_(0),
90        head_(0),
91        tail_(0),
92        loop_layer_(1000),
93        loopback_(0),
94        is_cc_active_(1),
95        is_first_(1),
96        cc_type_(WBCC)
97{
98        memset((char*)&mh_, 0, sizeof(mh_));
99        mh_.msg_iovlen = 2;
100}
101
102/* Return time of day in seconds */
103inline double Transmitter::gettimeofday_secs() const
104{
105        timeval tv;
106        ::gettimeofday(&tv, 0);
107        return (tv.tv_sec + 1e-6 * tv.tv_usec);
108}
109
110void Transmitter::loopback(pktbuf* pb)
111{
112        int layer = pb->layer;
113        rtphdr* rh = (rtphdr*)pb->data;
114        int cc = pb->len;
115        /*
116         * Update statistics.
117         */
118        if (layer >= loop_layer_) {
119                /*XXX*/
120                pb->release();
121                return;
122        }
123        nb_ += cc;
124        ++np_;
125
126        SourceManager& sm = SourceManager::instance();
127        Source* s = sm.localsrc();
128        timeval now = unixtime();
129        Source::Layer& sl = s->layer(pb->layer);
130
131        sl.lts_data(now);
132        s->action();
133        sl.sts_data(rh->rh_ts);
134        sl.np(1);
135        sl.nb(cc);
136        sl.cs((u_int16_t)ntohs(rh->rh_seqno),s);
137
138        int flags = ntohs(rh->rh_flags);
139        if (flags & RTP_M) {
140                ++nf_;
141                sl.nf(1);
142        }
143        int fmt = flags & 0x7f;
144        /*
145         * Handle initialization of loopback decoder
146         * and changes in the stream.
147         */
148        PacketHandler* h = s->handler();
149        if (h == 0)
150                h = s->activate(fmt);
151        else if (s->format() != fmt) {
152                h = s->change_format(fmt);
153        }
154
155        if (s->mute()) {
156                pb->release();
157                return;
158        }
159        h->recv(pb);
160}
161
162int Transmitter::dumpfd_ = -1;
163void Transmitter::dump(int fd)
164{
165        dumpfd_ = fd;
166#define MAGIC "RTPCLIP 1.0"
167        (void)write(fd, MAGIC, sizeof(MAGIC));
168}
169
170/*XXX*/
171#ifdef WIN32
172int writev(int fd, iovec* iov, int iovlen)
173{
174        int len = 0, n;
175        for (int i = 0; i < iovlen; i++) {
176                if ((n = write(fd, iov[i].iov_base, iov[i].iov_len)) == -1) {
177                        perror("writev");
178                        exit(1);
179                }
180                len += n;
181        }
182        return(len);
183}
184#endif
185
186void Transmitter::dump(int fd, iovec* iov, int iovlen) const
187{
188        register int length = 0;
189        for (int i = iovlen; --i >= 0; )
190                length += iov[i].iov_len;
191
192        char cliphdr[4];
193        *(short*)cliphdr = htons(length);
194        cliphdr[2] = 0; /* data packet (i.e., not an rtcp packet) */
195        cliphdr[3] = 0; /* ? */
196
197        (void)write(fd, cliphdr, 4);
198        if (writev(fd, iov, iovlen) < 0) {
199                perror("write");
200                exit(1);
201        }
202}
203
204/*
205 * Time it takes in seconds to send this
206 * packet at the configured bandwidth.
207 */
208double Transmitter::txtime(pktbuf* pb)
209{
210//      int cc = pb->iov[0].iov_len + pb->iov[1].iov_len;
211        int cc = pb->len;
212        return (8 * cc / (1000. * kbps_));
213}
214
215void Transmitter::send(pktbuf* pb)
216{
217        switch (cc_type_) {
218        //
219        // window-based congestion control (TFWC)
220        //
221        case WBCC:
222                // if it is the very first packet, just send it.
223                if(is_first_) {
224                        if (head_ != 0) {
225                                tail_->next = pb;
226                                tail_ = pb;
227                        } else
228                                tail_ = head_ = pb;
229                        pb->next = 0;
230                        cc_tfwc_output();
231                        is_first_ = false;
232                }
233                // if it is not, just queue up the packets.
234                else {
235                        if (head_ != 0) {
236                                tail_->next = pb;
237                                tail_ = pb;
238                        } else
239                                tail_ = head_ = pb;
240                        pb->next = 0;
241                }
242                break;
243
244        //
245        // rate-based congestion control (TFRC)
246        //
247        case RBCC:
248                // if it is the very first packet, just send it.
249                if(is_first_) {
250                        if (head_ != 0) {
251                                tail_->next = pb;
252                                tail_ = pb;
253                        } else
254                                tail_ = head_ = pb;
255                        pb->next = 0;
256                        cc_tfrc_output();
257                        is_first_ = false;
258                }
259                // if it is not, just queue up the packets.
260                else {
261                        if (head_ != 0) {
262                                tail_->next = pb;
263                                tail_ = pb;
264                        } else
265                                tail_ = head_ = pb;
266                        pb->next = 0;
267                }
268                break;
269
270        //
271        // without congestion control
272        //
273        case NOCC:
274        default:
275                // CC is not active, so just go for the normal operation
276                if (!busy_) {
277                        double delay = txtime(pb);
278                        nextpkttime_ = gettimeofday_secs() + delay;
279                        output(pb);
280                        /*
281                         * emulate a transmit interrupt --
282                         * assume we will have more to send.
283                         */
284                        msched(int(delay * 1e-3));
285                        busy_ = 1;
286                } else {
287                        if (head_ != 0) {
288                                tail_->next = pb;
289                                tail_ = pb;
290                        } else
291                                tail_ = head_ = pb;
292                        pb->next = 0;
293                }
294        } // switch (cc_type)
295}
296
297/*
298 * main TFWC CC output routines
299 */
300void Transmitter::cc_tfwc_output()
301{
302        fprintf(stderr,"\t---------entering cc_tfwc_output()----------\n");
303        fprintf(stderr,"\t|                                          |\n");
304        fprintf(stderr,"\tV                                          V\n");
305
306        // head of the RTP data packet buffer (pb)
307        pktbuf* pb = head_;
308
309        // if pb is null, then set the next available packet as the first packet of
310        // the packet buffer. and then, return - i.e., do not try sending packets.
311        if (pb == 0) {
312                is_first_ = true;
313                fprintf(stderr,
314                "\t=========== PACKET NOT AVAILABLE ===========\n\n");
315                return;
316        }
317
318        //printf("\tthere are packets available to send in cc_tfwc_output()\n");
319        // pb is not null, hence parse it.
320        rtphdr* rh = (rtphdr *) pb->data;
321
322        // cwnd value
323        int magic = (int) tfwc_magic();
324        //debug_msg("cwnd: %d\n", magic);
325
326        // just acked seqno
327        int jack = (int) tfwc_sndr_jacked();
328        //debug_msg("jack: %d\n", jack);
329
330        //fprintf(stderr, "\tXXX now: %f\tnum: %d\tcwnd: %d\tjack: %d\n",
331        //tx_now()-tx_now_offset_, ntohs(rh->rh_seqno), magic, jack);
332
333        // while packet seqno is within "cwnd + jack", send that packet
334        while (ntohs(rh->rh_seqno) <= magic + jack) {
335                fprintf(stderr, "\n\tnow: %f\tseqno: %d\n\n",
336                        tx_now()-tx_now_offset_, ntohs(rh->rh_seqno));
337                // record seqno and timestamp at TfwcSndr side
338                tfwc_sndr_send(pb);
339
340                // move head pointer
341                head_ = pb->next;
342
343                // call Transmitter::output(pb)
344                output(pb);
345
346                // if the moved head pointer is not null, parse packet buffer.
347                // otherwise, break while statement.
348                if (head_ != 0) {
349                        pb = head_;
350                        rh = (rtphdr *) pb->data;
351                } else {
352                        break;
353                }
354        } // end while ()
355        fprintf(stderr,"\t^                                          ^\n");
356        fprintf(stderr,"\t|                                          |\n");
357        fprintf(stderr,"\t============================================\n");
358}
359
360/*
361 * main TFRC CC output
362 */
363void Transmitter::cc_tfrc_output() {
364        // TBA
365}
366
367void Transmitter::timeout()
368{
369        double now = gettimeofday_secs();
370        for (;;) {
371                pktbuf* p = head_;
372                if (p != 0) {
373                        head_ = p->next;
374                        nextpkttime_ += txtime(p);
375                        output(p);
376                        int ms = int(1e-3 * (nextpkttime_ - now));
377                        /* make sure we will wait more than 10ms */
378                        if (ms > 1000) {
379                                msched(ms);
380                                return;
381                        }
382                } else {
383                        busy_ = 0;
384                        break;
385                }
386        }
387}
388
389void Transmitter::flush()
390{
391        if (!is_cc_on()) {
392                if (busy_) {
393                        busy_ = 0;
394                        cancel();
395                }
396
397                pktbuf* p = head_;
398                while (p != 0) {
399                        pktbuf* n = p->next;
400                        output(p);
401                        p = n;
402                }
403                head_ = 0;
404        }
405}
406
407void Transmitter::output(pktbuf* pb)
408{
409        //fprintf(stderr, "\n\tTransmitter::output()\n");
410        //if (dumpfd_ >= 0)
411        //      dump(dumpfd_, pb->iov, mh_.msg_iovlen);
412//dprintf("layer: %d \n",pb->layer);
413        transmit(pb);
414        loopback(pb);
415//      pb->release() is called by decoder in loopback;
416}
417
418/*void Transmitter::release(pktbuf* pb)
419{
420        pb->next = freehdrs_;
421        freehdrs_ = pb;
422        buffer* p = pb->buf;
423        if (p != 0) {
424                p->next = freebufs_;
425                freebufs_ = p;
426        }
427}
428*/
Note: See TracBrowser for help on using the browser.