diff --git a/pkg/plugins/cloudamqp/cloudamqpplugin/cloudamqpbilling.go b/pkg/plugins/cloudamqp/cloudamqpplugin/cloudamqpbilling.go new file mode 100644 index 0000000..2d75159 --- /dev/null +++ b/pkg/plugins/cloudamqp/cloudamqpplugin/cloudamqpbilling.go @@ -0,0 +1,51 @@ +package cloudamqpplugin + +// CloudAMQPInstance represents a CloudAMQP instance from the Customer API. +type CloudAMQPInstance struct { + ID int64 `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Plan string `json:"plan,omitempty"` + PlanID int64 `json:"plan_id,omitempty"` + Region string `json:"region,omitempty"` + Vhost string `json:"vhost,omitempty"` + Tags []string `json:"tags,omitempty"` + URL string `json:"url,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + Nodes int64 `json:"nodes,omitempty"` + NodeType string `json:"node_type,omitempty"` + Ready bool `json:"ready,omitempty"` + Price CloudAMQPPrice `json:"price,omitempty"` + NoDefaultAlarms bool `json:"no_default_alarms,omitempty"` + Type string `json:"type,omitempty"` +} + +// CloudAMQPPrice represents the pricing info for a CloudAMQP instance. +type CloudAMQPPrice struct { + Amount float64 `json:"amount,omitempty"` + Currency string `json:"currency,omitempty"` + Interval string `json:"interval,omitempty"` +} + +// CloudAMQPInvoice represents an invoice from the CloudAMQP Customer API. +type CloudAMQPInvoice struct { + ID int64 `json:"id,omitempty"` + Amount float64 `json:"amount,omitempty"` + Currency string `json:"currency,omitempty"` + Description string `json:"description,omitempty"` + PeriodStart string `json:"period_start,omitempty"` + PeriodEnd string `json:"period_end,omitempty"` + Status string `json:"status,omitempty"` + Items []CloudAMQPInvoiceItem `json:"items,omitempty"` +} + +// CloudAMQPInvoiceItem represents a line item in a CloudAMQP invoice. +type CloudAMQPInvoiceItem struct { + Description string `json:"description,omitempty"` + Amount float64 `json:"amount,omitempty"` + Quantity float64 `json:"quantity,omitempty"` + PeriodStart string `json:"period_start,omitempty"` + PeriodEnd string `json:"period_end,omitempty"` + InstanceID int64 `json:"instance_id,omitempty"` + InstanceName string `json:"instance_name,omitempty"` + Plan string `json:"plan,omitempty"` +} diff --git a/pkg/plugins/cloudamqp/cmd/main/main.go b/pkg/plugins/cloudamqp/cmd/main/main.go new file mode 100644 index 0000000..f88a814 --- /dev/null +++ b/pkg/plugins/cloudamqp/cmd/main/main.go @@ -0,0 +1,290 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/google/uuid" + "github.com/hashicorp/go-plugin" + "golang.org/x/time/rate" + "google.golang.org/protobuf/types/known/timestamppb" + + commonconfig "github.com/opencost/opencost-plugins/pkg/common/config" + cloudamqpconfig "github.com/opencost/opencost-plugins/pkg/plugins/cloudamqp/config" + cloudamqpplugin "github.com/opencost/opencost-plugins/pkg/plugins/cloudamqp/cloudamqpplugin" + "github.com/opencost/opencost/core/pkg/log" + "github.com/opencost/opencost/core/pkg/model/pb" + "github.com/opencost/opencost/core/pkg/opencost" + ocplugin "github.com/opencost/opencost/core/pkg/plugin" +) + +var handshakeConfig = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "PLUGIN_NAME", + MagicCookieValue: "cloudamqp", +} + +const cloudamqpAPIBase = "https://customer.cloudamqp.com/api" +const cloudamqpInstancesURL = cloudamqpAPIBase + "/instances" +const cloudamqpInvoicesURL = cloudamqpAPIBase + "/invoices" + +type CloudAMQPCostSource struct { + rateLimiter *rate.Limiter + config *cloudamqpconfig.CloudAMQPConfig + client HTTPClient +} + +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) +} + +func (c *CloudAMQPCostSource) GetCustomCosts(req *pb.CustomCostRequest) []*pb.CustomCostResponse { + results := []*pb.CustomCostResponse{} + + targets, err := opencost.GetWindows(req.Start.AsTime(), req.End.AsTime(), req.Resolution.AsDuration()) + if err != nil { + log.Errorf("error getting windows: %v", err) + errResp := pb.CustomCostResponse{ + Errors: []string{fmt.Sprintf("error getting windows: %v", err)}, + } + results = append(results, &errResp) + return results + } + + for _, target := range targets { + if target.Start().After(time.Now().UTC()) { + log.Debugf("skipping future window %v", target) + continue + } + + log.Debugf("fetching CloudAMQP costs for window %v", target) + result := c.getCloudAMQPCostsForWindow(target) + results = append(results, result) + } + + return results +} + +func main() { + configFile, err := commonconfig.GetConfigFilePath() + if err != nil { + log.Fatalf("error opening config file: %v", err) + } + + cloudamqpCfg, err := cloudamqpconfig.GetCloudAMQPConfig(configFile) + if err != nil { + log.Fatalf("error building CloudAMQP config: %v", err) + } + log.SetLogLevel(cloudamqpCfg.LogLevel) + + rateLimiter := rate.NewLimiter(1, 2) + cloudamqpCostSrc := CloudAMQPCostSource{ + rateLimiter: rateLimiter, + config: cloudamqpCfg, + client: &http.Client{}, + } + + var pluginMap = map[string]plugin.Plugin{ + "CustomCostSource": &ocplugin.CustomCostPlugin{Impl: &cloudamqpCostSrc}, + } + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: handshakeConfig, + Plugins: pluginMap, + GRPCServer: plugin.DefaultGRPCServer, + }) +} + +func boilerplateCloudAMQPCustomCost(win opencost.Window) pb.CustomCostResponse { + return pb.CustomCostResponse{ + Metadata: map[string]string{"api_client_version": "v1"}, + CostSource: "message_queue", + Domain: "cloudamqp", + Version: "v1", + Currency: "USD", + Start: timestamppb.New(*win.Start()), + End: timestamppb.New(*win.End()), + Errors: []string{}, + Costs: []*pb.CustomCost{}, + } +} + +func (c *CloudAMQPCostSource) getCloudAMQPCostsForWindow(window opencost.Window) *pb.CustomCostResponse { + ccResp := boilerplateCloudAMQPCustomCost(window) + + // Try to fetch invoices first + invoices, err := c.getInvoices() + if err != nil { + // If invoices API fails, fall back to instance-based pricing + log.Debugf("could not fetch invoices, falling back to instance pricing: %v", err) + instances, err := c.getInstances() + if err != nil { + ccResp.Errors = append(ccResp.Errors, fmt.Sprintf("error getting CloudAMQP data: %v", err)) + return &ccResp + } + ccResp.Costs = c.instancesToCustomCosts(invoices, instances) + return &ccResp + } + + // If we have invoices, use those + if len(invoices) > 0 { + ccResp.Costs = c.invoicesToCustomCosts(invoices) + return &ccResp + } + + // Fall back to instance-based pricing + instances, err := c.getInstances() + if err != nil { + ccResp.Errors = append(ccResp.Errors, fmt.Sprintf("error getting CloudAMQP instances: %v", err)) + return &ccResp + } + ccResp.Costs = c.instancesToCustomCosts(nil, instances) + return &ccResp +} + +func (c *CloudAMQPCostSource) instancesToCustomCosts(invoices []cloudamqpplugin.CloudAMQPInvoice, instances []cloudamqpplugin.CloudAMQPInstance) []*pb.CustomCost { + customCosts := []*pb.CustomCost{} + for _, instance := range instances { + customCost := pb.CustomCost{ + AccountName: instance.Name, + ChargeCategory: "Recurring", + Description: fmt.Sprintf("CloudAMQP %s (%s) - %d nodes", instance.Plan, instance.Region, instance.Nodes), + ResourceName: instance.Name, + ResourceType: instance.Plan, + Id: uuid.New().String(), + ProviderId: fmt.Sprintf("%d/%s", instance.ID, instance.Plan), + BilledCost: float32(instance.Price.Amount), + ListCost: float32(instance.Price.Amount), + UsageQuantity: 1, + UsageUnit: instance.Price.Interval, + Labels: map[string]string{}, + } + + if instance.Region != "" { + customCost.Labels["region"] = instance.Region + } + if instance.Nodes > 0 { + customCost.Labels["nodes"] = fmt.Sprintf("%d", instance.Nodes) + } + if instance.Price.Currency != "" { + customCost.Labels["currency"] = instance.Price.Currency + } + if instance.NodeType != "" { + customCost.Labels["node_type"] = instance.NodeType + } + + customCosts = append(customCosts, &customCost) + } + return customCosts +} + +func (c *CloudAMQPCostSource) invoicesToCustomCosts(invoices []cloudamqpplugin.CloudAMQPInvoice) []*pb.CustomCost { + customCosts := []*pb.CustomCost{} + for _, invoice := range invoices { + for _, item := range invoice.Items { + customCost := pb.CustomCost{ + AccountName: item.InstanceName, + ChargeCategory: "Usage", + Description: item.Description, + ResourceName: item.InstanceName, + ResourceType: item.Plan, + Id: uuid.New().String(), + ProviderId: fmt.Sprintf("%d/%s", item.InstanceID, item.Plan), + BilledCost: float32(item.Amount), + ListCost: float32(item.Amount), + UsageQuantity: float32(item.Quantity), + Labels: map[string]string{}, + } + + if item.PeriodStart != "" { + customCost.Labels["period_start"] = item.PeriodStart + } + if item.PeriodEnd != "" { + customCost.Labels["period_end"] = item.PeriodEnd + } + + customCosts = append(customCosts, &customCost) + } + } + return customCosts +} + +func (c *CloudAMQPCostSource) getInstances() ([]cloudamqpplugin.CloudAMQPInstance, error) { + req, err := http.NewRequest("GET", cloudamqpInstancesURL, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %v", err) + } + req.SetBasicAuth("", c.config.APIKey) + req.Header.Set("Accept", "application/json") + + err = c.rateLimiter.Wait(context.Background()) + if err != nil { + return nil, fmt.Errorf("rate limiter error: %v", err) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making request to CloudAMQP API: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("CloudAMQP API returned status %d: %s", resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %v", err) + } + + var instances []cloudamqpplugin.CloudAMQPInstance + err = json.Unmarshal(body, &instances) + if err != nil { + return nil, fmt.Errorf("error unmarshalling CloudAMQP instances response: %v", err) + } + + return instances, nil +} + +func (c *CloudAMQPCostSource) getInvoices() ([]cloudamqpplugin.CloudAMQPInvoice, error) { + req, err := http.NewRequest("GET", cloudamqpInvoicesURL, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %v", err) + } + req.SetBasicAuth("", c.config.APIKey) + req.Header.Set("Accept", "application/json") + + err = c.rateLimiter.Wait(context.Background()) + if err != nil { + return nil, fmt.Errorf("rate limiter error: %v", err) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making request to CloudAMQP invoices API: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("CloudAMQP invoices API returned status %d: %s", resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %v", err) + } + + var invoices []cloudamqpplugin.CloudAMQPInvoice + err = json.Unmarshal(body, &invoices) + if err != nil { + return nil, fmt.Errorf("error unmarshalling CloudAMQP invoices response: %v", err) + } + + return invoices, nil +} diff --git a/pkg/plugins/cloudamqp/cmd/main/main_test.go b/pkg/plugins/cloudamqp/cmd/main/main_test.go new file mode 100644 index 0000000..ef297b7 --- /dev/null +++ b/pkg/plugins/cloudamqp/cmd/main/main_test.go @@ -0,0 +1,118 @@ +package main + +import ( + "encoding/json" + "testing" + + cloudamqpplugin "github.com/opencost/opencost-plugins/pkg/plugins/cloudamqp/cloudamqpplugin" +) + +func TestCloudAMQPInstanceUnmarshal(t *testing.T) { + jsonData := `{ + "id": 12345, + "name": "my-rabbitmq", + "plan": "bunny-1", + "plan_id": 1, + "region": "amazon-web-services::us-east-1", + "nodes": 1, + "node_type": "t3.micro", + "ready": true, + "price": { + "amount": 29.0, + "currency": "USD", + "interval": "monthly" + } + }` + + var instance cloudamqpplugin.CloudAMQPInstance + err := json.Unmarshal([]byte(jsonData), &instance) + if err != nil { + t.Fatalf("Error unmarshalling instance: %v", err) + } + + if instance.Name != "my-rabbitmq" { + t.Errorf("Expected name 'my-rabbitmq', got '%s'", instance.Name) + } + if instance.Plan != "bunny-1" { + t.Errorf("Expected plan 'bunny-1', got '%s'", instance.Plan) + } + if instance.Price.Amount != 29.0 { + t.Errorf("Expected price amount 29.0, got %f", instance.Price.Amount) + } + if instance.Price.Currency != "USD" { + t.Errorf("Expected currency 'USD', got '%s'", instance.Price.Currency) + } + if instance.Nodes != 1 { + t.Errorf("Expected 1 node, got %d", instance.Nodes) + } +} + +func TestCloudAMQPInstancesListUnmarshal(t *testing.T) { + jsonData := `[ + { + "id": 12345, + "name": "my-rabbitmq", + "plan": "bunny-1", + "region": "amazon-web-services::us-east-1", + "price": {"amount": 29.0, "currency": "USD", "interval": "monthly"} + }, + { + "id": 67890, + "name": "my-rabbitmq-2", + "plan": "panda-3", + "region": "amazon-web-services::eu-west-1", + "price": {"amount": 199.0, "currency": "USD", "interval": "monthly"} + } + ]` + + var instances []cloudamqpplugin.CloudAMQPInstance + err := json.Unmarshal([]byte(jsonData), &instances) + if err != nil { + t.Fatalf("Error unmarshalling instances: %v", err) + } + + if len(instances) != 2 { + t.Errorf("Expected 2 instances, got %d", len(instances)) + } + if instances[1].Plan != "panda-3" { + t.Errorf("Expected second plan 'panda-3', got '%s'", instances[1].Plan) + } +} + +func TestCloudAMQPInvoiceUnmarshal(t *testing.T) { + jsonData := `{ + "id": 1001, + "amount": 228.0, + "currency": "USD", + "description": "Monthly subscription", + "period_start": "2024-01-01", + "period_end": "2024-01-31", + "status": "paid", + "items": [ + { + "description": "bunny-1 plan", + "amount": 29.0, + "quantity": 1, + "instance_id": 12345, + "instance_name": "my-rabbitmq", + "plan": "bunny-1" + } + ] + }` + + var invoice cloudamqpplugin.CloudAMQPInvoice + err := json.Unmarshal([]byte(jsonData), &invoice) + if err != nil { + t.Fatalf("Error unmarshalling invoice: %v", err) + } + + if invoice.Amount != 228.0 { + t.Errorf("Expected amount 228.0, got %f", invoice.Amount) + } + if len(invoice.Items) != 1 { + t.Errorf("Expected 1 item, got %d", len(invoice.Items)) + } + if invoice.Items[0].Plan != "bunny-1" { + t.Errorf("Expected plan 'bunny-1', got '%s'", invoice.Items[0].Plan) + } +} diff --git a/pkg/plugins/cloudamqp/cmd/validator/main/main.go b/pkg/plugins/cloudamqp/cmd/validator/main/main.go new file mode 100644 index 0000000..a43a2a7 --- /dev/null +++ b/pkg/plugins/cloudamqp/cmd/validator/main/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "fmt" + "os" + + commonconfig "github.com/opencost/opencost-plugins/pkg/common/config" + cloudamqpconfig "github.com/opencost/opencost-plugins/pkg/plugins/cloudamqp/config" + "github.com/opencost/opencost/core/pkg/log" +) + +func main() { + configFile, err := commonconfig.GetConfigFilePath() + if err != nil { + log.Fatalf("error opening config file: %v", err) + } + + cloudamqpCfg, err := cloudamqpconfig.GetCloudAMQPConfig(configFile) + if err != nil { + log.Fatalf("error building CloudAMQP config: %v", err) + } + + if cloudamqpCfg.APIKey == "" { + log.Fatalf("cloudamqp_api_key is required in config file") + } + + fmt.Printf("CloudAMQP config validated successfully\n") + fmt.Printf("Log level: %s\n", cloudamqpCfg.LogLevel) + fmt.Printf("\nSample config:\n") + fmt.Printf(`{ + "cloudamqp_api_key": "YOUR_API_KEY", + "cloudamqp_plugin_log_level": "info" +} +`) + os.Exit(0) +} diff --git a/pkg/plugins/cloudamqp/config/cloudamqpconfig.go b/pkg/plugins/cloudamqp/config/cloudamqpconfig.go new file mode 100644 index 0000000..aa3ffcf --- /dev/null +++ b/pkg/plugins/cloudamqp/config/cloudamqpconfig.go @@ -0,0 +1,30 @@ +package config + +import ( + "encoding/json" + "fmt" + "os" +) + +type CloudAMQPConfig struct { + APIKey string `json:"cloudamqp_api_key"` + LogLevel string `json:"cloudamqp_plugin_log_level"` +} + +func GetCloudAMQPConfig(configFilePath string) (*CloudAMQPConfig, error) { + var result CloudAMQPConfig + bytes, err := os.ReadFile(configFilePath) + if err != nil { + return nil, fmt.Errorf("error reading config file for CloudAMQP config @ %s: %v", configFilePath, err) + } + err = json.Unmarshal(bytes, &result) + if err != nil { + return nil, fmt.Errorf("error marshaling json into CloudAMQP config %v", err) + } + + if result.LogLevel == "" { + result.LogLevel = "info" + } + + return &result, nil +} diff --git a/pkg/plugins/cloudamqp/go.mod b/pkg/plugins/cloudamqp/go.mod new file mode 100644 index 0000000..4bafd8c --- /dev/null +++ b/pkg/plugins/cloudamqp/go.mod @@ -0,0 +1,12 @@ +module github.com/opencost/opencost-plugins/pkg/plugins/cloudamqp + +go 1.21 + +require ( + github.com/google/uuid v1.6.0 + github.com/hashicorp/go-plugin v1.6.1 + github.com/opencost/opencost-plugins/pkg/common v0.0.0 + github.com/opencost/opencost/core v0.0.0 + golang.org/x/time v0.5.0 + google.golang.org/protobuf v1.34.0 +)