import re
import fitz
import numpy as np
from get_code import change_code
def get_info(code,year,pdf_path):
'''
name: 上市公司代码
year: 年报所属年份
pdf_path: pdf文件地址
返回公司名称、办公地址和公司网址
'''
# 输出正在解析的PDF
print('正在解析{}的{}年报'.format(change_code(str(code)),year))
content = ''
doc = fitz.open(pdf_path)
for page in doc:
content += page.get_text()
# 由于后续subp匹配过程中,有的数字后面没有换行符,无法进行非贪婪的匹配,所以通过文本内部符号替换
content = content.replace(" "," \n")
content = content.replace("\n\n","\n")
# 获取公司名称
p_name = re.compile('公司\w*名称\s?\n?(.*?)\s?(?=\n)',re.DOTALL)
if p_name.search(content) == None:
name = np.nan
else:
name = p_name.search(content).group(1)
# 获取网址
p_web = re.compile('(?<=\n)公司\w*网址\s?\n?(.*?)\s?\n?电子信箱')
if p_web.search(content) is None:
p_web = re.compile('公司\w*网址\s?\n?(.*?)\s?(?=\n)')
if p_web.search(content) is None:
web = np.nan
else:
web = p_web.search(content).group(1)
else:
web = p_web.search(content).group(1)
# 获取办公地址
# \w 匹配字母或数字或下划线或汉字
# (?<=\n) 匹配换行符后面的位置
p_location = re.compile('办公地址\s?\n?(.*?)\s?\n?\D?公司办公地址的邮政编码',re.DOTALL)
if p_location.search(content) == None:
p_location = re.compile('办公地址\s?\n?(.*?)\s?\n?办公地址的邮政编码',re.DOTALL)
if p_location.search(content) == None:
location = np.nan
else:
location = p_location.search(content).group(1)
else:
location = p_location.search(content).group(1)
return name,location,web
def get_info_df(pdf_df):
'''
pdf_df是列名为code,year,pdf_path的Series
返回公司名称、办公地址和公司网址
'''
return get_info(pdf_df.code,pdf_df.year,pdf_df.pdf_path)
def get_mul_info(pdf_df):
'''
pdf_df是列名为code,year,pdf_path的DataFrame
返回公司名称、办公地址和公司网址
'''
return pdf_df.apply(get_info_df,axis = 1)
def get_target_pageno(doc,anchor='主要会计数据和财务指标'):
pageno = -1
for n in range(len(doc)):
page = doc[n]
txt = page.get_text()
if anchor in txt:
pageno=n
break
return(pageno)
def txt(pdf_path,anchor='主要会计数据和财务指标'):
# 定位主要会计数据和财务指标所在页面
doc = fitz.open(pdf_path)
n = get_target_pageno(doc,anchor)
page = doc[n]
txt = page.get_text()
page_ = doc[n+1]
txt += page_.get_text()
# 由于后续subp匹配过程中,有的数字后面没有换行符,无法进行非贪婪的匹配,所以通过文本内部符号替换
txt = txt.replace(" "," \n")
txt = txt.replace("\n\n","\n")
return txt
def get_info_secretary(code,year,pdf_path):
'''
name: 上市公司代码
year: 年报所属年份
pdf_path: pdf文件地址
返回董秘姓名、董秘电话和董秘电子邮箱
'''
# 输出正在解析的PDF
print('正在解析{}的{}年报'.format(change_code(str(code)),year))
content = txt(pdf_path,'联系人和联系方式')
# 获取董秘姓名
p_name = re.compile('(?<=\n)董事会秘书\D*姓名\s?\n?(.*?)\s?(?=\n)',re.DOTALL)
if p_name.search(content) == None:
name = np.nan
else:
name = p_name.search(content).group(1)
# 获取董秘电话
p_tel = re.compile('(?<=\n)电话\s?\n?(.*?)\s?(?=\n)',re.DOTALL)
if p_tel.search(content) == None:
tel = np.nan
else:
tel = p_tel.search(content).group(1)
# 获取董秘电子信箱
p_email = re.compile('(?<=\n)电子信箱\s?\n?(.*?)\s?(?=\n)',re.DOTALL)
if p_email.search(content) == None:
email = np.nan
else:
email = p_email.search(content).group(1)
return name,tel,email
def get_info_secretary_df(pdf_df):
'''
pdf_df是列名为code,year,pdf_path的Series
返回董秘姓名、董秘电话和董秘电子邮箱
'''
return get_info_secretary(pdf_df.code,pdf_df.year,pdf_df.pdf_path)
def get_mul_info_secretary(pdf_df):
'''
pdf_df是列名为code,year,pdf_path的DataFrame
返回董秘姓名、董秘电话和董秘电子邮箱
'''
return pdf_df.apply(get_info_secretary_df,axis = 1)
def parse_key_fin_data(subtext,keywords):
# keywords = ['营业收入','营业成本','毛利','归属于上市','归属于上市','经营活动']
ss = []
s = 0
for kw in keywords:
n = subtext.find(kw,s)
ss.append(n)
s = n+len(kw)
ss.append(len(subtext))
data = []
for n in range(len(ss)-1):
s = ss[n]
e = ss[n+1]
line = subtext[s:e]
data.append(line.split())
return(data)
def get_Net(text):
'''
提取归属于上市公司股东的净利润
'''
subp = "([0-9,.%\- ]*?)\n"
# 数据个数不同,只提取第一个即当年数据
psub = "%s" % (subp)
p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)稀释每股',re.DOTALL)
if p1.search(text) is None:
p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)最新每股',re.DOTALL)
if p1.search(text) is None:
p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)净资产收益',re.DOTALL)
net = p1.search(text).group(0)
# 定义提取归属于上市公司股东的净利润所在行的内容
# + 重复一次或更多次
p2 =re.compile("(?<=\n)归属于\D*?利润(\D*?\d*?\D*?\n)+%s" % psub)
lines_net = p2.search(net)
try:
lines_net[0]
except:
p2 =re.compile("(?<=\n)归\D*?润(\D*?\n)+%s" % psub)
lines_net = p2.search(net)
# 要提取的是 归属于上市公司股东的净利润
# 排除 归属于上市公司股东的扣除非经常性损益的净利润
if '扣除' in lines_net[0]:
p2 =re.compile("(?<=\n)归属于\D*?(\D*?\n)+%s" % psub)
lines_net = p2.search(net)
return(lines_net[0])
def get_sale(text):
'''
提取营业收入
'''
subp = "([0-9,.%\- ]*?)\n"
# 数据个数不同,只提取第一个即当年数据
psub = "%s" % (subp)
p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)稀释每股',re.DOTALL)
if p1.search(text) is None:
p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)最新每股',re.DOTALL)
if p1.search(text) is None:
p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)净资产收益',re.DOTALL)
sale = p1.search(text).group(0)
# 定义营业收入所在行的内容
# + 重复一次或更多次
p2 =re.compile("(?<=\n)\w*?营\D*?业\D*?收\n*?\w*?入(\D*?\n)+%s" % psub)
lines_sale = p2.search(sale)
return(lines_sale[0])
def get_profit(text):
'''
提取每股收益
'''
# ? 重复零次或一次
# * 重复零次或更多次
# . 匹配除换行符以外的任意字符
# \d 数字
# \d{1,3}1-3个数字,尽可能地多
# + 至少出现一次
# \D 匹配任意非数字的字符
# \s 匹配任意的空白符
# \w 匹配字母或数字或下划线或汉字
# *? 重复任意次,但尽可能少重复
# (?=exp) 匹配exp前面的位置
# (?<=exp) 匹配exp后面的位置
subp = "([0-9,.%\- ]*?)\n"
# 数据个数不同,只提取第一个即当年数据
psub = "%s" % (subp)
p1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)稀释每股',re.DOTALL)
if p1.search(text) is None:
# p1匹配失败
p1_1 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)最新每股',re.DOTALL)
if p1_1.search(text) is None:
# p1_1匹配失败
p1_2 = re.compile('\D\s*\D*?主要\D*?指标\D*?\s*(?=\\n)(.*?)净资产收益',re.DOTALL)
profit = p1_2.search(text).group(0)
else:
# p1_1成功匹配
profit = p1_1.search(text).group(0)
# 定义每股收益所在行的内容
# + 重复一次或更多次
p2_1 =re.compile("(?<=\n)每股\D*?收\D*?益(\D*?\d*?\D*?\n)+%s" % psub)
lines_profit = p2_1.search(profit)
else:
# p1匹配成功
profit = p1.search(text).group(0)
# 定义每股收益所在行的内容
# + 重复一次或更多次
p2 =re.compile("(?<=\n)基本每股\D*?收\D*?益(\D*?\d*?\D*?\n)+%s" % psub)
lines_profit = p2.search(profit)
return(lines_profit[0])
def get_account_data(account,txt):
p_txt = '%s\D*?(\d{1,3}(?:,\d{3})*(?:\.\d+)?)'%account
p = re.compile(p_txt)
matchobj = p.search(txt)
amt = matchobj.group(1)
return amt
def notfloat(s):
# 检查字符串是否为浮点数,不为浮点数,返回True
try:
float(s)
return False
except ValueError:
return True
def get_pro_sal(code,year,pdf_path):
'''
name: 上市公司代码
year: 年报所属年份
pdf_path: pdf文件地址
返回营业收入、归属于上市公司股东的净利润和每股收益(基本每股收益)
'''
# 输出正在解析的PDF
print('正在解析{}的{}年报'.format(change_code(str(code)),year))
content = ''
doc = fitz.open(pdf_path)
for page in doc:
content += page.get_text()
# 由于后续subp匹配过程中,有的数字后面没有换行符,无法进行非贪婪的匹配,所以通过文本内部符号替换
content = content.replace(" "," \n")
content = content.replace("\n\n","\n")
sale_0 = get_sale(content)
sale_0 = sale_0.replace(',','') # 把收入里的逗号去掉
sale_0 = sale_0.split("\n")
while '' in sale_0:
sale_0.remove('')
# '营业收入'中间可能有换行符,要确认获得的数据是数字而不是文字
j=0
while notfloat(sale_0[j]):
# 不是字符串,加1
j+=1
try:
sale_ = float(sale_0[j].strip())
except:
sale_ = np.nan
net_0 = get_Net(content)
net_0 = net_0.replace(',','') # 把收入里的逗号去掉
net_0 = net_0.replace(' ','')
net_0 = net_0.split("\n")
while '' in net_0:
net_0.remove('')
# '归属于上市公司股东的净利润'中间可能有换行符,要确认获得的数据是数字而不是文字
k=0
while notfloat(net_0[k]):
k+=1
try:
net_ = float(net_0[k].strip())
except:
net_ = np.nan
# 同理,'每股收益'中间可能有换行符,要确认获得的数据是数字而不是文字
profit_0 = get_profit(content)
profit_0 = profit_0.replace(' ','')
profit_0 = profit_0.split("\n")
while '' in profit_0:
profit_0.remove('')
l=0
while notfloat(profit_0[l]):
# 不是字符串,加1
l+=1
try:
profit_ = float(profit_0[l].strip())
except:
profit_ = np.nan
return sale_,net_,profit_
def get_pro_sal_s(pdf_df):
'''
pdf_df是列名为code,year,pdf_path的Series
返回营业收入、归属于上市公司股东的净利润和每股收益(基本每股收益)
'''
return get_pro_sal(pdf_df.code,pdf_df.year,pdf_df.pdf_path)
def get_mul_pro_sal(pdf_df):
'''
pdf_df是列名为code,year,pdf_path的DataFrame
返回营业收入、归属于上市公司股东的净利润和每股收益(基本每股收益)
'''
return pdf_df.apply(get_pro_sal_s,axis = 1)