Skip to content

Commit ddc3a23

Browse files
committed
Remove code copied from serverless
1 parent b2e2f5b commit ddc3a23

File tree

3 files changed

+183
-139
lines changed

3 files changed

+183
-139
lines changed

internal/serverless/project.go

Lines changed: 4 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,13 @@ package serverless
66

77
import (
88
"context"
9-
"encoding/json"
109
"fmt"
11-
"io"
12-
"net/http"
13-
"net/url"
14-
"os"
15-
"path/filepath"
16-
"strings"
1710
"time"
1811

1912
"github.com/elastic/elastic-package/internal/elasticsearch"
13+
"github.com/elastic/elastic-package/internal/fleetserver"
2014
"github.com/elastic/elastic-package/internal/kibana"
2115
"github.com/elastic/elastic-package/internal/logger"
22-
"github.com/elastic/elastic-package/internal/profile"
23-
"github.com/elastic/elastic-package/internal/registry"
2416
)
2517

2618
const (
@@ -138,54 +130,6 @@ func (p *Project) DefaultFleetServerURL(ctx context.Context, kibanaClient *kiban
138130
return fleetURL, nil
139131
}
140132

141-
func (p *Project) AddLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error {
142-
logstashFleetOutput := kibana.FleetOutput{
143-
Name: "logstash-output",
144-
ID: FleetLogstashOutput,
145-
Type: "logstash",
146-
Hosts: []string{"logstash:5044"},
147-
}
148-
149-
if err := kibanaClient.AddFleetOutput(ctx, logstashFleetOutput); err != nil {
150-
return fmt.Errorf("failed to add logstash fleet output: %w", err)
151-
}
152-
153-
return nil
154-
}
155-
156-
func (p *Project) UpdateLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error {
157-
certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent")
158-
159-
caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem"))
160-
if err != nil {
161-
return fmt.Errorf("failed to read ca certificate: %w", err)
162-
}
163-
164-
certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem"))
165-
if err != nil {
166-
return fmt.Errorf("failed to read client certificate: %w", err)
167-
}
168-
169-
keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem"))
170-
if err != nil {
171-
return fmt.Errorf("failed to read client certificate private key: %w", err)
172-
}
173-
174-
logstashFleetOutput := kibana.FleetOutput{
175-
SSL: &kibana.AgentSSL{
176-
CertificateAuthorities: []string{string(caFile)},
177-
Certificate: string(certFile),
178-
Key: string(keyFile),
179-
},
180-
}
181-
182-
if err := kibanaClient.UpdateFleetOutput(ctx, logstashFleetOutput, FleetLogstashOutput); err != nil {
183-
return fmt.Errorf("failed to update logstash fleet output: %w", err)
184-
}
185-
186-
return nil
187-
}
188-
189133
func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error {
190134
return elasticsearchClient.CheckHealth(ctx)
191135
}
@@ -195,94 +139,18 @@ func (p *Project) getKibanaHealth(ctx context.Context, kibanaClient *kibana.Clie
195139
}
196140

197141
func (p *Project) getFleetHealth(ctx context.Context) error {
198-
statusURL, err := url.JoinPath(p.Endpoints.Fleet, "/api/status")
142+
client, err := fleetserver.NewClient(p.Endpoints.Fleet)
199143
if err != nil {
200-
return fmt.Errorf("could not build URL: %w", err)
144+
return fmt.Errorf("could not create Fleet Server client: %w", err)
201145
}
202-
logger.Debugf("GET %s", statusURL)
203-
req, err := http.NewRequestWithContext(ctx, "GET", statusURL, nil)
146+
status, err := client.Status(ctx)
204147
if err != nil {
205148
return err
206149
}
207-
resp, err := http.DefaultClient.Do(req)
208-
if err != nil {
209-
return fmt.Errorf("request failed (url: %s): %w", statusURL, err)
210-
}
211-
defer resp.Body.Close()
212-
if resp.StatusCode >= 300 {
213-
return fmt.Errorf("unexpected status code %v", resp.StatusCode)
214-
}
215-
body, err := io.ReadAll(resp.Body)
216-
if err != nil {
217-
return fmt.Errorf("failed to read response body: %w", err)
218-
}
219-
var status struct {
220-
Name string `json:"name"`
221-
Status string `json:"status"`
222-
}
223-
err = json.Unmarshal(body, &status)
224-
if err != nil {
225-
return fmt.Errorf("failed to parse response body: %w", err)
226-
}
227150

228151
if status.Status != "HEALTHY" {
229152
return fmt.Errorf("fleet status %s", status.Status)
230153

231154
}
232155
return nil
233156
}
234-
235-
func (p *Project) CreateAgentPolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion string, outputId string, selfMonitor bool) error {
236-
policy := kibana.Policy{
237-
ID: "elastic-agent-managed-ep",
238-
Name: "Elastic-Agent (elastic-package)",
239-
Description: "Policy created by elastic-package",
240-
Namespace: "default",
241-
MonitoringEnabled: []string{},
242-
DataOutputID: outputId,
243-
}
244-
if selfMonitor {
245-
policy.MonitoringEnabled = []string{"logs", "metrics"}
246-
}
247-
248-
newPolicy, err := kibanaClient.CreatePolicy(ctx, policy)
249-
if err != nil {
250-
return fmt.Errorf("error while creating agent policy: %w", err)
251-
}
252-
253-
if selfMonitor {
254-
err := p.createSystemPackagePolicy(ctx, kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace)
255-
if err != nil {
256-
return err
257-
}
258-
}
259-
260-
return nil
261-
}
262-
263-
func (p *Project) createSystemPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error {
264-
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
265-
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
266-
})
267-
if err != nil {
268-
return fmt.Errorf("could not get the system package version for Kibana %v: %w", stackVersion, err)
269-
}
270-
if len(systemPackages) != 1 {
271-
return fmt.Errorf("unexpected number of system package versions for Kibana %s - found %d expected 1", stackVersion, len(systemPackages))
272-
}
273-
logger.Debugf("Found %s package - version %s", systemPackages[0].Name, systemPackages[0].Version)
274-
packagePolicy := kibana.PackagePolicy{
275-
Name: "system-1",
276-
PolicyID: agentPolicyID,
277-
Namespace: namespace,
278-
}
279-
packagePolicy.Package.Name = "system"
280-
packagePolicy.Package.Version = systemPackages[0].Version
281-
282-
_, err = kibanaClient.CreatePackagePolicy(ctx, packagePolicy)
283-
if err != nil {
284-
return fmt.Errorf("error while creating package policy: %w", err)
285-
}
286-
287-
return nil
288-
}

