go/storage/rediscache.go (297 lines of code) (raw):
package storage
import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"strconv"
"strings"
"time"
"github.com/go-redis/redis"
"github.com/golang/glog"
"github.com/mozilla/crlite/go"
)
// Keys, sentinels and durations used by the Redis-backed cache.
const (
	// EMPTY_QUEUE holds the literal text "redis: nil".
	// NOTE(review): presumably mirrors the message of the go-redis Nil
	// error; it is not referenced in this part of the file - confirm.
	EMPTY_QUEUE string = "redis: nil"

	// NO_EXPIRATION is the zero duration passed as the expiration argument
	// when storing values that are not meant to expire.
	NO_EXPIRATION time.Duration = 0

	// The commit lock is acquired in aggregate-known before cached serials are
	// written to disk. It is held until aggregate-known is done reading serials
	// from disk. We set a 4 hour expiry on the commit lock in case the
	// aggregate-known process is abruptly terminated. The commit process is
	// fault-tolerant and will not leave persistent storage in a bad state. The
	// lock expiry just ensures that the next aggregate-known process will get a
	// chance to run.
	COMMIT_LOCK_KEY        string        = "lock::commit"
	COMMIT_LOCK_EXPIRATION time.Duration = 4 * time.Hour

	// EPOCH_KEY is the key under which the epoch counter is stored.
	EPOCH_KEY string = "epoch"
)
// RedisCache wraps a go-redis client and exposes the set, key, log-state,
// commit-lock and epoch operations defined in this file.
type RedisCache struct {
// client is the Redis connection handle every method operates on.
client *redis.Client
}
// NewRedisCache creates a client for the Redis server at addr and returns a
// cache handle once a PING has succeeded. cacheTimeout is used as both the
// read and the write timeout of the connection.
//
// If the PING fails the error is returned and no cache is created. A memory
// policy other than `noeviction` is only logged as a warning; the cache is
// still returned.
func NewRedisCache(addr string, cacheTimeout time.Duration) (*RedisCache, error) {
	rdb := redis.NewClient(&redis.Options{
		Addr:            addr,
		MaxRetries:      10,
		MaxRetryBackoff: 5 * time.Second,
		ReadTimeout:     cacheTimeout,
		WriteTimeout:    cacheTimeout,
	})

	if err := rdb.Ping().Err(); err != nil {
		// The client never reaches the caller on this path, so release its
		// connection pool here. The ping failure is the error worth
		// reporting; a secondary Close error is deliberately dropped.
		_ = rdb.Close()
		return nil, err
	}

	rc := &RedisCache{rdb}
	if err := rc.MemoryPolicyCorrect(); err != nil {
		glog.Warning(err)
	}
	return rc, nil
}
// MemoryPolicyCorrect checks the server's memory configuration. It returns
// nil when the output of INFO memory contains `maxmemory_policy:noeviction`,
// the INFO error if the command fails, and otherwise an error that embeds the
// reported memory section.
func (rc *RedisCache) MemoryPolicyCorrect() error {
	memInfo, err := rc.client.Info("memory").Result()
	if err != nil {
		return err
	}

	if !strings.Contains(memInfo, "maxmemory_policy:noeviction") {
		return fmt.Errorf("Redis maxmemory_policy should be `noeviction`. Memory config is set to %s",
			memInfo)
	}
	return nil
}
// SetInsert adds entry to the set stored at key (SADD). The boolean result is
// true when exactly one member was added. Any Redis error is returned, except
// that an error whose text starts with "OOM" is logged with glog.Fatalf
// instead of being handed back.
func (rc *RedisCache) SetInsert(key string, entry string) (bool, error) {
	count, err := rc.client.SAdd(key, entry).Result()
	if err != nil {
		if msg := err.Error(); strings.HasPrefix(msg, "OOM") {
			glog.Fatalf("Out of memory on Redis insert of entry %s into key %s, error %v", entry, key, msg)
		}
	}

	wasAdded := count == 1
	return wasAdded, err
}
// SetRemove removes every element of entries from the set stored at key.
// The SREM commands are sent in pipelines of at most 1024 commands each; the
// first pipeline that fails aborts the operation and its error is returned.
func (rc *RedisCache) SetRemove(key string, entries []string) error {
	const chunkLen = 1024

	for pending := entries; len(pending) > 0; {
		n := chunkLen
		if n > len(pending) {
			n = len(pending)
		}
		chunk := pending[:n]
		pending = pending[n:]

		// Queue one SREM per member; the pipeline is sent when this returns.
		queue := func(pipe redis.Pipeliner) error {
			for _, member := range chunk {
				if err := pipe.SRem(key, member).Err(); err != nil {
					return err
				}
			}
			return nil
		}
		if _, err := rc.client.Pipelined(queue); err != nil {
			return err
		}
	}
	return nil
}
// SetContains reports whether entry is a member of the set stored at key
// (SISMEMBER), together with any error from the command.
func (rc *RedisCache) SetContains(key string, entry string) (bool, error) {
	isMember, err := rc.client.SIsMember(key, entry).Result()
	return isMember, err
}
// SetList returns all members of the set stored at key (SMEMBERS), together
// with any error from the command.
func (rc *RedisCache) SetList(key string) ([]string, error) {
	members, err := rc.client.SMembers(key).Result()
	return members, err
}
// SetToChan iterates over the set stored at key with SSCAN and sends every
// member it yields to c. The channel is always closed before the function
// returns, including on error. The returned error is either the failure of
// the initial scan or the error recorded by the iterator.
func (rc *RedisCache) SetToChan(key string, c chan<- string) error {
	defer close(c)

	scan := rc.client.SScan(key, 0, "", 0)
	if err := scan.Err(); err != nil {
		return err
	}

	members := scan.Iterator()
	for members.Next() {
		c <- members.Val()
	}
	return members.Err()
}
// SetCardinality returns the number of members in the set stored at key
// (SCARD), converted to int, together with any error from the command.
func (rc *RedisCache) SetCardinality(key string) (int, error) {
	sizeCmd := rc.client.SCard(key)
	return int(sizeCmd.Val()), sizeCmd.Err()
}
// Exists reports whether key is present, i.e. whether EXISTS returned a
// count of exactly one, together with any error from the command.
func (rc *RedisCache) Exists(key string) (bool, error) {
	n, err := rc.client.Exists(key).Result()
	found := n == 1
	return found, err
}
// ExpireAt asks Redis to expire key at aExpTime (EXPIREAT) and returns the
// error of that command, if any.
func (rc *RedisCache) ExpireAt(key string, aExpTime time.Time) error {
	return rc.client.ExpireAt(key, aExpTime).Err()
}
// KeysToChan iterates over the keys matching pattern with SCAN and sends
// every key it yields to c. The channel is always closed before the function
// returns, including on error. The returned error is either the failure of
// the initial scan or the error recorded by the iterator.
func (rc *RedisCache) KeysToChan(pattern string, c chan<- string) error {
	defer close(c)

	scan := rc.client.Scan(0, pattern, 0)
	if err := scan.Err(); err != nil {
		return err
	}

	keys := scan.Iterator()
	for keys.Next() {
		c <- keys.Val()
	}
	return keys.Err()
}
// shortUrlToLogKey builds the cache key for a CT log: the prefix "log::"
// followed by shortUrl with all trailing slashes removed.
func shortUrlToLogKey(shortUrl string) string {
	trimmed := strings.TrimRight(shortUrl, "/")
	return fmt.Sprint("log::", trimmed)
}
// Migrate moves the stored state of the CT log described by logData from its
// legacy key (the current key followed by a trailing "/") to the current key
// format.
//
//   - If only the legacy entry exists, it is decoded and stored again through
//     StoreLogState, then the legacy key is deleted.
//   - If both entries exist, the legacy key is simply deleted.
//   - If no legacy entry exists, nothing is done.
//
// Any Redis or decoding error is returned, including a failure to delete the
// legacy key.
func (rc *RedisCache) Migrate(logData *types.CTLogMetadata) error {
	logUrlObj, err := url.Parse(logData.URL)
	if err != nil {
		return err
	}
	shortUrl := logUrlObj.Host + strings.TrimRight(logUrlObj.Path, "/")
	newKey := shortUrlToLogKey(shortUrl)
	oldKey := newKey + "/"

	// redis.Nil means "key not set"; every other error is a real failure.
	haveNew := true
	if err = rc.client.Get(newKey).Err(); err == redis.Nil {
		haveNew = false
	} else if err != nil {
		return err
	}

	oldData, err := rc.client.Get(oldKey).Bytes()
	if err == redis.Nil {
		// No legacy entry. Nothing to do.
		return nil
	}
	if err != nil {
		return err
	}

	// Only copy the legacy data over when there is no entry under the new
	// key yet; an existing new entry takes precedence.
	if !haveNew {
		var log types.CTLogState
		if err = json.Unmarshal(oldData, &log); err != nil {
			return err
		}
		if err = rc.StoreLogState(&log); err != nil {
			return err
		}
	}

	// Report a failed cleanup instead of silently leaving the legacy key.
	return rc.client.Del(oldKey).Err()
}
// StoreLogState JSON-encodes log and writes it, without expiration, under the
// key derived from log.ShortURL. It returns the encoding error or the error
// of the SET command.
func (rc *RedisCache) StoreLogState(log *types.CTLogState) error {
	payload, err := json.Marshal(log)
	if err != nil {
		return err
	}

	key := shortUrlToLogKey(log.ShortURL)
	return rc.client.Set(key, payload, NO_EXPIRATION).Err()
}
// LoadLogState reads the entry stored under the key derived from shortUrl and
// decodes it from JSON. The error of the GET command (which is also what a
// missing key produces) or of the decoding is returned with a nil state.
func (rc *RedisCache) LoadLogState(shortUrl string) (*types.CTLogState, error) {
	raw, err := rc.client.Get(shortUrlToLogKey(shortUrl)).Bytes()
	if err != nil {
		return nil, err
	}

	state := new(types.CTLogState)
	if err := json.Unmarshal(raw, state); err != nil {
		return nil, err
	}
	return state, nil
}
// LoadAllLogStates returns the decoded state of every entry whose key matches
// "log::*". Keys are listed by a background goroutine running KeysToChan; if
// that listing fails the process is terminated through glog.Fatalf. A failure
// to read or decode an individual entry is returned as an error.
func (rc *RedisCache) LoadAllLogStates() ([]types.CTLogState, error) {
	ctLogList := make([]types.CTLogState, 0)
	keyChan := make(chan string)
	go func() {
		err := rc.KeysToChan("log::*", keyChan)
		if err != nil {
			glog.Fatalf("Couldn't list CT logs from cache: %s", err)
		}
	}()
	// keyChan is unbuffered, so the producer blocks on every send until it is
	// received. Drain whatever is left when returning early; otherwise the
	// goroutine would be stuck forever. After a complete read the channel is
	// already closed and this returns immediately.
	defer func() {
		for range keyChan {
		}
	}()

	// SCAN may report the same key more than once; load each key only once.
	seen := make(map[string]struct{})
	for entry := range keyChan {
		if _, dup := seen[entry]; dup {
			continue
		}
		seen[entry] = struct{}{}

		data, err := rc.client.Get(entry).Bytes()
		if err != nil {
			return nil, fmt.Errorf("Couldn't parse CT logs metadata: %s", err)
		}
		var state types.CTLogState
		if err := json.Unmarshal(data, &state); err != nil {
			return nil, fmt.Errorf("Couldn't parse CT logs metadata: %s", err)
		}
		ctLogList = append(ctLogList, state)
	}
	return ctLogList, nil
}
// AcquireCommitLock tries to take the commit lock. It generates a random
// 16-byte token, URL-safe base64 encodes it and stores it under
// COMMIT_LOCK_KEY with SETNX and an expiry of COMMIT_LOCK_EXPIRATION.
//
// On success a pointer to the token is returned. When the key was already set
// the result is (nil, nil); when generating the token or talking to Redis
// fails the result is (nil, err).
func (rc *RedisCache) AcquireCommitLock() (*string, error) {
	var nonce [16]byte
	if _, err := rand.Read(nonce[:]); err != nil {
		return nil, err
	}
	token := base64.URLEncoding.EncodeToString(nonce[:])

	// SETNX only writes when the key is not set yet, so `acquired` tells us
	// whether our token is now the value of the lock key.
	acquired, err := rc.client.SetNX(COMMIT_LOCK_KEY, token, COMMIT_LOCK_EXPIRATION).Result()
	switch {
	case err != nil:
		return nil, err
	case !acquired:
		return nil, nil
	}
	return &token, nil
}
// ReleaseCommitLock deletes the commit lock if, and only if, it is currently
// held with aToken. A lock held with a different token, or no lock at all, is
// left untouched. Failures are logged as warnings because the function has no
// error result.
func (rc *RedisCache) ReleaseCommitLock(aToken string) {
	// The comparison and the deletion run as one server-side script. Doing a
	// separate GET followed by DEL would leave a window in which our lock
	// could expire and be taken by another process, whose lock we would then
	// delete.
	const releaseScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0`

	err := rc.client.Eval(releaseScript, []string{COMMIT_LOCK_KEY}, aToken).Err()
	if err != nil {
		glog.Warningf("Couldn't release commit lock: %s", err)
	}
}
// HasCommitLock reports whether the value stored under COMMIT_LOCK_KEY equals
// aToken. An unset lock key yields (false, nil); any other Redis error is
// returned with false.
func (rc *RedisCache) HasCommitLock(aToken string) (bool, error) {
	holder, err := rc.client.Get(COMMIT_LOCK_KEY).Result()
	switch {
	case err == redis.Nil:
		// COMMIT_LOCK_KEY not set
		return false, nil
	case err != nil:
		return false, err
	}
	return holder == aToken, nil
}
// GetEpoch returns the value stored under EPOCH_KEY parsed as a base-10
// unsigned 64-bit integer. An unset key yields (0, nil); Redis and parse
// errors are returned to the caller.
func (rc *RedisCache) GetEpoch() (uint64, error) {
	raw, err := rc.client.Get(EPOCH_KEY).Result()
	switch {
	case err == redis.Nil:
		// EPOCH_KEY not set
		return 0, nil
	case err != nil:
		return 0, err
	}
	return strconv.ParseUint(raw, 10, 64)
}
// NextEpoch increments the value stored under EPOCH_KEY (INCR) and returns
// the error of that command, if any.
func (rc *RedisCache) NextEpoch() error {
	incr := rc.client.Incr(EPOCH_KEY)
	return incr.Err()
}
// Restore replaces the cached log states and epoch while holding the commit
// lock: every key matching "log::*" is deleted, each element of aLogStates is
// stored through StoreLogState, and EPOCH_KEY is set to aEpoch. The lock is
// released when the function returns.
//
// An error is returned if the lock cannot be acquired (either because of a
// failure or because it is already held) or if any Redis operation fails. The
// steps are not transactional, so a failure part-way through leaves the
// changes made so far in place.
func (rc *RedisCache) Restore(aEpoch uint64, aLogStates []types.CTLogState) error {
	commitToken, err := rc.AcquireCommitLock()
	if err != nil {
		return fmt.Errorf("Failed to acquire commit lock: %s", err)
	}
	if commitToken == nil {
		// AcquireCommitLock reports a lock held elsewhere as (nil, nil), so
		// there is no underlying error to format here.
		return fmt.Errorf("Failed to acquire commit lock: lock is already held")
	}
	defer rc.ReleaseCommitLock(*commitToken)

	logKeys, err := rc.client.Keys("log::*").Result()
	if err != nil {
		return err
	}
	for _, logKey := range logKeys {
		if err := rc.client.Del(logKey).Err(); err != nil {
			return err
		}
	}

	for i := range aLogStates {
		if err := rc.StoreLogState(&aLogStates[i]); err != nil {
			return err
		}
	}

	return rc.client.Set(EPOCH_KEY, aEpoch, NO_EXPIRATION).Err()
}
// AddPreIssuerAlias records aIssuer as an alias of aPreIssuer by inserting the
// issuer's ID into the set "preissuer::<preissuer ID>". When the alias is
// newly added, a warning is logged and the set's expiry is set to seven days
// from now.
//
// The error of the insert is returned, as is a failure to set the expiry:
// without it the alias key would stay in the cache with no expiry from this
// call.
func (rc *RedisCache) AddPreIssuerAlias(aPreIssuer types.Issuer, aIssuer types.Issuer) error {
	key := fmt.Sprintf("preissuer::%s", aPreIssuer.ID())
	added, err := rc.SetInsert(key, aIssuer.ID())
	if err != nil {
		return err
	}
	if !added {
		// Alias already known; its existing expiry is left as is.
		return nil
	}

	glog.Warningf("Added preissuer alias %s -> %s", aPreIssuer.ID(), aIssuer.ID())
	// This alias will be preserved for one week. During this time
	// any call to CertDatabase.Commit() will migrate serials from
	// the preissuer's bin to the issuer's bin.
	return rc.ExpireAt(key, time.Now().AddDate(0, 0, 7))
}
// GetPreIssuerAliases returns the issuers recorded as aliases of aPreIssuer,
// i.e. the members of the set "preissuer::<preissuer ID>" converted with
// types.NewIssuerFromString. A failure to read the set is returned with a nil
// slice.
func (rc *RedisCache) GetPreIssuerAliases(aPreIssuer types.Issuer) ([]types.Issuer, error) {
	members, err := rc.SetList(fmt.Sprintf("preissuer::%s", aPreIssuer.ID()))
	if err != nil {
		return nil, err
	}

	issuers := make([]types.Issuer, len(members))
	for i, member := range members {
		issuers[i] = types.NewIssuerFromString(member)
	}
	return issuers, nil
}