-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: unable to download json results
- Loading branch information
Showing
2 changed files
with
245 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
package export | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"github.com/ReneKroon/ttlcache" | ||
"github.com/apex/log" | ||
"github.com/crawlab-team/crawlab-core/constants" | ||
"github.com/crawlab-team/crawlab-core/entity" | ||
"github.com/crawlab-team/crawlab-core/interfaces" | ||
"github.com/crawlab-team/crawlab-core/utils" | ||
"github.com/crawlab-team/crawlab-db/mongo" | ||
"github.com/crawlab-team/go-trace" | ||
"github.com/hashicorp/go-uuid" | ||
mongo2 "go.mongodb.org/mongo-driver/mongo" | ||
"os" | ||
"path" | ||
"time" | ||
) | ||
|
||
type JsonService struct { | ||
cache *ttlcache.Cache | ||
} | ||
|
||
func (svc *JsonService) GenerateId() (exportId string, err error) { | ||
exportId, err = uuid.GenerateUUID() | ||
if err != nil { | ||
return "", trace.TraceError(err) | ||
} | ||
return exportId, nil | ||
} | ||
|
||
func (svc *JsonService) Export(exportType, target string, filter interfaces.Filter) (exportId string, err error) { | ||
// generate export id | ||
exportId, err = svc.GenerateId() | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
// export | ||
export := &entity.Export{ | ||
Id: exportId, | ||
Type: exportType, | ||
Target: target, | ||
Filter: filter, | ||
Status: constants.TaskStatusRunning, | ||
StartTs: time.Now(), | ||
FileName: svc.getFileName(exportId), | ||
DownloadPath: svc.getDownloadPath(exportId), | ||
Limit: 100, | ||
} | ||
|
||
// save to cache | ||
svc.cache.Set(exportId, export) | ||
|
||
// execute export | ||
go svc.export(export) | ||
|
||
return exportId, nil | ||
} | ||
|
||
func (svc *JsonService) GetExport(exportId string) (export interfaces.Export, err error) { | ||
// get export from cache | ||
res, ok := svc.cache.Get(exportId) | ||
if !ok { | ||
return nil, trace.TraceError(errors.New("export not found")) | ||
} | ||
export = res.(interfaces.Export) | ||
return export, nil | ||
} | ||
|
||
func (svc *JsonService) export(export *entity.Export) { | ||
// check empty | ||
if export.Target == "" { | ||
err := errors.New("empty target") | ||
export.Status = constants.TaskStatusError | ||
export.EndTs = time.Now() | ||
log.Errorf("export error (id: %s): %v", export.Id, err) | ||
trace.PrintError(err) | ||
svc.cache.Set(export.Id, export) | ||
return | ||
} | ||
|
||
// mongo collection | ||
col := mongo.GetMongoCol(export.Target) | ||
|
||
// mongo query | ||
query, err := utils.FilterToQuery(export.Filter) | ||
if err != nil { | ||
export.Status = constants.TaskStatusError | ||
export.EndTs = time.Now() | ||
log.Errorf("export error (id: %s): %v", export.Id, err) | ||
trace.PrintError(err) | ||
svc.cache.Set(export.Id, export) | ||
return | ||
} | ||
|
||
// mongo cursor | ||
cur := col.Find(query, nil).GetCursor() | ||
|
||
// data | ||
var jsonData []interface{} | ||
|
||
// iterate cursor | ||
i := 0 | ||
for { | ||
// increment counter | ||
i++ | ||
|
||
// check error | ||
err := cur.Err() | ||
if err != nil { | ||
if err != mongo2.ErrNoDocuments { | ||
// error | ||
export.Status = constants.TaskStatusError | ||
export.EndTs = time.Now() | ||
log.Errorf("export error (id: %s): %v", export.Id, err) | ||
trace.PrintError(err) | ||
} else { | ||
// no more data | ||
export.Status = constants.TaskStatusFinished | ||
export.EndTs = time.Now() | ||
log.Infof("export finished (id: %s)", export.Id) | ||
} | ||
svc.cache.Set(export.Id, export) | ||
return | ||
} | ||
|
||
// has data | ||
if !cur.Next(context.Background()) { | ||
// no more data | ||
export.Status = constants.TaskStatusFinished | ||
export.EndTs = time.Now() | ||
log.Infof("export finished (id: %s)", export.Id) | ||
svc.cache.Set(export.Id, export) | ||
break | ||
} | ||
|
||
// convert raw data to entity | ||
var data map[string]interface{} | ||
err = cur.Decode(&data) | ||
if err != nil { | ||
// error | ||
export.Status = constants.TaskStatusError | ||
export.EndTs = time.Now() | ||
log.Errorf("export error (id: %s): %v", export.Id, err) | ||
trace.PrintError(err) | ||
svc.cache.Set(export.Id, export) | ||
return | ||
} | ||
|
||
jsonData = append(jsonData, data) | ||
} | ||
|
||
jsonBytes, err := json.Marshal(jsonData) | ||
if err != nil { | ||
// error | ||
export.Status = constants.TaskStatusError | ||
export.EndTs = time.Now() | ||
log.Errorf("export error (id: %s): %v", export.Id, err) | ||
trace.PrintError(err) | ||
svc.cache.Set(export.Id, export) | ||
return | ||
} | ||
jsonString := string(jsonBytes) | ||
f := utils.OpenFile(export.DownloadPath) | ||
_, err = f.WriteString(jsonString) | ||
if err != nil { | ||
// error | ||
export.Status = constants.TaskStatusError | ||
export.EndTs = time.Now() | ||
log.Errorf("export error (id: %s): %v", export.Id, err) | ||
trace.PrintError(err) | ||
svc.cache.Set(export.Id, export) | ||
return | ||
} | ||
} | ||
|
||
func (svc *JsonService) getExportDir() (dir string, err error) { | ||
tempDir := os.TempDir() | ||
exportDir := path.Join(tempDir, "export", "json") | ||
if !utils.Exists(exportDir) { | ||
err := os.MkdirAll(exportDir, 0755) | ||
if err != nil { | ||
return "", err | ||
} | ||
} | ||
return exportDir, nil | ||
} | ||
|
||
func (svc *JsonService) getFileName(exportId string) (fileName string) { | ||
return exportId + "_" + time.Now().Format("20060102150405") + ".json" | ||
} | ||
|
||
// getDownloadPath returns the download path for the export | ||
// format: <tempDir>/export/<exportId>/<exportId>_<timestamp>.csv | ||
func (svc *JsonService) getDownloadPath(exportId string) (downloadPath string) { | ||
exportDir, err := svc.getExportDir() | ||
if err != nil { | ||
return "" | ||
} | ||
downloadPath = path.Join(exportDir, svc.getFileName(exportId)) | ||
return downloadPath | ||
} | ||
|
||
func NewJsonService() (svc2 interfaces.ExportService) { | ||
cache := ttlcache.NewCache() | ||
cache.SetTTL(time.Minute * 5) | ||
svc := &JsonService{ | ||
cache: cache, | ||
} | ||
return svc | ||
} | ||
|
||
var _jsonService interfaces.ExportService | ||
|
||
func GetJsonService() (svc interfaces.ExportService) { | ||
if _jsonService == nil { | ||
_jsonService = NewJsonService() | ||
} | ||
return _jsonService | ||
} |