Suppose you're curious about examining crime trends in your local area. You know that relevant data exists, and you have some basic analytical skills you can apply to it. However, the data changes frequently, and you want to keep your analysis current with the most recent incidents without manually repeating the work each time. How can we automate this process?
Well, if you’ve stumbled upon this article, you’re in luck! Together, we’ll walk through how to create a data pipeline to extract local police log data, and connect this to a visualization platform to examine local crime trends over time. For this article, we’ll extract data on incidents reported to the Cambridge (MA) Police Department (CPD), and then visualize this data as a dashboard in Metabase.

The final dashboard we’ll create to summarize recent and historical trends in CPD incidents.
Additionally, this article can serve as a general template for anybody looking to write ETL pipelines orchestrated in Prefect, and/or anybody who wants to connect Metabase to their data stores to create insightful analyses/reports.
Note: I have no affiliation with Metabase – we’ll simply use Metabase as an example platform to create our final dashboard. There are many other viable alternatives, which are described in this section.
Before we dive into the pipeline, it’ll be helpful to review the following concepts, or keep these links as reference as you read.
The data we’ll be working with contains a collection of police log entries, where each entry is a single incident reported to/by the CPD. Each entry contains comprehensive information describing the incident, including but not limited to:

A glimpse at the Cambridge Daily Police Log.
Check out the portal for more information about the data.
For monitoring crime trends in Cambridge, MA, creating a data pipeline to extract this data is appropriate, as the data is updated daily (according to their website). If the data was updated less frequently (e.g. annually), then creating a data pipeline to automate this process wouldn’t save us much effort. We could simply revisit the data portal at the end of each year, download the .csv, and complete our analysis.
Now that we’ve found the appropriate dataset, let’s walk through the implementation.
To go from raw CPD log data to a Metabase dashboard, our project will consist of the following major steps:
The data flow of our system will look like the following:

System data flow from data extraction to Metabase visualization.
Our pipeline follows an ETL workflow, which means we'll transform the data before importing it into PostgreSQL. This requires loading the data into memory while executing the transformations, which can be problematic for datasets too large to fit in memory. In that case, we might consider an ELT workflow instead, where we transform the data inside the same infrastructure where it's stored. Since our dataset is small (<10k rows), this isn't a concern, and we can take advantage of the fact that pandas makes data transformation easy.
We'll extract the CPD log data by requesting the dataset from the Socrata Open Data API. We'll use sodapy, a Python client for the API, to make the request.
We’ll encapsulate this extraction code in its own file — extract.py.
import pandas as pd
from sodapy import Socrata
from dotenv import load_dotenv
import os
from prefect import task

@task(retries=3, retry_delay_seconds=[10, 10, 10]) # retry API request in case of failure
def extract_data():
    '''
    Extract incident data reported to the Cambridge Police Department using the Socrata Open Data API.
    Return the incident data as a Pandas DataFrame.
    '''
    # fetch Socrata app token from .env
    # include this app token when interacting with the Socrata API to avoid request throttling, so we can fetch all the incidents
    load_dotenv()
    APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN")

    # create Socrata client to interact with the Socrata API (https://github.com/afeld/sodapy)
    client = Socrata(
        "data.cambridgema.gov",
        APP_TOKEN,
        timeout=30 # increase timeout from 10s default - sometimes, it takes longer to fetch all the results
    )

    # fetch all data, paginating over results
    DATASET_ID = "3gki-wyrb" # unique identifier for Cambridge Police Log data (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    results = client.get_all(DATASET_ID)

    # convert to pandas DataFrame
    results_df = pd.DataFrame.from_records(results)
    return results_df
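Before wiring this task into a pipeline, it can be handy to sanity-check the extraction on its own. One quick way (a rough sketch, assuming SOCRATA_APP_TOKEN is set in your .env) is to call the underlying function through the task's .fn attribute, which bypasses the Prefect engine:

from extract import extract_data

# run the raw extraction function without the Prefect task machinery
df = extract_data.fn()
print(df.shape)   # how many incidents and columns did we get back?
print(df.head())  # eyeball the first few records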
Notes about the extraction code:
Now, we’ll do some basic data quality checks on the data.
The data is already fairly clean (which makes sense as it’s provided by the Cambridge Police Department). So, our data quality checks will act more as a “sanity check” that we didn’t ingest anything unexpected.
We’ll validate the following:
We’ll put this validation code in its own file — validate.py.
from datetime import datetime
from collections import Counter
import pandas as pd
from prefect import task

### UTILITIES

def check_valid_schema(df):
    '''
    Check whether the DataFrame contains the expected columns for the Cambridge Police dataset.
    Otherwise, raise an error.
    '''
    SCHEMA_COLS = ['date_time', 'id', 'type', 'subtype', 'location', 'last_updated', 'description']
    if Counter(df.columns) != Counter(SCHEMA_COLS):
        raise ValueError("Schema does not match the expected schema.")

def check_numeric_id(df):
    '''
    Convert 'id' values to numeric.
    If any 'id' values are non-numeric, replace them with NaN, so they can be removed downstream in the data transformations.
    '''
    df['id'] = pd.to_numeric(df['id'], errors='coerce')
    return df

def verify_datetime(df):
    '''
    Verify 'date_time' values follow ISO 8601 format (https://www.iso.org/iso-8601-date-and-time-format.html).
    Raise a ValueError if any of the 'date_time' values are invalid.
    '''
    df.apply(lambda row: datetime.fromisoformat(row['date_time']), axis=1)

def check_missing_values(df):
    '''
    Check whether there are any missing values in columns that require data.
    For police logs, each incident should have a datetime, ID, incident type, and location.
    '''
    REQUIRED_COLS = ['date_time', 'id', 'type', 'location']
    for col in REQUIRED_COLS:
        if df[col].isnull().sum() > 0:
            raise ValueError(f"Missing values are present in the '{col}' attribute.")

### VALIDATION LOGIC

@task
def validate_data(df):
    '''
    Check the data satisfies the following data quality checks:
    - schema is valid
    - no missing values in columns that require data
    - datetime follows ISO 8601 format
    - IDs are numeric
    '''
    check_valid_schema(df)
    check_missing_values(df)
    verify_datetime(df)
    # run the numeric-ID coercion last, so the NaNs it may introduce
    # aren't flagged as missing values (they're removed in the transform step)
    df = check_numeric_id(df)
    return df
When implementing these data quality checks, it’s important to think about how to handle data quality checks that fail.
We’ll raise an error if:
For records that have non-numeric IDs, we’ll fill them with NaN placeholders and then remove them downstream in the transformation step. These records do not break our analysis if we simply remove them.
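As a small, self-contained illustration of this coerce-then-drop pattern (the IDs and incident types below are made up, not real CPD records):

import pandas as pd

# toy example: one non-numeric ID gets coerced to NaN, then dropped
df = pd.DataFrame({"id": ["101", "102", "abc"], "type": ["THEFT", "FRAUD", "THEFT"]})
df["id"] = pd.to_numeric(df["id"], errors="coerce")  # "abc" -> NaN
df = df.dropna(subset=["id"])                        # the NaN row is removed
print(df)
#       id   type
# 0  101.0  THEFT
# 1  102.0  FRAUD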
Now, we’ll do some transformations on our data to prepare it for storage in PostgreSQL.
We’ll do the following transformations:
We’ll put this transformation code in its own file — transform.py.
import pandas as pd
from prefect import task

### UTILITIES

def remove_duplicates(df):
    '''
    Remove duplicate rows from the dataframe based on the 'id' column. Keep the first occurrence.
    '''
    return df.drop_duplicates(subset=["id"], keep='first')

def remove_invalid_rows(df):
    '''
    Remove rows where the 'id' is NaN, as these IDs were identified as non-numeric.
    '''
    return df.dropna(subset=['id'])

def split_datetime(df):
    '''
    Split the date_time column into separate year, month, day, and time columns.
    '''
    # convert to datetime
    df['date_time'] = pd.to_datetime(df['date_time'])

    # extract year/month/day/time
    df['year'] = df['date_time'].dt.year
    df['month'] = df['date_time'].dt.month
    df['day'] = df['date_time'].dt.day
    df['hour'] = df['date_time'].dt.hour
    df['minute'] = df['date_time'].dt.minute
    df['second'] = df['date_time'].dt.second
    return df

### TRANSFORMATION LOGIC

@task
def transform_data(df):
    '''
    Apply the following transformations to the passed-in dataframe:
    - deduplicate records (keep the first)
    - remove invalid rows
    - split datetime into year, month, day, and time columns
    '''
    df = remove_duplicates(df)
    df = remove_invalid_rows(df)
    df = split_datetime(df)
    return df
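To make the datetime split concrete, here's what it produces for a single fabricated record (illustrative only):

import pandas as pd
from transform import split_datetime

# one made-up incident timestamp, just to inspect the derived columns
sample = pd.DataFrame({"date_time": ["2024-07-04T13:45:00"]})
print(split_datetime(sample))
#             date_time  year  month  day  hour  minute  second
# 0 2024-07-04 13:45:00  2024      7    4    13      45       0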
Now our data is ready to import into PostgreSQL.
Before we can import our data, we need to create our PostgreSQL instance. We’ll create one locally using a compose file. This file allows us to define & configure all the services that our application needs.
services:
  postgres_cpd: # postgres instance for CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes:
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on: # don't start pgadmin until our postgres instance is running
      - postgres_cpd

volumes:
  pgdata_cpd: # all data for our postgres_cpd service will be stored here
There are two main services defined here:
Let’s highlight some important configuration for our postgres_cpd service:
Now that we’ve created our PostgreSQL instance, we can execute queries against it. Importing our data into PostgreSQL requires executing two queries against the database:
Each time we execute a query against our PostgreSQL instance, we need to do the following:
1. Establish our connection to PostgreSQL.
2. Execute the query.
3. Commit the changes & close the connection.
from prefect import task
from sqlalchemy import create_engine
import psycopg2
from dotenv import load_dotenv
import os

load_dotenv()

def create_postgres_table():
    '''
    Create the cpd_incidents table in the Postgres DB (cpd_db) if it doesn't exist.
    '''
    # establish connection to DB
    conn = psycopg2.connect(
        host="localhost",
        port="5433",
        database="cpd_db",
        user=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD")
    )

    # create cursor object to execute SQL
    cur = conn.cursor()

    # execute query to create the table
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS cpd_incidents (
            date_time TIMESTAMP,
            id INTEGER PRIMARY KEY,
            type TEXT,
            subtype TEXT,
            location TEXT,
            description TEXT,
            last_updated TIMESTAMP,
            year INTEGER,
            month INTEGER,
            day INTEGER,
            hour INTEGER,
            minute INTEGER,
            second INTEGER
        )
    '''
    cur.execute(create_table_query)

    # commit changes
    conn.commit()

    # close cursor and connection
    cur.close()
    conn.close()

@task
def load_into_postgres(df):
    '''
    Loads the transformed data passed in as a DataFrame
    into the 'cpd_incidents' table in our Postgres instance.
    '''
    # create table to insert data into as necessary
    create_postgres_table()

    # create Engine object to connect to DB
    engine = create_engine(
        f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@localhost:5433/cpd_db"
    )

    # insert data into Postgres DB into the 'cpd_incidents' table
    df.to_sql('cpd_incidents', engine, if_exists='replace')
Things to note about the code above:
We’ve implemented the individual components of the ETL process. Now, we’re ready to encapsulate these components into a pipeline.
There are many tools available to use for orchestrating pipelines defined in python. Two popular options are Apache Airflow and Prefect.
For its simplicity, we'll proceed with defining our pipeline using Prefect. We need to do the following to get started:
For more information on Prefect setup, check out the docs.
Next, we must add the following decorators to our code:
If you look back at our extract, validate, transform, and load code, you’ll see that we added the @task decorator to these functions.
Now, let’s define our ETL pipeline that executes these tasks. We’ll put the following in a separate file, etl_pipeline.py.
from extract import extract_data
from validate import validate_data
from transform import transform_data
from load import load_into_postgres
from prefect import flow

@flow(name="cpd_incident_etl", log_prints=True) # Our pipeline will appear as 'cpd_incident_etl' in the Prefect UI. All print outputs will be displayed in Prefect.
def etl():
    '''
    Execute the ETL pipeline:
    - Extract CPD incident data from the Socrata API
    - Validate and transform the extracted data to prepare it for storage
    - Import the transformed data into Postgres
    '''
    print("Extracting data...")
    extracted_df = extract_data()

    print("Performing data quality checks...")
    validated_df = validate_data(extracted_df)

    print("Performing data transformations...")
    transformed_df = transform_data(validated_df)

    print("Importing data into Postgres...")
    load_into_postgres(transformed_df)

    print("ETL complete!")

if __name__ == "__main__":
    # CPD data is expected to be updated daily (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    # Thus, we'll execute our pipeline on a daily basis (at midnight)
    etl.serve(name="cpd-pipeline-deployment", cron="0 0 * * *")
Things to note about the code:

Our flow will show up on Prefect’s home page.

The “Deployments” tab in the Prefect UI shows us our deployed flows.
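If you don't want to wait for the midnight schedule, you can also trigger the deployment on demand. Here's a minimal sketch (it assumes the serve process above is still running and that you're pointed at the same Prefect API):

from prefect.deployments import run_deployment

# kick off an ad-hoc run of the deployed flow ("<flow name>/<deployment name>")
run_deployment(name="cpd_incident_etl/cpd-pipeline-deployment")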
Now that we’ve created our pipeline to load our data into PostgreSQL, it’s time to visualize it.
There are many approaches we could take to visualize our data. Some notable options include:
Both are good options. Without going into too much detail behind each BI tool, we’ll use Metabase to make a simple dashboard.
In the future, if we want to have more customization over our visuals/reports, we can consider using other tools. For now, Metabase will do for creating a POC.
Metabase lets you choose between its cloud version and a self-hosted instance. Metabase Cloud offers several payment plans, but you can run a self-hosted instance of Metabase for free using Docker. We'll define our Metabase instance in our compose file.
Since we're self-hosting, we also have to define the Metabase application database, which stores the application data and metadata that Metabase needs (including the connection settings for your data sources, in our case postgres_cpd).
services:
  postgres_cpd: # postgres instance for CPD ETL pipeline
    image: postgres:16
    container_name: postgres_cpd_dev
    environment:
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: cpd_db
    ports:
      - "5433:5432" # Postgres is already on port 5432 on my local machine
    volumes:
      - pgdata_cpd:/var/lib/postgresql/data
    restart: unless-stopped
    networks:
      - metanet1

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_dev
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8081:80"
    depends_on:
      - postgres_cpd
    networks:
      - metanet1

  metabase: # taken from https://www.metabase.com/docs/latest/installation-and-operation/running-metabase-on-docker
    image: metabase/metabase:latest
    container_name: metabase
    hostname: metabase
    volumes:
      - /dev/urandom:/dev/random:ro
    ports:
      - "3000:3000"
    environment:
      MB_DB_TYPE: postgres
      MB_DB_DBNAME: metabaseappdb
      MB_DB_PORT: 5432
      MB_DB_USER: ${METABASE_DB_USER}
      MB_DB_PASS: ${METABASE_DB_PASSWORD}
      MB_DB_HOST: postgres_metabase # must match container name of postgres_mb (Metabase Postgres instance)
    networks:
      - metanet1
    healthcheck:
      test: curl --fail -I http://localhost:3000/api/health || exit 1
      interval: 15s
      timeout: 5s
      retries: 5

  postgres_mb: # postgres instance for managing Metabase instance
    image: postgres:16
    container_name: postgres_metabase # other services must use this name to communicate with this container
    hostname: postgres_metabase # internal identifier, doesn't impact communication with other services (helpful for logs)
    environment:
      POSTGRES_USER: ${METABASE_DB_USER}
      POSTGRES_DB: metabaseappdb
      POSTGRES_PASSWORD: ${METABASE_DB_PASSWORD}
    ports:
      - "5434:5432"
    volumes:
      - pgdata_mb:/var/lib/postgresql/data
    networks:
      - metanet1

volumes:
  pgdata_cpd:
  pgdata_mb:

networks:
  metanet1:
    driver: bridge # 'bridge' is the default driver - services on this network can communicate with each other using their service names
To create our Metabase instance, we made the following changes to our compose file:
Without going into too much detail, let’s break down the metabase and postgres_mb services.
Our Metabase instance (metabase):
For more information on how to run Metabase in Docker, check out the docs.
After setting up Metabase, you’ll be prompted to connect Metabase to your data source.

Metabase can connect to a wide variety of data sources.
After selecting a PostgreSQL data source, we can specify the following connection string to connect Metabase to our PostgreSQL instance, substituting your credentials as necessary:
postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@postgres_cpd:5432/cpd_db

Connecting to PostgreSQL in Metabase by specifying your connection string.
After setting up the connection, we can create our dashboard. You can create a wide variety of visuals in Metabase, so we won’t go into the specifics here.
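That said, to give a flavor of what sits behind a dashboard card: each card is backed by a Metabase "question", which you can build visually or write as a native SQL query. For example, a monthly incident-trend chart could be driven by a query along these lines (a sketch against the cpd_incidents table we created earlier):

-- number of reported incidents per month
SELECT year, month, COUNT(*) AS num_incidents
FROM cpd_incidents
GROUP BY year, month
ORDER BY year, month;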
Let’s revisit the example dashboard that we displayed at the beginning of this post. This dashboard nicely summarizes recent and historical trends in reported CPD incidents.

CPD Incident Trends dashboard created in Metabase.
From this dashboard, we can see the following:
Luckily for us, Metabase will query our database whenever we load this dashboard, so we won’t have to worry about this dashboard displaying stale data.
Check out the Git repo here if you want to dive deeper into the implementation.
Thanks for reading! Let’s briefly recap what we built:
There are many ways to build upon this project, including but not limited to:
If you have any other ideas for how to extend upon this project, or you would’ve built things differently, I’d love to hear it in the comments!
The author has created all images in this article.
Prefect:
Metabase:
Docker:
GitHub Repo:
CPD Daily Police Log Dataset: