#ifdef _cplusplus extern "C" { #endif #include "dbthread.h" /* Function: write_thread_struct(dfp,dpi,gm) * * Descrip: writes out thread structure * for the use in the loop code * * * Arg: dfp [UNKN ] Undocumented argument [DYNFILE *] * Arg: dpi [UNKN ] Undocumented argument [DPImplementation *] * Arg: gm [UNKN ] Undocumented argument [GenericMatrix *] * * Return [UNKN ] Undocumented return value [boolean] * */ # line 21 "dbthread.dy" boolean write_thread_struct(DYNFILE * dfp,DPImplementation * dpi,GenericMatrix * gm) { int i; boolean qdb = FALSE; boolean tdb = FALSE; if( gm->qtype != NULL && gm->qtype->is_database == TRUE) { if( gm->qtype->is_thread_safe == FALSE ) { warn("Query is a database but not using a thread safe library. Can't use it!"); return FALSE; } else { qdb = TRUE; } } if( gm->ttype != NULL && gm->ttype->is_database == TRUE) { if( gm->ttype->is_thread_safe == FALSE) { warn("Target is a database but not using a thread safe library. Can't use it!"); return FALSE; } else { tdb = TRUE; } } fprintf(dfp->head,"#ifdef PTHREAD\n"); start_struct(dfp,"struct thread_pool_holder_%s",gm->name); if( qdb == FALSE ) { /* static query data structure common to all */ struct_expr(dfp,"%s %s;",gm->query->element_type,gm->query->name); add_end_comment_header(dfp,"Static query data: never free'd"); } else { struct_expr(dfp,"%s %s;",gm->qtype->real,gm->query->name); add_end_comment_header(dfp,"Query object placeholder"); struct_expr(dfp,"%s querydb;",gm->qtype->database_type); add_end_comment_header(dfp,"Query database object"); struct_expr(dfp,"boolean query_init;"); } if( tdb == FALSE ) { /* static query data structure common to all */ struct_expr(dfp,"%s %s;",gm->target->element_type,gm->target->name); add_end_comment_header(dfp,"Static target data: never free'd"); } else { struct_expr(dfp,"%s %s;",gm->ttype->real,gm->target->name); add_end_comment_header(dfp,"Target object placeholder"); struct_expr(dfp,"%s targetdb;",gm->ttype->database_type); add_end_comment_header(dfp,"Target database object"); struct_expr(dfp,"boolean target_init;"); } /* now add all the resources */ for(i=0;ires_len;i++) { struct_expr(dfp,"%s %s",gm->resource[i]->element_type,gm->resource[i]->name); } /* now add mutex's */ struct_expr(dfp,"pthread_mutex_t input_lock"); struct_expr(dfp,"pthread_mutex_t output_lock"); /* add output */ struct_expr(dfp,"Hscore * out"); /* add pthread_t */ struct_expr(dfp,"pthread_t * pool;"); struct_expr(dfp,"int number_of_threads;"); struct_expr(dfp,"boolean search_has_ended;"); if( dpi->db_trace_level > 0 ) { struct_expr(dfp,"DBSearchImpl * dbsi"); } /* close structure */ close_struct(dfp,";"); fprintf(dfp->head,"#endif /* PTHREAD */\n"); return TRUE; } /* Function: write_thread_loop(dfp,dpi,gm) * * Descrip: Major loop for each thread. * * This function is really complicated by the way * dynamite databases work - in particular the init * function returns an object, so we have to keep * track for each database if we need to init or not. * * Secondly we have to get out the information for * each query and target regardless of whether they will * score over the default or not. To do this we need * to rely on the datascore storage allocator to be * able to accept frees (annoying or what!). * * Here is the pseudo code for both a query or target db * * forever { * get input lock * if( search ended ) { * unlock input lock * break; * } * * get datascore datatstructure * * if( query not init ) * init query * hard link query * read query info into datastructure * * if( target not init ) * init target * else * reload target * * if ( end of the target database ) * free query data structure -- we have got it hard_linked! * reload query database into holder * if( end of the query database ) * flag search finished * unlock input lock * else * close target database * set target to be initiated -- by next thread * unlock input lock * else * unlock input lock * * do score * * if( should store ) * get output lock * add data score * else * get input lock (otherwise clash with datascore storage allocator) * return data score back to storage * unlock input lock * } * * * Arg: dfp [UNKN ] Undocumented argument [DYNFILE *] * Arg: dpi [UNKN ] Undocumented argument [DPImplementation *] * Arg: gm [UNKN ] Undocumented argument [GenericMatrix *] * * Return [UNKN ] Undocumented return value [boolean] * */ # line 165 "dbthread.dy" boolean write_thread_loop(DYNFILE * dfp,DPImplementation * dpi,GenericMatrix * gm) { FuncInfo * fi; boolean tdb,qdb; char * pre; tdb= qdb = FALSE; if( gm->qtype == NULL || gm->ttype == NULL ) { warn("No database types - must have dbtypes for pthreads support"); return FALSE; } if( gm->qtype != NULL && gm->qtype->is_database == TRUE) if( gm->qtype->is_thread_safe == FALSE) { warn("Query is a database but not using a thread safe library. Can't use it!"); return FALSE; } else { qdb = TRUE; } if( gm->ttype != NULL && gm->ttype->is_database == TRUE) if( gm->ttype->is_thread_safe == FALSE) { warn("Target is a database but not using a thread safe library. Can't use it!"); return FALSE; } else { tdb = TRUE; } fi = FuncInfo_named_from_varstr(FI_INTERNAL,"thread_loop_%s",gm->name); add_line_to_Ftext(fi->ft,"Infinite loop code foreach thread for %s",gm->name); macro(dfp,"#ifdef PTHREAD"); start_function_FuncInfo(fi,dfp,"void * thread_loop_%s(void * ptr)",gm->name); expr(dfp,"struct thread_pool_holder_%s * holder",gm->name); expr(dfp,"int db_status"); expr(dfp,"int score"); expr(dfp,"DataScore * ds;"); expr(dfp,"%s %s",gm->query->element_type,gm->query->name); expr(dfp,"%s %s",gm->target->element_type,gm->target->name); add_break(dfp); expr(dfp,"holder = (struct thread_pool_holder_%s *) ptr;",gm->name); if( dpi->db_trace_level >= 1 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 1 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Entering infinite loop for thread...\\n\");"); } add_break(dfp); expr(dfp,"while(1)"); startbrace_tag(dfp,"Infinite loop over all models"); add_block_comment(dfp,"Get input lock"); add_break(dfp); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"About to get input lock for main reload\\n\");"); } expr(dfp,"if( pthread_mutex_lock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error on getting input lock for %s\");",gm->name); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Got input lock for main reload\\n\");"); } add_break(dfp); expr(dfp,"if( holder->search_has_ended == TRUE )"); startbrace(dfp); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Database search finished for me!...\\n\");"); } expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Released lock and broken out of loop\\n\");"); } expr(dfp,"break;"); closebrace(dfp); add_break(dfp); add_block_comment(dfp,"Get storage space now, as we have to read in the info for the db now"); if( dpi->db_trace_level >= 3 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 3 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Getting new DataScore from storage...\\n\");"); } expr(dfp,"ds = new_DataScore();"); /* if there is a target database, read from it */ if( tdb == TRUE ) { if( qdb == TRUE ) { /* both query and target db */ add_break(dfp); add_block_comment(dfp,"We need to get our query object"); if( dpi->db_trace_level >= 3 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 3 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Starting query database...\\n\");"); } add_break(dfp); expr(dfp,"if( holder->query_init == FALSE)"); startbrace(dfp); expr(dfp,"holder->%s = %s(holder->querydb,&db_status);",gm->query->name,gm->qtype->init_func); expr(dfp,"holder->query_init = TRUE;"); expr(dfp,"if( db_status == DB_RETURN_ERROR )"); hang_expr(dfp,"fatal(\"Unable to initalise query database in %s search\");",gm->name); closebrace(dfp); expr(dfp,"%s = %s(holder->%s);",gm->query->name,gm->qtype->hard_link_func,gm->query->name); add_block_comment(dfp,"get query information into datascore"); expr(dfp,"%s(ds->query,%s,holder->querydb);",gm->qtype->dataentry_add,gm->query->name); } add_break(dfp); expr(dfp,"if( holder->target_init == FALSE )"); startbrace_tag(dfp,"if the db has not been init'd"); if( dpi->db_trace_level >= 3 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 3 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Starting target database...\\n\");"); } expr(dfp,"%s = %s(holder->targetdb,&db_status);",gm->target->name,gm->ttype->init_func); expr(dfp,"holder->target_init = TRUE;"); closebrace(dfp); expr(dfp,"else"); startbrace_tag(dfp,"Normal reload"); expr(dfp," %s = %s(NULL,holder->targetdb,&db_status)",gm->target->name,gm->ttype->reload_func); closebrace(dfp); add_break(dfp); add_block_comment(dfp,"Check to see what the reload is like"); add_break(dfp); expr(dfp,"if( db_status == DB_RETURN_ERROR )"); startbrace(dfp); expr(dfp,"fatal(\"In searching %s, Reload error on database %s, in threads\");",gm->name,gm->target->name); closebrace(dfp); if( qdb == TRUE ) { /* both query and target db */ add_break(dfp); expr(dfp,"if( db_status == DB_RETURN_END)"); startbrace_tag(dfp,"End of target database"); add_block_comment(dfp,"close target database and schedule it for initalisation by next thread"); expr(dfp,"%s(NULL,holder->targetdb);",gm->ttype->close_func); expr(dfp,"holder->target_init = FALSE;"); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Target Database to be reloaded...\\n\");"); } add_break(dfp); add_block_comment(dfp,"free'ing the query object"); expr(dfp,"%s(holder->%s);",gm->qtype->free_func,gm->query->name); add_block_comment(dfp,"get the next query object for the next thread"); expr(dfp,"holder->%s = %s(NULL,holder->querydb,&db_status);",gm->query->name,gm->qtype->reload_func); expr(dfp,"if( db_status == DB_RETURN_ERROR )"); hang_expr(dfp,"fatal(\"In searching %s, reload error on database %s, in threads\");",gm->name,gm->query->name); expr(dfp,"if( db_status == DB_RETURN_END )"); startbrace_tag(dfp,"last load!"); add_block_comment(dfp,"End of target and query database - finished search!"); expr(dfp,"%s(NULL,holder->querydb);",gm->qtype->close_func); expr(dfp,"holder->search_has_ended = TRUE;"); closebrace(dfp); add_break(dfp); add_block_comment(dfp,"release input mutex"); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Releasing input lock after end of target\\n\");"); } expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); expr(dfp,"continue;"); closebrace(dfp); /* end of if at end ! */ expr(dfp,"else "); startbrace_tag(dfp,"Normal reload"); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Releasing input lock for normal reload\\n\");"); } expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); closebrace(dfp); /* end of if at end ! */ add_block_comment(dfp,"get target information into datascore"); expr(dfp,"%s(ds->target,%s,holder->targetdb);",gm->ttype->dataentry_add,gm->target->name); } else { /* targetdb but no querydb */ expr(dfp,"if( db_status == DB_RETURN_END)"); startbrace_tag(dfp,"End of target database"); expr(dfp,"holder->search_has_ended = TRUE;"); expr(dfp,"%s(NULL,holder->targetdb)",gm->ttype->close_func); add_block_comment(dfp,"release input mutex"); expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); closebrace(dfp); /* end of if at end ! */ } } else { /* querydb but no targetdb */ expr(dfp,"if( holder->query_init == FALSE )"); startbrace_tag(dfp,"if the db has not been init'd"); expr(dfp,"%s = %s(holder->querydb,&db_status);",gm->query->name,gm->qtype->init_func); expr(dfp,"holder->query_init = TRUE;"); closebrace(dfp); expr(dfp,"else"); startbrace_tag(dfp,"Normal reload"); expr(dfp," %s = %s(NULL,holder->querydb,&db_status)",gm->query->name,gm->qtype->reload_func); closebrace(dfp); add_block_comment(dfp,"Check to see what the reload is like"); expr(dfp,"if( db_status == DB_RETURN_ERROR )"); startbrace(dfp); expr(dfp,"fatal(\"In searching %s, Reload error on database %s, in threads\");",gm->name,gm->query->name); closebrace(dfp); expr(dfp,"if( db_status == DB_RETURN_END)"); startbrace_tag(dfp,"End of query database"); expr(dfp,"holder->search_has_ended = TRUE;"); expr(dfp,"%s(NULL,holder->querydb)",gm->qtype->close_func); add_block_comment(dfp,"release input mutex"); expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); closebrace(dfp); /* end of if at end ! */ add_block_comment(dfp,"get query information into datascore"); expr(dfp,"%s(ds->query,%s,holder->querydb);",gm->qtype->dataentry_add,gm->query->name); } add_break(dfp); add_block_comment(dfp,"Now there is a new query/target pair ready for comparison"); if( dpi->db_trace_level >= 1 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 1 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"A new pair to be compared...\\n\");"); } pre = get_pre_chainstr_GenericMatrix("holder->",gm); expr(dfp,"score = score_only_%s(%s)",gm->name,pre); ckfree(pre); add_break(dfp); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Getting output lock\\n\");"); } add_block_comment(dfp,"Getting lock on output"); expr(dfp,"if( pthread_mutex_lock(&(holder->output_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error on getting output lock for %s\");",gm->name); add_block_comment(dfp,"If the score is less than cutoff, schedule the datascore for reuse"); expr(dfp,"if( should_store_Hscore(holder->out,score) != TRUE)"); startbrace(dfp); expr(dfp,"free_DataScore(ds)"); closebrace(dfp); expr(dfp,"else"); startbrace_tag(dfp,"storing score"); expr(dfp,"ds->score = score"); expr(dfp,"add_Hscore(holder->out,ds)"); closebrace(dfp); expr(dfp,"if( pthread_mutex_unlock(&(holder->output_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error on releasing output lock for %s\");",gm->name); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Released output lock\\n\");"); } add_break(dfp); add_block_comment(dfp,"Now free database objects"); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"About to get input lock for free func\\n\");"); } expr(dfp,"if( pthread_mutex_lock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error on getting input lock for %s\");",gm->name); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Got input lock for free func\\n\");"); } if( qdb == TRUE ) { expr(dfp,"%s(%s);",gm->qtype->free_func,gm->query->name); } if( tdb == TRUE ) { expr(dfp,"%s(%s);",gm->ttype->free_func,gm->target->name); } if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Releasing input lock after free'ing\\n\");"); } expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); closebrace(dfp); /* back to while */ add_break(dfp); if( dpi->db_trace_level >= 1 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 1 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Exiting forever loop\\n\");"); } expr(dfp,"return NULL;"); close_function(dfp); add_break(dfp); macro(dfp,"#endif /* PTHREAD */"); return 1; } /* Function: get_pre_chainstr_GenericMatrix(pre,gm) * * Descrip: makes an the argument calling string which * is compatible with the arg_str from * get_argstr_GenericMatrix, but with a * pre placement in it * * eg "query,target,holder->comp_mat" * * * Arg: pre [UNKN ] Undocumented argument [const char *] * Arg: gm [READ ] structure holding generic matrix [const GenericMatrix *] * * Return [UNKN ] allocated string (must free) with chained-args [char *] * */ # line 531 "dbthread.dy" char * get_pre_chainstr_GenericMatrix(const char * pre,const GenericMatrix * gm) { char buffer[MAXLINE]; int i; sprintf(buffer,"%s, %s ",gm->query->name,gm->target->name); for(i=0;ires_len;i++) { strcat(buffer,","); strcat(buffer,pre); strcat(buffer,gm->resource[i]->name); } return stringalloc(buffer); } # line 565 "dbthread.c" #ifdef _cplusplus } #endif