@@ -780,6 +780,7 @@ def __init__(self, *,
780
780
self .socket : zmq .Socket | zmq .asyncio .Socket
781
781
self .poller : zmq .Poller | zmq .asyncio .Poller
782
782
self ._poll_timeout = kwargs .get ('poll_timeout' , 1000 ) # default to 1000 ms
783
+ self ._stop = False # in general, stop any loop with this variaböe
783
784
784
785
@property
785
786
def poll_timeout (self ) -> int :
@@ -849,6 +850,13 @@ def handled_default_message_types(self, response_message: RequestMessage) -> boo
849
850
if response_message .type == HANDSHAKE :
850
851
return True
851
852
return False
853
+
854
+
855
+ def stop (self ) -> None :
856
+ """
857
+ stop the client.
858
+ """
859
+ self ._stop = True
852
860
853
861
854
862
@@ -955,7 +963,8 @@ def recv_response(self, message_id: bytes) -> ResponseMessage:
955
963
if True, any exceptions raised during execution inside ``Thing`` instance will be raised on the client.
956
964
See docs of ``raise_local_exception()`` for info on exception
957
965
"""
958
- while True :
966
+ self ._stop = False
967
+ while not self ._stop :
959
968
if message_id in self ._response_cache :
960
969
return self ._response_cache .pop (message_id )
961
970
sockets = self .poller .poll (self .poll_timeout )
@@ -1027,8 +1036,9 @@ def handshake(self, timeout: typing.Union[float, int] = 60000) -> None:
1027
1036
"""
1028
1037
hanshake with server before sending first message
1029
1038
"""
1039
+ self ._stop = False
1030
1040
start_time = time .time_ns ()
1031
- while True :
1041
+ while not self . _stop :
1032
1042
if timeout is not None and (time .time_ns () - start_time )/ 1e6 > timeout :
1033
1043
raise ConnectionError (f"Unable to contact server '{ self .server_id } ' from client '{ self .id } '" )
1034
1044
self .socket .send_multipart (RequestMessage .craft_with_message_type (self .id , self .server_id , HANDSHAKE ).byte_array )
@@ -1123,11 +1133,12 @@ async def _handshake(self, timeout: float | int | None = 60000) -> None:
1123
1133
"""
1124
1134
hanshake with server before sending first message
1125
1135
"""
1136
+ self ._stop = False
1126
1137
if self ._monitor_socket is not None and self ._monitor_socket in self .poller :
1127
1138
self .poller .unregister (self ._monitor_socket )
1128
1139
self ._handshake_event .clear ()
1129
1140
start_time = time .time_ns ()
1130
- while True :
1141
+ while not self . _stop :
1131
1142
if timeout is not None and (time .time_ns () - start_time )/ 1e6 > timeout :
1132
1143
raise ConnectionError (f"Unable to contact server '{ self .server_id } ' from client '{ self .id } '" )
1133
1144
await self .socket .send_multipart (RequestMessage .craft_with_message_type (self .id , self .server_id , HANDSHAKE ).byte_array )
@@ -1237,7 +1248,8 @@ async def async_recv_response(self, message_id: str) -> typing.List[ResponseMess
1237
1248
deserialize_response: bool
1238
1249
deserializes the data field of the message
1239
1250
"""
1240
- while True :
1251
+ self ._stop = False
1252
+ while not self ._stop :
1241
1253
if message_id in self ._response_cache :
1242
1254
return self ._response_cache .pop (message_id )
1243
1255
sockets = await self .poller .poll (self ._poll_timeout )
@@ -1700,6 +1712,8 @@ def stop_polling(self):
1700
1712
stop polling for replies from server
1701
1713
"""
1702
1714
self .stop_poll = True
1715
+ for client in self .pool .values ():
1716
+ client .stop ()
1703
1717
1704
1718
async def async_execute_in_all (self ,
1705
1719
objekt : str ,
0 commit comments