/**
 * Serial-port reader thread body.
 *
 * Polls the descriptor `fd` with select() (100 ms timeout), reads whatever is
 * available into a local buffer and appends the received bytes, one at a
 * time, to the shared queue `dataBuff`. While `dataBuff` holds DATABUFFSIZE
 * or more entries the thread blocks on `buffFull`. After every drain pass
 * (timeout, select error, or a read that returned no data) it sleeps 60 ms.
 * The loop ends once `stopThread` is observed to be true.
 */
void UrineThread::run()
{
    char str[TEMPDATABUFSIZE];
    fd_set rfds;
    struct timeval tv;
    int readlen, retval, i, queued;

    printf("thread....\n");
    while (!stopThread)
    {
        // Keep reading for as long as select() reports pending input.
        while (!stopThread)
        {
            // select() overwrites the descriptor set, and on Linux also the
            // timeout (with the time left), so both are rebuilt for each call.
            FD_ZERO(&rfds);
            FD_SET(fd, &rfds);
            tv.tv_sec = 0;
            tv.tv_usec = 100000;

            retval = select(fd + 1, &rfds, NULL, NULL, &tv);
            if (retval == -1)
            {
                printf("an error accured.\n");
                break;
            }
            if (retval == 0)
                break;              // timed out: nothing more to read right now

            readlen = ::read(fd, str, TEMPDATABUFSIZE);
            printf("readlen value: %d\n", readlen);

            // Take the diagnostic snapshot under the lock that guards dataBuff.
            mutex.lock();
            queued = dataBuff.size();
            mutex.unlock();
            printf("data Buff size : %d\n", queued);

            // 0 (end of file) or -1 (error): back off instead of spinning.
            if (readlen <= 0)
                break;

            for (i = 0; i < readlen; ++i)
            {
                mutex.lock();
                // Wait until the consumer has made room in the byte queue.
                while (dataBuff.size() >= DATABUFFSIZE)
                    buffFull.wait(&mutex);
                dataBuff.enqueue(str[i]);   // enqueue the i-th received byte
                buffEmpty.wakeAll();
                // NOTE(review): elsewhere enqWait is paired with mutQue/packQue;
                // this wake-up is kept from the original but looks redundant.
                enqWait.wakeAll();
                mutex.unlock();
            }
        }
        msleep(60);     // idle pause before polling the port again
    }
}
void UrineShowThread::run()
{
bool result;
PacketType currPack;
while (!stopThread)
{
mutQue.lock();
while (packQue.size() < 1)
deqWait.wait(&mutQue);
printf("data Buff size: %d, data Que size: %d\n", dataBuff.size(), packQue.size());
currPack = packQue.dequeue();
enqWait.wakeAll();
buffEmpty.wakeAll();
mutQue.unlock();
result = sHostPackHandler[gHostPackInfo[currPack.ID].type]( currPack );
if (FALSE == result)
{
printf("pack handler fault.\n");
// the command has not been processed, add code here to process it
}
}
}
/**
 * Data-processing thread body: logs a start message, runs the packet
 * assembler gHostPackMan.MakePack() (which unpacks bytes and stores the
 * resulting packets in the packet queue), and logs again once it returns.
 */
void UrineProcThread::run()
{
    fputs("data process.\n", stdout);               // data-processing code goes here
    gHostPackMan.MakePack();                        // unpack and queue packets
    fputs("out the make pack recursive\n", stdout);
}
/**
 * Packet assembler loop.
 *
 * Takes bytes one at a time from the shared queue `dataBuff` (blocking on
 * `buffEmpty` while it is empty) and assembles them into `mCurrPack`:
 *
 *  - While no packet is in progress, a byte is accepted as a packet ID when
 *    it is below mMaxPackID and mPackInfo[byte].len is positive; that length
 *    minus one is the number of bytes still expected.
 *  - While a packet is in progress, bytes >= 0x80 are stored as packet data.
 *    A byte below 0x80 aborts the packet in progress and is examined again
 *    as a possible packet ID.
 *  - A packet that passes UnpackWithCheckSum(), or a packet that consists of
 *    an ID only, is appended to `packQue` (blocking on `enqWait` while the
 *    queue holds PACKQUEUESIZE or more packets) and `deqWait` is signalled.
 *
 * The loop runs until `stopThread` is observed to be true.
 */
