Golang程序  |  369行  |  7.53 KB

// Copyright 2015 Google Inc. All rights reserved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kati

import (
	"container/heap"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"syscall"
	"time"

	"github.com/golang/glog"
)

var (
	errNothingDone = errors.New("nothing done")
)

type job struct {
	n        *DepNode
	ex       *Executor
	parents  []*job
	outputTs int64
	numDeps  int
	depsTs   int64
	id       int

	runners []runner
}

type jobResult struct {
	j   *job
	w   *worker
	err error
}

type newDep struct {
	j        *job
	neededBy *job
}

type worker struct {
	wm       *workerManager
	jobChan  chan *job
	waitChan chan bool
	doneChan chan bool
}

type jobQueue []*job

func (jq jobQueue) Len() int      { return len(jq) }
func (jq jobQueue) Swap(i, j int) { jq[i], jq[j] = jq[j], jq[i] }

func (jq jobQueue) Less(i, j int) bool {
	// First come, first serve, for GNU make compatibility.
	return jq[i].id < jq[j].id
}

func (jq *jobQueue) Push(x interface{}) {
	item := x.(*job)
	*jq = append(*jq, item)
}

func (jq *jobQueue) Pop() interface{} {
	old := *jq
	n := len(old)
	item := old[n-1]
	*jq = old[0 : n-1]
	return item
}

func newWorker(wm *workerManager) *worker {
	w := &worker{
		wm:       wm,
		jobChan:  make(chan *job),
		waitChan: make(chan bool),
		doneChan: make(chan bool),
	}
	return w
}

func (w *worker) Run() {
	done := false
	for !done {
		select {
		case j := <-w.jobChan:
			err := j.build()
			w.wm.ReportResult(w, j, err)
		case done = <-w.waitChan:
		}
	}
	w.doneChan <- true
}

func (w *worker) PostJob(j *job) {
	w.jobChan <- j
}

func (w *worker) Wait() {
	w.waitChan <- true
	<-w.doneChan
}

func (j *job) createRunners() ([]runner, error) {
	runners, _, err := createRunners(j.ex.ctx, j.n)
	return runners, err
}

// TODO(ukai): use time.Time?
func getTimestamp(filename string) int64 {
	st, err := os.Stat(filename)
	if err != nil {
		return -2
	}
	return st.ModTime().Unix()
}

func (j *job) build() error {
	if j.n.IsPhony {
		j.outputTs = -2 // trigger cmd even if all inputs don't exist.
	} else {
		j.outputTs = getTimestamp(j.n.Output)
	}

	if !j.n.HasRule {
		if j.outputTs >= 0 || j.n.IsPhony {
			return errNothingDone
		}
		if len(j.parents) == 0 {
			return fmt.Errorf("*** No rule to make target %q.", j.n.Output)
		}
		return fmt.Errorf("*** No rule to make target %q, needed by %q.", j.n.Output, j.parents[0].n.Output)
	}

	if j.outputTs >= j.depsTs {
		// TODO: stats.
		return errNothingDone
	}

	rr, err := j.createRunners()
	if err != nil {
		return err
	}
	if len(rr) == 0 {
		return errNothingDone
	}
	for _, r := range rr {
		err := r.run(j.n.Output)
		glog.Warningf("cmd result for %q: %v", j.n.Output, err)
		if err != nil {
			exit := exitStatus(err)
			return fmt.Errorf("*** [%s] Error %d", j.n.Output, exit)
		}
	}

	if j.n.IsPhony {
		j.outputTs = time.Now().Unix()
	} else {
		j.outputTs = getTimestamp(j.n.Output)
		if j.outputTs < 0 {
			j.outputTs = time.Now().Unix()
		}
	}
	return nil
}

func (wm *workerManager) handleJobs() error {
	for {
		if len(wm.freeWorkers) == 0 {
			return nil
		}
		if wm.readyQueue.Len() == 0 {
			return nil
		}
		j := heap.Pop(&wm.readyQueue).(*job)
		glog.V(1).Infof("run: %s", j.n.Output)

		j.numDeps = -1 // Do not let other workers pick this.
		w := wm.freeWorkers[0]
		wm.freeWorkers = wm.freeWorkers[1:]
		wm.busyWorkers[w] = true
		w.jobChan <- j
	}
}

func (wm *workerManager) updateParents(j *job) {
	for _, p := range j.parents {
		p.numDeps--
		glog.V(1).Infof("child: %s (%d)", p.n.Output, p.numDeps)
		if p.depsTs < j.outputTs {
			p.depsTs = j.outputTs
		}
		wm.maybePushToReadyQueue(p)
	}
}

