source: flex_extract.git/source/python/mods/tools.py @ f2616a3

ctbtodev
Last change on this file since f2616a3 was f2616a3, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

implemented a job split with a new parameter 'job_chunk' so that huge time periods can be split automatically

  • Property mode set to 100644
File size: 24.2 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Philipp (University of Vienna)
5#
6# @Date: May 2018
7#
8# @Change History:
9#    October 2014 - Anne Fouilloux (University of Oslo)
10#        - created functions silent_remove and product (taken from ECMWF)
11#
12#    November 2015 - Leopold Haimberger (University of Vienna)
13#        - created functions: interpret_args_and_control, clean_up
14#          my_error, normal_exit, init128, to_param_id
15#
16#    April 2018 - Anne Philipp (University of Vienna):
17#        - applied PEP8 style guide
18#        - added documentation
19#        - moved all functions from file Flexparttools to this file tools
20#        - added function get_list_as_string
21#        - seperated args and control interpretation
22#
23# @License:
24#    (C) Copyright 2014-2018.
25#
26#    This software is licensed under the terms of the Apache Licence Version 2.0
27#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
28#
29# @Modul Description:
30#    This module contains a couple of helpful functions which are
31#    used in different places in flex_extract.
32#
33# @Module Content:
34#    - get_cmdline_args
35#    - clean_up
36#    - my_error
37#    - normal_exit
38#    - product
39#    - silent_remove
40#    - init128
41#    - to_param_id
42#    - get_list_as_string
43#    - make_dir
44#
45#*******************************************************************************
46
47# ------------------------------------------------------------------------------
48# MODULES
49# ------------------------------------------------------------------------------
50import os
51import errno
52import sys
53import glob
54import subprocess
55import traceback
56import exceptions
57from datetime import datetime
58from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
59
60# ------------------------------------------------------------------------------
61# FUNCTIONS
62# ------------------------------------------------------------------------------
63
def none_or_str(value):
    '''Map the literal string "None" to Python's None object.

    Used as an argparse ``type`` so that the word "None" given on the
    command line ends up as a real None value.

    Parameters
    ----------
    value : :obj:`string`
        Raw string to inspect.

    Return
    ------
    None or value:
        None if `value` equals the string "None"; otherwise `value`
        itself, unchanged.
    '''
    return None if value == 'None' else value
82
def none_or_int(value):
    '''Map the literal string "None" to None and anything else to an int.

    Used as an argparse ``type`` for integer options which may also be
    explicitly unset with the word "None".

    Parameters
    ----------
    value : :obj:`string`
        Raw string to inspect.

    Return
    ------
    None or int(value):
        None if `value` equals the string "None"; otherwise the result of
        ``int(value)`` (which raises ValueError for non-numeric text).
    '''
    if value != 'None':
        return int(value)
    return None
