@ -11,30 +11,66 @@ import (
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func ( sr * scrutinyRepository ) EnsureTasks ( ctx context . Context , orgID string ) error {
weeklyTaskName := "tsk-weekly-aggr"
weeklyTaskScript := sr . DownsampleScript ( "weekly" )
if found , findErr := sr . influxTaskApi . FindTasks ( ctx , & api . TaskFilter { Name : weeklyTaskName } ) ; findErr == nil && len ( found ) == 0 {
//weekly on Sunday at 1:00am
_ , err := sr . influxTaskApi . CreateTaskWithCron ( ctx , weeklyTaskName , sr. DownsampleScript ( "weekly" ) , "0 1 * * 0" , orgID )
_ , err := sr . influxTaskApi . CreateTaskWithCron ( ctx , weeklyTaskName , weeklyTaskScript , "0 1 * * 0" , orgID )
if err != nil {
return err
}
} else if len ( found ) == 1 {
//check if we should update
task := & found [ 0 ]
if weeklyTaskScript != task . Flux {
sr . logger . Infoln ( "updating weekly task script" )
task . Flux = weeklyTaskScript
_ , err := sr . influxTaskApi . UpdateTask ( ctx , task )
if err != nil {
return err
}
}
}
monthlyTaskName := "tsk-monthly-aggr"
monthlyTaskScript := sr . DownsampleScript ( "monthly" )
if found , findErr := sr . influxTaskApi . FindTasks ( ctx , & api . TaskFilter { Name : monthlyTaskName } ) ; findErr == nil && len ( found ) == 0 {
//monthly on first day of the month at 1:30am
_ , err := sr . influxTaskApi . CreateTaskWithCron ( ctx , monthlyTaskName , sr . DownsampleScript ( "monthly" ) , "30 1 1 * *" , orgID )
_ , err := sr . influxTaskApi . CreateTaskWithCron ( ctx , monthlyTaskName , monthlyTaskScript , "30 1 1 * *" , orgID )
if err != nil {
return err
}
} else if len ( found ) == 1 {
//check if we should update
task := & found [ 0 ]
if monthlyTaskScript != task . Flux {
sr . logger . Infoln ( "updating monthly task script" )
task . Flux = monthlyTaskScript
_ , err := sr . influxTaskApi . UpdateTask ( ctx , task )
if err != nil {
return err
}
}
}
yearlyTaskName := "tsk-yearly-aggr"
yearlyTaskScript := sr . DownsampleScript ( "yearly" )
if found , findErr := sr . influxTaskApi . FindTasks ( ctx , & api . TaskFilter { Name : yearlyTaskName } ) ; findErr == nil && len ( found ) == 0 {
//yearly on the first day of the year at 2:00am
_ , err := sr . influxTaskApi . CreateTaskWithCron ( ctx , yearlyTaskName , sr . DownsampleScript ( "yearly" ) , "0 2 1 1 *" , orgID )
_ , err := sr . influxTaskApi . CreateTaskWithCron ( ctx , yearlyTaskName , yearlyTaskScript , "0 2 1 1 *" , orgID )
if err != nil {
return err
}
} else if len ( found ) == 1 {
//check if we should update
task := & found [ 0 ]
if yearlyTaskScript != task . Flux {
sr . logger . Infoln ( "updating yearly task script" )
task . Flux = yearlyTaskScript
_ , err := sr . influxTaskApi . UpdateTask ( ctx , task )
if err != nil {
return err
}
}
}
return nil
}
@ -102,14 +138,14 @@ func (sr *scrutinyRepository) DownsampleScript(aggregationType string) string {
| > aggregateWindow ( every : aggWindow , fn : last , createEmpty : false )
| > to ( bucket : destBucket , org : destOrg )
temp_data = from ( bucket : sourceBucket )
from ( bucket : sourceBucket )
| > range ( start : rangeStart , stop : rangeEnd )
| > filter ( fn : ( r ) = > r [ "_measurement" ] == "temp" )
| > group ( columns : [ "device_wwn" ] )
| > toInt ( )
temp_data
| > aggregateWindow ( fn : mean , every : aggWindow , createEmpty : false )
| > set ( key : "_measurement" , value : "temp" )
| > set ( key : "_field" , value : "temp" )
| > to ( bucket : destBucket , org : destOrg )
` ,
sourceBucket ,