root/rat/trunk/main_engine.c @ 4213

Revision 4213, 16.0 KB (checked in by turam, 6 years ago)

Accept -X arguments on command line, and pass to underlying apps and into tcl interpreter

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 * FILE:    main-engine.c
3 * PROGRAM: RAT
4 * AUTHOR:  Colin Perkins
5 *
6 * Copyright (c) 1995-2001 University College London
7 * All rights reserved.
8 */
9 
10#ifndef HIDE_SOURCE_STRINGS
11static const char cvsid[] =
12        "$Id$";
13#endif /* HIDE_SOURCE_STRINGS */
14
15#include "config_unix.h"
16#include "config_win32.h"
17#include "debug.h"
18#include "memory.h"
19#include "audio_types.h"
20#include "codec_types.h"
21#include "codec.h"
22#include "channel_types.h"
23#include "session.h"
24#include "audio.h"
25#include "auddev.h"
26#include "cushion.h"
27#include "converter.h"
28#include "tcltk.h"
29#include "pdb.h"
30#include "ui_send_rtp.h"
31#include "ui_send_audio.h"
32#include "ui_send_stats.h"
33#include "net.h"
34#include "parameters.h"
35#include "transmit.h"
36#include "source.h"
37#include "mix.h"
38#include "sndfile.h"
39#include "mbus_ui.h"
40#include "mbus_engine.h"
41#include "crypt_random.h"
42#include "net_udp.h"
43#include "settings.h"
44#include "rtp.h"
45#include "rtp_callback.h"
46#include "tonegen.h"
47#include "voxlet.h"
48#include "fatal_error.h"
49#include "version.h"
50
51#include "mbus_parser.h"
52#include "mbus.h"
53#include "util.h"
54
55char            *c_addr, *token[2], *token_e[2]; /* Could be in the session struct */
56int              should_exit = FALSE;
57int              mbus_shutdown_error = FALSE;
58int              num_sessions = 1;
59
60pid_t ppid;
61
62#ifndef WIN32
63static void
64signal_handler(int signal)
65{
66        debug_msg("Caught signal %d\n", signal);
67        exit(-1);
68}
69#endif
70
71#define MBUS_ADDR_ENGINE "(media:audio module:engine app:rat session:%d id:%lu)"
72
73static void parse_args(int argc, char *argv[])
74{
75        int             i, tc;
76
77        tc = 0;
78        for (i = 1; i < argc; i++) {
79                if (strcmp(argv[i], "-ctrl") == 0) {
80                        c_addr = xstrdup(argv[++i]);
81                } else if (strcmp(argv[i], "-token") == 0) {
82                        token[tc]   = xstrdup(argv[++i]);
83                        token_e[tc] = mbus_encode_str(token[tc]);
84                        debug_msg("token[%d] = %s\n", tc, token[tc]);
85                        tc++;
86                } else if (strcmp(argv[i], "-T") == 0) {
87                        /* Enable transcoder... */
88                        debug_msg("Enabled transcoder support\n");
89                        num_sessions = 2;
90                } else if (strcmp(argv[i], "-X") == 0) {
91                        /* accept -X arguments, but only because they get passed to tcl */
92                        i++;
93                } else {
94                        printf("Unknown argument to %s: \"%s\"\n", argv[0], argv[i]);
95                        printf("Usage: rat-%s-media [-T] -ctrl <addr> -token <token> [-token <token>] [-X arg=value]\n", RAT_VERSION);
96                        abort();
97                }
98        }
99
100        /*
101         * Want app instance to be same across all processes that
102         * consitute this rat.  Parent pid appears after last colon.
103         * Obviously on Un*x we could use getppid...
104         */
105        i = strlen(c_addr) - 1;
106        while(i > 1 && c_addr[i - 1] != ':') {
107                i--;
108        }
109        ppid = strtoul(&c_addr[i], NULL, 10);
110}
111
112static void 
113mbus_error_handler(int seqnum, int reason)
114{
115        debug_msg("mbus message failed (seqnum:%d - %s)\n", seqnum, mbus_errlist[reason>=MBUS_ERR_MAX?MBUS_ERR_MAX:reason]);
116        if (should_exit == FALSE) {
117                char msg[64];
118                sprintf(msg, "MBUS message failed (%d:%s)\n", seqnum, mbus_errlist[reason>=MBUS_ERR_MAX?MBUS_ERR_MAX:reason]);
119                fatal_error("RAT v" RAT_VERSION, msg);
120                abort();
121        }
122        mbus_shutdown_error = TRUE;
123        UNUSED(seqnum);
124        UNUSED(reason);
125        /* Ignore error we're closing down anyway */
126}
127
128int rendezvous_with_controller(session_t *sp[2])
129{
130        int             i, j, done, waiting_limitcount=0;
131        struct timeval  timeout;
132
133        /* Signal to the controller that we are ready to go. It should be sending us an     */
134        /* mbus.waiting(foo) where "foo" is the same as the "-token" argument we were       */
135        /* passed on startup. We respond with mbus.go(foo) sent reliably to the controller. */
136        /* This gets complicated, because we may have two instances of the mbus active.     */
137        for (i = 0; i < num_sessions; i++) {
138                debug_msg("Need mbus.waiting(%s) from controller...\n", token[i]);
139                sp[i]->mbus_waiting       = TRUE;
140                sp[i]->mbus_waiting_token = token[i];
141        }
142        do {
143                done = TRUE;
144                for (j = 0; j < num_sessions; j++) {
145                        timeout.tv_sec  = 0;
146                        timeout.tv_usec = 10000;
147                        mbus_send(sp[j]->mbus_engine);
148                        mbus_recv(sp[j]->mbus_engine, (void *) sp[j], &timeout);
149                        mbus_heartbeat(sp[j]->mbus_engine, 1);
150                        mbus_retransmit(sp[j]->mbus_engine);
151                        done &= !sp[j]->mbus_waiting;
152                        if (!sp[j]->mbus_waiting) {
153                                debug_msg("Sending mbus.go(%s) to controller...\n", token[j]);
154                                mbus_qmsgf(sp[j]->mbus_engine, c_addr, TRUE, "mbus.go", "%s", token_e[j]);
155                        }
156                        /* Wait till 3000*10000 (20secs) - otherwise it is an error */
157                        if (waiting_limitcount++>2000)
158                                return FALSE;
159                }
160        } while (!done);
161        debug_msg("Got all needed mbus.waiting() messages from controller\n");
162
163        for (i = 0; i < num_sessions; i++) {
164                do {
165                        done = FALSE;
166                        for (i = 0; i < num_sessions; i++) {
167                                mbus_heartbeat(sp[i]->mbus_engine, 1);
168                                mbus_retransmit(sp[i]->mbus_engine);
169                                mbus_send(sp[i]->mbus_engine);
170                                timeout.tv_sec  = 0;
171                                timeout.tv_usec = 10000;
172                                mbus_recv(sp[i]->mbus_engine, (void *) sp[i], &timeout);
173                                done |= mbus_sent_all(sp[i]->mbus_engine);
174                        }
175                } while (!done);
176        }
177        debug_msg("Sent all mbus.go() messages to controller\n");
178
179        /* At this point we know the mbus address of our controller, and have conducted */
180        /* a successful rendezvous with it. It will now send us configuration commands. */
181        for (i = 0; i < num_sessions; i++) {
182                debug_msg("Need mbus.go(%s) from controller...\n", token_e[i]);
183                sp[i]->mbus_go       = TRUE;
184                sp[i]->mbus_go_token = token[i];
185        }
186
187        do {
188                done = TRUE;
189                for (i = 0; i < num_sessions; i++) {
190                        timeout.tv_sec  = 0;
191                        timeout.tv_usec = 20000;
192                        mbus_qmsgf(sp[i]->mbus_engine, c_addr, FALSE, "mbus.waiting", "%s", token_e[i]);
193                        mbus_send(sp[i]->mbus_engine);
194                        mbus_recv(sp[i]->mbus_engine, (void *) sp[i], &timeout);
195                        mbus_heartbeat(sp[i]->mbus_engine, 1);
196                        mbus_retransmit(sp[i]->mbus_engine);
197                        done &= !sp[i]->mbus_go;
198                }
199        } while (!done);
200        debug_msg("Got all needed mbus.go() messages from controller\n");
201
202        for (i = 0; i < num_sessions; i++) {
203                assert(sp[i]->rtp_session[0] != NULL);
204        }
205        return TRUE;
206}
207
208int main(int argc, char *argv[])
209{
210        uint32_t         rtp_time = 0;
211        int              seed, elapsed_time, alc = 0, scnt = 0;
212        session_t       *sp[2];
213        struct timeval   timeout;
214        uint8_t          i, j;
215
216        debug_msg("Ratmedia started\n");
217
218#ifdef WIN32
219        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
220#else
221        signal(SIGINT, signal_handler);
222        debug_set_core_dir(argv[0]);
223#endif
224
225        /* Setup things which are independent of the session. These should */
226        /* be create static data only, since it will be accessed by BOTH   */
227        /* instances when running as a transcoder.                         */
228        seed = (gethostid() << 8) | (getpid() & 0xff);
229        srand48(seed);
230        lbl_srandom(seed);
231
232        parse_args(argc, argv);
233        converters_init();
234        audio_init_interfaces();
235
236        /* Initialize the session structure, and all session specific data */
237        for (i = 0; i < num_sessions; i++) {
238                sp[i] = (session_t *) xmalloc(sizeof(session_t));
239                session_init(sp[i], i, (num_sessions == 1)?AUDIO_TOOL:TRANSCODER);
240                audio_device_get_safe_config(&sp[i]->new_config);
241                audio_device_reconfigure(sp[i]);
242                assert(audio_device_is_open(sp[i]->audio_device));
243
244                settings_load_early(sp[i]);
245                /* Initialise our mbus interface... once this is done we can talk to our controller */
246                sp[i]->mbus_engine_addr = (char *) xmalloc(strlen(MBUS_ADDR_ENGINE) + 15);
247                sprintf(sp[i]->mbus_engine_addr, MBUS_ADDR_ENGINE, i, (unsigned long) ppid);
248                sp[i]->mbus_engine = mbus_init(mbus_engine_rx, mbus_error_handler, sp[i]->mbus_engine_addr);
249                if (sp[i]->mbus_engine == NULL) {
250                        fatal_error("RAT v" RAT_VERSION, "Could not initialize Mbus: Is multicast enabled?");
251                        return FALSE;
252                }
253        }
254        if (num_sessions == 2) {
255                sp[0]->other_session = sp[1];
256                sp[1]->other_session = sp[0];
257        }
258
259        if (rendezvous_with_controller(sp) == FALSE) {
260                fatal_error("RAT v" RAT_VERSION, "RATmedia could not rendezvous_with_controller - Firewall/VPN problem");
261                return FALSE;
262        }
263
264        /* Moved session[0] setup to rx_rtp_addr so one may change addr
265           binding dynamically */
266        /* Load saved settings, and create the participant database... */
267        /* FIXME: probably needs updating for the transcoder so we can */
268        /*        have different saved settings for each domain.       */
269        /* FIXME: this gets the wrong device name for the transcoder.  */
270        for (i = 0; i < num_sessions; i++) {
271                if (pdb_create(&sp[i]->pdb) == FALSE) {
272                        debug_msg("Failed to create participant database\n");
273                        abort();
274                }
275                pdb_item_create(sp[i]->pdb, (uint16_t)ts_get_freq(sp[i]->cur_ts), rtp_my_ssrc(sp[i]->rtp_session[0]));
276                settings_load_late(sp[i]);
277                session_validate(sp[i]);
278        }
279
280        xmemchk();
281        xdoneinit();
282
283        debug_msg("Entering media engine main loop\n");
284        while (!should_exit) {
285                for (i = 0; i < num_sessions; i++) {
286                        elapsed_time = 0;
287                        alc++;
288
289                        /* Process audio */
290                        if (num_sessions == 2) {
291                                elapsed_time = audio_rw_process(sp[i], sp[1-i], sp[i]->ms);
292                        } else {
293                                elapsed_time = audio_rw_process(sp[i], sp[i], sp[i]->ms);
294                        }
295
296                        if (tx_is_sending(sp[i]->tb)) {
297                                tx_process_audio(sp[i]->tb);
298                                tx_send(sp[i]->tb);
299                        }
300
301                        /* Process RTP/RTCP packets */
302                        timeout.tv_sec  = 0;
303                        timeout.tv_usec = 0;
304                        for (j = 0; j < sp[i]->rtp_session_count; j++) {
305                                rtp_time = tx_get_rtp_time(sp[i]);
306                                while(rtp_recv(sp[i]->rtp_session[j], &timeout, rtp_time));
307                                rtp_send_ctrl(sp[i]->rtp_session[j], rtp_time, rtcp_app_site_callback);
308                                rtp_update(sp[i]->rtp_session[j]);
309                        }
310
311                        /* Process mbus */
312                        timeout.tv_sec  = 0;
313                        timeout.tv_usec = 0;
314                        mbus_recv(sp[i]->mbus_engine, (void *) sp[i], &timeout);
315                        mbus_heartbeat(sp[i]->mbus_engine, 1);
316                        mbus_retransmit(sp[i]->mbus_engine);
317                        mbus_send(sp[i]->mbus_engine);
318
319                        /* Process and mix active sources */
320                        if (sp[i]->playing_audio) {
321                                struct s_source *s;
322                                int              sidx;
323                                timestamp_t      cush_ts;
324
325                                session_validate(sp[i]);
326                                cush_ts = ts_map32(ts_get_freq(sp[i]->cur_ts), cushion_get_size(sp[i]->cushion));
327                                cush_ts = ts_add(sp[i]->cur_ts, cush_ts);
328                                scnt = (int)source_list_source_count(sp[i]->active_sources);
329                                for(sidx = 0; sidx < scnt; sidx++) {
330                                        s = source_list_get_source_no(sp[i]->active_sources, sidx);
331                                        if (source_relevant(s, sp[i]->cur_ts)) {
332                                                pdb_entry_t *e;
333                                                timestamp_t         two_secs, delta;
334                                                source_check_buffering(s);
335                                                source_process(sp[i], s, sp[i]->cur_ts, cush_ts);
336                                                source_audit(s);
337                                                /* Check for UI update necessary, updating once per 2 secs */
338                                                pdb_item_get(sp[i]->pdb, source_get_ssrc(s), &e);
339                                                delta    = ts_sub(sp[i]->cur_ts, e->last_ui_update);
340                                                two_secs = ts_map32(8000, 16000);
341                                                if (ts_gt(delta, two_secs)) {
342                                                        ui_send_stats(sp[i], sp[i]->mbus_ui_addr, e->ssrc);
343                                                        e->last_ui_update = sp[i]->cur_ts;
344                                                }
345                                        } else {
346                                                /* Remove source as stopped */
347                                                uint32_t ssrc;
348                                                ssrc = source_get_ssrc(s);
349                                                ui_send_rtp_inactive(sp[i], sp[i]->mbus_ui_addr, ssrc);
350                                                source_remove(sp[i]->active_sources, s);
351                                                sidx--;
352                                                scnt--;
353                                        }
354                                }
355                                /* Play local file if playing */
356                                if (sp[i]->local_file_player) {
357                                        if (voxlet_play(sp[i]->local_file_player, sp[i]->cur_ts, cush_ts) == FALSE) {
358                                                voxlet_destroy(&sp[i]->local_file_player);
359                                        }
360                                }
361                                /* Play loopback tone if present */
362                                if (sp[i]->tone_generator) {
363                                        tonegen_play(sp[i]->tone_generator, sp[i]->cur_ts, cush_ts);
364                                }
365                        } else {
366                                /* Destroy localfile player if not playing audio */
367                                if (sp[i]->local_file_player) {
368                                        voxlet_destroy(&sp[i]->local_file_player);
369                                }
370                        }
371
372                        /* Echo Suppression - cut off transmitter when receiving     */
373                        /* audio, enable when stop receiving.                        */
374                        session_validate(sp[i]);
375                        if (sp[i]->echo_suppress) {
376                                if (scnt > 0) {
377                                        if (tx_is_sending(sp[i]->tb)) {
378                                                tx_stop(sp[i]->tb);
379                                                sp[i]->echo_tx_active = TRUE;
380                                                debug_msg("Echo suppressor (disabling tx)\n");
381                                        }
382                                } else if (sp[i]->echo_tx_active) {
383                                        /* Transmitter was stopped because we were   */
384                                        /* playing out audio.  Restart it.           */
385                                        if (tx_is_sending(sp[i]->tb) == FALSE) {
386                                                tx_start(sp[i]->tb);
387                                                debug_msg("Echo suppressor (enabling tx)\n");
388                                        }
389                                        sp[i]->echo_tx_active = FALSE;
390                                }
391                        }
392                        /* Lecture Mode */
393                        if ((alc % 50) == 0) {
394                                if (!sp[i]->lecture && tx_is_sending(sp[i]->tb) && sp[i]->auto_lecture != 0) {
395                                        struct timeval   time;
396                                        gettimeofday(&time, NULL);
397                                        if (time.tv_sec - sp[i]->auto_lecture > 120) {
398                                                sp[i]->auto_lecture = 0;
399                                                debug_msg("Dummy lecture mode\n");
400                                        }
401                                }
402                        }
403
404                        if (sp[i]->ui_on) {
405                                /* We have a user interface... do any periodic updates needed. */
406                                if (sp[i]->audio_device && elapsed_time != 0) {
407                                        ui_send_periodic_updates(sp[i], sp[i]->mbus_ui_addr, elapsed_time);
408                                }
409                        } else {
410                                /* We don't yet have a user interface... send out a message soliciting one... */
411                                if ((alc % 25) == 0) {
412                                        mbus_qmsgf(sp[i]->mbus_engine, "()", FALSE, "mbus.waiting", "\"rat-ui-requested\"");
413                                }
414                        }
415                        if (sp[i]->new_config != NULL) {
416                                /* wait for mbus messages - closing audio device    */
417                                /* can timeout unprocessed messages as some drivers */
418                                /* pause to drain before closing.                   */
419                                network_process_mbus(sp[i]);
420                                if (audio_device_reconfigure(sp[i])) {
421                                        int saved_playing_audio = sp[i]->playing_audio;
422                                        /* Device reconfig takes a second or two, discard
423                                         * data that has arrived during this time
424                                         */
425                                        sp[i]->playing_audio = 0;
426                                        timeout.tv_sec  = 0;
427                                        timeout.tv_usec = 0;
428                                        for (j = 0; j < sp[i]->rtp_session_count; j++) {
429                                                while(rtp_recv(sp[i]->rtp_session[j], &timeout, rtp_time));
430                                        }
431                                        sp[i]->playing_audio = saved_playing_audio;
432                                        /* Device reconfigured so decode paths of all sources */
433                                        /* are misconfigured. Delete the source, and incoming */
434                                        /* data will drive the correct new path.              */
435                                        source_list_clear(sp[i]->active_sources);
436                                        ui_send_audio_update(sp[i], sp[i]->mbus_ui_addr);
437                                        if (sp[i]->local_file_player) {
438                                                voxlet_destroy(&sp[i]->local_file_player);
439                                        }
440                                }
441                        }
442                       
443                        /* Choke CPU usage */
444                        if (!audio_is_ready(sp[i]->audio_device)) {
445                                audio_wait_for(sp[i]->audio_device, 40);
446                        }
447
448                        /* Check controller is still alive */
449                        if (mbus_addr_valid(sp[i]->mbus_engine, c_addr) == FALSE) {
450                                should_exit = TRUE;
451                                debug_msg("Controller address is no longer valid.  Assuming it exited\n");
452                        }
453
454                        /* Debugging sanity checking... */
455                        session_validate(sp[i]);
456                        xmemchk();
457                }
458        }
459
460        for (i = 0; i < num_sessions; i++) {
461                settings_save(sp[i]);
462                tx_stop(sp[i]->tb);
463
464                for (j = 0; j < sp[i]->rtp_session_count; j++) {
465                        debug_msg("Media engine exiting session: %d\n", j);
466                        rtp_send_bye(sp[i]->rtp_session[j]);
467                        rtp_callback_exit(sp[i]->rtp_session[j]);
468                        rtp_done(sp[i]->rtp_session[j]);
469                }
470
471                /* Inform other processes that we're about to quit... */
472                mbus_qmsgf(sp[i]->mbus_engine, "()", FALSE, "mbus.bye", "");
473                mbus_send(sp[i]->mbus_engine);
474
475                /* Free audio device and clean up */
476                audio_device_release(sp[i], sp[i]->audio_device);
477                audio_free_interfaces();
478        }
479       
480        /* FIXME: This should be integrated into the previous loop, so instead of */
481        /*        sending mbus.bye() ourselves, we just call mbus_exit(...) which */
482        /*        sends a bye and shuts everything down cleanly for us.           */
483        /* FIXME: Needs updating for transcoder...                                */
484        if (mbus_addr_valid(sp[0]->mbus_engine, c_addr)) {
485                do {
486                        struct timeval   timeout;
487                        mbus_send(sp[0]->mbus_engine);
488                        /* At this stage we no longer care about acks for messages */
489                        /* mbus_retransmit(sp[0]->mbus_engine); */
490                        timeout.tv_sec  = 0;
491                        timeout.tv_usec = 20000;
492                        mbus_recv(sp[0]->mbus_engine, sp[0], &timeout);
493                } while (!mbus_sent_all(sp[0]->mbus_engine) && mbus_shutdown_error == FALSE);
494        }
495        mbus_exit(sp[0]->mbus_engine);
496
497        for (i = 0; i < num_sessions; i++) {
498                if (sp[i]->logger != NULL) {
499                        fclose(sp[i]->logger);
500                }
501                session_validate(sp[i]);
502                session_exit(sp[i]);
503                xfree(token[i]);
504                xfree(token_e[i]);
505        }
506       
507        converters_free();
508        xfree(c_addr);
509        xmemdmp();
510        debug_msg("Media engine exit\n");
511        return 0;
512}
Note: See TracBrowser for help on using the browser.