diff --git a/main.py b/main.py index 3190b4a..bec5623 100644 --- a/main.py +++ b/main.py @@ -26,63 +26,6 @@ def broadcast_message(message:str,chat_id:str): ) -def start(): - global startime - startime = time.time() - url = "https://www.bea.gov/news/2024/personal-income-and-outlays-december-2023" - text = find_indicate(url) - return text -def find_indicate_bs4(url): - from bs4 import BeautifulSoup - import requests - response = requests.get(url) - soup = BeautifulSoup(response.text, 'html.parser') - - # 使用CSS選擇器找到元素 - element = soup.select("tr",class_ ='item-fact-row' ) - print(soup) - return element[1].text.split('+')[0]+" "+element[1].text.split('+')[1] -def find_indicate(url): - from selenium import webdriver - from selenium.webdriver.common.by import By - from selenium.webdriver.chrome.options import Options - from selenium.webdriver.support.ui import WebDriverWait - from selenium.webdriver.support import expected_conditions as EC - import time - options = Options() - # options.add_argument("--headless") # 啟用無頭模式 - driver =webdriver.Chrome(options=options) - driver.get(url) - # time.sleep(3) - date = driver.find_element(By.XPATH, '//*[@id="test"]/div[2]/article/div/div/div[1]/div[2]/div/div[1]/div/table/tbody/tr[1]/td[1]') - value = driver.find_element(By.XPATH, '//*[@id="test"]/div[2]/article/div/div/div[1]/div[2]/div/div[1]/div/table/tbody/tr[1]/td[2]') - return date.text , value.text -def find_Fomc_bs4(date:str): - from bs4 import BeautifulSoup - import requests - response = requests.get(fomc_url+date+".htm") - soup = BeautifulSoup(response.text, 'html.parser') - - # 使用CSS選擇器找到元素 - time_elements = soup.select('.article__time') - value_elements = soup.find_all('div',class_="col-xs-12 col-sm-8 col-md-8" ) - - if time_elements: - time_text = time_elements[0].text - time = time_text.split(',')[0].split(' ')[0] - else: - time = None - - if value_elements: - split_elements = [item.text.split('.') for item in value_elements] - result = [] - for sublist in split_elements: - result.extend(sublist) - value = [item for item in result if 'In support of' in item] - else: - value = None - - return time, value[0].strip('\n') def find_cpi(url): from selenium import webdriver from selenium.webdriver.common.by import By @@ -111,18 +54,22 @@ def find_cpi(url): def find_fomc(url,date:str): from selenium import webdriver from selenium.webdriver.common.by import By - from selenium.webdriver.firefox.options import Options + from selenium.webdriver.chrome.options import Options print("Start") options = Options() - options.add_argument("--headless") # 啟用無頭模式 - driver = webdriver.Firefox(options = options) + options.add_argument('--headless') + options.add_argument('--no-sandbox') + options.add_argument('--disable-dev-shm-usage') + options.add_argument("--window-size=1920,1080") # 可以根據需要調整這個大小 + options.add_argument( + 'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3') + driver = webdriver.Chrome( options=options) driver.get(url+date+".htm") + print("Open") # time.sleep(3) date = driver.find_element(By.CLASS_NAME, 'article__time').text.split(',')[0].split(' ')[0] value = driver.find_element(By.XPATH, '//*[@id="article"]/div[3]').text.split('.') value = [item for item in value if 'In support of' in item] - - driver.quit() return date , value[0].strip('\n') def find_pce(date:str): @@ -131,55 +78,45 @@ def find_pce(date:str): from selenium.webdriver.chrome.options import Options print("Start") options = Options() - options.add_argument("--headless") # 啟用無頭模式 - driver = webdriver.Chrome() + options.add_argument('--headless') + options.add_argument('--no-sandbox') + options.add_argument('--disable-dev-shm-usage') + options.add_argument("--window-size=1920,1080") # 可以根據需要調整這個大小 + options.add_argument( + 'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3') + driver = webdriver.Chrome( options=options) driver.get(pce_url+date) - # time.sleep(3) + print("Open") date = driver.find_element(By.XPATH, '//*[@id="home"]/h1').text.split(' ')[4] value_1 = driver.find_element(By.XPATH, '//*[@id="home"]/div[2]/div/div/p[2]').text value_2 = driver.find_element(By.XPATH, '//*[@id="home"]/div[2]/div/div/div[1]/table/tbody/tr[13]').text.strip(' ').split(' ')[-1] driver.quit() return date , value_1 , value_2 -def find_pce_bs4(date:str): - from bs4 import BeautifulSoup - import requests - response = requests.get(pce_url+date) - soup = BeautifulSoup(response.text, 'html.parser') - print(soup) - # 使用CSS選擇器找到元素 - time_elements = soup.select('.row release-embargo') - value_elements = soup.find_all('div',class_="col-md-12 release-body" ) - value2_elements = soup.find_all('td',class_="text-left" ) - print(value_elements) - if time_elements: - time_text = time_elements[0].text - print(time_text) - time = time_text.split(' ')[4] - print(time) - else: - time = None - - if value_elements: - split_elements = [item.text.split('.') for item in value_elements] - print(value_elements) - value1 = [] - for sublist in split_elements: - value1.extend(sublist) - print(value1) - - else: - value1 = None - if value2_elements: - split_elements = [item.text.split('.') for item in value2_elements] - value2 = [] - for sublist in split_elements: - value2.extend(sublist) - print(value2) - - else: - value2 = None - return time, value1 , value2 #非農就業人數 +def find_non_farm(url): + from selenium import webdriver + from selenium.webdriver.common.by import By + from selenium.webdriver.chrome.options import Options + + from selenium.webdriver.support.ui import WebDriverWait + from selenium.webdriver.support import expected_conditions as EC + import time + print("Start") + options = Options() + options.add_argument('--headless') + options.add_argument('--no-sandbox') + options.add_argument('--disable-dev-shm-usage') + options.add_argument("--window-size=1920,1080") # 可以根據需要調整這個大小 + options.add_argument( + 'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3') + driver = webdriver.Chrome( options=options) + driver.get(url) + print("Open") + # time.sleep(4) + date = driver.find_element(By.XPATH, '/html/body/div[2]/div[5]/div/div[1]/pre').text.split('\n')[6] + value = driver.find_element(By.XPATH, '/html/body/div[2]/div[5]/div/div[1]/pre').text.split('\n')[8:12] + # driver.quit() + return date , value def read_pdf_nonfarm(month, year): pdf = pdfplumber.open(f"empsit_{month}_{year}.pdf") page = pdf.pages[0] @@ -187,64 +124,22 @@ def read_pdf_nonfarm(month, year): text = (text[7]+text[8]).split(',') text = text[0]+text[1]+text[2] return text -def download_pdf_nonfarm(): - url = "https://www.bls.gov/news.release/pdf/empsit.pdf" - response = requests.get(url) - -def read_nonfarm(url): - from bs4 import BeautifulSoup - import requests - startimee = time.time() - response = requests.get(url) - soup = BeautifulSoup(response.text, 'html.parser') - - # 使用CSS選擇器找到元素 - element = soup.select("div", class_='normalnews') - print(element[0].text) - endtimee = time.time() - print("Time: ", endtimee-startimee) - - return element[0].text - # print(text.split('\n')[7:9]) -def read_CPI(url): - - startimee = time.time() - response = requests.get(url) - soup = BeautifulSoup(response.text, 'html.parser') - # 使用CSS選擇器找到元素 - element = soup.select("div", class_='normalnews') - print(element[0].text) - endtimee = time.time() - print("Time: ", endtimee - startimee) - - return element[0].text - # print(text.split('\n')[7:9]) -def read_PCE(): - message = find_indicate_bs4("https://www.bea.gov/data/income-saving/personal-income") - - return message -def broadcast_all(target:str): +def broadcast_all_non_farm(target:str): startimee = time.time() - message = read_PCE() - while message.split(' ')[0] != target: - message = read_PCE() + date , message = find_non_farm(nonfarm_url) + while date != target: + date, message = find_non_farm(nonfarm_url) + message= "\n".join(message) broadcast_message(message, "-1002033782195") endtimee = time.time() broadcast_message( f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s", "-1002033782195") -def broadcast_all_sele(target:str): - startimee = time.time() - date , message = find_indicate("https://www.bea.gov/data/personal-consumption-expenditures-price-index") - while date.split(' ')[0] != target: - date ,message = find_indicate("https://www.bea.gov/data/personal-consumption-expenditures-price-index") - broadcast_message(date+message, "-1002033782195") - endtimee = time.time() - broadcast_message( - f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s", - "-1002033782195") + broadcast_message(f"The above is the Non Farm for {target}","-1002033782195") + return True + def broadcast_all_cpi(target:str): startimee = time.time() date , message = find_cpi(cpi_url) @@ -256,6 +151,7 @@ def broadcast_all_cpi(target:str): broadcast_message( f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s", "-1002033782195") + broadcast_message(f"The above is the CPI for {target}","-1002033782195") return True def broadcast_all_fomc(target:str): startimee = time.time() @@ -267,39 +163,20 @@ def broadcast_all_fomc(target:str): broadcast_message( f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s", "-1002033782195") -def broadcast_all_bs4_fomc(target:str): - startimee = time.time() - date , message = find_Fomc_bs4('20240131a') - while date != target: - date, message = find_Fomc_bs4('20240131a') - broadcast_message(message, "-1002033782195") - endtimee = time.time() - broadcast_message( - f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s", - "-1002033782195") + broadcast_message(f"The above is the FOMC abstract for {target}","-1002033782195") def broadcast_all_pce(target:str,date_target:str): startimee = time.time() date, message1, message2 = find_pce(date_target) while date != target: date, message1, message2 = find_pce(date_target) broadcast_message(date+" PCE Data", "-1002033782195") - broadcast_message(message1+'\n\n Percent change from month one year ago : '+message2, "-1002033782195") + broadcast_message(message1+'\n\n Percent change from month one year ago : '+f"**{message2}**", "-1002033782195") endtimee = time.time() broadcast_message( f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s", "-1002033782195") + broadcast_message(f"The above is the PCE for {target}","-1002033782195") -def broadcast_all_bs4_pce(target:str): - startimee = time.time() - date , message = find_pce_bs4('20240131a') - while date != target: - date, message = find_pce_bs4('20240131a') - broadcast_message(date+" PCE Data", "-1002033782195") - broadcast_message(message, "-1002033782195") - endtimee = time.time() - broadcast_message( - f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s", - "-1002033782195") has_broadcasted = False def wrapper_function(target): @@ -314,6 +191,7 @@ if __name__ == "__main__": fomc_url = "https://www.federalreserve.gov/newsevents/pressreleases/monetary" pce_url = "https://www.bea.gov/news/2024/personal-income-and-outlays-"#january-2024 # broadcast_all_pce("February", "february-2024") + ''' print("Start Time:" , datetime.fromtimestamp(time.time())) schedule.every().day.at("19:56").do(wrapper_function, "FEBRUARY") while True: @@ -323,10 +201,17 @@ if __name__ == "__main__": print("Broadcast completed") break time.sleep(0.1) # Check every 0.1 seconds - #NonFarm - # text = download_pdf_nonfarm() + ''' - # read_nonfarm(url) - # print(text) + #NonFarm + # date , message = find_non_farm(nonfarm_url) + # print(date) + # print(message) #CPI - # broadcast_all_cpi("FEBRUARY") + # find_cpi("FEBRUARY") + + #PCE + # date, message1, message2 = find_pce("february-2024") + # print(date) + # print(message1) + # print(message2)