You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

324 lines
12 KiB

import telebot
from config import Config
import pdfplumber
import time
import requests
from datetime import datetime
import schedule
from bs4 import BeautifulSoup
import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
# SECURITY: hard-coded Telegram bot token committed to source control.
# This credential should be rotated and loaded from the environment or the
# imported Config instead of living in the file. The commented-out lines
# below are alternate bot tokens that were swapped in by hand.
BOT_TOKEN = '6701395239:AAFE30dqvNihDdni9vYoAbWssO-X5yAmwho'
# BOT_TOKEN = "6746720034:AAEMaoV2FwIZ8pz_PF18-bo2a6gFC1eVtVs"
#BOT_TOKEN = '6589162555:AAHGhrTQ0wYNtIUySMohnfpxQl1d6blr24Q'
def broadcast_message(message: str, chat_id: str):
    """Send *message* to Telegram chat *chat_id* via the Bot API.

    Uses the module-level BOT_TOKEN. Returns the ``requests.Response`` so
    callers may inspect the outcome; existing callers that ignore the
    return value are unaffected.
    """
    # BUG FIX: the original had no timeout, so a stalled Telegram API call
    # would hang the polling loops in broadcast_all_* forever.
    r = requests.post(
        f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage",
        json={
            "chat_id": chat_id,
            "text": message,
        },
        timeout=30,
    )
    return r
def start():
    """Record the run start time and scrape the December-2023 BEA
    personal-income release page.

    Sets the module-level ``startime`` as a side effect and returns
    whatever ``find_indicate`` extracts from the page.
    """
    global startime
    startime = time.time()
    release_url = "https://www.bea.gov/news/2024/personal-income-and-outlays-december-2023"
    return find_indicate(release_url)
def find_indicate_bs4(url):
    """Scrape a BEA page with BeautifulSoup and return the second
    'item-fact-row' table row's text, re-joined around its '+' sign.

    Raises IndexError if fewer than two matching rows exist.
    """
    from bs4 import BeautifulSoup
    import requests
    response = requests.get(url, timeout=30)
    soup = BeautifulSoup(response.text, 'html.parser')
    # BUG FIX: soup.select() does not accept the class_ keyword (that is a
    # find_all() argument), so the original class filter was never applied.
    # Also dropped the stray print(soup) that dumped the whole page.
    rows = soup.find_all("tr", class_='item-fact-row')
    parts = rows[1].text.split('+')
    return parts[0] + " " + parts[1]
def find_indicate(url):
    """Open *url* in Chrome via Selenium and return ``(date, value)`` text
    from the first row of the release's fact table.
    """
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.chrome.options import Options

    options = Options()
    # options.add_argument("--headless")  # enable headless mode if desired
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        date = driver.find_element(By.XPATH, '//*[@id="test"]/div[2]/article/div/div/div[1]/div[2]/div/div[1]/div/table/tbody/tr[1]/td[1]')
        value = driver.find_element(By.XPATH, '//*[@id="test"]/div[2]/article/div/div/div[1]/div[2]/div/div[1]/div/table/tbody/tr[1]/td[2]')
        return date.text, value.text
    finally:
        # BUG FIX: the original never called driver.quit(), leaking one
        # Chrome process per call — and this is invoked inside polling loops.
        driver.quit()
def find_Fomc_bs4(date: str):
    """Fetch the FOMC press release for *date* (e.g. ``'20240131a'``)
    from the module-level ``fomc_url`` prefix and return
    ``(month, statement_sentence)``.

    Either element is ``None`` when the page does not contain it.
    """
    from bs4 import BeautifulSoup
    import requests
    response = requests.get(fomc_url + date + ".htm", timeout=30)
    soup = BeautifulSoup(response.text, 'html.parser')
    # Locate the timestamp and the release body via CSS/class selectors.
    time_elements = soup.select('.article__time')
    value_elements = soup.find_all('div', class_="col-xs-12 col-sm-8 col-md-8")
    if time_elements:
        # e.g. "January 31, 2024" -> "January"
        time = time_elements[0].text.split(',')[0].split(' ')[0]
    else:
        time = None
    value = None
    if value_elements:
        # Split each block into sentences and keep the policy sentence.
        sentences = []
        for item in value_elements:
            sentences.extend(item.text.split('.'))
        matches = [s for s in sentences if 'In support of' in s]
        if matches:
            value = matches[0].strip('\n')
    # BUG FIX: the original unconditionally returned value[0].strip('\n'),
    # crashing (TypeError on None / IndexError on []) when nothing matched.
    return time, value
