-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfdsobj.py
More file actions
146 lines (119 loc) · 4.71 KB
/
fdsobj.py
File metadata and controls
146 lines (119 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import math
from array import array
import fdsheader
import numpy as np
# Positive infinity, exposed as a module-level constant.
inf = math.inf
class FdsObj:
    """Sequential reader for the sample data stored in an fds file.

    The header is parsed by ``fdsheader.readheader``.  The data section that
    follows it is treated as consecutive "shots", each holding one datum per
    bin.  A reader is restricted to an inclusive bin range
    ``[start_bin, end_bin]`` and starts at ``start_shot``; every call to
    ``read_shot`` (or every iteration step) returns the selected bins of the
    next shot.

    Values are decoded with ``array.fromfile``, i.e. as machine values in the
    platform's native byte order.

    Instances are iterators and can also be used as context managers so the
    underlying file is closed deterministically.
    """

    # Header 'DataEncoding' value -> (array typecode, bytes per datum on disk).
    _ENCODINGS = {
        'uint16': ('H', 2),
        'int16': ('h', 2),
        'real32': ('f', 4),
        'single': ('f', 4),
    }

    # Private reader state (class-level defaults, overwritten per instance).
    __isOpen = False
    __fin = None
    __num_bytes_per_datum = None
    __bins_per_shot_to_read = None
    __bins_per_shot_to_skip = None
    __bytes_per_shot_to_read = None
    __bytes_per_shot_to_skip = None

    # Public attributes (class-level defaults, overwritten per instance).
    path = None
    header = None
    start_bin = None
    end_bin = None
    start_shot = None
    block_size = 1  # maximum number of shots returned by one read() call

    def __init__(self, filepath, bin_lims=None, start_shot=0):
        """Open *filepath* and position the reader on the first wanted datum.

        Args:
            filepath: Path of the fds file.
            bin_lims: Optional ``(first_bin, last_bin)`` pair, inclusive.
                Values outside the file are clamped to the available bins.
                ``None`` selects every bin.
            start_shot: Index of the first shot to read; negative values are
                clamped to 0.

        Raises:
            ValueError: If the file cannot be opened, a required header value
                is missing or unusable, ``start_shot`` lies past the last
                shot, the bin limits select no bins, or the data encoding is
                not supported.
        """
        self.path = os.path.normpath(filepath)
        self.start_shot = start_shot
        self.header = fdsheader.readheader(self.path)

        if not self.__isOpen:
            try:
                self.__fin = open(self.path, 'rb')
            except OSError as err:
                raise ValueError('Could not open file.') from err
            self.__isOpen = True

        # Pull the header values this reader depends on.
        try:
            data_start_byte = self.header['HeaderSize_Bytes']
            self.bin_spacing_m = self.header['DataLocusSpacing_m']
            self.sample_rate_Hz = self.header['TimeStepFrequency_Hz']
            self.first_bin_m = self.header['PositionOfFirstSample_m']
            num_bins = self.header['DataLocusCount']
            num_shots = self.header['TimeStepCount']
            self.duration_s = num_shots / self.sample_rate_Hz
            self.data_type = self.header['DataEncoding']
        except (KeyError, TypeError, ZeroDivisionError) as err:
            raise ValueError('Missing header values.') from err

        if bin_lims is not None:
            self.start_bin = bin_lims[0]
            self.end_bin = bin_lims[1]
        else:
            self.start_bin = 0
            self.end_bin = num_bins - 1

        # Clamp the requested window to what the file actually contains.
        if self.start_bin < 0:
            self.start_bin = 0
        if self.end_bin > num_bins - 1:
            self.end_bin = num_bins - 1
        if self.start_shot < 0:
            self.start_shot = 0
        if self.start_shot > num_shots - 1:
            raise ValueError('Start shot past file end.')
        if self.end_bin < self.start_bin:
            # A negative read count would otherwise only show up later as an
            # iterator that silently yields nothing.
            raise ValueError('Bin limits select no bins.')

        try:
            _, num_bytes_per_datum = self._ENCODINGS[self.data_type]
        except (KeyError, TypeError):
            raise ValueError('Unsupported data type: returning.') from None
        self.__num_bytes_per_datum = num_bytes_per_datum

        self.__bins_per_shot_to_read = int(self.end_bin - self.start_bin + 1)
        self.__bins_per_shot_to_skip = int(
            num_bins - self.__bins_per_shot_to_read)
        self.__bytes_per_shot_to_read = (
            self.__bins_per_shot_to_read * num_bytes_per_datum)
        self.__bytes_per_shot_to_skip = (
            self.__bins_per_shot_to_skip * num_bytes_per_datum)

        # Leave the file positioned on the first byte of the selected bin
        # range in the first requested shot; read_shot() relies on this.
        aoi_start_byte = int(
            data_start_byte
            + (self.start_shot * num_bins + self.start_bin)
            * num_bytes_per_datum)
        self.__fin.seek(aoi_start_byte, 0)

    def close(self):
        """Close the underlying file; safe to call more than once."""
        if self.__fin is not None:
            self.__fin.close()
        self.__isOpen = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        # The file may never have been opened (e.g. __init__ failed early),
        # so go through the guarded close().
        self.close()

    def read_shot(self):
        """Read the selected bins of the next shot.

        The file position is assumed to be on the first byte to read in the
        current shot; ``__init__`` establishes this for the first shot and
        each call re-establishes it for the following one by skipping the
        bins outside the selected range.

        Returns:
            array.array: The values of the selected bins, with typecode
            ``'H'`` (uint16), ``'h'`` (int16) or ``'f'`` (real32/single).

        Raises:
            ValueError: If the header's ``DataEncoding`` is not supported.
            StopIteration: If a complete shot can no longer be read (end of
                data, trailing partial datum, or closed file).
        """
        try:
            typecode, _ = self._ENCODINGS[self.header['DataEncoding']]
        except (KeyError, TypeError):
            raise ValueError('Unsupported data type: returning.') from None

        shot = array(typecode)
        try:
            shot.fromfile(self.__fin, self.__bins_per_shot_to_read)
            # Skip the unselected bins so the next call starts on the
            # selected range of the following shot.
            self.__fin.seek(self.__bytes_per_shot_to_skip, 1)
        except (EOFError, ValueError):
            # EOFError: fewer items left than requested.
            # ValueError: trailing bytes that do not form a whole datum, or
            # the file has been closed.
            raise StopIteration from None
        return shot

    def read(self):
        """Return up to ``block_size`` consecutive shots as a numpy array.

        Fewer shots are returned when the data runs out first; an empty
        array is returned when no shot is left.
        """
        # range() comes first in zip() so that no extra shot is consumed
        # once block_size shots have been collected.
        return np.asarray(
            [shot for _, shot in zip(range(self.block_size), self)])

    def num_bins(self):
        """Return the number of bins in the selected (inclusive) range."""
        return self.end_bin - self.start_bin + 1

    def __iter__(self):
        return self

    def __next__(self):
        return self.read_shot()

    def __repr__(self):
        return ("FDS File: % s\n"
                "data_type: % s\n"
                "start_bin: % s\n"
                "end_bin: % s\n"
                "start_shot: % s \n"
                "sample_rate_Hz: % s\n"
                "bin_spacing_m: % s\n"
                "block_size: % s"
                % (os.path.split(self.path)[1],
                   self.data_type,
                   self.start_bin,
                   self.end_bin,
                   self.start_shot,
                   self.sample_rate_Hz,
                   self.bin_spacing_m,
                   self.block_size))