Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cannot pass dict as a parameter 'data' to Completed state on Prefect v3.0.0rc16 #14927

Closed
ihor-ramskyi-globallogic opened this issue Aug 14, 2024 · 9 comments · Fixed by #15506
Labels
bug Something isn't working

Comments

@ihor-ramskyi-globallogic
Copy link

ihor-ramskyi-globallogic commented Aug 14, 2024

Bug summary

I cannot pass dict as parameter 'data' on Completed state on Prefect v3.0.0rc16. This is the code on which it fails:

from prefect import flow, task
from prefect.states import Completed

@task
def task_a():
    return Completed(message='A', data={'A': 1})

@flow
def some_flow():
    a = task_a.submit().result()
    return

if __name__ == '__main__':
    some_flow()

As result of running this code, I get such log and traceback:

logs and traceback
17:28:58.177 | INFO    | prefect.engine - Created flow run 'energetic-coot' for flow 'some-flow'
17:28:58.179 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/7b207023-a0a2-4eda-816b-218ace94fb46
17:28:58.271 | INFO    | Flow run 'energetic-coot' - Submitting task task_a to thread pool executor...
17:28:58.373 | INFO    | Task run 'task_a-0' - Created task run 'task_a-0' for task 'task_a'
17:28:58.458 | ERROR   | Task run 'task_a-0' - Task run failed with exception: TypeError("Can't instantiate abstract class BaseResult with abstract methods create, get") - Retries are exhausted
Traceback (most recent call last):
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
    return Completed(message='A', data={'A': 1})
  File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
    return cls(type=StateType.COMPLETED, **kwargs)
  File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
  File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
    return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get