def find_cpi(url):
    """Scrape the BLS CPI news-release page with headless Chrome and
    return ``(month, summary_lines)`` where summary_lines is a list of
    the four headline text lines.
    """
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.chrome.options import Options

    print("Start")
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument("--window-size=1920,1080")  # adjust as needed
    options.add_argument(
        'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3')
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(url)
        print("Open")
        # The release is a <pre> block; fetch it once instead of locating
        # the same element twice as the original did.
        lines = driver.find_element(By.XPATH, '/html/body/div[2]/div[5]/div/div[1]/pre[1]').text.split('\n')
        # Line 7 looks like "... - MONTH 2024 ..." -> pull the month token.
        date = lines[6].split("-")[1].split(" ")[1]
        value = lines[8:12]
        return date, value
    finally:
        # BUG FIX: driver.quit() was commented out, leaking a Chrome
        # process on every poll iteration of broadcast_all_cpi.
        driver.quit()
def find_fomc(url, date: str):
    """Scrape the FOMC press release at ``url + date + '.htm'`` with
    headless Firefox and return ``(month, policy_sentence)``.

    ``policy_sentence`` is ``None`` when no sentence contains
    'In support of'.
    """
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.firefox.options import Options

    print("Start")
    options = Options()
    options.add_argument("--headless")  # headless mode
    driver = webdriver.Firefox(options=options)
    try:
        driver.get(url + date + ".htm")
        # "January 31, 2024" -> "January"
        month = driver.find_element(By.CLASS_NAME, 'article__time').text.split(',')[0].split(' ')[0]
        sentences = driver.find_element(By.XPATH, '//*[@id="article"]/div[3]').text.split('.')
        matches = [s for s in sentences if 'In support of' in s]
    finally:
        # BUG FIX: quit even when a locator raises, so Firefox is not leaked.
        driver.quit()
    # BUG FIX: the original indexed value[0] unconditionally and crashed
    # with IndexError when the marker sentence was absent.
    statement = matches[0].strip('\n') if matches else None
    return month, statement
def find_pce(date: str):
    """Scrape the BEA personal-income release at ``pce_url + date`` and
    return ``(month, summary_paragraph, yoy_change)``.

    Uses the module-level ``pce_url`` prefix (set in __main__).
    """
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.chrome.options import Options

    print("Start")
    options = Options()
    options.add_argument("--headless")  # headless mode
    # BUG FIX: the original built the headless Options but called
    # webdriver.Chrome() without them, so the browser always opened a
    # visible window and the options were dead code.
    driver = webdriver.Chrome(options=options)
    try:
        driver.get(pce_url + date)
        # Page title like "Personal Income and Outlays, December 2023"
        # -> 5th whitespace token is the month.
        month = driver.find_element(By.XPATH, '//*[@id="home"]/h1').text.split(' ')[4]
        value_1 = driver.find_element(By.XPATH, '//*[@id="home"]/div[2]/div/div/p[2]').text
        value_2 = driver.find_element(By.XPATH, '//*[@id="home"]/div[2]/div/div/div[1]/table/tbody/tr[13]').text.strip(' ').split(' ')[-1]
        return month, value_1, value_2
    finally:
        # Quit even on locator failure so Chrome is not leaked.
        driver.quit()
def find_pce_bs4(date: str):
    """BeautifulSoup fallback for find_pce: fetch ``pce_url + date`` and
    return ``(month, body_sentences, table_cells)``.

    Each slot is ``None`` when its element is missing from the page.
    The debug prints of the original were removed.
    """
    from bs4 import BeautifulSoup
    import requests
    response = requests.get(pce_url + date, timeout=30)
    soup = BeautifulSoup(response.text, 'html.parser')
    # BUG FIX: '.row release-embargo' is a DESCENDANT selector — it looks
    # for a <release-embargo> tag inside .row and never matches. One
    # element carrying both classes needs the compound '.row.release-embargo'.
    time_elements = soup.select('.row.release-embargo')
    value_elements = soup.find_all('div', class_="col-md-12 release-body")
    value2_elements = soup.find_all('td', class_="text-left")
    if time_elements:
        # Embargo line like "... for December 2023 ..." -> 5th token.
        time = time_elements[0].text.split(' ')[4]
    else:
        time = None
    value1 = None
    if value_elements:
        # Flatten each release-body block into sentences.
        value1 = []
        for item in value_elements:
            value1.extend(item.text.split('.'))
    value2 = None
    if value2_elements:
        value2 = []
        for item in value2_elements:
            value2.extend(item.text.split('.'))
    return time, value1, value2
