import telebot
from config import Config
import pdfplumber
import time
import requests
from datetime import datetime
import schedule
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager

BOT_TOKEN = '6701395239:AAFE30dqvNihDdni9vYoAbWssO-X5yAmwho'
# BOT_TOKEN = "6746720034:AAEMaoV2FwIZ8pz_PF18-bo2a6gFC1eVtVs"
# BOT_TOKEN = '6589162555:AAHGhrTQ0wYNtIUySMohnfpxQl1d6blr24Q'


def broadcast_message(message: str, chat_id: str):
    # Send a plain-text message to the given chat through the Telegram Bot API.
    r = requests.post(
        f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage",
        json={
            "chat_id": chat_id,
            "text": message,
        },
    )
    return r


def start():
    global startime
    startime = time.time()
    url = "https://www.bea.gov/news/2024/personal-income-and-outlays-december-2023"
    text = find_indicate(url)
    return text


def find_indicate_bs4(url):
    # Scrape the BEA indicator table with requests + BeautifulSoup.
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    # Locate the fact rows by class (select() does not accept class_, so use find_all).
    element = soup.find_all("tr", class_='item-fact-row')
    print(soup)
    return element[1].text.split('+')[0] + " " + element[1].text.split('+')[1]


def find_indicate(url):
    from selenium.webdriver.common.by import By
    from selenium.webdriver.chrome.options import Options
    options = Options()
    # options.add_argument("--headless")  # enable headless mode
    driver = webdriver.Chrome(options=options)
    driver.get(url)
    # time.sleep(3)
    # Read the first row (date and value) of the indicator table.
    date = driver.find_element(By.XPATH, '//*[@id="test"]/div[2]/article/div/div/div[1]/div[2]/div/div[1]/div/table/tbody/tr[1]/td[1]')
    value = driver.find_element(By.XPATH, '//*[@id="test"]/div[2]/article/div/div/div[1]/div[2]/div/div[1]/div/table/tbody/tr[1]/td[2]')
    return date.text, value.text


def find_Fomc_bs4(date: str):
    response = requests.get(fomc_url + date + ".htm")
    soup = BeautifulSoup(response.text, 'html.parser')
    # Locate the release time and the statement body with CSS selectors.
    time_elements = soup.select('.article__time')
    value_elements = soup.find_all('div', class_="col-xs-12 col-sm-8 col-md-8")
    if time_elements:
        time_text = time_elements[0].text
        release_time = time_text.split(',')[0].split(' ')[0]
    else:
        release_time = None
    if value_elements:
        split_elements = [item.text.split('.') for item in value_elements]
        result = []
        for sublist in split_elements:
            result.extend(sublist)
        value = [item for item in result if 'In support of' in item]
    else:
        value = None
    return release_time, value[0].strip('\n')


def find_cpi(url):
    from selenium.webdriver.common.by import By
    from selenium.webdriver.chrome.options import Options
    print("Start")
    options = Options()
    # options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)
    driver.get(url)
    # time.sleep(3)
    # Pull the release month and the headline lines out of the pre-formatted text block.
    date = driver.find_element(By.XPATH, '//*[@id="bodytext"]/div[1]/pre[1]').text.split('\n')[6].split("-")[1].split(" ")[1]
    value = driver.find_element(By.XPATH, '//*[@id="bodytext"]/div[1]/pre[1]').text.split('\n')[8:12]
    driver.quit()
    return date, value


def find_fomc(url, date: str):
    from selenium.webdriver.common.by import By
    from selenium.webdriver.firefox.options import Options
    print("Start")
    options = Options()
options.add_argument("--headless") # 啟用無頭模式 driver = webdriver.Firefox(options = options) driver.get(url+date+".htm") # time.sleep(3) date = driver.find_element(By.CLASS_NAME, 'article__time').text.split(',')[0].split(' ')[0] value = driver.find_element(By.XPATH, '//*[@id="article"]/div[3]').text.split('.') value = [item for item in value if 'In support of' in item] driver.quit() return date , value[0].strip('\n') def find_pce(date:str): from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options print("Start") options = Options() options.add_argument("--headless") # 啟用無頭模式 driver = webdriver.Chrome() driver.get(pce_url+date) # time.sleep(3) date = driver.find_element(By.XPATH, '//*[@id="home"]/h1').text.split(' ')[4] value_1 = driver.find_element(By.XPATH, '//*[@id="home"]/div[2]/div/div/p[2]').text value_2 = driver.find_element(By.XPATH, '//*[@id="home"]/div[2]/div/div/div[1]/table/tbody/tr[13]').text.strip(' ').split(' ')[-1] driver.quit() return date , value_1 , value_2 def find_pce_bs4(date:str): from bs4 import BeautifulSoup import requests response = requests.get(pce_url+date) soup = BeautifulSoup(response.text, 'html.parser') print(soup) # 使用CSS選擇器找到元素 time_elements = soup.select('.row release-embargo') value_elements = soup.find_all('div',class_="col-md-12 release-body" ) value2_elements = soup.find_all('td',class_="text-left" ) print(value_elements) if time_elements: time_text = time_elements[0].text print(time_text) time = time_text.split(' ')[4] print(time) else: time = None if value_elements: split_elements = [item.text.split('.') for item in value_elements] print(value_elements) value1 = [] for sublist in split_elements: value1.extend(sublist) print(value1) else: value1 = None if value2_elements: split_elements = [item.text.split('.') for item in value2_elements] value2 = [] for sublist in split_elements: value2.extend(sublist) print(value2) else: value2 = None return time, value1 , value2 #非農就業人數 def read_pdf_nonfarm(month, year): pdf = pdfplumber.open(f"empsit_{month}_{year}.pdf") page = pdf.pages[0] text = page.extract_text().split('\n') text = (text[7]+text[8]).split(',') text = text[0]+text[1]+text[2] return text def download_pdf_nonfarm(): url = "https://www.bls.gov/news.release/pdf/empsit.pdf" response = requests.get(url) def read_nonfarm(url): from bs4 import BeautifulSoup import requests startimee = time.time() response = requests.get(url) soup = BeautifulSoup(response.text, 'html.parser') # 使用CSS選擇器找到元素 element = soup.select("div", class_='normalnews') print(element[0].text) endtimee = time.time() print("Time: ", endtimee-startimee) return element[0].text # print(text.split('\n')[7:9]) def read_CPI(url): startimee = time.time() response = requests.get(url) soup = BeautifulSoup(response.text, 'html.parser') # 使用CSS選擇器找到元素 element = soup.select("div", class_='normalnews') print(element[0].text) endtimee = time.time() print("Time: ", endtimee - startimee) return element[0].text # print(text.split('\n')[7:9]) def read_PCE(): message = find_indicate_bs4("https://www.bea.gov/data/income-saving/personal-income") return message def broadcast_all(target:str): startimee = time.time() message = read_PCE() while message.split(' ')[0] != target: message = read_PCE() broadcast_message(message, "-1002033782195") endtimee = time.time() broadcast_message( f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s", "-1002033782195") def broadcast_all_sele(target:str): 
    # Poll the BEA PCE price-index page with Selenium until the release month matches the target.
    startimee = time.time()
    date, message = find_indicate("https://www.bea.gov/data/personal-consumption-expenditures-price-index")
    while date.split(' ')[0] != target:
        date, message = find_indicate("https://www.bea.gov/data/personal-consumption-expenditures-price-index")
    broadcast_message(date + message, "-1002033782195")
    endtimee = time.time()
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
        "-1002033782195")


def broadcast_all_cpi(target: str):
    # Poll the CPI release page until the headline month matches the target, then broadcast it.
    startimee = time.time()
    date, message = find_cpi(cpi_url)
    while date != target:
        date, message = find_cpi(cpi_url)
    message = "\n".join(message)
    broadcast_message(message, "-1002033782195")
    endtimee = time.time()
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
        "-1002033782195")


def broadcast_all_fomc(target: str):
    startimee = time.time()
    date, message = find_fomc(fomc_url, '20240131a')
    while date != target:
        date, message = find_fomc(fomc_url, '20240131a')
    broadcast_message(message, "-1002033782195")
    endtimee = time.time()
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
        "-1002033782195")


def broadcast_all_bs4_fomc(target: str):
    startimee = time.time()
    date, message = find_Fomc_bs4('20240131a')
    while date != target:
        date, message = find_Fomc_bs4('20240131a')
    broadcast_message(message, "-1002033782195")
    endtimee = time.time()
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
        "-1002033782195")


def broadcast_all_pce(target: str, date_target: str):
    startimee = time.time()
    date, message1, message2 = find_pce(date_target)
    while date != target:
        date, message1, message2 = find_pce(date_target)
    broadcast_message(date + " PCE Data", "-1002033782195")
    broadcast_message(message1 + '\n\n Percent change from month one year ago : ' + message2, "-1002033782195")
    endtimee = time.time()
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
        "-1002033782195")


def broadcast_all_bs4_pce(target: str):
    startimee = time.time()
    # find_pce_bs4 returns three values (release time, body sentences, table cells),
    # so unpack all three and join the body sentences into one message.
    date, message1, message2 = find_pce_bs4('20240131a')
    while date != target:
        date, message1, message2 = find_pce_bs4('20240131a')
    broadcast_message(date + " PCE Data", "-1002033782195")
    broadcast_message("\n".join(message1), "-1002033782195")
    endtimee = time.time()
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
        "-1002033782195")


if __name__ == "__main__":
    # Module-level URLs used by the scraper functions above.
    nonfarm_url = "https://www.bls.gov/news.release/empsit.nr0.htm"
    cpi_url = "https://www.bls.gov/news.release/cpi.nr0.htm"
    fomc_url = "https://www.federalreserve.gov/newsevents/pressreleases/monetary"
    pce_url = "https://www.bea.gov/news/2024/personal-income-and-outlays-"  # e.g. january-2024

    # broadcast_all_pce("February", "february-2024")
    # print("Start Time:", datetime.fromtimestamp(time.time()))
    # schedule.every().day.at("18:39").do(broadcast_all_cpi, "FEBRUARY")
    # while True:
    #     if datetime.now().strftime("%Y-%m-%d %H:%M") == "2024-04-10 18:39":
    #         schedule.run_pending()
    #     time.sleep(0.1)  # check every 0.1 seconds

    # NonFarm
    # text = download_pdf_nonfarm()
    # read_nonfarm(nonfarm_url)
    # print(text)

    # CPI
    date, value = find_cpi(cpi_url)
    print(date)
    print(value)
    # broadcast_all_cpi("FEBRUARY")
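

# --- Usage sketch (added for illustration; not part of the original run path) ---
# A minimal helper showing how the commented-out scheduler in __main__ could be
# re-enabled with the `schedule` library imported above. The function name and its
# defaults are placeholders: "18:39" and "FEBRUARY" are simply the values that
# appear in the commented-out lines, not a confirmed release time.
def run_scheduled_cpi(run_at: str = "18:39", target_month: str = "FEBRUARY"):
    # Register the CPI broadcast job, then poll until the scheduler fires it.
    schedule.every().day.at(run_at).do(broadcast_all_cpi, target_month)
    while True:
        schedule.run_pending()
        time.sleep(0.1)  # check every 0.1 seconds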