void PackMan::MakePack( void )
{
    UCHAR currChar;
    BOOL result;
    bool packReady;

    while (!stopThread)
    {
        // Fetch one byte from the receive buffer.
        mutex.lock();
        while (dataBuff.size() < 1)
            buffEmpty.wait(&mutex);
        currChar = dataBuff.dequeue();
        buffFull.wakeAll();
        mutex.unlock();

        packReady = false;

        // A byte below 0x80 in the middle of a packet means the packet is
        // incomplete. Drop the partial packet and fall through so this same
        // byte is examined as a packet ID. The original pushed the byte back
        // onto dataBuff after waiting on buffFull; no other dequeuer of
        // dataBuff is visible in this file, so that wait could never be
        // satisfied once the queue had been refilled.
        if (mPackIdGot && (0x80 > currChar))
        {
            printf("the data is fault.\n");
            mPackIdGot = 0;
        }

        if (mPackIdGot)
        {
            // Packet data byte; data is stored starting from the second byte.
            mCurrPack.buffer[mCurrPackLen] = currChar;
            ++mCurrPackLen;
            --mRestByte;

            // Whole packet has been received.
            if (0 >= mRestByte)
            {
                result = UnpackWithCheckSum( &mCurrPack.buffer[0], mCurrPackLen );
                if (result)
                    packReady = true;
                else
                    printf("check sum fault.\n");
                mPackIdGot = 0;
            }
        }
        else if ( ( mMaxPackID > currChar ) && ( 0 < mPackInfo[currChar].len ) )
        {
            // Valid packet ID: start a new packet.
            mRestByte = mPackInfo[currChar].len - 1;
            mCurrPackLen = 1;
            mCurrPack.ID = currChar;
            mPackIdGot = 1;
            printf("rest byte len: %d\n", mRestByte);

            // A packet that consists of the ID only is already complete.
            if (0 == mRestByte)
            {
                packReady = true;
                mPackIdGot = 0;
            }
        }
        else
        {
            // Not a valid packet ID.
            printf("fault id: %d\n", currChar);
        }

        // Hand the finished packet over to the packet queue.
        if (packReady)
        {
            mutQue.lock();
            while (packQue.size() >= PACKQUEUESIZE)
                enqWait.wait(&mutQue);
            packQue.enqueue(mCurrPack);
            deqWait.wakeAll();
            mutQue.unlock();
        }
    } // while
    return;
}
/**
 * Serial-port reader thread body.
 *
 * Polls the descriptor `fd` with select() (100 ms timeout), reads whatever is
 * available into a local buffer and appends the received bytes, one at a
 * time, to the shared queue `dataBuff`. While `dataBuff` holds DATABUFFSIZE
 * or more entries the thread blocks on `buffFull`. After every drain pass
 * (timeout, select error, or a read that returned no data) it sleeps 60 ms.
 * The loop ends once `stopThread` is observed to be true.
 */
void UrineThread::run()
{
    char str[TEMPDATABUFSIZE];
    fd_set rfds;
    struct timeval tv;
    int readlen, retval, i, queued;

    printf("thread....\n");
    while (!stopThread)
    {
        // Keep reading for as long as select() reports pending input.
        while (!stopThread)
        {
            // select() overwrites the descriptor set, and on Linux also the
            // timeout (with the time left), so both are rebuilt for each call.
            FD_ZERO(&rfds);
            FD_SET(fd, &rfds);
            tv.tv_sec = 0;
            tv.tv_usec = 100000;

            retval = select(fd + 1, &rfds, NULL, NULL, &tv);
            if (retval == -1)
            {
                printf("an error accured.\n");
                break;
            }
            if (retval == 0)
                break;              // timed out: nothing more to read right now

            readlen = ::read(fd, str, TEMPDATABUFSIZE);
            printf("readlen value: %d\n", readlen);

            // Take the diagnostic snapshot under the lock that guards dataBuff.
            mutex.lock();
            queued = dataBuff.size();
            mutex.unlock();
            printf("data Buff size : %d\n", queued);

            // 0 (end of file) or -1 (error): back off instead of spinning.
            if (readlen <= 0)
                break;

            for (i = 0; i < readlen; ++i)
            {
                mutex.lock();
                // Wait until the consumer has made room in the byte queue.
                while (dataBuff.size() >= DATABUFFSIZE)
                    buffFull.wait(&mutex);
                dataBuff.enqueue(str[i]);   // enqueue the i-th received byte
                buffEmpty.wakeAll();
                // NOTE(review): elsewhere enqWait is paired with mutQue/packQue;
                // this wake-up is kept from the original but looks redundant.
                enqWait.wakeAll();
                mutex.unlock();
            }
        }
        msleep(60);     // idle pause before polling the port again
    }
}
void UrineShowThread::run()
{
bool result;
PacketType currPack;
while (!stopThread)
{
mutQue.lock();
while (packQue.size() < 1)
deqWait.wait(&mutQue);
printf("data Buff size: %d, data Que size: %d\n", dataBuff.size(), packQue.size());
currPack = packQue.dequeue();
enqWait.wakeAll();
buffEmpty.wakeAll();
mutQue.unlock();
result = sHostPackHandler[gHostPackInfo[currPack.ID].type]( currPack );
if (FALSE == result)
{
printf("pack handler fault.\n");
// the command has not been processed, add code here to process it
}
}
}
/**
 * Data-processing thread body: logs a start message, runs the packet
 * assembler gHostPackMan.MakePack() (which unpacks bytes and stores the
 * resulting packets in the packet queue), and logs again once it returns.
 */
