diff --git a/src/python/WMArchive/PySpark/RecordAggregator.py b/src/python/WMArchive/PySpark/RecordAggregator.py
index e5f4d50..22fac15 100644
--- a/src/python/WMArchive/PySpark/RecordAggregator.py
+++ b/src/python/WMArchive/PySpark/RecordAggregator.py
@@ -159,6 +159,21 @@ def get_paths(d, dd, key):
 
     return stats
 
+def serialize_stats(stats):
+    """
+    Convert start_date/end_date of every record scope into ISO strings.
+    Records are changed in place and the same stats list is returned.
+    Values without isoformat method (e.g. already converted strings) are
+    kept as is, so repeated calls on the same stats are safe.
+    """
+    for rec in stats:
+        scope = rec['scope']
+        for key in ('start_date', 'end_date'):
+            value = scope[key]
+            # reducer passes the same list twice (file dump and return value)
+            if hasattr(value, 'isoformat'):
+                scope[key] = value.isoformat()
+    return stats
 
 class MapReduce(object):
     def __init__(self, spec=None):
@@ -200,14 +215,7 @@ def reducer(self, records):
         if self.verbose:
             print("### total number of collected stats", len(stats))
         with open('/tmp/wma_agg.json', 'w') as ostream:
-            for rec in stats:
-                scope = rec['scope']
-                sdate = scope['start_date']
-                scope['start_date'] = sdate.isoformat()
-                edate = scope['end_date']
-                scope['end_date'] = edate.isoformat()
-                rec['scope'] = scope
-            ostream.write(json.dumps(stats))
+            ostream.write(json.dumps(serialize_stats(stats)))
         if len(stats):
             try:
                 # store to mongoDB
@@ -223,4 +231,4 @@ def reducer(self, records):
         if self.verbose:
             print("--- {} seconds ---".format(time.time() - self.start_time))
 
-        return stats
+        return serialize_stats(stats)
diff --git a/src/python/WMArchive/Tools/myspark.py b/src/python/WMArchive/Tools/myspark.py
index 7cbada2..5a25bb9 100755
--- a/src/python/WMArchive/Tools/myspark.py
+++ b/src/python/WMArchive/Tools/myspark.py
@@ -38,6 +38,8 @@
 # WMArchive modules
 import WMArchive
 from WMArchive.Utils.Utils import htime, wmaHash, range_dates
+# StompAMQ API
+from WMCore.Services.StompAMQ.StompAMQ import StompAMQ
 
 try:
     from wmarchive_config import HDIR
@@ -85,6 +87,9 @@ def __init__(self):
                 dest="verbose", default=False, help="verbose output")
         self.parser.add_argument("--records-output", action="store",
             dest="rout", default="", help="Output file for records")
+        msg = "Send WMArchive results via StompAMQ to a broker, provide broker credentials in JSON file"
+        self.parser.add_argument("--amq", action="store",
+            dest="amq", default="", help=msg)
 
 def x509():
     "Helper function to get x509 either from env or tmp file"
@@ -335,6 +340,20 @@ def run(schema_file, data_path, script=None, spec_file=None, verbose=None, rout=
     logger.info("Elapsed time %s" % htime(time.time()-time0))
     return out
 
+def credentials(fname=None):
+    """
+    Read broker credentials from given JSON file. If file name is not
+    provided it is taken from WMA_BROKER environment variable.
+    Return empty dict if the file does not exist.
+    """
+    if not fname:
+        fname = os.environ.get('WMA_BROKER', '')
+    if not os.path.isfile(fname):
+        return {}
+    with open(fname, 'r') as istream:
+        data = json.load(istream)
+    return data
+
 def is_spec(data):
     "Check if given data is WMArchive spec"
     if not isinstance(data, dict):
@@ -391,6 +410,19 @@ def main():
             data['dtype'] = 'job'
             pdata = dict(job=data)
             postdata(opts.store, pdata, opts.ckey, opts.cert, opts.verbose)
+    elif opts.amq:
+        creds = credentials(opts.amq)
+        if creds:
+            # parse broker address only when credentials are available,
+            # credentials() returns empty dict for non-existing file
+            host, port = creds['host_and_ports'].split(':')
+            print("### Send %s docs via StompAMQ" % len(results))
+            amq = StompAMQ(creds['username'], creds['password'],
+                           creds['producer'], creds['topic'],
+                           [(host, int(port))])
+            amq.send(results)
+        else:
+            print("### Unable to read StompAMQ credentials from %s" % opts.amq)
     else:
         print(results)
 