陈琳的实验报告

代码——定义所需函数

1.获取年报下载链接


import pytest
import time
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import time
import re
import pandas as pd



#获取年报链接,保存为html文件
def get_table_sse(code):
    browser = webdriver.Edge()
    browser.get("http://www.sse.com.cn/disclosure/listedinfo/regular/")
    time.sleep(3)
    browser.set_window_size(1456, 928)
    browser.find_element(By.ID, "inputCode").click()
    browser.find_element(By.ID, "inputCode").send_keys(code)
    browser.find_element(By.CSS_SELECTOR, ".sse_outerItem:nth-child(4) .filter-option-inner-inner").click()
    browser.find_element(By.LINK_TEXT, "年报").click()
    dropdown = browser.find_element(By.CSS_SELECTOR, ".dropup > .selectpicker")
    dropdown.find_element(By.XPATH, "//option[. = '年报']").click()
    time.sleep(3)
    
    selector = "body > div.container.sse_content > div > div.col-lg-9.col-xxl-10 > div > div.sse_colContent.js_regular > div.table-responsive > table"
    element=browser.find_element(By.CSS_SELECTOR,selector)
    table_html=element.get_attribute('innerHTML')
    
    fname=f'{code}.html'
    f=open(fname,'w',encoding='utf-8')
    f.write(table_html)
    f.close()

    browser.quit()




#提取年报的链接地址并保存为DataFrame
def get_data(tr):
    p_td=re.compile('(.*?)',re.DOTALL)  # 每一行的内容
    tds=p_td.findall(tr)
    #股票代码
    s=tds[0].find('>')+1  #起始索引
    e=tds[0].rfind('<')  #结束索引
    code=tds[0][s:e]
    #股票名称
    s=tds[1].find('>')+1
    e=tds[1].rfind('<')
    name=tds[1][s:e]
    #链接
    s=tds[2].find('href="')+6
    e=tds[2].find('.pdf"')+4
    href='http://www.sse.com.cn'+tds[2][s:e]  #补上上交所的顶级域名
    #标题
    s=tds[2].find('$(this))">')+10
    e=tds[2].find('')
    title=tds[2][s:e]
    #日期
    date=tds[3].strip() #strip()如果两端有空格则把空格删除
    
    data=[code,name,href,title,date]
    return(data)

def parse_data(code):
    fname=f'{code}.html'
    f=open(fname,encoding='utf-8')
    table_html=f.read()
    f.close()

    p=re.compile('(.+?)',re.DOTALL)
    trs=p.findall(table_html)
    #
    trs_new=[]  #删除了空行
    for tr in trs:
        if tr.strip() != '':
            trs_new.append(tr)
    #
    data_all=[get_data(tr) for tr in trs_new[1:]] #第一行是标题行,不需要
    df=pd.DataFrame({
        'code':[d[0] for d in data_all],
        'name':[d[1] for d in data_all],
        'href':[d[2] for d in data_all],
        'title':[d[3] for d in data_all],
        'date':[d[4] for d in data_all]
        })
    return(df)

2.过滤得到的链接


import re


#去除摘要等其他非年报内容
def filter_words(words,df,include=True):
    ls=[]
    for word in words:
        if include:
            ls.append([word in f for f in df['title']])
        else:
            ls.append([word not in f for f in df['title']])
    index=[]
    for r in range(len(df)):
        flag=not include
        for c in range(len(words)):
            if include:
                flag=flag or ls[c][r]
            else:
                flag=flag and ls[c][r]
        index.append(flag)
    df2=df[index]
    return df2



#只保留10年的年报
def filter_date(start,end,df):
    date=df['date']
    v=[d >= start and d<= end for d in date]
    df_new=df[v]
    return df_new

    
#10年的起止时间
import datetime
def start_end_10y():
    dt_now=datetime.datetime.now()
    current_year=dt_now.year
    start=f'{current_year-9}-01-01'
    end=f'{current_year}-12-31'
    return (start,end)

    

#整合以上代码
def filter_nb_10y(df,keep_words,exclude_words,start=''):
    if start == '':
        start,end=start_end_10y()
    else:
        start_y=int(start[0:4])
        end=f'{start_y+9}-12-31'
    #
    df=filter_words(keep_words,df,include=True)
    df=filter_words(exclude_words,df,include=False)
    df=filter_date(start,end,df)
    return(df)






