Skip to content

Commit bacf23f

Browse files
authored
Add multiprocessing parameters (#1521)
1 parent 0856479 commit bacf23f

File tree

9 files changed

+181
-27
lines changed

9 files changed

+181
-27
lines changed

docs/samples/python/batch_processing.ipynb

Lines changed: 102 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
"outputs": [],
1111
"source": [
1212
"# download presidio\n",
13-
"!pip install presidio_analyzer presidio_anonymizer\n",
14-
"!python -m spacy download en_core_web_lg"
13+
"#!pip install presidio_analyzer presidio_anonymizer\n",
14+
"#!python -m spacy download en_core_web_lg\n",
15+
"#!pip install pandas"
1516
]
1617
},
1718
{
@@ -145,8 +146,8 @@
145146
"</div>"
146147
],
147148
"text/plain": [
148-
" name phrase phone number phrase integer \n",
149-
"0 Charlie likes this Please call 212-555-1234 after 2pm 1 \\\n",
149+
" name phrase phone number phrase integer \\\n",
150+
"0 Charlie likes this Please call 212-555-1234 after 2pm 1 \n",
150151
"1 You should talk to Mike his number is 978-428-7111 2 \n",
151152
"2 Mary had a little startup Phone number: 202-342-1234 3 \n",
152153
"\n",
@@ -322,8 +323,8 @@
322323
"</div>"
323324
],
324325
"text/plain": [
325-
" name phrase \n",
326-
"0 <PERSON> likes this \\\n",
326+
" name phrase \\\n",
327+
"0 <PERSON> likes this \n",
327328
"1 You should talk to <PERSON> \n",
328329
"2 <PERSON> had a little startup \n",
329330
"\n",
@@ -496,11 +497,104 @@
496497
"\n",
497498
"Are not yet supported. Consider breaking the JSON to parts if needed."
498499
]
500+
},
501+
{
502+
"cell_type": "markdown",
503+
"id": "c708ff56",
504+
"metadata": {},
505+
"source": [
506+
"## Multiprocessing\n",
507+
"\n",
508+
"`BatchAnalyzerEngine` builds upon spaCy's pipelines. For more info about multiprocessing, see https://spacy.io/usage/processing-pipelines#multiprocessing.\n",
509+
"\n",
510+
"In Presidio, one can pass the `n_process` argument and the `batch_size` parameter to define how processing is done in parallel."
511+
]
512+
},
513+
{
514+
"cell_type": "markdown",
515+
"id": "81316c6c",
516+
"metadata": {},
517+
"source": []
518+
},
519+
{
520+
"cell_type": "code",
521+
"execution_count": 25,
522+
"id": "09a80e87",
523+
"metadata": {},
524+
"outputs": [
525+
{
526+
"name": "stdout",
527+
"output_type": "stream",
528+
"text": [
529+
"[Monitor] Active Python processes: 4 - [38773, 38774, 45860, 109966]\n"
530+
]
531+
},
532+
{
533+
"name": "stdout",
534+
"output_type": "stream",
535+
"text": [
536+
"[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n",
537+
"[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n",
538+
"[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n",
539+
"[Monitor] Active Python processes: 8 - [38773, 38774, 45860, 109966, 109973, 109976, 109977, 109978]\n",
540+
"[Monitor] Active Python processes: 4 - [38773, 38774, 45860, 109966]\n"
541+
]
542+
}
543+
],
544+
"source": [
545+
"import multiprocessing\n",
546+
"import psutil\n",
547+
"import time\n",
548+
"\n",
549+
"def analyze_batch_multiprocess(n_process=12, batch_size=4):\n",
550+
" \"\"\"Run BatchAnalyzer with `n_process` processes and batch size of `batch_size`.\"\"\"\n",
551+
" list_of_texts = [\"My name is mike\"]*1000\n",
552+
"\n",
553+
" results = batch_analyzer.analyze_iterator(\n",
554+
" texts=list_of_texts, \n",
555+
" language=\"en\",\n",
556+
" n_process=n_process, \n",
557+
" batch_size=batch_size\n",
558+
" )\n",
559+
"\n",
560+
" return list(results)\n",
561+
"\n",
562+
"\n",
563+
"\n",
564+
"def monitor_processes():\n",
565+
" \"\"\"Monitor all Python processes dynamically.\"\"\"\n",
566+
" while True:\n",
567+
" processes = [p for p in psutil.process_iter(attrs=['pid', 'name']) if \"python\" in p.info['name']]\n",
568+
" print(f\"[Monitor] Active Python processes: {len(processes)} - {[p.info['pid'] for p in processes]}\")\n",
569+
" time.sleep(1)\n",
570+
"\n",
571+
"\n",
572+
"# Run interactive monitoring\n",
573+
"monitor_proc = multiprocessing.Process(target=monitor_processes, daemon=True)\n",
574+
"monitor_proc.start()\n",
575+
"\n",
576+
"# Run the batch analyzer process\n",
577+
"analyze_batch_multiprocess(n_process=4, batch_size=2)\n",
578+
"\n",
579+
"# Wait for everything to conclude\n",
580+
"time.sleep(1) \n",
581+
"\n",
582+
"# Clean up (not needed if daemon=True, but useful if stopping manually)\n",
583+
"monitor_proc.terminate()\n"
584+
]
585+
},
586+
{
587+
"cell_type": "code",
588+
"execution_count": null,
589+
"id": "7b7b6c64",
590+
"metadata": {},
591+
"outputs": [],
592+
"source": []
499593
}
500594
],
501595
"metadata": {
502596
"kernelspec": {
503-
"display_name": "Python 3 (ipykernel)",
597+
"display_name": "presidio-analyzer-sAyh6tzK-py3.12",
504598
"language": "python",
505599
"name": "python3"
506600
},
@@ -514,7 +608,7 @@
514608
"name": "python",
515609
"nbconvert_exporter": "python",
516610
"pygments_lexer": "ipython3",
517-
"version": "3.10.5"
611+
"version": "3.12.1"
518612
}
519613
},
520614
"nbformat": 4,

