Source code for cosipy.data_io.UnBinnedData

# Imports:
import numpy as np
from astropy.table import Table
from astropy.io import fits
from scipy import interpolate
import h5py
import time
from cosipy.data_io import DataIO
from cosipy.spacecraftfile import SpacecraftFile
import gzip
import astropy.coordinates as astro_co
import astropy.units as u
from astropy.coordinates import SkyCoord
from scoords import Attitude
from scoords import SpacecraftFrame
import logging
import sys
import math
from tqdm import tqdm
import subprocess
import gc
import os
import time
logger = logging.getLogger(__name__)

[docs] class UnBinnedData(DataIO): """Handles unbinned data."""
[docs] def read_tra(self, output_name=None, run_test=False, use_ori=False, event_min=None, event_max=None): """Reads MEGAlib .tra (or .tra.gz) file and creates cosi datset. Parameters ---------- output_name : str, optional Prefix of output file (default is None, in which case no output is written). run_test : bool, optional This is for unit testing only! Keep False unless comparing to MEGAlib calculations. use_ori : bool, optional Option to get pointing information from the orientation file, based on event time-stamps (default is False, in which case the pointing information comes from the event file itself). Note: this is an option for now, but will later be the default. event_min : int, optional Minimum event number to process (inclusive). All events below this will be skipped. event_max : int, optional Maximum event number to process (non-inclusive). All events at and above this will be skipped. Note: event_min and event_max correspond to the total number of events in the file, which is not necessarily the same as the event ID number. The purpose of this is to allow the data to be read in chunks, in order to overcome memory limitations, depending on the user's system. Returns ------- cosi_dataset, dict The returned dictionary contains the COSI dataset, which has the form: cosi_dataset = {'Energies':erg,\ 'TimeTags':tt,\ 'Xpointings':np.array([lonX,latX]).T,\ 'Ypointings':np.array([lonY,latY]).T,\ 'Zpointings':np.array([lonZ,latZ]).T,\ 'Phi':phi,\ 'Chi local':chi_loc,\ 'Psi local':psi_loc,\ 'Distance':dist,\ 'Chi galactic':chi_gal,\ 'Psi galactic':psi_gal}\ Arrays contain unbinned photon data. Notes ----- The current code is only able to handle data with Compton events. It will need to be modified to handle single-site and pair events. This method sets the instance attribute, cosi_dataset, but it does not explicitly return this. """ start_time = time.time() # Initialise empty lists: # Total photon energy erg = [] # Time tag in UNIX time tt = [] # Event Type (CE or PE) et = [] # Galactic latitude of X direction of spacecraft latX = [] # Galactic lontitude of X direction of spacecraft lonX = [] # Galactic latitude of Z direction of spacecraft latZ = [] # Galactic longitude of Z direction of spacecraft lonZ = [] # Compton scattering angle phi = [] # Measured data space angle chi (azimuth direction; 0..360 deg) chi_loc = [] # Measured data space angle psi (polar direction; 0..180 deg) psi_loc = [] # Measured gal angle chi (lon direction) chi_gal = [] # Measured gal angle psi (lat direction) psi_gal = [] # Components of dg (position vector from 1st interaion to 2nd) dg_x = [] dg_y = [] dg_z = [] # Define electron rest energy, which is used in calculation # of Compton scattering angle. c_E0 = 510.9989500015 # keV # This is for unit testing purposes only. # Use same value as MEGAlib for direct comparison: if run_test == True: c_E0 = 510.999 print("Preparing to read file...") # Open .tra.gz file: if self.data_file.endswith(".gz"): f = gzip.open(self.data_file,"rt") # Need to get number of lines for progress bar. # First try fast method for unix-based systems: try: proc=subprocess.Popen('gunzip -c %s | wc -l' %self.data_file, \ shell=True, stdout=subprocess.PIPE) num_lines = float(proc.communicate()[0]) # If fast method fails, use long method, which should work in all cases. except: print("Initial attempt failed.") print("Using long method...") g = gzip.open(self.data_file,"rt") num_lines = sum(1 for line in g) g.close() # Open .tra file: elif self.data_file.endswith(".tra"): f = open(self.data_file,"r") try: proc=subprocess.Popen('wc -l < %s' %self.data_file, \ shell=True, stdout=subprocess.PIPE) num_lines = float(proc.communicate()[0]) except: print("Initial attempt failed.") print("Using long method...") g = open(self.data_file,"rt") num_lines = sum(1 for line in g) g.close() else: print() print("ERROR: Input data file must have '.tra' or '.gz' extenstion.") print() sys.exit() # Read tra file line by line: print("Reading file...") N_events = 0 # number of events pbar = tqdm(total=num_lines) # start progress bar for line in f: this_line = line.strip().split() pbar.update(1) # update progress bar # Make sure line isn't empty: if len(this_line) == 0: continue # Count the number of events: if this_line[0] == "ID": N_events += 1 # Option to only parse a subset of events: if event_min != None: if N_events < event_min: continue if event_max != None: if N_events >= event_max: pbar.close() print("Stopping here: only reading a subset of events") break # Total photon energy and Compton angle: if this_line[0] == "CE": # Compute the total photon energy: m_Eg = float(this_line[1]) # Energy of scattered gamma ray in keV m_Ee = float(this_line[3]) # Energy of recoil electron in keV this_erg = m_Eg + m_Ee erg.append(this_erg) # Compute the Compton scatter angle due to the standard equation, # i.e. neglect the movement of the electron, # which would lead to a Doppler-broadening. this_value = 1.0 - c_E0 * (1.0/m_Eg - 1.0/(m_Ee + m_Eg)) this_phi = np.arccos(this_value) # radians phi.append(this_phi) # Time tag in Unix time (seconds): if this_line[0] == "TI": tt.append(float(this_line[1])) # Event type: if this_line[0] == "ET": et.append(this_line[1]) # X axis of detector orientation in Galactic coordinates: if this_line[0] == "GX": this_lonX = np.deg2rad(float(this_line[1])) # radians this_latX = np.deg2rad(float(this_line[2])) # radians lonX.append(this_lonX) latX.append(this_latX) # Z axis of detector orientation in Galactic coordinates: if this_line[0] == "GZ": this_lonZ = np.deg2rad(float(this_line[1])) # radians this_latZ = np.deg2rad(float(this_line[2])) # radians lonZ.append(this_lonZ) latZ.append(this_latZ) # Interaction position information: if (this_line[0] == "CH"): # First interaction: if this_line[1] == "0": v1 = np.array((float(this_line[2]),\ float(this_line[3]),float(this_line[4]))) # Second interaction: if this_line[1] == "1": v2 = np.array((float(this_line[2]), float(this_line[3]), float(this_line[4]))) # Compute position vector between first two interactions: dg = v1 - v2 dg_x.append(dg[0]) dg_y.append(dg[1]) dg_z.append(dg[2]) # Close progress bar: pbar.close() print("Making COSI data set...") print("total events to procecss: " + str(len(erg))) # Clear unused memory: gc.collect() # Initialize arrays: print("Initializing arrays...") erg = np.array(erg) phi = np.array(phi) tt = np.array(tt) et = np.array(et) lonX = np.array(lonX) latX = np.array(latX) lonZ = np.array(lonZ) latZ = np.array(latZ) dg_x = np.array(dg_x) dg_y = np.array(dg_y) dg_z = np.array(dg_z) # Check if the input data has pointing information, # if not, get it from the spacecraft file: if (use_ori == False) & (len(lonZ)==0): print("WARNING: No pointing information in input data.") print("Getting pointing information from spacecraft file.") use_ori = True # Option to get X and Z pointing information from orientation file: if use_ori == True: self.instrument_pointing() lonX = self.xl_interp(tt) latX = self.xb_interp(tt) lonZ = self.zl_interp(tt) latZ = self.zb_interp(tt) # Convert dg vector from 3D cartesian coordinates # to spherical polar coordinates, and then extract distance # b/n first two interactions (in cm), psi (rad), and chi (rad). # Note: the resulting angles are latitude/longitude (or elevation/azimuthal). conv = astro_co.cartesian_to_spherical(dg_x, dg_y, dg_z) dist = conv[0].value psi_loc = conv[1].value chi_loc = conv[2].value # Calculate chi_gal and psi_gal from x,y,z coordinates of events: xcoords = SkyCoord(lonX*u.rad, latX*u.rad, frame = 'galactic') zcoords = SkyCoord(lonZ*u.rad, latZ*u.rad, frame = 'galactic') attitude = Attitude.from_axes(x=xcoords, z=zcoords, frame = 'galactic') c = SkyCoord(dg_x, dg_y, dg_z, \ representation_type='cartesian', frame = SpacecraftFrame(attitude = attitude)) c_rotated = c.transform_to('galactic') chi_gal = np.array(c_rotated.l.deg) psi_gal = np.array(c_rotated.b.deg) # Change longitudes from 0..360 deg to -180..180 deg lonX[lonX > np.pi] -= 2*np.pi lonZ[lonZ > np.pi] -= 2*np.pi # Construct Y direction from X and Z direction lonlatY = self.construct_scy(np.rad2deg(lonX),np.rad2deg(latX), np.rad2deg(lonZ),np.rad2deg(latZ)) lonY = np.deg2rad(lonlatY[0]) latY = np.deg2rad(lonlatY[1]) # Rotate psi_loc to colatitude, measured from positive z direction. # This is requred for mhealpy input. psi_loc = (np.pi/2.0) - psi_loc # Define test values for psi and chi local; # this is only for comparing to MEGAlib: self.psi_loc_test = psi_loc self.chi_loc_test = chi_loc # Do the same for psi and chi galactic. # First need to convert to radians: chi_gal_rad = np.array(c_rotated.l.rad) psi_gal_rad = np.array(c_rotated.b.rad) # Rotate psi_gal_rad to colatitude, measured from positive # z direction: psi_gal_rad = (np.pi/2.0) - psi_gal_rad self.psi_gal_test = psi_gal_rad # Rotate chi_gal_test by pi, defined with respect to # the negative x-axis: self.chi_gal_test = chi_gal_rad - np.pi # Make observation dictionary print("Making dictionary...") cosi_dataset = {'Energies':erg, 'TimeTags':tt, 'Xpointings (glon,glat)':np.array([lonX,latX]).T, 'Ypointings (glon,glat)':np.array([lonY,latY]).T, 'Zpointings (glon,glat)':np.array([lonZ,latZ]).T, 'Phi':phi, 'Chi local':chi_loc, 'Psi local':psi_loc, 'Distance':dist, 'Chi galactic':chi_gal, 'Psi galactic':psi_gal} self.cosi_dataset = cosi_dataset # Option to write unbinned data to file (either fits or hdf5): if output_name != None: print("Saving file...") self.write_unbinned_output(output_name) # Get processing time: end_time = time.time() processing_time = end_time - start_time print("total processing time [s]: " + str(processing_time)) return
[docs] def instrument_pointing(self): """Get pointing information from ori file. Initializes interpolated functions for lonx, latx, lonz, latz in radians. Returns ------- xl_interp : scipy:interpolate:interp1d xb_interp : scipy:interpolate:interp1d zl_interp : scipy:interpolate:interp1d zb_interp : scipy:interpolate:interp1d Note ---- This method sets the instance attributes, but it does not explicitly return them. """ # Get ori info: ori = SpacecraftFile.parse_from_file(self.ori_file) time_tags = ori._load_time x_pointings = ori.x_pointings z_pointings = ori.z_pointings # Interpolate: self.xl_interp = interpolate.interp1d(time_tags, x_pointings.l.rad, kind='linear') self.xb_interp = interpolate.interp1d(time_tags, x_pointings.b.rad, kind='linear') self.zl_interp = interpolate.interp1d(time_tags, z_pointings.l.rad, kind='linear') self.zb_interp = interpolate.interp1d(time_tags, z_pointings.b.rad, kind='linear') return
[docs] def construct_scy(self, scx_l, scx_b, scz_l, scz_b): """Construct y-coordinate of instrument pointing given x and z directions. Parameters ---------- scx_l : float Longitude of x direction in degrees. scx_b : float Latitude of x direction in degrees. scz_l : float Longitude of z direction in degrees. scz_b : float Latitude of z direction in degrees. Returns ------- ra : float Right ascension (in degrees) for y-coordinate of instrument pointing. dec : float Declination (in degrees) for y-coordinate of instrument pointing. Note ---- Here, z is the optical axis. """ x = self.polar2cart(scx_l, scx_b) z = self.polar2cart(scz_l, scz_b) return self.cart2polar(np.cross(z,x,axis=0))
[docs] def polar2cart(self, ra, dec): """Coordinate transformation of ra/dec (lon/lat) [phi/theta] polar/spherical coordinates into cartesian coordinates. Parameters ---------- ra : float Right ascension in degrees. dec: float Declination in degrees. Returns ------- array x, y, and z cartesian coordinates in radians. """ x = np.cos(np.deg2rad(ra)) * np.cos(np.deg2rad(dec)) y = np.sin(np.deg2rad(ra)) * np.cos(np.deg2rad(dec)) z = np.sin(np.deg2rad(dec)) return np.array([x,y,z])
[docs] def cart2polar(self, vector): """Coordinate transformation of cartesian x/y/z values into spherical (deg). Parameters ---------- vector : vec Vector of x/y/z values. Returns ------- ra : float Right ascension in degrees. dec : float Declination in degrees. """ ra = np.arctan2(vector[1],vector[0]) dec = np.arcsin(vector[2]) return np.rad2deg(ra), np.rad2deg(dec)
[docs] def write_unbinned_output(self, output_name): """Writes unbinned data file to either fits or hdf5. Parameters ---------- output_name : str Name of output file. Only include prefix (not file type). """ # Data units: units=['keV','s','rad','rad', 'rad','rad','rad','rad','cm','deg','deg'] # For fits output: if self.unbinned_output == 'fits': table = Table(list(self.cosi_dataset.values()),\ names=list(self.cosi_dataset.keys()), \ units=units, \ meta={'data file':os.path.basename(self.data_file), \ 'version':1.0}) table.write("%s.fits" %output_name, overwrite=True) os.system('gzip -f %s.fits' %output_name) # For hdf5 output: if self.unbinned_output == 'hdf5': with h5py.File('%s.hdf5' %output_name, 'w') as hf: for each in list(self.cosi_dataset.keys()): dset = hf.create_dataset(each, data=self.cosi_dataset[each], compression='gzip') return
[docs] def get_dict_from_fits(self, input_fits): """Constructs dictionary from input fits file. Parameters ---------- input_fits : str Name of input fits file. Returns ------- dict Dictionary constructed from input fits file. """ # Initialize dictionary: this_dict = {} # Fill dictionary from input fits file: hdu = fits.open(input_fits,memmap=True) cols = hdu[1].columns data = hdu[1].data for i in range(0,len(cols)): this_key = cols[i].name this_dict[this_key] = data[this_key] # Clear unused memory: gc.collect() return this_dict
[docs] def get_dict_from_hdf5(self, input_hdf5): """Constructs dictionary from input hdf5 file Parameters ---------- input_hdf5 : str Name of input hdf5 file. Returns ------- dict Dictionary constructed from input hdf5 file. """ # Initialize dictionary: this_dict = {} # Fill dictionary from input h5fy file: hf = h5py.File(input_hdf5,"r") keys = list(hf.keys()) for each in keys: this_dict[each] = hf[each][:] return this_dict
[docs] def select_data(self, output_name=None, unbinned_data=None): """Applies cuts to unbinnned data dictionary. Parameters ---------- unbinned_data : str, optional Name of unbinned dictionary file. output_name : str, optional Prefix of output file (default is None, in which case no file is saved). Note ---- Only cuts in time are allowed for now. """ print("Making data selections...") # Option to read in unbinned data file: if unbinned_data: if self.unbinned_output == 'fits': self.cosi_dataset = self.get_dict_from_fits(unbinned_data) if self.unbinned_output == 'hdf5': self.cosi_dataset = self.get_dict_from_hdf5(unbinned_data) # Get time cut index: time_array = self.cosi_dataset["TimeTags"] time_cut_index = (time_array >= self.tmin) & (time_array < self.tmax) # Apply cuts to dictionary: for key in self.cosi_dataset: self.cosi_dataset[key] = self.cosi_dataset[key][time_cut_index] # Write unbinned data to file (either fits or hdf5): if output_name != None: print("Saving file...") self.write_unbinned_output(output_name) return
[docs] def combine_unbinned_data(self, input_files, output_name=None): """Combines input unbinned data files. Parameters ---------- input_files : list List of file names to combine. output_name : str, optional Prefix of output file. """ self.cosi_dataset = {} counter = 0 for each in input_files: print() print("adding %s..." %each) print() # Read dict from hdf5 or fits: if self.unbinned_output == 'hdf5': this_dict = self.get_dict_from_hdf5(each) if self.unbinned_output == 'fits': this_dict = self.get_dict_from_fits(each) # Combine dictionaries: if counter == 0: for key in this_dict: self.cosi_dataset[key] = this_dict[key] if counter > 0: for key in this_dict: self.cosi_dataset[key] = np.concatenate((self.cosi_dataset[key],this_dict[key])) counter =+ 1 # Clear unused memory: gc.collect() # Write unbinned data to file (either fits or hdf5): if output_name != None: self.write_unbinned_output(output_name) return