網頁

2017年1月23日 星期一

[VC] Multi-Threading data transfer 框架

 

Multi-Threading 現在已經是一種普遍應用的技術, 但是對初次接觸的人來說, 並不是那麼容易上手.手上有一個測試程式, 用這個例子做個說明, 並且提供一個完整框架.

這個例子是透過 USB, 取得一個 Frame (畫面), 做一些計算處理, 同時顯示到螢幕上. 所謂的同時, 是說計算處理並不和顯示有直接相關. 計算主要是計算它的解像力, Lens Shading, 這些計算比較耗時, 結果並不需要更新到畫面. 顯示的目的只是提供操作人員可以看到畫面.

由這樣的應用目的, 可以導出如下的需求:

  • 盡力收取影像, 提高計算部分的 frame rate –> 效率考量.
  • 計算或顯示如果運算來不及, 可以放棄目前收進來的  frame.
  • frame 必須完整. 計算完成前, 影像資料不能被更動.

為了因應這三個不同的工作內容( 取得影像, 計算, 顯示), 程式部分區分成 3 個 thread.

  • Master Thread, 身兼管理其他 thread 的任務. 同步機制, 分配記憶體, 取得影像, 發送影像. 以及結束後收拾殘局.
  • Computing Thread. 默默地計算影像資料.
  • Display Thread. 更新影像到螢幕上.

Computing Thread  及 Display Thread 在這個例子中是一樣的. 程式的流程簡單說明如下

  • CMasterThread ( 封裝 MasterThread . 及相關變數)
  • CMasterThread::Create() 嘗試建立 CMasterThread 物件, 並啟動 CMasterThread 的 ThreadFunction
  • CMasterThread::ThreadFunction() 為 ComputingThread, DisplayThread 配置同步物件, CriticalSection, Event, Image Buffer.
  • CMasterThread::ThreadFunction() 建立 Shutdown 作為通知所有 Thread 系統即將關閉的同步機制.
  • CMasterThread::ThreadFunction() 啟動 Display Thread
  • CMasterThread::ThreadFunction() 啟動 Computing Thread
  • -- 此時 Display Thread, Computing Thread 雖已啟動, 但會等不到資料 ready 的 event,  而進入等待狀態
  • CMasterThread::ThreadFunction() 配置接收記憶體
  • -- 此時 CMasterThread::ThreadFunction() 會等待 bStart 的旗標. 此旗標會由外部透過 CMasterThread::Start() 設定. 這樣的做法是可以讓外部準確的控制起跑時間.
  • 起跑之後, CMasterThread::ThreadFunction() 進入迴圈
    • bStop 旗標, 此旗標由外部控制是否繼續執行. 若不繼續執行, 則進行停止的程序.
    • 首先, 先取得一個 Frame
    • 以 TryEnterCriticalSection 嘗試鎖定 Computing 的 Buffer, 如果可以鎖定, 表示 Computing Thread 沒有在使用中, 所以可以更新資料. 如果不能鎖定, 表示 Computing Thread 在使用中. 放棄這次的更新.
    • 設定 Computing Thread 的 data available event. Computing Thread 會因而喚起, 開始處理.
    • 同樣的嘗試更新 Display Thread
  • 開始關閉程序
  • 設定 Shutdown event. Computing Thread 及 Display Thread 收到後會結束執行.
  • 等到 Computing Thread 及 Display Thread 都結束.
  • 關閉同步物件, 移除配置的記憶體.
  • 由外部呼叫 Clean() 釋放 CMasterThread 物件

** 注意, 程式中沒有資料處理, 而是以隨機的 Sleep() 來產生不同的時間差, 以測試程式的穩定性.

** Source code 的 plug-in 好像壞掉了. 可以由此取得完整專案 source code https://github.com/lxnick/ThreadTest

 

程式列表 :

MasterThread.h

[sourcecode language='cpp' ]
#include <process.h>
#include <windows.h>  
#include <string>

#define BUFFER_SIZE				(16 * 1024 * 1024 )

typedef struct tagTHREAD_INFO
{
	std::string szName;
	HANDLE		hHandle;
	DWORD		dwID;
	CRITICAL_SECTION	CriticalSection;
	HANDLE				hDataAvailable;

	unsigned long	nFrameIndex;
	unsigned char * pBuffer;
}	THREAD_INFO, *LPTHREAD_INFO;

class CMasterThread
{
private:
	CMasterThread();
	~CMasterThread();

	HANDLE hThread;	
	unsigned threadID; 

public:
	static CMasterThread* Create();
	static void Clean();

	static unsigned __stdcall ThreadFunction(void* pArguments);

	void Start();
	void Stop();

	BOOL bStart;
	BOOL bStop;

	HANDLE	hShutdown;

	THREAD_INFO	DisplayInfo;
	THREAD_INFO	ComputingInfo;
};
[/sourcecode]

MasterThread.cpp

[sourcecode language='cpp'  gutter='false']

#include "stdafx.h"
#include "MasterThread.h"

