admin 管理员组

文章数量: 1087139


2024年1月14日发(作者:易语言游戏登录器源码)

实现类: DefaultScheduler

//

启动Scheduler ?? private void resetAndStartScheduler() throws Exception { validateRunsInMainThread(); final CompletableFuture<Void> schedulerAssignedFuture; if (schedulerNG.requestJobStatus() == JobStatus.CREATED) { schedulerAssignedFuture = CompletableFuture.completedFuture(null); schedulerNG.setMainThreadExecutor(getMainThreadExecutor()); } else { suspendAndClearSchedulerFields( new FlinkException( "ExecutionGraph is being reset in order to be rescheduled.")); final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph); final SchedulerNG newScheduler = createScheduler(executionDeploymentTracker, newJobManagerJobMetricGroup); schedulerAssignedFuture = schedulerNG .getTerminationFuture() .handle( (ignored, throwable) -> { newScheduler.setMainThreadExecutor(getMainThreadExecutor()); assignScheduler(newScheduler, newJobManagerJobMetricGroup); return null; }); } //

启动调度 startScheduling FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling)); } 3.4. JobMaster#startScheduling private void startScheduling() { checkState(jobStatusListener == null); // register self as job status change listener jobStatusListener = new JobManagerJobStatusListener(); schedulerNG.registerJobStatusListener(jobStatusListener); //

开始调度

schedulerNG.startScheduling(); } 3.5. DefaultScheduler#startSchedulingInternal @Override protected void startSchedulingInternal() { log.info( "Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName()); prepareExecutionGraphForNgScheduling(); //

默认调度策略 : PipelinedRegionSchedulingStrategy // PipelinedRegionSchedulingStrategy#startScheduling schedulingStrategy.startScheduling(); }

// {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3" -> {ExecutionVertexDeploymentOption@7490} // {ExecutionVertexID@7441} "0a448493b4782967b157_0" -> {ExecutionVertexDeploymentOption@7491} // {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2" -> {ExecutionVertexDeploymentOption@7492} // {ExecutionVertexID@7443} "0a448493b4782967b157_1" -> {ExecutionVertexDeploymentOption@7493} final Map deploymentOptionsByVertex = groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions); // 0 = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0" // 1 = {ExecutionVertexID@7441} "0a448493b4782967b157_0" // 2 = {ExecutionVertexID@7443} "0a448493b4782967b157_1" // 3 = {ExecutionVertexID@7445} "0a448493b4782967b157_2" // 4 = {ExecutionVertexID@7448} "0a448493b4782967b157_3" // 5 = {ExecutionVertexID@7451} "ea632d67b7d595e5b851708ae9ad79d6_0" // 6 = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1" // 7 = {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2" // 8 = {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3" // 9 = {ExecutionVertexID@7460} "6d2677a0ecc3fd8df0b72ec675edf8f4_0" final List verticesToDeploy = () .map(ExecutionVertexDeploymentOption::getExecutionVertexId) .collect(()); // {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0" -> {ExecutionVertexVersion@7566} // key = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0" // value = {ExecutionVertexVersion@7566} // executionVertexId = {ExecutionVertexID@7434} "bc764cd8ddf7a0cff126f51c16239658_0" // version = 1 // {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1" -> {ExecutionVertexVersion@7567} // key = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1" // value = {ExecutionVertexVersion@7567} // executionVertexId = {ExecutionVertexID@7453} "ea632d67b7d595e5b851708ae9ad79d6_1" // version = 1 // {ExecutionVertexID@7445} "0a448493b4782967b157_2" -> {ExecutionVertexVersion@7568} // {ExecutionVertexID@7451} 
"ea632d67b7d595e5b851708ae9ad79d6_0" -> {ExecutionVertexVersion@7569} // {ExecutionVertexID@7448} "0a448493b4782967b157_3" -> {ExecutionVertexVersion@7570} // {ExecutionVertexID@7460} "6d2677a0ecc3fd8df0b72ec675edf8f4_0" -> {ExecutionVertexVersion@7571} // {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3" -> {ExecutionVertexVersion@7572} // {ExecutionVertexID@7441} "0a448493b4782967b157_0" -> {ExecutionVertexVersion@7573} // {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2" -> {ExecutionVertexVersion@7574} // {ExecutionVertexID@7443} "0a448493b4782967b157_1" -> {ExecutionVertexVersion@7575} final Map requiredVersionByVertex = VertexModifications(verticesToDeploy); transitionToScheduled(verticesToDeploy); // allocateSlots ?? // slotExecutionVertexAssignments = {ArrayList@8148} size = 10 // 0 = {SlotExecutionVertexAssignment@8064} // 1 = {SlotExecutionVertexAssignment@8070} // 2 = {SlotExecutionVertexAssignment@8072} // 3 = {SlotExecutionVertexAssignment@8066} // 4 = {SlotExecutionVertexAssignment@8068} // 5 = {SlotExecutionVertexAssignment@8067} // 6 = {SlotExecutionVertexAssignment@8065} // 7 = {SlotExecutionVertexAssignment@8073} // 8 = {SlotExecutionVertexAssignment@8071} // 9 = {SlotExecutionVertexAssignment@8069} final List slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions); // DeploymentHandle包含 :