17:28:58.495 | ERROR   | Task run 'task_a-0' - Finished in state Failed("Task run encountered an exception TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get")
17:28:58.498 | ERROR   | Flow run 'energetic-coot' - Encountered exception during execution: TypeError("Can't instantiate abstract class BaseResult with abstract methods create, get")
Traceback (most recent call last):
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 636, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 680, in run_flow_sync
    engine.call_flow_fn()
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 659, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 11, in some_flow
    a = task_a.submit().result()
  File "My_Path\lib\site-packages\prefect\futures.py", line 164, in result
    _result = self._final_state.result(
  File "My_Path\lib\site-packages\prefect\client\schemas\objects.py", line 261, in result
    return get_state_result(
  File "My_Path\lib\site-packages\prefect\states.py", line 68, in get_state_result
    return _get_state_result(
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 392, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 243, in run_coro_as_sync
    return call.result()
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
    return self.__get_result()
  File "C:\Users\ihor.ramskyi\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 403, in __get_result
    raise self._exception
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 225, in coroutine_wrapper
    return await task
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 382, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "My_Path\lib\site-packages\prefect\states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
    return Completed(message='A', data={'A': 1})
  File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
    return cls(type=StateType.COMPLETED, **kwargs)
  File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
  File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
    return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get
17:28:58.543 | ERROR   | Flow run 'energetic-coot' - Finished in state Failed("Flow run encountered an exception: TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get")
Traceback (most recent call last):
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 15, in <module>
    some_flow()
  File "My_Path\lib\site-packages\prefect\flows.py", line 1322, in __call__
    return run_flow(
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 802, in run_flow
    return run_flow_sync(**kwargs)
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 682, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 246, in result
    raise self._raised
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 636, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 680, in run_flow_sync
    engine.call_flow_fn()
  File "My_Path\lib\site-packages\prefect\flow_engine.py", line 659, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 11, in some_flow
    a = task_a.submit().result()
  File "My_Path\lib\site-packages\prefect\futures.py", line 164, in result
    _result = self._final_state.result(
  File "My_Path\lib\site-packages\prefect\client\schemas\objects.py", line 261, in result
    return get_state_result(
  File "My_Path\lib\site-packages\prefect\states.py", line 68, in get_state_result
    return _get_state_result(
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 392, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 243, in run_coro_as_sync
    return call.result()
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
    return self.__get_result()
  File "C:\Users\ihor.ramskyi\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 403, in __get_result
    raise self._exception
  File "My_Path\lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 225, in coroutine_wrapper
    return await task
  File "My_Path\lib\site-packages\prefect\utilities\asyncutils.py", line 382, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "My_Path\lib\site-packages\prefect\states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 804, in run_context
    yield self
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 1420, in run_task_sync
    engine.call_task_fn(txn)
  File "My_Path\lib\site-packages\prefect\task_engine.py", line 832, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "My_Path\lib\site-packages\prefect\utilities\callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "C:\Users\ihor.ramskyi\PycharmProjects\Prefect_Tutorial\test_flow4.py", line 6, in task_a
    return Completed(message='A', data={'A': 1})
  File "My_Path\lib\site-packages\prefect\states.py", line 556, in Completed
    return cls(type=StateType.COMPLETED, **kwargs)
  File "My_Path\lib\site-packages\pydantic\main.py", line 176, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
  File "My_Path\lib\site-packages\prefect\results.py", line 405, in __new__
    return super().__new__(cls)
TypeError: Can't instantiate abstract class BaseResult with abstract methods create, get

However, same code works correctly (returning Complete state from both the task and the flow), when I run it on Prefect v2.20.0 or when I run it on Prefect v3.0.0rc16 but pass data=1, data='abcde', or even data=(1, {'A': 1}).

Version info (prefect version output)

Version:             3.0.0rc16
API version:         0.8.4
Python version:      3.10.11
Git commit:          aad66936
Built:               Mon, Aug 12, 2024 3:51 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         server
Pydantic version:    2.7.1

Additional context

No response

@ihor-ramskyi-globallogic ihor-ramskyi-globallogic added the bug Something isn't working label Aug 14, 2024
@zzstoatzz
Copy link
Collaborator

thanks for the issue @ihor-ramskyi-globallogic - we'll take a look here

just out of curiosity, what's your use case for passing data to a Completed state? i.e. how did you use data downstream in 2.x?

@zzstoatzz
Copy link
Collaborator

note to self, here's a more concise repro

In [1]: from prefect.states import Completed

In [2]: Completed(data={"A": 1})
...
File ~/github.com/prefecthq/prefect/src/prefect/results.py:407, in BaseResult.__new__(cls, **kwargs)
    405     return super().__new__(subcls)
    406 else:
--> 407     return super().__new__(cls)

TypeError: Can't instantiate abstract class BaseResult without an implementation for abstract methods 'create', 'get'

@cicdw
Copy link
Member

cicdw commented Aug 14, 2024

@zzstoatzz seems like in this case we should dispatch to an UnpersistedResult type in the creation of the BaseResult?

@ihor-ramskyi-globallogic
Copy link
Author

thanks for the issue @ihor-ramskyi-globallogic - we'll take a look here

just out of curiosity, what's your use case for passing data to a Completed state? i.e. how did you use data downstream in 2.x?

I return Failed if task should be failed for any reason - and for unified approach, I use Completed otherwise. So, I still have to return data from the task, and I put it as data of Completed state.

However, I cannot use type dict for data for Failed state anyways... Because it says it cannot resolve dict in exception.

@ihor-ramskyi-globallogic
Copy link
Author

@zzstoatzz sorry to bother you, but is there a plan to merge the fix in the near future?

@zzstoatzz
Copy link
Collaborator

hi @ihor-ramskyi-globallogic - thanks for the bump

i hesitated with that PR because I couldn't reason about the solution, even though it did appear to accomplish the desired outcome. I'll revisit it as soon as I can

@ihor-ramskyi-globallogic
Copy link
Author

Hi again
Thank you for response! However, I have to bother you again - this issue blocks my team from upgrading to Prefect 3 without changing bunch of code. Is there any visible point in time when this gonna be merged?

@cicdw
Copy link
Member

cicdw commented Sep 19, 2024

We just made a lot of updates to our results implementation so we will review @zzstoatzz 's for next week's release. If for some reason we decide it's not a viable solution, we will let you know and give you an update on timing.

@ihor-ramskyi-globallogic
Copy link
Author

@cicdw thanks for the update!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
3 participants