internal/task_request_response/task_request_response.go (154 lines of code) (raw):

package task_request_response //nolint:staticcheck import ( "context" "encoding/json" "log/slog" "time" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/callback" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/file_cleaner" "gitlab.com/gitlab-org/gitlab-zoekt-indexer/internal/server" "gitlab.com/gitlab-org/labkit/correlation" ) type task struct { Name string `json:"name"` Payload map[string]any `json:"payload"` } type taskRequestResponse struct { Tasks []task `json:"tasks"` PullFrequency string `json:"pull_frequency"` Truncate bool `json:"truncate"` StopIndexing bool `json:"stop_indexing"` } type ProcessResult struct { Interval time.Duration // 0 means keep the current interval StopIndexing bool } func Process(ctx context.Context, body []byte, s *server.IndexServer) ProcessResult { var tr taskRequestResponse err := json.Unmarshal(body, &tr) if err != nil { slog.Error("error in unmarshaling taskRequestResponse", "err", err, "body", string(body[:min(len(body), 100)])) return ProcessResult{} } if tr.Truncate { func() { err := s.IndexingLock.LockAll() defer s.IndexingLock.UnlockAll() fileCleaner := file_cleaner.NewFileCleaner(s.IndexBuilder.GetIndexDir(), s.IndexingLock) if err != nil { cleanNodeUUID(fileCleaner) } err = fileCleaner.TruncateExceptUUID() if err != nil { cleanNodeUUID(fileCleaner) } }() } for _, task := range tr.Tasks { switch task.Name { case "index": payload := task.Payload indexReq, err := createIndexRequest(payload) if err != nil { slog.Error("error in creating IndexRequest", "err", err) continue } repoID := indexReq.RepoID if !s.IndexingLock.TryLock(repoID) { slog.Warn("indexing is already in progress for RepoID", "repoId", repoID, "task", task.Name) continue } go func() { //nolint:contextcheck defer s.IndexingLock.Unlock(repoID) callbackCtx := correlation.ContextWithCorrelation(context.Background(), correlation.ExtractFromContextOrGenerate(ctx)) errIndex := s.IndexBuilder.IndexRepository( ctx, indexReq, callback.CallbackFunc{ OnSuccess: func(params callback.CallbackParams) { s.CallbackAPI.SendSuccess(callbackCtx, params, s.IndexBuilder.GetIndexDir(), indexReq.RepoID) }, OnFailure: func(params callback.CallbackParams, errorReason error) { s.CallbackAPI.SendFailure(callbackCtx, params, s.IndexBuilder.GetIndexDir(), indexReq.RepoID, errorReason) }, }, ) if errIndex != nil { slog.Error("error while indexing", "repoId", repoID, "err", errIndex) return } }() case "delete": payload := task.Payload deleteReq, err := deleteRequest(payload) if err != nil { slog.Error("error in creating DeleteRequest", "err", err) continue } repoID := deleteReq.RepoID if !s.IndexingLock.TryLock(repoID) { slog.Warn("indexing is already in progress for RepoID", "repoId", repoID, "task", task.Name) continue } go func() { defer s.IndexingLock.Unlock(repoID) errDelete := s.IndexBuilder.DeleteRepository( deleteReq, callback.CallbackFunc{ OnSuccess: func(params callback.CallbackParams) { s.CallbackAPI.SendSuccess(ctx, params, s.IndexBuilder.GetIndexDir(), repoID) }, OnFailure: func(params callback.CallbackParams, errorReason error) { s.CallbackAPI.SendFailure(ctx, params, s.IndexBuilder.GetIndexDir(), repoID, errorReason) }, }, s.IndexingLock, ) if errDelete != nil { slog.Error("error while deleting", "repoId", repoID, "err", errDelete) return } }() } } var pullFrequency time.Duration if tr.PullFrequency != "" { interval, err := time.ParseDuration(tr.PullFrequency) if err == nil { pullFrequency = interval } } return ProcessResult{ Interval: pullFrequency, StopIndexing: tr.StopIndexing, } } func createIndexRequest(payload map[string]any) (server.IndexRequest, error) { var indexReq server.IndexRequest payloadJSON, err := json.Marshal(payload) if err != nil { return indexReq, err } if err := json.Unmarshal(payloadJSON, &indexReq); err != nil { return indexReq, err } return indexReq, nil } func deleteRequest(payload map[string]any) (server.DeleteRequest, error) { var deleteReq server.DeleteRequest payloadJSON, err := json.Marshal(payload) if err != nil { return deleteReq, err } if err := json.Unmarshal(payloadJSON, &deleteReq); err != nil { return deleteReq, err } return deleteReq, nil } func cleanNodeUUID(fileCleaner *file_cleaner.FileCleaner) { err := fileCleaner.RemoveNodeUUID() if err != nil { slog.Error("error while removing node.uuid", "err", err) panic(err) } }