102
def get_cmdline_args(argv=None):
    '''Decomposes the command line arguments and assigns them to variables.

    Options not given on the command line get their default value, which
    is None for most of them. The control parameters are meant to override
    the corresponding CONTROL file values.

    Parameters
    ----------
    argv : :obj:`list` of :obj:`string`, optional
        Arguments to parse. The default None makes argparse read
        ``sys.argv[1:]``, i.e. the real command line.

    Return
    ------
    args : :obj:`Namespace`
        Contains the commandline arguments from script/program call.
    '''
    # Help texts use implicit string concatenation instead of backslash
    # continuations inside the literals, which embedded long runs of
    # indentation blanks into the strings.
    parser = ArgumentParser(description='Retrieve FLEXPART input from '
                                        'ECMWF MARS archive',
                            formatter_class=ArgumentDefaultsHelpFormatter)

    # control parameters that override control file values
    parser.add_argument("--start_date", dest="start_date",
                        type=none_or_str, default=None,
                        help="start date YYYYMMDD")
    parser.add_argument("--end_date", dest="end_date",
                        type=none_or_str, default=None,
                        help="end date YYYYMMDD")
    parser.add_argument("--date_chunk", dest="date_chunk",
                        type=none_or_int, default=None,
                        help="# of days to be retrieved at once")
    parser.add_argument("--job_chunk", dest="job_chunk",
                        type=none_or_int, default=None,
                        help="# of days to be retrieved within a single job")
    parser.add_argument("--controlfile", dest="controlfile",
                        type=none_or_str, default='CONTROL.temp',
                        help="file with CONTROL parameters")
    parser.add_argument("--basetime", dest="basetime",
                        type=none_or_int, default=None,
                        help="base such as 00 or 12 (for half day retrievals)")
    parser.add_argument("--step", dest="step",
                        type=none_or_str, default=None,
                        help="steps such as 00/to/48")
    parser.add_argument("--levelist", dest="levelist",
                        type=none_or_str, default=None,
                        help="Vertical levels to be retrieved, e.g. 30/to/60")
    parser.add_argument("--area", dest="area",
                        type=none_or_str, default=None,
                        help="area defined as north/west/south/east")

    # some switches
    parser.add_argument("--debug", dest="debug",
                        type=none_or_int, default=None,
                        help="debug mode - leave temporary files intact")
    parser.add_argument("--request", dest="request",
                        type=none_or_int, default=None,
                        help="list all mars requests in file mars_requests.dat")
    parser.add_argument("--public", dest="public",
                        type=none_or_int, default=None,
                        help="public mode - retrieves the public datasets")
    parser.add_argument("--rrint", dest="rrint",
                        type=none_or_int, default=None,
                        help="select old or new precipitation interpolation: "
                             "0 - old method, "
                             "1 - new method (additional subgrid points)")

    # set directories
    parser.add_argument("--inputdir", dest="inputdir",
                        type=none_or_str, default=None,
                        help="root directory for storing intermediate files")
    parser.add_argument("--outputdir", dest="outputdir",
                        type=none_or_str, default=None,
                        help="root directory for storing output files")
    parser.add_argument("--flexpartdir", dest="flexpartdir",
                        type=none_or_str, default=None,
                        help="FLEXPART root directory (to find grib2flexpart "
                             "and COMMAND file). Normally flex_extract "
                             "resides in the scripts directory of the "
                             "FLEXPART distribution")

    # this is only used by prepare_flexpart.py to rerun a postprocessing step
    parser.add_argument("--ppid", dest="ppid",
                        type=none_or_str, default=None,
                        help="specify parent process id for "
                             "rerun of prepare_flexpart")

    # arguments for job submission to ECMWF, only needed by submit.py
    parser.add_argument("--job_template", dest='job_template',
                        type=none_or_str, default="job.temp",
                        help="job template file for submission to ECMWF")
    parser.add_argument("--queue", dest="queue",
                        type=none_or_str, default=None,
                        help="queue for submission to ECMWF "
                             "(e.g. ecgate or cca)")

    # argv=None keeps the original behaviour of parsing sys.argv[1:]
    args = parser.parse_args(argv)

    return args
196
def read_ecenv(filepath):
    '''Reads the ECMWF environment file into a dictionary where the keys
    are the parameter names.

    Each non-empty line must contain a parameter name followed by its
    value, separated by whitespace. Additional fields on a line are
    ignored, and blank lines are skipped.

    Parameters
    ----------
    filepath : :obj:`string`
        Path to file where the ECMWF environment parameters are stored.

    Return
    ------
    envs : :obj:`dictionary`
        Contains the environment parameter ecuid, ecgid, gateway
        and destination for ECMWF server environments.

    Exits the program via ``sys.exit`` if the file cannot be opened/read.
    '''
    envs = {}
    try:
        with open(filepath, 'r') as f:
            for line in f:
                data = line.strip().split()
                if not data:
                    # tolerate empty lines, e.g. an extra newline at the end
                    continue
                envs[str(data[0])] = str(data[1])
    except (IOError, OSError) as e:
        # Python 2 raises IOError (not an OSError subclass) when open()
        # fails; Python 3 raises an OSError subclass. Catch both.
        print('... ERROR CODE: ' + str(e.errno))
        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))

        sys.exit('\n... Error occured while trying to read ECMWF_ENV '
                 'file: ' + str(filepath))

    return envs