void UrineProcThread::run()
{
    fputs("data process.\n", stdout);               // data-processing code goes here
    gHostPackMan.MakePack();                        // unpack and queue packets
    fputs("out the make pack recursive\n", stdout);
}
/**
 * Packet assembler loop.
 *
 * Takes bytes one at a time from the shared queue `dataBuff` (blocking on
 * `buffEmpty` while it is empty) and assembles them into `mCurrPack`:
 *
 *  - While no packet is in progress, a byte is accepted as a packet ID when
 *    it is below mMaxPackID and mPackInfo[byte].len is positive; that length
 *    minus one is the number of bytes still expected.
 *  - While a packet is in progress, bytes >= 0x80 are stored as packet data.
 *    A byte below 0x80 aborts the packet in progress and is examined again
 *    as a possible packet ID.
 *  - A packet that passes UnpackWithCheckSum(), or a packet that consists of
 *    an ID only, is appended to `packQue` (blocking on `enqWait` while the
 *    queue holds PACKQUEUESIZE or more packets) and `deqWait` is signalled.
 *
 * The loop runs until `stopThread` is observed to be true.
 */
void PackMan::MakePack( void )
{
    UCHAR currChar;
    BOOL result;
    bool packReady;

    while (!stopThread)
    {
        // Fetch one byte from the receive buffer.
        mutex.lock();
        while (dataBuff.size() < 1)
            buffEmpty.wait(&mutex);
        currChar = dataBuff.dequeue();
        buffFull.wakeAll();
        mutex.unlock();

        packReady = false;

        // A byte below 0x80 in the middle of a packet means the packet is
        // incomplete. Drop the partial packet and fall through so this same
        // byte is examined as a packet ID. The original pushed the byte back
        // onto dataBuff after waiting on buffFull; no other dequeuer of
        // dataBuff is visible in this file, so that wait could never be
        // satisfied once the queue had been refilled.
        if (mPackIdGot && (0x80 > currChar))
        {
            printf("the data is fault.\n");
            mPackIdGot = 0;
        }

        if (mPackIdGot)
        {
            // Packet data byte; data is stored starting from the second byte.
            mCurrPack.buffer[mCurrPackLen] = currChar;
            ++mCurrPackLen;
            --mRestByte;

            // Whole packet has been received.
            if (0 >= mRestByte)
            {
                result = UnpackWithCheckSum( &mCurrPack.buffer[0], mCurrPackLen );
                if (result)
                    packReady = true;
                else
                    printf("check sum fault.\n");
                mPackIdGot = 0;
            }
        }
        else if ( ( mMaxPackID > currChar ) && ( 0 < mPackInfo[currChar].len ) )
        {
            // Valid packet ID: start a new packet.
            mRestByte = mPackInfo[currChar].len - 1;
            mCurrPackLen = 1;
            mCurrPack.ID = currChar;
            mPackIdGot = 1;
            printf("rest byte len: %d\n", mRestByte);

            // A packet that consists of the ID only is already complete.
            if (0 == mRestByte)
            {
                packReady = true;
                mPackIdGot = 0;
            }
        }
        else
        {
            // Not a valid packet ID.
            printf("fault id: %d\n", currChar);
        }

        // Hand the finished packet over to the packet queue.
        if (packReady)
        {
            mutQue.lock();
            while (packQue.size() >= PACKQUEUESIZE)
                enqWait.wait(&mutQue);
            packQue.enqueue(mCurrPack);
            deqWait.wakeAll();
            mutQue.unlock();
        }
    } // while
    return;
}
To copy to clipboard, switch view to plain text mode
Bookmarks