root/rat/trunk/source.c @ 2916

Revision 2916, 49.0 KB (checked in by ucacoxh, 14 years ago)

- Use avg_transit if close to transit time of last packet. It's the average of

what we have been observing so a reasonable thing to use.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:      source.c
3 * AUTHOR(S): Orion Hodson
4 *
5 * Layering support added by Tristan Henderson.
6 *     
7 * $Revision$
8 * $Date$
9 *
10 * Copyright (c) 1999 University College London
11 * All rights reserved.
12 *
13 */
14
15#include "config_unix.h"
16#include "config_win32.h"
17
18#include "ts.h"
19#include "playout.h"
20#include "channel.h"
21#include "channel_types.h"
22#include "codec_types.h"
23#include "codec.h"
24#include "codec_state.h"
25#include "converter.h"
26#include "audio_util.h"
27#include "render_3D.h"
28#include "repair.h"
29#include "timers.h"
30#include "ts.h"
31#include "channel_types.h"
32#include "pdb.h"
33#include "pktbuf.h"
34#include "source.h"
35#include "debug.h"
36#include "util.h"
37#include "net_udp.h"
38#include "mix.h"
39#include "rtp.h"
40#include "playout_calc.h"
41#include "ui.h"
42#include "session.h"
43#include "auddev.h"
44
45#define SKEW_ADAPT_THRESHOLD       5000
46#define SOURCE_YOUNG_AGE             20
47#define SOURCE_AUDIO_HISTORY_MS    1000
48#define NO_CONT_TOGED_FOR_PLAYOUT_RECALC 4
49
50/* constants for skew adjustment:
51 SOURCE_SKEW_SLOW - denotes source clock appears slower than ours.
52 SOURCE_SKEW_FAST - denotes source clock appears faster than ours.
53*/
54typedef enum { SOURCE_SKEW_SLOW, SOURCE_SKEW_FAST, SOURCE_SKEW_NONE } skew_t;
55
56typedef struct s_source {
57        struct s_source            *next;
58        struct s_source            *prev;
59        pdb_entry_t                *pdbe; /* persistent database entry */
60        u_int32                     age;
61        ts_t                        last_played;
62        ts_t                        last_repair;
63        u_int16                     consec_lost;
64        u_int32                     mean_energy;
65        ts_sequencer                seq;
66        struct s_pktbuf            *pktbuf;
67        u_int32                     packets_done;
68        struct s_channel_state     *channel_state;
69        struct s_codec_state_store *codec_states;
70        struct s_pb                *channel;
71        struct s_pb                *media;
72        struct s_pb_iterator       *media_pos;
73        struct s_converter         *converter;
74        /* Fine grained playout buffer adjustment variables.  Used in        */
75        /* attempts to correct for clock skew between source and local host. */
76        skew_t                      skew;
77        ts_t                        skew_adjust;
78        /* b/w estimation variables                                          */
79        u_int32                     byte_count;
80        ts_t                        byte_count_start;
81        double                      bps;
82} source;
83
84/* A linked list is used for sources and this is fine since we mostly expect */
85/* 1 or 2 sources to be simultaneously active and so efficiency is not a     */
86/* killer.                                                                   */
87
88typedef struct s_source_list {
89        source  sentinel;
90        u_int16 nsrcs;
91} source_list;
92
93/*****************************************************************************/
94/* Source List functions.  Source List is used as a container for sources    */
95/*****************************************************************************/
96
97int
98source_list_create(source_list **pplist)
99{
100        source_list *plist = (source_list*)xmalloc(sizeof(source_list));
101        if (plist != NULL) {
102                *pplist = plist;
103                plist->sentinel.next = &plist->sentinel;
104                plist->sentinel.prev = &plist->sentinel;
105                plist->nsrcs = 0;
106                return TRUE;
107        }
108        return FALSE;
109}
110
111void
112source_list_clear(source_list *plist)
113{
114       assert(plist != NULL);
115       
116       while(plist->sentinel.next != &plist->sentinel) {
117               source_remove(plist, plist->sentinel.next);
118       }
119}
120
121void
122source_list_destroy(source_list **pplist)
123{
124        source_list *plist = *pplist;
125        source_list_clear(plist);
126        assert(plist->nsrcs == 0);
127        xfree(plist);
128        *pplist = NULL;
129}
130
131u_int32
132source_list_source_count(source_list *plist)
133{
134        return plist->nsrcs;
135}
136
137source*
138source_list_get_source_no(source_list *plist, u_int32 n)
139{
140        source *curr;
141
142        assert(plist != NULL);
143
144        if (n < plist->nsrcs) {
145                curr = plist->sentinel.next;
146                while(n != 0) {
147                        curr = curr->next;
148                        n--;
149                }
150                return curr;
151        }
152        return NULL;
153}
154
155source*
156source_get_by_ssrc(source_list *plist, u_int32 ssrc)
157{
158        source *curr, *stop;
159       
160        curr = plist->sentinel.next;
161        stop = &plist->sentinel;
162        while(curr != stop) {
163                if (curr->pdbe->ssrc == ssrc) {
164                        return curr;
165                }
166                curr = curr->next;
167        }
168 
169        return NULL;
170}
171
172/*****************************************************************************/
173/* Timestamp constants and initialization                                    */
174/*****************************************************************************/
175
176static ts_t zero_ts;        /* No time at all :-)                            */
177static ts_t keep_source_ts; /* How long source kept after source goes quiet  */
178static ts_t history_ts;     /* How much old audio hang onto for repair usage */
179static ts_t bw_avg_period;  /* Average period for bandwidth estimate         */
180static ts_t skew_thresh;    /* Significant size b4 consider playout adapt    */
181static ts_t skew_limit;     /* Upper bound, otherwise clock reset.           */
182static int  time_constants_inited = FALSE;
183
184
185static void
186time_constants_init()
187{
188        /* We use these time constants *all* the time.   Initialize once     */
189        zero_ts        = ts_map32(8000, 0);
190        keep_source_ts = ts_map32(8000, 2000);
191        history_ts     = ts_map32(8000, 1000);
192        bw_avg_period  = ts_map32(8000, 8000);
193        skew_thresh    = ts_map32(8000, 160);
194        skew_limit     = ts_map32(8000, 2000);
195        time_constants_inited = TRUE;
196}
197
198/*****************************************************************************/
199/* Source functions.  A source is an active audio source.                    */
200/*****************************************************************************/
201
202source*
203source_create(source_list    *plist,
204              u_int32         ssrc,
205              pdb_t          *pdb)
206{
207        source *psrc;
208        int     success;
209
210        assert(plist != NULL);
211        assert(source_get_by_ssrc(plist, ssrc) == NULL);
212
213        /* Time constant initialization. Nothing to do with source creation  */
214        /* just has to go somewhere before sources might be active, here it  */
215        /* definitely is!                                                    */
216        if (time_constants_inited == FALSE) {
217                time_constants_init();
218        }
219
220        /* On with the show...                                               */
221        psrc = (source*)block_alloc(sizeof(source));
222        if (psrc == NULL) {
223                return NULL;
224        }
225        memset(psrc, 0, sizeof(source));
226
227        if (pdb_item_get(pdb, ssrc, &psrc->pdbe) == FALSE) {
228                debug_msg("Persistent database item not found\n");
229                abort();
230        }
231
232        psrc->pdbe->first_mix  = 1; /* Used to note nothing mixed anything   */
233        psrc->pdbe->cont_toged = 0; /* Reset continuous thrown on ground cnt */
234        psrc->channel_state    = NULL;       
235        psrc->skew             = SOURCE_SKEW_NONE;
236
237        /* Allocate channel and media buffers                                */
238        success = pb_create(&psrc->channel,
239                            (playoutfreeproc)channel_data_destroy);
240        if (!success) {
241                debug_msg("Failed to allocate channel buffer\n");
242                goto fail_create_channel;
243        }
244
245        success = pb_create(&psrc->media, (playoutfreeproc)media_data_destroy);
246        if (!success) {
247                debug_msg("Failed to allocate media buffer\n");
248                goto fail_create_media;
249        }
250
251        success = pb_iterator_create(psrc->media, &psrc->media_pos);
252        if (!success) {
253                debug_msg("Failed to attach iterator to media buffer\n");
254                goto fail_create_iterator;
255        }
256
257        success = codec_state_store_create(&psrc->codec_states, DECODER);
258        if (!success) {
259                debug_msg("Failed to allocate codec state storage\n");
260                goto fail_create_states;
261        }
262
263        success = pktbuf_create(&psrc->pktbuf, 4);
264        if (!success) {
265                debug_msg("Failed to allocate packet buffer\n");
266                goto fail_pktbuf;
267        }
268
269        /* List maintenance    */
270        psrc->next = plist->sentinel.next;
271        psrc->prev = &plist->sentinel;
272        psrc->next->prev = psrc;
273        psrc->prev->next = psrc;
274        plist->nsrcs++;
275
276        debug_msg("Created source decode path\n");
277
278        return psrc;
279
280        /* Failure fall throughs */
281fail_pktbuf:
282        codec_state_store_destroy(&psrc->codec_states);
283fail_create_states:
284        pb_iterator_destroy(psrc->media, &psrc->media_pos);       
285fail_create_iterator:
286        pb_destroy(&psrc->media);
287fail_create_media:
288        pb_destroy(&psrc->channel);
289fail_create_channel:
290        block_free(psrc, sizeof(source));
291
292        return NULL;
293}
294
295/* All sources need to be reconfigured when anything changes in
296 * audio path.  These include change of device frequency, change of
297 * the number of channels, etc..
298 */
299
300void
301source_reconfigure(source        *src,
302                   converter_id_t conv_id,
303                   int            render_3d,
304                   u_int16        out_rate,
305                   u_int16        out_channels)
306{
307        u_int16    src_rate, src_channels;
308        codec_id_t            src_cid;
309        const codec_format_t *src_cf;
310
311        assert(src->pdbe != NULL);
312
313        /* Set age to zero and flush existing media
314         * so that repair mechanism does not attempt
315         * to patch across different block sizes.
316         */
317
318        src->age = 0;
319        pb_flush(src->media);
320
321        /* Get rate and channels of incoming media so we know
322         * what we have to change.
323         */
324        src_cid = codec_get_by_payload(src->pdbe->enc);
325        src_cf  = codec_get_format(src_cid);
326        src_rate     = (u_int16)src_cf->format.sample_rate;
327        src_channels = (u_int16)src_cf->format.channels;
328
329        if (render_3d) {
330                assert(out_channels == 2);
331                /* Rejig 3d renderer if there, else create */
332                if (src->pdbe->render_3D_data) {
333                        int azi3d, fil3d, len3d;
334                        render_3D_get_parameters(src->pdbe->render_3D_data,
335                                                 &azi3d,
336                                                 &fil3d,
337                                                 &len3d);
338                        render_3D_set_parameters(src->pdbe->render_3D_data,
339                                                 (int)src_rate,
340                                                 azi3d,
341                                                 fil3d,
342                                                 len3d);
343                } else {
344                        src->pdbe->render_3D_data = render_3D_init((int)src_rate);
345                }
346                assert(src->pdbe->render_3D_data);
347                /* Render 3d is before sample rate/channel conversion, and   */
348                /* output 2 channels.                                        */
349                src_channels = 2;
350        } else {
351                /* Rendering is switched off so destroy info.                */
352                if (src->pdbe->render_3D_data != NULL) {
353                        render_3D_free(&src->pdbe->render_3D_data);
354                }
355        }
356
357        /* Now destroy converter if it is already there.                     */
358        if (src->converter) {
359                converter_destroy(&src->converter);
360        }
361
362        if (src_rate != out_rate || src_channels != out_channels) {
363                converter_fmt_t c;
364                c.src_freq      = src_rate;
365                c.from_channels = src_channels;
366                c.dst_freq      = out_rate;
367                c.to_channels   = out_channels;
368                converter_create(conv_id, &c, &src->converter);
369        }
370        src->byte_count = 0;
371        src->bps        = 0.0;
372}
373
374void
375source_remove(source_list *plist, source *psrc)
376{
377        assert(plist);
378        assert(psrc);
379        assert(source_get_by_ssrc(plist, psrc->pdbe->ssrc) != NULL);
380
381        psrc->next->prev = psrc->prev;
382        psrc->prev->next = psrc->next;
383
384        if (psrc->channel_state) {
385                channel_decoder_destroy(&psrc->channel_state);
386        }
387
388        if (psrc->converter) {
389                converter_destroy(&psrc->converter);
390        }
391
392        pb_iterator_destroy(psrc->media, &psrc->media_pos);
393        pb_destroy(&psrc->channel);
394        pb_destroy(&psrc->media);
395        codec_state_store_destroy(&psrc->codec_states);
396        pktbuf_destroy(&psrc->pktbuf);
397        plist->nsrcs--;
398
399        debug_msg("Destroying source decode path\n");
400       
401        block_free(psrc, sizeof(source));
402
403        assert(source_get_by_ssrc(plist, psrc->pdbe->ssrc) == NULL);
404}
405             
406/* Source Processing Routines ************************************************/
407
408/* Returns true if fn takes ownership responsibility for data */
409static int
410source_process_packet (source *src,
411                       u_char *pckt,
412                       u_int32 pckt_len,
413                       u_int8  payload,
414                       ts_t    playout)
415{
416        channel_data *cd;
417        channel_unit *cu;
418        cc_id_t       cid;
419        u_int8        clayers;
420
421        assert(src != NULL);
422        assert(pckt != NULL);
423
424        /* Need to check:
425         * (i) if layering is enabled
426         * (ii) if channel_data exists for this playout point (if pb_iterator_get_at...)
427         * Then need to:
428         * (i) create cd if doesn't exist
429         * (ii) add packet to cd->elem[layer]
430         * We work out layer number by deducting the base port
431         * no from the port no this packet came from
432         * But what if layering on one port?
433         */
434
435        /* Or we could:
436         * (i) check if cd exists for this playout point
437         * (ii) if so, memcmp() to see if this packet already exists (ugh!)
438         */
439
440        cid = channel_coder_get_by_payload(payload);
441        clayers = channel_coder_get_layers(cid);
442        if (clayers > 1) {
443                struct s_pb_iterator *pi;
444                u_int8 i;
445                u_int32 clen;
446                int dup;
447                ts_t lplayout;
448                pb_iterator_create(src->channel, &pi);
449                while(pb_iterator_advance(pi)) {
450                        pb_iterator_get_at(pi, (u_char**)&cd, &clen, &lplayout);
451                       /* if lplayout==playout there is already channel_data for this playout point */
452                        if(!ts_eq(playout, lplayout)) {
453                                continue;
454                        }
455                        pb_iterator_detach_at(pi, (u_char**)&cd, &clen, &lplayout);
456                        assert(cd->nelem >= 1);
457
458                       /* if this channel_data is full, this new packet must *
459                        * be a duplicate, so we don't need to check          */
460                        if(cd->nelem >= clayers) {
461                                debug_msg("source_process_packet failed - duplicate layer\n");
462                                src->pdbe->duplicates++;
463                                pb_iterator_destroy(src->channel, &pi);
464                                goto done;
465                        }
466
467                        cu = (channel_unit*)block_alloc(sizeof(channel_unit));
468                        cu->data     = pckt;
469                        cu->data_len = pckt_len;
470                        cu->pt       = payload;
471
472                        dup = 0;
473
474                       /* compare existing channel_units to this one */
475                        for(i=0; i<cd->nelem; i++) {
476                                if(cu->data_len!=cd->elem[i]->data_len) break;
477                                /* This memcmp arbitrarily only checks
478                                 * 20 bytes, otherwise it takes too
479                                 * long */
480                                if (memcmp(cu->data, cd->elem[i]->data, 20) == 0) {
481                                        dup=1;
482                                }
483                        }
484
485                       /* duplicate, so stick the channel_data back on *
486                        * the playout buffer and swiftly depart        */
487                        if(dup) {
488                                debug_msg("source_process_packet failed - duplicate layer\n");
489                                src->pdbe->duplicates++;
490                                /* destroy temporary channel_unit */
491                                block_free(cu->data, cu->data_len);
492                                cu->data_len = 0;
493                                block_free(cu, sizeof(channel_unit));
494                                pb_iterator_destroy(src->channel, &pi);
495                                goto done;
496                        }
497
498                       /* add this layer if not a duplicate           *
499                        * NB: layers are not added in order, and thus *
500                        * have to be reorganised in the layered       *
501                        * channel coder                               */
502                        cd->elem[cd->nelem] = cu;
503                        cd->nelem++;
504                        pb_iterator_destroy(src->channel, &pi);
505                        goto done;
506                }
507                pb_iterator_destroy(src->channel, &pi);
508        }
509
510        if (channel_data_create(&cd, 1) == 0) {
511                return FALSE;
512        }
513       
514        cu               = cd->elem[0];
515        cu->data         = pckt;
516        cu->data_len     = pckt_len;
517        cu->pt           = payload;
518
519        /* Check we have state to decode this */
520        cid = channel_coder_get_by_payload(cu->pt);
521        if (src->channel_state &&
522            channel_decoder_matches(cid, src->channel_state) == FALSE) {
523                debug_msg("Channel coder changed - flushing\n");
524                channel_decoder_destroy(&src->channel_state);
525                pb_flush(src->channel);
526        }
527
528        /* Make state if not there and create decoder */
529        if (src->channel_state == NULL &&
530            channel_decoder_create(cid, &src->channel_state) == FALSE) {
531                debug_msg("Cannot decode payload %d\n", cu->pt);
532                channel_data_destroy(&cd, sizeof(channel_data));
533        }
534        src->age++;
535done:   
536        if (pb_add(src->channel, (u_char*)cd, sizeof(channel_data), playout) == FALSE) {
537                src->pdbe->duplicates++;
538                channel_data_destroy(&cd, sizeof(channel_data));
539        }
540
541        return TRUE;
542}
543
544static void
545source_process_packets(session_t *sp, source *src, ts_t now)
546{
547        ts_t    src_ts, playout, transit;
548        pdb_entry_t     *e;
549        rtp_packet      *p;
550        cc_id_t          ccid = -1;
551        u_int16          units_per_packet = -1;
552        u_int32          delta_ts, delta_seq;
553        u_char           codec_pt;
554        int              adjust_playout;
555
556        e = src->pdbe;
557        while(pktbuf_dequeue(src->pktbuf, &p)) {
558                adjust_playout = FALSE;
559
560                if (p->m) {
561                        adjust_playout = TRUE;
562                        debug_msg("New Talkspurt: %lu\n", p->ts);
563                }
564               
565                ccid = channel_coder_get_by_payload((u_char)p->pt);
566                if (channel_verify_and_stat(ccid, (u_char)p->pt,
567                                            p->data, p->data_len,
568                                            &units_per_packet, &codec_pt) == FALSE) {
569                        debug_msg("Packet discarded: packet failed channel verify.\n");
570                        xfree(p);
571                        continue;
572                }
573
574                if (e->channel_coder_id != ccid ||
575                    e->enc              != codec_pt ||
576                    e->units_per_packet != units_per_packet ||
577                    src->packets_done == 0) {
578                        /* Something has changed or is uninitialized...      */
579                        const codec_format_t *cf;
580                        const audio_format *dev_fmt;
581                        codec_id_t cid;
582
583                        cid = codec_get_by_payload(codec_pt);
584                        cf  = codec_get_format(cid);
585                        /* Fix clock.                                        */
586                        change_freq(e->clock, cf->format.sample_rate);
587                        /* Fix details.                                      */
588                        e->enc              = codec_pt;
589                        e->units_per_packet = units_per_packet;
590                        e->channel_coder_id = ccid;       
591                        e->inter_pkt_gap    = e->units_per_packet *
592                                (u_int16)codec_get_samples_per_frame(cid);
593                        debug_msg("Encoding change\n");
594                        /* Get string describing encoding.                   */
595                        channel_describe_data(ccid, codec_pt,
596                                              p->data, p->data_len,
597                                              e->enc_fmt, e->enc_fmt_len);
598                        if (sp->mbus_engine) {
599                                ui_update_stats(sp, e->ssrc);
600                        }
601                        /* Configure converter */
602                        dev_fmt = audio_get_ofmt(sp->audio_device);
603                        source_reconfigure(src,
604                                           sp->converter,
605                                           sp->render_3d,
606                                           (u_int16)dev_fmt->sample_rate,
607                                           (u_int16)dev_fmt->channels);
608                        adjust_playout      = TRUE;
609                }
610               
611                /* Check for talkspurt start indicated by change in          */
612                /* relationship between timestamps and sequence numbers.     */
613                delta_seq = p->seq - e->last_seq;
614                delta_ts  = p->ts  - e->last_ts;
615                if (delta_seq * e->inter_pkt_gap != delta_ts) {
616                        debug_msg("Seq no / timestamp realign (%lu * %lu != %lu)\n",
617                                  delta_seq, e->inter_pkt_gap, delta_ts);
618                        adjust_playout = TRUE;
619                }
620
621                /* Check for continuous number of packets being discarded.   */
622                /* This happens when jitter or transit estimate is no longer */
623                /* consistent with the real world.                           */
624                if (e->cont_toged == NO_CONT_TOGED_FOR_PLAYOUT_RECALC) {
625                        adjust_playout = TRUE;
626                        e->cont_toged  = 0;
627                } else if (e->cont_toged != 0) {
628                        debug_msg("cont_toged %d\n", e->cont_toged);
629                }
630
631                /* Calculate the playout point for this packet.              */
632                src_ts = ts_seq32_in(&src->seq, get_freq(e->clock), p->ts);
633
634                /* Transit delay is the difference between our local clock   */
635                /* and the packet timestamp (src_ts).  Note: we expect       */
636                /* packet clumping at talkspurt start because of VAD's       */
637                /* fetching previous X seconds of audio on signal detection  */
638                /* in order to send unvoiced audio at start.                 */
639                if (adjust_playout && pktbuf_get_count(src->pktbuf)) {
640                        rtp_packet *p;
641                        ts_t        last_ts;
642                        pktbuf_peak_last(src->pktbuf, &p);
643                        assert(p != NULL);
644                        last_ts = ts_seq32_in(&src->seq, get_freq(e->clock), p->ts);
645                        transit = ts_sub(sp->cur_ts, last_ts);
646                        debug_msg("Used transit of last packet\n");
647                } else {
648                        transit = ts_sub(sp->cur_ts, src_ts);
649                }
650
651                if (adjust_playout && src->packets_done &&
652                    ts_gt(ts_mul(src->pdbe->jitter, 3), ts_abs_diff(transit, src->pdbe->avg_transit))) {
653                        /* Use avg transit as it's close */
654                        transit = src->pdbe->avg_transit;
655                        debug_msg("transit %d  transit_avg %d\n", transit.ticks, src->pdbe->avg_transit.ticks);
656                }
657
658                playout = playout_calc(sp, e->ssrc, transit, adjust_playout);
659                playout = ts_add(e->transit, playout);
660                playout = ts_add(src_ts, playout);
661
662                e->last_transit = transit;
663
664                /* If last_played is valid then enough audio is buffer for   */
665                /* the playout check to be sensible.                         */
666                if (ts_valid(src->last_played) &&
667                    ts_gt(src->last_played, playout)) {
668                        debug_msg("Packet late (%u > %u)- discarding\n",
669                                  src->last_played.ticks,
670                                  playout.ticks);
671                        xfree(p);
672                        continue;
673                }
674
675                if (!ts_gt(now, playout)) {
676                        u_char  *u;
677                        u    = (u_char*)block_alloc(p->data_len);
678                        /* Would be great if memcpy occured after validation */
679                        /* in source_process_packet (or not at all)          */
680                        memcpy(u, p->data, p->data_len);
681                        if (source_process_packet(src, u, p->data_len, codec_pt, playout) == FALSE) {
682                                block_free(u, (int)p->data_len);
683                        }
684                        src->pdbe->cont_toged = 0;
685                } else {
686                        /* Packet being decoded is before start of current  */
687                        /* so there is now way it's audio will be played    */
688                        /* Playout recalculation gets triggered in          */
689                        /* rtp_callback if cont_toged hits a critical       */
690                        /* threshold.  It signifies current playout delay   */
691                        /* is inappropriate.                                */
692                        src->pdbe->cont_toged++;
693                        src->pdbe->jit_toged++;
694                }
695                xfree(p);
696
697                /* Update persistent database fields.                        */
698                if (e->last_seq > p->seq) {
699                        e->misordered++;
700                }
701                e->last_seq = p->seq;
702                e->last_ts  = p->ts;
703                e->last_arr = sp->cur_ts;
704                src->packets_done++;
705        }
706}
707
708int
709source_add_packet (source     *src,
710                   rtp_packet *pckt)
711{
712        return pktbuf_enqueue(src->pktbuf, pckt);
713
714        /* Update b/w estimate */
715/*
716        ts_t delta;
717        if (src->byte_count == 0) {
718                src->byte_count_start = playout;
719        }
720        src->byte_count += ((rtp_packet*)pckt)->data_len;
721        delta = ts_sub(playout, src->byte_count_start);
722       
723        if (ts_gt(delta, bw_avg_period)) {
724                double this_est;
725                this_est = 8.0 * src->byte_count * 1000.0/ ts_to_ms(delta);
726                if (src->bps == 0.0) {
727                        src->bps = this_est;
728                } else {
729                        src->bps += (this_est - src->bps)/8.0;
730                }
731                src->byte_count = 0;
732        }
733        */
734}
735
736double 
737source_get_bps(source *src)
738{
739        return src->bps;
740}
741
742/* recommend_drop_dur does quick pattern match with audio that is about to   */
743/* be played i.e. first few samples to determine how much audio can be       */
744/* dropped with causing glitch.                                              */
745
746#define SOURCE_COMPARE_WINDOW_SIZE 8
747/* Match threshold is mean abs diff. lower score gives less noise, but less  */
748/* adaption..., might be better if threshold adapted with how much extra     */
749/* data we have buffered...                                                  */
750#define MATCH_THRESHOLD 1000
751
752static ts_t
753recommend_drop_dur(media_data *md)
754{
755        u_int32 score, lowest_score, lowest_begin;
756        u_int16 rate, channels;
757        sample *buffer;
758        int i, j,samples;
759
760        i = md->nrep - 1;
761        while(i >= 0) {
762                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
763                        break;
764                }
765                i--;
766        }
767        assert(i != -1);
768       
769        buffer  = (sample*)md->rep[i]->data;
770        samples = md->rep[i]->data_len / (sizeof(sample) * channels);
771
772        i = 0;
773        j = samples / 16;
774        lowest_score = 0xffffffff;
775        lowest_begin = 0;
776        while (j < samples - SOURCE_COMPARE_WINDOW_SIZE) {
777                score = 0;
778                for (i = 0; i < SOURCE_COMPARE_WINDOW_SIZE; i++) {
779                        score += abs(buffer[i * channels] - buffer[(j+i) * channels]);
780                }
781                if (score <= lowest_score) {
782                        lowest_score = score;
783                        lowest_begin = j;
784                }
785                j++;
786        }
787
788        if (lowest_score/SOURCE_COMPARE_WINDOW_SIZE < MATCH_THRESHOLD) {
789                debug_msg("match score %d, drop dur %d\n", lowest_score/SOURCE_COMPARE_WINDOW_SIZE, lowest_begin);
790                return ts_map32(rate, lowest_begin);
791        } else {
792                return zero_ts;
793        }
794}
795
796#define SOURCE_MERGE_LEN_SAMPLES 5
797
798static void
799conceal_dropped_samples(media_data *md, ts_t drop_dur)
800{
801        /* We are dropping drop_dur samples and want signal to be            */
802        /* continuous.  So we blend samples that would have been played if   */
803        /* they weren't dropped with where signal continues after the drop.  */
804        u_int32 drop_samples;
805        u_int16 rate, channels;
806        int32 tmp, a, b, i, merge_len;
807        sample *new_start, *old_start;
808
809        i = md->nrep - 1;
810        while(i >= 0) {
811                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
812                        break;
813                }
814                i--;
815        }
816
817        assert(i != -1);
818
819        drop_dur     = ts_convert(rate, drop_dur);
820        drop_samples = channels * drop_dur.ticks;
821       
822        /* new_start is what will be played by mixer */
823        new_start = (sample*)md->rep[i]->data + drop_samples;
824        old_start = (sample*)md->rep[i]->data;
825
826        merge_len = SOURCE_MERGE_LEN_SAMPLES * channels;
827        for (i = 0; i < merge_len; i++) {
828                a   = (merge_len - i) * old_start[i] / merge_len;
829                b   = i * new_start[i]/ merge_len;
830                tmp =  (sample)(a + b);
831                new_start[i] = (short)tmp;
832        }
833}
834
835/* source_check_buffering is supposed to check amount of audio buffered      */
836/* corresponds to what we expect from playout so we can think about skew     */
837/* adjustment.                                                               */
838
839int
840source_check_buffering(source *src)
841{
842        ts_t actual, desired, diff;
843
844        if (src->age < SOURCE_YOUNG_AGE) {
845                /* If the source is new(ish) then not enough audio will be   */
846                /* in the playout buffer because it hasn't arrived yet.      */
847                return FALSE;
848        }
849
850        actual  = source_get_audio_buffered(src);
851        desired = source_get_playout_delay(src);
852        diff    = ts_abs_diff(actual, desired);
853
854        if (ts_gt(diff, skew_thresh) && ts_gt(skew_limit, diff)) {
855                src->skew_adjust = diff;
856                if (ts_gt(actual, desired)) {
857                        /* We're accumulating audio, their clock faster   */
858                        src->skew = SOURCE_SKEW_FAST;
859                } else {
860                        /* We're short of audio, so their clock is slower */
861                        src->skew = SOURCE_SKEW_SLOW;
862                }
863                return TRUE;
864        }
865        src->skew = SOURCE_SKEW_NONE;
866        return FALSE;
867}
868
869/* source_skew_adapt exists to shift playout units if source clock appears   */
870/* to be fast or slow.  The media_data unit is here so that it can be        */
871/* examined to see if it is low energy and adjustment would be okay.  Might  */
872/* want to be more sophisticated and put a silence detector in rather than   */
873/* static threshold.                                                         */
874/*                                                                           */
875/* Returns what adaption type occurred.                                      */
876
877static skew_t
878source_skew_adapt(source *src, media_data *md, ts_t playout)
879{
880        u_int32 i, e = 0, samples = 0;
881        u_int16 rate, channels;
882        ts_t adjustment;
883
884        assert(src);
885        assert(md);
886        assert(src->skew != SOURCE_SKEW_NONE);
887
888        for(i = 0; i < md->nrep; i++) {
889                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
890                        samples = md->rep[i]->data_len / (channels * sizeof(sample));
891                        e = avg_audio_energy((sample*)md->rep[i]->data, samples * channels, channels);
892                        src->mean_energy = (15 * src->mean_energy + e)/16;
893                        break;
894                }
895        }
896
897        if (i == md->nrep) {
898                /* don't adapt if unit has not been decoded (error) or       */
899                /* signal has too much energy                                */
900                return SOURCE_SKEW_NONE;
901        }
902
903        /* When we are making the adjustment we must shift playout buffers   */
904        /* and timestamps that the source decode process uses.  Must be      */
905        /* careful with last repair because it is not valid if no repair has */
906        /* taken place.                                                      */
907
908        if (src->skew == SOURCE_SKEW_FAST/* &&
909                (2*e <=  src->mean_energy || e < 200) */) {
910                /* source is fast so we need to bring units forward.
911                 * Should only move forward at most a single unit
912                 * otherwise we might discard something we have not
913                 * classified.  */
914
915                adjustment = recommend_drop_dur(md);
916                if (ts_gt(adjustment, src->skew_adjust) || adjustment.ticks == 0) {
917                        /* adjustment needed is greater than adjustment period
918                         * that best matches dropable by signal matching.
919                         */
920                        return SOURCE_SKEW_NONE;
921                }
922                debug_msg("dropping %d / %d samples\n", adjustment.ticks, src->skew_adjust.ticks);
923                pb_shift_forward(src->media,   adjustment);
924                pb_shift_forward(src->channel, adjustment);
925                src->pdbe->transit      = ts_sub(src->pdbe->transit,      adjustment);
926                src->pdbe->last_transit = ts_sub(src->pdbe->last_transit, adjustment);
927                src->pdbe->avg_transit  = ts_sub(src->pdbe->avg_transit,  adjustment);
928
929                if (ts_valid(src->last_repair)) {
930                        src->last_repair = ts_sub(src->last_repair, adjustment);
931                }
932
933                if (ts_valid(src->last_played)) {
934                        src->last_played = ts_sub(src->last_played, adjustment);
935                }
936
937                /* Remove skew adjustment from estimate of skew outstanding */
938                if (ts_gt(src->skew_adjust, adjustment)) {
939                        src->skew_adjust = ts_sub(src->skew_adjust, adjustment);
940                } else {
941                        src->skew = SOURCE_SKEW_NONE;
942                }
943
944                conceal_dropped_samples(md, adjustment);
945
946                return SOURCE_SKEW_FAST;
947        } else if (src->skew == SOURCE_SKEW_SLOW) {
948                adjustment = ts_map32(rate, samples);
949                if (ts_gt(src->skew_adjust, adjustment)) {
950                        adjustment = ts_map32(rate, samples);
951                }
952                pb_shift_units_back_after(src->media,   playout, adjustment);
953                pb_shift_units_back_after(src->channel, playout, adjustment);
954                src->pdbe->transit      = ts_add(src->pdbe->transit,      adjustment);
955                src->pdbe->last_transit = ts_add(src->pdbe->last_transit, adjustment);
956                src->pdbe->avg_transit  = ts_add(src->pdbe->avg_transit,  adjustment);
957
958                if (ts_gt(adjustment, src->skew_adjust)) {
959                        src->skew_adjust = zero_ts;
960                } else {
961                        src->skew_adjust = ts_sub(src->skew_adjust, adjustment);
962                }
963
964/* shouldn't have to make this adjustment since we are now adjusting
965 * units in future only.
966                src->last_played = ts_add(src->last_played, adjustment);
967                if (ts_valid(src->last_repair)) {
968                        src->last_repair = ts_add(src->last_repair, adjustment);
969                }
970                */
971                debug_msg("Playout buffer shift back %d (%d).\n", adjustment.ticks, src->last_played.ticks);
972                src->skew = SOURCE_SKEW_NONE;
973                return SOURCE_SKEW_SLOW;
974        }
975
976        return SOURCE_SKEW_NONE;
977}
978
979static int
980source_repair(source     *src,
981              repair_id_t r,
982              ts_t        step)
983{
984        media_data* fill_md, *prev_md;
985        ts_t        fill_ts,  prev_ts;
986        u_int32     success,  prev_len;
987
988        /* Check for need to reset of consec_lost count */
989
990        if (ts_valid(src->last_repair) == FALSE ||
991            ts_eq(src->last_played, src->last_repair) == FALSE) {
992                src->consec_lost = 0;
993        }
994
995        /* We repair one unit at a time since it may be all we need */
996        pb_iterator_retreat(src->media_pos);
997        pb_iterator_get_at(src->media_pos,
998                           (u_char**)&prev_md,
999                           &prev_len,
1000                           &prev_ts);
1001
1002        assert(prev_md != NULL);
1003
1004        if (!ts_eq(prev_ts, src->last_played)) {
1005                debug_msg("prev_ts and last_played don't match\n");
1006                return FALSE;
1007        }
1008
1009        media_data_create(&fill_md, 1);
1010        repair(r,
1011               src->consec_lost,
1012               src->codec_states,
1013               prev_md,
1014               fill_md->rep[0]);
1015        fill_ts = ts_add(src->last_played, step);
1016        success = pb_add(src->media,
1017                         (u_char*)fill_md,
1018                         sizeof(media_data),
1019                         fill_ts);
1020        if (success) {
1021                src->consec_lost ++;
1022                src->last_repair = fill_ts;
1023                pb_iterator_advance(src->media_pos);
1024#ifndef NDEBUG
1025        /* Reusing prev_* - c'est mal, je sais */
1026                pb_iterator_get_at(src->media_pos,
1027                                   (u_char**)&prev_md,
1028                                   &prev_len,
1029                                   &prev_ts);
1030                if (ts_eq(prev_ts, fill_ts) == FALSE) {
1031                        debug_msg("Added at %d, but got %d when tried to get it back!\n", fill_ts.ticks, prev_ts.ticks);
1032                        return FALSE;
1033                }
1034#endif
1035        } else {
1036                /* This should only ever fail at when source changes
1037                 * sample rate in less time than playout buffer
1038                 * timeout.  This should be a very very rare event... 
1039                 */
1040                debug_msg("Repair add data failed (%d), last_played %d.\n", fill_ts.ticks, src->last_played.ticks);
1041                media_data_destroy(&fill_md, sizeof(media_data));
1042                src->consec_lost = 0;
1043                return FALSE;
1044        }
1045        return TRUE;
1046}
1047
1048int
1049source_process(session_t *sp,
1050               source            *src,
1051               struct s_mix_info *ms,
1052               int                render_3d,
1053               repair_id_t        repair_type,
1054               ts_t               start_ts,    /* Real-world time           */
1055               ts_t               end_ts)      /* Real-world time + cushion */
1056{
1057        media_data  *md;
1058        coded_unit  *cu;
1059        codec_state *cs;
1060        u_int32     md_len, src_freq;
1061        ts_t        playout, step, cutoff;
1062        int         i, success, hold_repair = 0;
1063        int         new_source = !ts_valid(src->last_played);
1064
1065        /* Note: hold_repair is used to stop repair occuring.
1066         * Occasionally, there is a race condition when the playout
1067         * point is recalculated causing overlap, and when playout
1068         * buffer shift occurs in middle of a loss.
1069         */
1070       
1071        source_process_packets(sp, src, start_ts);
1072
1073        /* Split channel coder units up into media units */
1074        if (pb_node_count(src->channel)) {
1075                channel_decoder_decode(src->channel_state,
1076                                       src->channel,
1077                                       src->media,
1078                                       end_ts);
1079        }
1080
1081        src_freq = get_freq(src->pdbe->clock);
1082        step = ts_map32(src_freq, src->pdbe->inter_pkt_gap / src->pdbe->units_per_packet);
1083
1084        while (pb_iterator_advance(src->media_pos)) {
1085                pb_iterator_get_at(src->media_pos,
1086                                   (u_char**)&md,
1087                                   &md_len,
1088                                   &playout);
1089                assert(md != NULL);
1090                assert(md_len == sizeof(media_data));
1091               
1092                /* Conditions for repair:
1093                 * (a) last_played has meaning.
1094                 * (b) playout point does not what we expect.
1095                 * (c) repair type is not no repair.
1096                 * (d) last decoded was not too long ago.
1097                 */
1098                cutoff = ts_sub(end_ts, ts_map32(src_freq, SOURCE_AUDIO_HISTORY_MS));
1099
1100                assert((ts_valid(src->last_played) == FALSE) || ts_eq(playout, src->last_played) == FALSE);
1101
1102                if (ts_valid(src->last_played) &&
1103                    ts_gt(playout, ts_add(src->last_played, step)) &&
1104                    ts_gt(src->last_played, cutoff) &&
1105                    hold_repair == 0) {
1106                        /* If repair was successful media_pos is moved,
1107                         * so get data at media_pos again.
1108                         */
1109                        if (source_repair(src, repair_type, step) == FALSE) {
1110                                hold_repair += 2; /* 1 works, but 2 is probably better */
1111                        }
1112                        debug_msg("Repair\n");
1113                        success = pb_iterator_get_at(src->media_pos,
1114                                                     (u_char**)&md,
1115                                                     &md_len,
1116                                                     &playout);
1117                        assert(success);
1118                } else if (hold_repair > 0) {
1119                        hold_repair --;
1120                }
1121
1122                if (ts_gt(playout, end_ts)) {
1123                        /* This playout point is after now so stop */
1124                        pb_iterator_retreat(src->media_pos);
1125                        break;
1126                }
1127
1128                for(i = 0; i < md->nrep; i++) {
1129                        if (codec_is_native_coding(md->rep[i]->id)) {
1130                                break;
1131                        }
1132                }
1133
1134                if (i == md->nrep) {
1135                        /* We need to decode this unit, may not have to
1136                         * when repair has been used.
1137                         */
1138#ifdef DEBUG
1139                        for(i = 0; i < md->nrep; i++) {
1140                                /* if there is a native coding this
1141                                 * unit has already been decoded and
1142                                 * this would be bug */
1143                                assert(md->rep[i] != NULL);
1144                                assert(codec_id_is_valid(md->rep[i]->id));
1145                                assert(codec_is_native_coding(md->rep[i]->id) == FALSE);
1146                        }
1147#endif /* DEBUG */
1148                        cu = (coded_unit*)block_alloc(sizeof(coded_unit));
1149                        /* Decode frame */
1150                        assert(cu != NULL);
1151                        memset(cu, 0, sizeof(coded_unit));
1152                        cs = codec_state_store_get(src->codec_states, md->rep[0]->id);
1153                        codec_decode(cs, md->rep[0], cu);
1154                        assert(md->rep[md->nrep] == NULL);
1155                        md->rep[md->nrep] = cu;
1156                        md->nrep++;
1157                }
1158
1159                if (render_3d && src->pdbe->render_3D_data) {
1160                        /* 3d rendering necessary */
1161                        coded_unit *decoded, *render;
1162                        decoded = md->rep[md->nrep - 1];
1163                        assert(codec_is_native_coding(decoded->id));
1164                       
1165                        render = (coded_unit*)block_alloc(sizeof(coded_unit));
1166                        memset(render, 0, sizeof(coded_unit));
1167                       
1168                        render_3D(src->pdbe->render_3D_data,decoded,render);
1169                        assert(md->rep[md->nrep] == NULL);
1170                        md->rep[md->nrep] = render;
1171                        md->nrep++;
1172                }
1173
1174                if (src->converter) {
1175                        /* convert frame */
1176                        coded_unit *decoded, *render;
1177                        decoded = md->rep[md->nrep - 1];
1178                        assert(codec_is_native_coding(decoded->id));
1179
1180                        render = (coded_unit*)block_alloc(sizeof(coded_unit));
1181                        memset(render, 0, sizeof(coded_unit));
1182                        converter_process(src->converter,
1183                                          decoded,
1184                                          render);
1185                        assert(md->rep[md->nrep] == NULL);
1186                        md->rep[md->nrep] = render;
1187                        md->nrep++;
1188                }
1189
1190                if (src->skew != SOURCE_SKEW_NONE &&
1191                    source_skew_adapt(src, md, playout) != SOURCE_SKEW_NONE) {
1192                        /* We have skew and we have adjusted playout buffer  */
1193                        /* timestamps, so re-get unit to get correct         */
1194                        /* timestamp info.                                   */
1195                        pb_iterator_get_at(src->media_pos,
1196                                           (u_char**)&md,
1197                                           &md_len,
1198                                           &playout);
1199                        assert(md != NULL);
1200                        assert(md_len == sizeof(media_data));
1201                }
1202
1203                if (src->pdbe->gain != 1.0 && codec_is_native_coding(md->rep[md->nrep - 1]->id)) {
1204                        audio_scale_buffer((sample*)md->rep[md->nrep - 1]->data,
1205                                           md->rep[md->nrep - 1]->data_len / sizeof(sample),
1206                                           src->pdbe->gain);
1207                }
1208
1209                if (mix_process(ms, src->pdbe, md->rep[md->nrep - 1], playout) == FALSE) {
1210                        /* Sources sampling rate changed mid-flow? dump data */
1211                        /* make source look irrelevant, it should get        */
1212                        /* destroyed and the recreated with proper decode    */
1213                        /* path when new data arrives.  Not graceful..       */
1214                        /* A better way would be just to flush media then    */
1215                        /* invoke source_reconfigure if this is ever really  */
1216                        /* an issue.                                         */
1217                        pb_flush(src->media);
1218                        pb_flush(src->channel);
1219                }
1220
1221                src->last_played = playout;
1222        }
1223
1224        if (new_source) {
1225                debug_msg("New source has received %d packets\n", src->age);
1226        }
1227
1228        UNUSED(i); /* Except for debugging */
1229       
1230        return TRUE;
1231}
1232
1233int
1234source_audit(source *src)
1235{
1236        if (src->age != 0) {
1237                /* Keep 1/8 seconds worth of audio */
1238                pb_iterator_audit(src->media_pos, history_ts);
1239                return TRUE;
1240        }
1241        return FALSE;
1242}
1243
1244ts_t
1245source_get_audio_buffered (source *src)
1246{
1247        /* Changes in avg_transit change amount of audio buffered. */
1248        ts_t delta;
1249        delta = ts_sub(src->pdbe->transit, src->pdbe->avg_transit);
1250        return ts_add(src->pdbe->playout, delta);
1251}
1252
1253ts_t
1254source_get_playout_delay (source *src)
1255{
1256        return src->pdbe->playout;
1257        /*  return ts_sub(src->pdbe->playout, src->pdbe->transit); */
1258}
1259
1260int
1261source_relevant(source *src, ts_t now)
1262{
1263        assert(src);
1264
1265        if (src->age < 50) {
1266                return TRUE;
1267        }
1268       
1269        if (!ts_eq(source_get_audio_buffered(src), zero_ts) ||
1270                ts_gt(ts_add(src->pdbe->last_arr, keep_source_ts), now)) {
1271                return TRUE;
1272        }
1273       
1274        return pb_relevant(src->media, now) || pb_relevant(src->channel, now);
1275}
1276
1277struct s_pb*
1278source_get_decoded_buffer(source *src)
1279{
1280        return src->media;
1281}
1282
1283u_int32
1284source_get_ssrc(source *src)
1285{
1286        return src->pdbe->ssrc;
1287}
Note: See TracBrowser for help on using the browser.