226
def clean_up(c):
    '''Remove files from the intermediate directory (inputdir).

    Every file whose name does not contain ``c.prefix`` is removed. Files
    whose name does contain the prefix (presumably the final FLEXPART
    input files) are kept, unless the program runs without the ECMWF API
    (``c.ecapi is False``) and the keywords "ectrans" or "ecstorage" are set
    to 1; in that case they are removed as well. Subdirectories are left
    untouched.

    Parameters
    ----------
    c : :obj:`ControlFile`
        Contains all the parameters of CONTROL file and
        command line.

    Return
    ------

    '''

    print("... clean inputdir!")

    cleanlist = glob.glob(os.path.join(c.inputdir, "*"))

    if not cleanlist:
        print("... nothing to clean!")
        return

    # Decided once: whether prefixed files have to go as well.
    remove_all = c.ecapi is False and (c.ectrans == 1 or c.ecstorage == 1)

    for element in cleanlist:
        if os.path.isdir(element):
            # silent_remove only handles files; os.remove fails on dirs
            continue
        # Match the prefix against the file name only, not the full path,
        # so that a prefix occurring in inputdir itself does not keep
        # every file.
        if remove_all or c.prefix not in os.path.basename(element):
            silent_remove(element)
    print("... done!")

    return
259
260
def my_error(users, message='ERROR'):
    '''Prints an error message together with the current stack trace,
    mails it to the given users and exits the program with status 1.

    Parameters
    ----------
    users : :obj:`list` of :obj:`string`
        Contains all email addresses which should be notified.
        It might also contain just the ecmwf user name which will trigger
        mailing to the associated email address for this user.

    message : :obj:`string`, optional
        Error message. Default value is "ERROR".

    Return
    ------
    Does not return: the program is terminated via ``sys.exit(1)``
    (send_mail may already exit earlier if the email cannot be sent).
    '''

    # The stack trace is appended so the recipient can see where the
    # error was raised.
    trace = '\n'.join(traceback.format_stack())
    full_message = message + '\n\n' + trace

    print(full_message)

    send_mail(users, 'ERROR', full_message)

    sys.exit(1)
290
291
def send_mail(users, success_mode, message):
    '''Sends a notification email to every entry of ``users``.

    The email is handed to the system ``mail`` command with the subject
    "flex_extract_v7.1 <success_mode>" and ``message`` as body.

    Parameters
    ----------
    users : :obj:`list` of :obj:`string`
        Contains all email addresses which should be notified.
        An entry containing "${USER}" is replaced by the value of the
        environment variable USER, so the mail is addressed to that user
        name. Environment variables in the addresses are expanded.
        None or an empty list sends nothing.

    success_mode : :obj:`string`
        States the exit mode of the program (e.g. "ERROR") to put into
        the mail subject line.

    message : :obj:`string`
        Body of the email.

    Return
    ------

    Exits the program via ``sys.exit`` if the ``mail`` command cannot be
    started. A non-zero exit code of ``mail`` only produces a warning.
    '''

    for user in users or []:
        if '${USER}' in user:
            user = os.getenv('USER')
        address = os.path.expandvars(user)
        try:
            # "-s" and the subject are passed as separate argv entries;
            # glued together, mail has to split the option itself and the
            # subject starts with a stray blank.
            proc = subprocess.Popen(['mail', '-s',
                                     'flex_extract_v7.1 ' + success_mode,
                                     address],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE,
                                    universal_newlines=True)
            perr = proc.communicate(input=message + '\n\n')[1]
        except ValueError as e:
            print('... ERROR: ' + str(e))
            sys.exit('... Email could not be sent!')
        except OSError as e:
            print('... ERROR CODE: ' + str(e.errno))
            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
            sys.exit('... Email could not be sent!')
        else:
            if proc.returncode == 0:
                print('Email sent to ' + address)
            else:
                print('... WARNING: mail returned exit code ' +
                      str(proc.returncode) + ' for ' + address +
                      ': ' + str(perr).strip())

    return
