Results 1 to 10 of 10

Thread: Reading from sockets in a multithreaded program

  1. #1
    Join Date
    Mar 2007
    Posts
    8
    Qt products
    Qt4
    Platforms
    Unix/X11

    Unhappy Reading from sockets in a multithreaded program

    Hi folks,

    I'm new to Qt, so please forgive any ignorance... Note that I did read whatever I could find on the net and the Qt documentation on sockets and threading, but I must be overlooking something...

    I need to develop a distributed system of applications. One of these will use a GUI, so our choice was to develop everything with Qt. Communication between these applications will be on TCP based sockets. The apps will have to use multithreading for their tasks and communication processing.

    I started off with a server and a client. The server sends packets to the client and the client just dumps these. The connection set-up between server and client works fine. The sending of the data by the server works fine also.

    For the client I've made a Connection class derived from QThread. The run function of this QThread waits forReadyRead(-1), after which the data is streamed into a container. Whenever a container is read completely, it is placed in a QQueue (guarded with a QMutex) and a QSemaphore is set.

    The main loop blocks on the QSemphore. Whenever data is available the container is taken from the QQueue (guarded with a QMutex), it is dumped and deleted.

    At least, that is what I desire.

    The client does receive data. The main loop does dump this data. But for no apparent reason the process of reading/dumping stops. It seems like my threads are not scheduled anymore. Sometimes immedately, sometimes only in the end. In DDD it runs without problems

    I assume I'm doing something wrong with this QThread business, but it could have to do with this waitForReadRead(-1) also. To be honest, I'm lost.

    I've tried also to use an additional Consumer class, with its own QThread which blocks on the semaphore->acquire. This didn't solve my problem either

    Below, some excerpts of my code. I hope somebody is able to point my in the right direction...

    main of client
    Qt Code:
    1. int main(int argc, char *argv[])
    2. {
    3. QCoreApplication lApplication(argc, argv);
    4.  
    5. Connection *lClient = new Connection(QHostAddress("172.16.140.32"), 5001);
    6. QSemaphore *lSema = lClient->getSemaphore();
    7. CCSDS *lPacket; /// container
    8. int i = 0;
    9.  
    10. lClient->start(QThread::LowPriority);
    11.  
    12. while (true) {
    13. lSema->acquire();
    14. lPacket = lClient->recvCCSDS();
    15. printf("%d\n", ++i);
    16. if (lPacket) {
    17. lPacket->dump();
    18. delete lPacket;
    19. }
    20. }
    21. }
    To copy to clipboard, switch view to plain text mode 

    Most of the "magic" happens in this Connection class:

    Qt Code:
    1. Connection::Connection(const QHostAddress aAddress, const qint16 aPort)
    2. : QThread(), mSocket(NULL), mQueue(), mMutex(), mSemaphore(0)
    3. {
    4. /// create the socket
    5. mSocket = new QTcpSocket();
    6. mSocket->setTextModeEnabled(false);
    7. mSocket->connectToHost(aAddress, aPort);
    8. mSocket->waitForConnected(-1):
    9. }
    10.  
    11. void Connection::run()
    12. {
    13. int lCount = 0;
    14. int lBufLength = 0; /// size of the buffer not yet read
    15. int lPacketLength = 0; /// length of current CCSDS packet
    16. int lReadLength = 0; /// length of data read
    17. int lStart = 0;
    18. char *lData = NULL;
    19. CCSDS *lPacket = NULL;
    20. QDataStream lStream(mSocket);
    21.  
    22. lStream.setVersion(QDataStream::Qt_4_2);
    23.  
    24. while (true) {
    25. if (!mSocket || mSocket->state() == QAbstractSocket::UnconnectedState)
    26. return;
    27.  
    28. while (lBufLength < CCSDS_PRIM_SIZE) {
    29. if (!mSocket->waitForReadyRead(-1))
    30. return;
    31.  
    32. lBufLength = mSocket->bytesAvailable();
    33. }
    34.  
    35. while (lBufLength) {
    36. lBufLength = mSocket->bytesAvailable();
    37. lPacket = new CCSDS();
    38. lData = lPacket->data();
    39. lStart = 0;
    40.  
    41. /// read only the primary header
    42. lReadLength = lStream.readRawData(lData, CCSDS_PRIM_SIZE);
    43.  
    44. if (lReadLength == -1)
    45. return;
    46.  
    47. /// adapt lBufLength and lStart with part which has been read
    48. lStart += lReadLength;
    49. lBufLength -= lReadLength;
    50.  
    51. /// primary header available: determine packet length
    52. lPacketLength = 1 + (lData[4] << 8) + lData[5];
    53.  
    54. while (lBufLength < lPacketLength) {
    55. if (!mSocket->waitForReadyRead(-1))
    56. return;
    57.  
    58. lBufLength += mSocket->bytesAvailable();
    59. }
    60.  
    61. lReadLength = lStream.readRawData(&lData[lStart], MIN(lPacketLength,lBufLength));
    62.  
    63. if (lReadLength == -1) {
    64. return;
    65. }
    66.  
    67. lStart += lReadLength;
    68. lBufLength -= lReadLength;
    69.  
    70. /// packet received: put on queue
    71. mMutex.lock();
    72. mQueue.enqueue(lPacket);
    73. mMutex.unlock();
    74. lPacket = NULL;
    75. lCount++;
    76. }
    77. /// inform user: set semaphore
    78. mSemaphore.release(lCount);
    79. lCount = 0;
    80. }
    81. }
    82.  
    83. CCSDS *Connection::recvCCSDS()
    84. {
    85. CCSDS *lPacket = NULL;
    86.  
    87. /// critical section
    88. mMutex.lock();
    89. if (!mQueue.isEmpty()) {
    90. lPacket = mQueue.dequeue();
    91. }
    92.  
    93. mMutex.unlock();
    94. return lPacket;
    95. }
    To copy to clipboard, switch view to plain text mode 

  2. #2
    Join Date
    Jan 2006
    Location
    Munich, Germany
    Posts
    4,714
    Thanks
    21
    Thanked 418 Times in 411 Posts
    Qt products
    Qt3 Qt4 Qt5 Qt/Embedded
    Platforms
    Unix/X11 Windows

    Default Re: Reading from sockets in a multithreaded program

    It could be a mutex deadlock.
    Try using tryLock() instead of lock() and see if it still gets stuck.
    If it doesn't get stuck anymore, you know on which path you have to go.

  3. #3
    Join Date
    Aug 2006
    Location
    Switzerland
    Posts
    52
    Thanked 13 Times in 11 Posts
    Qt products
    Qt4
    Platforms
    Unix/X11

    Default Re: Reading from sockets in a multithreaded program

    I'm not 100% sure of this, cause I have no way to check it now, but:

    You create QTcpSocket in Connection constructor, so it belongs to main thread. In Connection thread you call waitForRead() methods, so Connection thread is blocked until some data is available in socket. The following part is the one I'm not sure. I think that even when you use waitFor...() methods in Connection thread, the QSocket is still processed by the thread it lives in. So in order to receive any data (and as a result to unblock Connection thread) the main thread has to do some job. But it may happen that it will be blocked on acquire() and you get deadlock. Main thread waits for Connection thread to release semaphore and Connection thread waits for Main thread to process socket and deliver some data.

    Try to move QSocket creation to run() method of Connection class and see if it helps.
    The Wheel weaves as the Wheel wills.

  4. #4
    Join Date
    Mar 2007
    Posts
    8
    Qt products
    Qt4
    Platforms
    Unix/X11

    Default Re: Reading from sockets in a multithreaded program

    > deadlock

    Good hint: it seems to have to do with the semaphores... My results improve if I change the
    Qt Code:
    1. lSema->acquire();
    To copy to clipboard, switch view to plain text mode 
    into
    Qt Code:
    1. if (!lSema->tryAcquire()) continue;
    To copy to clipboard, switch view to plain text mode 
    in the main loop of the client.

    Still, I fail to see what I'm doing wrong... Besides, sometimes I seem to hit this deadlock situation anyhow.

    Reading the documentation on QMutex and QSemapore indicate that these objects are thread-safe and that the acquire() and lock() functions block until they obtain the semaphore or lock respectively.

    I've entered QT += thread network in mij .pro file. Is there something else required?

  5. #5
    Join Date
    Mar 2007
    Posts
    8
    Qt products
    Qt4
    Platforms
    Unix/X11

    Default Re: Reading from sockets in a multithreaded program

    Quote Originally Posted by danadam View Post
    Main thread waits for Connection thread to release semaphore and Connection thread waits for Main thread to process socket and deliver some data.

    Try to move QSocket creation to run() method of Connection class and see if it helps.
    This seems a plausible explanation. However, after trying this I still mange get the 'deadlock'. Only in about 5% of my tries, all data is received without problems.

  6. #6
    Join Date
    Aug 2006
    Location
    Switzerland
    Posts
    52
    Thanked 13 Times in 11 Posts
    Qt products
    Qt4
    Platforms
    Unix/X11

    Default Re: Reading from sockets in a multithreaded program

    I think I found the reason. 58th line of Connection code in the first post is:
    Qt Code:
    1. lBufLength += mSocket->bytesAvailable();
    To copy to clipboard, switch view to plain text mode 
    I guess it should be:
    Qt Code:
    1. lBufLength = mSocket->bytesAvailable();
    To copy to clipboard, switch view to plain text mode 
    The Wheel weaves as the Wheel wills.

  7. #7
    Join Date
    Mar 2007
    Posts
    8
    Qt products
    Qt4
    Platforms
    Unix/X11

    Default Re: Reading from sockets in a multithreaded program

    Quote Originally Posted by danadam View Post
    I guess it should be:
    Qt Code:
    1. lBufLength = mSocket->bytesAvailable();
    To copy to clipboard, switch view to plain text mode 
    Good catch: it is indeed wrong. However, my deadlock remains...

  8. #8
    Join Date
    Jan 2006
    Location
    Munich, Germany
    Posts
    4,714
    Thanks
    21
    Thanked 418 Times in 411 Posts
    Qt products
    Qt3 Qt4 Qt5 Qt/Embedded
    Platforms
    Unix/X11 Windows

    Default Re: Reading from sockets in a multithreaded program

    Good catch: it is indeed wrong. However, my deadlock remains...
    Why don't you try tryLock()?
    It is easy, and it will tell you on the spot if its a mutex deadlock that you have...

  9. #9
    Join Date
    Mar 2007
    Posts
    8
    Qt products
    Qt4
    Platforms
    Unix/X11

    Default Re: Reading from sockets in a multithreaded program

    Quote Originally Posted by high_flyer View Post
    Why don't you try tryLock()?
    I did try this, but still my problem remains.

    I've stripped my code even further. Upon reception of a packet it is deleted right away (it is not placed in a mutex guarded QQueue, no semaphore release, no semaphore acquire): just reception of data and deletion. The creation of the QTcpSocket is done in the run() function. My problem remains...

    It is like the waitForReadyRead(-1) sometimes doesn't wake up even when data is present at the socket.

  10. #10
    Join Date
    Aug 2006
    Posts
    83

    Default Re: Reading from sockets in a multithreaded program

    I have a problem with using Qt networking. I want to establish a connection between a server and clients. The client is using a chalkboard and is using server to send coordinates of the lines to draw to them. For the drawing part I am using Irrlecht (because the program is not going to be the chalboard (this is just for testing), but a real 3d environment.

    I have problem getting the communication to work. I would really appreciate if someone could look at the code (I attached) and please give me some clue.

    On the server side I have:
    Qt Code:
    1. Server::Server(QObject *parent) : QTcpServer(parent)
    2. {
    3. /* maximum number of players */
    4. setMaxPendingConnections(8);
    5. m_connectedClients=-1; // none connected clients
    6. connect(this,SIGNAL(newConnection()),SLOT(newConnections()));
    7.  
    8. }
    9. void Server::send(int socket,int x1, int y1, int x2, int y2)
    10. {
    11. // I use qhash to keep information about all clients and socketDescriptors
    12. QHashIterator<int, Server_thread*> i(m_clients);
    13. while (i.hasNext()) {
    14. i.next();
    15. if(i.key() != socket) // don't send this to the client which send the info
    16. i.value()->send(x1,y1,x2,y2);
    17. }
    18. }
    19.  
    20. /* accept incoming connections */
    21. void Server::newConnections()
    22. {
    23. while(hasPendingConnections()) {
    24. QTcpSocket *socket = nextPendingConnection();
    25. Server_thread *thread = new Server_thread(socket, this);
    26. // add the client and socketDescriptor
    27. m_clients[socket->socketDescriptor()] = thread;
    28. /* delete the thread that has finished */
    29. connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
    30. thread->start();
    31. }
    32. }
    To copy to clipboard, switch view to plain text mode 
    The thread for clients...
    Qt Code:
    1. Server_thread::Server_thread(QTcpSocket *socket, Server *parent)
    2. : QThread(parent)
    3. {
    4. m_parent = parent;
    5. m_socket = socket;
    6. connect(this,SIGNAL(address(QString)),this,SLOT(new_connection(QString)));
    7. connect(m_socket,SIGNAL(readyRead()),this,SLOT(read()));
    8. m_socketDescriptor = m_socket->socketDescriptor();
    9.  
    10. emit address(m_socket->peerAddress().toString());
    11.  
    12. /* number of clients++ */
    13. m_parent->incNrClients();
    14.  
    15. nextBlockSize=0;
    16. }
    17.  
    18.  
    19. void Server_thread::run()
    20. {
    21. // just to keep the thread active...
    22. while(m_socket && m_socket->state() == QAbstractSocket::ConnectedState) {
    23.  
    24. }
    25.  
    26. m_socket->disconnectFromHost();
    27. m_socket->waitForDisconnected(1000);
    28. }
    29.  
    30. void Server_thread::read()
    31. {
    32. QDataStream in(m_socket);
    33. in.setVersion(QDataStream::Qt_4_2);
    34. /* read data */
    35. if(nextBlockSize==0) {
    36. if(m_socket->bytesAvailable() < sizeof(quint16))
    37. return;
    38.  
    39. in >> nextBlockSize;
    40. }
    41. if(m_socket->bytesAvailable() < nextBlockSize)
    42. return;
    43.  
    44. QString koordinate;
    45. int x1,y1,x2,y2;
    46.  
    47. in >> koordinate;
    48.  
    49. sscanf(koordinate.toStdString().c_str(),"%d %d %d %d",&x1,&y1,&x2,&y2);
    50.  
    51. m_parent->send(m_socketDescriptor,x1,y1,x2,y2);
    52. }
    53.  
    54. void Server_thread::send(int x1, int y1, int x2, int y2)
    55. {
    56. QString string;
    57. string += QString(to_string<int>(x1).c_str());
    58. string += " ";
    59. string += QString(to_string<int>(y1).c_str());
    60. string += " ";
    61. string += QString(to_string<int>(x2).c_str());
    62. string += " ";
    63. string += QString(to_string<int>(y2).c_str());
    64.  
    65.  
    66. QByteArray block;
    67. QDataStream out(&block, QIODevice::WriteOnly);
    68. out.setVersion(QDataStream::Qt_4_2);
    69. out << (quint16)0;
    70. out << string;
    71. out.device()->seek(0);
    72. out << (quint16)(block.size() - sizeof(quint16));
    73.  
    74. m_socket->write(block);
    75.  
    76. }
    To copy to clipboard, switch view to plain text mode 

    On the client side I have:
    Qt Code:
    1. using namespace irr;
    2. using namespace core;
    3. using namespace scene;
    4. using namespace video;
    5.  
    6. //event listener -> this works for sure
    7. class ChalkboardEventReceiver : public IEventReceiver
    8. {
    9. public:
    10. ChalkboardEventReceiver(Client *c)
    11. : mouseDown(false), connection(NULL)
    12. {
    13. connection = c;
    14. }
    15.  
    16. bool OnEvent(SEvent event)
    17. {
    18. if(event.EventType == EET_MOUSE_INPUT_EVENT) {
    19. if(event.MouseInput.Event == EMIE_LMOUSE_PRESSED_DOWN) {
    20. x = event.MouseInput.X;
    21. y = event.MouseInput.Y;
    22. mouseDown = true;
    23.  
    24. return true;
    25. }
    26. else if(event.MouseInput.Event == EMIE_LMOUSE_LEFT_UP) {
    27. connection->AddLineLocal(x, y, event.MouseInput.X, event.MouseInput.Y);
    28. connection->SendLineToServer(x, y, event.MouseInput.X, event.MouseInput.Y);
    29. mouseDown = false;
    30.  
    31. return true;
    32. }
    33. else if(mouseDown && event.MouseInput.Event == EMIE_MOUSE_MOVED) {
    34. connection->AddLineLocal(x, y, event.MouseInput.X, event.MouseInput.Y);
    35. connection->SendLineToServer(x, y, event.MouseInput.X, event.MouseInput.Y);
    36. x = event.MouseInput.X;
    37. y = event.MouseInput.Y;
    38.  
    39. return true;
    40. }
    41. }
    42.  
    43. return false;
    44. }
    45.  
    46.  
    47. protected:
    48. s32 x, y;
    49. bool mouseDown;
    50. Client *connection;
    51. };
    52.  
    53. int main(int argc, char *argv[])
    54. {
    55. QApplication app(argc, argv);
    56.  
    57. Client client;
    58. ChalkboardEventReceiver receiver(&client);
    59. client.init(&receiver);
    60.  
    61. app.connect(&app, SIGNAL(lastWindowClosed()), &app, SLOT(quit()));
    62. return app.exec();
    63.  
    64. }
    65.  
    66. Client::Client(QWidget *parent) : QWidget(parent)
    67. {
    68. connect(&m_socket, SIGNAL(readyRead()), this, SLOT(HandlePacket()));
    69. connect(&m_socket, SIGNAL(error(QAbstractSocket::SocketError)),
    70. this, SLOT(displayError(QAbstractSocket::SocketError)));
    71. m_socket.connectToHost("127.0.0.1",3210);
    72. nextBlockSize = 0;
    73. // Wait for the init() call
    74. device = 0;
    75. // Default
    76. driverType = irr::video::EDT_OPENGL;
    77. }
    78.  
    79. void Client::SendLineToServer(s32 x1, s32 y1, s32 x2, s32 y2)
    80. {
    81.  
    82. QString string;
    83. string += QString(to_string<int>(x1).c_str());
    84. string += " ";
    85. string += QString(to_string<int>(y1).c_str());
    86. string += " ";
    87. string += QString(to_string<int>(x2).c_str());
    88. string += " ";
    89. string += QString(to_string<int>(y2).c_str());
    90.  
    91. QByteArray block;
    92. QDataStream out(&block, QIODevice::WriteOnly);
    93. out.setVersion(QDataStream::Qt_4_2);
    94. out << (quint16)0;
    95. out << string;
    96. out.device()->seek(0);
    97.  
    98. //calculate the size of the block
    99. out << (quint16)(block.size() - sizeof(quint16));
    100.  
    101. m_socket.write(block);
    102. }
    103.  
    104. void Client::HandlePacket()
    105. {
    106. QDataStream in(&m_socket);
    107. in.setVersion(QDataStream::Qt_4_2);
    108.  
    109. if (nextBlockSize == 0) {
    110. if (m_socket.bytesAvailable() < (int)sizeof(quint16))
    111. return;
    112.  
    113. in >> nextBlockSize;
    114. }
    115.  
    116. if (m_socket.bytesAvailable() < nextBlockSize)
    117. return;
    118.  
    119. QString string;
    120. in >> string;
    121.  
    122. s32 x1,y1,x2,y2;
    123. const char *str = string.toStdString().c_str();
    124. sscanf(str,"%d %d %d %d",&x1,&y1,&x2,&y2);
    125.  
    126. AddLineLocal(x1, y1, x2, y2);
    127. }
    To copy to clipboard, switch view to plain text mode 

    I'm sorry for posting so much code here (I attached the whole source code). I would really appreciate some help.
    Attached Files Attached Files

Similar Threads

  1. QT MySQL
    By sabeeshcs in forum Newbie
    Replies: 6
    Last Post: 12th January 2007, 04:19
  2. Reading from TCP Socket crashes program
    By OnionRingOfDoom in forum Qt Programming
    Replies: 26
    Last Post: 27th January 2006, 19:32

Bookmarks

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •  
Digia, Qt and their respective logos are trademarks of Digia Plc in Finland and/or other countries worldwide.