返回> 网站首页 

Intel Threading Building Blocks 之 并行循环

yoours2011-01-12 13:47:03 阅读 1363

简介一边听听音乐,一边写写文章。

Intel Threading Building Blocks(下文简称TBB)是一个C++的并行编程模板库,它能使你的程序充分利用多核CPU的性能优势。

关于TBB的简介请点击这里查看

\

配置(以VC2005为例)

www.threadingbuildingblocks.org/download.php下载TBB库,建议顺便把几本教程和参考手册也下下来。

解压...

    其中的include子目录就是头文件路径

    ia32子目录里有vc7.1,vc8和vc9三个下级目录,按照你所使用的IDE选择一个(比如我的VC2005就选择vc8),里面的bin和lib分别是dll文件和对应的lib文件所在地。

    另外还有一个em64t目录,也许是64位的吧,没试过-_-

在VC2005项目属性里:

    附加包含目录里加上TBB路径下子目录include\的完整路径名。

    附加库目录加上TBB路径下子目录ia32\vcX\lib\的完整路径名。

把TBB路径下子目录ia32\vcX\bin添加到PATH环境变量中,或者把dll文件复制到要编译的程序所在目录下,确保系统能找到这些dll文件就行。

说起来挺长,其实只是三个目录的问题:include,lib,bin。我想只要是编写过C程序的人看到这三个目录都会知道该怎么做吧。

开始使用TBB

一个使用TBB库的程序样子应该是这样地:
  1. #include <tbb/task_scheduler_init.h>
  2. #include ...//其它头文件
  3.  
  4. int main()
  5. {
  6.     tbb::task_scheduler_init init;
  7.     ...//代码
  8.     return 0;
  9. }

task_scheduler_init对象在构造时初始化TBB环境(比如线程池之类的东东),析构时回收TBB环境。在使用其它TBB组件之前必须先构造一个task_scheduler_init对象。

我们可以在task_scheduler_init对象的构造函数里指定线程池里线程的数量,比如tbb::task_scheduler_init init(10)。如果不指定,默认值是task_scheduler_init::automatic,它会自动根据当前系统决定线程量。

task_scheduler_init定义在tbb/task_scheduler_init.h文件中,绝大部分的TBB组件都放在它们自己的头文件中,比如下面要讲的blocked_range放在blocked_range.h中,parallel_for放在parallel_for.h中等。

并行排序parallel_sort

为了突出TBB的简单易学,也为了增强一下学习的信心,先放上一个小甜饼:并行排序

  1. template<typename RandomAccessIterator>
  2.   void parallel_sort(RandomAccessIterator begin,
  3.                      RandomAccessIterator end);
  4. template<typename RandomAccessIterator, typename Compare>
  5.   void parallel_sort(RandomAccessIterator begin,
  6.                      RandomAccessIterator end,
  7.                      const Compare& comp ); 

从定义可以看出,它的用法与std::sort完全一样,我们只需把原程序里的std::sort替换成tbb::parallel_sort,就得到了一个多核优化的程序了(嗯~~起码能向别人这么吹了,呵呵)。

例:
  1. #include <tbb/task_scheduler_init.h>
  2. #include <tbb/parallel_sort.h>
  3. #include <math.h>
  4.  
  5. int main()
  6. {
  7.     //准备排序原料
  8.     const int N = 100000;
  9.     float a[N];
  10.     float b[N];
  11.     forint i = 0; i < N; i++ ) {
  12.        a[i] = sin((double)i);
  13.        b[i] = cos((double)i);
  14.     }
  15.     //TBB初始化
  16.     tbb::task_scheduler_init init;
  17.     //排序
  18.     tbb::parallel_sort(a, a + N);
  19.     //倒序
  20.     tbb::parallel_sort(b, b + N, std::greater<float>( ));   
  21.    
  22.     return 0;
  23. }

注意,和std::sort一样,tbb::parallel_sort的排序结果是不稳定的。


使用并行for循环

