6
6
# third party
7
7
from faker import Faker
8
8
import pytest
9
- from sim .core import BaseEvent
10
- from sim .core import Simulator
11
- from sim .core import SimulatorContext
12
- from sim .core import sim_activity
13
- from sim .core import sim_entrypoint
14
9
15
10
# syft absolute
16
11
import syft as sy
17
- from syft import test_settings
18
- from syft .client .client import SyftClient
19
12
from syft .service .request .request import RequestStatus
20
13
from syft .util .test_helpers .apis import make_schema
21
14
from syft .util .test_helpers .apis import make_test_query
22
15
from syft .util .test_helpers .worker_helpers import (
23
16
build_and_launch_worker_pool_from_docker_str ,
24
17
)
25
18
19
+ # relative
20
+ from .flows .user_bigquery_api import bq_submit_query
21
+ from .flows .user_bigquery_api import bq_submit_query_results
22
+ from .flows .user_bigquery_api import bq_test_query
23
+ from .sim .core import BaseEvent
24
+ from .sim .core import Simulator
25
+ from .sim .core import SimulatorContext
26
+ from .sim .core import sim_activity
27
+ from .sim .core import sim_entrypoint
28
+
26
29
fake = Faker ()
27
30
NUM_USERS = 3
28
31
NUM_ENDPOINTS = 3 # test_query, submit_query, schema_query
@@ -59,42 +62,13 @@ class Event(BaseEvent):
59
62
# ------------------------------------------------------------------------------------------------
60
63
61
64
62
- def query_sql ():
63
- dataset_2 = test_settings .get ("dataset_2" , default = "dataset_2" )
64
- table_2 = test_settings .get ("table_2" , default = "table_2" )
65
- table_2_col_id = test_settings .get ("table_2_col_id" , default = "table_id" )
66
- table_2_col_score = test_settings .get ("table_2_col_score" , default = "colname" )
67
-
68
- query = f"SELECT { table_2_col_id } , AVG({ table_2_col_score } ) AS average_score \
69
- FROM { dataset_2 } .{ table_2 } \
70
- GROUP BY { table_2_col_id } \
71
- LIMIT 10000"
72
- return query
73
-
74
-
75
- def get_code_from_msg (msg : str ):
76
- return str (msg .split ("`" )[1 ].replace ("()" , "" ).replace ("client." , "" ))
77
-
78
-
79
- # ------------------------------------------------------------------------------------------------
80
-
81
-
82
65
@sim_activity (
83
66
wait_for = Event .ADMIN_LOW_SIDE_ENDPOINTS_AVAILABLE ,
84
67
trigger = Event .USER_CAN_QUERY_TEST_ENDPOINT ,
85
68
)
86
69
async def user_query_test_endpoint (ctx : SimulatorContext , client : sy .DatasiteClient ):
87
70
"""Run query on test endpoint"""
88
-
89
- user = client .logged_in_user
90
-
91
- def _query_endpoint ():
92
- ctx .logger .info (f"User: { user } - Calling client.api.bigquery.test_query (mock)" )
93
- res = client .api .bigquery .test_query (sql_query = query_sql ())
94
- assert len (res ) == 10000
95
- ctx .logger .info (f"User: { user } - Received { len (res )} rows" )
96
-
97
- await asyncio .to_thread (_query_endpoint )
71
+ await asyncio .to_thread (bq_test_query , ctx , client )
98
72
99
73
100
74
@sim_activity (
@@ -103,40 +77,15 @@ def _query_endpoint():
103
77
)
104
78
async def user_bq_submit (ctx : SimulatorContext , client : sy .DatasiteClient ):
105
79
"""Submit query to be run on private data"""
106
- user = client .logged_in_user
107
-
108
- def _submit_endpoint ():
109
- func_name = "invalid_func" if random .random () < 0.5 else "test_query"
110
- ctx .logger .info (
111
- f"User: { user } - Calling client.api.services.bigquery.submit_query func_name={ func_name } "
112
- )
113
-
114
- res = client .api .bigquery .submit_query (
115
- func_name = func_name ,
116
- query = query_sql (),
117
- )
118
- ctx .logger .info (f"User: { user } - Received { res } " )
119
-
120
- await asyncio .to_thread (_submit_endpoint )
80
+ await asyncio .to_thread (bq_submit_query , ctx , client )
121
81
122
82
123
83
@sim_activity (
124
84
wait_for = Event .ADMIN_LOW_ALL_RESULTS_AVAILABLE ,
125
85
trigger = Event .USER_CHECKED_RESULTS ,
126
86
)
127
87
async def user_checks_results (ctx : SimulatorContext , client : sy .DatasiteClient ):
128
- def _check_results ():
129
- for request in client .requests :
130
- if request .get_status () == RequestStatus .APPROVED :
131
- job = request .code (blocking = False )
132
- result = job .wait ()
133
- assert len (result ) == 10000
134
- if request .get_status () == RequestStatus .REJECTED :
135
- ctx .logger .info (
136
- f"User: Request with function named { request .code .service_func_name } was rejected"
137
- )
138
-
139
- await asyncio .to_thread (_check_results )
88
+ await asyncio .to_thread (bq_submit_query_results , ctx , client )
140
89
141
90
142
91
@sim_activity (wait_for = Event .GUEST_USERS_CREATED , trigger = Event .USER_FLOW_COMPLETED )
@@ -148,6 +97,7 @@ async def user_flow(ctx: SimulatorContext, server_url_low: str, user: dict):
148
97
)
149
98
ctx .logger .info (f"User: { client .logged_in_user } - logged in" )
150
99
100
+ # this must be executed sequentially.
151
101
await user_query_test_endpoint (ctx , client )
152
102
await user_bq_submit (ctx , client )
153
103
await user_checks_results (ctx , client )
@@ -158,7 +108,7 @@ async def user_flow(ctx: SimulatorContext, server_url_low: str, user: dict):
158
108
159
109
@sim_activity (trigger = Event .GUEST_USERS_CREATED )
160
110
async def admin_signup_users (
161
- ctx : SimulatorContext , admin_client : SyftClient , users : list [dict ]
111
+ ctx : SimulatorContext , admin_client : sy . DatasiteClient , users : list [dict ]
162
112
):
163
113
for user in users :
164
114
ctx .logger .info (f"Admin low: Creating guest user { user ['email' ]} " )
@@ -173,7 +123,7 @@ async def admin_signup_users(
173
123
@sim_activity (trigger = Event .ADMIN_BQ_SCHEMA_ENDPOINT_CREATED )
174
124
async def admin_endpoint_bq_schema (
175
125
ctx : SimulatorContext ,
176
- admin_client : SyftClient ,
126
+ admin_client : sy . DatasiteClient ,
177
127
worker_pool : str | None = None ,
178
128
):
179
129
path = "bigquery.schema"
@@ -195,7 +145,7 @@ async def admin_endpoint_bq_schema(
195
145
@sim_activity (trigger = Event .ADMIN_BQ_TEST_ENDPOINT_CREATED )
196
146
async def admin_endpoint_bq_test (
197
147
ctx : SimulatorContext ,
198
- admin_client : SyftClient ,
148
+ admin_client : sy . DatasiteClient ,
199
149
worker_pool : str | None = None ,
200
150
):
201
151
path = "bigquery.test_query"
@@ -290,7 +240,7 @@ def execute_query(query: str, endpoint):
290
240
291
241
292
242
@sim_activity (trigger = Event .ADMIN_ALL_ENDPOINTS_CREATED )
293
- async def admin_create_endpoint (ctx : SimulatorContext , admin_client : SyftClient ):
243
+ async def admin_create_endpoint (ctx : SimulatorContext , admin_client : sy . DatasiteClient ):
294
244
worker_pool = "biquery-pool"
295
245
296
246
await asyncio .gather (
@@ -308,7 +258,7 @@ async def admin_create_endpoint(ctx: SimulatorContext, admin_client: SyftClient)
308
258
Event .ADMIN_LOWSIDE_WORKER_POOL_CREATED ,
309
259
]
310
260
)
311
- async def admin_watch_sync (ctx : SimulatorContext , admin_client : SyftClient ):
261
+ async def admin_watch_sync (ctx : SimulatorContext , admin_client : sy . DatasiteClient ):
312
262
while True :
313
263
await asyncio .sleep (random .uniform (5 , 10 ))
314
264
@@ -340,7 +290,7 @@ async def admin_watch_sync(ctx: SimulatorContext, admin_client: SyftClient):
340
290
341
291
342
292
# @sim_activity(trigger=Event.ADMIN_WORKER_POOL_CREATED)
343
- async def admin_create_bq_pool (ctx : SimulatorContext , admin_client : SyftClient ):
293
+ async def admin_create_bq_pool (ctx : SimulatorContext , admin_client : sy . DatasiteClient ):
344
294
worker_pool = "biquery-pool"
345
295
346
296
base_image = admin_client .images .get_all ()[0 ]
@@ -375,12 +325,16 @@ async def admin_create_bq_pool(ctx: SimulatorContext, admin_client: SyftClient):
375
325
376
326
377
327
@sim_activity (trigger = Event .ADMIN_HIGHSIDE_WORKER_POOL_CREATED )
378
- async def admin_create_bq_pool_high (ctx : SimulatorContext , admin_client : SyftClient ):
328
+ async def admin_create_bq_pool_high (
329
+ ctx : SimulatorContext , admin_client : sy .DatasiteClient
330
+ ):
379
331
await admin_create_bq_pool (ctx , admin_client )
380
332
381
333
382
334
@sim_activity (trigger = Event .ADMIN_LOWSIDE_WORKER_POOL_CREATED )
383
- async def admin_create_bq_pool_low (ctx : SimulatorContext , admin_client : SyftClient ):
335
+ async def admin_create_bq_pool_low (
336
+ ctx : SimulatorContext , admin_client : sy .DatasiteClient
337
+ ):
384
338
await admin_create_bq_pool (ctx , admin_client )
385
339
386
340
@@ -391,7 +345,9 @@ async def admin_create_bq_pool_low(ctx: SimulatorContext, admin_client: SyftClie
391
345
],
392
346
trigger = Event .ADMIN_HIGHSIDE_FLOW_COMPLETED ,
393
347
)
394
- async def admin_triage_requests_high (ctx : SimulatorContext , admin_client : SyftClient ):
348
+ async def admin_triage_requests_high (
349
+ ctx : SimulatorContext , admin_client : sy .DatasiteClient
350
+ ):
395
351
while True :
396
352
await asyncio .sleep (random .uniform (5 , 10 ))
397
353
@@ -452,9 +408,7 @@ async def admin_low_side(ctx: SimulatorContext, admin_auth, users):
452
408
453
409
@sim_activity (trigger = Event .ADMIN_SYNC_COMPLETED )
454
410
async def admin_sync_to_low_flow (
455
- ctx : SimulatorContext ,
456
- admin_auth_high ,
457
- admin_auth_low ,
411
+ ctx : SimulatorContext , admin_auth_high : dict , admin_auth_low : dict
458
412
):
459
413
high_client = sy .login (** admin_auth_high )
460
414
ctx .logger .info ("Admin: logged in to high-side" )
@@ -485,7 +439,7 @@ async def admin_sync_to_low_flow(
485
439
486
440
@sim_activity (trigger = Event .ADMIN_SYNC_COMPLETED )
487
441
async def admin_sync_to_high_flow (
488
- ctx : SimulatorContext , admin_auth_high , admin_auth_low
442
+ ctx : SimulatorContext , admin_auth_high : dict , admin_auth_low : dict
489
443
):
490
444
high_client = sy .login (** admin_auth_high )
491
445
ctx .logger .info ("Admin low: logged in to high-side" )
0 commit comments