Changeset f20af73 in flex_extract.git


Ignore:
Timestamp:
Mar 8, 2019, 10:05:20 AM (5 years ago)
Author:
Anne Philipp <anne.philipp@…>
Branches:
master, ctbto, dev
Children:
82c2959
Parents:
b4a4777
Message:

added CDS API support for ERA5 instead of ECMWFAPI / refactored setup of CONTROL parameter because for local version it wanted to use not installed ECMWF_ENV file.

Location:
source/python
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • source/python/_config.py

    r6f951ca rf20af73  
    2828import sys
    2929import inspect
     30import socket
    3031
    3132_VERSION_STR = '7.1'
     33
     34FLAG_ON_ECMWFSERVER = 'ecmwf' in socket.gethostname()
    3235
    3336QUEUES_LIST = ['ecgate', 'cca', 'ccb']
    3437
    3538INSTALL_TARGETS = ['local', 'ecgate', 'cca']
     39
     40CDS_DATASET = 'reanalysis-era5-complete'
    3641
    3742# up-to-date available maximum level numbers at ECMWF, 05.10.2018
  • source/python/classes/ControlFile.py

    rae2756e rf20af73  
    318318        Default value is 0.
    319319
    320     ecapi : boolean
    321         Tells wether the ECMWF Web APi was able to load or not.
     320    ec_api : boolean
     321        Tells wether the ECMWF Web API was able to load or not.
     322        Default value is None.
     323
     324    cds_api : boolean
     325        Tells wether the CDS API was able to load or not.
    322326        Default value is None.
    323327
     
    417421        self.request = 0
    418422        self.public = 0
    419         self.ecapi = None
     423        self.ec_api = None
     424        self.cds_api = None
    420425        self.purefc = 0
    421426        self.rrint = 0
     
    481486                                data[1] = data[1][:i] + var + data[1][k+1:]
    482487                            else:
    483                                 my_error(self.mailfail,
    484                                          'Could not find variable '
     488                                my_error('Could not find variable '
    485489                                         + data[1][j+1:k] + ' while reading ' +
    486490                                         self.controlfile)
  • source/python/classes/EcFlexpart.py

    re446e85 rf20af73  
    7373sys.path.append('../')
    7474import _config
    75 from GribUtil import GribUtil
     75from .GribUtil import GribUtil
    7676from mods.tools import (init128, to_param_id, silent_remove, product,
    7777                        my_error, make_dir, get_informations, get_dimensions,
    78                         execute_subprocess)
    79 from MarsRetrieval import MarsRetrieval
    80 from UioFiles import UioFiles
     78                        execute_subprocess, to_param_id_with_tablenumber)
     79from .MarsRetrieval import MarsRetrieval
     80from .UioFiles import UioFiles
    8181import mods.disaggregation as disaggregation
    8282
     
    403403        elif not gauss and not eta:
    404404            self.params['OG__ML'][0] += '/U/V'
    405         else:
    406             print('Warning: Collecting etadot and parameters for gaussian grid \
    407                             is a very costly parameter combination, \
    408                             use this combination only for debugging!')
    409             self.params['GG__SL'] = ['Q', 'ML', '1', \
     405        else:  # GAUSS and ETA
     406            print('Warning: Collecting etadot and parameters for gaussian grid '
     407                           'is a very costly parameter combination, '
     408                           'use this combination only for debugging!')
     409            self.params['GG__SL'] = ['Q', 'ML', '1',
    410410                                     '{}'.format((int(self.resol) + 1) / 2)]
    411             self.params['GG__ML'] = ['U/V/D/77', 'ML', self.glevelist, \
     411            self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist,
    412412                                     '{}'.format((int(self.resol) + 1) / 2)]
    413413
     
    419419
    420420        # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED)
    421         #-----------------------------------------------------------------------
     421        # -----------------------------------------------------------------------
    422422        if wrf:
    423423            self.params['OG__ML'][0] += '/Z/VO'
    424424            if '/D' not in self.params['OG__ML'][0]:
    425425                self.params['OG__ML'][0] += '/D'
     426
    426427            wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4',
    427428                       'SWVL1','SWVL2','SWVL3','SWVL4']
     
    452453
    453454        '''
    454         self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \
     455        self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR",
    455456                                    'SFC', '1', self.grid]
    456457        return
     
    684685                                        pk,
    685686                                        retr_param_dict['date'].split('/')[0])
    686                 retr_param_dict['param'] = pv[0]
     687                table128 = init128(_config.PATH_GRIBTABLE)
     688                ids = to_param_id_with_tablenumber(pv[0], table128)
     689                retr_param_dict['param'] = ids
    687690                retr_param_dict['levtype'] = pv[1]
    688691                retr_param_dict['levelist'] = pv[2]
     
    12381241
    12391242        '''
    1240         print('... disaggregation or precipitation with new method.')
     1243        print('... disaggregation of precipitation with new method.')
    12411244        lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
    12421245        cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64)
     
    12681271                    filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1'
    12691272                    filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2'
    1270 
    1271                 # collect for final processing
    1272                 self.outputfilelist.append(os.path.basename(fluxfilename))
    1273                 self.outputfilelist.append(os.path.basename(filename1))
    1274                 self.outputfilelist.append(os.path.basename(filename2))
    12751273
    12761274                # write original time step to flux file as usual
     
    14491447            cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H')
    14501448
    1451             # eliminate all temporary times
     1449            # skip all temporary times
    14521450            # which are outside the retrieval period
    14531451            if timestamp < start_period or \
     
    15451543            os.chdir(c.inputdir)
    15461544            if os.stat('fort.21').st_size == 0 and c.eta:
    1547                 print('Parameter 77 (etadot) is missing, most likely it is \
    1548                        not available for this type or date/time\n')
     1545                print('Parameter 77 (etadot) is missing, most likely it is '
     1546                      'not available for this type or date / time\n')
    15491547                print('Check parameters CLASS, TYPE, STREAM, START_DATE\n')
    1550                 my_error(c.mailfail, 'fort.21 is empty while parameter eta \
    1551                          is set to 1 in CONTROL file')
    1552 #============================================================================================
     1548                my_error('fort.21 is empty while parameter eta '
     1549                         'is set to 1 in CONTROL file')
     1550# ============================================================================================
    15531551            # write out all output to log file before starting fortran programm
    15541552            sys.stdout.flush()
     
    15601558
    15611559            os.chdir(pwd)
    1562 #============================================================================================
     1560# ============================================================================================
    15631561            # create name of final output file, e.g. EN13040500 (ENYYMMDDHH)
    15641562            if c.purefc:
     
    15771575            # collect for final processing
    15781576            self.outputfilelist.append(os.path.basename(fnout))
    1579 #============================================================================================
     1577            # get additional precipitation subgrid data if available
     1578            if c.rrint:
     1579                self.outputfilelist.append(os.path.basename(fnout + '_1'))
     1580                self.outputfilelist.append(os.path.basename(fnout + '_2'))
     1581# ============================================================================================
    15801582            # create outputfile and copy all data from intermediate files
    15811583            # to the outputfile (final GRIB input files for FLEXPART)
     
    15971599                    shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'),
    15981600                                            'rb'), fout)
    1599 #============================================================================================
     1601# ============================================================================================
    16001602        if c.wrf:
    16011603            fwrf.close()
     
    16921694        print('\n\nPostprocessing:\n Format: {}\n'.format(c.format))
    16931695
    1694         if not c.ecapi:
     1696        if _config.FLAG_ON_ECMWFSERVER:
    16951697            print('ecstorage: {}\n ecfsdir: {}\n'.
    16961698                  format(c.ecstorage, c.ecfsdir))
     
    17141716                                   'FILES FAILED!')
    17151717
    1716             if c.ectrans and not c.ecapi:
     1718            if c.ectrans and _config.FLAG_ON_ECMWFSERVER:
    17171719                execute_subprocess(['ectrans', '-overwrite', '-gateway',
    17181720                                    c.gateway, '-remote', c.destination,
     
    17201722                                   error_msg='TRANSFER TO LOCAL SERVER FAILED!')
    17211723
    1722             if c.ecstorage and not c.ecapi:
     1724            if c.ecstorage and _config.FLAG_ON_ECMWFSERVER:
    17231725                execute_subprocess(['ecp', '-o', ofile,
    17241726                                    os.path.expandvars(c.ecfsdir)],
  • source/python/classes/MarsRetrieval.py

    r6f951ca rf20af73  
    3535import sys
    3636import subprocess
     37import traceback
    3738
    3839# software specific classes and modules from flex_extract
    3940sys.path.append('../')
    4041import _config
     42try:
     43    ec_api = True
     44    import ecmwfapi
     45except ImportError:
     46    ec_api = False
     47
     48try:
     49    cds_api = True
     50    import cdsapi
     51except ImportError:
     52    cds_api = False
    4153# ------------------------------------------------------------------------------
    4254# CLASS
     
    130142    '''
    131143
    132     def __init__(self, server, public, marsclass="ei", dataset="", type="",
     144    def __init__(self, server, public, marsclass="EA", dataset="", type="",
    133145                 levtype="", levelist="", repres="", date="", resol="",
    134146                 stream="", area="", time="", step="", expver="1",
     
    158170        marsclass : str, optional
    159171            Characterisation of dataset. E.g. EI (ERA-Interim),
    160             E4 (ERA40), OD (Operational archive), ea (ERA5).
    161             Default is the ERA-Interim dataset "ei".
     172            E4 (ERA40), OD (Operational archive), EA (ERA5).
     173            Default is the ERA5 dataset "EA".
    162174
    163175        dataset : str, optional
     
    429441    def data_retrieve(self):
    430442        '''Submits a MARS retrieval. Depending on the existence of
    431         ECMWF Web-API it is submitted via Python or a
     443        ECMWF Web-API or CDS API it is submitted via Python or a
    432444        subprocess in the Shell. The parameter for the mars retrieval
    433445        are taken from the defined class attributes.
     
    452464        attrs['class'] = mclass
    453465
    454         # prepare target variable as needed for the Web API mode
     466        # prepare target variable as needed for the Web API or CDS API mode
    455467        # within the dictionary for full access
    456468        # as a single variable for public access
     
    475487        if self.server:
    476488            try:
    477                 if self.public:
    478                     print('RETRIEVE PUBLIC DATA!')
     489                if cds_api and isinstance(self.server, cdsapi.Client):
     490                    print('RETRIEVE ERA5 WITH CDS API!')
     491                    self.server.retrieve(_config.CDS_DATASET,
     492                                         attrs, target)
     493                elif ec_api and isinstance(self.server, ecmwfapi.ECMWFDataServer):
     494                    print('RETRIEVE PUBLIC DATA (NOT ERA5)!')
    479495                    self.server.retrieve(attrs)
     496                elif ec_api and isinstance(self.server, ecmwfapi.ECMWFService):
     497                    print('EXECUTE NON-PUBLIC RETRIEVAL (NOT ERA5)!')
     498                    self.server.execute(attrs, target)
    480499                else:
    481                     print('EXECUTE NON-PUBLIC RETRIEVAL!')
    482                     self.server.execute(attrs, target)
    483             except:
    484                 e = sys.exc_info()[0]
    485                 print("ERROR: ", e)
    486                 print('MARS Request failed!')
    487                 if not self.public and os.stat(target).st_size == 0:
    488                     print('MARS Request returned no data - '
    489                           'please check request')
     500                    print('ERROR:')
     501                    print('No match for Web API instance!')
    490502                    raise IOError
    491                 elif self.public and os.stat(target).st_size == 0:
    492                     print('Public MARS Request returned no data - '
    493                           'please check request')
    494                     raise IOError
    495                 else:
    496                     raise IOError
    497         # MARS request via extra process in shell
     503            except Exception as e:
     504                print('\n\nMARS Request failed!')
     505                print(e)
     506                tb = sys.exc_info()[2]
     507                print(traceback.format_exc())
     508                sys.exit()
     509
     510        # MARS request via call in shell
    498511        else:
    499512            request_str = 'ret'
     
    501514                request_str = request_str + ',' + key + '=' + str(value)
    502515            request_str += ',target="' + target + '"'
    503             p = subprocess.Popen(['mars'],
     516            p = subprocess.Popen(['mars', '-p'],
    504517                                 stdin=subprocess.PIPE,
    505518                                 stdout=subprocess.PIPE,
  • source/python/mods/checks.py

    r5551626 rf20af73  
    2727import sys
    2828import _config
    29 import exceptions
    30 from tools import my_error, silent_remove
     29try:
     30    import exceptions
     31except ImportError:
     32    import builtins as exceptions
     33from .tools import my_error, silent_remove
    3134from datetime import datetime
    3235import numpy as np
     
    296299            step = ['{:0>3}'.format(i) for i in ilist]
    297300        elif 'to' in step.lower() and 'by' not in step.lower():
    298             my_error(mailfail, step + ':\n' +
     301            my_error(step + ':\n' +
    299302                     'if "to" is used in steps parameter, '
    300303                     'please use "by" as well')
  • source/python/mods/get_mars_data.py

    r45b99e6 rf20af73  
    6767    inspect.getfile(inspect.currentframe()))) + '/../')
    6868import _config
    69 from tools import (my_error, normal_exit, get_cmdline_args,
     69from .tools import (setup_controldata, my_error, normal_exit, get_cmdline_args,
    7070                   read_ecenv, make_dir)
    7171from classes.EcFlexpart import EcFlexpart
     
    7474
    7575try:
    76     ecapi = True
     76    ec_api = True
    7777    import ecmwfapi
    7878except ImportError:
    79     ecapi = False
     79    ec_api = False
     80
     81try:
     82    cds_api = True
     83    import cdsapi
     84except ImportError:
     85    cds_api = False
    8086# ------------------------------------------------------------------------------
    8187# FUNCTION
     
    95101    '''
    96102
    97     args = get_cmdline_args()
    98     c = ControlFile(args.controlfile)
    99 
    100     env_parameter = read_ecenv(_config.PATH_ECMWF_ENV)
    101     c.assign_args_to_control(args)
    102     c.assign_envs_to_control(env_parameter)
    103     c.check_conditions(args.queue)
    104 
     103    c, _, _, _ = setup_controldata()
    105104    get_mars_data(c)
    106     normal_exit(c.mailops, c.queue, 'Done!')
     105    normal_exit('Retrieving MARS data: Done!')
    107106
    108107    return
     
    125124
    126125    '''
    127     c.ecapi = ecapi
     126    c.ec_api = ec_api
     127    c.cds_api = cds_api
    128128
    129129    if not os.path.exists(c.inputdir):
     
    182182
    183183def mk_server(c):
    184     '''Creates server connection if ECMWF WebAPI is available.
     184    '''Creates a server connection with available python API.
     185
     186    Which API is used depends on availability and the dataset to be retrieved.
     187    The CDS API is used for ERA5 dataset no matter if the user is a member or
     188    a public user. ECMWF WebAPI is used for all other available datasets.
    185189
    186190    Parameters
     
    192196    Return
    193197    ------
    194     server : ECMWFDataServer or ECMWFService
    195         Connection to ECMWF server via python interface ECMWF WebAPI.
    196 
    197     '''
    198     if c.ecapi:
     198    server : ECMWFDataServer, ECMWFService or Client
     199        Connection to ECMWF server via python interface ECMWF WebAPI or CDS API.
     200
     201    '''
     202    if cds_api and (c.marsclass.upper() == 'EA'):
     203        server = cdsapi.Client()
     204        c.ec_api = False
     205    elif c.ec_api:
    199206        if c.public:
    200207            server = ecmwfapi.ECMWFDataServer()
    201208        else:
    202209            server = ecmwfapi.ECMWFService("mars")
     210        c.cds_api = False
    203211    else:
    204212        server = False
    205213
    206     print('Using ECMWF WebAPI: ' + str(c.ecapi))
     214    print('Using ECMWF WebAPI: ' + str(c.ec_api))
     215    print('Using CDS API: ' + str(c.cds_api))
    207216
    208217    return server
     
    353362            flexpart.retrieve(server, dates, c.public, c.request, c.inputdir)
    354363        except IOError:
    355             my_error(c.mailfail, 'MARS request failed')
     364            my_error('MARS request failed')
    356365
    357366        day += delta_t
  • source/python/mods/prepare_flexpart.py

    re811e1a rf20af73  
    7171    inspect.getfile(inspect.currentframe()))) + '/../')
    7272import _config
    73 from checks import check_ppid
     73from .checks import check_ppid
    7474from classes.UioFiles import UioFiles
    7575from classes.ControlFile import ControlFile
    76 from tools import clean_up, get_cmdline_args, read_ecenv, make_dir
     76from .tools import (setup_controldata, clean_up, get_cmdline_args,
     77                   read_ecenv, make_dir)
    7778from classes.EcFlexpart import EcFlexpart
    78 
    79 ecapi = 'ecmwf' not in socket.gethostname()
    80 try:
    81     if ecapi:
    82         import ecmwfapi
    83 except ImportError:
    84     ecapi = False
    8579
    8680# ------------------------------------------------------------------------------
     
    10195    '''
    10296
    103     args = get_cmdline_args()
    104     c = ControlFile(args.controlfile)
    105 
    106     env_parameter = read_ecenv(_config.PATH_ECMWF_ENV)
    107     c.assign_args_to_control(args)
    108     c.assign_envs_to_control(env_parameter)
    109     c.check_conditions(args.queue)
    110 
    111     prepare_flexpart(args.ppid, c)
     97    c, ppid, _, _ = setup_controldata()
     98    prepare_flexpart(ppid, c)
     99    normal_exit('Preparing FLEXPART output files: Done!')
    112100
    113101    return
     
    136124    '''
    137125    check_ppid(c, ppid)
    138 
    139     c.ecapi = ecapi
    140126
    141127    # create the start and end date
  • source/python/mods/tools.py

    r8778c5a rf20af73  
    6565import subprocess
    6666import traceback
    67 import exceptions
     67try:
     68    import exceptions
     69except ImportError:
     70    import builtins as exceptions
    6871from datetime import datetime
    6972from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
     73
     74
    7075
    7176# ------------------------------------------------------------------------------
    7277# METHODS
    7378# ------------------------------------------------------------------------------
     79
     80def setup_controldata():
     81    '''Collects, stores and checks controlling arguments from command line,
     82    CONTROL file and ECMWF_ENV file.
     83
     84    Parameters
     85    ----------
     86
     87    Return
     88    ------
     89    c : ControlFile
     90        Contains all the parameters of CONTROL file and
     91        command line.
     92
     93    ppid : str
     94        Parent process id.
     95
     96    queue : str
     97        Name of queue for submission to ECMWF (e.g. ecgate or cca )
     98
     99    job_template : str
     100        Name of the job template file for submission to ECMWF server.
     101    '''
     102    import _config
     103    from classes.ControlFile import ControlFile
     104
     105    args = get_cmdline_args()
     106    c = ControlFile(args.controlfile)
     107    c.assign_args_to_control(args)
     108    if os.path.isfile(_config.PATH_ECMWF_ENV):
     109        env_parameter = read_ecenv(_config.PATH_ECMWF_ENV)
     110        c.assign_envs_to_control(env_parameter)
     111    c.check_conditions(args.queue)
     112
     113    return c, args.ppid, args.queue, args.job_template
    74114
    75115def none_or_str(value):
     
    259299    print("... clean inputdir!")
    260300
    261     cleanlist = glob.glob(os.path.join(c.inputdir, "*"))
     301    cleanlist = [file for file in glob.glob(os.path.join(c.inputdir, "*"))
     302                 if not os.path.basename(file).startswith(c.prefix)]
    262303
    263304    if cleanlist:
    264305        for element in cleanlist:
    265             if c.prefix not in element:
    266                 silent_remove(element)
    267             if c.ecapi is False and (c.ectrans == 1 or c.ecstorage == 1):
    268                 silent_remove(element)
     306            silent_remove(element)
    269307        print("... done!")
    270308    else:
     
    274312
    275313
    276 def my_error(users, message='ERROR'):
     314def my_error(message='ERROR'):
    277315    '''Prints a specified error message which can be passed to the function
    278316    before exiting the program.
     
    280318    Parameters
    281319    ----------
    282     user : list of str
    283         Contains all email addresses which should be notified.
    284         It might also contain just the ecmwf user name which wil trigger
    285         mailing to the associated email address for this user.
    286 
    287320    message : str, optional
    288321        Error message. Default value is "ERROR".
     
    464497    else:
    465498        for data in fdata:
    466             if data[0] != '!':
    467                 table128[data[0:3]] = data[59:64].strip()
     499            if data != '' and data[0] != '!':
     500                table128[data[0:3]] = data[59:65].strip()
    468501
    469502    return table128
     
    508541
    509542    return ipar
     543
     544def to_param_id_with_tablenumber(pars, table):
     545    '''Transform parameter names to parameter ids and add table id.
     546
     547    Conversion with ECMWF grib table 128.
     548
     549    Parameters
     550    ----------
     551    pars : str
     552        Addpar argument from CONTROL file in the format of
     553        parameter names instead of ids. The parameter short
     554        names are sepearted with "/" and they are passed as
     555        one single string.
     556
     557    table : dict
     558        Contains the ECMWF grib table 128 information.
     559        The key is the parameter number and the value is the
     560        short name of the parameter.
     561
     562    Return
     563    ------
     564    spar : str
     565        List of addpar parameters from CONTROL file transformed to
     566        parameter ids in the format of integer.
     567    '''
     568    if not pars:
     569        return []
     570    if not isinstance(pars, str):
     571        pars=str(pars)
     572
     573    cpar = pars.upper().split('/')
     574    spar = []
     575    for par in cpar:
     576        for k, v in table.iteritems():
     577            if par == k or par == v:
     578                spar.append(k + '.128')
     579                break
     580        else:
     581            print('\n\n\t\tWarning: par ' + par + ' not found in table 128\n\n')
     582
     583    return '/'.join(spar)
    510584
    511585def get_list_as_string(list_obj, concatenate_sign=', '):
  • source/python/submit.py

    rfc05fbd rf20af73  
    6767# software specific classes and modules from flex_extract
    6868import _config
    69 from mods.tools import (normal_exit, get_cmdline_args,
     69from mods.tools import (setup_controldata, normal_exit, get_cmdline_args,
    7070                        submit_job_to_ecserver, read_ecenv)
    7171from mods.get_mars_data import get_mars_data
     
    9191    '''
    9292
    93     args = get_cmdline_args()
    94     c = ControlFile(args.controlfile)
    95 
    96     env_parameter = read_ecenv(_config.PATH_ECMWF_ENV)
    97     c.assign_args_to_control(args)
    98     c.assign_envs_to_control(env_parameter)
    99     c.check_conditions(args.queue)
     93    c, ppid, queue, job_template = setup_controldata()
    10094
    10195    # on local side
    102     # on ECMWF server this would also be the local side
     96    # starting from an ECMWF server this would also be the local side
    10397    called_from_dir = os.getcwd()
    104     if args.queue is None:
     98    if queue is None:
    10599        if c.inputdir[0] != '/':
    106100            c.inputdir = os.path.join(called_from_dir, c.inputdir)
     
    109103        get_mars_data(c)
    110104        if c.request == 0 or c.request == 2:
    111             prepare_flexpart(args.ppid, c)
     105            prepare_flexpart(ppid, c)
    112106            exit_message = 'FLEX_EXTRACT IS DONE!'
    113107        else:
     
    115109    # send files to ECMWF server
    116110    else:
    117         submit(args.job_template, c, args.queue)
     111        submit(job_template, c, queue)
    118112        exit_message = 'FLEX_EXTRACT JOB SCRIPT IS SUBMITED!'
    119113
Note: See TracChangeset for help on using the changeset viewer.
hosted by ZAMG