source: flex_extract.git/source/python/mods/tools.py @ bf48c8a

ctbtodev
Last change on this file since bf48c8a was bf48c8a, checked in by Anne Philipp <anne.philipp@…>, 5 years ago

introduced a function for subprocess check_call to do error handling once

  • Property mode set to 100644
File size: 24.0 KB
Line 
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#*******************************************************************************
4# @Author: Anne Philipp (University of Vienna)
5#
6# @Date: May 2018
7#
8# @Change History:
9#    October 2014 - Anne Fouilloux (University of Oslo)
10#        - created functions silent_remove and product (taken from ECMWF)
11#
12#    November 2015 - Leopold Haimberger (University of Vienna)
13#        - created functions: interpret_args_and_control, clean_up
14#          my_error, normal_exit, init128, to_param_id
15#
16#    April 2018 - Anne Philipp (University of Vienna):
17#        - applied PEP8 style guide
18#        - added documentation
19#        - moved all functions from file Flexparttools to this file tools
20#        - added function get_list_as_string
21#        - separated args and control interpretation
22#
23# @License:
24#    (C) Copyright 2014-2018.
25#
26#    This software is licensed under the terms of the Apache Licence Version 2.0
27#    which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
28#
29# @Module Description:
30#    This module contains a couple of helpful functions which are
31#    used in different places in flex_extract.
32#
33# @Module Content:
34#    - get_cmdline_args
35#    - clean_up
36#    - my_error
37#    - normal_exit
38#    - product
39#    - silent_remove
40#    - init128
41#    - to_param_id
42#    - get_list_as_string
43#    - make_dir
44#
45#*******************************************************************************
46
47# ------------------------------------------------------------------------------
48# MODULES
49# ------------------------------------------------------------------------------
50import os
51import errno
52import sys
53import glob
54import subprocess
55import traceback
56import exceptions
57from datetime import datetime
58from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
59
60# ------------------------------------------------------------------------------
61# FUNCTIONS
62# ------------------------------------------------------------------------------
63
def none_or_str(value):
    '''Maps the word "None" to pythons None-type and leaves every other
    string untouched.

    The function is used as "type" converter for the command line
    options in get_cmdline_args.

    Parameters
    ----------
    value : :obj:`string`
        String which is compared with the word "None".

    Return
    ------
    None or value:
        The python type None if the input was exactly "None" (case
        sensitive), otherwise the unchanged input string.
    '''
    return None if value == 'None' else value
82
def none_or_int(value):
    '''Maps the word "None" to pythons None-type and converts every other
    string into an integer.

    The function is used as "type" converter for the command line
    options in get_cmdline_args.

    Parameters
    ----------
    value : :obj:`string`
        String which is compared with the word "None".

    Return
    ------
    None or int(value):
        The python type None if the input was exactly "None" (case
        sensitive), otherwise the result of int(value). A string which
        is not a valid integer makes int() raise a ValueError.
    '''
    return None if value == 'None' else int(value)