336
337
def normal_exit(message='Done!'):
    '''Print a closing message for a regular end of the program.

    Note that, despite its name, the function does not terminate the
    program; it only prints ``message`` (converted with ``str``) and
    returns None.

    Parameters
    ----------
    message : :obj:`string`, optional
        Text to print. Default value is "Done!".

    Return
    ------

    '''
    text = str(message)
    print(text)
354
355
def product(*args, **kwds):
    r'''Creates the Cartesian product of all passed arguments.

    Every element of each argument is combined with every element of the
    other arguments, and each combination is yielded as a tuple. The work
    is delegated to ``itertools.product``, which produces the combinations
    lazily instead of building the whole result list first.

    Note
    ----
    This method was originally taken from an example at the ECMWF wiki
    website. https://software.ecmwf.int/wiki/display/GRIB/index.py; 2018-03-16

    Example
    -------
    product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy

    product(range(2), repeat = 3) --> 000 001 010 011 100 101 110 111

    Parameters
    ----------
    \*args : :obj:`list` or :obj:`string`
        Iterables whose elements are combined (arbitrary number).

    \*\*kwds : :obj:`dictionary`
        Only the key "repeat" is used: how often the sequence of ``args``
        is repeated (default 1). Other keys are ignored.

    Return
    ------
    prod : :obj:`tuple`
        Return will be done with "yield". A tuple of combined arguments.
        See example in description above.

    Exits the program via ``sys.exit`` if an argument is not iterable or
    "repeat" is not an integer.
    '''
    import itertools

    try:
        # Clamp to 0: a negative repeat count behaved like 0 (a single empty
        # tuple) with list multiplication, whereas itertools would raise
        # ValueError.
        repeat = max(kwds.get('repeat', 1), 0)
        combinations = itertools.product(*args, repeat=repeat)
    except TypeError:
        sys.exit('... PRODUCT GENERATION FAILED!')

    for combination in combinations:
        yield combination
399
400
def silent_remove(filename):
    '''Delete a file, ignoring the case that it does not exist.

    Any other failure of ``os.remove`` (e.g. missing permissions or the
    path being a directory) is propagated to the caller.

    Parameters
    ----------
    filename : :obj:`string`
        Path of the file to delete.

    Return
    ------

    '''
    try:
        os.remove(filename)
    except OSError as err:
        # ENOENT ("no such file or directory") is the only tolerated error
        if err.errno != errno.ENOENT:
            raise
    return
424
425
def init128(filepath):
    '''Opens and reads the file with the ECMWF grib table 128 information.

    Lines starting with "!" are comments, and empty or whitespace-only
    lines are skipped. For all other lines, columns 0-2 hold the
    parameter number and columns 59-63 hold the short name.

    Parameters
    ----------
    filepath : :obj:`string`
        Path to file of ECMWF grib table number 128.

    Return
    ------
    table128 : :obj:`dictionary`
        Contains the ECMWF grib table 128 information.
        The key is the parameter number (as a string) and the value is
        the short name of the parameter.

    Exits the program via ``sys.exit`` if the file cannot be opened/read.
    '''
    table128 = dict()
    try:
        with open(filepath) as f:
            fdata = f.read().splitlines()
    except (IOError, OSError) as e:
        # Python 2 raises IOError, which is not an OSError subclass there.
        print('... ERROR CODE: ' + str(e.errno))
        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))

        sys.exit('\n... Error occured while trying to read parameter '
                 'table file: ' + str(filepath))

    for data in fdata:
        # Empty lines (e.g. after the final newline) used to raise
        # IndexError on data[0]; skip them together with comment lines.
        if not data.strip() or data.startswith('!'):
            continue
        table128[data[0:3]] = data[59:64].strip()

    return table128