#处理更正版的日期
def amend_date(df):
    target_index=[]
    for i, row in df.iterrows():
        if '更正' in row['title']:
            target_index.append(i)
            target=''.join(df.loc[target_index,'title'].values)
            matchobj=re.search('\d{4}年',target)
            amend_year=int(matchobj.group()[:4])
            df.loc[target_index,'date']=f'{amend_year+1}-12-31'
            df.sort_values(by="date" , inplace=True, ascending=False) 
    return df




#保留修订版和更正版,删除未修订和未更正版
def retain_rev_edt(df):
    df=df.reset_index(drop=True)  #重置行索引
    index_list = df[df['title'].str.contains('修订|更正')].index.tolist()
    if index_list != []:
        index_list = [i+1 for i in index_list]
        if index_list[-1] > len(df)-1:
            index_list.pop()
        df=df.drop(index_list,axis=0)
    return df

3.通过链接下载年报


import requests
import time


#提出链接、年份数据
def prepare_hrefs_years(df):
    hrefs=df['href'].to_list()
    years=[int(d[0:4])-1 for d in df['date']]
    return((hrefs,years))


# 下载1家公司1年年报
def download_pdf(href,code,year):
    r = requests.get(href, allow_redirects=True)
    fname=f'{code}_{year}.pdf'
    f = open(fname, 'wb')
    f.write(r.content)
    f.close()
    r.close()
    
    
# 下载1家公司多年年报
def download_pdfs(hrefs,code,years):
    for i in range(len(hrefs)):
        href=hrefs[i]
        year=years[i]
        download_pdf(href, code, year)
        print(f'Successfully downloaded: {code}_{year}')
        time.sleep(5)
    return()


# 下载多家公司多年年报
def download_pdfs_codes(list_hrefs,codes,list_years):
    for i in range(len(list_hrefs)):
        hrefs=list_hrefs[i]
        years=list_years[i]
        code=codes[i]
        download_pdfs(hrefs, code, years)
        return()

4.解析出会计数据


import fitz
import re


# 获取指定文本
def get_subtxt(doc,bounds=('主要会计数据和财务指标','总资产')):
    #默认设置为首尾页码
    start_pageno=0
    end_pageno=len(doc)-1
    #
    lb,ub=bounds
    #获取左界页码
    for n in range(len(doc)):
        page=doc[n]
        txt=page.get_text()
        if lb in txt:
            start_pageno=n
            break
    #获取右界页码
    for n in range(start_pageno,len(doc)):
        if ub in doc[n].get_text():
            end_pageno=n
            break
    #获取小范围内字符串
    txt=''
    for n in range(start_pageno,end_pageno+1):
        page=doc[n]
        txt += page.get_text()
    return(txt)


# 获取指定的会计数据值
def get_account_data(account,txt):
    p_txt='%s\s*(-*\d{1,3}(?:,\d{3})*(?:\.\d+)?)' % account   #%s是占位符,用‘account’替换,\D是非数字,\d{1,3}是数字1或2或3个,*可重复,?非贪婪,()内是所要的数字,小数点后\d+表示小数点后至少一位数字
    p=re.compile(p_txt)
    matchobj=p.search(txt)
    amt=matchobj.group(1)
    if '.' not in amt:
        p_txt='%s\s*(-*\d{1,3}(?:,\d{3})*(.*))\n' % account
        p=re.compile(p_txt)
        matchobj=p.search(txt)
        amt=matchobj.group(1)
        # 
        s=matchobj.end()
        p_txt1='(.+)\n'
        p1=re.compile(p_txt1)
        matchobj1=p1.search(txt[s:])
        amt += matchobj1.group(1)
    return(amt)




##获取整张会计数据表格

# 获取表头
def get_th_span(txt):
    nianfen='(20\d\d|199\d)\s*年末?'   #2016和年之间是空格,而2016年和2015年之间是换行
    s=f'{nianfen}\s*{nianfen}.*?{nianfen}'  
    p=re.compile(s,re.DOTALL)  #re.DOTALL指.遇到换行符也是可以的
    matchobj=p.search(txt)
    #
    end=matchobj.end()
    year1=matchobj.group(1)
    year2=matchobj.group(2)
    year3=matchobj.group(3)
    #
    flag=(int(year1)-int(year2) == 1) and (int(year2)-int(year3) == 1)
    #
    while (not flag):
        matchobj=p.search(txt[end:])
        end=matchobj.end()
        year1=matchobj.group(1)
        year2=matchobj.group(2)
        year3=matchobj.group(3)
        flag=(int(year1)-int(year2) == 1)
        flag=flag and (int(year2)-int(year3) ==1)
    return(matchobj.span())


