关于TBB的简介请点击这里查看
配置(以VC2005为例)
从www.threadingbuildingblocks.org/download.php下载TBB库,建议顺便把几本教程和参考手册也下下来。
解压...
其中的include子目录就是头文件路径
ia32子目录里有vc7.1,vc8和vc9三个下级目录,按照你所使用的IDE选择一个(比如我的VC2005就选择vc8),里面的bin和lib分别是dll文件和对应的lib文件所在地。
另外还有一个em64t目录,也许是64位的吧,没试过-_-
在VC2005项目属性里:
附加包含目录里加上TBB路径下子目录include\的完整路径名。
附加库目录加上TBB路径下子目录ia32\vcX\lib\的完整路径名。
把TBB路径下子目录ia32\vcX\bin添加到PATH环境变量中,或者把dll文件复制到要编译的程序所在目录下,确保系统能找到这些dll文件就行。
说起来挺长,其实只是三个目录的问题:include,lib,bin。我想只要是编写过C程序的人看到这三个目录都会知道该怎么做吧。
开始使用TBB
一个使用TBB库的程序样子应该是这样地:
- #include <tbb/task_scheduler_init.h>
- #include ...//其它头文件
-
- int main()
- {
- tbb::task_scheduler_init init;
- ...//代码
- return 0;
- }
task_scheduler_init对象在构造时初始化TBB环境(比如线程池之类的东东),析构时回收TBB环境。在使用其它TBB组件之前必须先构造一个task_scheduler_init对象。
我们可以在task_scheduler_init对象的构造函数里指定线程池里线程的数量,比如tbb::task_scheduler_init init(10)。如果不指定,默认值是task_scheduler_init::automatic,它会自动根据当前系统决定线程量。
task_scheduler_init定义在tbb/task_scheduler_init.h文件中,绝大部分的TBB组件都放在它们自己的头文件中,比如下面要讲的blocked_range放在blocked_range.h中,parallel_for放在parallel_for.h中等。
并行排序parallel_sort
为了突出TBB的简单易学,也为了增强一下学习的信心,先放上一个小甜饼:并行排序
- template<typename RandomAccessIterator>
- void parallel_sort(RandomAccessIterator begin,
- RandomAccessIterator end);
- template<typename RandomAccessIterator, typename Compare>
- void parallel_sort(RandomAccessIterator begin,
- RandomAccessIterator end,
- const Compare& comp );
从定义可以看出,它的用法与std::sort完全一样,我们只需把原程序里的std::sort替换成tbb::parallel_sort,就得到了一个多核优化的程序了(嗯~~起码能向别人这么吹了,呵呵)。
例:
- #include <tbb/task_scheduler_init.h>
- #include <tbb/parallel_sort.h>
- #include <math.h>
-
- int main()
- {
- //准备排序原料
- const int N = 100000;
- float a[N];
- float b[N];
- for( int i = 0; i < N; i++ ) {
- a[i] = sin((double)i);
- b[i] = cos((double)i);
- }
- //TBB初始化
- tbb::task_scheduler_init init;
- //排序
- tbb::parallel_sort(a, a + N);
- //倒序
- tbb::parallel_sort(b, b + N, std::greater<float>( ));
-
- return 0;
- }
注意,和std::sort一样,tbb::parallel_sort的排序结果是不稳定的。
使用并行for循环
比如我们的程序里有这样的代码
- int main()
- {
- for(int i=0; i<10; i++)
- {
- //处理比较耗时的操作,这里用暂停300ms模拟
- Sleep(300);
- std::cout << i;
- }
- return 0;
- }
运行它要花费3秒的时间,顺序输出0123456789
现在,我们把它改成并行处理方式:
- #include <tbb/task_scheduler_init.h>
- #include <tbb/blocked_range.h>
- #include <tbb/parallel_for.h>
- struct OpFor{
- void operator()(const tbb::blocked_range<int> &r) const
- {
- for(int i=r.begin(); i!=r.end(); ++i)
- {
- //处理比较耗时的操作,这里用暂停300ms模拟
- Sleep(300);
- std::cout << i;
- }
- }
- };
-
- int main()
- {
- tbb::task_scheduler_init init;
- OpFor f;
- tbb::parallel_for(tbb::blocked_range<int>(0,10), f);
- return 0;
- }
在我的双核CPU下,它的运行时间是1563ms,快了近一倍,输出是乱序的,说明循环被并行处理了。
template<Value>blocked_range模板类用于存放一对一维迭代器,这对分别指向起始和终止的迭代器组成了一个范围块。
它的构造函数是:
blocked_range( Value begin_, Value end_, size_type grainsize_=1 );
参数begin_和end_指定起始、终止迭代器,然后可通过begin()和end()方法得到它们。
第三个参数grainsize指定以多大粒度拆分范围。
在运行时,TBB会把blocked_range指定的范围拆分成几个小块来并行执行。上例中,TBB会生成复制出多个OpFor,然后把拆分出来的小块交给这些OpFor执行。由于我们使用默认拆分粒度1,则tbb::blocked_range<int>(0,10)会被拆分成10个小块交给10个OpFor执行(可以为OpFor加上拷贝构造和析构,看是不是调用了10次)。
parallel_for函数启动并行循环,上例中,一个更好的方式是:
tbb::parallel_for(tbb::blocked_range<int>(0,10), f, tbb::auto_partitioner());
其中第三个参数指定循环体的拆分方式,使用auto_partitioner()则会根据循环块的运算量和系统环境自动决定如何拆分,这是一个推荐的方式。默认是simple_partitioner(),它简单地按照blocked_range的第三个参数拆分循环。
注意,循环块必须是可重入的才可以用parallel_for来提升效率。如果一段代码,上一次运行和下一次运行之间没有联系,或者说上一次运行不会改变下一次运行的行为就是可重入的。比如:
下面这段代码for循环里的代码块是可重入的,前一次循环和下一次循环没有关系。
- char buf[] = "hello world";
- for(int i=0; i<sizeof(buf); i++)
- buf[i] = toupper(buf[i]);
下面这段累加的代码是不可重入的,因为下一次循环依赖于上一次循环的结果(想象一下,如果i=5和i=6并行执行的情形,这个sum的值必然要出现混乱)。
- int sum=0;
- for(int i=0; i<10; i++)
- sum += i;
使用parallel_reduce来适应更多循环
上面提到一个不可重入的累加的例子,不能使用
parallel_for并行执行,TBB用另一种方法让这种形式的循环也能并行执行。见下图:
把一个范围blocked_range切分成几份小块,并行地执行各小块中的累加操作,最后把各小块的累加结果合并起来得到最终结果。
代码:
- #include <tbb/task_scheduler_init.h>
- #include <tbb/blocked_range.h>
- #include <tbb/parallel_reduce.h>
-
- struct OpSum{
- int m_sum;
-
- void operator()(const tbb::blocked_range<int> &r)
- {
- for(int i=r.begin(); i!=r.end(); ++i)
- {
- Sleep(300);
- m_sum += i;
- }
- }
-
- // 合并支线
- void join(OpSum& x){m_sum += x.m_sum;}
-
- // 分出一个支线,如果要访问x,应该保证原子操作
- OpSum( OpSum& x, tbb::split)
- : m_sum(0) {;}
- OpSum()
- :m_sum(0) {;}
- };
-
- int main()
- {
- tbb::task_scheduler_init init;
- OpSum x;
- tbb::parallel_reduce(tbb::blocked_range<int>(0,10), x, tbb::auto_partitioner());
- return 0;
- }
本例中,parallel_reduce的用法和parallel_for相同。关键是运算子OpSum,它必须要有一个构造函数OpSum( OpSum& x, tbb::split)和一个方法void join(OpSum& x)。
当TBB决定分出一个blocked_range时,会调用OpSum( OpSum& x, tbb::split),这个tbb::split只是一个点位符,用于和拷贝构造函数相区别。
当TBB合并blocked_range时,会调用void join(OpSum& x),本例中合并就意味着把两个结果加起来。
在TBB的examples目录下有使用parallel_reduce并行查找素数的例子。
在循环中实时添加源数据parallel_do
parallel_do是一个类似于std::for_each的函数,我们可以直接把它看作std::for_each,当然,它是并行的:
- #include <tbb/task_scheduler_init.h>
- #include <tbb/parallel_do.h>
- #include <functional>
- void fun(char x)
- {
- Sleep(300); //暂停300ms,模拟长时间运算。
- std::cout << x;
- }
- int main()
- {
- tbb::task_scheduler_init init;
-
- char data[]="abcdefg";
- tbb::parallel_do(data, data+strlen(data), std::ptr_fun(fun));
-
- return 0;
- }
parallel_do内部维护了一个parallel_do_feeder对象,我们可以通过这个对象添加更多数据,parallel_do也会把我们实时添加的数据放入算子运算。调用算子时,parallel_do会尝试把parallel_do_feeder作为第二个参数送给运算子,我们可以在这里添加数据:
- #include <tbb/task_scheduler_init.h>
- #include <tbb/parallel_do.h>
- #include <functional>
- void fun(char x)
- {
- Sleep(300); //暂停300ms,模拟长时间运算。
- std::cout << x;
- }
- struct CFunor{
- void operator()(char x, tbb::parallel_do_feeder<char>& feeder) const
- {
- fun(x);
- //再加点数据(大写的)
- if(x>='a') feeder.add( x - 'a' + 'A' );
- }
- };
- int main()
- {
- tbb::task_scheduler_init init;
-
- char data[]="abcdefg";
- tbb::parallel_do(data, data+strlen(data), CFunor());
-
- return 0;
- }
流水线pipeline
流水线操作是常用的一种并行模式,它模仿传统的工厂装配线的工作模式。数据流过一系列的流水线节点,每个节点以自己的方式处理数据,然后传给下一节点。这里,有些节点可能可以并行操作,而有些不行。比如视频处理,有些节点对某帧的操作不依赖于其它帧的数据,这些节点就可以同时处理多个帧;而有些节点就需要等上一帧处理完再处理下一帧。
TBB的pipeline和filter类实现了这种流水线模式。
数据处理节点要从filter继承,filter的构造函数只有一个bool型的参数,指出是否是串行操作,如果是true,则此节点只能串行操作,如果是false,这个节点就可以并行操作了。
filter里有一个纯虚函数void* operator()(void* item),必须实现它。我们从输入参数item取得上一节点传入的数据,然后返回处理后的数据。
完成所有节点后,使用pipeline的add_filter方法按顺序加入这些节点,最后调用pipeline的run方法运行这个流水线,直到流水线的第一个节点返回NULL为止。
- #include <tbb/task_scheduler_init.h>
- #include <tbb/pipeline.h>
- struct MyFilter1 : tbb::filter{
- MyFilter1(int count)
- :m_nCount(count),tbb::filter(true) // bool is_serial_
- {
- }
-
- virtual void* operator()(void*)
- {
- // 模拟读取数据,这里直接返回存放当前线程ID的一个内存空间
- if(m_nCount>0)
- {
- m_nCount--;
- LPDWORD pdwThread = new DWORD;
- *pdwThread = ::GetCurrentThreadId();
- return pdwThread;
- }
- else
- {
- return NULL;
- }
- }
- private:
- int m_nCount;
- };
- struct MyFilter2 : tbb::filter{
- MyFilter2()
- :tbb::filter(false) // bool is_serial_
- {
- }
-
- virtual void* operator()(void* item)
- {
- // 模拟处理数据,这里叠加一个当前线程ID
- LPDWORD lpPrev = (LPDWORD)item;
- LPDWORD pdwThreads = new DWORD[2];
- pdwThreads[0] = *lpPrev;
- pdwThreads[1] = ::GetCurrentThreadId();
- delete lpPrev; //把前面传过来的内存块删除
- Sleep(1000); //模拟长时间处理
- return pdwThreads;
- }
- };
-
- struct MyFilter3 : tbb::filter{
- MyFilter3()
- :tbb::filter(true) // bool is_serial_
- {
- }
-
- virtual void* operator()(void* item)
- {
- //模拟最后一步处理,一般是输出,这里直接向控制台输出
- //因为MyFilter3指定了串行工作,所以可以放心地用cout输出
- LPDWORD lpPrev = (LPDWORD)item;
- std::cout << "MyFilter1:" << std::hex << lpPrev[0];
- std::cout << ";MyFilter2:" << lpPrev[1];
- std::cout << ";MyFilter3:" << ::GetCurrentThreadId() << std::endl;
- delete []lpPrev; //把前面传过来的内存块删除
- return NULL;
- }
- };
- int main()
- {
- tbb::task_scheduler_init init;
-
- tbb::pipeline pipeline;
- MyFilter1 f1(10);
- MyFilter2 f2;
- MyFilter3 f3;
- pipeline.add_filter(f1);
- pipeline.add_filter(f2);
- pipeline.add_filter(f3);
- pipeline.run(5);
-
- return 0;
- }
本例中,MyFilter1取得数据,并加了一个结束条件(取m_nCount次);然后把数据传给MyFilter2,MyFilter2处理时间比较长(每次操作要花1秒的时间),好在它能并行处理;最后传给MyFilter3输出。运行方式如下图
pipeline的run方法要指定最大可同时启动的任务数,上图中最大同时启动了5个任务(一种颜色表示一个任务)。要注意的是,实际的任务数还与task_scheduler_init初始定义的线程池容量有关,如我的双核CPU电脑上貌似默认的线程池里只有两个线程,要试验最大任务数的话可以直接指定线程池容量,如直接指定线程池里有20个线程:
tbb::task_scheduler_init init(20);