マージソート

 

 マージソート(merge sort)は、小さなソート済みデータを用意しておき、そのソート済みデータをきれいな順番で並べながら少し大きなレコードにマージさせる作業を繰り返して、全体をソートする方法です。

特 徴

レコード長に注意

 このうち、メモ書きを使うことで無条件にデータ移動が発生する項目は重要です。

 ソートを紹介しているサイトの記事は、数字を並び替えるだけの例で、速いとか遅いとか、すべてを語ろうとします。

 データ転送で、レジスタとメモリ間の転送回数を考えてみます。スワップの場合は、

  1.  データ域からレジスタヘ
  2.  レジスタから新しいデータ域へ

 という2工程で、1つのデータがスワップして新しい場所に設定されます。しかし、マージソートの場合はメモ書きに一度書き込む構造になっていますから、

  1.  データ域からレジスタヘ
  2.  レジスタからメモ書き領域へ
  3.  メモ書き領域からレジスタヘ
  4.  レジスタから新しいデータ域へ

 という4工程を実行することになります。処理工程が2倍になりました。マージソートの速度を向上させたくても、限界になっている部分です。

 マージソートでは、データ転送回数が多いということは、レコード長が大きなデータの場合は、想像以上に大きな遅延が発生するという意味になります。しかし、紹介記事では、たったの4バイトの数字の並び替えで説明していますから、レコード長が大きな場合は注意が必要という情報を掲載していません。というか、気が付いていないと思われます。レコード長4バイトでは、この現象の影響が、小さくて気が付かないからです。

 記事を書くのなら、注意書きくらい書いておきなさいよ! と言いたいのが筆者の本音ですが。というか、本当にマージソートを理解していて、他人様に情報を教えてあげようと思っているのなら、危険因子は当然書くはずなんですが。本当は理解なんかしていなくて、再帰コールでダイナミックな動きをするクイックソートの記事を書きたくて、それだけでは項目数がさみしいから、ついでにマージソートの記事も書いたのではないですか?

 レコード長がびっくりするくらい大きなレコードの場合は、CPU のレジスタ経由ではなく、DMA コントローラーで メモリ - メモリ 転送する方法もあります。しかし、そのような大きなレコードはマスターデーターを直接ソートするのではなく、小さなインデックスを作成し、インデックス上でソートするのが一般的です。DMA を使うと互換性もなくなりますしね。まして、一般のアプリケーションプログラマはその方法さえわからないはずですから。

実行速度の例

 win32 環境での速度例です。レコード数 5千万件、レコード長8バイト、数値の単位はミリ秒です。

  Random32 Random32R Random15 Random15R Forward Reverse Constant Med Killer
Type A(VC) 9110.46 9110.46 8096.45 8034.05 4492.83 4602.03 4461.63 4602.03
Type B(VC) 9110.46 9063.66 8049.65 7971.65 4461.63 4492.83 4461.63 4508.43
Type C(VC) 7909.25 7956.05 6942.04 6926.44 3385.22 3385.22 3369.62 3463.22

 x64 環境での速度例です。レコード数 5千万件、レコード長 16 バイト、数値の単位はミリ秒です。

  Random32 Random32R Random15 Random15R Forward Reverse Constant Med Killer
Type A(VC) 9781.26 9906.06 9094.86 9172.86 7316.45 7332.05 7285.25 7160.45
Type B(VC) 9765.66 9812.46 9063.66 9063.66 7222.85 7191.65 7176.05 6567.64
Type C(VC) 8034.05 8127.65 7659.65 7644.05 4570.83 4602.03 4617.63 4648.83

 ソースリストを掲載しますが、コピー & ペーストだと、インデントが壊れて汚くなっています。もう少しきれいなソースリストも用意しました。速度測定プログラムでのソースリストなので、処理を中断する Abort 処理が混じっていますから、削除してしまえばちょっとだけ処理速度が速くなります。スワップ処理やコピー処理に XMM レジスターを使うことで、ソート本体のレジスタ変数を少しでも確保して高速化する狙いがあります。

MergeSort.txt stdafx.txt

Type A

