diff --git a/.github/workflows/pypi-publish.yml b/.github/workflows/pypi-publish.yml index b55449d..10144d3 100644 --- a/.github/workflows/pypi-publish.yml +++ b/.github/workflows/pypi-publish.yml @@ -4,21 +4,22 @@ on: release: types: [created] +permissions: + contents: read + packages: read + jobs: build-and-publish: runs-on: ubuntu-latest environment: release - permissions: - id-token: write - steps: - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: - python-version: '3.x' + python-version: 3.x - name: Install build dependencies run: | @@ -30,3 +31,5 @@ jobs: - name: Publish to PyPI uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.PYPI_TOKEN }} diff --git a/.trunk/.gitignore b/.trunk/.gitignore new file mode 100644 index 0000000..15966d0 --- /dev/null +++ b/.trunk/.gitignore @@ -0,0 +1,9 @@ +*out +*logs +*actions +*notifications +*tools +plugins +user_trunk.yaml +user.yaml +tmp diff --git a/.trunk/configs/.isort.cfg b/.trunk/configs/.isort.cfg new file mode 100644 index 0000000..b9fb3f3 --- /dev/null +++ b/.trunk/configs/.isort.cfg @@ -0,0 +1,2 @@ +[settings] +profile=black diff --git a/.trunk/configs/.markdownlint.yaml b/.trunk/configs/.markdownlint.yaml new file mode 100644 index 0000000..b40ee9d --- /dev/null +++ b/.trunk/configs/.markdownlint.yaml @@ -0,0 +1,2 @@ +# Prettier friendly markdownlint config (all formatting rules disabled) +extends: markdownlint/style/prettier diff --git a/.trunk/configs/.yamllint.yaml b/.trunk/configs/.yamllint.yaml new file mode 100644 index 0000000..184e251 --- /dev/null +++ b/.trunk/configs/.yamllint.yaml @@ -0,0 +1,7 @@ +rules: + quoted-strings: + required: only-when-needed + extra-allowed: ["{|}"] + key-duplicates: {} + octal-values: + forbid-implicit-octal: true diff --git a/.trunk/configs/ruff.toml b/.trunk/configs/ruff.toml new file mode 100644 index 0000000..f5a235c --- /dev/null +++ b/.trunk/configs/ruff.toml @@ -0,0 +1,5 @@ +# Generic, formatter-friendly config. +select = ["B", "D3", "E", "F"] + +# Never enforce `E501` (line length violations). This should be handled by formatters. +ignore = ["E501"] diff --git a/.trunk/trunk.yaml b/.trunk/trunk.yaml new file mode 100644 index 0000000..855fca0 --- /dev/null +++ b/.trunk/trunk.yaml @@ -0,0 +1,32 @@ +# This file controls the behavior of Trunk: https://docs.trunk.io/cli +# To learn more about the format of this file, see https://docs.trunk.io/reference/trunk-yaml +version: 0.1 +cli: + version: 1.22.8 +# Trunk provides extensibility via plugins. (https://docs.trunk.io/plugins) +plugins: + sources: + - id: trunk + ref: v1.6.4 + uri: https://github.com/trunk-io/plugins +# Many linters and tools depend on runtimes - configure them here. (https://docs.trunk.io/runtimes) +runtimes: + enabled: + - node@18.12.1 + - python@3.10.8 +# This is the section where you manage your linters. (https://docs.trunk.io/check/configuration) +lint: + enabled: + - actionlint@1.7.4 + - bandit@1.7.10 + - black@24.10.0 + - checkov@3.2.291 + - git-diff-check + - isort@5.13.2 + - markdownlint@0.42.0 + - osv-scanner@1.9.1 + - prettier@3.3.3 + - ruff@0.7.3 + - taplo@0.9.3 + - trufflehog@3.83.6 + - yamllint@1.35.1 diff --git a/README.md b/README.md index ae62c71..9e740d5 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Fetchtastic is a utility for downloading and managing the latest Meshtastic Andr 1. **Install Termux**: Download and install [Termux](https://f-droid.org/en/packages/com.termux/) from F-Droid. 2. **Install Termux Boot**: Download and install [Termux Boot](https://f-droid.org/en/packages/com.termux.boot/) from F-Droid. 3. **Install Termux API**: Download and install [Termux API](https://f-droid.org/en/packages/com.termux.api/) from F-Droid. -4. *(Optional)* **Install ntfy**: Download and install [ntfy](https://f-droid.org/en/packages/io.heckel.ntfy/) from F-Droid. +4. _(Optional)_ **Install ntfy**: Download and install [ntfy](https://f-droid.org/en/packages/io.heckel.ntfy/) from F-Droid. ## Installation @@ -56,10 +56,10 @@ During setup, you will be able to: By default, Fetchtastic saves files and configuration in the `Downloads/Meshtastic` directory: - - **Configuration File**: `Downloads/Meshtastic/fetchtastic.yaml` - - **Log File**: `Downloads/Meshtastic/fetchtastic.log` - - **APKs**: `Downloads/Meshtastic/apks` - - **Firmware**: `Downloads/Meshtastic/firmware` +- **Configuration File**: `Downloads/Meshtastic/fetchtastic.yaml` +- **Log File**: `Downloads/Meshtastic/fetchtastic.log` +- **APKs**: `Downloads/Meshtastic/apks` +- **Firmware**: `Downloads/Meshtastic/firmware` You can manually edit the configuration file to change the settings. @@ -68,6 +68,7 @@ You can manually edit the configuration file to change the settings. During setup, you have the option to add a cron job that runs Fetchtastic daily at 3 AM. To modify the cron job, you can run: + ```bash crontab -e ``` diff --git a/app/cli.py b/app/cli.py index 0638e0d..e498b54 100644 --- a/app/cli.py +++ b/app/cli.py @@ -1,34 +1,39 @@ # app/cli.py import argparse -import subprocess import os import shutil -from . import downloader -from . import setup_config +import subprocess + +from . import downloader, setup_config + def main(): - parser = argparse.ArgumentParser(description="Fetchtastic - Meshtastic Firmware and APK Downloader") - subparsers = parser.add_subparsers(dest='command') + parser = argparse.ArgumentParser( + description="Fetchtastic - Meshtastic Firmware and APK Downloader" + ) + subparsers = parser.add_subparsers(dest="command") # Command to run setup - subparsers.add_parser('setup', help='Run the setup process') + subparsers.add_parser("setup", help="Run the setup process") # Command to download firmware and APKs - subparsers.add_parser('download', help='Download firmware and APKs') + subparsers.add_parser("download", help="Download firmware and APKs") # Command to display NTFY topic - subparsers.add_parser('topic', help='Display the current NTFY topic') + subparsers.add_parser("topic", help="Display the current NTFY topic") # Command to clean/remove Fetchtastic files and settings - subparsers.add_parser('clean', help='Remove Fetchtastic configuration, downloads, and cron jobs') + subparsers.add_parser( + "clean", help="Remove Fetchtastic configuration, downloads, and cron jobs" + ) args = parser.parse_args() - if args.command == 'setup': + if args.command == "setup": # Run the setup process setup_config.run_setup() - elif args.command == 'download': + elif args.command == "download": # Check if configuration exists if not setup_config.config_exists(): print("No configuration found. Running setup.") @@ -36,24 +41,33 @@ def main(): else: # Run the downloader downloader.main() - elif args.command == 'topic': + elif args.command == "topic": # Display the NTFY topic and prompt to copy to clipboard config = setup_config.load_config() - if config and config.get('NTFY_SERVER') and config.get('NTFY_TOPIC'): - ntfy_server = config['NTFY_SERVER'].rstrip('/') - ntfy_topic = config['NTFY_TOPIC'] + if config and config.get("NTFY_SERVER") and config.get("NTFY_TOPIC"): + ntfy_server = config["NTFY_SERVER"].rstrip("/") + ntfy_topic = config["NTFY_TOPIC"] full_url = f"{ntfy_server}/{ntfy_topic}" print(f"Current NTFY topic URL: {full_url}") print(f"Topic name: {ntfy_topic}") - copy_to_clipboard = input("Do you want to copy the topic name to the clipboard? [y/n] (default: yes): ").strip().lower() or 'y' - if copy_to_clipboard == 'y': + copy_to_clipboard = ( + input( + "Do you want to copy the topic name to the clipboard? [y/n] (default: yes): " + ) + .strip() + .lower() + or "y" + ) + if copy_to_clipboard == "y": copy_to_clipboard_termux(ntfy_topic) print("Topic name copied to clipboard.") else: print("You can copy the topic name from above.") else: - print("Notifications are not set up. Run 'fetchtastic setup' to configure notifications.") - elif args.command == 'clean': + print( + "Notifications are not set up. Run 'fetchtastic setup' to configure notifications." + ) + elif args.command == "clean": # Run the clean process run_clean() elif args.command is None: @@ -63,16 +77,23 @@ def main(): else: parser.print_help() + def copy_to_clipboard_termux(text): try: - subprocess.run(['termux-clipboard-set'], input=text.encode('utf-8'), check=True) + subprocess.run(["termux-clipboard-set"], input=text.encode("utf-8"), check=True) except Exception as e: print(f"An error occurred while copying to clipboard: {e}") + def run_clean(): - print("This will remove Fetchtastic configuration files, downloaded files, and cron job entries.") - confirm = input("Are you sure you want to proceed? [y/n] (default: no): ").strip().lower() or 'n' - if confirm != 'y': + print( + "This will remove Fetchtastic configuration files, downloaded files, and cron job entries." + ) + confirm = ( + input("Are you sure you want to proceed? [y/n] (default: no): ").strip().lower() + or "n" + ) + if confirm != "y": print("Clean operation cancelled.") return @@ -91,13 +112,23 @@ def run_clean(): # Remove cron job entries try: # Get current crontab entries - result = subprocess.run(['crontab', '-l'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + result = subprocess.run( + ["crontab", "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) if result.returncode == 0: existing_cron = result.stdout # Remove existing fetchtastic cron jobs - new_cron = '\n'.join([line for line in existing_cron.split('\n') if 'fetchtastic download' not in line]) + new_cron = "\n".join( + [ + line + for line in existing_cron.split("\n") + if "fetchtastic download" not in line + ] + ) # Update crontab - process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) + process = subprocess.Popen( + ["crontab", "-"], stdin=subprocess.PIPE, text=True + ) process.communicate(input=new_cron) print("Removed Fetchtastic cron job entries.") except Exception as e: @@ -110,7 +141,10 @@ def run_clean(): print(f"Removed boot script: {boot_script}") print("Fetchtastic has been cleaned from your system.") - print("If you installed Fetchtastic via pip and wish to uninstall it, run 'pip uninstall fetchtastic'.") + print( + "If you installed Fetchtastic via pip and wish to uninstall it, run 'pip uninstall fetchtastic'." + ) + if __name__ == "__main__": main() diff --git a/app/downloader.py b/app/downloader.py index 4ade60c..23bf3f6 100644 --- a/app/downloader.py +++ b/app/downloader.py @@ -1,16 +1,18 @@ # app/downloader.py +import json import os -import requests -import zipfile import time -import json +import zipfile from datetime import datetime + +import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from . import setup_config + def main(): # Load configuration config = setup_config.load_config() @@ -30,14 +32,19 @@ def main(): exclude_patterns = config.get("EXCLUDE_PATTERNS", []) wifi_only = config.get("WIFI_ONLY", True) - selected_apk_patterns = config.get('SELECTED_APK_ASSETS', []) - selected_firmware_patterns = config.get('SELECTED_FIRMWARE_ASSETS', []) + selected_apk_patterns = config.get("SELECTED_APK_ASSETS", []) + selected_firmware_patterns = config.get("SELECTED_FIRMWARE_ASSETS", []) - download_dir = config.get('DOWNLOAD_DIR', os.path.join(os.path.expanduser("~"), "storage", "downloads", "Meshtastic")) + download_dir = config.get( + "DOWNLOAD_DIR", + os.path.join(os.path.expanduser("~"), "storage", "downloads", "Meshtastic"), + ) firmware_dir = os.path.join(download_dir, "firmware") apks_dir = os.path.join(download_dir, "apks") latest_android_release_file = os.path.join(apks_dir, "latest_android_release.txt") - latest_firmware_release_file = os.path.join(firmware_dir, "latest_firmware_release.txt") + latest_firmware_release_file = os.path.join( + firmware_dir, "latest_firmware_release.txt" + ) # Create necessary directories for dir_path in [download_dir, firmware_dir, apks_dir]: @@ -57,11 +64,13 @@ def send_ntfy_notification(message, title=None): try: ntfy_url = f"{ntfy_server.rstrip('/')}/{ntfy_topic}" headers = { - 'Content-Type': 'text/plain; charset=utf-8', + "Content-Type": "text/plain; charset=utf-8", } if title: - headers['Title'] = title - response = requests.post(ntfy_url, data=message.encode('utf-8'), headers=headers) + headers["Title"] = title + response = requests.post( + ntfy_url, data=message.encode("utf-8"), headers=headers, timeout=10 + ) response.raise_for_status() log_message(f"Notification sent to {ntfy_url}") except requests.exceptions.RequestException as e: @@ -75,7 +84,9 @@ def get_latest_releases(url, scan_count=10): response.raise_for_status() releases = response.json() # Sort releases by published date, descending order - sorted_releases = sorted(releases, key=lambda r: r['published_at'], reverse=True) + sorted_releases = sorted( + releases, key=lambda r: r["published_at"], reverse=True + ) # Limit the number of releases to be scanned return sorted_releases[:scan_count] @@ -84,15 +95,15 @@ def download_file(url, download_path): session = requests.Session() retry = Retry(connect=3, backoff_factor=1, status_forcelist=[502, 503, 504]) adapter = HTTPAdapter(max_retries=retry) - session.mount('https://', adapter) - session.mount('http://', adapter) + session.mount("https://", adapter) + session.mount("http://", adapter) try: if not os.path.exists(download_path): log_message(f"Downloading {url}") response = session.get(url, stream=True) response.raise_for_status() - with open(download_path, 'wb') as file: + with open(download_path, "wb") as file: for chunk in response.iter_content(1024): file.write(chunk) log_message(f"Downloaded {download_path}") @@ -103,14 +114,14 @@ def download_file(url, download_path): def is_connected_to_wifi(): try: - result = os.popen('termux-wifi-connectioninfo').read() + result = os.popen("termux-wifi-connectioninfo").read() if not result: # If result is empty, assume not connected return False data = json.loads(result) - supplicant_state = data.get('supplicant_state', '') - ip_address = data.get('ip', '') - if supplicant_state == 'COMPLETED' and ip_address != '': + supplicant_state = data.get("supplicant_state", "") + ip_address = data.get("ip", "") + if supplicant_state == "COMPLETED" and ip_address != "": return True else: return False @@ -121,7 +132,7 @@ def is_connected_to_wifi(): # Function to extract files from zip archives def extract_files(zip_path, extract_dir, patterns, exclude_patterns): try: - with zipfile.ZipFile(zip_path, 'r') as zip_ref: + with zipfile.ZipFile(zip_path, "r") as zip_ref: matched_files = [] for file_info in zip_ref.infolist(): file_name = file_info.filename @@ -134,21 +145,29 @@ def extract_files(zip_path, extract_dir, patterns, exclude_patterns): # Extract and flatten directory structure source = zip_ref.open(file_info) target_path = os.path.join(extract_dir, base_name) - with open(target_path, 'wb') as target_file: + with open(target_path, "wb") as target_file: target_file.write(source.read()) log_message(f"Extracted {base_name} to {extract_dir}") matched_files.append(base_name) break # Stop checking patterns for this file if not matched_files: - log_message(f"No files matched the extraction patterns in {zip_path}.") + log_message( + f"No files matched the extraction patterns in {zip_path}." + ) except zipfile.BadZipFile: log_message(f"Error: {zip_path} is a bad zip file and cannot be opened.") except Exception as e: - log_message(f"Error: An unexpected error occurred while extracting files from {zip_path}: {e}") + log_message( + f"Error: An unexpected error occurred while extracting files from {zip_path}: {e}" + ) # Cleanup function to keep only specific versions based on release tags def cleanup_old_versions(directory, releases_to_keep): - versions = [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))] + versions = [ + d + for d in os.listdir(directory) + if os.path.isdir(os.path.join(directory, d)) + ] for version in versions: if version not in releases_to_keep: version_path = os.path.join(directory, version) @@ -162,7 +181,15 @@ def cleanup_old_versions(directory, releases_to_keep): log_message(f"Removed directory: {version_path}") # Function to check for missing releases and download them if necessary - def check_and_download(releases, latest_release_file, release_type, download_dir, versions_to_keep, extract_patterns, selected_patterns=None): + def check_and_download( + releases, + latest_release_file, + release_type, + download_dir, + versions_to_keep, + extract_patterns, + selected_patterns=None, + ): downloaded_versions = [] new_versions_available = [] @@ -172,7 +199,7 @@ def check_and_download(releases, latest_release_file, release_type, download_dir # Load the latest release tag from file if available saved_release_tag = None if os.path.exists(latest_release_file): - with open(latest_release_file, 'r') as f: + with open(latest_release_file, "r") as f: saved_release_tag = f.read().strip() # Determine which releases to download @@ -181,13 +208,13 @@ def check_and_download(releases, latest_release_file, release_type, download_dir if downloads_skipped: # Collect new versions available for release in releases_to_download: - release_tag = release['tag_name'] + release_tag = release["tag_name"] if release_tag != saved_release_tag: new_versions_available.append(release_tag) return downloaded_versions, new_versions_available for release in releases_to_download: - release_tag = release['tag_name'] + release_tag = release["tag_name"] release_dir = os.path.join(download_dir, release_tag) if os.path.exists(release_dir) or release_tag == saved_release_tag: @@ -196,33 +223,47 @@ def check_and_download(releases, latest_release_file, release_type, download_dir # Proceed to download this version os.makedirs(release_dir, exist_ok=True) log_message(f"Downloading new {release_type} version: {release_tag}") - for asset in release['assets']: - file_name = asset['name'] + for asset in release["assets"]: + file_name = asset["name"] # Matching logic if selected_patterns: - if not any(pattern in file_name for pattern in selected_patterns): + if not any( + pattern in file_name for pattern in selected_patterns + ): continue # Skip this asset download_path = os.path.join(release_dir, file_name) - download_file(asset['browser_download_url'], download_path) - if auto_extract and file_name.endswith('.zip') and release_type == "Firmware": - extract_files(download_path, release_dir, extract_patterns, exclude_patterns) + download_file(asset["browser_download_url"], download_path) + if ( + auto_extract + and file_name.endswith(".zip") + and release_type == "Firmware" + ): + extract_files( + download_path, + release_dir, + extract_patterns, + exclude_patterns, + ) downloaded_versions.append(release_tag) # Only update latest_release_file if downloads occurred if downloaded_versions: - with open(latest_release_file, 'w') as f: + with open(latest_release_file, "w") as f: f.write(downloaded_versions[0]) # Create a list of all release tags to keep - release_tags_to_keep = [release['tag_name'] for release in releases_to_download] + release_tags_to_keep = [release["tag_name"] for release in releases_to_download] # Clean up old versions cleanup_old_versions(download_dir, release_tags_to_keep) # Collect new versions available for release in releases_to_download: - release_tag = release['tag_name'] - if release_tag != saved_release_tag and release_tag not in downloaded_versions: + release_tag = release["tag_name"] + if ( + release_tag != saved_release_tag + and release_tag not in downloaded_versions + ): new_versions_available.append(release_tag) return downloaded_versions, new_versions_available @@ -244,7 +285,9 @@ def check_and_download(releases, latest_release_file, release_type, download_dir new_apk_versions = [] # URLs for releases - android_releases_url = "https://api.github.com/repos/meshtastic/Meshtastic-Android/releases" + android_releases_url = ( + "https://api.github.com/repos/meshtastic/Meshtastic-Android/releases" + ) firmware_releases_url = "https://api.github.com/repos/meshtastic/firmware/releases" # Increase scan count to cover more releases for cleanup @@ -255,7 +298,9 @@ def check_and_download(releases, latest_release_file, release_type, download_dir if save_firmware and selected_firmware_patterns: versions_to_download = firmware_versions_to_keep - latest_firmware_releases = get_latest_releases(firmware_releases_url, releases_to_scan) + latest_firmware_releases = get_latest_releases( + firmware_releases_url, releases_to_scan + ) fw_downloaded, fw_new_versions = check_and_download( latest_firmware_releases, latest_firmware_release_file, @@ -263,17 +308,21 @@ def check_and_download(releases, latest_release_file, release_type, download_dir firmware_dir, firmware_versions_to_keep, extract_patterns, - selected_patterns=selected_firmware_patterns + selected_patterns=selected_firmware_patterns, ) downloaded_firmwares.extend(fw_downloaded) new_firmware_versions.extend(fw_new_versions) - log_message(f"Latest Firmware releases: {', '.join(release['tag_name'] for release in latest_firmware_releases[:versions_to_download])}") + log_message( + f"Latest Firmware releases: {', '.join(release['tag_name'] for release in latest_firmware_releases[:versions_to_download])}" + ) elif not selected_firmware_patterns: log_message("No firmware assets selected. Skipping firmware download.") if save_apks and selected_apk_patterns: versions_to_download = android_versions_to_keep - latest_android_releases = get_latest_releases(android_releases_url, releases_to_scan) + latest_android_releases = get_latest_releases( + android_releases_url, releases_to_scan + ) apk_downloaded, apk_new_versions = check_and_download( latest_android_releases, latest_android_release_file, @@ -281,29 +330,41 @@ def check_and_download(releases, latest_release_file, release_type, download_dir apks_dir, android_versions_to_keep, extract_patterns, - selected_patterns=selected_apk_patterns + selected_patterns=selected_apk_patterns, ) downloaded_apks.extend(apk_downloaded) new_apk_versions.extend(apk_new_versions) - log_message(f"Latest Android APK releases: {', '.join(release['tag_name'] for release in latest_android_releases[:versions_to_download])}") + log_message( + f"Latest Android APK releases: {', '.join(release['tag_name'] for release in latest_android_releases[:versions_to_download])}" + ) elif not selected_apk_patterns: log_message("No APK assets selected. Skipping APK download.") end_time = time.time() total_time = end_time - start_time - log_message(f"Finished the Meshtastic downloader. Total time taken: {total_time:.2f} seconds") + log_message( + f"Finished the Meshtastic downloader. Total time taken: {total_time:.2f} seconds" + ) if downloads_skipped: log_message("Not connected to Wi-Fi. Skipping all downloads.") # Prepare notification message - message_lines = ["New releases are available but downloads were skipped because the device is not connected to Wi-Fi."] + message_lines = [ + "New releases are available but downloads were skipped because the device is not connected to Wi-Fi." + ] if new_firmware_versions: - message_lines.append(f"Firmware versions available: {', '.join(new_firmware_versions)}") + message_lines.append( + f"Firmware versions available: {', '.join(new_firmware_versions)}" + ) if new_apk_versions: - message_lines.append(f"Android APK versions available: {', '.join(new_apk_versions)}") - notification_message = '\n'.join(message_lines) + f"\n{datetime.now()}" - log_message('\n'.join(message_lines)) - send_ntfy_notification(notification_message, title="Fetchtastic Downloads Skipped") + message_lines.append( + f"Android APK versions available: {', '.join(new_apk_versions)}" + ) + notification_message = "\n".join(message_lines) + f"\n{datetime.now()}" + log_message("\n".join(message_lines)) + send_ntfy_notification( + notification_message, title="Fetchtastic Downloads Skipped" + ) elif downloaded_firmwares or downloaded_apks: # Prepare notification messages notification_messages = [] @@ -313,8 +374,10 @@ def check_and_download(releases, latest_release_file, release_type, download_dir if downloaded_apks: message = f"Downloaded Android APK versions: {', '.join(downloaded_apks)}" notification_messages.append(message) - notification_message = '\n'.join(notification_messages) + f"\n{datetime.now()}" - send_ntfy_notification(notification_message, title="Fetchtastic Download Completed") + notification_message = "\n".join(notification_messages) + f"\n{datetime.now()}" + send_ntfy_notification( + notification_message, title="Fetchtastic Download Completed" + ) else: # No new downloads; everything is up to date message = ( @@ -324,5 +387,6 @@ def check_and_download(releases, latest_release_file, release_type, download_dir log_message(message) send_ntfy_notification(message, title="Fetchtastic Up to Date") + if __name__ == "__main__": main() diff --git a/app/menu_apk.py b/app/menu_apk.py index ffb8ebf..c0abce7 100644 --- a/app/menu_apk.py +++ b/app/menu_apk.py @@ -1,32 +1,40 @@ # app/menu_apk.py import re + import requests from pick import pick + def fetch_apk_assets(): - apk_releases_url = "https://api.github.com/repos/meshtastic/Meshtastic-Android/releases" - response = requests.get(apk_releases_url) + apk_releases_url = ( + "https://api.github.com/repos/meshtastic/Meshtastic-Android/releases" + ) + response = requests.get(apk_releases_url, timeout=10) response.raise_for_status() releases = response.json() # Get the latest release latest_release = releases[0] - assets = latest_release['assets'] - asset_names = [asset['name'] for asset in assets if asset['name'].endswith('.apk')] + assets = latest_release["assets"] + asset_names = [asset["name"] for asset in assets if asset["name"].endswith(".apk")] return asset_names + def extract_base_name(filename): # Remove version numbers and extensions from filename to get base pattern # Example: 'fdroidRelease-2.5.1.apk' -> 'fdroidRelease-' - base_name = re.sub(r'-\d+\.\d+\.\d+.*', '-', filename) - base_name = re.sub(r'\.apk$', '', base_name) + base_name = re.sub(r"-\d+\.\d+\.\d+.*", "-", filename) + base_name = re.sub(r"\.apk$", "", base_name) return base_name + def select_assets(assets): - title = '''Select the APK files you want to download (press SPACE to select, ENTER to confirm): -Note: These are files from the latest release. Version numbers may change in other releases.''' + title = """Select the APK files you want to download (press SPACE to select, ENTER to confirm): +Note: These are files from the latest release. Version numbers may change in other releases.""" options = assets - selected_options = pick(options, title, multiselect=True, min_selection_count=0, indicator='*') + selected_options = pick( + options, title, multiselect=True, min_selection_count=0, indicator="*" + ) selected_assets = [option[0] for option in selected_options] if not selected_assets: print("No APK files selected. APKs will not be downloaded.") @@ -39,15 +47,14 @@ def select_assets(assets): base_patterns.append(pattern) return base_patterns + def run_menu(): try: assets = fetch_apk_assets() selected_patterns = select_assets(assets) if selected_patterns is None: return None - return { - 'selected_assets': selected_patterns - } + return {"selected_assets": selected_patterns} except Exception as e: print(f"An error occurred: {e}") return None diff --git a/app/menu_firmware.py b/app/menu_firmware.py index ce8464d..33db71a 100644 --- a/app/menu_firmware.py +++ b/app/menu_firmware.py @@ -1,9 +1,11 @@ # app/menu_firmware.py import re + import requests from pick import pick + def fetch_firmware_assets(): firmware_releases_url = "https://api.github.com/repos/meshtastic/firmware/releases" response = requests.get(firmware_releases_url) @@ -11,22 +13,26 @@ def fetch_firmware_assets(): releases = response.json() # Get the latest release latest_release = releases[0] - assets = latest_release['assets'] - asset_names = [asset['name'] for asset in assets] + assets = latest_release["assets"] + asset_names = [asset["name"] for asset in assets] return asset_names + def extract_base_name(filename): # Remove version numbers and extensions from filename to get base pattern # Example: 'firmware-esp32-2.5.6.d55c08d.zip' -> 'firmware-esp32-' - base_name = re.sub(r'-\d+\.\d+\.\d+.*', '-', filename) - base_name = re.sub(r'\.zip$', '', base_name) + base_name = re.sub(r"-\d+\.\d+\.\d+.*", "-", filename) + base_name = re.sub(r"\.zip$", "", base_name) return base_name + def select_assets(assets): - title = '''Select the firmware files you want to download (press SPACE to select, ENTER to confirm): -Note: These are files from the latest release. Version numbers may change in other releases.''' + title = """Select the firmware files you want to download (press SPACE to select, ENTER to confirm): +Note: These are files from the latest release. Version numbers may change in other releases.""" options = assets - selected_options = pick(options, title, multiselect=True, min_selection_count=0, indicator='*') + selected_options = pick( + options, title, multiselect=True, min_selection_count=0, indicator="*" + ) selected_assets = [option[0] for option in selected_options] if not selected_assets: print("No firmware files selected. Firmware will not be downloaded.") @@ -39,15 +45,14 @@ def select_assets(assets): base_patterns.append(pattern) return base_patterns + def run_menu(): try: assets = fetch_firmware_assets() selected_patterns = select_assets(assets) if selected_patterns is None: return None - return { - 'selected_assets': selected_patterns - } + return {"selected_assets": selected_patterns} except Exception as e: print(f"An error occurred: {e}") return None diff --git a/app/setup_config.py b/app/setup_config.py index f34786e..6e4eb00 100644 --- a/app/setup_config.py +++ b/app/setup_config.py @@ -1,42 +1,47 @@ # app/setup_config.py import os -import sys -import yaml -import subprocess import random -import string import shutil -from . import menu_apk -from . import menu_firmware +import string +import subprocess + +import yaml + from . import downloader # Import downloader to perform first run +from . import menu_apk, menu_firmware + def get_downloads_dir(): # For Termux, use ~/storage/downloads - if 'com.termux' in os.environ.get('PREFIX', ''): + if "com.termux" in os.environ.get("PREFIX", ""): storage_downloads = os.path.expanduser("~/storage/downloads") if os.path.exists(storage_downloads): return storage_downloads # For other environments, use standard Downloads directories home_dir = os.path.expanduser("~") - downloads_dir = os.path.join(home_dir, 'Downloads') + downloads_dir = os.path.join(home_dir, "Downloads") if os.path.exists(downloads_dir): return downloads_dir - downloads_dir = os.path.join(home_dir, 'Download') + downloads_dir = os.path.join(home_dir, "Download") if os.path.exists(downloads_dir): return downloads_dir # Fallback to home directory return home_dir + DOWNLOADS_DIR = get_downloads_dir() -DEFAULT_CONFIG_DIR = os.path.join(DOWNLOADS_DIR, 'Meshtastic') -CONFIG_FILE = os.path.join(DEFAULT_CONFIG_DIR, 'fetchtastic.yaml') +DEFAULT_CONFIG_DIR = os.path.join(DOWNLOADS_DIR, "Meshtastic") +CONFIG_FILE = os.path.join(DEFAULT_CONFIG_DIR, "fetchtastic.yaml") + def config_exists(): return os.path.exists(CONFIG_FILE) + def is_termux(): - return 'com.termux' in os.environ.get('PREFIX', '') + return "com.termux" in os.environ.get("PREFIX", "") + def check_storage_setup(): # Check if the Termux storage directory and Downloads are set up and writable @@ -44,7 +49,11 @@ def check_storage_setup(): storage_downloads = os.path.expanduser("~/storage/downloads") while True: - if os.path.exists(storage_dir) and os.path.exists(storage_downloads) and os.access(storage_downloads, os.W_OK): + if ( + os.path.exists(storage_dir) + and os.path.exists(storage_downloads) + and os.access(storage_downloads, os.W_OK) + ): print("Termux storage access is already set up.") return True else: @@ -56,6 +65,7 @@ def check_storage_setup(): # Re-check if storage is set up continue + def run_setup(): print("Running Fetchtastic Setup...") @@ -74,24 +84,33 @@ def run_setup(): if config_exists(): # Load existing configuration config = load_config() - print("Existing configuration found. You can keep current settings or change them.") + print( + "Existing configuration found. You can keep current settings or change them." + ) else: # Initialize default configuration config = {} # Prompt to save APKs, firmware, or both - save_choice = input(f"Would you like to download APKs, firmware, or both? [a/f/b] (default: both): ").strip().lower() or 'both' - if save_choice == 'a': + save_choice = ( + input( + "Would you like to download APKs, firmware, or both? [a/f/b] (default: both): " + ) + .strip() + .lower() + or "both" + ) + if save_choice == "a": save_apks = True save_firmware = False - elif save_choice == 'f': + elif save_choice == "f": save_apks = False save_firmware = True else: save_apks = True save_firmware = True - config['SAVE_APKS'] = save_apks - config['SAVE_FIRMWARE'] = save_firmware + config["SAVE_APKS"] = save_apks + config["SAVE_FIRMWARE"] = save_firmware # Run the menu scripts based on user choices if save_apks: @@ -99,17 +118,17 @@ def run_setup(): if not apk_selection: print("No APK assets selected. APKs will not be downloaded.") save_apks = False - config['SAVE_APKS'] = False + config["SAVE_APKS"] = False else: - config['SELECTED_APK_ASSETS'] = apk_selection['selected_assets'] + config["SELECTED_APK_ASSETS"] = apk_selection["selected_assets"] if save_firmware: firmware_selection = menu_firmware.run_menu() if not firmware_selection: print("No firmware assets selected. Firmware will not be downloaded.") save_firmware = False - config['SAVE_FIRMWARE'] = False + config["SAVE_FIRMWARE"] = False else: - config['SELECTED_FIRMWARE_ASSETS'] = firmware_selection['selected_assets'] + config["SELECTED_FIRMWARE_ASSETS"] = firmware_selection["selected_assets"] # If both save_apks and save_firmware are False, inform the user and exit setup if not save_apks and not save_firmware: @@ -119,96 +138,133 @@ def run_setup(): # Prompt for number of versions to keep if save_apks: - current_versions = config.get('ANDROID_VERSIONS_TO_KEEP', 2) - android_versions_to_keep = input(f"How many versions of the Android app would you like to keep? (default is {current_versions}): ").strip() or str(current_versions) - config['ANDROID_VERSIONS_TO_KEEP'] = int(android_versions_to_keep) + current_versions = config.get("ANDROID_VERSIONS_TO_KEEP", 2) + android_versions_to_keep = input( + f"How many versions of the Android app would you like to keep? (default is {current_versions}): " + ).strip() or str(current_versions) + config["ANDROID_VERSIONS_TO_KEEP"] = int(android_versions_to_keep) if save_firmware: - current_versions = config.get('FIRMWARE_VERSIONS_TO_KEEP', 2) - firmware_versions_to_keep = input(f"How many versions of the firmware would you like to keep? (default is {current_versions}): ").strip() or str(current_versions) - config['FIRMWARE_VERSIONS_TO_KEEP'] = int(firmware_versions_to_keep) + current_versions = config.get("FIRMWARE_VERSIONS_TO_KEEP", 2) + firmware_versions_to_keep = input( + f"How many versions of the firmware would you like to keep? (default is {current_versions}): " + ).strip() or str(current_versions) + config["FIRMWARE_VERSIONS_TO_KEEP"] = int(firmware_versions_to_keep) # Prompt for automatic extraction - auto_extract_default = 'yes' if config.get('AUTO_EXTRACT', False) else 'no' - auto_extract = input(f"Would you like to automatically extract specific files from firmware zip archives? [y/n] (default: {auto_extract_default}): ").strip().lower() or auto_extract_default[0] - if auto_extract == 'y': - print("Enter the keywords to match for extraction from the firmware zip files, separated by spaces.") + auto_extract_default = "yes" if config.get("AUTO_EXTRACT", False) else "no" + auto_extract = ( + input( + f"Would you like to automatically extract specific files from firmware zip archives? [y/n] (default: {auto_extract_default}): " + ) + .strip() + .lower() + or auto_extract_default[0] + ) + if auto_extract == "y": + print( + "Enter the keywords to match for extraction from the firmware zip files, separated by spaces." + ) print("Example: rak4631- tbeam-2 t1000-e- tlora-v2-1-1_6-") - if config.get('EXTRACT_PATTERNS'): - current_patterns = ' '.join(config.get('EXTRACT_PATTERNS', [])) + if config.get("EXTRACT_PATTERNS"): + current_patterns = " ".join(config.get("EXTRACT_PATTERNS", [])) print(f"Current patterns: {current_patterns}") - extract_patterns = input("Extraction patterns (leave blank to keep current, enter '!' to clear): ").strip() - if extract_patterns == '!': - config['AUTO_EXTRACT'] = False - config['EXTRACT_PATTERNS'] = [] + extract_patterns = input( + "Extraction patterns (leave blank to keep current, enter '!' to clear): " + ).strip() + if extract_patterns == "!": + config["AUTO_EXTRACT"] = False + config["EXTRACT_PATTERNS"] = [] print("Extraction patterns cleared. No files will be extracted.") elif extract_patterns: - config['AUTO_EXTRACT'] = True - config['EXTRACT_PATTERNS'] = extract_patterns.split() + config["AUTO_EXTRACT"] = True + config["EXTRACT_PATTERNS"] = extract_patterns.split() else: # Keep existing patterns - config['AUTO_EXTRACT'] = True + config["AUTO_EXTRACT"] = True else: extract_patterns = input("Extraction patterns: ").strip() if extract_patterns: - config['AUTO_EXTRACT'] = True - config['EXTRACT_PATTERNS'] = extract_patterns.split() + config["AUTO_EXTRACT"] = True + config["EXTRACT_PATTERNS"] = extract_patterns.split() else: - config['AUTO_EXTRACT'] = False - print("No patterns selected, no files will be extracted. Run setup again if you wish to change this.") + config["AUTO_EXTRACT"] = False + print( + "No patterns selected, no files will be extracted. Run setup again if you wish to change this." + ) # Skip exclude patterns prompt - config['EXCLUDE_PATTERNS'] = [] + config["EXCLUDE_PATTERNS"] = [] # Prompt for exclude patterns if extraction is enabled - if config.get('AUTO_EXTRACT', False) and config.get('EXTRACT_PATTERNS'): + if config.get("AUTO_EXTRACT", False) and config.get("EXTRACT_PATTERNS"): exclude_prompt = "Would you like to exclude any patterns from extraction? [y/n] (default: no): " - exclude_choice = input(exclude_prompt).strip().lower() or 'n' - if exclude_choice == 'y': - print("Enter the keywords to exclude from extraction, separated by spaces.") + exclude_choice = input(exclude_prompt).strip().lower() or "n" + if exclude_choice == "y": + print( + "Enter the keywords to exclude from extraction, separated by spaces." + ) print("Example: .hex tcxo") - if config.get('EXCLUDE_PATTERNS'): - current_excludes = ' '.join(config.get('EXCLUDE_PATTERNS', [])) + if config.get("EXCLUDE_PATTERNS"): + current_excludes = " ".join(config.get("EXCLUDE_PATTERNS", [])) print(f"Current exclude patterns: {current_excludes}") - exclude_patterns = input("Exclude patterns (leave blank to keep current, enter '!' to clear): ").strip() - if exclude_patterns == '!': - config['EXCLUDE_PATTERNS'] = [] - print("Exclude patterns cleared. No files will be excluded.") + exclude_patterns = input( + "Exclude patterns (leave blank to keep current, enter '!' to clear): " + ).strip() + if exclude_patterns == "!": + config["EXCLUDE_PATTERNS"] = [] + print( + "Exclude patterns cleared. No files will be excluded." + ) elif exclude_patterns: - config['EXCLUDE_PATTERNS'] = exclude_patterns.split() + config["EXCLUDE_PATTERNS"] = exclude_patterns.split() else: # Keep existing patterns pass else: exclude_patterns = input("Exclude patterns: ").strip() if exclude_patterns: - config['EXCLUDE_PATTERNS'] = exclude_patterns.split() + config["EXCLUDE_PATTERNS"] = exclude_patterns.split() else: - config['EXCLUDE_PATTERNS'] = [] + config["EXCLUDE_PATTERNS"] = [] else: # User chose not to exclude patterns - config['EXCLUDE_PATTERNS'] = [] + config["EXCLUDE_PATTERNS"] = [] else: - config['EXCLUDE_PATTERNS'] = [] + config["EXCLUDE_PATTERNS"] = [] else: - config['AUTO_EXTRACT'] = False - config['EXTRACT_PATTERNS'] = [] - config['EXCLUDE_PATTERNS'] = [] + config["AUTO_EXTRACT"] = False + config["EXTRACT_PATTERNS"] = [] + config["EXCLUDE_PATTERNS"] = [] # Ask if the user wants to only download when connected to Wi-Fi - wifi_only_default = 'yes' if config.get('WIFI_ONLY', True) else 'no' - wifi_only = input(f"Do you want to only download when connected to Wi-Fi? [y/n] (default: {wifi_only_default}): ").strip().lower() or wifi_only_default[0] - config['WIFI_ONLY'] = True if wifi_only == 'y' else False + wifi_only_default = "yes" if config.get("WIFI_ONLY", True) else "no" + wifi_only = ( + input( + f"Do you want to only download when connected to Wi-Fi? [y/n] (default: {wifi_only_default}): " + ) + .strip() + .lower() + or wifi_only_default[0] + ) + config["WIFI_ONLY"] = True if wifi_only == "y" else False # Set the download directory to the same as the config directory download_dir = DEFAULT_CONFIG_DIR - config['DOWNLOAD_DIR'] = download_dir + config["DOWNLOAD_DIR"] = download_dir # Save configuration to YAML file before proceeding - with open(CONFIG_FILE, 'w') as f: + with open(CONFIG_FILE, "w") as f: yaml.dump(config, f) # Ask if the user wants to set up a cron job - cron_default = 'yes' # Default to 'yes' - setup_cron = input(f"Would you like to schedule Fetchtastic to run daily at 3 AM? [y/n] (default: {cron_default}): ").strip().lower() or cron_default[0] - if setup_cron == 'y': + cron_default = "yes" # Default to 'yes' + setup_cron = ( + input( + f"Would you like to schedule Fetchtastic to run daily at 3 AM? [y/n] (default: {cron_default}): " + ) + .strip() + .lower() + or cron_default[0] + ) + if setup_cron == "y": install_crond() setup_cron_job() else: @@ -216,158 +272,225 @@ def run_setup(): print("Cron job has been removed.") # Ask if the user wants to run Fetchtastic on boot - boot_default = 'yes' # Default to 'yes' - run_on_boot = input(f"Do you want Fetchtastic to run on device boot? [y/n] (default: {boot_default}): ").strip().lower() or boot_default[0] - if run_on_boot == 'y': + boot_default = "yes" # Default to 'yes' + run_on_boot = ( + input( + f"Do you want Fetchtastic to run on device boot? [y/n] (default: {boot_default}): " + ) + .strip() + .lower() + or boot_default[0] + ) + if run_on_boot == "y": setup_boot_script() else: remove_boot_script() print("Boot script has been removed.") # Prompt for NTFY server configuration - notifications_default = 'yes' # Default to 'yes' - notifications = input(f"Would you like to set up notifications via NTFY? [y/n] (default: {notifications_default}): ").strip().lower() or 'y' - if notifications == 'y': - ntfy_server = input(f"Enter the NTFY server (current: {config.get('NTFY_SERVER', 'ntfy.sh')}): ").strip() or config.get('NTFY_SERVER', 'ntfy.sh') - if not ntfy_server.startswith('http://') and not ntfy_server.startswith('https://'): - ntfy_server = 'https://' + ntfy_server - - current_topic = config.get('NTFY_TOPIC', 'fetchtastic-' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))) - topic_name = input(f"Enter a unique topic name (current: {current_topic}): ").strip() or current_topic - - config['NTFY_TOPIC'] = topic_name - config['NTFY_SERVER'] = ntfy_server - - with open(CONFIG_FILE, 'w') as f: + notifications_default = "yes" # Default to 'yes' + notifications = ( + input( + f"Would you like to set up notifications via NTFY? [y/n] (default: {notifications_default}): " + ) + .strip() + .lower() + or "y" + ) + if notifications == "y": + ntfy_server = input( + f"Enter the NTFY server (current: {config.get('NTFY_SERVER', 'ntfy.sh')}): " + ).strip() or config.get("NTFY_SERVER", "ntfy.sh") + if not ntfy_server.startswith("http://") and not ntfy_server.startswith( + "https://" + ): + ntfy_server = "https://" + ntfy_server + + current_topic = config.get( + "NTFY_TOPIC", + "fetchtastic-" + + "".join(random.choices(string.ascii_lowercase + string.digits, k=6)), + ) + topic_name = ( + input(f"Enter a unique topic name (current: {current_topic}): ").strip() + or current_topic + ) + + config["NTFY_TOPIC"] = topic_name + config["NTFY_SERVER"] = ntfy_server + + with open(CONFIG_FILE, "w") as f: yaml.dump(config, f) full_topic_url = f"{ntfy_server.rstrip('/')}/{topic_name}" print(f"Notifications set up using topic: {topic_name}") - print(f"Subscribe by pasting the topic name in the ntfy app.") + print("Subscribe by pasting the topic name in the ntfy app.") print(f"Full topic URL: {full_topic_url}") - copy_to_clipboard = input("Do you want to copy the topic name to the clipboard? [y/n] (default: yes): ").strip().lower() or 'y' - if copy_to_clipboard == 'y': + copy_to_clipboard = ( + input( + "Do you want to copy the topic name to the clipboard? [y/n] (default: yes): " + ) + .strip() + .lower() + or "y" + ) + if copy_to_clipboard == "y": copy_to_clipboard_termux(topic_name) print("Topic name copied to clipboard.") else: print("You can copy the topic name from above.") else: - config['NTFY_TOPIC'] = '' - config['NTFY_SERVER'] = '' - with open(CONFIG_FILE, 'w') as f: + config["NTFY_TOPIC"] = "" + config["NTFY_SERVER"] = "" + with open(CONFIG_FILE, "w") as f: yaml.dump(config, f) print("Notifications have been disabled.") # Ask if the user wants to perform a first run - perform_first_run = input("Would you like to start the first run now? [y/n] (default: yes): ").strip().lower() or 'y' - if perform_first_run == 'y': + perform_first_run = ( + input("Would you like to start the first run now? [y/n] (default: yes): ") + .strip() + .lower() + or "y" + ) + if perform_first_run == "y": print("Starting first run, this may take a few minutes...") downloader.main() else: print("Setup complete. Run 'fetchtastic download' to start downloading.") + def copy_to_clipboard_termux(text): try: - subprocess.run(['termux-clipboard-set'], input=text.encode('utf-8'), check=True) + subprocess.run(["termux-clipboard-set"], input=text.encode("utf-8"), check=True) except Exception as e: print(f"An error occurred while copying to clipboard: {e}") + def install_termux_packages(): # Install termux-api, termux-services, and cronie if they are not installed packages_to_install = [] # Check for termux-api - if shutil.which('termux-battery-status') is None: - packages_to_install.append('termux-api') + if shutil.which("termux-battery-status") is None: + packages_to_install.append("termux-api") # Check for termux-services - if shutil.which('sv-enable') is None: - packages_to_install.append('termux-services') + if shutil.which("sv-enable") is None: + packages_to_install.append("termux-services") # Check for cronie - if shutil.which('crond') is None: - packages_to_install.append('cronie') + if shutil.which("crond") is None: + packages_to_install.append("cronie") if packages_to_install: print("Installing required Termux packages...") - subprocess.run(['pkg', 'install'] + packages_to_install + ['-y'], check=True) + subprocess.run(["pkg", "install"] + packages_to_install + ["-y"], check=True) print("Required Termux packages installed.") else: print("All required Termux packages are already installed.") + def setup_storage(): # Run termux-setup-storage print("Setting up Termux storage access...") try: - subprocess.run(['termux-setup-storage'], check=True) - except subprocess.CalledProcessError as e: + subprocess.run(["termux-setup-storage"], check=True) + except subprocess.CalledProcessError: print("An error occurred while setting up Termux storage.") print("Please grant storage permissions when prompted.") + def install_crond(): try: - crond_path = shutil.which('crond') + crond_path = shutil.which("crond") if crond_path is None: print("Installing cronie...") # Install cronie - subprocess.run(['pkg', 'install', 'cronie', '-y'], check=True) + subprocess.run(["pkg", "install", "cronie", "-y"], check=True) print("cronie installed.") else: print("cronie is already installed.") # Enable crond service - subprocess.run(['sv-enable', 'crond'], check=True) + subprocess.run(["sv-enable", "crond"], check=True) print("crond service enabled.") except Exception as e: print(f"An error occurred while installing or enabling crond: {e}") + def setup_cron_job(): try: # Get current crontab entries - result = subprocess.run(['crontab', '-l'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + result = subprocess.run( + ["crontab", "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) if result.returncode != 0: - existing_cron = '' + existing_cron = "" else: existing_cron = result.stdout # Remove existing fetchtastic cron jobs - new_cron = '\n'.join([line for line in existing_cron.split('\n') if 'fetchtastic download' not in line]) + new_cron = "\n".join( + [ + line + for line in existing_cron.split("\n") + if "fetchtastic download" not in line + ] + ) # Add new cron job - new_cron += f"\n0 3 * * * fetchtastic download\n" + new_cron += "\n0 3 * * * fetchtastic download\n" # Update crontab - process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) + process = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, text=True) process.communicate(input=new_cron) print("Cron job added to run Fetchtastic daily at 3 AM.") except Exception as e: print(f"An error occurred while setting up the cron job: {e}") + def remove_cron_job(): try: # Get current crontab entries - result = subprocess.run(['crontab', '-l'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + result = subprocess.run( + ["crontab", "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True + ) if result.returncode == 0: existing_cron = result.stdout # Remove existing fetchtastic cron jobs - new_cron = '\n'.join([line for line in existing_cron.split('\n') if 'fetchtastic download' not in line]) + new_cron = "\n".join( + [ + line + for line in existing_cron.split("\n") + if "fetchtastic download" not in line + ] + ) # Update crontab - process = subprocess.Popen(['crontab', '-'], stdin=subprocess.PIPE, text=True) + process = subprocess.Popen( + ["crontab", "-"], stdin=subprocess.PIPE, text=True + ) process.communicate(input=new_cron) print("Cron job removed.") except Exception as e: print(f"An error occurred while removing the cron job: {e}") + def setup_boot_script(): boot_dir = os.path.expanduser("~/.termux/boot") boot_script = os.path.join(boot_dir, "fetchtastic.sh") if not os.path.exists(boot_dir): os.makedirs(boot_dir) print("Created the Termux:Boot directory.") - print("Please install Termux:Boot from F-Droid and run it once to enable boot scripts.") + print( + "Please install Termux:Boot from F-Droid and run it once to enable boot scripts." + ) # Write the boot script - with open(boot_script, 'w') as f: + with open(boot_script, "w") as f: f.write("#!/data/data/com.termux/files/usr/bin/sh\n") f.write("sleep 30\n") f.write("fetchtastic download\n") os.chmod(boot_script, 0o700) print("Boot script created to run Fetchtastic on device boot.") - print("Note: The script may not run on boot until you have installed and run Termux:Boot at least once.") + print( + "Note: The script may not run on boot until you have installed and run Termux:Boot at least once." + ) + def remove_boot_script(): boot_script = os.path.expanduser("~/.termux/boot/fetchtastic.sh") @@ -375,9 +498,10 @@ def remove_boot_script(): os.remove(boot_script) print("Boot script removed.") + def load_config(): if not config_exists(): return None - with open(CONFIG_FILE, 'r') as f: + with open(CONFIG_FILE, "r") as f: config = yaml.safe_load(f) return config diff --git a/setup.cfg b/setup.cfg index c32746e..6035b28 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = fetchtastic -version = 0.1.10 +version = 0.1.11 author = Jeremiah K author_email = jeremiahk@gmx.com description = Meshtastic Firmware and APK Downloader