in go/cmd/ct-fetch/ct-fetch.go [578:730]
// Run performs the single job described by lw.WorkOrder: it chooses a range
// of log entries, requests a consistency proof covering that range, downloads
// the entries subtree by subtree via downloadCTRangeToChannel (which is handed
// entryChan), checks each subtree against the proof, and saves the log state
// after every verified subtree.
//
// Failures to fetch the proof or to download entries are treated as temporary
// outages: the worker sleeps and Run returns nil. Failures to annotate the
// proof, to verify a subtree, or to save state are returned to the caller.
// A Sleep work order only sleeps and returns nil.
func (lw *LogWorker) Run(ctx context.Context, entryChan chan<- CtLogEntry) error {
	var firstIndex, lastIndex uint64

	// NOTE(review): the unsigned arithmetic on lw.STH.TreeSize below wraps
	// around if TreeSize is 0. A later comment says job assignment ensures
	// newSize > 3; presumably that also guarantees a non-empty tree --
	// confirm against the code that assigns work orders.
	switch lw.WorkOrder {
	case Init:
		if lw.STH.TreeSize < lw.JobSize {
			firstIndex = 0
		} else {
			firstIndex = lw.STH.TreeSize - lw.JobSize
		}
		lastIndex = lw.STH.TreeSize - 1
		glog.Infof("[%s] Running Init job %d %d", lw.Name(), firstIndex, lastIndex)
	case Update:
		firstIndex = lw.LogState.MaxEntry + 1
		lastIndex = lw.LogState.MaxEntry + lw.JobSize
		glog.Infof("[%s] Running Update job %d %d", lw.Name(), firstIndex, lastIndex)
	case Backfill:
		// We will make fewer get-entries requests to the CT Log if we align
		// firstIndex to a multiple of JobSize while backfilling.
		// TODO(jms) document the fact that JobSize should be a power of two
		if lw.LogState.MinEntry%lw.JobSize != 0 {
			firstIndex = lw.LogState.MinEntry - (lw.LogState.MinEntry % lw.JobSize)
		} else {
			// Backfill implies MinEntry > 0, so MinEntry is a non-zero
			// multiple of JobSize.
			firstIndex = lw.LogState.MinEntry - lw.JobSize
		}
		lastIndex = lw.LogState.MinEntry - 1
		glog.Infof("[%s] Running Backfill job %d %d", lw.Name(), firstIndex, lastIndex)
	case Sleep:
		lw.sleep(ctx)
		return nil
	}

	// Never ask for entries beyond the signed tree head.
	if lastIndex > lw.STH.TreeSize-1 {
		lastIndex = lw.STH.TreeSize - 1
	}

	glog.Infof("[%s] Downloading entries %d through %d",
		lw.Name(), firstIndex, lastIndex)

	// We're going to tell users that we downloaded entries
	// |firstIndex| through |lastIndex|, and we want some assurance
	// that we've actually done this (especially if we're not getting
	// entries directly from the CT Log!). We'll ask the log for a
	// consistency proof between the trees of size |firstIndex| and
	// |lastIndex+1|. We'll then check that the entries we download
	// generate the corresponding terms of the proof. We have to handle
	// the case |firstIndex| = 0 specially.
	//
	// Note: we're essentially monitoring the log (as in section 5.3
	// of RFC 6962). However, we can't always verify the consistency
	// proofs that we request because we don't always have the
	// necessary tree heads.
	//
	// TODO(jms): Check the proof when |newSize| = |lw.STH.TreeSize|
	//
	oldSize := firstIndex
	newSize := lastIndex + 1
	if oldSize == 0 {
		// Special case: the consistency proof with |oldSize| =
		// 0 is empty. With |oldSize| = 1 (or |oldSize| = 2) it
		// doesn't include a hash that depends on entry 0 (resp.
		// 0 or 1). We ensured newSize > 3 when we assigned this
		// worker its job, so we can use oldSize = 3.
		oldSize = 3
	}

	proof, err := lw.Client.GetSTHConsistency(ctx, oldSize, newSize)
	if err != nil {
		glog.Errorf("[%s] Unable to fetch consistency proof: %s", lw.Name(), err)
		lw.sleep(ctx) // Assume this is a temporary outage and wait
		return nil
	}

	// Annotate the proof with the leaves that influence each term.
	subtrees, err := consistencyProofToSubtrees(proof, oldSize, newSize)
	if err != nil {
		glog.Errorf("[%s] Could not annotate proof: %s", lw.Name(), err)
		return err
	}

	// We want to keep a contiguous set of verified entries in the
	// database at all times. We'll queue the verifiers in the right
	// order for this to happen. Subtrees that are not fully contained
	// in [firstIndex, lastIndex] are skipped.
	verifiers := make([]*CtLogSubtreeVerifier, 0, len(subtrees))
	switch lw.WorkOrder {
	case Init, Update:
		// We're updating towards the latest STH. Download subtrees
		// in order of increasing first element.
		for _, subtree := range subtrees {
			if !(firstIndex <= subtree.First && subtree.Last <= lastIndex) {
				continue
			}
			item := CtLogSubtreeVerifier{Subtree: subtree}
			// Sorted insert: find the first queued verifier that
			// starts after this subtree.
			pos := sort.Search(len(verifiers),
				func(i int) bool {
					return subtree.First < verifiers[i].Subtree.First
				})
			verifiers = append(verifiers, nil)
			copy(verifiers[pos+1:], verifiers[pos:])
			verifiers[pos] = &item
		}
	case Backfill:
		// We're backfilling towards index 0. Download subtrees
		// in order of decreasing last element.
		for _, subtree := range subtrees {
			if !(firstIndex <= subtree.First && subtree.Last <= lastIndex) {
				continue
			}
			item := CtLogSubtreeVerifier{Subtree: subtree}
			// Sorted insert: find the first queued verifier that
			// ends before this subtree.
			pos := sort.Search(len(verifiers),
				func(i int) bool {
					return subtree.Last > verifiers[i].Subtree.Last
				})
			verifiers = append(verifiers, nil)
			copy(verifiers[pos+1:], verifiers[pos:])
			verifiers[pos] = &item
		}
	}

	// Download entries and verify checksums. Track the range that has
	// actually been verified and saved so the final log line is accurate
	// regardless of queue order or early cancellation.
	var verifiedFirst, verifiedLast uint64
	verifiedCount := 0
Loop:
	for _, verifier := range verifiers {
		minTimestamp, maxTimestamp, err := lw.downloadCTRangeToChannel(ctx, verifier, entryChan)
		if err != nil {
			glog.Errorf("[%s] downloadCTRangeToChannel exited with an error: %s.", lw.Name(), err)
			lw.sleep(ctx) // Assume this is a temporary outage and wait
			return nil
		}

		err = verifier.CheckClaim()
		if err != nil {
			glog.Errorf("[%s] downloadCTRangeToChannel could not verify entries %d-%d: %s",
				lw.Name(), verifier.Subtree.First, verifier.Subtree.Last, err)
			return err
		}

		err = lw.saveState(&verifier.Subtree, minTimestamp, maxTimestamp)
		if err != nil {
			glog.Errorf("[%s] Failed to update log state: %s", lw.Name(), err)
			return err
		}

		if verifiedCount == 0 || verifier.Subtree.First < verifiedFirst {
			verifiedFirst = verifier.Subtree.First
		}
		if verifiedCount == 0 || verifier.Subtree.Last > verifiedLast {
			verifiedLast = verifier.Subtree.Last
		}
		verifiedCount++

		// Stop between subtrees if the context has been cancelled.
		select {
		case <-ctx.Done():
			break Loop
		default:
		}
	}

	// Every loop iteration either returns or records a verified subtree,
	// so a zero count means the verifier queue was empty. Indexing into it
	// would panic.
	if verifiedCount == 0 {
		glog.Warningf("[%s] No subtrees to verify for entries %d-%d",
			lw.Name(), firstIndex, lastIndex)
		return nil
	}

	glog.Infof("[%s] Verified entries %d-%d", lw.Name(), verifiedFirst, verifiedLast)
	return nil
}