/*********************************************** !!!! DO NOT EDIT THIS FILE !!!! This file was auto-generated by Build.PL from lib/KinoSearch/Util/SortExternal.pm See KinoSearch::Docs::DevGuide for details. ***********************************************/ #line 278 "lib/KinoSearch/Util/SortExternal.pm" #include "KinoSearchUtilSortExternal.h" static SortExRun* Kino_SortEx_new_run(double, double); static void Kino_SortEx_grow_bufbuf(ByteBuf***, I32, I32); static I32 Kino_SortEx_refill_run(SortExternal*, SortExRun*); static void Kino_SortEx_refill_cache(SortExternal*); static void Kino_SortEx_merge_runs(SortExternal*); static ByteBuf* Kino_SortEx_find_endpost(SortExternal*); static I32 Kino_SortEx_define_slice(SortExRun*, ByteBuf*); static void Kino_SortEx_mergesort(ByteBuf**, ByteBuf**, I32); static void Kino_SortEx_msort(ByteBuf**, ByteBuf**, U32, U32); static void Kino_SortEx_merge(ByteBuf**, U32, ByteBuf**, U32, ByteBuf**); static void Kino_SortEx_clear_cache(SortExternal*); static void Kino_SortEx_clear_run_cache(SortExRun*); static void Kino_SortEx_destroy_run(SortExRun*); #define KINO_PER_ITEM_OVERHEAD (sizeof(ByteBuf) + sizeof(ByteBuf*)) SortExternal* Kino_SortEx_new(SV *outstream_sv, SV *invindex_sv, SV *seg_name_sv, I32 mem_threshold) { SortExternal *sortex; /* allocate */ Kino_New(0, sortex, 1, SortExternal); Kino_New(0, sortex->cache, 100, ByteBuf*); Kino_New(0, sortex->runs, 1, SortExRun*); /* init */ sortex->scratch = NULL; sortex->scratch_cap = 0; sortex->cache_cap = 100; sortex->cache_elems = 0; sortex->cache_pos = 0; sortex->cache_bytes = 0; sortex->num_runs = 0; sortex->instream_sv = &PL_sv_undef; sortex->feed = Kino_SortEx_feed; sortex->fetch = Kino_SortEx_fetch_death; /* assign */ sortex->outstream_sv = newSVsv(outstream_sv); Kino_extract_struct(outstream_sv, sortex->outstream, OutStream*, "KinoSearch::Store::OutStream"); sortex->invindex_sv = newSVsv(invindex_sv); sortex->seg_name_sv = newSVsv(seg_name_sv); sortex->mem_threshold = mem_threshold; 
/* derive */ sortex->run_cache_limit = mem_threshold / 2; return sortex; } /* Create a new SortExRun object */ static SortExRun* Kino_SortEx_new_run(double start, double end) { SortExRun *run; /* allocate */ Kino_New(0, run, 1, SortExRun); Kino_New(0, run->cache, 100, ByteBuf*); /* init */ run->cache_cap = 100; run->cache_elems = 0; run->cache_pos = 0; /* assign */ run->start = start; run->file_pos = start; run->end = end; return run; } void Kino_SortEx_feed(SortExternal* sortex, char* ptr, I32 len) { /* add room for more cache elements if needed */ if (sortex->cache_elems == sortex->cache_cap) { /* add 100, plus 10% of the current capacity */ sortex->cache_cap = sortex->cache_cap + 100 + (sortex->cache_cap / 8); Kino_Renew(sortex->cache, sortex->cache_cap, ByteBuf*); } sortex->cache[ sortex->cache_elems ] = Kino_BB_new_string(ptr, len); sortex->cache_elems++; /* track memory consumed */ sortex->cache_bytes += KINO_PER_ITEM_OVERHEAD; sortex->cache_bytes += len + 1; /* check if it's time to flush the cache */ if (sortex->cache_bytes >= sortex->mem_threshold) Kino_SortEx_sort_run(sortex); } ByteBuf* Kino_SortEx_fetch(SortExternal *sortex) { if (sortex->cache_pos >= sortex->cache_elems) Kino_SortEx_refill_cache(sortex); if (sortex->cache_elems > 0) { return sortex->cache[ sortex->cache_pos++ ]; } else { return NULL; } } ByteBuf* Kino_SortEx_fetch_death(SortExternal *sortex) { ByteBuf *bb = NULL; Kino_confess("can't call fetch before sort_all"); return bb; } void Kino_SortEx_enable_fetch(SortExternal *sortex) { sortex->fetch = Kino_SortEx_fetch; } /* Allocate more memory to an array of pointers to pointers to ByteBufs, if * the current allocation isn't sufficient. */ static void Kino_SortEx_grow_bufbuf(ByteBuf ***bb_buf, I32 current, I32 desired) { if (current < desired) Kino_Renew(*bb_buf, desired, ByteBuf*); } /* Sort the main cache. 
*/
/* Grows the scratch buffer to cache_elems pointers if it is smaller, records
 * the new capacity, then mergesorts the cache in place using that scratch
 * space.
 */
