root/rat/trunk/source.c @ 2927

Revision 2927, 49.7 KB (checked in by ucacoxh, 15 years ago)

- Oops. Was freeing packet 5 lines prematurely. Hazards of cutting a pasting.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:      source.c
3 * AUTHOR(S): Orion Hodson
4 *
5 * Layering support added by Tristan Henderson.
6 *     
7 * $Revision$
8 * $Date$
9 *
10 * Copyright (c) 1999 University College London
11 * All rights reserved.
12 *
13 */
14
15#include "config_unix.h"
16#include "config_win32.h"
17
18#include "ts.h"
19#include "playout.h"
20#include "channel.h"
21#include "channel_types.h"
22#include "codec_types.h"
23#include "codec.h"
24#include "codec_state.h"
25#include "converter.h"
26#include "audio_util.h"
27#include "render_3D.h"
28#include "repair.h"
29#include "timers.h"
30#include "ts.h"
31#include "channel_types.h"
32#include "pdb.h"
33#include "pktbuf.h"
34#include "source.h"
35#include "debug.h"
36#include "util.h"
37#include "net_udp.h"
38#include "mix.h"
39#include "rtp.h"
40#include "playout_calc.h"
41#include "ui.h"
42#include "session.h"
43#include "auddev.h"
44
45#define SKEW_ADAPT_THRESHOLD       5000
46#define SOURCE_YOUNG_AGE             20
47#define SOURCE_AUDIO_HISTORY_MS    1000
48#define NO_CONT_TOGED_FOR_PLAYOUT_RECALC 4
49
50/* constants for skew adjustment:
51 SOURCE_SKEW_SLOW - denotes source clock appears slower than ours.
52 SOURCE_SKEW_FAST - denotes source clock appears faster than ours.
53*/
54typedef enum { SOURCE_SKEW_SLOW, SOURCE_SKEW_FAST, SOURCE_SKEW_NONE } skew_t;
55
56typedef struct s_source {
57        struct s_source            *next;
58        struct s_source            *prev;
59        pdb_entry_t                *pdbe; /* persistent database entry */
60        u_int32                     age;
61        ts_t                        last_played;
62        ts_t                        last_repair;
63        u_int16                     consec_lost;
64        u_int32                     mean_energy;
65        ts_sequencer                seq;
66        struct s_pktbuf            *pktbuf;
67        u_int32                     packets_done;
68        struct s_channel_state     *channel_state;
69        struct s_codec_state_store *codec_states;
70        struct s_pb                *channel;
71        struct s_pb                *media;
72        struct s_pb_iterator       *media_pos;
73        struct s_converter         *converter;
74        /* Fine grained playout buffer adjustment variables.  Used in        */
75        /* attempts to correct for clock skew between source and local host. */
76        skew_t                      skew;
77        ts_t                        skew_adjust;
78        /* b/w estimation variables                                          */
79        u_int32                     byte_count;
80        ts_t                        byte_count_start;
81        double                      bps;
82} source;
83
84/* A linked list is used for sources and this is fine since we mostly expect */
85/* 1 or 2 sources to be simultaneously active and so efficiency is not a     */
86/* killer.                                                                   */
87
88typedef struct s_source_list {
89        source  sentinel;
90        u_int16 nsrcs;
91} source_list;
92
93/*****************************************************************************/
94/* Source List functions.  Source List is used as a container for sources    */
95/*****************************************************************************/
96
97int
98source_list_create(source_list **pplist)
99{
100        source_list *plist = (source_list*)xmalloc(sizeof(source_list));
101        if (plist != NULL) {
102                *pplist = plist;
103                plist->sentinel.next = &plist->sentinel;
104                plist->sentinel.prev = &plist->sentinel;
105                plist->nsrcs = 0;
106                return TRUE;
107        }
108        return FALSE;
109}
110
111void
112source_list_clear(source_list *plist)
113{
114       assert(plist != NULL);
115       
116       while(plist->sentinel.next != &plist->sentinel) {
117               source_remove(plist, plist->sentinel.next);
118       }
119}
120
121void
122source_list_destroy(source_list **pplist)
123{
124        source_list *plist = *pplist;
125        source_list_clear(plist);
126        assert(plist->nsrcs == 0);
127        xfree(plist);
128        *pplist = NULL;
129}
130
131u_int32
132source_list_source_count(source_list *plist)
133{
134        return plist->nsrcs;
135}
136
137source*
138source_list_get_source_no(source_list *plist, u_int32 n)
139{
140        source *curr;
141
142        assert(plist != NULL);
143
144        if (n < plist->nsrcs) {
145                curr = plist->sentinel.next;
146                while(n != 0) {
147                        curr = curr->next;
148                        n--;
149                }
150                return curr;
151        }
152        return NULL;
153}
154
155source*
156source_get_by_ssrc(source_list *plist, u_int32 ssrc)
157{
158        source *curr, *stop;
159       
160        curr = plist->sentinel.next;
161        stop = &plist->sentinel;
162        while(curr != stop) {
163                if (curr->pdbe->ssrc == ssrc) {
164                        return curr;
165                }
166                curr = curr->next;
167        }
168 
169        return NULL;
170}
171
172/*****************************************************************************/
173/* Timestamp constants and initialization                                    */
174/*****************************************************************************/
175
176static ts_t zero_ts;        /* No time at all :-)                            */
177static ts_t keep_source_ts; /* How long source kept after source goes quiet  */
178static ts_t history_ts;     /* How much old audio hang onto for repair usage */
179static ts_t bw_avg_period;  /* Average period for bandwidth estimate         */
180static ts_t skew_thresh;    /* Significant size b4 consider playout adapt    */
181static ts_t skew_limit;     /* Upper bound, otherwise clock reset.           */
182static int  time_constants_inited = FALSE;
183
184
185static void
186time_constants_init()
187{
188        /* We use these time constants *all* the time.   Initialize once     */
189        zero_ts        = ts_map32(8000, 0);
190        keep_source_ts = ts_map32(8000, 2000);
191        history_ts     = ts_map32(8000, 1000);
192        bw_avg_period  = ts_map32(8000, 8000);
193        skew_thresh    = ts_map32(8000, 160);
194        skew_limit     = ts_map32(8000, 2000);
195        time_constants_inited = TRUE;
196}
197
198/*****************************************************************************/
199/* Source functions.  A source is an active audio source.                    */
200/*****************************************************************************/
201
202source*
203source_create(source_list    *plist,
204              u_int32         ssrc,
205              pdb_t          *pdb)
206{
207        source *psrc;
208        int     success;
209
210        assert(plist != NULL);
211        assert(source_get_by_ssrc(plist, ssrc) == NULL);
212
213        /* Time constant initialization. Nothing to do with source creation  */
214        /* just has to go somewhere before sources might be active, here it  */
215        /* definitely is!                                                    */
216        if (time_constants_inited == FALSE) {
217                time_constants_init();
218        }
219
220        /* On with the show...                                               */
221        psrc = (source*)block_alloc(sizeof(source));
222        if (psrc == NULL) {
223                return NULL;
224        }
225        memset(psrc, 0, sizeof(source));
226
227        if (pdb_item_get(pdb, ssrc, &psrc->pdbe) == FALSE) {
228                debug_msg("Persistent database item not found\n");
229                abort();
230        }
231
232        psrc->pdbe->first_mix  = 1; /* Used to note nothing mixed anything   */
233        psrc->pdbe->cont_toged = 0; /* Reset continuous thrown on ground cnt */
234        psrc->channel_state    = NULL;       
235        psrc->skew             = SOURCE_SKEW_NONE;
236
237        /* Allocate channel and media buffers                                */
238        success = pb_create(&psrc->channel,
239                            (playoutfreeproc)channel_data_destroy);
240        if (!success) {
241                debug_msg("Failed to allocate channel buffer\n");
242                goto fail_create_channel;
243        }
244
245        success = pb_create(&psrc->media, (playoutfreeproc)media_data_destroy);
246        if (!success) {
247                debug_msg("Failed to allocate media buffer\n");
248                goto fail_create_media;
249        }
250
251        success = pb_iterator_create(psrc->media, &psrc->media_pos);
252        if (!success) {
253                debug_msg("Failed to attach iterator to media buffer\n");
254                goto fail_create_iterator;
255        }
256
257        success = codec_state_store_create(&psrc->codec_states, DECODER);
258        if (!success) {
259                debug_msg("Failed to allocate codec state storage\n");
260                goto fail_create_states;
261        }
262
263        success = pktbuf_create(&psrc->pktbuf, 4);
264        if (!success) {
265                debug_msg("Failed to allocate packet buffer\n");
266                goto fail_pktbuf;
267        }
268
269        /* List maintenance    */
270        psrc->next = plist->sentinel.next;
271        psrc->prev = &plist->sentinel;
272        psrc->next->prev = psrc;
273        psrc->prev->next = psrc;
274        plist->nsrcs++;
275
276        debug_msg("Created source decode path\n");
277
278        return psrc;
279
280        /* Failure fall throughs */
281fail_pktbuf:
282        codec_state_store_destroy(&psrc->codec_states);
283fail_create_states:
284        pb_iterator_destroy(psrc->media, &psrc->media_pos);       
285fail_create_iterator:
286        pb_destroy(&psrc->media);
287fail_create_media:
288        pb_destroy(&psrc->channel);
289fail_create_channel:
290        block_free(psrc, sizeof(source));
291
292        return NULL;
293}
294
295/* All sources need to be reconfigured when anything changes in
296 * audio path.  These include change of device frequency, change of
297 * the number of channels, etc..
298 */
299
300void
301source_reconfigure(source        *src,
302                   converter_id_t conv_id,
303                   int            render_3d,
304                   u_int16        out_rate,
305                   u_int16        out_channels)
306{
307        u_int16    src_rate, src_channels;
308        codec_id_t            src_cid;
309        const codec_format_t *src_cf;
310
311        assert(src->pdbe != NULL);
312
313        /* Set age to zero and flush existing media
314         * so that repair mechanism does not attempt
315         * to patch across different block sizes.
316         */
317
318        src->age = 0;
319        pb_flush(src->media);
320
321        /* Get rate and channels of incoming media so we know
322         * what we have to change.
323         */
324        src_cid = codec_get_by_payload(src->pdbe->enc);
325        src_cf  = codec_get_format(src_cid);
326        src_rate     = (u_int16)src_cf->format.sample_rate;
327        src_channels = (u_int16)src_cf->format.channels;
328
329        if (render_3d) {
330                assert(out_channels == 2);
331                /* Rejig 3d renderer if there, else create */
332                if (src->pdbe->render_3D_data) {
333                        int azi3d, fil3d, len3d;
334                        render_3D_get_parameters(src->pdbe->render_3D_data,
335                                                 &azi3d,
336                                                 &fil3d,
337                                                 &len3d);
338                        render_3D_set_parameters(src->pdbe->render_3D_data,
339                                                 (int)src_rate,
340                                                 azi3d,
341                                                 fil3d,
342                                                 len3d);
343                } else {
344                        src->pdbe->render_3D_data = render_3D_init((int)src_rate);
345                }
346                assert(src->pdbe->render_3D_data);
347                /* Render 3d is before sample rate/channel conversion, and   */
348                /* output 2 channels.                                        */
349                src_channels = 2;
350        } else {
351                /* Rendering is switched off so destroy info.                */
352                if (src->pdbe->render_3D_data != NULL) {
353                        render_3D_free(&src->pdbe->render_3D_data);
354                }
355        }
356
357        /* Now destroy converter if it is already there.                     */
358        if (src->converter) {
359                converter_destroy(&src->converter);
360        }
361
362        if (src_rate != out_rate || src_channels != out_channels) {
363                converter_fmt_t c;
364                c.src_freq      = src_rate;
365                c.from_channels = src_channels;
366                c.dst_freq      = out_rate;
367                c.to_channels   = out_channels;
368                converter_create(conv_id, &c, &src->converter);
369        }
370        src->byte_count = 0;
371        src->bps        = 0.0;
372}
373
374void
375source_remove(source_list *plist, source *psrc)
376{
377        assert(plist);
378        assert(psrc);
379        assert(source_get_by_ssrc(plist, psrc->pdbe->ssrc) != NULL);
380
381        psrc->next->prev = psrc->prev;
382        psrc->prev->next = psrc->next;
383
384        if (psrc->channel_state) {
385                channel_decoder_destroy(&psrc->channel_state);
386        }
387
388        if (psrc->converter) {
389                converter_destroy(&psrc->converter);
390        }
391
392        pb_iterator_destroy(psrc->media, &psrc->media_pos);
393        pb_destroy(&psrc->channel);
394        pb_destroy(&psrc->media);
395        codec_state_store_destroy(&psrc->codec_states);
396        pktbuf_destroy(&psrc->pktbuf);
397        plist->nsrcs--;
398
399        debug_msg("Destroying source decode path\n");
400       
401        block_free(psrc, sizeof(source));
402
403        assert(source_get_by_ssrc(plist, psrc->pdbe->ssrc) == NULL);
404}
405             
406/* Source Processing Routines ************************************************/
407
408/* Returns true if fn takes ownership responsibility for data */
409static int
410source_process_packet (source *src,
411                       u_char *pckt,
412                       u_int32 pckt_len,
413                       u_int8  payload,
414                       ts_t    playout)
415{
416        channel_data *cd;
417        channel_unit *cu;
418        cc_id_t       cid;
419        u_int8        clayers;
420
421        assert(src != NULL);
422        assert(pckt != NULL);
423
424        /* Need to check:
425         * (i) if layering is enabled
426         * (ii) if channel_data exists for this playout point (if pb_iterator_get_at...)
427         * Then need to:
428         * (i) create cd if doesn't exist
429         * (ii) add packet to cd->elem[layer]
430         * We work out layer number by deducting the base port
431         * no from the port no this packet came from
432         * But what if layering on one port?
433         */
434
435        /* Or we could:
436         * (i) check if cd exists for this playout point
437         * (ii) if so, memcmp() to see if this packet already exists (ugh!)
438         */
439
440        cid = channel_coder_get_by_payload(payload);
441        clayers = channel_coder_get_layers(cid);
442        if (clayers > 1) {
443                struct s_pb_iterator *pi;
444                u_int8 i;
445                u_int32 clen;
446                int dup;
447                ts_t lplayout;
448                pb_iterator_create(src->channel, &pi);
449                while(pb_iterator_advance(pi)) {
450                        pb_iterator_get_at(pi, (u_char**)&cd, &clen, &lplayout);
451                       /* if lplayout==playout there is already channel_data for this playout point */
452                        if(!ts_eq(playout, lplayout)) {
453                                continue;
454                        }
455                        pb_iterator_detach_at(pi, (u_char**)&cd, &clen, &lplayout);
456                        assert(cd->nelem >= 1);
457
458                       /* if this channel_data is full, this new packet must *
459                        * be a duplicate, so we don't need to check          */
460                        if(cd->nelem >= clayers) {
461                                debug_msg("source_process_packet failed - duplicate layer\n");
462                                src->pdbe->duplicates++;
463                                pb_iterator_destroy(src->channel, &pi);
464                                goto done;
465                        }
466
467                        cu = (channel_unit*)block_alloc(sizeof(channel_unit));
468                        cu->data     = pckt;
469                        cu->data_len = pckt_len;
470                        cu->pt       = payload;
471
472                        dup = 0;
473
474                       /* compare existing channel_units to this one */
475                        for(i=0; i<cd->nelem; i++) {
476                                if(cu->data_len!=cd->elem[i]->data_len) break;
477                                /* This memcmp arbitrarily only checks
478                                 * 20 bytes, otherwise it takes too
479                                 * long */
480                                if (memcmp(cu->data, cd->elem[i]->data, 20) == 0) {
481                                        dup=1;
482                                }
483                        }
484
485                       /* duplicate, so stick the channel_data back on *
486                        * the playout buffer and swiftly depart        */
487                        if(dup) {
488                                debug_msg("source_process_packet failed - duplicate layer\n");
489                                src->pdbe->duplicates++;
490                                /* destroy temporary channel_unit */
491                                block_free(cu->data, cu->data_len);
492                                cu->data_len = 0;
493                                block_free(cu, sizeof(channel_unit));
494                                pb_iterator_destroy(src->channel, &pi);
495                                goto done;
496                        }
497
498                       /* add this layer if not a duplicate           *
499                        * NB: layers are not added in order, and thus *
500                        * have to be reorganised in the layered       *
501                        * channel coder                               */
502                        cd->elem[cd->nelem] = cu;
503                        cd->nelem++;
504                        pb_iterator_destroy(src->channel, &pi);
505                        goto done;
506                }
507                pb_iterator_destroy(src->channel, &pi);
508        }
509
510        if (channel_data_create(&cd, 1) == 0) {
511                return FALSE;
512        }
513       
514        cu               = cd->elem[0];
515        cu->data         = pckt;
516        cu->data_len     = pckt_len;
517        cu->pt           = payload;
518
519        /* Check we have state to decode this */
520        cid = channel_coder_get_by_payload(cu->pt);
521        if (src->channel_state &&
522            channel_decoder_matches(cid, src->channel_state) == FALSE) {
523                debug_msg("Channel coder changed - flushing\n");
524                channel_decoder_destroy(&src->channel_state);
525                pb_flush(src->channel);
526        }
527
528        /* Make state if not there and create decoder */
529        if (src->channel_state == NULL &&
530            channel_decoder_create(cid, &src->channel_state) == FALSE) {
531                debug_msg("Cannot decode payload %d\n", cu->pt);
532                channel_data_destroy(&cd, sizeof(channel_data));
533        }
534        src->age++;
535done:   
536        if (pb_add(src->channel, (u_char*)cd, sizeof(channel_data), playout) == FALSE) {
537                src->pdbe->duplicates++;
538                channel_data_destroy(&cd, sizeof(channel_data));
539        }
540
541        return TRUE;
542}
543
544static void
545source_process_packets(session_t *sp, source *src, ts_t now)
546{
547        ts_t    src_ts, playout, transit;
548        pdb_entry_t     *e;
549        rtp_packet      *p;
550        cc_id_t          ccid = -1;
551        u_int16          units_per_packet = -1;
552        u_int32          delta_ts, delta_seq;
553        u_char           codec_pt;
554        int              adjust_playout;
555
556        e = src->pdbe;
557        while(pktbuf_dequeue(src->pktbuf, &p)) {
558                adjust_playout = FALSE;
559
560                if (p->m) {
561                        adjust_playout = TRUE;
562                        debug_msg("New Talkspurt: %lu\n", p->ts);
563                }
564               
565                ccid = channel_coder_get_by_payload((u_char)p->pt);
566                if (channel_verify_and_stat(ccid, (u_char)p->pt,
567                                            p->data, p->data_len,
568                                            &units_per_packet, &codec_pt) == FALSE) {
569                        debug_msg("Packet discarded: packet failed channel verify.\n");
570                        xfree(p);
571                        continue;
572                }
573
574                if (e->channel_coder_id != ccid ||
575                    e->enc              != codec_pt ||
576                    e->units_per_packet != units_per_packet ||
577                    src->packets_done == 0) {
578                        /* Something has changed or is uninitialized...      */
579                        const codec_format_t *cf;
580                        const audio_format   *dev_fmt;
581                        codec_id_t           cid;
582                        u_int32              samples_per_frame;
583
584                        cid = codec_get_by_payload(codec_pt);
585                        cf  = codec_get_format(cid);
586                        /* Fix clock.                                        */
587                        change_freq(e->clock, cf->format.sample_rate);
588                        /* Fix details.                                      */
589                        e->enc              = codec_pt;
590                        e->units_per_packet = units_per_packet;
591                        e->channel_coder_id = ccid;       
592                        samples_per_frame   = codec_get_samples_per_frame(cid);
593                        debug_msg("Samples per frame %d rate %d\n", samples_per_frame, cf->format.sample_rate);
594                        e->inter_pkt_gap    = e->units_per_packet * (u_int16)samples_per_frame;
595                        e->frame_dur        = ts_map32(cf->format.sample_rate, samples_per_frame);
596
597                        debug_msg("Encoding change\n");
598                        /* Get string describing encoding.                   */
599                        channel_describe_data(ccid, codec_pt,
600                                              p->data, p->data_len,
601                                              e->enc_fmt, e->enc_fmt_len);
602                        if (sp->mbus_engine) {
603                                ui_update_stats(sp, e->ssrc);
604                        }
605                        /* Configure converter */
606                        dev_fmt = audio_get_ofmt(sp->audio_device);
607                        source_reconfigure(src,
608                                           sp->converter,
609                                           sp->render_3d,
610                                           (u_int16)dev_fmt->sample_rate,
611                                           (u_int16)dev_fmt->channels);
612                        adjust_playout      = TRUE;
613                }
614               
615                /* Check for talkspurt start indicated by change in          */
616                /* relationship between timestamps and sequence numbers.     */
617                delta_seq = p->seq - e->last_seq;
618                delta_ts  = p->ts  - e->last_ts;
619                if (delta_seq * e->inter_pkt_gap != delta_ts) {
620                        debug_msg("Seq no / timestamp realign (%lu * %lu != %lu)\n",
621                                  delta_seq, e->inter_pkt_gap, delta_ts);
622                        adjust_playout = TRUE;
623                }
624
625                if (ts_gt(e->jitter, e->playout)) {
626                        /* Network conditions have changed drastically.      */
627                        /* We are in the wrong ball park change immediately. */
628                        adjust_playout = TRUE;
629                }
630
631                /* Check for continuous number of packets being discarded.   */
632                /* This happens when jitter or transit estimate is no longer */
633                /* consistent with the real world.                           */
634                if (e->cont_toged >= NO_CONT_TOGED_FOR_PLAYOUT_RECALC) {
635                        adjust_playout = TRUE;
636                        e->cont_toged  = 0;
637                } else if (e->cont_toged != 0) {
638                        debug_msg("cont_toged %d\n", e->cont_toged);
639                }
640
641                /* Calculate the playout point for this packet.              */
642                src_ts = ts_seq32_in(&src->seq, get_freq(e->clock), p->ts);
643
644                /* Transit delay is the difference between our local clock   */
645                /* and the packet timestamp (src_ts).  Note: we expect       */
646                /* packet clumping at talkspurt start because of VAD's       */
647                /* fetching previous X seconds of audio on signal detection  */
648                /* in order to send unvoiced audio at start.                 */
649                if (adjust_playout && pktbuf_get_count(src->pktbuf)) {
650                        rtp_packet *p;
651                        ts_t        last_ts;
652                        pktbuf_peak_last(src->pktbuf, &p);
653                        assert(p != NULL);
654                        last_ts = ts_seq32_in(&src->seq, get_freq(e->clock), p->ts);
655                        transit = ts_sub(sp->cur_ts, last_ts);
656                        debug_msg("Used transit of last packet\n");
657                } else {
658                        transit = ts_sub(sp->cur_ts, src_ts);
659                }
660
661                playout = playout_calc(sp, e->ssrc, transit, adjust_playout);
662                if ((p->m || src->packets_done == 0) && ts_gt(playout, e->frame_dur)) {
663                        /* Packets are likely to be compressed at talkspurt start */
664                        /* because of VAD going back and grabbing frames.         */
665                        playout = ts_sub(playout, e->frame_dur);
666                        debug_msg("New ts shift XXX\n");
667                }
668                playout = ts_add(e->transit, playout);
669                playout = ts_add(src_ts, playout);
670
671                e->last_transit = transit;
672
673                /* If last_played is valid then enough audio is buffer for   */
674                /* the playout check to be sensible.                         */
675                if (ts_valid(src->last_played) &&
676                    ts_gt(src->last_played, playout)) {
677                        debug_msg("Packet late (%u > %u)- discarding\n",
678                                  src->last_played.ticks,
679                                  playout.ticks);
680                        src->pdbe->cont_toged++;
681                        src->pdbe->jit_toged++;
682                        xfree(p);
683                        continue;
684                }
685
686                if (!ts_gt(now, playout)) {
687                        u_char  *u;
688                        u    = (u_char*)block_alloc(p->data_len);
689                        /* Would be great if memcpy occured after validation */
690                        /* in source_process_packet (or not at all)          */
691                        memcpy(u, p->data, p->data_len);
692                        if (source_process_packet(src, u, p->data_len, codec_pt, playout) == FALSE) {
693                                block_free(u, (int)p->data_len);
694                        }
695                        src->pdbe->cont_toged = 0;
696                } else {
697                        /* Packet being decoded is before start of current  */
698                        /* so there is now way it's audio will be played    */
699                        /* Playout recalculation gets triggered in          */
700                        /* rtp_callback if cont_toged hits a critical       */
701                        /* threshold.  It signifies current playout delay   */
702                        /* is inappropriate.                                */
703                        src->pdbe->cont_toged++;
704                        src->pdbe->jit_toged++;
705                }
706
707                /* Update persistent database fields.                        */
708                if (e->last_seq > p->seq) {
709                        e->misordered++;
710                }
711                e->last_seq = p->seq;
712                e->last_ts  = p->ts;
713                e->last_arr = sp->cur_ts;
714                src->packets_done++;
715                xfree(p);
716        }
717}
718
719int
720source_add_packet (source     *src,
721                   rtp_packet *pckt)
722{
723        src->byte_count += pckt->data_len;
724        return pktbuf_enqueue(src->pktbuf, pckt);
725}
726
727static void
728source_update_bps(source *src, ts_t now)
729{
730        ts_t delta;
731        if (!ts_valid(src->byte_count_start)) {
732                src->byte_count_start = now;
733                src->byte_count       = 0;
734                src->bps              = 0.0;
735                return;
736        }
737
738        delta = ts_sub(now, src->byte_count_start);
739       
740        if (ts_gt(delta, bw_avg_period)) {
741                double this_est;
742                this_est = 8.0 * src->byte_count * 1000.0/ ts_to_ms(delta);
743                if (src->bps == 0.0) {
744                        src->bps = this_est;
745                } else {
746                        src->bps += (this_est - src->bps)/2.0;
747                }
748                src->byte_count = 0;
749                src->byte_count_start = now;
750                debug_msg("bps %f\n", src->bps);
751        }
752}
753
754double 
755source_get_bps(source *src)
756{
757        return src->bps;
758}
759
760/* recommend_drop_dur does quick pattern match with audio that is about to   */
761/* be played i.e. first few samples to determine how much audio can be       */
762/* dropped with causing glitch.                                              */
763
764#define SOURCE_COMPARE_WINDOW_SIZE 8
765/* Match threshold is mean abs diff. lower score gives less noise, but less  */
766/* adaption..., might be better if threshold adapted with how much extra     */
767/* data we have buffered...                                                  */
768#define MATCH_THRESHOLD 1000
769
770static ts_t
771recommend_drop_dur(media_data *md)
772{
773        u_int32 score, lowest_score, lowest_begin;
774        u_int16 rate, channels;
775        sample *buffer;
776        int i, j,samples;
777
778        i = md->nrep - 1;
779        while(i >= 0) {
780                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
781                        break;
782                }
783                i--;
784        }
785        assert(i != -1);
786       
787        buffer  = (sample*)md->rep[i]->data;
788        samples = md->rep[i]->data_len / (sizeof(sample) * channels);
789
790        i = 0;
791        j = samples / 16;
792        lowest_score = 0xffffffff;
793        lowest_begin = 0;
794        while (j < samples - SOURCE_COMPARE_WINDOW_SIZE) {
795                score = 0;
796                for (i = 0; i < SOURCE_COMPARE_WINDOW_SIZE; i++) {
797                        score += abs(buffer[i * channels] - buffer[(j+i) * channels]);
798                }
799                if (score <= lowest_score) {
800                        lowest_score = score;
801                        lowest_begin = j;
802                }
803                j++;
804        }
805
806        if (lowest_score/SOURCE_COMPARE_WINDOW_SIZE < MATCH_THRESHOLD) {
807                debug_msg("match score %d, drop dur %d\n", lowest_score/SOURCE_COMPARE_WINDOW_SIZE, lowest_begin);
808                return ts_map32(rate, lowest_begin);
809        } else {
810                return zero_ts;
811        }
812}
813
814#define SOURCE_MERGE_LEN_SAMPLES 5
815
816static void
817conceal_dropped_samples(media_data *md, ts_t drop_dur)
818{
819        /* We are dropping drop_dur samples and want signal to be            */
820        /* continuous.  So we blend samples that would have been played if   */
821        /* they weren't dropped with where signal continues after the drop.  */
822        u_int32 drop_samples;
823        u_int16 rate, channels;
824        int32 tmp, a, b, i, merge_len;
825        sample *new_start, *old_start;
826
827        i = md->nrep - 1;
828        while(i >= 0) {
829                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
830                        break;
831                }
832                i--;
833        }
834
835        assert(i != -1);
836
837        drop_dur     = ts_convert(rate, drop_dur);
838        drop_samples = channels * drop_dur.ticks;
839       
840        /* new_start is what will be played by mixer */
841        new_start = (sample*)md->rep[i]->data + drop_samples;
842        old_start = (sample*)md->rep[i]->data;
843
844        merge_len = SOURCE_MERGE_LEN_SAMPLES * channels;
845        for (i = 0; i < merge_len; i++) {
846                a   = (merge_len - i) * old_start[i] / merge_len;
847                b   = i * new_start[i]/ merge_len;
848                tmp =  (sample)(a + b);
849                new_start[i] = (short)tmp;
850        }
851}
852
853/* source_check_buffering is supposed to check amount of audio buffered      */
854/* corresponds to what we expect from playout so we can think about skew     */
855/* adjustment.                                                               */
856
857int
858source_check_buffering(source *src)
859{
860        ts_t actual, desired, diff;
861
862        if (src->age < SOURCE_YOUNG_AGE) {
863                /* If the source is new(ish) then not enough audio will be   */
864                /* in the playout buffer because it hasn't arrived yet.      */
865                return FALSE;
866        }
867
868        actual  = source_get_audio_buffered(src);
869        desired = source_get_playout_delay(src);
870        diff    = ts_abs_diff(actual, desired);
871
872        if (ts_gt(diff, skew_thresh) && ts_gt(skew_limit, diff)) {
873                src->skew_adjust = diff;
874                if (ts_gt(actual, desired)) {
875                        /* We're accumulating audio, their clock faster   */
876                        src->skew = SOURCE_SKEW_FAST;
877                } else {
878                        /* We're short of audio, so their clock is slower */
879                        src->skew = SOURCE_SKEW_SLOW;
880                }
881                return TRUE;
882        }
883        src->skew = SOURCE_SKEW_NONE;
884        return FALSE;
885}
886
887/* source_skew_adapt exists to shift playout units if source clock appears   */
888/* to be fast or slow.  The media_data unit is here so that it can be        */
889/* examined to see if it is low energy and adjustment would be okay.  Might  */
890/* want to be more sophisticated and put a silence detector in rather than   */
891/* static threshold.                                                         */
892/*                                                                           */
893/* Returns what adaption type occurred.                                      */
894
895static skew_t
896source_skew_adapt(source *src, media_data *md, ts_t playout)
897{
898        u_int32 i, e = 0, samples = 0;
899        u_int16 rate, channels;
900        ts_t adjustment;
901
902        assert(src);
903        assert(md);
904        assert(src->skew != SOURCE_SKEW_NONE);
905
906        for(i = 0; i < md->nrep; i++) {
907                if (codec_get_native_info(md->rep[i]->id, &rate, &channels)) {
908                        samples = md->rep[i]->data_len / (channels * sizeof(sample));
909                        e = avg_audio_energy((sample*)md->rep[i]->data, samples * channels, channels);
910                        src->mean_energy = (15 * src->mean_energy + e)/16;
911                        break;
912                }
913        }
914
915        if (i == md->nrep) {
916                /* don't adapt if unit has not been decoded (error) or       */
917                /* signal has too much energy                                */
918                return SOURCE_SKEW_NONE;
919        }
920
921        /* When we are making the adjustment we must shift playout buffers   */
922        /* and timestamps that the source decode process uses.  Must be      */
923        /* careful with last repair because it is not valid if no repair has */
924        /* taken place.                                                      */
925
926        if (src->skew == SOURCE_SKEW_FAST/* &&
927                (2*e <=  src->mean_energy || e < 200) */) {
928                /* source is fast so we need to bring units forward.
929                 * Should only move forward at most a single unit
930                 * otherwise we might discard something we have not
931                 * classified.  */
932
933                adjustment = recommend_drop_dur(md);
934                if (ts_gt(adjustment, src->skew_adjust) || adjustment.ticks == 0) {
935                        /* adjustment needed is greater than adjustment period
936                         * that best matches dropable by signal matching.
937                         */
938                        return SOURCE_SKEW_NONE;
939                }
940                debug_msg("dropping %d / %d samples\n", adjustment.ticks, src->skew_adjust.ticks);
941                pb_shift_forward(src->media,   adjustment);
942                pb_shift_forward(src->channel, adjustment);
943
944                src->pdbe->transit      = ts_sub(src->pdbe->transit, adjustment);
945                /* avg_transit and last_transit are fine.  Difference in     */
946                /* avg_transit and transit triggered this adjustment.        */
947
948                if (ts_valid(src->last_repair)) {
949                        src->last_repair = ts_sub(src->last_repair, adjustment);
950                }
951
952                if (ts_valid(src->last_played)) {
953                        src->last_played = ts_sub(src->last_played, adjustment);
954                }
955
956                /* Remove skew adjustment from estimate of skew outstanding */
957                if (ts_gt(src->skew_adjust, adjustment)) {
958                        src->skew_adjust = ts_sub(src->skew_adjust, adjustment);
959                } else {
960                        src->skew = SOURCE_SKEW_NONE;
961                }
962
963                conceal_dropped_samples(md, adjustment);
964
965                return SOURCE_SKEW_FAST;
966        } else if (src->skew == SOURCE_SKEW_SLOW) {
967                adjustment = ts_map32(rate, samples);
968                if (ts_gt(src->skew_adjust, adjustment)) {
969                        adjustment = ts_map32(rate, samples);
970                }
971                pb_shift_units_back_after(src->media,   playout, adjustment);
972                pb_shift_units_back_after(src->channel, playout, adjustment);
973                src->pdbe->transit = ts_add(src->pdbe->transit, adjustment);
974
975                if (ts_gt(adjustment, src->skew_adjust)) {
976                        src->skew_adjust = zero_ts;
977                } else {
978                        src->skew_adjust = ts_sub(src->skew_adjust, adjustment);
979                }
980
981/* shouldn't have to make this adjustment since we are now adjusting
982 * units in future only.
983                src->last_played = ts_add(src->last_played, adjustment);
984                if (ts_valid(src->last_repair)) {
985                        src->last_repair = ts_add(src->last_repair, adjustment);
986                }
987                */
988                debug_msg("Playout buffer shift back %d (%d).\n", adjustment.ticks, src->last_played.ticks);
989                src->skew = SOURCE_SKEW_NONE;
990                return SOURCE_SKEW_SLOW;
991        }
992
993        return SOURCE_SKEW_NONE;
994}
995
996static int
997source_repair(source     *src,
998              repair_id_t r,
999              ts_t        step)
1000{
1001        media_data* fill_md, *prev_md;
1002        ts_t        fill_ts,  prev_ts;
1003        u_int32     success,  prev_len;
1004
1005        /* Check for need to reset of consec_lost count */
1006
1007        if (ts_valid(src->last_repair) == FALSE ||
1008            ts_eq(src->last_played, src->last_repair) == FALSE) {
1009                src->consec_lost = 0;
1010        }
1011
1012        /* We repair one unit at a time since it may be all we need */
1013        pb_iterator_retreat(src->media_pos);
1014        pb_iterator_get_at(src->media_pos,
1015                           (u_char**)&prev_md,
1016                           &prev_len,
1017                           &prev_ts);
1018
1019        assert(prev_md != NULL);
1020
1021        if (!ts_eq(prev_ts, src->last_played)) {
1022                debug_msg("prev_ts and last_played don't match\n");
1023                return FALSE;
1024        }
1025
1026        media_data_create(&fill_md, 1);
1027        repair(r,
1028               src->consec_lost,
1029               src->codec_states,
1030               prev_md,
1031               fill_md->rep[0]);
1032        fill_ts = ts_add(src->last_played, step);
1033        success = pb_add(src->media,
1034                         (u_char*)fill_md,
1035                         sizeof(media_data),
1036                         fill_ts);
1037        if (success) {
1038                src->consec_lost ++;
1039                src->last_repair = fill_ts;
1040                pb_iterator_advance(src->media_pos);
1041#ifndef NDEBUG
1042        /* Reusing prev_* - c'est mal, je sais */
1043                pb_iterator_get_at(src->media_pos,
1044                                   (u_char**)&prev_md,
1045                                   &prev_len,
1046                                   &prev_ts);
1047                if (ts_eq(prev_ts, fill_ts) == FALSE) {
1048                        debug_msg("Added at %d, but got %d when tried to get it back!\n", fill_ts.ticks, prev_ts.ticks);
1049                        return FALSE;
1050                }
1051#endif
1052        } else {
1053                /* This should only ever fail at when source changes
1054                 * sample rate in less time than playout buffer
1055                 * timeout.  This should be a very very rare event... 
1056                 */
1057                debug_msg("Repair add data failed (%d), last_played %d.\n", fill_ts.ticks, src->last_played.ticks);
1058                media_data_destroy(&fill_md, sizeof(media_data));
1059                src->consec_lost = 0;
1060                return FALSE;
1061        }
1062        return TRUE;
1063}
1064
1065int
1066source_process(session_t *sp,
1067               source            *src,
1068               struct s_mix_info *ms,
1069               int                render_3d,
1070               repair_id_t        repair_type,
1071               ts_t               start_ts,    /* Real-world time           */
1072               ts_t               end_ts)      /* Real-world time + cushion */
1073{
1074        media_data  *md;
1075        coded_unit  *cu;
1076        codec_state *cs;
1077        u_int32     md_len, src_freq;
1078        ts_t        playout, step, cutoff;
1079        int         i, success, hold_repair = 0;
1080
1081        /* Note: hold_repair is used to stop repair occuring.
1082         * Occasionally, there is a race condition when the playout
1083         * point is recalculated causing overlap, and when playout
1084         * buffer shift occurs in middle of a loss.
1085         */
1086       
1087        source_process_packets(sp, src, start_ts);
1088
1089        /* Split channel coder units up into media units */
1090        if (pb_node_count(src->channel)) {
1091                channel_decoder_decode(src->channel_state,
1092                                       src->channel,
1093                                       src->media,
1094                                       end_ts);
1095        }
1096
1097        src_freq = get_freq(src->pdbe->clock);
1098        step = ts_map32(src_freq, src->pdbe->inter_pkt_gap / src->pdbe->units_per_packet);
1099
1100        while (pb_iterator_advance(src->media_pos)) {
1101                pb_iterator_get_at(src->media_pos,
1102                                   (u_char**)&md,
1103                                   &md_len,
1104                                   &playout);
1105                assert(md != NULL);
1106                assert(md_len == sizeof(media_data));
1107               
1108                /* Conditions for repair:
1109                 * (a) last_played has meaning.
1110                 * (b) playout point does not what we expect.
1111                 * (c) repair type is not no repair.
1112                 * (d) last decoded was not too long ago.
1113                 */
1114                cutoff = ts_sub(end_ts, ts_map32(src_freq, SOURCE_AUDIO_HISTORY_MS));
1115
1116                assert((ts_valid(src->last_played) == FALSE) || ts_eq(playout, src->last_played) == FALSE);
1117
1118                if (ts_valid(src->last_played) &&
1119                    ts_gt(playout, ts_add(src->last_played, step)) &&
1120                    ts_gt(src->last_played, cutoff) &&
1121                    hold_repair == 0) {
1122                        /* If repair was successful media_pos is moved,
1123                         * so get data at media_pos again.
1124                         */
1125                        if (source_repair(src, repair_type, step) == FALSE) {
1126                                hold_repair += 2; /* 1 works, but 2 is probably better */
1127                        }
1128                        debug_msg("Repair\n");
1129                        success = pb_iterator_get_at(src->media_pos,
1130                                                     (u_char**)&md,
1131                                                     &md_len,
1132                                                     &playout);
1133                        assert(success);
1134                } else if (hold_repair > 0) {
1135                        hold_repair --;
1136                }
1137
1138                if (ts_gt(playout, end_ts)) {
1139                        /* This playout point is after now so stop */
1140                        pb_iterator_retreat(src->media_pos);
1141                        break;
1142                }
1143
1144                for(i = 0; i < md->nrep; i++) {
1145                        if (codec_is_native_coding(md->rep[i]->id)) {
1146                                break;
1147                        }
1148                }
1149
1150                if (i == md->nrep) {
1151                        /* We need to decode this unit, may not have to
1152                         * when repair has been used.
1153                         */
1154#ifdef DEBUG
1155                        for(i = 0; i < md->nrep; i++) {
1156                                /* if there is a native coding this
1157                                 * unit has already been decoded and
1158                                 * this would be bug */
1159                                assert(md->rep[i] != NULL);
1160                                assert(codec_id_is_valid(md->rep[i]->id));
1161                                assert(codec_is_native_coding(md->rep[i]->id) == FALSE);
1162                        }
1163#endif /* DEBUG */
1164                        cu = (coded_unit*)block_alloc(sizeof(coded_unit));
1165                        /* Decode frame */
1166                        assert(cu != NULL);
1167                        memset(cu, 0, sizeof(coded_unit));
1168                        cs = codec_state_store_get(src->codec_states, md->rep[0]->id);
1169                        codec_decode(cs, md->rep[0], cu);
1170                        assert(md->rep[md->nrep] == NULL);
1171                        md->rep[md->nrep] = cu;
1172                        md->nrep++;
1173                }
1174
1175                if (render_3d && src->pdbe->render_3D_data) {
1176                        /* 3d rendering necessary */
1177                        coded_unit *decoded, *render;
1178                        decoded = md->rep[md->nrep - 1];
1179                        assert(codec_is_native_coding(decoded->id));
1180                       
1181                        render = (coded_unit*)block_alloc(sizeof(coded_unit));
1182                        memset(render, 0, sizeof(coded_unit));
1183                       
1184                        render_3D(src->pdbe->render_3D_data,decoded,render);
1185                        assert(md->rep[md->nrep] == NULL);
1186                        md->rep[md->nrep] = render;
1187                        md->nrep++;
1188                }
1189
1190                if (src->converter) {
1191                        /* convert frame */
1192                        coded_unit *decoded, *render;
1193                        decoded = md->rep[md->nrep - 1];
1194                        assert(codec_is_native_coding(decoded->id));
1195
1196                        render = (coded_unit*)block_alloc(sizeof(coded_unit));
1197                        memset(render, 0, sizeof(coded_unit));
1198                        converter_process(src->converter,
1199                                          decoded,
1200                                          render);
1201                        assert(md->rep[md->nrep] == NULL);
1202                        md->rep[md->nrep] = render;
1203                        md->nrep++;
1204                }
1205
1206                if (src->skew != SOURCE_SKEW_NONE &&
1207                    source_skew_adapt(src, md, playout) != SOURCE_SKEW_NONE) {
1208                        /* We have skew and we have adjusted playout buffer  */
1209                        /* timestamps, so re-get unit to get correct         */
1210                        /* timestamp info.                                   */
1211                        pb_iterator_get_at(src->media_pos,
1212                                           (u_char**)&md,
1213                                           &md_len,
1214                                           &playout);
1215                        assert(md != NULL);
1216                        assert(md_len == sizeof(media_data));
1217                }
1218
1219                if (src->pdbe->gain != 1.0 && codec_is_native_coding(md->rep[md->nrep - 1]->id)) {
1220                        audio_scale_buffer((sample*)md->rep[md->nrep - 1]->data,
1221                                           md->rep[md->nrep - 1]->data_len / sizeof(sample),
1222                                           src->pdbe->gain);
1223                }
1224
1225                if (mix_process(ms, src->pdbe, md->rep[md->nrep - 1], playout) == FALSE) {
1226                        /* Sources sampling rate changed mid-flow? dump data */
1227                        /* make source look irrelevant, it should get        */
1228                        /* destroyed and the recreated with proper decode    */
1229                        /* path when new data arrives.  Not graceful..       */
1230                        /* A better way would be just to flush media then    */
1231                        /* invoke source_reconfigure if this is ever really  */
1232                        /* an issue.                                         */
1233                        pb_flush(src->media);
1234                        pb_flush(src->channel);
1235                }
1236
1237                src->last_played = playout;
1238        }
1239
1240        source_update_bps(src, start_ts);
1241
1242        UNUSED(i); /* Except for debugging */
1243       
1244        return TRUE;
1245}
1246
1247int
1248source_audit(source *src)
1249{
1250        if (src->age != 0) {
1251                /* Keep 1/8 seconds worth of audio */
1252                pb_iterator_audit(src->media_pos, history_ts);
1253                return TRUE;
1254        }
1255        return FALSE;
1256}
1257
1258ts_t
1259source_get_audio_buffered (source *src)
1260{
1261        /* Changes in avg_transit change amount of audio buffered. */
1262        ts_t delta;
1263        delta = ts_sub(src->pdbe->transit, src->pdbe->avg_transit);
1264        return ts_add(src->pdbe->playout, delta);
1265}
1266
1267ts_t
1268source_get_playout_delay (source *src)
1269{
1270        return src->pdbe->playout;
1271        /*  return ts_sub(src->pdbe->playout, src->pdbe->transit); */
1272}
1273
1274int
1275source_relevant(source *src, ts_t now)
1276{
1277        assert(src);
1278
1279        if (pb_relevant(src->media, now) || pb_relevant(src->channel, now) || src->age < 50) {
1280                return TRUE;
1281        } if (ts_valid(src->last_played)) {
1282                /* Source is quiescent                                     */
1283                ts_t quiet;       
1284                quiet = ts_sub(now, src->last_played);
1285                if (ts_gt(keep_source_ts, quiet)) {
1286                        return TRUE;
1287                }
1288        }
1289        return FALSE;
1290}
1291
1292struct s_pb*
1293source_get_decoded_buffer(source *src)
1294{
1295        return src->media;
1296}
1297
1298u_int32
1299source_get_ssrc(source *src)
1300{
1301        return src->pdbe->ssrc;
1302}
Note: See TracBrowser for help on using the browser.