Skip to content

Geocode

This module geocodes the addresses.

Overview

This module handles geocoding of records.

It utilizes the ArcGIS geocoding API and asyncio to speed up the process.

build_url(bounds, address_data, key)

Build the geocoding url.

This uses the hardcoded fat_url and inserts the token (key) and then encodes the bounds and address data.

Parameters:

Name Type Description Default
bounds models.GeoBounds

The (rectangular) bounds to use for the geocoding.

required
address_data dict[str, str | int | None]

The address data to use for the geocoding.

required
key str

The ArcGIS token.

required

Returns:

Name Type Description
str str

The geocoding url.

Source code in opendata_pipeline/geocode.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def build_url(
    bounds: models.GeoBounds, address_data: dict[str, str | int | None], key: str
) -> str:
    """Build the geocoding url.

    This uses the hardcoded `fat_url` and inserts the token (`key`) and then encodes the bounds and address data.

    Args:
        bounds (models.GeoBounds): The (rectangular) bounds to use for the geocoding.
        address_data (dict[str, str | int | None]): The address data to use for the geocoding.
        key (str): The ArcGIS token.

    Returns:
        str: The geocoding url.
    """
    fat_url = f"https://geocode.arcgis.com/arcgis/rest/services/World/GeocodeServer/findAddressCandidates?f=json&outFields=none&outSR=4326&token={key}&forStorage=false&locationType=street&sourceCounty=USA&maxLocations=1&maxOutOfRange=false"

    # combines values that we have
    address_string = " ".join([str(v) for v in address_data.values() if v])

    # the string will get url encoded by requests, we needed to handle the data
    return f"{fat_url}&searchExtent={bounds.json()}&SingleLine={address_string}"

clean_address_string(s)

Clean street.

Removes any non-alphanumeric characters.

Returns None if the string is empty after cleaning.

Parameters:

Name Type Description Default
s str

The string to clean.

required

Returns:

Type Description
str | None

str | None: The cleaned string or None if the string is empty.

Source code in opendata_pipeline/geocode.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def clean_address_string(s: str) -> str | None:
    """Clean street.

    Removes any non-alphanumeric characters.

    Returns None if the string is empty after cleaning.

    Args:
        s (str): The string to clean.

    Returns:
        str | None: The cleaned string or None if the string is empty.
    """
    if s is None:
        return None
    s = s.lower().strip()
    if (
        "unk" in s
        or "n/a" in s
        or s == "same"
        or s == "none"
        or s == "undetermined"
        or s == "no scene"
    ):
        return None
    return s

export_geocoded_results(data)

Export geocoded results to file.

Source code in opendata_pipeline/geocode.py
128
129
130
131
132
def export_geocoded_results(data: list[dict[str, Any]]) -> None:
    """Export geocoded results to file."""
    with open(Path("data") / "geocoded_data.jsonl", "w") as f:
        for record in data:
            f.write(orjson.dumps(record).decode("utf-8") + "\n")

geocode_records(config, key) async

Geocode records for the data source.

Parameters:

Name Type Description Default
config models.DataSource

The data source config.

required
key str

The ArcGIS token.

required

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: The geocoded records.

Source code in opendata_pipeline/geocode.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
async def geocode_records(config: models.DataSource, key: str) -> list[dict[str, Any]]:
    """Geocode records for the data source.

    Args:
        config (models.DataSource): The data source config.
        key (str): The ArcGIS token.

    Returns:
        list[dict[str, Any]]: The geocoded records.
    """
    records = read_records(config)
    if config.spatial_config is None:
        raise ValueError("spatial_config is required for geocoding")

    console.log(f"Geocoding {len(records)} records from {config.name}...")
    async with aiohttp.ClientSession() as session:
        tasks = []
        for record in records:
            data = prepare_address(record, config)
            if data is None:
                continue
            id_, address_data = data
            url = build_url(
                bounds=config.spatial_config.bounds, address_data=address_data, key=key
            )
            tasks.append(
                get_geo_result(
                    session=session, url=url, id_=id_, data_source_name=config.name
                )
            )

        results: list[dict[str, Any]] = []
        for task in track(
            asyncio.as_completed(tasks),
            total=len(tasks),
            description=f"Geocoding records for {config.name}",
        ):
            result: dict[str, Any] | None = await task
            if result is not None:
                results.append(result)

    # this difference is due to cleaning
    print(f"Geocoded {len(results)} records out of {len(records)}")
    return results

get_geo_result(session, url, id_, data_source_name) async

Get the geocoding result from an async web request to ArcGIS.

If the request returns 200, will retry.

Will return None otherwise.

Parameters:

Name Type Description Default
session aiohttp.ClientSession

The aiohttp session to use for the request.

required
url str

The url to use for the request.

required
id_ int

The id of the record being geocoded.

required
data_source_name str

The name of the data source being geocoded.

required

Returns:

Type Description
dict[str, Any] | None

dict[str, Any] | None: The geocoding result or None if the request failed.