457
458
def to_param_id(pars, table):
    '''Transform parameter names to parameter ids with ECMWF grib table 128.

    Parameters
    ----------
    pars : :obj:`string`
        Addpar argument from CONTROL file in the format of
        parameter names or ids. The entries are separated by "/" and
        passed as one single string. Matching is case-insensitive on the
        input side. Blanks around entries and empty entries are ignored.

    table : :obj:`dictionary`
        Contains the ECMWF grib table 128 information.
        The key is the parameter number and the value is the
        short name of the parameter.

    Return
    ------
    ipar : :obj:`list` of :obj:`integer`
        List of addpar parameters from CONTROL file transformed to
        parameter ids in the format of integer. Entries not found in the
        table are skipped with a warning.
    '''
    if not pars:
        return []
    if not isinstance(pars, str):
        pars = str(pars)

    # dict.items() works on Python 2 and 3 (iteritems is Python 2 only);
    # build the list once instead of once per requested parameter.
    entries = list(table.items())

    ipar = []
    for par in pars.upper().split('/'):
        par = par.strip()
        if not par:
            # e.g. a trailing "/" or "T//U"
            continue
        for k, v in entries:
            # accept either the parameter number or its short name
            if par == k or par == v:
                ipar.append(int(k))
                break
        else:
            print('Warning: par ' + par + ' not found in table 128')

    return ipar
497
def get_list_as_string(list_obj, concatenate_sign=', '):
    '''Join the ``str`` representation of every element into one string.

    Parameters
    ----------
    list_obj : :obj:`list`
        Any iterable (list, tuple, string, generator, ...) with arbitrary
        content.

    concatenate_sign : :obj:`string`, optional
        Separator placed between neighbouring elements. Default value
        is ", ".

    Return
    ------
    str_of_list : :obj:`string`
        All elements converted with ``str`` and joined by
        `concatenate_sign`.
    '''
    pieces = map(str, list_obj)
    return concatenate_sign.join(pieces)
521
def make_dir(directory):
    '''Creates a directory, including missing parent directories.

    It gives a warning if the directory already exists and skips the
    process. Any other problem is re-raised. This includes the path
    existing as something other than a directory, e.g. a regular file.

    Parameters
    ----------
    directory : :obj:`string`
        The path to directory which should be created.

    Return
    ------

    '''
    try:
        os.makedirs(directory)
    except OSError as e:
        # errno.EEXIST is also raised when a *file* occupies the path; only
        # an already existing directory is harmless.
        if e.errno == errno.EEXIST and os.path.isdir(directory):
            print('WARNING: Directory {0} already exists!'.format(directory))
        else:
            raise  # re-raise exception if a different error occured

    return
