root/rat/trunk/main_engine.c @ 3071

Revision 3071, 11.8 KB (checked in by ucaccsp, 15 years ago)

- Fix saving of audioChannelParameters setting
- Fix use of mbus structure after freeing it
- Fix freeing of memory on shutdown
- Add assorted debugging code (trying to make this thing stable...)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:    main-engine.c
3 * PROGRAM: RAT
4 * AUTHOR:  Colin Perkins
5 *
6 * $Revision$
7 * $Date$
8 *
9 * Copyright (c) 1995-2000 University College London
10 * All rights reserved.
11 *
12 */
13
14#include "config_unix.h"
15#include "config_win32.h"
16#include "debug.h"
17#include "memory.h"
18#include "audio_types.h"
19#include "codec_types.h"
20#include "codec.h"
21#include "channel_types.h"
22#include "session.h"
23#include "audio.h"
24#include "auddev.h"
25#include "cushion.h"
26#include "converter.h"
27#include "tcltk.h"
28#include "pdb.h"
29#include "ui.h"
30#include "net.h"
31#include "timers.h"
32#include "parameters.h"
33#include "transmit.h"
34#include "source.h"
35#include "mix.h"
36#include "sndfile.h"
37#include "mbus.h"
38#include "mbus_ui.h"
39#include "mbus_engine.h"
40#include "crypt_random.h"
41#include "net_udp.h"
42#include "settings.h"
43#include "rtp.h"
44#include "rtp_callback.h"
45
46int      should_exit = FALSE;
47char    *c_addr, *token, *token_e; /* These should be parameters of the session? */
48
49#ifndef WIN32
50static void
51signal_handler(int signal)
52{
53        debug_msg("Caught signal %d\n", signal);
54        exit(-1);
55}
56#endif
57
58#define MBUS_ADDR_ENGINE "(media:audio module:engine app:rat instance:%u)"
59
60static void parse_args(int argc, char *argv[])
61{
62        int     i;
63
64        if (argc != 5) {
65                printf("Usage: %s -ctrl <addr> -token <token>\n", argv[0]);
66                exit(1);
67        }
68        for (i = 1; i < argc; i++) {
69                if (strcmp(argv[i], "-ctrl") == 0) {
70                        c_addr = xstrdup(argv[++i]);
71                } else if (strcmp(argv[i], "-token") == 0) {
72                        token   = xstrdup(argv[++i]);
73                        token_e = mbus_encode_str(token);
74                } else {
75                        printf("Unknown argument \"%s\"\n", argv[i]);
76                        abort();
77                }
78        }
79}
80
81static void 
82mbus_error_handler(int seqnum, int reason)
83{
84        debug_msg("mbus message failed (%d:%d)\n", seqnum, reason);
85        if (should_exit == FALSE) {
86                abort();
87        }
88        UNUSED(seqnum);
89        UNUSED(reason);
90        /* Ignore error we're closing down anyway */
91}
92
93int main(int argc, char *argv[])
94{
95        uint32_t         cur_time = 0, ntp_time = 0;
96        int              seed, elapsed_time, alc = 0, scnt = 0;
97        session_t       *sp;
98        struct timeval   time;
99        struct timeval   timeout;
100        uint8_t          j;
101
102#ifdef WIN32
103        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
104#else
105        signal(SIGINT, signal_handler);
106        debug_set_core_dir(argv[0]);
107#endif
108
109        /* Setup things which are independent of the session. These should */
110        /* be create static data only, since it will be accessed by BOTH   */
111        /* instances when running as a transcoder.                         */
112        seed = (gethostid() << 8) | (getpid() & 0xff);
113        srand48(seed);
114        lbl_srandom(seed);
115        converters_init();
116        audio_init_interfaces();
117        parse_args(argc, argv);
118
119        /* Initialize the session structure, and all session specific data */
120        sp = (session_t *) xmalloc(sizeof(session_t));
121        session_init(sp);
122        audio_device_get_safe_config(&sp->new_config);
123        audio_device_reconfigure(sp);
124        sp->cur_ts = ts_seq32_in(&sp->decode_sequencer, get_freq(sp->device_clock), 0);
125        assert(audio_device_is_open(sp->audio_device));
126        settings_load_early(sp);
127
128        /* Initialise our mbus interface... once this is done we can talk to our controller */
129        sp->mbus_engine = mbus_init(mbus_engine_rx, mbus_error_handler);
130        sp->mbus_engine_addr = (char *) xmalloc(strlen(MBUS_ADDR_ENGINE) + 3);
131        sprintf(sp->mbus_engine_addr, MBUS_ADDR_ENGINE, (uint32_t) getpid());
132        mbus_addr(sp->mbus_engine, sp->mbus_engine_addr);
133
134        /* The first stage is to wait until we hear from our controller. The address of the */
135        /* controller is passed to us via a command line parameter, and we just wait until  */
136        /* we get an mbus.hello() from that address.                                        */
137        debug_msg("Waiting to validate address %s\n", c_addr);
138        while (!mbus_addr_valid(sp->mbus_engine, c_addr)) {
139                timeout.tv_sec  = 0;
140                timeout.tv_usec = 250000;
141                mbus_recv(sp->mbus_engine, NULL, &timeout);
142                mbus_send(sp->mbus_engine);
143                mbus_heartbeat(sp->mbus_engine, 1);
144                mbus_retransmit(sp->mbus_engine);
145        }
146        debug_msg("Address %s is valid\n", c_addr);
147
148        /* Next, we signal to the controller that we are ready to go. It should be sending  */
149        /* us an mbus.waiting(foo) where "foo" is the same as the "-token" argument we were */
150        /* passed on startup. We respond with mbus.go(foo) sent reliably to the controller. */
151        debug_msg("Waiting for mbus.waiting(%s) from controller...\n", token);
152        mbus_rendezvous_go(sp->mbus_engine, token, (void *) sp);
153        debug_msg("...got it\n");
154
155        /* At this point we know the mbus address of our controller, and have conducted */
156        /* a successful rendezvous with it. It will now send us configuration commands. */
157        debug_msg("Waiting for mbus.go(%s) from controller...\n", token);
158        mbus_rendezvous_waiting(sp->mbus_engine, c_addr, token, (void *) sp);
159        debug_msg("...got it\n");
160        assert(sp->rtp_session[0] != NULL);
161
162        if (pdb_create(&sp->pdb) == FALSE) {
163                debug_msg("Failed to create persistent database\n");
164                abort();
165        }
166        pdb_item_create(sp->pdb, sp->clock, (uint16_t)get_freq(sp->device_clock), rtp_my_ssrc(sp->rtp_session[0]));
167        settings_load_late(sp);
168
169        ui_initial_settings(sp);
170        ui_update(sp);
171        network_process_mbus(sp);
172       
173        audio_drain(sp->audio_device);
174        if (tx_is_sending(sp->tb)) {
175                tx_start(sp->tb);
176        }
177
178        session_validate(sp);
179        xmemchk();
180        xdoneinit();
181
182        while (!should_exit) {
183                elapsed_time = 0;
184
185                /* Process audio */
186                elapsed_time = audio_rw_process(sp, sp, sp->ms);
187                cur_time = get_time(sp->device_clock);
188                ntp_time = ntp_time32();
189                sp->cur_ts   = ts_seq32_in(&sp->decode_sequencer, get_freq(sp->device_clock), cur_time);
190
191                if (tx_is_sending(sp->tb)) {
192                        tx_process_audio(sp->tb);
193                        tx_send(sp->tb);
194                }
195
196                /* Process RTP/RTCP packets */
197                timeout.tv_sec  = 0;
198                timeout.tv_usec = 0;
199                for (j = 0; j < sp->rtp_session_count; j++) {
200                        while(rtp_recv(sp->rtp_session[j], &timeout, cur_time));
201                        rtp_send_ctrl(sp->rtp_session[j], cur_time, NULL);
202                        rtp_update(sp->rtp_session[j]);
203                }
204
205                /* Process mbus */
206                timeout.tv_sec  = 0;
207                timeout.tv_usec = 0;
208                mbus_recv(sp->mbus_engine, (void *) sp, &timeout);
209                mbus_heartbeat(sp->mbus_engine, 1);
210                mbus_retransmit(sp->mbus_engine);
211                mbus_send(sp->mbus_engine);
212
213                /* Process audio */
214                elapsed_time += audio_rw_process(sp, sp, sp->ms);
215                cur_time      = get_time(sp->device_clock);
216                ntp_time      = ntp_time32();
217                sp->cur_ts    = ts_seq32_in(&sp->decode_sequencer, get_freq(sp->device_clock), cur_time);
218
219                if (tx_is_sending(sp->tb)) {
220                        tx_process_audio(sp->tb);
221                        tx_send(sp->tb);
222                }
223
224                /* Process and mix active sources */
225                if (sp->playing_audio) {
226                        struct s_source *s;
227                        int              sidx;
228                        ts_t             cush_ts;
229
230                        session_validate(sp);
231                        cush_ts = ts_map32(get_freq(sp->device_clock), cushion_get_size(sp->cushion));
232                        cush_ts = ts_add(sp->cur_ts, cush_ts);
233                        scnt = (int)source_list_source_count(sp->active_sources);
234                        for(sidx = 0; sidx < scnt; sidx++) {
235                                s = source_list_get_source_no(sp->active_sources, sidx);
236                                if (source_relevant(s, sp->cur_ts)) {
237                                        pdb_entry_t *e;
238                                        ts_t         two_secs, delta;
239                                        source_check_buffering(s);
240                                        source_process(sp, s, sp->ms, sp->render_3d, sp->repair, sp->cur_ts, cush_ts);
241                                        source_audit(s);
242                                        /* Check for UI update necessary, updating once per 2 secs */
243                                        pdb_item_get(sp->pdb, source_get_ssrc(s), &e);
244                                        delta    = ts_sub(sp->cur_ts, e->last_ui_update);
245                                        two_secs = ts_map32(8000, 16000);
246                                        if (ts_gt(delta, two_secs)) {
247                                                ui_update_stats(sp, e->ssrc);
248                                                e->last_ui_update = sp->cur_ts;
249                                        }
250                                } else {
251                                        /* Remove source as stopped */
252                                        uint32_t ssrc;
253                                        ssrc = source_get_ssrc(s);
254                                        ui_info_deactivate(sp, ssrc);
255                                        source_remove(sp->active_sources, s);
256                                        sidx--;
257                                        scnt--;
258                                }
259                        }
260                }
261
262                /* Echo Suppression - cut off transmitter when receiving     */
263                /* audio, enable when stop receiving.                        */
264                session_validate(sp);
265                if (sp->echo_suppress) {
266                        if (scnt > 0) {
267                                if (tx_is_sending(sp->tb)) {
268                                        tx_stop(sp->tb);
269                                        sp->echo_tx_active = TRUE;
270                                        debug_msg("Echo suppressor (disabling tx)\n");
271                                }
272                        } else if (sp->echo_tx_active) {
273                                /* Transmitter was stopped because we were   */
274                                /* playing out audio.  Restart it.           */
275                                if (tx_is_sending(sp->tb) == FALSE) {
276                                        tx_start(sp->tb);
277                                        debug_msg("Echo suppressor (enabling tx)\n");
278                                }
279                                sp->echo_tx_active = FALSE;
280                        }
281                }
282                /* Lecture Mode */
283                if (alc >= 50) {
284                        if (!sp->lecture && tx_is_sending(sp->tb) && sp->auto_lecture != 0) {
285                                gettimeofday(&time, NULL);
286                                if (time.tv_sec - sp->auto_lecture > 120) {
287                                        sp->auto_lecture = 0;
288                                        debug_msg("Dummy lecture mode\n");
289                                }
290                        }
291                        alc = 0;
292                } else {
293                        alc++;
294                }
295
296                if (sp->audio_device && elapsed_time != 0) {
297                        ui_periodic_updates(sp, elapsed_time);
298                }
299                if (sp->new_config != NULL) {
300                        /* wait for mbus messages - closing audio device    */
301                        /* can timeout unprocessed messages as some drivers */
302                        /* pause to drain before closing.                   */
303                        network_process_mbus(sp);
304                        if (audio_device_reconfigure(sp)) {
305                                /* Device reconfigured so decode paths of all sources */
306                                /* are misconfigured. Delete the source, and incoming */
307                                /* data will drive the correct new path.              */
308                                source_list_clear(sp->active_sources);
309                                ui_update(sp);
310                        }
311                }
312               
313                /* Choke CPU usage */
314                if (!audio_is_ready(sp->audio_device)) {
315                        audio_wait_for(sp->audio_device, 20);
316                }
317
318                /* Debugging sanity check of the session... */
319                session_validate(sp);
320                /* ...and check that nothing has trashed memory too badly! */
321                xmemchk();
322        }
323
324        settings_save(sp);
325        tx_stop(sp->tb);
326        if (sp->in_file  != NULL) snd_read_close (&sp->in_file);
327        if (sp->out_file != NULL) snd_write_close(&sp->out_file);
328        audio_device_release(sp, sp->audio_device);
329        pdb_destroy(&sp->pdb);
330
331        for (j = 0; j < sp->rtp_session_count; j++) {
332                rtp_send_bye(sp->rtp_session[j]);
333                rtp_done(sp->rtp_session[j]);
334                rtp_callback_exit(sp->rtp_session[j]);
335        }
336
337        /* Inform other processes that we're about to quit... */
338        mbus_qmsgf(sp->mbus_engine, "()", FALSE, "mbus.bye", "");
339        mbus_send(sp->mbus_engine);
340        do {
341                struct timeval   timeout;
342                mbus_send(sp->mbus_engine);
343                mbus_retransmit(sp->mbus_engine);
344                timeout.tv_sec  = 0;
345                timeout.tv_usec = 20000;
346                mbus_recv(sp->mbus_engine, sp, &timeout);
347        } while (!mbus_sent_all(sp->mbus_engine));
348        mbus_exit(sp->mbus_engine);
349
350        session_validate(sp);
351        session_exit(sp);
352        converters_free();
353        audio_free_interfaces();
354        xfree(c_addr);
355        xfree(token);
356        xfree(token_e);
357        xmemdmp();
358        return 0;
359}
Note: See TracBrowser for help on using the browser.