102
def get_cmdline_args():
    '''Parses the command line of the calling script/program.

    All options are optional. An option which is not mentioned on the
    command line gets its default value (None for all options except
    "controlfile" and "job_template"). The word "None" given as an option
    value is translated into the python type None by the converters
    none_or_str and none_or_int.

    Parameters
    ----------

    Return
    ------
    args : :obj:`Namespace`
        Contains the commandline arguments from script/program call.
    '''

    # NOTE: some texts below use a backslash line continuation inside the
    # string literal. The leading blanks of the continued lines are part of
    # the string and are therefore kept exactly as they are.
    parser = ArgumentParser(
        formatter_class=ArgumentDefaultsHelpFormatter,
        description='Retrieve FLEXPART input from \
                                ECMWF MARS archive')

    def add_option(name, converter, help_text, default=None):
        # the destination attribute always carries the name of the option
        parser.add_argument('--' + name, dest=name, type=converter,
                            default=default, help=help_text)

    # control parameters that override control file values
    add_option('start_date', none_or_str, "start date YYYYMMDD")
    add_option('end_date', none_or_str, "end_date YYYYMMDD")
    add_option('date_chunk', none_or_int,
               "# of days to be retrieved at once")
    add_option('controlfile', none_or_str,
               "file with CONTROL parameters", default='CONTROL.temp')
    add_option('basetime', none_or_int,
               "base such as 00 or 12 (for half day retrievals)")
    add_option('step', none_or_str, "steps such as 00/to/48")
    add_option('levelist', none_or_str,
               "Vertical levels to be retrieved, e.g. 30/to/60")
    add_option('area', none_or_str,
               "area defined as north/west/south/east")

    # some switches
    add_option('debug', none_or_int,
               "debug mode - leave temporary files intact")
    add_option('request', none_or_int,
               "list all mars requests in file mars_requests.dat")
    add_option('public', none_or_int,
               "public mode - retrieves the public datasets")
    add_option('rrint', none_or_int,
               "select old or new precipitation interpolation \
                        0 - old method\
                        1 - new method (additional subgrid points)")

    # set directories
    add_option('inputdir', none_or_str,
               "root directory for storing intermediate files")
    add_option('outputdir', none_or_str,
               "root directory for storing output files")
    add_option('flexpartdir', none_or_str,
               "FLEXPART root directory (to find grib2flexpart \
                        and COMMAND file)\n Normally flex_extract resides in \
                        the scripts directory of the FLEXPART distribution")

    # this is only used by prepare_flexpart.py to rerun a postprocessing step
    add_option('ppid', none_or_str,
               "specify parent process id for \
                        rerun of prepare_flexpart")

    # arguments for job submission to ECMWF, only needed by submit.py
    add_option('job_template', none_or_str,
               "job template file for submission to ECMWF",
               default="job.temp")
    add_option('queue', none_or_str,
               "queue for submission to ECMWF \
                        (e.g. ecgate or cca )")

    return parser.parse_args()
193
def read_ecenv(filepath):
    '''Reads the file into a dictionary where the key values are the parameter
    names.

    Each non-empty line of the file is split at white space. The first
    token is used as the key and the second token as the value; any
    further tokens on the line are ignored. Empty lines are skipped.

    Parameters
    ----------
    filepath : :obj:`string`
        Path to file where the ECMWF environment parameters are stored.

    Return
    ------
    envs : :obj:`dictionary`
        Contains the environment parameter ecuid, ecgid, gateway
        and destination for ECMWF server environments.

    Raises
    ------
    SystemExit
        If the file could not be opened or read.
    IndexError
        If a non-empty line consists of a single token only.
    '''
    envs = {}
    try:
        with open(filepath, 'r') as f:
            for line in f:
                data = line.split()
                if not data:
                    # blank line (e.g. at the end of the file) - no entry
                    continue
                envs[str(data[0])] = str(data[1])
    # Python 2 raises IOError for a failing open(); in Python 3 IOError is
    # just an alias of OSError, so the tuple covers both interpreters.
    except (IOError, OSError) as e:
        print('... ERROR CODE: ' + str(e.errno))
        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))

        sys.exit('\n... Error occured while trying to read ECMWF_ENV '
                 'file: ' + str(filepath))

    return envs
223
def clean_up(c):
    '''Removes files from the intermediate directory (inputdir).

    Every entry directly below "c.inputdir" whose path does not contain
    "c.prefix" is removed. If the ECMWF Api is not used ("c.ecapi" is
    False) and "c.ectrans" or "c.ecstorage" equals 1, the entries which
    contain the prefix are removed as well.

    Parameters
    ----------
    c : :obj:`ControlFile`
        Contains all the parameters of CONTROL file and
        command line. The attributes inputdir, prefix, ecapi, ectrans
        and ecstorage are read.

    Return
    ------

    '''

    print("... clean inputdir!")

    candidates = glob.glob(os.path.join(c.inputdir, "*"))

    if not candidates:
        print("... nothing to clean!")
        return

    for path in candidates:
        # everything without the prefix is an intermediate file
        if c.prefix not in path:
            silent_remove(path)
        # removes the entry regardless of the prefix; a second removal of
        # the same path is harmless because silent_remove ignores
        # missing files
        if c.ecapi is False and (c.ectrans == 1 or c.ecstorage == 1):
            silent_remove(path)
    print("... done!")

    return
256
257
def my_error(users, message='ERROR'):
    '''Reports an error by print and mail and terminates the program.

    The current call stack is appended to the message. The resulting
    text is printed, mailed to all users with subject mode "ERROR" via
    send_mail and finally the program exits with status 1.

    Parameters
    ----------
    users : :obj:`list` of :obj:`string`
        Contains all email addresses which should be notified.
        It might also contain just the ecmwf user name which wil trigger
        mailing to the associated email address for this user.

    message : :obj:`string`, optional
        Error message. Default value is "ERROR".

    Return
    ------
    This function never returns, it always ends with sys.exit(1).
    '''

    stack_text = '\n'.join(traceback.format_stack())
    report = message + '\n\n' + stack_text

    print(report)

    send_mail(users, 'ERROR', report)

    sys.exit(1)
