9 矩阵特征值计算 在实际的工程计算中,经常会遇到求n阶方阵A的特征值(Eigenvalue)与特征向量(Eigenvector)的问题。对于一个方阵A,如果数值λ使方程组 Ax=λx 即 (A-λIn)x=0有非零解向量(Solution Vector)x,则称λ为方阵A的特征值,而非零向量x为特征值λ所对应的特征向量,其中In为n阶单位矩阵。 由于根据定义直接求矩阵特征值的过程比较复杂,因此在实际计算中,往往采取一些数值方法。本章主要介绍求一般方阵绝对值最大的特征值的乘幂(Power)法、求对称方阵特征值的雅可比法和单侧旋转(One-side Rotation)法以及求一般矩阵全部特征值的QR方法及一些相关的并行算法。 求解矩阵最大特征值的乘幂法 乘幂法及其串行算法 在许多实际问题中,只需要计算绝对值最大的特征值,而并不需要求矩阵的全部特征值。乘幂法是一种求矩阵绝对值最大的特征值的方法。记实方阵A的n个特征值为λi i=(1,2, …,n),且满足: │λ1│≥│λ2│≥│λ3│≥…≥│λn│ 特征值λi对应的特征向量为xi。乘幂法的做法是:①取n维非零向量v0作为初始向量;②对于k=1,2, …,做如下迭代: uk =Avk-1 vk= uk /║uk║∞ 直至ε为止,这时vk+1就是A的绝对值最大的特征值λ1所对应的特征向量x1。若vk-1与vk的各个分量同号且成比例,则λ1=║uk║∞;若vk-1与vk的各个分量异号且成比例,则λ1= -║uk║∞。若各取一次乘法和加法运算时间、一次除法运算时间、一次比较运算时间为一个单位时间,则因为一轮计算要做一次矩阵向量相乘、一次求最大元操作和一次规格化操作,所以下述乘幂法串行算法21.1的一轮计算时间为n2+2n=O(n2 )。 算法21.1 单处理器上乘幂法求解矩阵最大特征值的算法 输入:系数矩阵An×n,初始向量v n×1,ε 输出:最大的特征值max Begin while (│diff│>ε) do (1)for i=1 to n do (1.1)sum=0 (1.2)for j= 1 to n do sum=sum+a[i,j]*x[j] end for (1.3)b[i]= sum end for (2)max=│b[1]│ (3)for i=2 to n do if (│b[i]│>max) then max=│b[i]│ end if end for (4)for i=1 to n do x[i] =b[i]/max end for (5)diff=max-oldmax , oldmax=max end while End 乘幂法并行算法 乘幂法求矩阵特征值由反复进行矩阵向量相乘来实现,因而可以采用矩阵向量相乘的数据划分方法。设处理器个数为p,对矩阵A按行划分为p块,每块含有连续的m行向量,这里,编号为i的处理器含有A的第im至第(i+1)m-1行数据,(i=0,1, …,p-1),初始向量v被广播给所有处理器。 各处理器并行地对存于局部存储器中a的行块和向量v做乘积操作,并求其m维结果向量中的最大值localmax,然后通过归约操作的求最大值运算得到整个n维结果向量中的最大值max并广播给所有处理器,各处理器利用max进行规格化操作。最后通过扩展收集操作将各处理器中的维结果向量按处理器编号连接起来并广播给所有处理器,以进行下一次迭代计算,直至收敛。具体算法框架描述如下: 算法21.2 乘幂法求解矩阵最大特征值的并行算法 输入:系数矩阵An×n,初始向量v n×1,ε 输出:最大的特征值max Begin 对所有处理器my_rank(my_rank=0,…, p-1)同时执行如下的算法: while (│diff│>ε) do /* diff 为特征向量的各个分量新旧值之差的最大值*/ (1)for i=0 to m-1 do /*对所存的行计算特征向量的相应分量*/ (1.1)sum=0 (1.2)for j= 0 to n-1 do sum=sum+a[i,j]*x[j] end for (1.3)b[i]= sum end for (2)localmax=│b[0]│ /*对所计算的特征向量的相应分量 求新旧值之差的最大值localmax */ (3)for i=1 to m-1 do if (│b[i]│>localmax) then localmax=│b[i]│ end if end for (4)用Allreduce操作求出所有处理器中localmax值的最大值max 并广播到所有处理器中 (5)for i=0to m-1 do /*对所计算的特征向量归一化 */ b[i] =b[i]/max end for (6)用Allgather操作将各个处理器中计算出的特征向量的分量的新值组合并广播到 所有处理器中 (7)diff=max-oldmax, oldmax=max end while End 若各取一次乘法和加法运算时间、一次除法运算时间、一次比较运算时间为一个单位时间,一轮迭代的计算时间为mn+2m;一轮迭代中,各处理器做一次归约操作,通信量为1,一次扩展收集操作,通信量为m,则通信时间为。因此乘幂法的一轮并行计算时间为。 MPI源程序请参见所附光盘。 求对称矩阵特征值的雅可比法 雅可比法求对称矩阵特征值的串行算法 若矩阵A=[aij]是n阶实对称矩阵,则存在一个正交矩阵U,使得 UTAU=D 其中D是一个对角矩阵,即  这里λi (i=1,2,…,n)是A的特征值,U的第i列是与λi对应的特征向量。雅可比算法主要是通过正交相似变换将一个实对称矩阵对角化,从而求出该矩阵的全部特征值和对应的特征向量。因此可以用一系列的初等正交变换逐步消去A的非对角线元素,从而使矩阵A对角化。设初等正交矩阵为R(p,q,θ),其(p,p)( q,q)位置的数据是cosθ,(p, q)( q, p)位置的数据分别是-sinθ和sinθ(p ≠ q),其它位置的数据和一个同阶数的单位矩阵相同。显然可以得到: R(p,q,θ) TR(p,q,θ)=In 不妨记B= R(p,q,θ)TAR(p,q,θ),如果将右端展开,则可知矩阵B的元素与矩阵A的元素之间有如下关系: bpp = appcos2θ+aqqsin2θ+apqsin2θ bqq = appsin2θ+aqq cos2θ-apqsin2θ bpq = ((aqq -app)sin2θ)/2+apq cos2θ bqp = bpq bpj= apjcosθ+aqjsinθ bqj = -apjsinθ +aqjcosθ bip= aipcosθ+aiqsinθ biq= -aipsinθ +aiqcosθ bij= aij 其中1 ≤ i, j ≤ n且i,j ≠ p,q。因为A为对称矩阵,R为正交矩阵,所以B亦为对称矩阵。若要求矩阵B的元素bpq =0,则只需令 ((aqq -app)sin2θ)/2+apq cos2θ=0,即:  在实际应用时,考虑到并不需要解出θ,而只需要求出sin2θ,sinθ和cosθ就可以了。如果限制θ值在-π/2 < 2θ ≤ π/2,则可令  容易推出: , ,  利用sin2θ,sinθ和cosθ的值,即得矩阵B的各元素。矩阵A经过旋转变换,选定的非主对角元素apq及aqp(一般是绝对值最大的)就被消去,且其主对角元素的平方和增加了,而非主对角元素的平方和减少了2a2pq,矩阵元素总的平方和不变。通过反复选取主元素apq,并作旋转变换,就逐步将矩阵A变为对角矩阵。在对称矩阵中共有(n2-n)/2个非主对角元素要被消去, 而每消去一个非主对角元素需要对2n个元素进行旋转变换, 对一个元素进行旋转变换需要2次乘法和1次加法,若各取一次乘法运算时间或一次加法运算时间为一个单位时间,则消去一个非主对角元素需要3个单位运算时间,所以下述算法21.3的一轮计算时间复杂度为 (n2-n)/2*2n*3=3n2(n-1)=O(n3)。 算法21.3 单处理器上雅可比法求对称矩阵特征值的算法 输入:矩阵An×n,ε 输出:矩阵特征值Eigenvalue Begin (1)while (max >ε) do (1.1) max=a[1,2] (1.2)for i=1 to n do for j= i+1 to n do if (│a[i,j]) │>max) then max =│a[i,j] │,p=i,q=j end if end for end for (1.3)Compute: m=- a[p,q],n=(a[q,q]- a[p,p])/2,w=sgn(n)*m/sqrt(m*m+n*n), si2=w,si1=w/sqrt(2(1+ sqrt(1-w*w))),co1=sqrt(1-si1*si1), b[p,p]= a[p,p]*co1*co1+ a[q,q]*si1*si1+ a[p,q]*si2, b[q,q]= a[p,p]*si1*si1+ a[q,q]*co1*co1- a[p,q]*si2, b[q, p]=0,b[p,q]=0 (1.4)for j=1 to n do if ((j ≠ p) and ( j ≠ q)) then (i)b[p,j]= a[p,j]*co1+ a[q,j]*si1 (ii)b[q,j]= -a[p,j]*si1 + a[q,j]*co1 end if end for (1.5)for i=1 to n do if((i ≠ p) and (i ≠ q)) then (i)b[i, p]= a[i,p]*co1+ a[i,q]*si1 (ii)b[i, q]= - a[i,p]*si1+ a[i,q]*co1 end if end for (1.6)for i=1 to n do for j=1 to n do a[i,j]=b[i,j] end for end for end while (2)for i=1 to n do Eigenvalue[i]= a[i, i] end for End 雅可比法求对称矩阵特征值的并行算法 串行雅可比算法逐次寻找非主对角元绝对值最大的元素的方法并不适合于并行计算。因此,在并行处理中,我们每次并不寻找绝对值最大的非主对角元消去,而是按一定的顺序将A中的所有上三角元素全部消去一遍,这样的过程称为一轮。由于对称性,在一轮中,A的下三角元素也同时被消去一遍。经过若干轮,可使A的非主对角元的绝对值减少,收敛于一个对角方阵。具体算法如下: 设处理器个数为p,对矩阵A按行划分为2p块,每块含有连续的m/2行向量,记,这些行块依次记为A0,A1, …,A2p-1,并将A2i与A2i+1存放与标号为i的处理器中。 每轮计算开始,各处理器首先对其局部存储器中所存的两个行块的所有行两两配对进行旋转变换,消去相应的非对角线元素。然后按图21.1所示规律将数据块在不同处理器之间传送,以消去其它非主对角元素。  图 21.1 p=4时的雅可比算法求对称矩阵特征值的数据交换图 这里长方形框中两个方格内的整数被看成是所移动的行块的编号。在要构成新的行块配对时,只要将每个处理器所对应的行块按箭头方向移至相邻的处理器即可,这样的传送可以在行块之间实现完全配对。当编号为i和j的两个行块被送至同一处理器时,令编号为i的行块中的每行元素和编号为j的行块中的每行元素配对,以消去相应的非主对角元素,这样在所有的行块都两两配对之后,可以将所有的非主对角元素消去一遍,从而完成一轮计算。由图 21.1可以看出,在一轮计算中,处理器之间要2p-1次交换行块。为保证不同行块配对时可以将原矩阵A的非主对角元素消去,引入变量b记录每个处理器中的行块数据在原矩阵A中的实际行号。在行块交换时,变量b也跟着相应的行块在各处理器之间传送。 处理器之间的数据块交换存在如下规律: 0号处理器前一个行块(简称前数据块,后一个行块简称后数据块)始终保持不变,将后数据块发送给1号处理器,作为1号处理器的前数据块。同时接收1号处理器发送的后数据块作为自己的后数据块。 p-1号处理器首先将后数据块发送给编号为p-2的处理器,作为p-2号处理器的后数据块。然后将前数据块移至后数据块的位置上,最后接收p-2号处理器发送的前数据块作为自己的前数据块。 编号为my_rank的其余处理器将前数据块发送给编号为my_rank+1的处理器,作为my_rank+1号处理器的前数据块。将后数据块发送给编号为my_rank-1的处理器,作为my_rank-1号处理器的后数据块。 为了避免在通信过程中发生死锁,奇数号处理器和偶数号处理器的收发顺序被错开。使偶数号处理器先发送后接收;而奇数号处理器先将数据保存在缓冲区中,然后接收偶数号处理器发送的数据,最后再将缓冲区中的数据发送给偶数号处理器。数据块传送的具体过程描述如下: 算法21.4 雅可比法求对称矩阵特征值的并行过程中处理器之间的数据块交换算法 输入:矩阵A的行数据块和向量b的数据块分布于各个处理器中 输出:在处理器阵列中传送后的矩阵A的行数据块和向量b的数据块 Begin 对所有处理器my_rank(my_rank=0,…, p-1)同时执行如下的算法: /*矩阵A和向量b为要传送的数据块*/ (1)if (my-rank=0) then /*0号处理器*/ (1.1)将后数据块发送给1号处理器 (1.2)接收1号处理器发送来的后数据块作为自己的后数据块 end if (2)if ((my-rank=p-1) and ( my-rank mod 2 ≠ 0)) then /*处理器p-1且其为奇数*/ (2.1)for i=m/2 to m-1 do /* 将后数据块保存在缓冲区buffer中*/ for j=0 to n-1 do buffer[i-m/2,j]=a[i,j] end for end for (2.2)for i=m/2 to m-1 do buf [i-m/2] =b[i] end for (2.3)for i=0 to m/2-1 do /*将前数据块移至后数据块的位置上*/ for j=0 to n-1 do a[i+m/2,j]=a[i,j] end for end for (2.4)for i=0 to m/2-1 do b[i+m/2] =b[i] end for (2.5)接收p-2号处理器发送的数据块作为自己的前数据块 (2.6)将buffer中的后数据块发送给编号为p-2的处理器 end if (3)if ((my-rank=p-1) and ( my-rank mod 2=0)) then /*处理器p-1且其为偶数*/ (3.1)将后数据块发送给编号为p-2的处理器 (3.2)for i=0 to m/2-1 do /*将前数据块移至后数据块的位置上*/ for j=0 to n-1 do a[i+m/2,j]=a[i,j] end for end for (3.3)for i=0 to m/2-1 do b[i+m/2] =b[i] end for (3.4)接收p-2号处理器发送的数据块作为自己的前数据块 end if (4)if ((my-rank ≠ p-1) and ( my-rank ≠ 0)) then /*其它的处理器*/ (4.1)if (my-rank mod 2=0) then /*偶数号处理器*/ (i)将前数据块发送给编号为my_rank+1的处理器 (ii)将后数据块发送给编号为my_rank-1的处理器 (ii)接收编号为my_rank-1的处理器发送的数据块作为自己的前数据块 (iv)接收编号为my_rank+1的处理器发送的数据块作为自己的后数据块 else /*奇数号处理器*/ (v)for i=0 to m-1 do /* 将前后数据块保存在缓冲区buffer中*/ for j=0 to n-1 do buffer[i,j]=a[i,j] end for end for (vi)for i=0 to m-1 do buf[i] =b[i] end for (vii)接收编号为my_rank-1的处理器发送的数据块作为自己的前数据块 (viii)接收编号为my_rank+1的处理器发送的数据块作为自己的后数据块 (ix)将存于buffer中的前数据块发送给编号为my_rank+1的处理器 (x)将存于buffer中的后数据块发送给编号为my_rank-1的处理器 end if end if End 各处理器并行地对其局部存储器中的非主对角元素aij进行消去,首先计算旋转参数并对第i行和第j行两行元素进行旋转行变换。然后通过扩展收集操作将相应的旋转参数及第i列和第j列的列号按处理器编号连接起来并广播给所有处理器。各处理器在收到这些旋转参数和列号之后,按0,1,…,p-1的顺序依次读取旋转参数及列号并对其m行中的第i列和第j列元素进行旋转列变换。 经过一轮计算的2p-1次数据交换之后,原矩阵A的所有非主对角元素都被消去一次。此时,各处理器求其局部存储器中的非主对角元素的最大元localmax,然后通过归约操作的求最大值运算求得将整个n阶矩阵非主对角元素的最大元max,并广播给所有处理器以决定是否进行下一轮迭代。具体算法框架描述如下: 算法21.5 雅可比法求对称矩阵特征值的并行算法 输入:矩阵An×n,ε 输出:矩阵A的主对角元素即为A的特征值 Begin 对所有处理器my_rank(my_rank=0,…, p-1)同时执行如下的算法: (a)for i=0 to m-1 do b[i] =myrank*m+i /* b记录处理器中的行块数据在原矩阵A中的实际行号*/ end for (b)while (│max│>ε) do /* max 为A中所有非对角元最大的绝对值*/ (1)for i=my_rank*m to (my_rank+1)*m-2 do /*对本处理器内部所有行两两配对进行旋转变换*/ for j=i+1 to (my_rank+1)*m-1 do (1.1)r=i mod m , t=j mod m /*i, j 为进行旋转变换行(称为主行)的实际行号, r, t为它们在块内的相对行号*/ (1.2)if (a[r,j] ≠ 0) then /*对四个主元素的旋转变换*/ (i)Compute: f=-a[r,j], g=( a[t,j]- a[r,i])/2, h=sgn(g)*f/sqrt(f*f+g*g) , sin2=h , sin1=h/sqrt(2*(1+sqrt(1-h*h))) , cos1=sqrt(1-sin1*sin1), bpp= a[r,i]*cos1*cos1+a[t,j]*sin1*sin1+a[r,j]*sin2, bqq= a[r,i]* sin1*sin1+a[t,j]* cos1*cos1-a[r,j]*sin2, bpq=0 , bqp=0 (ii)for v=0 to n-1 do /*对两个主行其余元素的旋转变换*/ if ((v ≠ i) and ( v ≠ j)) then br[v] = a[r,v]*cos1 + a[t,v]*sin1 a[t,v] = -a[r,v]* sin1 + a[t,v]* cos1 end if end for (iii)for v=0 to n-1 do if ((v ≠ i) and ( v ≠ j)) then a[r,v]=br[v] end if end for (iv)for v=0 to m-1 do /*对两个主列在本处理器内的其余元素的旋转变换*/ if (( v ≠ r) and ( v ≠ t)) then bi[v] = a[v,i]*cos1 + a[v,j]*sin1 a[v,j]= - a[v,i]* sin1 + a[v,j]* cos1 end if end for (v)for v=0 to m-1do if ((v ≠ r) and ( v ≠ t)) then a[v,i]= bi[v] end if end for (vi)Compute: a[r,i]=bpp , a[t,j]=bqq , a[r,j]=bpq , a[t,i]=bqp, /*用temp1保存本处理器主行的行号和旋转参数*/ temp1[0]=sin1, temp1[1]=cos1, temp1[2]=(float)i ,temp1[3]= (float)j else (vii)Compute: temp1[0]=0, temp1[1]= 0, temp1[2]= 0 , temp1[3]= 0 end if (1.3)将所有处理器temp1中的旋转参数及主行的行号 按处理器编号连接起来并广播给所有处理器,存于temp2中 (1.4)current=0 (1.5) for v=1 to p do /*根据temp2中的其它处理器的旋转参数及主行的行号对相关的列在本处理器的部分进行旋转变换*/ (i)Compute: s1=temp2[(v-1)*4+0] , c1=temp2[(v-1)*4+1], i1=(int)temp2[(v-1)*4+2], j1=(int)temp2[(v-1)*4+3] (ii )if (s1、c1、 i1、 j1中有一不为0) then if (my-rank ≠ current) then for z=0 to m-1 do zi[z]=a[z,i1]*c1 + a[z,j1]*s1 a[ z,j1]=- a[z,i1]*s1 + a[z,j1]*c1 end for for z=0 to m-1 do a[z,i1]= zi[z] end for end if end if (iii)current=current+1 end for end for end for (2)for counter=1 to 2p-2 do /*进行2p-2次处理器间的数据交换, 并对交换后处理器中所有行两两配对进行旋转变换*/ (2.1)Data_exchange( ) /*处理器间的数据交换*/ (2.2)for i=0 to m/2-1 do for j=m/2 to m-1 do (i) if (a[i,b[j]] ≠ 0) then /*对四个主元素的旋转变换*/ ①Compute: f= -a[i,b[j]],g=(a[j,b[j]]- a[i,b[i]])/2, h=sgn(g)*f/sqrt(f*f+g*g), sin2=h, sin1=h/sqrt(2*(1+sqrt(1-h*h))), cos1=sqrt(1-sin1*sin1), bpp= a[i,b[i]]*cos1*cos1+ a[j,b[j]]*sin1*sin1+a[i,b[j]]*sin2, bqq= a[i,b[i]]* sin1*sin1+a[j,b[j]]* cos1*cos1-a[i,b[j]]*sin2, bpq=0, bqp=0 ②for v=0 to n-1 do /*对两个主行其余元素的旋转变换*/ if ((v ≠ b[i]) and ( v ≠ b[j])) then br[v] = a[i,v]*cos1 + a[j,v]*sin1 a[j,v] = -a[i,v]* sin1 + a[j,v]* cos1 end if end for ③for v=0 to n-1 do if ((v ≠ b[i]) and ( v ≠ b[j])) then a[i,v]=br[v] end if end for ④for v=0 to m-1 do /*对本处理器内两个主列的其余元素旋转变换*/ if ((v ≠ i) and ( v ≠ j)) then bi[v] = a[v, b[i]]*cos1 + a[v, b[j]]*sin1 a[v, b[j]] = - a[v, b[i]]* sin1 + a[v, b[j]]* cos1 end if end for ⑤for v=0 to m-1 do if ((v ≠ i) and ( v ≠ j)) then a[v, b[i]]=bi[v] end if end for ⑥Compute: a[i, b[i]]=bpp , a[j, b[j]]=bqq , a[i, b[j]]=bpq , a[j, b[i]]=bqp /*用temp1保存本处理器主行的行号和旋转参数*/ temp1[0]=sin1 , temp1[1]=cos1, temp1[2]=(float)b[i] , temp1[3]= (float)b[j] else ⑦Compute: temp1[0]=0,temp1[1]= 0, temp1[2]= 0,temp1[3]= 0 end if (ii)将所有处理器temp1中的旋转参数及主行的行号按处理 器编号连接起来并广播给所有处理器,存于temp2中 (iii)current=0 (iv)for v=1 to p do /*根据temp2中的其它处理器的旋转参数及主行的行号对相关的列在本处理器的部分进行旋转变换*/ ①Compute: s1=temp2[(v-1)*4+0], c1=temp2[(v-1)*4+1], i1=(int)temp2[(v-1)*4+2], j1=(int)temp2[(v-1)*4+3] ②if (s1、c1、 i1、 j1中有一不为0) then if (my-rank ≠ current) then for z=0 to m-1 do zi[z]=a[z,i1]*c1 + a[z,j1]*s1 a[ z,j1]=- a[z,i1]s1 + a[z,j1]*c1 end for for z=0 to m-1 do a[z,i1]= zi[z] end for end if end if ③current=current+1 end for end for end for end for (3)Data-exchange( ) /*进行一轮中的最后一次处理器间的数据交换,使数据回到原来的位置*/ (4)localmax=max /*localmax为本处理器中非对角元最大的绝对值*/ (5)for i=0 to m-1 do for j=0 to n-1 do if ((m*my-rank+i) ≠ j) then if (│a[i,j]│> localmax) then localmax=│a[i,j]│ end if end if end for end for (6)通过Allreduce操作求出所有处理器中localmax的最大值为新的max值 end while End 在上述算法中, 每个处理器在一轮迭代中要处理2 p个行块对, 由于每一个行块对含有m/2行, 因而对每一个行块对的处理要有(m/2)2=m2/4 个行的配对, 即消去m2/4 个非主对角元素. 每消去一个非主对角元素要对同行n 个元素和同列n 个元素进行旋转变换. 由于按行划分数据块, 对同行n 个元素进行旋转变换的过程是在本处理器中进行的. 而对同列n 个元素进行旋转变换的过程则分布在所有处理器中进行. 但由于所有处理器同时在为其它处理器的元素消去过程进行列的旋转变换,对每个处理器而言,对列元素进行旋转变换的处理总量仍然是n 个元素。 因此,一个处理器每消去一个非主对角元素共要对2n 个元素进行旋转变换。而对一个元素进行旋转变换需要2次乘法和1次加法,若取一次乘法运算时间或一次加法运算时间为一个单位时间,则其需要3个单位运算时间,即一轮迭代的计算时间为T1=3×2 p ×2n ×m2/4=3n3/p;在每轮迭代中,各个处理器之间以点对点的通信方式相互错开交换数据2p-1次,通信量为mn+m,扩展收集操作n(n-1)/(2p)次,通信量为4,另外有归约操作1次,通信量为1,从而不难得出上述算法求解过程中的总通信时间为: 因此雅可比算法求对矩阵特征值的一轮并行计算时间为。我们的大量实验结果说明,对于n阶方阵,用上述算法进行并行计算,一般需要不超过O(logn)轮就可以收敛。 MPI源程序请参见章末附录。 求对称矩阵特征值的单侧旋转法 单侧旋转法的算法描述 求解对称方阵特征值及特征向量的雅可比算法在每次消去计算前都要对非主对角元素选择最大元,导致非实际计算开销很大。在消去计算时,必须对方阵同时进行行、列旋转变换,这称之为双侧旋转(Two-side Rotation)变换。在双侧旋转变换中,方阵的行、列方向都有数据相关关系,使得整个计算中的相关关系复杂,特别是在对高阶矩阵进行特征值求解时,增加了处理器间的通信开销。针对传统雅可比算法的缺点,可以将双侧旋转改为单侧旋转,得出一种求对称矩阵特征值的快速算法。这一算法对矩阵仅实施列变换,使得数据相关关系仅在同列之间,因此方便数据划分,适合并行计算,称为单侧旋转法(One-side Rotation)。若A为一对称方阵, λ是A的特征值,x是A的特征向量,则有:Ax=λx。又A=AT,所以ATAx=Aλx=λλx,这说明了 λ2是ATA的特征值,x是ATA的特征向量 ,即ATA的特征值是A的特征值的平方,并且它们的特征向量相同。 我们使用18.7.1节中所介绍的Givens旋转变换对A进行一系列的列变换,得到方阵Q使其各列两两正交,即AV=Q,这里V为表示正交化过程的变换方阵。 因QTQ=(AV) TAV=V TATAV = diag(δ1,δ2, …,δn) 为n阶对角方阵,可见这里δ1,δ2, …,δn 是矩阵ATA的特征值,V的各列是ATA的特征向量。由此可得: (i=1,2, …,n)。设Q的第i列为qi, 则 qiTqi=δi , ||qi||2= 。因此在将A进行列变换得到各列两两正交的方阵Q后,其各列的谱范数||qi||2即为A的特征值的绝对值。记V的第i列为vi, AV=Q,所以Avi = qi。又因为vi 为A的特征向量,所以Avi =λivi,即推得λivi= qi。因此λi的符号可由向量 qi及vi的对应分量是否同号判别,实际上在具体算法中我们只要判断它们的第一个分量是否同号即可。若相同,则λi 取正值,否则取负值。 求对称矩阵特征值的单侧旋转法的串行算法如下: 算法21.6 求对称矩阵特征值的单侧旋转法 输入:对称矩阵A,精度e 输出:矩阵A的特征值存于向量B中 Begin (1)while (p>ε) p=0 for i=1 to n do for j=i+1 to n do (1.1 )sum[0]=0, sum[1]=0, sum[2]=0 (1.2 )for k=1 to n do sum[0]= sum[0]+a[k,i]*a[k, j] sum[1]= sum[1]+ a[k,i]* a[k,i] sum[2]= sum[2]+ a[k, j]* a[k, j] end for (1.3 )if (│sum[0]│>ε) then (i) aa=2*sum[0] bb=sum[1]-sum[2] rr=sqrt(aa*aa+bb*bb) (ii) if (bb≥0) then c=sqrt((bb+rr)/(2*rr)) s=aa/(2*rr*c) else s=sqrt((rr-bb)/(2*rr)) c=aa/(2*rr*s) end if (iii) for k=1 to n do temp[k]=c*a[k,i]+s*a[k,j] a[k,j]=[-s]*a[k,i]+c*a[k,j] end for (iv) for k=1 to n do temp1[k]= c*e[k,i]+s*e[k,j] e[k,j]=[-s]*e[k,i]+c*e[k,j] end for (v) for k=1 to n do a[k,i]= temp[k] end for (vi) for k=1 to n do e[k,i]= temp1[k] end for (vii) if (│sum[0]│>p) then p=│sum[0] │ end if end if end for end for end while (2)for i=1 to n do su=0 for j=1 to n do su=su+a[j,i]* a[j,i] end for B[j]=sqrt[su] if (e[0,j]*a[0,j]<0) then B[j]= - B[j] endif end for End 上述算法的一轮迭代需进行n(n-1)/2次旋转变换,若取一次乘法或一次加法运算时间为一个单位时间,则一次旋转变换要做3次内积运算而消耗6n个单位时间;与此同时,两列元素进行正交计算还需要12n个单位时间,所以奇异值分解的一轮计算时间复杂度为n(n-1)/2*(12 n +6n)= 9n(n-1) n=O(n3)。 求对称矩阵特征值的单侧旋转法的并行计算 在求对称矩阵特征值的单侧旋转法的计算中, 主要的计算是矩阵的各列正交化过程.为了进行并行计算, 我们对n阶对称矩阵A按行划分为p块(p为处理器数),每块含有连续的q行向量,这里q=n/p,第i块包含A的第i×q, …,(i+1)×q-1行向量,其数据元素被分配到第i号处理器上(i=0,1,…,p-1)。E矩阵取阶数为n的单位矩阵I,按同样方式进行数据划分,每块含有连续的q行向量。对矩阵A的每一个列向量而言,它被分成p个长度为q的子向量,分布于p个处理器之中,它们协同地对各列向量做正交计算。在对第i列与第j列进行正交计算时,各个处理器首先求其局部存储器中的q维子向量[i,j]、[i,i]、[j,j]的内积,然后通过归约操作的求和运算求得整个n维向量对[i,j]、[i,i]、[j,j]的内积并广播给所有处理器,最后各处理器利用这些内积对其局部存储器中的第i列及第j列q维子向量的元素做正交计算。算法21.7按这样的方式对所有列对正交化一次以完成一轮运算,重复进行若干轮运算,直到迭代收敛为止。在各列正交后, 编号为0的处理器收集各处理器中的计算结果,由0号处理器计算矩阵的特征值. 具体算法框架描述如下: 算法21.7 求对称矩阵特征值的并行单侧旋转算法 输入:对称矩阵A,精度e 输出:对称矩阵A的特征值存于向量B中 Begin 对所有处理器my_rank(my_rank=0,…, p-1)同时执行如下的算法: (a) while (not convergence) do (1)k=0 (2)for i=0 to n-1 do (2.1)for j=i+1 to n-1 do (i)sum[0]=0, sum[1]=0, sum[2]=0 (ii)计算本处理器所存的列子向量a[*, i]、a[*, j]的内积赋值给 ss[0] (iii)计算本处理器所存的列子向量a[*, i]、a[*, i]的内积赋值给 ss[1] (iv)计算本处理器所存的列子向量a[*, j]、a[*, j]的内积赋值给 ss[2] (v)通过规约操作实现: ①计算所有处理器中ss[0]的和赋给sum [0] ②计算所有处理器中ss[1]的和赋给 sum [1] ③计算所有处理器中ss[2]的和赋给sum[2] ④将sum [0]、sum [1]、sum [2] 的值广播到所有处理器中 (vi)if (│sum(0)│>e ) then /*各个处理器并行进行对i和j列正交化*/ ①aa=2*sum[0] ②bb=sum[1]-sum[2] ③rr=sqrt(aa*aa+bb*bb) ④if (bb>=0) then c=sqrt((bb+rr)/(2*rr)) s=aa/(2*rr*c) else s=sqrt((rr-bb)/(2*rr)) c=aa/(2*rr*s) end if ⑤for v=0 to q-1 do temp[v]=c*a[v, i]+s*a[v, j] a[v, j]=(-s)*a[v, i]+c*a[v, j] end for ⑥for v=0 to z-1 do temp1[v]=c*e[v, i]+s*e[v, j] e[v,j]=(-s)*e[v, i]+c*e[v, j] end for ⑦for v=0 to q-1 do a[v,i]=temp[v] end for ⑧for v=0 to z-1 do e[v,i]=temp1[v] end for ⑨k= k+1 end if end for end for end while 0号处理器从其余各处理器中接收子矩阵a和e,得到经过变换的矩阵A和I: /*0号处理器负责计算特征值*/ (b) for i=1 to n do (1)su=0 (2)for j=1 to n do su=su+A[j,i]* A[j,i] end for (3)B[j]=sqrt(su) (4)if (I[0,j]*a[0,j]<0) then B[j]= - B[j] endif end for End 上述算法的一轮迭代要进行n(n-1)/2次旋转变换, 而一次旋转变换需要做3次内积运算,若取一次乘法或一次加法运算时间为一个单位时间,则需要6q个单位时间,另外,还要对四列子向量中元素进行正交计算花费12q个单位时间,所以一轮迭代需要的计算时间T1=n(n-1)6q;内积计算需要通信,一轮迭代共做归约操作n(n-1)/2次,每次通信量为3,因而通信时间T2=。由此得出一轮迭代的并行计算时间Tp=T1+T2。 MPI源程序请参见所附光盘。 求一般矩阵全部特征值的QR方法 QR方法求一般矩阵全部特征值的串行算法 在18.6节中,我们介绍了对一个n阶方阵A进行QR分解的并行算法,它可将A分解成A=QR的形式,其中R是上三角矩阵,Q是正交矩阵,即满足QTQ=I。利用QR方法也可求方阵特征值,它的基本思想是: 首先令A1= A,并对A1进行QR分解,即: A1= Q1 R1;然后对A1作如下相似变换:A2 = Q1 -1A1 Q1= Q1 -1 Q1 R1 Q1= R1Q1,即将R1Q1相乘得到A2;再对A2进行QR分解得A2 = Q2 R2,然后将R2Q2相乘得A3。反复进行这一过程,即得到矩阵序列:A1, A2, …, Am, Am+1 ,…,它们满足如下递推关系:Ai= Qi Ri ;Ai+1= Ri Qi (i=1,2, …, m,…)其中Qi均为正交阵,Ri均为上三角方阵。这样得到的矩阵序列或者将收敛于一个以A的特征值为对角线元素的上三角矩阵,形如:  或者将收敛于一个特征值容易计算的块上三角矩阵。 算法21.8 单处理器上QR方法求一般矩阵全部特征值的算法 输入:矩阵An×n,单位矩阵Q,ε 输出:矩阵特征值Eigenvalue Begin (1) while ((p>ε) and (count<1000)) do (1.1)p=0 , count = count +1 (1.2)QR_DECOMPOSITION(A,Q,R) /*算法18.11对矩阵A进行QR分解*/ (1.3)MATRIX_TRANSPOSITION(Q) /*由于算法18.11 对A进行QR分解只得到Q-1,因而要使用算法18.1对矩阵Q-1进行转置,得到(Q-1)T=( Q-1)-1=Q*/ (1.4)A=MATRIX_PRODUCT(R,Q) /*算法18.5将矩阵RQ相乘,得到新的A 矩阵 */ (1.5)for i=1 to n do for j=1 to i-1do if (│a[i,j]│> p) then p=│a[i,j]│ end if end for end for end while (2) for i=1 to n do Eigenvalue[i]=a[i,i] end for End 显然,QR分解求矩阵特征值算法的一轮时间复杂度为QR分解、矩阵转置和矩阵相乘算法的时间复杂度之和。 QR方法求一般矩阵全部特征值的并行算法 并行QR分解求矩阵特征值的思想就是反复运用并行QR分解和并行矩阵相乘算法进行迭代,直到矩阵序列收敛于一个上三角矩阵或块上三角矩阵为止。具体的并行算法描述如下: 算法21.9 QR方法求一般矩阵全部特征值的并行算法 输入:矩阵An×n,单位矩阵Q,ε 输出:矩阵特征值Eigenvalue Begin 对所有处理器my_rank(my_rank=0,…, p-1)同时执行如下的算法: (a) while ((p>ε) and (count<1000)) do (1)p=0 , count = count +1 /* 对矩阵A进行QR分解*/ (2)if(my_rank=0) then/*0号处理器*/ (2.1)for j=0 to m-2 do (i)for i=j+1 to m-1 do Turnningtransform( ) /*旋转变换*/ end for (ii)将旋转变换后的A和Q的第j 行传送到第1号处理器 end for (2.2)将旋转变换后的A和Q的第m-1行传送到第1号处理器 end if (3)if ((my_rank>0) and (my_rank<(p-1))) then /*中间处理器*/ (3.1)for j=0 to my_rank*m-1 do (i)接收左邻处理器传送来的A和Q子块中的第j 行 (ii)for i=0 to m-1 do Turnningtransform( ) /*旋转变换*/ end for (iii)将旋转变换后的A和Q子块中的第j 行传送到右邻处理器 end for (3.2)for j=0 to m-2 do (i)z=my_rank*m (ii)for i=j+1 to m-1 do Turnningtransform( ) /*旋转变换*/ end for (iii)将旋转变换后的A和Q子块中的第j 行传送到右邻处理器 end for (3.3)将旋转变换后的A和Q子块中的第m-1行传送到右邻处理器 end if (4)if (my_rank= (p-1)) then /*p-1号处理器*/ (4.1)for j=0 to my_rank*m-1 do (i)接收左邻处理器传送来的A和Q子块中的第j 行 (ii)for i=0 to m-1 do Turnningtransform( ) /*旋转变换*/ end for end for (4.2)for j=0 to m-2 do for i=j+1 to m-1 do f Turnningtransform( ) /*旋转变换*/ end for end for end if (5)p-1号处理器对矩阵Q进行转置 (6)p-1号处理器分别将矩阵R和矩阵Q划分为大小为m*M和M*m的p 块子阵,依次发送给0至p-2号处理器 (7)使用算法18.6对矩阵RQ进行并行相乘得到新的A矩阵 (8)for i=1 to n do /* 求出A中元素的最大绝对值赋予p*/ for j=1 to i-1do if (│a[i,j]│> p) then p=│a[i,j]│ end if end for end for end while (b)for i=1 to n do Eigenvalue[i] =a[i,i] end for End 在上述算法的一轮迭代中,实际上是使用算法18.12、18.1、18.6并行地进行矩阵的QR分解﹑矩阵的转置﹑矩阵的相乘各一次,因此, 一轮迭代的计算时间为上述算法的时间复杂度之和。 MPI源程序请参见所附光盘。 小结 矩阵的特征值与特征向量在工程技术上应用十分广泛,在诸如系统稳定性问题和数字信号处理的K-L变换中,它都是最基本、常用的算法之一。本章主要讨论了矩阵特征值的几种基本算法,关于该方面的其它讲解与分析读者可参考文献[1]、[2]。 参考文献 陈国良 编著.并行算法的设计与分析(修订版).高等教育出版社,2002.11 陈国良,陈崚 编著.VLSI计算理论与并行算法.中国科学技术大学出版社,1991 附录 求对称矩阵特征值的雅可比并行算法MPI源程序 源程序cjacobi.c #include "stdio.h" #include "stdlib.h" #include "mpi.h" #include "math.h" #include "string.h" #define E 0.00001 #define min -1 #define intsize sizeof(int) #define charsize sizeof(char) #define floatsize sizeof(float) /*A是一个N*N的对称矩阵*/ #define A(x,y) A[x*N+y] /*I是一个N*N的单位矩阵*/ #define I(x,y) I[x*N+y] #define a(x,y) a[x*N+y] #define e(x,y) e[x*N+y] #define b(x) b[x] #define buffer(x,y) buffer[x*N+y] #define buffee(x,y) buffee[x*N+y] int M,N; int *b; int m,p; int myid,group_size; float *A,*I; float *a,*e; float max; float sum; MPI_Status status; float sgn(float v) { float f; if (v>=0) f=1; else f=-1; return f; } void read_fileA() { int i,j; FILE *fdA; fdA=fopen("dataIn.txt","r"); fscanf(fdA,"%d %d", &M, &N); if(M != N) { puts("The input is error!"); exit(0); } m=N/p; if (N%p!=0) m++; A=(float*)malloc(floatsize*N*m*p); I=(float*)malloc(floatsize*N*m*p); for(i = 0; i < M; i ++) for(j = 0; j < M; j ++) fscanf(fdA, "%f", A+i*M+j); fclose(fdA); printf("Input of file \"dataIn.txt\"\n"); printf("%d\t %d\n",M, N); for(i=0;i<M;i++) { for(j=0;j<N;j++) printf("%f\t",A(i,j)); printf("\n"); } for(i=0;i<N;i++) { for(j=0;j<N;j++) if (i==j) I(i,j)=1; else I(i,j)=0; } } void send_a() { int i; for(i=1;i<p;i++) { MPI_Send(&(A(m*i,0)),m*N, MPI _FLOAT,i,i,MPI_COMM_WORLD); MPI_Send(&(I(m*i,0)),m*N,MPI_ FLOAT, i,i,MPI_COMM_WORLD); } free(A); free(I); } void get_a() { int i,j; if (myid==0) { for(i=0;i<m;i++) for(j=0;j<N;j++) { a(i,j)=A(i,j); e(i,j)=I(i,j); } } else { MPI_Recv(a,m*N,MPI_FLOAT,0,myid, MPI_COMM_WORLD,&status); MPI_Recv(e,m*N,MPI_FLOAT,0,myid, MPI_COMM_WORLD,&status); } } int main(int argc,char **argv) { float *c; int k; int loop; int i,j,v,z,r,t,y; int i1,j1; float f,g,h; float sin1,sin2,cos1; float s1,c1; float *br,*bt,*bi,*bj,*zi,*zj; float *temp1,*temp2,*buffer,*buffee; int counter,current; int *buf; int mml,mpl; float bpp,bqq,bpq,bqp; float lmax; MPI_Init(&argc,&argv); MPI_Comm_size(MPI_COMM_WORLD, &group_size); MPI_Comm_rank(MPI_COMM_WORLD, &myid); p=group_size; max=10.0; loop=0; if (myid==0) read_fileA(); /*0号处理器将N广播给所有处理器*/ MPI_Bcast(&N,1,MPI_INT,0, MPI_COMM_WORLD); m=N/p; if (N%p!=0) m++; a=(float*)malloc(floatsize*m*N); e=(float*)malloc(floatsize*m*N); b=(int*)malloc(intsize*m); br=(float*)malloc(floatsize*N); bt=(float*)malloc(floatsize*N); bi=(float*)malloc(floatsize*m); bj=(float*)malloc(floatsize*m); zi=(float*)malloc(floatsize*m); zj=(float*)malloc(floatsize*m); temp1=(float*)malloc(floatsize*4); temp2=(float*)malloc(floatsize*4*p); if ((myid==p-1)&&(myid%2!=0)) { buffer=(float*)malloc(floatsize*m/2*N); buffee=(float*)malloc(floatsize*m/2*N); buf=(int*)malloc(intsize*m/2); } if ((myid%2!=0)&&(myid!=p-1)) { buffer=(float*)malloc(floatsize*m*N); buffee=(float*)malloc(floatsize*m*N); buf=(int*)malloc(intsize*m); } /*0号处理器采用行块划分将矩阵A和I划分 为m*N的p块子矩阵,依次发送给1至p-1号处理器*/ if (myid==0) { get_a(); send_a(); } else get_a(); for(i=0;i<m;i++) b(i)=myid*m+i; while (fabs(max)>E) { loop++; /*每轮计算开始,各处理器对其局部存 储器中离主对角线最近的行块元素进行消去*/ for(i=myid*m;i<(myid+1)*m-1;i++) for(j=i+1;j<=(myid+1)*m-1;j++) { r=i%m; t=j%m; /*消去a(r,j)元素*/ if(a(r,j)!=0) { f=(-a(r,j)); g=(a(t,j)-a(r,i))/2; h=sgn(g)*f/ sqrt(f*f+g*g); sin2=h; sin1=h/sqrt(2*(1+ sqrt(1-h*h))); cos1=sqrt(1-sin1*sin1); bpp=a(r,i)*cos1*cos1 +a(t,j)*sin1*sin1 +a(r,j)*sin2; bqq=a(r,i)*sin1*sin1 +a(t,j)*cos1*cos1 -a(r,j)*sin2; bpq=0; bqp=0; /*对子块a的第t,r两 行元素进行行变 换*/ for(v=0;v<N;v++) if ((v!=i)&&(v!=j)) { br[v]=a(r,v)*cos1 +a(t,v)*sin1; a(t,v)=-a(r,v) *sin1 +a(t,v)*cos1; } for(v=0;v<N;v++) if ((v!=i)&&(v!=j)) a(r,v)=br[v]; /*对子块e的第i, j两列 元素进行列变换*/ for(v=0;v<m;v++) br[v]=e(v,i)*cos1 +e(v,j)*sin1; for(v=0;v<m;v++) e(v,j)=e(v,i)*(-sin1) +e(v,j)*cos1; for(v=0;v<m;v++) e(v,i)=br[v]; /*对子块a的第i,j两 列元素进行列变 换*/ for(v=0;v<m;v++) if ((v!=r)&&(v!=t)) { bi[v]=a(v,i)*cos1 +a(v,j)*sin1; a(v,j)=-a(v,i) *sin1 +a(v,j)*cos1; } for(v=0;v<m;v++) if ((v!=r)&&(v!=t)) a(v,i)=bi[v]; a(r,i)=bpp; a(t,j)=bqq; a(r,j)=bpq; a(t,i)=bqp; temp1[0]=sin1; temp1[1]=cos1; temp1[2]=(float)i; temp1[3]=(float)j; } else { temp1[0]=0.0; temp1[1]=0.0; temp1[2]=0.0; temp1[3]=0.0; } /*各处理器通过扩展收集操 作将自己的旋转参数及第i列和第j列的列号按处理器编号连接起来并广播给所有处理器*/ MPI_Allgather(temp1,4, MPI_FLOAT,temp2,4, MPI_FLOAT, MPI_COMM_WORLD); current=0; /*各处理器在收到这些旋转 参数和列号之后,按0,1,…,p-1的顺序依次读取旋转参数及列号并对其m行中的第i列和第j列元素进行旋转列变换*/ for(v=1;v<=p;v++) { s1=temp2[(v-1)*4+0]; c1=temp2[(v-1)*4+1]; i1=temp2[(v-1)*4+2]; j1=temp2[(v-1)*4+3]; if ((s1!=0.0)||(c1!=0.0) ||(i1!=0)||(j1!=0)) { if (myid!=current) { for(z=0;z<m;z++) { zi[z]=a(z,i1) *c1+a(z,j1) *s1; a(z,j1)= s1* (-a(z,i1)) +a(z,j1) *c1; } for(z=0;z<m;z++) a(z,i1)=zi[z]; for(z=0;z<m;z++) zi[z]=e(z,i1) *c1+e(z,j1) *s1; for(z=0;z<m;z++) e(z,j1)= c1* e(z,j1)-e(z,i1)*s1; for(z=0;z<m;z++) e(z,i1)=zi[z]; } } current=current+1; } /* for v */ } /* for i,j */ /*前2p-2次数据交换*/ for(counter=1;counter<=2*p-2;counter++) { if (myid==0) { MPI_Send(&(a(m/2,0)), m/2*N, MPI_FLOAT,myid+1, myid+1, MPI_COMM_WORLD); MPI_Send(&(e(m/2,0)),m/2*N, MPI_FLOAT,myid+1,myid+1, MPI_COMM_WORLD); MPI_Send(&b(m/2),m/2,MPI_INT, myid+1,myid+1, MPI_COMM_WORLD); MPI_Recv(&(a(m/2,0)),m/2*N, MPI_FLOAT,myid+1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(m/2,0)),m/2*N, MPI_FLOAT,myid+1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(m/2),m/2, MPI_ INT,myid+1,myid, MPI_COMM_WORLD, &status); } if ((myid==p-1)&&(myid%2!=0)) { for(i=m/2;i<m;i++) for(j=0;j<N;j++) buffer((i-m/2),j)=a(i,j); for(i=m/2;i<m;i++) for(j=0;j<N;j++) buffee((i-m/2),j)=e(i,j); for(i=m/2;i<m;i++) buf[i-m/2]=b(i); for(i=0;i<m/2;i++) for(j=0;j<N;j++) a((i+m/2),j)=a(i,j); for(i=0;i<m/2;i++) for(j=0;j<N;j++) e((i+m/2),j)=e(i,j); for(i=0;i<m/2;i++) b(m/2+i)=b(i); MPI_Recv(&(a(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(0,0)),m/2*N, MPI_FLOAT,myid-1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(0),m/2, MPI_INT, myid-1,myid, MPI_COMM_WORLD, &status); MPI_Send(buffer,m/2*N,) MPI_FLOAT,myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(buffee,m/2*N, MPI_FLOAT,myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(buf,m/2,MPI_INT, myid-1, myid-1, MPI_COMM_WORLD); } if ((myid==p-1)&&(myid%2==0)) { MPI_Send(&(a(m/2,0)),m/2*N, MPI_FLOAT,myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(&(e(m/2,0)),m/2*N, MPI_FLOAT,myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(&b(m/2),m/2, MPI_INT,myid-1,myid-1, MPI_COMM_WORLD); for(i=0;i<m/2;i++) for(j=0;j<N;j++) a((i+m/2),j)=a(i,j); for(i=0;i<m/2;i++) for(j=0;j<N;j++) e((i+m/2),j)=e(i,j); for(i=0;i<m/2;i++) b(i+m/2)=b(i); MPI_Recv(&(a(0,0)),m/2*N, MPI_FLOAT,myid-1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(0,0)),m/2*N, MPI_FLOAT,myid-1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(0),m/2, MPI_INT,myid-1,myid, MPI_COMM_WORLD, &status); } if ((myid!=0)&&(myid!=p-1)) { if(myid%2==0) { MPI_Send(&(a(0,0)),m/2*N, MPI_FLOAT,myid+1, myid+1, MPI_COMM_WORLD); MPI_Send(&(e(0,0)),m/2*N, MPI_FLOAT,myid+1, myid+1, MPI_COMM_WORLD); MPI_Send(&b(0),m/2, MPI_INT,myid+1, myid+1, MPI_COMM_WORLD); MPI_Send(&(a(m/2,0)),m/2* N, MPI_FLOAT,myid-1, myid-1, MPI_COMM_WORLD); MPI_Send(&(e(m/2,0)),m/2* N, MPI_FLOAT,myid-1, myid-1, MPI_COMM_ ORLD); MPI_Send(&b(m/2),m/2, MPI_INT,myid-1, myid-1, MPI_COMM_WORLD); MPI_Recv(&(a(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(0),m/2, MPI_INT,myid-1,myid,MPI_COMM_WORLD,&status); MPI_Recv(&(a(m/2,0)),m/2* N, MPI_FLOAT,myid+1, myid, MPI_COMM_ WORLD,&status); MPI_Recv(&(e(m/2,0)),m/2* N, MPI_FLOAT,myid+1 ,myid, MPI_COMM_ WORLD, &status); MPI_Recv(&b(m/2),m/2, MPI_INT,myid+1,myid, MPI_COMM_WORLD, &status); } if(myid%2!=0) { for(i=0;i<m;i++) for(j=0;j<N;j++) buffer(i,j)=a(i,j); for(i=0;i<m;i++) for(j=0;j<N;j++) buffee(i,j)=e(i,j); for(i=0;i<m;i++) buf[i]=b(i); MPI_Recv(&(a(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD,&status); MPI_Recv(&b(0),m/2, MPI_INT,myid-1,myid, MPI_COMM_WORLD,&status); MPI_Recv(&(a(m/2,0)),m/2* N, MPI_FLOAT,myid+1, myid, MPI_COMM_WORLD,&status); MPI_Recv(&(e(m/2,0)),m/2* N, MPI_FLOAT,myid+1, myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(m/2),m/2, MPI_INT,myid+1,myid, MPI_COMM_WORLD, &status); MPI_Send(&(buffer(0,0)),m/2 *N,MPI_FLOAT, myid+1,myid+1, MPI_COMM_WORLD); MPI_Send(&(buffee(0,0)),m/2 *N,MPI_FLOAT,myid+1,myid+1, MPI_COMM_WORLD); MPI_Send(&buf[0],m/2,MPI_ INT,myid+1,myid+1, MPI_COMM_WORLD); MPI_Send(&(buffer(m/2,0)),m /2*N,MPI_FLOAT, myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(&(buffee(m/2,0)), m/2*N,MPI_FLOAT, myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(&buf[m/2],m/2, MPI_INT,myid-1, myid-1, MPI_COMM_WORLD); } } /*每经过一次交换,消去相应 的非主对角元素*/ for(i=0;i<m/2;i++) for(j=m/2;j<m;j++) { if (a(i,b(j))!=0) { f=-a(i,b(j)); g=(a(j,b(j))-a(i,b(i)))/2; h=sgn(g)*f/ sqrt(f*f+g*g); sin2=h; sin1=h/sqrt(2*(1+ sqrt(1-h*h))); cos1=sqrt(1-sin1*sin1); bpp=a(i,b(i))*cos1*cos1 +a(j,b(j))*sin1* sin1+a(i,b(j)) *sin2; bqq=a(i,b(i))*sin1*sin1 +a(j,b(j))*cos1* cos1-a(i,b(j)) *sin2; bpq=0; bqp=0; for(v=0;v<N;v++) if ((v!=b(i))&&(v!=b(j))) { br[v]=a(i,v)*cos1 +a(j,v)*sin1; a(j,v)=-a(i,v)*sin1 +a(j,v)*cos1; } for(v=0;v<N;v++) if ((v!=b(i))&&(v!=b(j))) a(i,v)=br[v]; for(v=0;v<m;v++) br[v]=e(v,b(i)) *cos1+ e(v,b(j)) *sin1; for(v=0;v<m;v++) e(v,b(j))=e(v,b(i)) *(-sin1)+ e(v,b(j)) *cos1; for(v=0;v<m;v++) e(v,b(i))=br[v]; for(v=0;v<m;v++) if ((v!=i)&&(v!=j)) { bi[v]=a(v,b(i)) *cos1+ a(v,b(j))* sin1; a(v,b(j))=-a(v,b(i)) *sin1+ a(v,b(j))* cos1; } for(v=0;v<m;v++) if ((v!=i)&&(v!=j)) a(v,b(i))=bi[v]; a(i,b(i))=bpp; a(j,b(j))=bqq; a(i,b(j))=bpq; a(j,b(i))=bqp; temp1[0]=sin1; temp1[1]=cos1; temp1[2]=(float)b(i); temp1[3]=(float)b(j); } else { temp1[0]=0.0; temp1[1]=0.0; temp1[2]=0.0; temp1[3]=0.0; } MPI_Allgather(temp1,4, MPI_FLOAT,temp2,4, MPI_FLOAT, MPI_COMM_WORLD); current=0; for(v=1;v<=p;v++) { s1=temp2[(v-1)*4+0]; c1=temp2[(v-1)*4+1]; i1=temp2[(v-1)*4+2]; j1=temp2[(v-1)*4+3]; if ((s1!=0.0)||(c1!=0.0) ||(i1!=0)||(j1!=0)) { if (myid!=current) { for(z=0;z<m;z++) { zi[z]=a(z,i1) *c1 + a(z, j1) *s1; a(z,j1)= c1* a(z,j1)- s1 * a(z,i1); } for(z=0;z<m;z++) a(z,i1)=zi[z]; for(z=0;z<m;z++) zi[z]=e(z,i1) *c1+ e(z,j1) *s1; for(z=0;z<m;z++) e(z,j1)= c1* e(z,j1)- s1* e(z,i1); for(z=0;z<m;z++) e(z,i1)=zi[z]; } /* if myid!=current */ } /* if */ current=current+1; } /* for v */ } /* for i,j */ } /* counter */ /*第2p-1次数据交换*/ if (myid==0) { MPI_Send(&(a(m/2,0)),m/2*N, MPI_FLOAT,myid+1,myid+1, MPI_COMM_WORLD); MPI_Send(&(e(m/2,0)),m/2*N, MPI_FLOAT,myid+1,myid+1,MPI_COMM_WORLD); MPI_Send(&b(m/2),m/2,MPI_INT, myid+1,myid+1, MPI_COMM_WORLD); MPI_Recv(&(a(m/2,0)),m/2*N, MPI_FLOAT,myid+1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(m/2,0)),m/2*N, MPI_FLOAT,myid+1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(m/2),m/2,MPI_INT, myid+1,myid, MPI_COMM_WORLD, &status); } if ((myid==p-1)&&(myid%2!=0)) { for(i=m/2;i<m;i++) for(j=0;j<N;j++) buffer((i-m/2),j)=a(i,j); for(i=m/2;i<m;i++) for(j=0;j<N;j++) buffee((i-m/2),j)=e(i,j); for(i=m/2;i<m;i++) buf[i-m/2]=b(i); for(i=0;i<m/2;i++) for(j=0;j<N;j++) a((i+m/2),j)=a(i,j); for(i=0;i<m/2;i++) for(j=0;j<N;j++) e((i+m/2),j)=e(i,j); for(i=0;i<m/2;i++) b(m/2+i)=b(i); MPI_Recv(&(a(0,0)),m/2*N, MPI_FLOAT,myid-1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(0,0)),m/2*N, MPI_FLOAT,myid-1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(0),m/2,MPI_INT, myid-1,myid, MPI_COMM_WORLD, &status); MPI_Send(buffer,m/2*N, MPI_FLOAT,myid-1,myid-1,MPI_COMM_WORLD); MPI_Send(buffee,m/2*N, MPI_FLOAT,myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(buf,m/2,MPI_INT,myid- 1,myid-1, MPI_COMM_WORLD); } if ((myid==p-1)&&(myid%2==0)) { MPI_Send(&(a(m/2,0)),m/2*N, MPI_FLOAT,myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(&(e(m/2,0)),m/2*N, MPI_FLOAT,myid-1,myid-1,MPI_COMM_WORLD); MPI_Send(&b(m/2),m/2,MPI_INT, myid-1,myid-1, MPI_COMM_WORLD); for(i=0;i<m/2;i++) for(j=0;j<N;j++) a((i+m/2),j)=a(i,j); for(i=0;i<m/2;i++) for(j=0;j<N;j++) e((i+m/2),j)=e(i,j); for(i=0;i<m/2;i++) b(i+m/2)=b(i); MPI_Recv(&(a(0,0)),m/2*N, MPI_FLOAT,myid-1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(0,0)),m/2*N, MPI_FLOAT,myid-1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(0),m/2,MPI_INT, myid-1,myid, MPI_COMM_WORLD, &status); } if ((myid!=0)&&(myid!=p-1)) { if(myid%2==0) { MPI_Send(&(a(0,0)),m/2*N, MPI_FLOAT,myid+1, myid+1, MPI_COMM_WORLD); MPI_Send(&(e(0,0)),m/2*N, MPI_FLOAT,myid+1, myid+1, MPI_COMM_WORLD); MPI_Send(&b(0),m/2, MPI_INT,myid+1, myid+1, MPI_COMM_WORLD); MPI_Send(&(a(m/2,0)),m/2* N,MPI_FLOAT,myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(&(e(m/2,0)),m/2* N,MPI_FLOAT,myid-1, myid-1, MPI_COMM_WORLD); MPI_Send(&b(m/2),m/2, MPI_INT,myid-1,myid-1 ,MPI_COMM_WORLD) ; MPI_Recv(&(a(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD,&status); MPI_Recv(&(e(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD,&status); MPI_Recv(&b(0),m/2, MPI_INT,myid-1,myid,MPI_COMM_WORLD,&status); MPI_Recv(&(a(m/2,0)),m/2* N,MPI_FLOAT,myid+1, myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(m/2,0)),m/2* N,MPI_FLOAT,myid+1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(m/2),m/2, MPI_INT,myid+1,myid,MPI_COMM_WORLD,&status); } if(myid%2!=0) { for(i=0;i<m;i++) for(j=0;j<N;j++) buffer(i,j)=a(i,j); for(i=0;i<m;i++) for(j=0;j<N;j++) buffee(i,j)=e(i,j); for(i=0;i<m;i++) buf[i]=b(i); MPI_Recv(&(a(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD,&status); MPI_Recv(&(e(0,0)),m/2*N, MPI_FLOAT,myid-1, myid, MPI_COMM_WORLD,&status); MPI_Recv(&b(0),m/2, MPI_INT,myid-1,myid, MPI_COMM_WORLD,&status); MPI_Recv(&(a(m/2,0)),m/2* N,MPI_FLOAT,myid+1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&(e(m/2,0)),m/2* N,MPI_FLOAT,myid+1,myid, MPI_COMM_WORLD, &status); MPI_Recv(&b(m/2),m/2, MPI_INT,myid+1,myid,MPI_COMM_WORLD,&status); MPI_Send(&(buffer(0,0)), m/2*N,MPI_FLOAT, myid+1,myid+1, MPI_COMM_WORLD); MPI_Send(&(buffee(0,0)), m/2*N,MPI_FLOAT, myid+1,myid+1, MPI_COMM_WORLD); MPI_Send(&buf[0],m/2, MPI_INT,myid+1, myid+1, MPI_COMM_WORLD); MPI_Send(&(buffer(m/2,0)), m/2*N,MPI_FLOAT, myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(&(buffee(m/2,0)), m/2*N,MPI_FLOAT, myid-1,myid-1, MPI_COMM_WORLD); MPI_Send(&buf[m/2],m/2, MPI_INT,myid-1,myid-1,MPI_COMM_WORLD); } } lmax=min; for(i=0;i<m;i++) for(j=0;j<N;j++) if ((m*myid+i)!=j) if (fabs(a(i,j))>lmax) lmax=fabs(a(i,j)); } /*通过归约操作的求最大值运算求得将 整个n阶矩阵非主对角元素的最大元max,并广播给所有处理器以决定是否进行下一轮迭代*/ MPI_Allreduce(&lmax,&max,1, MPI_FLOAT,MPI_MAX, MPI_COMM_WORLD); } /* while */ /*0号处理器收集经过旋转变换的各子矩阵a, 得到非主对角元素全为0的结果矩阵A*/ if (myid==0) { A=(float*)malloc(floatsize*N*m*p); I=(float*)malloc(floatsize*N*m*p); for(i=0;i<m;i++) for(j=0;j<N;j++) { A(i,j)=a(i,j); I(i,j)=e(i,j); } } if (myid!=0) { MPI_Send(a,m*N,MPI_FLOAT,0,myid, MPI_COMM_WORLD); MPI_Send(e,m*N,MPI_FLOAT,0,myid, MPI_COMM_WORLD); } else for(i=1;i<p;i++) { MPI_Recv(a,m*N,MPI_FLOAT,i, i, MPI_COMM_WORLD, &status); MPI_Recv(e,m*N,MPI_FLOAT,i, i,MPI_COMM_WORLD, &status); for(j=0;j<m;j++) for(k=0;k<N;k++) A((i*m+j),k)=a(j,k); for(j=0;j<m;j++) for(k=0;k<N;k++) I((i*m+j),k)=e(j,k); } /*矩阵A的主对角元素就是特征值*/ if (myid==0) { for(i=0;i<N;i++) printf("the %dst envalue:%f\n",i, A(i,i)); printf("\n"); printf("Iteration num = %d\n",loop); } MPI_Finalize(); free(a); free(b); free(c); free(br); free(bt); free(bi); free(bj); free(zi); free(zj); free(buffer); free(buf); free(buffee); free(A); free(I); free(temp1); free(temp2); return(0); } 2. 运行实例 编译:mpicc –o cjacobi cjacobi.cc 运行:可以使用命令 mpirun –np ProcessSize cjacobi来运行该程序,其中ProcessSize是所使用的处理器个数,这里取为4。本实例中使用了 mpirun –np 4 cjacobi 运行结果: Input of file "dataIn.txt" 8 8 1.000000 3.000000 4.000000 4.000000 5.000000 6.000000 7.000000 8.000000 3.000000 1.000000 2.000000 2.000000 3.000000 4.000000 5.000000 6.000000 4.000000 2.000000 1.000000 2.000000 3.000000 3.000000 2.000000 1.000000 1.000000 2.000000 3.000000 4.000000 4.000000 6.000000 7.000000 8.000000 4.000000 5.000000 5.000000 6.000000 7.000000 8.000000 1.000000 1.000000 2.000000 2.000000 2.000000 3.000000 4.000000 5.000000 7.000000 3.000000 2.000000 4.000000 5.000000 7.000000 9.000000 0.000000 2.000000 3.000000 1.000000 2.000000 4.000000 6.000000 2.000000 7.000000 8.000000 1.000000 the 0st envalue:-5.539342 the 1st envalue:9.369179 the 2st envalue:2.678424 the 3st envalue:0.378010 the 4st envalue:30.210718 the 5st envalue:-1.180327 the 6st envalue:-10.858351 the 7st envalue:-3.058303 Iteration num = 4 说明:该运行实例中,A为8×8的对称矩阵,它的元素值存放于文档“dataIn.txt”中,最后输出矩阵A的7个特征值。