-
Notifications
You must be signed in to change notification settings - Fork 6
[Feature][api][python] Introduce Python API for Workflow, Event and Action #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
cdaa475
to
af9056f
Compare
@action(InputEvent) | ||
@staticmethod | ||
def first_action(event: Event, ctx: WorkflowRunnerContext): #noqa D102 | ||
event.input += " first_action" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a good example. Events received by an action should be immutable, and we should not encourage users to modify them.
if __name__ == "__main__": | ||
workflow = MyWorkflow() | ||
session_id = workflow.run(input="input", runner='LocalRunner') | ||
for output in workflow.get_outputs(session_id): | ||
print(output) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- We should not have
run
andget_outputs
on a workflow, because they only make sense in local runner. It would be problematic if user calls them when executing in Flink runner. - We probably should replace the concept of
session_id
withkey
. - If
workflow.run
is only for local runner, we should not require user to specify therunner='LocalRunner'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest something like:
in_q = Queue()
out_q = (FlinkAgentsExecutionEnvironment
.from_queue(in_q)
.apply(workflow)
.to_queue()
)
in_q.put(("intput1", "key"))
in_q.put("input2") # automatically generate new unique key
while not out_q.empty():
(key, output) = out_q.get()
print(key, output)
Internal workflow runner instance used to execute the workflow. | ||
""" | ||
|
||
__runner: WorkflowRunner = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The workflow should not be aware of the runner.
from uuid import UUID | ||
|
||
|
||
class WorkflowRunner(ABC): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The workflow runner should not be a public API. We don't want users to call it or implement it.
from flink_agents.api.event import Event | ||
|
||
|
||
class WorkflowRunnerContext(ABC): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's just call this RunnerContext
.
@abstractmethod | ||
def add_output(self, output: Any) -> None: | ||
"""Add an output generate by workflow execution to the context. | ||
|
||
Parameters | ||
---------- | ||
output : Any | ||
The output to be added to the queue. | ||
""" | ||
|
||
@abstractmethod | ||
def get_outputs(self) -> deque[Any]: | ||
"""Get outputs stored in this context. | ||
|
||
Returns: | ||
------- | ||
deque[Any] | ||
The outputs generated by workflow execution on this context. | ||
""" | ||
|
||
@abstractmethod | ||
def clear_output(self) -> None: | ||
"""Clear outputs stored in this context.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need these in the user-facing API?
from flink_agents.api.event import Event | ||
|
||
|
||
#TODO: implement Closure to support access self in action function, like llama-index |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this mean?
from flink_agents.plan.function import PythonFunction | ||
|
||
|
||
def get_actions(workflow: Workflow) -> List[Action]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why introducing this util file?
|
||
class MyWorkflow(Workflow): #noqa D101 | ||
@action(InputEvent) | ||
@staticmethod |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to get rid of this @staticmethod
?
Linked issue: #15
This PR is based on the previous PR #6 , and the commit from [[Feature][api][python]] is the code of this PR.
This PR import Workflow which users can implement it to design their own workflow logic, WorkflowRunner & WorkflowRunnerContext for workflow execution, and other utils for converting user defined workflow to workflow plan.
And I implement LocalRunner and LocalRunnerContext for giving an example for WorkflowRunner and WorkflowRunnerContext, together with an example of how to run user defined workflow.