go/cmd/ct-fetch/ct-fetch.go (824 lines of code) (raw):
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// Based on github.com/jcjones/ct-sql/
package main
import (
"bytes"
"context"
"crypto"
"encoding/json"
"fmt"
"io/ioutil"
"math/bits"
"math/rand"
"net/http"
"net/url"
"os"
"os/signal"
"sort"
"strings"
"sync"
"syscall"
"time"
"github.com/golang/glog"
"github.com/google/certificate-transparency-go"
"github.com/google/certificate-transparency-go/client"
"github.com/google/certificate-transparency-go/jsonclient"
"github.com/google/certificate-transparency-go/x509"
"github.com/hashicorp/go-metrics"
"github.com/jpillora/backoff"
"github.com/mozilla/crlite/go"
"github.com/mozilla/crlite/go/config"
"github.com/mozilla/crlite/go/engine"
"github.com/mozilla/crlite/go/storage"
)
var (
	// ctconfig holds the parsed configuration shared by every worker in
	// this program (batch sizes, polling delays, log lists, etc.).
	ctconfig = config.NewCTConfig()

	// httpClient is the single shared HTTP client used for both CT log
	// requests and Remote Settings polling. Timeouts are set explicitly
	// so a stalled log endpoint cannot hang a downloader forever, and
	// keep-alives are enabled so connections to logs are reused.
	httpClient = http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			TLSHandshakeTimeout:   30 * time.Second,
			ResponseHeaderTimeout: 30 * time.Second,
			MaxIdleConnsPerHost:   10,
			DisableKeepAlives:     false,
			MaxIdleConns:          100,
			IdleConnTimeout:       90 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
		},
	}
)
// uint64Min returns the smaller of its two uint64 arguments.
func uint64Min(x, y uint64) uint64 {
	if y < x {
		return y
	}
	return x
}
// uint64Max returns the larger of its two uint64 arguments.
func uint64Max(x, y uint64) uint64 {
	if y > x {
		return y
	}
	return x
}
// uint64ToTimestamp converts a CT-style timestamp (milliseconds since
// the Unix epoch, per RFC 6962 section 3.2) into a *time.Time.
//
// Bug fix: the millisecond remainder was previously passed to time.Unix
// as nanoseconds, so sub-second precision was effectively lost (e.g.
// 999 ms became 999 ns). Scale it to nanoseconds instead.
func uint64ToTimestamp(timestamp uint64) *time.Time {
	t := time.Unix(int64(timestamp/1000), int64(timestamp%1000)*int64(time.Millisecond))
	return &t
}
// CtLogEntry pairs a downloaded CT log entry with the metadata of the
// log it came from, for handoff to the database writer threads.
type CtLogEntry struct {
	LogEntry *ct.LogEntry
	LogMeta  *types.CTLogMetadata
}
// CtLogSubtree identifies a subtree of a CT log's Merkle tree by the
// inclusive range of leaf indices [First, Last] beneath it, together
// with its (claimed) root hash.
type CtLogSubtree struct {
	Root  []byte // Claimed subtree root hash (SHA-256)
	First uint64 // Index of the first leaf in the subtree (inclusive)
	Last  uint64 // Index of the last leaf in the subtree (inclusive)
}
// Size returns the number of leaves in the subtree. The range
// [First, Last] is inclusive on both ends, hence the +1.
func (r *CtLogSubtree) Size() uint64 {
	return 1 + r.Last - r.First
}
func (r *CtLogSubtree) Midpoint() uint64 {
size := r.Size()
prevPow2 := uint64(0)
if size > 1 {
prevPow2 = 1 << (bits.Len64(size-1) - 1)
}
return r.First + prevPow2
}
// rfc6962LeafHash computes the Merkle tree leaf hash of |leaf|, i.e.
// SHA-256(0x00 || leaf), as specified in section 2.1 of RFC 6962.
func rfc6962LeafHash(leaf []byte) []byte {
	hasher := crypto.SHA256.New()
	hasher.Write([]byte{0x00})
	hasher.Write(leaf)
	return hasher.Sum(nil)
}
// rfc6962PairHash computes the Merkle tree interior-node hash of two
// children, i.e. SHA-256(0x01 || left || right), as specified in
// section 2.1 of RFC 6962.
func rfc6962PairHash(left, right []byte) []byte {
	hasher := crypto.SHA256.New()
	hasher.Write([]byte{0x01})
	hasher.Write(left)
	hasher.Write(right)
	return hasher.Sum(nil)
}
// CtLogSubtreeVerifier recomputes the root hash of a CtLogSubtree from
// its leaves (fed one at a time, in index order, via Consume) so that
// the claimed Subtree.Root can be checked with CheckClaim.
type CtLogSubtreeVerifier struct {
	Subtree     CtLogSubtree
	hashStack   [][]byte // Scratch space for computing tree hash
	numConsumed uint64   // Number of leaves hashed into hashStack
}
// Consume feeds one leaf, in index order, into the incremental Merkle
// hash computation for this subtree. After exactly Subtree.Size() calls
// the computed subtree root is the sole element of hashStack (compared
// against the claim by CheckClaim).
func (v *CtLogSubtreeVerifier) Consume(leaf []byte) {
	if v.numConsumed == 0 {
		// The hash stack might need to store a full sibling path
		v.hashStack = make([][]byte, 0, bits.Len64(v.Subtree.Size())+2)
	}
	// Push the new leaf hash, H(0 || leaf), onto the stack
	v.hashStack = append(v.hashStack, rfc6962LeafHash(leaf))
	v.numConsumed += 1
	// Now we'll iteratively pop pairs of siblings off the stack and
	// replace them by their parent hash.
	var iter int
	if v.numConsumed >= v.Subtree.Size() {
		// If we've consumed the whole subtree (or too many leaves!) then
		// we'll iterate until there's only one element on the stack.
		iter = len(v.hashStack) - 1
	} else {
		// Otherwise, there is a largest complete (i.e. power-of-two sized)
		// subtree that contains the leaf that we just consumed. We'll iterate
		// until the root of that subtree is on top of the stack.
		iter = bits.TrailingZeros64(v.numConsumed)
	}
	for iter > 0 {
		// Pop siblings L and R, push their parent H(1 || L || R).
		n := len(v.hashStack) - 1
		L := v.hashStack[n-1]
		R := v.hashStack[n]
		v.hashStack = v.hashStack[:n-1]
		v.hashStack = append(v.hashStack, rfc6962PairHash(L, R))
		iter -= 1
	}
}
// CheckClaim compares the root hash computed by Consume against the
// claimed Subtree.Root. It errors if the claim has the wrong length, if
// the wrong number of leaves was consumed, or if the hashes differ.
//
// Idiom fix: use bytes.Equal rather than bytes.Compare(...) != 0 for an
// equality test (staticcheck S1004); behavior is unchanged.
func (v *CtLogSubtreeVerifier) CheckClaim() error {
	// The claimed root must be a SHA-256 digest.
	if len(v.Subtree.Root) != crypto.SHA256.Size() {
		return fmt.Errorf("CtLogSubtreeVerifier: Claim has the wrong length.")
	}
	// Exactly the subtree's leaves must have been consumed; this also
	// guarantees hashStack holds the single root element.
	if v.numConsumed != v.Subtree.Size() {
		return fmt.Errorf("CtLogSubtreeVerifier: Consumed %d leaves but needed %d.", v.numConsumed, v.Subtree.Size())
	}
	if !bytes.Equal(v.Subtree.Root, v.hashStack[0]) {
		return fmt.Errorf("CtLogSubtreeVerifier: Verification failed.")
	}
	return nil
}
// consistencyProofToSubtrees annotates a consistency proof between the
// trees of size |oldSize| and |newSize| with the inclusive range of
// leaf indices that determines each proof term. It errors if the proof
// does not have the expected number of terms. The returned subtrees are
// in the order that RFC 6962 presents the proof.
func consistencyProofToSubtrees(proof [][]byte, oldSize, newSize uint64) ([]CtLogSubtree, error) {
	// Annotates a consistency proof with the indices needed to check it.
	if newSize <= oldSize {
		return nil, fmt.Errorf("Empty proof")
	}
	terms := make([]CtLogSubtree, 0, bits.Len64(newSize)+2)
	// A consistency proof between |oldSize| and |newSize| is
	// "almost" an inclusion proof for index |oldSize|-1 in the tree
	// of size |newSize|. "Almost" because we can omit terms from
	// the old tree so long as we provide enough information to
	// recover the old tree head.
	//
	// We represent the current node by the the set of leaves below
	// it, so each internal node of the tree looks like:
	//          [low, high]
	//          /         \
	//   [low, mid-1]   [mid, high]
	// (The value of mid is determined by the size of the [low, high]
	// interval.)
	//
	// We will traverse from the root towards the leaf at index
	// |oldSize|-1, and we will record the set of leaves that lie
	// below the sibling of each node that we visit.
	//
	cursor := CtLogSubtree{First: uint64(0), Last: uint64(newSize - 1)}
	target := uint64(oldSize - 1)
	// We walk down the tree until we reach a leaf (low == high) or
	// a node which is in the old tree (high <= target). Both conditions
	// are necessary if we are to handle the |oldSize| = 0 case.
	//
	for cursor.First != cursor.Last && cursor.Last != target {
		mid := cursor.Midpoint()
		if target < mid {
			// Target is in the left child; record the right sibling.
			terms = append(terms, CtLogSubtree{First: mid, Last: cursor.Last})
			cursor.Last = mid - 1
		} else {
			// Target is in the right child; record the left sibling.
			terms = append(terms, CtLogSubtree{First: cursor.First, Last: mid - 1})
			cursor.First = mid
		}
	}
	// The cursor is at node [low, high] and we have just recorded
	// this node's sibling. We need to record enough information to
	// recover the old tree head. If |oldSize| is a power of two,
	// then the current node is the old tree head and the caller
	// already knows its value. Otherwise we need to record the
	// current node so that the caller can recover the old tree
	// head.
	//
	if (oldSize & (oldSize - 1)) != 0 { // 0 < |oldSize| is not a power of 2
		terms = append(terms, cursor)
	}
	if len(terms) != len(proof) {
		return nil, fmt.Errorf("Expected proof of length %d and got %d.",
			len(terms), len(proof))
	}
	// Reverse the list to conform with the presentation from RFC 6962
	for i, j := 0, len(terms)-1; i < j; i, j = i+1, j-1 {
		terms[i], terms[j] = terms[j], terms[i]
	}
	// Attach each proof term (a claimed subtree root) to its range.
	for i := 0; i < len(proof); i++ {
		terms[i].Root = proof[i]
	}
	return terms, nil
}
// Coordinates all workers
//
// LogSyncEngine owns the shared entry channel between per-log
// downloader goroutines (producers) and database writer threads
// (consumers), plus the bookkeeping used by the /health endpoint.
type LogSyncEngine struct {
	ThreadWaitGroup     *sync.WaitGroup // tracks database writer threads
	DownloaderWaitGroup *sync.WaitGroup // tracks per-log downloader goroutines
	database            storage.CertDatabase
	entryChan           chan CtLogEntry // downloaded entries awaiting DB insertion
	lastUpdateTime      time.Time       // time of most recent useful work, for /health
	lastUpdateMutex     *sync.RWMutex   // guards lastUpdateTime
}
// Operates on a single log
//
// A LogWorker is constructed by LogSyncEngine.NewLogWorker for exactly
// one unit of work (see LogWorkerTask) and then discarded.
type LogWorker struct {
	Database  storage.CertDatabase
	Client    *client.LogClient
	LogMeta   *types.CTLogMetadata
	STH       *ct.SignedTreeHead // latest signed tree head; nil if the fetch failed
	LogState  *types.CTLogState  // cached download progress for this log
	WorkOrder LogWorkerTask      // task selected by NewLogWorker
	JobSize   uint64             // number of entries per unit of work
	MetricKey string             // prefix for per-log metrics
}
// Name returns the log's URL, which serves as its identifier in log
// messages throughout this program.
func (lw LogWorker) Name() string {
	return lw.LogMeta.URL
}
// LogWorkerTask enumerates the unit of work a LogWorker performs in a
// single call to Run.
type LogWorkerTask int