#define	WAIT_FRAME_TIME ( 100 )
#define	COMPUTING_TIME ( 50 )
#define	DISPLAY_TIME ( 50 )

const static char EVENT_COMPUTING[] = "ComputingDataAvailable";
const static char EVENT_DISPLAY[] = "DisplayDataAvailable";

static CMasterThread* pMasterThread = NULL;

unsigned __stdcall DisplayThreadFunction( void* pArguments )  
{
	LPTHREAD_INFO pInfo = (LPTHREAD_INFO) pArguments;

    printf( "\tDisplay Thread Launch ...\n" );  
	HANDLE hDataEvent = OpenEvent(EVENT_ALL_ACCESS, false, EVENT_DISPLAY);

	while( TRUE )
	{
		HANDLE Wait[] = { pMasterThread->hShutdown, pInfo->hDataAvailable };
		DWORD dwWait = WaitForMultipleObjects(sizeof(Wait)/sizeof(HANDLE), Wait, FALSE, INFINITE);

		if (dwWait != (WAIT_OBJECT_0 + 1))
		{
			printf("\tDisplay Thread Exit ...\n");
			return 0;
		}

		if ( TryEnterCriticalSection( & pInfo->CriticalSection ) != 0)
		{
			Sleep( DISPLAY_TIME );
			printf( "\tDisplay Thread Frame %d\n", pInfo->nFrameIndex);

			if (pInfo->pBuffer[0] != (pInfo->nFrameIndex & 0xFF))
				printf("\tDisplay Data dismatch\n");

			int random_number = rand() % DISPLAY_TIME;
			printf("\tDisplay Time = %d\n", random_number);
			Sleep(random_number);

			if (pInfo->pBuffer[0] != (pInfo->nFrameIndex & 0xFF))
				printf("\tDisplay Data overwritten\n");

			LeaveCriticalSection ( &pInfo->CriticalSection );
			ResetEvent(hDataEvent);
		}
	}

   printf( "\tDisplay Thread Shutdown ...\n" );  
}

unsigned __stdcall ComputingThreadFunction( void* pArguments )  
{
	LPTHREAD_INFO pInfo = (LPTHREAD_INFO) pArguments;

    printf( "\tComputing Thread Launch ...\n" );  
	HANDLE hDataEvent = OpenEvent(EVENT_ALL_ACCESS, false, EVENT_COMPUTING);

	while( TRUE )
	{
		HANDLE Wait[] = { pMasterThread->hShutdown, hDataEvent };
		DWORD dwWait = WaitForMultipleObjects(sizeof(Wait)/sizeof(HANDLE), Wait, FALSE, INFINITE);

		if ( dwWait != ( WAIT_OBJECT_0 +1))
		{
			printf("\tComputing Thread Exit ...\n");
			return 0;
		}

		if ( TryEnterCriticalSection( & pInfo->CriticalSection ) != 0)
		{
			printf("\tComputing Thread Frame %d\n", pInfo->nFrameIndex);

			if (pInfo->pBuffer[0] != (pInfo->nFrameIndex & 0xFF) )
				printf("\tComputing Data dismatch\n");

			int random_number = rand() % COMPUTING_TIME;
			printf("\tComputing Time = %d\n", random_number);
			Sleep(random_number);

			if (pInfo->pBuffer[0] != (pInfo->nFrameIndex & 0xFF))
				printf("\tComputing Data overwritten\n");

			LeaveCriticalSection(&pInfo->CriticalSection);
			ResetEvent(hDataEvent);
		}
	}
   printf( "\tComputing Thread Shutdown ...\n" ); 
}


