Skip to content

Commit d651a50

Browse files
authored
Merge pull request #41 from VigneshVSV/develop
Bug fix serializer customization for event affordance
2 parents 3d78a7b + b317010 commit d651a50

File tree

9 files changed

+163
-67
lines changed

9 files changed

+163
-67
lines changed

CHANGELOG.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
✓ means ready to try
1111

1212
- cookie auth & its specification in TD (cookie auth branch)
13-
- image event handlers (develop branch) for streaming live video as JPEG and PNG ✓
14-
- pydantic support for property models (develop branch) ✓
13+
- adding custom handlers for each property, action and event to override default behaviour
14+
- pydantic support for property models
15+
16+
## [v0.2.6] - 2024-09-09
17+
18+
- bug fix events when multiple serializers are used
19+
- events support custom HTTP handlers (not polished yet, use as last resort), not yet compatible with TD
20+
- image event handlers for streaming live video as JPEG and PNG, not yet compatible with TD
1521

1622
## [v0.2.5] - 2024-09-09
1723

18-
- released to anaconda
24+
- released to anaconda, it can take a while to turn up. A badge will be added in README when successful.
1925

2026
## [v0.2.4] - 2024-09-09
2127

hololinked/client/proxy.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ def load_thing(self):
567567
elif data.what == ResourceTypes.EVENT:
568568
assert isinstance(data, ServerSentEvent)
569569
event = _Event(self.zmq_client, data.name, data.obj_name, data.unique_identifier, data.socket_address,
570-
serializer=self.zmq_client.zmq_serializer, logger=self.logger)
570+
serialization_specific=data.serialization_specific, serializer=self.zmq_client.zmq_serializer, logger=self.logger)
571571
_add_event(self, event, data)
572572
self.__dict__[data.name] = event
573573

@@ -755,17 +755,19 @@ def oneway_set(self, value : typing.Any) -> None:
755755

756756
class _Event:
757757

