返回> 网站首页 

进入多核时代的C++

yoours2011-01-12 13:38:33 阅读 1233

简介一边听听音乐,一边写写文章。

本文介绍了多核时代下C++遇到的问题以及应对手段,最后简单介绍了怎样使用Intel Thread Building Blocks来让C++充分利用多核CPU的处理能力。

    几年之前,CPU的性能还主要取决于CPU的主频,经过超摩尔定律的发展后,没过多长时间CPU的主频速度就已接近“极限”,使得单单靠提高CPU的主频来提升性能变得非常困难。
    目前,Intel、AMD等CPU生产商都转而采用了多核技术来提升CPU性能,甚至提出了群核CPU的概念。这意味着,要充分发挥多核CPU的性能,程序就必须采用多线程并发计算的方式,传统的串行程序将会极大地浪费多核CPU的运算能力!

    C++是上世纪80年代诞生的语言,它的前身是同样风靡全球的C语言。一直以来它都以代码效率卓越著称,进入多核时代后,因为C++标准库没有提供多线程支持,要用C++开发出充分利用多核CPU的程序将面临很大挑战。
    于是,在C++社区出现了不少优秀的库以支持并行编程,如各种跨平台的线程库,OpenMP,Clik++等。另一方面,微软也从Win2K开始不断地加入线程池API(如QueueUserWorkItem),C++09标准也明确地表示要加入多线程的支持。

    使用线程库编写并行程序的优点是可以精确调度各个线程,并且可以在所有C++编译器里使用。不过要充分发挥多核CPU的性能,还要考虑很多因素,主要难点有:

  • 死锁    编写多线程必然会遇到同步问题,如果同步控制出现问题,就可能出现死锁或脏数据。
  • 线程之间通信    使用何种机制在多个线程之间通信?即要保证通信数据同步又要保证效率。
  • 负载平衡    分配到每个线程的工作量要尽量平衡,避免一个线程忙一个线程闲的情形发生。
  • 资源匹配    程序应该使用多少个线程?过少的线程不能充分利用CPU的多核优势,而过多的线程会造成线程调度过于频繁同样会降低效率。

    OpenMP是目前比较流行的C++并行编程方式,它通过在代码中插入专用的pragma编译指令来指示编译器把串行代码编译成并行程序。
    它的优点是易于使用,几乎不用修改原代码就可对老程序进行并发支持的改造。问题是它必须要有编译器的支持,尽管目前不少编译器都提供了OpenMP的支 持,但它毕竟不是C++的一部分,甚至它都不是真正意义上的C++库。使用OpenMP的C++代码看上去总是有些怪异(个人观点^_^)。

    现在,我们又有了一个新选择:Intel Thread Building Blocks(TBB,线程构建模块)。TBB是一个开源的C++模板库,能够运行在 Windows、Linux、Macintosh以及UNIX等系统上,只要是标准的C++编译器都可以使用它。

    TBB的功能和优势

功能优势
基于任务的并行化在逻辑任务而非物理线程的角度来指定线程功能
  • 让开发者关注更高层的可扩展任务级模式而非底层的线程机制
  • 使用被证实可有效利用多内核的数据分解提取技术
  • 自动负载平衡
  • 高效地支持嵌套并行化,允许从一个并行组件中建立出一个新的并行组件
并行算法从库中选择高效并行算法模板,即可快速地获得多核处理器带来的优势。
  • 快速应用为并行性能及可量测性而设计的算法。
  • 范型模板让你轻易地把它们定制成你所需要的算法。
  • 支持简单插件部署到应用中提升软件速度,优化内核和本地缓存。
  • 依靠预置的并行结构,在很多情况下都能减少生产多线程软件的工作量。
跨平台支持编写一次应用就可以部署到多个操作系统中。
  • 为32位和64位的Windows*、Linux* 和 Mac OS* X 平台提供一种解决方案。
  • 支持Intel、Microsoft及GNU的业界领先的编译器。
  • 加快在多种多核平台中部署应用软件的速度。
程序库级解决方案只需花费很小的精力就可得到高优化的并行功能。
  • 你的 C++ 应用程序只需调用Intel Threading Building Blocks 库。
  • 标准 C++ - 不需要使用新语言重写代码。
  • 兼容其它线程包。
  • 可无限制地和你的软件一起分发运行时库。
  • 无缝整合到已有的开发环境中。
高并发容器优化处理器的能力实现任务并发
  • 使用线程安全并且高并发的接口简化多线程应用的开发。
  • 使用预测试的数据结构提升应用软件质量。
  • 更高效率的内核或处理器多路执行来提升应用软件性能。

