Skip to content

Commit 644cfa1

Browse files
authored
Merge pull request #532 from OpenCOMPES/mpes_process_files_from_memory
Mpes process files from memory
2 parents d13b8e7 + 1bd8613 commit 644cfa1

File tree

1 file changed

+60
-32
lines changed

1 file changed

+60
-32
lines changed

sed/loader/mpes/loader.py

Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"""
66
import datetime
77
import glob
8+
import io
89
import json
910
import os
1011
from typing import Dict
@@ -27,6 +28,29 @@
2728
from sed.loader.base.loader import BaseLoader
2829

2930

31+
def load_h5_in_memory(file_path):
32+
"""
33+
Load an HDF5 file entirely into memory and open it with h5py.
34+
35+
Parameters:
36+
file_path (str): Path to the .h5 file.
37+
38+
Returns:
39+
h5py.File: An h5py File object representing the in-memory HDF5 file.
40+
"""
41+
# Read the entire file into memory
42+
with open(file_path, "rb") as f:
43+
file_content = f.read()
44+
45+
# Load the content into a BytesIO object
46+
file_buffer = io.BytesIO(file_content)
47+
48+
# Open the HDF5 file using h5py from the in-memory buffer
49+
h5_file = h5py.File(file_buffer, "r")
50+
51+
return h5_file
52+
53+
3054
def hdf5_to_dataframe(
3155
files: Sequence[str],
3256
group_names: Sequence[str] = None,
@@ -67,7 +91,7 @@ def hdf5_to_dataframe(
6791

6892
# Read a file to parse the file structure
6993
test_fid = kwds.pop("test_fid", 0)
70-
test_proc = h5py.File(files[test_fid])
94+
test_proc = load_h5_in_memory(files[test_fid])
7195
if group_names == []:
7296
group_names, alias_dict = get_groups_and_aliases(
7397
h5file=test_proc,
@@ -80,7 +104,7 @@ def hdf5_to_dataframe(
80104
column_names.append(time_stamp_alias)
81105

82106
test_array = hdf5_to_array(
83-
h5file=test_proc,
107+
h5filename=files[test_fid],
84108
group_names=group_names,
85109
time_stamps=time_stamps,
86110
ms_markers_group=ms_markers_group,
@@ -94,7 +118,7 @@ def hdf5_to_dataframe(
94118
arrays.append(
95119
da.from_delayed(
96120
dask.delayed(hdf5_to_array)(
97-
h5file=h5py.File(f),
121+
h5filename=f,
98122
group_names=group_names,
99123
time_stamps=time_stamps,
100124
ms_markers_group=ms_markers_group,
@@ -111,6 +135,8 @@ def hdf5_to_dataframe(
111135

112136
array_stack = da.concatenate(arrays, axis=1).T
113137

138+
test_proc.close()
139+
114140
return ddf.from_dask_array(array_stack, columns=column_names)
115141

116142

@@ -155,7 +181,7 @@ def hdf5_to_timed_dataframe(
155181

156182
# Read a file to parse the file structure
157183
test_fid = kwds.pop("test_fid", 0)
158-
test_proc = h5py.File(files[test_fid])
184+
test_proc = load_h5_in_memory(files[test_fid])
159185
if group_names == []:
160186
group_names, alias_dict = get_groups_and_aliases(
161187
h5file=test_proc,
@@ -168,7 +194,7 @@ def hdf5_to_timed_dataframe(
168194
column_names.append(time_stamp_alias)
169195

170196
test_array = hdf5_to_timed_array(
171-
h5file=test_proc,
197+
h5filename=files[test_fid],
172198
group_names=group_names,
173199
time_stamps=time_stamps,
174200
ms_markers_group=ms_markers_group,
@@ -182,7 +208,7 @@ def hdf5_to_timed_dataframe(
182208
arrays.append(
183209
da.from_delayed(
184210
dask.delayed(hdf5_to_timed_array)(
185-
h5file=h5py.File(f),
211+
h5filename=f,
186212
group_names=group_names,
187213
time_stamps=time_stamps,
188214
ms_markers_group=ms_markers_group,
@@ -198,6 +224,8 @@ def hdf5_to_timed_dataframe(
198224

199225
array_stack = da.concatenate(arrays, axis=1).T
200226

227+
test_proc.close()
228+
201229
return ddf.from_dask_array(array_stack, columns=column_names)
202230

203231

@@ -237,7 +265,7 @@ def get_groups_and_aliases(
237265

238266

239267
def hdf5_to_array(
240-
h5file: h5py.File,
268+
h5filename: str,
241269
group_names: Sequence[str],
242270
data_type: str = "float32",
243271
time_stamps=False,
@@ -248,14 +276,10 @@ def hdf5_to_array(
248276
2-dimensional array with the corresponding values.
249277
250278
Args:
251-
h5file (h5py.File):
252-
hdf5 file handle to read from
253-
group_names (str):
254-
group names to read
255-
data_type (str, optional):
256-
Data type of the output data. Defaults to "float32".
257-
time_stamps (bool, optional):
258-
Option to calculate time stamps. Defaults to False.
279+
h5filename (str): hdf5 file name to read from
280+
group_names (str): group names to read
281+
data_type (str, optional): Data type of the output data. Defaults to "float32".
282+
time_stamps (bool, optional): Option to calculate time stamps. Defaults to False.
259283
ms_markers_group (str): h5 column containing timestamp information.
260284
Defaults to "msMarkers".
261285
first_event_time_stamp_key (str): h5 attribute containing the start
@@ -267,6 +291,8 @@ def hdf5_to_array(
267291

268292
# Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)
269293

294+
h5file = load_h5_in_memory(h5filename)
295+
270296
# Read out groups:
271297
data_list = []
272298
for group in group_names:
@@ -293,7 +319,7 @@ def hdf5_to_array(
293319
except KeyError:
294320
# get the start time of the file from its modification date if the key
295321
# does not exist (old files)
296-
start_time = os.path.getmtime(h5file.filename) # convert to ms
322+
start_time = os.path.getmtime(h5filename) # convert to ms
297323
# the modification time points to the time when the file was finished, so we
298324
# need to correct for the time it took to write the file
299325
start_time -= len(ms_marker) / 1000
@@ -316,11 +342,13 @@ def hdf5_to_array(
316342

317343
data_list.append(time_stamp_data)
318344

345+
h5file.close()
346+
319347
return np.asarray(data_list)
320348

321349

322350
def hdf5_to_timed_array(
323-
h5file: h5py.File,
351+
h5filename: str,
324352
group_names: Sequence[str],
325353
data_type: str = "float32",
326354
time_stamps=False,
@@ -331,14 +359,10 @@ def hdf5_to_timed_array(
331359
timed version of a 2-dimensional array with the corresponding values.
332360
333361
Args:
334-
h5file (h5py.File):
335-
hdf5 file handle to read from
336-
group_names (str):
337-
group names to read
338-
data_type (str, optional):
339-
Data type of the output data. Defaults to "float32".
340-
time_stamps (bool, optional):
341-
Option to calculate time stamps. Defaults to False.
362+
h5filename (str): hdf5 file name to read from
363+
group_names (str): group names to read
364+
data_type (str, optional): Data type of the output data. Defaults to "float32".
365+
time_stamps (bool, optional): Option to calculate time stamps. Defaults to False.
342366
ms_markers_group (str): h5 column containing timestamp information.
343367
Defaults to "msMarkers".
344368
first_event_time_stamp_key (str): h5 attribute containing the start
@@ -351,6 +375,8 @@ def hdf5_to_timed_array(
351375

352376
# Delayed array for loading an HDF5 file of reasonable size (e.g. < 1GB)
353377

378+
h5file = load_h5_in_memory(h5filename)
379+
354380
# Read out groups:
355381
data_list = []
356382
ms_marker = np.asarray(h5file[ms_markers_group])
@@ -377,7 +403,7 @@ def hdf5_to_timed_array(
377403
except KeyError:
378404
# get the start time of the file from its modification date if the key
379405
# does not exist (old files)
380-
start_time = os.path.getmtime(h5file.filename) # convert to ms
406+
start_time = os.path.getmtime(h5filename) # convert to ms
381407
# the modification time points to the time when the file was finished, so we
382408
# need to correct for the time it took to write the file
383409
start_time -= len(ms_marker) / 1000
@@ -386,6 +412,8 @@ def hdf5_to_timed_array(
386412

387413
data_list.append(time_stamp_data)
388414

415+
h5file.close()
416+
389417
return np.asarray(data_list)
390418

391419

@@ -692,16 +720,16 @@ def get_start_and_end_time(self) -> Tuple[float, float]:
692720
Returns:
693721
Tuple[float, float]: A tuple containing the start and end time stamps
694722
"""
695-
h5file = h5py.File(self.files[0])
723+
h5filename = self.files[0]
696724
timestamps = hdf5_to_array(
697-
h5file,
725+
h5filename=h5filename,
698726
group_names=self._config["dataframe"]["hdf5_groupnames"],
699727
time_stamps=True,
700728
)
701729
ts_from = timestamps[-1][1]
702-
h5file = h5py.File(self.files[-1])
730+
h5filename = self.files[-1]
703731
timestamps = hdf5_to_array(
704-
h5file,
732+
h5filename=h5filename,
705733
group_names=self._config["dataframe"]["hdf5_groupnames"],
706734
time_stamps=True,
707735
)
@@ -929,7 +957,7 @@ def get_count_rate(
929957
for fid in fids:
930958
try:
931959
count_rate_, secs_ = get_count_rate(
932-
h5py.File(self.files[fid]),
960+
load_h5_in_memory(self.files[fid]),
933961
ms_markers_group=ms_markers_group,
934962
)
935963
secs_list.append((accumulated_time + secs_).T)
@@ -974,7 +1002,7 @@ def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float:
9741002
for fid in fids:
9751003
try:
9761004
secs += get_elapsed_time(
977-
h5py.File(self.files[fid]),
1005+
load_h5_in_memory(self.files[fid]),
9781006
ms_markers_group=ms_markers_group,
9791007
)
9801008
except OSError as exc:

0 commit comments

Comments
 (0)