287
288
def send_mail(users, success_mode, message):
    '''Sends the message by email to each of the listed users.

    The mail is handed over to the command line program "mail". The
    subject line is "flex_extract_v7.1 <success_mode>" and the mail body
    is the message followed by two line breaks.

    Parameters
    ----------
    users : :obj:`list` of :obj:`string`
        Contains all email addresses which should be notified.
        It might also contain just the ecmwf user name which wil trigger
        mailing to the associated email address for this user.
        An entry containing "${USER}" is replaced as a whole by the
        content of the environment variable USER. Other environment
        variables inside an entry are expanded.

    success_mode : :obj:`string`
        States the exit mode of the program to put into
        the mail subject line.

    message : :obj:`string`
        The text of the mail.

    Return
    ------

    Raises
    ------
    SystemExit
        If the "mail" program could not be started.
    '''

    # option and subject are passed as two separate arguments; glued into a
    # single argument the subject would start with a stray blank
    subject = 'flex_extract_v7.1 ' + success_mode

    for user in users:
        if '${USER}' in user:
            user = os.getenv('USER')
            if not user:
                # without this check expandvars(None) raises a TypeError
                print('... WARNING: environment variable USER is not set, '
                      'no email sent for this entry!')
                continue
        recipient = os.path.expandvars(user)
        try:
            # universal_newlines switches the pipes to text mode so that the
            # string message is accepted by Python 2 and Python 3 alike
            p = subprocess.Popen(['mail', '-s', subject, recipient],
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 bufsize=1,
                                 universal_newlines=True)
            p.communicate(input=message + '\n\n')
        except ValueError as e:
            print('... ERROR: ' + str(e))
            sys.exit('... Email could not be sent!')
        except OSError as e:
            print('... ERROR CODE: ' + str(e.errno))
            print('... ERROR MESSAGE:\n \t ' + str(e.strerror))
            sys.exit('... Email could not be sent!')
        else:
            if p.returncode != 0:
                # best effort: report the failure, but do not stop
                print('... WARNING: mail command returned error code ' +
                      str(p.returncode) + ' for ' + recipient)
            else:
                print('Email sent to ' + recipient)

    return
333
334
def normal_exit(message='Done!'):
    '''Prints the given message.

    Despite its name the function does not terminate the program,
    it only writes the text to standard output.

    Parameters
    ----------
    message : :obj:`string`, optional
        Message for exiting program. Default value is "Done!".
        The value is converted with str() before printing.

    Return
    ------

    '''

    text = str(message)
    print(text)
351
352
def product(*args, **kwds):
    r'''Creates combinations of all passed arguments.

    This method combines the single elements of the passed arguments
    with each other. So that each element of each argument value
    will be combined with each element of the other arguments as a tuple.
    The last argument varies fastest.

    Note
    ----
    This method is taken from an example at the ECMWF wiki website.
    https://software.ecmwf.int/wiki/display/GRIB/index.py; 2018-03-16

    Example
    -------
    product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy

    product(range(2), repeat = 3) --> 000 001 010 011 100 101 110 111

    Parameters
    ----------
    \*args : :obj:`list` or :obj:`string`
        Positional arguments (arbitrary number). Each of them has to
        be iterable.

    \*\*kwds : :obj:`dictionary`
        Only the keyword "repeat" is evaluated. It states how often the
        sequence of positional arguments is repeated. Default is 1.

    Return
    ------
    prod : :obj:`tuple`
        Return will be done with "yield". A tuple of combined arguments.
        See example in description above.

    Raises
    ------
    SystemExit
        If an argument is not iterable or "repeat" is not an integer.
        Since this is a generator, the exit happens at the first
        iteration step and not at the call itself.
    '''
    try:
        # a real list is needed here: in Python 3 map() returns an iterator
        # which can not be multiplied with the repeat factor
        pools = [tuple(arg) for arg in args] * kwds.get('repeat', 1)
    except TypeError:
        sys.exit('... PRODUCT GENERATION FAILED!')

    result = [[]]
    for pool in pools:
        result = [x + [y] for x in result for y in pool]
    for prod in result:
        yield tuple(prod)

    return
