You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
scrutiny/collector/pkg/collector/metrics.go

130 lines
3.5 KiB

package collector
import (
"bytes"
"fmt"
"github.com/analogj/scrutiny/collector/pkg/errors"
"github.com/analogj/scrutiny/collector/pkg/models"
"github.com/sirupsen/logrus"
"net/url"
"os/exec"
"strings"
"sync"
)
// MetricsCollector runs smartctl against storage devices and posts the
// results to an API server. It embeds BaseCollector, whose logger field and
// helper methods are used by the methods defined on this type.
type MetricsCollector struct {
BaseCollector
// apiEndpoint is the parsed base URL of the API server. Methods copy it
// (by re-parsing its string form) before setting a request-specific path.
apiEndpoint *url.URL
}
// CreateMetricsCollector builds a MetricsCollector that logs through logger
// and targets the API server whose base URL is apiEndpoint.
//
// apiEndpoint is parsed with url.Parse. When parsing fails, a zero-value
// MetricsCollector is returned together with the parse error.
func CreateMetricsCollector(logger *logrus.Entry, apiEndpoint string) (MetricsCollector, error) {
	parsedEndpoint, parseErr := url.Parse(apiEndpoint)
	if parseErr != nil {
		return MetricsCollector{}, parseErr
	}

	var collector MetricsCollector
	collector.apiEndpoint = parsedEndpoint
	collector.BaseCollector = BaseCollector{logger: logger}
	return collector, nil
}
// Run performs one collection cycle:
//
//  1. Validate that the required tools are available.
//  2. Detect storage devices and POST them to /api/devices/register.
//  3. For every device returned in the response, run Collect in its own
//     goroutine and wait for all of them to finish.
//
// It returns the first error from validation, endpoint parsing, device
// detection or the registration request. If the registration response has
// Success == false, an errors.ApiServerCommunicationError is returned.
// Failures inside the individual Collect goroutines are not returned.
func (mc *MetricsCollector) Run() error {
	if err := mc.Validate(); err != nil {
		return err
	}

	// Re-parse the base endpoint to get a private copy, so setting Path below
	// does not mutate mc.apiEndpoint. The parse error is returned instead of
	// being discarded, because a nil URL would be dereferenced on the next line.
	apiEndpoint, err := url.Parse(mc.apiEndpoint.String())
	if err != nil {
		return err
	}
	apiEndpoint.Path = "/api/devices/register"

	deviceRespWrapper := new(models.DeviceWrapper)

	detectedStorageDevices, err := mc.DetectStorageDevices()
	if err != nil {
		return err
	}

	mc.logger.Infoln("Sending detected devices to API, for filtering & validation")
	mc.logger.Debugf("Detected devices: %v", detectedStorageDevices)

	// NOTE(review): deviceRespWrapper is already a pointer, so this passes a
	// pointer-to-pointer. Kept as-is because postJson is defined elsewhere;
	// confirm that is what it expects.
	err = mc.postJson(apiEndpoint.String(), models.DeviceWrapper{
		Data: detectedStorageDevices,
	}, &deviceRespWrapper)
	if err != nil {
		return err
	}

	if !deviceRespWrapper.Success {
		mc.logger.Errorln("An error occurred while retrieving filtered devices")
		return errors.ApiServerCommunicationError("An error occurred while retrieving filtered devices")
	}

	mc.logger.Debugln(deviceRespWrapper)

	var wg sync.WaitGroup
	for _, device := range deviceRespWrapper.Data {
		// Execute collection in parallel goroutines. The arguments are
		// evaluated here, at the go statement, so each worker gets its own
		// device values.
		wg.Add(1)
		go mc.Collect(&wg, device.WWN, device.DeviceName)
	}

	mc.logger.Infoln("Main: Waiting for workers to finish")
	wg.Wait()
	mc.logger.Infoln("Main: Completed")

	return nil
}
// Validate checks that the external tools the collector relies on are
// available. It looks up "smartctl" on the PATH and returns
// errors.DependencyMissingError when the lookup fails, nil otherwise.
func (mc *MetricsCollector) Validate() error {
	mc.logger.Infoln("Verifying required tools")

	if _, err := exec.LookPath("smartctl"); err != nil {
		return errors.DependencyMissingError("smartctl is missing")
	}
	return nil
}
// Collect runs `smartctl -a -j /dev/<deviceName>` and publishes its output
// for the device identified by deviceWWN.
//
// It is meant to be started as a goroutine: wg.Done is called when it
// returns. Nothing is returned to the caller; every failure is logged.
//
//   - If smartctl ran but exited with a non-zero code (*exec.ExitError), the
//     exit code is logged and the output is still published.
//   - If smartctl could not be executed at all, the output is discarded.
//   - If publishing fails, the error is logged rather than silently dropped.
func (mc *MetricsCollector) Collect(wg *sync.WaitGroup, deviceWWN string, deviceName string) {
	defer wg.Done()

	mc.logger.Infof("Collecting smartctl results for %s\n", deviceName)

	result, err := mc.ExecCmd("smartctl", []string{"-a", "-j", fmt.Sprintf("/dev/%s", deviceName)}, "", nil)
	resultBytes := []byte(result)

	if err != nil {
		// A plain type assertion is used instead of errors.As because the
		// identifier `errors` in this file refers to the project's own errors
		// package, not the standard library.
		exitError, ok := err.(*exec.ExitError)
		if !ok {
			mc.logger.Errorf("error while attempting to execute smartctl: %s\n", deviceName)
			mc.logger.Errorf("ERROR MESSAGE: %v", err)
			mc.logger.Errorf("IGNORING RESULT: %v", result)
			return
		}

		// smartctl exited with an error code; we should still push the data
		// to the API server.
		mc.logger.Errorf("smartctl returned an error code (%d) while processing %s\n", exitError.ExitCode(), deviceName)
		mc.LogSmartctlExitCode(exitError.ExitCode())
	}

	// Pass the results directly to the webapp backend for parsing and
	// processing, and surface any upload failure in the log.
	if publishErr := mc.Publish(deviceWWN, resultBytes); publishErr != nil {
		mc.logger.Errorf("error while publishing smartctl results for %s: %v", deviceWWN, publishErr)
	}
}
// Publish POSTs payload as "application/json" to
// /api/device/<lower-cased deviceWWN>/smart on the configured API server.
//
// It returns an error if the endpoint URL cannot be built or if the HTTP
// request itself fails. The response body is closed but not read.
//
// NOTE(review): the response status code is not inspected, so a non-2xx
// reply is reported as success. Confirm whether callers rely on that before
// tightening it.
func (mc *MetricsCollector) Publish(deviceWWN string, payload []byte) error {
	mc.logger.Infof("Publishing smartctl results for %s\n", deviceWWN)

	// Re-parse the base endpoint to get a private copy, so setting Path below
	// does not mutate mc.apiEndpoint. The parse error is returned instead of
	// being discarded, because a nil URL would be dereferenced on the next line.
	apiEndpoint, err := url.Parse(mc.apiEndpoint.String())
	if err != nil {
		return err
	}
	apiEndpoint.Path = fmt.Sprintf("/api/device/%s/smart", strings.ToLower(deviceWWN))

	resp, err := httpClient.Post(apiEndpoint.String(), "application/json", bytes.NewBuffer(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	return nil
}