示例:

    用TBB写并行transform代替原std::transform,分别使用parallel_for和pipeline实现

  1. #include <iostream>
  2. #include <algorithm>
  3. #include <vector>
  4. #include <list>
  5. #include <math.h>
  6. #include <tbb/task_scheduler_init.h>
  7. #include <tbb/blocked_range.h>
  8. #include <tbb/parallel_for.h>
  9. #include <tbb/pipeline.h>
  10. #include <tbb/tick_count.h>
  11.  
  12. //-----------------随机存取迭代器版本-------------------------------------
  13. //both_random_helper,作用:测试两个模板是否都是random_access_iterator_tag
  14. //是则Iter_cat返回random_access_iterator_tag
  15. //否则返回forward_iterator_tag
  16. template<class _Cat1, class _Cat2>
  17. struct both_random_helper{
  18.     typedef std::forward_iterator_tag Iter_cat;
  19. };
  20.  
  21. template<>
  22. struct both_random_helper<
  23.     std::random_access_iterator_tag,
  24.     std::random_access_iterator_tag>{
  25.     typedef std::random_access_iterator_tag Iter_cat;
  26. };
  27.  
  28. // 用于存放一对迭代器
  29. template<class _InIt, class _OutIt>
  30. struct Iter_pair{
  31.     _InIt m_in;
  32.     _OutIt m_out;
  33.     Iter_pair(_InIt _in, _OutIt _out)
  34.         :m_in(_in),m_out(_out){}
  35.     //支持blocked_range拆分
  36.     Iter_pair operator+(size_t off) const
  37.     {
  38.         return Iter_pair(m_in+off, m_out+off);
  39.     }
  40.  
  41.     size_t operator-(Iter_pair rhs) const
  42.     {
  43.         return m_in-rhs.m_in;
  44.     }
  45.  
  46.     bool operator<(Iter_pair rhs) const
  47.     {
  48.         return m_in<rhs.m_in;
  49.     }
  50. };
  51.  
  52. // 随机存取迭代器版本
  53. template<class _InIt, class _OutIt, class _Fn1>
  54. struct op_parallel_transform{
  55.     op_parallel_transform(_Fn1 _Func)
  56.         :m_Func(_Func){}
  57.     void operator()(const tbb::blocked_range<Iter_pair<_InIt,_OutIt> > &r) const
  58.     {
  59.         std::transform(r.begin().m_in, r.end().m_in, r.begin().m_out, m_Func);
  60.     }
  61. private:
  62.     _Fn1 m_Func;
  63. };
  64.  
  65. template<class _InIt, class _OutIt, class _Fn1>
  66. _OutIt _parallel_transform(_InIt _First, _InIt _Last, _OutIt _Dest,
  67.     _Fn1 _Func, std::random_access_iterator_tag)
  68. {
  69.     // 使用parallel_for来处理
  70.     typedef typename Iter_pair<_InIt,_OutIt> iter_pair_type;
  71.     _OutIt LastDest = _Dest + (_Last - _First);
  72.     iter_pair_type begin(_First, _Dest);
  73.     iter_pair_type end(_Last, LastDest);
  74.     tbb::blocked_range<iter_pair_type> x(begin, end);
  75.     tbb::parallel_for(x, op_parallel_transform<_InIt,_OutIt,_Fn1>(_Func), tbb::auto_partitioner());
  76.     return LastDest;
  77. }
  78.  
  79. //-----------------顺序存取迭代器版本-------------------------------------
  80. template<class _InIt>
  81. struct filter_in : tbb::filter{
  82.     filter_in(_InIt _First, _InIt _Last)
  83.         :tbb::filter(true),m_First(_First), m_Last(_Last){}
  84.     void* operator()(void*)
  85.     {
  86.         if(m_First==m_Last) return NULL;
  87.  
  88.         void* p = &(*m_First);
  89.         ++m_First;
  90.         return p;
  91.     }
  92. private:
  93.     _InIt m_First, m_Last;
  94. };
  95.  
  96. template<class _Fn1>
  97. struct filter_process : tbb::filter{
  98.     typedef typename _Fn1::result_type r_type;
  99.     typedef typename _Fn1::argument_type a_type;
  100.  
  101.     filter_process(_Fn1 _Func)
  102.         :tbb::filter(false),m_Func(_Func){}
  103.     void* operator()(void* data)
  104.     {
  105.         a_type &at = *(a_type*)data;
  106.         m_r = m_Func( at );
  107.         return &m_r;
  108.     }
  109. private:
  110.     _Fn1 m_Func;
  111.     r_type m_r;
  112. };
  113.  
  114. template<class _OutIt, class _DataType>
  115. struct filter_out : tbb::filter{
  116.     filter_out(_OutIt _Dest)
  117.         :tbb::filter(true),m_Dest(_Dest){}
  118.     void* operator()(void* data)
  119.     {
  120.         _DataType *p = (_DataType*) data;
  121.         *m_Dest = *p;
  122.         ++m_Dest;
  123.         return NULL;
  124.     }
  125. private:
  126.     _OutIt m_Dest;
  127. };
  128.  
  129. template<class _InIt, class _OutIt, class _Fn1>
  130. _OutIt _parallel_transform(_InIt _First, _InIt _Last, _OutIt _Dest,
  131.     _Fn1 _Func, std::forward_iterator_tag)
  132. {
  133.     // 使用管线pipeline来处理
  134.     tbb::pipeline pipeline;
  135.     filter_in<_InIt> f1(_First, _Last);
  136.     filter_process<_Fn1> f2(_Func);
  137.     filter_out<_OutIt, _Fn1::result_type> f3(_Dest);
  138.     pipeline.add_filter(f1);
  139.     pipeline.add_filter(f2);
  140.     pipeline.add_filter(f3);
  141.     pipeline.run(3);
  142.     return _Dest;
  143. }
  144.  
  145. //----------------------parallel_transform----------------------------
  146. template<class _InIt, class _OutIt, class _Fn1>
  147. inline
  148. _OutIt parallel_transform(_InIt _First, _InIt _Last, _OutIt _Dest, _Fn1 _Func)
  149. {
  150.     typedef typename std::iterator_traits<_InIt>::iterator_category cat1;
  151.     typedef typename std::iterator_traits<_OutIt>::iterator_category cat2;
  152.     return _parallel_transform(_First, _Last, _Dest,
  153.         _Func, both_random_helper<cat1,cat2>::Iter_cat());
  154. }
  155.  
  156. // 测试代码
  157. struct SinFunctor
  158.     :std::unary_function<doubledouble>{
  159.     double operator()(double &d) const
  160.     {
  161.         // 高强度运算
  162.         double sum = 0;
  163.         for(int i=0; i<10000; i++)  sum += sin(i*d);
  164.         return sum;
  165.     }
  166. };
  167.  
  168. int main() 
  169. {
  170.     // 随机存取迭代
  171.     std::vector<double> a(10000,1.5);
  172.     // 顺序存取迭代
  173.     std::list<double>   b(10000,1.5);
  174.  
  175.     tbb::task_scheduler_init init;
  176.  
  177.     tbb::tick_count t0,t1;
  178.  
  179.     t0 = tbb::tick_count::now();
  180.     parallel_transform(a.begin(), a.end(), a.begin(), SinFunctor());
  181.     t1 = tbb::tick_count::now();
  182.     std::cout << "并行(随机迭代)" << (t1 - t0).seconds() << std::endl;
  183.     
  184.     t0 = tbb::tick_count::now();
  185.     std::transform(a.begin(), a.end(), a.begin(), SinFunctor());
  186.     t1 = tbb::tick_count::now();
  187.     std::cout << "原版(随机迭代)" << (t1 - t0).seconds() << std::endl;
  188.  
  189.     t0 = tbb::tick_count::now();
  190.     parallel_transform(b.begin(), b.end(), b.begin(), SinFunctor());
  191.     t1 = tbb::tick_count::now();
  192.     std::cout << "并行(顺序迭代)" << (t1 - t0).seconds() << std::endl;
  193.  
  194.     t0 = tbb::tick_count::now();
  195.     std::transform(b.begin(), b.end(), b.begin(), SinFunctor());
  196.     t1 = tbb::tick_count::now();
  197.     std::cout << "原版(顺序迭代)"<< (t1 - t0).seconds() << std::endl;
  198.     
  199.     std::cin.get();
  200.     return 0;
  201. }

