You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
scrutiny/webapp/backend/pkg/database/scrutiny_repository.go

513 lines
18 KiB

package database
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"github.com/analogj/scrutiny/webapp/backend/pkg/config"
"github.com/analogj/scrutiny/webapp/backend/pkg/models"
"github.com/glebarez/sqlite"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/domain"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
"io/ioutil"
"net/http"
"net/url"
"time"
)
// Retention periods for the InfluxDB buckets, expressed in seconds. Each value
// is written as weeks/days * hours * minutes * seconds so the arithmetic is
// visible at a glance.
const (
	// 15 days
	RETENTION_PERIOD_15_DAYS_IN_SECONDS = 15 * 24 * 60 * 60
	// 9 weeks
	RETENTION_PERIOD_9_WEEKS_IN_SECONDS = 9 * 7 * 24 * 60 * 60
	// (52 + 52 + 4) weeks, used as "25 months"
	RETENTION_PERIOD_25_MONTHS_IN_SECONDS = (52 + 52 + 4) * 7 * 24 * 60 * 60
)

// Duration keys accepted by the lookup helpers in this file to select a
// bucket and a query time range.
const (
	DURATION_KEY_WEEK    = "week"
	DURATION_KEY_MONTH   = "month"
	DURATION_KEY_YEAR    = "year"
	DURATION_KEY_FOREVER = "forever"
)
//// GormLogger is a custom logger for Gorm, making it use logrus.
//type GormLogger struct{ Logger logrus.FieldLogger }
//
//// Print handles log events from Gorm for the custom logger.
//func (gl *GormLogger) Print(v ...interface{}) {
// switch v[0] {
// case "sql":
// gl.Logger.WithFields(
// logrus.Fields{
// "module": "gorm",
// "type": "sql",
// "rows": v[5],
// "src_ref": v[1],
// "values": v[4],
// },
// ).Debug(v[3])
// case "log":
// gl.Logger.WithFields(logrus.Fields{"module": "gorm", "type": "log"}).Print(v[2])
// }
//}
// NewScrutinyRepository builds the repository backed by SQLite (through gorm)
// and InfluxDB.
//
// It performs, in order:
//  1. open the SQLite database at "web.database.location" with a busy_timeout pragma,
//  2. create the InfluxDB client from the "web.influxdb.*" settings,
//  3. run the InfluxDB first-time setup when InfluxSetupComplete reports it is still pending,
//  4. obtain the write/query/task API handles,
//  5. call EnsureBuckets, EnsureTasks and Migrate.
//
// The first failing step aborts construction and its error is returned; the
// returned DeviceRepo is nil in that case.
func NewScrutinyRepository(appConfig config.Interface, globalLogger logrus.FieldLogger) (DeviceRepo, error) {
	backgroundContext := context.Background()

	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	// Gorm/SQLite setup
	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	globalLogger.Infof("Trying to connect to scrutiny sqlite db: %s\n", appConfig.GetString("web.database.location"))

	// When a transaction cannot lock the database, because it is already locked by another one,
	// SQLite by default throws an error: database is locked. This behavior is usually not appropriate when
	// concurrent access is needed, typically when multiple processes write to the same database.
	// PRAGMA busy_timeout lets you set a timeout or a handler for these events. When setting a timeout,
	// SQLite will try the transaction multiple times within this timeout.
	// fixes #341
	// https://rsqlite.r-dbi.org/reference/sqlitesetbusyhandler
	// retrying for 30000 milliseconds, 30seconds - this would be unreasonable for a distributed multi-tenant application,
	// but should be fine for local usage.
	pragmaStr := sqlitePragmaString(map[string]string{
		"busy_timeout": "30000",
	})
	database, err := gorm.Open(sqlite.Open(appConfig.GetString("web.database.location")+pragmaStr), &gorm.Config{
		//TODO: figure out how to log database queries again.
		//Logger: logger
		DisableForeignKeyConstraintWhenMigrating: true,
	})
	if err != nil {
		// %w (not %v) keeps the underlying error reachable through errors.Is / errors.As,
		// matching how the influxdb setup-status error below is wrapped.
		return nil, fmt.Errorf("Failed to connect to database! - %w", err)
	}
	globalLogger.Infof("Successfully connected to scrutiny sqlite db: %s\n", appConfig.GetString("web.database.location"))

	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	// InfluxDB setup
	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

	// Create a new client using an InfluxDB server base URL and an authentication token
	influxdbUrl := fmt.Sprintf("%s://%s:%s", appConfig.GetString("web.influxdb.scheme"), appConfig.GetString("web.influxdb.host"), appConfig.GetString("web.influxdb.port"))
	globalLogger.Debugf("InfluxDB url: %s", influxdbUrl)

	tlsConfig := &tls.Config{
		InsecureSkipVerify: appConfig.GetBool("web.influxdb.tls.insecure_skip_verify"),
	}
	globalLogger.Infof("InfluxDB certificate verification: %t\n", !tlsConfig.InsecureSkipVerify)

	client := influxdb2.NewClientWithOptions(
		influxdbUrl,
		appConfig.GetString("web.influxdb.token"),
		influxdb2.DefaultOptions().SetTLSConfig(tlsConfig),
	)

	globalLogger.Debugf("Determine Influxdb setup status...")
	influxSetupComplete, err := InfluxSetupComplete(influxdbUrl, tlsConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to check influxdb setup status - %w", err)
	}

	if !influxSetupComplete {
		globalLogger.Debugf("Influxdb un-initialized, running first-time setup...")

		// The server has not been set up yet, so we assume this is the first start of our server and
		// initialize it with the configured username, password and token (which you should change).
		// The retention period is passed as 0 here; the 15 day retention of the metrics bucket
		// (60seconds * 60minutes * 24hours * 15 days = 1_296_000) is applied afterwards by EnsureBuckets().
		if _, err := client.SetupWithToken(
			backgroundContext,
			appConfig.GetString("web.influxdb.init_username"),
			appConfig.GetString("web.influxdb.init_password"),
			appConfig.GetString("web.influxdb.org"),
			appConfig.GetString("web.influxdb.bucket"),
			0,
			appConfig.GetString("web.influxdb.token"),
		); err != nil {
			return nil, err
		}
	}

	// Use blocking write client for writes to desired bucket
	writeAPI := client.WriteAPIBlocking(appConfig.GetString("web.influxdb.org"), appConfig.GetString("web.influxdb.bucket"))

	// Get query client
	queryAPI := client.QueryAPI(appConfig.GetString("web.influxdb.org"))

	// Get task client
	taskAPI := client.TasksAPI()

	if writeAPI == nil || queryAPI == nil || taskAPI == nil {
		return nil, fmt.Errorf("Failed to connect to influxdb!")
	}

	deviceRepo := scrutinyRepository{
		appConfig:      appConfig,
		logger:         globalLogger,
		influxClient:   client,
		influxWriteApi: writeAPI,
		influxQueryApi: queryAPI,
		influxTaskApi:  taskAPI,
		gormClient:     database,
	}

	orgInfo, err := client.OrganizationsAPI().FindOrganizationByName(backgroundContext, appConfig.GetString("web.influxdb.org"))
	if err != nil {
		return nil, err
	}

	// Initialize Buckets (if necessary)
	err = deviceRepo.EnsureBuckets(backgroundContext, orgInfo)
	if err != nil {
		return nil, err
	}

	// Initialize Background Tasks
	err = deviceRepo.EnsureTasks(backgroundContext, *orgInfo.Id)
	if err != nil {
		return nil, err
	}

	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	// InfluxDB & SQLite migrations
	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
	err = deviceRepo.Migrate(backgroundContext)
	if err != nil {
		return nil, err
	}

	return &deviceRepo, nil
}
// scrutinyRepository is the concrete value NewScrutinyRepository returns as a
// DeviceRepo. It bundles the application config and logger with the handles
// for both storage backends.
type scrutinyRepository struct {
	// shared dependencies
	appConfig config.Interface
	logger    logrus.FieldLogger

	// SQLite, accessed through gorm
	gormClient *gorm.DB

	// InfluxDB client plus the API handles obtained from it
	influxClient   influxdb2.Client
	influxWriteApi api.WriteAPIBlocking
	influxQueryApi api.QueryAPI
	influxTaskApi  api.TasksAPI
}
// Close shuts down the InfluxDB client held by the repository. The gorm
// handle is not touched here. The returned error is always nil.
func (sr *scrutinyRepository) Close() error {
	influx := sr.influxClient
	influx.Close()
	return nil
}
// HealthCheck verifies that both storage backends are reachable.
//
// InfluxDB is checked first through the client's Health call, which must
// succeed and report the status "pass". SQLite is then checked by pinging the
// *sql.DB underneath gorm. ctx bounds both checks. The first failure is
// returned as a wrapped error; nil means both backends answered.
func (sr *scrutinyRepository) HealthCheck(ctx context.Context) error {
	// check influxdb
	status, err := sr.influxClient.Health(ctx)
	if err != nil {
		return fmt.Errorf("influxdb healthcheck failed: %w", err)
	}
	if status.Status != "pass" {
		return fmt.Errorf("influxdb healthcheck failed: status=%s", status.Status)
	}

	// check sqlite db.
	database, err := sr.gormClient.DB()
	if err != nil {
		return fmt.Errorf("sqlite healthcheck failed: %w", err)
	}
	// PingContext (rather than Ping) so a cancelled or expired ctx also aborts the sqlite check.
	if err := database.PingContext(ctx); err != nil {
		return fmt.Errorf("sqlite healthcheck failed during ping: %w", err)
	}
	return nil
}
// InfluxSetupComplete reports whether the InfluxDB server at influxEndpoint
// has already been through its initial setup.
//
// It issues a GET to "/api/v2/setup" on that server (using tlsConfig for HTTPS
// endpoints) and decodes the JSON response. The result is the negation of the
// response's "allowed" field: allowed=true yields false, allowed=false yields
// true.
//
// An error is returned when the endpoint cannot be parsed, the request fails,
// the body cannot be read, or the body is not valid JSON.
func InfluxSetupComplete(influxEndpoint string, tlsConfig *tls.Config) (bool, error) {
	influxUri, err := url.Parse(influxEndpoint)
	if err != nil {
		return false, err
	}
	influxUri, err = influxUri.Parse("/api/v2/setup")
	if err != nil {
		return false, err
	}

	client := &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConfig}}
	// The client and its transport only live for this call; drop the pooled
	// connection instead of leaving it (and its goroutines) idle forever.
	defer client.CloseIdleConnections()

	res, err := client.Get(influxUri.String())
	if err != nil {
		return false, err
	}
	// The response body must always be closed, otherwise the connection leaks.
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return false, err
	}

	type SetupStatus struct {
		Allowed bool `json:"allowed"`
	}
	var data SetupStatus
	if err := json.Unmarshal(body, &data); err != nil {
		return false, err
	}
	return !data.Allowed, nil
}
// EnsureBuckets makes sure the four InfluxDB buckets used by scrutiny exist in
// org: the main bucket named by "web.influxdb.bucket" plus its "_weekly",
// "_monthly" and "_yearly" companions used for downsampling.
//
// A bucket that cannot be found by name is created. When
// "web.influxdb.retention_policy" is true, the main, weekly and monthly
// buckets get retention rules of 15 days, 9 weeks and 25 months respectively,
// both on creation and on buckets that already exist. The yearly bucket is
// created without a retention rule.
//
// A failed creation aborts and returns the error. A failed retention update
// on an existing bucket is logged as a warning and does not abort.
func (sr *scrutinyRepository) EnsureBuckets(ctx context.Context, org *domain.Organization) error {
	var mainBucketRetentionRule domain.RetentionRule
	var weeklyBucketRetentionRule domain.RetentionRule
	var monthlyBucketRetentionRule domain.RetentionRule
	if sr.appConfig.GetBool("web.influxdb.retention_policy") {
		// in tests, we may not want to set a retention policy. If "false", we can set data with old timestamps,
		// then manually run the down sampling scripts. This should be true for production environments.
		mainBucketRetentionRule = domain.RetentionRule{EverySeconds: RETENTION_PERIOD_15_DAYS_IN_SECONDS}
		weeklyBucketRetentionRule = domain.RetentionRule{EverySeconds: RETENTION_PERIOD_9_WEEKS_IN_SECONDS}
		monthlyBucketRetentionRule = domain.RetentionRule{EverySeconds: RETENTION_PERIOD_25_MONTHS_IN_SECONDS}
	}

	mainBucket := sr.appConfig.GetString("web.influxdb.bucket")
	if foundMainBucket, foundErr := sr.influxClient.BucketsAPI().FindBucketByName(ctx, mainBucket); foundErr != nil {
		// metrics bucket will have a retention period of 15 days (since it will be down-sampled once a week)
		_, err := sr.influxClient.BucketsAPI().CreateBucketWithName(ctx, org, mainBucket, mainBucketRetentionRule)
		if err != nil {
			return err
		}
	} else if sr.appConfig.GetBool("web.influxdb.retention_policy") {
		// correctly set the retention period for the main bucket (cant do it during setup/creation)
		foundMainBucket.RetentionRules = domain.RetentionRules{mainBucketRetentionRule}
		if _, err := sr.influxClient.BucketsAPI().UpdateBucket(ctx, foundMainBucket); err != nil {
			// best effort: the bucket exists and is usable, so only report the failure
			sr.logger.Warnf("failed to update retention rules for influxdb bucket %s: %v", mainBucket, err)
		}
	}

	// create buckets (used for downsampling)
	weeklyBucket := fmt.Sprintf("%s_weekly", sr.appConfig.GetString("web.influxdb.bucket"))
	if foundWeeklyBucket, foundErr := sr.influxClient.BucketsAPI().FindBucketByName(ctx, weeklyBucket); foundErr != nil {
		// metrics_weekly bucket will have a retention period of 8+1 weeks (since it will be down-sampled once a month)
		_, err := sr.influxClient.BucketsAPI().CreateBucketWithName(ctx, org, weeklyBucket, weeklyBucketRetentionRule)
		if err != nil {
			return err
		}
	} else if sr.appConfig.GetBool("web.influxdb.retention_policy") {
		// correctly set the retention period for the bucket (may not be able to do it during setup/creation)
		foundWeeklyBucket.RetentionRules = domain.RetentionRules{weeklyBucketRetentionRule}
		if _, err := sr.influxClient.BucketsAPI().UpdateBucket(ctx, foundWeeklyBucket); err != nil {
			sr.logger.Warnf("failed to update retention rules for influxdb bucket %s: %v", weeklyBucket, err)
		}
	}

	monthlyBucket := fmt.Sprintf("%s_monthly", sr.appConfig.GetString("web.influxdb.bucket"))
	if foundMonthlyBucket, foundErr := sr.influxClient.BucketsAPI().FindBucketByName(ctx, monthlyBucket); foundErr != nil {
		// metrics_monthly bucket will have a retention period of 24+1 months (since it will be down-sampled once a year)
		_, err := sr.influxClient.BucketsAPI().CreateBucketWithName(ctx, org, monthlyBucket, monthlyBucketRetentionRule)
		if err != nil {
			return err
		}
	} else if sr.appConfig.GetBool("web.influxdb.retention_policy") {
		// correctly set the retention period for the bucket (may not be able to do it during setup/creation)
		foundMonthlyBucket.RetentionRules = domain.RetentionRules{monthlyBucketRetentionRule}
		if _, err := sr.influxClient.BucketsAPI().UpdateBucket(ctx, foundMonthlyBucket); err != nil {
			sr.logger.Warnf("failed to update retention rules for influxdb bucket %s: %v", monthlyBucket, err)
		}
	}

	yearlyBucket := fmt.Sprintf("%s_yearly", sr.appConfig.GetString("web.influxdb.bucket"))
	if _, foundErr := sr.influxClient.BucketsAPI().FindBucketByName(ctx, yearlyBucket); foundErr != nil {
		// metrics_yearly bucket will have an infinite retention period
		_, err := sr.influxClient.BucketsAPI().CreateBucketWithName(ctx, org, yearlyBucket)
		if err != nil {
			return err
		}
	}
	return nil
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// DeviceSummary
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// GetSummary returns a map, keyed by device WWN, of every device together
// with its most recent SMART summary and its temperature history.
//
// The map is seeded from GetDevices. A Flux query then takes the last "smart"
// record per device across the main, weekly, monthly and yearly buckets and
// fills SmartResults (temp, power_on_hours and the record time). Finally the
// result of GetSmartTemperatureHistory is attached as TempHistory.
//
// An error from GetDevices or from issuing the query is returned. An error
// while iterating the query result, or from GetSmartTemperatureHistory, is
// only logged and the summaries collected so far are returned. Records whose
// device_wwn is missing or not a string are skipped; SMART fields that are
// missing or of an unexpected type are left at their zero value instead of
// panicking.
func (sr *scrutinyRepository) GetSummary(ctx context.Context) (map[string]*models.DeviceSummary, error) {
	devices, err := sr.GetDevices(ctx)
	if err != nil {
		return nil, err
	}

	summaries := map[string]*models.DeviceSummary{}
	for _, device := range devices {
		summaries[device.WWN] = &models.DeviceSummary{Device: device}
	}

	// Get parser flux query result
	queryStr := fmt.Sprintf(`
import "influxdata/influxdb/schema"
bucketBaseName = "%s"
dailyData = from(bucket: bucketBaseName)
|> range(start: -10y, stop: now())
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["_field"] == "temp" or r["_field"] == "power_on_hours" or r["_field"] == "date")
|> last()
|> schema.fieldsAsCols()
|> group(columns: ["device_wwn"])
weeklyData = from(bucket: bucketBaseName + "_weekly")
|> range(start: -10y, stop: now())
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["_field"] == "temp" or r["_field"] == "power_on_hours" or r["_field"] == "date")
|> last()
|> schema.fieldsAsCols()
|> group(columns: ["device_wwn"])
monthlyData = from(bucket: bucketBaseName + "_monthly")
|> range(start: -10y, stop: now())
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["_field"] == "temp" or r["_field"] == "power_on_hours" or r["_field"] == "date")
|> last()
|> schema.fieldsAsCols()
|> group(columns: ["device_wwn"])
yearlyData = from(bucket: bucketBaseName + "_yearly")
|> range(start: -10y, stop: now())
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["_field"] == "temp" or r["_field"] == "power_on_hours" or r["_field"] == "date")
|> last()
|> schema.fieldsAsCols()
|> group(columns: ["device_wwn"])
union(tables: [dailyData, weeklyData, monthlyData, yearlyData])
|> sort(columns: ["_time"], desc: false)
|> group(columns: ["device_wwn"])
|> last(column: "device_wwn")
|> yield(name: "last")
`,
		sr.appConfig.GetString("web.influxdb.bucket"),
	)

	result, err := sr.influxQueryApi.Query(ctx, queryStr)
	if err != nil {
		return nil, err
	}

	// Use Next() to iterate over query result lines
	for result.Next() {
		values := result.Record().Values()

		deviceWWN, ok := values["device_wwn"].(string)
		if !ok {
			// no usable device identifier on this record; nothing to attach it to
			continue
		}

		// ensure summaries is initialized for this wwn
		summary, exists := summaries[deviceWWN]
		if !exists {
			summary = &models.DeviceSummary{}
			summaries[deviceWWN] = summary
		}

		// comma-ok assertions: a missing/null column yields the zero value rather than a panic
		temp, _ := values["temp"].(int64)
		powerOnHours, _ := values["power_on_hours"].(int64)
		collectorDate, _ := values["_time"].(time.Time)
		summary.SmartResults = &models.SmartSummary{
			Temp:          temp,
			PowerOnHours:  powerOnHours,
			CollectorDate: collectorDate,
		}
	}
	if result.Err() != nil {
		sr.logger.Errorf("Query error: %s", result.Err().Error())
	}

	deviceTempHistory, err := sr.GetSmartTemperatureHistory(ctx, DURATION_KEY_FOREVER)
	if err != nil {
		// best effort: summaries without temperature history are still returned
		sr.logger.Errorf("failed to load temperature history for device summaries: %v", err)
	}
	for wwn, tempHistory := range deviceTempHistory {
		// the history may mention a device that is in neither sqlite nor the query result above
		summary, exists := summaries[wwn]
		if !exists {
			summary = &models.DeviceSummary{}
			summaries[wwn] = summary
		}
		summary.TempHistory = tempHistory
	}

	return summaries, nil
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Helper Methods
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// lookupBucketName maps a duration key to the InfluxDB bucket holding the
// data for that period. The configured "web.influxdb.bucket" name is the
// base; month, year and forever select the "_weekly", "_monthly" and
// "_yearly" buckets. The week key and any unrecognised key select the base
// bucket itself.
func (sr *scrutinyRepository) lookupBucketName(durationKey string) string {
	baseName := sr.appConfig.GetString("web.influxdb.bucket")

	switch durationKey {
	case DURATION_KEY_FOREVER:
		// data stored before the last year
		return fmt.Sprintf("%s_yearly", baseName)
	case DURATION_KEY_YEAR:
		// data stored in the last year (after the first month)
		return fmt.Sprintf("%s_monthly", baseName)
	case DURATION_KEY_MONTH:
		// data stored in the last month (after the first week)
		return fmt.Sprintf("%s_weekly", baseName)
	}

	// data stored in the last week, and the fallback for unknown keys
	return baseName
}
// lookupDuration returns the Flux range bounds for a duration key as a
// two-element slice: {start, stop}. Unrecognised keys get the same bounds as
// the week key.
func (sr *scrutinyRepository) lookupDuration(durationKey string) []string {
	// data stored in the last week; also the fallback
	rangeStart, rangeStop := "-1w", "now()"

	switch durationKey {
	case DURATION_KEY_MONTH:
		// data stored in the last month (after the first week)
		rangeStart, rangeStop = "-1mo", "-1w"
	case DURATION_KEY_YEAR:
		// data stored in the last year (after the first month)
		rangeStart, rangeStop = "-1y", "-1mo"
	case DURATION_KEY_FOREVER:
		// data stored before the last year
		rangeStart, rangeStop = "-10y", "-1y"
	}

	return []string{rangeStart, rangeStop}
}
// lookupNestedDurationKeys returns the given duration key preceded by every
// shorter one, in the order week, month, year, forever. For example the year
// key yields {week, month, year}. Unrecognised keys yield {week}.
func (sr *scrutinyRepository) lookupNestedDurationKeys(durationKey string) []string {
	// shortest to longest; a longer duration spans the buckets of all shorter ones
	ordered := []string{
		DURATION_KEY_WEEK,
		DURATION_KEY_MONTH,
		DURATION_KEY_YEAR,
		DURATION_KEY_FOREVER,
	}

	for idx, candidate := range ordered {
		if candidate == durationKey {
			// full slice expression caps the result at its own length
			return ordered[: idx+1 : idx+1]
		}
	}

	// fallback: all data is stored in a single bucket
	return ordered[:1:1]
}
// sqlitePragmaString renders pragmas as a URL query string to append to a
// SQLite DSN. Every entry becomes one "_pragma" parameter whose value is
// "name=value", URL-encoded. The result starts with "?" when at least one
// pragma is given and is empty otherwise. With several pragmas their order
// follows map iteration and is therefore unspecified.
func sqlitePragmaString(pragmas map[string]string) string {
	params := make(url.Values)
	for name, setting := range pragmas {
		params["_pragma"] = append(params["_pragma"], name+"="+setting)
	}

	encoded := params.Encode()
	if encoded == "" {
		return ""
	}
	return "?" + encoded
}