internal/stack/agentpolicy.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package stack
6+
7+
import (
8+
"context"
9+
"errors"
10+
"fmt"
11+
"os"
12+
"path/filepath"
13+
"strings"
14+
15+
"github.com/elastic/elastic-package/internal/kibana"
16+
"github.com/elastic/elastic-package/internal/logger"
17+
"github.com/elastic/elastic-package/internal/profile"
18+
"github.com/elastic/elastic-package/internal/registry"
19+
)
20+
21+
const (
22+
managedAgentPolicyID = "elastic-agent-managed-ep"
23+
fleetLogstashOutput = "fleet-logstash-output"
24+
fleetElasticsearchOutput = "fleet-elasticsearch-output"
25+
)
26+
27+
// createAgentPolicy creates an agent policy with the initial configuration used for
28+
// agents managed by elastic-package.
29+
func createAgentPolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion string, outputId string, selfMonitor bool) (*kibana.Policy, error) {
30+
policy := kibana.Policy{
31+
ID: managedAgentPolicyID,
32+
Name: "Elastic-Agent (elastic-package)",
33+
Description: "Policy created by elastic-package",
34+
Namespace: "default",
35+
MonitoringEnabled: []string{},
36+
DataOutputID: outputId,
37+
}
38+
if selfMonitor {
39+
policy.MonitoringEnabled = []string{"logs", "metrics"}
40+
}
41+
42+
newPolicy, err := kibanaClient.CreatePolicy(ctx, policy)
43+
if errors.Is(err, kibana.ErrConflict) {
44+
newPolicy, err = kibanaClient.GetPolicy(ctx, policy.ID)
45+
if err != nil {
46+
return nil, fmt.Errorf("error while getting existing policy: %w", err)
47+
}
48+
return newPolicy, nil
49+
}
50+
if err != nil {
51+
return nil, fmt.Errorf("error while creating agent policy: %w", err)
52+
}
53+
54+
if selfMonitor {
55+
err := createSystemPackagePolicy(ctx, kibanaClient, stackVersion, newPolicy.ID, newPolicy.Namespace)
56+
if err != nil {
57+
return nil, err
58+
}
59+
}
60+
61+
return newPolicy, nil
62+
}
63+
64+
func createSystemPackagePolicy(ctx context.Context, kibanaClient *kibana.Client, stackVersion, agentPolicyID, namespace string) error {
65+
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
66+
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
67+
})
68+
if err != nil {
69+
return fmt.Errorf("could not get the system package version for Kibana %v: %w", stackVersion, err)
70+
}
71+
if len(systemPackages) != 1 {
72+
return fmt.Errorf("unexpected number of system package versions for Kibana %s - found %d expected 1", stackVersion, len(systemPackages))
73+
}
74+
logger.Debugf("Found %s package - version %s", systemPackages[0].Name, systemPackages[0].Version)
75+
packagePolicy := kibana.PackagePolicy{
76+
Name: "system-1",
77+
PolicyID: agentPolicyID,
78+
Namespace: namespace,
79+
}
80+
packagePolicy.Package.Name = "system"
81+
packagePolicy.Package.Version = systemPackages[0].Version
82+
83+
_, err = kibanaClient.CreatePackagePolicy(ctx, packagePolicy)
84+
if err != nil {
85+
return fmt.Errorf("error while creating package policy: %w", err)
86+
}
87+
88+
return nil
89+
}
90+
91+
func deleteAgentPolicy(ctx context.Context, kibanaClient *kibana.Client) error {
92+
err := kibanaClient.DeletePolicy(ctx, managedAgentPolicyID)
93+
var notFoundError *kibana.ErrPolicyNotFound
94+
if err != nil && !errors.As(err, &notFoundError) {
95+
return fmt.Errorf("failed to delete policy: %w", err)
96+
}
97+
98+
return nil
99+
}
100+
101+
func forceUnenrollAgentsWithPolicy(ctx context.Context, kibanaClient *kibana.Client) error {
102+
agents, err := kibanaClient.QueryAgents(ctx, fmt.Sprintf("policy_id: %s", managedAgentPolicyID))
103+
if err != nil {
104+
return fmt.Errorf("error while querying agents with policy %s: %w", managedAgentPolicyID, err)
105+
}
106+
107+
for _, agent := range agents {
108+
err := kibanaClient.RemoveAgent(ctx, agent)
109+
if err != nil {
110+
return fmt.Errorf("failed to remove agent %s: %w", agent.ID, err)
111+
}
112+
}
113+
114+
return nil
115+
}
116+
117+
func addFleetOutput(ctx context.Context, client *kibana.Client, outputType, host, id string) error {
118+
output := kibana.FleetOutput{
119+
Name: id,
120+
ID: id,
121+
Type: outputType,
122+
Hosts: []string{host},
123+
}
124+
125+
err := client.AddFleetOutput(ctx, output)
126+
if errors.Is(err, kibana.ErrConflict) {
127+
// Output already exists.
128+
return nil
129+
}
130+
if err != nil {
131+
return fmt.Errorf("failed to add %s fleet output of type %s: %w", id, outputType, err)
132+
}
133+
134+
return nil
135+
}
136+
137+
func addLogstashFleetOutput(ctx context.Context, client *kibana.Client) error {
138+
return addFleetOutput(ctx, client, "logstash", "logstash:5044", fleetLogstashOutput)
139+
}
140+
141+
func addElasticsearchFleetOutput(ctx context.Context, client *kibana.Client, host string) error {
142+
return addFleetOutput(ctx, client, "elasticsearch", host, fleetElasticsearchOutput)
143+
}
144+
145+
func updateLogstashFleetOutput(ctx context.Context, profile *profile.Profile, kibanaClient *kibana.Client) error {
146+
certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent")
147+
148+
caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem"))
149+
if err != nil {
150+
return fmt.Errorf("failed to read ca certificate: %w", err)
151+
}
152+
153+
certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem"))
154+
if err != nil {
155+
return fmt.Errorf("failed to read client certificate: %w", err)
156+
}
157+
158+
keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem"))
159+
if err != nil {
160+
return fmt.Errorf("failed to read client certificate private key: %w", err)
161+
}
162+
163+
logstashFleetOutput := kibana.FleetOutput{
164+
SSL: &kibana.AgentSSL{
165+
CertificateAuthorities: []string{string(caFile)},
166+
Certificate: string(certFile),
167+
Key: string(keyFile),
168+
},
169+
}
170+
171+
if err := kibanaClient.UpdateFleetOutput(ctx, logstashFleetOutput, fleetLogstashOutput); err != nil {
172+
return fmt.Errorf("failed to update logstash fleet output: %w", err)
173+
}
174+
175+
return nil
176+
}

