%{ #include "dyna2.h" #include "dpimpl.h" %} %{ #include "dbthread.h" %func writes out thread structure for the use in the loop code %% boolean write_thread_struct(DYNFILE * dfp,DPImplementation * dpi,GenericMatrix * gm) { int i; boolean qdb = FALSE; boolean tdb = FALSE; if( gm->qtype != NULL && gm->qtype->is_database == TRUE) { if( gm->qtype->is_thread_safe == FALSE ) { warn("Query is a database but not using a thread safe library. Can't use it!"); return FALSE; } else { qdb = TRUE; } } if( gm->ttype != NULL && gm->ttype->is_database == TRUE) { if( gm->ttype->is_thread_safe == FALSE) { warn("Target is a database but not using a thread safe library. Can't use it!"); return FALSE; } else { tdb = TRUE; } } fprintf(dfp->head,"#ifdef PTHREAD\n"); start_struct(dfp,"struct thread_pool_holder_%s",gm->name); if( qdb == FALSE ) { /* static query data structure common to all */ struct_expr(dfp,"%s %s;",gm->query->element_type,gm->query->name); add_end_comment_header(dfp,"Static query data: never free'd"); } else { struct_expr(dfp,"%s %s;",gm->qtype->real,gm->query->name); add_end_comment_header(dfp,"Query object placeholder"); struct_expr(dfp,"%s querydb;",gm->qtype->database_type); add_end_comment_header(dfp,"Query database object"); struct_expr(dfp,"boolean query_init;"); } if( tdb == FALSE ) { /* static query data structure common to all */ struct_expr(dfp,"%s %s;",gm->target->element_type,gm->target->name); add_end_comment_header(dfp,"Static target data: never free'd"); } else { struct_expr(dfp,"%s %s;",gm->ttype->real,gm->target->name); add_end_comment_header(dfp,"Target object placeholder"); struct_expr(dfp,"%s targetdb;",gm->ttype->database_type); add_end_comment_header(dfp,"Target database object"); struct_expr(dfp,"boolean target_init;"); } /* now add all the resources */ for(i=0;ires_len;i++) { struct_expr(dfp,"%s %s",gm->resource[i]->element_type,gm->resource[i]->name); } /* now add mutex's */ struct_expr(dfp,"pthread_mutex_t input_lock"); struct_expr(dfp,"pthread_mutex_t output_lock"); /* add output */ struct_expr(dfp,"Hscore * out"); /* add pthread_t */ struct_expr(dfp,"pthread_t * pool;"); struct_expr(dfp,"int number_of_threads;"); struct_expr(dfp,"boolean search_has_ended;"); if( dpi->db_trace_level > 0 ) { struct_expr(dfp,"DBSearchImpl * dbsi"); } /* close structure */ close_struct(dfp,";"); fprintf(dfp->head,"#endif /* PTHREAD */\n"); return TRUE; } %func Major loop for each thread. This function is really complicated by the way dynamite databases work - in particular the init function returns an object, so we have to keep track for each database if we need to init or not. Secondly we have to get out the information for each query and target regardless of whether they will score over the default or not. To do this we need to rely on the datascore storage allocator to be able to accept frees (annoying or what!). Here is the pseudo code for both a query or target db forever { get input lock if( search ended ) { unlock input lock break; } get datascore datatstructure if( query not init ) init query hard link query read query info into datastructure if( target not init ) init target else reload target if ( end of the target database ) free query data structure -- we have got it hard_linked! reload query database into holder if( end of the query database ) flag search finished unlock input lock else close target database set target to be initiated -- by next thread unlock input lock else unlock input lock do score if( should store ) get output lock add data score else get input lock (otherwise clash with datascore storage allocator) return data score back to storage unlock input lock } %% boolean write_thread_loop(DYNFILE * dfp,DPImplementation * dpi,GenericMatrix * gm) { FuncInfo * fi; boolean tdb,qdb; char * pre; tdb= qdb = FALSE; if( gm->qtype == NULL || gm->ttype == NULL ) { warn("No database types - must have dbtypes for pthreads support"); return FALSE; } if( gm->qtype != NULL && gm->qtype->is_database == TRUE) if( gm->qtype->is_thread_safe == FALSE) { warn("Query is a database but not using a thread safe library. Can't use it!"); return FALSE; } else { qdb = TRUE; } if( gm->ttype != NULL && gm->ttype->is_database == TRUE) if( gm->ttype->is_thread_safe == FALSE) { warn("Target is a database but not using a thread safe library. Can't use it!"); return FALSE; } else { tdb = TRUE; } fi = FuncInfo_named_from_varstr(FI_INTERNAL,"thread_loop_%s",gm->name); add_line_to_Ftext(fi->ft,"Infinite loop code foreach thread for %s",gm->name); macro(dfp,"#ifdef PTHREAD"); start_function_FuncInfo(fi,dfp,"void * thread_loop_%s(void * ptr)",gm->name); expr(dfp,"struct thread_pool_holder_%s * holder",gm->name); expr(dfp,"int db_status"); expr(dfp,"int score"); expr(dfp,"DataScore * ds;"); expr(dfp,"%s %s",gm->query->element_type,gm->query->name); expr(dfp,"%s %s",gm->target->element_type,gm->target->name); add_break(dfp); expr(dfp,"holder = (struct thread_pool_holder_%s *) ptr;",gm->name); if( dpi->db_trace_level >= 1 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 1 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Entering infinite loop for thread...\\n\");"); } add_break(dfp); expr(dfp,"while(1)"); startbrace_tag(dfp,"Infinite loop over all models"); add_block_comment(dfp,"Get input lock"); add_break(dfp); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"About to get input lock for main reload\\n\");"); } expr(dfp,"if( pthread_mutex_lock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error on getting input lock for %s\");",gm->name); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Got input lock for main reload\\n\");"); } add_break(dfp); expr(dfp,"if( holder->search_has_ended == TRUE )"); startbrace(dfp); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Database search finished for me!...\\n\");"); } expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Released lock and broken out of loop\\n\");"); } expr(dfp,"break;"); closebrace(dfp); add_break(dfp); add_block_comment(dfp,"Get storage space now, as we have to read in the info for the db now"); if( dpi->db_trace_level >= 3 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 3 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Getting new DataScore from storage...\\n\");"); } expr(dfp,"ds = new_DataScore();"); /* if there is a target database, read from it */ if( tdb == TRUE ) { if( qdb == TRUE ) { /* both query and target db */ add_break(dfp); add_block_comment(dfp,"We need to get our query object"); if( dpi->db_trace_level >= 3 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 3 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Starting query database...\\n\");"); } add_break(dfp); expr(dfp,"if( holder->query_init == FALSE)"); startbrace(dfp); expr(dfp,"holder->%s = %s(holder->querydb,&db_status);",gm->query->name,gm->qtype->init_func); expr(dfp,"holder->query_init = TRUE;"); expr(dfp,"if( db_status == DB_RETURN_ERROR )"); hang_expr(dfp,"fatal(\"Unable to initalise query database in %s search\");",gm->name); closebrace(dfp); expr(dfp,"%s = %s(holder->%s);",gm->query->name,gm->qtype->hard_link_func,gm->query->name); add_block_comment(dfp,"get query information into datascore"); expr(dfp,"%s(ds->query,%s,holder->querydb);",gm->qtype->dataentry_add,gm->query->name); } add_break(dfp); expr(dfp,"if( holder->target_init == FALSE )"); startbrace_tag(dfp,"if the db has not been init'd"); if( dpi->db_trace_level >= 3 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 3 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Starting target database...\\n\");"); } expr(dfp,"%s = %s(holder->targetdb,&db_status);",gm->target->name,gm->ttype->init_func); expr(dfp,"holder->target_init = TRUE;"); closebrace(dfp); expr(dfp,"else"); startbrace_tag(dfp,"Normal reload"); expr(dfp," %s = %s(NULL,holder->targetdb,&db_status)",gm->target->name,gm->ttype->reload_func); closebrace(dfp); add_break(dfp); add_block_comment(dfp,"Check to see what the reload is like"); add_break(dfp); expr(dfp,"if( db_status == DB_RETURN_ERROR )"); startbrace(dfp); expr(dfp,"fatal(\"In searching %s, Reload error on database %s, in threads\");",gm->name,gm->target->name); closebrace(dfp); if( qdb == TRUE ) { /* both query and target db */ add_break(dfp); expr(dfp,"if( db_status == DB_RETURN_END)"); startbrace_tag(dfp,"End of target database"); add_block_comment(dfp,"close target database and schedule it for initalisation by next thread"); expr(dfp,"%s(NULL,holder->targetdb);",gm->ttype->close_func); expr(dfp,"holder->target_init = FALSE;"); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Target Database to be reloaded...\\n\");"); } add_break(dfp); add_block_comment(dfp,"free'ing the query object"); expr(dfp,"%s(holder->%s);",gm->qtype->free_func,gm->query->name); add_block_comment(dfp,"get the next query object for the next thread"); expr(dfp,"holder->%s = %s(NULL,holder->querydb,&db_status);",gm->query->name,gm->qtype->reload_func); expr(dfp,"if( db_status == DB_RETURN_ERROR )"); hang_expr(dfp,"fatal(\"In searching %s, reload error on database %s, in threads\");",gm->name,gm->query->name); expr(dfp,"if( db_status == DB_RETURN_END )"); startbrace_tag(dfp,"last load!"); add_block_comment(dfp,"End of target and query database - finished search!"); expr(dfp,"%s(NULL,holder->querydb);",gm->qtype->close_func); expr(dfp,"holder->search_has_ended = TRUE;"); closebrace(dfp); add_break(dfp); add_block_comment(dfp,"release input mutex"); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Releasing input lock after end of target\\n\");"); } expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); expr(dfp,"continue;"); closebrace(dfp); /* end of if at end ! */ expr(dfp,"else "); startbrace_tag(dfp,"Normal reload"); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Releasing input lock for normal reload\\n\");"); } expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); closebrace(dfp); /* end of if at end ! */ add_block_comment(dfp,"get target information into datascore"); expr(dfp,"%s(ds->target,%s,holder->targetdb);",gm->ttype->dataentry_add,gm->target->name); } else { /* targetdb but no querydb */ expr(dfp,"if( db_status == DB_RETURN_END)"); startbrace_tag(dfp,"End of target database"); expr(dfp,"holder->search_has_ended = TRUE;"); expr(dfp,"%s(NULL,holder->targetdb)",gm->ttype->close_func); add_block_comment(dfp,"release input mutex"); expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); closebrace(dfp); /* end of if at end ! */ } } else { /* querydb but no targetdb */ expr(dfp,"if( holder->query_init == FALSE )"); startbrace_tag(dfp,"if the db has not been init'd"); expr(dfp,"%s = %s(holder->querydb,&db_status);",gm->query->name,gm->qtype->init_func); expr(dfp,"holder->query_init = TRUE;"); closebrace(dfp); expr(dfp,"else"); startbrace_tag(dfp,"Normal reload"); expr(dfp," %s = %s(NULL,holder->querydb,&db_status)",gm->query->name,gm->qtype->reload_func); closebrace(dfp); add_block_comment(dfp,"Check to see what the reload is like"); expr(dfp,"if( db_status == DB_RETURN_ERROR )"); startbrace(dfp); expr(dfp,"fatal(\"In searching %s, Reload error on database %s, in threads\");",gm->name,gm->query->name); closebrace(dfp); expr(dfp,"if( db_status == DB_RETURN_END)"); startbrace_tag(dfp,"End of query database"); expr(dfp,"holder->search_has_ended = TRUE;"); expr(dfp,"%s(NULL,holder->querydb)",gm->qtype->close_func); add_block_comment(dfp,"release input mutex"); expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); closebrace(dfp); /* end of if at end ! */ add_block_comment(dfp,"get query information into datascore"); expr(dfp,"%s(ds->query,%s,holder->querydb);",gm->qtype->dataentry_add,gm->query->name); } add_break(dfp); add_block_comment(dfp,"Now there is a new query/target pair ready for comparison"); if( dpi->db_trace_level >= 1 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 1 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"A new pair to be compared...\\n\");"); } pre = get_pre_chainstr_GenericMatrix("holder->",gm); expr(dfp,"score = score_only_%s(%s)",gm->name,pre); ckfree(pre); add_break(dfp); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Getting output lock\\n\");"); } add_block_comment(dfp,"Getting lock on output"); expr(dfp,"if( pthread_mutex_lock(&(holder->output_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error on getting output lock for %s\");",gm->name); add_block_comment(dfp,"If the score is less than cutoff, schedule the datascore for reuse"); expr(dfp,"if( should_store_Hscore(holder->out,score) != TRUE)"); startbrace(dfp); expr(dfp,"free_DataScore(ds)"); closebrace(dfp); expr(dfp,"else"); startbrace_tag(dfp,"storing score"); expr(dfp,"ds->score = score"); expr(dfp,"add_Hscore(holder->out,ds)"); closebrace(dfp); expr(dfp,"if( pthread_mutex_unlock(&(holder->output_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error on releasing output lock for %s\");",gm->name); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Released output lock\\n\");"); } add_break(dfp); add_block_comment(dfp,"Now free database objects"); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"About to get input lock for free func\\n\");"); } expr(dfp,"if( pthread_mutex_lock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error on getting input lock for %s\");",gm->name); if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Got input lock for free func\\n\");"); } if( qdb == TRUE ) { expr(dfp,"%s(%s);",gm->qtype->free_func,gm->query->name); } if( tdb == TRUE ) { expr(dfp,"%s(%s);",gm->ttype->free_func,gm->target->name); } if( dpi->db_trace_level >= 2 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 2 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Releasing input lock after free'ing\\n\");"); } expr(dfp,"if( pthread_mutex_unlock(&(holder->input_lock))!= 0 )"); hang_expr(dfp,"fatal(\"Error in releasing input lock for %s\");",gm->name); closebrace(dfp); /* back to while */ add_break(dfp); if( dpi->db_trace_level >= 1 ) { expr(dfp,"if ( holder->dbsi->trace_level >= 1 )"); hang_expr(dfp,"fprintf(holder->dbsi->trace_file,\"Exiting forever loop\\n\");"); } expr(dfp,"return NULL;"); close_function(dfp); add_break(dfp); macro(dfp,"#endif /* PTHREAD */"); return 1; } %func makes an the argument calling string which is compatible with the arg_str from get_argstr_GenericMatrix, but with a pre placement in it eg "query,target,holder->comp_mat" %arg gm r structure holding generic matrix return allocated string (must free) with chained-args %% char * get_pre_chainstr_GenericMatrix(const char * pre,const GenericMatrix * gm) { char buffer[MAXLINE]; int i; sprintf(buffer,"%s, %s ",gm->query->name,gm->target->name); for(i=0;ires_len;i++) { strcat(buffer,","); strcat(buffer,pre); strcat(buffer,gm->resource[i]->name); } return stringalloc(buffer); } %}