396
397
def silent_remove(filename):
    '''Deletes a file and stays quiet if it is not there.

    Any other problem during the removal (e.g. missing permission) is
    passed on to the caller.

    Parameters
    ----------
    filename : :obj:`string`
        The name of the file to be removed without notification.

    Return
    ------

    '''
    try:
        os.remove(filename)
    except OSError as err:
        # errno.ENOENT  =  no such file or directory; this is the only
        # error which is swallowed
        if err.errno != errno.ENOENT:
            raise
421
422
def init128(filepath):
    '''Opens and reads the grib file with table 128 information.

    The file is read line by line. Lines starting with "!" are treated
    as comments; they and empty lines are skipped. From every other line
    the first three characters are taken as the key and the characters
    at positions 59 to 63 (zero based, stripped of blanks) as the value.

    Parameters
    ----------
    filepath : :obj:`string`
        Path to file of ECMWF grib table number 128.

    Return
    ------
    table128 : :obj:`dictionary`
        Contains the ECMWF grib table 128 information.
        The key is the parameter number and the value is the
        short name of the parameter. Both are strings.

    Raises
    ------
    SystemExit
        If the file could not be opened or read.
    '''
    table128 = dict()
    try:
        with open(filepath) as f:
            fdata = f.read().split('\n')
    # Python 2 raises IOError for a failing open(); in Python 3 IOError is
    # just an alias of OSError, so the tuple covers both interpreters.
    except (IOError, OSError) as e:
        print('... ERROR CODE: ' + str(e.errno))
        print('... ERROR MESSAGE:\n \t ' + str(e.strerror))

        sys.exit('\n... Error occured while trying to read parameter '
                 'table file: ' + str(filepath))
    else:
        for data in fdata:
            # split('\n') leaves an empty string behind a trailing line
            # break; indexing it with data[0] would raise an IndexError
            if not data.strip() or data[0] == '!':
                continue
            table128[data[0:3]] = data[59:64].strip()

    return table128
454
455
def to_param_id(pars, table):
    '''Transform parameter names to parameter ids with ECMWF grib table 128.

    The input is converted to upper case and split at "/". Each part is
    compared with the keys (parameter numbers) and the values (short
    names) of the table. For the first matching table entry the key is
    stored as an integer. A warning is printed for parts without a match
    and they are left out of the result.

    Parameters
    ----------
    pars : :obj:`string`
        Addpar argument from CONTROL file in the format of
        parameter names instead of ids. The parameter short
        names are sepearted with "/" and they are passed as
        one single string. Other types are converted with str().

    table : :obj:`dictionary`
        Contains the ECMWF grib table 128 information.
        The key is the parameter number and the value is the
        short name of the parameter.

    Return
    ------
    ipar : :obj:`list` of :obj:`integer`
        List of addpar parameters from CONTROL file transformed to
        parameter ids in the format of integer. An empty list if
        pars is empty or None.
    '''
    if not pars:
        return []
    if not isinstance(pars, str):
        pars = str(pars)

    cpar = pars.upper().split('/')
    ipar = []
    for par in cpar:
        # items() exists in Python 2 and 3, iteritems() only in Python 2
        for k, v in table.items():
            if par == k or par == v:
                ipar.append(int(k))
                break
        else:
            # the inner loop ended without a break, i.e. without a match
            print('Warning: par ' + par + ' not found in table 128')

    return ipar
494
def get_list_as_string(list_obj, concatenate_sign=', '):
    '''Joins the elements of a list to a single string.

    Each element is converted with str() before joining. An input which
    is not a list is converted to a list first, so any iterable works.

    Parameters
    ----------
    list_obj : :obj:`list`
        A list with arbitrary content.

    concatenate_sign : :obj:`string`, optional
        A string which is used to concatenate the single
        list elements. Default value is ", ".

    Return
    ------
    str_of_list : :obj:`string`
        The content of the list as a single string.
    '''

    items = list_obj if isinstance(list_obj, list) else list(list_obj)

    return concatenate_sign.join(map(str, items))
518
def make_dir(directory):
    '''Creates a directory including all missing parent directories.

    If the directory exists already, only a warning is printed.
    Every other problem is passed on to the caller.

    Parameters
    ----------
    directory : :obj:`string`
        The path to directory which should be created.

    Return
    ------

    '''
    try:
        os.makedirs(directory)
    except OSError as err:
        # errno.EEXIST = directory already exists; everything else is
        # a real problem and is re-raised
        if err.errno != errno.EEXIST:
            raise
        print('WARNING: Directory {0} already exists!'.format(directory))
