root/rat/trunk/receive.c @ 1939

Revision 1939, 21.1 KB (checked in by ucaccsp, 16 years ago)

Rearrange time.c and rat_time.h to be timers.[ch], 'coz it's neater.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:    receive.c
3 * PROGRAM: RAT
4 * AUTHOR:  Isidor Kouvelas + Orion Hodson
5 *
6 * $Revision$
7 * $Date$
8 *
9 * Copyright (c) 1995,1996 University College London
10 * All rights reserved.
11 *
12 * Redistribution and use in source and binary forms, with or without
13 * modification, is permitted, for non-commercial use only, provided
14 * that the following conditions are met:
15 * 1. Redistributions of source code must retain the above copyright
16 *    notice, this list of conditions and the following disclaimer.
17 * 2. Redistributions in binary form must reproduce the above copyright
18 *    notice, this list of conditions and the following disclaimer in the
19 *    documentation and/or other materials provided with the distribution.
20 * 3. All advertising materials mentioning features or use of this software
21 *    must display the following acknowledgement:
22 *      This product includes software developed by the Computer Science
23 *      Department at University College London
24 * 4. Neither the name of the University nor of the Department may be used
25 *    to endorse or promote products derived from this software without
26 *    specific prior written permission.
27 * Use of this software for commercial purposes is explicitly forbidden
28 * unless prior written permission is obtained from the authors.
29 *
30 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
31 * ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
32 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
34 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
35 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
36 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
37 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
38 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
39 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
40 * SUCH DAMAGE.
41 */
42
43#include "config_unix.h"
44#include "config_win32.h"
45#include "assert.h"
46#include "receive.h"
47#include "interfaces.h"
48#include "util.h"
49#include "timers.h"
50#include "rtcp_pckt.h"
51#include "rtcp_db.h"
52#include "repair.h"
53#include "mix.h"
54#include "audio.h"
55#include "convert.h"
56#include "cushion.h"
57#include "transmit.h"
58#include "ui.h"
59
60typedef struct s_participant_playout_buffer {
61        struct s_participant_playout_buffer *next;
62        struct s_rtcp_dbentry *src;
63        rx_queue_element_struct *head_ptr;
64        rx_queue_element_struct *tail_ptr;
65        rx_queue_element_struct *last_got;
66        u_int32 creation_time;
67        u_int32 len;
68        u_int32 age;
69        u_int32 hist; /* bitmap of additions and removals */
70} ppb_t;
71
72/* this is still trial and error */
73#define PB_GROW(x)     (x)->hist<<=1; (x)->len++
74#define PB_SHRINK(x)   (x)->hist = (((x)->hist << 1)|1); (x)->len--
75#define PB_DRAINING(x) (((x->hist)&0x0f) == 0x0f)
76
77rx_queue_element_struct *
78new_rx_unit(void)
79{
80        rx_queue_element_struct *u;
81        u = (rx_queue_element_struct *)block_alloc(sizeof(rx_queue_element_struct));
82        memset(u, 0, sizeof(rx_queue_element_struct));
83        return (u);
84}
85
86static void
87add_unit_to_interval(rx_queue_element_struct *ip, rx_queue_element_struct *ru)
88{
89        ip->talk_spurt_start |= ru->talk_spurt_start;
90        /* XXX we should detect duplicates here [oth]
91         * if we have reached limit of number of ccu's or we have two or
92         * more with identical headers then we know we've had a duplicate.
93         */
94        if (ip->mixed   == FALSE &&
95            ru->cc_pt   == ip->cc_pt &&
96            ip->ccu_cnt == 0) {
97                while(ru->ccu_cnt>0 &&
98                      ip->ccu_cnt < ru->ccu[ru->ccu_cnt-1]->cc->max_cc_per_interval) {
99                        ip->ccu[ip->ccu_cnt++] = ru->ccu[--ru->ccu_cnt];
100                        ru->ccu[ru->ccu_cnt] = NULL;
101                }
102        } else {
103                debug_msg("duplicate\n");        ip->dbe_source[0]->duplicates++;
104       
105        }
106        free_rx_unit(&ru);
107}
108
109static rx_queue_element_struct *
110add_or_get_interval(ppb_t *buf, rx_queue_element_struct *ru)
111{
112        rx_queue_element_struct **ipp;
113
114        ipp = &buf->head_ptr;
115
116        /* we look on src_ts to ensure correct ordering through decode path */
117        while (*ipp && ts_gt(ru->src_ts, (*ipp)->src_ts))
118                ipp = &((*ipp)->next_ptr);
119
120        if (*ipp == NULL || (*ipp)->src_ts != ru->src_ts) {
121                /* interval didn't already exist */
122                ru->next_ptr = *ipp;
123                if (*ipp == NULL) {
124                        ru->prev_ptr = buf->tail_ptr;
125                        buf->tail_ptr = ru;
126                } else {
127                        ru->prev_ptr = (*ipp)->prev_ptr;
128                        (*ipp)->prev_ptr = ru;
129                }
130                *ipp = ru;
131                PB_GROW(buf);
132        }
133        return (*ipp);
134}
135
136#define MAX_FILLIN_UNITS 32
137
138static int
139fillin_playout_buffer(ppb_t *buf,
140                      rx_queue_element_struct *from,
141                      rx_queue_element_struct *to)
142{
143        rx_queue_element_struct *last, *curr;
144        u_int32 playout_step, units_made = 0;
145       
146        assert(ts_abs_diff(from->src_ts,to->src_ts) % to->unit_size == 0);
147
148        playout_step = ts_abs_diff(from->playoutpt, to->playoutpt) *
149                from->unit_size / ts_abs_diff(from->src_ts, to->src_ts);
150
151#ifdef DEBUG_PLAYOUT
152        if (playout_step > 4*from->unit_size) {
153                debug_msg("playout_step is large %d.\n",
154                        playout_step);
155        }
156#endif
157
158        last = from;
159        while(last->src_ts + last->unit_size != to->src_ts &&
160              units_made < MAX_FILLIN_UNITS) {
161                curr = new_rx_unit();
162                PB_GROW(buf);
163                buf->age ++;
164                curr->src_ts    = last->src_ts    + last->unit_size;
165                curr->playoutpt = last->playoutpt + playout_step;
166                curr->unit_size = last->unit_size;
167                curr->cc_pt     = last->cc_pt;
168
169                curr->dbe_source_count = last->dbe_source_count;
170                memcpy(curr->dbe_source,
171                       last->dbe_source,
172                       curr->dbe_source_count * sizeof(struct s_rtcp_dbentry*));
173
174                curr->next_ptr = to;
175                to->prev_ptr   = curr;
176                curr->prev_ptr = last;
177                last->next_ptr = curr;
178               
179                last = curr;
180                units_made++;
181        }
182
183        assert(units_made>0);
184#ifdef DEBUG_PLAYOUT
185        debug_msg("Allocated %d new units with separation %d\n",
186                units_made,
187                playout_step);
188#endif /* DEBUG_PLAYOUT */
189        return units_made;
190}
191
192#ifdef DEBUG_PLAYOUT
193static void
194verify_playout_buffer(ppb_t* buf)
195{
196        rx_queue_element_struct *el;
197        u_int32 src_diff, playout_diff, buf_len = 0;
198
199        el = buf->head_ptr;
200        while( el && el->next_ptr ) {
201                if (ts_gt(el->src_ts, el->next_ptr->src_ts)) {
202                        src_diff = ts_abs_diff( el->next_ptr->src_ts,
203                                                el->src_ts );
204                        debug_msg( "src_ts jump %08u.\n",
205                                 src_diff );
206                }
207                if (ts_gt(el->playoutpt, el->next_ptr->playoutpt)) {
208                        playout_diff = ts_abs_diff( el->next_ptr->playoutpt,
209                                                    el->playoutpt );
210                        debug_msg( "out of order playout units by %08u.\n",
211                                 playout_diff );
212                }
213                el = el->next_ptr;
214                buf_len++;
215        }
216
217        if (buf->len != buf_len + 1) {
218                debug_msg("Buffer length estimate is wrong (%d %d)!\n",
219                        buf->len,
220                        buf_len + 1);
221        }
222}
223#endif /* DEBUG_PLAYOUT */
224
225static rx_queue_element_struct *
226playout_buffer_add(ppb_t *buf, rx_queue_element_struct *ru)
227{
228        rx_queue_element_struct *ip;
229
230        if ((ip = add_or_get_interval(buf, ru)) != ru) {
231                add_unit_to_interval(ip, ru);
232        }
233
234        assert(ip != NULL);
235
236        /* If there's a gap between us and the units around us, because of
237         * loss or mis-ordering, fill in the units so that channel decoder
238         * does not get out of sync.
239         */
240        if (ip->next_ptr != NULL &&
241            ts_abs_diff(ip->src_ts, ip->next_ptr->src_ts) != ip->unit_size &&
242            ip->next_ptr->talk_spurt_start == FALSE) {
243                fillin_playout_buffer(buf, ip, ip->next_ptr);
244        }
245
246        if (ip->prev_ptr != NULL &&
247            ts_abs_diff(ip->src_ts, ip->prev_ptr->src_ts) != ip->unit_size &&
248            ip->prev_ptr->talk_spurt_start == FALSE) {
249                fillin_playout_buffer(buf, ip->prev_ptr, ip);
250        }
251
252        /* If playout point has been adjusted due to loss, late arrivals,
253         * or start of new talkspurt, shift any overlapping units to keep
254         * channel decoder in sync.
255         */
256        while(ip &&
257              ip->prev_ptr &&
258              ts_gt(ip->prev_ptr->playoutpt, ip->playoutpt)) {
259#ifdef DEBUG_PLAYOUT
260                debug_msg("Shifting unit from %ld to %ld\n",
261                        ip->prev_ptr->playoutpt,
262                        ip->playoutpt);
263#endif
264                ip->prev_ptr->playoutpt = ip->playoutpt;
265        }
266
267#ifdef DEBUG_PLAYOUT
268        verify_playout_buffer(buf);
269#endif /* DEBUG_PLAYOUT */
270
271        return (ru);
272}
273
274static rx_queue_element_struct *
275playout_buffer_get(session_struct *sp, ppb_t *buf, u_int32 from, u_int32 to)
276{
277        rx_queue_element_struct *ip;
278
279        if (buf->last_got == NULL) {
280                ip = buf->head_ptr;
281        } else {
282                ip = buf->last_got->next_ptr;
283        }
284        while (ip && ts_gt(from, ip->playoutpt)) {
285                buf->last_got = ip;
286                ip = ip->next_ptr;
287        }
288
289        if (ip) {
290                if (ts_gt(ip->playoutpt, to)) {
291                        return (NULL);
292                }
293                buf->last_got = ip;
294                channel_decode(sp, ip);
295                decode_unit(ip);
296        }
297        return (ip);
298}
299
300static void
301playout_buffer_destroy(session_struct *sp, ppb_t **list, ppb_t *buf)
302{
303        ppb_t *pb, *lpb;
304        rx_queue_element_struct *nrx;
305
306        ui_info_deactivate(buf->src);
307
308        debug_msg("Destroying playout buffer\n");
309
310        while (buf->head_ptr) {
311                nrx = buf->head_ptr->next_ptr;
312                free_rx_unit(&buf->head_ptr);
313                buf->head_ptr = nrx;
314        }
315       
316        pb  = *list;
317        lpb = NULL;
318        while(pb) {
319                if (buf == pb) {
320                        pb = buf->next;
321                        block_free(buf, sizeof(ppb_t));
322                        if (buf == *list) {
323                                *list = pb;
324                                break;
325                        } else {
326                                lpb->next = pb;
327                                break;
328                        }
329                }
330                lpb = pb;
331                pb = pb->next;
332        }
333
334        if (*list == NULL && sp->echo_was_sending && sp->echo_suppress) {
335                debug_msg("Echo suppressor unmuting (%d).\n", sp->echo_was_sending);
336                if (sp->echo_was_sending) {
337                        tx_start(sp);
338                }
339                sp->echo_was_sending = FALSE;
340        }
341}
342
343#define HISTORY_LEN     60      /* ms */
344#define SUPPRESS_LEN   200      /* ms */
345
346static void
347clear_old_participant_history(session_struct *sp, ppb_t *buf)
348{
349        rx_queue_element_struct *ip;
350        u_int32 cutoff, cur_time, adj;
351
352        cur_time = get_time(buf->src->clock);
353        if (sp->echo_suppress) {
354                adj = (get_freq(buf->src->clock)/1000) * SUPPRESS_LEN;
355        } else {
356                adj = (get_freq(buf->src->clock)/1000) * HISTORY_LEN;
357        }
358        cutoff = cur_time - adj;
359        assert(cutoff!=cur_time);
360
361        while (buf->head_ptr && ts_gt(cutoff, buf->head_ptr->playoutpt)) {
362                ip = buf->head_ptr;
363                buf->head_ptr = ip->next_ptr;
364
365                if (buf->last_got == ip) {
366                        buf->last_got = NULL;
367                }
368                free_rx_unit(&ip);
369                PB_SHRINK(buf);
370        }
371
372        if (buf->head_ptr) {
373                buf->head_ptr->prev_ptr = NULL;
374        } else {
375                buf->tail_ptr = NULL;
376                playout_buffer_destroy(sp, &sp->playout_buf_list, buf);
377        }
378}
379
380int 
381playout_buffer_exists (ppb_t *list, rtcp_dbentry *src)
382{
383        ppb_t *p;
384
385        p = list;
386        while(p != NULL) {
387                if (p->src == src) {
388                        return TRUE;
389                }
390                p = p->next;
391        }
392       
393        return FALSE;
394}
395
396int32
397playout_buffer_duration (ppb_t *list, rtcp_dbentry *src)
398{
399        ppb_t *p;
400
401        p = list;
402        while(p != NULL) {
403                if (p->src == src && p->last_got) {
404                        return (p->tail_ptr->playoutpt - p->last_got->playoutpt) * 1000 / get_freq(p->src->clock);
405                }
406                p = p->next;
407        }
408        return 0;
409}
410
411void
412playout_buffers_destroy(session_struct *sp, ppb_t **list)
413{
414        ppb_t *p;
415        while((p = *list)) {
416                playout_buffer_destroy(sp, list, p);
417        }
418}
419
420static ppb_t *
421find_participant_queue(session_struct *sp, ppb_t **list, rtcp_dbentry *src, int dev_pt, int src_pt, struct s_pcm_converter *pc)
422{
423        ppb_t *p;
424        codec_t *cp_dev, *cp_src;
425
426        for (p = *list; p; p = p->next) {
427                if (p->src == src)
428                        return (p);
429        }
430
431        /* Echo suppression */
432        if (*list == NULL && sp->echo_suppress) {
433                /* We are going to create a playout buffer so mute mike */
434                debug_msg("Echo suppressor muting.\n");
435                sp->echo_was_sending = sp->sending_audio;
436                if (sp->sending_audio) {
437                        tx_stop(sp);
438                }
439        }
440
441        p = (ppb_t*)block_alloc(sizeof(ppb_t));
442        memset(p, 0, sizeof(ppb_t));
443        p->src           = src;
444        p->creation_time = get_time(src->clock);
445        p->next          = *list;
446        *list            = p;
447       
448        ui_info_activate(src);
449       
450        cp_dev = get_codec_by_pt(dev_pt);
451        cp_src = get_codec_by_pt(src_pt);
452        assert(cp_dev);
453        assert(cp_src);
454        if (!codec_compatible(cp_dev,cp_src)) {
455                if (src->converter) converter_destroy(&src->converter);
456                assert(src->converter == NULL);
457                src->converter = converter_create(pc,
458                                                  cp_src->channels,
459                                                  cp_src->freq,
460                                                  cp_dev->channels,
461                                                  cp_dev->freq);
462        }
463
464        return (p);
465}
466
467void
468playout_buffer_remove(session_struct *sp, ppb_t **list, rtcp_dbentry *src)
469{
470        /* We don't need to free "src", that's done elsewhere... [csp] */
471        ppb_t *curr, *tmp;
472
473        if (src->converter) converter_destroy(&src->converter);
474
475        assert(list != NULL);
476
477        curr = *list;
478        while(curr) {
479                tmp = curr->next;
480                if (curr->src == src) {
481                        playout_buffer_destroy(sp, list, curr);
482                }
483                curr = tmp;
484        }
485}
486
487static void
488fix_first_playout_error(u_int32 now, rx_queue_element_struct *up)
489{
490        u_int32 error;
491        struct  s_rtcp_dbentry *ssrc;
492
493        error = now - up->playoutpt;
494        ssrc = up->dbe_source[0];
495       
496        ssrc->playout += error;
497
498        debug_msg("error = %ld\n", error);
499
500        while(up) {
501                if (up->dbe_source[0] == ssrc) {
502                        up->playoutpt += error;
503                }
504                up = up->next_ptr;
505        }
506}
507
508#define PLAYOUT_SAFETY 5
509
510void 
511playout_buffers_process(session_struct *sp, rx_queue_struct *receive_queue, ppb_t **buf_list, struct s_mix_info *ms)
512{
513        /* There is a nasty race condition in this function, which people
514         * should be aware of: when a participant leaves a session (either
515         * via a timeout or reception of an RTCP BYE packet), the RTCP
516         * database entries for that source are removed. However, there may
517         * still be some packets left for that source in the receiver
518         * pipeline. This means that the up->dbe_source[] and buf->src
519         * fields (and possibly some others) in the code below may be
520         * dangling pointers... Now, this should never happen, since the
521         * code removing a participant cleans up the receiver mess, right?
522         * Well, we hope anyway...... At least one bug has been caused by
523         * this already (a crash just after receiving a BYE packet from a
524         * participant who is sending). Don't say I didn't warn you! [csp]
525         */
526        rx_queue_element_struct *up;
527        ppb_t                   *buf;
528        u_int32                 cur_time, cs, cu;
529       
530        cs = cu = 0;
531       
532        while (receive_queue->queue_empty == FALSE) {
533                up       = get_unit_off_rx_queue(receive_queue);
534                buf      = find_participant_queue(sp, buf_list, up->dbe_source[0], sp->encodings[0], up->dbe_source[0]->enc, sp->converter);
535                cur_time = get_time(buf->src->clock);
536               
537                /* This is to compensate for clock drift.
538                 * Same check should be made in case it is too early.
539                 */
540               
541                if (ts_gt(cur_time, up->playoutpt)) {
542                        if (cur_time == buf->creation_time) {
543                                /* It is silly to throw first packet away */
544                                fix_first_playout_error(cur_time, up);
545                        } else {
546                                up->dbe_source[0]->jit_TOGed++;
547                                up->dbe_source[0]->cont_toged++;
548                                debug_msg("cont_toged %d\n",
549                                          up->dbe_source[0]->cont_toged);
550                        }
551                } else {
552                        up->dbe_source[0]->cont_toged = 0;
553                }
554#ifdef DEBUG_PLAYOUT
555                verify_playout_buffer(buf);
556#endif
557                up = playout_buffer_add(buf, up);
558#ifdef DEBUG_PLAYOUT
559                verify_playout_buffer(buf);
560#endif
561                /*
562                 * If we have already worked past this point then mix it!
563                 */
564               
565                if (up && buf->last_got && up->mixed == FALSE
566                    && ts_gt(buf->last_got->playoutpt, up->playoutpt)
567                    && ts_gt(up->playoutpt, cur_time)){
568                        debug_msg("Mixing late audio\n");
569                        channel_decode(sp, up);
570                        decode_unit(up);
571                        if (up->native_count) {
572                                mix_do_one_chunk(sp, ms, up);
573                        }
574                }
575        }
576       
577        /* If sp->cushion is NULL, it probably means we don't have access to the audio device... */
578        if (sp->cushion != NULL) {
579                cs = cushion_get_size(sp->cushion);
580                cu = cushion_get_step(sp->cushion);
581        }
582
583        for (buf = *buf_list; buf; buf = buf->next) {
584                cur_time = get_time(buf->src->clock);
585#ifdef DEBUG_PLAYOUT_BROKEN
586                {
587                        static struct timeval last_foo;
588                        struct timeval foo;
589                        gettimeofday(&foo, NULL);
590                        debug_msg("%08ld: playout range: %ld - %ld\n\tbuffer playout range %ld - %ld\n\tbuffer ts range %ld - %ld\n",
591                                  (foo.tv_sec  - last_foo.tv_sec) * 1000 +
592                                  (foo.tv_usec - last_foo.tv_usec)/1000,
593                                  cur_time,
594                                  cur_time + cs,
595                                  buf->head_ptr->playoutpt,
596                                  buf->tail_ptr->playoutpt,
597                                  buf->head_ptr->src_ts,
598                                  buf->tail_ptr->src_ts
599                                );
600                        memcpy(&last_foo, &foo, sizeof(struct timeval));
601                }
602#endif /* DEBUG_PLAYOUT_BROKEN */
603                if ((buf->tail_ptr->playoutpt - cur_time) < 5*cs/4) {
604                        debug_msg("Less audio buffered (%ld) than cushion safety (%ld)!\n",
605                                  buf->tail_ptr->playoutpt - cur_time,
606                                  5 * cs / 4);
607                        buf->src->playout_danger = TRUE;
608                }
609
610                while ((up = playout_buffer_get(sp, buf, cur_time, cur_time + cs))) {
611                        if (!up->comp_count  && sp->repair != REPAIR_NONE
612                            && up->prev_ptr != NULL && up->next_ptr != NULL
613                            && up->prev_ptr->native_count)
614                        repair(sp->repair, up);
615#ifdef DEBUG_PLAYOUT
616                        if (up->prev_ptr) {
617                                u_int32 src_diff = ts_abs_diff(up->prev_ptr->src_ts,up->src_ts);
618                            if (src_diff != up->unit_size) {
619                                    debug_msg("src_ts jump %08d\n",src_diff);
620                            }
621                        }
622#endif /* DEBUG_PLAYOUT */
623                       
624                        if (up->native_count && up->mixed == FALSE) {
625                                mix_do_one_chunk(sp, ms, up);
626                        } else {
627                                if (up->native_count) {
628                                        debug_msg("already mixed\n");
629                                } else {
630                                        if (up->comp_count) {
631                                                debug_msg("Not decoded ?\n");
632                                        } else {
633                                                assert(up->comp_data[0].data == NULL);
634                                                debug_msg("No data for block, buf len %ld, cushion size %ld\n",
635                                                          playout_buffer_duration(buf, buf->src),
636                                                          cs);
637                                        }
638                                }
639                        }
640                }
641                clear_old_participant_history(sp, buf);
642        }
643}
Note: See TracBrowser for help on using the browser.