From 0f0efac866e673e33a55eb3cdc5e22e85802ba34 Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Sat, 9 Jul 2022 10:42:30 -0700 Subject: [PATCH] fix update, using raw flux script. --- .../pkg/database/scrutiny_repository_tasks.go | 63 ++++--- .../scrutiny_repository_tasks_test.go | 159 ++++++++++-------- 2 files changed, 122 insertions(+), 100 deletions(-) diff --git a/webapp/backend/pkg/database/scrutiny_repository_tasks.go b/webapp/backend/pkg/database/scrutiny_repository_tasks.go index 92b67f4..82b6040 100644 --- a/webapp/backend/pkg/database/scrutiny_repository_tasks.go +++ b/webapp/backend/pkg/database/scrutiny_repository_tasks.go @@ -11,10 +11,10 @@ import ( //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func (sr *scrutinyRepository) EnsureTasks(ctx context.Context, orgID string) error { weeklyTaskName := "tsk-weekly-aggr" - weeklyTaskScript := sr.DownsampleScript("weekly") + weeklyTaskScript := sr.DownsampleScript("weekly", weeklyTaskName, "0 1 * * 0") if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: weeklyTaskName}); findErr == nil && len(found) == 0 { //weekly on Sunday at 1:00am - _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, weeklyTaskName, weeklyTaskScript, "0 1 * * 0", orgID) + _, err := sr.influxTaskApi.CreateTaskByFlux(ctx, weeklyTaskScript, orgID) if err != nil { return err } @@ -32,10 +32,10 @@ func (sr *scrutinyRepository) EnsureTasks(ctx context.Context, orgID string) err } monthlyTaskName := "tsk-monthly-aggr" - monthlyTaskScript := sr.DownsampleScript("monthly") + monthlyTaskScript := sr.DownsampleScript("monthly", monthlyTaskName, "30 1 1 * *") if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: monthlyTaskName}); findErr == nil && len(found) == 0 { //monthly on first day of the month at 1:30am - _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, monthlyTaskName, monthlyTaskScript, "30 1 1 * *", orgID) + _, err := sr.influxTaskApi.CreateTaskByFlux(ctx, monthlyTaskScript, orgID) if err != nil { return err } @@ -53,10 +53,10 @@ func (sr *scrutinyRepository) EnsureTasks(ctx context.Context, orgID string) err } yearlyTaskName := "tsk-yearly-aggr" - yearlyTaskScript := sr.DownsampleScript("yearly") + yearlyTaskScript := sr.DownsampleScript("yearly", yearlyTaskName, "0 2 1 1 *") if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: yearlyTaskName}); findErr == nil && len(found) == 0 { //yearly on the first day of the year at 2:00am - _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, yearlyTaskName, yearlyTaskScript, "0 2 1 1 *", orgID) + _, err := sr.influxTaskApi.CreateTaskByFlux(ctx, yearlyTaskScript, orgID) if err != nil { return err } @@ -75,7 +75,7 @@ func (sr *scrutinyRepository) EnsureTasks(ctx context.Context, orgID string) err return nil } -func (sr *scrutinyRepository) DownsampleScript(aggregationType string) string { +func (sr *scrutinyRepository) DownsampleScript(aggregationType string, name string, cron string) string { var sourceBucket string // the source of the data var destBucket string // the destination for the aggregated data var rangeStart string @@ -124,30 +124,37 @@ func (sr *scrutinyRepository) DownsampleScript(aggregationType string) string { */ return fmt.Sprintf(` - sourceBucket = "%s" - rangeStart = %s - rangeEnd = %s - aggWindow = %s - destBucket = "%s" - destOrg = "%s" +option task = { + name: "%s", + cron: "%s", +} + +sourceBucket = "%s" +rangeStart = %s +rangeEnd = %s +aggWindow = %s +destBucket = "%s" +destOrg = "%s" - from(bucket: sourceBucket) - |> range(start: rangeStart, stop: rangeEnd) - |> filter(fn: (r) => r["_measurement"] == "smart" ) - |> group(columns: ["device_wwn", "_field"]) - |> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) - |> to(bucket: destBucket, org: destOrg) +from(bucket: sourceBucket) +|> range(start: rangeStart, stop: rangeEnd) +|> filter(fn: (r) => r["_measurement"] == "smart" ) +|> group(columns: ["device_wwn", "_field"]) +|> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) +|> to(bucket: destBucket, org: destOrg) - from(bucket: sourceBucket) - |> range(start: rangeStart, stop: rangeEnd) - |> filter(fn: (r) => r["_measurement"] == "temp") - |> group(columns: ["device_wwn"]) - |> toInt() - |> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) - |> set(key: "_measurement", value: "temp") - |> set(key: "_field", value: "temp") - |> to(bucket: destBucket, org: destOrg) +from(bucket: sourceBucket) +|> range(start: rangeStart, stop: rangeEnd) +|> filter(fn: (r) => r["_measurement"] == "temp") +|> group(columns: ["device_wwn"]) +|> toInt() +|> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) +|> set(key: "_measurement", value: "temp") +|> set(key: "_field", value: "temp") +|> to(bucket: destBucket, org: destOrg) `, + name, + cron, sourceBucket, rangeStart, rangeEnd, diff --git a/webapp/backend/pkg/database/scrutiny_repository_tasks_test.go b/webapp/backend/pkg/database/scrutiny_repository_tasks_test.go index 4f2dc51..e1e5e5c 100644 --- a/webapp/backend/pkg/database/scrutiny_repository_tasks_test.go +++ b/webapp/backend/pkg/database/scrutiny_repository_tasks_test.go @@ -24,33 +24,38 @@ func Test_DownsampleScript_Weekly(t *testing.T) { aggregationType := "weekly" //test - influxDbScript := deviceRepo.DownsampleScript(aggregationType) + influxDbScript := deviceRepo.DownsampleScript(aggregationType, "tsk-weekly-aggr", "0 1 * * 0") //assert require.Equal(t, ` - sourceBucket = "metrics" - rangeStart = -2w - rangeEnd = -1w - aggWindow = 1w - destBucket = "metrics_weekly" - destOrg = "scrutiny" - - from(bucket: sourceBucket) - |> range(start: rangeStart, stop: rangeEnd) - |> filter(fn: (r) => r["_measurement"] == "smart" ) - |> group(columns: ["device_wwn", "_field"]) - |> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) - |> to(bucket: destBucket, org: destOrg) - - from(bucket: sourceBucket) - |> range(start: rangeStart, stop: rangeEnd) - |> filter(fn: (r) => r["_measurement"] == "temp") - |> group(columns: ["device_wwn"]) - |> toInt() - |> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) - |> set(key: "_measurement", value: "temp") - |> set(key: "_field", value: "temp") - |> to(bucket: destBucket, org: destOrg) +option task = { + name: "tsk-weekly-aggr", + cron: "0 1 * * 0", +} + +sourceBucket = "metrics" +rangeStart = -2w +rangeEnd = -1w +aggWindow = 1w +destBucket = "metrics_weekly" +destOrg = "scrutiny" + +from(bucket: sourceBucket) +|> range(start: rangeStart, stop: rangeEnd) +|> filter(fn: (r) => r["_measurement"] == "smart" ) +|> group(columns: ["device_wwn", "_field"]) +|> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) +|> to(bucket: destBucket, org: destOrg) + +from(bucket: sourceBucket) +|> range(start: rangeStart, stop: rangeEnd) +|> filter(fn: (r) => r["_measurement"] == "temp") +|> group(columns: ["device_wwn"]) +|> toInt() +|> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) +|> set(key: "_measurement", value: "temp") +|> set(key: "_field", value: "temp") +|> to(bucket: destBucket, org: destOrg) `, influxDbScript) } @@ -71,33 +76,38 @@ func Test_DownsampleScript_Monthly(t *testing.T) { aggregationType := "monthly" //test - influxDbScript := deviceRepo.DownsampleScript(aggregationType) + influxDbScript := deviceRepo.DownsampleScript(aggregationType, "tsk-monthly-aggr", "30 1 1 * *") //assert require.Equal(t, ` - sourceBucket = "metrics_weekly" - rangeStart = -2mo - rangeEnd = -1mo - aggWindow = 1mo - destBucket = "metrics_monthly" - destOrg = "scrutiny" - - from(bucket: sourceBucket) - |> range(start: rangeStart, stop: rangeEnd) - |> filter(fn: (r) => r["_measurement"] == "smart" ) - |> group(columns: ["device_wwn", "_field"]) - |> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) - |> to(bucket: destBucket, org: destOrg) - - from(bucket: sourceBucket) - |> range(start: rangeStart, stop: rangeEnd) - |> filter(fn: (r) => r["_measurement"] == "temp") - |> group(columns: ["device_wwn"]) - |> toInt() - |> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) - |> set(key: "_measurement", value: "temp") - |> set(key: "_field", value: "temp") - |> to(bucket: destBucket, org: destOrg) +option task = { + name: "tsk-monthly-aggr", + cron: "30 1 1 * *", +} + +sourceBucket = "metrics_weekly" +rangeStart = -2mo +rangeEnd = -1mo +aggWindow = 1mo +destBucket = "metrics_monthly" +destOrg = "scrutiny" + +from(bucket: sourceBucket) +|> range(start: rangeStart, stop: rangeEnd) +|> filter(fn: (r) => r["_measurement"] == "smart" ) +|> group(columns: ["device_wwn", "_field"]) +|> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) +|> to(bucket: destBucket, org: destOrg) + +from(bucket: sourceBucket) +|> range(start: rangeStart, stop: rangeEnd) +|> filter(fn: (r) => r["_measurement"] == "temp") +|> group(columns: ["device_wwn"]) +|> toInt() +|> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) +|> set(key: "_measurement", value: "temp") +|> set(key: "_field", value: "temp") +|> to(bucket: destBucket, org: destOrg) `, influxDbScript) } @@ -118,32 +128,37 @@ func Test_DownsampleScript_Yearly(t *testing.T) { aggregationType := "yearly" //test - influxDbScript := deviceRepo.DownsampleScript(aggregationType) + influxDbScript := deviceRepo.DownsampleScript(aggregationType, "tsk-yearly-aggr", "0 2 1 1 *") //assert require.Equal(t, ` - sourceBucket = "metrics_monthly" - rangeStart = -2y - rangeEnd = -1y - aggWindow = 1y - destBucket = "metrics_yearly" - destOrg = "scrutiny" - - from(bucket: sourceBucket) - |> range(start: rangeStart, stop: rangeEnd) - |> filter(fn: (r) => r["_measurement"] == "smart" ) - |> group(columns: ["device_wwn", "_field"]) - |> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) - |> to(bucket: destBucket, org: destOrg) - - from(bucket: sourceBucket) - |> range(start: rangeStart, stop: rangeEnd) - |> filter(fn: (r) => r["_measurement"] == "temp") - |> group(columns: ["device_wwn"]) - |> toInt() - |> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) - |> set(key: "_measurement", value: "temp") - |> set(key: "_field", value: "temp") - |> to(bucket: destBucket, org: destOrg) +option task = { + name: "tsk-yearly-aggr", + cron: "0 2 1 1 *", +} + +sourceBucket = "metrics_monthly" +rangeStart = -2y +rangeEnd = -1y +aggWindow = 1y +destBucket = "metrics_yearly" +destOrg = "scrutiny" + +from(bucket: sourceBucket) +|> range(start: rangeStart, stop: rangeEnd) +|> filter(fn: (r) => r["_measurement"] == "smart" ) +|> group(columns: ["device_wwn", "_field"]) +|> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) +|> to(bucket: destBucket, org: destOrg) + +from(bucket: sourceBucket) +|> range(start: rangeStart, stop: rangeEnd) +|> filter(fn: (r) => r["_measurement"] == "temp") +|> group(columns: ["device_wwn"]) +|> toInt() +|> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) +|> set(key: "_measurement", value: "temp") +|> set(key: "_field", value: "temp") +|> to(bucket: destBucket, org: destOrg) `, influxDbScript) }