Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit edf5653

Browse files
committedFeb 12, 2025·
add pagination during list call to stop the pipelines during pre upgrade job
1 parent 0cbe81c commit edf5653

File tree

5 files changed

+147
-36
lines changed

5 files changed

+147
-36
lines changed
 

‎cdap-client-tests/src/test/java/io/cdap/cdap/client/AbstractClientTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void setUp() throws Throwable {
102102
.setVerifySSLCert(false)
103103
.setDefaultReadTimeout(60 * 1000)
104104
.setUploadReadTimeout(120 * 1000)
105-
.setConnectionConfig(connectionConfig).build();
105+
.setConnectionConfig(connectionConfig).setAppListPageSize(25).build();
106106
}
107107

108108
protected ClientConfig getClientConfig() {

‎cdap-client-tests/src/test/java/io/cdap/cdap/client/ApplicationClientTestRun.java

+37
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.google.common.collect.ImmutableSet;
2020
import com.google.common.collect.Sets;
21+
import com.google.gson.JsonObject;
2122
import io.cdap.cdap.ConfigTestApp;
2223
import io.cdap.cdap.api.Config;
2324
import io.cdap.cdap.api.artifact.ArtifactSummary;
@@ -163,6 +164,42 @@ public void testAppConfig() throws Exception {
163164
}
164165
}
165166

167+
@Test
168+
public void testPaginatedList() throws Exception {
169+
ApplicationId app = NamespaceId.DEFAULT.app(FakeApp.NAME);
170+
for (int i = 0; i < 30; i++) {
171+
appClient.deploy(NamespaceId.DEFAULT, createAppJarFile(FakeApp.class,
172+
FakeApp.NAME, "1.0.0-SNAPSHOT"));
173+
ApplicationDetail appDetail = appClient.get(app);
174+
app = new ApplicationId(app.getNamespace(), app.getApplication(), appDetail.getAppVersion());
175+
appClient.waitForDeployed(app, 30, TimeUnit.SECONDS);
176+
}
177+
Assert.assertEquals(30, appClient.list(NamespaceId.DEFAULT).size());
178+
179+
int count = 0;
180+
String token = null;
181+
boolean isLastPage = false;
182+
int currentResultSize = 0;
183+
while (!isLastPage) {
184+
JsonObject result = appClient.paginatedList(NamespaceId.DEFAULT, token);
185+
currentResultSize = result.get("applications").getAsJsonArray().size();
186+
count += currentResultSize;
187+
token =
188+
result.get("nextPageToken") == null ? null : result.get("nextPageToken").getAsString();
189+
isLastPage = (token == null);
190+
if (!isLastPage) {
191+
Assert.assertEquals(25, currentResultSize);
192+
}
193+
}
194+
195+
Assert.assertEquals(5, currentResultSize);
196+
Assert.assertEquals(30, count);
197+
198+
appClient.deleteAll(NamespaceId.DEFAULT);
199+
appClient.waitForDeleted(app, 30, TimeUnit.SECONDS);
200+
Assert.assertEquals(0, appClient.list(NamespaceId.DEFAULT).size());
201+
}
202+
166203
@Test
167204
public void testAppUpdate() throws Exception {
168205
String artifactName = "cfg-programs";

‎cdap-client/src/main/java/io/cdap/cdap/client/ApplicationClient.java

+29
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.collect.Maps;
2525
import com.google.common.reflect.TypeToken;
2626
import com.google.gson.Gson;
27+
import com.google.gson.JsonObject;
2728
import io.cdap.cdap.api.Config;
2829
import io.cdap.cdap.api.annotation.Beta;
2930
import io.cdap.cdap.api.security.AccessException;
@@ -108,6 +109,34 @@ public List<ApplicationRecord> list(NamespaceId namespace)
108109
}).getResponseObject();
109110
}
110111

