pdf_process模块¶

In [ ]:
import re
import fitz
import numpy as np
from get_code import change_code
In [ ]:
def get_info(code,year,pdf_path):
    '''
    name: 上市公司代码
    year: 年报所属年份
    pdf_path: pdf文件地址

    返回公司名称、办公地址和公司网址

    '''
    # 输出正在解析的PDF
    print('正在解析{}的{}年报'.format(change_code(str(code)),year))

    content = ''
    doc = fitz.open(pdf_path)

    for page in doc:
        content += page.get_text()
            
    # 由于后续subp匹配过程中,有的数字后面没有换行符,无法进行非贪婪的匹配,所以通过文本内部符号替换
    content = content.replace(" "," \n")
    content = content.replace("\n\n","\n")

    # 获取公司名称
    p_name = re.compile('公司\w*名称\s?\n?(.*?)\s?(?=\n)',re.DOTALL)
    
    if p_name.search(content) == None:
        name = np.nan
    else:
        name = p_name.search(content).group(1)

    # 获取网址
    p_web = re.compile('(?<=\n)公司\w*网址\s?\n?(.*?)\s?\n?电子信箱')

    if p_web.search(content) is None:
        p_web = re.compile('公司\w*网址\s?\n?(.*?)\s?(?=\n)')
        
        if p_web.search(content) is None:     
            web = np.nan
        else:
            web = p_web.search(content).group(1)
    
    else:
        web = p_web.search(content).group(1)

    # 获取办公地址
    # \w 匹配字母或数字或下划线或汉字
    # (?<=\n) 匹配换行符后面的位置
    p_location = re.compile('办公地址\s?\n?(.*?)\s?\n?\D?公司办公地址的邮政编码',re.DOTALL)
    if p_location.search(content) == None:
        p_location = re.compile('办公地址\s?\n?(.*?)\s?\n?办公地址的邮政编码',re.DOTALL)
        
        if p_location.search(content) == None:
            location = np.nan
        else:
            location = p_location.search(content).group(1)
    
    else:
        location = p_location.search(content).group(1)

    return name,location,web


def get_info_df(pdf_df):
    '''
    
    pdf_df是列名为code,year,pdf_path的Series
    返回公司名称、办公地址和公司网址

    '''
    return get_info(pdf_df.code,pdf_df.year,pdf_df.pdf_path)


def get_mul_info(pdf_df):
    '''
    
    pdf_df是列名为code,year,pdf_path的DataFrame
    返回公司名称、办公地址和公司网址

    '''
    return pdf_df.apply(get_info_df,axis = 1)
In [ ]:
def get_target_pageno(doc,anchor='主要会计数据和财务指标'):
    pageno = -1
    for n in range(len(doc)):
        page = doc[n]
        txt = page.get_text()
        if anchor in txt:
            pageno=n
            break
    return(pageno)

def txt(pdf_path,anchor='主要会计数据和财务指标'):
    # 定位主要会计数据和财务指标所在页面
    doc = fitz.open(pdf_path)
    n = get_target_pageno(doc,anchor)
    page = doc[n]
    txt = page.get_text()
    page_ = doc[n+1]
    txt += page_.get_text()
    # 由于后续subp匹配过程中,有的数字后面没有换行符,无法进行非贪婪的匹配,所以通过文本内部符号替换
    txt = txt.replace(" "," \n")
    txt = txt.replace("\n\n","\n")
    return txt
In [ ]:
def get_info_secretary(code,year,pdf_path):
    '''
    name: 上市公司代码
    year: 年报所属年份
    pdf_path: pdf文件地址

    返回董秘姓名、董秘电话和董秘电子邮箱

    '''
    # 输出正在解析的PDF
    print('正在解析{}的{}年报'.format(change_code(str(code)),year))

    content = txt(pdf_path,'联系人和联系方式')

    # 获取董秘姓名
    p_name = re.compile('(?<=\n)董事会秘书\D*姓名\s?\n?(.*?)\s?(?=\n)',re.DOTALL)
    
    if p_name.search(content) == None:
        name = np.nan
    else:
        name = p_name.search(content).group(1)

    # 获取董秘电话
    p_tel = re.compile('(?<=\n)电话\s?\n?(.*?)\s?(?=\n)',re.DOTALL)

    if p_tel.search(content) == None:
        tel = np.nan
    else:
        tel = p_tel.search(content).group(1)

    # 获取董秘电子信箱    
    p_email = re.compile('(?<=\n)电子信箱\s?\n?(.*?)\s?(?=\n)',re.DOTALL)

    if p_email.search(content) == None:
        email = np.nan
    else:
        email = p_email.search(content).group(1)
    return name,tel,email


