Skip to content

Commit c084af9

Browse files
李红李红
authored andcommitted
update pandora go sdk
1 parent 8680cbc commit c084af9

File tree

10 files changed

+182
-115
lines changed

10 files changed

+182
-115
lines changed

mgr/dataflow.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -378,21 +378,18 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er
378378
}
379379

380380
func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) {
381-
transformKeyType, ok := transformerConfig[KeyType]
381+
transformKeyType, ok := transformerConfig[transforms.KeyType]
382382
if !ok {
383-
err := fmt.Errorf("missing param %s", KeyType)
384-
return nil, err
383+
return nil, fmt.Errorf("missing param %s", transforms.KeyType)
385384
}
386385
transformKeyTypeStr, ok := transformKeyType.(string)
387386
if !ok {
388-
err := fmt.Errorf("param %s must be of type string", KeyType)
389-
return nil, err
387+
return nil, fmt.Errorf("param %s must be of type string", transforms.KeyType)
390388
}
391389

392390
create, ok := transforms.Transformers[transformKeyTypeStr]
393391
if !ok {
394-
err := fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
395-
return nil, err
392+
return nil, fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
396393
}
397394
return create, nil
398395
}

mgr/runner.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/qiniu/logkit/sender"
2929
_ "github.com/qiniu/logkit/sender/builtin"
3030
"github.com/qiniu/logkit/transforms"
31+
"github.com/qiniu/logkit/transforms/ip"
3132
. "github.com/qiniu/logkit/utils/models"
3233
)
3334

