master
joey0629 1 year ago
parent 44e9267390
commit e88cd95a58
main.py

@@ -5,6 +5,8 @@ import time
 import requests
 from datetime import datetime
 import schedule
+from bs4 import BeautifulSoup
+import requests
 BOT_TOKEN = '6701395239:AAFE30dqvNihDdni9vYoAbWssO-X5yAmwho'
 # BOT_TOKEN = "6746720034:AAEMaoV2FwIZ8pz_PF18-bo2a6gFC1eVtVs"
@@ -32,9 +34,9 @@ def find_indicate_bs4(url):
     soup = BeautifulSoup(response.text, 'html.parser')
     # Use a CSS selector to find the element
-    # element = soup.select("tr", class_='item-fact-row')
+    element = soup.select("tr", class_='item-fact-row')
     print(soup)
-    # return element[1].text.split('+')[0]+" "+element[1].text.split('+')[1]
+    return element[1].text.split('+')[0]+" "+element[1].text.split('+')[1]
 def find_indicate(url):
     from selenium import webdriver
     from selenium.webdriver.common.by import By
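Note: BeautifulSoup's select() expects a CSS selector string and does not accept the class_ keyword (that belongs to find_all), so the line uncommented above typically raises a TypeError; the same select("div", class_='normalnews') pattern appears in read_nonfarm and read_CPI further down. A minimal sketch of the intended lookup, using a made-up HTML snippet for illustration:

    from bs4 import BeautifulSoup

    # Toy HTML standing in for the real page (assumption, not the BEA markup)
    html = "<table><tr class='item-fact-row'><td>Dec 2023 +2.6%</td></tr></table>"
    soup = BeautifulSoup(html, 'html.parser')
    # CSS selector form: <tr> elements whose class includes "item-fact-row"
    element = soup.select("tr.item-fact-row")
    # Equivalent keyword form, where class_ is actually valid
    element = soup.find_all("tr", class_="item-fact-row")
    print(element[0].text)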
@@ -50,42 +52,69 @@ def find_indicate(url):
     date = driver.find_element(By.XPATH, '//*[@id="test"]/div[2]/article/div/div/div[1]/div[2]/div/div[1]/div/table/tbody/tr[1]/td[1]')
     value = driver.find_element(By.XPATH, '//*[@id="test"]/div[2]/article/div/div/div[1]/div[2]/div/div[1]/div/table/tbody/tr[1]/td[2]')
     return date.text , value.text
+def find_cpi(url):
+    from selenium import webdriver
+    from selenium.webdriver.common.by import By
+    from selenium.webdriver.chrome.options import Options
+    from selenium.webdriver.support.ui import WebDriverWait
+    from selenium.webdriver.support import expected_conditions as EC
+    import time
+    options = Options()
+    # options.add_argument("--headless")  # enable headless mode
+    driver = webdriver.Chrome(options=options)
+    driver.get(url)
+    # time.sleep(3)
+    date = driver.find_element(By.XPATH, '//*[@id="bodytext"]/div[1]/pre[1]').text.split('\n')[6].split("-")[1].split(" ")[1]
+    value = driver.find_element(By.XPATH, '//*[@id="bodytext"]/div[1]/pre[1]').text.split('\n')[8:12]
+    return date , value
 # A continuously running loop that keeps fetching new messages from the Telegram server,
 # then handles them with the message handlers defined above.
 # Non-farm payrolls
 def read_pdf_nonfarm(month, year):
-    pdf = pdfplumber.open(f"empsit/empsit_{month}_{year}.pdf")
+    pdf = pdfplumber.open(f"empsit_{month}_{year}.pdf")
     page = pdf.pages[0]
     text = page.extract_text().split('\n')
     text = (text[7]+text[8]).split(',')
     text = text[0]+text[1]+text[2]
     return text
-def read_nonfarm():
+def download_pdf_nonfarm():
+    url = "https://www.bls.gov/news.release/pdf/empsit.pdf"
+    response = requests.get(url)
+def read_nonfarm(url):
+    from bs4 import BeautifulSoup
+    import requests
     startimee = time.time()
-    for i in range(7,13):
-        print(f"2022-{i} non-farm payrolls: ", end= "" )
-        print(read_pdf_nonfarm(i, 23))
+    response = requests.get(url)
+    soup = BeautifulSoup(response.text, 'html.parser')
+    # Use a CSS selector to find the element
+    element = soup.select("div", class_='normalnews')
+    print(element[0].text)
     endtimee = time.time()
     print("Time: ", endtimee-startimee)
+    return element[0].text
     # print(text.split('\n')[7:9])
+def read_CPI(url):
+    startimee = time.time()
+    response = requests.get(url)
+    soup = BeautifulSoup(response.text, 'html.parser')
+    # Use a CSS selector to find the element
+    element = soup.select("div", class_='normalnews')
+    print(element[0].text)
+    endtimee = time.time()
+    print("Time: ", endtimee - startimee)
+    return element[0].text
+    # print(text.split('\n')[7:9])
 def read_PCE():
-    message = find_indicate_bs4("https://www.bea.gov/data/personal-consumption-expenditures-price-index")
+    message = find_indicate_bs4("https://www.bea.gov/data/income-saving/personal-income")
     return message
+# def read_PCE_test():
+#     from bs4 import BeautifulSoup
+#     import requests
+#     response = requests.get("http://127.0.0.1:5000")
+#     soup = BeautifulSoup(response.text, 'html.parser')
+#
+#     # Use a CSS selector to find the element
+#     element = soup.select("p")
+#     print(element[0].text)
+#     return element[0].text
 def broadcast_all(target:str):
     startimee = time.time()
     message = read_PCE()
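Aside: download_pdf_nonfarm as added above fetches the release PDF but never writes it to disk, so read_pdf_nonfarm has nothing to open. A sketch of the missing save step, with month/year parameters added here only so the filename matches what read_pdf_nonfarm expects:

    import requests

    def download_pdf_nonfarm(month, year):
        url = "https://www.bls.gov/news.release/pdf/empsit.pdf"
        response = requests.get(url)
        response.raise_for_status()  # fail loudly on a bad fetch
        # Save under the name read_pdf_nonfarm() opens
        with open(f"empsit_{month}_{year}.pdf", "wb") as f:
            f.write(response.content)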
@@ -106,11 +135,34 @@ def broadcast_all_sele(target:str):
     broadcast_message(
         f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
         "-1002033782195")
+def broadcast_all_cpi(target:str):
+    startimee = time.time()
+    date , message = find_cpi(cpi_url)
+    while date != target:
+        date, message = find_cpi(cpi_url)
+    message = "\n".join(message)
+    broadcast_message(message, "-1002033782195")
+    endtimee = time.time()
+    broadcast_message(
+        f"Now :{datetime.fromtimestamp(time.time())} , Spend time :{str(round(endtimee - startimee, 3))} s",
+        "-1002033782195")
 if __name__ == "__main__":
+    global nonfarm_url , cpi_url
+    nonfarm_url = "https://www.bls.gov/news.release/empsit.nr0.htm"
+    cpi_url = "https://www.bls.gov/news.release/cpi.nr0.htm"
     #PCE
     print("Start Time:" , datetime.fromtimestamp(time.time()))
-    schedule.every().day.at("21:30").do(broadcast_all_sele, "January")
+    schedule.every().day.at("17:16").do(broadcast_all_cpi, "JANUARY")
     while True:
-        if datetime.now().strftime("%Y-%m-%d %H:%M") == "2024-02-29 21:30":
+        if datetime.now().strftime("%Y-%m-%d %H:%M") == "2024-03-12 17:16":
             schedule.run_pending()
         time.sleep(0.1)  # Check every 0.1 seconds
+    #NonFarm
+    # text = download_pdf_nonfarm()
+    # read_nonfarm(url)
+    # print(text)
+    #CPI
+    # data, value = find_cpi(cpi_url)
+    # read_CPI(cpi_url)
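Aside: the __main__ block schedules broadcast_all_cpi daily at 17:16 but only calls schedule.run_pending() during the single minute 2024-03-12 17:16, which turns the daily job into a one-shot trigger. If a one-shot is what is wanted, a plain datetime comparison is enough; a sketch that keeps the names from the diff (broadcast_all_cpi and the gate time are taken from the commit, the break is an addition to stop after firing once):

    import time
    from datetime import datetime

    TARGET = "2024-03-12 17:16"  # release minute to fire on, from the diff
    while True:
        if datetime.now().strftime("%Y-%m-%d %H:%M") == TARGET:
            broadcast_all_cpi("JANUARY")  # same call the schedule job makes
            break  # fire once, then stop polling
        time.sleep(0.1)  # check every 0.1 seconds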