internal/stack/serverless.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (sp *serverlessProvider) createProject(ctx context.Context, settings projec
123123
}
124124

125125
if settings.LogstashEnabled {
126-
err = project.AddLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient)
126+
err = addLogstashFleetOutput(ctx, sp.kibanaClient)
127127
if err != nil {
128128
return Config{}, err
129129
}
@@ -280,7 +280,7 @@ func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error
280280
}
281281

282282
logger.Infof("Creating agent policy")
283-
err = project.CreateAgentPolicy(ctx, sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor)
283+
_, err = createAgentPolicy(ctx, sp.kibanaClient, options.StackVersion, outputID, settings.SelfMonitor)
284284

285285
if err != nil {
286286
return fmt.Errorf("failed to create agent policy: %w", err)
@@ -303,7 +303,7 @@ func (sp *serverlessProvider) BootUp(ctx context.Context, options Options) error
303303
// Updating the output with ssl certificates created in startLocalServices
304304
// The certificates are updated only when a new project is created and logstash is enabled
305305
if isNewProject && settings.LogstashEnabled {
306-
err = project.UpdateLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient)
306+
err = updateLogstashFleetOutput(ctx, sp.profile, sp.kibanaClient)
307307
if err != nil {
308308
return err
309309
}

0 commit comments

Comments
 (0)