Skip to content

Commit 8680cbc

Browse files
李红李红
authored andcommitted
server ip config
1 parent e588dbf commit 8680cbc

File tree

6 files changed

+83
-39
lines changed

6 files changed

+83
-39
lines changed

mgr/dataflow.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -378,14 +378,14 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er
378378
}
379379

380380
func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) {
381-
transformKeyType, ok := transformerConfig[transforms.KeyType]
381+
transformKeyType, ok := transformerConfig[KeyType]
382382
if !ok {
383-
err := fmt.Errorf("missing param %s", transforms.KeyType)
383+
err := fmt.Errorf("missing param %s", KeyType)
384384
return nil, err
385385
}
386386
transformKeyTypeStr, ok := transformKeyType.(string)
387387
if !ok {
388-
err := fmt.Errorf("param %s must be of type string", transforms.KeyType)
388+
err := fmt.Errorf("param %s must be of type string", KeyType)
389389
return nil, err
390390
}
391391

mgr/runner.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -291,15 +291,13 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
291291
senderConfig[sender.KeyPandoraDescription] = LogkitAutoCreateDescription
292292
}
293293
}
294+
if senderConfig[sender.KeySenderType] == sender.TypePandora {
295+
senderConfig = setSenderConfig(senderConfig, serverConfigs)
296+
}
294297
s, err := sr.NewSender(senderConfig, meta.FtSaveLogPath())
295298
if err != nil {
296299
return nil, err
297300
}
298-
if senderConfig[sender.KeySenderType] == sender.TypePandora {
299-
if serverSender, ok := s.(sender.ServerSender); ok {
300-
serverSender.SetServer(serverConfigs)
301-
}
302-
}
303301
senders = append(senders, s)
304302
delete(rc.SendersConfig[i], sender.InnerUserAgent)
305303
}
@@ -1297,3 +1295,39 @@ func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[stri
12971295
}
12981296
return tags
12991297
}
1298+
1299+
func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) conf.MapConf {
1300+
for _, serverConfig := range serverConfigs {
1301+
keyType, ok := serverConfig[KeyType].(string)
1302+
if !ok || keyType != KeyIP {
1303+
continue
1304+
}
1305+
localEnable, ok := serverConfig[LocalEnable].(bool)
1306+
if !ok {
1307+
continue
1308+
}
1309+
1310+
autoCreate := senderConfig[sender.KeyPandoraAutoCreate]
1311+
if localEnable {
1312+
schema := fmt.Sprintf(",%v ip", KeyIP)
1313+
if autoCreate == fmt.Sprintf("%v ip", KeyIP) {
1314+
autoCreate = ""
1315+
} else if index := strings.Index(autoCreate, schema); index != -1 {
1316+
autoCreate = autoCreate[:index] + autoCreate[index+len(schema):]
1317+
}
1318+
senderConfig[sender.KeyPandoraAutoCreate] = autoCreate
1319+
continue
1320+
}
1321+
1322+
if autoCreate == "" {
1323+
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%v ip", KeyIP)
1324+
continue
1325+
}
1326+
1327+
if !strings.Contains(autoCreate, KeyIP) {
1328+
senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%v ip", KeyIP)
1329+
}
1330+
}
1331+
1332+
return senderConfig
1333+
}