const (
	Init     LogWorkerTask = iota // Initialize db with one batch of recent certs
	Backfill                      // Download old certs
	Update                        // Download new certs
	Sleep                         // Wait for an STH update
)
// NewLogSyncEngine builds the coordinator shared by all per-log
// downloaders and database writer threads. The entry channel is
// buffered so downloaders can run ahead of database insertion.
func NewLogSyncEngine(db storage.CertDatabase) *LogSyncEngine {
	engine := LogSyncEngine{
		ThreadWaitGroup:     new(sync.WaitGroup),
		DownloaderWaitGroup: new(sync.WaitGroup),
		database:            db,
		entryChan:           make(chan CtLogEntry, 1024*16),
		lastUpdateTime:      time.Time{},
		lastUpdateMutex:     &sync.RWMutex{},
	}
	return &engine
}
// StartDatabaseThreads launches the configured number of insertCTWorker
// goroutines to drain the entry channel into the database.
func (ld *LogSyncEngine) StartDatabaseThreads() {
	count := *ctconfig.NumThreads
	glog.Infof("Starting %d threads...", count)
	for i := 0; i < count; i++ {
		go ld.insertCTWorker()
	}
}
// Blocking function, run from a thread
//
// SyncLog drives the download loop for a single enrolled CT log: it
// repeatedly builds a LogWorker and runs one unit of work, until the
// log is unenrolled, an error occurs, the context is cancelled, or
// (when not configured to run forever) the first unit completes.
func (ld *LogSyncEngine) SyncLog(ctx context.Context, enrolledLogs *EnrolledLogs, logMeta types.CTLogMetadata) error {
	ld.DownloaderWaitGroup.Add(1)
	defer ld.DownloaderWaitGroup.Done()
	// Apply any pending storage migrations for this log before syncing.
	if err := ld.database.Migrate(&logMeta); err != nil {
		return err
	}
	for {
		// Stop promptly if the log has been unenrolled (e.g. via a
		// Remote Settings update).
		if !enrolledLogs.IsEnrolled(logMeta.LogID) {
			return nil
		}
		worker, err := ld.NewLogWorker(ctx, &logMeta)
		if err != nil {
			metrics.IncrCounter([]string{"sync", "error"}, 1)
			return err
		}
		err = worker.Run(ctx, ld.entryChan)
		if err != nil {
			glog.Errorf("[%s] Could not sync log: %s", logMeta.URL, err)
			metrics.IncrCounter([]string{"sync", "error"}, 1)
			return err
		}
		// We did useful work. Register an update for the health service.
		ld.RegisterUpdate()
		if !*ctconfig.RunForever {
			return nil
		}
		select {
		case <-ctx.Done():
			glog.Infof("[%s] Downloader exiting.", logMeta.URL)
			return nil
		default:
		}
	}
}
// RegisterUpdate records that useful work just happened, both as a
// telemetry counter and as the timestamp consulted by /health.
func (ld *LogSyncEngine) RegisterUpdate() {
	metrics.IncrCounter([]string{"sync", "progress"}, 1)
	ld.lastUpdateMutex.Lock()
	ld.lastUpdateTime = time.Now()
	ld.lastUpdateMutex.Unlock()
}
// ApproximateMostRecentUpdateTimestamp returns the time of the most
// recent RegisterUpdate call, or the zero time if none has happened.
func (ld *LogSyncEngine) ApproximateMostRecentUpdateTimestamp() time.Time {
	ld.lastUpdateMutex.RLock()
	lastUpdate := ld.lastUpdateTime
	ld.lastUpdateMutex.RUnlock()
	return lastUpdate
}
// Wait blocks until all downloaders have finished, then closes the
// entry channel and waits for the database writer threads to drain it.
func (ld *LogSyncEngine) Wait() {
	// Wait for the CT Log downloaders to finish. If we're configured
	// to run forever, then this only happens if there are no enrolled logs,
	// or if all downloaders have encountered an error, or if the main thread's
	// cancel function has been called.
	ld.DownloaderWaitGroup.Wait()
	// No more log entries will be downloaded.
	close(ld.entryChan)
	// Finish handling |ld.entryChan|
	glog.Infof("Waiting on database writes to complete: %d remaining", len(ld.entryChan))
	ld.ThreadWaitGroup.Wait()
}
// insertCTWorker is the database writer loop. It drains ld.entryChan,
// parses each CT entry into an end-entity certificate plus its issuer,
// and stores them via ld.database. Multiple copies run concurrently
// (see StartDatabaseThreads); each exits when entryChan is closed.
func (ld *LogSyncEngine) insertCTWorker() {
	ld.ThreadWaitGroup.Add(1)
	defer ld.ThreadWaitGroup.Done()
	// Periodically report thread health while processing, jittered per
	// thread so all workers don't report simultaneously.
	healthStatusPeriod, _ := time.ParseDuration("15s")
	healthStatusJitter := rand.Int63n(15 * 1000)
	healthStatusDuration := healthStatusPeriod + time.Duration(healthStatusJitter)*time.Millisecond
	glog.Infof("Thread health status period: %v + %v = %v", healthStatusPeriod, healthStatusJitter, healthStatusDuration)
	healthStatusTicker := time.NewTicker(healthStatusDuration)
	defer healthStatusTicker.Stop()
	for ep := range ld.entryChan {
		select { // Taking something off the queue is useful work.
		// So indicate server health when requested.
		case <-healthStatusTicker.C:
			ld.RegisterUpdate()
		default:
		}
		var cert *x509.Certificate
		var err error
		precert := false
		switch ep.LogEntry.Leaf.TimestampedEntry.EntryType {
		case ct.X509LogEntryType:
			cert = ep.LogEntry.X509Cert
		case ct.PrecertLogEntryType:
			cert, err = x509.ParseCertificate(ep.LogEntry.Precert.Submitted.Data)
			precert = true
		}
		if cert == nil {
			glog.Errorf("[%s] Fatal parsing error: index: %d error: %v", ep.LogMeta.URL, ep.LogEntry.Index, err)
			continue
		}
		if err != nil {
			// Non-nil cert with non-nil err: x509 reported a recoverable
			// parse problem; proceed with the partially-parsed cert.
			glog.Warningf("[%s] Nonfatal parsing error: index: %d error: %s", ep.LogMeta.URL, ep.LogEntry.Index, err)
		}
		// Skip expired certificates unless configured otherwise
		if cert.NotAfter.Before(time.Now()) && !*ctconfig.LogExpiredEntries {
			continue
		}
		// Without a chain we cannot attribute the cert to an issuer bin.
		if len(ep.LogEntry.Chain) < 1 {
			glog.Warningf("[%s] No issuer known for certificate precert=%v index=%d serial=%s subject=%+v issuer=%+v",
				ep.LogMeta.URL, precert, ep.LogEntry.Index, types.NewSerial(cert).String(), cert.Subject, cert.Issuer)
			continue
		}
		preIssuerOrIssuingCert, err := x509.ParseCertificate(ep.LogEntry.Chain[0].Data)
		if err != nil {
			glog.Errorf("[%s] Problem decoding issuing certificate: index: %d error: %s", ep.LogMeta.URL, ep.LogEntry.Index, err)
			continue
		}
		// RFC 6962 allows a precertificate to be signed by "a
		// special-purpose [...] Precertificate Signing Certificate
		// [that is] certified by the (root or intermediate) CA
		// certificate that will ultimately sign the end-entity". In
		// this case, the certificate that will issue the final cert is
		// the second entry in the chain (ep.LogEntry.Chain[1]).
		var issuingCert *x509.Certificate
		if types.IsPreIssuer(preIssuerOrIssuingCert) {
			if !precert {
				glog.Errorf("[%s] X509LogEntry issuer has precertificate signing EKU: index: %d", ep.LogMeta.URL, ep.LogEntry.Index)
				continue
			}
			if len(ep.LogEntry.Chain) < 2 {
				glog.Warningf("[%s] No issuer known for certificate precert=%v index=%d serial=%s subject=%+v issuer=%+v",
					ep.LogMeta.URL, precert, ep.LogEntry.Index, types.NewSerial(cert).String(), cert.Subject, cert.Issuer)
				continue
			}
			issuingCert, err = x509.ParseCertificate(ep.LogEntry.Chain[1].Data)
			if err != nil {
				glog.Errorf("[%s] Problem decoding issuing certificate: index: %d error: %s", ep.LogMeta.URL, ep.LogEntry.Index, err)
				continue
			}
			// Bug 1955023 - This program previously failed to
			// handle precertificate signing certificates. This
			// caused some serial numbers to be stored in the bin
			// labeled "issuer::<precertificate issuer id>" rather
			// than "issuer::<issuer id>". Adding a preissuer alias
			// causes CertDatabase to merge entries from
			// "issuer::<precertificate issuer id>" into
			// "issuer::<issuer id>" in future commits.
			issuer := types.NewIssuer(issuingCert)
			preissuer := types.NewIssuer(preIssuerOrIssuingCert)
			err = ld.database.AddPreIssuerAlias(preissuer, issuer)
			if err != nil {
				// Best-effort: a failed alias write is logged but does not
				// prevent storing the certificate itself.
				glog.Warningf("[%s] Failed to add preissuer alias. index: %d, issuer=%+v", ep.LogMeta.URL, ep.LogEntry.Index, cert.Issuer)
			}
		} else {
			issuingCert = preIssuerOrIssuingCert
		}
		err = ld.database.Store(cert, issuingCert, ep.LogMeta.URL, ep.LogEntry.Index)
		if err != nil {
			glog.Errorf("[%s] Problem inserting certificate: index: %d error: %s", ep.LogMeta.URL, ep.LogEntry.Index, err)
		}
	}
}
// NewLogWorker prepares a LogWorker for one unit of work against the
// given CT log: it loads cached log state from the database, builds a
// CT log client, fetches the current STH (tolerating failure), and
// selects the task — Init, Backfill, Update, or Sleep — the worker
// should perform.
func (ld *LogSyncEngine) NewLogWorker(ctx context.Context, ctLogMeta *types.CTLogMetadata) (*LogWorker, error) {
	batchSize := *ctconfig.BatchSize
	logUrlObj, err := url.Parse(ctLogMeta.URL)
	if err != nil {
		glog.Errorf("[%s] Unable to parse CT Log URL: %s", ctLogMeta.URL, err)
		return nil, err
	}
	logObj, err := ld.database.GetLogState(logUrlObj)
	if err != nil {
		glog.Errorf("[%s] Unable to get cached CT Log state: %s", ctLogMeta.URL, err)
		return nil, err
	}
	if logObj.LogID != ctLogMeta.LogID {
		// The LogID shouldn't change, but we'll treat the input as
		// authoritative. Old versions of ct-fetch didn't store the
		// LogID in redis, so we will hit this on upgrade.
		logObj.LogID = ctLogMeta.LogID
	}
	if logObj.MMD != uint64(ctLogMeta.MMD) {
		// Likewise storing MMD is new.
		logObj.MMD = uint64(ctLogMeta.MMD)
	}
	ctLog, err := client.New(ctLogMeta.URL, &httpClient, jsonclient.Options{
		UserAgent: "ct-fetch; https://github.com/mozilla/crlite",
	})
	if err != nil {
		glog.Errorf("[%s] Unable to construct CT log client: %s", ctLogMeta.URL, err)
		return nil, err
	}
	glog.Infof("[%s] Fetching signed tree head... ", ctLogMeta.URL)
	sth, fetchErr := ctLog.GetSTH(ctx)
	if fetchErr == nil {
		glog.Infof("[%s] %d total entries as of %s", ctLogMeta.URL, sth.TreeSize,
			uint64ToTimestamp(sth.Timestamp).Format(time.ANSIC))
	}
	// Determine what the worker should do.
	var task LogWorkerTask
	if fetchErr != nil {
		// Temporary network failure?
		glog.Warningf("[%s] Unable to fetch signed tree head: %s", ctLogMeta.URL, fetchErr)
		task = Sleep
	} else if sth.TreeSize <= 3 {
		// For technical reasons, we can't verify our download
		// until there are at least 3 entries in the log. So
		// we'll wait.
		task = Sleep
	} else if logObj.LastUpdateTime.IsZero() {
		// First contact with log
		task = Init
	} else if logObj.MaxEntry+batchSize < sth.TreeSize {
		// There are many new entries to download.
		task = Update
	} else if logObj.MinEntry > 0 {
		// There are not many new entries, but there's a
		// backlog of old entries. Prioritize the backlog.
		task = Backfill
	} else if time.Since(logObj.LastUpdateTime) < 10*time.Minute {
		// There are few new entries, no old entries, and we updated
		// recently. So sleep.
		task = Sleep
	} else if logObj.MaxEntry < sth.TreeSize-1 {
		// There is at least one new entry and we haven't
		// downloaded anything recently.
		task = Update
	} else {
		// There are no new entries.
		task = Sleep
	}
	metricKey := ctLogMeta.MetricKey()
	if sth != nil {
		// Fraction of the log we have downloaded so far.
		metrics.SetGauge([]string{metricKey, "coverage"}, float32(logObj.MaxEntry-logObj.MinEntry+1)/float32(sth.TreeSize))
	}
	return &LogWorker{
		Database:  ld.database,
		Client:    ctLog,
		LogState:  logObj,
		LogMeta:   ctLogMeta,
		STH:       sth,
		WorkOrder: task,
		JobSize:   batchSize,
		MetricKey: metricKey,
	}, nil
}
// sleep pauses for the configured polling delay, jittered by roughly
// +/- 10%, returning early if the context is cancelled.
func (lw *LogWorker) sleep(ctx context.Context) {
	// Sleep for ctconfig.PollingDelay seconds (+/- 10%).
	jitteredSeconds := (1 + 0.1*rand.NormFloat64()) * float64(*ctconfig.PollingDelay)
	glog.Infof("[%s] Stopped. Sleeping for %d seconds", lw.Name(), int(jitteredSeconds))
	select {
	case <-ctx.Done():
		glog.Infof("[%s] Signal caught. Exiting.", lw.Name())
	case <-time.After(time.Duration(jitteredSeconds) * time.Second):
	}
}
// Run executes the worker's assigned task. For Init/Update/Backfill it
// selects an entry range, fetches a consistency proof covering that
// range, downloads the entries subtree-by-subtree while verifying each
// subtree root against the proof, and persists progress after each
// verified subtree. Downloaded entries are handed to |entryChan| for
// the database writer threads. For Sleep it simply waits out the
// polling delay.
func (lw *LogWorker) Run(ctx context.Context, entryChan chan<- CtLogEntry) error {
	var firstIndex, lastIndex uint64
	switch lw.WorkOrder {
	case Init:
		// Grab the most recent JobSize entries (or the whole log if small).
		if lw.STH.TreeSize < lw.JobSize {
			firstIndex = 0
		} else {
			firstIndex = lw.STH.TreeSize - lw.JobSize
		}
		lastIndex = lw.STH.TreeSize - 1
		glog.Infof("[%s] Running Init job %d %d", lw.Name(), firstIndex, lastIndex)
	case Update:
		// Continue forward from the last stored entry.
		firstIndex = lw.LogState.MaxEntry + 1
		lastIndex = lw.LogState.MaxEntry + lw.JobSize
		glog.Infof("[%s] Running Update job %d %d", lw.Name(), firstIndex, lastIndex)
	case Backfill:
		// We will make fewer get-entries requests to the CT Log if we align firstIndex
		// to a power of two while backfilling.
		// TODO(jms) document the fact that JobSize should be a power of two
		if lw.LogState.MinEntry%lw.JobSize != 0 {
			firstIndex = lw.LogState.MinEntry - (lw.LogState.MinEntry % lw.JobSize)
		} else {
			// Backfill implies MinEntry > 0, so MinEntry is a non-zero multiple of
			// JobSize.
			firstIndex = lw.LogState.MinEntry - lw.JobSize
		}
		lastIndex = lw.LogState.MinEntry - 1
		glog.Infof("[%s] Running Backfill job %d %d", lw.Name(), firstIndex, lastIndex)
	case Sleep:
		lw.sleep(ctx)
		return nil
	}
	// Clamp the range to the tree size advertised by the STH.
	if lastIndex > lw.STH.TreeSize-1 {
		lastIndex = lw.STH.TreeSize - 1
	}
	glog.Infof("[%s] Downloading entries %d through %d",
		lw.Name(), firstIndex, lastIndex)
	// We're going to tell users that we downloaded entries
	// |firstIndex| through |lastIndex|, and we want some assurance
	// that we've actually done this (especially if we're not getting
	// entries directly from the CT Log!). We'll ask the log for a
	// consistency proof between the trees of size |firstIndex| and
	// |lastIndex+1|. We'll then check that the entries we download
	// generate the corresponding terms of the proof. We have to handle
	// the case |firstIndex| = 0 specially.
	//
	// Note: we're essentially monitoring the log (as in section 5.3
	// of RFC 6962). However, we can't always verify the consistency
	// proofs that we request because we don't always have the
	// necessary tree heads.
	//
	// TODO(jms): Check the proof when |newSize| = |lw.STH.TreeSize|
	//
	oldSize := firstIndex
	newSize := lastIndex + 1
	if oldSize == 0 {
		// Special case: the consistency proof with |oldSize| =
		// 0 is empty. With |oldSize| = 1 (or |oldSize| = 2) it
		// doesn't include a hash that depends on entry 0 (resp.
		// 0 or 1). We ensured newSize > 3 when we assigned this
		// worker its job, so we can use oldSize = 3.
		oldSize = 3
	}
	proof, err := lw.Client.GetSTHConsistency(ctx, oldSize, newSize)
	if err != nil {
		glog.Errorf("[%s] Unable to fetch consistency proof: %s", lw.Name(), err)
		lw.sleep(ctx) // Assume this is a temporary outage and wait
		return nil
	}
	// Annotate the proof with the leaves that influence each term.
	subtrees, err := consistencyProofToSubtrees(proof, oldSize, newSize)
	if err != nil {
		glog.Errorf("[%s] Could not annotate proof: %s", lw.Name(), err)
		return err
	}
	// We want to keep a contiguous set of verified entries in the
	// database at all times. We'll queue the verifiers in the right
	// order for this to happen.
	verifiers := make([]*CtLogSubtreeVerifier, 0, len(subtrees))
	switch lw.WorkOrder {
	case Init:
		fallthrough
	case Update:
		// We're updating towards the latest STH. Download subtrees
		// in order of increasing first element.
		for _, subtree := range subtrees {
			// Ignore proof terms that lie outside the download range.
			if !(firstIndex <= subtree.First && subtree.Last <= lastIndex) {
				continue
			}
			item := CtLogSubtreeVerifier{Subtree: subtree}
			// Insertion sort: keep verifiers ordered by ascending First.
			pos := sort.Search(len(verifiers),
				func(i int) bool {
					return subtree.First < verifiers[i].Subtree.First
				})
			verifiers = append(verifiers, nil)
			copy(verifiers[pos+1:], verifiers[pos:])
			verifiers[pos] = &item
		}
	case Backfill:
		// We're backfilling towards index 0. Download subtrees
		// in order of decreasing last element.
		for _, subtree := range subtrees {
			if !(firstIndex <= subtree.First && subtree.Last <= lastIndex) {
				continue
			}
			item := CtLogSubtreeVerifier{Subtree: subtree}
			// Insertion sort: keep verifiers ordered by descending Last.
			pos := sort.Search(len(verifiers),
				func(i int) bool {
					return subtree.Last > verifiers[i].Subtree.Last
				})
			verifiers = append(verifiers, nil)
			copy(verifiers[pos+1:], verifiers[pos:])
			verifiers[pos] = &item
		}
	}
	// Download entries and verify checksums
Loop:
	for _, verifier := range verifiers {
		minTimestamp, maxTimestamp, err := lw.downloadCTRangeToChannel(ctx, verifier, entryChan)
		if err != nil {
			glog.Errorf("[%s] downloadCTRangeToChannel exited with an error: %s.", lw.Name(), err)
			lw.sleep(ctx) // Assume this is a temporary outage and wait
			return nil
		}
		err = verifier.CheckClaim()
		if err != nil {
			glog.Errorf("[%s] downloadCTRangeToChannel could not verify entries %d-%d: %s",
				lw.Name(), verifier.Subtree.First, verifier.Subtree.Last, err)
			return err
		}
		// Persist progress so the verified range stays contiguous even if
		// we're interrupted between subtrees.
		err = lw.saveState(&verifier.Subtree, minTimestamp, maxTimestamp)
		if err != nil {
			glog.Errorf("[%s] Failed to update log state: %s", lw.Name(), err)
			return err
		}
		select {
		case <-ctx.Done():
			break Loop
		default:
		}
	}
	// NOTE(review): this indexes verifiers[0] unconditionally; it appears
	// the task-selection logic guarantees a non-empty verifier list for
	// non-Sleep work orders — confirm.
	glog.Infof("[%s] Verified entries %d-%d", lw.Name(), verifiers[0].Subtree.First, verifiers[len(verifiers)-1].Subtree.Last)
	return nil
}
// saveState merges a freshly verified subtree into the cached log state
// and persists it. It requires the new subtree to be contiguous with
// the already-stored entry range, so the database always reflects a
// single verified, gap-free interval of the log.
func (lw *LogWorker) saveState(newSubtree *CtLogSubtree, minTimestamp, maxTimestamp uint64) error {
	// TODO(jms) Block until entry channel is empty and database writes are complete
	// Depends on: using a separate entry channel per log
	// Ensure that the entries in newSubtree are contiguous with the DB.
	switch lw.WorkOrder {
	case Init:
		if lw.LogState.LastUpdateTime.IsZero() {
			// New log. We need to initialize Min{Entry,Timestamp}.
			// Subsequent calls with WorkOrder=Init only update Max{Entry,Timestamp}
			lw.LogState.MinEntry = newSubtree.First
			lw.LogState.MaxEntry = newSubtree.Last
			lw.LogState.MinTimestamp = minTimestamp
			lw.LogState.MaxTimestamp = maxTimestamp
		} else if lw.LogState.MaxEntry == newSubtree.First-1 {
			lw.LogState.MaxEntry = newSubtree.Last
		} else {
			return fmt.Errorf("Missing entries")
		}
	case Update:
		// Update extends the stored range forward.
		if lw.LogState.MaxEntry == newSubtree.First-1 {
			lw.LogState.MaxEntry = newSubtree.Last
		} else {
			return fmt.Errorf("Missing entries")
		}
	case Backfill:
		// Backfill extends the stored range backward toward index 0.
		if lw.LogState.MinEntry == newSubtree.Last+1 {
			lw.LogState.MinEntry = newSubtree.First
		} else {
			return fmt.Errorf("Missing entries")
		}
	default:
		return fmt.Errorf("Unknown work order")
	}
	// TODO(jms): We could do some sanity checks here. E.g. if the work order is
	// Update and LogState.MaxTimestamp is >= 1 MMD ahead of LogState.MinTimestamp
	// then LogState.MinTimestamp should not change.
	lw.LogState.MinTimestamp = uint64Min(lw.LogState.MinTimestamp, minTimestamp)
	lw.LogState.MaxTimestamp = uint64Max(lw.LogState.MaxTimestamp, maxTimestamp)
	lw.LogState.LastUpdateTime = time.Now()
	saveErr := lw.Database.SaveLogState(lw.LogState)
	if saveErr != nil {
		return fmt.Errorf("Database error: %s", saveErr)
	}
	glog.Infof("[%s] Saved log state: %s", lw.Name(), lw.LogState)
	return nil
}
// downloadCTRangeToChannel fetches the raw entries covering
// verifier.Subtree from the log (retrying with exponential backoff on
// rate limiting), feeds each parsed entry into |entryChan|, and folds
// every leaf into the verifier so the subtree root can be checked.
// It returns the minimum and maximum entry timestamps (milliseconds)
// observed in the range.
func (lw *LogWorker) downloadCTRangeToChannel(ctx context.Context, verifier *CtLogSubtreeVerifier, entryChan chan<- CtLogEntry) (uint64, uint64, error) {
	var minTimestamp uint64
	var maxTimestamp uint64
	b := &backoff.Backoff{
		Jitter: true,
		Min:    5 * time.Second,
		Max:    10 * time.Minute,
	}
	index := verifier.Subtree.First
	last := verifier.Subtree.Last
	for index <= last {
		// TODO(jms) Add an option to get entries from disk.
		resp, err := lw.Client.GetRawEntries(ctx, int64(index), int64(last))
		if err != nil {
			// Rate limiting is expected; back off and retry. Anything
			// else bubbles up to the caller.
			if strings.Contains(err.Error(), "HTTP Status") &&
				(strings.Contains(err.Error(), "429") || strings.Contains(err.Error(), "Too Many Requests")) {
				d := b.Duration()
				glog.Infof("[%s] received status code 429 at index=%d, retrying in %s: %v", lw.Name(), index, d, err)
				time.Sleep(d)
				continue
			}
			glog.Warningf("Failed to get entries: %v", err)
			return minTimestamp, maxTimestamp, err
		}
		b.Reset()
		// The log may return fewer entries than requested; the outer
		// loop re-requests from the new |index|.
		for _, entry := range resp.Entries {
			logEntry, err := ct.LogEntryFromLeaf(int64(index), &entry)
			if _, ok := err.(x509.NonFatalErrors); !ok && err != nil {
				glog.Warningf("Erroneous certificate: log=%s index=%d err=%v",
					lw.Name(), index, err)
				// This is a serious error that prevents us from ingesting a log, so
				// we ping the `ct-fetch.parse.error` metric to generate an alert and
				// also the `ct-fetch.<log key>.parse.error` metric to identify the log.
				metrics.IncrCounter([]string{"parse", "error"}, 1)
				metrics.IncrCounter([]string{lw.MetricKey, "parse", "error"}, 1)
				index++
				continue
			}
			// We might block while waiting for space in entryChan.
			// If we catch a signal here the verification will fail and the subtree
			// will not get merged.
			select {
			case <-ctx.Done():
				glog.Infof("[%s] Cancelled", lw.Name())
				return minTimestamp, maxTimestamp, nil
			case entryChan <- CtLogEntry{logEntry, lw.LogMeta}:
			}
			// Update the metadata that we will pass to mergeSubtree.
			entryTimestamp := logEntry.Leaf.TimestampedEntry.Timestamp
			if minTimestamp == 0 || entryTimestamp < minTimestamp {
				minTimestamp = entryTimestamp
			}
			if maxTimestamp == 0 || maxTimestamp < entryTimestamp {
				maxTimestamp = entryTimestamp
			}
			// Every leaf (even ones whose entry we skipped above) must be
			// consumed for the subtree hash to match.
			verifier.Consume(entry.LeafInput)
			index++
		}
	}
	return minTimestamp, maxTimestamp, nil
}
// EnrolledLogs is the registry of CT logs currently enrolled for
// download. Newly enrolled logs are announced on NewChan; the wait
// group is released once enrollment is finalized (see Finalize).
type EnrolledLogs struct {
	wg       *sync.WaitGroup
	mutex    *sync.RWMutex // guards metadata
	metadata map[string]types.CTLogMetadata
	NewChan  chan types.CTLogMetadata
}
// NewEnrolledLogs constructs an empty enrollment registry. The wait
// group starts at one and is released by Finalize, so Wait blocks
// until enrollment has been finalized.
func NewEnrolledLogs() *EnrolledLogs {
	el := &EnrolledLogs{
		wg:       new(sync.WaitGroup),
		mutex:    new(sync.RWMutex),
		metadata: make(map[string]types.CTLogMetadata),
		NewChan:  make(chan types.CTLogMetadata),
	}
	el.wg.Add(1)
	return el
}
// Finalize marks enrollment as complete: it releases Wait and closes
// NewChan so the consumer loop terminates. Call exactly once.
func (el *EnrolledLogs) Finalize() {
	el.wg.Done()
	close(el.NewChan)
}
// Wait blocks until Finalize has been called.
func (el *EnrolledLogs) Wait() {
	el.wg.Wait()
}
// Count returns the number of currently enrolled logs.
func (el *EnrolledLogs) Count() int {
	el.mutex.RLock()
	n := len(el.metadata)
	el.mutex.RUnlock()
	return n
}
// Enroll adds |ctLog| to the registry and announces it on NewChan if it
// was not already enrolled.
// NOTE(review): the channel send happens while the write lock is held;
// the NewChan consumer must not synchronously call back into methods
// that take this lock, or enrollment could deadlock — confirm.
func (el *EnrolledLogs) Enroll(ctLog types.CTLogMetadata) {
	el.mutex.Lock()
	defer el.mutex.Unlock()
	_, prs := el.metadata[ctLog.LogID]
	if !prs {
		el.metadata[ctLog.LogID] = ctLog
		el.NewChan <- ctLog
	}
}
// Unenroll removes the log from the registry; it is a no-op if the log
// was not enrolled.
func (el *EnrolledLogs) Unenroll(ctLog types.CTLogMetadata) {
	el.mutex.Lock()
	delete(el.metadata, ctLog.LogID)
	el.mutex.Unlock()
}
// IsEnrolled reports whether a log with the given LogID is currently
// enrolled.
func (el *EnrolledLogs) IsEnrolled(logID string) bool {
	el.mutex.RLock()
	defer el.mutex.RUnlock()
	_, enrolled := el.metadata[logID]
	return enrolled
}
// updateFromRemoteSettingsOnce fetches the ct-logs collection from the
// Remote Settings server and reconciles the enrollment map with it:
// logs that are no longer CRLite-enrolled are removed, and newly
// enrolled logs are added and announced on NewChan.
//
// Bug fix: the error from remoteSettingsURL.Parse was previously
// discarded, so a malformed base URL could cause a nil-pointer
// dereference when building the records URL.
func (el *EnrolledLogs) updateFromRemoteSettingsOnce() error {
	remoteSettingsURL, err := url.Parse(*ctconfig.RemoteSettingsURL)
	if err != nil {
		return err
	}
	if remoteSettingsURL.Scheme != "https" {
		glog.Warning("Changing RemoteSettingsURL scheme to https")
		remoteSettingsURL.Scheme = "https"
	}
	ctLogConfURL, err := remoteSettingsURL.Parse(
		"buckets/security-state/collections/ct-logs/records")
	if err != nil {
		return err
	}
	httpRsp, err := httpClient.Get(ctLogConfURL.String())
	if err != nil {
		return err
	}
	// Read and close the body before inspecting the status so the
	// connection can be reused in either case.
	body, err := ioutil.ReadAll(httpRsp.Body)
	httpRsp.Body.Close()
	if err != nil {
		return err
	}
	if httpRsp.StatusCode != http.StatusOK {
		return fmt.Errorf("HTTP Status %q", httpRsp.Status)
	}
	// The response from the remote settings server is
	// { "data" : []CTLogMetadata }
	var ctLogJSON map[string][]types.CTLogMetadata
	if err := json.Unmarshal(body, &ctLogJSON); err != nil {
		return err
	}
	ctLogs, exists := ctLogJSON["data"]
	if !exists {
		return fmt.Errorf("Malformed response from Remote Settings %s", *ctconfig.RemoteSettingsURL)
	}
	el.mutex.Lock()
	defer el.mutex.Unlock()
	for _, ctLog := range ctLogs {
		_, prs := el.metadata[ctLog.LogID]
		if prs {
			// Already tracked: drop it if it is no longer enrolled.
			if !ctLog.CRLiteEnrolled {
				delete(el.metadata, ctLog.LogID)
				glog.Infof("[%s] Unenrolled", ctLog.URL)
			} else {
				glog.Infof("[%s] Remains enrolled", ctLog.URL)
			}
		} else {
			// New to us: enroll it and announce it to the sync loop.
			if ctLog.CRLiteEnrolled {
				el.metadata[ctLog.LogID] = ctLog
				el.NewChan <- ctLog
				glog.Infof("[%s] Enrolled with LogID %s", ctLog.URL, ctLog.LogID)
			}
		}
	}
	return nil
}
// UpdateFromRemoteSettings polls Remote Settings for the enrolled CT
// log list — once, or periodically when RunForever is set — and
// finalizes enrollment on exit (unblocking main's Wait and closing
// NewChan). Run from its own goroutine.
func (el *EnrolledLogs) UpdateFromRemoteSettings(ctx context.Context) {
	defer el.Finalize()
	for {
		glog.Infof("Updating Enrolled CT log list from remote settings.")
		err := el.updateFromRemoteSettingsOnce()
		if err != nil {
			// Best-effort: keep the current enrollment and retry later.
			glog.Errorf("Unable to get enrolled logs from Remote Settings: %s", err)
		}
		if !*ctconfig.RunForever {
			return
		}
		glog.Infof("There are %d logs enrolled. Polling again in %d seconds.", el.Count(), *ctconfig.RemoteSettingsUpdateInterval)
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Duration(*ctconfig.RemoteSettingsUpdateInterval) * time.Second):
		}
	}
}
// main wires everything together: signal handling, storage, telemetry,
// the log sync engine, log enrollment (local config and/or Remote
// Settings), and a /health HTTP endpoint; it then blocks until all
// sync work has completed.
func main() {
	defer glog.Flush()
	ctconfig.Init()
	ctx := context.Background()
	ctx, cancelMain := context.WithCancel(ctx)
	// Try to handle SIGINT and SIGTERM gracefully
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer close(sigChan)
	go func() {
		sig := <-sigChan
		glog.Infof("Signal caught: %s..", sig)
		cancelMain()
		signal.Stop(sigChan) // Restore default behavior
	}()
	// Seed random for clock jitter
	rand.Seed(time.Now().UnixNano())
	// NOTE(review): the error from GetConfiguredStorage is discarded
	// here — presumably it handles failure internally; confirm.
	storageDB, _ := engine.GetConfiguredStorage(ctx, ctconfig, true)
	err := storageDB.EnsureCacheIsConsistent()
	if err != nil {
		glog.Errorf("Could not recover cache: %s", err)
		os.Exit(1)
	}
	engine.PrepareTelemetry("ct-fetch", ctconfig)
	enrolledLogs := NewEnrolledLogs()
	syncEngine := NewLogSyncEngine(storageDB)
	// Start a pool of threads to parse and store log entries
	syncEngine.StartDatabaseThreads()
	// Sync with logs as they are enrolled
	go func() {
		for ctLog := range enrolledLogs.NewChan {
			glog.Infof("[%s] Starting download.", ctLog.URL)
			go syncEngine.SyncLog(ctx, enrolledLogs, ctLog)
		}
	}()
	// Enroll logs from local settings
	if *ctconfig.CTLogMetadata != "" {
		localCTLogList := new([]types.CTLogMetadata)
		if err := json.Unmarshal([]byte(*ctconfig.CTLogMetadata), localCTLogList); err != nil {
			glog.Fatalf("Unable to parse CTLogMetadata argument: %s", err)
		}
		for _, ctLog := range *localCTLogList {
			if ctLog.CRLiteEnrolled {
				enrolledLogs.Enroll(ctLog)
			}
		}
	}
	if enrolledLogs.Count() == 0 && *ctconfig.RemoteSettingsURL == "" {
		// Didn't include a mandatory action, so print usage and exit.
		if *ctconfig.CTLogMetadata != "" {
			glog.Warningf("No enrolled logs found in %s.", *ctconfig.CTLogMetadata)
		}
		ctconfig.Usage()
		os.Exit(2)
	}
	// If we're configured with a Remote Settings URL, we'll periodically look for
	// newly enrolled logs in Remote Settings. Otherwise we have all of the logs already.
	if *ctconfig.RemoteSettingsURL != "" {
		go enrolledLogs.UpdateFromRemoteSettings(ctx)
	} else {
		enrolledLogs.Finalize()
	}
	// Health endpoint: 503 before the first update, 500 when updates
	// have gone stale (> 2 * pollingDelay), 200 otherwise.
	healthHandler := http.NewServeMux()
	healthHandler.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		approxUpdateTimestamp := syncEngine.ApproximateMostRecentUpdateTimestamp()
		if approxUpdateTimestamp.IsZero() {
			w.Header().Add("Retry-After", "30")
			w.WriteHeader(503)
			_, err := w.Write([]byte("error: no health updates yet, Retry-After 30 seconds"))
			if err != nil {
				glog.Warningf("Couldn't return too early health status: %+v", err)
			}
			return
		}
		duration := time.Since(approxUpdateTimestamp)
		evaluationTime := 2 * time.Duration(*ctconfig.PollingDelay) * time.Second
		if duration > evaluationTime {
			w.WriteHeader(500)
			_, err := w.Write([]byte(fmt.Sprintf("error: %v since last update, which is longer than 2 * pollingDelay", duration)))
			if err != nil {
				glog.Warningf("Couldn't return poor health status: %+v", err)
			}
			return
		}
		w.WriteHeader(200)
		_, err := w.Write([]byte(fmt.Sprintf("ok: %v since last update, which is shorter than 2 * pollingDelay", duration)))
		if err != nil {
			glog.Warningf("Couldn't return ok health status: %+v", err)
		}
	})
	healthServer := &http.Server{
		Handler: healthHandler,
		Addr:    *ctconfig.HealthAddr,
	}
	go healthServer.ListenAndServe()
	// Wait until we've finalized enrollment.
	enrolledLogs.Wait()
	// Wait until all jobs are finished.
	syncEngine.Wait()
	if err := healthServer.Shutdown(ctx); err != nil {
		glog.Infof("HTTP server shutdown error: %v", err)
	}
	glog.Flush()
	os.Exit(0)
}