Analyze

This module combines the source data files into a single wide-format dataset.

Overview

This module contains the logic for analyzing the data.

It is responsible for combining the data from the different sources and converting it into wide format for analysis.

If you have cloned the repository, you can also run this module as a script directly from the command line.
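A minimal sketch of driving the module from Python instead; how a models.Settings instance is constructed or loaded is an assumption here and may differ in the repository:

from opendata_pipeline import analyze, models

# Hypothetical: build or load the project settings however the repo expects
# (e.g. from a config file); `sources` must list the configured data sources.
settings = models.Settings(sources=[...])
analyze.run(settings=settings)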

cleanup_columns(df)

Cleans up the column names, returning a new DataFrame with lowercased, underscore-separated names (the input is not modified in place).

Parameters:

    df (pd.DataFrame): The data. [required]
Source code in opendata_pipeline/analyze.py
def cleanup_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Cleans up the column names, returning a new DataFrame.

    Column names are lowercased and spaces are replaced with underscores;
    the input DataFrame is not modified in place.

    Args:
        df (pd.DataFrame): The data.
    """
    # TODO: consider dropping columns we don't need
    # lowercase names and replace spaces with underscores
    return df.rename(
        columns={col: col.lower().replace(" ", "_") for col in df.columns}
    )
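A small usage sketch with made-up column names:

import pandas as pd

raw = pd.DataFrame({"Death Date": ["2020-01-01"], "Primary Cause": ["overdose"]})
cleaned = cleanup_columns(raw)
print(list(cleaned.columns))  # ['death_date', 'primary_cause']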

combine(base_df, geo_df, drug_df)

Combines the data into a single dataframe.

This function calls make_wide, merge_wide_drugs_to_records, and join_geocoded_data.

Parameters:

    base_df (pd.DataFrame): The base data. [required]
    geo_df (pd.DataFrame): The geocoded data. [required]
    drug_df (pd.DataFrame): The drug data. [required]

Returns:

    pd.DataFrame: The combined data.

Source code in opendata_pipeline/analyze.py
def combine(
    base_df: pd.DataFrame,
    geo_df: pd.DataFrame,
    drug_df: pd.DataFrame,
) -> pd.DataFrame:
    """Combines the data into a single dataframe.

    This function calls `make_wide`, `merge_wide_drugs_to_records`, and `join_geocoded_data`.

    Args:
        base_df (pd.DataFrame): The base data.
        geo_df (pd.DataFrame): The geocoded data.
        drug_df (pd.DataFrame): The drug data.

    Returns:
        pd.DataFrame: The combined data.
    """
    wide_drug_df = make_wide(df=drug_df)
    records_wide_drugs = merge_wide_drugs_to_records(
        base_df=base_df, wide_drug_df=wide_drug_df
    )
    joined_df = join_geocoded_data(base_df=records_wide_drugs, geo_df=geo_df)
    return joined_df

join_geocoded_data(base_df, geo_df)

Joins the geocoded data to the base data.

Merges on the index (left join; expects CaseIdentifier to be the index).

Parameters:

    base_df (pd.DataFrame): The base data. [required]
    geo_df (pd.DataFrame): The geocoded data. [required]

Returns:

    pd.DataFrame: The joined data.

Source code in opendata_pipeline/analyze.py
def join_geocoded_data(base_df: pd.DataFrame, geo_df: pd.DataFrame) -> pd.DataFrame:
    """Joins the geocoded data to the base data.

    Merges on the index (left join; expects `CaseIdentifier` as the index).

    Args:
        base_df (pd.DataFrame): The base data.
        geo_df (pd.DataFrame): The geocoded data.

    Returns:
        pd.DataFrame: The joined data.
    """
    # merge on index
    # expects CaseIdentifier to be the index
    merged_df = pd.merge(
        left=base_df,
        right=geo_df,
        how="left",
        left_index=True,
        right_index=True,
    )
    return merged_df
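A minimal sketch with hypothetical values; both frames are expected to be indexed by CaseIdentifier, and the geo_df column names shown here are illustrative:

import pandas as pd

base = pd.DataFrame({"age": [34]}, index=pd.Index([101], name="CaseIdentifier"))
geo = pd.DataFrame(
    {"geocoded_latitude": [39.29], "geocoded_longitude": [-76.61]},
    index=pd.Index([101], name="CaseIdentifier"),
)
joined = join_geocoded_data(base_df=base, geo_df=geo)
# joined keeps every base row (left join) and gains the geocoded_* columns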

make_wide(df)

Converts the drug data from long to wide format.

Parameters:

    df (pd.DataFrame): The drug data. [required]

Returns:

    pd.DataFrame: The drug data in wide format.

Source code in opendata_pipeline/analyze.py
def make_wide(df: pd.DataFrame) -> pd.DataFrame:
    """Converts the drug data from long to wide format.

    Args:
        df (pd.DataFrame): The drug data.

    Returns:
        pd.DataFrame: The drug data in wide format.
    """
    # expects drug_df to have CaseIdentifier as index
    records: dict[str, dict[str, int]] = {}
    for row in df.reset_index().to_dict(orient="records"):
        case_id = row["CaseIdentifier"]
        if case_id not in records:
            records[case_id] = {}

        # this encodes the source column index (primary = 1, secondary = 2, etc.)
        # and combines it with the search term (drug name)
        column_name = f"{row['search_term'].lower()}_{row['source_col_index'] + 1}"
        # set to 1 because we found it
        records[case_id][column_name] = 1

        for tag in row["tags"].split("|"):
            # set to 1 because we found it
            records[case_id][f"{tag}_tag"] = 1

    flat_records = [{"CaseIdentifier": k, **v} for k, v in records.items()]
    wide_df = pd.DataFrame(flat_records).set_index("CaseIdentifier")
    return wide_df
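A worked sketch with hypothetical drug rows. The input is expected to be indexed by CaseIdentifier and to carry search_term, source_col_index, and pipe-delimited tags columns:

import pandas as pd

long_df = pd.DataFrame(
    {
        "CaseIdentifier": [101, 101],
        "search_term": ["Fentanyl", "Cocaine"],
        "source_col_index": [0, 1],
        "tags": ["opioid|synthetic", "stimulant"],
    }
).set_index("CaseIdentifier")

wide = make_wide(long_df)
# one row for case 101 with indicator columns all set to 1:
# fentanyl_1, cocaine_2, opioid_tag, synthetic_tag, stimulant_tag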

merge_wide_drugs_to_records(base_df, wide_drug_df)

Merges the wide drug data to the base data (on index).

Parameters:

    base_df (pd.DataFrame): The base data. [required]
    wide_drug_df (pd.DataFrame): The wide drug data. [required]

Returns:

    pd.DataFrame: The joined data.

Source code in opendata_pipeline/analyze.py
def merge_wide_drugs_to_records(
    base_df: pd.DataFrame, wide_drug_df: pd.DataFrame
) -> pd.DataFrame:
    """Merges the wide drug data to the base data (on index).

    Args:
        base_df (pd.DataFrame): The base data.
        wide_drug_df (pd.DataFrame): The wide drug data.

    Returns:
        pd.DataFrame: The joined data.
    """
    # merge on index
    # expects CaseIdentifier to be the index for BOTH
    return pd.merge(
        left=base_df,
        right=wide_drug_df,
        left_index=True,
        right_index=True,
        how="left",
    )
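Continuing the make_wide sketch above, the wide indicators attach to the base records by index; base rows with no drug mentions keep NaN in the indicator columns because of the left join:

base = pd.DataFrame(
    {"age": [34, 58]}, index=pd.Index([101, 102], name="CaseIdentifier")
)
records_wide = merge_wide_drugs_to_records(base_df=base, wide_drug_df=wide)
# case 101 has 1s in fentanyl_1, cocaine_2, and the *_tag columns; case 102 has NaN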

read_drug_data(source)

Reads the drug data from the data directory.

Sets the index to CaseIdentifier/record_id, and handles some minor column renaming.

Only returns the data for the given source (i.e. filtered).

Parameters:

    source (models.DataSource): The source to read. [required]

Returns:

    pd.DataFrame: The drug data.

Source code in opendata_pipeline/analyze.py
def read_drug_data(source: models.DataSource) -> pd.DataFrame:
    """Reads the drug data from the data directory.

    Sets the index to `CaseIdentifier`/`record_id`, and handles some minor column renaming.

    Only returns the data for the given source (i.e. filtered).

    Args:
        source (models.DataSource): The source to read.

    Returns:
        pd.DataFrame: The drug data.
    """
    df = (
        pd.read_json(
            Path("data") / "drug_data.jsonl",
            lines=True,
            orient="records",
            typ="frame",
        )
        .rename(columns={"record_id": "CaseIdentifier"})
        .set_index("CaseIdentifier")
    )
    # column we set to data source name --> `data_source`
    filt_df = df[df["data_source"] == source.name].copy()
    return filt_df
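Based only on the columns this function and make_wide rely on, each line of data/drug_data.jsonl is expected to look roughly like the following (field values are hypothetical and additional fields may be present):

{"record_id": 101, "data_source": "example_source", "search_term": "Fentanyl", "source_col_index": 0, "tags": "opioid|synthetic"}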

read_geocoded_data(source)

Reads the geocoded data from the data directory.

Sets the index to CaseIdentifier, and handles some minor column renaming.

Only returns the data for the given source (i.e. filtered).

Parameters:

    source (models.DataSource): The source to read. [required]

Returns:

    pd.DataFrame: The geocoded data.

Source code in opendata_pipeline/analyze.py
def read_geocoded_data(source: models.DataSource) -> pd.DataFrame:
    """Reads the geocoded data from the data directory.

    Sets the index to `CaseIdentifier`, and handles some minor column renaming.

    Only returns the data for the given source (i.e. filtered).

    Args:
        source (models.DataSource): The source to read.

    Returns:
        pd.DataFrame: The geocoded data.
    """
    # expects geocoding to be done and file to be in
    # data/geocoded_data.jsonl
    # returns filtered data to save memory
    df = pd.read_json(
        Path("data") / "geocoded_data.jsonl",
        lines=True,
        orient="records",
        typ="frame",
    ).set_index("CaseIdentifier")
    # column we set to data source name --> `data_source`
    filt_df = df[df["data_source"] == source.name].drop(columns=["data_source"])
    dff = filt_df.rename(columns={col: f"geocoded_{col}" for col in filt_df.columns})
    return dff
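A small sketch of the renaming applied after filtering, mirroring the dict comprehension in the source (the column names here are hypothetical):

import pandas as pd

geo = pd.DataFrame({"latitude": [39.29], "longitude": [-76.61]})
renamed = geo.rename(columns={col: f"geocoded_{col}" for col in geo.columns})
print(list(renamed.columns))  # ['geocoded_latitude', 'geocoded_longitude']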

read_records(source)

Reads the records from the data directory, sets index to CaseIdentifier.

Source code in opendata_pipeline/analyze.py
def read_records(source: models.DataSource) -> pd.DataFrame:
    """Reads the records from the data directory, sets index to `CaseIdentifier`."""
    df = pd.read_json(
        Path("data") / f"{source.records_filename}",
        lines=True,
        orient="records",
        typ="frame",
    ).set_index("CaseIdentifier")
    return df

run(settings)

Runs the data processing.

Parameters:

    settings (models.Settings): The settings. [required]
Source code in opendata_pipeline/analyze.py
def run(settings: models.Settings) -> None:
    """Runs the data processing.

    Args:
        settings (models.Settings): The settings.
    """
    for data_source in settings.sources:

        records_df = read_records(source=data_source)
        console.log(
            f"Read {len(records_df)} records from {data_source.records_filename}"
        )
        drug_df = read_drug_data(source=data_source)
        console.log(f"Read {len(drug_df)} drug records for {data_source.name}")

        geocoded_df = read_geocoded_data(source=data_source)
        console.log(f"Read {len(geocoded_df)} geocoded records for {data_source.name}")

        # write a file for each analysis step for the data source
        # written into a folder for the data source so that we can zip
        data_dir = Path("data") / data_source.name.replace(" ", "_")
        data_dir.mkdir(exist_ok=True)
        records_df.reset_index().to_csv(data_dir / "records.csv")
        drug_df.reset_index().to_csv(data_dir / "drug.csv")
        geocoded_df.reset_index().to_csv(data_dir / "geocoded.csv")
        # eventually add spatial

        console.log("Combining data...")
        combined_df = combine(
            base_df=records_df,
            geo_df=geocoded_df,
            drug_df=drug_df,
        )
        console.log(f"Combined data has {combined_df.shape} shape")

        cleaned_df = cleanup_columns(df=combined_df)

        console.log("Writing combined data to csv...")
        cleaned_df.reset_index().to_csv(
            Path("data") / data_source.csv_filename, index=False
        )
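After a successful run, the data directory should roughly contain, for each source, a folder of intermediate CSVs plus the combined, cleaned CSV (the source name and csv_filename below are hypothetical placeholders):

data/
  example_source/
    records.csv
    drug.csv
    geocoded.csv
  example_source.csv    # written to data_source.csv_filename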