544
def put_file_to_ecserver(ecd, filename, target, ecuid, ecgid):
    '''Sends a file to the ECMWF servers with the ecaccess-file-put command.

    The local file "<ecd>/<filename>" is copied to
    "<target>:/home/ms/<ecgid>/<ecuid>/<filename>".

    Note
    ----
    The whole error handling is done in here. If the command fails or
    is not available, the error is printed and the program exits.

    Parameters
    ----------
    ecd : :obj:`string`
        The path were the file is stored.

    filename : :obj:`string`
        The name of the file to send to the ECMWF server.

    target : :obj:`string`
        The target queue where the file should be sent to.

    ecuid : :obj:`string`
        The user id on ECMWF server.

    ecgid : :obj:`string`
        The group id on ECMWF server.

    Return
    ------

    '''

    local_path = ecd + '/' + filename
    remote_path = (target + ':/home/ms/' + ecgid + '/' + ecuid +
                   '/' + filename)

    try:
        subprocess.check_output(['ecaccess-file-put', local_path, remote_path],
                                stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as err:
        # the command was started but returned a non-zero exit status
        code, detail = err.returncode, err
        hint = '\n... Do you have a valid ecaccess certification key?'
    except OSError as err:
        # the command could not be started at all
        code, detail = err.errno, err.strerror
        hint = '\n... Most likely the ECACCESS library is not available!'
    else:
        return

    print('... ERROR CODE: ' + str(code))
    print('... ERROR MESSAGE:\n \t ' + str(detail))

    print(hint)
    sys.exit('... ECACCESS-FILE-PUT FAILED!')
597
def submit_job_to_ecserver(target, jobname):
    '''Submits a job to the ECMWF server with the ecaccess-job-submit command.

    Note
    ----
    The whole error handling is done in here. If the command fails or
    is not available, the error is printed and the program exits.

    Parameters
    ----------
    target : :obj:`string`
        The target where the file should be sent to, e.g. the queue.

    jobname : :obj:`string`
        The name of the jobfile to be submitted to the ECMWF server.

    Return
    ------
    job_id :
        The unmodified output of the ecaccess-job-submit command as it
        is delivered by subprocess.check_output. It serves as the
        reference of the job at the ecmwf server.
    '''

    command = ['ecaccess-job-submit', '-queueName', target, jobname]

    try:
        job_id = subprocess.check_output(command)
    except subprocess.CalledProcessError as err:
        # the command was started but returned a non-zero exit status
        code, detail = err.returncode, err
        hint = '\n... Do you have a valid ecaccess certification key?'
    except OSError as err:
        # the command could not be started at all
        code, detail = err.errno, err.strerror
        hint = '\n... Most likely the ECACCESS library is not available!'
    else:
        return job_id

    print('... ERROR CODE: ' + str(code))
    print('... ERROR MESSAGE:\n \t ' + str(detail))

    print(hint)
    sys.exit('... ECACCESS-JOB-SUBMIT FAILED!')
639
640
def get_informations(filename):
    '''Gets basic information from an example grib file.

    These information are important for later use and the
    initialization of numpy arrays for data storing.
    Only the first grib message of the file is evaluated.

    Parameters
    ----------
    filename : :obj:`string`
            Name of the file which will be opened to extract basic information.

    Return
    ------
    data : :obj:`dictionary`
        Contains basic informations of the ECMWF grib files, e.g.
        'Ni', 'Nj', 'latitudeOfFirstGridPointInDegrees',
        'longitudeOfFirstGridPointInDegrees', 'latitudeOfLastGridPointInDegrees',
        'longitudeOfLastGridPointInDegrees', 'jDirectionIncrementInDegrees',
        'iDirectionIncrementInDegrees', 'missingValue'
    '''
    # imported locally so that the module can be loaded without eccodes
    from eccodes import codes_grib_new_from_file, codes_get, codes_release

    # information needed from grib message
    keys = [
            'Ni',
            'Nj',
            'latitudeOfFirstGridPointInDegrees',
            'longitudeOfFirstGridPointInDegrees',
            'latitudeOfLastGridPointInDegrees',
            'longitudeOfLastGridPointInDegrees',
            'jDirectionIncrementInDegrees',
            'iDirectionIncrementInDegrees',
            'missingValue',
           ]

    data = {}

    # --- open file ---
    print("Opening file for getting information data --- %s" % filename)
    # grib files are binary data, therefore the file is opened in binary
    # mode; a text mode handle would decode the content under Python 3
    with open(filename, 'rb') as f:
        # load first message from file
        gid = codes_grib_new_from_file(f)

        try:
            print('\nInformations are: ')
            for key in keys:
                # Get the value of the key in a grib message.
                data[key] = codes_get(gid, key)
                print("%s = %s" % (key, data[key]))
        finally:
            # Free the memory for the message referred as gribid, also if
            # reading one of the keys failed.
            # NOTE(review): presumably gid is None for a file without any
            # message - confirm with the eccodes documentation
            if gid is not None:
                codes_release(gid)

    return data
694
695
def get_dimensions(info, purefc, dtime, index_vals, start_date, end_date):
    '''This function specifies the correct dimensions for x, y and t.

    Without pure forecast mode the time dimension is the number of days
    from start_date to end_date (both included) times the number of
    time steps per day (24 / dtime). In pure forecast mode it is the
    product of the numbers of dates, times and steps in index_vals.

    Parameters
    ----------
    info : :obj:`dictionary`
        Contains basic informations of the ECMWF grib files, e.g.
        'Ni', 'Nj', 'latitudeOfFirstGridPointInDegrees',
        'longitudeOfFirstGridPointInDegrees', 'latitudeOfLastGridPointInDegrees',
        'longitudeOfLastGridPointInDegrees', 'jDirectionIncrementInDegrees',
        'iDirectionIncrementInDegrees', 'missingValue'
        Only 'Ni' and 'Nj' are used here.

    purefc : :obj:`integer`
        Switch for definition of pure forecast mode or not.

    dtime : :obj:`string`
        Time step in hours.

    index_vals : :obj:`list`
        Contains the values from the keys used for a distinct selection
        of grib messages in processing  the grib files.
        Content looks like e.g.:
        index_vals[0]: ('20171106', '20171107', '20171108') ; date
        index_vals[1]: ('0', '1200', '1800', '600') ; time
        index_vals[2]: ('0', '12', '3', '6', '9') ; stepRange
        It is only evaluated in pure forecast mode.

    start_date : :obj:`datetime`
        The start date of the retrieval job. It is subtracted from
        end_date, therefore it has to be a date/datetime object and
        not a string.

    end_date : :obj:`datetime`
        The end date of the retrieval job. Same type as start_date.

    Return
    ------
    (ix, jy, it) : :obj:`tuple` of :obj:`integer`
        Dimension in x-direction, y-direction and in time.
    '''

    ix = info['Ni']

    jy = info['Nj']

    if not purefc:
        n_days = (end_date - start_date).days + 1
        # floor division keeps the dimension an integer in Python 3 too,
        # where "/" would deliver a float
        it = n_days * 24 // int(dtime)
    else:
        # #no of step * #no of times * #no of days
        it = len(index_vals[2]) * len(index_vals[1]) * len(index_vals[0])

    return (ix, jy, it)
745
746
def execute_subprocess(cmd_list, error_msg='SUBPROCESS FAILED!'):
    '''Runs a command line instruction in a subprocess and waits for it.

    If the command returns a non-zero exit status or can not be started,
    the error code and error message are printed and the program exits
    with the text "... <error_msg>".

    Parameters
    ----------
    cmd_list : :obj:`list` of `:obj:`string`
        A list of the components for the command line execution. Each
        list entry is a single part of the command which is seperated from
        the rest by a blank space.
        E.g. ['mv', file1, file2]

    error_msg : :obj:`string`, optional
        The possible error message if the subprocess failed.
        By default it will just tell "SUBPROCESS FAILED!".

    Return
    ------

    '''

    try:
        subprocess.check_call(cmd_list)
    except subprocess.CalledProcessError as err:
        # the command was started but returned a non-zero exit status
        code, detail = err.returncode, err
    except OSError as err:
        # the command could not be started at all
        code, detail = err.errno, err.strerror
    else:
        return

    print('... ERROR CODE: ' + str(code))
    print('... ERROR MESSAGE:\n \t ' + str(detail))

    sys.exit('... ' + error_msg)
Note: See TracBrowser for help on using the repository browser.
hosted by ZAMG