#!/usr/bin/python import datetime import copy import sqlite3 import logging import glob import time def main(cmdArgs): # Most of the information to publish messages is done by # by passing around and updating a dictionary # Create a log file if not create_rotating_log(): return False LOG = logging.getLogger(__name__) # # Open your file and start reading in lines # status = main_processing(processFile, cmdArgs[0]) def main_processing(processFile, scriptName): import logging LOG = logging.getLogger(__name__) LOG.debug('------------------------------- ') LOG.debug('Incoming File ' + processFile) databaseDir = os.environ['DATABASE_DIR'] obs_day = datetime.datetime.strptime(eventInfo['start_time'], '%Y-%m-%d %H:%M:%S.%f') obs_day_str = datetime.datetime.strftime(obs_day, '%Y%m%d') + '-event' + '.sqlite' databaseFile = os.path.join(databaseDir, obs_day_str) if not os.path.isfile(databaseFile): try: create_table(databaseFile) except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed creating table' LOG.error(message) return False try: table_insert(databaseUpdateCopy, databaseFile) except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed update database' LOG.error(message) return False def create_table(dbFile): import logging LOG = logging.getLogger(__name__) try: conn = sqlite3.connect(dbFile) except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed connection - ' + dbFile LOG.error(message) return False try: db = conn.cursor() except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed cursor - ' + dbFile LOG.error(message) return False try: db.execute('''CREATE TABLE msgIngest ( satellite TEXT , start_time TEXT , bandsSectors TEXT , band_status TEXT , image_status)''') except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed db.excute - ' + dbFile LOG.error(message) LOG.debug('finished creating the database file') conn.commit() db.close() conn.close() return def table_insert(insertInfo, dbFile): import logging LOG = logging.getLogger(__name__) satellite = insertInfo['satellite_ID'] start_time = insertInfo['start_time'] band = insertInfo['band'] band_sector = insertInfo['band_sector'] try: conn = sqlite3.connect(dbFile) except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed connection - ' + dbFile LOG.error(message) return False try: db = conn.cursor() except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed cursor - ' + dbFile LOG.error(message) return False try: db.execute( 'SELECT satellite, start_time, bandsSectors FROM msgIngest WHERE (satellite=? AND start_time=?)', (satellite, start_time)) except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed check duplicate ' + dbFile LOG.error(message) return False satMatch = db.fetchall() # New entry for image time if len(satMatch) == 0: LOG.debug('New entry') image_status = '' band_status = '' bandsSectors = insertInfo['band_sector'] LOG.debug('before try for inserting') try: message = 'Inserting into database ' + str(satellite) + ' ' + str(band) + ' ' + str(start_time) LOG.debug(message) db.execute( '''INSERT INTO msgIngest VALUES(?,?,?,?,?);''', (satellite, start_time, bandsSectors, band_status, image_status) ) except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed insert ' + dbFile LOG.error(message) try: LOG.debug('publishing for new entry') publish_message(insertInfo) except Exception as e: message = str(e) LOG.error('failed to publish message') LOG.error(message) elif band_sector in satMatch[0][2]: message = 'Already in database ' + str(satellite) + ' ' + str(band) + ' ' + str(start_time) LOG.debug(message) db.close() conn.close() return True else: band_sector_list = satMatch[0][2].split(',') band_sector_list.append(band_sector) bandsSectors = ','.join(band_sector_list) task = (bandsSectors, start_time, satellite) LOG.debug('updating bandsSectors ' + bandsSectors) sql = '''UPDATE msgIngest SET bandsSectors = ? WHERE start_time = ? AND satellite = ?''' try: db.execute(sql, task) conn.commit() except Exception as e: message = 'Error: ' + str(e) LOG.error(message) message = 'failed updating table' LOG.error(message) db.close() conn.close() return False publish_message(insertInfo) conn.commit() db.close() conn.close() return True def create_rotating_log(): import logging.handlers LOG = logging.getLogger(__name__) # # setup logging # outLogFile = os.path.join(os.environ['LOG_DIR'], 'msg-event-processing.log') if outLogFile is None: return logFormat = '%(asctime)s - %(levelname)s - %(message)s' logger = logging.getLogger() logger.setLevel(logging.DEBUG) logging.getLogger('pika').propagate = False handler = logging.handlers.RotatingFileHandler(outLogFile, maxBytes=100000000, backupCount=5) formatter = logging.Formatter(logFormat, datefmt='%m/%d/%Y %H:%M:%S ') handler.setFormatter(formatter) logger.addHandler(handler) return True if __name__ == '__main__': import sys import os os.environ['ROOT_DIR'] = '/home/oper/msg-event-processing' os.environ['EVENT_DIR'] = os.path.join(os.environ['ROOT_DIR'], 'event_files') os.environ['EVENT_FILES'] = os.path.join(os.environ['EVENT_DIR'], '*.event') os.environ['ADMIN_DIR'] = os.path.join(os.environ['ROOT_DIR'], 'admin') os.environ['BIN_DIR'] = os.path.join(os.environ['ROOT_DIR'], 'bin') os.environ['LOG_DIR'] = os.path.join(os.environ['ROOT_DIR'], 'log') os.environ['DATABASE_DIR'] = os.path.join(os.environ['ROOT_DIR'], 'databases') if 'prime' in os.environ['WHICH_SAT']: os.environ['GROUP'] = 'MSGEND' else: os.environ['GROUP'] = 'IODCEND' main(sys.argv)