presidio-analyzer/presidio_analyzer/batch_analyzer_engine.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ def analyze_iterator(
2727
self,
2828
texts: Iterable[Union[str, bool, float, int]],
2929
language: str,
30-
batch_size: Optional[int] = None,
30+
batch_size: int = 1,
31+
n_process: int = 1,
3132
**kwargs,
3233
) -> List[List[RecognizerResult]]:
3334
"""
@@ -36,6 +37,7 @@ def analyze_iterator(
3637
:param texts: An list containing strings to be analyzed.
3738
:param language: Input language
3839
:param batch_size: Batch size to process in a single iteration
40+
:param n_process: Number of processors to use. Defaults to `1`
3941
:param kwargs: Additional parameters for the `AnalyzerEngine.analyze` method.
4042
(default value depends on the nlp engine implementation)
4143
"""
@@ -46,7 +48,10 @@ def analyze_iterator(
4648
# Process the texts as batch for improved performance
4749
nlp_artifacts_batch: Iterator[Tuple[str, NlpArtifacts]] = (
4850
self.analyzer_engine.nlp_engine.process_batch(
49-
texts=texts, language=language, batch_size=batch_size
51+
texts=texts,
52+
language=language,
53+
batch_size=batch_size,
54+
n_process=n_process,
5055
)
5156
)
5257

@@ -65,6 +70,8 @@ def analyze_dict(
6570
input_dict: Dict[str, Union[Any, Iterable[Any]]],
6671
language: str,
6772
keys_to_skip: Optional[List[str]] = None,
73+
batch_size: int = 1,
74+
n_process: int = 1,
6875
**kwargs,
6976
) -> Iterator[DictAnalyzerResult]:
7077
"""
@@ -75,6 +82,9 @@ def analyze_dict(
7582
:param input_dict: The input dictionary for analysis
7683
:param language: Input language
7784
:param keys_to_skip: Keys to ignore during analysis
85+
:param batch_size: Batch size to process in a single iteration
86+
:param n_process: Number of processors to use. Defaults to `1`
87+
7888
:param kwargs: Additional keyword arguments
7989
for the `AnalyzerEngine.analyze` method.
8090
Use this to pass arguments to the analyze method,
@@ -119,6 +129,8 @@ def analyze_dict(
119129
texts=value,
120130
language=language,
121131
context=specific_context,
132+
n_process=n_process,
133+
batch_size=batch_size,
122134
**kwargs,
123135
)
124136
else:

presidio-analyzer/presidio_analyzer/nlp_engine/nlp_engine.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from typing import Iterable, Iterator, List, Optional, Tuple
2+
from typing import Iterable, Iterator, List, Tuple
33

44
from presidio_analyzer.nlp_engine import NlpArtifacts
55

@@ -29,7 +29,8 @@ def process_batch(
2929
self,
3030
texts: Iterable[str],
3131
language: str,
32-
batch_size: Optional[int] = None,
32+
batch_size: int = 1,
33+
n_process: int = 1,
3334
**kwargs, # noqa ANN003
3435
) -> Iterator[Tuple[str, NlpArtifacts]]:
3536
"""Execute the NLP pipeline on a batch of texts.

presidio-analyzer/presidio_analyzer/nlp_engine/spacy_nlp_engine.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
from spacy.language import Language
77
from spacy.tokens import Doc, Span
88

9-
from presidio_analyzer.nlp_engine import NerModelConfiguration, NlpArtifacts, NlpEngine
9+
from presidio_analyzer.nlp_engine import (
10+
NerModelConfiguration,
11+
NlpArtifacts,
12+
NlpEngine,
13+
)
1014

1115
logger = logging.getLogger("presidio-analyzer")
1216

@@ -111,13 +115,16 @@ def process_batch(
111115
self,
112116
texts: Union[List[str], List[Tuple[str, object]]],
113117
language: str,
114-
batch_size: Optional[int] = None,
118+
batch_size: int = 1,
119+
n_process: int = 1,
115120
as_tuples: bool = False,
116121
) -> Iterator[Optional[NlpArtifacts]]:
117122
"""Execute the NLP pipeline on a batch of texts using spacy pipe.
118123
119124
:param texts: A list of texts to process.
120125
:param language: The language of the texts.
126+
:param batch_size: Default batch size for pipe and evaluate.
127+
:param n_process: Number of processors to process texts.
121128
:param as_tuples: If set to True, inputs should be a sequence of
122129
(text, context) tuples. Output will then be a sequence of
123130
(doc, context) tuples. Defaults to False.
@@ -128,7 +135,7 @@ def process_batch(
128135

129136
texts = (str(text) for text in texts)
130137
docs = self.nlp[language].pipe(
131-
texts, as_tuples=as_tuples, batch_size=batch_size
138+
texts, as_tuples=as_tuples, batch_size=batch_size, n_process=n_process
132139
)
133140
for doc in docs:
134141
yield doc.text, self._doc_to_nlp_artifact(doc, language)

presidio-analyzer/tests/test_analyzer_engine_provider.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
from presidio_analyzer import AnalyzerEngineProvider, RecognizerResult
66
from presidio_analyzer.nlp_engine import SpacyNlpEngine, NlpArtifacts
77

8-
from presidio_analyzer.nlp_engine.transformers_nlp_engine import TransformersNlpEngine
98
from presidio_analyzer.predefined_recognizers import AzureAILanguageRecognizer
109

10+
import pytest
1111

1212
def get_full_paths(analyzer_yaml, nlp_engine_yaml=None, recognizer_registry_yaml=None):
1313
this_path = Path(__file__).parent.absolute()
@@ -155,7 +155,7 @@ def test_analyzer_engine_provider_with_files_per_provider():
155155
assert len(recognizer_registry.recognizers) == 6
156156
assert recognizer_registry.supported_languages == ["en", "es"]
157157

158-
158+
@pytest.mark.skipif(pytest.importorskip("azure"), reason="Optional dependency not installed") # noqa: E501
159159
def test_analyzer_engine_provider_with_azure_ai_language():
160160
analyzer_yaml, _, _ = get_full_paths(
161161
"conf/test_azure_ai_language_reco.yaml",

presidio-analyzer/tests/test_batch_analyzer_engine.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,17 @@ def test_analyze_iterator_returns_list_of_recognizer_results(
3434
texts, expected_output, batch_analyzer_engine_simple
3535
):
3636

37-
results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en")
37+
results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en", batch_size=2)
3838

3939
assert len(results) == len(expected_output)
4040
for result, expected_result in zip(results, expected_output):
4141
assert result == expected_result
4242
# fmt: on
4343

44-
45-
def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple):
44+
@pytest.mark.parametrize("n_process, batch_size", [(1,1), (4,2), (2,1)])
45+
def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple,
46+
n_process,
47+
batch_size):
4648

4749
d = {
4850
"url": "https://microsoft.com",
@@ -51,7 +53,10 @@ def test_analyze_dict_one_value_per_key(batch_analyzer_engine_simple):
5153
"misc": "microsoft.com or (202)-555-1234",
5254
}
5355

54-
results = batch_analyzer_engine_simple.analyze_dict(input_dict=d, language="en")
56+
results = batch_analyzer_engine_simple.analyze_dict(input_dict=d,
57+
language="en",
58+
n_process=n_process,
59+
batch_size=batch_size)
5560
results = list(results)
5661

5762
# url
@@ -267,14 +272,19 @@ def test_analyze_dict_with_nones_returns_empty_result(batch_analyzer_engine_simp
267272
for r in res:
268273
assert not r
269274

275+
276+
@pytest.mark.parametrize("n_process, batch_size", [(1,1), (4,2), (2,1)])
270277
def test_batch_analyze_iterator_returns_list_of_recognizer_results(
271-
batch_analyzer_engine_simple
278+
batch_analyzer_engine_simple, n_process, batch_size
272279
):
273280
texts = ["My name is David", "Call me at 2352351232", "I was born at 1/5/1922"]
274281
expected_output = [[], [RecognizerResult(entity_type="PHONE_NUMBER", start=11, end=21, score= 0.4)], []]
275282

276-
results = batch_analyzer_engine_simple.analyze_iterator(texts=texts, language="en", batch_size=2)
283+
results = batch_analyzer_engine_simple.analyze_iterator(texts=texts,
284+
language="en",
285+
batch_size=batch_size,
286+
n_process=n_process)
277287

278288
assert len(results) == len(expected_output)
279289
for result, expected_result in zip(results, expected_output):
280-
assert result == expected_result
290+
assert result == expected_result

presidio-analyzer/tests/test_spacy_nlp_engine.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def test_simple_process_text(spacy_nlp_engine):
2121
assert nlp_artifacts.lemmas[1] == "text"
2222

2323

24+
2425
def test_process_batch_strings(spacy_nlp_engine):
2526
nlp_artifacts_batch = spacy_nlp_engine.process_batch(
2627
["simple text", "simple text"], language="en"

presidio-structured/presidio_structured/analysis_builder.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,16 @@ def __init__(
2626
self,
2727
analyzer: Optional[AnalyzerEngine] = None,
2828
analyzer_score_threshold: Optional[float] = None,
29+
n_process: int = 1,
30+
batch_size: int = 1
2931
) -> None:
30-
"""Initialize the configuration generator."""
32+
"""Initialize the configuration generator.
33+
34+
:param analyzer: AnalyzerEngine instance
35+
:param analyzer_score_threshold: threshold for filtering out results
36+
:param batch_size: Batch size to process in a single iteration
37+
:param n_process: Number of processors to use. Defaults to `1`
38+
"""
3139
default_score_threshold = (
3240
analyzer_score_threshold if analyzer_score_threshold is not None else 0
3341
)
@@ -37,6 +45,8 @@ def __init__(
3745
else analyzer
3846
)
3947
self.batch_analyzer = BatchAnalyzerEngine(analyzer_engine=self.analyzer)
48+
self.n_process = n_process
49+
self.batch_size = batch_size
4050

4151
@abstractmethod
4252
def generate_analysis(
@@ -92,7 +102,10 @@ def generate_analysis(
92102
"""
93103
logger.debug("Starting JSON BatchAnalyzer analysis")
94104
analyzer_results = self.batch_analyzer.analyze_dict(
95-
input_dict=data, language=language
105+
input_dict=data,
106+
language=language,
107+
n_process=self.n_process,
108+
batch_size=self.batch_size
96109
)
97110

98111
key_recognizer_result_map = self._generate_analysis_from_results_json(
@@ -240,7 +253,10 @@ def _batch_analyze_df(
240253
for column in df.columns:
241254
logger.debug(f"Finding most common PII entity for column {column}")
242255
analyzer_results = self.batch_analyzer.analyze_iterator(
243-
[val for val in df[column]], language=language
256+
[val for val in df[column]],
257+
language=language,
258+
n_process=self.n_process,
259+
batch_size=self.batch_size
244260
)
245261
column_analyzer_results_map[column] = analyzer_results
246262

0 commit comments

Comments
 (0)