in internal/task_request_response/task_request_response.go [32:140]
// Process decodes a task-request response body and acts on its contents.
//
// body is unmarshaled as JSON into a taskRequestResponse. When decoding fails,
// the error is logged together with at most the first 100 bytes of body and a
// zero ProcessResult is returned. Otherwise Process:
//
//   - truncates the index directory (via a file cleaner) when Truncate is set;
//   - starts one goroutine per "index" or "delete" task whose repository lock
//     could be taken with TryLock; tasks with any other name are ignored and
//     tasks whose request cannot be built or whose lock is busy are skipped;
//   - returns the parsed PullFrequency (zero when empty or invalid) and the
//     StopIndexing flag.
//
// Process does not wait for the goroutines it starts; each one releases its
// repository lock when it finishes.
func Process(ctx context.Context, body []byte, s *server.IndexServer) ProcessResult {
	var tr taskRequestResponse
	if err := json.Unmarshal(body, &tr); err != nil {
		// Only a bounded prefix of the body is logged.
		slog.Error("error in unmarshaling taskRequestResponse", "err", err, "body", string(body[:min(len(body), 100)]))
		return ProcessResult{}
	}

	if tr.Truncate {
		// Wrapped in a closure so the deferred UnlockAll runs before any task
		// goroutine below tries to take a per-repository lock.
		func() {
			// NOTE(review): UnlockAll is deferred and the truncate is still
			// attempted when LockAll returns an error — confirm this is intended.
			err := s.IndexingLock.LockAll()
			defer s.IndexingLock.UnlockAll()
			fileCleaner := file_cleaner.NewFileCleaner(s.IndexBuilder.GetIndexDir(), s.IndexingLock)
			if err != nil {
				cleanNodeUUID(fileCleaner)
			}
			err = fileCleaner.TruncateExceptUUID()
			if err != nil {
				cleanNodeUUID(fileCleaner)
			}
		}()
	}

	for _, task := range tr.Tasks {
		switch task.Name {
		case "index":
			indexReq, err := createIndexRequest(task.Payload)
			if err != nil {
				slog.Error("error in creating IndexRequest", "err", err)
				continue
			}
			repoID := indexReq.RepoID
			if !s.IndexingLock.TryLock(repoID) {
				slog.Warn("indexing is already in progress for RepoID", "repoId", repoID, "task", task.Name)
				continue
			}
			go func() { //nolint:contextcheck
				defer s.IndexingLock.Unlock(repoID)
				// Callbacks run on a context detached from ctx that carries
				// only the correlation ID.
				callbackCtx := correlation.ContextWithCorrelation(context.Background(), correlation.ExtractFromContextOrGenerate(ctx))
				errIndex := s.IndexBuilder.IndexRepository(
					ctx,
					indexReq,
					callback.CallbackFunc{
						OnSuccess: func(params callback.CallbackParams) {
							s.CallbackAPI.SendSuccess(callbackCtx, params, s.IndexBuilder.GetIndexDir(), indexReq.RepoID)
						},
						OnFailure: func(params callback.CallbackParams, errorReason error) {
							s.CallbackAPI.SendFailure(callbackCtx, params, s.IndexBuilder.GetIndexDir(), indexReq.RepoID, errorReason)
						},
					},
				)
				if errIndex != nil {
					slog.Error("error while indexing", "repoId", repoID, "err", errIndex)
				}
			}()
		case "delete":
			deleteReq, err := deleteRequest(task.Payload)
			if err != nil {
				slog.Error("error in creating DeleteRequest", "err", err)
				continue
			}
			repoID := deleteReq.RepoID
			if !s.IndexingLock.TryLock(repoID) {
				slog.Warn("indexing is already in progress for RepoID", "repoId", repoID, "task", task.Name)
				continue
			}
			go func() { //nolint:contextcheck
				defer s.IndexingLock.Unlock(repoID)
				// Same detached callback context as the "index" task: this
				// goroutine outlives Process, so the callbacks must not depend
				// on the caller's ctx still being live.
				callbackCtx := correlation.ContextWithCorrelation(context.Background(), correlation.ExtractFromContextOrGenerate(ctx))
				errDelete := s.IndexBuilder.DeleteRepository(
					deleteReq,
					callback.CallbackFunc{
						OnSuccess: func(params callback.CallbackParams) {
							s.CallbackAPI.SendSuccess(callbackCtx, params, s.IndexBuilder.GetIndexDir(), repoID)
						},
						OnFailure: func(params callback.CallbackParams, errorReason error) {
							s.CallbackAPI.SendFailure(callbackCtx, params, s.IndexBuilder.GetIndexDir(), repoID, errorReason)
						},
					},
					s.IndexingLock,
				)
				if errDelete != nil {
					slog.Error("error while deleting", "repoId", repoID, "err", errDelete)
				}
			}()
		}
	}

	var pullFrequency time.Duration
	if tr.PullFrequency != "" {
		interval, err := time.ParseDuration(tr.PullFrequency)
		if err != nil {
			// An unparsable value still yields a zero interval, but is no
			// longer dropped without a trace.
			slog.Warn("ignoring invalid pull frequency", "pullFrequency", tr.PullFrequency, "err", err)
		} else {
			pullFrequency = interval
		}
	}

	return ProcessResult{
		Interval:     pullFrequency,
		StopIndexing: tr.StopIndexing,
	}
}