#获取表格边界
def get_bounds(txt):
    th_span_1st=get_th_span(txt)
    end=th_span_1st[1]
    th_span_2nd=get_th_span(txt[end:])
    th_span_2nd=(end+th_span_2nd[0],end+th_span_2nd[1])
    #
    s=th_span_1st[1]
    e=th_span_2nd[0]-1
    #
    while (txt[e] not in '0123456789'):  #如果最后一个不是数字
        e=e-1
    return(s,e+1)


#获取表格内的会计数据关键字
def get_keywords(txt):
    p=re.compile(r'\d+\s*?\n\s*?([\u2E80-\u9FFF]+)')
    keywords=p.findall(txt)
    keywords.insert(0,'营业收入')
    return(keywords)


# 获取整张表格内容
def parse_key_fin_data(subtxt,keywords):
    ss=[]
    s=0
    for kw in keywords:
        n=subtxt.find(kw,s)  #参数s:从第s个位置开始找
        ss.append(n)  #所有keywords的起始位置
        s=n+len(kw)
    ss.append(len(subtxt))
    data=[]
    p=re.compile('[^0123456789-]+(?:\s+\D*)?(?:(.*)|\(.*\))?')
    p2=re.compile('\s')
    for n in range(len(ss)-1):
        s=ss[n]
        e=ss[n+1]
        line=subtxt[s:e]
        #获取可能换行的账户名称
        matchobj=p.search(line)
        account_name=p2.sub('',matchobj.group())
        #获取三年数据
        amnts=line[matchobj.end():].split()
        #加上账户名称
        amnts.insert(0,account_name)
        #追加到总数据
        data.append(amnts)
    return data

5.解析公司基本信息


# 提取公司基本情况信息
def get_com_ifm(txt,keywords=['公司办公地址','公司网址','电子信箱']):
    s=txt.find('基本情况简介')
    e=txt.find('信息披露及备置地点',s)
    subtxt=txt[s:e]
    data=[]
    for kw in keywords:
        p=re.compile('%s\s*\n\s*(.+)' % kw)
        matchobj=p.search(subtxt)
        if matchobj:
            ifm=matchobj.group(1)
            if ifm[-1] == ' ':
                ifm=ifm[:-1]
        else:
            ifm='无'
        data.append([kw,ifm])
    return data


# 提取董事会秘书基本信息
def get_cts_ifm(txt,keywords=['姓名','电话','电子信箱']):
    s=txt.find('联系人和联系方式')
    e=txt.find('基本情况简介',s)
    subtxt=txt[s:e]
    data=[]
    for kw in keywords:
        p=re.compile('%s\s*\n\s*(.+)' % kw)
        matchobj=p.search(subtxt)
        if matchobj:
            ifm=matchobj.group(1)
            if ifm[-1] == ' ':
                ifm=ifm[:-1]
        else:
            p=re.compile('%s\s*(.+)' % kw)
            matchobj=p.search(subtxt)
            if matchobj:
                ifm=matchobj.group(1)
                if ifm[-1] == ' ':
                    ifm=ifm[:-1]
            else:
                ifm='无'
        data.append([kw,ifm])
    return data

代码——具体实验操作


import fitz
import pandas as pd
from pylab import plt, mpl
plt.style.use('seaborn')
mpl.rcParams['font.family'] = 'serif'
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus']=False

from sse import get_table_sse,parse_data
from filter_url import filter_nb_10y,amend_date,retain_rev_edt
from download import prepare_hrefs_years,download_pdfs
from parse_ar import get_subtxt,get_account_data
from parse_ifm import get_com_ifm,get_cts_ifm




# 下载年报
codes=[600075,600078,600096,600135,600141,600160,600165,600230,600249,600273]