比如我们的程序里有这样的代码

  1. int main()
  2. {
  3.     for(int i=0; i<10; i++)
  4.     {
  5.         //处理比较耗时的操作,这里用暂停300ms模拟
  6.         Sleep(300);
  7.         std::cout << i;
  8.     }
  9.     return 0;
  10. }

运行它要花费3秒的时间,顺序输出0123456789

现在,我们把它改成并行处理方式:
  1. #include <tbb/task_scheduler_init.h>
  2. #include <tbb/blocked_range.h>
  3. #include <tbb/parallel_for.h>
  4. struct OpFor{
  5.     void operator()(const tbb::blocked_range<int> &r) const
  6.     {
  7.         for(int i=r.begin(); i!=r.end(); ++i)
  8.         {
  9.             //处理比较耗时的操作,这里用暂停300ms模拟
  10.             Sleep(300);
  11.             std::cout << i;
  12.         }
  13.     }
  14. };
  15.  
  16. int main()
  17. {
  18.     tbb::task_scheduler_init init;
  19.     OpFor f;
  20.     tbb::parallel_for(tbb::blocked_range<int>(0,10), f);
  21.     return 0;
  22. }

在我的双核CPU下,它的运行时间是1563ms,快了近一倍,输出是乱序的,说明循环被并行处理了。

template<Value>blocked_range模板类用于存放一对一维迭代器,这对分别指向起始和终止的迭代器组成了一个范围块。

它的构造函数是:
blocked_range( Value begin_, Value end_, size_type grainsize_=1 );

    参数begin_和end_指定起始、终止迭代器,然后可通过begin()和end()方法得到它们。
    第三个参数grainsize指定以多大粒度拆分范围。

在运行时,TBB会把blocked_range指定的范围拆分成几个小块来并行执行。上例中,TBB会生成复制出多个OpFor,然后把拆分出来的小块交给这些OpFor执行。由于我们使用默认拆分粒度1,则tbb::blocked_range<int>(0,10)会被拆分成10个小块交给10个OpFor执行(可以为OpFor加上拷贝构造和析构,看是不是调用了10次)。

parallel_for函数启动并行循环,上例中,一个更好的方式是:
tbb::parallel_for(tbb::blocked_range<int>(0,10), f, tbb::auto_partitioner());

其中第三个参数指定循环体的拆分方式,使用auto_partitioner()则会根据循环块的运算量和系统环境自动决定如何拆分,这是一个推荐的方式。默认是simple_partitioner(),它简单地按照blocked_range的第三个参数拆分循环。

注意,循环块必须是可重入的才可以用parallel_for来提升效率。如果一段代码,上一次运行和下一次运行之间没有联系,或者说上一次运行不会改变下一次运行的行为就是可重入的。比如:

下面这段代码for循环里的代码块是可重入的,前一次循环和下一次循环没有关系。
  1. char buf[] = "hello world";
  2. for(int i=0; i<sizeof(buf); i++)
  3.     buf[i] = toupper(buf[i]);

下面这段累加的代码是不可重入的,因为下一次循环依赖于上一次循环的结果(想象一下,如果i=5和i=6并行执行的情形,这个sum的值必然要出现混乱)。

  1. int sum=0;
  2. for(int i=0; i<10; i++)
  3.     sum += i;

使用parallel_reduce来适应更多循环

上面提到一个不可重入的累加的例子,不能使用parallel_for并行执行,TBB用另一种方法让这种形式的循环也能并行执行。见下图:
\

把一个范围blocked_range切分成几份小块,并行地执行各小块中的累加操作,最后把各小块的累加结果合并起来得到最终结果。

