实验报告
爬取网页
PART1
import json
import os
from time import sleep
from urllib import parse
import requests
import time
import random
from fake_useragent import UserAgent
import pdfplumber
# Pick one random User-Agent string at import time; `userAgen` is read as a
# module-level global by get_adress() below (get_PDF draws a fresh one from
# `ua` per call instead).
ua = UserAgent()
userAgen = ua.random
def get_adress(bank_name):
    """Look up a company on cninfo's top-search endpoint.

    Parameters
    ----------
    bank_name : str
        Stock code or keyword to search for.

    Returns
    -------
    tuple(str, str, str)
        (orgId, plate, code) of the first search hit, consumed by get_PDF().

    Raises
    ------
    requests.RequestException on network failure; KeyError/IndexError when
    the search returns no hit.
    """
    url = "http://www.cninfo.com.cn/new/information/topSearch/detailOfQuery"
    data = {
        'keyWord': bank_name,
        'maxSecNum': 10,
        'maxListNum': 5,
    }
    # BUG FIX: the original pinned 'Content-Length: 70' by hand.  A wrong
    # hard-coded length can corrupt the POST body; requests computes the
    # correct value automatically, so the header is dropped.
    hd = {
        'Host': 'www.cninfo.com.cn',
        'Origin': 'http://www.cninfo.com.cn',
        'Pragma': 'no-cache',
        'Accept-Encoding': 'gzip,deflate',
        'Connection': 'keep-alive',
        'User-Agent': userAgen,  # module-level random UA
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Accept': 'application/json,text/plain,*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
    # BUG FIX: added a timeout so a stalled endpoint cannot hang the crawler.
    resp = requests.post(url, headers=hd, data=data, timeout=30)
    last_json = json.loads(str(resp.content, encoding="utf-8"))
    first = last_json["keyBoardList"][0]  # best match comes first
    return first["orgId"], first["plate"], first["code"]
def download_PDF(url, file_name):
    """Download `url` and save it as <company>/<file_name>.pdf.

    NOTE(review): `company` is a module-level global set by the __main__
    loop — this function depends on it; confirm that coupling is intended.
    """
    # BUG FIX: added a timeout so a stalled download cannot hang forever.
    r = requests.get(url, timeout=60)
    # BUG FIX: use a context manager — the original leaked the file handle
    # if the write raised.
    with open(company + "/" + file_name + ".pdf", "wb") as f:
        f.write(r.content)
def get_PDF(orgId, plate, code):
    """Fetch the announcement list for one company and download each
    annual-report PDF found on the first results page.

    Parameters come straight from get_adress().  Skips abstract ('摘要')
    editions, titles without a 20xx year, and H-share ('H') editions.
    Files land in the folder named by the module-level `company` variable
    (set in the __main__ loop) via download_PDF().
    """
    url = "http://www.cninfo.com.cn/new/hisAnnouncement/query"
    data = {
        'stock': '{},{}'.format(code, orgId),
        'tabName': 'fulltext',
        'pageSize': 20,
        'pageNum': 1,
        'column': plate,
        'category': 'category_ndbg_szsh;',  # annual-report category
        'plate': '',
        'seDate': '',
        'searchkey': '',
        'secid': '',
        'sortName': '',
        'sortType': '',
        'isHLtitle': 'true',
    }
    hd = {
        'Host': 'www.cninfo.com.cn',
        'Origin': 'http://www.cninfo.com.cn',
        'Pragma': 'no-cache',
        'Accept-Encoding': 'gzip,deflate',
        'Connection': 'keep-alive',
        'User-Agent': ua.random,  # fresh random UA per call
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Accept': 'application/json,text/plain,*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'X-Requested-With': 'XMLHttpRequest',
    }
    body = parse.urlencode(data)
    # BUG FIX: added a timeout so a stalled connection cannot hang the crawler.
    resp = requests.post(url, headers=hd, data=body, timeout=30)
    last_json = json.loads(str(resp.content, encoding="utf-8"))
    for report in last_json['announcements']:
        title = report['announcementTitle']
        # Keep only full annual reports: drop abstracts, titles carrying no
        # 20xx year, and H-share editions (same filter as the original's
        # two-step continue/else chain).
        if '摘要' in title or "20" not in title or 'H' in title:
            continue
        # e.g. http://static.cninfo.com.cn/finalpage/2019-03-29/1205958883.PDF
        pdf_url = "http://static.cninfo.com.cn/" + report['adjunctUrl']
        print("正在下载:" + pdf_url, "存放在当前目录:/" + company + "/" + title)
        download_PDF(pdf_url, title)
        time.sleep(random.random()*3)  # polite random pause between downloads
if __name__ == '__main__':
    # Stock codes of the companies whose annual reports we want to crawl.
    company_list = ["000059", "000637", "000698", "000723", "000819", "002778", "600688", "300839"]
    for company in company_list:
        # BUG FIX: os.mkdir raised FileExistsError when the folder already
        # existed; makedirs(exist_ok=True) makes the script re-runnable.
        os.makedirs(company, exist_ok=True)
        orgId, plate, code = get_adress(company)
        get_PDF(orgId, plate, code)
        print("下载成功")
PART2
# ---- PART2 setup -----------------------------------------------------------
# NOTE(review): this DataFrame literal is truncated — the dict is never
# closed and contains no 'name' key, yet the next line reads
# df_company['name'].  Restore the original 'name' column and the closing
# "})" before running.
df_company = pd.DataFrame({'index': ["000059", "000637", "000698", "000723", "000819", "002778", "600688", "300839"],
company = df_company['name'].tolist()
def rename(df):
    """Return the company's cleaned short name taken from the 简称 column.

    Every entry is stripped of '*' markers and spaces; the last entry that
    is not the placeholder '-' wins.  (Raises if no valid entry exists,
    matching the original behaviour.)
    """
    for raw in df["简称"]:
        cleaned = raw.replace("*", "").replace(" ", "")
        if cleaned != "-":
            sn = cleaned
    return sn
# ---- PART2 main loop --------------------------------------------------------
# For every company: read its <code>.csv listing, open each downloaded
# annual-report PDF with fitz (PyMuPDF), scrape revenue / EPS / office
# address / website with regexes, and write the result to <code>数据.csv.
# NOTE(review): the paste has lost all indentation, so the intended nesting
# is annotated in comments rather than reconstructed.
# NOTE(review): `pd`, `fitz` and `re` are used here but not imported in this
# part — confirm the full script imports them.
t=0
# (everything down to the final print runs once per company)
for com in company:
t+=1
com = com.replace('*','')
df = pd.read_csv(com+'.csv',converters={'证券代码':str})
d1 = {}  # report title -> value of column 3 of the listing row
na = rename(df)
y = 2021
# build "<简称><year>年年度报告" keys, newest year (2021) first
for index, row in df.iterrows():
names = na + str(y)+"年年度报告"
d1[names] = row[3]
y = y-1
df = df.sort_index(ascending=False)
final = pd.DataFrame(index=range(2012,2022),columns=['营业收入(元)','基本每股收益(元/股)'])
final.index.name='年份'
code = str(df.iloc[0,1])
name = df.iloc[-1,2].replace(' ','')
# one iteration per downloaded report PDF
for i in range(len(df)):
title=list(d1.keys())[i]
doc = fitz.open('./%s/%s.pdf'%(com,title))
text=''
# only the first 15 pages are read — the summary figures sit at the front
for j in range(15):
page = doc[j]
text += page.get_text()
p_year=re.compile('.*?(\d{4}) .*?年度报告.*?')
year = int(p_year.findall(text)[0])
# NOTE(review): '(?\w?)?' below is not valid regex syntax — re.compile
# raises "unexpected character after (?".  It was probably meant to be an
# optional non-capturing group like '(?:\w)?'; must be fixed before this
# script can run.  Also, these patterns are not raw strings (r'...').
p_rev = re.compile('(?<=\n)营业总?收入(?\w?)?\s?\n?([\d+,.]*)\s\n?')
p_eps = re.compile('(?<=\n)基本每股收益(元/?/?\n?股)\s?\n?([-\d+,.]*)\s?\n?')
p_site = re.compile('(?<=\n)\w*办公地址:?\s?\n?(.*?)\s?(?=\n)',re.DOTALL)
p_web =re.compile('(?<=\n)公司\w*网址:?\s?\n?([a-zA-Z./:]*)\s?(?=\n)',re.DOTALL)
revenue=float(p_rev.search(text).group(1).replace(',',''))
eps=p_eps.search(text).group(1)
final.loc[year,'营业收入(元)']=revenue
final.loc[year,'基本每股收益(元/股)']=eps
final.to_csv('%s数据.csv' %com,encoding='utf-8-sig')
site=p_site.search(text).group(1)
web=p_web.search(text).group(1)
# append the per-company metadata rows below the yearly table
with open('%s数据.csv'%com,'a',encoding='utf-8-sig') as f:
content='股票简称,%s\n股票代码,%s\n办公地址,%s\n公司网址,%s'%(name,code,site,web)
f.write(content)
print(name+'数据已保存完毕'+'(',t,'/',len(company),')')
PART3
import pandas as pd
import fitz
import os
import csv
os.chdir(r"E:\金融数据获取与处理\报告)
def ana(name):
    """Extract basic company info from '2022年年度报告.PDF' in the current
    directory and append it as one row to 公司基本信息.csv.

    The row is [name, office address, website, board secretary, phone,
    e-mail].  Callers chdir into the company's folder first; this function
    chdirs back to the report root before writing.
    """
    doc = fitz.open('2022年年度报告.PDF')
    # Concatenate the text of the first 21 pages only — the company-info
    # section appears near the front of the report.
    text = ''
    i = 0
    for page in doc:
        if i <= 20:
            text += page.get_text()
        i += 1
    # --- section 一、公司信息: office address and website ---
    s = text.find('一、公司信息')
    e = text.find('二、联系人')
    subtext1 = text[s:e].replace(' ', '')
    s1 = subtext1.find('办公地址')
    e1 = subtext1.find('办公地址的')
    s2 = subtext1.find('公司网址')
    e2 = subtext1.find('电子信箱')
    # BUG FIX: the original used ".replace('办公地址' or ' ', '')" — the
    # `or` expression evaluates to just '办公地址', so the space-stripping
    # alternative never ran.  Chain the two replacements explicitly.
    place = subtext1[s1:e1].replace('办公地址', '').replace(' ', '')
    website = subtext1[s2:e2].replace('公司网址', '').replace(' ', '')
    # --- section 二、联系人: board secretary name / phone / e-mail ---
    s = text.find('二、联系人')
    e = text.find('三、信息披露')
    subtext2 = text[s:e].replace(' ', '')
    s3 = subtext2.find('姓名')
    e3 = subtext2.find('联系地址')
    s4 = subtext2.find('电话')
    e4 = subtext2.find('传真')
    s5 = subtext2.find('电子信箱')
    # Each field label sits on its own line; the value is the next line.
    board = subtext2[s3:e3].split('\n')[1]
    tel = subtext2[s4:e4].split('\n')[1]
    email = subtext2[s5:].split('\n')[1]
    # (renamed from `list`, which shadowed the builtin)
    row = [name, place, website, board, tel, email]
    # Return to the report root before appending the output row.
    os.chdir(r"E:\金融数据获取与处理\报告")
    # NOTE(review): this append uses the platform default encoding while the
    # header row is written as gbk elsewhere — confirm they match on the
    # target machine.
    with open('公司基本信息.csv', 'a', newline='') as f:
        csv.writer(f).writerow(row)
# Create the output CSV fresh and write its gbk-encoded header row.
header = ['公司名称','公司办公地址', '公司网址', '董事会秘书姓名', '董事会秘书电话', '董事会秘书电子信箱']
with open('公司基本信息.csv', 'w', encoding='gbk', newline='') as f:
    csv.writer(f).writerow(header)
# Visit each company folder named in name_sz (defined elsewhere) and let
# ana() append that company's row; ana() chdirs back to the report root.
for name in name_sz:
    os.chdir(name)
    ana(name)
PART4
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import numpy.random as npr
# Matplotlib setup: render CJK glyphs and minus signs correctly.
plt.rcParams['font.sans-serif'] = ['SimHei']  # 确保显示中文
plt.rcParams['axes.unicode_minus'] = False  # 确保显示负数的参数设置

## Step 1: load the per-company data produced by PART2.
data = pd.read_csv(r'C:\Users\lenovomp10\Desktop\新建文件夹\company.csv', header=0, index_col=0)
DATA = data.iloc[:, 0].tolist()
dflist = []
for name in DATA:
    df = pd.read_csv('【' + name + '】.csv')
    dflist.append(df)
# Use 年份 (year) as the index of every per-company frame.
comps = len(dflist)
for i in range(comps):
    dflist[i] = dflist[i].set_index('年份')
# Rank companies by cumulative revenue over the first 11 data rows and keep
# the top eight for the year-over-year comparison charts.
df1 = pd.DataFrame(columns=['营业收入'])
for i in range(len(dflist)):
    df1.loc[dflist[i].loc['股票简称', '营业收入(元)'], '营业收入'] = dflist[i].iloc[:11, 0].astype(float).sum()
rank = df1.sort_values("营业收入", ascending=False)
top8 = rank.head(8)  # the eight highest-revenue companies
top = [top8.index]
topna = top8.index.tolist()
# Positions of the top-8 companies within DATA/dflist.
indexes = []
for idx in topna:
    indexes.append(DATA.index(idx))
datalist = []
datalist1 = []
for i in indexes:  # revenue series of each selected company
    datalist.append(pd.DataFrame(dflist[i].iloc[:11, 0]))
for df in datalist:
    df.index = df.index.astype(int)
    # scale to billions of yuan for readable axis labels
    df['营业收入(元)'] = df['营业收入(元)'].astype(float) / 1000000000
for i in indexes:  # EPS series of each selected company
    datalist1.append(pd.DataFrame(dflist[i].iloc[:11, 1]))
for df in datalist1:
    df.index = df.index.astype(int)
    df['基本每股收益(元/股)'] = df['基本每股收益(元/股)'].astype(float)
# Merge the per-company columns into two summary tables.
shouru = pd.concat(datalist, axis=1)
eps = pd.concat(datalist1, axis=1)
shouru.columns = top8.index
eps.columns = top8.index
shouru  # no-op outside a notebook; kept from the original
eps
# Plot the year-over-year comparison, one subplot per company.
eps.plot(kind='bar', subplots=True, layout=(10, 1), figsize=(15, 20), xlabel='年份', ylabel='eps(元)')
# BUG FIX: the original line was truncated mid string literal (ylabel=');
# the label below is reconstructed — revenue was scaled to billions above.
shouru.plot(kind='bar', subplots=True, layout=(10, 1), figsize=(15, 20), xlabel='年份', ylabel='营业收入(十亿元)')
plt.show()
第二部分:图示结果