1
1
import unittest
2
2
from ota_interface import JobInfo , ProcessesManagement
3
3
from unittest .mock import patch , mock_open , Mock , MagicMock
4
+ import os
5
+ import sqlite3
6
+ import copy
4
7
5
8
class TestJobInfo (unittest .TestCase ):
6
9
def setUp (self ):
@@ -176,7 +179,152 @@ def test_to_dict_detail(self):
176
179
)
177
180
178
181
class TestProcessesManagement (unittest .TestCase ):
179
- pass
182
+ def setUp (self ):
183
+ if os .path .isfile ('test_process.db' ):
184
+ self .tearDown ()
185
+ self .processes = ProcessesManagement (path = 'test_process.db' )
186
+ testcase_job_info = TestJobInfo ()
187
+ testcase_job_info .setUp ()
188
+ self .test_job_info = testcase_job_info .setup_job (incremental = 'target/source.zip' )
189
+ self .processes .insert_database (self .test_job_info )
190
+
191
+ def tearDown (self ):
192
+ os .remove ('test_process.db' )
193
+ try :
194
+ os .remove ('output/stderr.' + self .test_job_info .id )
195
+ os .remove ('output/stdout.' + self .test_job_info .id )
196
+ except FileNotFoundError :
197
+ pass
198
+
199
+ def test_init (self ):
200
+ # Test the database is created successfully
201
+ self .assertTrue (os .path .isfile ('test_process.db' ))
202
+ test_columns = [
203
+ {'name' : 'ID' ,'type' :'TEXT' },
204
+ {'name' : 'TargetPath' ,'type' :'TEXT' },
205
+ {'name' : 'IncrementalPath' ,'type' :'TEXT' },
206
+ {'name' : 'Verbose' ,'type' :'INTEGER' },
207
+ {'name' : 'Partial' ,'type' :'TEXT' },
208
+ {'name' : 'OutputPath' ,'type' :'TEXT' },
209
+ {'name' : 'Status' ,'type' :'TEXT' },
210
+ {'name' : 'Downgrade' ,'type' :'INTEGER' },
211
+ {'name' : 'OtherFlags' ,'type' :'TEXT' },
212
+ {'name' : 'STDOUT' ,'type' :'TEXT' },
213
+ {'name' : 'STDERR' ,'type' :'TEXT' },
214
+ {'name' : 'StartTime' ,'type' :'INTEGER' },
215
+ {'name' : 'FinishTime' ,'type' :'INTEGER' },
216
+ ]
217
+ connect = sqlite3 .connect ('test_process.db' )
218
+ cursor = connect .cursor ()
219
+ cursor .execute ("PRAGMA table_info(jobs)" )
220
+ columns = cursor .fetchall ()
221
+ for column in test_columns :
222
+ column_found = list (filter (lambda x : x [1 ]== column ['name' ], columns ))
223
+ self .assertEqual (len (column_found ), 1 ,
224
+ 'The column ' + column ['name' ] + ' is not found in database'
225
+ )
226
+ self .assertEqual (column_found [0 ][2 ], column ['type' ],
227
+ 'The column' + column ['name' ] + ' has a wrong type'
228
+ )
229
+
230
+ def test_get_status_by_ID (self ):
231
+ job_info = self .processes .get_status_by_ID (self .test_job_info .id )
232
+ self .assertEqual (job_info , self .test_job_info ,
233
+ 'The data read from database is not the same one as inserted'
234
+ )
235
+
236
+ def test_get_status (self ):
237
+ # Insert the same info again, but change the last digit of id to 0
238
+ test_job_info2 = copy .copy (self .test_job_info )
239
+ test_job_info2 .id = test_job_info2 .id [:- 1 ] + '0'
240
+ self .processes .insert_database (test_job_info2 )
241
+ job_infos = self .processes .get_status ()
242
+ self .assertEqual (len (job_infos ), 2 ,
243
+ 'The number of data entries is not the same as created'
244
+ )
245
+ self .assertEqual (job_infos [0 ], self .test_job_info ,
246
+ 'The data list read from database is not the same one as inserted'
247
+ )
248
+ self .assertEqual (job_infos [1 ], test_job_info2 ,
249
+ 'The data list read from database is not the same one as inserted'
250
+ )
251
+
252
+ def test_ota_run (self ):
253
+ # Test when the job exit normally
254
+ mock_proc = Mock ()
255
+ mock_proc .wait = Mock (return_value = 0 )
256
+ mock_Popen = Mock (return_value = mock_proc )
257
+ test_command = [
258
+ "ota_from_target_files" , "-v" ,"build/target.zip" , "output/ota.zip" ,
259
+ ]
260
+ mock_pipes_template = Mock ()
261
+ mock_pipes_template .open = Mock ()
262
+ mock_Template = Mock (return_value = mock_pipes_template )
263
+ # Mock the subprocess.Popen, subprocess.Popen().wait and pipes.Template
264
+ with patch ("subprocess.Popen" , mock_Popen ), \
265
+ patch ("pipes.Template" , mock_Template ):
266
+ self .processes .ota_run (test_command , self .test_job_info .id )
267
+ mock_Popen .assert_called_once ()
268
+ mock_proc .wait .assert_called_once ()
269
+ job_info = self .processes .get_status_by_ID (self .test_job_info .id )
270
+ self .assertEqual (job_info .status , 'Finished' )
271
+ mock_Popen .reset_mock ()
272
+ mock_proc .wait .reset_mock ()
273
+ # Test when the job exit with prbolems
274
+ mock_proc .wait = Mock (return_value = 1 )
275
+ with patch ("subprocess.Popen" , mock_Popen ), \
276
+ patch ("pipes.Template" , mock_Template ):
277
+ self .processes .ota_run (test_command , self .test_job_info .id )
278
+ mock_Popen .assert_called_once ()
279
+ mock_proc .wait .assert_called_once ()
280
+ job_info = self .processes .get_status_by_ID (self .test_job_info .id )
281
+ self .assertEqual (job_info .status , 'Error' )
282
+
283
+ def test_ota_generate (self ):
284
+ test_args = dict ({
285
+ 'output' : 'ota.zip' ,
286
+ 'extra_keys' : ['downgrade' , 'wipe_user_data' ],
287
+ 'extra' : '--disable_vabc' ,
288
+ 'isIncremental' : True ,
289
+ 'isPartial' : True ,
290
+ 'partial' : ['system' , 'vendor' ],
291
+ 'incremental' : 'target/source.zip' ,
292
+ 'target' : 'target/build.zip' ,
293
+ 'verbose' : True
294
+ })
295
+ # Usually the order of commands make no difference, but the following
296
+ # order has been validated, so it is best to follow this manner:
297
+ # ota_from_target_files [flags like -v, --downgrade]
298
+ # [-i incremental_source] [-p partial_list] target output
299
+ test_command = [
300
+ 'ota_from_target_files' , '-v' , '--downgrade' ,
301
+ '--wipe_user_data' , '--disable_vabc' , '-k' ,
302
+ 'build/make/target/product/security/testkey' ,
303
+ '-i' , 'target/source.zip' ,
304
+ '--partial' , 'system vendor' , 'target/build.zip' , 'ota.zip'
305
+ ]
306
+ mock_os_path_isfile = Mock (return_value = True )
307
+ mock_threading = Mock ()
308
+ mock_thread = Mock (return_value = mock_threading )
309
+ with patch ("os.path.isfile" , mock_os_path_isfile ), \
310
+ patch ("threading.Thread" , mock_thread ):
311
+ self .processes .ota_generate (test_args , id = 'test' )
312
+ job_info = self .processes .get_status_by_ID ('test' )
313
+ self .assertEqual (job_info .status , 'Running' ,
314
+ 'The job cannot be stored into database properly'
315
+ )
316
+ # Test if the job stored into database properly
317
+ for key , value in test_args .items ():
318
+ # extra_keys is merged to extra when stored into database
319
+ if key == 'extra_keys' :
320
+ continue
321
+ self .assertEqual (job_info .__dict__ [key ], value ,
322
+ 'The column ' + key + ' is not stored into database properly'
323
+ )
324
+ # Test if the command is in its order
325
+ self .assertEqual (mock_thread .call_args [1 ]['args' ][0 ], test_command ,
326
+ 'The subprocess command is not in its good shape'
327
+ )
180
328
181
329
if __name__ == '__main__' :
182
330
unittest .main ()
0 commit comments