diff --git a/src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py b/src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py index 3319409175..52b603381e 100644 --- a/src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py +++ b/src/python/WMComponent/AgentStatusWatcher/AgentStatusPoller.py @@ -167,26 +167,18 @@ def collectWorkQueueInfo(self): return results - def collectCouchDBInfo(self): + def checkCouchStatus(self): """ - This method collects information from CouchDB, such that it can report - on any problematic replication task + This method checks whether CouchDB is running properly and it also + verifies whether all the replication tasks are progressing as expected :return: a dictionary with the status for CouchServer """ couchInfo = {'name': 'CouchServer', 'status': 'ok', 'error_message': ""} - replicationDocs = self.localCouchMonitor.getSchedulerDocs() - - if len(replicationDocs) != len(self.replicatorDocs): - msg = "Could not find all the replication documents in CouchDB." - msg += "Scheduler docs report the following: {}".format(replicationDocs) - couchInfo.update({'status': 'error', 'error_message': msg}) - return couchInfo - - for rp in replicationDocs: - if rp['state'].lower() not in ["pending", "running"]: - couchInfo['status'] = 'error' - couchInfo['error_message'] = rp.get('error_message', rp.get('info')) + for rp in self.replicatorDocs: + cInfo = self.localCouchMonitor.checkCouchServerStatus(rp['source'], rp['target']) + if cInfo['status'] != 'ok': + couchInfo.update({'status': 'error', 'error_message': cInfo['error_message']}) return couchInfo @@ -214,7 +206,7 @@ def collectAgentInfo(self): else: agentInfo['drain_mode'] = False - couchInfo = self.collectCouchDBInfo() + couchInfo = self.checkCouchStatus() if couchInfo['status'] != 'ok': agentInfo['down_components'].append(couchInfo['name']) agentInfo['status'] = couchInfo['status'] diff --git a/src/python/WMCore/Database/CMSCouch.py b/src/python/WMCore/Database/CMSCouch.py index 1b3eda8fc4..a659350a87 100644 --- a/src/python/WMCore/Database/CMSCouch.py +++ b/src/python/WMCore/Database/CMSCouch.py @@ -960,6 +960,13 @@ def __init__(self, dburl='http://localhost:5984', usePYCurl=True, ckey=None, cer self.ckey = ckey self.cert = cert + def getCouchWelcome(self): + """ + Retrieve CouchDB welcome information (which includes the version number) + :return: a dictionary + """ + return self.get('') + def listDatabases(self): "List all the databases the server hosts" return self.get('/_all_dbs') @@ -1149,8 +1156,10 @@ def __init__(self, couchURL): self.couchServer = CouchServer(couchURL) self.replicatorDB = self.couchServer.connectDatabase('_replicator', False) - # this is set {source: {taget: update_sequence}} - self.previousUpdateSequence = {} + + # use the CouchDB version to decide which APIs and schema is available + couchInfo = self.couchServer.getCouchWelcome() + self.couchVersion = couchInfo.get("version") def deleteReplicatorDocs(self, source=None, target=None, repDocs=None): if repDocs is None: @@ -1242,21 +1251,30 @@ def getSchedulerDocs(self): def checkCouchServerStatus(self, source, target): """ Check the status of a specific replication task + :param source: string with the source database :param target: string with the target database :return: a dictionary with the current state of that replication """ - replDocs = self.getSchedulerDocs() passwdStrippedTarget = target.split("@")[-1] + if self.couchVersion == "1.6.1": + activeTasks = self.couchServer.getActiveTasks() + # keep only replications + activeTasks = [task for task in activeTasks if task["type"].lower() == "replication"] + else: + # then assume it's CouchDB >= 3.x + activeTasks = self.getSchedulerDocs() - for replication in replDocs: + for replication in activeTasks: if passwdStrippedTarget in replication.get("target", ""): - if replication.get('state', '').lower() in ["pending", "running"]: + # note that CouchDB 1.6 does not have this 'state' key + if replication.get('state', 'running').lower() in ["pending", "running"]: # now check the replication based on update_on attribute - if self.checkReplicationStatus(replication, source, passwdStrippedTarget): + if self.isReplicationOK(replication, source, passwdStrippedTarget): return {'status': 'ok'} else: - msg = "Replication from %s to %s is stale" % (source, target) + msg = f"Replication from {source} to {target} is stale " + msg += f"in state: {replication.get('state')}" return {'status': 'error', 'error_message': msg} else: msg = "Replication stopped from %s to %s.\n" % (source, passwdStrippedTarget) @@ -1265,7 +1283,7 @@ def checkCouchServerStatus(self, source, target): msg = "Failed to find a replication document from %s to %s.\n" % (source, passwdStrippedTarget) return {'status': 'down', 'error_message': msg} - def checkReplicationStatus(self, replInfo, source, target): + def isReplicationOK(self, replInfo, source, target): """ Checks whether a given replication task has been updated over the last 5 minutes.