徐倩茹的实验报告

代码

1.main.py


# -*- coding: utf-8 -*-
import web_crawling as wc
import find_assignment as fa
import download as dl
import tool
import read_data as rd
import draw_pics as dp

import pandas as pd
import numpy as np

if __name__ == "__main__":
    df_code = fa.find_assignment(tool.get_No(),10)
    code_list = df_code['code_list']
    name_list = df_code['name_list']
    for code in code_list:
        wc.get_table(code)
        print("股票" + code + " completed...")
    
    for code in code_list:
        dl.get_pdf_link(code)
        print("股票" + code + "的年报下载 completed...")

    tool.clean_pdf()
    
    data = rd.read_all_data(df_code)
    seperated_data = tool.seperate_df(df_code,data)
    
    np.save('seperated_data.npy',seperated_data)
    seperated_data = np.load('seperated_data.npy',allow_pickle = 'True').item()
    
    for key in seperated_data.keys():
        dp.draw_pics_twinx(seperated_data.get(key))

2.find_assignment.py

 
import fitz
import re
import pandas as pd

#判断行业中上市公司个数是否满足大于等于10的要求
def does_match(text):
    if len(re.findall('\n\d{6}\n',text)) < 10:
        return False
    else:
        return True


def find_assignment(No,num): 
    #No为int类型,表示在本次作业中该学生的序号
    #num代表该行业前num家上市公司
        
        #读取所有的内容
    text = ''
    with fitz.open('../1638277734844_11692.pdf') as doc:
        for page in doc:
            text = text + page.get_text('text')
        
    #找到所有行业的两位代码
    locs = re.finditer('\n\d{2}\n',text)
    
    #注意locs里面可能有重复的行业标号,需要筛选一下
    loc_dict = {}
    for loc in locs:
        #loc.span()的结果为左开右闭的区间,去除两端的换行符
        s = loc.span()[0] + 1
        e = loc.span()[1] - 1
        if text[s:e] in loc_dict:
            continue
        else:
            loc_dict[text[s:e]] = [s,e]
    
    #获取有效行业(上市公司>=10),将他们放入列表effective_industries中,

    last_key = '' #保存上一次循环中的key
    last_value = 0 #保存上一次循环中的value[1],即行业代码的索引的后一位置(为换行符)
    effective_industries = []
    for key,value in loc_dict.items():
        
        if key != '01':
            subtext = text[last_value:value[0]]
            if does_match(subtext) == False:
                last_key = key
                last_value = value[1]
                continue
            else:
                effective_industries.append(last_key)   
        #第一个的key必定是01,所以第一次循环直接跳到这一步
        last_key = key
        last_value = value[1]
        
    #注意,最后还剩下一部分没循环到
    subtext = text[last_value:len(text)]
    if does_match(subtext) == True:
        effective_industries.append(key)
    
    #判断No匹配哪个行业
    #可能会出现No超过了有效行业数,故采用模运算
    No = No % len(effective_industries)
    
    my_industry = effective_industries[No-1] 
    
    print('第'+ str(No) + '号的作业是行业 ' + my_industry)
    
    '''
    附加代码,找到自己的行业包含的所有公司代码,以列表返回
    '''

    s = loc_dict.get(my_industry)[1]
    
    #获得下一个键值对,如'01'之后为'02'
    if int(my_industry) < 9:
        nxt = '0' + str(int(my_industry) + 1)
    elif int(my_industry) == 9:
        nxt = '10'
    else:
        nxt = str(int(my_industry) + 1)
    e = loc_dict.get(nxt)[0]
    
    subtext = text[s:e]
    
    code_list = re.findall('\n\d{6}\n',subtext)
    code_list = [code.strip() for code in code_list]
    
    '''
    附加代码,找到自己的行业包含的所有公司名称,以列表返回
    '''
    name_list = []
    
    for code in code_list:
        code_first_loc = subtext.find(code)
        start = code_first_loc + 7
        end = subtext.find('\n', start)
        
        name = subtext[start : end]
        name_list.append(name)
    
    '''
    把公司代码code_list和公司名称name_list合并成dataframe
    '''
    df = {'code_list':code_list[0:num], 'name_list':name_list[0:num]}
    df = pd.DataFrame(df)
    