void
Kino_SortEx_sort_cache(SortExternal *sortex) {
    Kino_SortEx_grow_bufbuf(&sortex->scratch, sortex->scratch_cap,
        sortex->cache_elems);
    /* grow_bufbuf receives the capacity by value, so remember the new size
     * here; otherwise scratch_cap stays stale and every call reallocates */
    if (sortex->scratch_cap < sortex->cache_elems)
        sortex->scratch_cap = sortex->cache_elems;

    Kino_SortEx_mergesort(sortex->cache, sortex->scratch,
        sortex->cache_elems);
}

/* Sort the main cache and write it to the outstream as one run.
 *
 * Each item is written as a vint length followed by its bytes.  The cache is
 * then emptied, a SortExRun recording the start and end file positions is
 * appended to sortex->runs, and run_cache_limit is recomputed as half of
 * mem_threshold divided among the runs, with a floor of 65536.  Does nothing
 * when cache_bytes is 0.
 */
void
Kino_SortEx_sort_run(SortExternal *sortex) {
    OutStream  *outstream;
    ByteBuf   **cache, **cache_end;
    ByteBuf    *bb;
    double      start, end;

    /* bail if there's nothing in the cache */
    if (sortex->cache_bytes == 0)
        return;

    /* Make room for one more run, but do not count it yet: num_runs only
     * grows once the slot holds a real SortExRun, so Kino_SortEx_destroy
     * never walks an uninitialised pointer if anything below exits
     * non-locally (presumably possible via a Perl-level exception -- verify
     * against the stream implementations). */
    Kino_Renew(sortex->runs, sortex->num_runs + 1, SortExRun*);

    outstream = sortex->outstream;

    /* mark start of run */
    start = outstream->tell(outstream);

    /* write sorted items to file */
    Kino_SortEx_sort_cache(sortex);
    cache_end = sortex->cache + sortex->cache_elems;
    for (cache = sortex->cache; cache < cache_end; cache++) {
        bb = *cache;
        outstream->write_vint(outstream, bb->size);
        outstream->write_bytes(outstream, bb->ptr, bb->size);
    }

    /* clear the cache */
    Kino_SortEx_clear_cache(sortex);

    /* mark end of run, build a new SortExRun object, and only then publish
     * it by bumping the run count */
    end = outstream->tell(outstream);
    sortex->runs[ sortex->num_runs ] = Kino_SortEx_new_run(start, end);
    sortex->num_runs++;

    /* recalculate the size allowed for each run's cache */
    sortex->run_cache_limit = (sortex->mem_threshold / 2) / sortex->num_runs;
    sortex->run_cache_limit = sortex->run_cache_limit < 65536
        ? 65536
        : sortex->run_cache_limit;
}

/* Recover sorted items from disk, up to the allowable memory limit.
*/
/* If the run's cache still holds unconsumed items, their count is returned
 * and nothing is read.  Otherwise the run's cache is cleared and refilled
 * from sortex->instream, starting at run->file_pos.  Items are stored as a
 * vint length followed by that many bytes.  Reading stops at run->end, or
 * once the bytes accounted for exceed sortex->run_cache_limit; the limit is
 * tested before each read, so the final item may push the total past it.
 * The stream position is saved in run->file_pos and the number of items
 * recovered is returned.
 */
