#!/usr/bin/env python

"""
Don Morton
Boreal Scientific Computing LLC
Fairbanks, Alaska, USA
Don.Morton@borealscicomp.com
"""

import argparse
import fnmatch
import logging
import os
import shutil
import subprocess
import sys
import tempfile
import uuid

import numpy as np

# ----------------------------------------------------------------------
# Module logging
#
# A single console handler is attached to the root logger; both the
# logger and the handler only let through records at LOGGING_LEVEL or
# above.  Lower LOGGING_LEVEL (e.g. to logging.INFO) to see the commands
# and return codes logged by the individual tests.
# ----------------------------------------------------------------------
LOGGING_FORMAT = '%(levelname)s %(filename)s:%(funcName)s:%(lineno)d: %(message)s'
LOGGING_LEVEL = logging.WARNING

logformatter = logging.Formatter(LOGGING_FORMAT)

console_handler = logging.StreamHandler()
console_handler.setFormatter(logformatter)
console_handler.setLevel(LOGGING_LEVEL)

MY_LOGGER = logging.getLogger()
MY_LOGGER.addHandler(console_handler)
MY_LOGGER.setLevel(LOGGING_LEVEL)
#MY_LOGGER.propagate = False


#--------------------- Define path prefixes to grib files ------------

######### Paths to GRIB Files
#### TO DO - put a note in here about where to get these files - put them
#### on my web server.  Right now they're versioned, but that's almost
### 1 GByte.
ECMWF_PREFIX = 'gribfiles/ecmwf0p5'
NCEP_PREFIX = 'gribfiles/ncep0p5'
NCEPFV3_PREFIX = 'gribfiles/ncepfv30p5'

