diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..f0ffb35 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +// Place your settings in this file to overwrite default and user settings. +{ + "editor.insertSpaces": false +} \ No newline at end of file diff --git a/onedrive_d/od_glob.py b/onedrive_d/od_glob.py index f6cc465..a95d4af 100644 --- a/onedrive_d/od_glob.py +++ b/onedrive_d/od_glob.py @@ -7,6 +7,7 @@ import os import sys import logging +import logging.handlers import atexit import json from calendar import timegm @@ -44,7 +45,7 @@ def get_logger(level=logging.DEBUG, file_path=None): logger_instance.setLevel(level) if file_path is not None: logger_instance.propagate = False - logger_fh = logging.FileHandler(file_path, 'a') + logger_fh = logging.handlers.WatchedFileHandler(file_path, 'a') logger_fh.setLevel(level) logger_instance.addHandler(logger_fh) atexit.register(flush_log_at_shutdown) @@ -118,8 +119,6 @@ class ConfigSet: 'LAST_RUN_TIMESTAMP': '1970-01-01T00:00:00+0000' } - logger = get_logger() - OS_HOSTNAME = os.uname()[1] OS_USERNAME = os.getenv('SUDO_USER') @@ -129,9 +128,12 @@ class ConfigSet: def __init__(self, setup_mode=False): # no locking is necessary because the code is run way before multithreading if not ConfigSet.initialized: - if ConfigSet.OS_USERNAME is None or ConfigSet.OS_USERNAME == '': - ConfigSet.OS_USERNAME = os.getenv('USER') - if ConfigSet.OS_USERNAME is None or ConfigSet.OS_USERNAME == '': + if not ConfigSet.OS_USERNAME: + for env_key in ['USER', 'LOGNAME']: + ConfigSet.OS_USERNAME = os.getenv(env_key) + if ConfigSet.OS_USERNAME: + break + if not ConfigSet.OS_USERNAME: get_logger().critical('cannot find current logged-in user.') sys.exit(1) ConfigSet.OS_USER_ID = getpwnam(ConfigSet.OS_USERNAME).pw_uid @@ -166,7 +168,7 @@ def __init__(self, setup_mode=False): self.ignore_list = od_ignore_list.IgnoreList( ConfigSet.APP_IGNORE_FILE, ConfigSet.params['ONEDRIVE_ROOT_PATH']) else: - ConfigSet.logger.info('ignore list file was not found.') + get_logger().info('ignore list file was not found.') ConfigSet.ignore_list = None def set_root_path(self, path): diff --git a/onedrive_d/od_mon_cli.py b/onedrive_d/od_mon_cli.py index 2742fe8..855bb1e 100644 --- a/onedrive_d/od_mon_cli.py +++ b/onedrive_d/od_mon_cli.py @@ -88,9 +88,12 @@ def cleanup(self): self.entrymgr.close() self.logger.debug('entry manager closed.') if self.inotify_thread is not None: - self.inotify_thread.stop() - self.inotify_thread.join() - self.logger.debug('inotify thread stopped.') + try: + self.inotify_thread.stop() + self.inotify_thread.join() + self.logger.debug('inotify thread stopped.') + except Exception as e: + self.logger.exception('inotify cleanup exception') if self.taskmgr is not None: self.taskmgr.clean_tasks() self.logger.debug('task queue cleaned.') diff --git a/onedrive_d/od_onedrive_api.py b/onedrive_d/od_onedrive_api.py index 011a78a..3cd3726 100644 --- a/onedrive_d/od_onedrive_api.py +++ b/onedrive_d/od_onedrive_api.py @@ -28,6 +28,10 @@ from time import sleep from . import od_glob from . import od_thread_manager +try: + from requests.packages.urllib3.exceptions import InsecureRequestWarning +except: + pass api_instance = None @@ -96,6 +100,10 @@ class OneDriveAPI: threadman = od_thread_manager.get_instance() def __init__(self, client_id, client_secret, client_scope=CLIENT_SCOPE, redirect_uri=REDIRECT_URI): + try: + requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + except: + pass self.client_access_token = None self.client_refresh_token = None self.client_id = client_id @@ -188,7 +196,8 @@ def get_access_token(self, code=None, uri=None): self.set_refresh_token(response['refresh_token']) self.set_user_id(response['user_id']) return response - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() return self.get_access_token(code, uri) @@ -203,24 +212,26 @@ def refresh_token(self, token): } while True: try: - request = requests.post(OneDriveAPI.OAUTH_TOKEN_URI, data=params) + request = requests.post(OneDriveAPI.OAUTH_TOKEN_URI, data=params, verify=False) response = self.parse_response(request, OneDriveAPIException) self.set_access_token(response['access_token']) self.set_refresh_token(response['refresh_token']) self.set_user_id(response['user_id']) return response - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() def sign_out(self): while True: try: - r = self.http_client.get(OneDriveAPI.OAUTH_SIGNOUT_URI + '?client_id=' + self.client_id + '&redirect_uri=' + self.client_redirect_uri) + r = self.http_client.get(OneDriveAPI.OAUTH_SIGNOUT_URI + '?client_id=' + self.client_id + '&redirect_uri=' + self.client_redirect_uri, verify=False) return self.parse_response(r, OneDriveAuthError) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() @@ -230,11 +241,12 @@ def get_recent_docs(self): def get_quota(self, user_id='me'): while True: try: - r = self.http_client.get(OneDriveAPI.API_URI + user_id + '/skydrive/quota') + r = self.http_client.get(OneDriveAPI.API_URI + user_id + '/skydrive/quota', verify=False) return self.parse_response(r, OneDriveAPIException) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() @@ -243,12 +255,13 @@ def get_root_entry_name(self): def get_property(self, entry_id='me/skydrive'): try: - r = self.http_client.get(OneDriveAPI.API_URI + entry_id) + r = self.http_client.get(OneDriveAPI.API_URI + entry_id, verify=False) return self.parse_response(r, OneDriveAPIException) except OneDriveAuthError: self.auto_recover_auth_error() return self.get_property(entry_id) - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() return self.get_property(entry_id) @@ -266,11 +279,12 @@ def set_property(self, entry_id, **kwargs): while True: try: r = self.http_client.put( - OneDriveAPI.API_URI + entry_id, data=json.dumps(kwargs), headers=headers) + OneDriveAPI.API_URI + entry_id, data=json.dumps(kwargs), headers=headers, verify=False) return self.parse_response(r, OneDriveAPIException) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() @@ -288,11 +302,12 @@ def get_link(self, entry_id, type='r'): while True: try: - r = self.http_client.get(OneDriveAPI.API_URI + entry_id + '/' + type) + r = self.http_client.get(OneDriveAPI.API_URI + entry_id + '/' + type, verify=False) return self.parse_response(r, OneDriveAPIException)['source'] except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() @@ -302,11 +317,12 @@ def list_entries(self, folder_id='me/skydrive', type='files'): """ while True: try: - r = self.http_client.get(OneDriveAPI.API_URI + folder_id + '/' + type) + r = self.http_client.get(OneDriveAPI.API_URI + folder_id + '/' + type, verify=False) return self.parse_response(r, OneDriveAPIException)['data'] except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() except OneDriveServerInternalError as e: @@ -324,11 +340,12 @@ def mkdir(self, folder_name, parent_id='me/skydrive'): uri = OneDriveAPI.API_URI + parent_id while True: try: - r = self.http_client.post(uri, data=json.dumps(data), headers=headers) + r = self.http_client.post(uri, data=json.dumps(data), headers=headers, verify=False) return self.parse_response(r, OneDriveAPIException, requests.codes.created) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() @@ -346,14 +363,15 @@ def cp(self, target_id, dest_folder_id, overwrite=True, type='COPY'): } uri = OneDriveAPI.API_URI + target_id + '?overwrite=' + str(overwrite) req = requests.Request( - type, uri, data=json.dumps(data), headers=headers).prepare() + type, uri, data=json.dumps(data), headers=headers, verify=False).prepare() while True: try: r = self.http_client.send(req) return self.parse_response(r, OneDriveAPIException, requests.codes.created) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() except OneDriveServerInternalError as e: @@ -411,14 +429,14 @@ def bits_put(self, name, folder_id, local_path=None, block_size=1048576): # BITS: Create-Session headers = { 'X-Http-Method-Override': 'BITS_POST', - 'Content-Length': 0, + 'Content-Length': '0', 'BITS-Packet-Type': 'Create-Session', 'BITS-Supported-Protocols': '{7df0354d-249b-430f-820d-3d2a9bef4931}' } self.logger.debug('getting session token for BITS upload...') while True: try: - response = self.http_client.request('post', url, headers=headers) + response = self.http_client.request('post', url, headers=headers, verify=False) if response.status_code != 201: if 'www-authenticate' in response.headers and 'invalid_token' in response.headers['www-authenticate']: response.close() @@ -434,7 +452,8 @@ def bits_put(self, name, folder_id, local_path=None, block_size=1048576): break except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() del headers @@ -456,7 +475,7 @@ def bits_put(self, name, folder_id, local_path=None, block_size=1048576): 'BITS-Packet-Type': 'Fragment', 'BITS-Session-Id': session_id, 'Content-Range': 'bytes {}-{}/{}'.format(source_cursor, target_cursor, source_size) - }) + }, verify=False) if response.status_code != requests.codes.ok: # unknown error. better log it for future analysis self.logger.debug('an error occurred uploading the block. HTTP %d.', response.status_code) @@ -471,7 +490,8 @@ def bits_put(self, name, folder_id, local_path=None, block_size=1048576): response.close() del data # sleep(1) - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') del data self.threadman.hang_caller() @@ -493,11 +513,11 @@ def bits_put(self, name, folder_id, local_path=None, block_size=1048576): 'X-Http-Method-Override': 'BITS_POST', 'BITS-Packet-Type': 'Close-Session', 'BITS-Session-Id': session_id, - 'Content-Length': 0 + 'Content-Length': '0' } while True: try: - response = self.http_client.request('post', url, headers=headers) + response = self.http_client.request('post', url, headers=headers, verify=False) if response.status_code != requests.codes.ok and response.status_code != requests.codes.created: # when token expires, server return HTTP 500 # www-authenticate: 'Bearer realm="OneDriveAPI", error="expired_token", error_description="Auth token expired. Try refreshing."' @@ -517,7 +537,8 @@ def bits_put(self, name, folder_id, local_path=None, block_size=1048576): return self.get_property('file.' + res_id[:res_id.index('!')] + '.' + res_id) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() @@ -569,7 +590,7 @@ def put(self, name, folder_id='me/skydrive', upload_location=None, local_path=No while True: try: - r = self.http_client.put(uri, data=data) + r = self.http_client.put(uri, data=data, verify=False) ret = r.json() if r.status_code != requests.codes.ok and r.status_code != requests.codes.created: # TODO: try testing this @@ -580,7 +601,8 @@ def put(self, name, folder_id='me/skydrive', upload_location=None, local_path=No return self.get_property(ret['id']) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() except OneDriveServerInternalError as e: @@ -603,7 +625,7 @@ def get_by_blocks(self, entry_id, local_path, file_size, block_size): r = self.http_client.get(OneDriveAPI.API_URI + entry_id + '/content', headers={ 'Range': 'bytes={0}-{1}'.format(cursor, target) - }) + }, verify=False) if r.status_code == requests.codes.ok or r.status_code == requests.codes.partial: # sample data: 'bytes 12582912-12927920/12927921' range_unit, range_str = r.headers['content-range'].split(' ') @@ -623,7 +645,8 @@ def get_by_blocks(self, entry_id, local_path, file_size, block_size): # return False except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() f.close() @@ -631,6 +654,10 @@ def get_by_blocks(self, entry_id, local_path, file_size, block_size): # fcntl.lockf(f, fcntl.LOCK_UN) return True + def get_size(self, entry_id): + r = self.http_client.get(OneDriveAPI.API_URI + entry_id + '/content', verify=False) + self.logger.info('filesize ' + entry_id + ' size is: ' + r.headers['content-length']) + def get(self, entry_id, local_path=None): """ Fetching content of OneNote files will raise OneDriveAPIException: @@ -638,7 +665,7 @@ def get(self, entry_id, local_path=None): """ while True: try: - r = self.http_client.get(OneDriveAPI.API_URI + entry_id + '/content') + r = self.http_client.get(OneDriveAPI.API_URI + entry_id + '/content', verify=False) if r.status_code != requests.codes.ok: ret = r.json() # TODO: try testing this @@ -654,7 +681,8 @@ def get(self, entry_id, local_path=None): return r.content except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() except OneDriveServerInternalError as e: @@ -667,9 +695,10 @@ def rm(self, entry_id): """ while True: try: - self.http_client.delete(OneDriveAPI.API_URI + entry_id) + self.http_client.delete(OneDriveAPI.API_URI + entry_id, verify=False) return - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() except OneDriveServerInternalError as e: @@ -679,21 +708,23 @@ def rm(self, entry_id): def get_user_info(self, user_id='me'): while True: try: - r = self.http_client.get(OneDriveAPI.API_URI + user_id) + r = self.http_client.get(OneDriveAPI.API_URI + user_id, verify=False) return self.parse_response(r, OneDriveAPIException, requests.codes.ok) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() def get_contact_list(self, user_id='me'): while True: try: - r = self.http_client.get(OneDriveAPI.API_URI + user_id + '/friends') + r = self.http_client.get(OneDriveAPI.API_URI + user_id + '/friends', verify=False) return self.parse_response(r, OneDriveAPIException, requests.codes.ok) except OneDriveAuthError: self.auto_recover_auth_error() - except requests.exceptions.ConnectionError: + except requests.exceptions.ConnectionError as e: + self.logger.exception(e); self.logger.info('network connection error.') self.threadman.hang_caller() diff --git a/onedrive_d/od_sqlite.py b/onedrive_d/od_sqlite.py index f026ae1..9e63dcb 100644 --- a/onedrive_d/od_sqlite.py +++ b/onedrive_d/od_sqlite.py @@ -116,12 +116,30 @@ def del_task(self, task_id): self.acquire_lock() self.cursor.execute('DELETE FROM tasks WHERE rowid=?', (task_id, )) self.release_lock() + self.count_tasks() def clean_tasks(self): self.acquire_lock() self.cursor.execute('DELETE FROM tasks') self.release_lock() - + + def count_tasks(self): + self.acquire_lock(); + self.cursor.execute('SELECT COUNT(*) FROM tasks') + row = self.cursor.fetchone(); + if row is None: + self.release_lock(); + return + self.release_lock() + if row[0] == 0: + self.logger.info('No more tasks in queue') + return + if row[0] % 10 == 0: + self.logger.info('Got ' + str(row[0]) + ' tasks in queue') + return + self.logger.debug('Got ' + str(row[0]) + ' tasks in queue') + + def dump(self): self.acquire_lock() ret = TaskManager.db.iterdump() diff --git a/onedrive_d/od_worker_thread.py b/onedrive_d/od_worker_thread.py index 726fc83..53e9305 100644 --- a/onedrive_d/od_worker_thread.py +++ b/onedrive_d/od_worker_thread.py @@ -167,7 +167,7 @@ def sync_dir(self, task): if entry['name'] in local_entries: local_entries.remove(entry['name']) else: - self.logger.info('skipped file "' + task['local_path'] + '/' + entry['name'] + '" of unsupported type "' + entry['type'] + '".') + self.logger.warning('skipped file "' + task['local_path'] + '/' + entry['name'] + '" of unsupported type "' + entry['type'] + '".') for ent_name in local_entries: # untouched local files @@ -271,16 +271,18 @@ def analyze_file_path(self, local_path, remote_parent_id, entry, local_entries): # just fix the record self.entrymgr.update_entry(local_path=local_path, obj=entry) else: - self.logger.warning('case1: ' + str(local_mtime) + ',' + + # in some cases the API responds with a incorrect file size: http://stackoverflow.com/a/27031491, so this shouldn't be trusted + self.logger.warning('case1 (' + local_path + '): ' + str(local_mtime) + ',' + str(local_fsize) + ' vs ' + str(remote_mtime) + ',' + str(entry['size'])) - new_path = self.resolve_conflict(local_path, self.config.OS_HOSTNAME) - if new_path is None: - self.logger.critical('cannot rename file "' + local_path + '" to avoid conflict. Skip the conflicting remote file.') - return - # add the renamed local file to list so as to upload it later - local_entries.append(os.path.basename(new_path)) - # download the remote file to the path - self.taskmgr.add_task('dl', local_path, entry['id'], entry['parent_id'], args='add_row,', extra_info=json.dumps(entry)) + return + # new_path = self.resolve_conflict(local_path, self.config.OS_HOSTNAME) + # if new_path is None: + # self.logger.critical('cannot rename file "' + local_path + '" to avoid conflict. Skip the conflicting remote file.') + # return + # # add the renamed local file to list so as to upload it later + # local_entries.append(os.path.basename(new_path)) + # # download the remote file to the path + # self.taskmgr.add_task('dl', local_path, entry['id'], entry['parent_id'], args='add_row,', extra_info=json.dumps(entry)) else: # we have a previous record for reference if previous_entry['remote_id'] == entry['id']: @@ -294,7 +296,7 @@ def analyze_file_path(self, local_path, remote_parent_id, entry, local_entries): elif local_mtime != remote_mtime or entry['client_updated_time'] != previous_entry['client_updated_time']: # there may be more than one revisions between them # better keep both - self.logger.warning('case2: ' + str(local_mtime) + ',' + + self.logger.warning('case2 (' + local_path + '): ' + str(local_mtime) + ',' + str(local_fsize) + ' vs ' + str(remote_mtime) + ',' + str(entry['size'])) new_path = self.resolve_conflict(local_path, self.config.OS_HOSTNAME) if new_path is None: @@ -315,7 +317,7 @@ def analyze_file_path(self, local_path, remote_parent_id, entry, local_entries): if local_mtime != od_glob.str_to_time(previous_entry['client_updated_time']): # the local file was modified since its last sync # better keep it - self.logger.warning('case3: ' + str(local_mtime) + ',' + str(local_fsize) + ' vs ' + str( + self.logger.warning('case3 (' + local_path + '): ' + str(local_mtime) + ',' + str(local_fsize) + ' vs ' + str( od_glob.str_to_time(previous_entry['client_updated_time'])) + ',' + str(previous_entry['size'])) new_path = self.resolve_conflict(local_path, self.config.OS_HOSTNAME) if new_path is None: @@ -425,39 +427,44 @@ def move_remote_entry(self, task): def run(self): self.taskmgr = od_sqlite.TaskManager() self.entrymgr = od_sqlite.EntryManager() - while self.running: # not self.stop_event.is_set(): - self.taskmgr.dec_sem() - task = self.taskmgr.get_task() - if task is None: - self.logger.debug('got null task.') - continue - - self.logger.debug('got task: %s on "%s"', task['type'], task['local_path']) - - self.is_busy = True - od_inotify_thread.INotifyThread.pause_event.set() - if task['type'] == 'sy': - self.sync_dir(task) - elif task['type'] == 'rm': - self.remove_dir(task) - elif task['type'] == 'mk': - self.make_remote_dir(task) - elif task['type'] == 'up': - self.upload_file(task) - elif task['type'] == 'dl': - self.download_file(task) - elif task['type'] == 'mv': - self.move_remote_entry(task) - elif task['type'] == 'rf': - self.remove_file(task) - elif task['type'] == 'af': - pass - elif task['type'] == 'cp': - pass - else: - raise Exception('Unknown task type "' + task['type'] + '".') - od_inotify_thread.INotifyThread.pause_event.clear() - self.is_busy = False + try: + while self.running: # not self.stop_event.is_set(): + self.taskmgr.dec_sem() + task = self.taskmgr.get_task() + if task is None: + self.logger.debug('got null task.') + continue + + self.logger.debug('got task: %s on "%s"', task['type'], task['local_path']) + + self.is_busy = True + od_inotify_thread.INotifyThread.pause_event.set() + if task['type'] == 'sy': + self.sync_dir(task) + elif task['type'] == 'rm': + self.remove_dir(task) + elif task['type'] == 'mk': + self.make_remote_dir(task) + elif task['type'] == 'up': + self.upload_file(task) + elif task['type'] == 'dl': + self.download_file(task) + elif task['type'] == 'mv': + self.move_remote_entry(task) + elif task['type'] == 'rf': + self.remove_file(task) + elif task['type'] == 'af': + pass + elif task['type'] == 'cp': + pass + else: + raise Exception('Unknown task type "' + task['type'] + '".') + od_inotify_thread.INotifyThread.pause_event.clear() + self.is_busy = False + except Exception as e: + self.logger.exception('od_unexpected_exception') + raise + self.taskmgr = None self.entrymgr.close() self.logger.debug('stopped.')