Source code in opendata_pipeline/geocode.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
async def get_geo_result(
    session: aiohttp.ClientSession, url: str, id_: int, data_source_name: str
) -> dict[str, Any] | None:
    """Get the geocoding result from an async web request to ArcGIS.

    If the request returns 200, will retry.

    Will return None otherwise.

    Args:
        session (aiohttp.ClientSession): The aiohttp session to use for the request.
        url (str): The url to use for the request.
        id_ (int): The id of the record being geocoded.
        data_source_name (str): The name of the data source being geocoded.

    Returns:
        dict[str, Any] | None: The geocoding result or None if the request failed.
    """
    async with session.get(url) as response:
        # if returns error, try again
        if response.status != 200:
            return await get_geo_result(session, url, id_, data_source_name)
        json_data = await response.json()
        if results := json_data.get("candidates", None):
            best = results[0]
            return {
                "CaseIdentifier": id_,
                "latitude": best["location"]["y"],
                "longitude": best["location"]["x"],
                "score": best["score"],
                "matched_address": best["address"],
                "data_source": data_source_name,
            }
    return None

prepare_address(record, config)

Prepare address for geocoding.

Parameters:

Name Type Description Default
record dict[str, Any]

The record to prepare.

required

Returns:

Type Description
tuple[Any, dict[str, Any]] | None

tuple[Any, dict[str, Any]] | None: The id and address data or None if the address is invalid.

Source code in opendata_pipeline/geocode.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def prepare_address(
    record: dict[str, Any], config: models.DataSource
) -> tuple[Any, dict[str, Any]] | None:
    """Prepare address for geocoding.

    Args:
        record (dict[str, Any]): The record to prepare.

    Returns:
        tuple[Any, dict[str, Any]] | None: The id and address data or None if the address is invalid.
    """
    if config.spatial_config is None:
        raise ValueError("spatial_config is required for geocoding")

    address_config = config.spatial_config.address_fields
    if address_config.street is None:
        raise ValueError("street field is required for geocoding")
    if address_config.city is None and address_config.zip is None:
        raise ValueError("city or zip field is required for geocoding")

    clean_street = clean_address_string(record[address_config.street])
    if clean_street is None:
        return None
    address_dict: dict[str, Any] = {}
    address_dict["Address"] = clean_street
    address_dict["City"] = (
        record[address_config.city] if address_config.city is not None else None
    )
    address_dict["State"] = (
        record[address_config.state] if address_config.state is not None else None
    )
    address_dict["ZIP"] = (
        record[address_config.zip] if address_config.zip is not None else None
    )
    return record["CaseIdentifier"], address_dict

read_records(config)

Read records from file.

Only reads records that have not been geocoded.

Only return fields that are needed for geocoding.

Source code in opendata_pipeline/geocode.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def read_records(config: models.DataSource) -> list[dict[str, Any]]:
    """Read records from file.

    Only reads records that have not been geocoded.

    Only return fields that are needed for geocoding.
    """
    console.log(f"Reading records from file for {config.name}")
    if config.spatial_config is None:
        raise ValueError("spatial_config is required for geocoding")
    fields: list[str | None] = [
        "CaseIdentifier",
        config.spatial_config.lat_field,
        config.spatial_config.lon_field,
        config.spatial_config.address_fields.street,
        config.spatial_config.address_fields.city,
        config.spatial_config.address_fields.state,
        config.spatial_config.address_fields.zip,
    ]

    records: list[dict[str, Any]] = []
    with open(Path("data") / config.records_filename, "r") as f:
        for line in f:
            line_data = orjson.loads(line)
            filtered_data = {
                k: v
                for k, v in line_data.items()
                if k in {f for f in fields if f is not None}
            }
            lat_value = filtered_data[config.spatial_config.lat_field]
            lon_value = filtered_data[config.spatial_config.lon_field]
            street_value = filtered_data[config.spatial_config.address_fields.street]
            # if value is 0 or None, we need to geocode
            # otherwise we can skip
            if (
                lat_value == 0
                or lon_value == 0
                or lat_value is None
                or lon_value is None
                or street_value is None
            ):
                records.append(filtered_data)
    return records

run(settings, alternate_key) async

Run the geocoding process.

Source code in opendata_pipeline/geocode.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
async def run(settings: models.Settings, alternate_key: str | None) -> None:
    """Run the geocoding process."""
    if settings.arcgis_api_key is None and alternate_key is None:
        raise ValueError(
            "arcgis_api_key is required for geocoding. Consider using the --alternate-key flag"
        )
    # initialize ARCGIS
    if settings.arcgis_api_key is not None:
        key = settings.arcgis_api_key
    elif alternate_key is not None:
        key = alternate_key
    else:
        raise ValueError("arcgis_api_key is required for geocoding")

    geocoded_results: list[dict[str, Any]] = []
    for data_source in settings.sources:

        if data_source.needs_geocoding:
            source_set = await geocode_records(data_source, key)
            geocoded_results.extend(source_set)

    console.log(f"Exporting {len(geocoded_results)} geocoded records")
    export_geocoded_results(geocoded_results)