import time
import math
import numpy as np
from selenium import webdriver
from selenium.webdriver.common.by import By
import selenium.webdriver.support.ui as ui
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.common.exceptions import StaleElementReferenceException
def get_html_sh(code, PROXY=0):
    '''
    Scrape the annual-report listing for one security code from the Shanghai Stock Exchange
    and append the result tables to 上交所年报.html.

    Parameters
    ----------
    code : security code
    PROXY : proxy server, e.g. "ip:port"; 0 (default) means no proxy

    Returns
    -------
    1 on success, 0 if the exchange has no data for this code,
    or the code itself if a step timed out (so the caller can retry it).
    '''
    # Browser launch options
    option = webdriver.FirefoxOptions()
    # Run Firefox in headless mode
    option.add_argument('--headless')
    # PROXY=0 means no proxy by default; otherwise route traffic through the given proxy
    if PROXY != 0:
        option.add_argument('--proxy-server=%s' % PROXY)
    # Create a Firefox driver instance
    browser = webdriver.Firefox(options=option)
    browser.get("http://www.sse.com.cn/disclosure/listedinfo/regular/")
    browser.set_window_size(1550, 830)
    browser.find_element(By.ID, "inputCode").click()
    browser.find_element(By.ID, "inputCode").send_keys(code)
    # WebDriverWait(browser, 60): poll the page every 500 ms for up to 60 seconds
    # until the expected element appears
    wait = ui.WebDriverWait(browser, 60)
    try:
        wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR, ".sse_outerItem:nth-child(4) .filter-option-inner-inner"))
    except TimeoutException:
        browser.quit()
        return code
    browser.find_element(By.CSS_SELECTOR, ".sse_outerItem:nth-child(4) .filter-option-inner-inner").click()
    browser.find_element(By.LINK_TEXT, "年报").click()
    css_selector0 = "body > div.container.sse_content > div > div.col-lg-9.col-xxl-10 > div > div.sse_colContent.js_regular > div.table-responsive > table > tbody > tr > td"
    try:
        wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR, css_selector0))
    except TimeoutException:
        browser.quit()
        return code
    element0 = browser.find_element(By.CSS_SELECTOR, css_selector0)
    try:
        table0 = element0.get_attribute('innerHTML')
    except StaleElementReferenceException:
        browser.quit()
        return code
    # '暂无数据' in the first cell means the exchange has no data for this code
    if table0 == '暂无数据':
        browser.quit()
        return 0
css_selector = "body > div.container.sse_content > div > div.col-lg-9.col-xxl-10 > div > div.sse_colContent.js_regular > div.table-responsive > table"
try:
wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR,css_selector))
except TimeoutException:
return code
element = browser.find_element(By.CSS_SELECTOR,css_selector)
table = element.get_attribute('innerHTML')
f = open("上交所年报.html",'a',encoding='utf-8')
f.write(table)
f.close()
# 默认每页25条
css_selector_page_sh = "body > div.container.sse_content > div > div.col-lg-9.col-xxl-10 > div > div.sse_colContent.js_regular > div.pagination-box > span._count.hidden-xl > b"
try:
wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR,css_selector_page_sh))
except TimeoutException:
return code
element_page_sh = browser.find_element(By.CSS_SELECTOR,css_selector_page_sh)
page_sh = int(element_page_sh.get_attribute('innerHTML'))
page_sh = math.ceil(page_sh/25)
    if page_sh != 1:
        for j in range(2, page_sh + 1):
            time.sleep(3)
            try:
                wait.until(lambda browser: browser.find_element(By.LINK_TEXT, str(j)))
            except TimeoutException:
                browser.quit()
                return code
            browser.find_element(By.LINK_TEXT, str(j)).click()
            css_selector = "body > div.container.sse_content > div > div.col-lg-9.col-xxl-10 > div > div.sse_colContent.js_regular > div.table-responsive > table"
            try:
                wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR, css_selector))
            except TimeoutException:
                browser.quit()
                return code
            element = browser.find_element(By.CSS_SELECTOR, css_selector)
            table = element.get_attribute('innerHTML')
            with open("上交所年报.html", 'a', encoding='utf-8') as f:
                f.write(table)
    # quit() rather than close() so the geckodriver process is also released
    browser.quit()
    print('Successfully downloaded the report (html) of {}'.format(code))
    time.sleep(10)
    return 1
def get_mul_html_sh(codes):
    '''
    Scrape annual-report listings from the Shanghai Stock Exchange for a list of security codes.
    Returns a list with one entry per code: 1, 0, or the code itself (see get_html_sh).
    '''
    lst = []
    for code in codes:
        lst.append(get_html_sh(code))
    return lst
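# Usage sketch (illustrative, not part of the original module): get_html_sh returns 1 on
# success, 0 when the exchange reports no data, and the code itself on a timeout, so the
# list returned by get_mul_html_sh can be filtered and the failed codes retried. The helper
# name and the max_rounds parameter below are assumptions made for this example.
def retry_failed_sh(codes, max_rounds=3):
    '''Re-run the SSE scraper on codes that timed out, for at most max_rounds passes.'''
    pending = list(codes)
    for _ in range(max_rounds):
        results = get_mul_html_sh(pending)
        # Keep only the entries that came back as a code (i.e. a step timed out)
        pending = [r for r in results if r not in (0, 1)]
        if not pending:
            break
    return pending  # codes that still failed after max_rounds passes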
def get_html_sz(code, PROXY=0):
    '''
    Scrape the annual-report listing for one security code from the Shenzhen Stock Exchange
    and append the result tables to 深交所年报.html.

    Parameters
    ----------
    code : security code
    PROXY : proxy server, e.g. "ip:port"; 0 (default) means no proxy

    Returns
    -------
    1 on success, or the code itself if a step timed out (so the caller can retry it).
    '''
    # Browser launch options
    option = webdriver.FirefoxOptions()
    # Run Firefox in headless mode
    option.add_argument('--headless')
    # PROXY=0 means no proxy by default; otherwise route traffic through the given proxy
    if PROXY != 0:
        option.add_argument('--proxy-server=%s' % PROXY)
    # Create a Firefox driver instance
    browser = webdriver.Firefox(options=option)
    browser.get("http://www.szse.cn/disclosure/listed/fixed/")
    browser.set_window_size(1295, 695)
    wait = ui.WebDriverWait(browser, 60)
    try:
        wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR, "#select_gonggao .c-selectex-btn-text"))
    except TimeoutException:
        browser.quit()
        return code
    # Select "年度报告" (annual report) in the announcement-type dropdown, then search by code
    browser.find_element(By.CSS_SELECTOR, "#select_gonggao .c-selectex-btn-text").click()
    time.sleep(1)
    browser.find_element(By.LINK_TEXT, "年度报告").click()
    time.sleep(1)
    browser.find_element(By.CSS_SELECTOR, ".disclosure-title > .title").click()
    time.sleep(1)
    browser.find_element(By.ID, "input_code").click()
    time.sleep(1)
    browser.find_element(By.ID, "input_code").send_keys(code)
    time.sleep(1)
    browser.find_element(By.ID, "input_code").send_keys(Keys.ENTER)
    time.sleep(1)
    # Get the total number of result pages
    css_selector_page = "#disclosure-table > div > div.disclosure-table-footer.clearfix > div.current-page.pull-left > span.num-all"
    try:
        wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR, css_selector_page))
    except TimeoutException:
        browser.quit()
        return code
    element_page = browser.find_element(By.CSS_SELECTOR, css_selector_page)
    page = int(element_page.get_attribute('innerHTML'))
    css_selector_table0 = "#disclosure-table > div > div.table-con-outer > div > table > tbody"
    try:
        wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR, css_selector_table0))
    except TimeoutException:
        browser.quit()
        return code
    element_table0 = browser.find_element(By.CSS_SELECTOR, css_selector_table0)
    try:
        table_html0 = element_table0.get_attribute('innerHTML')
    except StaleElementReferenceException:
        browser.quit()
        return code
    with open("深交所年报.html", 'a', encoding='utf-8') as f:
        f.write(table_html0)
    if page != 1:
        for j in range(2, page + 1):
            try:
                wait.until(lambda browser: browser.find_element(By.LINK_TEXT, str(j)))
            except TimeoutException:
                browser.quit()
                return code
            browser.find_element(By.LINK_TEXT, str(j)).click()
            css_selector_table = "#disclosure-table > div > div.table-con-outer > div > table > tbody"
            try:
                wait.until(lambda browser: browser.find_element(By.CSS_SELECTOR, css_selector_table))
            except TimeoutException:
                browser.quit()
                return code
            element_table = browser.find_element(By.CSS_SELECTOR, css_selector_table)
            table_html = element_table.get_attribute('innerHTML')
            with open("深交所年报.html", 'a', encoding='utf-8') as f:
                f.write(table_html)
            time.sleep(3)
    # quit() rather than close() so the geckodriver process is also released
    browser.quit()
    print('Successfully downloaded the report (html) of {}'.format(code))
    time.sleep(5)
    return 1
def get_mul_html_sz(codes):
    '''
    Scrape annual-report listings from the Shenzhen Stock Exchange for a list of security codes.
    Returns a list with one entry per code: 1 or the code itself (see get_html_sz).
    '''
    lst = []
    for code in codes:
        lst.append(get_html_sz(code))
    return lst
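# Usage sketch (illustrative; the code lists below are placeholders, not data from the
# original project): run each exchange's batch scraper and report anything that timed out.
if __name__ == "__main__":
    sh_codes = ["600000", "600036"]   # hypothetical SSE codes
    sz_codes = ["000001", "000002"]   # hypothetical SZSE codes
    sh_results = get_mul_html_sh(sh_codes)
    sz_results = get_mul_html_sz(sz_codes)
    # Entries that are neither 0 nor 1 are codes that timed out and can be retried
    failed = [r for r in sh_results + sz_results if r not in (0, 1)]
    print('Codes that need a retry: {}'.format(failed))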