@@ -291,9 +292,7 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
291292
senderConfig[sender.KeyPandoraDescription] = LogkitAutoCreateDescription
292293
}
293294
}
294-
if senderConfig[sender.KeySenderType] == sender.TypePandora {
295-
senderConfig = setSenderConfig(senderConfig, serverConfigs)
296-
}
295+
senderConfig = setSenderConfig(senderConfig, serverConfigs)
297296
s, err := sr.NewSender(senderConfig, meta.FtSaveLogPath())
298297
if err != nil {
299298
return nil, err
@@ -314,7 +313,7 @@ func createTransformers(rc RunnerConfig) ([]transforms.Transformer, error) {
314313
transformers := make([]transforms.Transformer, 0)
315314
for idx := range rc.Transforms {
316315
tConf := rc.Transforms[idx]
317-
tp := tConf[KeyType]
316+
tp := tConf[transforms.KeyType]
318317
if tp == nil {
319318
return nil, fmt.Errorf("transformer config type is empty %v", tConf)
320319
}
@@ -1297,20 +1296,25 @@ func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[stri
12971296
}
12981297

12991298
func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) conf.MapConf {
1299+
if senderConfig[sender.KeySenderType] != sender.TypePandora {
1300+
return senderConfig
1301+
}
1302+
13001303
for _, serverConfig := range serverConfigs {
1301-
keyType, ok := serverConfig[KeyType].(string)
1302-
if !ok || keyType != KeyIP {
1304+
keyType, ok := serverConfig[transforms.KeyType].(string)
1305+
if !ok || keyType != ip.Name {
13031306
continue
13041307
}
1305-
localEnable, ok := serverConfig[LocalEnable].(bool)
1306-
if !ok {
1308+
1309+
key, keyOk := serverConfig["key"].(string)
1310+
if !keyOk {
13071311
continue
13081312
}
1309-
13101313
autoCreate := senderConfig[sender.KeyPandoraAutoCreate]
1311-
if localEnable {
1312-
schema := fmt.Sprintf(",%v ip", KeyIP)
1313-
if autoCreate == fmt.Sprintf("%v ip", KeyIP) {
1314+
localEnable, localEnableOk := serverConfig[transforms.LocalEnable].(string)
1315+
if localEnableOk && localEnable == ip.Local {
1316+
schema := fmt.Sprintf(",%v ip", key)
1317+
if autoCreate == fmt.Sprintf("%v ip", key) {
13141318
autoCreate = ""
13151319
} else if index := strings.Index(autoCreate, schema); index != -1 {
13161320
autoCreate = autoCreate[:index] + autoCreate[index+len(schema):]
@@ -1320,12 +1324,12 @@ func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]inter
13201324
}
13211325

13221326
if autoCreate == "" {
1323-
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%v ip", KeyIP)
1327+
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%v ip", key)
13241328
continue
13251329
}
13261330

13271331
if !strings.Contains(autoCreate, KeyIP) {
1328-
senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%v ip", KeyIP)
1332+
senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%v ip", key)
13291333
}
13301334
}
13311335

mgr/runner_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import (
2626
_ "github.com/qiniu/logkit/sender/builtin"
2727
"github.com/qiniu/logkit/sender/mock"
2828
"github.com/qiniu/logkit/sender/pandora"
29+
"github.com/qiniu/logkit/transforms"
2930
_ "github.com/qiniu/logkit/transforms/builtin"
31+
"github.com/qiniu/logkit/transforms/ip"
3032
. "github.com/qiniu/logkit/utils/models"
3133
)
3234

@@ -1896,12 +1898,57 @@ DONE:
18961898
break DONE
18971899
default:
18981900
dft++
1901+
18991902
}
19001903
time.Sleep(50 * time.Millisecond)
19011904
if dft > 60 {
19021905
break
19031906
}
19041907
}
19051908
assert.Equal(t, 1, ret)
1909+
}
1910+
1911+
func Test_setSenderConfig(t *testing.T) {
1912+
senderConfig := conf.MapConf{
1913+
sender.KeySenderType: sender.TypePandora,
1914+
}
1915+
1916+
serverConfigs := []map[string]interface{}{
1917+
{
1918+
transforms.KeyType: ip.Name,
1919+
transforms.LocalEnable: ip.Server,
1920+
},
1921+
}
1922+
actualConfig := setSenderConfig(senderConfig, serverConfigs)
1923+
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])
1924+
1925+
serverConfigs = []map[string]interface{}{
1926+
{
1927+
transforms.KeyType: ip.Name,
1928+
transforms.LocalEnable: ip.Server,
1929+
"key": "ip",
1930+
},
1931+
}
1932+
actualConfig = setSenderConfig(senderConfig, serverConfigs)
1933+
assert.Equal(t, "ip ip", actualConfig[sender.KeyPandoraAutoCreate])
1934+
1935+
senderConfig = conf.MapConf{
1936+
sender.KeySenderType: sender.TypePandora,
1937+
}
1938+
serverConfigs = []map[string]interface{}{
1939+
{
1940+
transforms.KeyType: ip.Name,
1941+
transforms.LocalEnable: ip.Local,
1942+
},
1943+
}
1944+
actualConfig = setSenderConfig(senderConfig, serverConfigs)
1945+
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])
19061946

1947+
serverConfigs = []map[string]interface{}{
1948+
{
1949+
transforms.KeyType: "other",
1950+
},
1951+
}
1952+
actualConfig = setSenderConfig(senderConfig, serverConfigs)
1953+
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])
19071954
}

sender/pandora/pandora.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ type Sender struct {
4848
microsecondCounter uint64
4949
extraInfo map[string]string
5050
sendType string
51-
EnbleServerIp bool
5251
}
5352

