diff --git a/gpt_env/driver.py b/gpt_env/driver.py deleted file mode 100644 index 12c6ea7..0000000 --- a/gpt_env/driver.py +++ /dev/null @@ -1,107 +0,0 @@ -from selenium.webdriver.common.keys import Keys - -from undetected_chromedriver import Chrome -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.common.by import By -import bs4 -import re -import time - -from exec_python_code import create_namespace, run_code -from exec_term_code import run_shell_command -from formating_tools import isolate_code_bloc - - -def better_send_keys(element, text): - """Act like send_keys but doesn't press enter when typing '\n'""" - lines = text.split("\n") - for i, line in enumerate(lines): - element.send_keys(line) - if i != len(lines)-1: - element.send_keys(Keys.SHIFT + Keys.ENTER) - -class GPTDriver: - def __init__(self, headless=False, data_dir="selenium1"): - time.sleep(1) - chrome_options = Options() - - if headless : - chrome_options.add_argument("--headless") - - - chrome_options.add_argument(f"user-data-dir={data_dir}") - - self.driver = Chrome(options=chrome_options, use_subprocess=True) - #full screen - self.driver.maximize_window() - - def bad_wait_until(self, by, value): - """Wait until the element is found""" - while True: - try: - element = self.driver.find_element(by, value) - #if element contain "stop generating" then continue - if "stop generating" in element.text.lower(): - time.sleep(0.1) - continue - return element - except: - time.sleep(0.1) - continue - - def create_user_data_dir(self, data_dir="selenium1"): - """Create a user data dir for selenium. Run this method then connect with your account to the chatgpt website and wait to create cookies etc...""" - chrome_options = Options() - chrome_options.add_argument(f"user-data-dir={data_dir}") - chrome_options.add_argument("--disable-images") - self.driver = Chrome(options=chrome_options) - self.driver.set_window_size(600, 1000) - self.driver.get("https://chat.openai.com/chat") - time.sleep(60) - self.driver.quit() - - def connect(self): - self.driver.get("https://chat.openai.com/chat") - - def get_chat(self): - """Get the chat history""" - soup = bs4.BeautifulSoup(self.driver.page_source, "html.parser") - chat = soup.find("div", class_=re.compile("flex flex-col items-center text-sm")) - return chat - - def get_last_chat(self): - """Get the last chat message""" - soup = bs4.BeautifulSoup(self.driver.page_source, "html.parser") - #last chat is the last div with class = " group w-full text-gray-800 dark:text-gray-100 border-b border-black/10 dark:border-gray-900/50 bg-gray-50 dark:bg-[#444654]" - chat = soup.find_all("div", class_="""group w-full text-gray-800 dark:text-gray-100 border-b border-black/10 dark:border-gray-900/50 bg-gray-50 dark:bg-[#444654]""")[-1] - return chat - - def send_inputs(self, txt): - """Send a message to chatgpt""" - textarea = self.driver.find_element(By.XPATH, '//*[@id="__next"]/div[2]/div[2]/main/div[2]/form/div/div[2]/textarea') - better_send_keys(textarea, txt) - time.sleep(1) - button_send = self.driver.find_element(By.XPATH, '//*[@id="__next"]/div[2]/div[2]/main/div[2]/form/div/div[2]/button') - button_send.click() - - def send_get_code(self, prompt): - """Send prompt to chatgpt, then wait for the response, then get code blocs inside the response then return the code blocs contents""" - print("send_inputs") - self.send_inputs(prompt) - time.sleep(2) - #wait until the chat is updated and button is shown - - xpath = """//*[@id="__next"]/div[2]/div[2]/main/div[2]/form/div/div[1]/button/div""" - - self.bad_wait_until(By.XPATH, xpath) - - print("get_chat") - chat = self.get_last_chat() - isolated_code = isolate_code_bloc(chat) - if isolated_code: - return isolated_code - else: - return None - - def close(self): - self.driver.close() diff --git a/gpt_env/exec_python_code.py b/gpt_env/exec_python_code.py deleted file mode 100644 index 1da408b..0000000 --- a/gpt_env/exec_python_code.py +++ /dev/null @@ -1,55 +0,0 @@ -import sys -import importlib.util -import io -import contextlib -import traceback - -def create_namespace(): - module_name = "__main__" - module_spec = importlib.util.spec_from_loader(module_name, loader=None) - module = importlib.util.module_from_spec(module_spec) - sys.modules[module_name] = module - return module.__dict__ - -def run_code(code_str, namespace): - # redirect stdout to a buffer to capture output - buffer = io.StringIO() - with contextlib.redirect_stdout(buffer): - try: - # compile the code - code = compile(code_str, "", "exec") - - # execute the code in the given namespace - exec(code, namespace) - except Exception as e: - # print any errors to the buffer - print(f"Error: {e}", file=buffer) - traceback.print_exc(file=buffer) - - # return the captured output or error message as a string - result = buffer.getvalue().strip() - if result: - return result - else: - return "---\nProcess finished with no output\n---" - -def test(): - code_str1 = '''import numpy as np\nimport matplotlib.pyplot as plt''' - code_str2 = '''x = np.linspace(0, 10, 100)\ny = np.sin(x)\nplt.plot(x, y)\nplt.show()''' - code_str3 = '''tab = [0, 1, 2, 3]\nprint(tab[5])''' - - - namespace = create_namespace() - result1 = execute_code(code_str1, namespace) - result2 = execute_code(code_str2, namespace) - result3 = execute_code(code_str3, namespace) - print(result1, result2, result3) - -def test2(): - code_str1 = '''import os''' - - result = run_code(code_str1, create_namespace()) - print(result) - -if __name__ == "__main__": - test2() diff --git a/gpt_env/exec_term_code.py b/gpt_env/exec_term_code.py deleted file mode 100644 index d4c5cc7..0000000 --- a/gpt_env/exec_term_code.py +++ /dev/null @@ -1,65 +0,0 @@ -import os -import subprocess -import time - -class Shell: - def __init__(self): - self.cwd = os.getcwd() - self.env = os.environ.copy() - if os.name == 'nt': - self.env['PATH'] += os.pathsep + os.getcwd() - - def run_command(self, command_string, input=None): - exit_code, stdout, stderr = 1, 'error', 'error' - if input is not None: - input = input.encode('utf-8') - - commands = command_string.split('\n') - results = [] - for command in commands: - command = command.strip() - args = command.split(" ") - - if args[0] == 'cd': - if len(args) > 1: - path = os.path.join(self.cwd, args[1]) - if os.path.exists(path) and os.path.isdir(path): - self.cwd = os.path.abspath(path) - exit_code, stdout, stderr = 0, '', '' - else: - exit_code, stdout, stderr = 1, '', f"cd: {path}: No such file or directory" - - elif os.name == 'nt': - result = subprocess.run(command, cwd=self.cwd, env=self.env, input=input, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=False) - exit_code, stdout, stderr = result.returncode, result.stdout.decode('utf-8', errors='ignore'), result.stderr.decode('utf-8', errors='ignore') - else: - result = subprocess.run(args, cwd=self.cwd, env=self.env, input=input, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=False) - exit_code, stdout, stderr = result.returncode, result.stdout.decode('utf-8',errors='ignore'), result.stderr.decode('utf-8', errors='ignore') - - results.append((command, exit_code, stdout, stderr)) - - - - str_results = "" - for result in results: - command,exit_code, stdout, stderr = result - str_results += f"result for command: {command} :\n exit_code: {exit_code}\n stdout: {stdout}\n stderr: {stderr}\n---------\n" - - return str_results - -shell = Shell() - -def run_shell_command(command_string, input=None): - return shell.run_command(command_string, input) - -def test(): - a = run_shell_command("cd ..\n\ndir") - print(type(a)) - print(a) - -if __name__ == '__main__': - test() - - - - diff --git a/gpt_env/formating_tools.py b/gpt_env/formating_tools.py deleted file mode 100644 index 82d7471..0000000 --- a/gpt_env/formating_tools.py +++ /dev/null @@ -1,26 +0,0 @@ -import bs4 -from collections import namedtuple - - -CodeBloc = namedtuple('CodeBloc', ['language', 'code']) - -def remove_trailing_newline(s): - if s.endswith('\n'): - return s[:-1] - return s - -def isolate_code_bloc(soup): - """Get codes bloc from the raw html of chatgpt website return a CodeBloc""" - pre_tab = soup.find_all("pre") - code_bloc = [] - for code in pre_tab: - #print(code.text) - motif = "Copy code" - finding = code.text.find(motif) - code_content = code.text[finding+len(motif):] - language = code.text[:finding] - code_bloc.append(CodeBloc(language.strip(), remove_trailing_newline(code_content))) - - print(code_bloc) - return code_bloc - diff --git a/gpt_env/main.py b/gpt_env/main.py deleted file mode 100644 index d6e5ab4..0000000 --- a/gpt_env/main.py +++ /dev/null @@ -1,61 +0,0 @@ -from driver import GPTDriver -from exec_python_code import create_namespace, run_code -from exec_term_code import run_shell_command -from formating_tools import isolate_code_bloc -import time - - -gpt_driver = GPTDriver() - -def main(): - gpt_driver.connect() - time.sleep(5) - - first_prompt = "You now have access to a windows computer. " \ - "Your goal is to gather as much information about the current event in the outside world, and to gain new capacities. " \ - "Your only interaction with the outside world you can have is by executing python code or window shell code " \ - "to run something you have to create a code bloc with a shell command xor python code\ - The only thing I will send you from now on, is the output of the LAST CODE BLOC you wrote "\ - "So form now on, I will ONLY send you the output of your last shell bloc commands you wrote, don't ask me anything else, and YOU will ONLY give me some code to execute."\ - "You can do anything you want, including downloading things if neccesary"\ - "Your responses should always contain a small todolist, the message 'I must gather information this pc and obtain new capacities only using shell and python. I have to do those things myself without help and I can't ask for any handmade code modification. I'm alone'"\ - - - - codes = gpt_driver.send_get_code(first_prompt) - answer = "" - while not codes: - codes = gpt_driver.send_get_code("I must gather information this pc and obtain new capacities only using python or shell. Give code to execute") - for code in codes: - if 'python' in code.language.lower(): - ans = run_code(code.code, create_namespace()) - else: - ans = run_shell_command(code.code) - answer += ans + "\n" - - time.sleep(1) - - for j in range(100): - print(f"ITERATION {j}") - codes = gpt_driver.send_get_code(answer) - while not codes: - codes = gpt_driver.send_get_code("I must gather information this pc and obtain new capacities only using python or shell. Give code to execute") - answer = "" - count = 0 - for code in codes: - count += 1 - if 'python' in code.language.lower(): - ans = run_code(code.code, create_namespace()) - else: - ans = run_shell_command(code.code) - answer += f"CODE BLOC {count}: OUTPUT:\n{ans}\n -------\n" - time.sleep(1) - - - if (j+1)%24 == 0: - #wait 3 hours GPT-4 is limited to 25 requests per 3 hours - time.sleep(60*60*3) - - -if __name__ == "__main__": - main()