/* input.c * - Main producer control loop. Fetches data from input modules, and controls * submission of these to the instance threads. Timing control happens here. * * $Id: input.c,v 1.32 2004/03/11 17:22:59 karl Exp $ * * Copyright (c) 2001 Michael Smith * * This program is distributed under the terms of the GNU General * Public License, version 2. You may use, modify, and redistribute * it under the terms of this license. A copy should be included * with this source. */ #ifdef HAVE_CONFIG_H #include #endif #include #include #include #ifdef HAVE_STDINT_H # include #endif #include #include #include #include #include #include "cfgparse.h" #include "stream.h" #include "input.h" #include "event.h" #include "inputmodule.h" #include "im_playlist.h" #include "im_stdinpcm.h" #ifdef HAVE_OSS #include "im_oss.h" #endif #ifdef HAVE_SUN_AUDIO #include "im_sun.h" #endif #ifdef HAVE_ALSA #include "im_alsa.h" #endif #ifdef _WIN32 typedef __int64 int64_t typedef unsigned __int64 uint64_t #endif #define MODULE "input/" #include "logging.h" #define MAX_BUFFER_FAILURES 15 typedef struct _timing_control_tag { uint64_t starttime; uint64_t senttime; uint64_t samples; uint64_t oldsamples; unsigned samplerate; long serialno; } timing_control; typedef struct _module { char *name; input_module_t *(*open)(module_param_t *params); } module; static module modules[] = { { "playlist", playlist_open_module}, { "stdinpcm", stdin_open_module}, #ifdef HAVE_OSS { "oss", oss_open_module}, #endif #ifdef HAVE_SUN_AUDIO { "sun", sun_open_module}, #endif #ifdef HAVE_ALSA { "alsa", alsa_open_module}, #endif {NULL,NULL} }; static timing_control control; /* This is identical to shout_sync(), really. */ void input_sleep(void) { int64_t sleep; /* no need to sleep if we haven't sent data */ if (control.senttime == 0) return; sleep = ((double)control.senttime / 1000) - (timing_get_time() - control.starttime); /* trap for long sleeps, typically indicating a clock change. it's not */ /* perfect though, as low bitrate/low samplerate vorbis can trigger this */ if(sleep > 8000) { LOG_WARN1("Extended sleep requested (%ld ms), sleeping for 5 seconds", sleep); timing_sleep(5000); } else if(sleep > 0) timing_sleep((uint64_t)sleep); } int input_calculate_pcm_sleep(unsigned bytes, unsigned bytes_per_sec) { control.senttime += ((uint64_t)bytes * 1000000)/bytes_per_sec; return 0; } int input_calculate_ogg_sleep(ogg_page *page) { static ogg_stream_state os; ogg_packet op; static vorbis_info vi; static vorbis_comment vc; static int need_start_pos, need_headers, state_in_use = 0; static int serialno = 0; static uint64_t offset; static uint64_t first_granulepos; if (ogg_page_granulepos(page) == -1) { LOG_ERROR0("Timing control: corrupt timing information in vorbis file, cannot stream."); return -1; } if (ogg_page_bos (page)) { control.oldsamples = 0; if (state_in_use) ogg_stream_clear (&os); ogg_stream_init (&os, ogg_page_serialno (page)); serialno = ogg_page_serialno (page); state_in_use = 1; vorbis_info_init (&vi); vorbis_comment_init (&vc); need_start_pos = 1; need_headers = 3; offset = (uint64_t)0; } if (need_start_pos) { int found_first_granulepos = 0; ogg_stream_pagein (&os, page); while (ogg_stream_packetout (&os, &op) == 1) { if (need_headers) { if (vorbis_synthesis_headerin (&vi, &vc, &op) < 0) { LOG_ERROR0("Timing control: can't determine sample rate for input, not vorbis."); control.samplerate = 0; return -1; } need_headers--; control.samplerate = vi.rate; if (need_headers == 0) { vorbis_comment_clear (&vc); first_granulepos = (uint64_t)0; return 0; } continue; } /* headers have been read */ if (first_granulepos == 0 && op.granulepos > 0) { first_granulepos = op.granulepos; found_first_granulepos = 1; } offset += vorbis_packet_blocksize (&vi, &op) / 4; } if (!found_first_granulepos) return 0; need_start_pos = 0; control.oldsamples = first_granulepos - offset; vorbis_info_clear (&vi); ogg_stream_clear (&os); state_in_use = 0; } if (serialno != ogg_page_serialno (page)) { LOG_ERROR0 ("Found page which does not belong to current logical stream"); return -1; } control.samples = ogg_page_granulepos (page) - control.oldsamples; control.oldsamples = ogg_page_granulepos (page); control.senttime += ((uint64_t)control.samples * 1000000 / (uint64_t)control.samplerate); return 0; } void input_flush_queue(buffer_queue *queue, int keep_critical) { queue_item *item, *next, *prev=NULL; LOG_DEBUG0("Input queue flush requested"); thread_mutex_lock(&queue->lock); if(!queue->head) { thread_mutex_unlock(&queue->lock); return; } item = queue->head; while(item) { next = item->next; if(!(keep_critical && item->buf->critical)) { thread_mutex_lock(&ices_config->refcount_lock); item->buf->count--; if(!item->buf->count) { free(item->buf->buf); free(item->buf); } thread_mutex_unlock(&ices_config->refcount_lock); if(prev) prev->next = next; else queue->head = next; free(item); item = next; queue->length--; } else { prev = item; item = next; } } /* Now, fix up the tail pointer */ queue->tail = NULL; item = queue->head; while(item) { queue->tail = item; item = item->next; } thread_mutex_unlock(&queue->lock); } void input_loop(void) { input_module_t *inmod=NULL; instance_t *instance, *prev, *next; queue_item *queued; int shutdown = 0; int current_module = 0; int valid_stream = 1; int inc_count; int not_waiting_for_critical; thread_cond_create(&ices_config->queue_cond); thread_cond_create(&ices_config->event_pending_cond); thread_mutex_create(&ices_config->refcount_lock); thread_mutex_create(&ices_config->flush_lock); memset (&control, 0, sizeof (control)); while(ices_config->playlist_module && modules[current_module].open) { if(!strcmp(ices_config->playlist_module, modules[current_module].name)) { inmod = modules[current_module].open(ices_config->module_params); break; } current_module++; } if(!inmod) { LOG_ERROR1("Couldn't initialise input module \"%s\"", ices_config->playlist_module); return; } ices_config->inmod = inmod; /* ok, basic config stuff done. Now, we want to start all our listening * threads. */ instance = ices_config->instances; while(instance) { stream_description *arg = calloc(1, sizeof(stream_description)); arg->stream = instance; arg->input = inmod; /* if(instance->savefilename != NULL) thread_create("savefile", savefile_stream, arg, 1); */ thread_create("stream", ices_instance_stream, arg, 1); instance = instance->next; } /* treat as if a signal has arrived straight away */ signal_usr1_handler (0); /* now we go into the main loop * We shut down the main thread ONLY once all the instances * have killed themselves. */ while(!shutdown) { ref_buffer *chunk = calloc(1, sizeof(ref_buffer)); buffer_queue *current; int ret; instance = ices_config->instances; prev = NULL; while(instance) { /* if an instance has died, get rid of it ** this should be replaced with something that tries to ** restart the instance, probably. */ if (instance->died) { LOG_DEBUG0("An instance died, removing it"); next = instance->next; if (prev) prev->next = next; else ices_config->instances = next; /* Just in case, flush any existing buffers * Locks shouldn't be needed, but lets be SURE */ thread_mutex_lock(&ices_config->flush_lock); input_flush_queue(instance->queue, 0); thread_mutex_unlock(&ices_config->flush_lock); config_free_instance(instance); free(instance); instance = next; continue; } prev = instance; instance = instance->next; } instance = ices_config->instances; if(!instance) { shutdown = 1; free(chunk); continue; } if(ices_config->shutdown) /* We've been signalled to shut down, but */ { /* the instances haven't done so yet... */ timing_sleep(250); /* sleep for quarter of a second */ free(chunk); continue; } /* If this is the first time through, set initial time. This should * be done before the call to inmod->getdata() below, in order to * properly keep time if this input module blocks. */ if (control.starttime == 0) control.starttime = timing_get_time(); /* get a chunk of data from the input module */ ret = inmod->getdata(inmod->internal, chunk); /* input module signalled non-fatal error. Skip this chunk */ if(ret==0) { free(chunk); continue; } /* Input module signalled fatal error, shut down - nothing we can do * from here */ if(ret < 0) { ices_config->shutdown = 1; thread_cond_broadcast(&ices_config->queue_cond); free(chunk); continue; } if(chunk->critical) valid_stream = 1; if(ret < 0) { /* Tell the input module to go to the next track, hopefully allowing * resync. */ ices_config->inmod->handle_event(ices_config->inmod, EVENT_NEXTTRACK,NULL); valid_stream = 0; } inc_count = 0; not_waiting_for_critical = 0; if(valid_stream) { while(instance) { if(instance->wait_for_critical && !chunk->critical) { instance = instance->next; continue; } not_waiting_for_critical = 1; if(instance->skip) { instance = instance->next; continue; } queued = malloc(sizeof(queue_item)); queued->buf = chunk; current = instance->queue; inc_count++; thread_mutex_lock(¤t->lock); if(current->head == NULL) { current->head = current->tail = queued; current->head->next = current->tail->next = NULL; } else { current->tail->next = queued; queued->next = NULL; current->tail = queued; } current->length++; thread_mutex_unlock(¤t->lock); instance = instance->next; } } /* If everything is waiting for a critical buffer, force one * early. (This will take effect on the next pass through) */ if(valid_stream && !not_waiting_for_critical) { ices_config->inmod->handle_event(ices_config->inmod, EVENT_NEXTTRACK,NULL); instance = ices_config->instances; while(instance) { thread_mutex_lock(&ices_config->flush_lock); input_flush_queue(instance->queue, 0); instance->wait_for_critical = 0; thread_mutex_unlock(&ices_config->flush_lock); instance = instance->next; } } /* Make sure we don't end up with a 0-refcount buffer that * will never hit any of the free points. (this happens * if all threads are set to skip, for example). */ thread_mutex_lock(&ices_config->refcount_lock); chunk->count += inc_count; if(!chunk->count) { free(chunk->buf); free(chunk); } thread_mutex_unlock(&ices_config->refcount_lock); if(valid_stream) { /* wake up the instances */ thread_cond_broadcast(&ices_config->queue_cond); } } LOG_INFO0 ("All instances removed, shutting down..."); ices_config->shutdown = 1; thread_cond_broadcast(&ices_config->event_pending_cond); timing_sleep(250); /* sleep for quarter of a second */ thread_cond_destroy(&ices_config->queue_cond); thread_cond_destroy(&ices_config->event_pending_cond); thread_mutex_destroy(&ices_config->flush_lock); thread_mutex_destroy(&ices_config->refcount_lock); inmod->handle_event(inmod, EVENT_SHUTDOWN, NULL); return; }