admin 管理员组文章数量: 1087139
2024年1月14日发(作者:易语言游戏登录器源码)
实现类: DefaultScheduler
//
启动Scheduler private void resetAndStartScheduler() throws Exception { validateRunsInMainThread(); final CompletableFuture
启动调度 startScheduling NoException(n(this::startScheduling)); }3.4. JobMaster#startScheduling private void startScheduling() { checkState(jobStatusListener == null); // register self as job status change listener jobStatusListener = new JobManagerJobStatusListener(); erJobStatusListener(jobStatusListener); //
开始调度
cheduling(); }3.5. DefaultScheduler#startSchedulingInternal @Override protected void startSchedulingInternal() { ( "Starting scheduling with scheduling strategy [{}]", ss().getName()); prepareExecutionGraphForNgScheduling(); //
默认调度策略 : PipelinedRegionSchedulingStrategy // PipelinedRegionSchedulingStrategy#startScheduling cheduling(); }
// {ExecutionVertexID@7457} "ea632d67b7d595e5b851708ae9ad79d6_3" -> {ExecutionVertexDeploymentOption@7490} // {ExecutionVertexID@7441} "0a448493b4782967b157_0" -> {ExecutionVertexDeploymentOption@7491} // {ExecutionVertexID@7455} "ea632d67b7d595e5b851708ae9ad79d6_2" -> {ExecutionVertexDeploymentOption@7492} // {ExecutionVertexID@7443} "0a448493b4782967b157_1" -> {ExecutionVertexDeploymentOption@7493} final Map
版本, 参数, slot分配信息 // deploymentHandles = {ArrayList@8194} size = 10 // 0 = {DeploymentHandle@8212} // requiredVertexVersion = {ExecutionVertexVersion@7566} // executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7484}
// executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7484} // slotExecutionVertexAssignment = {SlotExecutionVertexAssignment@8064} // 1 = {DeploymentHandle@8213} // requiredVertexVersion = {ExecutionVertexVersion@7573} // executionVertexDeploymentOption = {ExecutionVertexDeploymentOption@7491} // slotExecutionVertexAssignment = {SlotExecutionVertexAssignment@8070} // 2 = {DeploymentHandle@8214} // 3 = {DeploymentHandle@8215} // 4 = {DeploymentHandle@8216} // 5 = {DeploymentHandle@8217} // 6 = {DeploymentHandle@8218} // 7 = {DeploymentHandle@8219} // 8 = {DeploymentHandle@8220} // 9 = {DeploymentHandle@8221} final List
开始部署 ... waitForAllSlotsAndDeploy(deploymentHandles); }3.9. DefaultScheduler#waitForAllSlotsAndDeploy private void waitForAllSlotsAndDeploy(final List
分配资源, 开始部署 NoException( assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles))); }3.10. DefaultScheduler#deployAll private BiFunction
分配任务 : deployOrHandleError NoException( (deployOrHandleError(deploymentHandle))); } return null; }; }3.11. DefaultScheduler#deployOrHandleError
private BiFunction
部署task deployTaskSafe(executionVertexId); } else { handleTaskDeploymentFailure(executionVertexId, throwable); } return null; }; }3.12. DefaultScheduler#deployTaskSafe private void deployTaskSafe(final ExecutionVertexID executionVertexId) { try { //
获取ExecutionVertex final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); //
开始部署 : ExecutionVertex // DefaultExecutionVertexOperations#deploy (executionVertex); } catch (Throwable e) { handleTaskDeploymentFailure(executionVertexId, e); } }3.13. DefaultExecutionVertexOperations#deploy @Override public void deploy(final ExecutionVertex executionVertex) throws JobException { (); }3.14. ExecutionVertex#deploy public void deploy() throws JobException { //
部署 (); }3.15. ExecutionVertex#deploy /** *
将 execution 部署到先前分配的资源。 * Deploys the execution to the previously assigned resource. * * @throws JobException if the execution cannot be deployed to the assigned resource
*/ public void deploy() throws JobException { assertRunningInJobMasterMainThread(); //
获取slot // slotRequestId = {SlotRequestId@8931} "SlotRequestId{7d3611a3599a124ed703d75c55561420}" // slotContext = {AllocatedSlot@8932} "AllocatedSlot e5eeb5d0e767c407ea81ab345a14ebd8 @ container_18_0017_01_000002 @ henghe-030 (dataPort=39722) - 0" // slotSharingGroupId = null // locality = {Locality@8933} "UNKNOWN" // slotOwner = {SharedSlot@8934} // releaseFuture = {CompletableFuture@8935} "tableFuture@7ea60a0f[Not completed]" // state = {SingleLogicalSlot$State@8936} "ALIVE" // payload = {Execution@8899} "Attempt #0 (Source: Socket Stream (1/1)) @ LogicalSlot@7f697d27 - [SCHEDULED]" // willBeOccupiedIndefinitely = true final LogicalSlot slot = assignedResource; checkNotNull( slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource."); // Check if the TaskManager died in the meantime // This only speeds up the response to TaskManagers failing concurrently to deployments. // The more general check is the rpcTimeout of the deployment call if (!e()) { throw new JobException("Target slot (TaskManager) for deployment is no longer alive."); } // make sure exactly one deployment call happens from the correct state // note: the transition from CREATED to DEPLOYING is for testing purposes only ExecutionState previous = ; if (previous == SCHEDULED || previous == CREATED) { if (!transitionState(previous, DEPLOYING)) { // race condition, someone else beat us to the deploying call. // this should actually not happen and indicates a race somewhere else throw new IllegalStateException( "Cannot deploy task: Concurrent deployment call race."); } } else { // vertex may have been cancelled, or it was already scheduled throw new IllegalStateException( "The vertex must be in CREATED or SCHEDULED state to be deployed. 
Found state " + previous); } if (this != load()) { throw new IllegalStateException( ( "The execution %s has not been assigned to the assigned slot.", this)); } try { // race double check, did we fail/cancel and do we need to release the slot? if ( != DEPLOYING) { eSlot( new FlinkException( "Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING.")); return;
+ ')'; markFailed( new Exception( "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation() + ") not responding after a rpcTimeout of " + rpcTimeout, failure)); } else { markFailed(failure); } } }, jobMasterMainThreadExecutor); } catch (Throwable t) { markFailed(t); if (isLegacyScheduling()) { w(t); } } }3.16. Task @Override public CompletableFuture
提交任务
// ---------------------------------------------------------------------- @Override public CompletableFuture
if (!(MasterId(), jobMasterId)) { final String message = "Rejecting the task submission because the job manager leader id " + jobMasterId + " does not match the expected job manager leader id " + MasterId() + '.'; (message); throw new TaskSubmissionException(message); } if (!kSlotActive(jobId, ocationId())) { final String message = "No task slot allocated for job ID " + jobId + " and allocation ID " + ocationId() + '.'; (message); throw new TaskSubmissionException(message); } // re-integrate offloaded data: try { gData(manentBlobService()); } catch (IOException | ClassNotFoundException e) { throw new TaskSubmissionException( "Could not re-integrate offloaded TaskDeploymentDescriptor data.", e); } // deserialize the pre-serialized information final JobInformation jobInformation; final TaskInformation taskInformation; try { jobInformation = ializedJobInformation() .deserializeValue(getClass().getClassLoader()); taskInformation = ializedTaskInformation() .deserializeValue(getClass().getClassLoader()); } catch (IOException | ClassNotFoundException e) { throw new TaskSubmissionException( "Could not deserialize the job or task information.", e); } if (!(Id())) { throw new TaskSubmissionException( "Inconsistent job ID information inside TaskDeploymentDescriptor (" + Id() + " vs. " + Id() + ")"); } TaskMetricGroup taskMetricGroup = kForJob( Id(), Name(), VertexId(), cutionAttemptId(), kName(), taskIndex(), emptNumber());
tateService(), adcastVariableManager(), kEventDispatcher(), externalResourceInfoProvider, taskStateManager, taskManagerActions, inputSplitProvider, checkpointResponder, taskOperatorEventGateway, aggregateManager, classLoaderHandle, fileCache, taskManagerConfiguration, taskMetricGroup, resultPartitionConsumableNotifier, partitionStateChecker, getRpcService().getExecutor()); (_BACKPRESSURED, task::isBackPressured); // Received task // Window( // TumblingProcessingTimeWindows(5000), // ProcessingTimeTrigger, // ReduceFunction$1, PassThroughWindowFunction // ) -> // Sink: Print to Std. Out (1/1)#0 (141dd597dc560a831b2b4bc195943f0b), // // deploy into slot with allocation id // 3755cb8f9962a9a7738db04f2a02084c. ( "Received task {} ({}), deploy into slot with allocation id {}.", kInfo().getTaskNameWithSubtasks(), cutionAttemptId(), ocationId()); boolean taskAdded; try { taskAdded = k(task); } catch (SlotNotFoundException | SlotNotActiveException e) { throw new TaskSubmissionException("Could not submit task.", e); } if (taskAdded) { //
启动线程 askThread(); setupResultPartitionBookkeeping( Id(), ducedPartitions(), minationFuture()); return tedFuture(()); } else { final String message = "TaskManager already contains a task for id " + cutionId() + '.'; (message); throw new TaskSubmissionException(message); } } catch (TaskSubmissionException e) { return tedExceptionally(e); } }
版权声明:本文标题:Flink1.12.2源码浅析:Task的调度 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://roclinux.cn/b/1705207434a476937.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论