版本,

参数 , slot分配信息 // deploymentHandles = {ArrayList@8194} size = 10 // 0 = {DeploymentHandle@8212} // requiredVertexVersion = {ExecutionVertexVersion@7566} // executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7484}

// executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7484} // slotExecutionVertexAssignment = {SlotExecutionVertexAssignment@8064} // 1 = {DeploymentHandle@8213} // requiredVertexVersion = {ExecutionVertexVersion@7573} // executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7491} // slotExecutionVertexAssignment = {SlotExecutionVertexAssignment@8070} // 2 = {DeploymentHandle@8214} // 3 = {DeploymentHandle@8215} // 4 = {DeploymentHandle@8216} // 5 = {DeploymentHandle@8217} // 6 = {DeploymentHandle@8218} // 7 = {DeploymentHandle@8219} // 8 = {DeploymentHandle@8220} // 9 = {DeploymentHandle@8221} final List deploymentHandles = createDeploymentHandles( requiredVersionByVertex, deploymentOptionsByVertex, slotExecutionVertexAssignments); //

开始部署 ... waitForAllSlotsAndDeploy(deploymentHandles); }3.9. DefaultScheduler#waitForAllSlotsAndDeploy private void waitForAllSlotsAndDeploy(final List deploymentHandles) { //

分配资源,

开始部署 FutureUtils.assertNoException( assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles))); } 3.10. DefaultScheduler#deployAll private BiFunction<Void, Throwable, Void> deployAll( final List<DeploymentHandle> deploymentHandles) { return (ignored, throwable) -> { propagateIfNonNull(throwable); for (final DeploymentHandle deploymentHandle : deploymentHandles) { final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment(); final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture(); checkState(slotAssigned.isDone()); // slot

分配任务 : deployOrHandleError FutureUtils.assertNoException( slotAssigned.handle(deployOrHandleError(deploymentHandle))); } return null; }; } 3.11. DefaultScheduler#deployOrHandleError

private BiFunction<Object, Throwable, Void> deployOrHandleError( final DeploymentHandle deploymentHandle) { final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion(); final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId(); return (ignored, throwable) -> { if (executionVertexVersioner.isModified(requiredVertexVersion)) { log.debug( "Refusing to deploy execution vertex {} because this deployment was " + "superseded by another deployment", executionVertexId); return null; } if (throwable == null) { //

部署task deployTaskSafe(executionVertexId); } else { handleTaskDeploymentFailure(executionVertexId, throwable); } return null; }; }3.12. DefaultScheduler#deployTaskSafe private void deployTaskSafe(final ExecutionVertexID executionVertexId) { try { //

获取ExecutionVertex final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); //

开始部署 : ExecutionVertex // DefaultExecutionVertexOperations#deploy executionVertexOperations.deploy(executionVertex); } catch (Throwable e) { handleTaskDeploymentFailure(executionVertexId, e); } } 3.13. DefaultExecutionVertexOperations#deploy @Override public void deploy(final ExecutionVertex executionVertex) throws JobException { executionVertex.deploy(); } 3.14. ExecutionVertex#deploy public void deploy() throws JobException { //

部署 currentExecution.deploy(); } 3.15. Execution#deploy /** *

将 execution

部署到先前分配的资源。 * Deploys the execution to the previously assigned resource. * * @throws JobException if the execution cannot be deployed to the assigned resource