547
def put_file_to_ecserver(ecd, filename, target, ecuid, ecgid):
    '''Copy a local file to the user's home directory on an ECMWF server.

    The file ``<ecd>/<filename>`` is transferred with the command
    ``ecaccess-file-put`` to ``<target>:/home/ms/<ecgid>/<ecuid>/<filename>``.
    Failures are reported on stdout and terminate the program via
    ``sys.exit``. Callers therefore do not need to check anything; the
    function returns None.

    Parameters
    ----------
    ecd : :obj:`string`
        The local directory where the file is stored.

    filename : :obj:`string`
        The name of the file to send to the ECMWF server.

    target : :obj:`string`
        The target queue where the file should be sent to.

    ecuid : :obj:`string`
        The user id on ECMWF server.

    ecgid : :obj:`string`
        The group id on ECMWF server.

    Return
    ------

    '''
    source = ecd + '/' + filename
    destination = target + ':/home/ms/' + ecgid + '/' + ecuid + '/' + filename
    command = ['ecaccess-file-put', source, destination]

    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as err:
        # the tool ran but reported a failure
        print('... ERROR CODE: ' + str(err.returncode))
        print('... ERROR MESSAGE:\n \t ' + str(err))

        print('\n... Do you have a valid ecaccess certification key?')
        sys.exit('... ECACCESS-FILE-PUT FAILED!')
    except OSError as err:
        # the tool could not be started at all
        print('... ERROR CODE: ' + str(err.errno))
        print('... ERROR MESSAGE:\n \t ' + str(err.strerror))

        print('\n... Most likely the ECACCESS library is not available!')
        sys.exit('... ECACCESS-FILE-PUT FAILED!')

    return
600
def submit_job_to_ecserver(target, jobname):
    '''Submit a job file to an ECMWF server with ``ecaccess-job-submit``.

    Failures are reported on stdout and terminate the program via
    ``sys.exit``, so callers do not need to handle errors themselves.

    Parameters
    ----------
    target : :obj:`string`
        The target where the file should be sent to, e.g. the queue.

    jobname : :obj:`string`
        The name of the jobfile to be submitted to the ECMWF server.

    Return
    ------
    job_id : :obj:`string`
        The raw standard output of ``ecaccess-job-submit`` (presumably the
        job id assigned by the ECMWF server, possibly including a trailing
        newline; bytes under Python 3). It is not converted to int.
    '''
    command = ['ecaccess-job-submit', '-queueName', target, jobname]

    try:
        return subprocess.check_output(command)
    except subprocess.CalledProcessError as err:
        # the tool ran but reported a failure
        print('... ERROR CODE: ' + str(err.returncode))
        print('... ERROR MESSAGE:\n \t ' + str(err))

        print('\n... Do you have a valid ecaccess certification key?')
        sys.exit('... ECACCESS-JOB-SUBMIT FAILED!')
    except OSError as err:
        # the tool could not be started at all
        print('... ERROR CODE: ' + str(err.errno))
        print('... ERROR MESSAGE:\n \t ' + str(err.strerror))

        print('\n... Most likely the ECACCESS library is not available!')
        sys.exit('... ECACCESS-JOB-SUBMIT FAILED!')
642
643
def get_informations(filename):
    '''Gets basic information from the first message of a grib file.

    These information are important for later use and the
    initialization of numpy arrays for data storing.

    Parameters
    ----------
    filename : :obj:`string`
        Name of the file which will be opened to extract basic information.

    Return
    ------
    data : :obj:`dictionary`
        Contains basic informations of the ECMWF grib files, e.g.
        'Ni', 'Nj', 'latitudeOfFirstGridPointInDegrees',
        'longitudeOfFirstGridPointInDegrees', 'latitudeOfLastGridPointInDegrees',
        'longitudeOfLastGridPointInDegrees', 'jDirectionIncrementInDegrees',
        'iDirectionIncrementInDegrees', 'missingValue'

    Exits the program via ``sys.exit`` if the file contains no grib message.
    '''
    from eccodes import codes_grib_new_from_file, codes_get, codes_release

    # information needed from grib message
    keys = [
            'Ni',
            'Nj',
            'latitudeOfFirstGridPointInDegrees',
            'longitudeOfFirstGridPointInDegrees',
            'latitudeOfLastGridPointInDegrees',
            'longitudeOfLastGridPointInDegrees',
            'jDirectionIncrementInDegrees',
            'iDirectionIncrementInDegrees',
            'missingValue',
           ]

    data = {}

    # --- open file ---
    print("Opening file for getting information data --- %s" % filename)
    # GRIB is a binary format, so the file is opened in binary mode.
    with open(filename, 'rb') as f:
        # load first message from file; None means no message was found
        gid = codes_grib_new_from_file(f)
        if gid is None:
            sys.exit('... ERROR: no grib message found in file: ' +
                     str(filename))

        try:
            print('\nInformations are: ')
            for key in keys:
                # Get the value of the key in a grib message.
                data[key] = codes_get(gid, key)
                print("%s = %s" % (key, data[key]))
        finally:
            # Free the memory for the message even if codes_get fails.
            codes_release(gid)

    return data
