Changeset 27fe969 in flex_extract.git for source/python/classes/EcFlexpart.py
- Timestamp:
- Oct 3, 2018, 7:55:24 AM (6 years ago)
- Branches:
- master, ctbto, dev
- Children:
- ca867de
- Parents:
- 65748f4
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
source/python/classes/EcFlexpart.py
r65748f4 r27fe969 37 37 # - removed "dead code" , e.g. retrieval of Q since it is not needed 38 38 # - removed "times" parameter from retrieve-method since it is not used 39 # - seperated function "retrieve" into smaller functions (less code 40 # duplication, easier testing) 39 41 # 40 42 # @License: … … 110 112 111 113 c: instance of class ControlFile 112 Contains all the parameters of CONTROL file, which are e.g.: 113 DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, 114 STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, 115 LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, 116 OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, 117 ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, 118 MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME 119 DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS 120 114 Contains all the parameters of CONTROL file and 115 command line. 121 116 For more information about format and content of the parameter 122 117 see documentation. … … 297 292 298 293 294 def _mk_targetname(self, ftype, param, date): 295 ''' 296 @Description: 297 Creates the filename for the requested grib data to be stored in. 298 This name is passed as the "target" parameter in the request. 299 300 @Input: 301 ftype: string 302 Shortcut name of the type of the field. E.g. AN, FC, PF, ... 303 304 param: string 305 Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML, 306 GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL 307 308 date: string 309 The date period of the grib data to be stored in this file. 310 311 @Return: 312 targetname: string 313 The target filename for the grib data. 314 ''' 315 targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' + 316 str(os.getppid()) + '.' + str(os.getpid()) + '.grb') 317 318 return targetname 319 320 299 321 def _start_retrievement(self, request, par_dict): 300 322 ''' … … 359 381 return 360 382 361 362 def _mk_targetname(self, ftype, param, date):363 '''364 @Description:365 Creates the filename for the requested grib data to be stored in.366 This name is passed as the "target" parameter in the request.367 368 @Input:369 ftype: string370 Shortcut name of the type of the field. E.g. AN, FC, PF, ...371 372 param: string373 Shortcut of the grid type. E.g. SH__ML, SH__SL, GG__ML,374 GG__SL, OG__ML, OG__SL, OG_OROLSM_SL, OG_acc_SL375 376 date: string377 The date period of the grib data to be stored in this file.378 379 @Return:380 targetname: string381 The target filename for the grib data.382 '''383 targetname = (self.inputdir + '/' + ftype + param + '.' + date + '.' +384 str(os.getppid()) + '.' + str(os.getpid()) + '.grb')385 386 return targetname387 388 def write_namelist(self, c, filename):389 '''390 @Description:391 Creates a namelist file in the temporary directory and writes392 the following values to it: maxl, maxb, mlevel,393 mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1,394 momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta395 396 @Input:397 self: instance of EcFlexpart398 The current object of the class.399 400 c: instance of class ControlFile401 Contains all the parameters of CONTROL files, which are e.g.:402 DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME,403 STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT,404 LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY,405 OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT,406 ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR,407 MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME408 DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS409 410 For more information about format and content of the parameter411 see documentation.412 413 filename: string414 Name of the namelist file.415 416 @Return:417 <nothing>418 '''419 420 self.inputdir = c.inputdir421 area = np.asarray(self.area.split('/')).astype(float)422 grid = np.asarray(self.grid.split('/')).astype(float)423 424 if area[1] > area[3]:425 area[1] -= 360426 maxl = int((area[3] - area[1]) / grid[1]) + 1427 maxb = int((area[0] - area[2]) / grid[0]) + 1428 429 with open(self.inputdir + '/' + filename, 'w') as f:430 f.write('&NAMGEN\n')431 f.write(',\n '.join(['maxl = ' + str(maxl), 'maxb = ' + str(maxb),432 'mlevel = ' + str(self.level),433 'mlevelist = ' + '"' + str(self.levelist)434 + '"',435 'mnauf = ' + str(self.resol),436 'metapar = ' + '77',437 'rlo0 = ' + str(area[1]),438 'rlo1 = ' + str(area[3]),439 'rla0 = ' + str(area[2]),440 'rla1 = ' + str(area[0]),441 'momega = ' + str(c.omega),442 'momegadiff = ' + str(c.omegadiff),443 'mgauss = ' + str(c.gauss),444 'msmooth = ' + str(c.smooth),445 'meta = ' + str(c.eta),446 'metadiff = ' + str(c.etadiff),447 'mdpdeta = ' + str(c.dpdeta)]))448 449 f.write('\n/\n')450 451 return452 383 453 384 def retrieve(self, server, dates, request, inputdir='.'): … … 656 587 657 588 658 def process_output(self, c):589 def write_namelist(self, c, filename): 659 590 ''' 660 591 @Description: 661 The grib files are postprocessed depending on the selection in 662 CONTROL file. The resulting files are moved to the output 663 directory if its not equla to the input directory. 664 The following modifications might be done if 665 properly switched in CONTROL file: 666 GRIB2 - Conversion to GRIB2 667 ECTRANS - Transfer of files to gateway server 668 ECSTORAGE - Storage at ECMWF server 669 GRIB2FLEXPART - Conversion of GRIB files to FLEXPART binary format 592 Creates a namelist file in the temporary directory and writes 593 the following values to it: maxl, maxb, mlevel, 594 mlevelist, mnauf, metapar, rlo0, rlo1, rla0, rla1, 595 momega, momegadiff, mgauss, msmooth, meta, metadiff, mdpdeta 670 596 671 597 @Input: … … 674 600 675 601 c: instance of class ControlFile 676 Contains all the parameters of CONTROL file, which are e.g.: 677 DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, 678 STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, 679 LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, 680 OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, 681 ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, 682 MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME 683 DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS 684 602 Contains all the parameters of CONTROL file and 603 command line. 685 604 For more information about format and content of the parameter 686 605 see documentation. 687 606 607 filename: string 608 Name of the namelist file. 609 688 610 @Return: 689 611 <nothing> 690 691 ''' 692 693 print('\n\nPostprocessing:\n Format: {}\n'.format(c.format)) 694 695 if not c.ecapi: 696 print('ecstorage: {}\n ecfsdir: {}\n'. 697 format(c.ecstorage, c.ecfsdir)) 698 #if not hasattr(c, 'gateway'): 699 # c.gateway = os.getenv('GATEWAY') 700 #if not hasattr(c, 'destination'): 701 # c.destination = os.getenv('DESTINATION') 702 print('ectrans: {}\n gateway: {}\n destination: {}\n ' 703 .format(c.ectrans, c.gateway, c.destination)) 704 705 print('Output filelist: \n') 706 print(self.outputfilelist) 707 708 if c.format.lower() == 'grib2': 709 for ofile in self.outputfilelist: 710 p = subprocess.check_call(['grib_set', '-s', 'edition=2, \ 711 productDefinitionTemplateNumber=8', 712 ofile, ofile + '_2']) 713 p = subprocess.check_call(['mv', ofile + '_2', ofile]) 714 715 if c.ectrans and not c.ecapi: 716 for ofile in self.outputfilelist: 717 p = subprocess.check_call(['ectrans', '-overwrite', '-gateway', 718 c.gateway, '-remote', c.destination, 719 '-source', ofile]) 720 #print('ectrans:', p) 721 722 if c.ecstorage and not c.ecapi: 723 for ofile in self.outputfilelist: 724 p = subprocess.check_call(['ecp', '-o', ofile, 725 os.path.expandvars(c.ecfsdir)]) 726 727 if c.outputdir != c.inputdir: 728 for ofile in self.outputfilelist: 729 p = subprocess.check_call(['mv', ofile, c.outputdir]) 730 731 # prepare environment for the grib2flexpart run 732 # to convert grib to flexpart binary 733 if c.grib2flexpart: 734 735 # generate AVAILABLE file 736 # Example of AVAILABLE file data: 737 # 20131107 000000 EN13110700 ON DISC 738 clist = [] 739 for ofile in self.outputfilelist: 740 fname = ofile.split('/') 741 if '.' in fname[-1]: 742 l = fname[-1].split('.') 743 timestamp = datetime.strptime(l[0][-6:] + l[1], 744 '%y%m%d%H') 745 timestamp += timedelta(hours=int(l[2])) 746 cdate = datetime.strftime(timestamp, '%Y%m%d') 747 chms = datetime.strftime(timestamp, '%H%M%S') 748 else: 749 cdate = '20' + fname[-1][-8:-2] 750 chms = fname[-1][-2:] + '0000' 751 clist.append(cdate + ' ' + chms + ' '*6 + 752 fname[-1] + ' '*14 + 'ON DISC') 753 clist.sort() 754 with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f: 755 f.write('\n'.join(clist) + '\n') 756 757 # generate pathnames file 758 pwd = os.path.abspath(c.outputdir) 759 with open(pwd + '/pathnames', 'w') as f: 760 f.write(pwd + '/Options/\n') 761 f.write(pwd + '/\n') 762 f.write(pwd + '/\n') 763 f.write(pwd + '/AVAILABLE\n') 764 f.write(' = == = == = == = == = == == = \n') 765 766 # create Options dir if necessary 767 if not os.path.exists(pwd + '/Options'): 768 os.makedirs(pwd+'/Options') 769 770 # read template COMMAND file 771 with open(os.path.expandvars(os.path.expanduser( 772 c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f: 773 lflist = f.read().split('\n') 774 775 # find index of list where to put in the 776 # date and time information 777 # usually after the LDIRECT parameter 778 i = 0 779 for l in lflist: 780 if 'LDIRECT' in l.upper(): 612 ''' 613 614 self.inputdir = c.inputdir 615 area = np.asarray(self.area.split('/')).astype(float) 616 grid = np.asarray(self.grid.split('/')).astype(float) 617 618 if area[1] > area[3]: 619 area[1] -= 360 620 maxl = int((area[3] - area[1]) / grid[1]) + 1 621 maxb = int((area[0] - area[2]) / grid[0]) + 1 622 623 with open(self.inputdir + '/' + filename, 'w') as f: 624 f.write('&NAMGEN\n') 625 f.write(',\n '.join(['maxl = ' + str(maxl), 'maxb = ' + str(maxb), 626 'mlevel = ' + str(self.level), 627 'mlevelist = ' + '"' + str(self.levelist) 628 + '"', 629 'mnauf = ' + str(self.resol), 630 'metapar = ' + '77', 631 'rlo0 = ' + str(area[1]), 632 'rlo1 = ' + str(area[3]), 633 'rla0 = ' + str(area[2]), 634 'rla1 = ' + str(area[0]), 635 'momega = ' + str(c.omega), 636 'momegadiff = ' + str(c.omegadiff), 637 'mgauss = ' + str(c.gauss), 638 'msmooth = ' + str(c.smooth), 639 'meta = ' + str(c.eta), 640 'metadiff = ' + str(c.etadiff), 641 'mdpdeta = ' + str(c.dpdeta)])) 642 643 f.write('\n/\n') 644 645 return 646 647 648 def deacc_fluxes(self, inputfiles, c): 649 ''' 650 @Description: 651 Goes through all flux fields in ordered time and de-accumulate 652 the fields. Afterwards the fields are disaggregated in time. 653 Different versions of disaggregation is provided for rainfall 654 data (darain, modified linear) and the surface fluxes and 655 stress data (dapoly, cubic polynomial). 656 657 @Input: 658 self: instance of EcFlexpart 659 The current object of the class. 660 661 inputfiles: instance of UioFiles 662 Contains a list of files. 663 664 c: instance of class ControlFile 665 Contains all the parameters of CONTROL file and 666 command line. 667 For more information about format and content of the parameter 668 see documentation. 669 670 @Return: 671 <nothing> 672 ''' 673 674 table128 = init128(_config.PATH_GRIBTABLE) 675 pars = to_param_id(self.params['OG_acc_SL'][0], table128) 676 index_keys = ["date", "time", "step"] 677 indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX) 678 silent_remove(indexfile) 679 grib = GribTools(inputfiles.files) 680 # creates new index file 681 iid = grib.index(index_keys=index_keys, index_file=indexfile) 682 683 # read values of index keys 684 index_vals = [] 685 for key in index_keys: 686 key_vals = grib_index_get(iid, key) 687 print(key_vals) 688 # have to sort the steps for disaggregation, 689 # therefore convert to int first 690 if key == 'step': 691 key_vals = [int(k) for k in key_vals] 692 key_vals.sort() 693 key_vals = [str(k) for k in key_vals] 694 index_vals.append(key_vals) 695 # index_vals looks for example like: 696 # index_vals[0]: ('20171106', '20171107', '20171108') ; date 697 # index_vals[1]: ('0', '1200') ; time 698 # index_vals[2]: (3', '6', '9', '12') ; stepRange 699 700 valsdict = {} 701 svalsdict = {} 702 stepsdict = {} 703 for p in pars: 704 valsdict[str(p)] = [] 705 svalsdict[str(p)] = [] 706 stepsdict[str(p)] = [] 707 708 print('maxstep: ', c.maxstep) 709 710 for prod in product(*index_vals): 711 # e.g. prod = ('20170505', '0', '12') 712 # ( date ,time, step) 713 # per date e.g. time = 0, 1200 714 # per time e.g. step = 3, 6, 9, 12 715 print('current prod: ', prod) 716 for i in range(len(index_keys)): 717 grib_index_select(iid, index_keys[i], prod[i]) 718 719 # get first id from current product 720 gid = grib_new_from_index(iid) 721 722 # if there is data for this product combination 723 # prepare some date and time parameter before reading the data 724 if gid is not None: 725 cdate = grib_get(gid, 'date') 726 time = grib_get(gid, 'time') 727 step = grib_get(gid, 'step') 728 # date+time+step-2*dtime 729 # (since interpolated value valid for step-2*dtime) 730 sdate = datetime(year=cdate/10000, 731 month=(cdate % 10000)/100, 732 day=(cdate % 100), 733 hour=time/100) 734 fdate = sdate + timedelta(hours=step-2*int(c.dtime)) 735 sdates = sdate + timedelta(hours=step) 736 elimit = None 737 else: 738 break 739 740 if c.maxstep > 12: 741 fnout = os.path.join(c.inputdir, 'flux' + 742 sdate.strftime('%Y%m%d') + 743 '.{:0>2}'.format(time/100) + 744 '.{:0>3}'.format(step-2*int(c.dtime))) 745 gnout = os.path.join(c.inputdir, 'flux' + 746 sdate.strftime('%Y%m%d') + 747 '.{:0>2}'.format(time/100) + 748 '.{:0>3}'.format(step-int(c.dtime))) 749 hnout = os.path.join(c.inputdir, 'flux' + 750 sdate.strftime('%Y%m%d') + 751 '.{:0>2}'.format(time/100) + 752 '.{:0>3}'.format(step)) 753 else: 754 fnout = os.path.join(c.inputdir, 'flux' + 755 fdate.strftime('%Y%m%d%H')) 756 gnout = os.path.join(c.inputdir, 'flux' + 757 (fdate + timedelta(hours=int(c.dtime))). 758 strftime('%Y%m%d%H')) 759 hnout = os.path.join(c.inputdir, 'flux' + 760 sdates.strftime('%Y%m%d%H')) 761 762 print("outputfile = " + fnout) 763 f_handle = open(fnout, 'w') 764 g_handle = open(gnout, 'w') 765 h_handle = open(hnout, 'w') 766 767 # read message for message and store relevant data fields 768 # data keywords are stored in pars 769 while 1: 770 if gid is None: 781 771 break 782 i += 1 783 784 # insert the date and time information of run start and end 785 # into the list of lines of COMMAND file 786 lflist = lflist[:i+1] + \ 787 [clist[0][:16], clist[-1][:16]] + \ 788 lflist[i+3:] 789 790 # write the new COMMAND file 791 with open(pwd + '/Options/COMMAND', 'w') as g: 792 g.write('\n'.join(lflist) + '\n') 793 794 # change to outputdir and start the grib2flexpart run 795 # afterwards switch back to the working dir 796 os.chdir(c.outputdir) 797 p = subprocess.check_call([ 798 os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts)) 799 + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']) 800 os.chdir(pwd) 772 cparamId = str(grib_get(gid, 'paramId')) 773 step = grib_get(gid, 'step') 774 atime = grib_get(gid, 'time') 775 ni = grib_get(gid, 'Ni') 776 nj = grib_get(gid, 'Nj') 777 if cparamId in valsdict.keys(): 778 values = grib_get_values(gid) 779 vdp = valsdict[cparamId] 780 svdp = svalsdict[cparamId] 781 sd = stepsdict[cparamId] 782 783 if cparamId == '142' or cparamId == '143': 784 fak = 1. / 1000. 785 else: 786 fak = 3600. 787 788 values = (np.reshape(values, (nj, ni))).flatten() / fak 789 vdp.append(values[:]) # save the accumulated values 790 if step <= int(c.dtime): 791 svdp.append(values[:] / int(c.dtime)) 792 else: # deaccumulate values 793 svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime)) 794 795 print(cparamId, atime, step, len(values), 796 values[0], np.std(values)) 797 # save the 1/3-hourly or specific values 798 # svdp.append(values[:]) 799 sd.append(step) 800 # len(svdp) correspond to the time 801 if len(svdp) >= 3: 802 if len(svdp) > 3: 803 if cparamId == '142' or cparamId == '143': 804 values = disaggregation.darain(svdp) 805 else: 806 values = disaggregation.dapoly(svdp) 807 808 if not (step == c.maxstep and c.maxstep > 12 \ 809 or sdates == elimit): 810 vdp.pop(0) 811 svdp.pop(0) 812 else: 813 if c.maxstep > 12: 814 values = svdp[1] 815 else: 816 values = svdp[0] 817 818 grib_set_values(gid, values) 819 if c.maxstep > 12: 820 grib_set(gid, 'step', max(0, step-2*int(c.dtime))) 821 else: 822 grib_set(gid, 'step', 0) 823 grib_set(gid, 'time', fdate.hour*100) 824 grib_set(gid, 'date', fdate.year*10000 + 825 fdate.month*100+fdate.day) 826 grib_write(gid, f_handle) 827 828 if c.basetime: 829 elimit = datetime.strptime(c.end_date + 830 c.basetime, '%Y%m%d%H') 831 else: 832 elimit = sdate + timedelta(2*int(c.dtime)) 833 834 # squeeze out information of last two steps contained 835 # in svdp 836 # if step+int(c.dtime) == c.maxstep and c.maxstep>12 837 # or sdates+timedelta(hours = int(c.dtime)) 838 # >= elimit: 839 # Note that svdp[0] has not been popped in this case 840 841 if step == c.maxstep and c.maxstep > 12 or \ 842 sdates == elimit: 843 844 values = svdp[3] 845 grib_set_values(gid, values) 846 grib_set(gid, 'step', 0) 847 truedatetime = fdate + timedelta(hours= 848 2*int(c.dtime)) 849 grib_set(gid, 'time', truedatetime.hour * 100) 850 grib_set(gid, 'date', truedatetime.year * 10000 + 851 truedatetime.month * 100 + 852 truedatetime.day) 853 grib_write(gid, h_handle) 854 855 #values = (svdp[1]+svdp[2])/2. 856 if cparamId == '142' or cparamId == '143': 857 values = disaggregation.darain(list(reversed(svdp))) 858 else: 859 values = disaggregation.dapoly(list(reversed(svdp))) 860 861 grib_set(gid, 'step', 0) 862 truedatetime = fdate + timedelta(hours=int(c.dtime)) 863 grib_set(gid, 'time', truedatetime.hour * 100) 864 grib_set(gid, 'date', truedatetime.year * 10000 + 865 truedatetime.month * 100 + 866 truedatetime.day) 867 grib_set_values(gid, values) 868 grib_write(gid, g_handle) 869 870 grib_release(gid) 871 872 gid = grib_new_from_index(iid) 873 874 f_handle.close() 875 g_handle.close() 876 h_handle.close() 877 878 grib_index_release(iid) 801 879 802 880 return 881 803 882 804 883 def create(self, inputfiles, c): … … 825 904 826 905 c: instance of class ControlFile 827 Contains all the parameters of CONTROL files, which are e.g.: 828 DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, 829 STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, 830 LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, 831 OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, 832 ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, 833 MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME 834 DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS 835 906 Contains all the parameters of CONTROL file and 907 command line. 836 908 For more information about format and content of the parameter 837 909 see documentation. … … 1060 1132 return 1061 1133 1062 def deacc_fluxes(self, inputfiles, c): 1134 1135 def process_output(self, c): 1063 1136 ''' 1064 1137 @Description: 1065 Goes through all flux fields in ordered time and de-accumulate 1066 the fields. Afterwards the fields are disaggregated in time. 1067 Different versions of disaggregation is provided for rainfall 1068 data (darain, modified linear) and the surface fluxes and 1069 stress data (dapoly, cubic polynomial). 1138 The grib files are postprocessed depending on the selection in 1139 CONTROL file. The resulting files are moved to the output 1140 directory if its not equal to the input directory. 1141 The following modifications might be done if 1142 properly switched in CONTROL file: 1143 GRIB2 - Conversion to GRIB2 1144 ECTRANS - Transfer of files to gateway server 1145 ECSTORAGE - Storage at ECMWF server 1070 1146 1071 1147 @Input: … … 1073 1149 The current object of the class. 1074 1150 1075 inputfiles: instance of UioFiles1076 Contains a list of files.1077 1078 1151 c: instance of class ControlFile 1079 Contains all the parameters of CONTROL file, which are e.g.: 1080 DAY1(start_date), DAY2(end_date), DTIME, MAXSTEP, TYPE, TIME, 1081 STEP, CLASS(marsclass), STREAM, NUMBER, EXPVER, GRID, LEFT, 1082 LOWER, UPPER, RIGHT, LEVEL, LEVELIST, RESOL, GAUSS, ACCURACY, 1083 OMEGA, OMEGADIFF, ETA, ETADIFF, DPDETA, SMOOTH, FORMAT, 1084 ADDPAR, WRF, CWC, PREFIX, ECSTORAGE, ECTRANS, ECFSDIR, 1085 MAILOPS, MAILFAIL, GRIB2FLEXPART, FLEXPARTDIR, BASETIME 1086 DATE_CHUNK, DEBUG, INPUTDIR, OUTPUTDIR, FLEXPART_ROOT_SCRIPTS 1087 1152 Contains all the parameters of CONTROL file and 1153 command line. 1088 1154 For more information about format and content of the parameter 1089 1155 see documentation. … … 1091 1157 @Return: 1092 1158 <nothing> 1093 ''' 1094 1095 table128 = init128(_config.PATH_GRIBTABLE) 1096 pars = to_param_id(self.params['OG_acc_SL'][0], table128) 1097 index_keys = ["date", "time", "step"] 1098 indexfile = os.path.join(c.inputdir, _config.FILE_GRIB_INDEX) 1099 silent_remove(indexfile) 1100 grib = GribTools(inputfiles.files) 1101 # creates new index file 1102 iid = grib.index(index_keys=index_keys, index_file=indexfile) 1103 1104 # read values of index keys 1105 index_vals = [] 1106 for key in index_keys: 1107 key_vals = grib_index_get(iid, key) 1108 print(key_vals) 1109 # have to sort the steps for disaggregation, 1110 # therefore convert to int first 1111 if key == 'step': 1112 key_vals = [int(k) for k in key_vals] 1113 key_vals.sort() 1114 key_vals = [str(k) for k in key_vals] 1115 index_vals.append(key_vals) 1116 # index_vals looks for example like: 1117 # index_vals[0]: ('20171106', '20171107', '20171108') ; date 1118 # index_vals[1]: ('0', '1200') ; time 1119 # index_vals[2]: (3', '6', '9', '12') ; stepRange 1120 1121 valsdict = {} 1122 svalsdict = {} 1123 stepsdict = {} 1124 for p in pars: 1125 valsdict[str(p)] = [] 1126 svalsdict[str(p)] = [] 1127 stepsdict[str(p)] = [] 1128 1129 print('maxstep: ', c.maxstep) 1130 1131 for prod in product(*index_vals): 1132 # e.g. prod = ('20170505', '0', '12') 1133 # ( date ,time, step) 1134 # per date e.g. time = 0, 1200 1135 # per time e.g. step = 3, 6, 9, 12 1136 print('current prod: ', prod) 1137 for i in range(len(index_keys)): 1138 grib_index_select(iid, index_keys[i], prod[i]) 1139 1140 # get first id from current product 1141 gid = grib_new_from_index(iid) 1142 1143 # if there is data for this product combination 1144 # prepare some date and time parameter before reading the data 1145 if gid is not None: 1146 cdate = grib_get(gid, 'date') 1147 time = grib_get(gid, 'time') 1148 step = grib_get(gid, 'step') 1149 # date+time+step-2*dtime 1150 # (since interpolated value valid for step-2*dtime) 1151 sdate = datetime(year=cdate/10000, 1152 month=(cdate % 10000)/100, 1153 day=(cdate % 100), 1154 hour=time/100) 1155 fdate = sdate + timedelta(hours=step-2*int(c.dtime)) 1156 sdates = sdate + timedelta(hours=step) 1157 elimit = None 1159 1160 ''' 1161 1162 print('\n\nPostprocessing:\n Format: {}\n'.format(c.format)) 1163 1164 if not c.ecapi: 1165 print('ecstorage: {}\n ecfsdir: {}\n'. 1166 format(c.ecstorage, c.ecfsdir)) 1167 print('ectrans: {}\n gateway: {}\n destination: {}\n ' 1168 .format(c.ectrans, c.gateway, c.destination)) 1169 1170 print('Output filelist: \n') 1171 print(self.outputfilelist) 1172 1173 if c.format.lower() == 'grib2': 1174 for ofile in self.outputfilelist: 1175 p = subprocess.check_call(['grib_set', '-s', 'edition=2, \ 1176 productDefinitionTemplateNumber=8', 1177 ofile, ofile + '_2']) 1178 p = subprocess.check_call(['mv', ofile + '_2', ofile]) 1179 1180 if c.ectrans and not c.ecapi: 1181 for ofile in self.outputfilelist: 1182 p = subprocess.check_call(['ectrans', '-overwrite', '-gateway', 1183 c.gateway, '-remote', c.destination, 1184 '-source', ofile]) 1185 1186 if c.ecstorage and not c.ecapi: 1187 for ofile in self.outputfilelist: 1188 p = subprocess.check_call(['ecp', '-o', ofile, 1189 os.path.expandvars(c.ecfsdir)]) 1190 1191 if c.outputdir != c.inputdir: 1192 for ofile in self.outputfilelist: 1193 p = subprocess.check_call(['mv', ofile, c.outputdir]) 1194 1195 return 1196 1197 1198 def prepare_fp_files(self, c): 1199 ''' 1200 @Description: 1201 Conversion of GRIB files to FLEXPART binary format. 1202 1203 @Input: 1204 c: instance of class ControlFile 1205 Contains all the parameters of CONTROL file and 1206 command line. 1207 For more information about format and content of the parameter 1208 see documentation. 1209 1210 1211 @Return: 1212 <nothing> 1213 ''' 1214 # generate AVAILABLE file 1215 # Example of AVAILABLE file data: 1216 # 20131107 000000 EN13110700 ON DISC 1217 clist = [] 1218 for ofile in self.outputfilelist: 1219 fname = ofile.split('/') 1220 if '.' in fname[-1]: 1221 l = fname[-1].split('.') 1222 timestamp = datetime.strptime(l[0][-6:] + l[1], 1223 '%y%m%d%H') 1224 timestamp += timedelta(hours=int(l[2])) 1225 cdate = datetime.strftime(timestamp, '%Y%m%d') 1226 chms = datetime.strftime(timestamp, '%H%M%S') 1158 1227 else: 1228 cdate = '20' + fname[-1][-8:-2] 1229 chms = fname[-1][-2:] + '0000' 1230 clist.append(cdate + ' ' + chms + ' '*6 + 1231 fname[-1] + ' '*14 + 'ON DISC') 1232 clist.sort() 1233 with open(c.outputdir + '/' + 'AVAILABLE', 'w') as f: 1234 f.write('\n'.join(clist) + '\n') 1235 1236 # generate pathnames file 1237 pwd = os.path.abspath(c.outputdir) 1238 with open(pwd + '/pathnames', 'w') as f: 1239 f.write(pwd + '/Options/\n') 1240 f.write(pwd + '/\n') 1241 f.write(pwd + '/\n') 1242 f.write(pwd + '/AVAILABLE\n') 1243 f.write(' = == = == = == = == = == == = \n') 1244 1245 # create Options dir if necessary 1246 if not os.path.exists(pwd + '/Options'): 1247 os.makedirs(pwd+'/Options') 1248 1249 # read template COMMAND file 1250 with open(os.path.expandvars(os.path.expanduser( 1251 c.flexpart_root_scripts)) + '/../Options/COMMAND', 'r') as f: 1252 lflist = f.read().split('\n') 1253 1254 # find index of list where to put in the 1255 # date and time information 1256 # usually after the LDIRECT parameter 1257 i = 0 1258 for l in lflist: 1259 if 'LDIRECT' in l.upper(): 1159 1260 break 1160 1161 if c.maxstep > 12: 1162 fnout = os.path.join(c.inputdir, 'flux' + 1163 sdate.strftime('%Y%m%d') + 1164 '.{:0>2}'.format(time/100) + 1165 '.{:0>3}'.format(step-2*int(c.dtime))) 1166 gnout = os.path.join(c.inputdir, 'flux' + 1167 sdate.strftime('%Y%m%d') + 1168 '.{:0>2}'.format(time/100) + 1169 '.{:0>3}'.format(step-int(c.dtime))) 1170 hnout = os.path.join(c.inputdir, 'flux' + 1171 sdate.strftime('%Y%m%d') + 1172 '.{:0>2}'.format(time/100) + 1173 '.{:0>3}'.format(step)) 1174 else: 1175 fnout = os.path.join(c.inputdir, 'flux' + 1176 fdate.strftime('%Y%m%d%H')) 1177 gnout = os.path.join(c.inputdir, 'flux' + 1178 (fdate + timedelta(hours=int(c.dtime))). 1179 strftime('%Y%m%d%H')) 1180 hnout = os.path.join(c.inputdir, 'flux' + 1181 sdates.strftime('%Y%m%d%H')) 1182 1183 print("outputfile = " + fnout) 1184 f_handle = open(fnout, 'w') 1185 g_handle = open(gnout, 'w') 1186 h_handle = open(hnout, 'w') 1187 1188 # read message for message and store relevant data fields 1189 # data keywords are stored in pars 1190 while 1: 1191 if gid is None: 1192 break 1193 cparamId = str(grib_get(gid, 'paramId')) 1194 step = grib_get(gid, 'step') 1195 atime = grib_get(gid, 'time') 1196 ni = grib_get(gid, 'Ni') 1197 nj = grib_get(gid, 'Nj') 1198 if cparamId in valsdict.keys(): 1199 values = grib_get_values(gid) 1200 vdp = valsdict[cparamId] 1201 svdp = svalsdict[cparamId] 1202 sd = stepsdict[cparamId] 1203 1204 if cparamId == '142' or cparamId == '143': 1205 fak = 1. / 1000. 1206 else: 1207 fak = 3600. 1208 1209 values = (np.reshape(values, (nj, ni))).flatten() / fak 1210 vdp.append(values[:]) # save the accumulated values 1211 if step <= int(c.dtime): 1212 svdp.append(values[:] / int(c.dtime)) 1213 else: # deaccumulate values 1214 svdp.append((vdp[-1] - vdp[-2]) / int(c.dtime)) 1215 1216 print(cparamId, atime, step, len(values), 1217 values[0], np.std(values)) 1218 # save the 1/3-hourly or specific values 1219 # svdp.append(values[:]) 1220 sd.append(step) 1221 # len(svdp) correspond to the time 1222 if len(svdp) >= 3: 1223 if len(svdp) > 3: 1224 if cparamId == '142' or cparamId == '143': 1225 values = disaggregation.darain(svdp) 1226 else: 1227 values = disaggregation.dapoly(svdp) 1228 1229 if not (step == c.maxstep and c.maxstep > 12 \ 1230 or sdates == elimit): 1231 vdp.pop(0) 1232 svdp.pop(0) 1233 else: 1234 if c.maxstep > 12: 1235 values = svdp[1] 1236 else: 1237 values = svdp[0] 1238 1239 grib_set_values(gid, values) 1240 if c.maxstep > 12: 1241 grib_set(gid, 'step', max(0, step-2*int(c.dtime))) 1242 else: 1243 grib_set(gid, 'step', 0) 1244 grib_set(gid, 'time', fdate.hour*100) 1245 grib_set(gid, 'date', fdate.year*10000 + 1246 fdate.month*100+fdate.day) 1247 grib_write(gid, f_handle) 1248 1249 if c.basetime: 1250 elimit = datetime.strptime(c.end_date + 1251 c.basetime, '%Y%m%d%H') 1252 else: 1253 elimit = sdate + timedelta(2*int(c.dtime)) 1254 1255 # squeeze out information of last two steps contained 1256 # in svdp 1257 # if step+int(c.dtime) == c.maxstep and c.maxstep>12 1258 # or sdates+timedelta(hours = int(c.dtime)) 1259 # >= elimit: 1260 # Note that svdp[0] has not been popped in this case 1261 1262 if step == c.maxstep and c.maxstep > 12 or \ 1263 sdates == elimit: 1264 1265 values = svdp[3] 1266 grib_set_values(gid, values) 1267 grib_set(gid, 'step', 0) 1268 truedatetime = fdate + timedelta(hours= 1269 2*int(c.dtime)) 1270 grib_set(gid, 'time', truedatetime.hour * 100) 1271 grib_set(gid, 'date', truedatetime.year * 10000 + 1272 truedatetime.month * 100 + 1273 truedatetime.day) 1274 grib_write(gid, h_handle) 1275 1276 #values = (svdp[1]+svdp[2])/2. 1277 if cparamId == '142' or cparamId == '143': 1278 values = disaggregation.darain(list(reversed(svdp))) 1279 else: 1280 values = disaggregation.dapoly(list(reversed(svdp))) 1281 1282 grib_set(gid, 'step', 0) 1283 truedatetime = fdate + timedelta(hours=int(c.dtime)) 1284 grib_set(gid, 'time', truedatetime.hour * 100) 1285 grib_set(gid, 'date', truedatetime.year * 10000 + 1286 truedatetime.month * 100 + 1287 truedatetime.day) 1288 grib_set_values(gid, values) 1289 grib_write(gid, g_handle) 1290 1291 grib_release(gid) 1292 1293 gid = grib_new_from_index(iid) 1294 1295 f_handle.close() 1296 g_handle.close() 1297 h_handle.close() 1298 1299 grib_index_release(iid) 1261 i += 1 1262 1263 # insert the date and time information of run start and end 1264 # into the list of lines of COMMAND file 1265 lflist = lflist[:i+1] + \ 1266 [clist[0][:16], clist[-1][:16]] + \ 1267 lflist[i+3:] 1268 1269 # write the new COMMAND file 1270 with open(pwd + '/Options/COMMAND', 'w') as g: 1271 g.write('\n'.join(lflist) + '\n') 1272 1273 # change to outputdir and start the grib2flexpart run 1274 # afterwards switch back to the working dir 1275 os.chdir(c.outputdir) 1276 p = subprocess.check_call([ 1277 os.path.expandvars(os.path.expanduser(c.flexpart_root_scripts)) 1278 + '/../FLEXPART_PROGRAM/grib2flexpart', 'useAvailable', '.']) 1279 os.chdir(pwd) 1300 1280 1301 1281 return 1302 1303 1304
Note: See TracChangeset
for help on using the changeset viewer.