Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creates ConcurrencyOptions pydantic model. #15291

Merged
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
346bdfb
creates pydantic model and updates deployment orm model
jeanluciano Sep 9, 2024
594e6b2
formatting
jeanluciano Sep 9, 2024
fad0161
moved Options schema
jeanluciano Sep 9, 2024
e12243a
postgress migration location change
jeanluciano Sep 9, 2024
20fda19
import fix
jeanluciano Sep 9, 2024
1de6209
seperates and limit
jeanluciano Sep 11, 2024
32ff726
updates migration notes
jeanluciano Sep 11, 2024
d8f98c3
Merge branch 'main' into jean/oss-66-create-a-concurrency-options-pyd…
jeanluciano Sep 11, 2024
5183872
Add an explicit, client-side GCL upsert for deployment concurrency li…
collincchoy Sep 12, 2024
837d5d5
Fix deployment concurrency limits > 1 never correctly acquire concurr…
collincchoy Sep 12, 2024
9f45162
Fix deployment concurrency limits > 1 never correctly acquire slots f…
collincchoy Sep 12, 2024
0a4940f
Change deployment-concurrency related GCL naming pattern to be more c…
collincchoy Sep 12, 2024
771100a
Revert "Change deployment-concurrency related GCL naming pattern to b…
collincchoy Sep 12, 2024
cd2c759
Tweak tests to cover correct # of slots being acquired and include ma…
collincchoy Sep 12, 2024
9eb04e0
Merge branch 'main' into deployment-concurrency/concurrency-limit-mgmt
collincchoy Sep 12, 2024
d2aeeb0
Reduce network calls when concurrency limit is reached and better han…
collincchoy Sep 12, 2024
1d01d87
Merge branch 'jean/oss-66-create-a-concurrency-options-pydantic-model…
jeanluciano Sep 13, 2024
1ef7d67
seperates pydantic from db concurrency options schema
jeanluciano Sep 13, 2024
43421d9
Merge branch 'main' into jean/oss-66-create-a-concurrency-options-pyd…
jeanluciano Sep 13, 2024
066b655
updated shcema in model
jeanluciano Sep 13, 2024
2af6789
Merge branch 'jean/oss-66-create-a-concurrency-options-pydantic-model…
jeanluciano Sep 13, 2024
d8a74d1
Merge branch 'main' into jean/oss-66-create-a-concurrency-options-pyd…
jeanluciano Sep 13, 2024
9d4fa72
updated schema
jeanluciano Sep 16, 2024
8641a1f
Merge branch 'main' into jean/oss-66-create-a-concurrency-options-pyd…
jeanluciano Sep 16, 2024
b7feac6
split concurrency options and limit fields
jeanluciano Sep 16, 2024
0b33418
Merge branch 'jean/oss-66-create-a-concurrency-options-pydantic-model…
jeanluciano Sep 16, 2024
e0db3a8
Merge branch 'main' into jean/oss-66-create-a-concurrency-options-pyd…
jeanluciano Sep 16, 2024
13e72f1
removed ConcurrencyLimitConfig
jeanluciano Sep 17, 2024
abfcf23
Merge branch 'jean/oss-66-create-a-concurrency-options-pydantic-model…
jeanluciano Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/3.0/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -14593,6 +14593,15 @@
"title": "ConcurrencyLimitCreate",
"description": "Data used by the Prefect REST API to create a concurrency limit."
},
"ConcurrencyLimitStrategy": {
"type": "string",
"enum": [
"ENQUEUE",
"CANCEL_NEW"
],
"title": "ConcurrencyLimitStrategy",
"description": "Enumeration of concurrency collision strategies."
},
"ConcurrencyLimitV2": {
"properties": {
"id": {
Expand Down Expand Up @@ -14804,6 +14813,19 @@
"title": "ConcurrencyLimitV2Update",
"description": "Data used by the Prefect REST API to update a v2 concurrency limit."
},
"ConcurrencyOptions": {
"properties": {
"collision_strategy": {
"$ref": "#/components/schemas/ConcurrencyLimitStrategy"
}
},
"type": "object",
"required": [
"collision_strategy"
],
"title": "ConcurrencyOptions",
"description": "Class for storing the concurrency config in database."
},
"Constant": {
"properties": {
"input_type": {
Expand Down Expand Up @@ -16401,6 +16423,17 @@
"title": "Concurrency Limit",
"description": "The deployment's concurrency limit."
},
"concurrency_options": {
"anyOf": [
{
"$ref": "#/components/schemas/ConcurrencyOptions"
},
{
"type": "null"
}
],
"description": "The deployment's concurrency options."
},
"parameters": {
"anyOf": [
{
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/server/database/migrations/MIGRATION-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ Each time a database migration is written, an entry is included here with:

This gives us a history of changes and will create merge conflicts if two migrations are made at once, flagging situations where a branch needs to be updated before merging.

# Adds `concurrency_options` to `Deployments`

SQLite: `7d6350aea855`
Postgres: `555ed31b284d`

# Add `concurrency_limit` to `Deployments`
SQLite: `f93e1439f022`
Postgres:`97429116795e`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""add concurrency options

Revision ID: 555ed31b284d
Revises: 97429116795e
Create Date: 2024-09-11 09:03:17.744587

"""
import sqlalchemy as sa
from alembic import op

import prefect
from prefect.server.schemas.core import ConcurrencyOptions

# revision identifiers, used by Alembic.
revision = "555ed31b284d"
down_revision = "97429116795e"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"deployment",
sa.Column(
"concurrency_options",
prefect.server.utilities.database.Pydantic(ConcurrencyOptions),
nullable=True,
),
)


def downgrade():
op.drop_column("deployment", "concurrency_options")
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add concurrency options

Revision ID: 7d6350aea855
Revises: f93e1439f022
Create Date: 2024-09-11 09:01:06.678866

"""
import sqlalchemy as sa
from alembic import op

import prefect
from prefect.server.schemas.core import ConcurrencyOptions

# revision identifiers, used by Alembic.
revision = "7d6350aea855"
down_revision = "f93e1439f022"
branch_labels = None
depends_on = None


def upgrade():
with op.batch_alter_table("deployment", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"concurrency_options",
prefect.server.utilities.database.Pydantic(ConcurrencyOptions),
nullable=True,
)
)


def downgrade():
with op.batch_alter_table("deployment", schema=None) as batch_op:
batch_op.drop_column("concurrency_options")
13 changes: 12 additions & 1 deletion src/prefect/server/database/orm_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,18 @@ def job_variables(self):
)

concurrency_limit: Mapped[Union[int, None]] = mapped_column(
sa.Integer, default=None, nullable=True
sa.Integer,
server_default=None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did this server default get added w/o a db migration?

or well - why did this get added?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. TIL alembic doesn't pick up server_default changes by default so I guess this doesn't matter but I think this line isn't doing anything as the DDL has already been generated and the updated DDL to alter the column w/ a db-server-side default isn't tracked via migrations.

nullable=True,
default=None,
)
concurrency_options: Mapped[
Union[schemas.core.ConcurrencyOptions, None]
] = mapped_column(
Pydantic(schemas.core.ConcurrencyOptions),
server_default=None,
nullable=True,
default=None,
)
tags: Mapped[List[str]] = mapped_column(
JSON, server_default="[]", default=list, nullable=False
Expand Down
15 changes: 14 additions & 1 deletion src/prefect/server/models/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ async def create_deployment(
insert_values = deployment.model_dump_for_orm(
exclude_unset=True, exclude={"schedules"}
)
if isinstance(deployment.concurrency_limit, schemas.core.ConcurrencyLimitConfig):
concurrency_options = insert_values.pop("concurrency_limit")
insert_values["concurrency_options"] = {
"collision_strategy": concurrency_options.collision_strategy,
}
insert_values["concurrency_limit"] = concurrency_options.concurrency

# The job_variables field in client and server schemas is named
# infra_overrides in the database.
Expand All @@ -94,7 +100,14 @@ async def create_deployment(

conflict_update_fields = deployment.model_dump_for_orm(
exclude_unset=True,
exclude={"id", "created", "created_by", "schedules", "job_variables"},
exclude={
"id",
"created",
"created_by",
"schedules",
"job_variables",
"concurrency_limit",
},
)
if job_variables:
conflict_update_fields["infra_overrides"] = job_variables
Expand Down
3 changes: 3 additions & 0 deletions src/prefect/server/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ def remove_old_fields(cls, values):
concurrency_limit: Optional[PositiveInteger] = Field(
default=None, description="The deployment's concurrency limit."
)
concurrency_options: Optional[schemas.core.ConcurrencyOptions] = Field(
default=None, description="The deployment's concurrency options."
)
parameters: Optional[Dict[str, Any]] = Field(
default=None,
description="Parameters for flow runs scheduled by the deployment.",
Expand Down
39 changes: 35 additions & 4 deletions src/prefect/server/schemas/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
PositiveInteger,
StrictVariableValue,
)
from prefect.utilities.collections import dict_to_flatdict, flatdict_to_dict, listrepr
from prefect.utilities.collections import (
AutoEnum,
dict_to_flatdict,
flatdict_to_dict,
listrepr,
)
from prefect.utilities.names import generate_slug, obfuscate

if TYPE_CHECKING:
Expand Down Expand Up @@ -146,6 +151,32 @@ class UpdatedBy(BaseModel):
)


class ConcurrencyLimitStrategy(AutoEnum):
"""
Enumeration of concurrency collision strategies.
"""

ENQUEUE = AutoEnum.auto()
CANCEL_NEW = AutoEnum.auto()


class ConcurrencyLimitConfig(BaseModel):
"""
Options for configuring deployment concurrency limits.
"""

concurrency: int
collision_strategy: ConcurrencyLimitStrategy


class ConcurrencyOptions(BaseModel):
"""
Class for storing the concurrency config in database.
"""

collision_strategy: ConcurrencyLimitStrategy


class FlowRun(ORMBaseModel):
"""An ORM representation of flow run data."""

Expand Down Expand Up @@ -546,9 +577,9 @@ class Deployment(ORMBaseModel):
schedules: List[DeploymentSchedule] = Field(
default_factory=list, description="A list of schedules for the deployment."
)
concurrency_limit: Optional[PositiveInteger] = Field(
default=None, description="The concurrency limit for the deployment."
)
concurrency_limit: Optional[
Union[NonNegativeInteger, ConcurrencyLimitConfig]
] = Field(default=None, description="The concurrency limit for the deployment.")
jeanluciano marked this conversation as resolved.
Show resolved Hide resolved
job_variables: Dict[str, Any] = Field(
default_factory=dict,
description="Overrides to apply to flow run infrastructure at runtime.",
Expand Down
40 changes: 40 additions & 0 deletions tests/server/models/test_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,25 @@ async def test_create_deployment_with_updated_by(self, session, flow):
)
assert updated_deployment.updated_by.type == new_updated_by.type

async def test_create_deployment_with_concurrency_limit(self, session, flow):
concurrency_options = schemas.core.ConcurrencyLimitConfig(
concurrency=10,
collision_strategy="ENQUEUE",
)
deployment = await models.deployments.create_deployment(
session=session,
deployment=schemas.core.Deployment(
name="My Deployment",
flow_id=flow.id,
concurrency_limit=concurrency_options,
),
)
assert deployment.concurrency_limit == concurrency_options.concurrency
assert (
deployment.concurrency_options.collision_strategy
== concurrency_options.collision_strategy
)


class TestReadDeployment:
async def test_read_deployment(self, session, flow, flow_function):
Expand Down Expand Up @@ -1068,6 +1087,27 @@ async def test_updating_deployment_does_not_duplicate_work_queue(
assert wq is not None
assert wq.work_pool == work_pool

async def test_update_deployment_with_concurrency_limit(
self,
session,
deployment,
):
await models.deployments.update_deployment(
session=session,
deployment_id=deployment.id,
deployment=schemas.actions.DeploymentUpdate(
concurrency_limit=42,
concurrency_options=schemas.core.ConcurrencyOptions(
collision_strategy="CANCEL_NEW"
),
),
)
updated_deployment = await models.deployments.read_deployment(
session=session, deployment_id=deployment.id
)
assert updated_deployment.concurrency_limit == 42
assert updated_deployment.concurrency_options.collision_strategy == "CANCEL_NEW"


@pytest.fixture
async def deployment_schedules(
Expand Down