*/ public void deploy() throws JobException { assertRunningInJobMasterMainThread(); //

获取slot // slotRequestId = {SlotRequestId@8931} "SlotRequestId{7d3611a3599a124ed703d75c55561420}" // slotContext = {AllocatedSlot@8932} "AllocatedSlot e5eeb5d0e767c407ea81ab345a14ebd8 @ container_18_0017_01_000002 @ henghe-030 (dataPort=39722) - 0" // slotSharingGroupId = null // locality = {Locality@8933} "UNKNOWN" // slotOwner = {SharedSlot@8934} // releaseFuture = {CompletableFuture@8935} "tableFuture@7ea60a0f[Not completed]" // state = {SingleLogicalSlot$State@8936} "ALIVE" // payload = {Execution@8899} "Attempt #0 (Source: Socket Stream (1/1)) @ LogicalSlot@7f697d27 - [SCHEDULED]" // willBeOccupiedIndefinitely = true final LogicalSlot slot = assignedResource; checkNotNull( slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource."); // Check if the TaskManager died in the meantime // This only speeds up the response to TaskManagers failing concurrently to deployments. // The more general check is the rpcTimeout of the deployment call if (!e()) { throw new JobException("Target slot (TaskManager) for deployment is no longer alive."); } // make sure exactly one deployment call happens from the correct state // note: the transition from CREATED to DEPLOYING is for testing purposes only ExecutionState previous = ; if (previous == SCHEDULED || previous == CREATED) { if (!transitionState(previous, DEPLOYING)) { // race condition, someone else beat us to the deploying call. // this should actually not happen and indicates a race somewhere else throw new IllegalStateException( "Cannot deploy task: Concurrent deployment call race."); } } else { // vertex may have been cancelled, or it was already scheduled throw new IllegalStateException( "The vertex must be in CREATED or SCHEDULED state to be deployed. 
Found state " + previous); } if (this != slot.getPayload()) { throw new IllegalStateException( String.format( "The execution %s has not been assigned to the assigned slot.", this)); } try { // race double check, did we fail/cancel and do we need to release the slot? if (this.state != DEPLOYING) { slot.releaseSlot( new FlinkException( "Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING.")); return;

+ ')'; markFailed( new Exception( "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation() + ") not responding after a rpcTimeout of " + rpcTimeout, failure)); } else { markFailed(failure); } } }, jobMasterMainThreadExecutor); } catch (Throwable t) { markFailed(t); if (isLegacyScheduling()) { ExceptionUtils.rethrow(t); } } } 3.16. RpcTaskManagerGateway#submitTask @Override public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) { return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout); } 3.17. TaskExecutor#submitTask // ---------------------------------------------------------------------- // Task lifecycle RPCs //

提交

任务

// ---------------------------------------------------------------------- @Override public CompletableFuture submitTask( TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) { try { final JobID jobId = Id(); final ExecutionAttemptID executionAttemptID = cutionAttemptId(); final tion jobManagerConnection = nection(jobId) .orElseThrow( () -> { final String message = "Could not submit task because there is no JobManager " + "associated for the job " + jobId + '.'; (message); return new TaskSubmissionException(message); });

if (!(MasterId(), jobMasterId)) { final String message = "Rejecting the task submission because the job manager leader id " + jobMasterId + " does not match the expected job manager leader id " + MasterId() + '.'; (message); throw new TaskSubmissionException(message); } if (!kSlotActive(jobId, ocationId())) { final String message = "No task slot allocated for job ID " + jobId + " and allocation ID " + ocationId() + '.'; (message); throw new TaskSubmissionException(message); } // re-integrate offloaded data: try { gData(manentBlobService()); } catch (IOException | ClassNotFoundException e) { throw new TaskSubmissionException( "Could not re-integrate offloaded TaskDeploymentDescriptor data.", e); } // deserialize the pre-serialized information final JobInformation jobInformation; final TaskInformation taskInformation; try { jobInformation = ializedJobInformation() .deserializeValue(getClass().getClassLoader()); taskInformation = ializedTaskInformation() .deserializeValue(getClass().getClassLoader()); } catch (IOException | ClassNotFoundException e) { throw new TaskSubmissionException( "Could not deserialize the job or task information.", e); } if (!(Id())) { throw new TaskSubmissionException( "Inconsistent job ID information inside TaskDeploymentDescriptor (" + Id() + " vs. " + Id() + ")"); } TaskMetricGroup taskMetricGroup = kForJob( Id(), Name(), VertexId(), cutionAttemptId(), kName(), taskIndex(), emptNumber());

tateService(), adcastVariableManager(), kEventDispatcher(), externalResourceInfoProvider, taskStateManager, taskManagerActions, inputSplitProvider, checkpointResponder, taskOperatorEventGateway, aggregateManager, classLoaderHandle, fileCache, taskManagerConfiguration, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, getRpcService().getExecutor()); (_BACKPRESSURED, task::isBackPressured); // Received task // Window( // TumblingProcessingTimeWindows(5000), // ProcessingTimeTrigger, // ReduceFunction$1, PassThroughWindowFunction // ) -> // Sink: Print to Std. Out (1/1)#0 (141dd597dc560a831b2b4bc195943f0b), // // deploy into slot with allocation id // 3755cb8f9962a9a7738db04f2a02084c. ( "Received task {} ({}), deploy into slot with allocation id {}.", kInfo().getTaskNameWithSubtasks(), cutionAttemptId(), ocationId()); boolean taskAdded; try { taskAdded = k(task); } catch (SlotNotFoundException | SlotNotActiveException e) { throw new TaskSubmissionException("Could not submit task.", e); } if (taskAdded) { //

启动线程 askThread(); setupResultPartitionBookkeeping( Id(), ducedPartitions(), minationFuture()); return tedFuture(()); } else { final String message = "TaskManager already contains a task for id " + cutionId() + '.'; (message); throw new TaskSubmissionException(message); } } catch (TaskSubmissionException e) { return tedExceptionally(e); } }


本文标签: 调度 游戏 部署 开始 分配