Skip to content

Commit 1aaafdb

Browse files
authored
Merge pull request #692 from mockerzzz/pdr-6648
socket reader support split message by line
2 parents 96f2391 + f1d2afd commit 1aaafdb

File tree

4 files changed

+122
-9
lines changed

4 files changed

+122
-9
lines changed

mgr/cluster_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestClusterRest(t *testing.T) {
8181
}
8282
var logkitConf ManagerConfig
8383
logkitConf.RestDir = confDir
84-
logkitConf.BindHost = ":" + strconv.Itoa(port+i)
84+
logkitConf.BindHost = "127.0.0.1:" + strconv.Itoa(port+i)
8585
logkitConf.Cluster.Enable = true
8686
if i == 0 {
8787
logkitConf.Cluster.IsMaster = true

reader/rest_reader_models.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ const (
128128
// socket_service_address = "unix:///tmp/sys.sock"
129129
// socket_service_address = "unixgram:///tmp/sys.sock"
130130
KeySocketServiceAddress = "socket_service_address"
131+
KeySocketSplitByLine = "socket_split_by_line"
131132

132133
// 最大并发连接数
133134
// 仅用于 stream sockets (e.g. TCP).
@@ -1050,6 +1051,16 @@ var ModeKeyOptions = map[string][]Option{
10501051
Advance: true,
10511052
ToolTip: "仅tcp协议下生效",
10521053
},
1054+
{
1055+
KeyName: KeySocketSplitByLine,
1056+
Element: Radio,
1057+
ChooseOnly: true,
1058+
ChooseOptions: []interface{}{"false", "true"},
1059+
Default: "false",
1060+
Advance: true,
1061+
Description: "是否按行分隔内容(socket_split_by_line)",
1062+
ToolTip: "开启后,对socket内容按行进行分隔",
1063+
},
10531064
{
10541065
KeyName: KeySocketReadTimeout,
10551066
ChooseOnly: false,

reader/socket/socket.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,17 @@ func (ssr *streamSocketReader) read(c net.Conn) {
135135
address = localAddr.String()
136136
}
137137
}
138-
ssr.readChan <- socketInfo{address: address, data: string(scnr.Bytes())}
138+
val := string(scnr.Bytes())
139+
if ssr.IsSplitByLine {
140+
vals := strings.Split(val, "\n")
141+
for _, value := range vals {
142+
if value = strings.TrimSpace(value); value != "" {
143+
ssr.readChan <- socketInfo{address: address, data: value}
144+
}
145+
}
146+
} else {
147+
ssr.readChan <- socketInfo{address: address, data: val}
148+
}
139149
}
140150

141151
if err := scnr.Err(); err != nil {
@@ -199,8 +209,16 @@ func (psr *packetSocketReader) listen() {
199209
}
200210
val := string(buf[:n])
201211

202-
psr.readChan <- socketInfo{address: address, data: val}
203-
212+
if psr.IsSplitByLine {
213+
vals := strings.Split(val, "\n")
214+
for _, value := range vals {
215+
if value = strings.TrimSpace(value); value != "" {
216+
psr.readChan <- socketInfo{address: address, data: value}
217+
}
218+
}
219+
} else {
220+
psr.readChan <- socketInfo{address: address, data: val}
221+
}
204222
}
205223
}
206224

@@ -223,6 +241,7 @@ type Reader struct {
223241
ReadBufferSize int
224242
ReadTimeout time.Duration
225243
KeepAlivePeriod time.Duration
244+
IsSplitByLine bool
226245

227246
closer io.Closer
228247
}
@@ -246,16 +265,18 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
246265
if err != nil {
247266
return nil, err
248267
}
268+
IsSplitByLine, _ := conf.GetBoolOr(reader.KeySocketSplitByLine, false)
249269
return &Reader{
250270
meta: meta,
251271
status: reader.StatusInit,
252-
readChan: make(chan socketInfo),
272+
readChan: make(chan socketInfo, 2),
253273
errChan: make(chan error),
254274
ServiceAddress: ServiceAddress,
255275
MaxConnections: MaxConnections,
256276
ReadBufferSize: ReadBufferSize,
257277
ReadTimeout: ReadTimeoutdur,
258278
KeepAlivePeriod: KeepAlivePeriodDur,
279+
IsSplitByLine: IsSplitByLine,
259280
}, nil
260281
}
261282

