Stxxl
1.3.1
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/containers/priority_queue.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 1999 Peter Sanders <sanders@mpi-sb.mpg.de> 00007 * Copyright (C) 2003, 2004, 2007 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00008 * Copyright (C) 2007-2009 Johannes Singler <singler@ira.uka.de> 00009 * Copyright (C) 2007-2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00010 * 00011 * Distributed under the Boost Software License, Version 1.0. 00012 * (See accompanying file LICENSE_1_0.txt or copy at 00013 * http://www.boost.org/LICENSE_1_0.txt) 00014 **************************************************************************/ 00015 00016 #ifndef STXXL_PRIORITY_QUEUE_HEADER 00017 #define STXXL_PRIORITY_QUEUE_HEADER 00018 00019 #include <vector> 00020 00021 #include <stxxl/bits/deprecated.h> 00022 #include <stxxl/bits/mng/typed_block.h> 00023 #include <stxxl/bits/mng/block_alloc.h> 00024 #include <stxxl/bits/mng/read_write_pool.h> 00025 #include <stxxl/bits/mng/prefetch_pool.h> 00026 #include <stxxl/bits/mng/write_pool.h> 00027 #include <stxxl/bits/common/tmeta.h> 00028 #include <stxxl/bits/algo/sort_base.h> 00029 #include <stxxl/bits/parallel.h> 00030 #include <stxxl/bits/common/is_sorted.h> 00031 00032 #if STXXL_PARALLEL 00033 00034 #if defined(STXXL_PARALLEL_MODE) && ((__GNUC__ * 10000 + __GNUC_MINOR__ * 100) < 40400) 00035 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 00036 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 00037 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00038 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 0 00039 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 0 00040 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 0 00041 #endif 00042 00043 // enable/disable parallel merging for certain cases, for performance tuning 00044 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 00045 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1 00046 #endif 00047 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 00048 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1 00049 #endif 00050 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00051 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1 00052 #endif 00053 00054 #endif //STXXL_PARALLEL 00055 00056 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 00057 #define STXXL_PQ_EXTERNAL_LOSER_TREE 0 // no loser tree for the external sequences 00058 #else 00059 #define STXXL_PQ_EXTERNAL_LOSER_TREE 1 00060 #endif 00061 00062 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 00063 #define STXXL_PQ_INTERNAL_LOSER_TREE 0 // no loser tree for the internal sequences 00064 #else 00065 #define STXXL_PQ_INTERNAL_LOSER_TREE 1 00066 #endif 00067 00068 #include <stxxl/bits/containers/pq_helpers.h> 00069 #include <stxxl/bits/containers/pq_mergers.h> 00070 #include <stxxl/bits/containers/pq_ext_merger.h> 00071 #include <stxxl/bits/containers/pq_losertree.h> 00072 00073 __STXXL_BEGIN_NAMESPACE 00074 00075 /* 00076 KNBufferSize1 = 32; 00077 KNN = 512; // length of group 1 sequences 00078 KNKMAX = 64; // maximal arity 00079 LogKNKMAX = 6; // ceil(log KNKMAX) 00080 KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels 00081 */ 00082 00083 // internal memory consumption >= N_*(KMAX_^IntLevels_) + ext 00084 00085 template < 00086 class Tp_, 00087 class Cmp_, 00088 unsigned BufferSize1_ = 32, // equalize procedure call overheads etc. 00089 unsigned N_ = 512, // length of group 1 sequences 00090 unsigned IntKMAX_ = 64, // maximal arity for internal mergers 00091 unsigned IntLevels_ = 4, // number of internal groups 00092 unsigned BlockSize_ = (2 * 1024 * 1024), // external block size 00093 unsigned ExtKMAX_ = 64, // maximal arity for external mergers 00094 unsigned ExtLevels_ = 2, // number of external groups 00095 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY 00096 > 00097 struct priority_queue_config 00098 { 00099 typedef Tp_ value_type; 00100 typedef Cmp_ comparator_type; 00101 typedef AllocStr_ alloc_strategy_type; 00102 enum 00103 { 00104 delete_buffer_size = BufferSize1_, 00105 N = N_, 00106 IntKMAX = IntKMAX_, 00107 num_int_groups = IntLevels_, 00108 num_ext_groups = ExtLevels_, 00109 BlockSize = BlockSize_, 00110 ExtKMAX = ExtKMAX_, 00111 element_size = sizeof(Tp_) 00112 }; 00113 }; 00114 00115 __STXXL_END_NAMESPACE 00116 00117 namespace std 00118 { 00119 template <class BlockType_, 00120 class Cmp_, 00121 unsigned Arity_, 00122 class AllocStr_> 00123 void swap(stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & a, 00124 stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & b) 00125 { 00126 a.swap(b); 00127 } 00128 template <class ValTp_, class Cmp_, unsigned KNKMAX> 00129 void swap(stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & a, 00130 stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & b) 00131 { 00132 a.swap(b); 00133 } 00134 } 00135 00136 __STXXL_BEGIN_NAMESPACE 00137 00139 template <class Config_> 00140 class priority_queue : private noncopyable 00141 { 00142 public: 00143 typedef Config_ Config; 00144 enum 00145 { 00146 delete_buffer_size = Config::delete_buffer_size, 00147 N = Config::N, 00148 IntKMAX = Config::IntKMAX, 00149 num_int_groups = Config::num_int_groups, 00150 num_ext_groups = Config::num_ext_groups, 00151 total_num_groups = Config::num_int_groups + Config::num_ext_groups, 00152 BlockSize = Config::BlockSize, 00153 ExtKMAX = Config::ExtKMAX 00154 }; 00155 00157 typedef typename Config::value_type value_type; 00159 typedef typename Config::comparator_type comparator_type; 00160 typedef typename Config::alloc_strategy_type alloc_strategy_type; 00162 typedef stxxl::uint64 size_type; 00163 typedef typed_block<BlockSize, value_type> block_type; 00164 typedef read_write_pool<block_type> pool_type; 00165 00166 protected: 00167 typedef priority_queue_local::internal_priority_queue<value_type, std::vector<value_type>, comparator_type> 00168 insert_heap_type; 00169 00170 typedef priority_queue_local::loser_tree< 00171 value_type, 00172 comparator_type, 00173 IntKMAX> int_merger_type; 00174 00175 typedef priority_queue_local::ext_merger< 00176 block_type, 00177 comparator_type, 00178 ExtKMAX, 00179 alloc_strategy_type> ext_merger_type; 00180 00181 00182 int_merger_type int_mergers[num_int_groups]; 00183 pool_type * pool; 00184 bool pool_owned; 00185 ext_merger_type * ext_mergers; 00186 00187 // one delete buffer for each tree => group buffer 00188 value_type group_buffers[total_num_groups][N + 1]; // tree->group_buffers->delete_buffer (extra space for sentinel) 00189 value_type * group_buffer_current_mins[total_num_groups]; // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N 00190 00191 // overall delete buffer 00192 value_type delete_buffer[delete_buffer_size + 1]; 00193 value_type * delete_buffer_current_min; // current start of delete_buffer 00194 value_type * delete_buffer_end; // end of delete_buffer 00195 00196 comparator_type cmp; 00197 00198 // insert buffer 00199 insert_heap_type insert_heap; 00200 00201 // how many groups are active 00202 unsigned_type num_active_groups; 00203 00204 // total size not counting insert_heap and delete_buffer 00205 size_type size_; 00206 00207 private: 00208 void init(); 00209 00210 void refill_delete_buffer(); 00211 unsigned_type refill_group_buffer(unsigned_type k); 00212 00213 unsigned_type make_space_available(unsigned_type level); 00214 void empty_insert_heap(); 00215 00216 value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; } 00217 unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; } 00218 unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; } 00219 00220 public: 00226 priority_queue(pool_type & pool_); 00227 00237 _STXXL_DEPRECATED(priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_)); 00238 00248 priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem); 00249 00252 void swap(priority_queue & obj) 00253 { 00254 //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug? 00255 for (unsigned_type i = 0; i < num_int_groups; ++i) 00256 std::swap(int_mergers[i], obj.int_mergers[i]); 00257 00258 //std::swap(pool,obj.pool); 00259 //std::swap(pool_owned, obj.pool_owned); 00260 std::swap(ext_mergers, obj.ext_mergers); 00261 for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1) 00262 for (unsigned_type i2 = 0; i2 < (N + 1); ++i2) 00263 std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]); 00264 00265 swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups); 00266 swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1); 00267 std::swap(delete_buffer_current_min, obj.delete_buffer_current_min); 00268 std::swap(delete_buffer_end, obj.delete_buffer_end); 00269 std::swap(cmp, obj.cmp); 00270 std::swap(insert_heap, obj.insert_heap); 00271 std::swap(num_active_groups, obj.num_active_groups); 00272 std::swap(size_, obj.size_); 00273 } 00274 00275 virtual ~priority_queue(); 00276 00279 size_type size() const; 00280 00283 bool empty() const { return (size() == 0); } 00284 00296 const value_type & top() const; 00297 00304 void pop(); 00305 00310 void push(const value_type & obj); 00311 00316 unsigned_type mem_cons() const 00317 { 00318 unsigned_type dynam_alloc_mem = 0; 00319 //dynam_alloc_mem += w_pool.mem_cons(); 00320 //dynam_alloc_mem += p_pool.mem_cons(); 00321 for (int i = 0; i < num_int_groups; ++i) 00322 dynam_alloc_mem += int_mergers[i].mem_cons(); 00323 00324 for (int i = 0; i < num_ext_groups; ++i) 00325 dynam_alloc_mem += ext_mergers[i].mem_cons(); 00326 00327 00328 return (sizeof(*this) + 00329 sizeof(ext_merger_type) * num_ext_groups + 00330 dynam_alloc_mem); 00331 } 00332 }; 00333 00334 00335 template <class Config_> 00336 inline typename priority_queue<Config_>::size_type priority_queue<Config_>::size() const 00337 { 00338 return size_ + 00339 insert_heap.size() - 1 + 00340 (delete_buffer_end - delete_buffer_current_min); 00341 } 00342 00343 00344 template <class Config_> 00345 inline const typename priority_queue<Config_>::value_type & priority_queue<Config_>::top() const 00346 { 00347 assert(!insert_heap.empty()); 00348 00349 const typename priority_queue<Config_>::value_type & t = insert_heap.top(); 00350 if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, t)) 00351 return t; 00352 else 00353 return *delete_buffer_current_min; 00354 } 00355 00356 template <class Config_> 00357 inline void priority_queue<Config_>::pop() 00358 { 00359 //STXXL_VERBOSE1("priority_queue::pop()"); 00360 assert(!insert_heap.empty()); 00361 00362 if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, insert_heap.top())) 00363 insert_heap.pop(); 00364 else 00365 { 00366 assert(delete_buffer_current_min < delete_buffer_end); 00367 ++delete_buffer_current_min; 00368 if (delete_buffer_current_min == delete_buffer_end) 00369 refill_delete_buffer(); 00370 } 00371 } 00372 00373 template <class Config_> 00374 inline void priority_queue<Config_>::push(const value_type & obj) 00375 { 00376 //STXXL_VERBOSE3("priority_queue::push("<< obj <<")"); 00377 assert(int_mergers->not_sentinel(obj)); 00378 if (insert_heap.size() == N + 1) 00379 empty_insert_heap(); 00380 00381 00382 assert(!insert_heap.empty()); 00383 00384 insert_heap.push(obj); 00385 } 00386 00387 00389 00390 template <class Config_> 00391 priority_queue<Config_>::priority_queue(pool_type & pool_) : 00392 pool(&pool_), 00393 pool_owned(false), 00394 delete_buffer_end(delete_buffer + delete_buffer_size), 00395 insert_heap(N + 2), 00396 num_active_groups(0), size_(0) 00397 { 00398 STXXL_VERBOSE2("priority_queue::priority_queue(pool)"); 00399 init(); 00400 } 00401 00402 // DEPRECATED 00403 template <class Config_> 00404 priority_queue<Config_>::priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_) : 00405 pool(new pool_type(p_pool_, w_pool_)), 00406 pool_owned(true), 00407 delete_buffer_end(delete_buffer + delete_buffer_size), 00408 insert_heap(N + 2), 00409 num_active_groups(0), size_(0) 00410 { 00411 STXXL_VERBOSE2("priority_queue::priority_queue(p_pool, w_pool)"); 00412 init(); 00413 } 00414 00415 template <class Config_> 00416 priority_queue<Config_>::priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem) : 00417 pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)), 00418 pool_owned(true), 00419 delete_buffer_end(delete_buffer + delete_buffer_size), 00420 insert_heap(N + 2), 00421 num_active_groups(0), size_(0) 00422 { 00423 STXXL_VERBOSE2("priority_queue::priority_queue(pool sizes)"); 00424 init(); 00425 } 00426 00427 template <class Config_> 00428 void priority_queue<Config_>::init() 00429 { 00430 assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering 00431 00432 ext_mergers = new ext_merger_type[num_ext_groups]; 00433 for (unsigned_type j = 0; j < num_ext_groups; ++j) 00434 ext_mergers[j].set_pool(pool); 00435 00436 value_type sentinel = cmp.min_value(); 00437 insert_heap.push(sentinel); // always keep the sentinel 00438 delete_buffer[delete_buffer_size] = sentinel; // sentinel 00439 delete_buffer_current_min = delete_buffer_end; // empty 00440 for (unsigned_type i = 0; i < total_num_groups; i++) 00441 { 00442 group_buffers[i][N] = sentinel; // sentinel 00443 group_buffer_current_mins[i] = &(group_buffers[i][N]); // empty 00444 } 00445 } 00446 00447 template <class Config_> 00448 priority_queue<Config_>::~priority_queue() 00449 { 00450 STXXL_VERBOSE2("priority_queue::~priority_queue()"); 00451 if (pool_owned) 00452 delete pool; 00453 00454 delete[] ext_mergers; 00455 } 00456 00457 //--------------------- Buffer refilling ------------------------------- 00458 00459 // refill group_buffers[j] and return number of elements found 00460 template <class Config_> 00461 unsigned_type priority_queue<Config_>::refill_group_buffer(unsigned_type group) 00462 { 00463 STXXL_VERBOSE2("priority_queue::refill_group_buffer(" << group << ")"); 00464 00465 value_type * target; 00466 unsigned_type length; 00467 size_type group_size = (group < num_int_groups) ? 00468 int_mergers[group].size() : 00469 ext_mergers[group - num_int_groups].size(); //elements left in segments 00470 unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer 00471 if (group_size + left_elements >= size_type(N)) 00472 { // buffer will be filled completely 00473 target = group_buffers[group]; 00474 length = N - left_elements; 00475 } 00476 else 00477 { 00478 target = group_buffers[group] + N - group_size - left_elements; 00479 length = group_size; 00480 } 00481 00482 if (length > 0) 00483 { 00484 // shift remaininig elements to front 00485 memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type)); 00486 group_buffer_current_mins[group] = target; 00487 00488 // fill remaining space from group 00489 if (group < num_int_groups) 00490 int_mergers[group].multi_merge(target + left_elements, length); 00491 else 00492 ext_mergers[group - num_int_groups].multi_merge( 00493 target + left_elements, 00494 target + left_elements + length); 00495 } 00496 00497 //STXXL_MSG(length + left_elements); 00498 //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n")); 00499 #if STXXL_CHECK_ORDER_IN_SORTS 00500 priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp); 00501 if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp)) 00502 { 00503 STXXL_VERBOSE2("length: " << length << " left_elements: " << left_elements); 00504 for (value_type * v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v) 00505 { 00506 if (inv_cmp(*v, *(v - 1))) 00507 { 00508 STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << " " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1)); 00509 } 00510 } 00511 assert(false); 00512 } 00513 #endif 00514 00515 return length + left_elements; 00516 } 00517 00518 00519 template <class Config_> 00520 void priority_queue<Config_>::refill_delete_buffer() 00521 { 00522 STXXL_VERBOSE2("priority_queue::refill_delete_buffer()"); 00523 00524 size_type total_group_size = 0; 00525 //num_active_groups is <= 4 00526 for (int i = num_active_groups - 1; i >= 0; i--) 00527 { 00528 if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size) 00529 { 00530 unsigned_type length = refill_group_buffer(i); 00531 // max active level dry now? 00532 if (length == 0 && unsigned(i) == num_active_groups - 1) 00533 --num_active_groups; 00534 00535 total_group_size += length; 00536 } 00537 else 00538 total_group_size += delete_buffer_size; // actually only a sufficient lower bound 00539 } 00540 00541 unsigned_type length; 00542 if (total_group_size >= delete_buffer_size) // buffer can be filled completely 00543 { 00544 length = delete_buffer_size; // amount to be copied 00545 size_ -= size_type(delete_buffer_size); // amount left in group_buffers 00546 } 00547 else 00548 { 00549 length = total_group_size; 00550 assert(size_ == size_type(length)); // trees and group_buffers get empty 00551 size_ = 0; 00552 } 00553 00554 priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp); 00555 00556 // now call simplified refill routines 00557 // which can make the assumption that 00558 // they find all they are asked in the buffers 00559 delete_buffer_current_min = delete_buffer_end - length; 00560 STXXL_VERBOSE2("Active groups = " << num_active_groups); 00561 switch (num_active_groups) 00562 { 00563 case 0: 00564 break; 00565 case 1: 00566 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min); 00567 group_buffer_current_mins[0] += length; 00568 break; 00569 case 2: 00570 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00571 { 00572 std::pair<value_type *, value_type *> seqs[2] = 00573 { 00574 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N), 00575 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N) 00576 }; 00577 parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately 00578 00579 group_buffer_current_mins[0] = seqs[0].first; 00580 group_buffer_current_mins[1] = seqs[1].first; 00581 } 00582 #else 00583 priority_queue_local::merge_iterator( 00584 group_buffer_current_mins[0], 00585 group_buffer_current_mins[1], delete_buffer_current_min, length, cmp); 00586 #endif 00587 break; 00588 case 3: 00589 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00590 { 00591 std::pair<value_type *, value_type *> seqs[3] = 00592 { 00593 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N), 00594 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N), 00595 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N) 00596 }; 00597 parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately 00598 00599 group_buffer_current_mins[0] = seqs[0].first; 00600 group_buffer_current_mins[1] = seqs[1].first; 00601 group_buffer_current_mins[2] = seqs[2].first; 00602 } 00603 #else 00604 priority_queue_local::merge3_iterator( 00605 group_buffer_current_mins[0], 00606 group_buffer_current_mins[1], 00607 group_buffer_current_mins[2], delete_buffer_current_min, length, cmp); 00608 #endif 00609 break; 00610 case 4: 00611 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00612 { 00613 std::pair<value_type *, value_type *> seqs[4] = 00614 { 00615 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N), 00616 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N), 00617 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N), 00618 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N) 00619 }; 00620 parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately 00621 00622 group_buffer_current_mins[0] = seqs[0].first; 00623 group_buffer_current_mins[1] = seqs[1].first; 00624 group_buffer_current_mins[2] = seqs[2].first; 00625 group_buffer_current_mins[3] = seqs[3].first; 00626 } 00627 #else 00628 priority_queue_local::merge4_iterator( 00629 group_buffer_current_mins[0], 00630 group_buffer_current_mins[1], 00631 group_buffer_current_mins[2], 00632 group_buffer_current_mins[3], delete_buffer_current_min, length, cmp); //side effect free 00633 #endif 00634 break; 00635 default: 00636 STXXL_THROW(std::runtime_error, "priority_queue<...>::refill_delete_buffer()", 00637 "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4"); 00638 } 00639 00640 #if STXXL_CHECK_ORDER_IN_SORTS 00641 if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp)) 00642 { 00643 for (value_type * v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v) 00644 { 00645 if (inv_cmp(*v, *(v - 1))) 00646 { 00647 STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << " " << *(v - 1) << " " << *v); 00648 } 00649 } 00650 assert(false); 00651 } 00652 #endif 00653 //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n")); 00654 } 00655 00656 //-------------------------------------------------------------------- 00657 00658 // check if space is available on level k and 00659 // empty this level if necessary leading to a recursive call. 00660 // return the level where space was finally available 00661 template <class Config_> 00662 unsigned_type priority_queue<Config_>::make_space_available(unsigned_type level) 00663 { 00664 STXXL_VERBOSE2("priority_queue::make_space_available(" << level << ")"); 00665 unsigned_type finalLevel; 00666 assert(level < total_num_groups); 00667 assert(level <= num_active_groups); 00668 00669 if (level == num_active_groups) 00670 ++num_active_groups; 00671 00672 const bool spaceIsAvailable_ = 00673 (level < num_int_groups) ? int_mergers[level].is_space_available() 00674 : ((level == total_num_groups - 1) ? true : (ext_mergers[level - num_int_groups].is_space_available())); 00675 00676 if (spaceIsAvailable_) 00677 { 00678 finalLevel = level; 00679 } 00680 else 00681 { 00682 finalLevel = make_space_available(level + 1); 00683 00684 if (level < num_int_groups - 1) // from internal to internal tree 00685 { 00686 unsigned_type segmentSize = int_mergers[level].size(); 00687 value_type * newSegment = new value_type[segmentSize + 1]; 00688 int_mergers[level].multi_merge(newSegment, segmentSize); // empty this level 00689 00690 newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel 00691 // for queues where size << #inserts 00692 // it might make sense to stay in this level if 00693 // segmentSize < alpha * KNN * k^level for some alpha < 1 00694 int_mergers[level + 1].insert_segment(newSegment, segmentSize); 00695 } 00696 else 00697 { 00698 if (level == num_int_groups - 1) // from internal to external tree 00699 { 00700 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size(); 00701 STXXL_VERBOSE1("Inserting segment into first level external: " << level << " " << segmentSize); 00702 ext_mergers[0].insert_segment(int_mergers[num_int_groups - 1], segmentSize); 00703 } 00704 else // from external to external tree 00705 { 00706 const size_type segmentSize = ext_mergers[level - num_int_groups].size(); 00707 STXXL_VERBOSE1("Inserting segment into second level external: " << level << " " << segmentSize); 00708 ext_mergers[level - num_int_groups + 1].insert_segment(ext_mergers[level - num_int_groups], segmentSize); 00709 } 00710 } 00711 } 00712 return finalLevel; 00713 } 00714 00715 00716 // empty the insert heap into the main data structure 00717 template <class Config_> 00718 void priority_queue<Config_>::empty_insert_heap() 00719 { 00720 STXXL_VERBOSE2("priority_queue::empty_insert_heap()"); 00721 assert(insert_heap.size() == (N + 1)); 00722 00723 const value_type sup = get_supremum(); 00724 00725 // build new segment 00726 value_type * newSegment = new value_type[N + 1]; 00727 value_type * newPos = newSegment; 00728 00729 // put the new data there for now 00730 //insert_heap.sortTo(newSegment); 00731 value_type * SortTo = newSegment; 00732 00733 insert_heap.sort_to(SortTo); 00734 00735 SortTo = newSegment + N; 00736 insert_heap.clear(); 00737 insert_heap.push(*SortTo); 00738 00739 assert(insert_heap.size() == 1); 00740 00741 newSegment[N] = sup; // sentinel 00742 00743 // copy the delete_buffer and group_buffers[0] to temporary storage 00744 // (the temporary can be eliminated using some dirty tricks) 00745 const unsigned_type tempSize = N + delete_buffer_size; 00746 value_type temp[tempSize + 1]; 00747 unsigned_type sz1 = current_delete_buffer_size(); 00748 unsigned_type sz2 = current_group_buffer_size(0); 00749 value_type * pos = temp + tempSize - sz1 - sz2; 00750 std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos); 00751 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1); 00752 temp[tempSize] = sup; // sentinel 00753 00754 // refill delete_buffer 00755 // (using more complicated code it could be made somewhat fuller 00756 // in certain circumstances) 00757 priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp); 00758 00759 // refill group_buffers[0] 00760 // (as above we might want to take the opportunity 00761 // to make group_buffers[0] fuller) 00762 priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp); 00763 00764 // merge the rest to the new segment 00765 // note that merge exactly trips into the footsteps 00766 // of itself 00767 priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp); 00768 00769 // and insert it 00770 unsigned_type freeLevel = make_space_available(0); 00771 assert(freeLevel == 0 || int_mergers[0].size() == 0); 00772 int_mergers[0].insert_segment(newSegment, N); 00773 00774 // get rid of invalid level 2 buffers 00775 // by inserting them into tree 0 (which is almost empty in this case) 00776 if (freeLevel > 0) 00777 { 00778 for (int_type i = freeLevel; i >= 0; i--) 00779 { 00780 // reverse order not needed 00781 // but would allow immediate refill 00782 00783 newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel 00784 std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment); 00785 int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i)); 00786 group_buffer_current_mins[i] = group_buffers[i] + N; // empty 00787 } 00788 } 00789 00790 // update size 00791 size_ += size_type(N); 00792 00793 // special case if the tree was empty before 00794 if (delete_buffer_current_min == delete_buffer_end) 00795 refill_delete_buffer(); 00796 } 00797 00798 namespace priority_queue_local 00799 { 00800 struct Parameters_for_priority_queue_not_found_Increase_IntM 00801 { 00802 enum { fits = false }; 00803 typedef Parameters_for_priority_queue_not_found_Increase_IntM result; 00804 }; 00805 00806 struct dummy 00807 { 00808 enum { fits = false }; 00809 typedef dummy result; 00810 }; 00811 00812 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_, bool stop = false> 00813 struct find_B_m 00814 { 00815 typedef find_B_m<E_, IntM_, MaxS_, B_, m_, stop> Self; 00816 enum { 00817 k = IntM_ / B_, // number of blocks that fit into M 00818 element_size = E_, // element size 00819 IntM = IntM_, 00820 B = B_, // block size 00821 m = m_, // number of blocks fitting into buffers 00822 c = k - m_, 00823 // memory occ. by block must be at least 10 times larger than size of ext sequence 00824 // && satisfy memory req && if we have two ext mergers their degree must be at least 64=m/2 00825 fits = c > 10 && ((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxS_ 00826 && ((MaxS_ < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128), 00827 step = 1 00828 }; 00829 00830 typedef typename find_B_m<element_size, IntM, MaxS_, B, m + step, fits || (m >= k - step)>::result candidate1; 00831 typedef typename find_B_m<element_size, IntM, MaxS_, B / 2, 1, fits || candidate1::fits>::result candidate2; 00832 typedef typename IF<fits, Self, typename IF<candidate1::fits, candidate1, candidate2>::result>::result result; 00833 }; 00834 00835 // specialization for the case when no valid parameters are found 00836 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, bool stop> 00837 struct find_B_m<E_, IntM_, MaxS_, 2048, 1, stop> 00838 { 00839 enum { fits = false }; 00840 typedef Parameters_for_priority_queue_not_found_Increase_IntM result; 00841 }; 00842 00843 // to speedup search 00844 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_> 00845 struct find_B_m<E_, IntM_, MaxS_, B_, m_, true> 00846 { 00847 enum { fits = false }; 00848 typedef dummy result; 00849 }; 00850 00851 // E_ size of element in bytes 00852 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_> 00853 struct find_settings 00854 { 00855 // start from block size (8*1024*1024) bytes 00856 typedef typename find_B_m<E_, IntM_, MaxS_, (8 * 1024 * 1024), 1>::result result; 00857 }; 00858 00859 struct Parameters_not_found_Try_to_change_the_Tune_parameter 00860 { 00861 typedef Parameters_not_found_Try_to_change_the_Tune_parameter result; 00862 }; 00863 00864 00865 template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize_> 00866 struct compute_N 00867 { 00868 typedef compute_N<AI_, X_, CriticalSize_> Self; 00869 enum 00870 { 00871 X = X_, 00872 AI = AI_, 00873 N = X / (AI * AI) // two stage internal 00874 }; 00875 typedef typename IF<(N >= CriticalSize_), Self, typename compute_N<AI / 2, X, CriticalSize_>::result>::result result; 00876 }; 00877 00878 template <unsigned_type X_, unsigned_type CriticalSize_> 00879 struct compute_N<1, X_, CriticalSize_> 00880 { 00881 typedef Parameters_not_found_Try_to_change_the_Tune_parameter result; 00882 }; 00883 } 00884 00886 00889 00891 00954 template <class Tp_, class Cmp_, unsigned_type IntM_, unsigned MaxS_, unsigned Tune_ = 6> 00955 class PRIORITY_QUEUE_GENERATOR 00956 { 00957 public: 00958 // actual calculation of B, m, k and element_size 00959 typedef typename priority_queue_local::find_settings<sizeof(Tp_), IntM_, MaxS_>::result settings; 00960 enum { 00961 B = settings::B, 00962 m = settings::m, 00963 X = B * (settings::k - m) / settings::element_size, // interpretation of result 00964 Buffer1Size = 32 // fixed 00965 }; 00966 // derivation of N, AI, AE 00967 typedef typename priority_queue_local::compute_N<(1 << Tune_), X, 4 * Buffer1Size>::result ComputeN; 00968 enum 00969 { 00970 N = ComputeN::N, 00971 AI = ComputeN::AI, 00972 AE = (m / 2 < 2) ? 2 : (m / 2) // at least 2 00973 }; 00974 enum { 00975 // Estimation of maximum internal memory consumption (in bytes) 00976 EConsumption = X * settings::element_size + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024 00977 }; 00978 /* 00979 unsigned BufferSize1_ = 32, // equalize procedure call overheads etc. 00980 unsigned N_ = 512, // bandwidth 00981 unsigned IntKMAX_ = 64, // maximal arity for internal mergers 00982 unsigned IntLevels_ = 4, 00983 unsigned BlockSize_ = (2*1024*1024), 00984 unsigned ExtKMAX_ = 64, // maximal arity for external mergers 00985 unsigned ExtLevels_ = 2, 00986 */ 00987 typedef priority_queue<priority_queue_config<Tp_, Cmp_, Buffer1Size, N, AI, 2, B, AE, 2> > result; 00988 }; 00989 00991 00992 __STXXL_END_NAMESPACE 00993 00994 00995 namespace std 00996 { 00997 template <class Config_> 00998 void swap(stxxl::priority_queue<Config_> & a, 00999 stxxl::priority_queue<Config_> & b) 01000 { 01001 a.swap(b); 01002 } 01003 } 01004 01005 #endif // !STXXL_PRIORITY_QUEUE_HEADER 01006 // vim: et:ts=4:sw=4