refactor common code

pull/547/head
Aram Akhavan 6 months ago
parent 4e5c76b259
commit 98d958888c

@ -1,12 +0,0 @@
package database
import (
"github.com/analogj/scrutiny/webapp/backend/pkg/models/measurements"
"sort"
)
func sortSmartMeasurementsDesc(smartResults []measurements.Smart) {
sort.SliceStable(smartResults, func(i, j int) bool {
return smartResults[i].Date.After(smartResults[j].Date)
})
}

@ -1,30 +0,0 @@
package database
import (
"github.com/analogj/scrutiny/webapp/backend/pkg/models/measurements"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func Test_sortSmartMeasurementsDesc_LatestFirst(t *testing.T) {
//setup
timeNow := time.Now()
smartResults := []measurements.Smart{
{
Date: timeNow.AddDate(0, 0, -2),
},
{
Date: timeNow,
},
{
Date: timeNow.AddDate(0, 0, -1),
},
}
//test
sortSmartMeasurementsDesc(smartResults)
//assert
require.Equal(t, smartResults[0].Date, timeNow)
}

@ -21,8 +21,7 @@ type DeviceRepo interface {
DeleteDevice(ctx context.Context, wwn string) error DeleteDevice(ctx context.Context, wwn string) error
SaveSmartAttributes(ctx context.Context, wwn string, collectorSmartData collector.SmartInfo) (measurements.Smart, error) SaveSmartAttributes(ctx context.Context, wwn string, collectorSmartData collector.SmartInfo) (measurements.Smart, error)
GetSmartAttributeHistory(ctx context.Context, wwn string, durationKey string, attributes []string) ([]measurements.Smart, error) GetSmartAttributeHistory(ctx context.Context, wwn string, durationKey string, n int, offset int, attributes []string) ([]measurements.Smart, error)
GetSmartAttributeHistoryTail(ctx context.Context, wwn string, n int, offset int, attributes []string) ([]measurements.Smart, error)
SaveSmartTemperature(ctx context.Context, wwn string, deviceProtocol string, collectorSmartData collector.SmartInfo) error SaveSmartTemperature(ctx context.Context, wwn string, deviceProtocol string, collectorSmartData collector.SmartInfo) error

@ -31,14 +31,14 @@ func (sr *scrutinyRepository) SaveSmartAttributes(ctx context.Context, wwn strin
} }
// GetSmartAttributeHistory MUST return in sorted order, where newest entries are at the beginning of the list, and oldest are at the end. // GetSmartAttributeHistory MUST return in sorted order, where newest entries are at the beginning of the list, and oldest are at the end.
func (sr *scrutinyRepository) GetSmartAttributeHistory(ctx context.Context, wwn string, durationKey string, attributes []string) ([]measurements.Smart, error) { func (sr *scrutinyRepository) GetSmartAttributeHistory(ctx context.Context, wwn string, durationKey string, n int, offset int, attributes []string) ([]measurements.Smart, error) {
// Get SMartResults from InfluxDB // Get SMartResults from InfluxDB
//TODO: change the filter startrange to a real number. //TODO: change the filter startrange to a real number.
// Get parser flux query result // Get parser flux query result
//appConfig.GetString("web.influxdb.bucket") //appConfig.GetString("web.influxdb.bucket")
queryStr := sr.aggregateSmartAttributesQuery(wwn, durationKey) queryStr := sr.aggregateSmartAttributesQuery(wwn, durationKey, n, offset, attributes)
log.Infoln(queryStr) log.Infoln(queryStr)
smartResults := []measurements.Smart{} smartResults := []measurements.Smart{}
@ -66,9 +66,6 @@ func (sr *scrutinyRepository) GetSmartAttributeHistory(ctx context.Context, wwn
return nil, err return nil, err
} }
//we have to sort the smartResults again, because the `union` command will return multiple 'tables' and only sort the records in each table.
sortSmartMeasurementsDesc(smartResults)
return smartResults, nil return smartResults, nil
//if err := device.SquashHistory(); err != nil { //if err := device.SquashHistory(); err != nil {
@ -85,73 +82,6 @@ func (sr *scrutinyRepository) GetSmartAttributeHistory(ctx context.Context, wwn
} }
// GetSmartAttributeHistory MUST return in sorted order, where newest entries are at the beginning of the list, and oldest are at the end.
func (sr *scrutinyRepository) GetSmartAttributeHistoryTail(ctx context.Context, wwn string, n int, offset int, attributes []string) ([]measurements.Smart, error) {
partialQueryStr := []string{
`import "influxdata/influxdb/schema"`,
}
nestedDurationKeys := sr.lookupNestedDurationKeys(DURATION_KEY_FOREVER)
subQueryNames := []string{}
for _, nestedDurationKey := range nestedDurationKeys {
bucketName := sr.lookupBucketName(nestedDurationKey)
durationRange := sr.lookupDuration(nestedDurationKey)
subQueryNames = append(subQueryNames, fmt.Sprintf(`%sData`, nestedDurationKey))
partialQueryStr = append(partialQueryStr, []string{
fmt.Sprintf(`%sData = from(bucket: "%s")`, nestedDurationKey, bucketName),
fmt.Sprintf(`|> range(start: %s, stop: %s)`, durationRange[0], durationRange[1]),
`|> filter(fn: (r) => r["_measurement"] == "smart" )`,
fmt.Sprintf(`|> filter(fn: (r) => r["device_wwn"] == "%s" )`, wwn),
// We only need the last `offset` # of entries from each table to guarantee we can
// get the last `n` # of entries starting from `offset` of the union
fmt.Sprintf(`|> tail(n: %d)`, offset),
"|> schema.fieldsAsCols()",
}...)
}
partialQueryStr = append(partialQueryStr, []string{
fmt.Sprintf("union(tables: [%s])", strings.Join(subQueryNames, ", ")),
"|> group()",
`|> sort(columns: ["_time"], desc: false)`,
fmt.Sprintf(`|> tail(n: %d, offset: %d)`, n, offset),
`|> yield(name: "last")`,
}...)
queryStr := strings.Join(partialQueryStr, "\n")
log.Infoln(queryStr)
smartResults := []measurements.Smart{}
result, err := sr.influxQueryApi.Query(ctx, queryStr)
if err == nil {
// Use Next() to iterate over query result lines
for result.Next() {
// Observe when there is new grouping key producing new table
if result.TableChanged() {
//fmt.Printf("table: %s\n", result.TableMetadata().String())
}
smartData, err := measurements.NewSmartFromInfluxDB(result.Record().Values())
if err != nil {
return nil, err
}
smartResults = append(smartResults, *smartData)
}
if result.Err() != nil {
fmt.Printf("Query error: %s\n", result.Err().Error())
}
} else {
return nil, err
}
//we have to sort the smartResults again, because the `union` command will return multiple 'tables' and only sort the records in each table.
sortSmartMeasurementsDesc(smartResults)
return smartResults, nil
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Helper Methods // Helper Methods
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -167,7 +97,7 @@ func (sr *scrutinyRepository) saveDatapoint(influxWriteApi api.WriteAPIBlocking,
return influxWriteApi.WritePoint(ctx, p) return influxWriteApi.WritePoint(ctx, p)
} }
func (sr *scrutinyRepository) aggregateSmartAttributesQuery(wwn string, durationKey string) string { func (sr *scrutinyRepository) aggregateSmartAttributesQuery(wwn string, durationKey string, n int, offset int, attributes []string) string {
/* /*
@ -176,28 +106,34 @@ func (sr *scrutinyRepository) aggregateSmartAttributesQuery(wwn string, duration
|> range(start: -1w, stop: now()) |> range(start: -1w, stop: now())
|> filter(fn: (r) => r["_measurement"] == "smart" ) |> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["device_wwn"] == "0x5000c5002df89099" ) |> filter(fn: (r) => r["device_wwn"] == "0x5000c5002df89099" )
|> tail(n: 10, offset: 0)
|> schema.fieldsAsCols() |> schema.fieldsAsCols()
monthData = from(bucket: "metrics_weekly") monthData = from(bucket: "metrics_weekly")
|> range(start: -1mo, stop: -1w) |> range(start: -1mo, stop: -1w)
|> filter(fn: (r) => r["_measurement"] == "smart" ) |> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["device_wwn"] == "0x5000c5002df89099" ) |> filter(fn: (r) => r["device_wwn"] == "0x5000c5002df89099" )
|> tail(n: 10, offset: 0)
|> schema.fieldsAsCols() |> schema.fieldsAsCols()
yearData = from(bucket: "metrics_monthly") yearData = from(bucket: "metrics_monthly")
|> range(start: -1y, stop: -1mo) |> range(start: -1y, stop: -1mo)
|> filter(fn: (r) => r["_measurement"] == "smart" ) |> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["device_wwn"] == "0x5000c5002df89099" ) |> filter(fn: (r) => r["device_wwn"] == "0x5000c5002df89099" )
|> tail(n: 10, offset: 0)
|> schema.fieldsAsCols() |> schema.fieldsAsCols()
foreverData = from(bucket: "metrics_yearly") foreverData = from(bucket: "metrics_yearly")
|> range(start: -10y, stop: -1y) |> range(start: -10y, stop: -1y)
|> filter(fn: (r) => r["_measurement"] == "smart" ) |> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["device_wwn"] == "0x5000c5002df89099" ) |> filter(fn: (r) => r["device_wwn"] == "0x5000c5002df89099" )
|> tail(n: 10, offset: 0)
|> schema.fieldsAsCols() |> schema.fieldsAsCols()
union(tables: [weekData, monthData, yearData, foreverData]) union(tables: [weekData, monthData, yearData, foreverData])
|> sort(columns: ["_time"], desc: false) |> group()
|> sort(columns: ["_time"], desc: true)
|> tail(n: 6, offset: 4)
|> yield(name: "last") |> yield(name: "last")
*/ */
@ -208,34 +144,57 @@ func (sr *scrutinyRepository) aggregateSmartAttributesQuery(wwn string, duration
nestedDurationKeys := sr.lookupNestedDurationKeys(durationKey) nestedDurationKeys := sr.lookupNestedDurationKeys(durationKey)
if len(nestedDurationKeys) == 1 {
//there's only one bucket being queried, no need to union, just aggregate the dataset and return
partialQueryStr = append(partialQueryStr, []string{
sr.generateSmartAttributesSubquery(wwn, nestedDurationKeys[0], n, offset, attributes),
fmt.Sprintf(`%sData`, nestedDurationKeys[0]),
`|> sort(columns: ["_time"], desc: true)`,
`|> yield()`,
}...)
return strings.Join(partialQueryStr, "\n")
}
subQueries := []string{}
subQueryNames := []string{} subQueryNames := []string{}
for _, nestedDurationKey := range nestedDurationKeys { for _, nestedDurationKey := range nestedDurationKeys {
bucketName := sr.lookupBucketName(nestedDurationKey)
durationRange := sr.lookupDuration(nestedDurationKey)
subQueryNames = append(subQueryNames, fmt.Sprintf(`%sData`, nestedDurationKey)) subQueryNames = append(subQueryNames, fmt.Sprintf(`%sData`, nestedDurationKey))
partialQueryStr = append(partialQueryStr, []string{ if n > 0 {
fmt.Sprintf(`%sData = from(bucket: "%s")`, nestedDurationKey, bucketName), // We only need the last `n + offset` # of entries from each table to guarantee we can
fmt.Sprintf(`|> range(start: %s, stop: %s)`, durationRange[0], durationRange[1]), // get the last `n` # of entries starting from `offset` of the union
`|> filter(fn: (r) => r["_measurement"] == "smart" )`, subQueries = append(subQueries, sr.generateSmartAttributesSubquery(wwn, nestedDurationKey, n+offset, 0, attributes))
fmt.Sprintf(`|> filter(fn: (r) => r["device_wwn"] == "%s" )`, wwn), } else {
"|> schema.fieldsAsCols()", subQueries = append(subQueries, sr.generateSmartAttributesSubquery(wwn, nestedDurationKey, 0, 0, attributes))
}...) }
}
partialQueryStr = append(partialQueryStr, subQueries...)
partialQueryStr = append(partialQueryStr, []string{
fmt.Sprintf("union(tables: [%s])", strings.Join(subQueryNames, ", ")),
`|> group()`,
`|> sort(columns: ["_time"], desc: true)`,
}...)
if n > 0 {
partialQueryStr = append(partialQueryStr, fmt.Sprintf(`|> tail(n: %d, offset: %d)`, n, offset))
} }
partialQueryStr = append(partialQueryStr, `|> yield(name: "last")`)
if len(subQueryNames) == 1 { return strings.Join(partialQueryStr, "\n")
//there's only one bucket being queried, no need to union, just aggregate the dataset and return }
partialQueryStr = append(partialQueryStr, []string{
subQueryNames[0], func (sr *scrutinyRepository) generateSmartAttributesSubquery(wwn string, durationKey string, n int, offset int, attributes []string) string {
`|> yield()`, bucketName := sr.lookupBucketName(durationKey)
}...) durationRange := sr.lookupDuration(durationKey)
} else {
partialQueryStr = append(partialQueryStr, []string{ partialQueryStr := []string{
fmt.Sprintf("union(tables: [%s])", strings.Join(subQueryNames, ", ")), fmt.Sprintf(`%sData = from(bucket: "%s")`, durationKey, bucketName),
`|> sort(columns: ["_time"], desc: false)`, fmt.Sprintf(`|> range(start: %s, stop: %s)`, durationRange[0], durationRange[1]),
`|> yield(name: "last")`, `|> filter(fn: (r) => r["_measurement"] == "smart" )`,
}...) fmt.Sprintf(`|> filter(fn: (r) => r["device_wwn"] == "%s" )`, wwn),
}
if n > 0 {
partialQueryStr = append(partialQueryStr, fmt.Sprintf(`|> tail(n: %d, offset: %d)`, n, offset))
} }
partialQueryStr = append(partialQueryStr, "|> schema.fieldsAsCols()")
return strings.Join(partialQueryStr, "\n") return strings.Join(partialQueryStr, "\n")
} }

@ -2,10 +2,11 @@ package measurements
import ( import (
"fmt" "fmt"
"github.com/analogj/scrutiny/webapp/backend/pkg"
"github.com/analogj/scrutiny/webapp/backend/pkg/thresholds"
"strconv" "strconv"
"strings" "strings"
"github.com/analogj/scrutiny/webapp/backend/pkg"
"github.com/analogj/scrutiny/webapp/backend/pkg/thresholds"
) )
type SmartAtaAttribute struct { type SmartAtaAttribute struct {
@ -24,6 +25,10 @@ type SmartAtaAttribute struct {
FailureRate float64 `json:"failure_rate,omitempty"` FailureRate float64 `json:"failure_rate,omitempty"`
} }
func (sa *SmartAtaAttribute) GetTransformedValue() int64 {
return sa.TransformedValue
}
func (sa *SmartAtaAttribute) GetStatus() pkg.AttributeStatus { func (sa *SmartAtaAttribute) GetStatus() pkg.AttributeStatus {
return sa.Status return sa.Status
} }

@ -6,4 +6,5 @@ type SmartAttribute interface {
Flatten() (fields map[string]interface{}) Flatten() (fields map[string]interface{})
Inflate(key string, val interface{}) Inflate(key string, val interface{})
GetStatus() pkg.AttributeStatus GetStatus() pkg.AttributeStatus
GetTransformedValue() int64
} }

@ -2,9 +2,10 @@ package measurements
import ( import (
"fmt" "fmt"
"strings"
"github.com/analogj/scrutiny/webapp/backend/pkg" "github.com/analogj/scrutiny/webapp/backend/pkg"
"github.com/analogj/scrutiny/webapp/backend/pkg/thresholds" "github.com/analogj/scrutiny/webapp/backend/pkg/thresholds"
"strings"
) )
type SmartNvmeAttribute struct { type SmartNvmeAttribute struct {
@ -18,6 +19,10 @@ type SmartNvmeAttribute struct {
FailureRate float64 `json:"failure_rate,omitempty"` FailureRate float64 `json:"failure_rate,omitempty"`
} }
func (sa *SmartNvmeAttribute) GetTransformedValue() int64 {
return sa.TransformedValue
}
func (sa *SmartNvmeAttribute) GetStatus() pkg.AttributeStatus { func (sa *SmartNvmeAttribute) GetStatus() pkg.AttributeStatus {
return sa.Status return sa.Status
} }

@ -2,9 +2,10 @@ package measurements
import ( import (
"fmt" "fmt"
"strings"
"github.com/analogj/scrutiny/webapp/backend/pkg" "github.com/analogj/scrutiny/webapp/backend/pkg"
"github.com/analogj/scrutiny/webapp/backend/pkg/thresholds" "github.com/analogj/scrutiny/webapp/backend/pkg/thresholds"
"strings"
) )
type SmartScsiAttribute struct { type SmartScsiAttribute struct {
@ -18,6 +19,10 @@ type SmartScsiAttribute struct {
FailureRate float64 `json:"failure_rate,omitempty"` FailureRate float64 `json:"failure_rate,omitempty"`
} }
func (sa *SmartScsiAttribute) GetTransformedValue() int64 {
return sa.TransformedValue
}
func (sa *SmartScsiAttribute) GetStatus() pkg.AttributeStatus { func (sa *SmartScsiAttribute) GetStatus() pkg.AttributeStatus {
return sa.Status return sa.Status
} }

@ -90,30 +90,12 @@ func ShouldNotify(device models.Device, smartAttrs measurements.Smart, statusThr
failingAttributes = append(failingAttributes, attrId) failingAttributes = append(failingAttributes, attrId)
} }
if repeatNotifications { if !repeatNotifications {
lastPoints, err := deviceRepo.GetSmartAttributeHistoryTail(c, c.Param("wwn"), 1, 1, failingAttributes) lastPoints, err := deviceRepo.GetSmartAttributeHistory(c, c.Param("wwn"), database.DURATION_KEY_FOREVER, 1, 1, failingAttributes)
if err == nil && len(lastPoints) > 1 { if err == nil && len(lastPoints) > 1 {
for _, attrId := range failingAttributes { for _, attrId := range failingAttributes {
if old, ok := lastPoints[0].Attributes[attrId].(*measurements.SmartAtaAttribute); ok { if lastPoints[0].Attributes[attrId].GetTransformedValue() != smartAttrs.Attributes[attrId].GetTransformedValue() {
if current, ok := smartAttrs.Attributes[attrId].(*measurements.SmartAtaAttribute); ok { return true
if old.TransformedValue != current.TransformedValue {
return true
}
}
}
if old, ok := lastPoints[0].Attributes[attrId].(*measurements.SmartNvmeAttribute); ok {
if current, ok := smartAttrs.Attributes[attrId].(*measurements.SmartNvmeAttribute); ok {
if old.TransformedValue != current.TransformedValue {
return true
}
}
}
if old, ok := lastPoints[0].Attributes[attrId].(*measurements.SmartScsiAttribute); ok {
if current, ok := smartAttrs.Attributes[attrId].(*measurements.SmartScsiAttribute); ok {
if old.TransformedValue != current.TransformedValue {
return true
}
}
} }
} }
return false return false
@ -251,7 +233,7 @@ func (n *Notify) Send() error {
notifyScripts := []string{} notifyScripts := []string{}
notifyShoutrrr := []string{} notifyShoutrrr := []string{}
for ndx, _ := range configUrls { for ndx := range configUrls {
if strings.HasPrefix(configUrls[ndx], "https://") || strings.HasPrefix(configUrls[ndx], "http://") { if strings.HasPrefix(configUrls[ndx], "https://") || strings.HasPrefix(configUrls[ndx], "http://") {
notifyWebhooks = append(notifyWebhooks, configUrls[ndx]) notifyWebhooks = append(notifyWebhooks, configUrls[ndx])
} else if strings.HasPrefix(configUrls[ndx], "script://") { } else if strings.HasPrefix(configUrls[ndx], "script://") {

@ -1,11 +1,12 @@
package handler package handler
import ( import (
"net/http"
"github.com/analogj/scrutiny/webapp/backend/pkg/database" "github.com/analogj/scrutiny/webapp/backend/pkg/database"
"github.com/analogj/scrutiny/webapp/backend/pkg/thresholds" "github.com/analogj/scrutiny/webapp/backend/pkg/thresholds"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"net/http"
) )
func GetDeviceDetails(c *gin.Context) { func GetDeviceDetails(c *gin.Context) {
@ -24,7 +25,7 @@ func GetDeviceDetails(c *gin.Context) {
durationKey = "forever" durationKey = "forever"
} }
smartResults, err := deviceRepo.GetSmartAttributeHistory(c, c.Param("wwn"), durationKey, nil) smartResults, err := deviceRepo.GetSmartAttributeHistory(c, c.Param("wwn"), durationKey, 0, 0, nil)
if err != nil { if err != nil {
logger.Errorln("An error occurred while retrieving device smart results", err) logger.Errorln("An error occurred while retrieving device smart results", err)
c.JSON(http.StatusInternalServerError, gin.H{"success": false}) c.JSON(http.StatusInternalServerError, gin.H{"success": false})

Loading…
Cancel
Save