Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update dagster version #2401

Merged
merged 6 commits into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,196 changes: 1,684 additions & 1,512 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ boltons = "^24.0.0"
click = "^8.1.7"
clickhouse-connect = "^0.7.16"
cloud-sql-python-connector = { extras = ["pg8000"], version = "^1.6.0" }
dagster = "1.8.6"
dagster = "^1.8.6"
dagster-dbt = "^0.24.0"
dagster-embedded-elt = "^0.24.0"
dagster-gcp = "^0.24.0"
Expand Down Expand Up @@ -54,14 +54,14 @@ sqlalchemy = "^2.0.25"
textual = "^0.52.1"
redis = "^5.0.7"
githubkit = "^0.11.6"
sqlmesh = {extras = ["trino"], version = "^0.125.0"}
sqlmesh = { extras = ["trino"], version = "^0.125.0" }
dagster-duckdb = "^0.24.0"
dagster-duckdb-polars = "^0.24.0"
google-cloud-bigquery-storage = "^2.25.0"
dagster-sqlmesh = "0.2.0.dev3"
google-auth = "^2.34.0"
pillow = "^10.4.0"
dagster-k8s = "0.24.6"
dagster-k8s = "^0.24.6"


[tool.poetry.scripts]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ with source as (

renamed as (
select
{% for column in columns %}
{{ adapter.quote(column) }}{% if not loop.last %},{% endif %}
{% endfor %}
{% for column in columns %}
{{ adapter.quote(column) }}{% if not loop.last %},{% endif %}
{% endfor %}
from source
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ with source as (

renamed as (
select
{% for column in columns %}
{{ adapter.quote(column) }}{% if not loop.last %},{% endif %}
{% endfor %}
{% for column in columns %}
{{ adapter.quote(column) }}{% if not loop.last %},{% endif %}
{% endfor %}
from source
)

Expand Down
14 changes: 10 additions & 4 deletions warehouse/oso_dagster/factories/bq2clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from dataclasses import dataclass, field
from typing import Optional, Sequence, Dict, List, Tuple
from typing import Optional, Sequence, Dict, List, Tuple, cast
from dagster import (
asset,
AssetExecutionContext,
MaterializeResult,
)
from dagster_gcp import BigQueryResource, GCSResource
from google.cloud.bigquery import Client as BQClient
from .common import AssetFactoryResponse, AssetDeps
from .common import AssetFactoryResponse, AssetDeps, GenericAsset
from ..resources import ClickhouseResource
from ..utils.bq import BigQueryTableConfig, export_to_gcs
from ..utils.errors import UnsupportedTableColumn
Expand Down Expand Up @@ -182,7 +182,12 @@ def bq2clickhouse_asset(
# Also ensure that the expected destination exists. Even if we
# will delete this keeps the `OVERWRITE` mode logic simple
create_table(
ch_client, destination_table_name, columns, index, order_by, if_not_exists=True
ch_client,
destination_table_name,
columns,
index,
order_by,
if_not_exists=True,
)
context.log.info(f"Ensured destination table {destination_table_name}")
create_table(ch_client, temp_dest, columns, index, if_not_exists=False)
Expand All @@ -208,4 +213,5 @@ def bq2clickhouse_asset(
}
)

return AssetFactoryResponse([bq2clickhouse_asset])
# https://github.com/opensource-observer/oso/issues/2403
return AssetFactoryResponse([cast(GenericAsset, bq2clickhouse_asset)])
7 changes: 4 additions & 3 deletions warehouse/oso_dagster/factories/bq_dts.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from dataclasses import dataclass, field
from typing import Optional, Sequence
from typing import Optional, Sequence, cast
from dagster import (
asset,
AssetExecutionContext,
MaterializeResult,
)
from dagster_gcp import BigQueryResource
from .common import AssetFactoryResponse
from .common import AssetFactoryResponse, GenericAsset
from ..constants import impersonate_service_account
from ..resources import BigQueryDataTransferResource
from ..utils import (
Expand Down Expand Up @@ -88,4 +88,5 @@ def _bq_dts_asset(
}
)

return AssetFactoryResponse([_bq_dts_asset])
# https://github.com/opensource-observer/oso/issues/2403
return AssetFactoryResponse([cast(GenericAsset, _bq_dts_asset)])
16 changes: 11 additions & 5 deletions warehouse/oso_dagster/factories/gcs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import re
from typing import Optional, Sequence, Dict
from typing import Optional, Sequence, Dict, cast
from dataclasses import dataclass, field

import arrow
from google.api_core.exceptions import NotFound
from google.cloud.bigquery.job import CopyJobConfig
from dagster import (
AssetsDefinition,
asset,
asset_sensor,
job,
Expand All @@ -21,7 +22,7 @@
)
from dagster_gcp import BigQueryResource, GCSResource

from .common import AssetFactoryResponse
from .common import AssetFactoryResponse, GenericAsset
from ..utils import (
ensure_dataset,
DatasetOptions,
Expand Down Expand Up @@ -239,13 +240,13 @@ def gcs_clean_up_job():
gcs_clean_up_op()

@asset_sensor(
asset_key=gcs_asset.key,
asset_key=cast(AssetsDefinition, gcs_asset).key,
name=f"{config.name}_clean_up_sensor",
job=gcs_clean_up_job,
default_status=DefaultSensorStatus.STOPPED,
)
def gcs_clean_up_sensor(
context: SensorEvaluationContext, gcs: GCSResource, asset_event: EventLogEntry
context: SensorEvaluationContext, _gcs: GCSResource, asset_event: EventLogEntry
):
yield RunRequest(
run_key=context.cursor,
Expand All @@ -258,4 +259,9 @@ def gcs_clean_up_sensor(
),
)

return AssetFactoryResponse([gcs_asset], [gcs_clean_up_sensor], [gcs_clean_up_job])
# https://github.com/opensource-observer/oso/issues/2403
return AssetFactoryResponse(
[cast(GenericAsset, gcs_asset)],
[gcs_clean_up_sensor],
[gcs_clean_up_job],
)