Skip to content

Commit c7f9ff7

Browse files
authored
Merge pull request #678 from mockerzzz/prefix_socket_reader
http reader support multiple prefix
2 parents 98b6d88 + 2462fc8 commit c7f9ff7

File tree

5 files changed

+236
-121
lines changed

5 files changed

+236
-121
lines changed

mgr/dataflow_test.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99
"testing"
1010
"time"
11+
"sync"
1112

1213
"github.com/json-iterator/go"
1314
"github.com/stretchr/testify/assert"
@@ -295,7 +296,7 @@ func Test_getTransformer(t *testing.T) {
295296

296297
func Test_SendData(t *testing.T) {
297298
c := conf.MapConf{
298-
reader.KeyHTTPServiceAddress: ":8000",
299+
reader.KeyHTTPServiceAddress: "127.0.0.1:8000",
299300
reader.KeyHTTPServicePath: "/logkit/data",
300301
}
301302
readConf := conf.MapConf{
@@ -379,20 +380,25 @@ func Test_SendData(t *testing.T) {
379380
"sampleLog": testInput,
380381
"senders": senders,
381382
}
382-
383+
var wg sync.WaitGroup
384+
wg.Add(1)
385+
go func() {
386+
for _, exp := range testJsonExp {
387+
got, err := httpReader.ReadLine()
388+
assert.NoError(t, err)
389+
for _, e := range exp {
390+
if !strings.Contains(got, e) {
391+
t.Fatalf("exp: %v contains %v, but not", got, e)
392+
}
393+
}
394+
}
395+
wg.Done()
396+
}()
383397
err = SendData(senderConfig)
384398
if err != nil {
385399
t.Error(err)
386400
}
387-
for _, exp := range testJsonExp {
388-
got, err := httpReader.ReadLine()
389-
assert.NoError(t, err)
390-
for _, e := range exp {
391-
if !strings.Contains(got, e) {
392-
t.Fatalf("exp: %v contains %v, but not", got, e)
393-
}
394-
}
395-
}
401+
wg.Wait()
396402
}
397403

398404
func Test_getSendersConfig(t *testing.T) {

reader/http/http.go

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"fmt"
99
"io"
1010
"net/http"
11+
"strings"
12+
"sync"
1113
"sync/atomic"
1214
"time"
1315

@@ -16,7 +18,6 @@ import (
1618
"github.com/qiniu/log"
1719

1820
"github.com/qiniu/logkit/conf"
19-
"github.com/qiniu/logkit/queue"
2021
"github.com/qiniu/logkit/reader"
2122
. "github.com/qiniu/logkit/utils/models"
2223
)
@@ -37,38 +38,51 @@ func init() {
3738
reader.RegisterConstructor(reader.ModeHTTP, NewReader)
3839
}
3940

41+
type Details struct {
42+
Content string
43+
Path string
44+
}
45+
4046
type Reader struct {
4147
meta *reader.Meta
4248
// Note: 原子操作,用于表示 reader 整体的运行状态
4349
status int32
4450

45-
bufQueue queue.BackendQueue
46-
readChan <-chan []byte
51+
readChan chan Details
4752

48-
address string
49-
path string
53+
currentPath string
54+
address string
55+
paths []string
56+
wg sync.WaitGroup
5057

5158
server *http.Server
5259
}
5360

5461
func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
5562
address, _ := conf.GetStringOr(reader.KeyHTTPServiceAddress, reader.DefaultHTTPServiceAddress)
5663
path, _ := conf.GetStringOr(reader.KeyHTTPServicePath, reader.DefaultHTTPServicePath)
64+
paths := strings.Split(path, ",")
65+
for _, val := range paths {
66+
if strings.TrimSpace(val) == "" {
67+
log.Infof("path[%v] have space,space have ignored", path)
68+
continue
69+
}
70+
if !strings.HasPrefix(val, "/") {
71+
return nil, fmt.Errorf("path[%v] is incorrect,it's beginning must be '/'", val)
72+
}
73+
}
5774
address, _ = RemoveHttpProtocal(address)
5875

59-
bq := queue.NewDiskQueue(Hash("Reader<"+address+">_buffer"), meta.BufFile(), DefaultMaxBytesPerFile, 0,
60-
DefaultMaxBytesPerFile, DefaultSyncEvery, DefaultSyncEvery, time.Second*2, DefaultWriteSpeedLimit, false, 0)
6176
err := CreateDirIfNotExist(meta.BufFile())
6277
if err != nil {
6378
return nil, err
6479
}
6580
return &Reader{
6681
meta: meta,
6782
status: reader.StatusInit,
68-
bufQueue: bq,
69-
readChan: bq.ReadChan(),
83+
readChan: make(chan Details, len(paths)),
7084
address: address,
71-
path: path,
85+
paths: paths,
7286
}, nil
7387
}
7488

@@ -97,7 +111,9 @@ func (r *Reader) Start() error {
97111
}
98112

99113
e := echo.New()
100-
e.POST(r.path, r.postData())
114+
for _, path := range r.paths {
115+
e.POST(path, r.postData())
116+
}
101117

102118
r.server = &http.Server{
103119
Handler: e,
@@ -113,15 +129,21 @@ func (r *Reader) Start() error {
113129
}
114130

115131
func (r *Reader) Source() string {
116-
return r.address
132+
return r.address + r.currentPath
117133
}
118134

119135
func (r *Reader) ReadLine() (string, error) {
120136
timer := time.NewTimer(time.Second)
121137
defer timer.Stop()
122138
select {
123-
case data := <-r.readChan:
124-
return string(data), nil
139+
case data, ok := <-r.readChan:
140+
// Note:防止waitgroup.wait()已经通过的情况下再次调用waitgroup.done()
141+
if ok {
142+
//Note:确保所有数据被读取后,再关闭channel
143+
r.wg.Done()
144+
}
145+
r.currentPath = data.Path
146+
return data.Content, nil
125147
case <-timer.C:
126148
}
127149

@@ -136,11 +158,12 @@ func (r *Reader) Close() error {
136158
return nil
137159
}
138160
log.Debugf("Runner[%v] %q daemon is stopping", r.meta.RunnerName, r.Name())
139-
140161
r.server.Shutdown(context.Background())
141-
err := r.bufQueue.Close()
162+
//Note:确保所有数据被读取后,再关闭channel
163+
r.wg.Wait()
164+
close(r.readChan)
142165
atomic.StoreInt32(&r.status, reader.StatusStopped)
143-
return err
166+
return nil
144167
}
145168

146169
func (r *Reader) postData() echo.HandlerFunc {
@@ -167,10 +190,10 @@ func (r *Reader) pickUpData(req *http.Request) (err error) {
167190
}
168191
}
169192
br := bufio.NewReader(reqBody)
170-
return r.storageData(br)
193+
return r.storageData(br, req.RequestURI)
171194
}
172195

173-
func (r *Reader) storageData(br *bufio.Reader) (err error) {
196+
func (r *Reader) storageData(br *bufio.Reader, path string) (err error) {
174197
for {
175198
line, err := r.readLine(br)
176199
if err != nil {
@@ -182,7 +205,14 @@ func (r *Reader) storageData(br *bufio.Reader) (err error) {
182205
if line == "" {
183206
continue
184207
}
185-
r.bufQueue.Put([]byte(line))
208+
if atomic.LoadInt32(&r.status) == reader.StatusStopped || atomic.LoadInt32(&r.status) == reader.StatusStopping {
209+
return err
210+
}
211+
r.wg.Add(1)
212+
r.readChan <- Details{
213+
Content: line,
214+
Path: path,
215+
}
186216
}
187217
return
188218
}

reader/http/http_test.go

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"os"
99
"testing"
1010
"time"
11+
"strings"
12+
"sync"
1113

1214
"github.com/stretchr/testify/assert"
1315

@@ -29,8 +31,8 @@ func TestNewHttpReader(t *testing.T) {
2931
defer os.RemoveAll("./meta")
3032

3133
c := conf.MapConf{
32-
reader.KeyHTTPServiceAddress: ":7110",
33-
reader.KeyHTTPServicePath: "/logkit/data",
34+
reader.KeyHTTPServiceAddress: "127.0.0.1:7110",
35+
reader.KeyHTTPServicePath: "/logkit/aaa,/logkit/bbb,/logkit/ccc,/logkit/ddd",
3436
}
3537
hhttpReader, err := NewReader(meta, c)
3638
assert.NoError(t, err)
@@ -47,38 +49,52 @@ func TestNewHttpReader(t *testing.T) {
4749
"zxcvbnm,./.,mnbvcxz",
4850
"asdfghjkl;';lkjhgfdsa",
4951
}
52+
paths := strings.Split("/logkit/aaa,/logkit/bbb,/logkit/ccc,/logkit/ddd", ",")
5053

5154
// 测试正常发送
52-
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110/logkit/data", nil)
53-
assert.NoError(t, err)
54-
for _, val := range testData {
55+
var wg sync.WaitGroup
56+
for index, val := range testData {
57+
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110"+paths[index], nil)
58+
assert.NoError(t, err)
59+
wg.Add(1)
60+
go func(httpReader *Reader, t *testing.T, index int, val string) {
61+
got, err := httpReader.ReadLine()
62+
assert.NoError(t, err)
63+
assert.Equal(t, val, got)
64+
assert.Equal(t, "127.0.0.1:7110"+paths[index], httpReader.Source())
65+
wg.Done()
66+
}(httpReader, t, index, val)
5567
req.Body = ioutil.NopCloser(bytes.NewReader([]byte(val)))
5668
resp, err := http.DefaultClient.Do(req)
5769
assert.NoError(t, err)
5870
assert.Equal(t, http.StatusOK, resp.StatusCode)
59-
got, err := httpReader.ReadLine()
60-
assert.NoError(t, err)
61-
assert.Equal(t, val, got)
71+
wg.Wait()
6272
}
6373

6474
// 测试 gzip 发送
65-
req, err = http.NewRequest(http.MethodPost, "http://127.0.0.1:7110/logkit/data", nil)
66-
req.Header.Set(ContentTypeHeader, ApplicationGzip)
67-
req.Header.Set(ContentEncodingHeader, "gzip")
68-
assert.NoError(t, err)
69-
for _, val := range testData {
75+
for index, val := range testData {
76+
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:7110"+paths[index], nil)
77+
req.Header.Set(ContentTypeHeader, ApplicationGzip)
78+
req.Header.Set(ContentEncodingHeader, "gzip")
79+
assert.NoError(t, err)
80+
wg.Add(1)
7081
var buf bytes.Buffer
7182
g := gzip.NewWriter(&buf)
72-
_, err := g.Write([]byte(val))
83+
_, err = g.Write([]byte(val))
7384
assert.NoError(t, err)
7485
g.Close()
7586
byteVal := buf.Bytes()
87+
go func(httpReader *Reader, t *testing.T, index int, val string) {
88+
got, err := httpReader.ReadLine()
89+
assert.NoError(t, err)
90+
assert.Equal(t, val, got)
91+
assert.Equal(t, "127.0.0.1:7110"+paths[index], httpReader.Source())
92+
wg.Done()
93+
}(httpReader, t, index, val)
7694
req.Body = ioutil.NopCloser(bytes.NewReader(byteVal))
7795
resp, err := http.DefaultClient.Do(req)
7896
assert.NoError(t, err)
7997
assert.Equal(t, http.StatusOK, resp.StatusCode)
80-
got, err := httpReader.ReadLine()
81-
assert.NoError(t, err)
82-
assert.Equal(t, val, got)
98+
wg.Wait()
8399
}
84100
}

reader/rest_reader_models.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,7 @@ var ModeKeyOptions = map[string][]Option{
11111111
Description: "监听地址前缀(http_service_path)",
11121112
ToolTip: "监听的请求地址,如 /data ",
11131113
},
1114+
OptionDataSourceTag,
11141115
},
11151116
ModeScript: {
11161117
{

0 commit comments

Comments
 (0)