static I32
Kino_SortEx_refill_run(SortExternal* sortex, SortExRun *run) {
    InStream *instream;
    ByteBuf  *item;
    double    run_end;
    I32       byte_limit;
    I32       item_len;
    I32       bytes_so_far = 0;
    int       recovered    = 0;

    /* nothing to do while unconsumed items remain */
    if (run->cache_elems != run->cache_pos)
        return run->cache_elems - run->cache_pos;

    Kino_SortEx_clear_run_cache(run);

    /* work from local copies */
    instream   = sortex->instream;
    byte_limit = sortex->run_cache_limit;
    run_end    = run->end;

    instream->seek(instream, run->file_pos);

    for (;;) {
        /* stop once the whole run has been consumed */
        if (instream->tell(instream) >= run_end) {
            /* overshooting the run boundary is an error */
            if (instream->tell(instream) > run_end) {
                UV pos = instream->tell(instream);
                Kino_confess(
                    "read past end of run: %" UVuf ", %" UVuf,
                    pos, (UV)run_end );
            }
            break;
        }

        /* stop once this run's share of memory is used up */
        if (bytes_so_far > byte_limit)
            break;

        /* decode the length, then pull the string into a fresh ByteBuf */
        item_len = instream->read_vint(instream);
        item     = Kino_BB_new(item_len);
        instream->read_bytes(instream, item->ptr, item_len);
        item->ptr[item_len] = '\0';

        /* enlarge the pointer array by 100 plus an eighth when it is full */
        if (recovered == run->cache_cap) {
            run->cache_cap = run->cache_cap + 100 + (run->cache_cap / 8);
            Kino_Renew(run->cache, run->cache_cap, ByteBuf*);
        }
        run->cache[ recovered ] = item;

        /* account for what was just read */
        recovered++;
        bytes_so_far += item_len + 1 + KINO_PER_ITEM_OVERHEAD;
    }

    /* publish the new contents and remember where to resume */
    run->cache_elems = recovered;
    run->cache_pos   = 0;
    run->file_pos    = instream->tell(instream);

    return recovered;
}

/* Refill the main cache, drawing from the caches of all runs.
*/
/* Steps:
 *   1. Empty the main cache.
 *   2. Make sure every run has at least one cached item, refilling from
 *      disk where needed and discarding runs that have nothing left.
 *   3. Pick the endpost and size each run's slice of items that do not sort
 *      after it.
 *   4. Grow the main cache and the scratch buffer to hold all the slices,
 *      recording the new capacities, then merge the slices into the main
 *      cache.
 * If no runs remain, the cache is left empty.
 */
static void
Kino_SortEx_refill_cache(SortExternal *sortex) {
    ByteBuf   *endpost;
    SortExRun *run;
    I32        i     = 0;
    I32        total = 0;

    /* free all the existing ByteBufs, as they've been fetched by now */
    Kino_SortEx_clear_cache(sortex);

    /* make sure all runs have at least one item in the cache */
    while (i < sortex->num_runs) {
        run = sortex->runs[i];
        if (   (run->cache_elems > run->cache_pos)
            || (Kino_SortEx_refill_run(sortex, run))
        ) {
            i++;
        }
        else {
            /* discard empty runs: the last run takes the vacated slot, and
             * i is deliberately not advanced so that run is examined next */
            Kino_SortEx_destroy_run(run);
            sortex->num_runs--;
            sortex->runs[i] = sortex->runs[ sortex->num_runs ];
            sortex->runs[ sortex->num_runs ] = NULL;
        }
    }

    if (!sortex->num_runs)
        return;

    /* move as many items as possible into the sorting cache */
    endpost = Kino_SortEx_find_endpost(sortex);
    for (i = 0; i < sortex->num_runs; i++) {
        total += Kino_SortEx_define_slice(sortex->runs[i], endpost);
    }

    /* Make sure we have enough room in both the main cache and the scratch.
     * grow_bufbuf takes the capacity by value, so record the new sizes here;
     * otherwise the cap fields go stale and later calls reallocate even
     * though the buffers are already big enough. */
    Kino_SortEx_grow_bufbuf(&sortex->cache, sortex->cache_cap, total);
    if (sortex->cache_cap < total)
        sortex->cache_cap = total;
    Kino_SortEx_grow_bufbuf(&sortex->scratch, sortex->scratch_cap, total);
    if (sortex->scratch_cap < total)
        sortex->scratch_cap = total;

    Kino_SortEx_merge_runs(sortex);
    sortex->cache_elems = total;
}