def get_info_secretary_df(pdf_df):
    '''
    
    pdf_df是列名为code,year,pdf_path的Series
    返回董秘姓名、董秘电话和董秘电子邮箱

    '''
    return get_info_secretary(pdf_df.code,pdf_df.year,pdf_df.pdf_path)


def get_mul_info_secretary(pdf_df):
    '''
    
    pdf_df是列名为code,year,pdf_path的DataFrame
    返回董秘姓名、董秘电话和董秘电子邮箱

    '''
    return pdf_df.apply(get_info_secretary_df,axis = 1)
In [ ]:
def parse_key_fin_data(subtext,keywords):
    # keywords = ['营业收入','营业成本','毛利','归属于上市','归属于上市','经营活动']
    ss = []
    s = 0
    for kw in keywords:
        n = subtext.find(kw,s)
        ss.append(n)
        s = n+len(kw)
    ss.append(len(subtext))
    data = []
    for n in range(len(ss)-1):
        s = ss[n]
        e = ss[n+1]
        line = subtext[s:e]
        data.append(line.split())
    return(data)
In [ ]:
def get_Net(text):
    '''
    提取归属于上市公司股东的净利润
    '''

    subp = "([0-9,.%\- ]*?)\n"
    # 数据个数不同,只提取第一个即当年数据
    psub = "%s" % (subp)

    p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)稀释每股',re.DOTALL)
    
    if p1.search(text) is None:
        p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)最新每股',re.DOTALL)
        
        if p1.search(text) is None:
            p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)净资产收益',re.DOTALL)

    net = p1.search(text).group(0)
    # 定义提取归属于上市公司股东的净利润所在行的内容
    # +	重复一次或更多次
    p2 =re.compile("(?<=\n)归属于\D*?利润(\D*?\d*?\D*?\n)+%s" % psub)
    lines_net = p2.search(net)
    try:
        lines_net[0]
    except:
        p2 =re.compile("(?<=\n)归\D*?润(\D*?\n)+%s" % psub)
        lines_net = p2.search(net)

    # 要提取的是 归属于上市公司股东的净利润
    # 排除 归属于上市公司股东的扣除非经常性损益的净利润
    if '扣除' in lines_net[0]:
        p2 =re.compile("(?<=\n)归属于\D*?(\D*?\n)+%s" % psub)
        lines_net = p2.search(net)

    return(lines_net[0])


def get_sale(text):
    '''
    提取营业收入
    '''

    subp = "([0-9,.%\- ]*?)\n"
    # 数据个数不同,只提取第一个即当年数据
    psub = "%s" % (subp)

    p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)稀释每股',re.DOTALL)
    
    if p1.search(text) is None:
        p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)最新每股',re.DOTALL)
        
        if p1.search(text) is None:
            p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)净资产收益',re.DOTALL)

    sale = p1.search(text).group(0)
    # 定义营业收入所在行的内容
    # +	重复一次或更多次
    p2 =re.compile("(?<=\n)\w*?营\D*?业\D*?收\n*?\w*?入(\D*?\n)+%s" % psub)
    lines_sale = p2.search(sale)

    return(lines_sale[0])