# Non-farm payrolls
def read_pdf_nonfarm(month, year):
    """Extract the headline non-farm payrolls sentence from the local
    BLS employment-situation PDF named ``empsit_{month}_{year}.pdf``.

    Returns lines 8-9 of page one joined with thousands separators removed.
    """
    # BUG FIX: the original never closed the PDF handle; use a context
    # manager so the file is released even if extraction raises.
    with pdfplumber.open(f"empsit_{month}_{year}.pdf") as pdf:
        lines = pdf.pages[0].extract_text().split('\n')
    # Join the two headline lines, then strip the first two commas
    # (thousands separators) by re-concatenating the split parts.
    parts = (lines[7] + lines[8]).split(',')
    return parts[0] + parts[1] + parts[2]
def download_pdf_nonfarm():
    """Download the latest BLS employment-situation PDF to ``empsit.pdf``
    and return the local path.

    BUG FIX: the original fetched the PDF but discarded the response —
    nothing was ever written to disk, so read_pdf_nonfarm had no file to
    open. Also adds a timeout so the download cannot hang indefinitely.
    """
    url = "https://www.bls.gov/news.release/pdf/empsit.pdf"
    response = requests.get(url, timeout=60)
    path = "empsit.pdf"
    with open(path, "wb") as f:
        f.write(response.content)
    return path
def read_nonfarm(url):
    """Fetch the BLS non-farm release page, print and return the text of
    the first 'normalnews' div. Also prints the elapsed fetch time.
    """
    from bs4 import BeautifulSoup
    import requests
    startimee = time.time()
    response = requests.get(url, timeout=30)
    soup = BeautifulSoup(response.text, 'html.parser')
    # BUG FIX: soup.select() does not accept the class_ keyword, so the
    # class filter was never applied; find_all() is the correct API here.
    elements = soup.find_all("div", class_='normalnews')
    print(elements[0].text)
    endtimee = time.time()
    print("Time: ", endtimee - startimee)
    return elements[0].text
def read_CPI(url):
    """Fetch the BLS CPI release page, print and return the text of the
    first 'normalnews' div. Also prints the elapsed fetch time.

    Mirrors read_nonfarm but relies on the module-level requests /
    BeautifulSoup imports.
    """
    startimee = time.time()
    response = requests.get(url, timeout=30)
    soup = BeautifulSoup(response.text, 'html.parser')
    # BUG FIX: soup.select() does not accept the class_ keyword, so the
    # class filter was never applied; find_all() is the correct API here.
    elements = soup.find_all("div", class_='normalnews')
    print(elements[0].text)
    endtimee = time.time()
    print("Time: ", endtimee - startimee)
    return elements[0].text
def read_PCE():
    """Return the latest personal-income headline scraped with the
    BeautifulSoup-based extractor."""
    return find_indicate_bs4("https://www.bea.gov/data/income-saving/personal-income")
def broadcast_all(target: str):
    """Poll read_PCE() until its first word equals *target*, then push
    the message plus a timing report to the Telegram channel."""
    began = time.time()
    # Busy-poll the page until the expected release month appears.
    message = read_PCE()
    while message.split(' ')[0] != target:
        message = read_PCE()
    broadcast_message(message, "-1002033782195")
    elapsed = time.time() - began
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(elapsed, 3))} s",
        "-1002033782195")
def broadcast_all_sele(target: str):
    """Poll find_indicate() (Selenium scraper) until the date's first
    word equals *target*, then broadcast date+value and a timing report."""
    began = time.time()
    page = "https://www.bea.gov/data/personal-consumption-expenditures-price-index"
    date, message = find_indicate(page)
    # Re-scrape until the release for the wanted month is live.
    while date.split(' ')[0] != target:
        date, message = find_indicate(page)
    broadcast_message(date + message, "-1002033782195")
    elapsed = time.time() - began
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(elapsed, 3))} s",
        "-1002033782195")