代码:
  1. #include <tbb/task_scheduler_init.h>
  2. #include <tbb/blocked_range.h>
  3. #include <tbb/parallel_reduce.h>
  4.  
  5. struct OpSum{
  6.     int m_sum;
  7.  
  8.     void operator()(const tbb::blocked_range<int> &r)
  9.     {
  10.         for(int i=r.begin(); i!=r.end(); ++i)
  11.         {
  12.             Sleep(300);
  13.             m_sum += i;
  14.         }
  15.     }
  16.  
  17.     // 合并支线
  18.     void join(OpSum& x){m_sum += x.m_sum;}
  19.  
  20.     // 分出一个支线,如果要访问x,应该保证原子操作
  21.     OpSum( OpSum& x, tbb::split)
  22.         : m_sum(0) {;}
  23.     OpSum()
  24.         :m_sum(0) {;}
  25. };
  26.  
  27. int main()
  28. {
  29.     tbb::task_scheduler_init init;
  30.     OpSum x;
  31.     tbb::parallel_reduce(tbb::blocked_range<int>(0,10), x, tbb::auto_partitioner());
  32.     return 0;
  33. }

本例中,parallel_reduce的用法和parallel_for相同。关键是运算子OpSum,它必须要有一个构造函数OpSum( OpSum& x, tbb::split)和一个方法void join(OpSum& x)

当TBB决定分出一个blocked_range时,会调用OpSum( OpSum& x, tbb::split),这个tbb::split只是一个点位符,用于和拷贝构造函数相区别。

当TBB合并blocked_range时,会调用void join(OpSum& x),本例中合并就意味着把两个结果加起来。

在TBB的examples目录下有使用parallel_reduce并行查找素数的例子。


在循环中实时添加源数据parallel_do

parallel_do是一个类似于std::for_each的函数,我们可以直接把它看作std::for_each,当然,它是并行的:

  1. #include <tbb/task_scheduler_init.h>
  2. #include <tbb/parallel_do.h>
  3. #include <functional>
  4. void fun(char x)
  5. {
  6.     Sleep(300); //暂停300ms,模拟长时间运算。
  7.     std::cout << x;
  8. }
  9. int main()
  10. {
  11.     tbb::task_scheduler_init init;
  12.  
  13.     char data[]="abcdefg";
  14.     tbb::parallel_do(data, data+strlen(data), std::ptr_fun(fun));
  15.  
  16.     return 0;
  17. }

parallel_do内部维护了一个parallel_do_feeder对象,我们可以通过这个对象添加更多数据,parallel_do也会把我们实时添加的数据放入算子运算。调用算子时,parallel_do会尝试把parallel_do_feeder作为第二个参数送给运算子,我们可以在这里添加数据:

  1. #include <tbb/task_scheduler_init.h>
  2. #include <tbb/parallel_do.h>
  3. #include <functional>
  4. void fun(char x)
  5. {
  6.     Sleep(300); //暂停300ms,模拟长时间运算。
  7.     std::cout << x;
  8. }
  9. struct CFunor{
  10.     void operator()(char x, tbb::parallel_do_feeder<char>& feeder) const
  11.     {
  12.         fun(x);
  13.         //再加点数据(大写的)
  14.         if(x>='a') feeder.add( x - 'a' + 'A' );
  15.     }
  16. };
  17. int main()
  18. {
  19.     tbb::task_scheduler_init init;
  20.  
  21.     char data[]="abcdefg";
  22.     tbb::parallel_do(data, data+strlen(data), CFunor());
  23.  
  24.     return 0;
  25. }

流水线pipeline

流水线操作是常用的一种并行模式,它模仿传统的工厂装配线的工作模式。数据流过一系列的流水线节点,每个节点以自己的方式处理数据,然后传给下一节点。这里,有些节点可能可以并行操作,而有些不行。比如视频处理,有些节点对某帧的操作不依赖于其它帧的数据,这些节点就可以同时处理多个帧;而有些节点就需要等上一帧处理完再处理下一帧。

TBB的pipelinefilter类实现了这种流水线模式。

数据处理节点要从filter继承,filter的构造函数只有一个bool型的参数,指出是否是串行操作,如果是true,则此节点只能串行操作,如果是false,这个节点就可以并行操作了。

filter里有一个纯虚函数void* operator()(void* item),必须实现它。我们从输入参数item取得上一节点传入的数据,然后返回处理后的数据。

