Skip to content

Commit b1ff93b

Browse files
ruflintsg
authored andcommitted
Fix bug when migrating old states, 2 restarts are required (#3322) (#3326)
The state of migrated states was not properly updated in the registry file. This lead to the issue that after the first restart, the states were migrated but the prospector assumed the states were not finished and didn't start harvesting. A second restart resolved the problem. Discussion started here: https://discuss.elastic.co/t/filebeat-upgrade-requiring-multiple-restarts/70414/8 (cherry picked from commit fb47507)
1 parent 33e4fda commit b1ff93b

File tree

4 files changed

+194
-117
lines changed

4 files changed

+194
-117
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v5.1.2...5.1[Check the HEAD diff]
3232
*Packetbeat*
3333

3434
*Filebeat*
35+
- Fix registry migration issue from old states were files were only harvested after second restart. {pull}3322[3322]
3536

3637
*Winlogbeat*
3738

filebeat/registrar/registrar.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,15 +118,7 @@ func (r *Registrar) loadStates() error {
118118
return fmt.Errorf("Error decoding states: %s", err)
119119
}
120120

121-
// Set all states to finished and disable TTL on restart
122-
// For all states covered by a prospector, TTL will be overwritten with the prospector value
123-
for key, state := range states {
124-
state.Finished = true
125-
// Set ttl to -2 to easily spot which states are not managed by a prospector
126-
state.TTL = -2
127-
states[key] = state
128-
}
129-
121+
states = resetStates(states)
130122
r.states.SetStates(states)
131123
logp.Info("States Loaded from registrar: %+v", len(states))
132124

@@ -176,6 +168,7 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool {
176168
// Convert old states to new states
177169
logp.Info("Old registry states found: %v", len(oldStates))
178170
states := convertOldStates(oldStates)
171+
states = resetStates(states)
179172
r.states.SetStates(states)
180173

181174
// Rewrite registry in new format
@@ -186,6 +179,19 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool {
186179
return true
187180
}
188181

182+
// resetStates sets all states to finished and disable TTL on restart
183+
// For all states covered by a prospector, TTL will be overwritten with the prospector value
184+
func resetStates(states []file.State) []file.State {
185+
186+
for key, state := range states {
187+
state.Finished = true
188+
// Set ttl to -2 to easily spot which states are not managed by a prospector
189+
state.TTL = -2
190+
states[key] = state
191+
}
192+
return states
193+
}
194+
189195
func convertOldStates(oldStates map[string]file.State) []file.State {
190196
// Convert old states to new states
191197
states := []file.State{}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
from filebeat import BaseTest
2+
3+
import os
4+
import platform
5+
import time
6+
import shutil
7+
import json
8+
import stat
9+
from nose.plugins.skip import Skip, SkipTest
10+
11+
12+
class Test(BaseTest):
13+
14+
def test_migration_non_windows(self):
15+
"""
16+
Tests if migration from old filebeat registry to new format works
17+
"""
18+
19+
if os.name == "nt":
20+
raise SkipTest
21+
22+
registry_file = self.working_dir + '/registry'
23+
24+
# Write old registry file
25+
with open(registry_file, 'w') as f:
26+
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}')
27+
28+
self.render_config_template(
29+
path=os.path.abspath(self.working_dir) + "/log/input*",
30+
clean_removed="false",
31+
clean_inactive="0",
32+
)
33+
34+
filebeat = self.start_beat()
35+
36+
self.wait_until(
37+
lambda: self.log_contains("Old registry states found: 2"),
38+
max_timeout=15)
39+
40+
self.wait_until(
41+
lambda: self.log_contains("Old states converted to new states and written to registrar: 2"),
42+
max_timeout=15)
43+
44+
filebeat.check_kill_and_wait()
45+
46+
# Check if content is same as above
47+
assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4
48+
assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6
49+
50+
# Compare first entry
51+
oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}')
52+
newJson = self.get_registry_entry_by_path("logs/hello.log")
53+
del newJson["timestamp"]
54+
del newJson["ttl"]
55+
assert newJson == oldJson
56+
57+
# Compare second entry
58+
oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}')
59+
newJson = self.get_registry_entry_by_path("logs/log2.log")
60+
del newJson["timestamp"]
61+
del newJson["ttl"]
62+
assert newJson == oldJson
63+
64+
# Make sure the right number of entries is in
65+
data = self.get_registry()
66+
assert len(data) == 2
67+
68+
def test_migration_windows(self):
69+
"""
70+
Tests if migration from old filebeat registry to new format works
71+
"""
72+
73+
if os.name != "nt":
74+
raise SkipTest
75+
76+
registry_file = self.working_dir + '/registry'
77+
78+
# Write old registry file
79+
with open(registry_file, 'w') as f:
80+
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}')
81+
82+
self.render_config_template(
83+
path=os.path.abspath(self.working_dir) + "/log/input*",
84+
)
85+
86+
filebeat = self.start_beat()
87+
88+
self.wait_until(
89+
lambda: self.log_contains("Old registry states found: 2"),
90+
max_timeout=15)
91+
92+
self.wait_until(
93+
lambda: self.log_contains("Old states converted to new states and written to registrar: 2"),
94+
max_timeout=15)
95+
96+
filebeat.check_kill_and_wait()
97+
98+
# Check if content is same as above
99+
assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4
100+
assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6
101+
102+
# Compare first entry
103+
oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}')
104+
newJson = self.get_registry_entry_by_path("logs/hello.log")
105+
del newJson["timestamp"]
106+
del newJson["ttl"]
107+
assert newJson == oldJson
108+
109+
# Compare second entry
110+
oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}')
111+
newJson = self.get_registry_entry_by_path("logs/log2.log")
112+
del newJson["timestamp"]
113+
del newJson["ttl"]
114+
assert newJson == oldJson
115+
116+
# Make sure the right number of entries is in
117+
data = self.get_registry()
118+
assert len(data) == 2
119+
120+
def test_migration_continue_reading(self):
121+
"""
122+
Tests if after the migration filebeat keeps reading the file
123+
"""
124+
125+
os.mkdir(self.working_dir + "/log/")
126+
testfile1 = self.working_dir + "/log/test.log"
127+
128+
with open(testfile1, 'w') as f:
129+
f.write("entry10\n")
130+
131+
registry_file = self.working_dir + '/registry'
132+
133+
self.render_config_template(
134+
path=os.path.abspath(self.working_dir) + "/log/*",
135+
output_file_filename="filebeat_1",
136+
)
137+
138+
# Run filebeat to create a registry
139+
filebeat = self.start_beat(output="filebeat1.log")
140+
self.wait_until(
141+
lambda: self.output_has(lines=1, output_file="output/filebeat_1"),
142+
max_timeout=10)
143+
filebeat.check_kill_and_wait()
144+
145+
# Create old registry file out of the new one
146+
r = self.get_registry()
147+
registry_entry = r[0]
148+
del registry_entry["timestamp"]
149+
del registry_entry["ttl"]
150+
old_registry = {registry_entry["source"]: registry_entry}
151+
152+
# Overwrite registry
153+
with open(registry_file, 'w') as f:
154+
json.dump(old_registry, f)
155+
156+
157+
self.render_config_template(
158+
path=os.path.abspath(self.working_dir) + "/log/*",
159+
output_file_filename="filebeat_2",
160+
)
161+
162+
filebeat = self.start_beat(output="filebeat2.log")
163+
164+
# Wait until state is migrated
165+
self.wait_until(
166+
lambda: self.log_contains(
167+
"Old states converted to new states and written to registrar: 1", "filebeat2.log"),
168+
max_timeout=10)
169+
170+
with open(testfile1, 'a') as f:
171+
f.write("entry12\n")
172+
173+
# After restart new output file is created -> only 1 new entry
174+
self.wait_until(
175+
lambda: self.output_has(lines=1, output_file="output/filebeat_2"),
176+
max_timeout=10)
177+
178+
filebeat.check_kill_and_wait()

