func Process()

in internal/task_request_response/task_request_response.go [32:140]


// Process decodes a JSON-encoded taskRequestResponse from body and acts on it.
//
// The steps, in order:
//   - If the payload cannot be unmarshaled, the error is logged (with at most
//     the first 100 bytes of body) and a zero ProcessResult is returned.
//   - If Truncate is set, all indexing locks are taken and the file cleaner's
//     TruncateExceptUUID is run synchronously before any task is started.
//   - Every "index" and "delete" task is started in its own goroutine, guarded
//     by a per-repository TryLock. A task whose payload cannot be converted,
//     or whose repository is already locked, is logged and skipped. Tasks with
//     any other name are ignored.
//   - PullFrequency, when non-empty and parseable by time.ParseDuration, is
//     returned as ProcessResult.Interval; otherwise Interval is zero.
//
// Process does not wait for the started goroutines; their outcome is reported
// through s.CallbackAPI and the log only.
func Process(ctx context.Context, body []byte, s *server.IndexServer) ProcessResult {
	var tr taskRequestResponse
	if err := json.Unmarshal(body, &tr); err != nil {
		slog.Error("error in unmarshaling taskRequestResponse", "err", err, "body", string(body[:min(len(body), 100)]))
		return ProcessResult{}
	}

	if tr.Truncate {
		// Wrapped in a closure so the deferred UnlockAll runs before any task
		// below tries to take a per-repository lock.
		func() {
			err := s.IndexingLock.LockAll()
			defer s.IndexingLock.UnlockAll()

			fileCleaner := file_cleaner.NewFileCleaner(s.IndexBuilder.GetIndexDir(), s.IndexingLock)
			if err != nil {
				slog.Error("error while locking all repositories for truncate", "err", err)
				cleanNodeUUID(fileCleaner)
			}

			// NOTE(review): truncation is still attempted when LockAll failed;
			// this mirrors the existing behavior — confirm it is intended.
			if err = fileCleaner.TruncateExceptUUID(); err != nil {
				slog.Error("error while truncating", "err", err)
				cleanNodeUUID(fileCleaner)
			}
		}()
	}

	for _, task := range tr.Tasks {
		switch task.Name {
		case "index":
			indexReq, err := createIndexRequest(task.Payload)
			if err != nil {
				slog.Error("error in creating IndexRequest", "err", err)
				continue
			}

			repoID := indexReq.RepoID
			if !s.IndexingLock.TryLock(repoID) {
				slog.Warn("indexing is already in progress for RepoID", "repoId", repoID, "task", task.Name)
				continue
			}

			go func() { //nolint:contextcheck
				defer s.IndexingLock.Unlock(repoID)

				// Callbacks get a context rooted at context.Background() so
				// they are not tied to the cancellation of ctx; only the
				// correlation value is carried over from ctx.
				callbackCtx := correlation.ContextWithCorrelation(context.Background(), correlation.ExtractFromContextOrGenerate(ctx))
				errIndex := s.IndexBuilder.IndexRepository(
					ctx,
					indexReq,
					callback.CallbackFunc{
						OnSuccess: func(params callback.CallbackParams) {
							s.CallbackAPI.SendSuccess(callbackCtx, params, s.IndexBuilder.GetIndexDir(), repoID)
						},
						OnFailure: func(params callback.CallbackParams, errorReason error) {
							s.CallbackAPI.SendFailure(callbackCtx, params, s.IndexBuilder.GetIndexDir(), repoID, errorReason)
						},
					},
				)
				if errIndex != nil {
					slog.Error("error while indexing", "repoId", repoID, "err", errIndex)
				}
			}()
		case "delete":
			deleteReq, err := deleteRequest(task.Payload)
			if err != nil {
				slog.Error("error in creating DeleteRequest", "err", err)
				continue
			}

			repoID := deleteReq.RepoID
			if !s.IndexingLock.TryLock(repoID) {
				slog.Warn("indexing is already in progress for RepoID", "repoId", repoID, "task", task.Name)
				continue
			}

			go func() { //nolint:contextcheck
				defer s.IndexingLock.Unlock(repoID)

				// Same detached callback context as the "index" branch: this
				// goroutine outlives Process, so the callbacks must not depend
				// on ctx still being alive.
				callbackCtx := correlation.ContextWithCorrelation(context.Background(), correlation.ExtractFromContextOrGenerate(ctx))
				errDelete := s.IndexBuilder.DeleteRepository(
					deleteReq,
					callback.CallbackFunc{
						OnSuccess: func(params callback.CallbackParams) {
							s.CallbackAPI.SendSuccess(callbackCtx, params, s.IndexBuilder.GetIndexDir(), repoID)
						},
						OnFailure: func(params callback.CallbackParams, errorReason error) {
							s.CallbackAPI.SendFailure(callbackCtx, params, s.IndexBuilder.GetIndexDir(), repoID, errorReason)
						},
					},
					s.IndexingLock,
				)
				if errDelete != nil {
					slog.Error("error while deleting", "repoId", repoID, "err", errDelete)
				}
			}()
		}
	}

	var pullFrequency time.Duration
	if tr.PullFrequency != "" {
		interval, err := time.ParseDuration(tr.PullFrequency)
		if err != nil {
			// An unparseable value leaves Interval at zero, as before; it is
			// now reported instead of being dropped silently.
			slog.Warn("error in parsing pull frequency", "err", err, "pullFrequency", tr.PullFrequency)
		} else {
			pullFrequency = interval
		}
	}

	return ProcessResult{
		Interval:     pullFrequency,
		StopIndexing: tr.StopIndexing,
	}
}