758-
__slots__ = ['_zmq_client', '_name', '_obj_name', '_unique_identifier', '_socket_address', '_callbacks',
758+
__slots__ = ['_zmq_client', '_name', '_obj_name', '_unique_identifier', '_socket_address', '_callbacks', '_serialization_specific',
759759
'_serializer', '_subscribed', '_thread', '_thread_callbacks', '_event_consumer', '_logger']
760760
# event subscription
761761
# Dont add class doc otherwise __doc__ in slots will conflict with class variable
762762

763763
def __init__(self, client : SyncZMQClient, name : str, obj_name : str, unique_identifier : str, socket : str,
764-
serializer : BaseSerializer = None, logger : logging.Logger = None) -> None:
764+
serialization_specific : bool = False, serializer : BaseSerializer = None, logger : logging.Logger = None) -> None:
765+
self._zmq_client = client
765766
self._name = name
766767
self._obj_name = obj_name
767768
self._unique_identifier = unique_identifier
768769
self._socket_address = socket
770+
self._serialization_specific = serialization_specific
769771
self._callbacks = None
770772
self._serializer = serializer
771773
self._logger = logger
@@ -781,9 +783,11 @@ def add_callbacks(self, callbacks : typing.Union[typing.List[typing.Callable], t
781783

782784
def subscribe(self, callbacks : typing.Union[typing.List[typing.Callable], typing.Callable],
783785
thread_callbacks : bool = False):
784-
self._event_consumer = EventConsumer(self._unique_identifier, self._socket_address,
785-
f"{self._name}|RPCEvent|{uuid.uuid4()}", b'PROXY',
786-
zmq_serializer=self._serializer, logger=self._logger)
786+
self._event_consumer = EventConsumer(
787+
'zmq-' + self._unique_identifier if self._serialization_specific else self._unique_identifier,
788+
self._socket_address, f"{self._name}|RPCEvent|{uuid.uuid4()}", b'PROXY',
789+
zmq_serializer=self._serializer, logger=self._logger
790+
)
787791
self.add_callbacks(callbacks)
788792
self._subscribed = True
789793
self._thread_callbacks = thread_callbacks

hololinked/server/HTTPServer.py

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
from dataclasses import dataclass
23
import zmq
34
import zmq.asyncio
45
import logging
@@ -14,19 +15,34 @@
1415
from ..param import Parameterized
1516
from ..param.parameters import (Integer, IPAddress, ClassSelector, Selector, TypedList, String)
1617
from .constants import ZMQ_PROTOCOLS, CommonRPC, HTTPServerTypes, ResourceTypes, ServerMessage
17-
from .utils import get_IP_from_interface
18+
from .utils import get_IP_from_interface, issubklass
1819
from .dataklasses import HTTPResource, ServerSentEvent
1920
from .utils import get_default_logger
2021
from .serializers import JSONSerializer
2122
from .database import ThingInformation
2223
from .zmq_message_brokers import AsyncZMQClient, MessageMappedZMQClientPool
2324
from .handlers import RPCHandler, BaseHandler, EventHandler, ThingsHandler, StopHandler
2425
from .schema_validators import BaseSchemaValidator, JsonSchemaValidator
26+
from .events import Event
2527
from .eventloop import EventLoop
2628
from .config import global_config
2729

2830

2931

32+
33+
@dataclass
34+
class InteractionAffordance:
35+
URL_path : str
36+
obj : Event # typing.Union[Property, Action, Event]
37+
http_methods : typing.Tuple[str, typing.Optional[str], typing.Optional[str]]
38+
handler : BaseHandler
39+
kwargs : dict
40+
41+
def __eq__(self, other : "InteractionAffordance") -> bool:
42+
return self.obj == other.obj
43+
44+
45+
3046
class HTTPServer(Parameterized):
3147
"""
3248
HTTP(s) server to route requests to ``Thing``.
@@ -63,7 +79,7 @@ class HTTPServer(Parameterized):
6379
Unlike pure CORS, the server resource is not even executed if the client is not
6480
an allowed client. if None any client is served.""")
6581
host = String(default=None, allow_None=True,
66-
doc="Host Server to subscribe to coordinate starting sequence of remote objects & web GUI" ) # type: str
82+
doc="Host Server to subscribe to coordinate starting sequence of things & web GUI" ) # type: str
6783
# network_interface = String(default='Ethernet',
6884
# doc="Currently there is no logic to detect the IP addresss (as externally visible) correctly, \
6985
# therefore please send the network interface name to retrieve the IP. If a DNS server is present, \
@@ -138,6 +154,7 @@ def __init__(self, things : typing.List[str], *, port : int = 8080, address : st
138154
self._zmq_protocol = ZMQ_PROTOCOLS.IPC
139155
self._zmq_inproc_socket_context = None
140156
self._zmq_inproc_event_context = None
157+
self._local_rules = dict() # type: typing.Dict[str, typing.List[InteractionAffordance]]
141158

142159
@property
143160
def all_ok(self) -> bool:
@@ -147,6 +164,9 @@ def all_ok(self) -> bool:
147164
f"{self.address}:{self.port}"),
148165
self.log_level)
149166