filebeat/tests/system/test_registrar.py

Lines changed: 0 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -624,114 +624,6 @@ def test_state_after_rotation_ignore_older(self):
624624
assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9
625625
assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8
626626

627-
628-
def test_migration_non_windows(self):
629-
"""
630-
Tests if migration from old filebeat registry to new format works
631-
"""
632-
633-
if os.name == "nt":
634-
raise SkipTest
635-
636-
registry_file = self.working_dir + '/registry'
637-
638-
# Write old registry file
639-
with open(registry_file, 'w') as f:
640-
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}')
641-
642-
self.render_config_template(
643-
path=os.path.abspath(self.working_dir) + "/log/input*",
644-
clean_removed="false",
645-
clean_inactive="0",
646-
)
647-
648-
filebeat = self.start_beat()
649-
650-
self.wait_until(
651-
lambda: self.log_contains("Old registry states found: 2"),
652-
max_timeout=15)
653-
654-
self.wait_until(
655-
lambda: self.log_contains("Old states converted to new states and written to registrar: 2"),
656-
max_timeout=15)
657-
658-
filebeat.check_kill_and_wait()
659-
660-
# Check if content is same as above
661-
assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4
662-
assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6
663-
664-
# Compare first entry
665-
oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}')
666-
newJson = self.get_registry_entry_by_path("logs/hello.log")
667-
del newJson["timestamp"]
668-
del newJson["ttl"]
669-
assert newJson == oldJson
670-
671-
# Compare second entry
672-
oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}')
673-
newJson = self.get_registry_entry_by_path("logs/log2.log")
674-
del newJson["timestamp"]
675-
del newJson["ttl"]
676-
assert newJson == oldJson
677-
678-
# Make sure the right number of entries is in
679-
data = self.get_registry()
680-
assert len(data) == 2
681-
682-
def test_migration_windows(self):
683-
"""
684-
Tests if migration from old filebeat registry to new format works
685-
"""
686-
687-
if os.name != "nt":
688-
raise SkipTest
689-
690-
registry_file = self.working_dir + '/registry'
691-
692-
# Write old registry file
693-
with open(registry_file, 'w') as f:
694-
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}')
695-
696-
self.render_config_template(
697-
path=os.path.abspath(self.working_dir) + "/log/input*",
698-
)
699-
700-
filebeat = self.start_beat()
701-
702-
self.wait_until(
703-
lambda: self.log_contains("Old registry states found: 2"),
704-
max_timeout=15)
705-
706-
self.wait_until(
707-
lambda: self.log_contains("Old states converted to new states and written to registrar: 2"),
708-
max_timeout=15)
709-
710-
filebeat.check_kill_and_wait()
711-
712-
# Check if content is same as above
713-
assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4
714-
assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6
715-
716-
# Compare first entry
717-
oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}')
718-
newJson = self.get_registry_entry_by_path("logs/hello.log")
719-
del newJson["timestamp"]
720-
del newJson["ttl"]
721-
assert newJson == oldJson
722-
723-
# Compare second entry
724-
oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}')
725-
newJson = self.get_registry_entry_by_path("logs/log2.log")
726-
del newJson["timestamp"]
727-
del newJson["ttl"]
728-
assert newJson == oldJson
729-
730-
# Make sure the right number of entries is in
731-
data = self.get_registry()
732-
assert len(data) == 2
733-
734-
735627
def test_clean_inactive(self):
736628
"""
737629
Checks that states are properly removed after clean_inactive

0 commit comments

Comments
 (0)