697
698
def get_dimensions(info, purefc, dtime, index_vals, start_date, end_date):
    '''This function specifies the correct dimensions for x, y and t.

    Parameters
    ----------
    info : :obj:`dictionary`
        Contains basic informations of the ECMWF grib files, e.g.
        'Ni', 'Nj', 'latitudeOfFirstGridPointInDegrees',
        'longitudeOfFirstGridPointInDegrees', 'latitudeOfLastGridPointInDegrees',
        'longitudeOfLastGridPointInDegrees', 'jDirectionIncrementInDegrees',
        'iDirectionIncrementInDegrees', 'missingValue'

    purefc : :obj:`integer`
        Switch for definition of pure forecast mode or not (truthy means
        pure forecast).

    dtime : :obj:`string`
        Time step in hours (anything accepted by ``int()``).

    index_vals : :obj:`list`
        Contains the values from the keys used for a distinct selection
        of grib messages in processing  the grib files.
        Content looks like e.g.:
        index_vals[0]: ('20171106', '20171107', '20171108') ; date
        index_vals[1]: ('0', '1200', '1800', '600') ; time
        index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
        Only used in pure forecast mode.

    start_date : :obj:`datetime`
        The start date of the retrieval job. It must support subtraction
        giving an object with ``.days``, e.g. ``datetime.date`` or
        ``datetime.datetime``. Only used if not in pure forecast mode.

    end_date : :obj:`datetime`
        The end date of the retrieval job (same type as `start_date`).

    Return
    ------
    (ix, jy, it) : :obj:`tuple` of :obj:`integer`
        Dimension in x-direction, y-direction and in time.
    '''

    ix = info['Ni']

    jy = info['Nj']

    if purefc:
        # #no of step * #no of times * #no of days
        it = len(index_vals[2]) * len(index_vals[1]) * len(index_vals[0])
    else:
        # Number of dtime-hour steps covering all days (both ends included).
        # Floor division keeps the dimension an integer under Python 3 too,
        # where "/" would yield a float.
        it = ((end_date - start_date).days + 1) * 24 // int(dtime)

    return (ix, jy, it)
748
749
def execute_subprocess(cmd_list, error_msg='SUBPROCESS FAILED!'):
    '''Run a command line instruction and abort the program if it fails.

    The command is executed with ``subprocess.check_call``. If it exits
    with a non-zero status or cannot be started at all, the error code and
    message are printed and the program exits via ``sys.exit`` with
    "... " + `error_msg`.

    Parameters
    ----------
    cmd_list : :obj:`list` of :obj:`string`
        A list of the components for the command line execution. Each
        list entry is a single part of the command which is seperated from
        the rest by a blank space.
        E.g. ['mv', file1, file2]

    error_msg : :obj:`string`, optional
        The message used when the subprocess fails.
        By default it will just tell "SUBPROCESS FAILED!".

    Return
    ------

    '''
    try:
        subprocess.check_call(cmd_list)
    except (subprocess.CalledProcessError, OSError) as err:
        if isinstance(err, subprocess.CalledProcessError):
            # the command ran and returned a non-zero exit status
            code, text = err.returncode, err
        else:
            # the command could not be started (e.g. not found)
            code, text = err.errno, err.strerror
        print('... ERROR CODE: ' + str(code))
        print('... ERROR MESSAGE:\n \t ' + str(text))

        sys.exit('... ' + error_msg)

    return
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG