Skip to content

Commit 677aca5

Browse files
李红李红
authored andcommitted
update pandora go sdk
1 parent f99d52b commit 677aca5

File tree

11 files changed

+226
-129
lines changed

11 files changed

+226
-129
lines changed

mgr/dataflow.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -378,21 +378,18 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er
378378
}
379379

380380
func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) {
381-
transformKeyType, ok := transformerConfig[KeyType]
381+
transformKeyType, ok := transformerConfig[transforms.KeyType]
382382
if !ok {
383-
err := fmt.Errorf("missing param %s", KeyType)
384-
return nil, err
383+
return nil, fmt.Errorf("missing param %s", transforms.KeyType)
385384
}
386385
transformKeyTypeStr, ok := transformKeyType.(string)
387386
if !ok {
388-
err := fmt.Errorf("param %s must be of type string", KeyType)
389-
return nil, err
387+
return nil, fmt.Errorf("param %s must be of type string", transforms.KeyType)
390388
}
391389

392390
create, ok := transforms.Transformers[transformKeyTypeStr]
393391
if !ok {
394-
err := fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
395-
return nil, err
392+
return nil, fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
396393
}
397394
return create, nil
398395
}

mgr/runner.go

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/qiniu/logkit/sender"
2929
_ "github.com/qiniu/logkit/sender/builtin"
3030
"github.com/qiniu/logkit/transforms"
31+
"github.com/qiniu/logkit/transforms/ip"
3132
. "github.com/qiniu/logkit/utils/models"
3233
)
3334

@@ -291,15 +292,17 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
291292
senderConfig[sender.KeyPandoraDescription] = LogkitAutoCreateDescription
292293
}
293294
}
294-
if senderConfig[sender.KeySenderType] == sender.TypePandora {
295-
senderConfig = setSenderConfig(senderConfig, serverConfigs)
295+
senderConfig, err := setSenderConfig(senderConfig, serverConfigs)
296+
if err != nil {
297+
return nil, err
296298
}
297299
s, err := sr.NewSender(senderConfig, meta.FtSaveLogPath())
298300
if err != nil {
299301
return nil, err
300302
}
301303
senders = append(senders, s)
302304
delete(rc.SendersConfig[i], sender.InnerUserAgent)
305+
delete(rc.SendersConfig[i], sender.KeyPandoraDescription)
303306
}
304307

305308
senderCnt := len(senders)
@@ -314,7 +317,7 @@ func createTransformers(rc RunnerConfig) ([]transforms.Transformer, error) {
314317
transformers := make([]transforms.Transformer, 0)
315318
for idx := range rc.Transforms {
316319
tConf := rc.Transforms[idx]
317-
tp := tConf[KeyType]
320+
tp := tConf[transforms.KeyType]
318321
if tp == nil {
319322
return nil, fmt.Errorf("transformer config type is empty %v", tConf)
320323
}
@@ -1296,38 +1299,59 @@ func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[stri
12961299
return tags
12971300
}
12981301

1299-
func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) conf.MapConf {
1302+
func setSenderConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) (conf.MapConf, error) {
1303+
if senderConfig[sender.KeySenderType] != sender.TypePandora {
1304+
return senderConfig, nil
1305+
}
1306+
1307+
var err error
13001308
for _, serverConfig := range serverConfigs {
1301-
keyType, ok := serverConfig[KeyType].(string)
1302-
if !ok || keyType != KeyIP {
1303-
continue
1304-
}
1305-
localEnable, ok := serverConfig[LocalEnable].(bool)
1309+
keyType, ok := serverConfig[transforms.KeyType].(string)
13061310
if !ok {
13071311
continue
13081312
}
1309-
1310-
autoCreate := senderConfig[sender.KeyPandoraAutoCreate]
1311-
if localEnable {
1312-
schema := fmt.Sprintf(",%v ip", KeyIP)
1313-
if autoCreate == fmt.Sprintf("%v ip", KeyIP) {
1314-
autoCreate = ""
1315-
} else if index := strings.Index(autoCreate, schema); index != -1 {
1316-
autoCreate = autoCreate[:index] + autoCreate[index+len(schema):]
1313+
switch keyType {
1314+
case ip.Name:
1315+
if senderConfig, err = setIPConfig(senderConfig, serverConfig); err != nil {
1316+
return senderConfig, err
13171317
}
1318-
senderConfig[sender.KeyPandoraAutoCreate] = autoCreate
1319-
continue
13201318
}
13211319

1322-
if autoCreate == "" {
1323-
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%v ip", KeyIP)
1324-
continue
1325-
}
1320+
}
1321+
1322+
return senderConfig, nil
1323+
}
1324+
1325+
func setIPConfig(senderConfig conf.MapConf, serverConfig map[string]interface{}) (conf.MapConf, error) {
1326+
key, keyOk := serverConfig["key"].(string)
1327+
if !keyOk {
1328+
return senderConfig, nil
1329+
}
13261330

1327-
if !strings.Contains(autoCreate, KeyIP) {
1328-
senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%v ip", KeyIP)
1331+
if len(GetKeys(key)) > 1 {
1332+
return senderConfig, fmt.Errorf("key: %v ip transform key in server doesn't support dot(.)", key)
1333+
}
1334+
autoCreate := senderConfig[sender.KeyPandoraAutoCreate]
1335+
transformAt, transformAtOk := serverConfig[transforms.TransformAt].(string)
1336+
if !transformAtOk {
1337+
return senderConfig, nil
1338+
}
1339+
if transformAt == ip.Local {
1340+
schema := fmt.Sprintf(",%v ip", key)
1341+
if autoCreate == fmt.Sprintf("%v ip", key) {
1342+
autoCreate = ""
1343+
} else if index := strings.Index(autoCreate, schema); index != -1 {
1344+
autoCreate = autoCreate[:index] + autoCreate[index+len(schema):]
13291345
}
1346+
senderConfig[sender.KeyPandoraAutoCreate] = autoCreate
1347+
return senderConfig, nil
1348+
}
1349+
1350+
if autoCreate == "" {
1351+
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%s %s", key, TypeIP)
1352+
return senderConfig, nil
13301353
}
13311354

1332-
return senderConfig
1355+
senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%s %s", key, TypeIP)
1356+
return senderConfig, nil
13331357
}

mgr/runner_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import (
2626
_ "github.com/qiniu/logkit/sender/builtin"
2727
"github.com/qiniu/logkit/sender/mock"
2828
"github.com/qiniu/logkit/sender/pandora"
29+
"github.com/qiniu/logkit/transforms"
2930
_ "github.com/qiniu/logkit/transforms/builtin"
31+
"github.com/qiniu/logkit/transforms/ip"
3032
. "github.com/qiniu/logkit/utils/models"
3133
)
3234

@@ -1896,12 +1898,71 @@ DONE:
18961898
break DONE
18971899
default:
18981900
dft++
1901+
18991902
}
19001903
time.Sleep(50 * time.Millisecond)
19011904
if dft > 60 {
19021905
break
19031906
}
19041907
}
19051908
assert.Equal(t, 1, ret)
1909+
}
1910+
1911+
func Test_setSenderConfig(t *testing.T) {
1912+
senderConfig := conf.MapConf{
1913+
sender.KeySenderType: sender.TypePandora,
1914+
}
1915+
1916+
serverConfigs := []map[string]interface{}{
1917+
{
1918+
transforms.KeyType: ip.Name,
1919+
transforms.TransformAt: ip.Server,
1920+
},
1921+
}
1922+
actualConfig, err := setSenderConfig(senderConfig, serverConfigs)
1923+
assert.NoError(t, err)
1924+
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])
1925+
1926+
serverConfigs = []map[string]interface{}{
1927+
{
1928+
transforms.KeyType: ip.Name,
1929+
transforms.TransformAt: ip.Server,
1930+
"key": "ip",
1931+
},
1932+
}
1933+
actualConfig, err = setSenderConfig(senderConfig, serverConfigs)
1934+
assert.NoError(t, err)
1935+
assert.Equal(t, "ip ip", actualConfig[sender.KeyPandoraAutoCreate])
19061936