return df

3.web-crawling.py

 
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import time

import tool

def get_table_sse(code):#code here refers to stock code
    
    browser = webdriver.Chrome()
    url = 'http://www.sse.com.cn/disclosure/listedinfo/regular/'
    browser.get(url)
    time.sleep(3)
    browser.set_window_size(1550, 830)
    browser.find_element(By.ID, "inputCode").click()
    browser.find_element(By.ID, "inputCode").send_keys(code)
    time.sleep(3)
    browser.find_element(By.CSS_SELECTOR, ".sse_outerItem:nth-child(4) .filter-option-inner-inner").click()
    browser.find_element(By.LINK_TEXT, "年报").click()
    # dropdown = browser.find_element(By.CSS_SELECTOR, ".dropup > .selectpicker")
    # dropdown.find_element(By.XPATH, "//option[. = '年报']").click()
    time.sleep(3)#sleep的原因是需要等待浏览器渲染
    
    css_selector = "body > div.container.sse_content > div > div.col-lg-9.col-xxl-10 > div > div.sse_colContent.js_regular > div.table-responsive > table"
    element = browser.find_element(By.CSS_SELECTOR, css_selector)
    table_html = element.get_attribute('innerHTML')
    fname = f'../nianbao/{code}.html'
    with open(fname,'w',encoding='utf-8') as f:
        f.write(table_html)


def get_table_szse(code):
    browser = webdriver.Chrome()
    url = 'http://www.szse.cn/disclosure/listed/fixed/index.html'
    browser.get(url)
    time.sleep(3)
    
    browser.set_window_size(1550, 840)
    browser.find_element(By.ID, "input_code").click()
    browser.find_element(By.ID, "input_code").send_keys(code)
    time.sleep(3)
    
    browser.find_element(By.CSS_SELECTOR, ".active:nth-child(1) > a").click()
    browser.find_element(By.CSS_SELECTOR, "#select_gonggao .c-selectex-btn-text").click()
    time.sleep(3)
    browser.find_element(By.LINK_TEXT, "年度报告").click()
    time.sleep(3)
    
    css_selector = "#disclosure-table > div > div.table-con-outer > div > table"
    element = browser.find_element(By.CSS_SELECTOR, css_selector)
    table_html = element.get_attribute('innerHTML')
    fname = f'../nianbao/{code}.html'
    with open(fname,'w',encoding='utf-8') as f:
        f.write(table_html)


def get_table(code):
    market = tool.which_market(code)
    if market == 'sse':
        get_table_sse(code)
    elif market == 'szse':
        get_table_szse(code)

4.download.py

 
import re
import tool

import pytest
import time
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities

import requests

def download_pdf_from_szse(url):
    options = webdriver.ChromeOptions()
    download_path = "D:\\CS\\py_fi\\scores_3\\nianbao\\src\\pdf"
    profile = {"plugins.plugins_list": [{"enabled": False, "name": "Chrome PDF Viewer"}],
               "download.default_directory": download_path}
    options.add_experimental_option("prefs", profile)
    browser = webdriver.Chrome(chrome_options=options)
    browser.get(url)
    time.sleep(3)
    browser.set_window_size(1552, 840)
    browser.find_element(By.ID, "annouceDownloadBtn").click()
    
    #简便起见,直接使用sleep函数等待下载.但最好还是想办法循环判断是否下载完成
    time.sleep(15)

    print(url + '\n下载完成')

def get_pdf_link_from_szse(code):
    fname = f'../nianbao/{code}.html'
    html = ''
    
    with open(fname,'r',encoding = 'utf-8') as f:
        html = f.read()
    pattern = re.compile('',re.DOTALL)
    links = pattern.findall(html)
    
    #得到的url包含英文版年报或者摘要,需要将这两种年报链接去除
    full_links = []
    for lk in links:
        if '摘要' in lk[1] or '英文' in lk[1]:
            continue
        else:
            full_links.append(lk[0])
            
    for i in range(0,len(full_links)):
        full_links[i] = 'https://www.szse.cn' + full_links[i]
        
    '''
    以上是获取所有的下载链接
    下面是下载,需要调用download_pdf模块
    '''
    for i in range(0,min(len(full_links),10)):
        try: download_pdf_from_szse(full_links[i])
        except: print('股票'+code+'缺失,请到'+
                      full_links[i]+'重新下载')        

