14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class LogicalSection:
    """One titled section of a document and the content found under it.

    On construction the section collects the lines located between its title
    and the next section title (or the end of the document), then groups them
    into lists and paragraphs.

    Attributes set by ``__init__``:
        title: the title line that opens the section.
        doc: the owning document (only ``doc.pages`` is read by this class).
        id: position of ``title`` inside the ``sections`` sequence.
        next_section: the following title, or ``None`` for the last section.
        page_start, page_end: first and last page number covered.
        pages: sorted distinct page numbers of the collected lines.
        lines: the collected lines (empty when collection failed).
        lists: result of :meth:`detect_lists`.
        paragraphs: result of :meth:`detect_paragraphs`.
    """

    # A page contributing this many lines or more is skipped while collecting the
    # lines of a section that is followed by another section.
    MAX_LINES_PER_PAGE = 2000

    def __init__(self, doc, title, sections):
        """Build the section opened by ``title``.

        Args:
            doc: document object; ``doc.pages`` must be indexable by page number.
            title: title line of this section; must be an element of ``sections``
                and expose a ``page`` attribute.
            sections: ordered sequence of all section titles of the document.

        Raises:
            ValueError: if ``title`` is not found in ``sections``.
        """
        self.title = title
        self.page_start = title.page
        self.doc = doc
        self.id = sections.index(title)
        self.next_section = sections[self.id + 1] if self.id < len(sections) - 1 else None
        try:
            self.lines = self.get_section_lines()
        except Exception:
            logger.exception(f"pb collecting lines for section {title}")
            # Without this fallback the attribute would be missing and the
            # statements below would fail with AttributeError.
            self.lines = []
        logger.info(f"got all lines #={len(self.lines)}")
        if self.lines:
            # sometimes titles follow each other directly
            self.page_end = self.lines[-1].page
            self.pages = sorted({line.page for line in self.lines})
            logger.info("detecting lists")
            self.lists = self.detect_lists()
            logger.info("detecting paragraphs")
            self.paragraphs = self.detect_paragraphs()
            logger.info("done")
        else:
            self.page_end = self.page_start
            self.pages = [self.page_start]
            self.lists, self.paragraphs = [], []

    def get_section_lines(self):
        """Return the lines that belong to this section.

        Lines are indexed per page, not per document, so the pages are walked
        one by one and their lines gathered in chunks.

        Returns:
            A list of lines. It is empty when the title cannot be found on its
            own page. When another section follows, the result starts with the
            title line, stops right before the next title, and drops lines whose
            ``kind`` is a ``Header`` or ``Footer``. For the last section, the
            result holds every line after the title up to the end of the document.
        """
        section_page = self.doc.pages[self.page_start]
        logger.debug(f"section {self.title} starting on page {section_page.id}")
        page_lines = section_page.lines
        try:
            section_pageindex = page_lines.index(self.title)
        except ValueError:
            msg = "\n".join(str(p) for p in page_lines)
            logger.error(f"section {self.title} not found in page {self.page_start} with lines={msg}")
            return []

        if self.next_section is None:
            # it's the last section of the document
            logger.debug("collecting lines on all remaining pages")
            # ``section_pageindex`` is a position inside the page, so it must not
            # be used to slice the page list: start on the section's own page,
            # right after the title, then take every following page entirely.
            lines = list(page_lines[section_pageindex + 1 :])
            for page in self.doc.pages[self.page_start + 1 :]:
                lines.extend(page.lines)
            # NOTE: unlike the bounded case below, the title line itself is not
            # part of the result and headers/footers are not filtered here.
            return [line for line in lines if line != self.title]

        start_pageid = self.page_start
        end_pageid = self.next_section.page
        logger.debug(
            f"collecting lines until next section {self.next_section} from page {start_pageid} to {end_pageid}"
        )
        lines: List[Any] = []
        for pageid in range(start_pageid, end_pageid + 1):
            logger.debug(f"collecting on page {pageid}")
            p = self.doc.pages[pageid]
            _lines = p.lines
            # Trim the end first: cutting a prefix keeps indices valid for the
            # start trim when both titles sit on the same page.
            if pageid == end_pageid:
                _lines = _lines[: p.lines.index(self.next_section)]
            if pageid == start_pageid:
                _lines = _lines[section_pageindex:]
            _lines = [_line for _line in _lines if type(_line.kind) not in {Header, Footer}]
            if len(_lines) < self.MAX_LINES_PER_PAGE:
                lines.extend(_lines)
            else:
                logger.warning(f"skipping page {pageid} because of too many lines {len(_lines)}")
            logger.debug(f"#lines={len(lines)} #llines={len(_lines)}")
        return lines

    def display(self):
        """Print every line of the section to standard output."""
        for line in self.lines:
            print(line)

    def _intro_fits_on_line(self, i, line, page, thr, final_ponct):
        """Tell whether ``line`` alone is the whole introduction of a list.

        That is the case when the line starts with an uppercase character and
        is either the first line of the section, set apart from the previous
        line by extra vertical spacing, less indented than the previous line,
        or preceded by a shorter line that ends with final punctuation.
        """
        if not line.content[0].isupper():
            return False
        if i == 0:
            # No previous line inside the section; checking this first avoids
            # wrapping around to the last line through index -1.
            return True
        if line.prev:
            prev_line = line.prev
            prev_spacing = line.prev_spacing
        else:
            prev_line = self.lines[i - 1]
            prev_spacing = abs(round(line.y0) - round(prev_line.y1))
        if prev_spacing - page.most_common_line_spacing > thr:
            return True
        # Compare rounded coordinates (the comparison result itself used to be
        # rounded because of a misplaced parenthesis).
        if round(prev_line.x0) > round(line.x0) + 3:
            return True
        # ``[-1:]`` tolerates an empty previous line.
        return bool(prev_line.x1 < line.x1 and final_ponct.match(prev_line.content[-1:]))

    def detect_lists(self, thr=3):
        """Detect the lists contained in the section.

        A list starts at a line ending with ``:`` whose ``next`` line matches
        ``pats["list_start"]``. Lines found at the very top of a page (index 0
        or 1) that match the segmenter of the last detected list are added to
        that list as a continuation.

        ``pats``, ``collect_intro``, ``collect_list_items`` and ``TextList``
        are module-level names defined outside this class.

        Args:
            thr: tolerance applied to line-spacing differences.

        Returns:
            The list of ``TextList`` objects, in order of appearance.
        """
        # TODO : there are still some issues with nested lists, as they are detected twice.
        doc = self.doc
        lst = self.lines
        final_ponct = re.compile(r"([\.\?\!…])")
        fulls = []
        for i, line in enumerate(lst):
            page = doc.pages[line.page]
            # potential start of list
            if line.content.endswith(":") and line.next:
                m = re.match(pats["list_start"], line.next.content)
                if m:
                    rest = lst[i + 2 :]
                    if self._intro_fits_on_line(i, line, page, thr, final_ponct):
                        # the entire intro is in the current line
                        intro = [line]
                    else:
                        # otherwise, the lines before must be captured and added to the 'intro paragraph'.
                        before = list(reversed(lst[:i]))
                        intro = collect_intro(line, before, page, thr)
                    items = collect_list_items(m, line, rest, page, thr)
                    fulls.append(TextList(intro=intro, items=items, m=m, doc=self.doc))
            # potential rest of list but on another page
            if fulls:
                last = fulls[-1]
                try:
                    line_index = page.lines.index(line)
                    if re.match(last.segmenter, line.content) and line_index <= 1:
                        rest = lst[i + 1 :]
                        other_items = collect_list_items(last.match, line, rest, page, thr, newpage=True)
                        last.add_items(other_items)
                except Exception:
                    # Best effort: a failed continuation check must not abort the
                    # detection, but it should leave a trace.
                    logger.debug("list continuation check failed for a line", exc_info=True)
        return fulls

    def _group_text_lines(self, excluded):
        """Split ``self.lines`` into runs of consecutive plain-text lines.

        A line is plain text when its ``kind`` is ``None`` and it is not in
        ``excluded``; any other line closes the current run.
        """
        groups = []
        group = []
        for line in self.lines:
            if line.kind is None and line not in excluded:
                group.append(line)
            elif group:
                groups.append(group)
                group = []
        if group:
            groups.append(group)
        return groups

    def detect_paragraphs(self, thr=3):
        """Group the plain-text lines of the section into paragraphs.

        Lines that belong to a detected list or whose ``kind`` is set are
        ignored and act as separators. Inside a run of text lines, a paragraph
        ends where the spacing to the next line exceeds the most common line
        spacing of the page by more than ``thr`` and the line either ends with
        final punctuation or starts more than 20 units to the right of the next
        line.

        Args:
            thr: tolerance applied to line-spacing differences.

        Returns:
            The list of ``Paragraph`` objects (a name defined outside this class).
        """
        doc = self.doc
        all_lists_lines = [line for lst in self.lists for line in lst.lines]
        text_groups = self._group_text_lines(all_lists_lines)
        final_ponct = re.compile(r"[\.\?\!…;]")
        paragraphs = []
        for lst in text_groups:
            paras = []
            para = []
            # the spacing reference is taken from the page of the first line of the run
            page = doc.pages[lst[0].page]
            last_index = len(lst) - 1
            for i, line in enumerate(lst):
                para.append(line)
                if i == last_index:
                    paras.append(para)
                    break
                next_line = lst[i + 1]
                line_spacing = abs(round(next_line.y0) - round(line.y1))
                diff = line_spacing - page.most_common_line_spacing
                # ``[-1:]`` tolerates an empty line instead of raising IndexError
                if diff > thr and (final_ponct.match(line.content[-1:]) or line.x0 > next_line.x0 + 20):
                    paras.append(para)
                    para = []
            paragraphs.extend(Paragraph(p, doc) for p in paras)
        return paragraphs

    def get_all_paragraphs(self):
        """Return paragraphs and lists together, ordered by their first line.

        Returns:
            A new list; ``self.paragraphs`` and ``self.lists`` are left untouched.

        Raises:
            ValueError: if the first line of an object is not in ``self.lines``.
        """
        all_objs = self.paragraphs + self.lists
        all_objs.sort(key=lambda x: self.lines.index(x.lines[0]))
        return all_objs
|