/* Merge all the items which are "in-range" from all the Runs into the main
 * cache.
*/
/* Each run's slice (run->slice_size items starting at run->cache_pos) is
 * copied end-to-end into sortex->cache, and the run's cache_pos is advanced
 * past it.  Neighbouring slices are then merged pairwise through
 * sortex->scratch, pass after pass, until a single slice remains.  Both the
 * main cache and the scratch buffer must already be large enough to hold
 * every slice.
 */
static void
Kino_SortEx_merge_runs(SortExternal *sortex) {
    ByteBuf ***starts;              /* where each slice begins in the cache */
    I32       *sizes;               /* how many items each slice holds */
    ByteBuf  **dest = sortex->cache;
    SortExRun *run;
    I32        run_tick, count, merged;
    I32        in, out;
    I32        live = 0;            /* number of slices still to be merged */

    Kino_New(0, starts, sortex->num_runs, ByteBuf**);
    Kino_New(0, sizes, sortex->num_runs, I32);

    /* lay the non-empty slices out back-to-back in the main cache */
    for (run_tick = 0; run_tick < sortex->num_runs; run_tick++) {
        run   = sortex->runs[run_tick];
        count = run->slice_size;
        if (count == 0)
            continue;

        sizes[live]  = count;
        starts[live] = dest;
        Copy( (run->cache + run->cache_pos), dest, count, ByteBuf* );

        run->cache_pos += count;
        dest           += count;
        live++;
    }

    /* every slice is already sorted, so merge neighbours instead of
     * sorting the whole cache from scratch */
    while (live > 1) {
        out = 0;
        for (in = 0; in < live; out++) {
            if (live - in >= 2) {
                /* two adjacent slices: merge into scratch, copy back */
                merged = sizes[in] + sizes[in + 1];
                Kino_SortEx_merge(starts[in], sizes[in],
                    starts[in + 1], sizes[in + 1], sortex->scratch);
                sizes[out]  = merged;
                starts[out] = starts[in];
                Copy(sortex->scratch, starts[out], merged, ByteBuf*);
                in += 2;
            }
            else {
                /* odd slice count: the last slice is carried over as-is */
                sizes[out]  = sizes[in];
                starts[out] = starts[in];
                in += 1;
            }
        }
        live = out;
    }

    Kino_Safefree(starts);
    Kino_Safefree(sizes);
}

/* Return a pointer to the item in one of the runs' caches which is
 * the highest in sort order, but which we can guarantee is lower in sort
 * order than any item which has yet to enter a run cache.
*/ static ByteBuf* Kino_SortEx_find_endpost(SortExternal *sortex) { int i; ByteBuf *endpost = NULL, *candidate = NULL; SortExRun *run; for (i = 0; i < sortex->num_runs; i++) { /* get a run and verify no errors */ run = sortex->runs[i]; if (run->cache_pos == run->cache_elems || run->cache_elems < 1) Kino_confess("find_endpost encountered an empty run cache"); /* get the last item in this run's cache */ candidate = run->cache[ run->cache_elems - 1 ]; /* if it's the first run, the item is automatically the new endpost */ if (i == 0) { endpost = candidate; continue; } /* if it's less than the current endpost, it's the new endpost */ else if (Kino_BB_compare(candidate, endpost) < 0) { endpost = candidate; } } return endpost; } /* Record the number of items in the run's cache which are lexically * less than or equal to the endpost. */ static I32 Kino_SortEx_define_slice(SortExRun *run, ByteBuf *endpost) { I32 lo, mid, hi, delta; ByteBuf **cache = run->cache; /* operate on a slice of the cache */ lo = run->cache_pos - 1; hi = run->cache_elems; /* binary search */ while (hi - lo > 1) { mid = (lo + hi) / 2; delta = Kino_BB_compare(cache[mid], endpost); if (delta > 0) hi = mid; else lo = mid; } run->slice_size = lo == -1 ? 0 : (lo - run->cache_pos) + 1; return run->slice_size; } /* Standard merge sort. */ static void Kino_SortEx_mergesort(ByteBuf **bufbuf, ByteBuf **scratch, I32 buf_size) { if (buf_size == 0) return; Kino_SortEx_msort(bufbuf, scratch, 0, buf_size - 1); } /* Standard merge sort msort function. */ static void Kino_SortEx_msort(ByteBuf **bufbuf, ByteBuf **scratch, U32 left, U32 right) { I32 mid; if (right > left) { mid = ( (right+left)/2 ) + 1; Kino_SortEx_msort(bufbuf, scratch, left, mid - 1); Kino_SortEx_msort(bufbuf, scratch, mid, right); Kino_SortEx_merge( (bufbuf + left), (mid - left), (bufbuf + mid), (right - mid + 1), scratch); Copy( scratch, (bufbuf + left), (right - left + 1), ByteBuf* ); } } /* Standard mergesort merge function. 
This variant is capable of merging two
 * discontiguous source arrays. Copying elements back into the source is left
 * for the caller.
 */
