From afe8f83f4a1a3014b68dbaf1e4b1f413a7f4f625 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 24 May 2023 18:01:09 +0800 Subject: [PATCH] format --- .vscode/launch.json | 2 +- Makefile | 2 +- src/BingImageCreator.py | 45 +++++++++++++++++++++++++++++------------ test/test_example.py | 10 ++++++--- 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 4c64967..c64cfb1 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,4 +12,4 @@ "justMyCode": true } ] -} \ No newline at end of file +} diff --git a/Makefile b/Makefile index 57c5b5f..dc5d69a 100644 --- a/Makefile +++ b/Makefile @@ -8,4 +8,4 @@ build: ci: python -m flake8 src --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics python -m flake8 src --count --select=E9,F63,F7,F82 --show-source --statistics - python setup.py install \ No newline at end of file + python setup.py install diff --git a/src/BingImageCreator.py b/src/BingImageCreator.py index fe4a532..553058a 100644 --- a/src/BingImageCreator.py +++ b/src/BingImageCreator.py @@ -1,17 +1,20 @@ import argparse import asyncio -from functools import partial import contextlib import json import os import random import sys import time +from functools import partial +from typing import Dict +from typing import List +from typing import Union + import aiohttp import pkg_resources import regex import requests -from typing import Union, List, Dict if os.environ.get("BING_URL") == None: BING_URL = "https://www.bing.com" @@ -94,7 +97,10 @@ def get_images(self, prompt: str) -> list: # https://www.bing.com/images/create?q=&rt=3&FORM=GENCRE url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE" response = self.session.post( - url, allow_redirects=False, data=payload, timeout=200 + url, + allow_redirects=False, + data=payload, + timeout=200, ) # check for content waring message if "this prompt has been blocked" in response.text.lower(): @@ -184,14 +190,15 @@ def save_images(self, links: list, output_dir: str, file_name: str = None) -> No jpeg_index = 0 for link in links: while os.path.exists( - os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg") + os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg"), ): jpeg_index += 1 with self.session.get(link, stream=True) as response: # save response to file response.raise_for_status() with open( - os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg"), "wb" + os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg"), + "wb", ) as output_file: for chunk in response.iter_content(chunk_size=8192): output_file.write(chunk) @@ -225,7 +232,7 @@ def __init__( if all_cookies: for cookie in all_cookies: self.session.cookie_jar.update_cookies( - {cookie["name"]: cookie["value"]} + {cookie["name"]: cookie["value"]}, ) if auth_cookie: self.session.cookie_jar.update_cookies({"_U": auth_cookie}) @@ -253,7 +260,9 @@ async def get_images(self, prompt: str) -> list: url = f"{BING_URL}/images/create?q={url_encoded_prompt}&rt=4&FORM=GENCRE" payload = f"q={url_encoded_prompt}&qs=ds" async with self.session.post( - url, allow_redirects=False, data=payload + url, + allow_redirects=False, + data=payload, ) as response: content = await response.text() if "this prompt has been blocked" in content.lower(): @@ -317,7 +326,10 @@ async def get_images(self, prompt: str) -> list: return normal_image_links async def save_images( - self, links: list, output_dir: str, file_name: str = None + self, + links: list, + output_dir: str, + file_name: str = None, ) -> None: """ Saves images to output directory @@ -333,13 +345,14 @@ async def save_images( jpeg_index = 0 for link in links: while os.path.exists( - os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg") + os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg"), ): jpeg_index += 1 async with self.session.get(link, raise_for_status=True) as response: # save response to file with open( - os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg"), "wb" + os.path.join(output_dir, f"{fn}{jpeg_index}.jpeg"), + "wb", ) as output_file: async for chunk in response.content.iter_chunked(8192): output_file.write(chunk) @@ -358,7 +371,10 @@ async def async_image_gen( all_cookies=None, ): async with ImageGenAsync( - u_cookie, debug_file=debug_file, quiet=quiet, all_cookies=all_cookies + u_cookie, + debug_file=debug_file, + quiet=quiet, + all_cookies=all_cookies, ) as image_generator: images = await image_generator.get_images(prompt) await image_generator.save_images(images, output_dir=output_dir) @@ -423,7 +439,10 @@ def main(): if not args.asyncio: # Create image generator image_generator = ImageGen( - args.U, args.debug_file, args.quiet, all_cookies=cookie_json + args.U, + args.debug_file, + args.quiet, + all_cookies=cookie_json, ) image_generator.save_images( image_generator.get_images(args.prompt), @@ -438,7 +457,7 @@ def main(): args.debug_file, args.quiet, all_cookies=cookie_json, - ) + ), ) diff --git a/test/test_example.py b/test/test_example.py index cb7e303..3758444 100644 --- a/test/test_example.py +++ b/test/test_example.py @@ -1,13 +1,17 @@ -import os,shutil +import os +import shutil + from src.BingImageCreator import ImageGen + + def test_save_images(): # create a temporary output directory for testing purposes test_output_dir = "test_output" os.mkdir(test_output_dir) # download a test image test_image_url = "https://picsum.photos/200" - gen = ImageGen(auth_cookie='') - gen.save_images([test_image_url], test_output_dir, ) + gen = ImageGen(auth_cookie="") + gen.save_images([test_image_url], test_output_dir) gen.save_images([test_image_url], test_output_dir, file_name="test_image") # check if the image was downloaded and saved correctly assert os.path.exists(os.path.join(test_output_dir, "test_image_0.jpeg"))