[PyKDE] Multithreaded GUI programming

Paul Felix pef at fluent.com
Sun Dec 8 01:38:01 GMT 2002


Hi Mickey,

Does Qt/Embedded include the network module?  If so, perhaps this is a
solution:

You were on the right track when thinking about a communication pipe, and
calling select() in an event loop, but maybe you can create a QSocket and
let Qt do that.

Here's a stab at it...

Paul

----------------------------------------------------------------------

#!/usr/bin/env python

import sys, socket, threading, time
from qt import *
from qtnetwork import *

class ThreadEventService:
    """An event service thread that sends an event by writing a char to
    a socket.  This class is thread-safe, allowing multiple threads
    to trigger an event."""
    def __init__(self):
        self._threadLock = threading.Lock()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind(('localhost', 0))
        self._conn = None

        thread = threading.Thread(target=self._listen)
        thread.start()

    def getPort(self):
        return self._sock.getsockname()[1]

    def notify(self):
        self._threadLock.acquire()
        if self._conn is not None:
	    self._conn.send('1')
        self._threadLock.release()

    def _listen(self):
        self._threadLock.acquire()
	self._sock.listen(1)
	self._conn, addr = self._sock.accept()
        self._threadLock.release()

class ThreadEventHandler(QObject):
    """An event handler that uses Qsocket to connect to a socket-based event.
    service. It gets a signal when the server has sent an event by writing a
    char to the socket."""
    def __init__(self, eventService):
        QObject.__init__(self)
	self._qsock = QSocket()
	self._qsock.connectToHost('localhost', eventService.getPort())
	QObject.connect(self._qsock, SIGNAL("readyRead()"), self._onThreadEvent)

    def _onThreadEvent(self):
        self._qsock.getch()
        print 'Received worker thread event'

def createWorkerThreads(numThreads, eventService):
    threads = []
    for i in range(numThreads):
	thread = threading.Thread(name="Thread %s" % i,
				target=workerThreadProc, args=(eventService,))
	thread.setDaemon(1)
	thread.start()
	threads.append(thread)
    return threads

def workerThreadProc(eventService):
    while 1:
        time.sleep(3)
        eventService.notify()


if __name__ == "__main__":
    app = QApplication(sys.argv)

    hello = QPushButton("Hello world!",None)
    hello.resize(100,30)
    hello.show()
    app.setMainWidget(hello)

    eventService = ThreadEventService()
    eventHandler = ThreadEventHandler(eventService)
    threads = createWorkerThreads(3, eventService)

    app.exec_loop()




More information about the PyQt mailing list