Skip to content

Extract Drugs

This module extracts drugs from the records.

Overview

This module extracts drugs from the records.

It searches the drug columns listed in the config file (config.json), in the order they are listed.

It also requires the drug extraction tool (the `extract-drugs` command) to be installed and runnable from the command line.

command(input_fpath, target_column, search_words)

Build the command to run the drug extraction tool.

Parameters:

Name Type Description Default
input_fpath str

path to the input file

required
target_column str

the column to search

required
search_words str

the search terms

required

Returns:

Type Description
list[str]

list[str]: the command (list) to run

Source code in opendata_pipeline/extract_drugs.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def command(input_fpath: str, target_column: str, search_words: str) -> list[str]:
    """Assemble the argument list that invokes the drug extraction tool.

    Args:
        input_fpath (str): path to the input file
        target_column (str): the column to search
        search_words (str): the search terms

    Returns:
        list[str]: the command (list) to run
    """
    # flag -> value pairs, kept in the order they must appear on the command line
    options = {
        "--target-column": target_column,
        # we made this column when we fetched the data
        "--id-column": "CaseIdentifier",
        "--search-words": search_words,
        "--algorithm": "osa",
        "--threshold": "0.9",
        "--format": "jsonl",
    }
    cmd = ["extract-drugs", "simple-search", input_fpath]
    for flag, value in options.items():
        cmd += [flag, value]
    return cmd

enhance_drug_output(record, target_column, column_level, data_source, tag_lookup)

Enhance drug output with additional columns.

Source code in opendata_pipeline/extract_drugs.py
70
71
72
73
74
75
76
77
78
79
80
81
82
def enhance_drug_output(
    record: dict,
    target_column: str,
    column_level: int,
    data_source: str,
    tag_lookup: dict[str, str],
) -> Generator[dict[str, Any], None, None]:
    """Annotate a drug record in place with provenance fields and tags.

    Sets ``data_source``, ``source_column``, ``source_col_index`` and ``tags``
    on ``record`` and then yields that same dict exactly once. The tags are
    looked up in ``tag_lookup`` by the lower-cased ``search_term`` of the
    record; a missing key raises ``KeyError``.
    """
    provenance = (
        ("data_source", data_source),
        ("source_column", target_column),
        ("source_col_index", column_level),
    )
    for field, value in provenance:
        record[field] = value
    # the lookup is keyed by the lower-cased search term
    term = record["search_term"].lower()
    record["tags"] = tag_lookup[term]
    yield record

export_drug_output(drug_results)

Export the drug output to a file.

Source code in opendata_pipeline/extract_drugs.py
134
135
136
137
138
def export_drug_output(drug_results: list[dict[str, Any]]) -> None:
    """Export the drug output to a file.

    Writes one JSON document per line to ``data/drug_data.jsonl``,
    overwriting any existing file.

    Args:
        drug_results (list[dict[str, Any]]): the records to write
    """
    # The serialized bytes are decoded as UTF-8 below, so pin the file encoding
    # to UTF-8 as well instead of relying on the platform default, which can
    # fail or mis-encode non-ASCII text on non-UTF-8 locales.
    with open(Path("data") / "drug_data.jsonl", "w", encoding="utf-8") as f:
        for record in drug_results:
            f.write(orjson.dumps(record).decode("utf-8") + "\n")

fetch_drug_search_terms()

Fetch drug search terms from the remote github repo.

Returns:

Type Description
dict[str, str]

dict[str, str]: a dictionary of search terms and their tags

