mapreduce源码分析作业提交、初始化、分配、计算过程之初始化篇

发布时间:2016-12-11 16:07:31 编辑:www.fx114.net 分享查询网我要评论
本篇文章主要介绍了"mapreduce源码分析作业提交、初始化、分配、计算过程之初始化篇",主要涉及到mapreduce源码分析作业提交、初始化、分配、计算过程之初始化篇方面的内容,对于mapreduce源码分析作业提交、初始化、分配、计算过程之初始化篇感兴趣的同学可以参考一下。

作业初始化  mapreduce源码分析作业提交、初始化、分配、计算过程之提交篇最后讲到Client远程RPC调用Jobtracker的submitJob方法,mapreduce作业初始化就从此处作为切入口。   jobtracker 里的submitJob方法: public synchronized JobStatus submitJob(JobID jobId) throws IOException {     if(jobs.containsKey(jobId)) {       //job already running, don't start twice       return jobs.get(jobId).getStatus();     }          JobInProgress job = new JobInProgress(jobId, this, this.conf); //创建JobInprogress对象维护作业的运行时信息          String queue = job.getProfile().getQueueName();     if(!(queueManager.getQueues().contains(queue))) {             new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));       throw new IOException("Queue \"" + queue + "\" does not exist");             }     // check for access                                                           //检查用户是否有指定队列作业提交权限     try {       checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);     } catch (IOException ioe) {        LOG.warn("Access denied for user " + job.getJobConf().getUser()                  + ". Ignoring job " + jobId, ioe);       new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));       throw ioe;     }     // Check the job if it cannot run in the cluster because of invalid memory     // requirements.     try {       checkMemoryRequirements(job);                              //检查作业配置的内存是否配置合理,用户提交作业时可用//mapred.job.map.memory.mb mapred.job.reduce.memory.mb指定map,reduce占用的内存量,管理员可能过参数mapred.cluster.max.map.memory.mb, mapred.cluster.max.reduce.memory.mb配置用户最大内存使用量,一旦超过,则作业提交失败     } catch (IOException ioe) {       new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));       throw ioe;     }    return addJob(jobId, job);   //通知taskscheduler将作业加入作业列队,同时初始化作业  } jobtracker中listeners列表执行add方法,触发到eagerTaskInitializationListener类去执行init(wait notify方法)-调用jobtracker的initJob-再调用JobInprogress的initTasks方法-在JobProgress内会创建所有的map,reduce对应的taskprogress放在缓存里nonRunningMapCache....   addJob方法内:   private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {     totalSubmissions++;     synchronized (jobs) {       synchronized (taskScheduler) {         jobs.put(job.getProfile().getJobID(), job);   /* 观察者设计模式 jobInProgressListeners为监听对象,存储为一系列的待通知对象 ,private final List<JobInProgressListener> jobInProgressListeners = new CopyOnWriteArrayList<JobInProgressListener>(); jobtracker内有jobInProgressListeners初始化方法addJobInProgressListener */         for (JobInProgressListener listener : jobInProgressListeners) {            try {             listener.jobAdded(job);                         //调用listener就会调用taskscheduler(JobQueueTaskScheduler)的相关方法,所谓观察者模式           } catch (IOException ioe) {             LOG.warn("Failed to add and so skipping the job : "                 + job.getJobID() + ". Exception : " + ioe);           }         }       }     }     myInstrumentation.submitJob(job.getJobConf(), jobId);     return job.getStatus();   }    listener.jobAdded即是调用JobQueueJobInProgressListener的jobAdded方法,那什么时候把JobQueueJobInProgressListener对象赋给jobtracker的呢?是在taskscheduler启动时候,那taskscheduler什么时候启动呢,那是在jobtracker start初始化的时候会根据mapred.jobtracker.taskScheduler构造相应的调度器   JobQueueTaskScheduler调度器的启动 jobQueueJobInProgressListener第一个增加到jobtracker里的监听列表里,所以作业增加在作业初始化前面   public synchronized void start() throws IOException {     super.start();     taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); //taskTrackerManager实际是jobtracker的对象,向jobtracker注册jobQueueJobInProgressListener对象,所以上面的listener.jobAdded(job);调用的方法,调的是jobQueueJobInProgressListener的方法     eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);     eagerTaskInitializationListener.start();     taskTrackerManager.addJobInProgressListener(         eagerTaskInitializationListener);  //eagerTaskInitializationListener会初始化作业,调用jobtracker中的initJob方法,initJob方法会调用JobInprogress的initTask方法   } jobQueueJobInProgressListener的方法 public void jobAdded(JobInProgress job) {     jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);   } 上边的调度器是jobtracker启动的时候构建的     public static JobTracker startTracker(JobConf conf, String identifier)    throws IOException, InterruptedException {     JobTracker result = null;     while (true) {       try {         result = new JobTracker(conf, identifier); //jobtracker构造方法内会根据参数mapred.jobtracker.taskScheduler创建相应的调度器         result.taskScheduler.setTaskTrackerManager(result); //将jobtracker实例传给taskScheduler   jobtracker内main方法会调用startTracker     public static void main(String argv[]                           ) throws IOException, InterruptedException { ....    try {       JobTracker tracker = startTracker(new JobConf());       tracker.offerService(); //启动各个线程,interTrackerServer为ipc.Server类的对象(有内部类Listener,以nio 的方式启动守护进程,启动  infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,          tmpInfoPort == 0, conf);  HttpServer( org.apache.hadoop.http)  HttpServer的作用 * Create a Jetty embedded server to answer http requests. The primary goal  * is to serve up status information for the server * There are three contexts:  *   "/logs/" -> points to the log directory  *   "/static/" -> points to common static files (src/webapps/static)  *   "/" -> the jsp server code from (src/webapps/<name>)     作业的初始化 调度器: Listener类都会加到jobtracker类,jobtracker的submitjob会执行Listener的addjob方法 eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);     eagerTaskInitializationListener.start();     taskTrackerManager.addJobInProgressListener(         eagerTaskInitializationListener);  //eagerTaskInitializationListener会初始化作业,调用jobtracker中的initJob方法,initJob方法会调用JobInprogress的initTask方法 细节: jobAdded会触发initJob   EagerTaskInitializationListener类: .... class JobInitManager implements Runnable {         public void run() {       JobInProgress job = null;       while (true) {         try {           synchronized (jobInitQueue) {             while (jobInitQueue.isEmpty()) {               jobInitQueue.wait();             }             job = jobInitQueue.remove(0);           }           threadPool.execute(new InitJob(job));         } catch (InterruptedException t) {           LOG.info("JobInitManagerThread interrupted.");           break;         }        } ....  class InitJob implements Runnable {        private JobInProgress job;          public InitJob(JobInProgress job) {       this.job = job;     }          public void run() {       ttm.initJob(job); //此处的ttm即是jobtracker     }   } ....  public void jobAdded(JobInProgress job) {     synchronized (jobInitQueue) {       jobInitQueue.add(job);       resortInitQueue();       jobInitQueue.notifyAll();     }   } JobInprogress的initTask方法创建一系列taskinprogress缓存起来,等待任务分配发回到tasktracker /** * Construct the splits, etc. This is invoked from an async * thread so that split-computation doesn't block anyone. */ public synchronized void initTasks() throws IOException, KillInterruptedException { if (tasksInited.get() || isComplete()) { return; } synchronized(jobInitKillStatus){ if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) { return; } jobInitKillStatus.initStarted = true; } LOG.info("Initializing " + jobId); // log job info JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), this.startTime, hasRestarted()); // log the job priority setPriority(this.priority); // // read input splits and create a map per a split // String jobFile = profile.getJobFile(); Path sysDir = new Path(this.jobtracker.getSystemDir()); FileSystem fs = sysDir.getFileSystem(conf); DataInputStream splitFile = fs.open(new Path(conf.get("mapred.job.split.file"))); JobClient.RawSplit[] splits; try { splits = JobClient.readSplitFile(splitFile); } finally { splitFile.close(); } numMapTasks = splits.length; // if the number of splits is larger than a configured value // then fail the job. int maxTasks = jobtracker.getMaxTasksPerJob(); if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) { throw new IOException( "The number of tasks for this job " + (numMapTasks + numReduceTasks) + " exceeds the configured limit " + maxTasks); } jobtracker.getInstrumentation().addWaiting( getJobID(), numMapTasks + numReduceTasks); maps = new TaskInProgress[numMapTasks]; for(int i=0; i < numMapTasks; ++i) { inputLength += splits[i].getDataLength(); maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i); } LOG.info("Input size for job " + jobId + " = " + inputLength + ". Number of splits = " + splits.length); if (numMapTasks > 0) { nonRunningMapCache = createCache(splits, maxLevel); } // set the launch time this.launchTime = System.currentTimeMillis(); // // Create reduce tasks // this.reduces = new TaskInProgress[numReduceTasks]; for (int i = 0; i < numReduceTasks; i++) { reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf, this); nonRunningReduces.add(reduces[i]); } // Calculate the minimum number of maps to be complete before // we should start scheduling reduces completedMapsForReduceSlowstart = (int)Math.ceil( (conf.getFloat("mapred.reduce.slowstart.completed.maps", DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * numMapTasks)); // create cleanup two cleanup tips, one map and one reduce. cleanup = new TaskInProgress[2]; // cleanup map tip. This map doesn't use any splits. Just assign an empty // split. JobClient.RawSplit emptySplit = new JobClient.RawSplit(); cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks); cleanup[0].setJobCleanupTask(); // cleanup reduce tip. cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this); cleanup[1].setJobCleanupTask(); // create two setup tips, one map and one reduce. setup = new TaskInProgress[2]; // setup map tip. This map doesn't use any split. Just assign an empty // split. setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1 ); setup[0].setJobSetupTask(); // setup reduce tip. setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this); setup[1].setJobSetupTask(); synchronized(jobInitKillStatus){ jobInitKillStatus.initDone = true; if(jobInitKillStatus.killed) { throw new KillInterruptedException("Job " + jobId + " killed in init"); } } tasksInited.set(true); JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, numMapTasks, numReduceTasks); }  

上一篇:新手学习,如何调用存储过程简单小列子
下一篇:重置 InnoDB 表的日志文件

相关文章

相关评论