/* When two items compare equal, the one from the left array is emitted
 * first.  dest must have room for left_size + right_size pointers.
 */
static void
Kino_SortEx_merge(ByteBuf **left_ptr,  U32 left_size,
                  ByteBuf **right_ptr, U32 right_size,
                  ByteBuf **dest) {
    ByteBuf **const left_stop  = left_ptr  + left_size;
    ByteBuf **const right_stop = right_ptr + right_size;

    /* take the lower head until one side runs dry */
    while (left_ptr < left_stop && right_ptr < right_stop) {
        if (Kino_BB_compare(*left_ptr, *right_ptr) < 1)
            *dest++ = *left_ptr++;
        else
            *dest++ = *right_ptr++;
    }

    /* at most one of these still has anything left to copy */
    for ( ; left_ptr < left_stop; left_ptr++)
        *dest++ = *left_ptr;
    for ( ; right_ptr < right_stop; right_ptr++)
        *dest++ = *right_ptr;
}

/* Destroy every item in the main cache which has not yet been handed out
 * (those at or after cache_pos), then reset the cache's counters.  The
 * pointer array itself is kept.
 */
static void
Kino_SortEx_clear_cache(SortExternal *sortex) {
    ByteBuf **item = sortex->cache + sortex->cache_pos;
    ByteBuf **stop = sortex->cache + sortex->cache_elems;

    /* items before cache_pos have been released, so leave them alone */
    while (item < stop) {
        Kino_BB_destroy(*item);
        item++;
    }

    sortex->cache_bytes = 0;
    sortex->cache_elems = 0;
    sortex->cache_pos   = 0;
}

/* Destroy every item in a run's cache which has not been passed on to the
 * main cache (those at or after cache_pos), then mark the run's cache empty.
 * The pointer array itself is kept.
 */
static void
Kino_SortEx_clear_run_cache(SortExRun *run) {
    ByteBuf **item = run->cache + run->cache_pos;
    ByteBuf **stop = run->cache + run->cache_elems;

    /* items before cache_pos now belong to the main cache */
    while (item < stop) {
        Kino_BB_destroy(*item);
        item++;
    }

    run->cache_elems = 0;
    run->cache_pos   = 0;
}

/* Tear down a SortExternal: drop its references to the Perl scalars, free
 * the remaining cached items along with the cache and scratch arrays,
 * destroy every run, and finally free the object itself.
 */
void
Kino_SortEx_destroy(SortExternal *sortex) {
    I32 run_tick;

    /* hand the Perl-side objects back to Perl's reference counting */
    SvREFCNT_dec(sortex->outstream_sv);
    SvREFCNT_dec(sortex->instream_sv);
    SvREFCNT_dec(sortex->invindex_sv);
    SvREFCNT_dec(sortex->seg_name_sv);

    /* release unfetched items, then the cache and scratch arrays */
    Kino_SortEx_clear_cache(sortex);
    Kino_Safefree(sortex->cache);
    Kino_Safefree(sortex->scratch);

    /* release each run, then the array which held them */
    for (run_tick = 0; run_tick < sortex->num_runs; run_tick++) {
        Kino_SortEx_destroy_run(sortex->runs[run_tick]);
    }
    Kino_Safefree(sortex->runs);

    /* finally, the object itself */
    Kino_Safefree(sortex);
}

/* Free a run: the items left in its cache, the cache array, and the run
 * struct.
 */
static void
Kino_SortEx_destroy_run(SortExRun *run) {
    Kino_SortEx_clear_run_cache(run);
    Kino_Safefree(run->cache);
    Kino_Safefree(run);
}