Stxxl
1.3.1
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/stream/sort_stream.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2006-2008 Johannes Singler <singler@ira.uka.de> 00008 * Copyright (C) 2008-2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #ifndef STXXL_SORT_STREAM_HEADER 00016 #define STXXL_SORT_STREAM_HEADER 00017 00018 #ifdef STXXL_BOOST_CONFIG 00019 #include <boost/config.hpp> 00020 #endif 00021 00022 #include <stxxl/bits/stream/stream.h> 00023 #include <stxxl/bits/mng/mng.h> 00024 #include <stxxl/bits/algo/sort_base.h> 00025 #include <stxxl/bits/algo/sort_helper.h> 00026 #include <stxxl/bits/algo/adaptor.h> 00027 #include <stxxl/bits/algo/run_cursor.h> 00028 #include <stxxl/bits/algo/losertree.h> 00029 #include <stxxl/bits/stream/sorted_runs.h> 00030 00031 00032 __STXXL_BEGIN_NAMESPACE 00033 00034 namespace stream 00035 { 00038 00039 00041 // CREATE RUNS // 00043 00050 template < 00051 class Input_, 00052 class Cmp_, 00053 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type), 00054 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 00055 class basic_runs_creator : private noncopyable 00056 { 00057 protected: 00058 Input_ & input; 00059 Cmp_ cmp; 00060 00061 public: 00062 typedef Cmp_ cmp_type; 00063 typedef typename Input_::value_type value_type; 00064 typedef typed_block<BlockSize_, value_type> block_type; 00065 typedef sort_helper::trigger_entry<block_type> trigger_entry_type; 00066 typedef sorted_runs<trigger_entry_type> sorted_runs_type; 00067 00068 private: 00069 typedef typename sorted_runs_type::run_type run_type; 00070 sorted_runs_type result_; // stores the result (sorted runs) 00071 unsigned_type m_; // memory for internal use in blocks 00072 bool result_computed; // true iff result is already computed (used in 'result' method) 00073 00075 unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx) 00076 { 00077 typename element_iterator_traits<block_type>::element_iterator output = 00078 make_element_iterator(blocks, first_idx); 00079 unsigned_type curr_idx = first_idx; 00080 while (!input.empty() && curr_idx != last_idx) { 00081 *output = *input; 00082 ++input; 00083 ++output; 00084 ++curr_idx; 00085 } 00086 return curr_idx; 00087 } 00088 00089 void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx) 00090 { 00091 unsigned_type last_idx = num_blocks * block_type::size; 00092 if (first_idx < last_idx) { 00093 typename element_iterator_traits<block_type>::element_iterator curr = 00094 make_element_iterator(blocks, first_idx); 00095 while (first_idx != last_idx) { 00096 *curr = cmp.max_value(); 00097 ++curr; 00098 ++first_idx; 00099 } 00100 } 00101 } 00102 00104 void sort_run(block_type * run, unsigned_type elements) 00105 { 00106 potentially_parallel:: 00107 sort(make_element_iterator(run, 0), 00108 make_element_iterator(run, elements), 00109 cmp); 00110 } 00111 00112 void compute_result(); 00113 00114 public: 00119 basic_runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) : 00120 input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false) 00121 { 00122 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00123 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) { 00124 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'"); 00125 } 00126 assert(m_ > 0); 00127 } 00128 00132 const sorted_runs_type & result() 00133 { 00134 if (!result_computed) 00135 { 00136 compute_result(); 00137 result_computed = true; 00138 #ifdef STXXL_PRINT_STAT_AFTER_RF 00139 STXXL_MSG(*stats::get_instance()); 00140 #endif //STXXL_PRINT_STAT_AFTER_RF 00141 } 00142 return result_; 00143 } 00144 }; 00145 00149 template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_> 00150 void basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result() 00151 { 00152 unsigned_type i = 0; 00153 unsigned_type m2 = m_ / 2; 00154 const unsigned_type el_in_run = m2 * block_type::size; // # el in a run 00155 STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2); 00156 unsigned_type blocks1_length = 0, blocks2_length = 0; 00157 block_type * Blocks1 = NULL; 00158 00159 #ifndef STXXL_SMALL_INPUT_PSORT_OPT 00160 Blocks1 = new block_type[m2 * 2]; 00161 #else 00162 while (!input.empty() && blocks1_length != block_type::size) 00163 { 00164 result_.small_.push_back(*input); 00165 ++input; 00166 ++blocks1_length; 00167 } 00168 00169 if (blocks1_length == block_type::size && !input.empty()) 00170 { 00171 Blocks1 = new block_type[m2 * 2]; 00172 std::copy(result_.small_.begin(), result_.small_.end(), Blocks1[0].begin()); 00173 result_.small_.clear(); 00174 } 00175 else 00176 { 00177 STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length); 00178 result_.elements = blocks1_length; 00179 potentially_parallel:: 00180 sort(result_.small_.begin(), result_.small_.end(), cmp); 00181 return; 00182 } 00183 #endif //STXXL_SMALL_INPUT_PSORT_OPT 00184 00185 // the first block may be there already 00186 blocks1_length = fetch(Blocks1, blocks1_length, el_in_run); 00187 00188 // sort first run 00189 sort_run(Blocks1, blocks1_length); 00190 if (blocks1_length <= block_type::size && input.empty()) 00191 { 00192 // small input, do not flush it on the disk(s) 00193 STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length); 00194 assert(result_.small_.empty()); 00195 result_.small_.insert(result_.small_.end(), Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length); 00196 result_.elements = blocks1_length; 00197 delete[] Blocks1; 00198 return; 00199 } 00200 00201 block_type * Blocks2 = Blocks1 + m2; 00202 block_manager * bm = block_manager::get_instance(); 00203 request_ptr * write_reqs = new request_ptr[m2]; 00204 run_type run; 00205 00206 unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks 00207 run.resize(cur_run_size); 00208 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00209 00210 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00211 00212 // fill the rest of the last block with max values 00213 fill_with_max_value(Blocks1, cur_run_size, blocks1_length); 00214 00215 for (i = 0; i < cur_run_size; ++i) 00216 { 00217 run[i].value = Blocks1[i][0]; 00218 write_reqs[i] = Blocks1[i].write(run[i].bid); 00219 } 00220 result_.runs.push_back(run); 00221 result_.runs_sizes.push_back(blocks1_length); 00222 result_.elements += blocks1_length; 00223 00224 if (input.empty()) 00225 { 00226 // return 00227 wait_all(write_reqs, write_reqs + cur_run_size); 00228 delete[] write_reqs; 00229 delete[] Blocks1; 00230 return; 00231 } 00232 00233 STXXL_VERBOSE1("Filling the second part of the allocated blocks"); 00234 blocks2_length = fetch(Blocks2, 0, el_in_run); 00235 00236 if (input.empty()) 00237 { 00238 // optimization if the whole set fits into both halves 00239 // (re)sort internally and return 00240 blocks2_length += el_in_run; 00241 sort_run(Blocks1, blocks2_length); // sort first an second run together 00242 wait_all(write_reqs, write_reqs + cur_run_size); 00243 bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00244 00245 cur_run_size = div_ceil(blocks2_length, block_type::size); 00246 run.resize(cur_run_size); 00247 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00248 00249 // fill the rest of the last block with max values 00250 fill_with_max_value(Blocks1, cur_run_size, blocks2_length); 00251 00252 assert(cur_run_size > m2); 00253 00254 for (i = 0; i < m2; ++i) 00255 { 00256 run[i].value = Blocks1[i][0]; 00257 write_reqs[i]->wait(); 00258 write_reqs[i] = Blocks1[i].write(run[i].bid); 00259 } 00260 00261 request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2]; 00262 00263 for ( ; i < cur_run_size; ++i) 00264 { 00265 run[i].value = Blocks1[i][0]; 00266 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid); 00267 } 00268 00269 result_.runs[0] = run; 00270 result_.runs_sizes[0] = blocks2_length; 00271 result_.elements = blocks2_length; 00272 00273 wait_all(write_reqs, write_reqs + m2); 00274 delete[] write_reqs; 00275 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2); 00276 delete[] write_reqs1; 00277 00278 delete[] Blocks1; 00279 00280 return; 00281 } 00282 00283 // more than 2 runs can be filled, i. e. the general case 00284 00285 sort_run(Blocks2, blocks2_length); 00286 00287 cur_run_size = div_ceil(blocks2_length, block_type::size); // in blocks 00288 run.resize(cur_run_size); 00289 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00290 00291 for (i = 0; i < cur_run_size; ++i) 00292 { 00293 run[i].value = Blocks2[i][0]; 00294 write_reqs[i]->wait(); 00295 write_reqs[i] = Blocks2[i].write(run[i].bid); 00296 } 00297 assert((blocks2_length % el_in_run) == 0); 00298 00299 result_.runs.push_back(run); 00300 result_.runs_sizes.push_back(blocks2_length); 00301 result_.elements += blocks2_length; 00302 00303 while (!input.empty()) 00304 { 00305 blocks1_length = fetch(Blocks1, 0, el_in_run); 00306 sort_run(Blocks1, blocks1_length); 00307 cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks 00308 run.resize(cur_run_size); 00309 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00310 00311 // fill the rest of the last block with max values (occurs only on the last run) 00312 fill_with_max_value(Blocks1, cur_run_size, blocks1_length); 00313 00314 for (i = 0; i < cur_run_size; ++i) 00315 { 00316 run[i].value = Blocks1[i][0]; 00317 write_reqs[i]->wait(); 00318 write_reqs[i] = Blocks1[i].write(run[i].bid); 00319 } 00320 result_.runs.push_back(run); 00321 result_.runs_sizes.push_back(blocks1_length); 00322 result_.elements += blocks1_length; 00323 00324 std::swap(Blocks1, Blocks2); 00325 std::swap(blocks1_length, blocks2_length); 00326 } 00327 00328 wait_all(write_reqs, write_reqs + m2); 00329 delete[] write_reqs; 00330 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2); 00331 } 00332 00339 template < 00340 class Input_, 00341 class Cmp_, 00342 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type), 00343 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 00344 class runs_creator : public basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> 00345 { 00346 private: 00347 typedef basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> base; 00348 00349 public: 00350 typedef typename base::block_type block_type; 00351 00352 private: 00353 using base::input; 00354 00355 public: 00360 runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) : base(i, c, memory_to_use) 00361 { } 00362 }; 00363 00364 00372 template <class ValueType_> 00373 struct use_push 00374 { 00375 typedef ValueType_ value_type; 00376 }; 00377 00388 template < 00389 class ValueType_, 00390 class Cmp_, 00391 unsigned BlockSize_, 00392 class AllocStr_> 00393 class runs_creator< 00394 use_push<ValueType_>, 00395 Cmp_, 00396 BlockSize_, 00397 AllocStr_> 00398 : private noncopyable 00399 { 00400 Cmp_ cmp; 00401 00402 public: 00403 typedef Cmp_ cmp_type; 00404 typedef ValueType_ value_type; 00405 typedef typed_block<BlockSize_, value_type> block_type; 00406 typedef sort_helper::trigger_entry<block_type> trigger_entry_type; 00407 typedef sorted_runs<trigger_entry_type> sorted_runs_type; 00408 typedef sorted_runs_type result_type; 00409 00410 private: 00411 typedef typename sorted_runs_type::run_type run_type; 00412 sorted_runs_type result_; // stores the result (sorted runs) 00413 unsigned_type m_; // memory for internal use in blocks 00414 00415 bool result_computed; // true after the result() method was called for the first time 00416 00417 const unsigned_type m2; 00418 const unsigned_type el_in_run; 00419 unsigned_type cur_el; 00420 block_type * Blocks1; 00421 block_type * Blocks2; 00422 request_ptr * write_reqs; 00423 run_type run; 00424 00425 void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx) 00426 { 00427 unsigned_type last_idx = num_blocks * block_type::size; 00428 if (first_idx < last_idx) { 00429 typename element_iterator_traits<block_type>::element_iterator curr = 00430 make_element_iterator(blocks, first_idx); 00431 while (first_idx != last_idx) { 00432 *curr = cmp.max_value(); 00433 ++curr; 00434 ++first_idx; 00435 } 00436 } 00437 } 00438 00439 void sort_run(block_type * run, unsigned_type elements) 00440 { 00441 potentially_parallel:: 00442 sort(make_element_iterator(run, 0), 00443 make_element_iterator(run, elements), 00444 cmp); 00445 } 00446 00447 void compute_result() 00448 { 00449 if (cur_el == 0) 00450 return; 00451 00452 unsigned_type cur_el_reg = cur_el; 00453 sort_run(Blocks1, cur_el_reg); 00454 result_.elements += cur_el_reg; 00455 if (cur_el_reg <= block_type::size && result_.elements == cur_el_reg) 00456 { 00457 // small input, do not flush it on the disk(s) 00458 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg); 00459 result_.small_.resize(cur_el_reg); 00460 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin()); 00461 return; 00462 } 00463 00464 const unsigned_type cur_run_size = div_ceil(cur_el_reg, block_type::size); // in blocks 00465 run.resize(cur_run_size); 00466 block_manager * bm = block_manager::get_instance(); 00467 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00468 00469 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00470 00471 result_.runs_sizes.push_back(cur_el_reg); 00472 00473 // fill the rest of the last block with max values 00474 fill_with_max_value(Blocks1, cur_run_size, cur_el_reg); 00475 00476 unsigned_type i = 0; 00477 for ( ; i < cur_run_size; ++i) 00478 { 00479 run[i].value = Blocks1[i][0]; 00480 if (write_reqs[i].get()) 00481 write_reqs[i]->wait(); 00482 00483 write_reqs[i] = Blocks1[i].write(run[i].bid); 00484 } 00485 result_.runs.push_back(run); 00486 00487 for (i = 0; i < m2; ++i) 00488 if (write_reqs[i].get()) 00489 write_reqs[i]->wait(); 00490 } 00491 00492 void cleanup() 00493 { 00494 delete[] write_reqs; 00495 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2); 00496 write_reqs = NULL; 00497 Blocks1 = Blocks2 = NULL; 00498 } 00499 00500 public: 00504 runs_creator(Cmp_ c, unsigned_type memory_to_use) : 00505 cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false), 00506 m2(m_ / 2), 00507 el_in_run(m2 * block_type::size), 00508 cur_el(0), 00509 Blocks1(new block_type[m2 * 2]), 00510 Blocks2(Blocks1 + m2), 00511 write_reqs(new request_ptr[m2]) 00512 { 00513 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00514 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) { 00515 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'"); 00516 } 00517 assert(m2 > 0); 00518 } 00519 00520 ~runs_creator() 00521 { 00522 if (!result_computed) 00523 cleanup(); 00524 } 00525 00528 void push(const value_type & val) 00529 { 00530 assert(result_computed == false); 00531 unsigned_type cur_el_reg = cur_el; 00532 if (cur_el_reg < el_in_run) 00533 { 00534 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val; 00535 ++cur_el; 00536 return; 00537 } 00538 00539 assert(el_in_run == cur_el); 00540 cur_el = 0; 00541 00542 //sort and store Blocks1 00543 sort_run(Blocks1, el_in_run); 00544 result_.elements += el_in_run; 00545 00546 const unsigned_type cur_run_size = div_ceil(el_in_run, block_type::size); // in blocks 00547 run.resize(cur_run_size); 00548 block_manager * bm = block_manager::get_instance(); 00549 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00550 00551 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00552 00553 result_.runs_sizes.push_back(el_in_run); 00554 00555 for (unsigned_type i = 0; i < cur_run_size; ++i) 00556 { 00557 run[i].value = Blocks1[i][0]; 00558 if (write_reqs[i].get()) 00559 write_reqs[i]->wait(); 00560 00561 write_reqs[i] = Blocks1[i].write(run[i].bid); 00562 } 00563 00564 result_.runs.push_back(run); 00565 00566 std::swap(Blocks1, Blocks2); 00567 00568 push(val); 00569 } 00570 00574 const sorted_runs_type & result() 00575 { 00576 if (1) 00577 { 00578 if (!result_computed) 00579 { 00580 compute_result(); 00581 result_computed = true; 00582 cleanup(); 00583 #ifdef STXXL_PRINT_STAT_AFTER_RF 00584 STXXL_MSG(*stats::get_instance()); 00585 #endif //STXXL_PRINT_STAT_AFTER_RF 00586 } 00587 } 00588 return result_; 00589 } 00590 }; 00591 00592 00599 template <class ValueType_> 00600 struct from_sorted_sequences 00601 { 00602 typedef ValueType_ value_type; 00603 }; 00604 00615 template < 00616 class ValueType_, 00617 class Cmp_, 00618 unsigned BlockSize_, 00619 class AllocStr_> 00620 class runs_creator< 00621 from_sorted_sequences<ValueType_>, 00622 Cmp_, 00623 BlockSize_, 00624 AllocStr_> 00625 : private noncopyable 00626 { 00627 typedef ValueType_ value_type; 00628 typedef typed_block<BlockSize_, value_type> block_type; 00629 typedef sort_helper::trigger_entry<block_type> trigger_entry_type; 00630 typedef AllocStr_ alloc_strategy_type; 00631 Cmp_ cmp; 00632 00633 public: 00634 typedef Cmp_ cmp_type; 00635 typedef sorted_runs<trigger_entry_type> sorted_runs_type; 00636 typedef sorted_runs_type result_type; 00637 00638 private: 00639 typedef typename sorted_runs_type::run_type run_type; 00640 00641 sorted_runs_type result_; // stores the result (sorted runs) 00642 unsigned_type m_; // memory for internal use in blocks 00643 buffered_writer<block_type> writer; 00644 block_type * cur_block; 00645 unsigned_type offset; 00646 unsigned_type iblock; 00647 unsigned_type irun; 00648 alloc_strategy_type alloc_strategy; // needs to be reset after each run 00649 00650 public: 00655 runs_creator(Cmp_ c, unsigned_type memory_to_use) : 00656 cmp(c), 00657 m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), 00658 writer(m_, m_ / 2), 00659 cur_block(writer.get_free_block()), 00660 offset(0), 00661 iblock(0), 00662 irun(0) 00663 { 00664 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00665 assert(m_ > 0); 00666 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) { 00667 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'"); 00668 } 00669 } 00670 00673 void push(const value_type & val) 00674 { 00675 assert(offset < block_type::size); 00676 00677 (*cur_block)[offset] = val; 00678 ++offset; 00679 00680 if (offset == block_type::size) 00681 { 00682 // write current block 00683 00684 block_manager * bm = block_manager::get_instance(); 00685 // allocate space for the block 00686 result_.runs.resize(irun + 1); 00687 result_.runs[irun].resize(iblock + 1); 00688 bm->new_blocks( 00689 alloc_strategy, 00690 make_bid_iterator(result_.runs[irun].begin() + iblock), 00691 make_bid_iterator(result_.runs[irun].end()), 00692 iblock 00693 ); 00694 00695 result_.runs[irun][iblock].value = (*cur_block)[0]; // init trigger 00696 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid); 00697 ++iblock; 00698 00699 offset = 0; 00700 } 00701 00702 ++result_.elements; 00703 } 00704 00706 void finish() 00707 { 00708 if (offset == 0 && iblock == 0) // current run is empty 00709 return; 00710 00711 00712 result_.runs_sizes.resize(irun + 1); 00713 result_.runs_sizes.back() = iblock * block_type::size + offset; 00714 00715 if (offset) // if current block is partially filled 00716 { 00717 while (offset != block_type::size) 00718 { 00719 (*cur_block)[offset] = cmp.max_value(); 00720 ++offset; 00721 } 00722 offset = 0; 00723 00724 block_manager * bm = block_manager::get_instance(); 00725 // allocate space for the block 00726 result_.runs.resize(irun + 1); 00727 result_.runs[irun].resize(iblock + 1); 00728 bm->new_blocks( 00729 alloc_strategy, 00730 make_bid_iterator(result_.runs[irun].begin() + iblock), 00731 make_bid_iterator(result_.runs[irun].end()), 00732 iblock 00733 ); 00734 00735 result_.runs[irun][iblock].value = (*cur_block)[0]; // init trigger 00736 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid); 00737 } 00738 else 00739 { } 00740 00741 alloc_strategy = alloc_strategy_type(); // reinitialize block allocator for the next run 00742 iblock = 0; 00743 ++irun; 00744 } 00745 00749 const sorted_runs_type & result() 00750 { 00751 finish(); 00752 writer.flush(); 00753 00754 return result_; 00755 } 00756 }; 00757 00758 00763 template <class RunsType_, class Cmp_> 00764 bool check_sorted_runs(const RunsType_ & sruns, Cmp_ cmp) 00765 { 00766 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00767 typedef typename RunsType_::block_type block_type; 00768 typedef typename block_type::value_type value_type; 00769 STXXL_VERBOSE2("Elements: " << sruns.elements); 00770 unsigned_type nruns = sruns.runs.size(); 00771 STXXL_VERBOSE2("Runs: " << nruns); 00772 unsigned_type irun = 0; 00773 for (irun = 0; irun < nruns; ++irun) 00774 { 00775 const unsigned_type nblocks = sruns.runs[irun].size(); 00776 block_type * blocks = new block_type[nblocks]; 00777 request_ptr * reqs = new request_ptr[nblocks]; 00778 for (unsigned_type j = 0; j < nblocks; ++j) 00779 { 00780 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid); 00781 } 00782 wait_all(reqs, reqs + nblocks); 00783 for (unsigned_type j = 0; j < nblocks; ++j) 00784 { 00785 if (cmp(blocks[j][0], sruns.runs[irun][j].value) || 00786 cmp(sruns.runs[irun][j].value, blocks[j][0])) 00787 { 00788 STXXL_ERRMSG("check_sorted_runs wrong trigger in the run"); 00789 return false; 00790 } 00791 } 00792 if (!stxxl::is_sorted(make_element_iterator(blocks, 0), 00793 make_element_iterator(blocks, sruns.runs_sizes[irun]), 00794 cmp)) 00795 { 00796 STXXL_ERRMSG("check_sorted_runs wrong order in the run"); 00797 return false; 00798 } 00799 00800 delete[] reqs; 00801 delete[] blocks; 00802 } 00803 00804 STXXL_MSG("Checking runs finished successfully"); 00805 00806 return true; 00807 } 00808 00809 00811 // MERGE RUNS // 00813 00820 template <class RunsType_, 00821 class Cmp_, 00822 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 00823 class basic_runs_merger : private noncopyable 00824 { 00825 protected: 00826 typedef RunsType_ sorted_runs_type; 00827 typedef AllocStr_ alloc_strategy; 00828 typedef typename sorted_runs_type::size_type size_type; 00829 typedef Cmp_ value_cmp; 00830 typedef typename sorted_runs_type::run_type run_type; 00831 typedef typename sorted_runs_type::block_type block_type; 00832 typedef block_type out_block_type; 00833 typedef typename run_type::value_type trigger_entry_type; 00834 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type; 00835 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type; 00836 typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type; 00837 typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type; 00838 typedef stxxl::int64 diff_type; 00839 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence; 00840 typedef typename std::vector<sequence>::size_type seqs_size_type; 00841 00842 public: 00844 typedef typename sorted_runs_type::value_type value_type; 00845 00846 private: 00847 sorted_runs_type sruns; 00848 value_cmp cmp; 00849 size_type elements_remaining; 00850 00851 out_block_type * current_block; 00852 unsigned_type buffer_pos; 00853 value_type current_value; // cache for the current value 00854 00855 run_type consume_seq; 00856 int_type * prefetch_seq; 00857 prefetcher_type * prefetcher; 00858 loser_tree_type * losers; 00859 #if STXXL_PARALLEL_MULTIWAY_MERGE 00860 std::vector<sequence> * seqs; 00861 std::vector<block_type *> * buffers; 00862 diff_type num_currently_mergeable; 00863 #endif 00864 00865 #if STXXL_CHECK_ORDER_IN_SORTS 00866 value_type last_element; 00867 #endif //STXXL_CHECK_ORDER_IN_SORTS 00868 00870 00871 void merge_recursively(unsigned_type memory_to_use); 00872 00873 void deallocate_prefetcher() 00874 { 00875 if (prefetcher) 00876 { 00877 delete losers; 00878 #if STXXL_PARALLEL_MULTIWAY_MERGE 00879 delete seqs; 00880 delete buffers; 00881 #endif 00882 delete prefetcher; 00883 delete[] prefetch_seq; 00884 prefetcher = NULL; 00885 } 00886 // free blocks in runs , (or the user should do it?) 00887 sruns.deallocate_blocks(); 00888 } 00889 00890 void fill_current_block() 00891 { 00892 STXXL_VERBOSE1("fill_current_block"); 00893 if (do_parallel_merge()) 00894 { 00895 #if STXXL_PARALLEL_MULTIWAY_MERGE 00896 // begin of STL-style merging 00897 diff_type rest = out_block_type::size; // elements still to merge for this output block 00898 00899 do // while rest > 0 and still elements available 00900 { 00901 if (num_currently_mergeable < rest) 00902 { 00903 if (!prefetcher || prefetcher->empty()) 00904 { 00905 // anything remaining is already in memory 00906 num_currently_mergeable = elements_remaining; 00907 } 00908 else 00909 { 00910 num_currently_mergeable = sort_helper::count_elements_less_equal( 00911 *seqs, consume_seq[prefetcher->pos()].value, cmp); 00912 } 00913 } 00914 00915 diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements 00916 00917 STXXL_VERBOSE1("before merge " << output_size); 00918 00919 stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size); 00920 // sequence iterators are progressed appropriately 00921 00922 rest -= output_size; 00923 num_currently_mergeable -= output_size; 00924 00925 STXXL_VERBOSE1("after merge"); 00926 00927 sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *prefetcher); 00928 } while (rest > 0 && (*seqs).size() > 0); 00929 00930 #if STXXL_CHECK_ORDER_IN_SORTS 00931 if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp)) 00932 { 00933 for (value_type * i = current_block->begin() + 1; i != current_block->end(); ++i) 00934 if (cmp(*i, *(i - 1))) 00935 { 00936 STXXL_VERBOSE1("Error at position " << (i - current_block->begin())); 00937 } 00938 assert(false); 00939 } 00940 #endif //STXXL_CHECK_ORDER_IN_SORTS 00941 00942 // end of STL-style merging 00943 #else 00944 STXXL_THROW_UNREACHABLE(); 00945 #endif //STXXL_PARALLEL_MULTIWAY_MERGE 00946 } 00947 else 00948 { 00949 // begin of native merging procedure 00950 losers->multi_merge(current_block->elem, current_block->elem + STXXL_MIN<size_type>(out_block_type::size, elements_remaining)); 00951 // end of native merging procedure 00952 } 00953 STXXL_VERBOSE1("current block filled"); 00954 00955 if (elements_remaining <= out_block_type::size) 00956 deallocate_prefetcher(); 00957 } 00958 00959 public: 00964 basic_runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) : 00965 sruns(r), 00966 cmp(c), 00967 elements_remaining(sruns.elements), 00968 current_block(NULL), 00969 buffer_pos(0), 00970 prefetch_seq(NULL), 00971 prefetcher(NULL), 00972 losers(NULL) 00973 #if STXXL_PARALLEL_MULTIWAY_MERGE 00974 , seqs(NULL), 00975 buffers(NULL), 00976 num_currently_mergeable(0) 00977 #endif 00978 #if STXXL_CHECK_ORDER_IN_SORTS 00979 , last_element(cmp.min_value()) 00980 #endif //STXXL_CHECK_ORDER_IN_SORTS 00981 { 00982 initialize(r, memory_to_use); 00983 } 00984 00985 protected: 00986 void initialize(const sorted_runs_type & r, unsigned_type memory_to_use) 00987 { 00988 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00989 00990 sruns = r; 00991 elements_remaining = r.elements; 00992 00993 if (empty()) 00994 return; 00995 00996 if (!sruns.small_run().empty()) 00997 { 00998 // we have a small input <= B, that is kept in the main memory 00999 STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << elements_remaining); 01000 assert(elements_remaining == size_type(sruns.small_run().size())); 01001 assert(sruns.small_run().size() <= out_block_type::size); 01002 current_block = new out_block_type; 01003 std::copy(sruns.small_run().begin(), sruns.small_run().end(), current_block->begin()); 01004 current_value = current_block->elem[0]; 01005 buffer_pos = 1; 01006 01007 return; 01008 } 01009 01010 #if STXXL_CHECK_ORDER_IN_SORTS 01011 assert(check_sorted_runs(r, cmp)); 01012 #endif //STXXL_CHECK_ORDER_IN_SORTS 01013 01014 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 01015 01016 int_type disks_number = config::get_instance()->disks_number(); 01017 unsigned_type min_prefetch_buffers = 2 * disks_number; 01018 unsigned_type input_buffers = (memory_to_use > sizeof(out_block_type) ? memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size; 01019 unsigned_type nruns = sruns.runs.size(); 01020 01021 if (input_buffers < nruns + min_prefetch_buffers) 01022 { 01023 // can not merge runs in one pass 01024 // merge recursively: 01025 STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better"); 01026 STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)"); 01027 STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger."); 01028 STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers); 01029 01030 // check whether we have enough memory to merge recursively 01031 unsigned_type recursive_merge_buffers = memory_to_use / block_type::raw_size; 01032 if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) { 01033 // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers output buffering 01034 // as well as 1 current output block and at least 2 input blocks 01035 STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but " 01036 << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and"); 01037 STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting."); 01038 throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'"); 01039 } 01040 01041 merge_recursively(memory_to_use); 01042 01043 nruns = sruns.runs.size(); 01044 } 01045 01046 assert(nruns + min_prefetch_buffers <= input_buffers); 01047 01048 unsigned_type i; 01049 unsigned_type prefetch_seq_size = 0; 01050 for (i = 0; i < nruns; ++i) 01051 { 01052 prefetch_seq_size += sruns.runs[i].size(); 01053 } 01054 01055 consume_seq.resize(prefetch_seq_size); 01056 prefetch_seq = new int_type[prefetch_seq_size]; 01057 01058 typename run_type::iterator copy_start = consume_seq.begin(); 01059 for (i = 0; i < nruns; ++i) 01060 { 01061 copy_start = std::copy( 01062 sruns.runs[i].begin(), 01063 sruns.runs[i].end(), 01064 copy_start); 01065 } 01066 01067 std::stable_sort(consume_seq.begin(), consume_seq.end(), 01068 sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL); 01069 01070 const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns); 01071 01072 #if STXXL_SORT_OPTIMAL_PREFETCHING 01073 // heuristic 01074 const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10; 01075 01076 compute_prefetch_schedule( 01077 consume_seq, 01078 prefetch_seq, 01079 n_opt_prefetch_buffers, 01080 disks_number); 01081 #else 01082 for (i = 0; i < prefetch_seq_size; ++i) 01083 prefetch_seq[i] = i; 01084 #endif //STXXL_SORT_OPTIMAL_PREFETCHING 01085 01086 prefetcher = new prefetcher_type( 01087 consume_seq.begin(), 01088 consume_seq.end(), 01089 prefetch_seq, 01090 STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size)); 01091 01092 if (do_parallel_merge()) 01093 { 01094 #if STXXL_PARALLEL_MULTIWAY_MERGE 01095 // begin of STL-style merging 01096 seqs = new std::vector<sequence>(nruns); 01097 buffers = new std::vector<block_type *>(nruns); 01098 01099 for (unsigned_type i = 0; i < nruns; ++i) //initialize sequences 01100 { 01101 (*buffers)[i] = prefetcher->pull_block(); //get first block of each run 01102 (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end()); //this memory location stays the same, only the data is exchanged 01103 } 01104 // end of STL-style merging 01105 #else 01106 STXXL_THROW_UNREACHABLE(); 01107 #endif //STXXL_PARALLEL_MULTIWAY_MERGE 01108 } 01109 else 01110 { 01111 // begin of native merging procedure 01112 losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp)); 01113 // end of native merging procedure 01114 } 01115 01116 current_block = new out_block_type; 01117 fill_current_block(); 01118 01119 current_value = current_block->elem[0]; 01120 buffer_pos = 1; 01121 } 01122 01123 public: 01125 bool empty() const 01126 { 01127 return elements_remaining == 0; 01128 } 01129 01131 const value_type & operator * () const 01132 { 01133 assert(!empty()); 01134 return current_value; 01135 } 01136 01138 const value_type * operator -> () const 01139 { 01140 return &(operator * ()); 01141 } 01142 01144 basic_runs_merger & operator ++ () // preincrement operator 01145 { 01146 assert(!empty()); 01147 01148 --elements_remaining; 01149 01150 if (buffer_pos != out_block_type::size) 01151 { 01152 current_value = current_block->elem[buffer_pos]; 01153 ++buffer_pos; 01154 } 01155 else 01156 { 01157 if (!empty()) 01158 { 01159 fill_current_block(); 01160 01161 #if STXXL_CHECK_ORDER_IN_SORTS 01162 assert(stxxl::is_sorted(current_block->elem, current_block->elem + STXXL_MIN<size_type>(elements_remaining, current_block->size), cmp)); 01163 assert(!cmp(current_block->elem[0], current_value)); 01164 #endif //STXXL_CHECK_ORDER_IN_SORTS 01165 current_value = current_block->elem[0]; 01166 buffer_pos = 1; 01167 } 01168 } 01169 01170 #if STXXL_CHECK_ORDER_IN_SORTS 01171 if (!empty()) 01172 { 01173 assert(!cmp(current_value, last_element)); 01174 last_element = current_value; 01175 } 01176 #endif //STXXL_CHECK_ORDER_IN_SORTS 01177 01178 return *this; 01179 } 01180 01183 virtual ~basic_runs_merger() 01184 { 01185 deallocate_prefetcher(); 01186 delete current_block; 01187 } 01188 }; 01189 01190 01191 template <class RunsType_, class Cmp_, class AllocStr_> 01192 void basic_runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively(unsigned_type memory_to_use) 01193 { 01194 block_manager * bm = block_manager::get_instance(); 01195 unsigned_type ndisks = config::get_instance()->disks_number(); 01196 unsigned_type nwrite_buffers = 2 * ndisks; 01197 unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type); 01198 01199 // memory consumption of the recursive merger (uses block_type as out_block_type) 01200 unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type); 01201 unsigned_type recursive_merger_memory_out_block = sizeof(block_type); 01202 unsigned_type memory_for_buffers = memory_for_write_buffers 01203 + recursive_merger_memory_prefetch_buffers 01204 + recursive_merger_memory_out_block; 01205 // maximum arity in the recursive merger 01206 unsigned_type max_arity = (memory_to_use > memory_for_buffers ? memory_to_use - memory_for_buffers : 0) / block_type::raw_size; 01207 01208 unsigned_type nruns = sruns.runs.size(); 01209 const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity); 01210 assert(merge_factor > 1); 01211 assert(merge_factor <= max_arity); 01212 while (nruns > max_arity) 01213 { 01214 unsigned_type new_nruns = div_ceil(nruns, merge_factor); 01215 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns << 01216 " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns); 01217 01218 sorted_runs_type new_runs; 01219 new_runs.runs.resize(new_nruns); 01220 new_runs.runs_sizes.resize(new_nruns); 01221 new_runs.elements = sruns.elements; 01222 01223 unsigned_type runs_left = nruns; 01224 unsigned_type cur_out_run = 0; 01225 unsigned_type elements_in_new_run = 0; 01226 01227 while (runs_left > 0) 01228 { 01229 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 01230 elements_in_new_run = 0; 01231 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i) 01232 { 01233 elements_in_new_run += sruns.runs_sizes[i]; 01234 } 01235 const unsigned_type blocks_in_new_run1 = div_ceil(elements_in_new_run, block_type::size); 01236 01237 new_runs.runs_sizes[cur_out_run] = elements_in_new_run; 01238 // allocate run 01239 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1); 01240 runs_left -= runs2merge; 01241 } 01242 01243 // allocate blocks for the new runs 01244 for (unsigned_type i = 0; i < new_runs.runs.size(); ++i) 01245 bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[i].begin()), make_bid_iterator(new_runs.runs[i].end())); 01246 01247 // merge all 01248 runs_left = nruns; 01249 cur_out_run = 0; 01250 size_type elements_left = sruns.elements; 01251 01252 while (runs_left > 0) 01253 { 01254 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor); 01255 STXXL_VERBOSE("Merging " << runs2merge << " runs"); 01256 01257 sorted_runs_type cur_runs; 01258 cur_runs.runs.resize(runs2merge); 01259 cur_runs.runs_sizes.resize(runs2merge); 01260 01261 std::copy(sruns.runs.begin() + nruns - runs_left, 01262 sruns.runs.begin() + nruns - runs_left + runs2merge, 01263 cur_runs.runs.begin()); 01264 std::copy(sruns.runs_sizes.begin() + nruns - runs_left, 01265 sruns.runs_sizes.begin() + nruns - runs_left + runs2merge, 01266 cur_runs.runs_sizes.begin()); 01267 01268 runs_left -= runs2merge; 01269 /* 01270 cur_runs.elements = (runs_left)? 01271 (new_runs.runs[cur_out_run].size()*block_type::size): 01272 (elements_left); 01273 */ 01274 cur_runs.elements = new_runs.runs_sizes[cur_out_run]; 01275 elements_left -= cur_runs.elements; 01276 01277 if (runs2merge > 1) 01278 { 01279 basic_runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, memory_to_use - memory_for_write_buffers); 01280 01281 { // make sure everything is being destroyed in right time 01282 buf_ostream<block_type, typename run_type::iterator> out( 01283 new_runs.runs[cur_out_run].begin(), 01284 nwrite_buffers); 01285 01286 size_type cnt = 0; 01287 const size_type cnt_max = cur_runs.elements; 01288 01289 while (cnt != cnt_max) 01290 { 01291 *out = *merger; 01292 if ((cnt % block_type::size) == 0) // have to write the trigger value 01293 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger; 01294 01295 ++cnt; 01296 ++out; 01297 ++merger; 01298 } 01299 assert(merger.empty()); 01300 01301 while (cnt % block_type::size) 01302 { 01303 *out = cmp.max_value(); 01304 ++out; 01305 ++cnt; 01306 } 01307 } 01308 } 01309 else 01310 { 01311 bm->delete_blocks( 01312 make_bid_iterator(new_runs.runs.back().begin()), 01313 make_bid_iterator(new_runs.runs.back().end())); 01314 01315 assert(cur_runs.runs.size() == 1); 01316 01317 std::copy(cur_runs.runs.front().begin(), 01318 cur_runs.runs.front().end(), 01319 new_runs.runs.back().begin()); 01320 new_runs.runs_sizes.back() = cur_runs.runs_sizes.front(); 01321 } 01322 01323 ++cur_out_run; 01324 } 01325 assert(elements_left == 0); 01326 01327 nruns = new_nruns; 01328 sruns = new_runs; 01329 } 01330 } 01331 01332 01339 template <class RunsType_, 01340 class Cmp_, 01341 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 01342 class runs_merger : public basic_runs_merger<RunsType_, Cmp_, AllocStr_> 01343 { 01344 private: 01345 typedef RunsType_ sorted_runs_type; 01346 typedef basic_runs_merger<RunsType_, Cmp_, AllocStr_> base; 01347 typedef typename base::value_cmp value_cmp; 01348 typedef typename base::block_type block_type; 01349 01350 public: 01355 runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) : 01356 base(r, c, memory_to_use) 01357 { } 01358 }; 01359 01360 01362 // SORT // 01364 01372 template <class Input_, 01373 class Cmp_, 01374 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type), 01375 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY, 01376 class runs_creator_type = runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> > 01377 class sort : public noncopyable 01378 { 01379 typedef typename runs_creator_type::sorted_runs_type sorted_runs_type; 01380 typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type; 01381 01382 runs_creator_type creator; 01383 runs_merger_type merger; 01384 01385 public: 01387 typedef typename Input_::value_type value_type; 01388 01393 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) : 01394 creator(in, c, memory_to_use), 01395 merger(creator.result(), c, memory_to_use) 01396 { 01397 sort_helper::verify_sentinel_strict_weak_ordering(c); 01398 } 01399 01405 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) : 01406 creator(in, c, memory_to_use_rc), 01407 merger(creator.result(), c, memory_to_use_m) 01408 { 01409 sort_helper::verify_sentinel_strict_weak_ordering(c); 01410 } 01411 01412 01414 bool empty() const 01415 { 01416 return merger.empty(); 01417 } 01418 01420 const value_type & operator * () const 01421 { 01422 assert(!empty()); 01423 return *merger; 01424 } 01425 01426 const value_type * operator -> () const 01427 { 01428 assert(!empty()); 01429 return merger.operator -> (); 01430 } 01431 01433 sort & operator ++ () 01434 { 01435 ++merger; 01436 return *this; 01437 } 01438 }; 01439 01444 template < 01445 class ValueType_, 01446 unsigned BlockSize_> 01447 class compute_sorted_runs_type 01448 { 01449 typedef ValueType_ value_type; 01450 typedef BID<BlockSize_> bid_type; 01451 typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type; 01452 01453 public: 01454 typedef sorted_runs<trigger_entry_type> result; 01455 }; 01456 01458 } 01459 01462 01464 01473 template <unsigned BlockSize, 01474 class RandomAccessIterator, 01475 class CmpType, 01476 class AllocStr> 01477 void sort(RandomAccessIterator begin, 01478 RandomAccessIterator end, 01479 CmpType cmp, 01480 unsigned_type MemSize, 01481 AllocStr AS) 01482 { 01483 STXXL_UNUSED(AS); 01484 #ifdef BOOST_MSVC 01485 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType; 01486 #else 01487 typedef __typeof__(stream::streamify(begin, end)) InputType; 01488 #endif //BOOST_MSVC 01489 InputType Input(begin, end); 01490 typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type; 01491 sorter_type Sort(Input, cmp, MemSize); 01492 stream::materialize(Sort, begin); 01493 } 01494 01496 01497 __STXXL_END_NAMESPACE 01498 01499 #endif // !STXXL_SORT_STREAM_HEADER 01500 // vim: et:ts=4:sw=4