Reference documentation for deal.II version 9.1.0-pre
vector_operations_internal.h
1 // ---------------------------------------------------------------------
2 //
3 // Copyright (C) 2016 - 2018 by the deal.II authors
4 //
5 // This file is part of the deal.II library.
6 //
7 // The deal.II library is free software; you can use it, redistribute
8 // it, and/or modify it under the terms of the GNU Lesser General
9 // Public License as published by the Free Software Foundation; either
10 // version 2.1 of the License, or (at your option) any later version.
11 // The full text of the license can be found in the file LICENSE.md at
12 // the top level directory of deal.II.
13 //
14 // ---------------------------------------------------------------------
15 
16 
17 #ifndef dealii_vector_operations_internal_h
18 #define dealii_vector_operations_internal_h
19 
20 #include <deal.II/base/multithread_info.h>
21 #include <deal.II/base/parallel.h>
22 #include <deal.II/base/thread_management.h>
23 #include <deal.II/base/vectorization.h>
24 
25 #include <cstdio>
26 #include <cstring>
27 
28 DEAL_II_NAMESPACE_OPEN
29 
30 namespace internal
31 {
32  namespace VectorOperations
33  {
34  using size_type = types::global_dof_index;
35 
36  template <typename T>
37  bool
38  is_non_negative(const T &t)
39  {
40  return t >= 0;
41  }
42 
43 
44  template <typename T>
45  bool
46  is_non_negative(const std::complex<T> &)
47  {
48  Assert(false, ExcMessage("Complex numbers do not have an ordering."));
49 
50  return false;
51  }
52 
53 
54  template <typename T>
55  void
56  print(const T &t, const char *format)
57  {
58  if (format != nullptr)
59  std::printf(format, t);
60  else
61  std::printf(" %5.2f", double(t));
62  }
63 
64 
65 
66  template <typename T>
67  void
68  print(const std::complex<T> &t, const char *format)
69  {
70  if (format != nullptr)
71  std::printf(format, t.real(), t.imag());
72  else
73  std::printf(" %5.2f+%5.2fi", double(t.real()), double(t.imag()));
74  }
75 
76  // call std::copy, except for in
77  // the case where we want to copy
78  // from std::complex to a
79  // non-complex type
80  template <typename T, typename U>
81  void
82  copy(const T *begin, const T *end, U *dest)
83  {
84  std::copy(begin, end, dest);
85  }
86 
87  template <typename T, typename U>
88  void
89  copy(const std::complex<T> *begin,
90  const std::complex<T> *end,
91  std::complex<U> * dest)
92  {
93  std::copy(begin, end, dest);
94  }
95 
96  template <typename T, typename U>
97  void
98  copy(const std::complex<T> *, const std::complex<T> *, U *)
99  {
100  Assert(false,
101  ExcMessage("Can't convert a vector of complex numbers "
102  "into a vector of reals/doubles"));
103  }
104 
105 
106 
107 #ifdef DEAL_II_WITH_THREADS
108 
116  template <typename Functor>
117  struct TBBForFunctor
118  {
119  TBBForFunctor(Functor & functor,
120  const size_type start,
121  const size_type end)
122  : functor(functor)
123  , start(start)
124  , end(end)
125  {
126  const size_type vec_size = end - start;
127  // set chunk size for sub-tasks
128  const unsigned int gs =
129  internal::VectorImplementation::minimum_parallel_grain_size;
130  n_chunks =
131  std::min(static_cast<size_type>(4 * MultithreadInfo::n_threads()),
132  vec_size / gs);
133  chunk_size = vec_size / n_chunks;
134 
135  // round to next multiple of 512 (or minimum grain size if that happens
136  // to be smaller). this is advantageous because our accumulation
137  // algorithms favor lengths of a power of 2 due to pairwise summation ->
138  // at most one 'oddly' sized chunk
139  if (chunk_size > 512)
140  chunk_size = ((chunk_size + 511) / 512) * 512;
141  n_chunks = (vec_size + chunk_size - 1) / chunk_size;
142  AssertIndexRange((n_chunks - 1) * chunk_size, vec_size);
143  AssertIndexRange(vec_size, n_chunks * chunk_size + 1);
144  }
145 
146  void
147  operator()(const tbb::blocked_range<size_type> &range) const
148  {
149  const size_type r_begin = start + range.begin() * chunk_size;
150  const size_type r_end = std::min(start + range.end() * chunk_size, end);
151  functor(r_begin, r_end);
152  }
153 
154  Functor & functor;
155  const size_type start;
156  const size_type end;
157  unsigned int n_chunks;
158  size_type chunk_size;
159  };
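// Worked example (not from the deal.II sources): with 8 TBB threads, a
// vector of 1,000,000 entries, and assuming the configurable minimum grain
// size is at its default of 4096 entries, the constructor above computes
//   n_chunks   = min(4 * 8, 1000000 / 4096) = 32
//   chunk_size = 1000000 / 32 = 31250  ->  rounded up to 31744 (= 62 * 512)
//   n_chunks   = (1000000 + 31743) / 31744 = 32,
// so TBB is handed 32 blocks, and operator() maps block i to the half-open
// index range [start + i * 31744, min(start + (i + 1) * 31744, end)).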
160 #endif
161 
162  template <typename Functor>
163  void
164  parallel_for(
165  Functor & functor,
166  size_type start,
167  size_type end,
168  std::shared_ptr<parallel::internal::TBBPartitioner> &partitioner)
169  {
170 #ifdef DEAL_II_WITH_THREADS
171  size_type vec_size = end - start;
172  // only go to the parallel function in case there are at least 4 parallel
173  // items, otherwise the overhead is too large
174  if (vec_size >=
175  4 * internal::VectorImplementation::minimum_parallel_grain_size &&
176  MultithreadInfo::n_threads() > 1)
177  {
178  Assert(partitioner.get() != nullptr,
179  ExcInternalError(
180  "Unexpected initialization of Vector that does "
181  "not set the TBB partitioner to a usable state."));
182  std::shared_ptr<tbb::affinity_partitioner> tbb_partitioner =
183  partitioner->acquire_one_partitioner();
184 
185  TBBForFunctor<Functor> generic_functor(functor, start, end);
186  tbb::parallel_for(
187  tbb::blocked_range<size_type>(0, generic_functor.n_chunks, 1),
188  generic_functor,
189  *tbb_partitioner);
190  partitioner->release_one_partitioner(tbb_partitioner);
191  }
192  else if (vec_size > 0)
193  functor(start, end);
194 #else
195  functor(start, end);
196  (void)partitioner;
197 #endif
198  }
199 
200 
201  // Define the functors necessary to use SIMD with TBB. we also include the
202  // simple copy and set operations
203 
204  template <typename Number>
205  struct Vector_set
206  {
207  Vector_set(Number value, Number *dst)
208  : value(value)
209  , dst(dst)
210  {
211  Assert(dst != nullptr, ExcInternalError());
212  }
213 
214  void
215  operator()(const size_type begin, const size_type end) const
216  {
217  Assert(end >= begin, ExcInternalError());
218 
219  if (value == Number())
220  {
221 #ifdef DEAL_II_WITH_CXX17
222  if constexpr (std::is_trivial<Number>::value)
223 #else
224  if (std::is_trivial<Number>::value)
225 #endif
226  {
227  std::memset(dst + begin, 0, sizeof(Number) * (end - begin));
228  return;
229  }
230  }
231  std::fill(dst + begin, dst + end, value);
232  }
233 
234  const Number value;
235  Number *const dst;
236  };
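// Illustrative sketch, not part of this header: Vector_set is meant to be
// driven by parallel_for() above. A vector class that owns a raw array and
// a TBB partitioner would zero its storage roughly as follows (the helper
// name example_zero_fill is made up for this example).
inline void
example_zero_fill(double *values, const size_type size)
{
  std::shared_ptr<parallel::internal::TBBPartitioner> partitioner =
    std::make_shared<parallel::internal::TBBPartitioner>();
  // value == Number() and double is trivial, so this ends up in std::memset
  Vector_set<double> setter(0., values);
  parallel_for(setter, 0, size, partitioner);
}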
237 
238  template <typename Number, typename OtherNumber>
239  struct Vector_copy
240  {
241  Vector_copy(const OtherNumber *src, Number *dst)
242  : src(src)
243  , dst(dst)
244  {
245  Assert(src != nullptr, ExcInternalError());
246  Assert(dst != nullptr, ExcInternalError());
247  }
248 
249  void
250  operator()(const size_type begin, const size_type end) const
251  {
252  Assert(end >= begin, ExcInternalError());
253 
254 #if __GNUG__ && __GNUC__ < 5
255  if (__has_trivial_copy(Number) &&
256  std::is_same<Number, OtherNumber>::value)
257 #else
258 # ifdef DEAL_II_WITH_CXX17
259  if constexpr (std::is_trivially_copyable<Number>() &&
260  std::is_same<Number, OtherNumber>::value)
261 # else
262  if (std::is_trivially_copyable<Number>() &&
263  std::is_same<Number, OtherNumber>::value)
264 # endif
265 #endif
266  std::memcpy(dst + begin, src + begin, (end - begin) * sizeof(Number));
267  else
268  {
269  DEAL_II_OPENMP_SIMD_PRAGMA
270  for (size_type i = begin; i < end; ++i)
271  dst[i] = src[i];
272  }
273  }
274 
275  const OtherNumber *const src;
276  Number *const dst;
277  };
278 
279  template <typename Number>
280  struct Vectorization_multiply_factor
281  {
282  Vectorization_multiply_factor(Number *val, Number factor)
283  : val(val)
284  , factor(factor)
285  {}
286 
287  void
288  operator()(const size_type begin, const size_type end) const
289  {
290  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
291  {
292  DEAL_II_OPENMP_SIMD_PRAGMA
293  for (size_type i = begin; i < end; ++i)
294  val[i] *= factor;
295  }
296  else
297  {
298  for (size_type i = begin; i < end; ++i)
299  val[i] *= factor;
300  }
301  }
302 
303  Number *val;
304  Number factor;
305  };
306 
307  template <typename Number>
308  struct Vectorization_add_av
309  {
310  Vectorization_add_av(Number *val, Number *v_val, Number factor)
311  : val(val)
312  , v_val(v_val)
313  , factor(factor)
314  {}
315 
316  void
317  operator()(const size_type begin, const size_type end) const
318  {
319  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
320  {
321  DEAL_II_OPENMP_SIMD_PRAGMA
322  for (size_type i = begin; i < end; ++i)
323  val[i] += factor * v_val[i];
324  }
325  else
326  {
327  for (size_type i = begin; i < end; ++i)
328  val[i] += factor * v_val[i];
329  }
330  }
331 
332  Number *val;
333  Number *v_val;
334  Number factor;
335  };
336 
337  template <typename Number>
338  struct Vectorization_sadd_xav
339  {
340  Vectorization_sadd_xav(Number *val, Number *v_val, Number a, Number x)
341  : val(val)
342  , v_val(v_val)
343  , a(a)
344  , x(x)
345  {}
346 
347  void
348  operator()(const size_type begin, const size_type end) const
349  {
350  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
351  {
352  DEAL_II_OPENMP_SIMD_PRAGMA
353  for (size_type i = begin; i < end; ++i)
354  val[i] = x * val[i] + a * v_val[i];
355  }
356  else
357  {
358  for (size_type i = begin; i < end; ++i)
359  val[i] = x * val[i] + a * v_val[i];
360  }
361  }
362 
363  Number *val;
364  Number *v_val;
365  Number a;
366  Number x;
367  };
368 
369  template <typename Number>
370  struct Vectorization_subtract_v
371  {
372  Vectorization_subtract_v(Number *val, Number *v_val)
373  : val(val)
374  , v_val(v_val)
375  {}
376 
377  void
378  operator()(const size_type begin, const size_type end) const
379  {
380  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
381  {
382  DEAL_II_OPENMP_SIMD_PRAGMA
383  for (size_type i = begin; i < end; ++i)
384  val[i] -= v_val[i];
385  }
386  else
387  {
388  for (size_type i = begin; i < end; ++i)
389  val[i] -= v_val[i];
390  }
391  }
392 
393  Number *val;
394  Number *v_val;
395  };
396 
397  template <typename Number>
398  struct Vectorization_add_factor
399  {
400  Vectorization_add_factor(Number *val, Number factor)
401  : val(val)
402  , factor(factor)
403  {}
404 
405  void
406  operator()(const size_type begin, const size_type end) const
407  {
408  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
409  {
410  DEAL_II_OPENMP_SIMD_PRAGMA
411  for (size_type i = begin; i < end; ++i)
412  val[i] += factor;
413  }
414  else
415  {
416  for (size_type i = begin; i < end; ++i)
417  val[i] += factor;
418  }
419  }
420 
421  Number *val;
422  Number factor;
423  };
424 
425  template <typename Number>
426  struct Vectorization_add_v
427  {
428  Vectorization_add_v(Number *val, Number *v_val)
429  : val(val)
430  , v_val(v_val)
431  {}
432 
433  void
434  operator()(const size_type begin, const size_type end) const
435  {
436  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
437  {
438  DEAL_II_OPENMP_SIMD_PRAGMA
439  for (size_type i = begin; i < end; ++i)
440  val[i] += v_val[i];
441  }
442  else
443  {
444  for (size_type i = begin; i < end; ++i)
445  val[i] += v_val[i];
446  }
447  }
448 
449  Number *val;
450  Number *v_val;
451  };
452 
453  template <typename Number>
454  struct Vectorization_add_avpbw
455  {
456  Vectorization_add_avpbw(Number *val,
457  Number *v_val,
458  Number *w_val,
459  Number a,
460  Number b)
461  : val(val)
462  , v_val(v_val)
463  , w_val(w_val)
464  , a(a)
465  , b(b)
466  {}
467 
468  void
469  operator()(const size_type begin, const size_type end) const
470  {
471  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
472  {
473  DEAL_II_OPENMP_SIMD_PRAGMA
474  for (size_type i = begin; i < end; ++i)
475  val[i] = val[i] + a * v_val[i] + b * w_val[i];
476  }
477  else
478  {
479  for (size_type i = begin; i < end; ++i)
480  val[i] = val[i] + a * v_val[i] + b * w_val[i];
481  }
482  }
483 
484  Number *val;
485  Number *v_val;
486  Number *w_val;
487  Number a;
488  Number b;
489  };
490 
491  template <typename Number>
492  struct Vectorization_sadd_xv
493  {
494  Vectorization_sadd_xv(Number *val, Number *v_val, Number x)
495  : val(val)
496  , v_val(v_val)
497  , x(x)
498  {}
499 
500  void
501  operator()(const size_type begin, const size_type end) const
502  {
503  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
504  {
505  DEAL_II_OPENMP_SIMD_PRAGMA
506  for (size_type i = begin; i < end; ++i)
507  val[i] = x * val[i] + v_val[i];
508  }
509  else
510  {
511  for (size_type i = begin; i < end; ++i)
512  val[i] = x * val[i] + v_val[i];
513  }
514  }
515 
516  Number *val;
517  Number *v_val;
518  Number x;
519  };
520 
521  template <typename Number>
522  struct Vectorization_sadd_xavbw
523  {
524  Vectorization_sadd_xavbw(Number *val,
525  Number *v_val,
526  Number *w_val,
527  Number x,
528  Number a,
529  Number b)
530  : val(val)
531  , v_val(v_val)
532  , w_val(w_val)
533  , x(x)
534  , a(a)
535  , b(b)
536  {}
537 
538  void
539  operator()(const size_type begin, const size_type end) const
540  {
541  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
542  {
543  DEAL_II_OPENMP_SIMD_PRAGMA
544  for (size_type i = begin; i < end; ++i)
545  val[i] = x * val[i] + a * v_val[i] + b * w_val[i];
546  }
547  else
548  {
549  for (size_type i = begin; i < end; ++i)
550  val[i] = x * val[i] + a * v_val[i] + b * w_val[i];
551  }
552  }
553 
554  Number *val;
555  Number *v_val;
556  Number *w_val;
557  Number x;
558  Number a;
559  Number b;
560  };
561 
562  template <typename Number>
563  struct Vectorization_scale
564  {
565  Vectorization_scale(Number *val, Number *v_val)
566  : val(val)
567  , v_val(v_val)
568  {}
569 
570  void
571  operator()(const size_type begin, const size_type end) const
572  {
573  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
574  {
575  DEAL_II_OPENMP_SIMD_PRAGMA
576  for (size_type i = begin; i < end; ++i)
577  val[i] *= v_val[i];
578  }
579  else
580  {
581  for (size_type i = begin; i < end; ++i)
582  val[i] *= v_val[i];
583  }
584  }
585 
586  Number *val;
587  Number *v_val;
588  };
589 
590  template <typename Number>
591  struct Vectorization_equ_au
592  {
593  Vectorization_equ_au(Number *val, Number *u_val, Number a)
594  : val(val)
595  , u_val(u_val)
596  , a(a)
597  {}
598 
599  void
600  operator()(const size_type begin, const size_type end) const
601  {
602  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
603  {
604  DEAL_II_OPENMP_SIMD_PRAGMA
605  for (size_type i = begin; i < end; ++i)
606  val[i] = a * u_val[i];
607  }
608  else
609  {
610  for (size_type i = begin; i < end; ++i)
611  val[i] = a * u_val[i];
612  }
613  }
614 
615  Number *val;
616  Number *u_val;
617  Number a;
618  };
619 
620  template <typename Number>
621  struct Vectorization_equ_aubv
622  {
623  Vectorization_equ_aubv(Number *val,
624  Number *u_val,
625  Number *v_val,
626  Number a,
627  Number b)
628  : val(val)
629  , u_val(u_val)
630  , v_val(v_val)
631  , a(a)
632  , b(b)
633  {}
634 
635  void
636  operator()(const size_type begin, const size_type end) const
637  {
638  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
639  {
640  DEAL_II_OPENMP_SIMD_PRAGMA
641  for (size_type i = begin; i < end; ++i)
642  val[i] = a * u_val[i] + b * v_val[i];
643  }
644  else
645  {
646  for (size_type i = begin; i < end; ++i)
647  val[i] = a * u_val[i] + b * v_val[i];
648  }
649  }
650 
651  Number *val;
652  Number *u_val;
653  Number *v_val;
654  Number a;
655  Number b;
656  };
657 
658  template <typename Number>
659  struct Vectorization_equ_aubvcw
660  {
661  Vectorization_equ_aubvcw(Number *val,
662  Number *u_val,
663  Number *v_val,
664  Number *w_val,
665  Number a,
666  Number b,
667  Number c)
668  : val(val)
669  , u_val(u_val)
670  , v_val(v_val)
671  , w_val(w_val)
672  , a(a)
673  , b(b)
674  , c(c)
675  {}
676 
677  void
678  operator()(const size_type begin, const size_type end) const
679  {
680  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
681  {
682  DEAL_II_OPENMP_SIMD_PRAGMA
683  for (size_type i = begin; i < end; ++i)
684  val[i] = a * u_val[i] + b * v_val[i] + c * w_val[i];
685  }
686  else
687  {
688  for (size_type i = begin; i < end; ++i)
689  val[i] = a * u_val[i] + b * v_val[i] + c * w_val[i];
690  }
691  }
692 
693  Number *val;
694  Number *u_val;
695  Number *v_val;
696  Number *w_val;
697  Number a;
698  Number b;
699  Number c;
700  };
701 
702  template <typename Number>
703  struct Vectorization_ratio
704  {
705  Vectorization_ratio(Number *val, Number *a_val, Number *b_val)
706  : val(val)
707  , a_val(a_val)
708  , b_val(b_val)
709  {}
710 
711  void
712  operator()(const size_type begin, const size_type end) const
713  {
714  if (::dealii::parallel::internal::EnableOpenMPSimdFor<Number>::value)
715  {
716  DEAL_II_OPENMP_SIMD_PRAGMA
717  for (size_type i = begin; i < end; ++i)
718  val[i] = a_val[i] / b_val[i];
719  }
720  else
721  {
722  for (size_type i = begin; i < end; ++i)
723  val[i] = a_val[i] / b_val[i];
724  }
725  }
726 
727  Number *val;
728  Number *a_val;
729  Number *b_val;
730  };
731 
732 
733 
734  // All sums over all the vector entries (l2-norm, inner product, etc.) are
735  // performed with the same code, using a templated operation defined
736  // here. There are always two versions defined, a standard one that covers
737  // most cases and a vectorized one which is only for equal types and float
738  // and double.
739  template <typename Number, typename Number2>
740  struct Dot
741  {
742  static const bool vectorizes =
743  std::is_same<Number, Number2>::value &&
744  (VectorizedArray<Number>::n_array_elements > 1);
745 
746  Dot(const Number *X, const Number2 *Y)
747  : X(X)
748  , Y(Y)
749  {}
750 
751  Number
752  operator()(const size_type i) const
753  {
754  return X[i] * Number(numbers::NumberTraits<Number2>::conjugate(Y[i]));
755  }
756 
757  VectorizedArray<Number>
758  do_vectorized(const size_type i) const
759  {
760  VectorizedArray<Number> x, y;
761  x.load(X + i);
762  y.load(Y + i);
763 
764  // the following operation in VectorizedArray does an element-wise
765  // scalar product without taking into account complex values and
766  // the need to take the complex-conjugate of one argument. this
767  // may be a bug, but because all VectorizedArray classes only
768  // work on real scalars, it doesn't really matter very much.
769  // in any case, assert that we really don't get here for
770  // complex-valued objects
771  static_assert(numbers::NumberTraits<Number>::is_complex == false,
772  "This operation is not correctly implemented for "
773  "complex-valued objects.");
774  return x * y;
775  }
776 
777  const Number * X;
778  const Number2 *Y;
779  };
780 
781  template <typename Number, typename RealType>
782  struct Norm2
783  {
784  static const bool vectorizes =
785  VectorizedArray<Number>::n_array_elements > 1;
786 
787  Norm2(const Number *X)
788  : X(X)
789  {}
790 
791  RealType
792  operator()(const size_type i) const
793  {
794  return numbers::NumberTraits<Number>::abs_square(X[i]);
795  }
796 
797  VectorizedArray<Number>
798  do_vectorized(const size_type i) const
799  {
800  VectorizedArray<Number> x;
801  x.load(X + i);
802  return x * x;
803  }
804 
805  const Number *X;
806  };
807 
808  template <typename Number, typename RealType>
809  struct Norm1
810  {
811  static const bool vectorizes =
812  VectorizedArray<Number>::n_array_elements > 1;
813 
814  Norm1(const Number *X)
815  : X(X)
816  {}
817 
818  RealType
819  operator()(const size_type i) const
820  {
821  return numbers::NumberTraits<Number>::abs(X[i]);
822  }
823 
824  VectorizedArray<Number>
825  do_vectorized(const size_type i) const
826  {
827  VectorizedArray<Number> x;
828  x.load(X + i);
829  return std::abs(x);
830  }
831 
832  const Number *X;
833  };
834 
835  template <typename Number, typename RealType>
836  struct NormP
837  {
838  static const bool vectorizes =
839  VectorizedArray<Number>::n_array_elements > 1;
840 
841  NormP(const Number *X, RealType p)
842  : X(X)
843  , p(p)
844  {}
845 
846  RealType
847  operator()(const size_type i) const
848  {
849  return std::pow(numbers::NumberTraits<Number>::abs(X[i]), p);
850  }
851 
852  VectorizedArray<Number>
853  do_vectorized(const size_type i) const
854  {
855  VectorizedArray<Number> x;
856  x.load(X + i);
857  return std::pow(std::abs(x), p);
858  }
859 
860  const Number *X;
861  RealType p;
862  };
863 
864  template <typename Number>
865  struct MeanValue
866  {
867  static const bool vectorizes =
868  VectorizedArray<Number>::n_array_elements > 1;
869 
870  MeanValue(const Number *X)
871  : X(X)
872  {}
873 
874  Number
875  operator()(const size_type i) const
876  {
877  return X[i];
878  }
879 
880  VectorizedArray<Number>
881  do_vectorized(const size_type i) const
882  {
883  VectorizedArray<Number> x;
884  x.load(X + i);
885  return x;
886  }
887 
888  const Number *X;
889  };
890 
891  template <typename Number>
892  struct AddAndDot
893  {
894  static const bool vectorizes =
895  VectorizedArray<Number>::n_array_elements > 1;
896 
897  AddAndDot(Number *X, const Number *V, const Number *W, Number a)
898  : X(X)
899  , V(V)
900  , W(W)
901  , a(a)
902  {}
903 
904  Number
905  operator()(const size_type i) const
906  {
907  X[i] += a * V[i];
908  return X[i] * Number(numbers::NumberTraits<Number>::conjugate(W[i]));
909  }
910 
911  VectorizedArray<Number>
912  do_vectorized(const size_type i) const
913  {
914  VectorizedArray<Number> x, w, v;
915  x.load(X + i);
916  v.load(V + i);
917  x += a * v;
918  x.store(X + i);
919  // may only load from W after storing in X because the pointers might
920  // point to the same memory
921  w.load(W + i);
922 
923  // the following operation in VectorizedArray does an element-wise
924  // scalar product without taking into account complex values and
925  // the need to take the complex-conjugate of one argument. this
926  // may be a bug, but because all VectorizedArray classes only
927  // work on real scalars, it doesn't really matter very much.
928  // in any case, assert that we really don't get here for
929  // complex-valued objects
930  static_assert(numbers::NumberTraits<Number>::is_complex == false,
931  "This operation is not correctly implemented for "
932  "complex-valued objects.");
933  return x * w;
934  }
935 
936  Number * X;
937  const Number *V, *W;
938  Number a;
939  };
940 
941 
942 
943  // this is the main working loop for all vector sums using the templated
944  // operation above. it accumulates the sums using a block-wise summation
945  // algorithm with post-update. this blocked algorithm has been proposed in
946  // a similar form by Castaldo, Whaley and Chronopoulos (SIAM
947  // J. Sci. Comput. 31, 1156-1174, 2008) and we use the smallest possible
948  // block size, 2. Sometimes it is referred to as pairwise summation. The
949  // worst case error made by this algorithm is on the order O(eps *
950  // log2(vec_size)), whereas a naive summation is O(eps * vec_size). Even
951  // though the Kahan summation is even more accurate with an error O(eps)
952  // by carrying along remainders not captured by the main sum, that involves
953  // additional costs which are not worthwhile. See the Wikipedia article on
954  // the Kahan summation algorithm.
955 
956  // The algorithm implemented here has the additional benefit that it is
957  // easily parallelized without changing the order of how the elements are
958  // added (floating point addition is not associative). For the same vector
959  // size and minimum_parallel_grainsize, the blocks are always the
960  // same and added pairwise.
961 
962  // The depth of recursion is controlled by the 'magic' parameter
963  // vector_accumulation_recursion_threshold: If the length is below
964  // vector_accumulation_recursion_threshold * 32 (32 is the part of code we
965  // unroll), a straight loop instead of recursion will be used. At the
966  // innermost level, eight values are added consecutively in order to better
967  // balance multiplications and additions.
968 
969  // Loops are unrolled as follows: the range [first,last) is broken into
970  // @p n_chunks each of size 32 plus the @p remainder.
971  // accumulate_regular() does the work on 32*n_chunks elements employing SIMD
972  // if possible and stores the result of the operation for each chunk in @p outer_results.
973 
974  // The code returns the result as the last argument in order to make
975  // spawning tasks simpler and use automatic template deduction.
976 
977 
983  const unsigned int vector_accumulation_recursion_threshold = 128;
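// Minimal sketch of the pairwise idea, not part of deal.II (the real code
// below adds chunking, loop unrolling, SIMD, and TBB on top of it): summing
// recursively in halves keeps the round-off error at O(eps * log2(n))
// instead of the O(eps * n) of a single running sum. The helper name
// example_pairwise_sum is made up for this illustration.
template <typename Number>
Number
example_pairwise_sum(const Number *x, const size_type n)
{
  if (n <= 8)
    {
      // short ranges: plain loop, just like the innermost level below
      Number sum = Number();
      for (size_type i = 0; i < n; ++i)
        sum += x[i];
      return sum;
    }
  // long ranges: sum the two halves independently, then combine once
  const size_type half = n / 2;
  return example_pairwise_sum(x, half) +
         example_pairwise_sum(x + half, n - half);
}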
984 
985  template <typename Operation, typename ResultType>
986  void
987  accumulate_recursive(const Operation &op,
988  const size_type first,
989  const size_type last,
990  ResultType & result)
991  {
992  const size_type vec_size = last - first;
993  if (vec_size <= vector_accumulation_recursion_threshold * 32)
994  {
995  // the vector is short enough so we perform the summation. first
996  // work on the regular part. The innermost 32 values are expanded in
997  // order to obtain known loop bounds for most of the work.
998  size_type index = first;
999  ResultType outer_results[vector_accumulation_recursion_threshold];
1000 
1001  // set the zeroth element to zero to correctly handle the case where
1002  // vec_size == 0
1003  outer_results[0] = ResultType();
1004 
1005  // the variable serves two purposes: (i) number of chunks (each 32
1006  // indices) for the given size; all results are stored in
1007  // outer_results[0,n_chunks) (ii) in the SIMD case n_chunks is also a
1008  // next free index in outer_results[] to which we can write after
1009  // accumulate_regular() is executed.
1010  size_type n_chunks = vec_size / 32;
1011  const size_type remainder = vec_size % 32;
1012  Assert(remainder == 0 ||
1013  n_chunks < vector_accumulation_recursion_threshold,
1014  ExcInternalError());
1015 
1016  // Select between the regular version and vectorized version based
1017  // on the number types we are given. To choose the vectorized
1018  // version often enough, we need to have all tasks but the last one
1019  // to be divisible by the vectorization length
1020  accumulate_regular(
1021  op,
1022  n_chunks,
1023  index,
1024  outer_results,
1025  std::integral_constant<bool, Operation::vectorizes>());
1026 
1027  // now work on the remainder, i.e., the last up to 32 values. Use
1028  // switch statement with fall-through to work on these values.
1029  if (remainder > 0)
1030  {
1031  // if we got here, it means that (vec_size <=
1032  // vector_accumulation_recursion_threshold * 32), which is to say
1033  // that the domain can be split into n_chunks <=
1034  // vector_accumulation_recursion_threshold:
1035  AssertIndexRange(n_chunks,
1036  vector_accumulation_recursion_threshold + 1);
1037  // split the remainder into chunks of 8, there could be up to 3
1038  // such chunks since remainder < 32.
1039  // Work on those chunks without any SIMD, that is we call
1040  // op(index).
1041  const size_type inner_chunks = remainder / 8;
1042  Assert(inner_chunks <= 3, ExcInternalError());
1043  const size_type remainder_inner = remainder % 8;
1044  ResultType r0 = ResultType(), r1 = ResultType(),
1045  r2 = ResultType();
1046  switch (inner_chunks)
1047  {
1048  case 3:
1049  r2 = op(index++);
1050  for (size_type j = 1; j < 8; ++j)
1051  r2 += op(index++);
1052  DEAL_II_FALLTHROUGH;
1053  case 2:
1054  r1 = op(index++);
1055  for (size_type j = 1; j < 8; ++j)
1056  r1 += op(index++);
1057  r1 += r2;
1058  DEAL_II_FALLTHROUGH;
1059  case 1:
1060  r2 = op(index++);
1061  for (size_type j = 1; j < 8; ++j)
1062  r2 += op(index++);
1063  DEAL_II_FALLTHROUGH;
1064  default:
1065  for (size_type j = 0; j < remainder_inner; ++j)
1066  r0 += op(index++);
1067  r0 += r2;
1068  r0 += r1;
1069  if (n_chunks == vector_accumulation_recursion_threshold)
1070  outer_results[vector_accumulation_recursion_threshold -
1071  1] += r0;
1072  else
1073  {
1074  outer_results[n_chunks] = r0;
1075  n_chunks++;
1076  }
1077  break;
1078  }
1079  }
1080  // make sure we worked through all indices
1081  AssertDimension(index, last);
1082 
1083  // now sum the results from the chunks stored in
1084  // outer_results[0,n_chunks) recursively
1085  while (n_chunks > 1)
1086  {
1087  if (n_chunks % 2 == 1)
1088  outer_results[n_chunks++] = ResultType();
1089  for (size_type i = 0; i < n_chunks; i += 2)
1090  outer_results[i / 2] = outer_results[i] + outer_results[i + 1];
1091  n_chunks /= 2;
1092  }
1093  result = outer_results[0];
1094  }
1095  else
1096  {
1097  // split vector into four pieces and work on the pieces
1098  // recursively. Make pieces (except last) divisible by one fourth the
1099  // recursion threshold.
1100  const size_type new_size =
1101  (vec_size / (vector_accumulation_recursion_threshold * 32)) *
1102  vector_accumulation_recursion_threshold * 8;
1103  Assert(first + 3 * new_size < last, ExcInternalError());
1104  ResultType r0, r1, r2, r3;
1105  accumulate_recursive(op, first, first + new_size, r0);
1106  accumulate_recursive(op, first + new_size, first + 2 * new_size, r1);
1107  accumulate_recursive(op,
1108  first + 2 * new_size,
1109  first + 3 * new_size,
1110  r2);
1111  accumulate_recursive(op, first + 3 * new_size, last, r3);
1112  r0 += r1;
1113  r2 += r3;
1114  result = r0 + r2;
1115  }
1116  }
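// Worked example (not from the deal.II sources): for vec_size = 1000 the
// direct branch is taken (1000 <= 128 * 32). accumulate_regular() handles
// n_chunks = 1000 / 32 = 31 chunks of 32 entries; the remainder of
// 1000 % 32 = 8 entries falls into 'case 1' above and becomes
// outer_results[31]. The final while loop then combines the 32 partial
// results pairwise in five halving sweeps. For vec_size = 10000 the
// recursive branch is taken instead, with new_size = (10000 / 4096) * 1024
// = 2048, i.e. sub-ranges of 2048, 2048, 2048, and 3856 entries.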
1117 
1118 
1119  // this is the inner working routine for the accumulation loops
1120  // below. This is the standard case where the loop bounds are known. We
1121  // pulled this function out of the regular accumulate routine because we
1122  // might do this thing vectorized (see specialized function below)
1123  template <typename Operation, typename ResultType>
1124  void
1125  accumulate_regular(
1126  const Operation &op,
1127  size_type & n_chunks,
1128  size_type & index,
1129  ResultType (&outer_results)[vector_accumulation_recursion_threshold],
1130  std::integral_constant<bool, false>)
1131  {
1132  // note that each chunk is chosen to have a width of 32, thereby the index
1133  // is incremented by 4*8 for each @p i.
1134  for (size_type i = 0; i < n_chunks; ++i)
1135  {
1136  ResultType r0 = op(index);
1137  ResultType r1 = op(index + 1);
1138  ResultType r2 = op(index + 2);
1139  ResultType r3 = op(index + 3);
1140  index += 4;
1141  for (size_type j = 1; j < 8; ++j, index += 4)
1142  {
1143  r0 += op(index);
1144  r1 += op(index + 1);
1145  r2 += op(index + 2);
1146  r3 += op(index + 3);
1147  }
1148  r0 += r1;
1149  r2 += r3;
1150  outer_results[i] = r0 + r2;
1151  }
1152  }
1153 
1154 
1155 
1156  // this is the inner working routine for the accumulation loops
1157  // below. This is the specialized case where the loop bounds are known and
1158  // where we can vectorize. In that case, we request the 'do_vectorized'
1159  // routine of the operation instead of the regular one which does several
1160  // operations at once.
1161  template <typename Operation, typename Number>
1162  void
1163  accumulate_regular(
1164  const Operation &op,
1165  size_type & n_chunks,
1166  size_type & index,
1167  Number (&outer_results)[vector_accumulation_recursion_threshold],
1168  std::integral_constant<bool, true>)
1169  {
1170  // we start from @p index and work on @p n_chunks each of size 32.
1171  // in order employ SIMD and work on @p nvecs at a time, we split this
1172  // loop yet again:
1173  // First we work on (n_chunks/nvecs) chunks, where each chunk processes
1174  // nvecs*(4*8) elements.
1175 
1176  const unsigned int nvecs = VectorizedArray<Number>::n_array_elements;
1177  const size_type regular_chunks = n_chunks / nvecs;
1178  for (size_type i = 0; i < regular_chunks; ++i)
1179  {
1180  VectorizedArray<Number> r0 = op.do_vectorized(index);
1181  VectorizedArray<Number> r1 = op.do_vectorized(index + nvecs);
1182  VectorizedArray<Number> r2 = op.do_vectorized(index + 2 * nvecs);
1183  VectorizedArray<Number> r3 = op.do_vectorized(index + 3 * nvecs);
1184  index += nvecs * 4;
1185  for (size_type j = 1; j < 8; ++j, index += nvecs * 4)
1186  {
1187  r0 += op.do_vectorized(index);
1188  r1 += op.do_vectorized(index + nvecs);
1189  r2 += op.do_vectorized(index + 2 * nvecs);
1190  r3 += op.do_vectorized(index + 3 * nvecs);
1191  }
1192  r0 += r1;
1193  r2 += r3;
1194  r0 += r2;
1195  r0.store(
1196  &outer_results[i * VectorizedArray<Number>::n_array_elements]);
1197  }
1198 
1199  // If we are treating a case where the vector length is not divisible by
1200  // the vectorization length, we need a cleanup loop
1201  // The remaining chunks are processed one by one starting from
1202  // regular_chunks * nvecs; We do as much as possible with 2 SIMD
1203  // operations within each chunk. Here we assume that nvecs < 32/2 = 16 as
1204  // well as 16%nvecs==0.
1205  AssertIndexRange(VectorizedArray<Number>::n_array_elements, 17);
1206  Assert(16 % nvecs == 0, ExcInternalError());
1207  if (n_chunks % VectorizedArray<Number>::n_array_elements != 0)
1208  {
1209  VectorizedArray<Number> r0 = VectorizedArray<Number>(),
1210  r1 = VectorizedArray<Number>();
1211  const size_type start_irreg = regular_chunks * nvecs;
1212  for (size_type c = start_irreg; c < n_chunks; ++c)
1213  for (size_type j = 0; j < 32; j += 2 * nvecs, index += 2 * nvecs)
1214  {
1215  r0 += op.do_vectorized(index);
1216  r1 += op.do_vectorized(index + nvecs);
1217  }
1218  r0 += r1;
1219  r0.store(&outer_results[start_irreg]);
1220  // update n_chunks to denote unused element in outer_results[] from
1221  // which we can keep writing.
1222  n_chunks = start_irreg + VectorizedArray<Number>::n_array_elements;
1223  }
1224  }
1225 
1226 
1227 
1228 #ifdef DEAL_II_WITH_THREADS
1229 
1257  template <typename Operation, typename ResultType>
1258  struct TBBReduceFunctor
1259  {
1260  static const unsigned int threshold_array_allocate = 512;
1261 
1262  TBBReduceFunctor(const Operation &op,
1263  const size_type start,
1264  const size_type end)
1265  : op(op)
1266  , start(start)
1267  , end(end)
1268  {
1269  const size_type vec_size = end - start;
1270  // set chunk size for sub-tasks
1271  const unsigned int gs =
1272  internal::VectorImplementation::minimum_parallel_grain_size;
1273  n_chunks =
1274  std::min(static_cast<size_type>(4 * MultithreadInfo::n_threads()),
1275  vec_size / gs);
1276  chunk_size = vec_size / n_chunks;
1277 
1278  // round to next multiple of 512 (or leave it at the minimum grain size
1279  // if that happens to be smaller). this is advantageous because our
1280  // algorithm favors lengths of a power of 2 due to pairwise summation ->
1281  // at most one 'oddly' sized chunk
1282  if (chunk_size > 512)
1283  chunk_size = ((chunk_size + 511) / 512) * 512;
1284  n_chunks = (vec_size + chunk_size - 1) / chunk_size;
1285  AssertIndexRange((n_chunks - 1) * chunk_size, vec_size);
1286  AssertIndexRange(vec_size, n_chunks * chunk_size + 1);
1287 
1288  if (n_chunks > threshold_array_allocate)
1289  {
1290  // make sure we allocate an even number of elements,
1291  // access to the new last element is needed in do_sum()
1292  large_array.resize(2 * ((n_chunks + 1) / 2));
1293  array_ptr = large_array.data();
1294  }
1295  else
1296  array_ptr = &small_array[0];
1297  }
1298 
1303  void
1304  operator()(const tbb::blocked_range<size_type> &range) const
1305  {
1306  for (size_type i = range.begin(); i < range.end(); ++i)
1307  accumulate_recursive(op,
1308  start + i * chunk_size,
1309  std::min(start + (i + 1) * chunk_size, end),
1310  array_ptr[i]);
1311  }
1312 
1313  ResultType
1314  do_sum() const
1315  {
1316  while (n_chunks > 1)
1317  {
1318  if (n_chunks % 2 == 1)
1319  array_ptr[n_chunks++] = ResultType();
1320  for (size_type i = 0; i < n_chunks; i += 2)
1321  array_ptr[i / 2] = array_ptr[i] + array_ptr[i + 1];
1322  n_chunks /= 2;
1323  }
1324  return array_ptr[0];
1325  }
1326 
1327  const Operation &op;
1328  const size_type start;
1329  const size_type end;
1330 
1331  mutable unsigned int n_chunks;
1332  unsigned int chunk_size;
1333  ResultType small_array[threshold_array_allocate];
1334  std::vector<ResultType> large_array;
1335  // this variable either points to small_array or large_array depending on
1336  // the number of threads we want to feed
1337  mutable ResultType *array_ptr;
1338  };
1339 #endif
1340 
1341 
1342 
1347  template <typename Operation, typename ResultType>
1348  void
1349  parallel_reduce(
1350  const Operation & op,
1351  const size_type start,
1352  const size_type end,
1353  ResultType & result,
1354  std::shared_ptr<parallel::internal::TBBPartitioner> &partitioner)
1355  {
1356 #ifdef DEAL_II_WITH_THREADS
1357  size_type vec_size = end - start;
1358  // only go to the parallel function in case there are at least 4 parallel
1359  // items, otherwise the overhead is too large
1360  if (vec_size >=
1361  4 * internal::VectorImplementation::minimum_parallel_grain_size &&
1362  MultithreadInfo::n_threads() > 1)
1363  {
1364  Assert(partitioner.get() != nullptr,
1365  ExcInternalError(
1366  "Unexpected initialization of Vector that does "
1367  "not set the TBB partitioner to a usable state."));
1368  std::shared_ptr<tbb::affinity_partitioner> tbb_partitioner =
1369  partitioner->acquire_one_partitioner();
1370 
1371  TBBReduceFunctor<Operation, ResultType> generic_functor(op,
1372  start,
1373  end);
1374  tbb::parallel_for(
1375  tbb::blocked_range<size_type>(0, generic_functor.n_chunks, 1),
1376  generic_functor,
1377  *tbb_partitioner);
1378  partitioner->release_one_partitioner(tbb_partitioner);
1379  result = generic_functor.do_sum();
1380  }
1381  else
1382  accumulate_recursive(op, start, end, result);
1383 #else
1384  accumulate_recursive(op, start, end, result);
1385  (void)partitioner;
1386 #endif
1387  }
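// Illustrative sketch, not part of this header: an inner product x^T y is
// obtained by handing a Dot operation to parallel_reduce() above, which is
// roughly what the deal.II vector classes do internally (the helper name
// example_inner_product is made up for this example).
template <typename Number>
Number
example_inner_product(
  const Number *                                        x,
  const Number *                                        y,
  const size_type                                       size,
  std::shared_ptr<parallel::internal::TBBPartitioner> &partitioner)
{
  Dot<Number, Number> dot(x, y);
  Number              result = Number();
  parallel_reduce(dot, 0, size, result, partitioner);
  return result;
}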
1388  } // namespace VectorOperations
1389 } // namespace internal
1390 
1391 DEAL_II_NAMESPACE_CLOSE
1392 
1393 #endif