def get_profit(text):
    '''
    提取每股收益
    '''
    # ?	重复零次或一次
    # *	 重复零次或更多次
    # .  匹配除换行符以外的任意字符
    # \d 数字
    # \d{1,3}1-3个数字,尽可能地多
    # + 至少出现一次
    # \D 匹配任意非数字的字符
    # \s 匹配任意的空白符
    # \w 匹配字母或数字或下划线或汉字
    # *? 重复任意次,但尽可能少重复
    # (?=exp)	匹配exp前面的位置
    # (?<=exp)	匹配exp后面的位置

    subp = "([0-9,.%\- ]*?)\n"
    # 数据个数不同,只提取第一个即当年数据
    psub = "%s" % (subp)

    p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)稀释每股',re.DOTALL)
    
    if p1.search(text) is None:
        # p1匹配失败
        p1_1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)最新每股',re.DOTALL)
        
        if p1_1.search(text) is None:
            # p1_1匹配失败
            p1_2 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)净资产收益',re.DOTALL)
            profit = p1_2.search(text).group(0)  

        else:
            # p1_1成功匹配
            profit = p1_1.search(text).group(0)

        # 定义每股收益所在行的内容
        # +	重复一次或更多次
        p2_1 =re.compile("(?<=\n)每股\D*?收\D*?益(\D*?\d*?\D*?\n)+%s" % psub)
        lines_profit = p2_1.search(profit)
        
    else:
        # p1匹配成功
        profit = p1.search(text).group(0)
        # 定义每股收益所在行的内容
        # +	重复一次或更多次
        p2 =re.compile("(?<=\n)基本每股\D*?收\D*?益(\D*?\d*?\D*?\n)+%s" % psub)
        lines_profit = p2.search(profit)

    return(lines_profit[0])


def get_account_data(account,txt):
    p_txt = '%s\D*?(\d{1,3}(?:,\d{3})*(?:\.\d+)?)'%account
    p = re.compile(p_txt)
    matchobj = p.search(txt)
    amt = matchobj.group(1)
    return amt

def notfloat(s):
    # 检查字符串是否为浮点数,不为浮点数,返回True
    try:
        float(s)
        return False
    except ValueError:
        return True
   
 
def get_pro_sal(code,year,pdf_path):

    '''
    name: 上市公司代码
    year: 年报所属年份
    pdf_path: pdf文件地址

    返回营业收入、归属于上市公司股东的净利润和每股收益(基本每股收益)

    '''
    # 输出正在解析的PDF
    print('正在解析{}的{}年报'.format(change_code(str(code)),year))

    content = ''
    doc = fitz.open(pdf_path)

    for page in doc:
        content += page.get_text()
            
    # 由于后续subp匹配过程中,有的数字后面没有换行符,无法进行非贪婪的匹配,所以通过文本内部符号替换
    content = content.replace(" "," \n")
    content = content.replace("\n\n","\n")

    sale_0 = get_sale(content)
    sale_0 = sale_0.replace(',','') # 把收入里的逗号去掉
    sale_0 = sale_0.split("\n")
    while '' in sale_0:
        sale_0.remove('') 
        
    # '营业收入'中间可能有换行符,要确认获得的数据是数字而不是文字
    j=0
    while notfloat(sale_0[j]):
        # 不是字符串,加1
        j+=1
    try:
        sale_ = float(sale_0[j].strip())
    except:
        sale_ = np.nan


    net_0 = get_Net(content)
    net_0 = net_0.replace(',','') # 把收入里的逗号去掉
    net_0 = net_0.replace(' ','')
    net_0 = net_0.split("\n")
    while '' in net_0:
        net_0.remove('') 

    # '归属于上市公司股东的净利润'中间可能有换行符,要确认获得的数据是数字而不是文字
    k=0
    while notfloat(net_0[k]):
        k+=1

    try:
        net_ = float(net_0[k].strip())
    except:
        net_ = np.nan

    # 同理,'每股收益'中间可能有换行符,要确认获得的数据是数字而不是文字
    profit_0 = get_profit(content)
    profit_0 = profit_0.replace(' ','')
    profit_0 = profit_0.split("\n")
    while '' in profit_0:
        profit_0.remove('') 

    l=0
    while notfloat(profit_0[l]):
        # 不是字符串,加1
        l+=1
    try:
        profit_ = float(profit_0[l].strip())
    except:
        profit_ = np.nan

    return sale_,net_,profit_


def get_pro_sal_s(pdf_df):
    '''
    
    pdf_df是列名为code,year,pdf_path的Series
    返回营业收入、归属于上市公司股东的净利润和每股收益(基本每股收益)

    '''
    return get_pro_sal(pdf_df.code,pdf_df.year,pdf_df.pdf_path)


def get_mul_pro_sal(pdf_df):
    '''
    
    pdf_df是列名为code,year,pdf_path的DataFrame
    返回营业收入、归属于上市公司股东的净利润和每股收益(基本每股收益)

    '''
    return pdf_df.apply(get_pro_sal_s,axis = 1)