Skip to content

Fetch

This module fetches the data from the web.

Overview

This module contains functions for fetching data from the open data portal or other sources.

It uses async requests if not using the open data portal to speed things up.

build_url(offset, base_url)

Build url for pagination.

Adds resultOffset and resultRecordCount to url.

Parameters:

Name Type Description Default
offset int

int

required
base_url str

str

required

Returns:

Name Type Description
str str

url

Source code in opendata_pipeline/fetch.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def build_url(offset: int, base_url: str, *, record_count: int = 1000) -> str:
    """Build url for pagination.

    Appends ``resultRecordCount`` and ``resultOffset`` query parameters
    to an existing query url.

    Args:
        offset: record offset the page should start at
        base_url: query url that already contains a query string
            (parameters are appended with ``&``)
        record_count: page size; defaults to 1000, the value that was
            previously hard-coded here

    Returns:
        str: url with pagination parameters appended
    """
    return f"{base_url}&resultRecordCount={record_count}&resultOffset={offset}"

cook_county_drug_col(df)

Create composite drug column for Cook County.

Source code in opendata_pipeline/fetch.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def cook_county_drug_col(df: pd.DataFrame) -> pd.DataFrame:
    """Create composite drug column for Cook County.

    Concatenates ``primarycause`` with its three continuation-line
    columns into a single ``primary_cod`` column. Rows where every
    component is missing get ``None`` rather than an empty string.

    Args:
        df: Cook County dataframe; must contain the four
            ``primarycause*`` columns, with ``None`` for missing values.

    Returns:
        pd.DataFrame: copy of ``df`` with ``primary_cod`` added.
    """

    def _concat(x, y):
        # Treat missing parts as empty strings; keep the single-space
        # separator so partially-filled rows match the historic output
        # (including its internal double spaces).
        return (x or "") + " " + (y or "")

    dff = df.copy()
    combined = (
        dff["primarycause"]
        .combine(dff["primarycause_linea"], _concat, None)
        .combine(dff["primarycause_lineb"], _concat, None)
        .combine(dff["primarycause_linec"], _concat, None)
    )
    # Bug fix: the old `... or None` inside the combiner never fired
    # because the separator space made every combined value truthy, so
    # all-empty rows ended up as "" after stripping. `strip() or None`
    # restores the intended null for fully-missing causes.
    dff["primary_cod"] = combined.apply(lambda s: s.strip() or None)
    return dff

export_drug_df(df, config)

Export drug dataframe to csv.

This is a temporary function to export a csv for the drug tool.

Parameters:

Name Type Description Default
df pd.DataFrame

Drug Output dataframe

required
config models.DataSource

DataSource object

required
Source code in opendata_pipeline/fetch.py
115
116
117
118
119
120
121
122
123
124
125
126
def export_drug_df(df: pd.DataFrame, config: models.DataSource) -> None:
    """Export drug dataframe to csv.

    Writes the identifier column plus the configured drug columns to
    the source's drug-prep csv under data/. Temporary, for the drug tool.

    Args:
        df: Drug Output dataframe
        config: DataSource object supplying column names and filename
    """
    columns = ["CaseIdentifier", *config.drug_columns]
    destination = Path("data") / config.drug_prep_filename
    df.loc[:, columns].to_csv(destination, index=False)

export_jsonlines_from_df(df, config)

Export jsonlines from dataframe.

Source code in opendata_pipeline/fetch.py
129
130
131
132
133
def export_jsonlines_from_df(df: pd.DataFrame, config: models.DataSource) -> None:
    """Write the dataframe to the source's jsonlines file under data/."""
    console.log(f"Exporting {config.name} jsonlines...")
    destination = Path("data") / config.records_filename
    df.to_json(destination, orient="records", lines=True)

get_async_records(config, current_index) async

Get records from url.

This is an async function to get records from a url for each dataset.

Parameters:

Name Type Description Default
config models.DataSource

DataSource object

required
current_index int

int

required

Returns:

Name Type Description
int int

newly updated index

Source code in opendata_pipeline/fetch.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
async def get_async_records(config: models.DataSource, current_index: int) -> int:
    """Get records from url.

    Fires one paginated request per 1000-record page concurrently,
    gathers the results as they complete, then writes the drug csv and
    jsonlines outputs.

    Args:
        config: DataSource object for the dataset to fetch
        current_index: global record index to start labelling from

    Returns:
        int: newly updated index (number of records fetched)
    """
    console.log(f"Fetching {config.name} records...")
    async with aiohttp.ClientSession() as session:
        # we add 2000 assuming there were more records than last week
        offsets = range(0, config.total_records + 2000, 1000)
        tasks = [
            asyncio.ensure_future(
                get_record_set(session, build_url(base_url=config.url, offset=off))
            )
            for off in offsets
        ]

        records: list = []
        completed = asyncio.as_completed(tasks)
        for pending in track(
            completed,
            description="Fetching async records...",
            total=len(tasks),
        ):
            records.extend(await pending)

        console.log(
            f"Fetched {len(records):,} records asynchronously from {config.url}"
        )

        df = make_df_with_identifier(records, current_index)
        export_drug_df(df, config)
        export_jsonlines_from_df(df, config)
        return len(records)

get_open_data_records(config)

Get records from open data portal.

This is a synchronous request.

It sets the total to 1000 + the data source current total.

Parameters:

Name Type Description Default
config models.DataSource

DataSource object

required

Returns:

Type Description
list[dict[str, typing.Any]]

list[dict[str, typing.Any]]: list of records