DWORD __stdcall MergeSortA(SortRecStruct* pArray, size_t pRecCount, bool* pAbort)
{
SortRecStruct *Buffer;
size_t sztAllocSize, sztSpan, sztPtr, sztMid, sztBtm, sztDest, sztPtrL, sztPtrR;
int nResult = 0;

if( pRecCount >= 2 )
{
sztAllocSize = sizeof( SortRecStruct ) * pRecCount;
Buffer = ( SortRecStruct* ) VirtualAlloc( NULL, sztAllocSize, MEM_COMMIT, PAGE_READWRITE );
if( Buffer )
{
sztSpan = 1;
do
{
sztPtr = 0;
do
{
sztMid = min( sztPtr + sztSpan, pRecCount );
sztBtm = min( sztPtr + 2 * sztSpan, pRecCount );
sztDest = sztPtrL = sztPtr; sztPtrR = sztMid;
do
{
if( sztPtrL < sztMid
&& ( sztPtrR >= sztBtm || pArray[ sztPtrL ].Key <= pArray[ sztPtrR ].Key ) )
CopyRecAU( &Buffer[ sztDest ], &pArray[ sztPtrL++ ] );
else
CopyRecAU( &Buffer[ sztDest ], &pArray[ sztPtrR++ ] );
sztDest++;
}
while( sztDest < sztBtm );
sztPtr = sztSpan * 2 + sztPtr;
}
while( sztPtr < pRecCount );
memcpy( pArray, Buffer, sztAllocSize );
sztSpan *= 2;
if( *pAbort ) return 1223;
}
while( sztSpan < pRecCount );
VirtualFree( Buffer, sztAllocSize, MEM_DECOMMIT );
VirtualFree( Buffer, 0, MEM_RELEASE );
}
else
nResult = GetLastError();
}
return nResult;
}

Type B

static void __stdcall MergeProcB(SortRecStruct* pArray, SortRecStruct* pTemp, size_t pTop, size_t pMid, size_t pBtm)
{
size_t sztPtr, sztLeftPtr, sztCount, sztTempDest;

sztLeftPtr = pMid - 1; sztTempDest = pTop; sztCount = pBtm - pTop + 1;

while( ( pTop <= sztLeftPtr ) && ( pMid <= pBtm ) )
{
if( pArray[ pTop ].Key <= pArray[ pMid ].Key )
{
CopyRecAU( &pTemp[ sztTempDest ], &pArray[ pTop++ ] );
}
else
{
CopyRecAU( &pTemp[ sztTempDest ], &pArray[ pMid++ ] );
}
sztTempDest++;
}
//
while( pTop <= sztLeftPtr )
{
CopyRecAU( &pTemp[ sztTempDest++ ], &pArray[ pTop++ ] );
}
//
while( pMid <= pBtm )
{
CopyRecAU( &pTemp[ sztTempDest++ ], &pArray[ pMid++ ] );
}
//
for( sztPtr = 0; sztPtr < sztCount; pBtm--, sztPtr++ )
{
CopyRecUA( &pArray[ pBtm ], &pTemp[ pBtm ] );
}
}

static void __stdcall DualSort(SortRecStruct* p1, SortRecStruct* p2)
{
if( p1->Key > p2->Key )
SwapUU( p1, p2 );
}

static DWORD __stdcall MergeSortBB(SortRecStruct* pArray, SortRecStruct* pTemp, size_t pTop, size_t pBtm, bool* pAbort)
{
size_t sztMidPtr;

if( pBtm > pTop )
{
if( ( pBtm - pTop ) > 1 )
{
sztMidPtr = pTop + ( pBtm - pTop ) / 2;
MergeSortBB( pArray, pTemp, pTop, sztMidPtr, pAbort );
if( *pAbort ) return 1223;
MergeSortBB( pArray, pTemp, sztMidPtr + 1, pBtm, pAbort );
if( *pAbort ) return 1223;
MergeProcB( pArray, pTemp, pTop, sztMidPtr + 1, pBtm );
if( *pAbort ) return 1223;
}
else
{
while( pTop < pBtm )
{
DualSort( &pArray[ pTop ], &pArray[ pBtm ] );
pTop += 2;
}
}
}
return 0;
}

DWORD __stdcall MergeSortBR(SortRecStruct* pArray, size_t pTop, size_t pBtm, bool* pAbort)
{
SortRecStruct *OutBuff;
size_t sztAllocSize;
int nStatus, nResult = 0;

if( pBtm > pTop )
{
sztAllocSize = sizeof( SortRecStruct ) * ( pBtm - pTop + 1 );
OutBuff = ( SortRecStruct* ) VirtualAlloc( NULL, sztAllocSize, MEM_COMMIT, PAGE_READWRITE );
if( OutBuff == NULL )
nResult = GetLastError();
else
{
nStatus = MergeSortBB( pArray, OutBuff, pTop, pBtm, pAbort );
VirtualFree( OutBuff, sztAllocSize, MEM_DECOMMIT );
VirtualFree( OutBuff, 0, MEM_RELEASE );
if( nStatus ) nResult = nStatus;
}
}
return nResult;
}