112+
/**
113+
* Retrieves a paginated list of applications within the specified namespace.
114+
*
115+
* @param namespace The {@link NamespaceId} representing the namespace from which to list
116+
* applications.
117+
* @param nextPageToken The token for fetching the next page of results.
118+
* @return A {@link JsonObject} containing the paginated list of applications and the next page
119+
* token if available.
120+
* @throws IOException If a network error occurred.
121+
* @throws UnauthenticatedException If the request is not authorized successfully in th gateway
122+
* server
123+
* @throws UnauthorizedException If the caller lacks sufficient permissions.
124+
*/
125+
public JsonObject paginatedList(NamespaceId namespace, String nextPageToken)
126+
throws IOException, UnauthenticatedException, UnauthorizedException {
127+
StringBuilder pathBuilder = new StringBuilder("apps?latestOnly=false&pageSize=").append(
128+
config.getAppListPageSize());
129+
130+
if (nextPageToken != null && !nextPageToken.isEmpty()) {
131+
pathBuilder.append("&pageToken=").append(nextPageToken);
132+
}
133+
HttpResponse response = restClient.execute(HttpMethod.GET,
134+
config.resolveNamespacedURLV3(namespace, pathBuilder.toString()),
135+
config.getAccessToken());
136+
return ObjectResponse.fromJsonBody(response, new TypeToken<JsonObject>() {
137+
}).getResponseObject();
138+
}
139+
111140
/**
112141
* Lists all applications currently deployed, optionally filtering to only include applications
113142
* that use the specified artifact name and version.

‎cdap-client/src/main/java/io/cdap/cdap/client/config/ClientConfig.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class ClientConfig {
4444
private static final int DEFAULT_READ_TIMEOUT = 15000;
4545
private static final int DEFAULT_CONNECT_TIMEOUT = 15000;
4646

47+
private static final int DEFAULT_APP_LIST_PAGE_SIZE = 0;
48+
4749
private static final String DEFAULT_VERSION = Constants.Gateway.API_VERSION_3_TOKEN;
4850

4951
@Nullable
@@ -56,6 +58,7 @@ public class ClientConfig {
5658
private int uploadConnectTimeout;
5759

5860
private int unavailableRetryLimit;
61+
private int appListPageSize;
5962
private String apiVersion;
6063
private Supplier<AccessToken> accessToken;
6164
private Map<String, String> additionalHeaders;
@@ -65,7 +68,7 @@ private ClientConfig(@Nullable ConnectionConfig connectionConfig,
6568
String apiVersion, Supplier<AccessToken> accessToken,
6669
int defaultReadTimeout, int defaultConnectTimeout,
6770
int uploadReadTimeout, int uploadConnectTimeout,
68-
Map<String, String> additionalHeaders) {
71+
Map<String, String> additionalHeaders, int appListPageSize) {
6972
this.connectionConfig = connectionConfig;
7073
this.verifySSLCert = verifySSLCert;
7174
this.apiVersion = apiVersion;
@@ -76,6 +79,7 @@ private ClientConfig(@Nullable ConnectionConfig connectionConfig,
7679
this.uploadReadTimeout = uploadReadTimeout;
7780
this.uploadConnectTimeout = uploadConnectTimeout;
7881
this.additionalHeaders = additionalHeaders;
82+
this.appListPageSize = appListPageSize;
7983
}
8084

8185
public static ClientConfig getDefault() {
@@ -167,6 +171,8 @@ public int getUploadConnectTimeout() {
167171
return uploadConnectTimeout;
168172
}
169173

174+
public int getAppListPageSize() { return appListPageSize; }
175+
170176
public Map<String, String> getAdditionalHeaders() {
171177
return additionalHeaders;
172178
}
@@ -198,6 +204,10 @@ public void setDefaultConnectTimeout(int defaultConnectTimeout) {
198204
this.defaultConnectTimeout = defaultConnectTimeout;
199205
}
200206

207+
public void setAppListPageSize(int appListPageSize) {
208+
this.appListPageSize = appListPageSize;
209+
}
210+
201211
public void setUploadReadTimeout(int uploadReadTimeout) {
202212
this.uploadReadTimeout = uploadReadTimeout;
203213
}
@@ -264,6 +274,7 @@ public static final class Builder {
264274
private int uploadConnectTimeout = DEFAULT_UPLOAD_CONNECT_TIMEOUT;
265275
private int defaultReadTimeout = DEFAULT_READ_TIMEOUT;
266276
private int defaultConnectTimeout = DEFAULT_CONNECT_TIMEOUT;
277+
private int appListPageSize = DEFAULT_APP_LIST_PAGE_SIZE;
267278

268279
private int unavailableRetryLimit = DEFAULT_SERVICE_UNAVAILABLE_RETRY_LIMIT;
269280
private Map<String, String> additionalHeaders = new HashMap<>();
@@ -281,6 +292,7 @@ public Builder(ClientConfig clientConfig) {
281292
this.defaultReadTimeout = clientConfig.defaultReadTimeout;
282293
this.defaultConnectTimeout = clientConfig.defaultConnectTimeout;
283294
this.unavailableRetryLimit = clientConfig.unavailableRetryLimit;
295+
this.appListPageSize = clientConfig.appListPageSize;
284296
}
285297

286298
public Builder setConnectionConfig(ConnectionConfig connectionConfig) {
@@ -313,6 +325,11 @@ public Builder setDefaultConnectTimeout(int defaultConnectTimeout) {
313325
return this;
314326
}
315327

328+
public Builder setAppListPageSize(int appListPageSize) {
329+
this.appListPageSize = appListPageSize;
330+
return this;
331+
}
332+
316333
public Builder setAccessToken(Supplier<AccessToken> accessToken) {
317334
this.accessToken = accessToken;
318335
return this;
@@ -342,7 +359,8 @@ public ClientConfig build() {
342359
return new ClientConfig(connectionConfig, verifySSLCert,
343360
unavailableRetryLimit, apiVersion, accessToken,
344361
defaultReadTimeout, defaultConnectTimeout,
345-
uploadReadTimeout, uploadConnectTimeout, ImmutableMap.copyOf(additionalHeaders));
362+
uploadReadTimeout, uploadConnectTimeout, ImmutableMap.copyOf(additionalHeaders),
363+
appListPageSize);
346364
}
347365
}
348366

‎cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java

+60-33
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package io.cdap.cdap.master.upgrade;
1717

18+
import com.google.common.reflect.TypeToken;
19+
import com.google.gson.Gson;
20+
import com.google.gson.JsonObject;
1821
import io.cdap.cdap.api.retry.RetryableException;
1922
import io.cdap.cdap.client.ApplicationClient;
2023
import io.cdap.cdap.client.NamespaceClient;
@@ -39,6 +42,7 @@
3942
import io.cdap.cdap.proto.id.WorkflowId;
4043
import io.cdap.cdap.security.impersonation.SecurityUtil;
4144
import java.io.IOException;
45+
import java.lang.reflect.Type;
4246
import java.util.List;
4347
import java.util.concurrent.TimeUnit;
4448
import java.util.stream.Collectors;
@@ -54,9 +58,12 @@
5458
public class UpgradeJobMain {
5559

5660
private static final int DEFAULT_READ_TIMEOUT_MILLIS = 90 * 1000;
61+
private static final int APP_LIST_PAGE_SIZE = 25;
5762
private static final String SCHEDULED = "SCHEDULED";
5863
private static final Logger LOG = LoggerFactory.getLogger(UpgradeJobMain.class);
5964

65+
private static final Gson GSON = new Gson();
66+
6067
public static void main(String[] args) {
6168
if (args.length != 2) {
6269
throw new RuntimeException(
@@ -72,7 +79,8 @@ public static void main(String[] args) {
7279
ClientConfig.Builder clientConfigBuilder =
7380
ClientConfig.builder()
7481
.setDefaultReadTimeout(DEFAULT_READ_TIMEOUT_MILLIS)
75-
.setConnectionConfig(connectionConfig);
82+
.setConnectionConfig(connectionConfig)
83+
.setAppListPageSize(APP_LIST_PAGE_SIZE);
7684

7785
// If used in proxy mode, attach a user ID header to upgrade jobs.
7886
CConfiguration cConf = CConfiguration.create();
@@ -114,43 +122,62 @@ private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig)
114122
namespaceIdList.add(NamespaceId.SYSTEM);
115123

116124
for (NamespaceId namespaceId : namespaceIdList) {
117-
for (ApplicationRecord record : applicationClient.list(namespaceId)) {
118-
ApplicationId applicationId =
119-
new ApplicationId(namespaceId.getNamespace(), record.getName(), record.getAppVersion());
120-
LOG.debug("Trying to stop schedule and workflows for application " + applicationId);
121-
List<WorkflowId> workflowIds =
122-
applicationClient.get(applicationId).getPrograms().stream()
123-
.filter(programRecord -> programRecord.getType().equals(ProgramType.WORKFLOW))
124-
.map(programRecord -> new WorkflowId(applicationId, programRecord.getName()))
125-
.collect(Collectors.toList());
126-
for (WorkflowId workflowId : workflowIds) {
127-
List<ScheduleId> scheduleIds =
128-
scheduleClient.listSchedules(workflowId).stream()
129-
.map(scheduleDetail ->
130-
new ScheduleId(namespaceId.getNamespace(), record.getName(),
131-
scheduleDetail.getName()))
132-
.collect(Collectors.toList());
133-
for (ScheduleId scheduleId : scheduleIds) {
134-
if (scheduleClient.getStatus(scheduleId).equals(SCHEDULED)) {
135-
scheduleClient.suspend(scheduleId);
136-
}
137-
}
138-
// Need to stop workflows first or else the program will fail to stop below
139-
if (!programClient.getStatus(workflowId).equals(ProgramStatus.STOPPED.toString())) {
140-
try {
141-
programClient.stop(workflowId);
142-
} catch (BadRequestException e) {
143-
// There might be race condition between checking if the program is in RUNNING state and stopping it.
144-
// This can cause programClient.stop to throw BadRequestException so verifying if the program
145-
// transitioned to stop state since it was checked earlier or not.
125+
String token = null;
126+
boolean isLastPage = false;
127+
while (!isLastPage) {
128+
JsonObject paginatedListResponse = applicationClient.paginatedList(namespaceId, token);
129+
token = paginatedListResponse.get("nextPageToken") == null ? null
130+
: paginatedListResponse.get("nextPageToken").getAsString();
131+
LOG.debug("Called paginated list API and got token: {}", token);
132+
if (paginatedListResponse.get("applications").getAsJsonArray().size() != 0) {
133+
Type appListType = new TypeToken<List<ApplicationRecord>>() {
134+
}.getType();
135+
List<ApplicationRecord> records = GSON.fromJson(
136+
paginatedListResponse.get("applications").getAsJsonArray(), appListType);
137+
for (ApplicationRecord record : records) {
138+
ApplicationId applicationId =
139+
new ApplicationId(namespaceId.getNamespace(),
140+
record.getName(), record.getAppVersion());
141+
LOG.debug("Trying to stop schedule and workflows for application " + applicationId);
142+
List<WorkflowId> workflowIds =
143+
applicationClient.get(applicationId).getPrograms().stream()
144+
.filter(programRecord -> programRecord.getType().equals(ProgramType.WORKFLOW))
145+
.map(programRecord -> new WorkflowId(applicationId, programRecord.getName()))
146+
.collect(Collectors.toList());
147+
for (WorkflowId workflowId : workflowIds) {
148+
List<ScheduleId> scheduleIds =
149+
scheduleClient.listSchedules(workflowId).stream()
150+
.map(scheduleDetail ->
151+
new ScheduleId(namespaceId.getNamespace(), record.getName(),
152+
scheduleDetail.getName()))
153+
.collect(Collectors.toList());
154+
for (ScheduleId scheduleId : scheduleIds) {
155+
if (scheduleClient.getStatus(scheduleId).equals(SCHEDULED)) {
156+
scheduleClient.suspend(scheduleId);
157+
}
158+
}
159+
// Need to stop workflows first or else the program will fail to stop below
146160
if (!programClient.getStatus(workflowId).equals(ProgramStatus.STOPPED.toString())) {
147-
// Pipeline still in running state. Continue with stopping rest of the pipelines in this namespace and
148-
// next retry should try to stop/verify status for this pipeline.
149-
shouldRetry = true;
161+
try {
162+
programClient.stop(workflowId);
163+
} catch (BadRequestException e) {
164+
// There might be race condition between checking if the program
165+
// is in RUNNING state and stopping it. This can cause programClient.stop to
166+
// throw BadRequestException so verifying if the program transitioned to stop
167+
// state since it was checked earlier or not.
168+
if (!programClient.getStatus(workflowId)
169+
.equals(ProgramStatus.STOPPED.toString())) {
170+
// Pipeline still in running state. Continue with stopping rest of the
171+
// pipelines in this namespace and next retry should try to stop/verify status
172+
// for this pipeline.
173+
shouldRetry = true;
174+
}
175+
}
150176
}
151177
}
152178
}
153179
}
180+
isLastPage = (token == null);
154181
}
155182
// At least one pipeline is still in running state so retry to verify pipeline status .
156183
if (shouldRetry) {

0 commit comments

Comments
 (0)
Please sign in to comment.