root/rat/trunk/source.c @ 2930

Revision 2930, 50.3 KB (checked in by ucacoxh, 14 years ago)

- Added emergency buffer adaptation. When playing classical music
(for instance) there are not enough suitable repeated segments which
means if skew is sufficient we can still accumulate audio. Now there
is panic point, beyond which we arbitrarily drop the odd half frame
instead of the whizzy pattern matching stuff.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:      source.c
3 * AUTHOR(S): Orion Hodson
4 *
5 * Layering support added by Tristan Henderson.
6 *     
7 * $Revision$
8 * $Date$
9 *
10 * Copyright (c) 1999 University College London
11 * All rights reserved.
12 *
13 */
14
15#include "config_unix.h"
16#include "config_win32.h"
17
18#include "ts.h"
19#include "playout.h"
20#include "channel.h"
21#include "channel_types.h"
22#include "codec_types.h"
23#include "codec.h"
24#include "codec_state.h"
25#include "converter.h"
26#include "audio_util.h"
27#include "render_3D.h"
28#include "repair.h"
29#include "timers.h"
30#include "ts.h"
31#include "channel_types.h"
32#include "pdb.h"
33#include "pktbuf.h"
34#include "source.h"
35#include "debug.h"
36#include "util.h"
37#include "net_udp.h"
38#include "mix.h"
39#include "rtp.h"
40#include "playout_calc.h"
41#include "ui.h"
42#include "session.h"
43#include "auddev.h"
44
45#define SKEW_ADAPT_THRESHOLD       5000
46#define SOURCE_YOUNG_AGE             20
47#define SOURCE_AUDIO_HISTORY_MS    1000
48#define NO_CONT_TOGED_FOR_PLAYOUT_RECALC 2
49
50#define SOURCE_COMPARE_WINDOW_SIZE 8
51/* Match threshold is mean abs diff. lower score gives less noise, but less  */
52/* adaption..., might be better if threshold adapted with how much extra     */
53/* data we have buffered...                                                  */
54#define MATCH_THRESHOLD 1200
55
56/* constants for skew adjustment:
57 SOURCE_SKEW_SLOW - denotes source clock appears slower than ours.
58 SOURCE_SKEW_FAST - denotes source clock appears faster than ours.
59*/
60typedef enum { SOURCE_SKEW_SLOW, SOURCE_SKEW_FAST, SOURCE_SKEW_NONE } skew_t;
61
62typedef struct s_source {
63        struct s_source            *next;
64        struct s_source            *prev;
65        pdb_entry_t                *pdbe; /* persistent database entry */
66        u_int32                     age;
67        ts_t                        last_played;
68        ts_t                        last_repair;
69        u_int16                     consec_lost;
70        u_int32                     mean_energy;
71        ts_sequencer                seq;
72        struct s_pktbuf            *pktbuf;
73        u_int32                     packets_done;
74        struct s_channel_state     *channel_state;
75        struct s_codec_state_store *codec_states;
76        struct s_pb                *channel;
77        struct s_pb                *media;
78        struct s_pb_iterator       *media_pos;
79        struct s_converter         *converter;
80        /* Fine grained playout buffer adjustment variables.  Used in        */
81        /* attempts to correct for clock skew between source and local host. */
82        skew_t                      skew;
83        ts_t                        skew_adjust;
84        /* b/w estimation variables                                          */
85        u_int32                     byte_count;
86        ts_t                        byte_count_start;
87        double                      bps;
88} source;
89
90/* A linked list is used for sources and this is fine since we mostly expect */
91/* 1 or 2 sources to be simultaneously active and so efficiency is not a     */
92/* killer.                                                                   */
93
94typedef struct s_source_list {
95        source  sentinel;
96        u_int16 nsrcs;
97} source_list;
98
99/*****************************************************************************/
100/* Source List functions.  Source List is used as a container for sources    */
101/*****************************************************************************/
102
103int
104source_list_create(source_list **pplist)
105{
106        source_list *plist = (source_list*)xmalloc(sizeof(source_list));
107        if (plist != NULL) {
108                *pplist = plist;
109                plist->sentinel.next = &plist->sentinel;
110                plist->sentinel.prev = &plist->sentinel;
111                plist->nsrcs = 0;
112                return TRUE;
113        }
114        return FALSE;
115}
116
117void
118source_list_clear(source_list *plist)
119{
120       assert(plist != NULL);
121       
122       while(plist->sentinel.next != &plist->sentinel) {
123               source_remove(plist, plist->sentinel.next);
124       }
125}
126
127void
128source_list_destroy(source_list **pplist)
129{
130        source_list *plist = *pplist;
131        source_list_clear(plist);
132        assert(plist->nsrcs == 0);
133        xfree(plist);
134        *pplist = NULL;
135}
136
137u_int32
138source_list_source_count(source_list *plist)
139{
140        return plist->nsrcs;
141}
142
143source*
144source_list_get_source_no(source_list *plist, u_int32 n)
145{
146        source *curr;
147
148        assert(plist != NULL);
149
150        if (n < plist->nsrcs) {
151                curr = plist->sentinel.next;
152                while(n != 0) {
153                        curr = curr->next;
154                        n--;
155                }
156                return curr;
157        }
158        return NULL;
159}
160
161source*
162source_get_by_ssrc(source_list *plist, u_int32 ssrc)
163{
164        source *curr, *stop;
165       
166        curr = plist->sentinel.next;
167        stop = &plist->sentinel;
168        while(curr != stop) {
169                if (curr->pdbe->ssrc == ssrc) {
170                        return curr;
171                }
172                curr = curr->next;
173        }
174 
175        return NULL;
176}
177
178/*****************************************************************************/
179/* Timestamp constants and initialization                                    */
180/*****************************************************************************/
181
182static ts_t zero_ts;        /* No time at all :-)                            */
183static ts_t keep_source_ts; /* How long source kept after source goes quiet  */
184static ts_t history_ts;     /* How much old audio hang onto for repair usage */
185static ts_t bw_avg_period;  /* Average period for bandwidth estimate         */
186static ts_t skew_thresh;    /* Significant size b4 consider playout adapt    */
187static ts_t skew_limit;     /* Upper bound, otherwise clock reset.           */
188static int  time_constants_inited = FALSE;
189
190
191static void
192time_constants_init()
193{
194        /* We use these time constants *all* the time.   Initialize once     */
195        zero_ts        = ts_map32(8000, 0);
196        keep_source_ts = ts_map32(8000, 2000);
197        history_ts     = ts_map32(8000, 1000);
198        bw_avg_period  = ts_map32(8000, 8000);
199        skew_thresh    = ts_map32(8000, 160);
200        skew_limit     = ts_map32(8000, 4000);
201        time_constants_inited = TRUE;
202}
203
204/*****************************************************************************/
205/* Source functions.  A source is an active audio source.                    */
206/*****************************************************************************/
207
208source*
209source_create(source_list    *plist,
210              u_int32         ssrc,
211              pdb_t          *pdb)
212{
213        source *psrc;
214        int     success;
215
216        assert(plist != NULL);
217        assert(source_get_by_ssrc(plist, ssrc) == NULL);
218
219        /* Time constant initialization. Nothing to do with source creation  */
220        /* just has to go somewhere before sources might be active, here it  */
221        /* definitely is!                                                    */
222        if (time_constants_inited == FALSE) {
223                time_constants_init();
224        }
225
226        /* On with the show...                                               */
227        psrc = (source*)block_alloc(sizeof(source));
228        if (psrc == NULL) {
229                return NULL;
230        }
231        memset(psrc, 0, sizeof(source));
232
233        if (pdb_item_get(pdb, ssrc, &psrc->pdbe) == FALSE) {
234                debug_msg("Persistent database item not found\n");
235                abort();
236        }
237
238        psrc->pdbe->first_mix  = 1; /* Used to note nothing mixed anything   */
239        psrc->pdbe->cont_toged = 0; /* Reset continuous thrown on ground cnt */
240        psrc->channel_state    = NULL;       
241        psrc->skew             = SOURCE_SKEW_NONE;
242
243        /* Allocate channel and media buffers                                */
244        success = pb_create(&psrc->channel,
245                            (playoutfreeproc)channel_data_destroy);
246        if (!success) {
247                debug_msg("Failed to allocate channel buffer\n");
248                goto fail_create_channel;
249        }
250
251        success = pb_create(&psrc->media, (playoutfreeproc)media_data_destroy);
252        if (!success) {
253                debug_msg("Failed to allocate media buffer\n");
254                goto fail_create_media;
255        }
256
257        success = pb_iterator_create(psrc->media, &psrc->media_pos);
258        if (!success) {
259                debug_msg("Failed to attach iterator to media buffer\n");
260                goto fail_create_iterator;
261        }
262
263        success = codec_state_store_create(&psrc->codec_states, DECODER);
264        if (!success) {
265                debug_msg("Failed to allocate codec state storage\n");
266                goto fail_create_states;
267        }
268
269        success = pktbuf_create(&psrc->pktbuf, 4);
270        if (!success) {
271                debug_msg("Failed to allocate packet buffer\n");
272                goto fail_pktbuf;
273        }
274
275        /* List maintenance    */
276        psrc->next = plist->sentinel.next;
277        psrc->prev = &plist->sentinel;
278        psrc->next->prev = psrc;
279        psrc->prev->next = psrc;
280        plist->nsrcs++;
281
282        debug_msg("Created source decode path\n");
283
284        return psrc;
285
286        /* Failure fall throughs */
287fail_pktbuf:
288        codec_state_store_destroy(&psrc->codec_states);
289fail_create_states:
290        pb_iterator_destroy(psrc->media, &psrc->media_pos);       
291fail_create_iterator:
292        pb_destroy(&psrc->media);
293fail_create_media:
294        pb_destroy(&psrc->channel);
295fail_create_channel:
296        block_free(psrc, sizeof(source));
297
298        return NULL;
299}
300
301/* All sources need to be reconfigured when anything changes in
302 * audio path.  These include change of device frequency, change of
303 * the number of channels, etc..
304 */
305
306void
307source_reconfigure(source        *src,
308                   converter_id_t conv_id,
309                   int            render_3d,
310                   u_int16        out_rate,
311                   u_int16        out_channels)
312{
313        u_int16    src_rate, src_channels;
314        codec_id_t            src_cid;
315        const codec_format_t *src_cf;
316
317        assert(src->pdbe != NULL);
318
319        /* Set age to zero and flush existing media
320         * so that repair mechanism does not attempt
321         * to patch across different block sizes.
322         */
323
324        src->age = 0;
325        pb_flush(src->media);
326
327        /* Get rate and channels of incoming media so we know
328         * what we have to change.
329         */
330        src_cid = codec_get_by_payload(src->pdbe->enc);
331        src_cf  = codec_get_format(src_cid);
332        src_rate     = (u_int16)src_cf->format.sample_rate;
333        src_channels = (u_int16)src_cf->format.channels;
334
335        if (render_3d) {
336                assert(out_channels == 2);
337                /* Rejig 3d renderer if there, else create */
338                if (src->pdbe->render_3D_data) {
339                        int azi3d, fil3d, len3d;
340                        render_3D_get_parameters(src->pdbe->render_3D_data,
341                                                 &azi3d,
342                                                 &fil3d,
343                                                 &len3d);
344                        render_3D_set_parameters(src->pdbe->render_3D_data,
345                                                 (int)src_rate,
346                                                 azi3d,
347                                                 fil3d,
348                                                 len3d);
349                } else {
350                        src->pdbe->render_3D_data = render_3D_init((int)src_rate);
351                }
352                assert(src->pdbe->render_3D_data);
353                /* Render 3d is before sample rate/channel conversion, and   */
354                /* output 2 channels.                                        */
355                src_channels = 2;
356        } else {
357                /* Rendering is switched off so destroy info.                */
358                if (src->pdbe->render_3D_data != NULL) {
359                        render_3D_free(&src->pdbe->render_3D_data);
360                }
361        }
362
363        /* Now destroy converter if it is already there.                     */
364        if (src->converter) {
365                converter_destroy(&src->converter);
366        }
367
368        if (src_rate != out_rate || src_channels != out_channels) {
369                converter_fmt_t c;
370                c.src_freq      = src_rate;
371                c.from_channels = src_channels;
372                c.dst_freq      = out_rate;
373                c.to_channels   = out_channels;
374                converter_create(conv_id, &c, &src->converter);
375        }
376        src->byte_count = 0;
377        src->bps        = 0.0;
378}
379
380void
381source_remove(source_list *plist, source *psrc)
382{
383        assert(plist);
384        assert(psrc);
385        assert(source_get_by_ssrc(plist, psrc->pdbe->ssrc) != NULL);
386
387        psrc->next->prev = psrc->prev;
388        psrc->prev->next = psrc->next;
389
390        if (psrc->channel_state) {
391                channel_decoder_destroy(&psrc->channel_state);
392        }
393
394        if (psrc->converter) {
395                converter_destroy(&psrc->converter);
396        }
397
398        pb_iterator_destroy(psrc->media, &psrc->media_pos);
399        pb_destroy(&psrc->channel);
400        pb_destroy(&psrc->media);
401        codec_state_store_destroy(&psrc->codec_states);
402        pktbuf_destroy(&psrc->pktbuf);
403        plist->nsrcs--;
404
405        debug_msg("Destroying source decode path\n");
406       
407        block_free(psrc, sizeof(source));
408
409        assert(source_get_by_ssrc(plist, psrc->pdbe->ssrc) == NULL);
410}
411             
412/* Source Processing Routines ************************************************/
413
414/* Returns true if fn takes ownership responsibility for data */
415static int
416source_process_packet (source *src,
417                       u_char *pckt,
418                       u_int32 pckt_len,
419                       u_int8  payload,
420                       ts_t    playout)
421{
422        channel_data *cd;
423        channel_unit *cu;
424        cc_id_t       cid;
425        u_int8        clayers;
426
427        assert(src != NULL);
428        assert(pckt != NULL);
429
430        /* Need to check:
431         * (i) if layering is enabled
432         * (ii) if channel_data exists for this playout point (if pb_iterator_get_at...)
433         * Then need to:
434         * (i) create cd if doesn't exist
435         * (ii) add packet to cd->elem[layer]
436         * We work out layer number by deducting the base port
437         * no from the port no this packet came from
438         * But what if layering on one port?
439         */
440
441        /* Or we could:
442         * (i) check if cd exists for this playout point
443         * (ii) if so, memcmp() to see if this packet already exists (ugh!)
444         */
445
446        cid = channel_coder_get_by_payload(payload);
447        clayers = channel_coder_get_layers(cid);
448        if (clayers > 1) {
449                struct s_pb_iterator *pi;
450                u_int8 i;
451                u_int32 clen;
452                int dup;
453                ts_t lplayout;
454                pb_iterator_create(src->channel, &pi);
455                while(pb_iterator_advance(pi)) {
456                        pb_iterator_get_at(pi, (u_char**)&cd, &clen, &lplayout);
457                       /* if lplayout==playout there is already channel_data for this playout point */
458                        if(!ts_eq(playout, lplayout)) {
459                                continue;
460                        }
461                        pb_iterator_detach_at(pi, (u_char**)&cd, &clen, &lplayout);
462                        assert(cd->nelem >= 1);
463
464                       /* if this channel_data is full, this new packet must *
465                        * be a duplicate, so we don't need to check          */
466                        if(cd->nelem >= clayers) {
467                                debug_msg("source_process_packet failed - duplicate layer\n");
468                                src->pdbe->duplicates++;
469                                pb_iterator_destroy(src->channel, &pi);
470                                goto done;
471                        }
472
473                        cu = (channel_unit*)block_alloc(sizeof(channel_unit));
474                        cu->data     = pckt;
475                        cu->data_len = pckt_len;
476                        cu->pt       = payload;
477
478                        dup = 0;
479
480                       /* compare existing channel_units to this one */
481                        for(i=0; i<cd->nelem; i++) {
482                                if(cu->data_len!=cd->elem[i]->data_len) break;
483                                /* This memcmp arbitrarily only checks
484                                 * 20 bytes, otherwise it takes too
485                                 * long */
486                                if (memcmp(cu->data, cd->elem[i]->data, 20) == 0) {
487                                        dup=1;
488                                }
489                        }
490
491                       /* duplicate, so stick the channel_data back on *
492                        * the playout buffer and swiftly depart        */
493                        if(dup) {
494                                debug_msg("source_process_packet failed - duplicate layer\n");
495                                src->pdbe->duplicates++;
496                                /* destroy temporary channel_unit */
497                                block_free(cu->data, cu->data_len);
498                                cu->data_len = 0;
499                                block_free(cu, sizeof(channel_unit));
500                                pb_iterator_destroy(src->channel, &pi);
501                                goto done;
502                        }
503
504                       /* add this layer if not a duplicate           *
505                        * NB: layers are not added in order, and thus *
506                        * have to be reorganised in the layered       *
507                        * channel coder                               */
508                        cd->elem[cd->nelem] = cu;
509                        cd->nelem++;
510                        pb_iterator_destroy(src->channel, &pi);
511                        goto done;
512                }
513                pb_iterator_destroy(src->channel, &pi);
514        }
515
516        if (channel_data_create(&cd, 1) == 0) {
517                return FALSE;
518        }
519       
520        cu               = cd->elem[0];
521        cu->data         = pckt;
522        cu->data_len     = pckt_len;
523        cu->pt           = payload;
524
525        /* Check we have state to decode this */
526        cid = channel_coder_get_by_payload(cu->pt);
527        if (src->channel_state &&
528            channel_decoder_matches(cid, src->channel_state) == FALSE) {
529                debug_msg("Channel coder changed - flushing\n");
530                channel_decoder_destroy(&src->channel_state);
531                pb_flush(src->channel);
532        }
533
534        /* Make state if not there and create decoder */
535        if (src->channel_state == NULL &&
536            channel_decoder_create(cid, &src->channel_state) == FALSE) {
537                debug_msg("Cannot decode payload %d\n", cu->pt);
538                channel_data_destroy(&cd, sizeof(channel_data));
539        }
540        src->age++;
541done:   
542        if (pb_add(src->channel, (u_char*)cd, sizeof(channel_data), playout) == FALSE) {
543                src->pdbe->duplicates++;
544                channel_data_destroy(&cd, sizeof(channel_data));
545        }
546
547        return TRUE;
548}
549
550static void
551source_process_packets(session_t *sp, source *src, ts_t now)
552{
553        ts_t    src_ts, playout, transit;
554        pdb_entry_t     *e;
555        rtp_packet      *p;
556        cc_id_t          ccid = -1;
557        u_int16          units_per_packet = -1;
558        u_int32          delta_ts, delta_seq;
559        u_char           codec_pt;
560        int              adjust_playout;
561
562        e = src->pdbe;
563        while(pktbuf_dequeue(src->pktbuf, &p)) {
564                adjust_playout = FALSE;
565
566                if (p->m) {
567                        adjust_playout = TRUE;
568                        debug_msg("New Talkspurt: %lu\n", p->ts);
569                }
570               
571                ccid = channel_coder_get_by_payload((u_char)p->pt);
572                if (channel_verify_and_stat(ccid, (u_char)p->pt,
573                                            p->data, p->data_len,
574                                            &units_per_packet, &codec_pt) == FALSE) {
575                        debug_msg("Packet discarded: packet failed channel verify.\n");
576                        xfree(p);
577                        continue;
578                }
579
580                if (e->channel_coder_id != ccid ||
581                    e->enc              != codec_pt ||
582                    e->units_per_packet != units_per_packet ||
583                    src->packets_done == 0) {
584                        /* Something has changed or is uninitialized...      */
585                        const codec_format_t *cf;
586                        const audio_format   *dev_fmt;
587                        codec_id_t           cid;
588                        u_int32              samples_per_frame;
589
590                        cid = codec_get_by_payload(codec_pt);
591                        cf  = codec_get_format(cid);
592                        /* Fix clock.                                        */
593                        change_freq(e->clock, cf->format.sample_rate);
594                        /* Fix details.                                      */
595                        e->enc              = codec_pt;
596                        e->units_per_packet = units_per_packet;
597                        e->channel_coder_id = ccid;       
598                        samples_per_frame   = codec_get_samples_per_frame(cid);
599                        debug_msg("Samples per frame %d rate %d\n", samples_per_frame, cf->format.sample_rate);
600                        e->inter_pkt_gap    = e->units_per_packet * (u_int16)samples_per_frame;
601                        e->frame_dur        = ts_map32(cf->format.sample_rate, samples_per_frame);
602
603                        debug_msg("Encoding change\n");
604                        /* Get string describing encoding.                   */
605                        channel_describe_data(ccid, codec_pt,
606                                              p->data, p->data_len,
607                                              e->enc_fmt, e->enc_fmt_len);
608                        if (sp->mbus_engine) {
609                                ui_update_stats(sp, e->ssrc);
610                        }
611                        /* Configure converter */
612                        dev_fmt = audio_get_ofmt(sp->audio_device);
613                        source_reconfigure(src,
614                                           sp->converter,
615                                           sp->render_3d,
616                                           (u_int16)dev_fmt->sample_rate,
617                                           (u_int16)dev_fmt->channels);
618                        adjust_playout      = TRUE;
619                }
620               
621                /* Check for talkspurt start indicated by change in          */
622                /* relationship between timestamps and sequence numbers.     */
623                delta_seq = p->seq - e->last_seq;
624                delta_ts  = p->ts  - e->last_ts;
625                if (delta_seq * e->inter_pkt_gap != delta_ts) {
626                        debug_msg("Seq no / timestamp realign (%lu * %lu != %lu)\n",
627                                  delta_seq, e->inter_pkt_gap, delta_ts);
628                        adjust_playout = TRUE;
629                }
630
631                if (ts_gt(e->jitter, e->playout)) {
632                        /* Network conditions have changed drastically.      */
633                        /* We are in the wrong ball park change immediately. */
634                        adjust_playout = TRUE;
635                }
636
637                /* Check for continuous number of packets being discarded.   */
638                /* This happens when jitter or transit estimate is no longer */
639                /* consistent with the real world.                           */
640                if (e->cont_toged >= NO_CONT_TOGED_FOR_PLAYOUT_RECALC) {
641                        adjust_playout = TRUE;
642                        e->cont_toged  = 0;
643                } else if (e->cont_toged != 0) {
644                        debug_msg("cont_toged %d\n", e->cont_toged);
645                }
646
647                /* Calculate the playout point for this packet.              */
648                src_ts = ts_seq32_in(&src->seq, get_freq(e->clock), p->ts);
649
650                /* Transit delay is the difference between our local clock   */
651                /* and the packet timestamp (src_ts).  Note: we expect       */
652                /* packet clumping at talkspurt start because of VAD's       */
653                /* fetching previous X seconds of audio on signal detection  */
654                /* in order to send unvoiced audio at start.                 */
655                if (adjust_playout && pktbuf_get_count(src->pktbuf)) {
656                        rtp_packet *p;
657                        ts_t        last_ts;
658                        pktbuf_peak_last(src->pktbuf, &p);
659                        assert(p != NULL);
660                        last_ts = ts_seq32_in(&src->seq, get_freq(e->clock), p->ts);
661                        transit = ts_sub(sp->cur_ts, last_ts);
662                        debug_msg("Used transit of last packet\n");
663                } else {
664                        transit = ts_sub(sp->cur_ts, src_ts);
665                }
666
667                playout = playout_calc(sp, e->ssrc, transit, adjust_playout);
668                if ((p->m || src->packets_done == 0) && ts_gt(playout, e->frame_dur)) {
669                        /* Packets are likely to be compressed at talkspurt start */
670                        /* because of VAD going back and grabbing frames.         */
671                        playout = ts_sub(playout, e->frame_dur);
672                        debug_msg("New ts shift XXX\n");
673                }
674                playout = ts_add(e->transit, playout);
675                playout = ts_add(src_ts, playout);
676
677                e->last_transit = transit;
678
679                /* If last_played is valid then enough audio is buffer for   */
680                /* the playout check to be sensible.                         */
681                if (ts_valid(src->last_played) &&
682                    ts_gt(src->last_played, playout)) {
683                        debug_msg("Packet late (%u > %u)- discarding\n",
684                                  src->last_played.ticks,
685                                  playout.ticks);
686                        src->pdbe->cont_toged++;
687                        src->pdbe->jit_toged++;
688                        xfree(p);
689                        continue;
690                }
691
692                if (!ts_gt(now, playout)) {
693                        u_char  *u;
694                        u    = (u_char*)block_alloc(p->data_len);
695                        /* Would be great if memcpy occured after validation */
696                        /* in source_process_packet (or not at all)          */
697                        memcpy(u, p->data, p->data_len);
698                        if (source_process_packet(src, u, p->data_len, codec_pt, playout) == FALSE) {
699                                block_free(u, (int)p->data_len);
700                        }
701                        src->pdbe->cont_toged = 0;
702                } else {
703                        /* Packet being decoded is before start of current  */
704                        /* so there is now way it's audio will be played    */
705                        /* Playout recalculation gets triggered in          */
706                        /* rtp_callback if cont_toged hits a critical       */
707                        /* threshold.  It signifies current playout delay   */
708                        /* is inappropriate.                                */
709                        src->pdbe->cont_toged++;
710                        src->pdbe->jit_toged++;
711                }
712
713                /* Update persistent database fields.                        */
714                if (e->last_seq > p->seq) {
715                        e->misordered++;
716                }
717                e->last_seq = p->seq;
718                e->last_ts  = p->ts;
719                e->last_arr = sp->cur_ts;
720                src->packets_done++;
721                xfree(p);
722        }
723}
724
725int
726source_add_packet (source     *src,
727                   rtp_packet *pckt)
728{
729        src->byte_count += pckt->data_len;
730        return pktbuf_enqueue(src->pktbuf, pckt);
731}
732
733static void
734source_update_bps(source *src, ts_t now)
735{
736        ts_t delta;
737        if (!ts_valid(src->byte_count_start)) {
738                src->byte_count_start = now;
739                src->byte_count       = 0;
740                src->bps              = 0.0;
741                return;
742        }
743
744        delta = ts_sub(now, src->byte_count_start);
745       
746        if (ts_gt(delta, bw_avg_period)) {
747                double this_est;
748                this_est = 8.0 * src->byte_count * 1000.0/ ts_to_ms(delta);
749                if (src->bps == 0.0) {
750                        src->bps = this_est;
751                } else {
752                        src->bps += (this_est - src->bps)/2.0;
753                }
754                src->byte_count = 0;
755                src->byte_count_start = now;
756                debug_msg("bps %f\n", src->bps);
757        }
758}
759
760double 
761source_get_bps(source *src)
762{
763        return src->bps;
764}
765
766/* recommend_drop_dur does quick pattern match with audio that is about to   */
767/* be played i.e. first few samples to determine how much audio can be       */
768/* dropped with causing glitch.                                              */
769
770static ts_t
771recommend_drop_dur(media_data *md)
772{
773        u_int32 score, lowest_score, lowest_begin;
774        u_int16 rate, channels;
775        sample *buffer;
776        int i, j,samples;
777
778        i = md->nrep - 1;
779        while(i >= 0) {
780                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
781                        break;
782                }
783                i--;
784        }
785        assert(i != -1);
786       
787        buffer  = (sample*)md->rep[i]->data;
788        samples = md->rep[i]->data_len / (sizeof(sample) * channels);
789
790        i = 0;
791        j = samples / 16;
792        lowest_score = 0xffffffff;
793        lowest_begin = 0;
794        while (j < samples - SOURCE_COMPARE_WINDOW_SIZE) {
795                score = 0;
796                for (i = 0; i < SOURCE_COMPARE_WINDOW_SIZE; i++) {
797                        score += abs(buffer[i * channels] - buffer[(j+i) * channels]);
798                }
799                if (score <= lowest_score) {
800                        lowest_score = score;
801                        lowest_begin = j;
802                }
803                j++;
804        }
805
806        if (lowest_score/SOURCE_COMPARE_WINDOW_SIZE < MATCH_THRESHOLD) {
807                debug_msg("match score %d, drop dur %d\n", lowest_score/SOURCE_COMPARE_WINDOW_SIZE, lowest_begin);
808                return ts_map32(rate, lowest_begin);
809        } else {
810                debug_msg("Score %d > Thresh %d\n", lowest_score/SOURCE_COMPARE_WINDOW_SIZE, MATCH_THRESHOLD);
811                return zero_ts;
812        }
813}
814
815#define SOURCE_MERGE_LEN_SAMPLES 5
816
817static void
818conceal_dropped_samples(media_data *md, ts_t drop_dur)
819{
820        /* We are dropping drop_dur samples and want signal to be            */
821        /* continuous.  So we blend samples that would have been played if   */
822        /* they weren't dropped with where signal continues after the drop.  */
823        u_int32 drop_samples;
824        u_int16 rate, channels;
825        int32 tmp, a, b, i, merge_len;
826        sample *new_start, *old_start;
827
828        i = md->nrep - 1;
829        while(i >= 0) {
830                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
831                        break;
832                }
833                i--;
834        }
835
836        assert(i != -1);
837
838        drop_dur     = ts_convert(rate, drop_dur);
839        drop_samples = channels * drop_dur.ticks;
840       
841        /* new_start is what will be played by mixer */
842        new_start = (sample*)md->rep[i]->data + drop_samples;
843        old_start = (sample*)md->rep[i]->data;
844
845        merge_len = SOURCE_MERGE_LEN_SAMPLES * channels;
846        for (i = 0; i < merge_len; i++) {
847                a   = (merge_len - i) * old_start[i] / merge_len;
848                b   = i * new_start[i]/ merge_len;
849                tmp =  (sample)(a + b);
850                new_start[i] = (short)tmp;
851        }
852}
853
854/* source_check_buffering is supposed to check amount of audio buffered      */
855/* corresponds to what we expect from playout so we can think about skew     */
856/* adjustment.                                                               */
857
858int
859source_check_buffering(source *src)
860{
861        ts_t actual, desired, diff;
862
863        if (src->age < SOURCE_YOUNG_AGE) {
864                /* If the source is new(ish) then not enough audio will be   */
865                /* in the playout buffer because it hasn't arrived yet.      */
866                return FALSE;
867        }
868
869        actual  = source_get_audio_buffered(src);
870        desired = source_get_playout_delay(src);
871        diff    = ts_abs_diff(actual, desired);
872
873        if (ts_gt(diff, skew_thresh)) {
874                src->skew_adjust = diff;
875                if (ts_gt(actual, desired)) {
876                        /* We're accumulating audio, their clock faster   */
877                        src->skew = SOURCE_SKEW_FAST;
878                } else {
879                        /* We're short of audio, so their clock is slower */
880                        src->skew = SOURCE_SKEW_SLOW;
881                }
882                return TRUE;
883        }
884        src->skew = SOURCE_SKEW_NONE;
885        return FALSE;
886}
887
888/* source_skew_adapt exists to shift playout units if source clock appears   */
889/* to be fast or slow.  The media_data unit is here so that it can be        */
890/* examined to see if it is low energy and adjustment would be okay.  Might  */
891/* want to be more sophisticated and put a silence detector in rather than   */
892/* static threshold.                                                         */
893/*                                                                           */
894/* Returns what adaption type occurred.                                      */
895
896static skew_t
897source_skew_adapt(source *src, media_data *md, ts_t playout)
898{
899        u_int32 i, e = 0, samples = 0;
900        u_int16 rate, channels;
901        ts_t adjustment;
902
903        assert(src);
904        assert(md);
905        assert(src->skew != SOURCE_SKEW_NONE);
906
907        for(i = 0; i < md->nrep; i++) {
908                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
909                        samples = md->rep[i]->data_len / (channels * sizeof(sample));
910                        e = avg_audio_energy((sample*)md->rep[i]->data, samples * channels, channels);
911                        src->mean_energy = (15 * src->mean_energy + e)/16;
912                        break;
913                }
914        }
915
916        if (i == md->nrep) {
917                /* don't adapt if unit has not been decoded (error) or       */
918                /* signal has too much energy                                */
919                return SOURCE_SKEW_NONE;
920        }
921
922        /* When we are making the adjustment we must shift playout buffers   */
923        /* and timestamps that the source decode process uses.  Must be      */
924        /* careful with last repair because it is not valid if no repair has */
925        /* taken place.                                                      */
926
927        if (src->skew == SOURCE_SKEW_FAST/* &&
928                (2*e <=  src->mean_energy || e < 200) */) {
929                /* source is fast so we need to bring units forward.
930                 * Should only move forward at most a single unit
931                 * otherwise we might discard something we have not
932                 * classified.  */
933
934                if (ts_gt(skew_limit, src->skew_adjust)) {
935                        adjustment = recommend_drop_dur(md);
936                } else {
937                        /* Things are really skewed.  We're more than        */
938                        /* skew_limit off of where we ought to be.  Just     */
939                        /* drop a frame and don't worry.                     */
940                        debug_msg("Dropping Frame\n");
941                        adjustment = ts_div(src->pdbe->frame_dur, 2);
942                }
943
944                if (ts_gt(adjustment, src->skew_adjust) || adjustment.ticks == 0) {
945                        /* adjustment needed is greater than adjustment      */
946                        /* period that best matches dropable by signal       */
947                        /* matching.                                         */
948                        return SOURCE_SKEW_NONE;
949                }
950                debug_msg("dropping %d / %d samples\n", adjustment.ticks, src->skew_adjust.ticks);
951                pb_shift_forward(src->media,   adjustment);
952                pb_shift_forward(src->channel, adjustment);
953
954                src->pdbe->transit      = ts_sub(src->pdbe->transit, adjustment);
955                /* avg_transit and last_transit are fine.  Difference in     */
956                /* avg_transit and transit triggered this adjustment.        */
957
958                if (ts_valid(src->last_repair)) {
959                        src->last_repair = ts_sub(src->last_repair, adjustment);
960                }
961
962                if (ts_valid(src->last_played)) {
963                        src->last_played = ts_sub(src->last_played, adjustment);
964                }
965
966                /* Remove skew adjustment from estimate of skew outstanding */
967                if (ts_gt(src->skew_adjust, adjustment)) {
968                        src->skew_adjust = ts_sub(src->skew_adjust, adjustment);
969                } else {
970                        src->skew = SOURCE_SKEW_NONE;
971                }
972
973                conceal_dropped_samples(md, adjustment);
974
975                return SOURCE_SKEW_FAST;
976        } else if (src->skew == SOURCE_SKEW_SLOW) {
977                adjustment = ts_map32(rate, samples);
978                if (ts_gt(src->skew_adjust, adjustment)) {
979                        adjustment = ts_map32(rate, samples);
980                }
981                pb_shift_units_back_after(src->media,   playout, adjustment);
982                pb_shift_units_back_after(src->channel, playout, adjustment);
983                src->pdbe->transit = ts_add(src->pdbe->transit, adjustment);
984
985                if (ts_gt(adjustment, src->skew_adjust)) {
986                        src->skew_adjust = zero_ts;
987                } else {
988                        src->skew_adjust = ts_sub(src->skew_adjust, adjustment);
989                }
990
991/* shouldn't have to make this adjustment since we are now adjusting
992 * units in future only.
993                src->last_played = ts_add(src->last_played, adjustment);
994                if (ts_valid(src->last_repair)) {
995                        src->last_repair = ts_add(src->last_repair, adjustment);
996                }
997                */
998                debug_msg("Playout buffer shift back %d (%d).\n", adjustment.ticks, src->last_played.ticks);
999                src->skew = SOURCE_SKEW_NONE;
1000                return SOURCE_SKEW_SLOW;
1001        }
1002
1003        return SOURCE_SKEW_NONE;
1004}
1005
1006static int
1007source_repair(source     *src,
1008              repair_id_t r,
1009              ts_t        step)
1010{
1011        media_data* fill_md, *prev_md;
1012        ts_t        fill_ts,  prev_ts;
1013        u_int32     success,  prev_len;
1014
1015        /* Check for need to reset of consec_lost count */
1016
1017        if (ts_valid(src->last_repair) == FALSE ||
1018            ts_eq(src->last_played, src->last_repair) == FALSE) {
1019                src->consec_lost = 0;
1020        }
1021
1022        /* We repair one unit at a time since it may be all we need */
1023        pb_iterator_retreat(src->media_pos);
1024        pb_iterator_get_at(src->media_pos,
1025                           (u_char**)&prev_md,
1026                           &prev_len,
1027                           &prev_ts);
1028
1029        assert(prev_md != NULL);
1030
1031        if (!ts_eq(prev_ts, src->last_played)) {
1032                debug_msg("prev_ts and last_played don't match\n");
1033                return FALSE;
1034        }
1035
1036        media_data_create(&fill_md, 1);
1037        repair(r,
1038               src->consec_lost,
1039               src->codec_states,
1040               prev_md,
1041               fill_md->rep[0]);
1042        fill_ts = ts_add(src->last_played, step);
1043        success = pb_add(src->media,
1044                         (u_char*)fill_md,
1045                         sizeof(media_data),
1046                         fill_ts);
1047        if (success) {
1048                src->consec_lost ++;
1049                src->last_repair = fill_ts;
1050                pb_iterator_advance(src->media_pos);
1051#ifndef NDEBUG
1052        /* Reusing prev_* - c'est mal, je sais */
1053                pb_iterator_get_at(src->media_pos,
1054                                   (u_char**)&prev_md,
1055                                   &prev_len,
1056                                   &prev_ts);
1057                if (ts_eq(prev_ts, fill_ts) == FALSE) {
1058                        debug_msg("Added at %d, but got %d when tried to get it back!\n", fill_ts.ticks, prev_ts.ticks);
1059                        return FALSE;
1060                }
1061#endif
1062        } else {
1063                /* This should only ever fail at when source changes
1064                 * sample rate in less time than playout buffer
1065                 * timeout.  This should be a very very rare event... 
1066                 */
1067                debug_msg("Repair add data failed (%d), last_played %d.\n", fill_ts.ticks, src->last_played.ticks);
1068                media_data_destroy(&fill_md, sizeof(media_data));
1069                src->consec_lost = 0;
1070                return FALSE;
1071        }
1072        return TRUE;
1073}
1074
1075int
1076source_process(session_t *sp,
1077               source            *src,
1078               struct s_mix_info *ms,
1079               int                render_3d,
1080               repair_id_t        repair_type,
1081               ts_t               start_ts,    /* Real-world time           */
1082               ts_t               end_ts)      /* Real-world time + cushion */
1083{
1084        media_data  *md;
1085        coded_unit  *cu;
1086        codec_state *cs;
1087        u_int32     md_len, src_freq;
1088        ts_t        playout, step, cutoff;
1089        int         i, success, hold_repair = 0;
1090
1091        /* Note: hold_repair is used to stop repair occuring.
1092         * Occasionally, there is a race condition when the playout
1093         * point is recalculated causing overlap, and when playout
1094         * buffer shift occurs in middle of a loss.
1095         */
1096       
1097        source_process_packets(sp, src, start_ts);
1098
1099        /* Split channel coder units up into media units */
1100        if (pb_node_count(src->channel)) {
1101                channel_decoder_decode(src->channel_state,
1102                                       src->channel,
1103                                       src->media,
1104                                       end_ts);
1105        }
1106
1107        src_freq = get_freq(src->pdbe->clock);
1108        step = ts_map32(src_freq, src->pdbe->inter_pkt_gap / src->pdbe->units_per_packet);
1109
1110        while (pb_iterator_advance(src->media_pos)) {
1111                pb_iterator_get_at(src->media_pos,
1112                                   (u_char**)&md,
1113                                   &md_len,
1114                                   &playout);
1115                assert(md != NULL);
1116                assert(md_len == sizeof(media_data));
1117               
1118                /* Conditions for repair:
1119                 * (a) last_played has meaning.
1120                 * (b) playout point does not what we expect.
1121                 * (c) repair type is not no repair.
1122                 * (d) last decoded was not too long ago.
1123                 */
1124                cutoff = ts_sub(end_ts, ts_map32(src_freq, SOURCE_AUDIO_HISTORY_MS));
1125
1126                assert((ts_valid(src->last_played) == FALSE) || ts_eq(playout, src->last_played) == FALSE);
1127
1128                if (ts_valid(src->last_played) &&
1129                    ts_gt(playout, ts_add(src->last_played, step)) &&
1130                    ts_gt(src->last_played, cutoff) &&
1131                    hold_repair == 0) {
1132                        /* If repair was successful media_pos is moved,
1133                         * so get data at media_pos again.
1134                         */
1135                        if (source_repair(src, repair_type, step) == FALSE) {
1136                                hold_repair += 2; /* 1 works, but 2 is probably better */
1137                        }
1138                        debug_msg("Repair\n");
1139                        success = pb_iterator_get_at(src->media_pos,
1140                                                     (u_char**)&md,
1141                                                     &md_len,
1142                                                     &playout);
1143                        assert(success);
1144                } else if (hold_repair > 0) {
1145                        hold_repair --;
1146                }
1147
1148                if (ts_gt(playout, end_ts)) {
1149                        /* This playout point is after now so stop */
1150                        pb_iterator_retreat(src->media_pos);
1151                        break;
1152                }
1153
1154                for(i = 0; i < md->nrep; i++) {
1155                        if (codec_is_native_coding(md->rep[i]->id)) {
1156                                break;
1157                        }
1158                }
1159
1160                if (i == md->nrep) {
1161                        /* We need to decode this unit, may not have to
1162                         * when repair has been used.
1163                         */
1164#ifdef DEBUG
1165                        for(i = 0; i < md->nrep; i++) {
1166                                /* if there is a native coding this
1167                                 * unit has already been decoded and
1168                                 * this would be bug */
1169                                assert(md->rep[i] != NULL);
1170                                assert(codec_id_is_valid(md->rep[i]->id));
1171                                assert(codec_is_native_coding(md->rep[i]->id) == FALSE);
1172                        }
1173#endif /* DEBUG */
1174                        cu = (coded_unit*)block_alloc(sizeof(coded_unit));
1175                        /* Decode frame */
1176                        assert(cu != NULL);
1177                        memset(cu, 0, sizeof(coded_unit));
1178                        cs = codec_state_store_get(src->codec_states, md->rep[0]->id);
1179                        codec_decode(cs, md->rep[0], cu);
1180                        assert(md->rep[md->nrep] == NULL);
1181                        md->rep[md->nrep] = cu;
1182                        md->nrep++;
1183                }
1184
1185                if (render_3d && src->pdbe->render_3D_data) {
1186                        /* 3d rendering necessary */
1187                        coded_unit *decoded, *render;
1188                        decoded = md->rep[md->nrep - 1];
1189                        assert(codec_is_native_coding(decoded->id));
1190                       
1191                        render = (coded_unit*)block_alloc(sizeof(coded_unit));
1192                        memset(render, 0, sizeof(coded_unit));
1193                       
1194                        render_3D(src->pdbe->render_3D_data,decoded,render);
1195                        assert(md->rep[md->nrep] == NULL);
1196                        md->rep[md->nrep] = render;
1197                        md->nrep++;
1198                }
1199
1200                if (src->converter) {
1201                        /* convert frame */
1202                        coded_unit *decoded, *render;
1203                        decoded = md->rep[md->nrep - 1];
1204                        assert(codec_is_native_coding(decoded->id));
1205
1206                        render = (coded_unit*)block_alloc(sizeof(coded_unit));
1207                        memset(render, 0, sizeof(coded_unit));
1208                        converter_process(src->converter,
1209                                          decoded,
1210                                          render);
1211                        assert(md->rep[md->nrep] == NULL);
1212                        md->rep[md->nrep] = render;
1213                        md->nrep++;
1214                }
1215
1216                if (src->skew != SOURCE_SKEW_NONE &&
1217                    source_skew_adapt(src, md, playout) != SOURCE_SKEW_NONE) {
1218                        /* We have skew and we have adjusted playout buffer  */
1219                        /* timestamps, so re-get unit to get correct         */
1220                        /* timestamp info.                                   */
1221                        pb_iterator_get_at(src->media_pos,
1222                                           (u_char**)&md,
1223                                           &md_len,
1224                                           &playout);
1225                        assert(md != NULL);
1226                        assert(md_len == sizeof(media_data));
1227                }
1228
1229                if (src->pdbe->gain != 1.0 && codec_is_native_coding(md->rep[md->nrep - 1]->id)) {
1230                        audio_scale_buffer((sample*)md->rep[md->nrep - 1]->data,
1231                                           md->rep[md->nrep - 1]->data_len / sizeof(sample),
1232                                           src->pdbe->gain);
1233                }
1234
1235                if (mix_process(ms, src->pdbe, md->rep[md->nrep - 1], playout) == FALSE) {
1236                        /* Sources sampling rate changed mid-flow? dump data */
1237                        /* make source look irrelevant, it should get        */
1238                        /* destroyed and the recreated with proper decode    */
1239                        /* path when new data arrives.  Not graceful..       */
1240                        /* A better way would be just to flush media then    */
1241                        /* invoke source_reconfigure if this is ever really  */
1242                        /* an issue.                                         */
1243                        pb_flush(src->media);
1244                        pb_flush(src->channel);
1245                }
1246
1247                src->last_played = playout;
1248        }
1249
1250        source_update_bps(src, start_ts);
1251
1252        UNUSED(i); /* Except for debugging */
1253       
1254        return TRUE;
1255}
1256
1257int
1258source_audit(source *src)
1259{
1260        if (src->age != 0) {
1261                /* Keep 1/8 seconds worth of audio */
1262                pb_iterator_audit(src->media_pos, history_ts);
1263                return TRUE;
1264        }
1265        return FALSE;
1266}
1267
1268ts_t
1269source_get_audio_buffered (source *src)
1270{
1271        /* Changes in avg_transit change amount of audio buffered. */
1272        ts_t delta;
1273        delta = ts_sub(src->pdbe->transit, src->pdbe->avg_transit);
1274        return ts_add(src->pdbe->playout, delta);
1275}
1276
1277ts_t
1278source_get_playout_delay (source *src)
1279{
1280        return src->pdbe->playout;
1281        /*  return ts_sub(src->pdbe->playout, src->pdbe->transit); */
1282}
1283
1284int
1285source_relevant(source *src, ts_t now)
1286{
1287        assert(src);
1288
1289        if (pb_relevant(src->media, now) || pb_relevant(src->channel, now) || src->age < 50) {
1290                return TRUE;
1291        } if (ts_valid(src->last_played)) {
1292                /* Source is quiescent                                     */
1293                ts_t quiet;       
1294                quiet = ts_sub(now, src->last_played);
1295                if (ts_gt(keep_source_ts, quiet)) {
1296                        return TRUE;
1297                }
1298        }
1299        return FALSE;
1300}
1301
1302struct s_pb*
1303source_get_decoded_buffer(source *src)
1304{
1305        return src->media;
1306}
1307
1308u_int32
1309source_get_ssrc(source *src)
1310{
1311        return src->pdbe->ssrc;
1312}
Note: See TracBrowser for help on using the browser.