完成所有节点后,使用pipeline的add_filter方法按顺序加入这些节点,最后调用pipeline的run方法运行这个流水线,直到流水线的第一个节点返回NULL为止。

  1. #include <tbb/task_scheduler_init.h>
  2. #include <tbb/pipeline.h>
  3. struct MyFilter1 : tbb::filter{
  4.     MyFilter1(int count)
  5.         :m_nCount(count),tbb::filter(true// bool is_serial_
  6.     {
  7.     }
  8.  
  9.     virtual void* operator()(void*)
  10.     {
  11.         // 模拟读取数据,这里直接返回存放当前线程ID的一个内存空间
  12.         if(m_nCount>0)
  13.         {
  14.             m_nCount--;
  15.             LPDWORD pdwThread = new DWORD;
  16.             *pdwThread = ::GetCurrentThreadId();
  17.             return pdwThread;
  18.         }
  19.         else
  20.         {
  21.             return NULL;
  22.         }
  23.     }
  24. private:
  25.     int m_nCount;
  26. };
  27. struct MyFilter2 : tbb::filter{
  28.     MyFilter2()
  29.         :tbb::filter(false// bool is_serial_
  30.     {
  31.     }
  32.  
  33.     virtual void* operator()(void* item)
  34.     {
  35.         // 模拟处理数据,这里叠加一个当前线程ID
  36.         LPDWORD lpPrev = (LPDWORD)item;
  37.         LPDWORD pdwThreads = new DWORD[2];
  38.         pdwThreads[0] = *lpPrev;
  39.         pdwThreads[1] = ::GetCurrentThreadId();
  40.         delete lpPrev; //把前面传过来的内存块删除
  41.         Sleep(1000); //模拟长时间处理
  42.         return pdwThreads;
  43.     }
  44. };
  45.  
  46. struct MyFilter3 : tbb::filter{
  47.     MyFilter3()
  48.         :tbb::filter(true// bool is_serial_
  49.     {
  50.     }
  51.  
  52.     virtual void* operator()(void* item)
  53.     {
  54.         //模拟最后一步处理,一般是输出,这里直接向控制台输出
  55.         //因为MyFilter3指定了串行工作,所以可以放心地用cout输出
  56.         LPDWORD lpPrev = (LPDWORD)item;
  57.         std::cout << "MyFilter1:" << std::hex << lpPrev[0];
  58.         std::cout << ";MyFilter2:" << lpPrev[1];
  59.         std::cout << ";MyFilter3:" << ::GetCurrentThreadId() << std::endl;
  60.         delete []lpPrev; //把前面传过来的内存块删除
  61.         return NULL;
  62.     }
  63. };
  64. int main()
  65. {
  66.     tbb::task_scheduler_init init;
  67.  
  68.     tbb::pipeline pipeline;
  69.     MyFilter1 f1(10);
  70.     MyFilter2 f2;
  71.     MyFilter3 f3;
  72.     pipeline.add_filter(f1);
  73.     pipeline.add_filter(f2);
  74.     pipeline.add_filter(f3);
  75.     pipeline.run(5);
  76.  
  77.     return 0;
  78. }

本例中,MyFilter1取得数据,并加了一个结束条件(取m_nCount次);然后把数据传给MyFilter2,MyFilter2处理时间比较长(每次操作要花1秒的时间),好在它能并行处理;最后传给MyFilter3输出。运行方式如下图

\

pipeline的run方法要指定最大可同时启动的任务数,上图中最大同时启动了5个任务(一种颜色表示一个任务)。要注意的是,实际的任务数还与task_scheduler_init初始定义的线程池容量有关,如我的双核CPU电脑上貌似默认的线程池里只有两个线程,要试验最大任务数的话可以直接指定线程池容量,如直接指定线程池里有20个线程:

tbb::task_scheduler_init init(20);


微信小程序扫码登陆

文章评论

1363人参与,0条评论