Skip to content

Commit b4e0dac

Browse files
authored
feat: add async reindexing for large dataset (#94)
* feat: add async reindexing for large dataset * fix: respect wait_for_completion param
1 parent fac6e6b commit b4e0dac

File tree

3 files changed

+152
-34
lines changed

3 files changed

+152
-34
lines changed

plugins/reindexer/dao.go

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package reindexer
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"regexp"
9+
"time"
810

911
log "github.com/sirupsen/logrus"
1012

@@ -13,6 +15,35 @@ import (
1315
es7 "github.com/olivere/elastic/v7"
1416
)
1517

18+
func postReIndex(ctx context.Context, sourceIndex, newIndexName string) error {
19+
// Fetch all the aliases of old index
20+
alias, err := aliasesOf(ctx, sourceIndex)
21+
22+
var aliases = []string{}
23+
if err != nil {
24+
return errors.New(`error fetching aliases of index ` + sourceIndex + "\n" + err.Error())
25+
}
26+
27+
if alias == "" {
28+
aliases = append(aliases, sourceIndex)
29+
} else {
30+
aliases = append(aliases, alias)
31+
}
32+
33+
// Delete old index
34+
err = deleteIndex(ctx, sourceIndex)
35+
if err != nil {
36+
return errors.New(`error deleting source index ` + sourceIndex + "\n" + err.Error())
37+
}
38+
// Set aliases of old index to the new index.
39+
err = setAlias(ctx, newIndexName, aliases...)
40+
if err != nil {
41+
return errors.New(`error setting alias for ` + newIndexName + "\n" + err.Error())
42+
}
43+
44+
return nil
45+
}
46+
1647
// Reindex Inplace: https://www.elastic.co/guide/en/elasticsearch/reference/current/reindex-upgrade-inplace.html
1748
//
1849
// 1. Create a new index and copy the mappings and settings from the old index.
@@ -116,53 +147,31 @@ func reindex(ctx context.Context, sourceIndex string, config *reindexConfig, wai
116147
Destination(dest).
117148
WaitForCompletion(waitForCompletion)
118149

119-
// If wait_for_completion = true, then we carry out the task synchronously along with three more steps:
120-
// - fetch any aliases of the old index
121-
// - delete the old index
122-
// - set the aliases of the old index to the new index
123150
if waitForCompletion {
124151
response, err := reindex.Do(ctx)
125152
if err != nil {
126153
return nil, err
127154
}
128155

129156
if destinationIndex == "" {
130-
// Fetch all the aliases of old index
131-
alias, err := aliasesOf(ctx, sourceIndex)
132-
133-
var aliases = []string{}
157+
err = postReIndex(ctx, sourceIndex, newIndexName)
134158
if err != nil {
135-
return nil, fmt.Errorf(`error fetching aliases of index "%s": %v`, sourceIndex, err)
136-
}
137-
138-
if alias == "" {
139-
aliases = append(aliases, sourceIndex)
140-
} else {
141-
aliases = append(aliases, alias)
142-
}
143-
144-
// Delete old index
145-
err = deleteIndex(ctx, sourceIndex)
146-
if err != nil {
147-
return nil, fmt.Errorf(`error deleting index "%s": %v\n`, sourceIndex, err)
148-
}
149-
// Set aliases of old index to the new index.
150-
err = setAlias(ctx, newIndexName, aliases...)
151-
if err != nil {
152-
return nil, fmt.Errorf(`error setting alias "%s" for index "%s"`, sourceIndex, newIndexName)
159+
return nil, err
153160
}
154161
}
155162

156163
return json.Marshal(response)
157164
}
158-
159-
// If wait_for_completion = false, we carry out the reindexing asynchronously and return the task ID.
165+
// If wait_for_completion = false, we carry out the re-indexing asynchronously and return the task ID.
166+
log.Println(logTag, fmt.Sprintf(" Data is > %d so using async reindex", IndexStoreSize))
160167
response, err := reindex.DoAsync(context.Background())
161168
if err != nil {
162169
return nil, err
163170
}
164171
taskID := response.TaskId
165172

173+
go asyncReIndex(taskID, sourceIndex, newIndexName)
174+
166175
// Get the reindex task by ID
167176
task, err := util.GetClient7().TasksGetTask().TaskId(taskID).Do(context.Background())
168177
if err != nil {
@@ -383,3 +392,59 @@ func getAliasIndexMap(ctx context.Context) (map[string]string, error) {
383392

384393
return res, nil
385394
}
395+
396+
func getIndexSize(ctx context.Context, indexName string) (int64, error) {
397+
var res int64
398+
index := classify.GetAliasIndex(indexName)
399+
if index == "" {
400+
index = indexName
401+
}
402+
stats, err := util.GetClient7().IndexStats(indexName).Do(ctx)
403+
if err != nil {
404+
return res, err
405+
}
406+
res = stats.Indices[index].Primaries.Store.SizeInBytes
407+
return res, nil
408+
}
409+
410+
func isTaskCompleted(ctx context.Context, taskID string) (bool, error) {
411+
res := false
412+
413+
status, err := util.GetClient7().TasksGetTask().TaskId(taskID).Do(ctx)
414+
if err != nil {
415+
log.Errorln(logTag, " Get task status error", err)
416+
return res, err
417+
}
418+
419+
res = status.Completed
420+
return res, nil
421+
}
422+
423+
// go routine to track async re-indexing process for a given source and destination index.
424+
// it checks every 30s if task is completed or not.
425+
func asyncReIndex(taskID, source, destination string) {
426+
SetCurrentProcess(taskID, source, destination)
427+
isCompleted := make(chan bool, 1)
428+
ticker := time.Tick(30 * time.Second)
429+
ctx := context.Background()
430+
431+
for {
432+
select {
433+
case <-ticker:
434+
ok, _ := isTaskCompleted(ctx, taskID)
435+
log.Println(logTag, " "+taskID+" task is still re-indexing data...")
436+
if ok {
437+
isCompleted <- true
438+
}
439+
case <-isCompleted:
440+
log.Println(logTag, taskID+" task completed successfully")
441+
// remove process from current cache
442+
RemoveCurrentProcess(taskID)
443+
err := postReIndex(ctx, source, destination)
444+
if err != nil {
445+
log.Errorln(logTag, " post re-indexing error: ", err)
446+
}
447+
return
448+
}
449+
}
450+
}

plugins/reindexer/handlers.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package reindexer
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"io/ioutil"
67
"net/http"
78
"strconv"
@@ -28,8 +29,11 @@ func (rx *reindexer) reindex() http.HandlerFunc {
2829
if checkVar(ok, w, "index") {
2930
return
3031
}
31-
32-
err, body, waitForCompletion, done := reindexConfigResponse(req, w)
32+
if IsReIndexInProcess(indexName, "") {
33+
util.WriteBackError(w, fmt.Sprintf(`Re-indexing is already in progress for %s index`, indexName), http.StatusInternalServerError)
34+
return
35+
}
36+
err, body, waitForCompletion, done := reindexConfigResponse(req, w, indexName)
3337
if done {
3438
return
3539
}
@@ -50,7 +54,7 @@ func (rx *reindexer) reindexSrcToDest() http.HandlerFunc {
5054
if checkVar(okD, w, "destination_index") {
5155
return
5256
}
53-
err, body, waitForCompletion, done := reindexConfigResponse(req, w)
57+
err, body, waitForCompletion, done := reindexConfigResponse(req, w, sourceIndex)
5458
if done {
5559
return
5660
}
@@ -91,7 +95,7 @@ func checkVar(okS bool, w http.ResponseWriter, variable string) bool {
9195
return false
9296
}
9397

94-
func reindexConfigResponse(req *http.Request, w http.ResponseWriter) (error, reindexConfig, bool, bool) {
98+
func reindexConfigResponse(req *http.Request, w http.ResponseWriter, sourceIndex string) (error, reindexConfig, bool, bool) {
9599
reqBody, err := ioutil.ReadAll(req.Body)
96100
if err != nil {
97101
log.Errorln(logTag, ":", err)
@@ -108,10 +112,21 @@ func reindexConfigResponse(req *http.Request, w http.ResponseWriter) (error, rei
108112
return nil, reindexConfig{}, false, true
109113
}
110114

111-
// By default, wait_for_completion = true
115+
// By default, wait_for_completion depends on size of index
112116
param := req.URL.Query().Get("wait_for_completion")
113117
if param == "" {
114-
param = "true"
118+
// Get the size of currentIndex, if that is > IndexStoreSize (5MB - 5000000 Bytes) then do async re-indexing.
119+
size, err := getIndexSize(req.Context(), sourceIndex)
120+
if err != nil {
121+
log.Errorln(logTag, ":", err)
122+
util.WriteBackError(w, "Unable to get the size of "+sourceIndex, http.StatusBadRequest)
123+
return nil, reindexConfig{}, false, true
124+
}
125+
if size > IndexStoreSize {
126+
param = "false"
127+
} else {
128+
param = "true"
129+
}
115130
}
116131
waitForCompletion, err := strconv.ParseBool(param)
117132
if err != nil {

plugins/reindexer/util.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"regexp"
77
"strconv"
88
"strings"
9+
"sync"
910

1011
"github.com/appbaseio/arc/middleware/classify"
1112
log "github.com/sirupsen/logrus"
@@ -26,6 +27,15 @@ type AliasedIndices struct {
2627
PriStoreSize string `json:"pri.store.size"`
2728
}
2829

30+
// CurrentlyReIndexingProcess maps the task ID of each re-indexing process that is in progress to its [source, destination] index names.
31+
var CurrentlyReIndexingProcess = make(map[string][]string)
32+
33+
// CurrentlyReIndexingProcessMutex guards concurrent access to the CurrentlyReIndexingProcess map.
34+
var CurrentlyReIndexingProcessMutex = sync.RWMutex{}
35+
36+
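// IndexStoreSize is the index size threshold, in bytes, used to decide between sync and async re-indexing when wait_for_completion is not given: larger indexes are re-indexed asynchronously.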
37+
const IndexStoreSize = int64(5000000)
38+
2939
// reindexedName calculates from the name the number of times an index has been
3040
// reindexed to generate the successive name for the index. For example: for an
3141
// index named "twitter", the function returns "twitter_reindexed_1", and for an
@@ -76,3 +86,31 @@ func InitAliasIndexCache() {
7686
classify.SetAliasIndexCache(aliasIndexMap)
7787
log.Println(logTag, "=> Alias Index Cache", classify.GetAliasIndexCache())
7888
}
89+
90+
// SetCurrentProcess set indexes for current reindexing process
91+
func SetCurrentProcess(taskID, source, destination string) {
92+
CurrentlyReIndexingProcessMutex.Lock()
93+
CurrentlyReIndexingProcess[taskID] = []string{source, destination}
94+
CurrentlyReIndexingProcessMutex.Unlock()
95+
}
96+
97+
// RemoveCurrentProcess remove indexes for current reindexing process
98+
func RemoveCurrentProcess(taskID string) {
99+
CurrentlyReIndexingProcessMutex.Lock()
100+
delete(CurrentlyReIndexingProcess, taskID)
101+
CurrentlyReIndexingProcessMutex.Unlock()
102+
}
103+
104+
// IsReIndexInProcess check if index is Processing currently
105+
func IsReIndexInProcess(source, destination string) bool {
106+
for _, processingIndexes := range CurrentlyReIndexingProcess {
107+
if processingIndexes[0] == source || processingIndexes[0] == destination {
108+
return true
109+
}
110+
if processingIndexes[1] == source || processingIndexes[1] == destination {
111+
return true
112+
}
113+
}
114+
115+
return false
116+
}

0 commit comments

Comments
 (0)