You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

233 lines
9.2 KiB

1 year ago
import telebot
from config import Config
import pdfplumber
import time
import requests
from datetime import datetime
import schedule
1 year ago
from bs4 import BeautifulSoup
import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import WebDriverException
1 year ago
BOT_TOKEN = '6701395239:AAFE30dqvNihDdni9vYoAbWssO-X5yAmwho'
# BOT_TOKEN = "6746720034:AAEMaoV2FwIZ8pz_PF18-bo2a6gFC1eVtVs"
#BOT_TOKEN = '6589162555:AAHGhrTQ0wYNtIUySMohnfpxQl1d6blr24Q'
def broadcast_message(message:str,chat_id:str):
r = requests.post(f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage",
json={
"chat_id": chat_id,
"text": message,
},
1 year ago
)
1 year ago
1 year ago
def find_cpi(url):
from selenium import webdriver
from selenium.webdriver.common.by import By
1 year ago
from selenium.webdriver.chrome.options import Options
1 year ago
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
print("Start")
options = Options()
options.add_argument('--headless')
1 year ago
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument("--window-size=1920,1080") # 可以根據需要調整這個大小
options.add_argument(
'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3')
1 year ago
driver = webdriver.Chrome( options=options)
1 year ago
driver.get(url)
print("Open")
# time.sleep(4)
date = driver.find_element(By.XPATH, '/html/body/div[2]/div[5]/div/div[1]/pre[1]').text.split('\n')[6].split("-")[1].split(" ")[1]
value = driver.find_element(By.XPATH, '/html/body/div[2]/div[5]/div/div[1]/pre[1]').text.split('\n')[8:12]
# driver.quit()
1 year ago
return date , value
1 year ago
def find_fomc(url,date:str):
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
print("Start")
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument("--window-size=1920,1080") # 可以根據需要調整這個大小
options.add_argument(
'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3')
driver = webdriver.Chrome( options=options)
driver.get(url+date+".htm")
print("Open")
# time.sleep(3)
date = driver.find_element(By.CLASS_NAME, 'article__time').text.split(',')[0].split(' ')[0]
value = driver.find_element(By.XPATH, '//*[@id="article"]/div[3]').text.split('.')
value = [item for item in value if 'In support of' in item]
driver.quit()
return date , value[0].strip('\n')
1 year ago
def find_pce(date:str):
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
print("Start")
options = Options()
# options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument("--window-size=1920,1080") # 可以根據需要調整這個大小
options.add_argument(
'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3')
driver = webdriver.Chrome( options=options)
try:
driver.get(pce_url+date)
print("Successfully accessed the website.")
time.sleep(0.5)
date = driver.find_element(By.XPATH, '//*[@id="home"]/h1').text.split(' ')[4]
value_1 = driver.find_element(By.XPATH, '//*[@id="home"]/div[2]/div/div/p[2]').text
value_2 = driver.find_element(By.XPATH, '//*[@id="home"]/div[2]/div/div/div[1]/table/tbody/tr[13]').text.strip(' ').split(' ')[-1]
return date, value_1, value_2
except WebDriverException as e:
print(e)
print("Failed to access the website.")
return None , None , None
finally:
driver.quit()
1 year ago
#非農就業人數
def find_non_farm(url):
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
print("Start")
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument("--window-size=1920,1080") # 可以根據需要調整這個大小
options.add_argument(
'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3')
driver = webdriver.Chrome( options=options)
driver.get(url)
print("Open")
# time.sleep(4)
date = driver.find_element(By.XPATH, '/html/body/div[2]/div[5]/div/div[1]/pre').text.split('\n')[6]
value = driver.find_element(By.XPATH, '/html/body/div[2]/div[5]/div/div[1]/pre').text.split('\n')[8:12]
# driver.quit()
return date , value
1 year ago
def read_pdf_nonfarm(month, year):
1 year ago
pdf = pdfplumber.open(f"empsit_{month}_{year}.pdf")
1 year ago
page = pdf.pages[0]
text = page.extract_text().split('\n')
text = (text[7]+text[8]).split(',')
text = text[0]+text[1]+text[2]
return text
1 year ago
def broadcast_all_non_farm(target:str):
1 year ago
startimee = time.time()
date , message = find_non_farm(nonfarm_url)
while date != target:
date, message = find_non_farm(nonfarm_url)
message= "\n".join(message)
broadcast_message(message, "-1002033782195")
1 year ago
endtimee = time.time()
broadcast_message(
f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
"-1002033782195")
broadcast_message(f"The above is the Non Farm for {target}","-1002033782195")
return True
1 year ago
def broadcast_all_cpi(target:str):
startimee = time.time()
date , message = find_cpi(cpi_url)
while date != target:
date, message = find_cpi(cpi_url)
message= "\n".join(message)
broadcast_message(message, "-1002033782195")
endtimee = time.time()
broadcast_message(
f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
"-1002033782195")
broadcast_message(f"The above is the CPI for {target}","-1002033782195")
return True
def broadcast_all_fomc(target:str):
startimee = time.time()
date , message = find_fomc(fomc_url,'20240131a')
while date != target:
date, message = find_fomc(fomc_url,'20240131a')
broadcast_message(message, "-1002033782195")
endtimee = time.time()
broadcast_message(
f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
"-1002033782195")
broadcast_message(f"The above is the FOMC abstract for {target}","-1002033782195")
1 year ago
def broadcast_all_pce(target:str,date_target:str):
startimee = time.time()
date, message1, message2 = find_pce(date_target)
while date != target:
date, message1, message2 = find_pce(date_target)
broadcast_message(date+" PCE Data", "-1002033782195")
broadcast_message(message1+'\n\n Percent change from month one year ago : '+f"**{message2}**", "-1002033782195")
1 year ago
endtimee = time.time()
broadcast_message(
f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
"-1002033782195")
broadcast_message(f"The above is the PCE for {target}","-1002033782195")
1 year ago
has_broadcasted = False
def wrapper_function_cpi(target):
global has_broadcasted
result = broadcast_all_cpi(target)
if result:
has_broadcasted = True
def wrapper_function_pce(target,date_target):
global has_broadcasted
result = broadcast_all_pce(target,date_target)
if result:
has_broadcasted = True
if __name__ == "__main__":
1 year ago
global nonfarm_url , cpi_url , fomc_url , pce_url
1 year ago
nonfarm_url = "https://www.bls.gov/news.release/empsit.nr0.htm"
cpi_url = "https://www.bls.gov/news.release/cpi.nr0.htm"
fomc_url = "https://www.federalreserve.gov/newsevents/pressreleases/monetary"
1 year ago
pce_url = "https://www.bea.gov/news/2024/personal-income-and-outlays-"#january-2024
# broadcast_all_pce("February", "february-2024")
print("Start Time:" , datetime.fromtimestamp(time.time()))
'''
schedule.every().day.at("10:44").do(wrapper_function_pce, "February", "february-2024")
while True:
if datetime.now().strftime("%Y-%m-%d %H:%M") == "2024-04-23 10:44":
schedule.run_pending()
if has_broadcasted:
print("Broadcast completed")
break
time.sleep(0.1) # Check every 0.1 seconds
'''
#NonFarm
# date , message = find_non_farm(nonfarm_url)
# print(date)
# print(message)
1 year ago
#CPI
# find_cpi("FEBRUARY")
#PCE
#broadcast_all_pce("February", "february-2024")
date, message1, message2 = find_pce("february-2024")
print(date)
print(message1)
print(message2)