1937+
senderConfig = conf.MapConf{
1938+
sender.KeySenderType: sender.TypePandora,
1939+
}
1940+
serverConfigs = []map[string]interface{}{
1941+
{
1942+
transforms.KeyType: ip.Name,
1943+
transforms.TransformAt: ip.Local,
1944+
},
1945+
}
1946+
actualConfig, err = setSenderConfig(senderConfig, serverConfigs)
1947+
assert.NoError(t, err)
1948+
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])
1949+
1950+
serverConfigs = []map[string]interface{}{
1951+
{
1952+
transforms.KeyType: "other",
1953+
},
1954+
}
1955+
actualConfig, err = setSenderConfig(senderConfig, serverConfigs)
1956+
assert.NoError(t, err)
1957+
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])
1958+
1959+
serverConfigs = []map[string]interface{}{
1960+
{
1961+
transforms.KeyType: ip.Name,
1962+
transforms.TransformAt: ip.Server,
1963+
"key": "ip.ip",
1964+
},
1965+
}
1966+
actualConfig, err = setSenderConfig(senderConfig, serverConfigs)
1967+
assert.Error(t, err)
19071968
}

sender/pandora/pandora.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ type Sender struct {
4848
microsecondCounter uint64
4949
extraInfo map[string]string
5050
sendType string
51-
EnbleServerIp bool
5251
}
5352

5453
// UserSchema was parsed pandora schema from user's raw schema
@@ -497,7 +496,6 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) {
497496
log.Errorf("Runner[%v] Sender[%v]: auto create pandora repo error: %v, you can create on pandora portal, ignored...", opt.runnerName, opt.name, err)
498497
err = nil
499498
}
500-
log.Infof("`````````````````````````````dsl: %v, schemas: %v, opt.autoCreate: %v", dsl, schemas, opt.autoCreate)
501499
if initErr := s.client.InitOrUpdateWorkflow(&pipeline.InitOrUpdateWorkflowInput{
502500
// 此处要的 schema 为 autoCreate 中用户指定的,所以 SchemaFree 要恒为 true
503501
InitOptionChange: true,

sender/rest_senders_models.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ var ModeKeyOptions = map[string][]Option{
185185
Default: "nb",
186186
DefaultNoUse: false,
187187
Description: "创建的资源所在区域(pandora_region)",
188-
Advance: true,
189188
ToolTip: "工作流资源创建所在区域",
190189
},
191190
{

0 commit comments

Comments
 (0)