unsigned __stdcall CMasterThread::ThreadFunction( void* pArguments )
{  
    printf( "Master Thread Launch ...\n" );  

	memset( &pMasterThread->DisplayInfo, 0, sizeof ( pMasterThread->DisplayInfo));
	pMasterThread->DisplayInfo.szName = "Display";
	InitializeCriticalSection( & pMasterThread->DisplayInfo.CriticalSection );
	pMasterThread->DisplayInfo.hDataAvailable = CreateEvent( NULL, TRUE, FALSE, EVENT_DISPLAY);
	pMasterThread->DisplayInfo.pBuffer = new unsigned char [ BUFFER_SIZE ];

	memset( &pMasterThread->ComputingInfo, 0, sizeof ( pMasterThread->ComputingInfo));
	pMasterThread->ComputingInfo.szName = "Display";
	InitializeCriticalSection( & pMasterThread->ComputingInfo.CriticalSection );
	pMasterThread->ComputingInfo.hDataAvailable = CreateEvent( NULL, TRUE, FALSE, EVENT_COMPUTING);
	pMasterThread->ComputingInfo.pBuffer = new unsigned char [ BUFFER_SIZE ];

	pMasterThread->hShutdown = CreateEvent( NULL, TRUE, FALSE, "MasterShutdown" );

	pMasterThread->DisplayInfo.hHandle =  (HANDLE)_beginthreadex( 
		NULL, 
		0, 
		&DisplayThreadFunction, 
		&pMasterThread->DisplayInfo, 
		0, 
		NULL );  

 	pMasterThread->ComputingInfo.hHandle =  (HANDLE)_beginthreadex( 
		NULL, 
		0, 
		&ComputingThreadFunction, 
		&pMasterThread->ComputingInfo, 
		0, 
		NULL );   

	unsigned char * pFrameBuffer = new unsigned char [ BUFFER_SIZE ];
	int FrameIndex = 0;

	while( ! pMasterThread->bStart)
		Sleep( 100 );

	while( ! pMasterThread->bStop )
	{
		int random_number = rand() % WAIT_FRAME_TIME;
		printf("Master Wait Frame = %d\n", random_number);
		Sleep(random_number);

		FrameIndex ++;
		printf( "Master Frame Index = %d\n", FrameIndex );  

		int FramePattern = FrameIndex & 0xFF;
		pFrameBuffer[0] = (unsigned char) FramePattern;
//		memset(pFrameBuffer, FramePattern, sizeof(unsigned char) * BUFFER_SIZE);

		if ( TryEnterCriticalSection( & pMasterThread->ComputingInfo.CriticalSection ) != 0)
		{
//			printf("Master Send to Computing = %d\n", FrameIndex);
			memcpy( pMasterThread->ComputingInfo.pBuffer, pFrameBuffer, sizeof( unsigned char ) * BUFFER_SIZE );
			pMasterThread->ComputingInfo.nFrameIndex = FrameIndex;
			LeaveCriticalSection ( & pMasterThread->ComputingInfo.CriticalSection );
			SetEvent( pMasterThread->ComputingInfo.hDataAvailable );
		}

		if ( TryEnterCriticalSection( & pMasterThread->DisplayInfo.CriticalSection ) != 0)
		{
//			printf("Master Send to Display = %d\n", FrameIndex);
			memcpy( pMasterThread->DisplayInfo.pBuffer, pFrameBuffer, sizeof( unsigned char ) * BUFFER_SIZE );
			pMasterThread->DisplayInfo.nFrameIndex = FrameIndex;
			LeaveCriticalSection ( & pMasterThread->DisplayInfo.CriticalSection );
			SetEvent( pMasterThread->DisplayInfo.hDataAvailable );
		}
	}

	printf( "Master Thread End Run\n");  
	SetEvent( pMasterThread->hShutdown );
	HANDLE hWaitThread[] = { pMasterThread->DisplayInfo.hHandle, pMasterThread->ComputingInfo.hHandle };
	WaitForMultipleObjects( sizeof(hWaitThread)/sizeof(HANDLE), hWaitThread, TRUE, INFINITE );
	printf("Child Thread Exit\n");

	CloseHandle( pMasterThread->hShutdown );
	CloseHandle( pMasterThread->ComputingInfo.hDataAvailable  );
	DeleteCriticalSection( & pMasterThread->ComputingInfo.CriticalSection );
	delete [] pMasterThread->ComputingInfo.pBuffer;
		 

	CloseHandle( pMasterThread->DisplayInfo.hDataAvailable );
	DeleteCriticalSection( & pMasterThread->DisplayInfo.CriticalSection );
	delete [] pMasterThread->DisplayInfo.pBuffer;

	ResetEvent(pMasterThread->hShutdown);
	CloseHandle(  pMasterThread->hShutdown);
	delete [] pFrameBuffer;

    return 0;  
}  



CMasterThread::CMasterThread()
{

}

CMasterThread::~CMasterThread()
{

}

CMasterThread* CMasterThread::Create()
{
	if ( pMasterThread != NULL)
		return pMasterThread;

	pMasterThread = new CMasterThread;
	pMasterThread->bStart = FALSE;
	pMasterThread->bStop = FALSE;

	pMasterThread->hThread =  (HANDLE)_beginthreadex( NULL, 0, &ThreadFunction, NULL, 0, &pMasterThread->threadID );

	return pMasterThread;
}

void CMasterThread::Clean()
{
	if ( pMasterThread != NULL)
		delete pMasterThread;

	pMasterThread = NULL;
}

void CMasterThread::Start()
{
	pMasterThread->bStart = TRUE;
}

void CMasterThread::Stop()
{
	pMasterThread->bStop = TRUE;
	WaitForSingleObject( pMasterThread->hThread, INFINITE );

	CloseHandle(pMasterThread->hThread);
}
[/sourcecode]

ThreadTest.cpp

[sourcecode language='cpp' ]
#include "stdafx.h"
#include <stdlib.h>  
#include <stdio.h>  
#include <time.h>  

#include "MasterThread.h"

int _tmain(int argc, _TCHAR* argv[])
{
	for (int i = 0; i < 100; i++)
	{
		srand((unsigned)time(NULL));

		CMasterThread * pMasterThread = CMasterThread::Create();
		Sleep(100);

		pMasterThread->Start();
		Sleep(1000 * 5);
		pMasterThread->Stop();

		CMasterThread::Clean();
	}

	return 0;
}
[/sourcecode]