Stxxl
1.3.1
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/algo/sort.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2003 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2006 Johannes Singler <singler@ira.uka.de> 00008 * Copyright (C) 2008-2011 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #ifndef STXXL_SORT_HEADER 00016 #define STXXL_SORT_HEADER 00017 00018 #include <functional> 00019 00020 #include <stxxl/bits/mng/mng.h> 00021 #include <stxxl/bits/common/rand.h> 00022 #include <stxxl/bits/mng/adaptor.h> 00023 #include <stxxl/bits/common/simple_vector.h> 00024 #include <stxxl/bits/common/settings.h> 00025 #include <stxxl/bits/mng/block_alloc_interleaved.h> 00026 #include <stxxl/bits/io/request_operations.h> 00027 #include <stxxl/bits/algo/sort_base.h> 00028 #include <stxxl/bits/algo/sort_helper.h> 00029 #include <stxxl/bits/algo/intksort.h> 00030 #include <stxxl/bits/algo/adaptor.h> 00031 #include <stxxl/bits/algo/async_schedule.h> 00032 #include <stxxl/bits/mng/block_prefetcher.h> 00033 #include <stxxl/bits/mng/buf_writer.h> 00034 #include <stxxl/bits/algo/run_cursor.h> 00035 #include <stxxl/bits/algo/losertree.h> 00036 #include <stxxl/bits/algo/inmemsort.h> 00037 #include <stxxl/bits/parallel.h> 00038 #include <stxxl/bits/common/is_sorted.h> 00039 00040 00041 __STXXL_BEGIN_NAMESPACE 00042 00045 00046 00049 namespace sort_local 00050 { 00051 template <typename block_type, typename bid_type> 00052 struct read_next_after_write_completed 00053 { 00054 block_type * block; 00055 bid_type bid; 00056 request_ptr * req; 00057 void operator () (request * /*completed_req*/) 00058 { 00059 *req = block->read(bid); 00060 } 00061 }; 00062 00063 00064 template < 00065 typename block_type, 00066 typename run_type, 00067 typename input_bid_iterator, 00068 typename value_cmp> 00069 void 00070 create_runs( 00071 input_bid_iterator it, 00072 run_type ** runs, 00073 int_type nruns, 00074 int_type _m, 00075 value_cmp cmp) 00076 { 00077 typedef typename block_type::value_type type; 00078 typedef typename block_type::bid_type bid_type; 00079 STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m); 00080 00081 int_type m2 = _m / 2; 00082 block_manager * bm = block_manager::get_instance(); 00083 block_type * Blocks1 = new block_type[m2]; 00084 block_type * Blocks2 = new block_type[m2]; 00085 bid_type * bids1 = new bid_type[m2]; 00086 bid_type * bids2 = new bid_type[m2]; 00087 request_ptr * read_reqs1 = new request_ptr[m2]; 00088 request_ptr * read_reqs2 = new request_ptr[m2]; 00089 request_ptr * write_reqs = new request_ptr[m2]; 00090 read_next_after_write_completed<block_type, bid_type> * next_run_reads = 00091 new read_next_after_write_completed<block_type, bid_type>[m2]; 00092 00093 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00094 00095 int_type i; 00096 int_type run_size = 0; 00097 00098 assert(nruns >= 2); 00099 00100 run_size = runs[0]->size(); 00101 assert(run_size == m2); 00102 00103 for (i = 0; i < run_size; ++i) 00104 { 00105 STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem)); 00106 bids1[i] = *(it++); 00107 read_reqs1[i] = Blocks1[i].read(bids1[i]); 00108 } 00109 00110 run_size = runs[1]->size(); 00111 00112 for (i = 0; i < run_size; ++i) 00113 { 00114 STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem)); 00115 bids2[i] = *(it++); 00116 read_reqs2[i] = Blocks2[i].read(bids2[i]); 00117 } 00118 00119 for (int_type k = 0; k < nruns - 1; ++k) 00120 { 00121 run_type * run = runs[k]; 00122 run_size = run->size(); 00123 assert(run_size == m2); 00124 #ifndef NDEBUG 00125 int_type next_run_size = runs[k + 1]->size(); 00126 #endif 00127 assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2)); 00128 00129 STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1"); 00130 wait_all(read_reqs1, run_size); 00131 STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1"); 00132 for (i = 0; i < run_size; ++i) 00133 bm->delete_block(bids1[i]); 00134 00135 potentially_parallel:: 00136 sort(make_element_iterator(Blocks1, 0), 00137 make_element_iterator(Blocks1, run_size * block_type::size), 00138 cmp); 00139 00140 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs"); 00141 if (k > 0) 00142 wait_all(write_reqs, m2); 00143 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00144 00145 int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0; 00146 for (i = 0; i < m2; ++i) 00147 { 00148 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem)); 00149 (*run)[i].value = Blocks1[i][0]; 00150 if (i >= runplus2size) { 00151 write_reqs[i] = Blocks1[i].write((*run)[i].bid); 00152 } 00153 else 00154 { 00155 next_run_reads[i].block = Blocks1 + i; 00156 next_run_reads[i].req = read_reqs1 + i; 00157 bids1[i] = next_run_reads[i].bid = *(it++); 00158 write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]); 00159 } 00160 } 00161 std::swap(Blocks1, Blocks2); 00162 std::swap(bids1, bids2); 00163 std::swap(read_reqs1, read_reqs2); 00164 } 00165 00166 run_type * run = runs[nruns - 1]; 00167 run_size = run->size(); 00168 STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1"); 00169 wait_all(read_reqs1, run_size); 00170 STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1"); 00171 for (i = 0; i < run_size; ++i) 00172 bm->delete_block(bids1[i]); 00173 00174 potentially_parallel:: 00175 sort(make_element_iterator(Blocks1, 0), 00176 make_element_iterator(Blocks1, run_size * block_type::size), 00177 cmp); 00178 00179 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs"); 00180 wait_all(write_reqs, m2); 00181 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00182 00183 for (i = 0; i < run_size; ++i) 00184 { 00185 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem)); 00186 (*run)[i].value = Blocks1[i][0]; 00187 write_reqs[i] = Blocks1[i].write((*run)[i].bid); 00188 } 00189 00190 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs"); 00191 wait_all(write_reqs, run_size); 00192 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00193 00194 delete[] Blocks1; 00195 delete[] Blocks2; 00196 delete[] bids1; 00197 delete[] bids2; 00198 delete[] read_reqs1; 00199 delete[] read_reqs2; 00200 delete[] write_reqs; 00201 delete[] next_run_reads; 00202 } 00203 00204 00205 template <typename block_type, typename run_type, typename value_cmp> 00206 bool check_sorted_runs(run_type ** runs, 00207 unsigned_type nruns, 00208 unsigned_type m, 00209 value_cmp cmp) 00210 { 00211 typedef typename block_type::value_type value_type; 00212 00213 STXXL_MSG("check_sorted_runs Runs: " << nruns); 00214 unsigned_type irun = 0; 00215 for (irun = 0; irun < nruns; ++irun) 00216 { 00217 const unsigned_type nblocks_per_run = runs[irun]->size(); 00218 unsigned_type blocks_left = nblocks_per_run; 00219 block_type * blocks = new block_type[m]; 00220 request_ptr * reqs = new request_ptr[m]; 00221 value_type last = cmp.min_value(); 00222 00223 for (unsigned_type off = 0; off < nblocks_per_run; off += m) 00224 { 00225 const unsigned_type nblocks = STXXL_MIN(blocks_left, m); 00226 const unsigned_type nelements = nblocks * block_type::size; 00227 blocks_left -= nblocks; 00228 00229 for (unsigned_type j = 0; j < nblocks; ++j) 00230 { 00231 reqs[j] = blocks[j].read((*runs[irun])[j + off].bid); 00232 } 00233 wait_all(reqs, reqs + nblocks); 00234 00235 if (off && cmp(blocks[0][0], last)) 00236 { 00237 STXXL_MSG("check_sorted_runs wrong first value in the run " << irun); 00238 STXXL_MSG(" first value: " << blocks[0][0]); 00239 STXXL_MSG(" last value: " << last); 00240 for (unsigned_type k = 0; k < block_type::size; ++k) 00241 STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]); 00242 00243 return false; 00244 } 00245 00246 for (unsigned_type j = 0; j < nblocks; ++j) 00247 { 00248 if (!(blocks[j][0] == (*runs[irun])[j + off].value)) 00249 { 00250 STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (j + off)); 00251 STXXL_MSG(" trigger value: " << (*runs[irun])[j + off].value); 00252 STXXL_MSG("Data in the block:"); 00253 for (unsigned_type k = 0; k < block_type::size; ++k) 00254 STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]); 00255 00256 STXXL_MSG("BIDS:"); 00257 for (unsigned_type k = 0; k < nblocks; ++k) 00258 { 00259 if (k == j) 00260 STXXL_MSG("Bad one comes next."); 00261 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid)); 00262 } 00263 00264 return false; 00265 } 00266 } 00267 if (!stxxl::is_sorted(make_element_iterator(blocks, 0), 00268 make_element_iterator(blocks, nelements), 00269 cmp)) 00270 { 00271 STXXL_MSG("check_sorted_runs wrong order in the run " << irun); 00272 STXXL_MSG("Data in blocks:"); 00273 for (unsigned_type j = 0; j < nblocks; ++j) 00274 { 00275 for (unsigned_type k = 0; k < block_type::size; ++k) 00276 STXXL_MSG(" Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]); 00277 } 00278 STXXL_MSG("BIDS:"); 00279 for (unsigned_type k = 0; k < nblocks; ++k) 00280 { 00281 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid)); 00282 } 00283 00284 return false; 00285 } 00286 00287 last = blocks[nblocks - 1][block_type::size - 1]; 00288 } 00289 00290 assert(blocks_left == 0); 00291 delete[] reqs; 00292 delete[] blocks; 00293 } 00294 00295 return true; 00296 } 00297 00298 00299 template <typename block_type, typename run_type, typename value_cmp> 00300 void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp) 00301 { 00302 typedef typename block_type::value_type value_type; 00303 typedef typename run_type::value_type trigger_entry_type; 00304 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type; 00305 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type; 00306 typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type; 00307 00308 run_type consume_seq(out_run->size()); 00309 00310 int_type * prefetch_seq = new int_type[out_run->size()]; 00311 00312 typename run_type::iterator copy_start = consume_seq.begin(); 00313 for (int_type i = 0; i < nruns; i++) 00314 { 00315 // TODO: try to avoid copy 00316 copy_start = std::copy( 00317 in_runs[i]->begin(), 00318 in_runs[i]->end(), 00319 copy_start); 00320 } 00321 00322 std::stable_sort(consume_seq.begin(), consume_seq.end(), 00323 sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL); 00324 00325 int_type disks_number = config::get_instance()->disks_number(); 00326 00327 #ifdef PLAY_WITH_OPT_PREF 00328 const int_type n_write_buffers = 4 * disks_number; 00329 #else 00330 const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4)); 00331 const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers); 00332 #if STXXL_SORT_OPTIMAL_PREFETCHING 00333 // heuristic 00334 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10; 00335 #endif 00336 #endif 00337 00338 #if STXXL_SORT_OPTIMAL_PREFETCHING 00339 compute_prefetch_schedule( 00340 consume_seq, 00341 prefetch_seq, 00342 n_opt_prefetch_buffers, 00343 disks_number); 00344 #else 00345 for (unsigned_type i = 0; i < out_run->size(); i++) 00346 prefetch_seq[i] = i; 00347 00348 #endif 00349 00350 prefetcher_type prefetcher(consume_seq.begin(), 00351 consume_seq.end(), 00352 prefetch_seq, 00353 nruns + n_prefetch_buffers); 00354 00355 buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2); 00356 00357 int_type out_run_size = out_run->size(); 00358 00359 block_type * out_buffer = writer.get_free_block(); 00360 00361 //If parallelism is activated, one can still fall back to the 00362 //native merge routine by setting stxxl::SETTINGS::native_merge= true, //otherwise, it is used anyway. 00363 00364 if (do_parallel_merge()) 00365 { 00366 #if STXXL_PARALLEL_MULTIWAY_MERGE 00367 00368 // begin of STL-style merging 00369 00370 typedef stxxl::int64 diff_type; 00371 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence; 00372 typedef typename std::vector<sequence>::size_type seqs_size_type; 00373 std::vector<sequence> seqs(nruns); 00374 std::vector<block_type *> buffers(nruns); 00375 00376 for (int_type i = 0; i < nruns; i++) // initialize sequences 00377 { 00378 buffers[i] = prefetcher.pull_block(); // get first block of each run 00379 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end()); 00380 // this memory location stays the same, only the data is exchanged 00381 } 00382 00383 #if STXXL_CHECK_ORDER_IN_SORTS 00384 value_type last_elem = cmp.min_value(); 00385 #endif 00386 diff_type num_currently_mergeable = 0; 00387 00388 for (int_type j = 0; j < out_run_size; ++j) // for the whole output run, out_run_size is in blocks 00389 { 00390 diff_type rest = block_type::size; // elements still to merge for this output block 00391 00392 STXXL_VERBOSE1("output block " << j); 00393 do { 00394 if (num_currently_mergeable < rest) 00395 { 00396 if (prefetcher.empty()) 00397 { 00398 // anything remaining is already in memory 00399 num_currently_mergeable = (out_run_size - j) * block_type::size 00400 - (block_type::size - rest); 00401 } 00402 else 00403 { 00404 num_currently_mergeable = sort_helper::count_elements_less_equal( 00405 seqs, consume_seq[prefetcher.pos()].value, cmp); 00406 } 00407 } 00408 00409 diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements 00410 00411 STXXL_VERBOSE1("before merge " << output_size); 00412 00413 stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size); 00414 // sequence iterators are progressed appropriately 00415 00416 rest -= output_size; 00417 num_currently_mergeable -= output_size; 00418 00419 STXXL_VERBOSE1("after merge"); 00420 00421 sort_helper::refill_or_remove_empty_sequences(seqs, buffers, prefetcher); 00422 } while (rest > 0 && seqs.size() > 0); 00423 00424 #if STXXL_CHECK_ORDER_IN_SORTS 00425 if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp)) 00426 { 00427 for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++) 00428 if (cmp(*i, *(i - 1))) 00429 { 00430 STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin())); 00431 } 00432 assert(false); 00433 } 00434 00435 if (j > 0) // do not check in first iteration 00436 assert(cmp((*out_buffer)[0], last_elem) == false); 00437 00438 last_elem = (*out_buffer)[block_type::size - 1]; 00439 #endif 00440 00441 (*out_run)[j].value = (*out_buffer)[0]; // save smallest value 00442 00443 out_buffer = writer.write(out_buffer, (*out_run)[j].bid); 00444 } 00445 00446 // end of STL-style merging 00447 00448 #else 00449 STXXL_THROW_UNREACHABLE(); 00450 #endif 00451 } 00452 else 00453 { 00454 // begin of native merging procedure 00455 00456 loser_tree<run_cursor_type, run_cursor2_cmp_type> 00457 losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp)); 00458 00459 #if STXXL_CHECK_ORDER_IN_SORTS 00460 value_type last_elem = cmp.min_value(); 00461 #endif 00462 00463 for (int_type i = 0; i < out_run_size; ++i) 00464 { 00465 losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size); 00466 (*out_run)[i].value = *(out_buffer->elem); 00467 00468 #if STXXL_CHECK_ORDER_IN_SORTS 00469 assert(stxxl::is_sorted( 00470 out_buffer->begin(), 00471 out_buffer->end(), 00472 cmp)); 00473 00474 if (i) 00475 assert(cmp(*(out_buffer->elem), last_elem) == false); 00476 00477 last_elem = (*out_buffer).elem[block_type::size - 1]; 00478 #endif 00479 00480 out_buffer = writer.write(out_buffer, (*out_run)[i].bid); 00481 } 00482 00483 // end of native merging procedure 00484 } 00485 00486 delete[] prefetch_seq; 00487 00488 block_manager * bm = block_manager::get_instance(); 00489 for (int_type i = 0; i < nruns; ++i) 00490 { 00491 unsigned_type sz = in_runs[i]->size(); 00492 for (unsigned_type j = 0; j < sz; ++j) 00493 bm->delete_block((*in_runs[i])[j].bid); 00494 00495 00496 delete in_runs[i]; 00497 } 00498 } 00499 00500 00501 template <typename block_type, 00502 typename alloc_strategy, 00503 typename input_bid_iterator, 00504 typename value_cmp> 00505 simple_vector<sort_helper::trigger_entry<block_type> > * 00506 sort_blocks(input_bid_iterator input_bids, 00507 unsigned_type _n, 00508 unsigned_type _m, 00509 value_cmp cmp 00510 ) 00511 { 00512 typedef typename block_type::value_type type; 00513 typedef typename block_type::bid_type bid_type; 00514 typedef sort_helper::trigger_entry<block_type> trigger_entry_type; 00515 typedef simple_vector<trigger_entry_type> run_type; 00516 typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy; 00517 00518 unsigned_type m2 = _m / 2; 00519 unsigned_type full_runs = _n / m2; 00520 unsigned_type partial_runs = ((_n % m2) ? 1 : 0); 00521 unsigned_type nruns = full_runs + partial_runs; 00522 unsigned_type i; 00523 00524 block_manager * mng = block_manager::get_instance(); 00525 00526 //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs); 00527 00528 double begin = timestamp(), after_runs_creation, end; 00529 00530 run_type ** runs = new run_type *[nruns]; 00531 00532 for (i = 0; i < full_runs; i++) 00533 runs[i] = new run_type(m2); 00534 00535 00536 if (partial_runs) 00537 runs[i] = new run_type(_n - full_runs * m2); 00538 00539 for (i = 0; i < nruns; ++i) 00540 mng->new_blocks(alloc_strategy(), make_bid_iterator(runs[i]->begin()), make_bid_iterator(runs[i]->end())); 00541 00542 sort_local::create_runs<block_type, 00543 run_type, 00544 input_bid_iterator, 00545 value_cmp>(input_bids, runs, nruns, _m, cmp); 00546 00547 after_runs_creation = timestamp(); 00548 00549 double io_wait_after_rf = stats::get_instance()->get_io_wait_time(); 00550 00551 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00552 00553 const int_type merge_factor = optimal_merge_factor(nruns, _m); 00554 run_type ** new_runs; 00555 00556 while (nruns > 1) 00557 { 00558 int_type new_nruns = div_ceil(nruns, merge_factor); 00559 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns << 00560 " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns); 00561 00562 new_runs = new run_type *[new_nruns]; 00563 00564 int_type runs_left = nruns; 00565 int_type cur_out_run = 0; 00566 int_type blocks_in_new_run = 0; 00567 00568 while (runs_left > 0) 00569 { 00570 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 00571 blocks_in_new_run = 0; 00572 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++) 00573 blocks_in_new_run += runs[i]->size(); 00574 00575 // allocate run 00576 new_runs[cur_out_run++] = new run_type(blocks_in_new_run); 00577 runs_left -= runs2merge; 00578 } 00579 // allocate blocks for the new runs 00580 if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && !input_bids->is_managed()) 00581 { 00582 // if we sort a file we can reuse the input bids for the output 00583 input_bid_iterator cur = input_bids; 00584 for (int_type i = 0; cur != (input_bids + _n); ++cur) 00585 { 00586 (*new_runs[0])[i++].bid = *cur; 00587 } 00588 00589 bid_type & firstBID = (*new_runs[0])[0].bid; 00590 if (firstBID.is_managed()) 00591 { 00592 // the first block does not belong to the file 00593 // need to reallocate it 00594 mng->new_block(FR(), firstBID); 00595 } 00596 bid_type & lastBID = (*new_runs[0])[_n - 1].bid; 00597 if (lastBID.is_managed()) 00598 { 00599 // the first block does not belong to the file 00600 // need to reallocate it 00601 mng->new_block(FR(), lastBID); 00602 } 00603 } 00604 else 00605 { 00606 mng->new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()), 00607 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run), 00608 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run)); 00609 } 00610 // merge all 00611 runs_left = nruns; 00612 cur_out_run = 0; 00613 while (runs_left > 0) 00614 { 00615 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 00616 #if STXXL_CHECK_ORDER_IN_SORTS 00617 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp))); 00618 #endif 00619 STXXL_VERBOSE("Merging " << runs2merge << " runs"); 00620 merge_runs<block_type, run_type>(runs + nruns - runs_left, 00621 runs2merge, *(new_runs + (cur_out_run++)), _m, cmp 00622 ); 00623 runs_left -= runs2merge; 00624 } 00625 00626 nruns = new_nruns; 00627 delete[] runs; 00628 runs = new_runs; 00629 } 00630 00631 run_type * result = *runs; 00632 delete[] runs; 00633 00634 end = timestamp(); 00635 00636 STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " << 00637 after_runs_creation - begin << " s"); 00638 STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s"); 00639 STXXL_VERBOSE(*stats::get_instance()); 00640 STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf); 00641 00642 return result; 00643 } 00644 } 00645 00646 00692 00693 00694 00695 00696 00697 00698 00699 template <typename ExtIterator_, typename StrictWeakOrdering_> 00700 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M) 00701 { 00702 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00703 00704 typedef simple_vector<sort_helper::trigger_entry<typename ExtIterator_::block_type> > run_type; 00705 00706 typedef typename ExtIterator_::vector_type::value_type value_type; 00707 typedef typename ExtIterator_::block_type block_type; 00708 00709 unsigned_type n = 0; 00710 00711 block_manager * mng = block_manager::get_instance(); 00712 00713 first.flush(); 00714 00715 if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M) 00716 { 00717 stl_in_memory_sort(first, last, cmp); 00718 } 00719 else 00720 { 00721 if (!(2 * block_type::raw_size * sort_memory_usage_factor() <= M)) { 00722 throw bad_parameter("stxxl::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'"); 00723 } 00724 00725 if (first.block_offset()) 00726 { 00727 if (last.block_offset()) // first and last element are 00728 // not the first elements of their block 00729 { 00730 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type; 00731 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type; 00732 typename ExtIterator_::bid_type first_bid, last_bid; 00733 request_ptr req; 00734 00735 req = first_block->read(*first.bid()); 00736 mng->new_block(FR(), first_bid); // try to overlap 00737 mng->new_block(FR(), last_bid); 00738 req->wait(); 00739 00740 00741 req = last_block->read(*last.bid()); 00742 00743 unsigned_type i = 0; 00744 for ( ; i < first.block_offset(); ++i) 00745 { 00746 first_block->elem[i] = cmp.min_value(); 00747 } 00748 00749 req->wait(); 00750 00751 00752 req = first_block->write(first_bid); 00753 for (i = last.block_offset(); i < block_type::size; ++i) 00754 { 00755 last_block->elem[i] = cmp.max_value(); 00756 } 00757 00758 req->wait(); 00759 00760 00761 req = last_block->write(last_bid); 00762 00763 n = last.bid() - first.bid() + 1; 00764 00765 std::swap(first_bid, *first.bid()); 00766 std::swap(last_bid, *last.bid()); 00767 00768 req->wait(); 00769 00770 00771 delete first_block; 00772 delete last_block; 00773 00774 run_type * out = 00775 sort_local::sort_blocks< 00776 typename ExtIterator_::block_type, 00777 typename ExtIterator_::vector_type::alloc_strategy_type, 00778 typename ExtIterator_::bids_container_iterator> 00779 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 00780 00781 00782 first_block = new typename ExtIterator_::block_type; 00783 last_block = new typename ExtIterator_::block_type; 00784 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type; 00785 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type; 00786 request_ptr * reqs = new request_ptr[2]; 00787 00788 reqs[0] = first_block->read(first_bid); 00789 reqs[1] = sorted_first_block->read((*(out->begin())).bid); 00790 00791 reqs[0]->wait(); 00792 reqs[1]->wait(); 00793 00794 reqs[0] = last_block->read(last_bid); 00795 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid); 00796 00797 for (i = first.block_offset(); i < block_type::size; i++) 00798 { 00799 first_block->elem[i] = sorted_first_block->elem[i]; 00800 } 00801 00802 reqs[0]->wait(); 00803 reqs[1]->wait(); 00804 00805 req = first_block->write(first_bid); 00806 00807 for (i = 0; i < last.block_offset(); ++i) 00808 { 00809 last_block->elem[i] = sorted_last_block->elem[i]; 00810 } 00811 00812 req->wait(); 00813 00814 req = last_block->write(last_bid); 00815 00816 mng->delete_block(out->begin()->bid); 00817 mng->delete_block((*out)[out->size() - 1].bid); 00818 00819 *first.bid() = first_bid; 00820 *last.bid() = last_bid; 00821 00822 typename run_type::iterator it = out->begin(); 00823 ++it; 00824 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 00825 ++cur_bid; 00826 00827 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 00828 { 00829 *cur_bid = (*it).bid; 00830 } 00831 00832 delete first_block; 00833 delete sorted_first_block; 00834 delete sorted_last_block; 00835 delete[] reqs; 00836 delete out; 00837 00838 req->wait(); 00839 00840 00841 delete last_block; 00842 } 00843 else 00844 { 00845 // first element is 00846 // not the first element of its block 00847 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type; 00848 typename ExtIterator_::bid_type first_bid; 00849 request_ptr req; 00850 00851 req = first_block->read(*first.bid()); 00852 mng->new_block(FR(), first_bid); // try to overlap 00853 req->wait(); 00854 00855 00856 unsigned_type i = 0; 00857 for ( ; i < first.block_offset(); ++i) 00858 { 00859 first_block->elem[i] = cmp.min_value(); 00860 } 00861 00862 req = first_block->write(first_bid); 00863 00864 n = last.bid() - first.bid(); 00865 00866 std::swap(first_bid, *first.bid()); 00867 00868 req->wait(); 00869 00870 00871 delete first_block; 00872 00873 run_type * out = 00874 sort_local::sort_blocks< 00875 typename ExtIterator_::block_type, 00876 typename ExtIterator_::vector_type::alloc_strategy_type, 00877 typename ExtIterator_::bids_container_iterator> 00878 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 00879 00880 00881 first_block = new typename ExtIterator_::block_type; 00882 00883 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type; 00884 00885 request_ptr * reqs = new request_ptr[2]; 00886 00887 reqs[0] = first_block->read(first_bid); 00888 reqs[1] = sorted_first_block->read((*(out->begin())).bid); 00889 00890 reqs[0]->wait(); 00891 reqs[1]->wait(); 00892 00893 for (i = first.block_offset(); i < block_type::size; ++i) 00894 { 00895 first_block->elem[i] = sorted_first_block->elem[i]; 00896 } 00897 00898 req = first_block->write(first_bid); 00899 00900 mng->delete_block(out->begin()->bid); 00901 00902 *first.bid() = first_bid; 00903 00904 typename run_type::iterator it = out->begin(); 00905 ++it; 00906 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 00907 ++cur_bid; 00908 00909 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 00910 { 00911 *cur_bid = (*it).bid; 00912 } 00913 00914 *cur_bid = (*it).bid; 00915 00916 delete sorted_first_block; 00917 delete[] reqs; 00918 delete out; 00919 00920 req->wait(); 00921 00922 delete first_block; 00923 } 00924 } 00925 else 00926 { 00927 if (last.block_offset()) // last is 00928 // not the first element of its block 00929 { 00930 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type; 00931 typename ExtIterator_::bid_type last_bid; 00932 request_ptr req; 00933 unsigned_type i; 00934 00935 req = last_block->read(*last.bid()); 00936 mng->new_block(FR(), last_bid); 00937 req->wait(); 00938 00939 00940 for (i = last.block_offset(); i < block_type::size; ++i) 00941 { 00942 last_block->elem[i] = cmp.max_value(); 00943 } 00944 00945 req = last_block->write(last_bid); 00946 00947 n = last.bid() - first.bid() + 1; 00948 00949 std::swap(last_bid, *last.bid()); 00950 00951 req->wait(); 00952 00953 00954 delete last_block; 00955 00956 run_type * out = 00957 sort_local::sort_blocks< 00958 typename ExtIterator_::block_type, 00959 typename ExtIterator_::vector_type::alloc_strategy_type, 00960 typename ExtIterator_::bids_container_iterator> 00961 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 00962 00963 00964 last_block = new typename ExtIterator_::block_type; 00965 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type; 00966 request_ptr * reqs = new request_ptr[2]; 00967 00968 reqs[0] = last_block->read(last_bid); 00969 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid); 00970 00971 reqs[0]->wait(); 00972 reqs[1]->wait(); 00973 00974 for (i = 0; i < last.block_offset(); ++i) 00975 { 00976 last_block->elem[i] = sorted_last_block->elem[i]; 00977 } 00978 00979 req = last_block->write(last_bid); 00980 00981 mng->delete_block((*out)[out->size() - 1].bid); 00982 00983 *last.bid() = last_bid; 00984 00985 typename run_type::iterator it = out->begin(); 00986 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 00987 00988 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 00989 { 00990 *cur_bid = (*it).bid; 00991 } 00992 00993 delete sorted_last_block; 00994 delete[] reqs; 00995 delete out; 00996 00997 req->wait(); 00998 00999 delete last_block; 01000 } 01001 else 01002 { 01003 // first and last element are first elements of their of blocks 01004 n = last.bid() - first.bid(); 01005 01006 run_type * out = 01007 sort_local::sort_blocks<typename ExtIterator_::block_type, 01008 typename ExtIterator_::vector_type::alloc_strategy_type, 01009 typename ExtIterator_::bids_container_iterator> 01010 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 01011 01012 typename run_type::iterator it = out->begin(); 01013 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 01014 01015 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 01016 { 01017 *cur_bid = (*it).bid; 01018 } 01019 01020 delete out; 01021 } 01022 } 01023 } 01024 01025 #if STXXL_CHECK_ORDER_IN_SORTS 01026 typedef typename ExtIterator_::const_iterator const_iterator; 01027 assert(stxxl::is_sorted(const_iterator(first), const_iterator(last), cmp)); 01028 #endif 01029 } 01030 01032 01033 __STXXL_END_NAMESPACE 01034 01035 #endif // !STXXL_SORT_HEADER 01036 // vim: et:ts=4:sw=4