for code in codes:
    #获取年报的下载链接
    get_table_sse(code)
    df=parse_data(code)
    df=amend_date(df)   #处理更正版年报的日期
    df=filter_nb_10y(df, keep_words=['年报','年度报告'], exclude_words=['摘要'])  #去除年报摘要
    df=retain_rev_edt(df)  #保留修订版和更正版,删除未修订和未更正版
    #下载年报
    hrefs=prepare_hrefs_years(df)[0]
    years=prepare_hrefs_years(df)[1]
    download_pdfs(hrefs=hrefs,code=code,years=years)



# 解析会计数据数据
years=[2013,2014,2015,2016,2017,2018,2019,2020,2021,2022]

revenues=pd.DataFrame(index=years,columns=codes)
profits_shlder=pd.DataFrame(index=years,columns=codes)

for code in codes:
    for year in years:
        filename=f'{code}_{year}.pdf'
        doc=fitz.open(filename)
        if filename != '600078_2021.pdf':
            txt=get_subtxt(doc,bounds=('主要会计数据和财务指标','总资产'))
        else:
            txt=get_subtxt(doc,bounds=('主要 会计数据和财务指标','总资产'))              
        if filename != '600075_2013.pdf':
            revenue=get_account_data('\s*'.join('营业收入'), txt)
        else:
            revenue=get_account_data('营业总收入', txt)
        revenues.loc[year,code]=revenue
        if filename != '600273_2019.pdf':
            profit_shlder=get_account_data('\s*'.join('归属于上市公司股东的净利润'), txt)
        else:
            profit_shlder=get_account_data('\s*'.join('归属于上市公司股东的'), txt)
        profits_shlder.loc[year,code]=profit_shlder


# 更改列名
col_name=['新疆天业','*ST澄星','云天化','乐凯胶片','兴发集团',
          '巨化股份','新日恒力','沧州大化','两面针','嘉化能源']

revenues.columns=col_name
profits_shlder.columns=col_name


# 将数据转化为浮点型
for col in col_name:
    revenues[col] = revenues[col].str.replace(',', '').astype(float)
    profits_shlder[col] = profits_shlder[col].str.replace(',', '').astype(float)
    


# 10家公司的营业收入折线图
(revenues/1e9).plot(figsize=(12,7))
plt.xlabel('年份', fontsize=13)
plt.ylabel('营业收入/十亿', fontsize=13)
plt.title('10家公司近10年营业收入折线图',fontsize=14)
# plt.gca().get_yaxis().get_major_formatter().set_scientific(False)
plt.show()


# 600075新疆天业近十年营业收入和归属于上市公司股东的净利润折线图
plt.figure(figsize=(12,7))
plt.plot(revenues['新疆天业']/1e9)
plt.plot(profits_shlder['新疆天业']/1e9)
plt.xlabel('年份', fontsize=13)
plt.ylabel('金额/十亿', fontsize=13)
plt.title('新疆天业近10年营业收入和归属于上市公司股东的净利润折线图',fontsize=14)
plt.legend(['营业收入','归属于上市公司股东的净利润'],fontsize=13)
plt.show()





# 提取公司和董事会秘书基本信息
bsc=pd.DataFrame()
for code in codes:
    filename=f'{code}_2022.pdf'
    doc=fitz.open(filename)
    txt=get_subtxt(doc,bounds=('联系人和联系方式','信息披露及备置地点'))
    #
    data1=get_com_ifm(txt)
    bsc.loc[code,'公司办公地址']=data1[0][1]
    bsc.loc[code,'公司网址']=data1[1][1]
    bsc.loc[code,'电子信箱']=data1[2][1]
    #
    data2=get_cts_ifm(txt)
    bsc.loc[code,'董事会秘书姓名']=data2[0][1]
    bsc.loc[code,'董事会秘书电话']=data2[1][1]
    bsc.loc[code,'董事会秘书电子信箱']=data2[2][1]

bsc = bsc.rename_axis("公司代码")
bsc.insert(0, '公司简称', col_name)

bsc.to_csv('公司及董事会秘书基本信息.csv')

结果

1.包含年报链接的HTML文件

结果截图

2.下载的全部年报

结果截图

3.公司办公地址、公司网址和电子信箱

结果截图

4.董事会秘书姓名、电话和电子信箱

结果截图

5.10家公司近10年营业收入

结果截图

6.10家公司近10年归属于上市公司股东的净利润

结果截图

7.10家公司营业收入折线图

结果截图

8.新疆天业营业收入和归属于上市公司股东的净利润折线图

结果截图

实验报告

《金融数据获取与处理》综合实验