16 #ifndef dealii_work_stream_h 17 # define dealii_work_stream_h 20 # include <deal.II/base/config.h> 22 # include <deal.II/base/graph_coloring.h> 23 # include <deal.II/base/multithread_info.h> 24 # include <deal.II/base/parallel.h> 25 # include <deal.II/base/template_constraints.h> 26 # include <deal.II/base/thread_local_storage.h> 27 # include <deal.II/base/thread_management.h> 29 # ifdef DEAL_II_WITH_THREADS 30 # include <deal.II/base/thread_management.h> 32 # include <tbb/pipeline.h> 35 # include <functional> 40 DEAL_II_NAMESPACE_OPEN
149 # ifdef DEAL_II_WITH_THREADS 170 namespace Implementation2
175 template <
typename Iterator,
typename ScratchData,
typename CopyData>
194 std::shared_ptr<ScratchData> scratch_data;
195 bool currently_in_use;
201 : currently_in_use(false)
206 , currently_in_use(in_use)
220 : scratch_data(o.scratch_data)
221 , currently_in_use(o.currently_in_use)
304 , scratch_data(nullptr)
305 , sample_scratch_data(nullptr)
306 , currently_in_use(false)
317 const Iterator & end,
318 const unsigned int buffer_size,
321 const CopyData & sample_copy_data)
325 , sample_scratch_data(sample_scratch_data)
326 , chunk_size(chunk_size)
329 for (
unsigned int element = 0; element <
item_buffer.size();
366 for (
unsigned int i = 0; i <
item_buffer.size(); ++i)
373 Assert(current_item !=
nullptr,
374 ExcMessage(
"This can't be. There must be a free item!"));
391 if (current_item->
n_items == 0)
467 template <
typename Iterator,
typename ScratchData,
typename CopyData>
477 const std::function<
void(
const Iterator &, ScratchData &, CopyData &)>
479 bool copier_exist =
true)
480 : tbb::filter( false)
482 , copier_exist(copier_exist)
511 ScratchData *scratch_data =
nullptr;
514 current_item->scratch_data->get();
518 for (
typename ItemType::ScratchDataList::iterator p =
519 scratch_data_list.begin();
520 p != scratch_data_list.end();
522 if (p->currently_in_use ==
false)
524 scratch_data = p->scratch_data.get();
525 p->currently_in_use =
true;
530 if (scratch_data ==
nullptr)
533 new ScratchData(*current_item->sample_scratch_data);
535 typename ItemType::ScratchDataList::value_type
536 new_scratch_object(scratch_data,
true);
537 scratch_data_list.push_back(new_scratch_object);
545 for (
unsigned int i = 0; i < current_item->n_items; ++i)
550 worker(current_item->work_items[i],
552 current_item->copy_datas[i]);
554 catch (
const std::exception &exc)
556 Threads::internal::handle_std_exception(exc);
560 Threads::internal::handle_unknown_exception();
569 current_item->scratch_data->get();
571 for (
typename ItemType::ScratchDataList::iterator p =
572 scratch_data_list.begin();
573 p != scratch_data_list.end();
575 if (p->scratch_data.get() == scratch_data)
578 p->currently_in_use =
false;
583 if (copier_exist ==
false)
584 current_item->currently_in_use =
false;
598 const std::function<void(const Iterator &, ScratchData &, CopyData &)>
615 template <
typename Iterator,
typename ScratchData,
typename CopyData>
625 Copier(
const std::function<
void(
const CopyData &)> &copier)
648 for (
unsigned int i = 0; i < current_item->n_items; ++i)
653 copier(current_item->copy_datas[i]);
655 catch (
const std::exception &exc)
657 Threads::internal::handle_std_exception(exc);
661 Threads::internal::handle_unknown_exception();
666 current_item->currently_in_use =
false;
679 const std::function<void(const CopyData &)>
copier;
692 namespace Implementation3
699 template <
typename Iterator,
typename ScratchData,
typename CopyData>
702 std::shared_ptr<ScratchData> scratch_data;
703 std::shared_ptr<CopyData> copy_data;
704 bool currently_in_use;
710 : currently_in_use(false)
718 , currently_in_use(in_use)
731 : scratch_data(o.scratch_data)
732 , copy_data(o.copy_data)
733 , currently_in_use(o.currently_in_use)
744 template <
typename Iterator,
typename ScratchData,
typename CopyData>
752 const std::function<
void(
const Iterator &, ScratchData &, CopyData &)>
754 const std::function<
void(
const CopyData &)> &copier,
756 const CopyData & sample_copy_data)
759 , sample_scratch_data(sample_scratch_data)
760 , sample_copy_data(sample_copy_data)
770 typename std::vector<Iterator>::const_iterator> &range)
782 ScratchData *scratch_data =
nullptr;
783 CopyData * copy_data =
nullptr;
789 for (
typename ScratchAndCopyDataList::iterator p =
790 scratch_and_copy_data_list.begin();
791 p != scratch_and_copy_data_list.end();
793 if (p->currently_in_use ==
false)
795 scratch_data = p->scratch_data.get();
796 copy_data = p->copy_data.get();
797 p->currently_in_use =
true;
803 if (scratch_data ==
nullptr)
807 copy_data =
new CopyData(sample_copy_data);
809 scratch_and_copy_data_list.emplace_back(scratch_data,
817 for (
typename std::vector<Iterator>::const_iterator p = range.begin();
824 worker(*p, *scratch_data, *copy_data);
828 catch (
const std::exception &exc)
830 Threads::internal::handle_std_exception(exc);
834 Threads::internal::handle_unknown_exception();
844 for (
typename ScratchAndCopyDataList::iterator p =
845 scratch_and_copy_data_list.begin();
846 p != scratch_and_copy_data_list.end();
848 if (p->scratch_data.get() == scratch_data)
851 p->currently_in_use =
false;
872 const std::function<void(const Iterator &, ScratchData &, CopyData &)>
879 const std::function<void(const CopyData &)>
copier;
885 const CopyData & sample_copy_data;
892 # endif // DEAL_II_WITH_THREADS 929 template <
typename Worker,
932 typename ScratchData,
935 run(
const std::vector<std::vector<Iterator>> &colored_iterators,
939 const CopyData & sample_copy_data,
978 template <
typename Worker,
981 typename ScratchData,
984 run(
const Iterator & begin,
985 const typename identity<Iterator>::type &end,
988 const ScratchData & sample_scratch_data,
989 const CopyData & sample_copy_data,
994 ExcMessage(
"The queue length must be at least one, and preferably " 995 "larger than the number of processors on this system."));
1002 if (!(begin != end))
1007 # ifdef DEAL_II_WITH_THREADS 1013 CopyData copy_data = sample_copy_data;
1015 for (Iterator i = begin; i != end; ++i)
1019 if (
static_cast<const std::function<
1020 void(
const Iterator &, ScratchData &, CopyData &)
> &>(worker))
1021 worker(i, scratch_data, copy_data);
1022 if (
static_cast<const std::function<void(const CopyData &)> &
>(
1027 # ifdef DEAL_II_WITH_THREADS 1031 if (
static_cast<const std::function<void(const CopyData &)> &
>(copier))
1036 iterator_range_to_item_stream(begin,
1040 sample_scratch_data,
1044 worker_filter(worker);
1046 copier_filter(copier);
1049 tbb::pipeline assembly_line;
1050 assembly_line.add_filter(iterator_range_to_item_stream);
1051 assembly_line.add_filter(worker_filter);
1052 assembly_line.add_filter(copier_filter);
1055 assembly_line.run(queue_length);
1057 assembly_line.clear();
1075 std::vector<std::vector<Iterator>> all_iterators(1);
1076 for (Iterator p = begin; p != end; ++p)
1077 all_iterators[0].push_back(p);
1082 sample_scratch_data,
1093 template <
typename Worker,
1096 typename ScratchData,
1099 run(
const std::vector<std::vector<Iterator>> &colored_iterators,
1102 const ScratchData & sample_scratch_data,
1103 const CopyData & sample_copy_data,
1104 const unsigned int queue_length,
1108 ExcMessage(
"The queue length must be at least one, and preferably " 1109 "larger than the number of processors on this system."));
1111 Assert(chunk_size > 0,
ExcMessage(
"The chunk_size must be at least one."));
1116 # ifdef DEAL_II_WITH_THREADS 1122 CopyData copy_data = sample_copy_data;
1124 for (
unsigned int color = 0; color < colored_iterators.size(); ++color)
1125 for (
typename std::vector<Iterator>::const_iterator p =
1126 colored_iterators[color].begin();
1127 p != colored_iterators[color].end();
1132 if (
static_cast<const std::function<
void(
1133 const Iterator &, ScratchData &, CopyData &)
> &>(worker))
1134 worker(*p, scratch_data, copy_data);
1135 if (
static_cast<const std::function<void(const CopyData &)> &
>(
1140 # ifdef DEAL_II_WITH_THREADS 1144 for (
unsigned int color = 0; color < colored_iterators.size(); ++color)
1145 if (colored_iterators[color].size() > 0)
1150 using RangeType =
typename std::vector<Iterator>::const_iterator;
1152 WorkerAndCopier worker_and_copier(worker,
1154 sample_scratch_data,
1158 tbb::blocked_range<RangeType>(colored_iterators[color].begin(),
1159 colored_iterators[color].end(),
1161 std::bind(&WorkerAndCopier::operator(),
1162 std::ref(worker_and_copier),
1163 std::placeholders::_1),
1164 tbb::auto_partitioner());
1201 template <
typename MainClass,
1203 typename ScratchData,
1207 const typename identity<Iterator>::type &end,
1208 MainClass & main_object,
1209 void (MainClass::*worker)(
const Iterator &, ScratchData &, CopyData &),
1210 void (MainClass::*copier)(
const CopyData &),
1211 const ScratchData &sample_scratch_data,
1212 const CopyData & sample_copy_data,
1220 std::ref(main_object),
1221 std::placeholders::_1,
1222 std::placeholders::_2,
1223 std::placeholders::_3),
1224 std::bind(copier, std::ref(main_object), std::placeholders::_1),
1225 sample_scratch_data,
1235 DEAL_II_NAMESPACE_CLOSE
void * operator()(void *item) override
const std::function< void(const Iterator &, ScratchData &, CopyData &)> worker
const std::function< void(const CopyData &)> copier
std::pair< Iterator, Iterator > remaining_iterator_range
std::list< ScratchAndCopyDataObjects > ScratchAndCopyDataList
const std::function< void(const Iterator &, ScratchData &, CopyData &)> worker
ScratchAndCopyDataObjects()
std::vector< Iterator > work_items
virtual void * operator()(void *) override
void * operator()(void *item) override
void operator()(const tbb::blocked_range< typename std::vector< Iterator >::const_iterator > &range)
Threads::ThreadLocalStorage< ScratchDataList > * scratch_data
IteratorRangeToItemStream(const Iterator &begin, const Iterator &end, const unsigned int buffer_size, const unsigned int chunk_size, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data)
static::ExceptionBase & ExcMessage(std::string arg1)
#define Assert(cond, exc)
const ScratchData & sample_scratch_data
std::vector< ItemType > item_buffer
Worker(const std::function< void(const Iterator &, ScratchData &, CopyData &)> &worker, bool copier_exist=true)
std::vector< CopyData > copy_datas
std::list< ScratchDataObject > ScratchDataList
WorkerAndCopier(const std::function< void(const Iterator &, ScratchData &, CopyData &)> &worker, const std::function< void(const CopyData &)> &copier, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data)
const ScratchData * sample_scratch_data
const std::function< void(const CopyData &)> copier
void run(const std::vector< std::vector< Iterator >> &colored_iterators, Worker worker, Copier copier, const ScratchData &sample_scratch_data, const CopyData &sample_copy_data, const unsigned int queue_length=2 *MultithreadInfo::n_threads(), const unsigned int chunk_size=8)
static unsigned int n_threads()
Threads::ThreadLocalStorage< typename ItemType::ScratchDataList > thread_local_scratch
const unsigned int chunk_size
Copier(const std::function< void(const CopyData &)> &copier)
const ScratchData & sample_scratch_data
static::ExceptionBase & ExcInternalError()