
[Feature][api][python] Introduce Python API for Workflow, Event and Action #32


Open
wants to merge 1 commit into
base: main

Conversation

wenjin272 (Contributor) commented on Jun 19, 2025

Linked issue: #15

This PR is based on the previous PR #6; the commit prefixed with [Feature][api][python] contains the code for this PR.

This PR introduces Workflow, which users implement to define their own workflow logic; WorkflowRunner and WorkflowRunnerContext for workflow execution; and utilities for converting a user-defined workflow into a workflow plan.

I also implemented LocalRunner and LocalRunnerContext as examples of WorkflowRunner and WorkflowRunnerContext, together with an example of how to run a user-defined workflow.

wenjin272 changed the title from "Init workflow api" to "[Feature][api][python] Introduce Python API for Workflow, Event and Action" on Jun 19, 2025

    @action(InputEvent)
    @staticmethod
    def first_action(event: Event, ctx: WorkflowRunnerContext): #noqa D102
        event.input += " first_action"
Contributor

This is not a good example. Events received by an action should be immutable, and we should not encourage users to modify them.
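
A minimal sketch of what an immutable-event version of this example could look like. The `InputEvent` and `MyEvent` classes below are hypothetical stand-ins (the PR's real `Event` classes may be defined differently), and the action simply returns the derived event because how the context emits events is not shown in this thread.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class InputEvent:
    """Hypothetical immutable input event; not the PR's actual class."""

    input: str


@dataclass(frozen=True)
class MyEvent:
    """Hypothetical downstream event carrying the derived value."""

    value: str


def first_action(event: InputEvent) -> MyEvent:
    # Derive a new event instead of mutating the one that was received.
    return MyEvent(value=event.input + " first_action")


event = InputEvent(input="input")
result = first_action(event)

try:
    # Frozen dataclasses reject attribute assignment, so this raises.
    event.input += " mutated"
except FrozenInstanceError:
    mutation_blocked = True
else:
    mutation_blocked = False
```

With frozen events, an accidental in-place edit fails loudly at the point of the mistake rather than silently changing what other actions observe.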

Comment on lines +27 to +31
if __name__ == "__main__":
    workflow = MyWorkflow()
    session_id = workflow.run(input="input", runner='LocalRunner')
    for output in workflow.get_outputs(session_id):
        print(output)
Contributor

  1. We should not have run and get_outputs on a workflow, because they only make sense in the local runner. It would be problematic if a user called them when executing in the Flink runner.
  2. We should probably replace the concept of session_id with key.
  3. If workflow.run is only for the local runner, we should not require the user to specify runner='LocalRunner'.

Contributor

I'd suggest something like:

in_q = Queue()
out_q = (FlinkAgentsExecutionEnvironment
         .from_queue(in_q)
         .apply(workflow)
         .to_queue()
)

in_q.put(("input1", "key"))
in_q.put("input2") # automatically generate new unique key

while not out_q.empty():
    (key, output) = out_q.get()
    print(key, output)
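
To make the suggested shape concrete, here is a rough, purely local sketch of how such a queue-based entry point might behave. `LocalEnvironmentSketch` and `_LazyOutputQueue` are hypothetical names invented for illustration, not the proposed `FlinkAgentsExecutionEnvironment`; the "workflow" is reduced to a plain callable, and the rule that a 2-tuple means `(input, key)` is an assumption taken from the snippet above.

```python
from queue import Queue
from typing import Any, Callable, Optional, Tuple
from uuid import uuid4


class _LazyOutputQueue:
    """Output side: processes pending inputs whenever it is polled."""

    def __init__(self, in_q: Queue, workflow: Callable[[Any], Any]) -> None:
        self._in_q = in_q
        self._workflow = workflow
        self._out: Queue = Queue()

    def _drain(self) -> None:
        while not self._in_q.empty():
            item = self._in_q.get()
            # Assumption: a 2-tuple is (input, key); anything else gets a
            # freshly generated key. A real API would need a clearer rule.
            if isinstance(item, tuple) and len(item) == 2:
                value, key = item
            else:
                value, key = item, str(uuid4())
            self._out.put((key, self._workflow(value)))

    def empty(self) -> bool:
        self._drain()
        return self._out.empty()

    def get(self) -> Tuple[Any, Any]:
        self._drain()
        return self._out.get_nowait()  # raises queue.Empty if nothing is ready


class LocalEnvironmentSketch:
    """Hypothetical local stand-in for the proposed environment."""

    def __init__(self, in_q: Queue) -> None:
        self._in_q = in_q
        self._workflow: Optional[Callable[[Any], Any]] = None

    @classmethod
    def from_queue(cls, in_q: Queue) -> "LocalEnvironmentSketch":
        return cls(in_q)

    def apply(self, workflow: Callable[[Any], Any]) -> "LocalEnvironmentSketch":
        self._workflow = workflow
        return self

    def to_queue(self) -> _LazyOutputQueue:
        if self._workflow is None:
            raise ValueError("call apply() before to_queue()")
        return _LazyOutputQueue(self._in_q, self._workflow)


in_q: Queue = Queue()
out_q = LocalEnvironmentSketch.from_queue(in_q).apply(str.upper).to_queue()

in_q.put(("input1", "key"))
in_q.put("input2")  # no key supplied, so one is generated

results = []
while not out_q.empty():
    results.append(out_q.get())
```

Because the output queue pulls from the input queue only when polled, inputs can be enqueued after `to_queue()` is called, which matches the ordering in the suggestion. A Flink-backed environment would presumably behave quite differently under the hood.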

    Internal workflow runner instance used to execute the workflow.
    """

    __runner: WorkflowRunner = None
Contributor

The workflow should not be aware of the runner.

from uuid import UUID


class WorkflowRunner(ABC):
Contributor

The workflow runner should not be a public API. We don't want users to call it or implement it.
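
One possible reading of the two comments above, sketched with hypothetical classes: the workflow only declares logic and holds no runner reference, while the runner is an internal, underscore-prefixed class that receives the workflow. The `process` method is an invented placeholder, not part of the PR.

```python
class Workflow:
    """Declares logic only; it holds no reference to any runner."""

    def process(self, value: str) -> str:
        # Placeholder for user-defined workflow logic (hypothetical).
        return value + " processed"


class _LocalRunner:
    """Internal runner. The leading underscore marks it as non-public."""

    def __init__(self, workflow: Workflow) -> None:
        # The dependency points from runner to workflow, not the reverse.
        self._workflow = workflow

    def run(self, value: str) -> str:
        return self._workflow.process(value)


workflow = Workflow()
output = _LocalRunner(workflow).run("input")
```

With the dependency in this direction, the same workflow object can be handed to a different runner without any change to the workflow class.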

from flink_agents.api.event import Event


class WorkflowRunnerContext(ABC):
Contributor

Let's just call this RunnerContext.

Comment on lines +25 to +47
    @abstractmethod
    def add_output(self, output: Any) -> None:
        """Add an output generated by workflow execution to the context.

        Parameters
        ----------
        output : Any
            The output to be added to the queue.
        """

    @abstractmethod
    def get_outputs(self) -> deque[Any]:
        """Get outputs stored in this context.

        Returns
        -------
        deque[Any]
            The outputs generated by workflow execution on this context.
        """

    @abstractmethod
    def clear_output(self) -> None:
        """Clear outputs stored in this context."""
Contributor

Why do we need these in the user-facing API?
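
As a sketch of one direction this question could lead, the user-facing context might expose only what an action needs, with output buffering kept in the internal local implementation. Everything here is an assumption for illustration: `send_event` is a hypothetical method that does not appear in the quoted diff, and `_LocalRunnerContext` is an invented name.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Deque, List


class RunnerContext(ABC):
    """User-facing surface (hypothetical): only what an action needs."""

    @abstractmethod
    def send_event(self, event: Any) -> None:
        """Emit an event produced by an action."""


class _LocalRunnerContext(RunnerContext):
    """Internal local implementation; output buffering lives here."""

    def __init__(self) -> None:
        self._events: Deque[Any] = deque()
        self._outputs: Deque[Any] = deque()

    def send_event(self, event: Any) -> None:
        self._events.append(event)

    # The helpers below are deliberately absent from the abstract API.
    def _add_output(self, output: Any) -> None:
        self._outputs.append(output)

    def _drain_outputs(self) -> List[Any]:
        drained = list(self._outputs)
        self._outputs.clear()
        return drained


ctx = _LocalRunnerContext()
ctx.send_event("some event")
ctx._add_output("some output")
first_drain = ctx._drain_outputs()
second_drain = ctx._drain_outputs()
```

Action code typed against `RunnerContext` then cannot reach the output buffer, which remains a detail of the local runner.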

from flink_agents.api.event import Event


#TODO: implement Closure to support access self in action function, like llama-index
Contributor

What does this mean?

from flink_agents.plan.function import PythonFunction


def get_actions(workflow: Workflow) -> List[Action]:
Contributor

Why introduce this util file?


class MyWorkflow(Workflow): #noqa D101
    @action(InputEvent)
    @staticmethod
Contributor

Is it possible to get rid of this @staticmethod?
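
A small sketch suggesting it may be possible, assuming actions are collected from the class rather than called through an instance. The `action` decorator, `get_actions` helper, and `InputEvent` class below are simplified, hypothetical versions written for this example; the PR's real implementations may differ.

```python
from typing import Any, Callable, Dict, Tuple, Type


class InputEvent:
    """Hypothetical minimal event type used only for this sketch."""

    def __init__(self, input: Any) -> None:
        self.input = input


def action(*event_types: Type) -> Callable[[Callable], Callable]:
    """Tag a function with the event types it listens to (hypothetical)."""

    def decorator(func: Callable) -> Callable:
        func._listen_events = event_types
        return func

    return decorator


def get_actions(workflow_cls: Type) -> Dict[str, Tuple[Callable, tuple]]:
    # vars() yields the raw class attributes, so plain functions come back
    # unbound and can be called with exactly (event, ctx).
    found = {}
    for name, attr in vars(workflow_cls).items():
        if callable(attr) and hasattr(attr, "_listen_events"):
            found[name] = (attr, attr._listen_events)
    return found


class MyWorkflow:
    @action(InputEvent)
    def first_action(event, ctx):  # no self and no @staticmethod
        return f"{event.input} first_action"


actions = get_actions(MyWorkflow)
func, listens = actions["first_action"]
result = func(InputEvent("input"), None)
```

The trade-off is that calling `first_action` through an instance would bind the instance as the first argument, and linters tend to flag a method without `self`, so this only works if the framework always looks actions up on the class.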


2 participants