Source code for dgp.lib.gravity_ingestor

# coding=utf-8

"""
gravity_ingestor.py
Library for gravity data import functions

"""

import datetime
import fnmatch
import os
import re
import struct

import numpy as np
import pandas as pd

from .time_utils import convert_gps_time
from .etc import interp_nans

def _extract_bits(bitfield, columns=None, as_bool=False):
    """
    Extract bitfield values from an array of integers.

    A pandas.Series or numpy.array of integers is converted to a
    pandas.DataFrame of 1/0 or True/False values, with one column per bit
    in the integer (least significant bit first), or one column per name
    given in ``columns``.

    Parameters
    ----------
    bitfield : numpy.array or pandas.Series
        Unsigned integers of at most 32 bits (values are packed as
        big-endian unsigned 32-bit words before the bits are extracted)
    columns : list, optional
        Names assigned to the resulting columns, in the order listed.
        Unnamed trailing bit columns are dropped.
    as_bool : bool, optional
        If True, then values in returned DataFrame are type numpy.bool_

    Returns
    -------
    pandas.DataFrame

    """
    def _unpack_bits(n):
        # pack the integer as a big-endian 32-bit word, then unpack its
        # bits and flip so the least significant bit comes first
        x = np.array(struct.unpack('4B', struct.pack('>I', n)), dtype=np.uint8)
        return np.flip(np.unpackbits(x), axis=0)

    data = bitfield.apply(_unpack_bits)
    # stack the per-value bit arrays into a (rows x bits) matrix
    df = pd.DataFrame(np.column_stack(list(zip(*data))))

    # set column names
    if columns is not None:
        if len(columns) < len(df.columns):
            # remove unnamed bit columns from the end
            df.drop(df.columns[len(columns):], axis=1, inplace=True)
        # truncate the name list if it is longer than the frame is wide
        df.columns = columns[:len(df.columns)]

    if as_bool:
        return df.astype(np.bool_)
    else:
        return df

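# A minimal usage sketch for _extract_bits (illustrative values only, not
# part of the module). Bit 0, the least significant bit, maps to the first
# column, so 5 (0b101) sets 'a' and 'c', while 2 (0b010) sets only 'b':
#
#     >>> flags = pd.Series([5, 2])
#     >>> _extract_bits(flags, columns=['a', 'b', 'c'], as_bool=True)
#            a      b      c
#     0   True  False   True
#     1  False   True  False
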
def read_at1a(path, fill_with_nans=True, interp=False):
    """
    Read and parse gravity data file from DGS AT1A (Airborne) meter.

    CSV Columns:
        gravity, long, cross, beam, temp, status, pressure, Etemp,
        GPSweek, GPSweekseconds

    Parameters
    ----------
    path : str
        Filesystem path to gravity data file
    fill_with_nans : boolean, default True
        Fill time gaps with NaNs for all fields
    interp : boolean, default False
        Interpolate all NaNs for fields of type numpy.number

    Returns
    -------
    pandas.DataFrame
        Gravity data indexed by datetime.
    """
    fields = ['gravity', 'long', 'cross', 'beam', 'temp', 'status',
              'pressure', 'Etemp', 'GPSweek', 'GPSweekseconds']

    df = pd.read_csv(path, header=None, engine='c', na_filter=False)
    df.columns = fields

    # expand status field into one boolean column per bit
    status_field_names = ['clamp', 'unclamp', 'gps_sync', 'feedback',
                          'reserved1', 'reserved2', 'ad_lock', 'cmd_rcvd',
                          'nav_mode_1', 'nav_mode_2', 'plat_comm',
                          'sens_comm', 'gps_input', 'ad_sat', 'long_sat',
                          'cross_sat', 'on_line']
    status = _extract_bits(df['status'], columns=status_field_names,
                           as_bool=True)
    df = pd.concat([df, status], axis=1)
    df.drop('status', axis=1, inplace=True)

    # create datetime index from the GPS week and seconds-of-week fields
    dt_list = [convert_gps_time(week, sow, format='datetime')
               for week, sow in zip(df['GPSweek'], df['GPSweekseconds'])]
    df.index = pd.DatetimeIndex(dt_list)

    if fill_with_nans:
        # select rows where time is synced with the GPS NMEA
        df = df.loc[df['gps_sync']]

        # fill gaps with NaNs at the 100 ms (10 Hz) sample interval
        interval = '100000U'
        index = pd.date_range(df.index[0], df.index[-1], freq=interval)
        df = df.reindex(index)

    if interp:
        numeric = df.select_dtypes(include=[np.number]).apply(interp_nans)
        # replace columns with their interpolated versions
        for col in numeric.columns:
            df[col] = numeric[col]

    return df
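
# A minimal usage sketch for read_at1a (the file path is hypothetical):
#
#     >>> df = read_at1a('data/AT1A_flight01.csv', fill_with_nans=True,
#     ...                interp=True)
#     >>> df['gravity'].head()
#
# With fill_with_nans=True the result is reindexed onto a continuous
# 10 Hz DatetimeIndex; with interp=True the NaN gaps in numeric channels
# are then filled by interp_nans.
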
def _parse_ZLS_file_name(filename):
    # split by underscore, then split each part by the dot
    fname = [e.split('.') for e in filename.split('_')]

    # flatten into a list of ints: [year, hour, day-of-year]
    b = [int(el) for fname_parts in fname for el in fname_parts]

    # generate datetime from year, day-of-year, and hour
    c = datetime.datetime(b[0], 1, 1) + datetime.timedelta(days=b[2] - 1,
                                                           hours=b[1])
    return c


def _read_ZLS_format_file(filepath):
    col_names = ['line_name', 'year', 'day', 'hour', 'minute', 'second',
                 'sensor', 'spring_tension', 'cross_coupling', 'raw_beam',
                 'vcc', 'al', 'ax', 've2', 'ax2', 'xacc2', 'lacc2', 'xacc',
                 'lacc', 'par_port', 'platform_period']
    col_widths = [10, 4, 3, 2, 2, 2, 8, 8, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8,
                  8, 8, 6]
    time_columns = ['year', 'day', 'hour', 'minute', 'second']

    # read the fixed-width file into a DataFrame
    df = pd.read_fwf(filepath, widths=col_widths, names=col_names)

    # build a timestamp string: year, zero-padded day-of-year, HHMMSS
    day_fmt = lambda x: '{:03d}'.format(x)
    time_fmt = lambda x: '{:02d}'.format(x)
    t = df['year'].map(str) + df['day'].map(day_fmt) + \
        df['hour'].map(time_fmt) + df['minute'].map(time_fmt) + \
        df['second'].map(time_fmt)

    # index by datetime and drop the now-redundant time columns
    df.index = pd.to_datetime(t, format='%Y%j%H%M%S')
    df.drop(time_columns, axis=1, inplace=True)
    return df
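
# A sketch of the '<year>_<hour>.<day-of-year>' file-name convention these
# helpers assume (values are illustrative):
#
#     >>> _parse_ZLS_file_name('2017_14.123')
#     datetime.datetime(2017, 5, 3, 14, 0)
#
# i.e. day 123 of 2017 is May 3, and the hour field gives 14:00.
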
def read_zls(dirpath, begin_time=None, end_time=None, excludes=['.*']):
    """
    Read and parse gravity data files from a ZLS meter.

    Files are segmented by hour, and data is presented as ASCII in a
    fixed-width format.

    Columns:
        line name, year, day, hour, minute, second, gravity,
        spring tension, cross coupling, raw beam, vcc, al, ax, ve2,
        ax2, xacc2, lacc2, xacc, lacc, par port, platform period

    Parameters
    ----------
    dirpath : str
        Filesystem path to the directory containing the files
    begin_time : datetime, optional
        Data start time if not importing from the first file in the
        directory
    end_time : datetime, optional
        Data end time if not importing to the last file in the directory
    excludes : list
        Files and directories to exclude from the directory listing.

    Returns
    -------
    pandas.DataFrame
        Gravity data indexed by datetime.
    """
    excludes = r'|'.join([fnmatch.translate(x) for x in excludes]) or r'$.'

    # list files in the directory and parse each name into its start time
    files = [_parse_ZLS_file_name(f) for f in os.listdir(dirpath)
             if os.path.isfile(os.path.join(dirpath, f))
             and not re.match(excludes, f)]
    files = sorted(files)

    # validate begin and end times; each file covers one hour of data, so
    # the valid span ends one hour after the last file's start time
    if begin_time is None and end_time is None:
        begin_time = files[0]
        end_time = files[-1] + datetime.timedelta(hours=1)
    elif begin_time is None and end_time is not None:
        begin_time = files[0]
        if (end_time < begin_time or
                end_time > files[-1] + datetime.timedelta(hours=1)):
            raise ValueError('end time ({end}) is out of bounds'
                             .format(end=end_time))
    elif begin_time is not None and end_time is None:
        end_time = files[-1] + datetime.timedelta(hours=1)
        if begin_time > end_time or begin_time < files[0]:
            raise ValueError('begin time ({begin}) is out of bounds'
                             .format(begin=begin_time))
    else:
        if begin_time > end_time:
            raise ValueError('begin time ({begin}) is after end time ({end})'
                             .format(begin=begin_time, end=end_time))

    # keep only the hourly files that overlap [begin_time, end_time]
    files = [x for x in files
             if (begin_time <= x <= end_time) or
                (x <= begin_time <= x + datetime.timedelta(hours=1)) or
                (end_time - datetime.timedelta(hours=1) <= x <= end_time)]

    # convert back to ZLS-type file names
    files = [dt.strftime('%Y_%H.%j') for dt in files]

    df = pd.DataFrame()
    for f in files:
        frame = _read_ZLS_format_file(os.path.join(dirpath, f))
        df = pd.concat([df, frame])

    # trim rows outside the requested time span
    df.drop(df.index[df.index < begin_time], inplace=True)
    df.drop(df.index[df.index > end_time], inplace=True)
    return df
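
# A minimal usage sketch for read_zls (directory path and times are
# hypothetical):
#
#     >>> begin = datetime.datetime(2017, 5, 3, 14, 30)
#     >>> end = datetime.datetime(2017, 5, 3, 16, 0)
#     >>> df = read_zls('data/zls', begin_time=begin, end_time=end)
#
# Only the hourly files overlapping [begin, end] are read, and the
# concatenated frame is trimmed to the requested span.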