Skip to content
160 changes: 112 additions & 48 deletions cspp_runner/post_cspp.py
Original file line number Diff line number Diff line change
@@ -1,85 +1,162 @@
"""Scanning the CSPP working directory and cleaning up after CSPP processing
and move the SDR granules to a destination directory"""
"""Various helper functions for re-organizing the CSPP results after processing.

Scanning the CSPP working directory and cleaning up after CSPP processing
and move the SDR granules to a destination directory.
"""

import os
import pathlib
import stat
from datetime import datetime
from datetime import datetime, timedelta
import shutil
from glob import glob
from trollsift import Parser
from cspp_runner.orbitno import TBUS_STYLE
import logging
LOG = logging.getLogger(__name__)

TLE_SATNAME = {'npp': 'SUOMI NPP',
'j01': 'NOAA-20',
'j02': 'NOAA-21',
'noaa20': 'NOAA-20',
'noaa21': 'NOAA-20'
'noaa21': 'NOAA-21'
}

PLATFORM_NAME = {'Suomi-NPP': 'npp',
'JPSS-1': 'noaa20',
'JPSS-2': 'noaa21',
'NOAA-20': 'noaa20',
'NOAA-21': 'noaa21'}


VIIRS_SDR_FILE_PATTERN = '{dataset}_{platform_shortname}_d{start_time:%Y%m%d_t%H%M%S}{msec_start}_e{end_time:%H%M%S}{msec_end}_b{orbit:5d}_c{creation_time:%Y%m%d%H%M%S%f}_{source}.h5' # noqa

EXPECTED_NUMBER_OF_SDR_FILES = 28


def cleanup_cspp_workdir(workdir):
"""Clean up the CSPP working dir after processing"""
"""Clean up the CSPP working dir after processing."""
try:
filelist = workdir.glob('*')
except AttributeError:
filelist = glob('%s/*' % workdir)

filelist = glob('%s/*' % workdir)
for s in filelist:
if os.path.isfile(s):
if s.name.find('SVM16') >= 0:
LOG.debug("Granule SDR %s will be cleaned", s.name)
os.remove(s)
filelist = glob('%s/*' % workdir)
LOG.info(
"Number of items left after cleaning working dir = " + str(len(filelist)))
shutil.rmtree(workdir)
# os.mkdir(workdir)

try:
filelist = workdir.glob('*')
except AttributeError:
filelist = glob('%s/*' % workdir)

nfiles = len([f for f in filelist])
LOG.info("Number of items left after cleaning working dir = %d", nfiles)
return


def get_ivcdb_files(sdr_dir):
"""Locate the ivcdb files need for the VIIRS Active Fires algorithm. These
files are not yet part of the standard output of CSPP versio 3.1 and
earlier. Use '-d' flag and locate the files in sub-directories
"""Locate the ivcdb files needed for the VIIRS Active Fires algorithm.

