Skip to content

Commit 2a55dc6

Browse files
authored
Merge pull request #465 from DalgoT4D/451-api-to-fetch-sources-in-a-schema
451 api to fetch sources in a schema
2 parents 32e3397 + da8a5b0 commit 2a55dc6

File tree

3 files changed

+63
-7
lines changed

3 files changed

+63
-7
lines changed

ddpui/api/transform_api.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from ddpui.utils.custom_logger import CustomLogger
1919

2020
from ddpui.schemas.org_task_schema import DbtProjectSchema
21-
from ddpui.schemas.dbt_workflow_schema import CreateDbtModelPayload
21+
from ddpui.schemas.dbt_workflow_schema import CreateDbtModelPayload, SyncSourcesSchema
2222

2323
from ddpui.core import dbtautomation_service
2424

@@ -121,6 +121,32 @@ def delete_dbt_project(request, project_name: str):
121121
return {"message": f"Project {project_name} deleted successfully"}
122122

123123

124+
@transformapi.post("/sync_sources/", auth=auth.CanManagePipelines())
125+
def sync_sources(request, payload: SyncSourcesSchema):
126+
"""
127+
Sync sources from a given schema.
128+
"""
129+
orguser: OrgUser = request.orguser
130+
org = orguser.org
131+
132+
org_warehouse = OrgWarehouse.objects.filter(org=org).first()
133+
if not org_warehouse:
134+
raise HttpError(status_code=404, detail="Please set up your warehouse first")
135+
136+
orgdbt = OrgDbt.objects.filter(org=org, gitrepo_url=None).first()
137+
if not orgdbt:
138+
raise HttpError(status_code=404, detail="DBT workspace not set up")
139+
140+
sources_file_path, error = dbtautomation_service.sync_sources_to_dbt(
141+
payload.schema_name, payload.source_name, org, org_warehouse
142+
)
143+
144+
if error:
145+
raise HttpError(status_code=422, detail=error)
146+
147+
return {"sources_file_path": str(sources_file_path)}
148+
149+
124150
########################## Models #############################################
125151

126152

ddpui/core/dbtautomation_service.py

+26-5
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
1+
import os
12
from pathlib import Path
2-
from ddpui.models.org import OrgWarehouse, OrgDbt
3-
from ddpui.utils import secretsmanager
4-
from dbt_automation.utils.warehouseclient import get_client
53

6-
# operations
7-
from dbt_automation.operations.flattenjson import flattenjson
84
from dbt_automation.operations.arithmetic import arithmetic
95
from dbt_automation.operations.castdatatypes import cast_datatypes
106
from dbt_automation.operations.coalescecolumns import coalesce_columns
117
from dbt_automation.operations.concatcolumns import concat_columns
128
from dbt_automation.operations.droprenamecolumns import drop_columns, rename_columns
139
from dbt_automation.operations.flattenairbyte import flatten_operation
10+
11+
# operations
12+
from dbt_automation.operations.flattenjson import flattenjson
1413
from dbt_automation.operations.mergetables import union_tables
1514
from dbt_automation.operations.regexextraction import regex_extraction
15+
from dbt_automation.operations.syncsources import sync_sources
16+
from dbt_automation.utils.warehouseclient import get_client
17+
from dbt_automation.utils.dbtproject import dbtProject
1618

19+
from ddpui.models.org import OrgDbt, OrgWarehouse
20+
from ddpui.utils import secretsmanager
1721

1822
OPERATIONS_DICT = {
1923
"flatten": flatten_operation,
@@ -52,3 +56,20 @@ def create_dbt_model_in_project(
5256
)
5357

5458
return str(sql_file_path), None
59+
60+
61+
def sync_sources_to_dbt(
62+
schema_name: str, source_name: str, org: str, org_warehouse: str
63+
):
64+
"""
65+
Sync sources from a given schema to dbt.
66+
"""
67+
warehouse_client = _get_wclient(org_warehouse)
68+
69+
sources_file_path = sync_sources(
70+
config={"source_schema": schema_name, "source_name": source_name},
71+
warehouse=warehouse_client,
72+
dbtproject=dbtProject(Path(os.getenv("CLIENTDBT_ROOT")) / org.slug / "dbtrepo"),
73+
)
74+
75+
return str(sources_file_path), None

ddpui/schemas/dbt_workflow_schema.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from ninja import Schema, Field
1+
from ninja import Field, Schema
22

33

44
class CreateDbtModelPayload(Schema):
@@ -10,3 +10,12 @@ class CreateDbtModelPayload(Schema):
1010
display_name: str
1111
config: dict
1212
op_type: str
13+
14+
15+
class SyncSourcesSchema(Schema):
16+
"""
17+
schema to sync sources from the schema
18+
"""
19+
20+
schema_name: str
21+
source_name: str

0 commit comments

Comments
 (0)