在我的双核Centrino电脑上测试结果如下:

并行(随机迭代)3.17299 原版(随机迭代)5.41531 并行(顺序迭代)3.13054 原版(顺序迭代)5.33182

     在顺序存取迭代器版本的_parallel_transform中,因为迭代器不能随意跳转,所以使用了tbb::pipeline加三个filter分 别执行顺序读取,处理和写入。其中的处理是可以并行处理的。从上面的结果可以看出,pipeline的性能甚至不亚于parallel_for循环。关于 pipeline的使用说明,请参考文章:TBB流水线

    这里写的parallel_transform可以直接替换大家原有代码中的std::transform,当然如果有兴趣的话完全可以把标准库中的算法 全用TBB改写成并行算法。不过要注意的一点是并不是任何地方都适合使用并行运算的,象这个例子中测试用的处理算子是“for(int i=0; i<10000; i++)  sum += sin(i*d);”这样的需要耗时较长的运算,如果把它改成“return sin(d);”。那么考虑到线程调度以及TBB的任务调度,并行算法的效率可能还不如串行算法。

    TBB库可以从这里下载:http://www.threadingbuildingblocks.org/download.php

    另外再推荐几篇TBB入门文章:
    Intel Threading Building Blocks 之 并行循环
    Intel Threading Building Blocks 之 并发容器
    Intel Threading Building Blocks 基于任务编程

相信度过了自己的20岁生日之后的C++,在多核时候将再创辉煌!

微信小程序扫码登陆

文章评论

1233人参与,0条评论