def broadcast_all_cpi(target: str):
    """Poll find_cpi() until the scraped month equals *target*, then
    broadcast the joined headline lines and a timing report."""
    began = time.time()
    date, lines = find_cpi(cpi_url)
    # Keep re-scraping until the wanted month's release is published.
    while date != target:
        date, lines = find_cpi(cpi_url)
    broadcast_message("\n".join(lines), "-1002033782195")
    elapsed = time.time() - began
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(elapsed, 3))} s",
        "-1002033782195")
def broadcast_all_fomc(target: str):
    """Poll find_fomc() for the 2024-01-31 statement until its month
    equals *target*, then broadcast it and a timing report."""
    began = time.time()
    date, message = find_fomc(fomc_url, '20240131a')
    # Re-scrape until the statement for the wanted month is up.
    while date != target:
        date, message = find_fomc(fomc_url, '20240131a')
    broadcast_message(message, "-1002033782195")
    elapsed = time.time() - began
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(elapsed, 3))} s",
        "-1002033782195")
def broadcast_all_bs4_fomc(target: str):
    """BeautifulSoup variant of broadcast_all_fomc: poll find_Fomc_bs4()
    until its month equals *target*, then broadcast the statement and a
    timing report."""
    began = time.time()
    date, message = find_Fomc_bs4('20240131a')
    # Re-fetch until the statement for the wanted month is up.
    while date != target:
        date, message = find_Fomc_bs4('20240131a')
    broadcast_message(message, "-1002033782195")
    elapsed = time.time() - began
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(elapsed, 3))} s",
        "-1002033782195")
def broadcast_all_pce(target: str, date_target: str):
    """Poll find_pce(date_target) until the month equals *target*, then
    broadcast a header line, the detail message, and a timing report."""
    began = time.time()
    date, message1, message2 = find_pce(date_target)
    # Re-scrape until the wanted month's release is live.
    while date != target:
        date, message1, message2 = find_pce(date_target)
    broadcast_message(date + " PCE Data", "-1002033782195")
    broadcast_message(message1 + '\n\n Percent change from month one year ago : ' + message2, "-1002033782195")
    elapsed = time.time() - began
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(elapsed, 3))} s",
        "-1002033782195")
def broadcast_all_bs4_pce(target: str):
    """BeautifulSoup variant of broadcast_all_pce: poll find_pce_bs4()
    until the month equals *target*, then broadcast a header and the
    scraped body plus a timing report."""
    began = time.time()
    date, message = find_pce_bs4('20240131a')
    # Re-fetch until the wanted month's release is live.
    while date != target:
        date, message = find_pce_bs4('20240131a')
    broadcast_message(date + " PCE Data", "-1002033782195")
    broadcast_message(message, "-1002033782195")
    elapsed = time.time() - began
    broadcast_message(
        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(elapsed, 3))} s",
        "-1002033782195")
if __name__ == "__main__":
    # URL constants read by the scraper functions above.
    # NOTE(review): these are only bound when the file runs as a script;
    # importing this module leaves cpi_url/fomc_url/pce_url undefined for
    # the functions that reference them.
    # FIX: dropped the original `global nonfarm_url, ...` statement —
    # `global` at module scope is a no-op; plain assignment already
    # creates module globals.
    nonfarm_url = "https://www.bls.gov/news.release/empsit.nr0.htm"
    cpi_url = "https://www.bls.gov/news.release/cpi.nr0.htm"
    fomc_url = "https://www.federalreserve.gov/newsevents/pressreleases/monetary"
    pce_url = "https://www.bea.gov/news/2024/personal-income-and-outlays-"  # append e.g. "january-2024"

    # broadcast_all_pce("February", "february-2024")
    # print("Start Time:", datetime.fromtimestamp(time.time()))
    # schedule.every().day.at("18:39").do(broadcast_all_cpi, "FEBRUARY")
    # while True:
    #     if datetime.now().strftime("%Y-%m-%d %H:%M") == "2024-04-10 18:39":
    #         schedule.run_pending()
    #     time.sleep(0.1)  # check every 0.1 seconds

    # Non-farm payrolls (disabled):
    # text = download_pdf_nonfarm()
    # read_nonfarm(nonfarm_url)
    # print(text)

    # CPI: scrape once and print the result.
    date, value = find_cpi(cpi_url)
    print(date)
    print(value)
    # broadcast_all_cpi("FEBRUARY")