internal/task_request_response/task_request_response.go (154 lines of code) (raw):
package task_request_response //nolint:staticcheck
import (
"context"
"encoding/json"
"log/slog"
"time"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/callback"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/file_cleaner"
"gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/server"
"gitlab.com/gitlab-org/labkit/correlation"
)
type task struct {
Name string `json:"name"`
Payload map[string]any `json:"payload"`
}
type taskRequestResponse struct {
Tasks []task `json:"tasks"`
PullFrequency string `json:"pull_frequency"`
Truncate bool `json:"truncate"`
StopIndexing bool `json:"stop_indexing"`
}
type ProcessResult struct {
Interval time.Duration // 0 means keep the current interval
StopIndexing bool
}
func Process(ctx context.Context, body []byte, s *server.IndexServer) ProcessResult {
var tr taskRequestResponse
err := json.Unmarshal(body, &tr)
if err != nil {
slog.Error("error in unmarshaling taskRequestResponse", "err", err, "body", string(body[:min(len(body), 100)]))
return ProcessResult{}
}
if tr.Truncate {
func() {
err := s.IndexingLock.LockAll()
defer s.IndexingLock.UnlockAll()
fileCleaner := file_cleaner.NewFileCleaner(s.IndexBuilder.GetIndexDir(), s.IndexingLock)
if err != nil {
cleanNodeUUID(fileCleaner)
}
err = fileCleaner.TruncateExceptUUID()
if err != nil {
cleanNodeUUID(fileCleaner)
}
}()
}
for _, task := range tr.Tasks {
switch task.Name {
case "index":
payload := task.Payload
indexReq, err := createIndexRequest(payload)
if err != nil {
slog.Error("error in creating IndexRequest", "err", err)
continue
}
repoID := indexReq.RepoID
if !s.IndexingLock.TryLock(repoID) {
slog.Warn("indexing is already in progress for RepoID", "repoId", repoID, "task", task.Name)
continue
}
go func() { //nolint:contextcheck
defer s.IndexingLock.Unlock(repoID)
callbackCtx := correlation.ContextWithCorrelation(context.Background(), correlation.ExtractFromContextOrGenerate(ctx))
errIndex := s.IndexBuilder.IndexRepository(
ctx,
indexReq,
callback.CallbackFunc{
OnSuccess: func(params callback.CallbackParams) {
s.CallbackAPI.SendSuccess(callbackCtx, params, s.IndexBuilder.GetIndexDir(), indexReq.RepoID)
},
OnFailure: func(params callback.CallbackParams, errorReason error) {
s.CallbackAPI.SendFailure(callbackCtx, params, s.IndexBuilder.GetIndexDir(), indexReq.RepoID, errorReason)
},
},
)
if errIndex != nil {
slog.Error("error while indexing", "repoId", repoID, "err", errIndex)
return
}
}()
case "delete":
payload := task.Payload
deleteReq, err := deleteRequest(payload)
if err != nil {
slog.Error("error in creating DeleteRequest", "err", err)
continue
}
repoID := deleteReq.RepoID
if !s.IndexingLock.TryLock(repoID) {
slog.Warn("indexing is already in progress for RepoID", "repoId", repoID, "task", task.Name)
continue
}
go func() {
defer s.IndexingLock.Unlock(repoID)
errDelete := s.IndexBuilder.DeleteRepository(
deleteReq,
callback.CallbackFunc{
OnSuccess: func(params callback.CallbackParams) {
s.CallbackAPI.SendSuccess(ctx, params, s.IndexBuilder.GetIndexDir(), repoID)
},
OnFailure: func(params callback.CallbackParams, errorReason error) {
s.CallbackAPI.SendFailure(ctx, params, s.IndexBuilder.GetIndexDir(), repoID, errorReason)
},
},
s.IndexingLock,
)
if errDelete != nil {
slog.Error("error while deleting", "repoId", repoID, "err", errDelete)
return
}
}()
}
}
var pullFrequency time.Duration
if tr.PullFrequency != "" {
interval, err := time.ParseDuration(tr.PullFrequency)
if err == nil {
pullFrequency = interval
}
}
return ProcessResult{
Interval: pullFrequency,
StopIndexing: tr.StopIndexing,
}
}
func createIndexRequest(payload map[string]any) (server.IndexRequest, error) {
var indexReq server.IndexRequest
payloadJSON, err := json.Marshal(payload)
if err != nil {
return indexReq, err
}
if err := json.Unmarshal(payloadJSON, &indexReq); err != nil {
return indexReq, err
}
return indexReq, nil
}
func deleteRequest(payload map[string]any) (server.DeleteRequest, error) {
var deleteReq server.DeleteRequest
payloadJSON, err := json.Marshal(payload)
if err != nil {
return deleteReq, err
}
if err := json.Unmarshal(payloadJSON, &deleteReq); err != nil {
return deleteReq, err
}
return deleteReq, nil
}
func cleanNodeUUID(fileCleaner *file_cleaner.FileCleaner) {
err := fileCleaner.RemoveNodeUUID()
if err != nil {
slog.Error("error while removing node.uuid", "err", err)
panic(err)
}
}