def download_pdf_from_sse(url):
    fname = '../pdf/' + url.split('/')[-1]
    with open(fname,'wb') as pdf:
        pdf.write(requests.get(url).content)
        
    print(url + '\n下载完成')    
    
def get_pdf_link_from_sse(code):
    fname = f'../nianbao/{code}.html'
    html = ''
    
    with open(fname,'r',encoding = 'utf-8') as f:
        html = f.read()
        
    pattern = re.compile('class="table_titlewrap" href="(.*?)" target="_blank">(.*?)

5.read_data.py

 
import re
import pandas as pd
import os
import tool
import pdfplumber

def is_fin_number(string):
    if string == '':
        return False
    try: 
        string = string.strip()
        string = string.replace(',','')
    
    except: return False
    

    for s in string:
        if s.isdigit() == True or s == '-' or s == '.' or s == ' ' or s == '\n':
            continue
        else:
            return False
    return True

def get_data(row,name_mode):
    rc = re.compile(name_mode,re.DOTALL)

    bound = 0
    for i in range(0,len(row)):
        
        rs = None
        try: 
            rs = rc.search(row[i]) #row[i]可能是None
        except: 
            continue
        
        if rs is None:
            continue
        else:
            bound = i
            break
    
    if rs is None: #意味着没有找到
        return -1
    
    for i in range(bound,len(row)):
        if is_fin_number(row[i]) == True:
            return row[i]
        
    return 'other row' # 说明虽然匹配到了文字,但数据不在当行
    
#该页是否是主要会计数据和财务指标
def is_this_page(text):
    mode = '\n.*?主+要+会+计+数+据+和+财+务+指+标+.*?\n'
    if re.search(mode,text) is None:
        return False
    else: 
        return True


def get_twin_data(fname):
    
    earnings = -1
    
    try: #未知原因, 打开文件时会出现assert error
        with pdfplumber.open('../pdf/' + fname) as pdf:
            
            s = 0
            for i in range(0,len(pdf.pages)):
                text = pdf.pages[i].extract_text()
                if is_this_page(text) == True:
                    s = i
                    break
                else: 
                    continue
            
            page_index = 0
            bound = 0
            for i in range(s,s+2): #deterministic
    
                table = pdf.pages[i].extract_table()
                
                try: len(table)
                except: continue
            
                for j in range(0,len(table)):
                    e = get_data(table[j],'.*?营业收入.*?')

                    #此时文字和数据错行,需要继续往上搜索
                    if e == 'other row':
                        for k in range(j-1, 0,-1):
                            for h in range(0,len(table[k])):
                                if is_fin_number(table[k][h]) == True:
                                    e = table[k][h]
                                    break
                                else: 
                                    continue
                            else:
                                if is_fin_number(e) == True:
                                    break
                                
                    if e != -1:
                        earnings = e
                        bound = j
                        break
                    else:
                        continue
                
                if earnings == -1:
                    continue
                
                page_index = i
                break
            
            #循环结束仍然没有获得营业收入
            if earnings == 0:
                return None
            
            net_income = -1
            for i in range(page_index,page_index + 2):
                table = pdf.pages[i].extract_table()
                
                try: len(table)
                except: continue     
                
                ni_mode = '.*?归属于.*?(所有者|股东)?的?.?净?.?利?.?润?.*?'
                if i == page_index: #说明此时还没有换页
                    for j in range(bound + 1,len(table)):
                        ni = get_data(table[j], ni_mode)
                        
                        #此时文字和数据错行,需要继续往下搜索
                        if ni == 'other row':
                            for k in range(j, len(table)):
                                for h in range(0,len(table[k])):
                                    if is_fin_number(table[k][h]) == True:
                                        net_income = table[k][h]
                                        return [earnings,net_income]
                                    else: 
                                        continue
                                    
                        if ni == 'other row':
                            return 'data is at the next page'
                        elif ni != -1:
                            net_income = ni
                            break
                        else:
                            continue
                else: #此时换页
                    for j in range(0,len(table)):
                        ni = get_data(table[j], ni_mode)
                        if ni != -1:
                            net_income = ni
                            break
                        else:
                            continue
                    
                if net_income == -1: continue
                else: return [earnings,net_income]
                
    except: print(fname+'出现AssertionError')
'''
import read_data           
read_data.get_twin_data('../pdf/晨鸣纸业:2012年年度报告.PDF')
'''
#该函数需要在pdf目录下查找对应的文件名
def read_all_data(df):
    #df为包含两列(code_list和name_list)的dataframe

    filename_list = []
    year_list = []
    data_list = []
    for index,row in df.iterrows():
        for filepath,dirnames,filenames in os.walk('../pdf'):
            for filename in filenames:
                #print(filename)
                if (row['name_list'] in filename) or (row['code_list'] in filename):
                    print(filename)
                    data = get_twin_data(filename)
                    
                    if data is not None:
                        filename_list.append(filename)
                        year_list.append(tool.get_year(filename,row['code_list']))
                        data_list.append(get_twin_data(filename))
                        print(filename + ' completed')
    rt_list,ni_list = zip(*data_list)
    df_data = {'filename':filename_list,'year':year_list,
               '营业收入':rt_list,'净利润':ni_list}
    df_data = pd.DataFrame(df_data)    
    return df_data

6.tool.py

 
import os
import re
import pandas as pd

def get_No():
    print('程序开始前的注意事项:\n')
    print("1.请先下载好相关模块,并在网络通畅的情况下运行本程序;\n")
    print("2.本程序的爬虫部分使用的是chrome浏览器,请下载对应的webdriver;\n")
    print('3.下载时间较长,请耐心等待;\n')
    print('4.由于爬取深交所的代码的下载路径是绝对路径,请先创建如下目录路径:')
    print("D:\\CS\\py_fi\\scores_3\\nianbao\\src\\pdf\n")
    print('\n\n\n--------程序开始--------\n\n\n')
    return int(input("请输入你的序号:"))

def to_wan(num):
    return num/10000
def to_yi(num):
    return num/100000000

def is_year(string):
    if len(string) == 4:
        return True
    else:
        return False

def to_num(string):
    if type(string) == type('str'):
        string = string.replace(',','')
        string = string.replace('\n','')
        return float(format(to_yi(float(string)), '.3f'))
    else:
        return string

def to_year_list(str_list):
    for i in range(0,len(str_list)):
        str_list[i] = str(str_list[i])


def to_num_list(str_list):
    for i in range(0,len(str_list)):
        str_list[i] = to_num(str_list[i])

def which_market(code):
    if code[0:2] == '60' or code[0:3] == '688' or code[0:3] == '900':
        return 'sse'
    elif code[0:2] == '00' or code[0:3] == '200' or code[0:2] == '30':
        return 'szse'

def clean_pdf():
    for filepath,dirnames,filenames in os.walk('../pdf'):
        for filename in filenames:
            if '取消' in filename:
                os.remove('../pdf/'+ filename)
                print(filename + '+ deleted')

def get_year_sse(fname):
    year = re.search('\d{6}_(\d{4}).*?\.pdf',fname,re.IGNORECASE)
    return year.group(1)

def get_year_szse(fname):
    year = re.search('.*?(\d{4}).*?\.pdf',fname,re.IGNORECASE)
    return year.group(1)

def get_year(fname,code):
    m = which_market(code)
    if m == 'sse':
        return get_year_sse(fname)
    elif m == 'szse':
        return get_year_szse(fname)

def be_contigious(this_data):
    #对于某个公司的绘图数据,我们只取从最近时间到最远时间的连续数据
    length = len(this_data)
    last = int(this_data['year'][length - 1])
    for i in range(length-2, -1, -1):
         nxt = int(this_data['year'][i])
         if last - nxt != 1:#说明不连续
             return this_data.loc[i+1 : length]
         else:
             continue
    return this_data
             

def seperate_df(df_code,data):
    seperated_data = {}
    
    for j,row1 in df_code.iterrows():
        name = row1['name_list']
        code = row1['code_list']
        this_data = pd.DataFrame(columns = ['year','营业收入','净利润'])
        for i,row2 in data.iterrows():
            fn = row2['filename']
            if name in fn or code in fn:
                data_dict = {'name': name,
                             'year': row2['year'],
                             '营业收入':row2['营业收入'],
                             '净利润':row2['净利润']}
                this_data = this_data.append(data_dict,ignore_index=True)
                be_contigious(this_data)
        seperated_data[code] = this_data
return seperated_data

7.draw_pics.py

 
import matplotlib.pyplot as plt
import numpy as np
import tool

def draw_pics_twinx(df):
    
    plt.rcParams['figure.dpi'] = 200
    plt.rcParams['axes.unicode_minus']=False #用来正常显示负号
    plt.rcParams['font.sans-serif'] = ['SimHei'] # 使图片显示中文
    
    
    x = df['year']
    tool.to_year_list(x)
    y_rt = df['营业收入']
    tool.to_num_list(y_rt)
    y_ni = df['净利润']
    tool.to_num_list(y_ni)
    
    fig = plt.figure()
    
    
    ax1 = fig.subplots()
    ax1.plot(x, y_rt,'steelblue',label="营业收入",linestyle='-',linewidth=2,
             marker='o',markeredgecolor='pink',markersize='2',markeredgewidth=2)
    
    ax1.set_xlabel('年份')
    ax1.set_ylabel('营业收入(单位:亿元)')
    for i in range(len(x)):
        plt.text(x[i],y_rt[i],(y_rt[i]),fontsize = '10')
    ax1.legend(loc = 6) 

    ax2 = ax1.twinx()
    ax2.plot(x, y_ni, 'orange',label = "归属于股东的净利润",linestyle='-',linewidth=2,
             marker='o',markeredgecolor='teal',markersize='2',markeredgewidth=2)
    
    ax2.set_ylabel('归属于股东的净利润(单位:亿元)')
    for i in range(len(x)):
        plt.text(x[i],y_ni[i],(y_ni[i]),fontsize = '10')
    ax2.legend(loc = 2)
    
    '''
    title部分必须放最后,否则会出现左边的y轴有重复刻度的问题
    '''
    title = df['name'][0] + '历年' + '财务数据'
    plt.title(title)
    
    plt.savefig('../pics/' + title + '.png')
    plt.show()

结果(部分)

1.行业及财务数据

结果截图1 结果截图2 结果截图3

2.html和pdf年报

结果截图 结果截图4 结果截图5 结果截图6 结果截图7 结果截图8

3.财务数据走势图

结果截图1 结果截图2 结果截图3 结果截图4 结果截图5 结果截图6 结果截图7 结果截图8 结果截图9 结果截图10

解释

整体思路(main函数)

1.输入自己的序号,从行业分类表中找到自己对应的行业,并返回包括该行业下的股票代码和公司名称的dataframe.

2.根据股票代码,利用selenium,requests库等爬虫技术,获取对应的html文档信息,其中包括了该股票历年年报的下载地址.

3.根据下载地址,再次利用爬虫技术获取对应的pdf年报,并清理掉一些无用的pdf年报(如:已取消的年报).

4.从年报中提取主要会计数据和财务指标,以dataframe的格式返回,并存储在磁盘(因为提取耗时较长).

5.从磁盘中读取财务数据,并绘图,保存.

行业分析

从走势图中,我们能明显看到,我国家具制造企业的数量呈稳步增长发展态势,但整体来看增速有所放缓。受疫情的影响,家具上游木材供应及下游需求都在萎缩,导致行业企业产量下降,但随着疫情发展态势得到有效控制,国内家具产量逐渐恢复;此外,国外疫情使得一部分国外家具订单转移到中国,这也推动了2022年中国总体家具产量持续上升

实验心得

这门课对于我来说是有一定难度的,好在在老师和同学的帮助下,我解决了一个又一个问题,能够运用这个软件去做一些对于金融学习有关的事情,所以我觉得这门课的学习能帮助我在学习的道路上走得更远。