type workerManager struct {
	maxJobs     int
	jobs        []*job
	readyQueue  jobQueue
	jobChan     chan *job
	resultChan  chan jobResult
	newDepChan  chan newDep
	stopChan    chan bool
	waitChan    chan bool
	doneChan    chan error
	freeWorkers []*worker
	busyWorkers map[*worker]bool
	ex          *Executor
	runnings    map[string]*job

	finishCnt int
	skipCnt   int
}

func newWorkerManager(numJobs int) (*workerManager, error) {
	wm := &workerManager{
		maxJobs:     numJobs,
		jobChan:     make(chan *job),
		resultChan:  make(chan jobResult),
		newDepChan:  make(chan newDep),
		stopChan:    make(chan bool),
		waitChan:    make(chan bool),
		doneChan:    make(chan error),
		busyWorkers: make(map[*worker]bool),
	}

	wm.busyWorkers = make(map[*worker]bool)
	for i := 0; i < numJobs; i++ {
		w := newWorker(wm)
		wm.freeWorkers = append(wm.freeWorkers, w)
		go w.Run()
	}
	heap.Init(&wm.readyQueue)
	go wm.Run()
	return wm, nil
}

func exitStatus(err error) int {
	if err == nil {
		return 0
	}
	exit := 1
	if err, ok := err.(*exec.ExitError); ok {
		if w, ok := err.ProcessState.Sys().(syscall.WaitStatus); ok {
			return w.ExitStatus()
		}
	}
	return exit
}

func (wm *workerManager) hasTodo() bool {
	return wm.finishCnt != len(wm.jobs)
}

func (wm *workerManager) maybePushToReadyQueue(j *job) {
	if j.numDeps != 0 {
		return
	}
	heap.Push(&wm.readyQueue, j)
	glog.V(1).Infof("ready: %s", j.n.Output)
}

func (wm *workerManager) handleNewDep(j *job, neededBy *job) {
	if j.numDeps < 0 {
		neededBy.numDeps--
		if neededBy.id > 0 {
			panic("FIXME: already in WM... can this happen?")
		}
	} else {
		j.parents = append(j.parents, neededBy)
	}
}

func (wm *workerManager) Run() {
	done := false
	var err error
Loop:
	for wm.hasTodo() || len(wm.busyWorkers) > 0 || len(wm.runnings) > 0 || !done {
		select {
		case j := <-wm.jobChan:
			glog.V(1).Infof("wait: %s (%d)", j.n.Output, j.numDeps)
			j.id = len(wm.jobs) + 1
			wm.jobs = append(wm.jobs, j)
			wm.maybePushToReadyQueue(j)
		case jr := <-wm.resultChan:
			glog.V(1).Infof("done: %s", jr.j.n.Output)
			delete(wm.busyWorkers, jr.w)
			wm.freeWorkers = append(wm.freeWorkers, jr.w)
			wm.updateParents(jr.j)
			wm.finishCnt++
			if jr.err == errNothingDone {
				wm.skipCnt++
				jr.err = nil
			}
			if jr.err != nil {
				err = jr.err
				close(wm.stopChan)
				break Loop
			}
		case af := <-wm.newDepChan:
			wm.handleNewDep(af.j, af.neededBy)
			glog.V(1).Infof("dep: %s (%d) %s", af.neededBy.n.Output, af.neededBy.numDeps, af.j.n.Output)
		case done = <-wm.waitChan:
		}
		err = wm.handleJobs()
		if err != nil {
			break Loop
		}

		glog.V(1).Infof("job=%d ready=%d free=%d busy=%d", len(wm.jobs)-wm.finishCnt, wm.readyQueue.Len(), len(wm.freeWorkers), len(wm.busyWorkers))
	}
	if !done {
		<-wm.waitChan
	}

	for _, w := range wm.freeWorkers {
		w.Wait()
	}
	for w := range wm.busyWorkers {
		w.Wait()
	}
	wm.doneChan <- err
}

func (wm *workerManager) PostJob(j *job) error {
	select {
	case wm.jobChan <- j:
		return nil
	case <-wm.stopChan:
		return errors.New("worker manager stopped")
	}
}

func (wm *workerManager) ReportResult(w *worker, j *job, err error) {
	select {
	case wm.resultChan <- jobResult{w: w, j: j, err: err}:
	case <-wm.stopChan:
	}
}

func (wm *workerManager) ReportNewDep(j *job, neededBy *job) {
	select {
	case wm.newDepChan <- newDep{j: j, neededBy: neededBy}:
	case <-wm.stopChan:
	}
}

func (wm *workerManager) Wait() (int, error) {
	wm.waitChan <- true
	err := <-wm.doneChan
	glog.V(2).Infof("finish %d skip %d", wm.finishCnt, wm.skipCnt)
	return wm.finishCnt - wm.skipCnt, err
}