Skip to content

Commit

Permalink
Merge pull request #15873 from cdapio/cherrypick/fix/CDAP-21096_sched…
Browse files Browse the repository at this point in the history
…uler

[CDAP-21096] Use RemoteScheduleManager to update TimeSchedulerService from Appfabric service
  • Loading branch information
vsethi09 authored Feb 11, 2025
2 parents dd5505a + a85a1fe commit 59fa341
Show file tree
Hide file tree
Showing 13 changed files with 478 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@
import io.cdap.cdap.internal.app.runtime.artifact.PluginFinder;
import io.cdap.cdap.internal.app.runtime.schedule.DistributedTimeSchedulerService;
import io.cdap.cdap.internal.app.runtime.schedule.ExecutorThreadPool;
import io.cdap.cdap.internal.app.runtime.schedule.LocalScheduleManager;
import io.cdap.cdap.internal.app.runtime.schedule.LocalTimeSchedulerService;
import io.cdap.cdap.internal.app.runtime.schedule.RemoteScheduleManager;
import io.cdap.cdap.internal.app.runtime.schedule.ScheduleManager;
import io.cdap.cdap.internal.app.runtime.schedule.TimeSchedulerService;
import io.cdap.cdap.internal.app.runtime.schedule.store.DatasetBasedTimeScheduleStore;
import io.cdap.cdap.internal.app.runtime.schedule.store.TriggerMisfireLogger;
Expand Down Expand Up @@ -244,6 +247,7 @@ protected void configure() {
.in(Scopes.SINGLETON);
bind(TimeSchedulerService.class).to(LocalTimeSchedulerService.class)
.in(Scopes.SINGLETON);
bind(ScheduleManager.class).to(LocalScheduleManager.class).in(Scopes.SINGLETON);
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
Expand All @@ -265,6 +269,7 @@ protected void configure() {
binder(), HttpHandler.class, Names.named(Constants.AppFabric.SERVER_HANDLERS_BINDING));
handlerBinder.addBinding().to(ProgramRuntimeHttpHandler.class);
handlerBinder.addBinding().to(ProgramScheduleHttpHandler.class);
handlerBinder.addBinding().to(OperationsDashboardHttpHandler.class);

// TODO: Uncomment after CDAP-7688 is resolved
// servicesNamesBinder.addBinding().toInstance(Constants.Service.MESSAGING_SERVICE);
Expand Down Expand Up @@ -303,6 +308,7 @@ protected void configure() {
.in(Scopes.SINGLETON);
bind(TimeSchedulerService.class).to(LocalTimeSchedulerService.class)
.in(Scopes.SINGLETON);
bind(ScheduleManager.class).to(LocalScheduleManager.class).in(Scopes.SINGLETON);
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
Expand Down Expand Up @@ -367,6 +373,7 @@ protected void configure() {
.in(Scopes.SINGLETON);
bind(TimeSchedulerService.class).to(DistributedTimeSchedulerService.class)
.in(Scopes.SINGLETON);
bind(ScheduleManager.class).to(RemoteScheduleManager.class).in(Scopes.SINGLETON);
bind(MRJobInfoFetcher.class).to(DistributedMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class)
.to(DistributedStorageProviderNamespaceAdmin.class);
Expand Down Expand Up @@ -511,8 +518,6 @@ protected void configure() {
handlerBinder.addBinding().to(InstanceOperationHttpHandler.class);
handlerBinder.addBinding().to(NamespaceHttpHandler.class);
handlerBinder.addBinding().to(SourceControlManagementHttpHandler.class);
// TODO: [CDAP-13355] Move OperationsDashboardHttpHandler into report generation app
handlerBinder.addBinding().to(OperationsDashboardHttpHandler.class);
handlerBinder.addBinding().to(PreferencesHttpHandler.class);
handlerBinder.addBinding().to(PreferencesHttpHandlerInternal.class);
handlerBinder.addBinding().to(ConsoleSettingsHttpHandler.class);
Expand Down Expand Up @@ -569,6 +574,9 @@ protected void configure() {
processorHandlerBinder.addBinding().to(ProgramRuntimeHttpHandler.class);
processorHandlerBinder.addBinding().to(BootstrapHttpHandler.class);
processorHandlerBinder.addBinding().to(ProgramScheduleHttpHandler.class);
// TODO: [CDAP-13355] Move OperationsDashboardHttpHandler into report generation app
// TODO(CDAP-21134): Check feasibility of moving OperationsDashboardHttpHandler to Appfabric Server.
processorHandlerBinder.addBinding().to(OperationsDashboardHttpHandler.class);
} else {
bind(NoOpScheduler.class).in(Scopes.SINGLETON);
bind(Scheduler.class).to(NoOpScheduler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import io.cdap.cdap.internal.app.runtime.artifact.PluginFinder;
import io.cdap.cdap.internal.app.runtime.artifact.RemoteArtifactRepositoryReaderWithLocalization;
import io.cdap.cdap.internal.app.runtime.artifact.RemoteArtifactRepositoryWithLocalization;
import io.cdap.cdap.internal.app.runtime.schedule.RemoteScheduleManager;
import io.cdap.cdap.internal.app.runtime.schedule.ScheduleManager;
import io.cdap.cdap.internal.app.runtime.workflow.BasicWorkflowStateWriter;
import io.cdap.cdap.internal.app.runtime.workflow.WorkflowStateWriter;
import io.cdap.cdap.internal.app.store.DefaultStore;
Expand Down Expand Up @@ -189,6 +191,7 @@ protected void configure() {
expose(PreviewRunner.class);

bind(Scheduler.class).to(NoOpScheduler.class);
bind(ScheduleManager.class).to(RemoteScheduleManager.class);

bind(DataTracerFactory.class).to(DefaultDataTracerFactory.class);
expose(DataTracerFactory.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import io.cdap.cdap.internal.app.deploy.pipeline.MetadataWriterStage;
import io.cdap.cdap.internal.app.deploy.pipeline.ProgramGenerationStage;
import io.cdap.cdap.internal.app.runtime.artifact.ArtifactRepository;
import io.cdap.cdap.internal.app.runtime.schedule.ScheduleManager;
import io.cdap.cdap.internal.capability.CapabilityReader;
import io.cdap.cdap.pipeline.Pipeline;
import io.cdap.cdap.pipeline.PipelineFactory;
import io.cdap.cdap.scheduler.Scheduler;
import io.cdap.cdap.security.impersonation.Impersonator;
import io.cdap.cdap.security.impersonation.OwnerAdmin;
import io.cdap.cdap.security.spi.authentication.AuthenticationContext;
Expand Down Expand Up @@ -71,7 +71,7 @@ public class LocalApplicationManager<I, O> implements Manager<I, O> {
private final MetadataServiceClient metadataServiceClient;
private final Impersonator impersonator;
private final AuthenticationContext authenticationContext;
private final io.cdap.cdap.scheduler.Scheduler programScheduler;
private final ScheduleManager scheduleManager;
private final AccessEnforcer accessEnforcer;
private final StructuredTableAdmin structuredTableAdmin;
private final CapabilityReader capabilityReader;
Expand All @@ -87,7 +87,7 @@ public class LocalApplicationManager<I, O> implements Manager<I, O> {
UsageRegistry usageRegistry, ArtifactRepository artifactRepository,
MetadataServiceClient metadataServiceClient,
Impersonator impersonator, AuthenticationContext authenticationContext,
Scheduler programScheduler,
ScheduleManager scheduleManager,
AccessEnforcer accessEnforcer,
StructuredTableAdmin structuredTableAdmin,
CapabilityReader capabilityReader,
Expand All @@ -105,7 +105,7 @@ public class LocalApplicationManager<I, O> implements Manager<I, O> {
this.metadataServiceClient = metadataServiceClient;
this.impersonator = impersonator;
this.authenticationContext = authenticationContext;
this.programScheduler = programScheduler;
this.scheduleManager = scheduleManager;
this.accessEnforcer = accessEnforcer;
this.structuredTableAdmin = structuredTableAdmin;
this.capabilityReader = capabilityReader;
Expand All @@ -128,11 +128,11 @@ public ListenableFuture<O> deploy(I input) throws Exception {
pipeline.addLast(new CreateDatasetInstancesStage(cConf, datasetFramework, ownerAdmin,
authenticationContext));
pipeline.addLast(new DeletedProgramHandlerStage(store, programTerminator,
metricsSystemClient, metadataServiceClient, programScheduler));
metricsSystemClient, metadataServiceClient, scheduleManager));
pipeline.addLast(new ProgramGenerationStage());
pipeline.addLast(new ApplicationRegistrationStage(store, usageRegistry, ownerAdmin,
metricsCollectionService));
pipeline.addLast(new DeleteAndCreateSchedulesStage(programScheduler));
pipeline.addLast(new DeleteAndCreateSchedulesStage(scheduleManager));
pipeline.addLast(new MetadataWriterStage(metadataServiceClient));
pipeline.setFinally(new DeploymentCleanupStage());
return pipeline.execute(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import io.cdap.cdap.api.app.ApplicationSpecification;
import io.cdap.cdap.api.schedule.Trigger;
import io.cdap.cdap.internal.app.runtime.schedule.ProgramSchedule;
import io.cdap.cdap.internal.app.runtime.schedule.ScheduleManager;
import io.cdap.cdap.internal.schedule.ScheduleCreationSpec;
import io.cdap.cdap.pipeline.AbstractStage;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.scheduler.Scheduler;
import java.util.HashSet;
import java.util.Set;

Expand All @@ -33,11 +33,11 @@
*/
public class DeleteAndCreateSchedulesStage extends AbstractStage<ApplicationWithPrograms> {

private final Scheduler programScheduler;
private final ScheduleManager scheduleManager;

public DeleteAndCreateSchedulesStage(Scheduler programScheduler) {
public DeleteAndCreateSchedulesStage(ScheduleManager scheduleManager) {
super(TypeToken.of(ApplicationWithPrograms.class));
this.programScheduler = programScheduler;
this.scheduleManager = scheduleManager;
}

@Override
Expand All @@ -52,17 +52,17 @@ public void process(final ApplicationWithPrograms input) throws Exception {
ApplicationId appId = input.getApplicationId();
// Get a set of new schedules from the app spec
Set<ProgramSchedule> newSchedules = getProgramScheduleSet(appId, input.getSpecification());
for (ProgramSchedule schedule : programScheduler.listSchedules(appId)) {
for (ProgramSchedule schedule : scheduleManager.listSchedules(appId)) {
if (newSchedules.contains(schedule)) {
newSchedules.remove(schedule); // Remove the existing schedule from the newSchedules
continue;
}
// Delete the existing schedule if it is not present in newSchedules
programScheduler.deleteSchedule(schedule.getScheduleId());
scheduleManager.deleteSchedule(schedule.getScheduleId());
}

// Add new schedules
programScheduler.addSchedules(newSchedules);
scheduleManager.addSchedules(newSchedules);

// Emit the input to next stage.
emit(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.data2.metadata.writer.MetadataServiceClient;
import io.cdap.cdap.internal.app.deploy.ProgramTerminator;
import io.cdap.cdap.internal.app.runtime.schedule.ScheduleManager;
import io.cdap.cdap.pipeline.AbstractStage;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.ProgramTypes;
import io.cdap.cdap.proto.id.ProgramId;
import io.cdap.cdap.scheduler.Scheduler;
import io.cdap.cdap.spi.metadata.MetadataMutation;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -53,18 +53,18 @@ public class DeletedProgramHandlerStage extends AbstractStage<ApplicationDeploya
private final ProgramTerminator programTerminator;
private final MetricsSystemClient metricsSystemClient;
private final MetadataServiceClient metadataServiceClient;
private final Scheduler programScheduler;
private final ScheduleManager scheduleManager;

public DeletedProgramHandlerStage(Store store, ProgramTerminator programTerminator,
MetricsSystemClient metricsSystemClient,
MetadataServiceClient metadataServiceClient,
Scheduler programScheduler) {
ScheduleManager scheduleManager) {
super(TypeToken.of(ApplicationDeployable.class));
this.store = store;
this.programTerminator = programTerminator;
this.metricsSystemClient = metricsSystemClient;
this.metadataServiceClient = metadataServiceClient;
this.programScheduler = programScheduler;
this.scheduleManager = scheduleManager;
}

@Override
Expand All @@ -81,8 +81,8 @@ public void process(ApplicationDeployable appSpec) throws Exception {
ProgramType type = ProgramTypes.fromSpecification(spec);
ProgramId programId = appSpec.getApplicationId().program(type, spec.getName());
programTerminator.stop(programId);
programScheduler.deleteSchedules(programId);
programScheduler.modifySchedulesTriggeredByDeletedProgram(programId);
scheduleManager.deleteSchedules(programId);
scheduleManager.modifySchedulesTriggeredByDeletedProgram(programId);

// Remove metadata for the deleted program
metadataServiceClient.drop(new MetadataMutation.Drop(programId.toMetadataEntity()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.runtime.schedule;

import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.cdap.cdap.common.AlreadyExistsException;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ConflictException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.ProfileConflictException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.internal.app.runtime.SystemArguments;
import io.cdap.cdap.internal.app.runtime.schedule.queue.JobQueueTable;
import io.cdap.cdap.internal.app.runtime.schedule.store.ProgramScheduleStoreDataset;
import io.cdap.cdap.internal.app.runtime.schedule.store.Schedulers;
import io.cdap.cdap.internal.app.store.profile.ProfileStore;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.id.ProfileId;
import io.cdap.cdap.proto.id.ScheduleId;
import io.cdap.cdap.runtime.spi.profile.ProfileStatus;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code ScheduleManager} to manage program schedules. This class is meant to be used by in-memory
* modules and tests.
*/
public class LocalScheduleManager extends ScheduleManager {

private static final Logger LOG = LoggerFactory.getLogger(LocalScheduleManager.class);
private final TimeSchedulerService timeSchedulerService;

/**
* Parameterized constructor for LocalScheduleManager.
*
* @param transactionRunner TransactionRunner
* @param messagingService MessagingService
* @param cConf CConfiguration
* @param timeSchedulerService TimeSchedulerService
*/
@Inject
public LocalScheduleManager(TransactionRunner transactionRunner, MessagingService messagingService,
CConfiguration cConf, TimeSchedulerService timeSchedulerService) {
super(transactionRunner, messagingService, cConf);
this.timeSchedulerService = timeSchedulerService;
}

@Override
public void addSchedules(Iterable<? extends ProgramSchedule> schedules)
throws BadRequestException, NotFoundException, IOException, ConflictException {
for (ProgramSchedule schedule : schedules) {
if (!schedule.getProgramId().getType().equals(ProgramType.WORKFLOW)) {
throw new BadRequestException(String.format(
"Cannot schedule program %s of type %s: Only workflows can be scheduled",
schedule.getProgramId().getProgram(), schedule.getProgramId().getType()));
}
}

try {
TransactionRunners.run(transactionRunner, context -> {
ProgramScheduleStoreDataset store = Schedulers.getScheduleStore(context);
ProfileStore profileStore = ProfileStore.get(context);
long updatedTime = store.addSchedules(schedules);
for (ProgramSchedule schedule : schedules) {
if (schedule.getProperties() != null) {
Optional<ProfileId> profile = SystemArguments.getProfileIdFromArgs(
schedule.getProgramId().getNamespaceId(), schedule.getProperties());
if (profile.isPresent()) {
ProfileId profileId = profile.get();
if (profileStore.getProfile(profileId).getStatus() == ProfileStatus.DISABLED) {
throw new ProfileConflictException(
String.format("Profile %s in namespace %s is disabled. It cannot "
+ "be assigned to schedule %s",
profileId.getProfile(), profileId.getNamespace(),
schedule.getName()), profileId);
}
}
}
try {
timeSchedulerService.addProgramSchedule(schedule);
} catch (SchedulerException e) {
LOG.error("Exception occurs when adding schedule {}", schedule, e);
throw new RuntimeException(e);
}
}
for (ProgramSchedule schedule : schedules) {
ScheduleId scheduleId = schedule.getScheduleId();

// If the added properties contains profile assignment, add the assignment.
Optional<ProfileId> profileId = SystemArguments.getProfileIdFromArgs(
scheduleId.getNamespaceId(),
schedule.getProperties());
if (profileId.isPresent()) {
profileStore.addProfileAssignment(profileId.get(), scheduleId);
}
}
// Publish the messages at the end of transaction.
for (ProgramSchedule schedule : schedules) {
adminEventPublisher.publishScheduleCreation(schedule.getScheduleId(), updatedTime);
}
return null;
}, Exception.class);
} catch (NotFoundException | ProfileConflictException | AlreadyExistsException e) {
throw e;
} catch (Exception e) {
throw Throwables.propagate(e);
}
}

@Override
public void deleteSchedule(ScheduleId scheduleId)
throws NotFoundException, BadRequestException, IOException, ConflictException {
TransactionRunners.run(transactionRunner, context -> {
ProgramScheduleStoreDataset store = Schedulers.getScheduleStore(context);
ProfileStore profileStore = ProfileStore.get(context);
JobQueueTable queue = JobQueueTable.getJobQueue(context, cConf);
long deleteTime = System.currentTimeMillis();
List<ProgramSchedule> toNotify = new ArrayList<>();
ProgramSchedule schedule = store.getSchedule(scheduleId);
timeSchedulerService.deleteProgramSchedule(schedule);
queue.markJobsForDeletion(scheduleId, deleteTime);
toNotify.add(schedule);
// If the deleted schedule has properties with profile assignment, remove the assignment.
Optional<ProfileId> profileId = SystemArguments.getProfileIdFromArgs(
scheduleId.getNamespaceId(),
schedule.getProperties());
if (profileId.isPresent()) {
try {
profileStore.removeProfileAssignment(profileId.get(), scheduleId);
} catch (NotFoundException e) {
// This should not happen since the profile cannot be deleted if there is a schedule who is using it.
LOG.warn("Unable to find the profile {} when deleting schedule {}, "
+ "skipping assignment deletion.", profileId.get(), scheduleId);
}
}
store.deleteSchedule(scheduleId);
toNotify.forEach(adminEventPublisher::publishScheduleDeletion);
return null;
}, NotFoundException.class);
}
}
Loading

0 comments on commit 59fa341

Please sign in to comment.