From bff83de3a059dcc875eeee231481143d933e62ab Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Wed, 17 Nov 2021 18:35:50 -0800 Subject: [PATCH] query temp data across multiple buckets --- .../pkg/database/scrutiny_repository.go | 102 ++++++++++++++---- 1 file changed, 83 insertions(+), 19 deletions(-) diff --git a/webapp/backend/pkg/database/scrutiny_repository.go b/webapp/backend/pkg/database/scrutiny_repository.go index 3e2c61b..0753a48 100644 --- a/webapp/backend/pkg/database/scrutiny_repository.go +++ b/webapp/backend/pkg/database/scrutiny_repository.go @@ -15,6 +15,7 @@ import ( "gorm.io/driver/sqlite" "gorm.io/gorm" "gorm.io/gorm/clause" + "strings" "time" ) @@ -520,19 +521,7 @@ func (sr *scrutinyRepository) GetSmartTemperatureHistory(ctx context.Context, du deviceTempHistory := map[string][]measurements.SmartTemperature{} //TODO: change the query range to a variable. - queryStr := fmt.Sprintf(` - import "influxdata/influxdb/schema" - from(bucket: "%s") - |> range(start: %s, stop: now()) - |> filter(fn: (r) => r["_measurement"] == "temp" ) - |> aggregateWindow(every: 1h, fn: mean, createEmpty: false) - |> schema.fieldsAsCols() - |> group(columns: ["device_wwn"]) - |> yield(name: "last") - `, - sr.lookupBucketName(durationKey), - sr.lookupDuration(durationKey), - ) + queryStr := sr.aggregateTempQuery(durationKey) result, err := sr.influxQueryApi.Query(ctx, queryStr) if err == nil { @@ -671,21 +660,96 @@ func (sr *scrutinyRepository) lookupBucketName(durationKey string) string { return sr.appConfig.GetString("web.influxdb.bucket") } -func (sr *scrutinyRepository) lookupDuration(durationKey string) string { +func (sr *scrutinyRepository) lookupDuration(durationKey string) []string { switch durationKey { case "week": //data stored in the last week - return "-1w" + return []string{"-1w", "now()"} case "month": // data stored in the last month (after the first week) - return "-1mo" + return []string{"-1mo", "-1w"} case "year": // data stored in the last year (after the first month) - return "-1y" + return []string{"-1y", "-1mo"} case "forever": //data stored before the last year - return "-10y" + return []string{"-10y", "-1y"} } - return "-1w" + return []string{"-1w", "now()"} +} + +func (sr *scrutinyRepository) lookupNestedDurationKeys(durationKey string) []string { + switch durationKey { + case "week": + //all data is stored in a single bucket + return []string{"week"} + case "month": + //data is stored in the week bucket and the month bucket + return []string{"week", "month"} + case "year": + // data stored in the last year (after the first month) + return []string{"week", "month", "year"} + case "forever": + //data stored before the last year + return []string{"week", "month", "year"} + } + return []string{"week"} +} + +func (sr *scrutinyRepository) aggregateTempQuery(durationKey string) string { + + //TODO: change the query range to a variable. + //queryStr := fmt.Sprintf(` + //import "influxdata/influxdb/schema" + //from(bucket: "%s") + //|> range(start: %s, stop: now()) + //|> filter(fn: (r) => r["_measurement"] == "temp" ) + //|> aggregateWindow(every: 1h, fn: mean, createEmpty: false) + //|> schema.fieldsAsCols() + //|> group(columns: ["device_wwn"]) + //|> yield(name: "last") + // `, + // sr.lookupBucketName(durationKey), + // sr.lookupDuration(durationKey), + //) + + partialQueryStr := []string{`import "influxdata/influxdb/schema"`} + + nestedDurationKeys := sr.lookupNestedDurationKeys(durationKey) + + subQueryNames := []string{} + for _, nestedDurationKey := range nestedDurationKeys { + bucketName := sr.lookupBucketName(nestedDurationKey) + durationRange := sr.lookupDuration(nestedDurationKey) + + subQueryNames = append(subQueryNames, fmt.Sprintf(`%sData`, nestedDurationKey)) + partialQueryStr = append(partialQueryStr, []string{ + fmt.Sprintf(`%sData = from(bucket: "%s")`, nestedDurationKey, bucketName), + fmt.Sprintf(`|> range(start: %s, stop: %s)`, durationRange[0], durationRange[1]), + `|> filter(fn: (r) => r["_measurement"] == "temp" )`, + `|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)`, + `|> group(columns: ["device_wwn"])`, + `|> toInt()`, + "", + }...) + } + + if len(subQueryNames) == 1 { + //there's only one bucket being queried, no need to union, just aggregate the dataset and return + partialQueryStr = append(partialQueryStr, []string{ + subQueryNames[0], + "|> schema.fieldsAsCols()", + "|> yield()", + }...) + } else { + partialQueryStr = append(partialQueryStr, []string{ + fmt.Sprintf("union(tables: [%s])", strings.Join(subQueryNames, ", ")), + `|> group(columns: ["device_wwn"])`, + `|> sort(columns: ["_time"], desc: false)`, + "|> schema.fieldsAsCols()", + }...) + } + + return strings.Join(partialQueryStr, "\n") }