sender/pandora/pandora.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ import (
3131

3232
var osInfo = []string{KeyCore, KeyHostName, KeyOsInfo, KeyLocalIp}
3333

34-
var _ sender.ServerSender = &Sender{}
35-
3634
const (
3735
SendTypeRaw = "raw"
3836
SendTypeNormal = "normal"
@@ -499,6 +497,7 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) {
499497
log.Errorf("Runner[%v] Sender[%v]: auto create pandora repo error: %v, you can create on pandora portal, ignored...", opt.runnerName, opt.name, err)
500498
err = nil
501499
}
500+
log.Infof("`````````````````````````````dsl: %v, schemas: %v, opt.autoCreate: %v", dsl, schemas, opt.autoCreate)
502501
if initErr := s.client.InitOrUpdateWorkflow(&pipeline.InitOrUpdateWorkflowInput{
503502
// 此处要的 schema 为 autoCreate 中用户指定的,所以 SchemaFree 要恒为 true
504503
InitOptionChange: true,
@@ -1129,14 +1128,3 @@ func (s *Sender) Name() string {
11291128
func (s *Sender) Close() error {
11301129
return s.client.Close()
11311130
}
1132-
1133-
func (s *Sender) SetServer(serverConfigs []map[string]interface{}) error {
1134-
for _, serverConfig := range serverConfigs {
1135-
if transformType, ok := serverConfig[KeyType].(string); ok && transformType == KeyIP {
1136-
if localEnable, ok := serverConfig[LocalEnable].(bool); ok && !localEnable {
1137-
s.opt.autoCreate += fmt.Sprintf(",%v ip", KeyIP)
1138-
}
1139-
}
1140-
}
1141-
return nil
1142-
}

sender/sender.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,6 @@ type StatsSender interface {
220220
Restore(*StatsInfo)
221221
}
222222

223-
type ServerSender interface {
224-
SetServer([]map[string]interface{}) error
225-
}
226-
227223
// SenderRegistry sender 的工厂类。可以注册自定义sender
228224
type Registry struct {
229225
senderTypeMap map[string]func(conf.MapConf) (Sender, error)

transforms/ip/ip.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ type Transformer struct {
5454
}
5555

5656
func (t *Transformer) Init() error {
57+
if t.Key != "" {
58+
t.LocalEnable = true
59+
}
60+
61+
if !t.LocalEnable {
62+
return nil
63+
}
5764
if t.Language == "" {
5865
t.Language = "zh-CN"
5966
}
@@ -232,10 +239,21 @@ func (_ *Transformer) ConfigOptions() []Option {
232239
Default: false,
233240
Required: true,
234241
DefaultNoUse: false,
235-
Description: "是否使用本地解析,默认为服务端解析",
242+
Description: "使用本地解析",
236243
Type: transforms.TransformTypeBoolean,
237244
},
238-
transforms.KeyFieldName,
245+
{
246+
KeyName: "key",
247+
ChooseOnly: false,
248+
Default: "",
249+
Required: true,
250+
Placeholder: "my_field_keyname",
251+
DefaultNoUse: true,
252+
Description: "要进行Transform变化的键(key)",
253+
ToolTip: "对该字段的值进行transform变换",
254+
Type: transforms.TransformTypeString,
255+
AdvanceDepend: LocalEnable,
256+
},
239257
{
240258
KeyName: "data_path",
241259
ChooseOnly: false,

transforms/ip/ip_test.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestTransformer(t *testing.T) {
6666
Key: "multi.ip",
6767
DataPath: "./test_data/17monipdb.dat",
6868
}
69-
assert.Nil(t, ipt.Init())
69+
assert.Nil(t, ipt2.Init())
7070
data2, err2 := ipt2.Transform([]Data{{"multi": map[string]interface{}{"ip": "111.2.3.4"}}, {"multi": map[string]interface{}{"ip": "x.x.x.x"}}})
7171
assert.Error(t, err2)
7272
exp2 := []Data{{
@@ -95,6 +95,7 @@ func TestTransformer(t *testing.T) {
9595
Key: "multi.ip",
9696
DataPath: "./test_data/17monipdb.datx",
9797
}
98+
assert.Nil(t, ipt3.Init())
9899
data3, err3 := ipt3.Transform([]Data{{"multi": map[string]interface{}{"ip": "111.2.3.4"}}, {"multi": map[string]interface{}{"ip": "x.x.x.x"}}})
99100
assert.Error(t, err3)
100101
exp3 := []Data{{
@@ -139,6 +140,7 @@ func TestTransformer(t *testing.T) {
139140
},
140141
},
141142
}
143+
assert.Nil(t, ipt4.Init())
142144
data4, err4 := ipt4.Transform(multi_ip)
143145
exp4 := []Data{
144146
{
@@ -179,6 +181,7 @@ func TestTransformer(t *testing.T) {
179181
DataPath: "./test_data/17monipdb.mmdb",
180182
Language: "en",
181183
}
184+
assert.Nil(t, ipt5.Init())
182185
data5, err5 := ipt5.Transform([]Data{{"multi": map[string]interface{}{"ip": "216.160.83.56"}}, {"multi": map[string]interface{}{"ip": "x.x.x.x"}}})
183186
assert.Error(t, err5)
184187
exp5 := []Data{{
@@ -229,6 +232,7 @@ func TestTransformer(t *testing.T) {
229232
},
230233
},
231234
}
235+
assert.Nil(t, ipt6.Init())
232236
data6, err6 := ipt6.Transform(multi_ip2)
233237
exp6 := []Data{
234238
{
@@ -276,12 +280,12 @@ var dttest []Data
276280
//new: 2000000 621 ns/op 232 B/op 7 allocs/op
277281
func BenchmarkIpTrans(b *testing.B) {
278282
b.ReportAllocs()
279-
ipt4 := &Transformer{
283+
ipt := &Transformer{
280284
Key: "multi.ip2",
281285
DataPath: "./test_data/17monipdb.dat",
282286
KeyAsPrefix: true,
283287
}
284-
ipt4.Init()
288+
ipt.Init()
285289
data := []Data{
286290
{
287291
"multi": map[string]interface{}{
@@ -295,14 +299,15 @@ func BenchmarkIpTrans(b *testing.B) {
295299
},
296300
}
297301
for i := 0; i < b.N; i++ {
298-
dttest, _ = ipt4.Transform(data)
302+
dttest, _ = ipt.Transform(data)
299303
}
300304
}
301305

302306
func Test_badData(t *testing.T) {
303307
ipt := &Transformer{
304-
Key: "ip",
305-
DataPath: "./test_data/bad.dat",
308+
Key: "ip",
309+
DataPath: "./test_data/bad.dat",
310+
LocalEnable: true,
306311
}
307312
_, err := ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
308313
assert.Error(t, err)
@@ -311,8 +316,9 @@ func Test_badData(t *testing.T) {
311316
assert.Equal(t, "dat", ierr.Format)
312317

313318
ipt = &Transformer{
314-
Key: "ip",
315-
DataPath: "./test_data/bad.datx",
319+
Key: "ip",
320+
DataPath: "./test_data/bad.datx",
321+
LocalEnable: true,
316322
}
317323
_, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
318324
assert.Error(t, err)
@@ -327,16 +333,18 @@ func Test_badData(t *testing.T) {
327333
assert.Equal(t, "datx", ierr.Format)
328334

329335
ipt = &Transformer{
330-
Key: "ip",
331-
DataPath: "./test_data/bad.datn",
336+
Key: "ip",
337+
DataPath: "./test_data/bad.datn",
338+
LocalEnable: true,
332339
}
333340
_, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
334341
assert.Error(t, err)
335342
assert.True(t, strings.Contains(err.Error(), "unrecognized data file format"))
336343

337344
ipt = &Transformer{
338-
Key: "ip",
339-
DataPath: "./test_data/bad.mmdb",
345+
Key: "ip",
346+
DataPath: "./test_data/bad.mmdb",
347+
LocalEnable: true,
340348
}
341349
_, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
342350
assert.Error(t, err)

0 commit comments

Comments
 (0)