service/worker/archiver/activities.go (153 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package archiver
import (
"context"
"errors"
"go.uber.org/cadence"
"go.uber.org/cadence/activity"
"github.com/uber/cadence/common"
carchiver "github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
)
// String names for the three archiver activity functions defined in this file.
// NOTE(review): presumably the names under which the activities are registered
// and scheduled — confirm at the registration site.
const (
uploadHistoryActivityFnName = "uploadHistoryActivity"
deleteHistoryActivityFnName = "deleteHistoryActivity"
archiveVisibilityActivityFnName = "archiveVisibilityActivity"
)
var (
// Sentinel errors returned by the activities in this file when a failure
// must not be retried. Each activity's deferred handler recognises its own
// sentinel by message and increments ArchiverNonRetryableErrorCount.
errUploadNonRetriable = errors.New("upload non-retriable error")
errDeleteNonRetriable = errors.New("delete non-retriable error")
errArchiveVisibilityNonRetriable = errors.New("archive visibility non-retriable error")
// Error reason strings: the literal "cadenceInternal:Panic" plus the message
// of the matching sentinel error above.
// NOTE(review): presumably passed to the activity retry policy as
// non-retriable error reasons — confirm at the usage site.
uploadHistoryActivityNonRetryableErrors = []string{"cadenceInternal:Panic", errUploadNonRetriable.Error()}
deleteHistoryActivityNonRetryableErrors = []string{"cadenceInternal:Panic", errDeleteNonRetriable.Error()}
)
// uploadHistoryActivity archives the workflow history described by request
// using the history archiver selected by the scheme of request.URI.
//
// The BootstrapContainer is read from ctx; the type assertion panics if it is
// absent. Latency is recorded for every invocation. On failure the deferred
// handler bumps ArchiverNonRetryableErrorCount when the error message equals
// that of errUploadNonRetriable, and then replaces the error with a cadence
// custom error whose reason is the original error message.
//
// An unparsable URI, an unavailable archiver, or a non-retryable error from
// the archiver all yield errUploadNonRetriable; any other archiver error is
// returned as-is.
func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err error) {
	bootstrap := ctx.Value(bootstrapContainerKey).(*BootstrapContainer)
	metricsScope := bootstrap.MetricsClient.Scope(
		metrics.ArchiverUploadHistoryActivityScope,
		metrics.DomainTag(request.DomainName),
	)
	latency := metricsScope.StartTimer(metrics.CadenceLatency)
	defer func() {
		latency.Stop()
		if err == nil {
			return
		}
		reason := err.Error()
		if reason == errUploadNonRetriable.Error() {
			metricsScope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
		}
		// Surface the failure as a custom error keyed by its message.
		err = cadence.NewCustomError(reason)
	}()

	activityLogger := tagLoggerWithActivityInfo(bootstrap.Logger, activity.GetInfo(ctx))
	logger := tagLoggerWithHistoryRequest(activityLogger, &request)

	archivalURI, err := carchiver.NewURI(request.URI)
	if err != nil {
		logger.Error(
			carchiver.ArchiveNonRetriableErrorMsg,
			tag.ArchivalArchiveFailReason("failed to get history archival uri"),
			tag.ArchivalURI(request.URI),
			tag.Error(err),
		)
		return errUploadNonRetriable
	}

	historyArchiver, err := bootstrap.ArchiverProvider.GetHistoryArchiver(archivalURI.Scheme(), service.Worker)
	if err != nil {
		logger.Error(
			carchiver.ArchiveNonRetriableErrorMsg,
			tag.ArchivalArchiveFailReason("failed to get history archiver"),
			tag.Error(err),
		)
		return errUploadNonRetriable
	}

	incompleteHistoryOpt := carchiver.GetArchivingIncompleteHistoryOption(bootstrap.Config.AllowArchivingIncompleteHistory)
	archiveRequest := &carchiver.ArchiveHistoryRequest{
		ShardID:              request.ShardID,
		DomainID:             request.DomainID,
		DomainName:           request.DomainName,
		WorkflowID:           request.WorkflowID,
		RunID:                request.RunID,
		BranchToken:          request.BranchToken,
		NextEventID:          request.NextEventID,
		CloseFailoverVersion: request.CloseFailoverVersion,
	}
	err = historyArchiver.Archive(
		ctx,
		archivalURI,
		archiveRequest,
		carchiver.GetHeartbeatArchiveOption(),
		carchiver.GetNonRetriableErrorOption(errUploadNonRetriable),
		incompleteHistoryOpt,
	)

	switch {
	case err == nil:
		return nil
	case errors.Is(err, errUploadNonRetriable):
		logger.Error(
			carchiver.ArchiveNonRetriableErrorMsg,
			tag.ArchivalArchiveFailReason("got non-retryable error from history archiver"),
			tag.Error(err),
		)
		// Normalise to the bare sentinel so the deferred handler matches it.
		return errUploadNonRetriable
	default:
		logger.Error(
			carchiver.ArchiveTransientErrorMsg,
			tag.ArchivalArchiveFailReason("got retryable error from history archiver"),
			tag.Error(err),
		)
		return err
	}
}
// deleteHistoryActivity deletes the history branch identified by
// request.BranchToken through the container's HistoryV2Manager.
//
// The BootstrapContainer is read from ctx; the type assertion panics if it is
// absent. Latency is recorded for every invocation. On failure the deferred
// handler bumps ArchiverNonRetryableErrorCount when the error message equals
// that of errDeleteNonRetriable, and then replaces the error with a cadence
// custom error whose reason is the original error message.
//
// Errors that persistence.IsTransientError reports as transient are returned
// as-is; every other failure is reported as errDeleteNonRetriable.
func deleteHistoryActivity(ctx context.Context, request ArchiveRequest) (err error) {
	bootstrap := ctx.Value(bootstrapContainerKey).(*BootstrapContainer)
	metricsScope := bootstrap.MetricsClient.Scope(
		metrics.ArchiverDeleteHistoryActivityScope,
		metrics.DomainTag(request.DomainName),
	)
	latency := metricsScope.StartTimer(metrics.CadenceLatency)
	defer func() {
		latency.Stop()
		if err == nil {
			return
		}
		reason := err.Error()
		if reason == errDeleteNonRetriable.Error() {
			metricsScope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
		}
		// Surface the failure as a custom error keyed by its message.
		err = cadence.NewCustomError(reason)
	}()

	deleteRequest := &persistence.DeleteHistoryBranchRequest{
		BranchToken: request.BranchToken,
		ShardID:     common.IntPtr(request.ShardID),
		DomainName:  request.DomainName,
	}
	if err = bootstrap.HistoryV2Manager.DeleteHistoryBranch(ctx, deleteRequest); err == nil {
		return nil
	}

	// The tagged logger is only built on the failure path.
	activityLogger := tagLoggerWithActivityInfo(bootstrap.Logger, activity.GetInfo(ctx))
	logger := tagLoggerWithHistoryRequest(activityLogger, &request)
	logger.Error("failed to delete history events", tag.Error(err))

	if persistence.IsTransientError(err) {
		return err
	}
	return errDeleteNonRetriable
}
// archiveVisibilityActivity archives the visibility record described by
// request using the visibility archiver selected by the scheme of
// request.VisibilityURI.
//
// The BootstrapContainer is read from ctx; the type assertion panics if it is
// absent. Latency is recorded for every invocation. On failure the deferred
// handler bumps ArchiverNonRetryableErrorCount when the error message equals
// that of errArchiveVisibilityNonRetriable, and then replaces the error with a
// cadence custom error whose reason is the original error message.
//
// An unparsable URI, an unavailable archiver, or a non-retryable error from
// the archiver all yield errArchiveVisibilityNonRetriable; any other archiver
// error is returned as-is.
func archiveVisibilityActivity(ctx context.Context, request ArchiveRequest) (err error) {
	container := ctx.Value(bootstrapContainerKey).(*BootstrapContainer)
	scope := container.MetricsClient.Scope(metrics.ArchiverArchiveVisibilityActivityScope, metrics.DomainTag(request.DomainName))
	sw := scope.StartTimer(metrics.CadenceLatency)
	defer func() {
		sw.Stop()
		if err != nil {
			if err.Error() == errArchiveVisibilityNonRetriable.Error() {
				scope.IncCounter(metrics.ArchiverNonRetryableErrorCount)
			}
			// Surface the failure as a custom error keyed by its message.
			err = cadence.NewCustomError(err.Error())
		}
	}()

	logger := tagLoggerWithVisibilityRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), &request)

	URI, err := carchiver.NewURI(request.VisibilityURI)
	if err != nil {
		logger.Error(carchiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason("failed to get visibility archival uri"), tag.ArchivalURI(request.VisibilityURI), tag.Error(err))
		return errArchiveVisibilityNonRetriable
	}

	visibilityArchiver, err := container.ArchiverProvider.GetVisibilityArchiver(URI.Scheme(), service.Worker)
	if err != nil {
		logger.Error(carchiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason("failed to get visibility archiver"), tag.Error(err))
		return errArchiveVisibilityNonRetriable
	}

	err = visibilityArchiver.Archive(ctx, URI, &carchiver.ArchiveVisibilityRequest{
		DomainID:           request.DomainID,
		DomainName:         request.DomainName,
		WorkflowID:         request.WorkflowID,
		RunID:              request.RunID,
		WorkflowTypeName:   request.WorkflowTypeName,
		StartTimestamp:     request.StartTimestamp,
		ExecutionTimestamp: request.ExecutionTimestamp,
		CloseTimestamp:     request.CloseTimestamp,
		CloseStatus:        request.CloseStatus,
		HistoryLength:      request.HistoryLength,
		Memo:               request.Memo,
		SearchAttributes:   convertSearchAttributesToString(request.SearchAttributes),
		HistoryArchivalURI: request.URI,
	}, carchiver.GetNonRetriableErrorOption(errArchiveVisibilityNonRetriable))
	if err == nil {
		return nil
	}

	// Match the sentinel with errors.Is, as uploadHistoryActivity does, so a
	// wrapped non-retryable error is not misclassified as retryable.
	if errors.Is(err, errArchiveVisibilityNonRetriable) {
		logger.Error(carchiver.ArchiveNonRetriableErrorMsg,
			tag.ArchivalArchiveFailReason("got non-retryable error from visibility archiver"),
			tag.Error(err),
		)
		// Return the bare sentinel so the deferred handler's message
		// comparison still matches when err wraps it.
		return errArchiveVisibilityNonRetriable
	}

	logger.Error(carchiver.ArchiveTransientErrorMsg, tag.ArchivalArchiveFailReason("got retryable error from visibility archiver"), tag.Error(err))
	return err
}