Source code in opendata_pipeline/fetch.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def get_open_data_records(
    config: models.DataSource,
) -> list[dict[str, typing.Any]]:
    """Get records from open data portal.

    Makes a single synchronous request, asking for 1000 more records
    than the source reported last run so newly added rows are included.

    Args:
        config: DataSource object for the dataset to fetch

    Returns:
        list[dict[str, typing.Any]]: list of records

    Raises:
        ValueError: if the response json does not contain a `value` key
    """
    console.log(f"Fetching {config.name} records...")
    # we add 1000 assuming there were more records than last week
    query = {"$top": config.total_records + 1_000}
    body: dict[str, typing.Any] = requests.get(config.url, params=query).json()
    if "value" not in body:
        raise ValueError(
            f"Unable to get records from {config.url}, `value` key not in response"
        )
    return body["value"]

get_record_set(session, url) async

Get record set from url.

An async function to get a record set from a url.

If fails, retries.

Parameters:

Name Type Description Default
session aiohttp.ClientSession

aiohttp.ClientSession

required
url str

str

required

Returns:

Type Description
list[dict[str, typing.Any]]

list[dict[str, typing.Any]]: list of records

Source code in opendata_pipeline/fetch.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
async def get_record_set(
    session: aiohttp.ClientSession, url: str, max_retries: int = 5
) -> list[dict[str, typing.Any]]:
    """Get record set from url.

    An async function to get a record set from a url. Retries on a
    non-200 status or on a 200 response whose json lacks a `features`
    key, up to ``max_retries`` attempts.

    The original implementation retried via unbounded recursion, which
    would recurse forever (eventually RecursionError) on a persistently
    failing url; this version bounds the retries and fails loudly.

    Args:
        session: open aiohttp client session
        url: paginated query url (as produced by build_url)
        max_retries: attempts before giving up (default 5)

    Returns:
        list[dict[str, typing.Any]]: list of records (empty when the
            page has no features)

    Raises:
        RuntimeError: if no usable response after max_retries attempts
    """
    for _ in range(max_retries):
        async with session.get(url) as resp:
            # retry on error status
            if resp.status != 200:
                continue
            resp_data: dict[str, typing.Any] = await resp.json()
            if "features" in resp_data:
                # empty `features` naturally yields []
                return [r["attributes"] for r in resp_data["features"]]
        # 200 but payload missing `features` -> retry
    raise RuntimeError(f"Failed to fetch a valid record set from {url}")

get_sync_records(config, current_index)

Get records from url synchronously.

This is a synchronous function to get records from a url for each dataset.

Parameters:

Name Type Description Default
config models.DataSource

DataSource object

required
current_index int

int

required

Returns:

Name Type Description
int int

newly updated index

Source code in opendata_pipeline/fetch.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def get_sync_records(config: models.DataSource, current_index: int) -> int:
    """Get records from url synchronously.

    Fetches every record for a non-async data source, labels the rows
    with global identifiers, and writes the drug csv and jsonlines files.

    Args:
        config: DataSource object for the dataset to fetch
        current_index: global record index to start labelling from

    Returns:
        int: newly updated index (number of records fetched)
    """
    console.log(f"Fetching {config.name} records...")
    raw_records = get_open_data_records(config)
    frame = make_df_with_identifier(raw_records, current_index)
    # Cook County's cause-of-death lines must be merged into one column.
    if config.name == "Cook County":
        frame = cook_county_drug_col(frame)
    export_drug_df(frame, config)
    export_jsonlines_from_df(frame, config)
    return len(raw_records)

make_df_with_identifier(records, current_index)

Make dataframe with case identifier.

Uses the current index to label records with a global identifier.

Parameters:

Name Type Description Default
records list[dict[str, typing.Any]]

list[dict[str, typing.Any]]

required
current_index int

int

required

Returns:

Type Description
pd.DataFrame

pd.DataFrame: dataframe with case identifier

Source code in opendata_pipeline/fetch.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def make_df_with_identifier(
    records: list[dict[str, typing.Any]], current_index: int
) -> pd.DataFrame:
    """Make dataframe with case identifier.

    Labels each record with a global `CaseIdentifier`: its positional
    index offset by `current_index`.

    Args:
        records: raw record dicts from the fetch step
        current_index: global index at which this dataset's ids begin

    Returns:
        pd.DataFrame: dataframe with a CaseIdentifier column appended
    """
    return pd.DataFrame(records).assign(
        CaseIdentifier=lambda frame: frame.index + current_index
    )

run(settings, update_remote=False) async

Fetch records from open data portal.

Parameters:

Name Type Description Default
settings models.Settings

Settings object

required
update_remote bool

whether to update the remote config.json or not

False
Source code in opendata_pipeline/fetch.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
async def run(settings: models.Settings, update_remote: bool = False) -> None:
    """Fetch records from open data portal.

    Iterates over every configured data source, fetching each one
    (async or sync as configured), accumulating a global record count
    so identifiers are unique across datasets, then persists the
    updated totals to the local or remote config.

    Args:
        settings (models.Settings): Settings object
        update_remote (bool): whether to update the remote config.json or not
    """
    total_records = 0
    for source in settings.sources:
        if source.is_async:
            count = await get_async_records(
                config=source, current_index=total_records
            )
        else:
            count = get_sync_records(
                config=source, current_index=total_records
            )
        # record the fresh total on the source and advance the global index
        source.total_records = count
        total_records += count

    console.log(f"Total records fetched: {total_records:,}")

    if update_remote:
        manage_config.update_remote_config(config=settings)
    else:
        manage_config.update_local_config(config=settings)