Please observe: These files are not part of the standard output of CSPP
version 3.1 and earlier. Use '-d' flag and locate the files in
sub-directories.
"""
# From the Active Fires Insuidetallation G:
# find . -type f -name 'IVCDB*.h5' -exec mv {} ${PWD} \;

import fnmatch
import os

matches = []
for root, dirnames, filenames in os.walk(sdr_dir):
for root, _, filenames in os.walk(sdr_dir):
for filename in fnmatch.filter(filenames, 'IVCDB*.h5'):
matches.append(os.path.join(root, filename))

return matches


def get_sdr_files(sdr_dir, **kwargs):
"""Get the sdr filenames (all M- and I-bands plus geolocation for the
direct readout swath"""
"""Get the sdr filenames (all M- and I-bands plus geolocation) for the direct readout swath."""
params = {}
params.update({'source': 'cspp_dev'})
params.update(kwargs)
# params.update({'start_time': kwargs.get('start_time')})
if 'start_time' in params and not params.get('start_time'):
LOG.debug("params['start_time'] = %s", str(params['start_time']))
params.pop('start_time')
params.update({'platform_short_name': PLATFORM_NAME.get(kwargs.get('platform_name'))})
try:
params.pop('platform_name')
except KeyError:
pass

time_tolerance = kwargs.get('time_tolerance', timedelta(seconds=0))
if 'start_time' not in params or time_tolerance.total_seconds() == 0:
sdr_files = get_sdr_filenames_from_pattern_and_parameters(sdr_dir, params)
if len(sdr_files) < EXPECTED_NUMBER_OF_SDR_FILES:
LOG.error("No or not enough SDR files found matching the RDR granule: Files found = %s",
str(sdr_files))
return sdr_files

sdr_files = get_sdr_filenames_from_pattern_and_parameters(sdr_dir, params)
nfiles_found = len(sdr_files)
if nfiles_found >= EXPECTED_NUMBER_OF_SDR_FILES:
return sdr_files

start_time = params['start_time']
LOG.warning("No or not enough SDR files found matching the RDR granule: Files found = %d", nfiles_found)
LOG.info("Will look for SDR files with a start time close in time to the start time of the RDR granule: %s",
str(start_time))
expected_start_time = start_time - time_tolerance
sdr_files = []
while nfiles_found < EXPECTED_NUMBER_OF_SDR_FILES and expected_start_time < start_time + time_tolerance:
params.update({'start_time': expected_start_time})
sdr_files = sdr_files + get_sdr_filenames_from_pattern_and_parameters(sdr_dir, params)
nfiles_found = len(sdr_files)
expected_start_time = expected_start_time + timedelta(seconds=1)

# FIXME: Check for sufficient files and possibly raise an exception if not successful.
if nfiles_found == EXPECTED_NUMBER_OF_SDR_FILES:
LOG.debug("Expected number of SDR files found matching the RDR file.")
else:
LOG.error("Not enough SDR files found for the RDR scene: Files found = %d - Expected = %d",
nfiles_found, EXPECTED_NUMBER_OF_SDR_FILES)

return sdr_files


# VIIRS M-bands + geolocation:
mband_files = (glob(os.path.join(sdr_dir, 'SVM??_???_*.h5')) +
glob(os.path.join(sdr_dir, 'GM??O_???_*.h5')))
# VIIRS I-bands + geolocation:
iband_files = (glob(os.path.join(sdr_dir, 'SVI??_???_*.h5')) +
glob(os.path.join(sdr_dir, 'GI??O_???_*.h5')))
# VIIRS DNB band + geolocation:
dnb_files = (glob(os.path.join(sdr_dir, 'SVDNB_???_*.h5')) +
glob(os.path.join(sdr_dir, 'GDNBO_???_*.h5')))
def get_sdr_filenames_from_pattern_and_parameters(sdr_dir, params):
"""From a list of file pattern inputs get the list of file names by globbing in a directory."""
p__ = Parser(VIIRS_SDR_FILE_PATTERN)

ivcdb_files = get_ivcdb_files(sdr_dir)
params.update({'dataset': 'SVM??'})
mband_files = [f for f in sdr_dir.glob(p__.globify(params))]
params.update({'dataset': 'GM??O'})
mband_files = mband_files + [f for f in sdr_dir.glob(p__.globify(params))]

params.update({'dataset': 'SVI??'})
iband_files = [f for f in sdr_dir.glob(p__.globify(params))]
params.update({'dataset': 'GI??O'})
iband_files = iband_files + [f for f in sdr_dir.glob(p__.globify(params))]

params.update({'dataset': 'SVDNB'})
dnb_files = [f for f in sdr_dir.glob(p__.globify(params))]
params.update({'dataset': 'GDNBO'})
dnb_files = dnb_files + [f for f in sdr_dir.glob(p__.globify(params))]

params.update({'dataset': 'IVCDB'})
ivcdb_files = [f for f in sdr_dir.glob(p__.globify(params))]

return sorted(mband_files) + sorted(iband_files) + sorted(dnb_files) + sorted(ivcdb_files)


def create_subdirname(obstime, with_seconds=False, **kwargs):
"""Generate the pps subdirectory name from the start observation time, ex.:
'npp_20120405_0037_02270'"""
"""Generate the pps subdirectory name from the start observation time.

For example:
'npp_20120405_0037_02270'
"""
sat = kwargs.get('platform_name', 'npp')
platform_name = PLATFORM_NAME.get(sat, sat)

Expand All @@ -105,26 +182,14 @@ def create_subdirname(obstime, with_seconds=False, **kwargs):
return platform_name + obstime.strftime('_%Y%m%d_%H%M_') + '%.5d' % orbnum


def make_okay_files(base_dir, subdir_name):
"""Make okay file to signal that all SDR files have been placed in
destination directory"""
import subprocess
okfile = os.path.join(base_dir, subdir_name + ".okay")
subprocess.call(['touch', okfile])
return


def pack_sdr_files(sdrfiles, base_dir, subdir):
"""Copy the SDR files to the sub-directory under the *subdir* directory
structure"""

path = pathlib.Path(base_dir) / subdir
path.mkdir(exist_ok=True, parents=True)
def pack_sdr_files(sdrfiles, dest_path):
"""Copy the SDR files to the destination under the *subdir* directory structure."""
dest_path.mkdir(exist_ok=True, parents=True)

LOG.info("Number of SDR files: " + str(len(sdrfiles)))
retvl = []
for sdrfile in sdrfiles:
newfilename = path / os.path.basename(sdrfile)
newfilename = dest_path / os.path.basename(sdrfile)
LOG.info(f"Copy sdrfile to destination: {newfilename!s}")
if os.path.exists(sdrfile):
LOG.info("File to copy: {file} <> ST_MTIME={time}".format(
Expand Down Expand Up @@ -159,4 +224,3 @@ def pack_sdr_files(sdrfiles, base_dir, subdir):

subd = create_subdirname(start_time)
pack_sdr_files(FILES, rootdir, subd)
make_okay_files(rootdir, subd)
Loading