[PyKDE] Multithreaded GUI programming
Paul Felix
pef at fluent.com
Sun Dec 8 01:38:01 GMT 2002
Hi Mickey,
Does Qt/Embedded include the network module? If so, perhaps this is a
solution:
You were on the right track when thinking about a communication pipe, and
calling select() in an event loop, but maybe you can create a QSocket and
let Qt do that.
Here's a stab at it...
Paul
----------------------------------------------------------------------
#!/usr/bin/env python
import sys, socket, threading, time
from qt import *
from qtnetwork import *
class ThreadEventService:
    """Socket-based event service that lets threads signal a listener.

    The service binds a TCP socket on localhost (port chosen by the OS)
    and accepts a single client connection on a background thread.  Any
    thread may then call notify(), which writes one character ('1') to
    the accepted connection.  A lock serializes access to the connection
    so that notify() can be called from several threads.
    """

    def __init__(self):
        self._threadLock = threading.Lock()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind(('localhost', 0))
        # Start listening before the port can be handed out via getPort(),
        # so a client that connects immediately is queued instead of being
        # refused because the accept thread has not run yet.
        self._sock.listen(1)
        self._conn = None
        thread = threading.Thread(target=self._listen)
        # Daemon, like the worker threads: a pending accept() must not keep
        # the process alive if no client ever connects.
        thread.setDaemon(1)
        thread.start()

    def getPort(self):
        """Return the TCP port number the service socket is bound to."""
        return self._sock.getsockname()[1]

    def notify(self):
        """Send one event character to the connected client, if any.

        If no client connection has been accepted yet, the call does
        nothing.  Exceptions raised by send() propagate to the caller,
        but the lock is always released.
        """
        self._threadLock.acquire()
        try:
            if self._conn is not None:
                self._conn.send('1')
        finally:
            # Release even when send() fails; otherwise every later
            # notify() call would block forever.
            self._threadLock.release()

    def _listen(self):
        """Thread target: accept one client connection and store it.

        The lock is held while waiting in accept(), so notify() calls
        made during that time block until the connection exists.
        """
        self._threadLock.acquire()
        try:
            self._conn = self._sock.accept()[0]
        finally:
            self._threadLock.release()
class ThreadEventHandler(QObject):
    """An event handler that uses QSocket to connect to a socket-based
    event service.  It gets a signal when the service has sent an event
    by writing a char to the socket."""

    def __init__(self, eventService):
        """Connect a QSocket to the port reported by eventService.getPort()
        on localhost and route its readyRead() signal to _onThreadEvent."""
        QObject.__init__(self)
        self._qsock = QSocket()
        self._qsock.connectToHost('localhost', eventService.getPort())
        QObject.connect(self._qsock, SIGNAL("readyRead()"), self._onThreadEvent)

    def _onThreadEvent(self):
        """Slot for readyRead(): consume every pending event character.

        Several notifications can be delivered in a single readyRead()
        emission, so the socket is drained rather than read once; each
        character read is reported as one event.
        """
        # NOTE(review): relies on getch() returning -1 once the read buffer
        # is empty -- confirm against the QSocket reference for this Qt.
        while self._qsock.getch() != -1:
            print('Received worker thread event')
def createWorkerThreads(numThreads, eventService):
    """Start numThreads daemon threads running workerThreadProc.

    Each thread is named "Thread <index>" and receives eventService as
    its only argument.  Returns the list of started Thread objects in
    creation order.
    """
    def launch(index):
        # Build, mark as daemon, and start a single worker.
        worker = threading.Thread(name="Thread %s" % index,
                                  target=workerThreadProc,
                                  args=(eventService,))
        worker.setDaemon(1)
        worker.start()
        return worker

    return [launch(index) for index in range(numThreads)]
def workerThreadProc(eventService, interval=3):
    """Worker loop: sleep, then call eventService.notify(), repeatedly.

    eventService -- object providing a notify() method
    interval     -- seconds to sleep before each notification; defaults
                    to 3, the value that was previously hard-coded

    The loop has no exit condition; it ends only if time.sleep() or
    notify() raises.
    """
    while 1:
        time.sleep(interval)
        eventService.notify()
if __name__ == "__main__":
    # Demo: a one-button window whose event loop also receives
    # notifications sent by background worker threads.
    app = QApplication(sys.argv)

    # Minimal GUI: a single push button used as the main widget.
    hello = QPushButton("Hello world!", None)
    hello.resize(100, 30)
    hello.show()
    app.setMainWidget(hello)

    # Wire the worker threads to the GUI side through the socket-based
    # event service and its handler.
    eventService = ThreadEventService()
    eventHandler = ThreadEventHandler(eventService)
    threads = createWorkerThreads(3, eventService)

    # Run the Qt event loop.
    app.exec_loop()
More information about the PyQt
mailing list