#!/usr/bin/python import os import glob def main(cmdArgs): import ast # print (cmdArgs) # This is handy step to break the json returned from amqpfind into a # dictionary # Here is an example output from amqpfind # geo.goes.g16.abi.adde.sdi.ncdf.band.end: '{"title": "ABI L1b Radiances", "mode": "3", "start_time": "2017-05-01 17:47:26.8", "message_type": "band", "adde_dataset": "GOES16/M1", "band": 3, "satellite_location": "GOES-Test", "server_type": "sdi", "path": "/complete/goes/grb/goes16/2017/2017_05_01_121/abi/L1b/RadM1/OR_ABI-L1b-RadM1-M3C03_G16_s20171211747268_e20171211747326_c20171211747368.nc", "instrument": "ABI", "satellite_family": "GOES", "status": "end", "end_time": "2017-05-01 17:47:32.6", "coverage": "Mesoscale-1", "create_time": "2017-05-01 17:47:36.8", "signal_type": "grb", "medium": "adde", "satellite_ID": "G16", "server_ip": "sdigrbbeta.ssec.wisc.edu"}' eventInfo = ast.literal_eval(cmdArgs[2]) print (eventInfo) if 'start' in eventInfo['status'] : return elif ('end' in eventInfo['status']) and (eventInfo['band'] <=3) : print ('----------------begin data copy------------------------') copyReturn = run_copy(eventInfo) if copyReturn['status'] == 0: print ('-------------copy data complete-------------------') print (' ') return else: print ('-------------copy data Failed-------------------') return elif ('complete' in eventInfo['status']): print ('--------------begin processing--------------------') processReturn = run_process(eventInfo) print (' return from processing ', processReturn['status']) if processReturn['status'] == 0: print ('--------------processing complete-------------') print (' ') print ('-----------send RabbitMQ Message------------------------') publishReturn = publish_message(eventInfo) if publishReturn['status'] == 0: print ('--------------Publish complete-------------') print (' ') return else: print('Failed processing') print ('--------------end processing------------------------') print (' ') return def run_copy(copyInfo): import shutil import glob nfsPath = os.environ['NFS_ROOT'] + copyInfo['path'] destinationDir = os.environ['LOCAL_TEMP_DIR'] if not os.path.isdir(os.environ['LOCAL_TEMP_DIR']): try: os.makedirs(os.environ['LOCAL_TEMP_DIR']) copyInfo['status'] = 0 except Exception as e: print ('Failed to make temporary directory' + os.environ['LOCAL_TEMP_DIR']) print (str(e)) copyInfo['status'] = -1 return copyInfo print (copyInfo['path']) print (type(copyInfo['path'])) file = copyInfo['path'].replace('data','satbuf1_data') destinationFile = os.path.join(destinationDir,os.path.basename(file)) try: shutil.copyfile(file,destinationFile) copyInfo['status'] = 0 except Exception as e: print (str(e)) copyInfo['status'] = -1 return copyInfo copyInfo['status'] = 0 copyInfo['path'] = os.path.join(destinationDir,os.path.basename(copyInfo['path'])) return copyInfo def run_process(processInfo): import subprocess import datetime # rgbCmd = 'ls' rgbCmd = ['/home/kbah/code/RGB/realtime/subpro/sh/call_idl.sh'] productDir = os.environ['PRODUCT_DIR'] # create the correct directory structure tempDateTime = datetime.datetime.strptime(processInfo['start_time'], '%Y-%m-%d %H:%M:%S.%f') dateDir = tempDateTime.strftime('%Y_%m_%d_%j') tempDir = os.path.join(productDir,dateDir) if 'Meso' in processInfo['coverage'] : coverageDir = processInfo['coverage'].upper() elif 'Full' in processInfo['coverage'] : coverageDir = 'FD' else: coverageDir = processInfo['coverage'].upper() # Need to split apart the file name dataFile = os.path.basename(processInfo['path']) fileParts = dataFile.split('_') del fileParts[3:4] fileParts.append(tempDateTime.strftime('s%Y%j%H%M%S*.dat')) fileMask = '_'.join(fileParts) tempDir = os.path.join(tempDir,coverageDir) processInfo['path'] = os.path.join(tempDir,fileMask) try: subprocess.check_call(rgbCmd,shell=True) print ('finished IDL processing') processInfo['status'] = 0 return processInfo except: processInfo['status'] = -1 print ('IDL processing failed') return(processInfo) def publish_message(messageInfo): import json import os import traceback as tb # # open a rabbitmq connection to mq1.ssec.wisc.edu # connectionProperties = open_rabbitmq_connection() if not connectionProperties: print ('unable to open ', os.environ['RABBITMQ_CONFIG']) processInfo['status'] = -1 return (processInfo) # # Create payload from dictionary # payload = json.dumps(messageInfo) # # publish actual message # # Hard code some values for now # satellite_type = 'rgb-test' satellite_family = messageInfo['satellite_family'].lower() satellite_ID = messageInfo['satellite_ID'].lower() satellite_instrument = messageInfo['instrument'].lower() medium = 'nfs' server_type = 'cluster' format = 'bin' classification = 'rgb' status = 'complete' keys = (satellite_type, satellite_family, satellite_ID, satellite_instrument, medium, server_type, format, classification, status) routing_key = '.'.join(keys) channel = connectionProperties['channel'] try: channel.basic_publish(exchange=connectionProperties['exchange'], routing_key=routing_key, body=payload, properties=connectionProperties['properties'] ) messageInfo['status'] = 0 except: print ('failed to publish message: ') print (routing_key) print (payload) tb.print_exc() print (' ') messageInfo['status'] = -1 return (messageInfo) def open_rabbitmq_connection(): import os import ast import pika # # open connection to rabbitmq # rabbitmqConfigFile = os.environ['RABBITMQ_CONFIG'] try: fileID = open(rabbitmqConfigFile,'r') except: return (False) rabbitmqParam={} for lines in fileID: lines = lines.rstrip() param = lines.split(' = ') rabbitmqParam[param[0]] = ast.literal_eval(param[1]) credentials = pika.PlainCredentials(rabbitmqParam['rabbitmq_user'], rabbitmqParam['rabbitmq_pass']) conn_params = pika.ConnectionParameters(host=rabbitmqParam['rabbitmq_server'], credentials=credentials) connection = pika.BlockingConnection(conn_params) channel = connection.channel() properties=pika.BasicProperties(delivery_mode = 2) publishProperties = {} publishProperties = dict( exchange = rabbitmqParam['rabbitmq_exchange'], properties = properties, channel = channel ) return publishProperties if __name__ == '__main__': import sys main(sys.argv)