DWORD __stdcall MergeSortB(SortRecStruct* pArray, size_t pRecCount, bool* pAbort)
{
return MergeSortBR( pArray, 0, pRecCount - 1, pAbort );
}

Type C

static void __stdcall MergeProcC(SortRecStruct* pTemp, SortRecStruct* pTop, SortRecStruct* pMid, SortRecStruct* pBtm)
{
SortRecStruct *RecPtr, *RecLeftPtr, *TempDest;
size_t sztCount;

RecPtr = pTop; RecLeftPtr = pMid - 1; TempDest = pTemp;

while( ( RecPtr <= RecLeftPtr ) && ( pMid <= pBtm ) )
{
if( RecPtr->Key <= pMid->Key )
{
CopyRecAU( TempDest, RecPtr++ );
}
else
{
CopyRecAU( TempDest, pMid++ );
}
TempDest++;
}
//
while( RecPtr <= RecLeftPtr )
{
CopyRecAU( TempDest++, RecPtr++ );
}
//
while( pMid <= pBtm )
{
CopyRecAU( TempDest++, pMid++ );
}
//
sztCount = ( pBtm - pTop ) + 1;
while( sztCount )
{
CopyRecUA( pTop++, pTemp++ ); sztCount--;
}
}

static DWORD __stdcall MergeSortCC(SortRecStruct* pTemp, SortRecStruct* pTop, SortRecStruct* pBtm, bool* pAbort)
{
SortRecStruct *RecMidPtr;

if( pBtm > pTop )
{
if( ( pBtm - pTop ) > 1 )
{
RecMidPtr = ( pBtm - pTop ) / 2 + pTop;
MergeSortCC( pTemp, pTop, RecMidPtr, pAbort );
if( *pAbort ) return 1223;
MergeSortCC( pTemp, RecMidPtr + 1, pBtm, pAbort );
if( *pAbort ) return 1223;
MergeProcC( pTemp, pTop, RecMidPtr + 1, pBtm );
if( *pAbort ) return 1223;
}
else
{
while( pTop < pBtm )
{
DualSort( pTop, pBtm );
pTop += 2;
}
}
}
return 0;
}

DWORD __stdcall MergeSortCR(SortRecStruct* pArray, size_t pTop, size_t pBtm, bool* pAbort)
{
SortRecStruct *OutBuff;
size_t sztAllocSize;
int nStatus, nResult = 0;

if( pBtm > pTop )
{
sztAllocSize = sizeof( SortRecStruct ) * ( pBtm - pTop + 1 );
OutBuff = ( SortRecStruct* ) VirtualAlloc( NULL, sztAllocSize, MEM_COMMIT, PAGE_READWRITE );
if( OutBuff == NULL )
nResult = GetLastError();
else
{
nStatus = MergeSortCC( OutBuff, pArray, &pArray[ pBtm ], pAbort );
VirtualFree( OutBuff, sztAllocSize, MEM_DECOMMIT );
VirtualFree( OutBuff, 0, MEM_RELEASE );
if( nStatus ) nResult = nStatus;
}
}
return nResult;
}

DWORD __stdcall MergeSortC(SortRecStruct* pArray, size_t pRecCount, bool* pAbort)
{
return MergeSortCR( pArray, 0, pRecCount - 1, pAbort );
}

ヒント

 何か理由があって、クイックソートではなく、マージソートを基幹業務にしたい方もおられるようです。でも、少しでも速い方がいいですよね。

 マージソートでは、選別してバッファに書く作業と、バッファを書き戻す作業があります。普通は、選別作業を最後まで行い、選別が終わったらバッファからデータ領域へ書き戻す方法を使います。バッファのデータは最初は L1 キャッシュにもありますが、レコード数が多いとか、レコード長が大きいという理由で、やがて L1 キャッシュから消えていきます。

 選別作業はシーケンシャルに書き込みが進んでいきますから、せっかく L1 キャッシュにデータがあるのに、やがて消えていくのも、ちょっともったいないです。

 もし、選別作業を小分けするようにできれば、バッファのデータが L1 キャッシュに残っているうちに書き戻し作業ができれば、少なくともキャッシュからの読み取り行為は高速に行えますよね? L1 キャッシュの半分の大きさが目安になります。 なんてね。

 他にも、細かい部分で改善できる点はあります。じっくり取り組んで、データの動きやキャッシュの動きを考えてみてください。


Comb Sort | Quick Sort


contents
Top Page caution Bubble Sort Odd Even Sort Cocktail Sort  Gnome Sort Selection Sort  Min Max Sort Insertion Sort Shell Sort Heap Sort Comb Sort Merge Sort Quick Sort Do-Re-Mi-Fa Sort chat
inserted by FC2 system