167+
if self._zmq_protocol == ZMQ_PROTOCOLS.INPROC and (self._zmq_inproc_socket_context is None or self._zmq_inproc_event_context is None):
168+
raise ValueError("Inproc socket context is not provided. Logic Error.")
169+
150170
self.app = Application(handlers=[
151171
(r'/remote-objects', ThingsHandler, dict(request_handler=self.request_handler,
152172
event_handler=self.event_handler)),
@@ -250,7 +270,7 @@ async def update_router_with_thing(self, client : AsyncZMQClient):
250270
# Just to avoid duplication of this call as we proceed at single client level and not message mapped level
251271
return
252272
self._lost_things[client.instance_name] = client
253-
self.logger.info(f"attempting to update router with remote object {client.instance_name}.")
273+
self.logger.info(f"attempting to update router with thing {client.instance_name}.")
254274
while True:
255275
try:
256276
await client.handshake_complete()
@@ -272,7 +292,13 @@ async def update_router_with_thing(self, client : AsyncZMQClient):
272292
)))
273293
elif http_resource["what"] == ResourceTypes.EVENT:
274294
resource = ServerSentEvent(**http_resource)
275-
handlers.append((instruction, self.event_handler, dict(
295+
if resource.class_name in self._local_rules and any(ia.obj._obj_name == resource.obj_name for ia in self._local_rules[resource.class_name]):
296+
for ia in self._local_rules[resource.class_name]:
297+
if ia.obj._obj_name == resource.obj_name:
298+
handlers.append((f'/{client.instance_name}{ia.URL_path}', ia.handler, dict(resource=resource, validator=None,
299+
owner=self, **ia.kwargs)))
300+
else:
301+
handlers.append((instruction, self.event_handler, dict(
276302
resource=resource,
277303
validator=None,
278304
owner=self
@@ -306,10 +332,11 @@ def __init__(
306332
to make RPCHandler work
307333
"""
308334
self.app.wildcard_router.add_rules(handlers)
309-
self.logger.info(f"updated router with remote object {client.instance_name}.")
335+
self.logger.info(f"updated router with thing {client.instance_name}.")
310336
break
311337
except Exception as ex:
312-
self.logger.error(f"error while trying to update router with remote object - {str(ex)}. " +
338+
print("error", ex)
339+
self.logger.error(f"error while trying to update router with thing - {str(ex)}. " +
313340
"Trying again in 5 seconds")
314341
await asyncio.sleep(5)
315342

@@ -328,10 +355,39 @@ def __init__(
328355
raise_client_side_exception=True
329356
)
330357
except Exception as ex:
331-
self.logger.error(f"error while trying to update remote object with HTTP server details - {str(ex)}. " +
358+
self.logger.error(f"error while trying to update thing with HTTP server details - {str(ex)}. " +
332359
"Trying again in 5 seconds")
333360
self.zmq_client_pool.poller.register(client.socket, zmq.POLLIN)
334361
self._lost_things.pop(client.instance_name)
362+
363+
364+
def add_event(self, URL_path : str, event : Event, handler : typing.Optional[BaseHandler] = None,
365+
**kwargs) -> None:
366+
"""
367+
Add an event to be served by HTTP server
368+
369+
Parameters
370+
----------
371+
URL_path : str
372+
URL path to access the event
373+
event : Event
374+
Event to be served
375+
handler : BaseHandler, optional
376+
custom handler for the event
377+
kwargs : dict
378+
additional keyword arguments to be passed to the handler's __init__
379+
"""
380+
if not isinstance(event, Event):
381+
raise TypeError("event should be of type Event")
382+
if not issubklass(handler, BaseHandler):
383+
raise TypeError("handler should be subclass of BaseHandler")
384+
if event.owner.__name__ not in self._local_rules:
385+
self._local_rules[event.owner.__name__] = []
386+
obj = InteractionAffordance(URL_path=URL_path, obj=event,
387+
http_methods=('GET',), handler=handler or self.event_handler,
388+
kwargs=kwargs)
389+
if obj not in self._local_rules[event.owner.__name__]:
390+
self._local_rules[event.owner.__name__].append(obj)
335391

336392

337393
__all__ = [

hololinked/server/dataklasses.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ class HTTPResource(SerializableDataclass):
290290
pass the request as a argument to the callable. For HTTP server ``tornado.web.HTTPServerRequest`` will be passed.
291291
"""
292292
what : str
293+
class_name : str # just metadata
293294
instance_name : str
294295
obj_name : str
295296
fullpath : str
@@ -298,10 +299,11 @@ class HTTPResource(SerializableDataclass):
298299
request_as_argument : bool = field(default=False)
299300

300301

301-
def __init__(self, *, what : str, instance_name : str, obj_name : str, fullpath : str,
302+
def __init__(self, *, what : str, class_name : str, instance_name : str, obj_name : str, fullpath : str,
302303
request_as_argument : bool = False, argument_schema : typing.Optional[JSON] = None,
303304
**instructions) -> None:
304305
self.what = what
306+
self.class_name = class_name
305307
self.instance_name = instance_name
306308
self.obj_name = obj_name
307309
self.fullpath = fullpath
@@ -340,6 +342,7 @@ class ZMQResource(SerializableDataclass):
340342
argument schema of the method/action for validation before passing over the instruction to the RPC server.
341343
"""
342344
what : str
345+
class_name : str # just metadata
343346
instance_name : str
344347
instruction : str
345348
obj_name : str
@@ -350,10 +353,11 @@ class ZMQResource(SerializableDataclass):
350353
return_value_schema : typing.Optional[JSON]
351354
request_as_argument : bool = field(default=False)
352355

353-
def __init__(self, *, what : str, instance_name : str, instruction : str, obj_name : str,
356+
def __init__(self, *, what : str, class_name : str, instance_name : str, instruction : str, obj_name : str,
354357
qualname : str, doc : str, top_owner : bool, argument_schema : typing.Optional[JSON] = None,
355358
return_value_schema : typing.Optional[JSON] = None, request_as_argument : bool = False) -> None:
356359
self.what = what
360+
self.class_name = class_name
357361
self.instance_name = instance_name
358362
self.instruction = instruction
359363
self.obj_name = obj_name
@@ -390,7 +394,9 @@ class ServerSentEvent(SerializableDataclass):
390394
"""
391395
name : str = field(default=UNSPECIFIED)
392396
obj_name : str = field(default=UNSPECIFIED)
397+
class_name : str = field(default=UNSPECIFIED) # just metadata
393398
unique_identifier : str = field(default=UNSPECIFIED)
399+
serialization_specific : bool = field(default=False)
394400
socket_address : str = field(default=UNSPECIFIED)
395401
what : str = field(default=ResourceTypes.EVENT)
396402

@@ -404,7 +410,7 @@ def build_our_temp_TD(instance):
404410

405411
assert isinstance(instance, Thing), f"got invalid type {type(instance)}"
406412

407-
our_TD = instance.get_thing_description()
413+
our_TD = instance.get_thing_description(ignore_errors=True)
408414
our_TD["inheritance"] = [class_.__name__ for class_ in instance.__class__.mro()]
409415

410416
for instruction, remote_info in instance.instance_resources.items():
@@ -470,13 +476,15 @@ def get_organised_resources(instance):
470476

471477
httpserver_resources[fullpath] = HTTPResource(
472478
what=ResourceTypes.PROPERTY,
479+
class_name=instance.__class__.__name__,
473480
instance_name=instance._owner.instance_name if instance._owner is not None else instance.instance_name,
474481
obj_name=remote_info.obj_name,
475482
fullpath=fullpath,
476483
**instructions
477484
)
478485
zmq_resources[fullpath] = ZMQResource(
479486
what=ResourceTypes.PROPERTY,
487+
class_name=instance.__class__.__name__,
480488
instance_name=instance._owner.instance_name if instance._owner is not None else instance.instance_name,
481489
instruction=fullpath,
482490
doc=prop.__doc__,
@@ -494,6 +502,8 @@ def get_organised_resources(instance):
494502
assert isinstance(prop._observable_event_descriptor, Event), f"observable event not yet set for {prop.name}. logic error."
495503
evt_fullpath = f"{instance._full_URL_path_prefix}{prop._observable_event_descriptor.URL_path}"
496504
dispatcher = EventDispatcher(evt_fullpath)
505+
dispatcher._remote_info.class_name = instance.__class__.__name__
506+
dispatcher._remote_info.serialization_specific = instance.zmq_serializer != instance.http_serializer
497507
setattr(instance, prop._observable_event_descriptor._obj_name, dispatcher)
498508
# prop._observable_event_descriptor._remote_info.unique_identifier = evt_fullpath
499509
httpserver_resources[evt_fullpath] = dispatcher._remote_info
@@ -515,6 +525,7 @@ def get_organised_resources(instance):
515525
# needs to be cleaned up for multiple HTTP methods
516526
httpserver_resources[instruction] = HTTPResource(
517527
what=ResourceTypes.ACTION,
528+
class_name=instance.__class__.__name__,
518529
instance_name=instance._owner.instance_name if instance._owner is not None else instance.instance_name,
519530
obj_name=remote_info.obj_name,
520531
fullpath=fullpath,
@@ -524,6 +535,7 @@ def get_organised_resources(instance):
524535
)
525536
zmq_resources[instruction] = ZMQResource(
526537
what=ResourceTypes.ACTION,
538+
class_name=instance.__class__.__name__,
527539
instance_name=instance._owner.instance_name if instance._owner is not None else instance.instance_name,
528540
instruction=instruction,
529541
obj_name=getattr(resource, '__name__'),
@@ -545,6 +557,8 @@ def get_organised_resources(instance):
545557
fullpath = f"{instance._full_URL_path_prefix}{resource.URL_path}"
546558
# resource._remote_info.unique_identifier = fullpath
547559
dispatcher = EventDispatcher(fullpath)
560+
dispatcher._remote_info.class_name = instance.__class__.__name__
561+
dispatcher._remote_info.serialization_specific = instance.zmq_serializer != instance.http_serializer
548562
setattr(instance, name, dispatcher) # resource._remote_info.unique_identifier))
549563
httpserver_resources[fullpath] = dispatcher._remote_info
550564
zmq_resources[fullpath] = dispatcher._remote_info

0 commit comments

Comments
 (0)