root/vic/branches/cc/rtp/transmitter.cpp @ 4728

Revision 4728, 10.8 KB (checked in by soohyunc, 4 years ago)

o Packet/AckVec re-ordering

when the sender received a deprecated ack (out of 3 dupack range), then do
nothing but trigger send packets out to keep the Jacob's packet conservative
rule.

when the sender received a re-ordered ack, then do nothing but trigger send
packets out as the above.

o trim TfwcSndr? and Transmitter

only need to pass ts_off_ once at the very beginning.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 1993-1994 The Regents of the University of California.
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. All advertising materials mentioning features or use of this software
14 *    must display the following acknowledgement:
15 *      This product includes software developed by the Network Research
16 *      Group at Lawrence Berkeley Laboratory.
17 * 4. Neither the name of the University nor of the Laboratory may be used
18 *    to endorse or promote products derived from this software without
19 *    specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
22 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
25 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
26 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
27 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
28 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
30 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 * SUCH DAMAGE.
32 *
33 * $Id$
34 */
35static const char rcsid[] =
36    "@(#) $Header$ (LBL)";
37
38#include <stdio.h>
39#include <stdlib.h>
40#ifndef WIN32
41#include <unistd.h>
42#endif
43#include <errno.h>
44#include <string.h>
45#ifdef WIN32
46//#include <winsock.h>
47#include <io.h>
48#include <sys/stat.h>
49#else
50#include <sys/param.h>
51#include <sys/uio.h>
52#include <netinet/in.h>
53#include <sys/file.h>
54#include <sys/stat.h>
55#endif
56#include "ntp-time.h"
57#include "pktbuf-rtp.h"
58#include "transmitter.h"
59#include "net.h"
60#include "source.h"
61#include "decoder.h"
62#include "vic_tcl.h"
63
64#if defined(sun) && !defined(__svr4__) || (defined(_AIX) && !defined(_AIX41))
65extern "C" writev(int, iovec*, int);
66#endif
67
68//Transmitter::pktbuf* Transmitter::freehdrs_;
69//Transmitter::buffer* Transmitter::freebufs_;
70int Transmitter::nbufs_;
71int Transmitter::nhdrs_;
72
73/*
74 * Sequence number is static so when we change the encoding (which causes
75 * new encoder to be allocated) we don't reset the sequence counter.
76 * Otherwise, receivers will get confused, reset their stats, and generate
77 * odd looking streams of reception reports (i.e., the packet counts will
78 * drop back to 0).
79 */
80u_int16_t Transmitter::seqno_ = 1;
81
82Transmitter::Transmitter() :
83        mtu_(1024),
84        nf_(0),
85        nb_(0),
86        np_(0),
87        kbps_(128),
88        nextpkttime_(0.),
89        busy_(0),
90        head_(0),
91        tail_(0),
92        loop_layer_(1000),
93        loopback_(0),
94        is_cc_active_(1),
95        is_buf_empty_(1),
96        cc_type_(WBCC)
97{
98        memset((char*)&mh_, 0, sizeof(mh_));
99        mh_.msg_iovlen = 2;
100
101        // CC related...
102        epc_ = 0;       // experimental packet counter
103}
104
105/* Return time of day in seconds */
106inline double Transmitter::gettimeofday_secs() const
107{
108        timeval tv;
109        ::gettimeofday(&tv, 0);
110        return (tv.tv_sec + 1e-6 * tv.tv_usec);
111}
112
113void Transmitter::loopback(pktbuf* pb)
114{
115        int layer = pb->layer;
116        rtphdr* rh = (rtphdr*)pb->data;
117        int cc = pb->len;
118        /*
119         * Update statistics.
120         */
121        if (layer >= loop_layer_) {
122                /*XXX*/
123                pb->release();
124                return;
125        }
126        nb_ += cc;
127        ++np_;
128
129        SourceManager& sm = SourceManager::instance();
130        Source* s = sm.localsrc();
131        timeval now = unixtime();
132        Source::Layer& sl = s->layer(pb->layer);
133
134        sl.lts_data(now);
135        s->action();
136        sl.sts_data(rh->rh_ts);
137        sl.np(1);
138        sl.nb(cc);
139        sl.cs((u_int16_t)ntohs(rh->rh_seqno),s);
140
141        int flags = ntohs(rh->rh_flags);
142        if (flags & RTP_M) {
143                ++nf_;
144                sl.nf(1);
145        }
146        int fmt = flags & 0x7f;
147        /*
148         * Handle initialization of loopback decoder
149         * and changes in the stream.
150         */
151        PacketHandler* h = s->handler();
152        if (h == 0)
153                h = s->activate(fmt);
154        else if (s->format() != fmt) {
155                h = s->change_format(fmt);
156        }
157
158        if (s->mute()) {
159                pb->release();
160                return;
161        }
162        h->recv(pb);
163}
164
165int Transmitter::dumpfd_ = -1;
166void Transmitter::dump(int fd)
167{
168        dumpfd_ = fd;
169#define MAGIC "RTPCLIP 1.0"
170        (void)write(fd, MAGIC, sizeof(MAGIC));
171}
172
173/*XXX*/
174#ifdef WIN32
175int writev(int fd, iovec* iov, int iovlen)
176{
177        int len = 0, n;
178        for (int i = 0; i < iovlen; i++) {
179                if ((n = write(fd, iov[i].iov_base, iov[i].iov_len)) == -1) {
180                        perror("writev");
181                        exit(1);
182                }
183                len += n;
184        }
185        return(len);
186}
187#endif
188
189void Transmitter::dump(int fd, iovec* iov, int iovlen) const
190{
191        register int length = 0;
192        for (int i = iovlen; --i >= 0; )
193                length += iov[i].iov_len;
194
195        char cliphdr[4];
196        *(short*)cliphdr = htons(length);
197        cliphdr[2] = 0; /* data packet (i.e., not an rtcp packet) */
198        cliphdr[3] = 0; /* ? */
199
200        (void)write(fd, cliphdr, 4);
201        if (writev(fd, iov, iovlen) < 0) {
202                perror("write");
203                exit(1);
204        }
205}
206
207/*
208 * Time it takes in seconds to send this
209 * packet at the configured bandwidth.
210 */
211double Transmitter::txtime(pktbuf* pb)
212{
213//      int cc = pb->iov[0].iov_len + pb->iov[1].iov_len;
214        int cc = pb->len;
215        return (8 * cc / (1000. * kbps_));
216}
217
218/*
219 * Tx pktbuf size
220 */
221int Transmitter::tx_buf_size() {
222        int size = 0;
223        pktbuf* pb = head_;
224        while (pb) {
225                size++;
226                pb = pb->next;
227        }       
228        return size;
229}
230
231void Transmitter::send(pktbuf* pb)
232{
233        switch (cc_type_) {
234        //
235        // window-based congestion control (TFWC)
236        //
237        case WBCC:
238                // pb is empty - try sending a packet
239                if(is_buf_empty_) {
240                        if (head_ != 0) {
241                                tail_->next = pb;
242                                tail_ = pb;
243                        } else
244                                tail_ = head_ = pb;
245                        pb->next = 0;
246                        cc_tfwc_output();
247                        is_buf_empty_ = false;
248                }
249                // if not, check if cwnd allows send this packet
250                else {
251                        if (head_ != 0) {
252                                tail_->next = pb;
253                                tail_ = pb;
254                        } else
255                                tail_ = head_ = pb;
256                        pb->next = 0;
257                        cc_tfwc_output(pb);
258                }
259                break;
260
261        //
262        // rate-based congestion control (TFRC)
263        //
264        case RBCC:
265                // pb is empty
266                if(is_buf_empty_) {
267                        if (head_ != 0) {
268                                tail_->next = pb;
269                                tail_ = pb;
270                        } else
271                                tail_ = head_ = pb;
272                        pb->next = 0;
273                        cc_tfrc_output();
274                        is_buf_empty_ = false;
275                }
276                // pb is not emtpy
277                else {
278                        if (head_ != 0) {
279                                tail_->next = pb;
280                                tail_ = pb;
281                        } else
282                                tail_ = head_ = pb;
283                        pb->next = 0;
284                }
285                break;
286
287        //
288        // without congestion control
289        //
290        case NOCC:
291        default:
292                // CC is not active, so just go for the normal operation
293                if (!busy_) {
294                        double delay = txtime(pb);
295                        nextpkttime_ = gettimeofday_secs() + delay;
296                        output(pb);
297                        /*
298                         * emulate a transmit interrupt --
299                         * assume we will have more to send.
300                         */
301                        msched(int(delay * 1e-3));
302                        busy_ = 1;
303                } else {
304                        if (head_ != 0) {
305                                tail_->next = pb;
306                                tail_ = pb;
307                        } else
308                                tail_ = head_ = pb;
309                        pb->next = 0;
310                }
311        } // switch (cc_type)
312}
313
314void Transmitter::cc_tfwc_output(pktbuf* pb)
315{
316//      fprintf(stderr,"\t--------entering cc_tfwc_output(pb)---------\n");
317//      fprintf(stderr,"\t|                                          |\n");
318//      fprintf(stderr,"\tV                                          V\n");
319
320        // pb is not null, hence parse it.
321        rtphdr* rh = (rtphdr *) pb->data;
322
323        int magic = (int) tfwc_magic();
324        //debug_msg("cwnd: %d\n", magic);
325        int jack = (int) tfwc_sndr_jacked();
326        //debug_msg("jack: %d\n", jack);
327       
328        if (ntohs(rh->rh_seqno) <= magic + jack) {
329                // record seqno and timestamp at TfwcSndr side
330                tfwc_sndr_send(ntohs(rh->rh_seqno), tx_now()-tx_now_offset_);
331                // move head pointer
332                head_ = pb->next;
333                // call Transmitter::output_data_only(pb)
334                output_data_only(pb);
335        }
336//      fprintf(stderr,"\t^                                          ^\n");
337//      fprintf(stderr,"\t|                                          |\n");
338//      fprintf(stderr,"\t============================================\n");
339}
340
341/*
342 * main TFWC CC output routines
343 */
344void Transmitter::cc_tfwc_output()
345{
346//      fprintf(stderr,"\t---------entering cc_tfwc_output()----------\n");
347//      fprintf(stderr,"\t|                                          |\n");
348//      fprintf(stderr,"\tV                                          V\n");
349
350        // head of the RTP data packet buffer (pb)
351        pktbuf* pb = head_;
352
353        // if pb is null, then set the next available packet as the first packet of
354        // the packet buffer. and then, return - i.e., do not try sending packets.
355        if (pb == 0) {
356                is_buf_empty_ = true;
357//              fprintf(stderr,
358//              "\t=========== PACKET NOT AVAILABLE ===========\n\n");
359                return;
360        }
361
362        //printf("\tthere are packets available to send in cc_tfwc_output()\n");
363        // pb is not null, hence parse it.
364        rtphdr* rh = (rtphdr *) pb->data;
365
366        // cwnd value
367        int magic = (int) tfwc_magic();
368//      debug_msg("cwnd: %d\n", magic);
369
370        // just acked seqno
371        int jack = (int) tfwc_sndr_jacked();
372//      debug_msg("jack: %d\n", jack);
373
374        //fprintf(stderr, "\tXXX now: %f\tnum: %d\tcwnd: %d\tjack: %d\n",
375        //tx_now()-tx_now_offset_, ntohs(rh->rh_seqno), magic, jack);
376
377        // while packet seqno is within "cwnd + jack", send that packet
378        while (ntohs(rh->rh_seqno) <= magic + jack) {
379                // record seqno and timestamp at TfwcSndr side
380                tfwc_sndr_send(ntohs(rh->rh_seqno), tx_now()-tx_now_offset_);
381                // move head pointer
382                head_ = pb->next;
383                // call Transmitter::output(pb)
384                output(pb);
385
386                // if the moved head pointer is not null, parse packet buffer.
387                // otherwise, break while statement.
388                if (head_ != 0) {
389                        pb = head_;
390                        rh = (rtphdr *) pb->data;
391                } else {
392                        break;
393                }
394        } // end while ()
395//      fprintf(stderr,"\t^                                          ^\n");
396//      fprintf(stderr,"\t|                                          |\n");
397//      fprintf(stderr,"\t============================================\n");
398}
399
400/*
401 * main TFRC CC output
402 */
403void Transmitter::cc_tfrc_output() {
404        // TBA
405}
406
407void Transmitter::timeout()
408{
409        double now = gettimeofday_secs();
410        for (;;) {
411                pktbuf* p = head_;
412                if (p != 0) {
413                        head_ = p->next;
414                        nextpkttime_ += txtime(p);
415                        output(p);
416                        int ms = int(1e-3 * (nextpkttime_ - now));
417                        /* make sure we will wait more than 10ms */
418                        if (ms > 1000) {
419                                msched(ms);
420                                return;
421                        }
422                } else {
423                        busy_ = 0;
424                        break;
425                }
426        }
427}
428
429void Transmitter::flush()
430{
431        if (!is_cc_on()) {
432                if (busy_) {
433                        busy_ = 0;
434                        cancel();
435                }
436
437                pktbuf* p = head_;
438                while (p != 0) {
439                        pktbuf* n = p->next;
440                        output(p);
441                        p = n;
442                }
443                head_ = 0;
444        }
445}
446
447void Transmitter::output(pktbuf* pb)
448{
449        //fprintf(stderr, "\n\tTransmitter::output()\n");
450        //if (dumpfd_ >= 0)
451        //      dump(dumpfd_, pb->iov, mh_.msg_iovlen);
452//dprintf("layer: %d \n",pb->layer);
453        transmit(pb);
454        loopback(pb);
455//      pb->release() is called by decoder in loopback;
456}
457
458void Transmitter::output_data_only(pktbuf* pb)
459{
460        tx_data_only(pb);
461        loopback(pb);
462}
463
464/*void Transmitter::release(pktbuf* pb)
465{
466        pb->next = freehdrs_;
467        freehdrs_ = pb;
468        buffer* p = pb->buf;
469        if (p != 0) {
470                p->next = freebufs_;
471                freebufs_ = p;
472        }
473}
474*/
Note: See TracBrowser for help on using the browser.