5453
// UserSchema was parsed pandora schema from user's raw schema
@@ -497,7 +496,6 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) {
497496
log.Errorf("Runner[%v] Sender[%v]: auto create pandora repo error: %v, you can create on pandora portal, ignored...", opt.runnerName, opt.name, err)
498497
err = nil
499498
}
500-
log.Infof("`````````````````````````````dsl: %v, schemas: %v, opt.autoCreate: %v", dsl, schemas, opt.autoCreate)
501499
if initErr := s.client.InitOrUpdateWorkflow(&pipeline.InitOrUpdateWorkflowInput{
502500
// 此处要的 schema 为 autoCreate 中用户指定的,所以 SchemaFree 要恒为 true
503501
InitOptionChange: true,

transforms/ip/ip.go

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ const (
2020
Latitude = "Latitude"
2121
Longitude = "Longitude"
2222
DistrictCode = "DistrictCode"
23+
24+
Local = "使用本地解析"
25+
Server = "使用服务端解析"
2326
)
2427

2528
var (
@@ -33,7 +36,7 @@ type Transformer struct {
3336
StageTime string `json:"stage"`
3437
Key string `json:"key"`
3538
DataPath string `json:"data_path"`
36-
LocalEnable bool `json:"local_enable"`
39+
LocalEnable string `json:"local_enable"`
3740
KeyAsPrefix bool `json:"key_as_prefix"`
3841
Language string `json:"language"`
3942

@@ -54,13 +57,13 @@ type Transformer struct {
5457
}
5558

5659
func (t *Transformer) Init() error {
57-
if t.Key != "" {
58-
t.LocalEnable = true
60+
if t.LocalEnable == "" {
61+
t.LocalEnable = Local
5962
}
60-
61-
if !t.LocalEnable {
63+
if t.LocalEnable != Local {
6264
return nil
6365
}
66+
6467
if t.Language == "" {
6568
t.Language = "zh-CN"
6669
}
@@ -100,9 +103,10 @@ func (_ *Transformer) RawTransform(datas []string) ([]string, error) {
100103
}
101104

102105
func (t *Transformer) Transform(datas []Data) ([]Data, error) {
103-
if !t.LocalEnable {
106+
if t.LocalEnable != Local {
104107
return datas, nil
105108
}
109+
106110
var err, fmtErr error
107111
errNum := 0
108112
if t.loc == nil {
@@ -232,62 +236,65 @@ func (_ *Transformer) SampleConfig() string {
232236
func (_ *Transformer) ConfigOptions() []Option {
233237
return []Option{
234238
{
235-
KeyName: LocalEnable,
236-
Element: Radio,
239+
KeyName: transforms.LocalEnable,
240+
Element: Checkbox,
237241
ChooseOnly: true,
238-
ChooseOptions: []interface{}{true, false},
239-
Default: false,
242+
ChooseOptions: []interface{}{Local, Server},
243+
Default: Local,
240244
Required: true,
241245
DefaultNoUse: false,
242-
Description: "使用本地解析",
246+
Description: "使用本地/服务端解析",
243247
Type: transforms.TransformTypeBoolean,
248+
ToolTip: "本地解析使用客户自己的IP库,更为灵活。服务端解析固定使用七牛IP库,用户无需提供IP库",
244249
},
245250
{
246-
KeyName: "key",
247-
ChooseOnly: false,
248-
Default: "",
249-
Required: true,
250-
Placeholder: "my_field_keyname",
251-
DefaultNoUse: true,
252-
Description: "要进行Transform变化的键(key)",
253-
ToolTip: "对该字段的值进行transform变换",
254-
Type: transforms.TransformTypeString,
255-
AdvanceDepend: LocalEnable,
251+
KeyName: "key",
252+
ChooseOnly: false,
253+
Default: "",
254+
Required: true,
255+
Placeholder: "my_field_keyname",
256+
DefaultNoUse: true,
257+
Description: "要进行Transform变化的键(key)",
258+
ToolTip: "对该字段的值进行transform变换,服务端不支持嵌套",
259+
Type: transforms.TransformTypeString,
256260
},
257261
{
258-
KeyName: "data_path",
259-
ChooseOnly: false,
260-
Default: "",
261-
Required: true,
262-
Placeholder: "your/path/to/ip.dat(x)",
263-
DefaultNoUse: true,
264-
Description: "IP数据库路径(data_path)",
265-
Type: transforms.TransformTypeString,
266-
AdvanceDepend: LocalEnable,
262+
KeyName: "data_path",
263+
ChooseOnly: false,
264+
Default: "",
265+
Required: true,
266+
Placeholder: "your/path/to/ip.dat(x)",
267+
DefaultNoUse: true,
268+
Description: "IP数据库路径(data_path)",
269+
Type: transforms.TransformTypeString,
270+
AdvanceDepend: transforms.LocalEnable,
271+
AdvanceDependValue: Local,
267272
},
268273
{
269-
KeyName: "key_as_prefix",
270-
ChooseOnly: true,
271-
ChooseOptions: []interface{}{false, true},
272-
Required: false,
273-
Default: true,
274-
DefaultNoUse: false,
275-
Element: Checkbox,
276-
Description: "字段名称作为前缀(key_as_prefix)",
277-
Type: transforms.TransformTypeString,
278-
AdvanceDepend: LocalEnable,
274+
KeyName: "key_as_prefix",
275+
ChooseOnly: true,
276+
ChooseOptions: []interface{}{false, true},
277+
Required: false,
278+
Default: true,
279+
DefaultNoUse: false,
280+
Element: Checkbox,
281+
Description: "字段名称作为前缀(key_as_prefix)",
282+
Type: transforms.TransformTypeString,
283+
AdvanceDepend: transforms.LocalEnable,
284+
AdvanceDependValue: Local,
279285
},
280286
{
281-
KeyName: "language",
282-
ChooseOnly: false,
283-
Default: "zh-CN",
284-
Required: true,
285-
Placeholder: "zh-CN",
286-
DefaultNoUse: true,
287-
Description: "mmdb格式库使用的语种",
288-
Advance: true,
289-
Type: transforms.TransformTypeString,
290-
AdvanceDepend: LocalEnable,
287+
KeyName: "language",
288+
ChooseOnly: false,
289+
Default: "zh-CN",
290+
Required: true,
291+
Placeholder: "zh-CN",
292+
DefaultNoUse: true,
293+
Description: "mmdb格式库使用的语种",
294+
Advance: true,
295+
Type: transforms.TransformTypeString,
296+
AdvanceDepend: transforms.LocalEnable,
297+
AdvanceDependValue: Local,
291298
},
292299
}
293300
}
@@ -314,8 +321,13 @@ func (t *Transformer) Close() error {
314321

315322
func (t *Transformer) ServerConfig() map[string]interface{} {
316323
config := make(map[string]interface{})
317-
config[KeyType] = Name
318-
config[LocalEnable] = t.LocalEnable
324+
config[transforms.KeyType] = Name
325+
config[transforms.LocalEnable] = t.LocalEnable
326+
t.keys = GetKeys(t.Key)
327+
if len(t.keys) == 1 {
328+
config["key"] = t.keys[0]
329+
}
330+
319331
return config
320332
}
321333

transforms/ip/ip_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func Test_badData(t *testing.T) {
307307
ipt := &Transformer{
308308
Key: "ip",
309309
DataPath: "./test_data/bad.dat",
310-
LocalEnable: true,
310+
LocalEnable: Local,
311311
}
312312
_, err := ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
313313
assert.Error(t, err)
@@ -318,7 +318,7 @@ func Test_badData(t *testing.T) {
318318
ipt = &Transformer{
319319
Key: "ip",
320320
DataPath: "./test_data/bad.datx",
321-
LocalEnable: true,
321+
LocalEnable: Local,
322322
}
323323
_, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
324324
assert.Error(t, err)
@@ -335,7 +335,7 @@ func Test_badData(t *testing.T) {
335335
ipt = &Transformer{
336336
Key: "ip",
337337
DataPath: "./test_data/bad.datn",
338-
LocalEnable: true,
338+
LocalEnable: Local,
339339
}
340340
_, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
341341
assert.Error(t, err)
@@ -344,7 +344,7 @@ func Test_badData(t *testing.T) {
344344
ipt = &Transformer{
345345
Key: "ip",
346346
DataPath: "./test_data/bad.mmdb",
347-
LocalEnable: true,
347+
LocalEnable: Local,
348348
}
349349
_, err = ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
350350
assert.Error(t, err)

0 commit comments

Comments
 (0)