Skip to content

Commit 7eb9adf

Browse files
committed
[python][plan][api] Init Event, Action, Function and WorkflowPlan.
1 parent a47452d commit 7eb9adf

File tree

8 files changed

+217
-0
lines changed

8 files changed

+217
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ python/flink_agents/flink_agents.egg-info/
77
python/build/
88
python/dist
99
python/.DS_Store
10+
__pycache__

python/flink_agents/api/event.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from abc import ABC
2+
from uuid import UUID, uuid4
3+
4+
from pydantic import BaseModel, Field
5+
6+
7+
class Event(BaseModel, ABC, extra="allow"):
8+
"""Base class for all event types in the system.
9+
10+
Attributes:
11+
----------
12+
id : UUID
13+
Unique identifier for the event, automatically generated using uuid4.
14+
"""
15+
16+
id: UUID = Field(default_factory=uuid4)
17+
18+
19+
class InputEvent(Event):
20+
"""Event generated by the framework, carrying an input data that
21+
arrives at the workflow.
22+
"""
23+
24+
input: str
25+
26+
27+
class OutputEvent(Event):
28+
"""Event representing a result from workflow. By generating an OutputEvent,
29+
actions can emit output data.
30+
31+
Attributes:
32+
----------
33+
isLegal : bool
34+
Is the result legal or not.
35+
result : str
36+
The final result returned by the workflow.
37+
"""
38+
39+
isLegal: bool
40+
result: str

python/flink_agents/plan/action.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from typing import List, Type
2+
3+
from pydantic import BaseModel
4+
from python.flink_agents.api.event import Event
5+
from python.flink_agents.plan.function import Function
6+
7+
8+
class Action(BaseModel):
9+
"""Representation of a workflow action with event listening and function execution.
10+
11+
This class encapsulates a named workflow action that listens for specific event
12+
types and executes an associated function when those events occur.
13+
14+
Attributes:
15+
----------
16+
name : str
17+
Name/identifier of the workflow Action.
18+
exec : Function
19+
To be executed when the Action is triggered.
20+
listen_event_types : List[Type[Event]]
21+
List of event types that will trigger this Action's execution.
22+
"""
23+
24+
name: str
25+
exec: Function
26+
listen_event_types: List[Type[Event]]

python/flink_agents/plan/function.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import importlib
2+
import inspect
3+
from typing import Any, Callable, Dict, Tuple
4+
5+
from pydantic import BaseModel
6+
7+
8+
class Function(BaseModel):
9+
"""Descriptor for a callable function, storing module and qualified name for dynamic
10+
retrieval.
11+
12+
This class allows serialization and lazy loading of functions by storing their
13+
module and
14+
qualified name. The actual callable is loaded on-demand when the instance is called.
15+
16+
Attributes:
17+
----------
18+
module : str
19+
Name of the Python module where the function is defined.
20+
qualname : str
21+
Qualified name of the function (e.g., 'ClassName.method' for class methods).
22+
__func: Callable
23+
Internal cache for the resolved function
24+
"""
25+
26+
module: str
27+
qualname: str
28+
__func: Callable = None
29+
30+
@staticmethod
31+
def from_callable(func: Callable) -> "Function":
32+
"""Create a Function descriptor from an existing callable.
33+
34+
Parameters
35+
----------
36+
func : Callable
37+
The function or method to be wrapped.
38+
39+
Returns:
40+
-------
41+
Function
42+
A Function instance with module and qualname populated based on the input
43+
callable.
44+
"""
45+
return Function(
46+
module=inspect.getmodule(func).__name__,
47+
qualname=func.__qualname__,
48+
__func=func,
49+
)
50+
51+
def __call__(self, *args: Tuple[Any, ...], **kwargs: Dict[str, Any]) -> Any:
52+
"""Execute the stored function with provided arguments.
53+
54+
Lazily loads the function from its module and qualified name if not already
55+
cached.
56+
57+
Parameters
58+
----------
59+
*args : tuple
60+
Positional arguments to pass to the function.
61+
**kwargs : dict
62+
Keyword arguments to pass to the function.
63+
64+
Returns:
65+
-------
66+
Any
67+
The result of calling the resolved function with the provided arguments.
68+
69+
Notes:
70+
-----
71+
If the function is a method (qualified name contains a class reference), it will
72+
resolve the method from the corresponding class.
73+
"""
74+
if self.__func is None:
75+
module = importlib.import_module(self.module)
76+
if "." in self.qualname:
77+
# Handle class methods (e.g., 'ClassName.method')
78+
classname, methodname = self.qualname.rsplit(".", 1)
79+
clazz = getattr(module, classname)
80+
self.__func = getattr(clazz, methodname)
81+
else:
82+
# Handle standalone functions
83+
self.__func = getattr(module, self.qualname)
84+
return self.__func(*args, **kwargs)

python/flink_agents/plan/tests/__init__.py

Whitespace-only changes.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from python.flink_agents.api.event import InputEvent, OutputEvent
2+
from python.flink_agents.plan.action import Action
3+
from python.flink_agents.plan.function import Function
4+
from python.flink_agents.plan.workflow_plan import WorkflowPlan
5+
6+
7+
def increment(event: InputEvent) -> OutputEvent: # noqa: D103
8+
value = event.input
9+
value = int(value) + 1
10+
return OutputEvent(isLegal=True, result=str(value))
11+
12+
13+
def test_simplest_workflow_plan() -> None: # noqa: D103
14+
INCREMENT_ACTION = Action(
15+
name="increment",
16+
exec=Function.from_callable(increment),
17+
listen_event_types=[InputEvent],
18+
)
19+
actions = {InputEvent: [INCREMENT_ACTION]}
20+
workflow_plan = WorkflowPlan(actions=actions)
21+
22+
input_event = InputEvent(input="1")
23+
input_event_triggered_actions = workflow_plan.get_action(type(input_event))
24+
for action in input_event_triggered_actions:
25+
result_event = action.exec(input_event)
26+
if isinstance(result_event, OutputEvent):
27+
assert result_event.isLegal
28+
print(result_event.result)
29+
assert result_event.result == "2"
30+
31+
32+
if __name__ == "__main__":
33+
test_simplest_workflow_plan()
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from typing import Dict, List, Type
2+
3+
from pydantic import BaseModel
4+
from python.flink_agents.api.event import Event
5+
from python.flink_agents.plan.action import Action
6+
7+
8+
class WorkflowPlan(BaseModel):
9+
"""Workflow plan compiled from user defined workflow.
10+
11+
Attributes:
12+
----------
13+
actions : Dict[Type[Event], List[Action]]
14+
Mapping of event types to the list of Actions that listen to them.
15+
"""
16+
17+
actions: Dict[Type[Event], List[Action]]
18+
19+
def get_action(self, event_type: Type[Event]) -> List[Action]:
20+
"""Get steps that listen to the specified event type.
21+
22+
Parameters
23+
----------
24+
event_type : Type[Event]
25+
The event type to query.
26+
27+
Returns:
28+
-------
29+
list[Action]
30+
List of Actions that will respond to this event type.
31+
"""
32+
return self.actions[event_type]

python/requirements/build_requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
build
16+
pydantic==2.11.4

0 commit comments

Comments
 (0)