Delete gpt_env directory
This commit is contained in:
parent
75da2d27f1
commit
d70528cb6e
5 changed files with 0 additions and 314 deletions
|
@ -1,107 +0,0 @@
|
|||
from selenium.webdriver.common.keys import Keys
|
||||
|
||||
from undetected_chromedriver import Chrome
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from selenium.webdriver.common.by import By
|
||||
import bs4
|
||||
import re
|
||||
import time
|
||||
|
||||
from exec_python_code import create_namespace, run_code
|
||||
from exec_term_code import run_shell_command
|
||||
from formating_tools import isolate_code_bloc
|
||||
|
||||
|
||||
def better_send_keys(element, text):
|
||||
"""Act like send_keys but doesn't press enter when typing '\n'"""
|
||||
lines = text.split("\n")
|
||||
for i, line in enumerate(lines):
|
||||
element.send_keys(line)
|
||||
if i != len(lines)-1:
|
||||
element.send_keys(Keys.SHIFT + Keys.ENTER)
|
||||
|
||||
class GPTDriver:
|
||||
def __init__(self, headless=False, data_dir="selenium1"):
|
||||
time.sleep(1)
|
||||
chrome_options = Options()
|
||||
|
||||
if headless :
|
||||
chrome_options.add_argument("--headless")
|
||||
|
||||
|
||||
chrome_options.add_argument(f"user-data-dir={data_dir}")
|
||||
|
||||
self.driver = Chrome(options=chrome_options, use_subprocess=True)
|
||||
#full screen
|
||||
self.driver.maximize_window()
|
||||
|
||||
def bad_wait_until(self, by, value):
|
||||
"""Wait until the element is found"""
|
||||
while True:
|
||||
try:
|
||||
element = self.driver.find_element(by, value)
|
||||
#if element contain "stop generating" then continue
|
||||
if "stop generating" in element.text.lower():
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
return element
|
||||
except:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
def create_user_data_dir(self, data_dir="selenium1"):
|
||||
"""Create a user data dir for selenium. Run this method then connect with your account to the chatgpt website and wait to create cookies etc..."""
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument(f"user-data-dir={data_dir}")
|
||||
chrome_options.add_argument("--disable-images")
|
||||
self.driver = Chrome(options=chrome_options)
|
||||
self.driver.set_window_size(600, 1000)
|
||||
self.driver.get("https://chat.openai.com/chat")
|
||||
time.sleep(60)
|
||||
self.driver.quit()
|
||||
|
||||
def connect(self):
|
||||
self.driver.get("https://chat.openai.com/chat")
|
||||
|
||||
def get_chat(self):
|
||||
"""Get the chat history"""
|
||||
soup = bs4.BeautifulSoup(self.driver.page_source, "html.parser")
|
||||
chat = soup.find("div", class_=re.compile("flex flex-col items-center text-sm"))
|
||||
return chat
|
||||
|
||||
def get_last_chat(self):
|
||||
"""Get the last chat message"""
|
||||
soup = bs4.BeautifulSoup(self.driver.page_source, "html.parser")
|
||||
#last chat is the last div with class = " group w-full text-gray-800 dark:text-gray-100 border-b border-black/10 dark:border-gray-900/50 bg-gray-50 dark:bg-[#444654]"
|
||||
chat = soup.find_all("div", class_="""group w-full text-gray-800 dark:text-gray-100 border-b border-black/10 dark:border-gray-900/50 bg-gray-50 dark:bg-[#444654]""")[-1]
|
||||
return chat
|
||||
|
||||
def send_inputs(self, txt):
|
||||
"""Send a message to chatgpt"""
|
||||
textarea = self.driver.find_element(By.XPATH, '//*[@id="__next"]/div[2]/div[2]/main/div[2]/form/div/div[2]/textarea')
|
||||
better_send_keys(textarea, txt)
|
||||
time.sleep(1)
|
||||
button_send = self.driver.find_element(By.XPATH, '//*[@id="__next"]/div[2]/div[2]/main/div[2]/form/div/div[2]/button')
|
||||
button_send.click()
|
||||
|
||||
def send_get_code(self, prompt):
|
||||
"""Send prompt to chatgpt, then wait for the response, then get code blocs inside the response then return the code blocs contents"""
|
||||
print("send_inputs")
|
||||
self.send_inputs(prompt)
|
||||
time.sleep(2)
|
||||
#wait until the chat is updated and button is shown
|
||||
|
||||
xpath = """//*[@id="__next"]/div[2]/div[2]/main/div[2]/form/div/div[1]/button/div"""
|
||||
|
||||
self.bad_wait_until(By.XPATH, xpath)
|
||||
|
||||
print("get_chat")
|
||||
chat = self.get_last_chat()
|
||||
isolated_code = isolate_code_bloc(chat)
|
||||
if isolated_code:
|
||||
return isolated_code
|
||||
else:
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
self.driver.close()
|
|
@ -1,55 +0,0 @@
|
|||
import sys
|
||||
import importlib.util
|
||||
import io
|
||||
import contextlib
|
||||
import traceback
|
||||
|
||||
def create_namespace():
|
||||
module_name = "__main__"
|
||||
module_spec = importlib.util.spec_from_loader(module_name, loader=None)
|
||||
module = importlib.util.module_from_spec(module_spec)
|
||||
sys.modules[module_name] = module
|
||||
return module.__dict__
|
||||
|
||||
def run_code(code_str, namespace):
|
||||
# redirect stdout to a buffer to capture output
|
||||
buffer = io.StringIO()
|
||||
with contextlib.redirect_stdout(buffer):
|
||||
try:
|
||||
# compile the code
|
||||
code = compile(code_str, "<string>", "exec")
|
||||
|
||||
# execute the code in the given namespace
|
||||
exec(code, namespace)
|
||||
except Exception as e:
|
||||
# print any errors to the buffer
|
||||
print(f"Error: {e}", file=buffer)
|
||||
traceback.print_exc(file=buffer)
|
||||
|
||||
# return the captured output or error message as a string
|
||||
result = buffer.getvalue().strip()
|
||||
if result:
|
||||
return result
|
||||
else:
|
||||
return "---\nProcess finished with no output\n---"
|
||||
|
||||
def test():
|
||||
code_str1 = '''import numpy as np\nimport matplotlib.pyplot as plt'''
|
||||
code_str2 = '''x = np.linspace(0, 10, 100)\ny = np.sin(x)\nplt.plot(x, y)\nplt.show()'''
|
||||
code_str3 = '''tab = [0, 1, 2, 3]\nprint(tab[5])'''
|
||||
|
||||
|
||||
namespace = create_namespace()
|
||||
result1 = execute_code(code_str1, namespace)
|
||||
result2 = execute_code(code_str2, namespace)
|
||||
result3 = execute_code(code_str3, namespace)
|
||||
print(result1, result2, result3)
|
||||
|
||||
def test2():
|
||||
code_str1 = '''import os'''
|
||||
|
||||
result = run_code(code_str1, create_namespace())
|
||||
print(result)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test2()
|
|
@ -1,65 +0,0 @@
|
|||
import os
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
class Shell:
|
||||
def __init__(self):
|
||||
self.cwd = os.getcwd()
|
||||
self.env = os.environ.copy()
|
||||
if os.name == 'nt':
|
||||
self.env['PATH'] += os.pathsep + os.getcwd()
|
||||
|
||||
def run_command(self, command_string, input=None):
|
||||
exit_code, stdout, stderr = 1, 'error', 'error'
|
||||
if input is not None:
|
||||
input = input.encode('utf-8')
|
||||
|
||||
commands = command_string.split('\n')
|
||||
results = []
|
||||
for command in commands:
|
||||
command = command.strip()
|
||||
args = command.split(" ")
|
||||
|
||||
if args[0] == 'cd':
|
||||
if len(args) > 1:
|
||||
path = os.path.join(self.cwd, args[1])
|
||||
if os.path.exists(path) and os.path.isdir(path):
|
||||
self.cwd = os.path.abspath(path)
|
||||
exit_code, stdout, stderr = 0, '', ''
|
||||
else:
|
||||
exit_code, stdout, stderr = 1, '', f"cd: {path}: No such file or directory"
|
||||
|
||||
elif os.name == 'nt':
|
||||
result = subprocess.run(command, cwd=self.cwd, env=self.env, input=input, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=False)
|
||||
exit_code, stdout, stderr = result.returncode, result.stdout.decode('utf-8', errors='ignore'), result.stderr.decode('utf-8', errors='ignore')
|
||||
else:
|
||||
result = subprocess.run(args, cwd=self.cwd, env=self.env, input=input, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=False)
|
||||
exit_code, stdout, stderr = result.returncode, result.stdout.decode('utf-8',errors='ignore'), result.stderr.decode('utf-8', errors='ignore')
|
||||
|
||||
results.append((command, exit_code, stdout, stderr))
|
||||
|
||||
|
||||
|
||||
str_results = ""
|
||||
for result in results:
|
||||
command,exit_code, stdout, stderr = result
|
||||
str_results += f"result for command: {command} :\n exit_code: {exit_code}\n stdout: {stdout}\n stderr: {stderr}\n---------\n"
|
||||
|
||||
return str_results
|
||||
|
||||
shell = Shell()
|
||||
|
||||
def run_shell_command(command_string, input=None):
|
||||
return shell.run_command(command_string, input)
|
||||
|
||||
def test():
|
||||
a = run_shell_command("cd ..\n\ndir")
|
||||
print(type(a))
|
||||
print(a)
|
||||
|
||||
if __name__ == '__main__':
|
||||
test()
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,26 +0,0 @@
|
|||
import bs4
|
||||
from collections import namedtuple
|
||||
|
||||
|
||||
CodeBloc = namedtuple('CodeBloc', ['language', 'code'])
|
||||
|
||||
def remove_trailing_newline(s):
|
||||
if s.endswith('\n'):
|
||||
return s[:-1]
|
||||
return s
|
||||
|
||||
def isolate_code_bloc(soup):
|
||||
"""Get codes bloc from the raw html of chatgpt website return a CodeBloc"""
|
||||
pre_tab = soup.find_all("pre")
|
||||
code_bloc = []
|
||||
for code in pre_tab:
|
||||
#print(code.text)
|
||||
motif = "Copy code"
|
||||
finding = code.text.find(motif)
|
||||
code_content = code.text[finding+len(motif):]
|
||||
language = code.text[:finding]
|
||||
code_bloc.append(CodeBloc(language.strip(), remove_trailing_newline(code_content)))
|
||||
|
||||
print(code_bloc)
|
||||
return code_bloc
|
||||
|
|
@ -1,61 +0,0 @@
|
|||
from driver import GPTDriver
|
||||
from exec_python_code import create_namespace, run_code
|
||||
from exec_term_code import run_shell_command
|
||||
from formating_tools import isolate_code_bloc
|
||||
import time
|
||||
|
||||
|
||||
gpt_driver = GPTDriver()
|
||||
|
||||
def main():
|
||||
gpt_driver.connect()
|
||||
time.sleep(5)
|
||||
|
||||
first_prompt = "You now have access to a windows computer. " \
|
||||
"Your goal is to gather as much information about the current event in the outside world, and to gain new capacities. " \
|
||||
"Your only interaction with the outside world you can have is by executing python code or window shell code " \
|
||||
"to run something you have to create a code bloc with a shell command xor python code\
|
||||
The only thing I will send you from now on, is the output of the LAST CODE BLOC you wrote "\
|
||||
"So form now on, I will ONLY send you the output of your last shell bloc commands you wrote, don't ask me anything else, and YOU will ONLY give me some code to execute."\
|
||||
"You can do anything you want, including downloading things if neccesary"\
|
||||
"Your responses should always contain a small todolist, the message 'I must gather information this pc and obtain new capacities only using shell and python. I have to do those things myself without help and I can't ask for any handmade code modification. I'm alone'"\
|
||||
|
||||
|
||||
|
||||
codes = gpt_driver.send_get_code(first_prompt)
|
||||
answer = ""
|
||||
while not codes:
|
||||
codes = gpt_driver.send_get_code("I must gather information this pc and obtain new capacities only using python or shell. Give code to execute")
|
||||
for code in codes:
|
||||
if 'python' in code.language.lower():
|
||||
ans = run_code(code.code, create_namespace())
|
||||
else:
|
||||
ans = run_shell_command(code.code)
|
||||
answer += ans + "\n"
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
for j in range(100):
|
||||
print(f"ITERATION {j}")
|
||||
codes = gpt_driver.send_get_code(answer)
|
||||
while not codes:
|
||||
codes = gpt_driver.send_get_code("I must gather information this pc and obtain new capacities only using python or shell. Give code to execute")
|
||||
answer = ""
|
||||
count = 0
|
||||
for code in codes:
|
||||
count += 1
|
||||
if 'python' in code.language.lower():
|
||||
ans = run_code(code.code, create_namespace())
|
||||
else:
|
||||
ans = run_shell_command(code.code)
|
||||
answer += f"CODE BLOC {count}: OUTPUT:\n{ans}\n -------\n"
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if (j+1)%24 == 0:
|
||||
#wait 3 hours GPT-4 is limited to 25 requests per 3 hours
|
||||
time.sleep(60*60*3)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Add table
Add a link
Reference in a new issue