1 | from fitparse import FitFile |
---|
2 | import gpxpy |
---|
3 | import gpxpy.gpx |
---|
4 | |
---|
5 | try: # pragma: no cover |
---|
6 | import lxml.etree as mod_etree # Load LXML or fallback to cET or ET |
---|
7 | except ImportError: # pragma: no cover |
---|
8 | try: |
---|
9 | import xml.etree.cElementTree as mod_etree |
---|
10 | except ImportError: |
---|
11 | import xml.etree.ElementTree as mod_etree |
---|
12 | |
---|
13 | from ow.utilities import semicircles_to_degrees |
---|
14 | |
---|
15 | |
---|
class Fit(object):
    """
    A fit object, that can be used to load, parse, convert, etc ANT fit
    files

    The expected flow is: create the object with the path to a fit file,
    call load() to copy the session summary into self.data, then read the
    name and gpx properties (both rely on keys that load() creates).
    """
    def __init__(self, path):
        """
        Keep the path and build the FitFile object for it, self.data stays
        empty until load() is called
        """
        self.path = path
        # FitFile object containing all data after opening the file with
        # python-fitparse
        self.obj = FitFile(path)
        # data is a temporary place to store info we got from the fit file,
        # so we don't have to loop over data in self.obj multiple times
        self.data = {}

    def load(self):
        """
        Load contents from the fit file

        Copies the summary values of the first 'session' message into
        self.data and (re)initializes the per-record entries (hr, cad and
        atemp lists plus their min/max/avg values), which are filled in
        later, when the gpx property is accessed.

        Raises IndexError if the fit file contains no 'session' message.
        """
        # get some basic info from the fit file
        #
        # IMPORTANT: only single-session fit files are supported right now
        sessions = list(self.obj.get_messages('session'))
        session = sessions[0]
        values = session.get_values()
        self.data['sport'] = values.get('sport', None)

        # in some garmin devices (520), you can have named profiles, which
        # can be used for different bikes or for different types of workouts
        # (training/races/etc)
        self.data['profile'] = values.get('unknown_110', None)

        # naive datetime object
        self.data['start'] = values.get('start_time', None)

        # seconds
        self.data['duration'] = values.get('total_timer_time', None)
        self.data['elapsed'] = values.get('total_elapsed_time', None)

        # meters
        self.data['distance'] = values.get('total_distance', None)

        # meters
        self.data['uphill'] = values.get('total_ascent', None)
        self.data['downhill'] = values.get('total_descent', None)

        self.data['calories'] = values.get('total_calories', None)

        # store here a list with all hr values
        self.data['hr'] = []
        self.data['min_hr'] = None
        self.data['max_hr'] = values.get('max_heart_rate', None)
        self.data['avg_hr'] = values.get('avg_heart_rate', None)

        # store here a list with all cad values
        self.data['cad'] = []
        self.data['min_cad'] = None
        self.data['max_cad'] = values.get('max_cadence', None)
        self.data['avg_cad'] = values.get('avg_cadence', None)

        # prefer the "enhanced" speed fields, falling back to the plain ones
        # when they are missing, any value found is stored divided by 1000
        # NOTE(review): the division looks like a unit conversion, confirm
        # the unit expected by the code that reads these values
        for key in ('max_speed', 'avg_speed'):
            speed = values.get('enhanced_' + key, None)
            if speed is None:
                speed = values.get(key, None)
            if speed is not None:
                speed = speed / 1000
            self.data[key] = speed

        # no temp values yet
        # store here a list with all temp values
        self.data['atemp'] = []
        self.data['min_atemp'] = None
        self.data['max_atemp'] = 0
        self.data['avg_atemp'] = 0

    @property
    def name(self):
        """
        Build a name based on some info from the fit file

        Returns "<profile> <sport>" when both are known, otherwise whichever
        of them is available (the sport value, possibly None, when there is
        no profile). Needs load() to have been called first.
        """
        profile = self.data['profile']
        sport = self.data['sport']
        if not profile:
            return sport
        if not sport:
            # a profile without sport used to break the concatenation below
            return profile
        return profile + ' ' + sport

    def _calculate_avg_atemp(self):
        """
        Store in self.data['avg_atemp'] the rounded average of the collected
        temperatures (if there are any) and return the stored value
        """
        temps = [t[0] for t in self.data['atemp']]
        if temps:
            self.data['avg_atemp'] = int(round(sum(temps)/len(temps)))
        return self.data['avg_atemp']

    @staticmethod
    def _value_or_zero(values, key):
        """
        Return values[key], or 0 if the key is missing or its value is None
        """
        value = values.get(key)
        return 0 if value is None else value

    def _collect_record(self, values):
        """
        Add the temperature, heart rate and cadence found in values (the
        dict of values of one 'record' message) to self.data, updating the
        related min/max entries.

        Returns a (atemp, hr, cad) tuple with the values that were stored.
        Raises KeyError if values has no 'timestamp'.
        """
        timestamp = values['timestamp']

        # a field can be present in the record holding None, treat that
        # like a missing field, so the comparisons below never see a None
        atemp_value = self._value_or_zero(values, 'temperature')
        self.data['atemp'].append((atemp_value, timestamp))
        min_atemp = self.data['min_atemp']
        if min_atemp is None or atemp_value < min_atemp:
            self.data['min_atemp'] = atemp_value
        # max_atemp None means there is no max yet (see the gpx property),
        # which allows negative temperatures to be stored as the max
        max_atemp = self.data['max_atemp']
        if max_atemp is None or atemp_value > max_atemp:
            self.data['max_atemp'] = atemp_value

        hr_value = self._value_or_zero(values, 'heart_rate')
        if hr_value == 0 and self.data['hr']:
            # don't allow hr values of 0, something may have gone wrong
            # with the heart rate monitor, so we use the previous value,
            # if any
            hr_value = self.data['hr'][-1][0]
        self.data['hr'].append((hr_value, timestamp))
        # min_hr None means we are on the first value, set it as min. A min
        # of 0 can only come from records without heart rate, so the first
        # real value replaces it, from there on we only look for lower
        # (non-zero) values
        min_hr = self.data['min_hr']
        if min_hr is None or (
                hr_value and (min_hr == 0 or hr_value < min_hr)):
            self.data['min_hr'] = hr_value

        cad_value = self._value_or_zero(values, 'cadence')
        self.data['cad'].append((cad_value, timestamp))
        min_cad = self.data['min_cad']
        if min_cad is None or cad_value < min_cad:
            self.data['min_cad'] = cad_value

        return atemp_value, hr_value, cad_value

    @property
    def gpx(self):
        """
        Return valid xml-formatted gpx contents from the loaded fit file

        As a side effect, the hr, cad and atemp lists in self.data (and
        their min values, max_atemp and avg_atemp) are rebuilt from the
        'record' messages each time this property is accessed. Needs load()
        to have been called first.
        """
        # Create a gpx object, adding some basic metadata to it.
        #
        # The schema namespaces info is very important, in order to support
        # all the data we will extract from the fit file.

        gpx = gpxpy.gpx.GPX()
        gpx.creator = 'OpenWorkouts'
        gpx.schema_locations = [
            'http://www.topografix.com/GPX/1/1',
            'http://www.topografix.com/GPX/1/1/gpx.xsd',
            'http://www.garmin.com/xmlschemas/GpxExtensions/v3',
            'http://www.garmin.com/xmlschemas/GpxExtensionsv3.xsd',
            'http://www.garmin.com/xmlschemas/TrackPointExtension/v1',
            'http://www.garmin.com/xmlschemas/TrackPointExtensionv1.xsd'
        ]
        garmin_schemas = 'http://www.garmin.com/xmlschemas'
        gpx.nsmap['gpxtpx'] = garmin_schemas + '/TrackPointExtension/v1'
        gpx.nsmap['gpxx'] = garmin_schemas + '/GpxExtensions/v3'

        # Create first track in our GPX:
        gpx_track = gpxpy.gpx.GPXTrack()
        gpx_track.name = self.name
        gpx.tracks.append(gpx_track)

        # Create first segment in our GPX track:
        gpx_segment = gpxpy.gpx.GPXTrackSegment()
        gpx_track.segments.append(gpx_segment)

        # namespace for the additional gpx extensions
        # (temperature, cadence, heart rate)
        namespace = '{gpxtpx}'

        # start from scratch with the per-record data, otherwise accessing
        # this property more than once would add every record again to the
        # lists
        for key in ('atemp', 'hr', 'cad'):
            self.data[key] = []
            self.data['min_' + key] = None
        self.data['max_atemp'] = None

        # loop over the 'record' messages in the fit file, which are the actual
        # data "recorded" by the device
        for message in self.obj.get_messages('record'):
            values = message.get_values()
            atemp_value, hr_value, cad_value = self._collect_record(values)

            # Create an entry where we add the "gpx extensions" data, that will
            # be added to the track point later.
            extensions_root = mod_etree.Element(
                namespace + 'TrackPointExtension')
            extension_values = (
                ('atemp', atemp_value),
                ('hr', hr_value),
                ('cad', cad_value),
            )
            for tag, value in extension_values:
                element = mod_etree.Element(namespace + tag)
                element.text = str(value)
                element.tail = ''
                extensions_root.append(element)

            # Create a gpx track point, that holds the "recorded" geo and speed
            # data, as well as the "gpx extensions" object
            #
            # Compare with None, 0 is a valid value for both latitude and
            # longitude
            position_lat = values.get('position_lat')
            position_long = values.get('position_long')
            if position_lat is None or position_long is None:
                continue

            track_point = gpxpy.gpx.GPXTrackPoint(
                latitude=semicircles_to_degrees(position_lat),
                longitude=semicircles_to_degrees(position_long),
                time=values['timestamp']
            )

            if 'enhanced_altitude' in values:
                track_point.elevation = values['enhanced_altitude']

            if 'enhanced_speed' in values:
                track_point.speed = values['enhanced_speed']

            track_point.extensions.append(extensions_root)

            gpx_segment.points.append(track_point)

        # no records at all, go back to the value set by load()
        if self.data['max_atemp'] is None:
            self.data['max_atemp'] = 0

        # if the fit file has temperature data, calculate the avg
        if self.data['atemp']:
            self._calculate_avg_atemp()

        # return a string containing the xml representation of the gpx object
        return gpx.to_xml()