Source code in opendata_pipeline/extract_drugs.py
19
20
21
22
23
24
25
26
27
28
29
def fetch_drug_search_terms() -> dict[str, str]:
    """Fetch drug search terms from the remote github repo.

    Returns:
        dict[str, str]: a dictionary of search terms and their tags, with
            each term's tags joined into one "|"-separated string

    Raises:
        requests.HTTPError: if the server answers with an error status
        requests.Timeout: if the server does not respond in time
    """
    console.log("Fetching drug search terms from GitHub")
    url = "https://raw.githubusercontent.com/UK-IPOP/drug-extraction/main/de-workflow/data/drug_info.json"
    # requests applies no timeout by default; without one a stalled connection
    # would hang the whole pipeline
    resp = requests.get(url, timeout=30)
    # surface HTTP failures directly rather than as a confusing JSON decode
    # error on an error page
    resp.raise_for_status()
    data = resp.json()
    return {term: "|".join(tags) for term, tags in data.items()}

read_drug_output()

Read the drug output file and yield each record.

Source code in opendata_pipeline/extract_drugs.py
63
64
65
66
67
def read_drug_output() -> Generator[dict[str, Any], None, None]:
    """Read the drug output file and yield each record.

    Reads ``extracted_drugs.jsonl`` from the current working directory and
    lazily yields one parsed JSON document per non-blank line.
    """
    # JSON is UTF-8; decode explicitly instead of using the platform default
    with open("extracted_drugs.jsonl", "r", encoding="utf-8") as f:
        for line in f:
            # tolerate blank lines, which would otherwise fail to parse
            if not line.strip():
                continue
            yield orjson.loads(line)

run(settings)

Run the drug extraction tool on every data source in the settings and export the combined results.

Source code in opendata_pipeline/extract_drugs.py
141
142
143
144
145
146
147
148
149
150
151
def run(settings: models.Settings) -> None:
    """Extract drugs for every configured data source and export the results."""
    tag_lookup = fetch_drug_search_terms()

    # run the tool once per source, flattening the per-source results into one list
    all_records: list[dict[str, Any]] = [
        record
        for source in settings.sources
        for record in run_drug_tool(config=source, tag_lookup=tag_lookup)
    ]

    console.log("Exporting drug data...")
    export_drug_output(drug_results=all_records)

run_drug_tool(config, tag_lookup)

Run the drug extraction tool.

Parameters:

Name Type Description Default
config models.DataSource

the data source config

required
tag_lookup dict[str, str]

the drug search terms

required

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: the drug results

Source code in opendata_pipeline/extract_drugs.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def run_drug_tool(
    config: models.DataSource, tag_lookup: dict[str, str]
) -> list[dict[str, Any]]:
    """Run the drug extraction tool.

    The tool is invoked once per entry in ``config.drug_columns``. After each
    run its output file is read back and every record is enhanced with the
    data source name, the source column, and that column's position in
    ``config.drug_columns``.

    Args:
        config (models.DataSource): the data source config
        tag_lookup (dict[str, str]): the drug search terms

    Returns:
        list[dict[str, Any]]: the drug results

    Raises:
        subprocess.CalledProcessError: if the drug extraction tool exits with
            a non-zero status
    """
    # mostly this is replicating de-workflow code but we don't want ALL of those features
    # and the added dependencies so we just rewrite it here
    terms = "|".join(tag_lookup)

    # the input file is the same for every column, so resolve it once
    # TODO: adjust this when we can read jsonlines in drug tool
    in_file = (Path("data") / config.drug_prep_filename).as_posix()

    drug_results: list[dict[str, Any]] = []

    for column_level, target_column in enumerate(config.drug_columns):
        cmd = command(
            input_fpath=in_file,
            target_column=target_column,
            search_words=terms,
        )

        console.log(
            f"Running drug extraction tool on {target_column} for {config.name}"
        )
        # output is written to file; check=True stops us from silently
        # re-reading a stale output file (from a previous column or run)
        # when the tool fails
        subprocess.run(cmd, check=True)

        # so now we read the file using generators
        for record in read_drug_output():
            drug_results.extend(
                enhance_drug_output(
                    record=record,
                    data_source=config.name,
                    target_column=target_column,
                    column_level=column_level,
                    tag_lookup=tag_lookup,
                )
            )

    return drug_results