Changeset f20af73 in flex_extract.git for source/python
- Timestamp:
- Mar 8, 2019, 10:05:20 AM (5 years ago)
- Branches:
- master, ctbto, dev
- Children:
- 82c2959
- Parents:
- b4a4777
- Location:
- source/python
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
source/python/_config.py
r6f951ca rf20af73 28 28 import sys 29 29 import inspect 30 import socket 30 31 31 32 _VERSION_STR = '7.1' 33 34 FLAG_ON_ECMWFSERVER = 'ecmwf' in socket.gethostname() 32 35 33 36 QUEUES_LIST = ['ecgate', 'cca', 'ccb'] 34 37 35 38 INSTALL_TARGETS = ['local', 'ecgate', 'cca'] 39 40 CDS_DATASET = 'reanalysis-era5-complete' 36 41 37 42 # up-to-date available maximum level numbers at ECMWF, 05.10.2018 -
source/python/classes/ControlFile.py
rae2756e rf20af73 318 318 Default value is 0. 319 319 320 ecapi : boolean 321 Tells wether the ECMWF Web APi was able to load or not. 320 ec_api : boolean 321 Tells wether the ECMWF Web API was able to load or not. 322 Default value is None. 323 324 cds_api : boolean 325 Tells wether the CDS API was able to load or not. 322 326 Default value is None. 323 327 … … 417 421 self.request = 0 418 422 self.public = 0 419 self.ecapi = None 423 self.ec_api = None 424 self.cds_api = None 420 425 self.purefc = 0 421 426 self.rrint = 0 … … 481 486 data[1] = data[1][:i] + var + data[1][k+1:] 482 487 else: 483 my_error(self.mailfail, 484 'Could not find variable ' 488 my_error('Could not find variable ' 485 489 + data[1][j+1:k] + ' while reading ' + 486 490 self.controlfile) -
source/python/classes/EcFlexpart.py
re446e85 rf20af73 73 73 sys.path.append('../') 74 74 import _config 75 from GribUtil import GribUtil75 from .GribUtil import GribUtil 76 76 from mods.tools import (init128, to_param_id, silent_remove, product, 77 77 my_error, make_dir, get_informations, get_dimensions, 78 execute_subprocess )79 from MarsRetrieval import MarsRetrieval80 from UioFiles import UioFiles78 execute_subprocess, to_param_id_with_tablenumber) 79 from .MarsRetrieval import MarsRetrieval 80 from .UioFiles import UioFiles 81 81 import mods.disaggregation as disaggregation 82 82 … … 403 403 elif not gauss and not eta: 404 404 self.params['OG__ML'][0] += '/U/V' 405 else: 406 print('Warning: Collecting etadot and parameters for gaussian grid \407 is a very costly parameter combination, \408 409 self.params['GG__SL'] = ['Q', 'ML', '1', \405 else: # GAUSS and ETA 406 print('Warning: Collecting etadot and parameters for gaussian grid ' 407 'is a very costly parameter combination, ' 408 'use this combination only for debugging!') 409 self.params['GG__SL'] = ['Q', 'ML', '1', 410 410 '{}'.format((int(self.resol) + 1) / 2)] 411 self.params['GG__ML'] = ['U/V/D/ 77', 'ML', self.glevelist, \411 self.params['GG__ML'] = ['U/V/D/ETADOT', 'ML', self.glevelist, 412 412 '{}'.format((int(self.resol) + 1) / 2)] 413 413 … … 419 419 420 420 # ADDITIONAL FIELDS FOR FLEXPART-WRF MODEL (IF QUESTIONED) 421 # -----------------------------------------------------------------------421 # ----------------------------------------------------------------------- 422 422 if wrf: 423 423 self.params['OG__ML'][0] += '/Z/VO' 424 424 if '/D' not in self.params['OG__ML'][0]: 425 425 self.params['OG__ML'][0] += '/D' 426 426 427 wrf_sfc = ['SP','SKT','SST','CI','STL1','STL2', 'STL3','STL4', 427 428 'SWVL1','SWVL2','SWVL3','SWVL4'] … … 452 453 453 454 ''' 454 self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", \455 self.params['OG_acc_SL'] = ["LSP/CP/SSHF/EWSS/NSSS/SSR", 455 456 'SFC', '1', self.grid] 456 457 return … … 684 685 pk, 685 686 retr_param_dict['date'].split('/')[0]) 686 retr_param_dict['param'] = pv[0] 687 table128 = init128(_config.PATH_GRIBTABLE) 688 ids = to_param_id_with_tablenumber(pv[0], table128) 689 retr_param_dict['param'] = ids 687 690 retr_param_dict['levtype'] = pv[1] 688 691 retr_param_dict['levelist'] = pv[2] … … 1238 1241 1239 1242 ''' 1240 print('... disaggregation o rprecipitation with new method.')1243 print('... disaggregation of precipitation with new method.') 1241 1244 lsp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64) 1242 1245 cp_new_np = np.zeros((ni * nj, nt * 3), dtype=np.float64) … … 1268 1271 filename1 = c.prefix + date.strftime('%y%m%d%H') + '_1' 1269 1272 filename2 = c.prefix + date.strftime('%y%m%d%H') + '_2' 1270 1271 # collect for final processing1272 self.outputfilelist.append(os.path.basename(fluxfilename))1273 self.outputfilelist.append(os.path.basename(filename1))1274 self.outputfilelist.append(os.path.basename(filename2))1275 1273 1276 1274 # write original time step to flux file as usual … … 1449 1447 cdate_hour = datetime.strftime(timestamp, '%Y%m%d%H') 1450 1448 1451 # eliminateall temporary times1449 # skip all temporary times 1452 1450 # which are outside the retrieval period 1453 1451 if timestamp < start_period or \ … … 1545 1543 os.chdir(c.inputdir) 1546 1544 if os.stat('fort.21').st_size == 0 and c.eta: 1547 print('Parameter 77 (etadot) is missing, most likely it is \1548 not available for this type or date/time\n')1545 print('Parameter 77 (etadot) is missing, most likely it is ' 1546 'not available for this type or date / time\n') 1549 1547 print('Check parameters CLASS, TYPE, STREAM, START_DATE\n') 1550 my_error( c.mailfail, 'fort.21 is empty while parameter eta \1551 is set to 1 in CONTROL file')1552 # ============================================================================================1548 my_error('fort.21 is empty while parameter eta ' 1549 'is set to 1 in CONTROL file') 1550 # ============================================================================================ 1553 1551 # write out all output to log file before starting fortran programm 1554 1552 sys.stdout.flush() … … 1560 1558 1561 1559 os.chdir(pwd) 1562 # ============================================================================================1560 # ============================================================================================ 1563 1561 # create name of final output file, e.g. EN13040500 (ENYYMMDDHH) 1564 1562 if c.purefc: … … 1577 1575 # collect for final processing 1578 1576 self.outputfilelist.append(os.path.basename(fnout)) 1579 #============================================================================================ 1577 # get additional precipitation subgrid data if available 1578 if c.rrint: 1579 self.outputfilelist.append(os.path.basename(fnout + '_1')) 1580 self.outputfilelist.append(os.path.basename(fnout + '_2')) 1581 # ============================================================================================ 1580 1582 # create outputfile and copy all data from intermediate files 1581 1583 # to the outputfile (final GRIB input files for FLEXPART) … … 1597 1599 shutil.copyfileobj(open(os.path.join(c.inputdir, 'fort.25'), 1598 1600 'rb'), fout) 1599 # ============================================================================================1601 # ============================================================================================ 1600 1602 if c.wrf: 1601 1603 fwrf.close() … … 1692 1694 print('\n\nPostprocessing:\n Format: {}\n'.format(c.format)) 1693 1695 1694 if not c.ecapi:1696 if _config.FLAG_ON_ECMWFSERVER: 1695 1697 print('ecstorage: {}\n ecfsdir: {}\n'. 1696 1698 format(c.ecstorage, c.ecfsdir)) … … 1714 1716 'FILES FAILED!') 1715 1717 1716 if c.ectrans and not c.ecapi:1718 if c.ectrans and _config.FLAG_ON_ECMWFSERVER: 1717 1719 execute_subprocess(['ectrans', '-overwrite', '-gateway', 1718 1720 c.gateway, '-remote', c.destination, … … 1720 1722 error_msg='TRANSFER TO LOCAL SERVER FAILED!') 1721 1723 1722 if c.ecstorage and not c.ecapi:1724 if c.ecstorage and _config.FLAG_ON_ECMWFSERVER: 1723 1725 execute_subprocess(['ecp', '-o', ofile, 1724 1726 os.path.expandvars(c.ecfsdir)], -
source/python/classes/MarsRetrieval.py
r6f951ca rf20af73 35 35 import sys 36 36 import subprocess 37 import traceback 37 38 38 39 # software specific classes and modules from flex_extract 39 40 sys.path.append('../') 40 41 import _config 42 try: 43 ec_api = True 44 import ecmwfapi 45 except ImportError: 46 ec_api = False 47 48 try: 49 cds_api = True 50 import cdsapi 51 except ImportError: 52 cds_api = False 41 53 # ------------------------------------------------------------------------------ 42 54 # CLASS … … 130 142 ''' 131 143 132 def __init__(self, server, public, marsclass=" ei", dataset="", type="",144 def __init__(self, server, public, marsclass="EA", dataset="", type="", 133 145 levtype="", levelist="", repres="", date="", resol="", 134 146 stream="", area="", time="", step="", expver="1", … … 158 170 marsclass : str, optional 159 171 Characterisation of dataset. E.g. EI (ERA-Interim), 160 E4 (ERA40), OD (Operational archive), ea(ERA5).161 Default is the ERA -Interim dataset "ei".172 E4 (ERA40), OD (Operational archive), EA (ERA5). 173 Default is the ERA5 dataset "EA". 162 174 163 175 dataset : str, optional … … 429 441 def data_retrieve(self): 430 442 '''Submits a MARS retrieval. Depending on the existence of 431 ECMWF Web-API it is submitted via Python or a443 ECMWF Web-API or CDS API it is submitted via Python or a 432 444 subprocess in the Shell. The parameter for the mars retrieval 433 445 are taken from the defined class attributes. … … 452 464 attrs['class'] = mclass 453 465 454 # prepare target variable as needed for the Web API mode466 # prepare target variable as needed for the Web API or CDS API mode 455 467 # within the dictionary for full access 456 468 # as a single variable for public access … … 475 487 if self.server: 476 488 try: 477 if self.public: 478 print('RETRIEVE PUBLIC DATA!') 489 if cds_api and isinstance(self.server, cdsapi.Client): 490 print('RETRIEVE ERA5 WITH CDS API!') 491 self.server.retrieve(_config.CDS_DATASET, 492 attrs, target) 493 elif ec_api and isinstance(self.server, ecmwfapi.ECMWFDataServer): 494 print('RETRIEVE PUBLIC DATA (NOT ERA5)!') 479 495 self.server.retrieve(attrs) 496 elif ec_api and isinstance(self.server, ecmwfapi.ECMWFService): 497 print('EXECUTE NON-PUBLIC RETRIEVAL (NOT ERA5)!') 498 self.server.execute(attrs, target) 480 499 else: 481 print('EXECUTE NON-PUBLIC RETRIEVAL!') 482 self.server.execute(attrs, target) 483 except: 484 e = sys.exc_info()[0] 485 print("ERROR: ", e) 486 print('MARS Request failed!') 487 if not self.public and os.stat(target).st_size == 0: 488 print('MARS Request returned no data - ' 489 'please check request') 500 print('ERROR:') 501 print('No match for Web API instance!') 490 502 raise IOError 491 elif self.public and os.stat(target).st_size == 0: 492 print('Public MARS Request returned no data - ' 493 'please check request') 494 raise IOError 495 else: 496 raise IOError 497 # MARS request via extra process in shell 503 except Exception as e: 504 print('\n\nMARS Request failed!') 505 print(e) 506 tb = sys.exc_info()[2] 507 print(traceback.format_exc()) 508 sys.exit() 509 510 # MARS request via call in shell 498 511 else: 499 512 request_str = 'ret' … … 501 514 request_str = request_str + ',' + key + '=' + str(value) 502 515 request_str += ',target="' + target + '"' 503 p = subprocess.Popen(['mars' ],516 p = subprocess.Popen(['mars', '-p'], 504 517 stdin=subprocess.PIPE, 505 518 stdout=subprocess.PIPE, -
source/python/mods/checks.py
r5551626 rf20af73 27 27 import sys 28 28 import _config 29 import exceptions 30 from tools import my_error, silent_remove 29 try: 30 import exceptions 31 except ImportError: 32 import builtins as exceptions 33 from .tools import my_error, silent_remove 31 34 from datetime import datetime 32 35 import numpy as np … … 296 299 step = ['{:0>3}'.format(i) for i in ilist] 297 300 elif 'to' in step.lower() and 'by' not in step.lower(): 298 my_error( mailfail,step + ':\n' +301 my_error(step + ':\n' + 299 302 'if "to" is used in steps parameter, ' 300 303 'please use "by" as well') -
source/python/mods/get_mars_data.py
r45b99e6 rf20af73 67 67 inspect.getfile(inspect.currentframe()))) + '/../') 68 68 import _config 69 from tools import (my_error, normal_exit, get_cmdline_args,69 from .tools import (setup_controldata, my_error, normal_exit, get_cmdline_args, 70 70 read_ecenv, make_dir) 71 71 from classes.EcFlexpart import EcFlexpart … … 74 74 75 75 try: 76 ec api = True76 ec_api = True 77 77 import ecmwfapi 78 78 except ImportError: 79 ecapi = False 79 ec_api = False 80 81 try: 82 cds_api = True 83 import cdsapi 84 except ImportError: 85 cds_api = False 80 86 # ------------------------------------------------------------------------------ 81 87 # FUNCTION … … 95 101 ''' 96 102 97 args = get_cmdline_args() 98 c = ControlFile(args.controlfile) 99 100 env_parameter = read_ecenv(_config.PATH_ECMWF_ENV) 101 c.assign_args_to_control(args) 102 c.assign_envs_to_control(env_parameter) 103 c.check_conditions(args.queue) 104 103 c, _, _, _ = setup_controldata() 105 104 get_mars_data(c) 106 normal_exit( c.mailops, c.queue, 'Done!')105 normal_exit('Retrieving MARS data: Done!') 107 106 108 107 return … … 125 124 126 125 ''' 127 c.ecapi = ecapi 126 c.ec_api = ec_api 127 c.cds_api = cds_api 128 128 129 129 if not os.path.exists(c.inputdir): … … 182 182 183 183 def mk_server(c): 184 '''Creates server connection if ECMWF WebAPI is available. 184 '''Creates a server connection with available python API. 185 186 Which API is used depends on availability and the dataset to be retrieved. 187 The CDS API is used for ERA5 dataset no matter if the user is a member or 188 a public user. ECMWF WebAPI is used for all other available datasets. 185 189 186 190 Parameters … … 192 196 Return 193 197 ------ 194 server : ECMWFDataServer or ECMWFService 195 Connection to ECMWF server via python interface ECMWF WebAPI. 196 197 ''' 198 if c.ecapi: 198 server : ECMWFDataServer, ECMWFService or Client 199 Connection to ECMWF server via python interface ECMWF WebAPI or CDS API. 200 201 ''' 202 if cds_api and (c.marsclass.upper() == 'EA'): 203 server = cdsapi.Client() 204 c.ec_api = False 205 elif c.ec_api: 199 206 if c.public: 200 207 server = ecmwfapi.ECMWFDataServer() 201 208 else: 202 209 server = ecmwfapi.ECMWFService("mars") 210 c.cds_api = False 203 211 else: 204 212 server = False 205 213 206 print('Using ECMWF WebAPI: ' + str(c.ecapi)) 214 print('Using ECMWF WebAPI: ' + str(c.ec_api)) 215 print('Using CDS API: ' + str(c.cds_api)) 207 216 208 217 return server … … 353 362 flexpart.retrieve(server, dates, c.public, c.request, c.inputdir) 354 363 except IOError: 355 my_error( c.mailfail,'MARS request failed')364 my_error('MARS request failed') 356 365 357 366 day += delta_t -
source/python/mods/prepare_flexpart.py
re811e1a rf20af73 71 71 inspect.getfile(inspect.currentframe()))) + '/../') 72 72 import _config 73 from checks import check_ppid73 from .checks import check_ppid 74 74 from classes.UioFiles import UioFiles 75 75 from classes.ControlFile import ControlFile 76 from tools import clean_up, get_cmdline_args, read_ecenv, make_dir 76 from .tools import (setup_controldata, clean_up, get_cmdline_args, 77 read_ecenv, make_dir) 77 78 from classes.EcFlexpart import EcFlexpart 78 79 ecapi = 'ecmwf' not in socket.gethostname()80 try:81 if ecapi:82 import ecmwfapi83 except ImportError:84 ecapi = False85 79 86 80 # ------------------------------------------------------------------------------ … … 101 95 ''' 102 96 103 args = get_cmdline_args() 104 c = ControlFile(args.controlfile) 105 106 env_parameter = read_ecenv(_config.PATH_ECMWF_ENV) 107 c.assign_args_to_control(args) 108 c.assign_envs_to_control(env_parameter) 109 c.check_conditions(args.queue) 110 111 prepare_flexpart(args.ppid, c) 97 c, ppid, _, _ = setup_controldata() 98 prepare_flexpart(ppid, c) 99 normal_exit('Preparing FLEXPART output files: Done!') 112 100 113 101 return … … 136 124 ''' 137 125 check_ppid(c, ppid) 138 139 c.ecapi = ecapi140 126 141 127 # create the start and end date -
source/python/mods/tools.py
r8778c5a rf20af73 65 65 import subprocess 66 66 import traceback 67 import exceptions 67 try: 68 import exceptions 69 except ImportError: 70 import builtins as exceptions 68 71 from datetime import datetime 69 72 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter 73 74 70 75 71 76 # ------------------------------------------------------------------------------ 72 77 # METHODS 73 78 # ------------------------------------------------------------------------------ 79 80 def setup_controldata(): 81 '''Collects, stores and checks controlling arguments from command line, 82 CONTROL file and ECMWF_ENV file. 83 84 Parameters 85 ---------- 86 87 Return 88 ------ 89 c : ControlFile 90 Contains all the parameters of CONTROL file and 91 command line. 92 93 ppid : str 94 Parent process id. 95 96 queue : str 97 Name of queue for submission to ECMWF (e.g. ecgate or cca ) 98 99 job_template : str 100 Name of the job template file for submission to ECMWF server. 101 ''' 102 import _config 103 from classes.ControlFile import ControlFile 104 105 args = get_cmdline_args() 106 c = ControlFile(args.controlfile) 107 c.assign_args_to_control(args) 108 if os.path.isfile(_config.PATH_ECMWF_ENV): 109 env_parameter = read_ecenv(_config.PATH_ECMWF_ENV) 110 c.assign_envs_to_control(env_parameter) 111 c.check_conditions(args.queue) 112 113 return c, args.ppid, args.queue, args.job_template 74 114 75 115 def none_or_str(value): … … 259 299 print("... clean inputdir!") 260 300 261 cleanlist = glob.glob(os.path.join(c.inputdir, "*")) 301 cleanlist = [file for file in glob.glob(os.path.join(c.inputdir, "*")) 302 if not os.path.basename(file).startswith(c.prefix)] 262 303 263 304 if cleanlist: 264 305 for element in cleanlist: 265 if c.prefix not in element: 266 silent_remove(element) 267 if c.ecapi is False and (c.ectrans == 1 or c.ecstorage == 1): 268 silent_remove(element) 306 silent_remove(element) 269 307 print("... done!") 270 308 else: … … 274 312 275 313 276 def my_error( users,message='ERROR'):314 def my_error(message='ERROR'): 277 315 '''Prints a specified error message which can be passed to the function 278 316 before exiting the program. … … 280 318 Parameters 281 319 ---------- 282 user : list of str283 Contains all email addresses which should be notified.284 It might also contain just the ecmwf user name which wil trigger285 mailing to the associated email address for this user.286 287 320 message : str, optional 288 321 Error message. Default value is "ERROR". … … 464 497 else: 465 498 for data in fdata: 466 if data [0] != '!':467 table128[data[0:3]] = data[59:6 4].strip()499 if data != '' and data[0] != '!': 500 table128[data[0:3]] = data[59:65].strip() 468 501 469 502 return table128 … … 508 541 509 542 return ipar 543 544 def to_param_id_with_tablenumber(pars, table): 545 '''Transform parameter names to parameter ids and add table id. 546 547 Conversion with ECMWF grib table 128. 548 549 Parameters 550 ---------- 551 pars : str 552 Addpar argument from CONTROL file in the format of 553 parameter names instead of ids. The parameter short 554 names are sepearted with "/" and they are passed as 555 one single string. 556 557 table : dict 558 Contains the ECMWF grib table 128 information. 559 The key is the parameter number and the value is the 560 short name of the parameter. 561 562 Return 563 ------ 564 spar : str 565 List of addpar parameters from CONTROL file transformed to 566 parameter ids in the format of integer. 567 ''' 568 if not pars: 569 return [] 570 if not isinstance(pars, str): 571 pars=str(pars) 572 573 cpar = pars.upper().split('/') 574 spar = [] 575 for par in cpar: 576 for k, v in table.iteritems(): 577 if par == k or par == v: 578 spar.append(k + '.128') 579 break 580 else: 581 print('\n\n\t\tWarning: par ' + par + ' not found in table 128\n\n') 582 583 return '/'.join(spar) 510 584 511 585 def get_list_as_string(list_obj, concatenate_sign=', '): -
source/python/submit.py
rfc05fbd rf20af73 67 67 # software specific classes and modules from flex_extract 68 68 import _config 69 from mods.tools import ( normal_exit, get_cmdline_args,69 from mods.tools import (setup_controldata, normal_exit, get_cmdline_args, 70 70 submit_job_to_ecserver, read_ecenv) 71 71 from mods.get_mars_data import get_mars_data … … 91 91 ''' 92 92 93 args = get_cmdline_args() 94 c = ControlFile(args.controlfile) 95 96 env_parameter = read_ecenv(_config.PATH_ECMWF_ENV) 97 c.assign_args_to_control(args) 98 c.assign_envs_to_control(env_parameter) 99 c.check_conditions(args.queue) 93 c, ppid, queue, job_template = setup_controldata() 100 94 101 95 # on local side 102 # on ECMWF server this would also be the local side96 # starting from an ECMWF server this would also be the local side 103 97 called_from_dir = os.getcwd() 104 if args.queue is None:98 if queue is None: 105 99 if c.inputdir[0] != '/': 106 100 c.inputdir = os.path.join(called_from_dir, c.inputdir) … … 109 103 get_mars_data(c) 110 104 if c.request == 0 or c.request == 2: 111 prepare_flexpart( args.ppid, c)105 prepare_flexpart(ppid, c) 112 106 exit_message = 'FLEX_EXTRACT IS DONE!' 113 107 else: … … 115 109 # send files to ECMWF server 116 110 else: 117 submit( args.job_template, c, args.queue)111 submit(job_template, c, queue) 118 112 exit_message = 'FLEX_EXTRACT JOB SCRIPT IS SUBMITED!' 119 113
Note: See TracChangeset
for help on using the changeset viewer.