search-meilisearch/sync.go (65 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package meilisearch
import (
"context"
"github.com/apache/answer/plugin"
"github.com/segmentfault/pacman/log"
)
// Paging and batching limits used while syncing existing data to meilisearch.
const (
// MaxGetPageSize is the page size requested from the syncer on every
// page fetch.
MaxGetPageSize = 1000
// MaxPutPerSize is the maximum number of documents sent to meilisearch
// in a single AddDocuments call.
MaxPutPerSize = 100
)
// sync copies the question and answer data that already exists in Answer
// into meilisearch.
//
// Only one sync is allowed to run at a time: a call made while another sync
// is flagged as running logs a warning and returns without doing any work.
// Nothing is returned to the caller; syncQuestionAndAnswerData has no return
// value, so this function cannot observe whether an individual sync succeeded.
func (s *Search) sync(ctx context.Context) {
	log.Infof("start to sync question data to meilisearch")
	// Early exit so callers do not queue up behind the lock while a sync is
	// already in progress.
	// NOTE(review): this read is not protected by s.lock, so it races with
	// the writes below; confirm whether that is acceptable or whether the
	// flag should become an atomic.
	if s.syncing {
		log.Warnf("syncing is running, skip")
		return
	}

	// Questions are synced first, then answers.
	syncFns := []func(ctx context.Context, page, pageSize int) (answerList []*plugin.SearchContent, err error){
		s.syncer.GetQuestionsPage,
		s.syncer.GetAnswersPage,
	}

	s.lock.Lock()
	defer s.lock.Unlock()
	// Re-check now that the lock is held.
	if s.syncing {
		log.Warnf("syncing is running, skip")
		return
	}
	s.syncing = true
	// Clear the flag through defer so that a panic inside a syncer or the
	// meilisearch client cannot leave it stuck at true, which would make every
	// later call skip. Deferred calls run in LIFO order, so the flag is still
	// cleared before the lock is released.
	defer func() { s.syncing = false }()

	for _, fn := range syncFns {
		s.syncQuestionAndAnswerData(ctx, fn)
	}
}
// syncQuestionAndAnswerData pages through syncFunc and adds every record it
// returns to the configured meilisearch index.
//
// Pages are requested starting at page 1 with MaxGetPageSize records each,
// until syncFunc returns an empty page. Every page is uploaded in batches of
// at most MaxPutPerSize documents. The function stops at the first error from
// syncFunc or AddDocuments; errors are logged and never returned.
func (s *Search) syncQuestionAndAnswerData(ctx context.Context,
	syncFunc func(ctx context.Context, page, pageSize int) (answerList []*plugin.SearchContent, err error)) {
	log.Infof("start to sync data to meilisearch")
	for page, pageSize := 1, MaxGetPageSize; ; page++ {
		log.Infof("start to sync page %d", page)
		dataList, err := syncFunc(ctx, page, pageSize)
		if err != nil {
			log.Errorf("get data failed %s", err)
			return
		}
		if len(dataList) == 0 {
			log.Infof("get page %d success, no other data, stop sync", page)
			break
		}
		log.Infof("get page %d success, record count %d", page, len(dataList))

		// Advance by MaxPutPerSize, the same value used for the batch end, so
		// batches stay contiguous if the constant is ever changed.
		for i := 0; i < len(dataList); i += MaxPutPerSize {
			end := i + MaxPutPerSize
			if end > len(dataList) {
				end = len(dataList)
			}
			resp, err := s.Client.Index(s.Config.IndexName).AddDocuments(
				dataList[i:end], primaryKey)
			if err != nil {
				log.Errorf("add documents failed %s", err)
				return
			}
			// A failed wait is only logged; the sync carries on with the
			// next batch.
			if err := waitForTask(s.Client, resp); err != nil {
				log.Errorf("wait for task failed %s", err)
			}
			log.Infof("sync page %d, progress %d/%d", page, end, len(dataList))
		}
	}
}