root/vic/branches/cc/rtp/transmitter.cpp @ 4617

Revision 4617, 10.7 KB (checked in by soohyunc, 4 years ago)

*** Vic can send packets out within the encoding loop ***

as long as Vic has enough cwnd to send that packet.

i.e.,)
as long as cwnd allows to send the packet during encoding, Vic doesn't have
to wait for XR (ackvec) reception.

Vic should be able to send the packet immediately even in the encoding loop.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the Network Research
16 *      Group at Lawrence Berkeley Laboratory.
17 * 4. Neither the name of the University nor of the Laboratory may be used
18 *    to endorse or promote products derived from this software without
19 *    specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
22 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
25 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
26 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
27 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
30 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 * SUCH DAMAGE.
32 *
33 * $Id$
34 */
35static const char rcsid[] =
36    "@(#) $Header$ (LBL)";
37
38#include <stdio.h>
39#include <stdlib.h>
40#ifndef WIN32
41#include <unistd.h>
42#endif
43#include <errno.h>
44#include <string.h>
45#ifdef WIN32
46//#include <winsock.h>
47#include <io.h>
48#include <sys/stat.h>
49#else
50#include <sys/param.h>
51#include <sys/uio.h>
52#include <netinet/in.h>
53#include <sys/file.h>
54#include <sys/stat.h>
55#endif
56#include "ntp-time.h"
57#include "pktbuf-rtp.h"
58#include "transmitter.h"
59#include "net.h"
60#include "source.h"
61#include "decoder.h"
62#include "vic_tcl.h"
63
64#if defined(sun) && !defined(__svr4__) || (defined(_AIX) && !defined(_AIX41))
65extern "C" writev(int, iovec*, int);
66#endif
67
68//Transmitter::pktbuf* Transmitter::freehdrs_;
69//Transmitter::buffer* Transmitter::freebufs_;
70int Transmitter::nbufs_;
71int Transmitter::nhdrs_;
72
73/*
74 * Sequence number is static so when we change the encoding (which causes
75 * new encoder to be allocated) we don't reset the sequence counter.
76 * Otherwise, receivers will get confused, reset their stats, and generate
77 * odd looking streams of reception reports (i.e., the packet counts will
78 * drop back to 0).
79 */
80u_int16_t Transmitter::seqno_ = 1;
81
82Transmitter::Transmitter() :
83        mtu_(1024),
84        nf_(0),
85        nb_(0),
86        np_(0),
87        kbps_(128),
88        nextpkttime_(0.),
89        busy_(0),
90        head_(0),
91        tail_(0),
92        loop_layer_(1000),
93        loopback_(0),
94        is_cc_active_(1),
95        is_buf_empty_(1),
96        cc_type_(WBCC)
97{
98        memset((char*)&mh_, 0, sizeof(mh_));
99        mh_.msg_iovlen = 2;
100}
101
102/* Return time of day in seconds */
103inline double Transmitter::gettimeofday_secs() const
104{
105        timeval tv;
106        ::gettimeofday(&tv, 0);
107        return (tv.tv_sec + 1e-6 * tv.tv_usec);
108}
109
110void Transmitter::loopback(pktbuf* pb)
111{
112        int layer = pb->layer;
113        rtphdr* rh = (rtphdr*)pb->data;
114        int cc = pb->len;
115        /*
116         * Update statistics.
117         */
118        if (layer >= loop_layer_) {
119                /*XXX*/
120                pb->release();
121                return;
122        }
123        nb_ += cc;
124        ++np_;
125
126        SourceManager& sm = SourceManager::instance();
127        Source* s = sm.localsrc();
128        timeval now = unixtime();
129        Source::Layer& sl = s->layer(pb->layer);
130
131        sl.lts_data(now);
132        s->action();
133        sl.sts_data(rh->rh_ts);
134        sl.np(1);
135        sl.nb(cc);
136        sl.cs((u_int16_t)ntohs(rh->rh_seqno),s);
137
138        int flags = ntohs(rh->rh_flags);
139        if (flags & RTP_M) {
140                ++nf_;
141                sl.nf(1);
142        }
143        int fmt = flags & 0x7f;
144        /*
145         * Handle initialization of loopback decoder
146         * and changes in the stream.
147         */
148        PacketHandler* h = s->handler();
149        if (h == 0)
150                h = s->activate(fmt);
151        else if (s->format() != fmt) {
152                h = s->change_format(fmt);
153        }
154
155        if (s->mute()) {
156                pb->release();
157                return;
158        }
159        h->recv(pb);
160}
161
162int Transmitter::dumpfd_ = -1;
163void Transmitter::dump(int fd)
164{
165        dumpfd_ = fd;
166#define MAGIC "RTPCLIP 1.0"
167        (void)write(fd, MAGIC, sizeof(MAGIC));
168}
169
170/*XXX*/
171#ifdef WIN32
172int writev(int fd, iovec* iov, int iovlen)
173{
174        int len = 0, n;
175        for (int i = 0; i < iovlen; i++) {
176                if ((n = write(fd, iov[i].iov_base, iov[i].iov_len)) == -1) {
177                        perror("writev");
178                        exit(1);
179                }
180                len += n;
181        }
182        return(len);
183}
184#endif
185
186void Transmitter::dump(int fd, iovec* iov, int iovlen) const
187{
188        register int length = 0;
189        for (int i = iovlen; --i >= 0; )
190                length += iov[i].iov_len;
191
192        char cliphdr[4];
193        *(short*)cliphdr = htons(length);
194        cliphdr[2] = 0; /* data packet (i.e., not an rtcp packet) */
195        cliphdr[3] = 0; /* ? */
196
197        (void)write(fd, cliphdr, 4);
198        if (writev(fd, iov, iovlen) < 0) {
199                perror("write");
200                exit(1);
201        }
202}
203
204/*
205 * Time it takes in seconds to send this
206 * packet at the configured bandwidth.
207 */
208double Transmitter::txtime(pktbuf* pb)
209{
210//      int cc = pb->iov[0].iov_len + pb->iov[1].iov_len;
211        int cc = pb->len;
212        return (8 * cc / (1000. * kbps_));
213}
214
215void Transmitter::send(pktbuf* pb)
216{
217        switch (cc_type_) {
218        //
219        // window-based congestion control (TFWC)
220        //
221        case WBCC:
222                // if it is the very first packet, just send it.
223                if(is_buf_empty_) {
224                        if (head_ != 0) {
225                                tail_->next = pb;
226                                tail_ = pb;
227                        } else
228                                tail_ = head_ = pb;
229                        pb->next = 0;
230                        cc_tfwc_output();
231                        is_buf_empty_ = false;
232                }
233                // if it is not, just queue up the packets.
234                else {
235                        if (head_ != 0) {
236                                tail_->next = pb;
237                                tail_ = pb;
238                        } else
239                                tail_ = head_ = pb;
240                        pb->next = 0;
241                        cc_tfwc_output(pb);
242                }
243                break;
244
245        //
246        // rate-based congestion control (TFRC)
247        //
248        case RBCC:
249                // if it is the very first packet, just send it.
250                if(is_buf_empty_) {
251                        if (head_ != 0) {
252                                tail_->next = pb;
253                                tail_ = pb;
254                        } else
255                                tail_ = head_ = pb;
256                        pb->next = 0;
257                        cc_tfrc_output();
258                        is_buf_empty_ = false;
259                }
260                // if it is not, just queue up the packets.
261                else {
262                        if (head_ != 0) {
263                                tail_->next = pb;
264                                tail_ = pb;
265                        } else
266                                tail_ = head_ = pb;
267                        pb->next = 0;
268                }
269                break;
270
271        //
272        // without congestion control
273        //
274        case NOCC:
275        default:
276                // CC is not active, so just go for the normal operation
277                if (!busy_) {
278                        double delay = txtime(pb);
279                        nextpkttime_ = gettimeofday_secs() + delay;
280                        output(pb);
281                        /*
282                         * emulate a transmit interrupt --
283                         * assume we will have more to send.
284                         */
285                        msched(int(delay * 1e-3));
286                        busy_ = 1;
287                } else {
288                        if (head_ != 0) {
289                                tail_->next = pb;
290                                tail_ = pb;
291                        } else
292                                tail_ = head_ = pb;
293                        pb->next = 0;
294                }
295        } // switch (cc_type)
296}
297
298void Transmitter::cc_tfwc_output(pktbuf* pb)
299{
300//      fprintf(stderr,"\t--------entering cc_tfwc_output(pb)---------\n");
301//      fprintf(stderr,"\t|                                          |\n");
302//      fprintf(stderr,"\tV                                          V\n");
303
304        // pb is not null, hence parse it.
305        rtphdr* rh = (rtphdr *) pb->data;
306
307        int magic = (int) tfwc_magic();
308        //debug_msg("cwnd: %d\n", magic);
309        int jack = (int) tfwc_sndr_jacked();
310        //debug_msg("jack: %d\n", jack);
311       
312        if (ntohs(rh->rh_seqno) <= magic + jack) {
313                fprintf(stderr, "\n\tnow: %f\tseqno: %d\n\n",
314                        tx_now()-tx_now_offset_, ntohs(rh->rh_seqno));
315                // record seqno and timestamp at TfwcSndr side
316                tfwc_sndr_send(pb);
317
318                // move head pointer
319                head_ = pb->next;
320
321                // call Transmitter::output(pb)
322                output(pb);
323        }
324//      fprintf(stderr,"\t^                                          ^\n");
325//      fprintf(stderr,"\t|                                          |\n");
326//      fprintf(stderr,"\t============================================\n");
327}
328
329/*
330 * main TFWC CC output routines
331 */
332void Transmitter::cc_tfwc_output()
333{
334//      fprintf(stderr,"\t---------entering cc_tfwc_output()----------\n");
335//      fprintf(stderr,"\t|                                          |\n");
336//      fprintf(stderr,"\tV                                          V\n");
337
338        // head of the RTP data packet buffer (pb)
339        pktbuf* pb = head_;
340
341        // if pb is null, then set the next available packet as the first packet of
342        // the packet buffer. and then, return - i.e., do not try sending packets.
343        if (pb == 0) {
344                is_buf_empty_ = true;
345//              fprintf(stderr,
346//              "\t=========== PACKET NOT AVAILABLE ===========\n\n");
347                return;
348        }
349
350        //printf("\tthere are packets available to send in cc_tfwc_output()\n");
351        // pb is not null, hence parse it.
352        rtphdr* rh = (rtphdr *) pb->data;
353
354        // cwnd value
355        int magic = (int) tfwc_magic();
356//      debug_msg("cwnd: %d\n", magic);
357
358        // just acked seqno
359        int jack = (int) tfwc_sndr_jacked();
360//      debug_msg("jack: %d\n", jack);
361
362        //fprintf(stderr, "\tXXX now: %f\tnum: %d\tcwnd: %d\tjack: %d\n",
363        //tx_now()-tx_now_offset_, ntohs(rh->rh_seqno), magic, jack);
364
365        // while packet seqno is within "cwnd + jack", send that packet
366        while (ntohs(rh->rh_seqno) <= magic + jack) {
367                fprintf(stderr, "\n\tnow: %f\tseqno: %d\n\n",
368                        tx_now()-tx_now_offset_, ntohs(rh->rh_seqno));
369                // record seqno and timestamp at TfwcSndr side
370                tfwc_sndr_send(pb);
371
372                // move head pointer
373                head_ = pb->next;
374
375                // call Transmitter::output(pb)
376                output(pb);
377
378                // if the moved head pointer is not null, parse packet buffer.
379                // otherwise, break while statement.
380                if (head_ != 0) {
381                        pb = head_;
382                        rh = (rtphdr *) pb->data;
383                } else {
384                        break;
385                }
386        } // end while ()
387//      fprintf(stderr,"\t^                                          ^\n");
388//      fprintf(stderr,"\t|                                          |\n");
389//      fprintf(stderr,"\t============================================\n");
390}
391
392/*
393 * main TFRC CC output
394 */
395void Transmitter::cc_tfrc_output() {
396        // TBA
397}
398
399void Transmitter::timeout()
400{
401        double now = gettimeofday_secs();
402        for (;;) {
403                pktbuf* p = head_;
404                if (p != 0) {
405                        head_ = p->next;
406                        nextpkttime_ += txtime(p);
407                        output(p);
408                        int ms = int(1e-3 * (nextpkttime_ - now));
409                        /* make sure we will wait more than 10ms */
410                        if (ms > 1000) {
411                                msched(ms);
412                                return;
413                        }
414                } else {
415                        busy_ = 0;
416                        break;
417                }
418        }
419}
420
421void Transmitter::flush()
422{
423        if (!is_cc_on()) {
424                if (busy_) {
425                        busy_ = 0;
426                        cancel();
427                }
428
429                pktbuf* p = head_;
430                while (p != 0) {
431                        pktbuf* n = p->next;
432                        output(p);
433                        p = n;
434                }
435                head_ = 0;
436        }
437}
438
439void Transmitter::output(pktbuf* pb)
440{
441        //fprintf(stderr, "\n\tTransmitter::output()\n");
442        //if (dumpfd_ >= 0)
443        //      dump(dumpfd_, pb->iov, mh_.msg_iovlen);
444//dprintf("layer: %d \n",pb->layer);
445        transmit(pb);
446        loopback(pb);
447//      pb->release() is called by decoder in loopback;
448}
449
450/*void Transmitter::release(pktbuf* pb)
451{
452        pb->next = freehdrs_;
453        freehdrs_ = pb;
454        buffer* p = pb->buf;
455        if (p != 0) {
456                p->next = freebufs_;
457                freebufs_ = p;
458        }
459}
460*/
Note: See TracBrowser for help on using the browser.