### DJM - 2019-07-30 - using these paths will take a bit of work,
### especially with the new ncep directory structure (with the 0.5/ and
### 0.5.fv3/ subdirectories).  So, code will need to be adjusted in order
### for this to work.
#XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX #If you're testing at CTBTO, these would be better paths to the same files #CTBTO_PREFIX = '/ops/data/atm' #ECMWF_PREFIX = CTBTO_PREFIX + '/ecmwf/2019/01/11/0.5' #NCEP_PREFIX = CTBTO_PREFIX + '/ncep/2019/01/08/0.5' #----------------------------------------------------------------------- # If you have these files available in the specified prefix, you should be # good to go ECMWF0P5_GRIB2_FCST_TESTFILE = ECMWF_PREFIX + '/EN19011109' # forecast file ECMWF0P5_GRIB2_ANL_TESTFILE = ECMWF_PREFIX + '/EN19011112' # analysis file ECMWF0P5_EF_FCST_TESTFILE = ECMWF_PREFIX + '/EF19073103' # forecast file NCEP0P5_GRIB2_TESTFILE1 = NCEP_PREFIX + '/GD19010812' NCEP0P5_GRIB2_TESTFILE2 = NCEP_PREFIX + '/GD19010815' NCEP0P5_GRIB2_TESTFILE3 = NCEP_PREFIX + '/GD19010818' NCEP0P5_GRIB2_TESTFILE4 = NCEP_PREFIX + '/GD19010821' NCEPFV30P5_GRIB2_TESTFILE1 = NCEPFV3_PREFIX + '/GD19060800' NCEPFV30P5_GRIB2_TESTFILE2 = NCEPFV3_PREFIX + '/GD19060803' NCEPFV30P5_GRIB2_TESTFILE2 = NCEPFV3_PREFIX + '/GD19060806' def main(): # Currently set up to be in the parent directory. 
CHECKGRIB_DIR = os.path.abspath(os.pardir) CHECKGRIB = CHECKGRIB_DIR + '/checkGRIB' # Compile flexpart2ctbt print("Compiling checkGRIB in dir: " + str(CHECKGRIB_DIR)) compile_passed = test_compile_checkgrib(checkgrib_dir=CHECKGRIB_DIR) print("compile_passed: %s" % compile_passed) if not compile_passed: print("Whooaa, Nellie, no sense continuing if you can't even compile!") sys.exit() passed_count = 0 failed_count = 0 #---------------------- # General tests #---------------------- print("--------test_missing_source_cli_arg_detected--------") passed = test_missing_source_cli_arg_detected(checkgrib=CHECKGRIB, gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_invalid_source_cli_arg_detected--------") passed = test_invalid_source_cli_arg_detected(checkgrib=CHECKGRIB, gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_missing_levels_cli_arg_detected--------") passed = test_missing_levels_cli_arg_detected(checkgrib=CHECKGRIB, gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_checkanl_and_checkfcst_detected--------") passed = test_checkanl_and_checkfcst_detected(checkgrib=CHECKGRIB, gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_reports_no_cli_args--------") passed = test_reports_no_cli_args(checkgrib=CHECKGRIB) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") 
print("--------test_reports_no_path_args--------") passed = test_reports_no_path_args(checkgrib=CHECKGRIB) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") #---------------------- # ECMWF-specific tests #---------------------- print("--------test_successful_ecmwfgrib2_check--------") passed = test_successful_ecmwfgrib2_check(checkgrib=CHECKGRIB, gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_successful_ecmwfgrib_ef_check--------") passed = test_successful_ecmwfgrib_ef_check(checkgrib=CHECKGRIB, ef_gribfile=ECMWF0P5_EF_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_reports_ecmwfgrib_ef_fails_with_checkgrib2--------") passed = test_reports_ecmwfgrib_ef_fails_with_checkgrib2(checkgrib=CHECKGRIB, ef_gribfile=ECMWF0P5_EF_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_reports_unexpected_ecmwf_grib_center--------") passed = test_reports_unexpected_ecmwf_grib_centre(checkgrib=CHECKGRIB, ncep_gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_reports_unexpected_ecmwf_level--------") passed = test_reports_unexpected_ecmwf_level(checkgrib=CHECKGRIB, ecmwf_gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_reports_missing_ecmwf_levels--------") passed = test_reports_missing_ecmwf_levels(checkgrib=CHECKGRIB, 
ecmwf_gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_reports_failed_ecmwf_anl_check--------") passed = test_reports_failed_ecmwf_anl_check(checkgrib=CHECKGRIB, ecmwf_fcst_gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_reports_failed_ecmwf_fcst_check--------") passed = test_reports_failed_ecmwf_fcst_check(checkgrib=CHECKGRIB, ecmwf_anl_gribfile=ECMWF0P5_GRIB2_ANL_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ecmwf_detects_missing_level_14--------") passed = test_ecmwf_detects_missing_level_14(checkgrib=CHECKGRIB, ecmwf_gribfile=ECMWF0P5_GRIB2_ANL_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ecmwf_detects_missing_level_14_etadot--------") passed = test_ecmwf_detects_missing_level_14_etadot(checkgrib=CHECKGRIB, ecmwf_gribfile=ECMWF0P5_GRIB2_ANL_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ecmwf_detects_missing_var_lsp--------") passed = test_ecmwf_detects_missing_var_lsp(checkgrib=CHECKGRIB, ecmwf_gribfile=ECMWF0P5_GRIB2_ANL_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ecmwf_detects_grib1_nsss--------") passed = test_ecmwf_detects_grib1_nsss(checkgrib=CHECKGRIB, ecmwf_gribfile=ECMWF0P5_GRIB2_ANL_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) 
print("-----------------------------------------------") # The following two tests will never pass with current CTBTO ECMWF # files because the files have mixed anl and fcst messages """ print("--------test_ecmwf_successful_fcst_check--------") passed = test_ecmwf_successful_fcst_check(checkgrib=CHECKGRIB, ecmwf_fcst_gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ecmwf_successful_anl_check--------") passed = test_ecmwf_successful_anl_check(checkgrib=CHECKGRIB, ecmwf_anl_gribfile=ECMWF0P5_GRIB2_ANL_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") """ #---------------------- # NCEP-specific tests #---------------------- print("--------test_reports_unexpected_ncep_grib_center--------") passed = test_reports_unexpected_ncep_grib_centre(checkgrib=CHECKGRIB, ecmwf_gribfile=ECMWF0P5_GRIB2_FCST_TESTFILE) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_successful_ncepgrib2_check--------") passed = test_successful_ncepgrib2_check(checkgrib=CHECKGRIB, gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_detects_ncep_more_pressure_levels_than_expected------") passed = test_detects_ncep_more_pressure_levels_than_expected( checkgrib=CHECKGRIB, gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_detects_ncep_fewer_pressure_levels_than_expected------") passed = test_detects_ncep_fewer_pressure_levels_than_expected( checkgrib=CHECKGRIB, 
gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncep_detects_missing_level_850------") passed = test_ncep_detects_missing_level_850( checkgrib=CHECKGRIB, ncep_gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncep_detects_missing_level_100_r------") passed = test_ncep_detects_missing_level_100_r( checkgrib=CHECKGRIB, ncep_gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncep_detects_missing_var_10u------") passed = test_ncep_detects_missing_var_10u( checkgrib=CHECKGRIB, ncep_gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncep_detects_missing_var_tsig1------") passed = test_ncep_detects_missing_var_tsig1( checkgrib=CHECKGRIB, ncep_gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncep_detects_grib1_2t------") passed = test_ncep_detects_grib1_2t( checkgrib=CHECKGRIB, ncep_gribfile=NCEP0P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") #---------------------- # NCEPFV3-specific tests #---------------------- print("--------test_successful_ncepfv3grib2_check--------") passed = test_successful_ncepfv3grib2_check(checkgrib=CHECKGRIB, gribfile=NCEPFV30P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print('FV3 TEST FAILED - ABORT') sys.exit() 
print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_detects_ncepfv3_more_pressure_levels_than_expected------") passed = test_detects_ncepfv3_more_pressure_levels_than_expected( checkgrib=CHECKGRIB, gribfile=NCEPFV30P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_detects_ncepfv3_fewer_pressure_levels_than_expected------") passed = test_detects_ncepfv3_fewer_pressure_levels_than_expected( checkgrib=CHECKGRIB, gribfile=NCEPFV30P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncepfv3_detects_missing_level_850------") passed = test_ncepfv3_detects_missing_level_850( checkgrib=CHECKGRIB, ncepfv3_gribfile=NCEPFV30P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncepfv3_detects_missing_level_100_r------") passed = test_ncepfv3_detects_missing_level_100_r( checkgrib=CHECKGRIB, ncepfv3_gribfile=NCEPFV30P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncepfv3_detects_missing_var_10u------") passed = test_ncepfv3_detects_missing_var_10u( checkgrib=CHECKGRIB, ncepfv3_gribfile=NCEPFV30P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") print("--------test_ncepfv3_detects_missing_var_tsig1------") passed = test_ncepfv3_detects_missing_var_tsig1( checkgrib=CHECKGRIB, ncepfv3_gribfile=NCEPFV30P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) 
print("-----------------------------------------------") print("--------test_ncepfv3_detects_grib1_2t------") passed = test_ncepfv3_detects_grib1_2t( checkgrib=CHECKGRIB, ncepfv3_gribfile=NCEPFV30P5_GRIB2_TESTFILE1) if passed: passed_count += 1 else: failed_count +=1 print("passed: %s" % passed) print("-----------------------------------------------") # Summary of tests passed/failed print("\n************************") print("Passed tests: %d" % passed_count) print("Failed tests: %d" % failed_count) print("************************\n") ############# General test functions ##################### def test_compile_checkgrib(checkgrib_dir=None): """ Removes any existing copy of the binary, compiles, and tests for presence, returning a True if successful, otherwise False Expects executable to be named "gribcheck" Assumes that Makefile in the compile directory is correct for the environment """ the_command = 'cd ' + checkgrib_dir + ' ; ' the_command += 'make clean > /dev/null 2>&1; ' the_command += 'make > /dev/null 2>&1; ' subprocess.call(the_command, shell=True) execname = checkgrib_dir + '/checkGRIB' if os.path.isfile(execname) and os.access(execname, os.X_OK): success = True else: success = False return success def test_missing_source_cli_arg_detected(checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks that utility exits with code 2 if the --source arg is missing """ the_command = [checkgrib, "--levels", "3", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 2: passed = True else: passed = False return passed def test_invalid_source_cli_arg_detected(checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks that utility exits with code 2 if the --source arg is invalid """ the_command = 
[checkgrib, "--levels", "3", "--source", "THIS_IS_INVALID!", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 2: passed = True else: passed = False return passed def test_missing_levels_cli_arg_detected(checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks that utility exits with code 2 if the --levels arg is missing """ the_command = [checkgrib, "--source", "ECMWF", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 2: passed = True else: passed = False return passed def test_checkanl_and_checkfcst_detected(checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks that utility exits with code 2 if both the --checkfcst and --checkanl flags are specified """ the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", "--checkanl", "--checkfcst", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 2: passed = True else: passed = False return passed def test_reports_no_cli_args(checkgrib=None): """ checkgrib: full path to checkgrib executable Checks that utility exits with code 2 if no CLI args were specified """ the_command = [checkgrib] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 2: passed = True else: passed = False return passed def test_reports_no_path_args(checkgrib=None): """ checkgrib: full path to checkgrib executable Checks that utility exits with code 2 if no gribfile args were specified """ the_command = 
[checkgrib, "--source", "ECMWF", "--levels", "137"] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 2: passed = True else: passed = False return passed ############# ECMWF test functions ##################### def test_reports_unexpected_ecmwf_grib_centre(checkgrib=None, ncep_gribfile=None): """ Calls checkgrib for ECMWF source but file is actually NCEP Expects an error message and return code of 1 """ the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", ncep_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_reports_missing_ecmwf_levels(checkgrib=None, ecmwf_gribfile=None): """ Calls checkgrib for ECMWF specifying number of levels greater than the expected 137 Expects an error message and return code of 1 """ the_command = [checkgrib, "--source", "ECMWF", "--levels", "147", ecmwf_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_successful_ecmwfgrib2_check(checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to gribfile to check Runs through a no-brainer successful test, expecting a 0 return code """ the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 0: passed = True else: passed = False return passed def test_successful_ecmwfgrib_ef_check(checkgrib=None, ef_gribfile=None): """ checkgrib: full path to checkgrib executable 
ef_gribfile: full path to EF gribfile to check Runs through a no-brainer successful test, expecting a 0 return code The EF files contain both GRIB1 and GRIB2. This tests that it will still pass, as long as --checkgrib2 is not used """ the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", ef_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 0: passed = True else: passed = False return passed def test_reports_ecmwfgrib_ef_fails_with_checkgrib2(checkgrib=None, ef_gribfile=None): """ checkgrib: full path to checkgrib executable ef_gribfile: full path to EF gribfile to check Tests that EF check fails if checkgrib2 specified, expecting a 1 return code The EF files contain both GRIB1 and GRIB2. This tests that it will still pass, as long as --checkgrib2 is not used """ the_command = [checkgrib, "--source", "ECMWF", "--checkgrib2", "--levels", "137", ef_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_reports_unexpected_ecmwf_level(checkgrib=None, ecmwf_gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks an ECMWF file with "--levels 3", so exits when it encounters a level higher than that. 
Expected return_code is 1 """ the_command = [checkgrib, "--source", "ECMWF", "--levels", "3", ecmwf_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_ecmwf_detects_missing_level_14(checkgrib=None, ecmwf_gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks an ECMWF file that has had vertical level 14 removed Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = 'grib_copy -w level!=14 ' the_command += str(ecmwf_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ecmwf_detects_missing_var_lsp(checkgrib=None, ecmwf_gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks an ECMWF file that has had the lsp variable removed Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = 'grib_copy -w shortName!=lsp ' the_command += str(ecmwf_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", tempgribpath] process = subprocess.Popen(the_command) 
process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ecmwf_detects_missing_level_14_etadot(checkgrib=None, ecmwf_gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks an ECMWF file that has had vertical level 14 removed just for etadot Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = "count=`grib_ls -p count -w level=14,shortName=etadot " the_command += ecmwf_gribfile + " | sed -n 3p` ;" the_command += " grib_copy -w count!=$count " the_command += str(ecmwf_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) MY_LOGGER.info('the_command: ' + str(the_command)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ecmwf_detects_grib1_nsss(checkgrib=None, ecmwf_gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to NCEP gribfile to check Checks an ECMWF file that has message variable nsss changed from GRIB2 to GRIB1 Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = "grib_set -s edition=1 -w shortName=nsss " the_command += str(ecmwf_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + 
str(tempgribpath)) MY_LOGGER.info('the_command: ' + str(the_command)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "ECMWF", "--checkgrib2", "--levels", "137", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_reports_failed_ecmwf_anl_check(checkgrib=None, ecmwf_fcst_gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to ECMWF fcst gribfile to check Checks a fcst ECMWF file with "--checkanl", so exits when it encounters fcst. Expected return_code is 1 """ the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", "--checkanl", ecmwf_fcst_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_reports_failed_ecmwf_fcst_check(checkgrib=None, ecmwf_anl_gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to ECMWF anl gribfile to check Checks a anl ECMWF file with "--checkfcst", so exits when it encounters anl. 
Expected return_code is 1 """ the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", "--checkfcst", ecmwf_anl_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_ecmwf_successful_fcst_check(checkgrib=None, ecmwf_fcst_gribfile=None): """ checkgrib: full path to checkgrib executable ecmwf_fcst_gribfile: full path to ECMWF fcst gribfile to check Checks a fcst ECMWF file with "--checkfcst", so exits normally. Expected return_code is 0 """ ERROR_MSG = """ **************************************************************** WARNING: With current CTBTO ECMWF files, this test will always fail, because the files have mixed anl and fcst messages. YOU SHOULD NOT USE THIS TEST UNTIL THE SITUATION IS RESOLVED BuhBye """ print(ERROR_MSG) sys.exit() the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", "--checkfcst", ecmwf_fcst_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 0: passed = True else: passed = False return passed def test_ecmwf_successful_anl_check(checkgrib=None, ecmwf_anl_gribfile=None): """ checkgrib: full path to checkgrib executable ecmwf_anl_gribfile: full path to ECMWF anl gribfile to check Checks a anl ECMWF file with "--checkanl", so exits normally. Expected return_code is 0 """ ERROR_MSG = """ **************************************************************** WARNING: With current CTBTO ECMWF files, this test will always fail, because the files have mixed anl and fcst messages. 
YOU SHOULD NOT USE THIS TEST UNTIL THE SITUATION IS RESOLVED BuhBye """ print(ERROR_MSG) sys.exit() the_command = [checkgrib, "--source", "ECMWF", "--levels", "137", "--checkanl", ecmwf_anl_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 0: passed = True else: passed = False return passed ############# NCEP test functions ##################### def test_reports_unexpected_ncep_grib_centre(checkgrib=None, ecmwf_gribfile=None): """ Calls checkgrib for NCEP source but file is actually ECMWF Expects an error message and return code of 1 """ the_command = [checkgrib, "--source", "NCEP", "--levels", "30", ecmwf_gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_successful_ncepgrib2_check(checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to gribfile to check Runs through a no-brainer successful test, expecting a 0 return code """ the_command = [checkgrib, "--source", "NCEP", "--checkgrib2", "--levels", "31", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 0: passed = True else: passed = False return passed def test_reports_ncep_bad_level_structure(checkgrib=None, ncep_gribfile=None): """ Tests that if the pressure level structure of the NCEP file does not agree with expected number of levels, an error will be reported Expects a return code of 1 """ print("Not yet implemented...") def test_detects_ncep_more_pressure_levels_than_expected( checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to gribfile to check Tests that a return code of 1 
will result if checkgrib finds more pressure levels than specified in command line """ the_command = [checkgrib, "--source", "NCEP", "--levels", "30", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_detects_ncep_fewer_pressure_levels_than_expected( checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to gribfile to check Tests that a return code of 1 will result if checkgrib finds fewer pressure levels than specified in command line """ the_command = [checkgrib, "--source", "NCEP", "--levels", "32", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_ncep_detects_missing_level_850(checkgrib=None, ncep_gribfile=None): """ checkgrib: full path to checkgrib executable ncep_gribfile: full path to NCEP gribfile to check Checks an NCEP file that has had vertical level 850 removed Expected return_code is 1 Note, as currently implemented, checkgrib will not see any 850 mb levels, and so it won't add them into expected inventory. It will fail because it won't see the 31 levels it expects. 
""" # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = 'grib_copy -w level!=850 ' the_command += str(ncep_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEP", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ncep_detects_missing_level_100_r(checkgrib=None, ncep_gribfile=None): """ checkgrib: full path to checkgrib executable ncep_gribfile: full path to NCEP gribfile to check Checks an NCEP file that has had vertical level 100 removed for variable r Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = "count=`grib_ls -p count -w level=100,shortName=r " the_command += ncep_gribfile + " | sed -n 3p` ;" the_command += " grib_copy -w count!=$count " the_command += str(ncep_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEP", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ncep_detects_missing_var_10u(checkgrib=None, ncep_gribfile=None): """ checkgrib: full path to checkgrib executable ncep_gribfile: full path to NCEP gribfile to 
check Checks an NCEP file that has had 10m U removed Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = 'grib_copy -w shortName!=10u ' the_command += str(ncep_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEP", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ncep_detects_missing_var_tsig1(checkgrib=None, ncep_gribfile=None): """ checkgrib: full path to checkgrib executable ncep_gribfile: full path to NCEP gribfile to check Checks an NCEP file that has had sigma level t removed Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = "count=`grib_ls -p count -w typeOfLevel=sigma,shortName=t " the_command += ncep_gribfile + " | sed -n 3p` ;" the_command += " grib_copy -w count!=$count " the_command += str(ncep_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEP", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ncep_detects_grib1_2t(checkgrib=None, ncep_gribfile=None): """ checkgrib: full path to 
checkgrib executable ncep_gribfile: full path to NCEP gribfile to check Checks an NCEP file that has had the 2t (2m T) message changed from GRIB2 to GRIB1 Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = "grib_set -s edition=1,packingType=grid_simple " the_command += " -w shortName=2t " the_command += str(ncep_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEP", "--checkgrib2", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed ############# NCEPFV3 test functions ##################### def test_successful_ncepfv3grib2_check(checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to gribfile to check Runs through a no-brainer successful test, expecting a 0 return code """ the_command = [checkgrib, "--source", "NCEPFV3", "--checkgrib2", "--levels", "31", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 0: passed = True else: passed = False return passed def test_detects_ncepfv3_more_pressure_levels_than_expected( checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to gribfile to check Tests that a return code of 1 will result if checkgrib finds more pressure levels than specified in command line """ the_command = [checkgrib, "--source", "NCEPFV3", "--levels", "30", gribfile] process = subprocess.Popen(the_command) 
process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_detects_ncepfv3_fewer_pressure_levels_than_expected( checkgrib=None, gribfile=None): """ checkgrib: full path to checkgrib executable gribfile: full path to gribfile to check Tests that a return code of 1 will result if checkgrib finds fewer pressure levels than specified in command line """ the_command = [checkgrib, "--source", "NCEPFV3", "--levels", "32", gribfile] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False return passed def test_ncepfv3_detects_missing_level_850(checkgrib=None, ncepfv3_gribfile=None): """ checkgrib: full path to checkgrib executable ncepfv3_gribfile: full path to NCEP FV3 gribfile to check Checks an NCEP FV3 file that has had vertical level 850 removed Expected return_code is 1 Note, as currently implemented, checkgrib will not see any 850 mb levels, and so it won't add them into expected inventory. It will fail because it won't see the 31 levels it expects. 
""" # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = 'grib_copy -w level!=850 ' the_command += str(ncepfv3_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEPFV3", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ncepfv3_detects_missing_level_100_r(checkgrib=None, ncepfv3_gribfile=None): """ checkgrib: full path to checkgrib executable ncepfv3_gribfile: full path to NCEP FV3 gribfile to check Checks an NCEP FV3 file that has had vertical level 100 removed for variable r Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = "count=`grib_ls -p count -w level=100,shortName=r " the_command += ncepfv3_gribfile + " | sed -n 3p` ;" the_command += " grib_copy -w count!=$count " the_command += str(ncepfv3_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEPFV3", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ncepfv3_detects_missing_var_10u(checkgrib=None, ncepfv3_gribfile=None): """ checkgrib: full path to checkgrib executable 
ncepfv3_gribfile: full path to NCEP FV3 gribfile to check Checks an NCEP FV3 file that has had 10m U removed Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = 'grib_copy -w shortName!=10u ' the_command += str(ncepfv3_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEPFV3", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def test_ncepfv3_detects_missing_var_tsig1(checkgrib=None, ncepfv3_gribfile=None): """ checkgrib: full path to checkgrib executable ncepfv3_gribfile: full path to NCEP FV3 gribfile to check Checks an NCEP FV3 file that has had sigma level t removed Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = "count=`grib_ls -p count -w typeOfLevel=sigma,shortName=t " the_command += ncepfv3_gribfile + " | sed -n 3p` ;" the_command += " grib_copy -w count!=$count " the_command += str(ncepfv3_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEPFV3", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed def 
test_ncepfv3_detects_grib1_2t(checkgrib=None, ncepfv3_gribfile=None): """ checkgrib: full path to checkgrib executable ncepfv3_gribfile: full path to NCEP FV3 gribfile to check Checks an NCEP FV3 file that has had the 2t (2m T) message changed from GRIB2 to GRIB1 Expected return_code is 1 """ # Create a temporary gribfile tempgribpath = '/tmp/' + str(uuid.uuid4()) + '.gr2' the_command = "grib_set -s edition=1,packingType=grid_simple " the_command += " -w shortName=2t " the_command += str(ncepfv3_gribfile) + ' ' + str(tempgribpath) MY_LOGGER.info('Creating tempgribpath: ' + str(tempgribpath)) subprocess.call(the_command, shell=True) # Use it the_command = [checkgrib, "--source", "NCEPFV3", "--checkgrib2", "--levels", "31", tempgribpath] process = subprocess.Popen(the_command) process.communicate()[0] return_code = process.returncode MY_LOGGER.info('return_code: ' + str(return_code)) if return_code == 1: passed = True else: passed = False # Remove the temporary gribfile MY_LOGGER.info('Removing tempgribpath: ' + str(tempgribpath)) os.remove(tempgribpath) return passed if __name__=="__main__": main()