reader/socket/socket_test.go

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestUdpSocketReader(t *testing.T) {
2121
reader.KeyFileDone: MetaDir,
2222
KeyRunnerName: "TestUdpSocketReader",
2323
reader.KeyMode: reader.ModeSocket,
24-
reader.KeySocketServiceAddress: "udp://:5140",
24+
reader.KeySocketServiceAddress: "udp://127.0.0.1:5140",
2525
}
2626
meta, err := reader.NewMetaWithConf(logkitConf)
2727
assert.NoError(t, err)
@@ -34,7 +34,7 @@ func TestUdpSocketReader(t *testing.T) {
3434
err = sr.Start()
3535
assert.NoError(t, err)
3636

37-
sysLog, err := syslog.Dial("udp", "localhost:5140",
37+
sysLog, err := syslog.Dial("udp", "127.0.0.1:5140",
3838
syslog.LOG_WARNING|syslog.LOG_DAEMON, "demotag")
3939
if err != nil {
4040
log.Fatal(err)
@@ -57,13 +57,54 @@ func TestUdpSocketReader(t *testing.T) {
5757
assert.NoError(t, err)
5858
}
5959

60+
func TestUdpSocketReaderWithSplit(t *testing.T) {
61+
logkitConf := conf.MapConf{
62+
reader.KeyMetaPath: MetaDir,
63+
reader.KeyFileDone: MetaDir,
64+
KeyRunnerName: "TestUdpSocketReader",
65+
reader.KeyMode: reader.ModeSocket,
66+
reader.KeySocketServiceAddress: "udp://127.0.0.1:5140",
67+
reader.KeySocketSplitByLine: "true",
68+
}
69+
meta, err := reader.NewMetaWithConf(logkitConf)
70+
assert.NoError(t, err)
71+
defer os.RemoveAll(MetaDir)
72+
73+
ssr, err := NewReader(meta, logkitConf)
74+
assert.NoError(t, err)
75+
sr := ssr.(*Reader)
76+
77+
err = sr.Start()
78+
assert.NoError(t, err)
79+
80+
sysLog, err := syslog.Dial("udp", "127.0.0.1:5140",
81+
syslog.LOG_WARNING|syslog.LOG_DAEMON, "demotag")
82+
if err != nil {
83+
log.Fatal(err)
84+
}
85+
err = sysLog.Emerg("And this is a daemon emergency with demotag.\n \nthis is OK")
86+
assert.NoError(t, err)
87+
time.Sleep(30 * time.Millisecond)
88+
line, err := sr.ReadLine()
89+
assert.NoError(t, err)
90+
assert.Contains(t, line, "And this is a daemon emergency with demotag.")
91+
assert.Contains(t, sr.Source(), "127.0.0.1")
92+
line, err = sr.ReadLine()
93+
assert.NoError(t, err)
94+
assert.Contains(t, line, "this is OK")
95+
assert.Contains(t, sr.Source(), "127.0.0.1")
96+
97+
err = sr.Close()
98+
assert.NoError(t, err)
99+
}
100+
60101
func TestTCPSocketReader(t *testing.T) {
61102
logkitConf := conf.MapConf{
62103
reader.KeyMetaPath: MetaDir,
63104
reader.KeyFileDone: MetaDir,
64105
KeyRunnerName: "TestTCPSocketReader",
65106
reader.KeyMode: reader.ModeSocket,
66-
reader.KeySocketServiceAddress: "tcp://:5141",
107+
reader.KeySocketServiceAddress: "tcp://127.0.0.1:5141",
67108
}
68109
meta, err := reader.NewMetaWithConf(logkitConf)
69110
assert.NoError(t, err)
@@ -75,7 +116,7 @@ func TestTCPSocketReader(t *testing.T) {
75116
err = sr.Start()
76117
assert.NoError(t, err)
77118

78-
sysLog, err := syslog.Dial("tcp", "localhost:5141",
119+
sysLog, err := syslog.Dial("tcp", "127.0.0.1:5141",
79120
syslog.LOG_WARNING|syslog.LOG_DAEMON, "demotag")
80121
if err != nil {
81122
log.Fatal(err)
@@ -98,6 +139,46 @@ func TestTCPSocketReader(t *testing.T) {
98139
assert.NoError(t, err)
99140
}
100141

142+
func TestTCPSocketReaderWithSplit(t *testing.T) {
143+
logkitConf := conf.MapConf{
144+
reader.KeyMetaPath: MetaDir,
145+
reader.KeyFileDone: MetaDir,
146+
KeyRunnerName: "TestTCPSocketReader",
147+
reader.KeyMode: reader.ModeSocket,
148+
reader.KeySocketServiceAddress: "tcp://127.0.0.1:5141",
149+
reader.KeySocketSplitByLine: "true",
150+
}
151+
meta, err := reader.NewMetaWithConf(logkitConf)
152+
assert.NoError(t, err)
153+
defer os.RemoveAll(MetaDir)
154+
155+
ssr, err := NewReader(meta, logkitConf)
156+
assert.NoError(t, err)
157+
sr := ssr.(*Reader)
158+
err = sr.Start()
159+
assert.NoError(t, err)
160+
161+
sysLog, err := syslog.Dial("tcp", "127.0.0.1:5141",
162+
syslog.LOG_WARNING|syslog.LOG_DAEMON, "demotag")
163+
if err != nil {
164+
log.Fatal(err)
165+
}
166+
err = sysLog.Emerg("And this is a daemon emergency with demotag.\n \nthis is OK")
167+
assert.NoError(t, err)
168+
time.Sleep(30 * time.Millisecond)
169+
line, err := sr.ReadLine()
170+
assert.NoError(t, err)
171+
assert.Contains(t, line, "And this is a daemon emergency with demotag.")
172+
assert.Contains(t, sr.Source(), "127.0.0.1")
173+
line, err = sr.ReadLine()
174+
assert.NoError(t, err)
175+
assert.Contains(t, line, "this is OK")
176+
assert.Contains(t, sr.Source(), "127.0.0.1")
177+
178+
err = sr.Close()
179+
assert.NoError(t, err)
180+
}
181+
101182
func TestUnixSocketReader(t *testing.T) {
102183
logkitConf := conf.MapConf{
103184
reader.KeyMetaPath: MetaDir,

0 commit comments

Comments
 (0)