NOTE: v2 is a major revamp. Documentation is currently WIP.
DataPusher+ is a fork of Datapusher that combines the speed and robustness of ckanext-xloader with the data type guessing of Datapusher - super-powered with the ability to infer, calculate & suggest metadata using Jinja2 formulas defined in the scheming configuration file.
Formulas have access not just to the package and resource fields (in namespaces of the same name); they can also use the following information, in these additional namespaces, in Jinja2 expressions:
- `dpps` - with the "s" for stats. Each field will have an extensive list of summary statistics (by default: type, is_ascii, sum, min/max, range, sort_order, sortiness, min_length, max_length, sum_length, avg_length, stddev_length, variance_length, cv_length, mean, sem, geometric_mean, harmonic_mean, stddev, variance, cv, nullcount, max_precision, sparsity, cardinality, uniqueness_ratio). Check here for all other available statistics.
- `dppf` - with the "f" for frequency table. Each field will have its frequency table available, sorted in descending order, with the top N (configurable, default 10) values and a corresponding count & percentage. "Other (COUNT)" will be used as a "basket" for values beyond the top N, with COUNT set to their total count. ID fields will be indicated by "<ALL_UNIQUE>" in the table.
- `dpp` - additional inferred/calculated metadata:
  - `ORIGINAL_FILE_SIZE` (bytes)
  - `PREVIEW_FILE_SIZE` (bytes)
  - `RECORD_COUNT` (int)
  - `PREVIEW_RECORD_COUNT` (int)
  - `IS_SORTED` (bool)
  - `DEDUPED` (bool)
  - `DUPE_COUNT` (int: -1 if there are no dupes)
  - Date/DateTime metadata

    DP+ can infer date/datetime columns - supporting 19 different formats. As it is a relatively expensive operation, it will only do so for candidate columns with names that fit a configurable pattern.
    - `DATE_FIELDS` - a list of inferred date columns
    - `NO_DATE_FIELDS` (bool)
    - `DATETIME_FIELDS` - a list of inferred datetime columns
    - `NO_DATETIME_FIELDS` (bool)
  - Latitude/Longitude metadata

    DP+ can infer the latitude and longitude columns based on each column's characteristics. A column is inferred to be a latitude/longitude column if:
    - its name matches a comma-separated, priority-ordered list of lat/long name patterns,
    - for latitude, it is of type "Float" with a range of -90.0 to 90.0, and
    - for longitude, it is of type "Float" with a range of -180.0 to 180.0.

    - `LAT_FIELD` and `LON_FIELD` - the inferred lat/long columns
    - `NO_LAT_LONG_FIELDS` (bool)
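The inference rules above can be sketched in a few lines of Python. This is an illustrative stand-in, not DP+'s actual implementation; the function name, the column-dict shape, and the sample data are ours:

```python
# Hypothetical sketch of the lat/long inference rules described above.
# The 'columns' structure (name/type/min/max) is an assumption for
# illustration, not DP+'s internal representation.
LAT_PATTERNS = ["latitude", "lat"]   # priority order, as in the config
LON_PATTERNS = ["longitude", "long", "lon"]

def infer_latlon(columns):
    """columns: list of dicts with 'name', 'type', 'min', 'max'."""
    lat_field = lon_field = None
    for col in columns:
        name = col["name"].lower()
        if col["type"] != "Float":
            continue  # only Float columns qualify
        if (lat_field is None and any(p in name for p in LAT_PATTERNS)
                and -90.0 <= col["min"] and col["max"] <= 90.0):
            lat_field = col["name"]
        elif (lon_field is None and any(p in name for p in LON_PATTERNS)
                and -180.0 <= col["min"] and col["max"] <= 180.0):
            lon_field = col["name"]
    return lat_field, lon_field

cols = [
    {"name": "site_lat", "type": "Float", "min": 29.5, "max": 30.1},
    {"name": "site_long", "type": "Float", "min": -98.2, "max": -97.0},
    {"name": "depth", "type": "Float", "min": 0.0, "max": 120.0},
]
print(infer_latlon(cols))  # ('site_lat', 'site_long')
```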
Beyond the extensive list of built-in Jinja2 filters/functions, DP+ also supports an extensive list of additional custom filters/functions. Several of these helper functions make it trivially easy to calculate DCAT 3 recommended and optional properties that would ordinarily be too painstaking to compile manually (e.g. dcat-us:GeographicBoundingBox, dcat:temporalResolution, dcat:startDate, dcat:endDate, etc.).
There are two Formula types, indicated by adding these keywords to the scheming yaml file:

- `formula` - the formula is evaluated at resource creation/update time and the result is assigned to the corresponding package/resource field immediately.
- `suggest_formula` - the formula is evaluated at resource creation/update time and the result is stored in the `dpp_suggestions` package field as a compound JSON object. `dpp_suggestions` contains all the suggestions for both package and resource fields. This field is parsed to show "Suggestions" during metadata entry for the associated package/resource field using the Suggestion UI (indicated by a function symbol next to the metadata field name).
Formulas that fail to evaluate return the `#ERROR!:` prefix (reminiscent of Excel's `#VALUE!` error) followed by a detailed Jinja2 error message.
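The error-prefix convention can be sketched with plain Jinja2. This is a minimal sketch assuming the `jinja2` package is installed; the `eval_formula` helper and the `dpps` sample data are ours, not DP+ internals:

```python
from jinja2 import Environment, StrictUndefined, TemplateError

# Hypothetical sketch of formula evaluation with the "#ERROR!:" convention.
env = Environment(undefined=StrictUndefined)

def eval_formula(formula, **namespaces):
    try:
        return env.from_string(formula).render(**namespaces)
    except TemplateError as e:
        # DP+ prefixes evaluation failures with "#ERROR!:" plus the
        # detailed Jinja2 error message.
        return f"#ERROR!: {e}"

dpps = {"price": {"mean": 12.5, "min": 1, "max": 99}}
ok = eval_formula("Average price: {{ dpps.price.mean }}", dpps=dpps)
bad = eval_formula("{{ dpps.cost.mean }}", dpps=dpps)  # unknown field
print(ok)   # Average price: 12.5
print(bad)  # starts with "#ERROR!:"
```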
In addition, Datapusher+ is no longer a webservice, but a full-fledged CKAN extension. It drops usage of the deprecated CKAN Service Provider, with the unmaintained Messytables replaced by the blazing-fast qsv data-wrangling engine.
TxGIO/TWDB provided the use cases that informed and supported the development of Datapusher+, specifically, to support a Resource-first upload workflow.
For a more detailed overview, see the CKAN Monthly Live Jan 2023 presentation.
It features:
- **"Bullet-proof", ultra-fast data type inferencing with qsv**
Unlike Messytables, which scans only the first few rows to guess the type of a column, qsv scans the entire table, so its data type inferences are guaranteed¹.
Despite scanning the whole file, qsv is still exponentially faster; it not only infers data types, it also calculates summary statistics. For example, scanning a 2.7 million row, 124MB CSV file for types and stats took 0.16 seconds².
qsv is so fast because it is written in Rust, is multithreaded, and uses all kinds of performance techniques specially designed for data-wrangling.
- **Exponentially faster loading speed**
Similar to xloader, we use PostgreSQL COPY to directly pipe the data into the datastore, short-circuiting the additional processing/transformation/API calls used by the original Datapusher.
But unlike xloader, we load everything using the proper data types and not as text, so there's no need to reload the data again after adjusting the Data Dictionary, as you would with xloader.
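The "proper data types" point can be made concrete: qsv's inferred types are mapped to PostgreSQL column types (per the `ckanext.datapusher_plus.type_mapping` setting shown in the configuration below) before the table is created. The helper below is an illustrative sketch, not DP+ internals; the table and column names are invented:

```python
import json

# The default type_mapping from the DP+ configuration.
TYPE_MAPPING = json.loads(
    '{"String": "text", "Integer": "numeric", "Float": "numeric",'
    ' "DateTime": "timestamp", "Date": "date", "NULL": "text"}'
)

def create_table_ddl(table, inferred):
    """Build CREATE TABLE DDL from (column, qsv_type) pairs."""
    cols = ", ".join(
        f'"{name}" {TYPE_MAPPING[qsv_type]}' for name, qsv_type in inferred
    )
    return f'CREATE TABLE "{table}" ({cols});'

ddl = create_table_ddl(
    "my_resource", [("id", "Integer"), ("name", "String"), ("ts", "DateTime")]
)
print(ddl)
# CREATE TABLE "my_resource" ("id" numeric, "name" text, "ts" timestamp);
```

Because the columns are typed at load time, adjusting the Data Dictionary later does not require a reload.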
- **Far more storage-efficient AND performant Datastore with easier-to-compose SQL queries**
As we create the Datastore tables using the most efficient PostgreSQL data type for each column (based on qsv's guaranteed type inferences), the Datastore is not only more storage efficient, it is also far more performant for loading AND querying.
With its "smartint" data type (with qsv inferring the most efficient integer data type for the range of values in the column); comprehensive date format inferencing (supporting 19 date formats, with each format having several variants & with configurable DMY/MDY preference parsing) & auto-formatting dates to RFC3339 format so they are stored as Postgres timestamps; cardinality-aware, configurable auto-indexing; automatic sanitization of column names to valid PostgreSQL column identifiers; auto PostgreSQL vacuuming & analysis of resources after loading; and more - DP+ enables the Datastore to tap into PostgreSQL's full power.
Configurable auto-aliasing of resources also makes it easier to compose SQL queries, as you can use more intuitive resource aliases instead of cryptic resource IDs.
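For example, with an alias in place a DataStore SQL query can name the resource readably instead of by UUID. The sketch below only builds a `datastore_search_sql` request URL (CKAN's standard API action); the host name and the alias `acme-2023-sales` are invented for illustration:

```python
from urllib.parse import urlencode

# With auto-aliasing, SQL can reference a readable alias instead of a
# cryptic resource UUID. Host and alias below are hypothetical.
sql = 'SELECT AVG("price") FROM "acme-2023-sales" LIMIT 1'
url = (
    "https://demo.ckan.org/api/3/action/datastore_search_sql?"
    + urlencode({"sql": sql})
)
print(url)  # the alias appears in place of a resource UUID
```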
- **Production-ready Robustness**
In production, the number one source of support issues is Datapusher - primarily because of data quality issues and Datapusher's inability to correctly infer data types, gracefully handle errors³, and provide the Data Publisher actionable information to correct the data.
Datapusher+'s design directly addresses all these issues.
- **More informative datastore loading messages**
Datapusher+ messages are designed to be more verbose and actionable, so the data publisher's user experience is far better, making a resource-first upload workflow possible.
- **Extended preprocessing with qsv**
qsv is leveraged by Datapusher+ to:
- create "Smarter" Data Dictionaries, with:
- guaranteed data type inferences
- optional ability to automatically choose the best integer PostgreSQL data type ("smartint") based on the range of the numeric column (PostgreSQL's int, bigint and numeric types) for optimal storage/indexing efficiency and SQL query performance.
- sanitized column names (guaranteeing valid PostgreSQL column identifiers) while preserving the original column name as a label, which is used to label columns in DataTables_view.
- an optional "summary stats" resource as an extension of the Data Dictionary, with comprehensive summary statistics for each column - sum, min/max/range, min/max length, mean, stddev, variance, nullcount, sparsity, quartiles, IQR, lower/upper fences, skewness, median, mode/s, antimode/s & cardinality.
- convert Excel & OpenOffice/LibreOffice Calc (ODS) files to CSV, with the ability to choose which sheet to use by default (e.g. 0 is the first sheet, -1 is the last sheet, -2 the second to last sheet, etc.)
- convert SHP and GeoJSON files to CSV, with optional geometry simplification.
- decompress ZIP archives and insert the manifest as a CSV file with detailed metadata about the files in the archive. For ZIP archives with only one recognized file format, it can also automatically decompress the file and push that instead of the ZIP manifest into the Datastore.
- convert various date formats (19 date formats are recognized with each format having several variants; ~80 date format permutations in total) to a standard RFC 3339 format
- enable random access of a CSV by creating a CSV index - which also enables parallel processing of different parts of a CSV simultaneously (a major reason type inferencing and stats calculation is so fast)
- instantaneously count the number of rows with a CSV index
- validate if an uploaded CSV conforms to the RFC-4180 standard
- normalize and transcode CSV/TSV dialects into a standard UTF-8 encoded RFC-4180 CSV format
- optionally create a preview subset, with the ability to only download the first `n` preview rows of a file, and not the entire file (e.g. only download the first 1,000 rows of a 3 GB CSV file - especially good for harvesting/cataloging external sites where you only want to harvest the metadata and a small sample of each file)
- optionally create a preview subset from the end of a file (e.g. the last 1,000 rows, good for time-series/sensor data)
- auto-index columns based on their cardinality/format (unique indices are created for columns where all values are unique; columns whose cardinality is below a given threshold are auto-indexed; date columns are auto-indexed)
- check for duplicates, and optionally deduplicate rows
- optionally screen for Personally Identifiable Information (PII), with an option to "quarantine" the PII-candidate rows in a separate resource, while still creating the screened resource.
- optional ability to specify a custom PII screening regex set, instead of the default PII screening regex set.
Even with all these pre-processing tasks, qsv typically takes less than 5 seconds to finish all its analysis tasks, even for a 100MB CSV file.
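The "smartint" selection mentioned above can be sketched simply: pick the smallest PostgreSQL integer type that holds the column's observed min/max, falling back to `numeric`. The function name is ours, not qsv's, and this is an illustration of the idea rather than qsv's actual logic:

```python
# Illustrative "smartint" sketch: smallest PostgreSQL integer type that
# can hold the column's observed value range (int -> bigint -> numeric).
def smartint(min_val, max_val):
    if -2_147_483_648 <= min_val and max_val <= 2_147_483_647:
        return "integer"   # 4-byte int
    if (-9_223_372_036_854_775_808 <= min_val
            and max_val <= 9_223_372_036_854_775_807):
        return "bigint"    # 8-byte int
    return "numeric"       # arbitrary-precision fallback

print(smartint(0, 1_000))    # integer
print(smartint(0, 10**12))   # bigint
print(smartint(0, 10**30))   # numeric
```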
Future versions of Datapusher+ will further leverage qsv's 80+ commands to do additional preprocessing, data-wrangling and validation. The Roadmap is available here. Ideas, suggestions and your feedback are most welcome!
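The date standardization listed above (converting recognized formats to RFC 3339) can be sketched with the standard library. qsv recognizes ~19 formats with many variants; this stdlib sketch handles just three, and the function name is ours:

```python
from datetime import datetime

# Minimal sketch of normalizing a few date formats to RFC 3339 dates.
FORMATS = ["%m/%d/%Y", "%d-%b-%Y", "%Y.%m.%d"]

def to_rfc3339_date(value):
    for fmt in FORMATS:
        try:
            return datetime.strptime(value, fmt).date().isoformat()
        except ValueError:
            pass  # try the next recognized format
    raise ValueError(f"unrecognized date: {value!r}")

print(to_rfc3339_date("07/04/2023"))  # 2023-07-04
print(to_rfc3339_date("4-Jul-2023"))  # 2023-07-04
```

RFC 3339 dates sort lexicographically and load directly as PostgreSQL `date`/`timestamp` values, which is why DP+ standardizes on them.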
DataPusher+ supports an optional DRUF (Dataset Resource Upload First) workflow that allows users to upload data files before creating dataset metadata. This resource-first approach is particularly useful for:
- Data-driven workflows: Where the structure and content of the data informs the metadata
- Exploratory data publishing: When you want to examine the data before writing descriptions
- Simplified workflows: Reducing the cognitive load of filling out metadata forms upfront
When DRUF is enabled, the dataset creation workflow is modified:
- "Add Dataset" buttons redirect to a resource upload page instead of the metadata form
- Temporary datasets are automatically created with placeholder metadata
- Resource upload happens first, allowing DataPusher+ to analyze the data
- Metadata forms are enhanced with data-driven suggestions based on the uploaded content
- Form redirects guide users through a logical resource-first workflow
To enable DRUF you need:
- a DRUF-compatible CKAN version
- the ckanext-scheming extension enabled, using the example DRUF-compatible schema included in the DP+ extension
Add the following configuration to your CKAN config file (e.g., /etc/ckan/default/ckan.ini):
# Enable DRUF (Dataset Resource Upload First) workflow
ckanext.datapusher_plus.enable_druf = true
ckanext.datapusher_plus.enable_form_redirect = true

DRUF is completely optional and disabled by default. When disabled:
- Standard CKAN dataset creation workflow is preserved
- No template modifications are applied
- Full backwards compatibility with existing CKAN installations
- CKAN 2.10+
- Python 3.10+
- tested and developed on Ubuntu 22.04.5
- `ckan.datastore.sqlsearch.enabled` set to `true` if you want to use the `temporal_resolution` and `guess_accrual_periodicity` Formula helpers
- ckanext-scheming extension
Datapusher+ from version 1.0.0 onwards is installed as an extension of CKAN, available as a CKAN plugin. This allows for easier integration with CKAN and other CKAN extensions.
- Install the required packages. We assume you are using an Ubuntu-based Linux distribution such as Ubuntu 24.04.
sudo apt install python3-virtualenv python3-dev python3-pip python3-wheel build-essential libxslt1-dev libxml2-dev zlib1g-dev git libffi-dev libpq-dev uchardet -y

- Activate the CKAN virtual environment (using at least Python 3.10):
. /usr/lib/ckan/default/bin/activate

- Install the extension using the following commands:
cd /usr/lib/ckan/default/src
pip install -e "datapusher-plus@git+https://github.com/dathere/datapusher-plus.git@3.0.0"

- Install the dependencies:
cd datapusher-plus
pip install -r requirements.txt
pip install -r requirements-dev.txt

- Install qsv, such as the `qsvdp` binary, and move it to `/usr/local/bin/qsvdp` for access through the `PATH` environment variable.
qsv installation options (click here for more info)
If you are running a Debian-based Linux distribution on x86_64, you can quickly install qsv binaries including qsvdp using the following commands:
# Add the qsv repository to your sources list:
echo "deb [signed-by=/etc/apt/trusted.gpg.d/qsv-deb.gpg] https://dathere.github.io/qsv-deb-releases ./" > qsv.list
# Import trusted GPG key:
wget -O - https://dathere.github.io/qsv-deb-releases/qsv-deb.gpg | sudo apt-key add -
# Install qsv:
sudo apt update -y
sudo apt install qsv -y

Alternatively, download the appropriate prebuilt binaries for your platform and copy them to the appropriate directory. For example, you can use the following commands for qsv v20.0.0 on x86_64 Linux (update 20.0.0 to the latest version available on the releases page):
wget https://github.com/dathere/qsv/releases/download/20.0.0/qsv-20.0.0-x86_64-unknown-linux-gnu.zip
unzip qsv-20.0.0-x86_64-unknown-linux-gnu.zip
rm qsv-20.0.0-x86_64-unknown-linux-gnu.zip
sudo mv qsv* /usr/local/bin

If you get glibc errors when starting qsv, your Linux distro may not have the required version of the GNU C Library. If so, use the binaries ending with unknown-linux-musl instead, as they should be statically linked with the MUSL C Library.
ℹ️ NOTE: qsv's prebuilt binaries have the ability to self-update to the latest version. Just run qsv with the `--update` option and it will check for the latest version and update itself as required.

sudo qsvdp --update
Alternatively, if you want to install qsv from source, follow the instructions here. Note that when compiling from source, you may want to look into the Performance Tuning section to squeeze even more performance from qsv.
Also, if you get glibc errors when starting qsv, your Linux distro may not have the required version of the GNU C Library
(This will be the case when running Ubuntu 18.04 or older).
If so, use the unknown-linux-musl.zip archive as it is statically linked with the MUSL C Library.
If you already have qsv, update it to the latest release by using the --update option.
qsvdp --update
ℹ️ NOTE: qsv is a general purpose CSV data-wrangling toolkit that gets regular updates. To update to the latest version, just run qsv with the `--update` option and it will check for the latest version and update as required.
Finally, you can build qsvdp from source. It has the additional benefit that the resulting binary will take advantage of all the machine's CPU features, making qsv and DP+ even faster, but may take up to 30 minutes to compile.
git clone https://github.com/dathere/qsv.git
cd qsv
# install Rust, if it's not installed
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# build qsvdp
CARGO_BUILD_RUSTFLAGS='-C target-cpu=native' cargo build --release --locked --bin qsvdp -F datapusher_plus
sudo cp target/release/qsvdp /usr/local/bin
cargo clean

- Make sure CKAN is running (e.g. through `ckan -c /etc/ckan/default/ckan.ini run` after activating your virtual environment). While CKAN is running, create an API token for the DP+ service account; the following command automatically adds the relevant `ckanext.datapusher_plus.api_token` line to your CKAN config file `/etc/ckan/default/ckan.ini`. Replace `CKAN_ADMIN` with an existing CKAN user with sysadmin privileges.
ckan config-tool /etc/ckan/default/ckan.ini "ckanext.datapusher_plus.api_token=$(ckan -c /etc/ckan/default/ckan.ini user token add CKAN_ADMIN dpplus | tail -n 1 | tr -d '\t')"
- Add the rest of the DP+ config to your CKAN config (e.g. `/etc/ckan/default/ckan.ini`):
# datapusher-plus settings
ckanext.datapusher_plus.use_proxy = false
ckanext.datapusher_plus.download_proxy =
ckanext.datapusher_plus.ssl_verify = false
# supports INFO, DEBUG, TRACE - use DEBUG or TRACE when debugging scheming Formulas
ckanext.datapusher_plus.upload_log_level = INFO
ckanext.datapusher_plus.formats = csv tsv tab ssv xls xlsx xlsxb xlsm ods geojson shp qgis zip
ckanext.datapusher_plus.pii_screening = false
ckanext.datapusher_plus.pii_found_abort = false
ckanext.datapusher_plus.pii_regex_resource_id_or_alias =
ckanext.datapusher_plus.pii_show_candidates = false
ckanext.datapusher_plus.pii_quick_screen = false
ckanext.datapusher_plus.qsv_bin = /usr/local/bin/qsvdp
ckanext.datapusher_plus.preview_rows = 100
ckanext.datapusher_plus.download_timeout = 300
ckanext.datapusher_plus.max_content_length = 1256000000000
ckanext.datapusher_plus.chunk_size = 16384
ckanext.datapusher_plus.default_excel_sheet = 0
ckanext.datapusher_plus.sort_and_dupe_check = true
ckanext.datapusher_plus.dedup = false
ckanext.datapusher_plus.unsafe_prefix = unsafe_
ckanext.datapusher_plus.reserved_colnames = _id
ckanext.datapusher_plus.prefer_dmy = false
ckanext.datapusher_plus.ignore_file_hash = true
ckanext.datapusher_plus.auto_index_threshold = 3
ckanext.datapusher_plus.auto_index_dates = true
ckanext.datapusher_plus.auto_unique_index = true
ckanext.datapusher_plus.summary_stats_options =
ckanext.datapusher_plus.add_summary_stats_resource = false
ckanext.datapusher_plus.summary_stats_with_preview = false
ckanext.datapusher_plus.qsv_stats_string_max_length = 32767
ckanext.datapusher_plus.qsv_dates_whitelist = date,time,due,open,close,created
ckanext.datapusher_plus.qsv_freq_limit = 10
ckanext.datapusher_plus.auto_alias = true
ckanext.datapusher_plus.auto_alias_unique = false
ckanext.datapusher_plus.copy_readbuffer_size = 1048576
ckanext.datapusher_plus.type_mapping = {"String": "text", "Integer": "numeric","Float": "numeric","DateTime": "timestamp","Date": "date","NULL": "text"}
ckanext.datapusher_plus.auto_spatial_simplication = true
ckanext.datapusher_plus.spatial_simplication_relative_tolerance = 0.1
ckanext.datapusher_plus.latitude_fields = latitude,lat
ckanext.datapusher_plus.longitude_fields = longitude,long,lon
ckanext.datapusher_plus.jinja2_bytecode_cache_dir = /tmp/jinja2_butecode_cache
ckanext.datapusher_plus.auto_unzip_one_file = true

Also add this entry to your CKAN's resource_formats.json file for ckanext.datapusher_plus.formats to work as expected with tab files:
["TAB", "Tab Separated Values File", "text/tab-separated-values", []],
See the configuration section below for more information.
- Optionally add DRUF mode to your CKAN config:
# Enable DRUF (Dataset Resource Upload First) workflow for the DataPusher+ CKAN extension
ckanext.datapusher_plus.enable_druf = true
ckanext.datapusher_plus.enable_form_redirect = true

- Set up the database for datapusher_plus:
ckan -c /etc/ckan/default/ckan.ini db upgrade -p datapusher_plus

- If you get `Missing value` for multiple fields as a `ckan.logic.ValidationError`, you can temporarily add `validators: ignore_missing` for those fields in the YAML schema file used by ckanext-scheming; you may also need to set `required: False`.
- Make sure you enable the FileStore to allow file uploads (the `ckan.uploads_enabled` variable is already available in your CKAN config; set it to `true`). You'll also need to update FileStore storage permissions as per the docs; for example, replace the Linux username `rzmk` with your username in the following commands:
sudo chown rzmk /var/lib/ckan/default
sudo chmod -R u+rwx /var/lib/ckan/default

- Make sure you enable the Datastore plugin.
- In a separate terminal, start a Prefect worker subscribed to the DP+ work pool. v3.0 replaced the RQ `ckan jobs worker` with a Prefect worker; see the Worker lifecycle section below for the full command and lifecycle details. Quick reference:
PREFECT_API_URL=http://prefect-server:4200/api \
CKAN_INI=/etc/ckan/default/ckan.ini \
prefect worker start --pool datapusher-plus

Add datapusher_plus to the plugins in your CKAN configuration file (generally located at /etc/ckan/default/ckan.ini):

ckan.plugins = <other plugins> datapusher_plus

Use a DP+ extended scheming schema:

scheming.dataset_schemas = ckanext.datapusher_plus:dataset-druf.yaml

Configure DP+'s numerous settings. See config.py for details.
ckanext.datapusher_plus.use_proxy = false
ckanext.datapusher_plus.download_proxy =
ckanext.datapusher_plus.ssl_verify = false
# supports INFO, DEBUG, TRACE - use DEBUG or TRACE when debugging scheming Formulas
ckanext.datapusher_plus.upload_log_level = INFO
ckanext.datapusher_plus.formats = csv tsv tab ssv xls xlsx xlsxb xlsm ods geojson shp qgis zip
ckanext.datapusher_plus.pii_screening = false
ckanext.datapusher_plus.pii_found_abort = false
ckanext.datapusher_plus.pii_regex_resource_id_or_alias =
ckanext.datapusher_plus.pii_show_candidates = false
ckanext.datapusher_plus.pii_quick_screen = false
ckanext.datapusher_plus.qsv_bin = /usr/local/bin/qsvdp
ckanext.datapusher_plus.preview_rows = 100
ckanext.datapusher_plus.download_timeout = 300
ckanext.datapusher_plus.max_content_length = 1256000000000
ckanext.datapusher_plus.chunk_size = 16384
ckanext.datapusher_plus.default_excel_sheet = 0
ckanext.datapusher_plus.sort_and_dupe_check = true
ckanext.datapusher_plus.dedup = false
ckanext.datapusher_plus.unsafe_prefix = unsafe_
ckanext.datapusher_plus.reserved_colnames = _id
ckanext.datapusher_plus.prefer_dmy = false
ckanext.datapusher_plus.ignore_file_hash = true
ckanext.datapusher_plus.auto_index_threshold = 3
ckanext.datapusher_plus.auto_index_dates = true
ckanext.datapusher_plus.auto_unique_index = true
ckanext.datapusher_plus.summary_stats_options =
ckanext.datapusher_plus.add_summary_stats_resource = false
ckanext.datapusher_plus.summary_stats_with_preview = false
ckanext.datapusher_plus.qsv_stats_string_max_length = 32767
ckanext.datapusher_plus.qsv_dates_whitelist = date,time,due,open,close,created
ckanext.datapusher_plus.qsv_freq_limit = 10
ckanext.datapusher_plus.auto_alias = true
ckanext.datapusher_plus.auto_alias_unique = false
ckanext.datapusher_plus.copy_readbuffer_size = 1048576
ckanext.datapusher_plus.type_mapping = {"String": "text", "Integer": "numeric","Float": "numeric","DateTime": "timestamp","Date": "date","NULL": "text"}
ckanext.datapusher_plus.auto_spatial_simplication = true
ckanext.datapusher_plus.spatial_simplication_relative_tolerance = 0.1
ckanext.datapusher_plus.latitude_fields = latitude,lat
ckanext.datapusher_plus.longitude_fields = longitude,long,lon
ckanext.datapusher_plus.jinja2_bytecode_cache_dir = /tmp/jinja2_butecode_cache
ckanext.datapusher_plus.auto_unzip_one_file = true
ckanext.datapusher_plus.api_token = <CKAN service account token for CKAN user with sysadmin privileges>
ckanext.datapusher_plus.describeGPT_api_key = <Token for OpenAI API compatible service>

and add this entry to your CKAN's resource_formats.json file:

["TAB", "Tab Separated Values File", "text/tab-separated-values", []],
DP+ will attempt to load any file that has one of the supported formats (defined in ckanext.datapusher_plus.formats) into the DataStore.
You can also manually trigger resources to be resubmitted. When editing a resource in CKAN (clicking the "Manage" button on a resource page), a new tab named "DataStore" will appear. It contains a log of the last attempted upload and a button to retry the upload. Once a resource has been "pushed" into the Datastore, a "Data Dictionary" tab will also be available where the data publisher can fine-tune the inferred data dictionary.
Run the following command to submit all resources to DataPusher+; it will skip files whose data-file hash has not changed:
ckan -c /etc/ckan/default/ckan.ini datapusher_plus resubmit

To resubmit a specific resource, whether or not the hash of the data file has changed:

ckan -c /etc/ckan/default/ckan.ini datapusher_plus submit {dataset_id}

DataPusher+ v3.0 replaces the v2 RQ-based background worker with a Prefect 3 flow. RQ is no longer used by DP+ itself (CKAN continues to ship RQ for unrelated extensions).
- Customizable workflows — operators compose their own ingestion flows from DP+'s `@task` primitives without forking the code.
- Graceful failure & recoverable ingestion — per-task retries with exponential backoff, transactional rollback of partial datastore writes, result persistence so the Prefect UI's "Re-run from failed task" replays only the failed and downstream tasks, and content-based caching that skips re-downloading unchanged resources.
- Observability — every flow run, task, retry, and log line is visible in the Prefect UI. Each successful run attaches a Markdown "Data Quality Report" artifact and emits a `datapusher.resource.ingested` event that operators can wire into Automations (Slack, PagerDuty, search reindex, DCAT refresh) without DP+ hard-coding a specific alerting backend.
- Horizontal scaling — add Prefect workers on additional hosts to scale ingestion throughput. Tag-based concurrency limits cap concurrent Postgres COPY operations so a burst of submissions doesn't flatten the datastore.
+--------+ submit_flow_run +----------------+ poll +-----------------+
| CKAN | ------------------------> | Prefect server | <--------------- | Prefect worker |
+--------+ datapusher_hook | (UI on 4200) | flow runs | (runs the flow) |
^ POST callback +----------------+ +-----------------+
| |
+----------------------------------------+
Postgres (shared)
The CKAN web request handler synchronously POSTs the run to the Prefect server and returns immediately with a flow_run_id. A separately-managed worker process executes the flow. Prefect 3's default Postgres-backed server is sufficient — no Redis required.
# 1. Bring up Prefect server + worker + Postgres
docker compose -f docker-compose.prefect.yaml up -d
# 2. Register the DataPusher+ deployment with the Prefect server
ckan -c /etc/ckan/default/ckan.ini datapusher_plus prefect-deploy
# 3. Submit a resource via the CKAN UI — you should now see the flow
# run in the Prefect UI at http://localhost:4200

For production, run your own Prefect server (or use Prefect Cloud) and configure PREFECT_API_URL to point at it.
The CKAN-side RQ worker (ckan jobs worker) is no longer used for DP+ jobs. Replace it with:
PREFECT_API_URL=http://prefect-server:4200/api \
CKAN_INI=/etc/ckan/default/ckan.ini \
prefect worker start --pool datapusher-plus

Scale horizontally by starting workers on additional hosts pointed at the same Prefect server and work pool. Workers must have the datapusher-plus Python package installed and CKAN configuration accessible (it's read at flow-run start for resource metadata and the qsv binary path).
A one-shot CLI helper handles the cutover:
ckan -c /etc/ckan/default/ckan.ini datapusher_plus migrate-from-rq --resubmit

What it does:
- Drains any pending DP+ jobs from CKAN's RQ queue.
- Resets any `pending` task_status rows on the CKAN side so the UI no longer falsely shows in-flight ingestions.
- Verifies the configured Prefect server is reachable.
- With `--resubmit`, re-submits each affected resource_id through the new Prefect path.
After it finishes: stop your CKAN-side ckan jobs worker process (or leave it if other extensions use RQ — DP+ jobs no longer flow through it), then start prefect worker start -p datapusher-plus.
Historical Jobs/Logs rows from the RQ era remain queryable through datapusher_status and the CKAN UI without modification — the v2 database schema needs no migration.
Each successful flow run attaches:
- A Data Quality Markdown artifact — row count, inferred schema, PII findings, quarantine count.
- A CKAN resource link artifact — one-click jump back to the resource page in CKAN.
- A `datapusher.resource.ingested` custom event with payload `{rows, file_hash, duration_seconds}` — wire this into Prefect Automations for downstream side effects.
The validation and PII tasks emit additional events (datapusher.row.quarantined, datapusher.pii.detected).
Two example automations ship in examples/automations/:
# Alert on 3 consecutive failures for the same resource within an hour
prefect automation create -f examples/automations/alert-on-consecutive-failures.json
# Alert on Crashed state (worker killed, OOM, infrastructure error)
prefect automation create -f examples/automations/alert-on-crashed.json

Replace REPLACE_WITH_NOTIFICATION_BLOCK_ID in those files with the ID of a notification Block (Slack, PagerDuty, email) you've registered in the Prefect UI before applying.
Operators compose their own ingestion flow by importing DP+'s @task primitives and registering the flow via config:
# my_plugin/flows.py
from prefect import flow
from prefect.transactions import transaction

from ckanext.datapusher_plus.jobs.prefect_flow import (
    download_task, format_convert_task, validate_task, analyze_task,
    database_task, indexing_task, formula_task, metadata_task,
)

@flow(name="my-custom-datapusher")
def my_custom_flow(job_input):
    dl = download_task(job_input)
    cv = format_convert_task(dl)
    # ... insert your own redaction / enrichment / spatial tasks here ...
    vl = validate_task(cv)
    an = analyze_task(vl)
    with transaction():
        db = database_task(an)
        idx = indexing_task(db)
        fm = formula_task(idx)
        md = metadata_task(fm)

Then point DP+ at it in ckan.ini:
ckanext.datapusher_plus.prefect_flow = my_plugin.flows:my_custom_flow

…and re-run `ckan datapusher_plus prefect-deploy`. Submissions via the CKAN UI now invoke your flow. The existing IDataPusher.can_upload / after_upload plugin hooks continue to fire — custom flows are additive, not exclusive.
DP+ ships two subflow primitives in ckanext.datapusher_plus.jobs.subflows for custom flow authors who want per-domain work to appear as its own Prefect-UI run row (with its own retries, concurrency limits, and logs):
| Subflow | Wraps | Use when… |
|---|---|---|
| `pii_screening_subflow(csv_path, resource, temp_dir)` | `screen_for_pii` | You want PII screening to retry / observe independently of analysis, or you're A/B-testing a custom screening implementation. |
| `spatial_processing_subflow(input_path, resource_format, output_csv_path=None, tolerance=0.001)` | `process_spatial_file` | You want spatial conversion (Shapefile/GeoJSON → CSV) to retry / observe independently of the broader format-conversion task. |
Both return a JSON-encodable dict ({"pii_found": …, "pii_candidate_count": …} and {"success": …, "error_message": …, "bounds": …}) so Prefect's result-serialization works without extra schema setup.
Example — replacing the inline PII call inside a custom analyze task:
from prefect import task

from ckanext.datapusher_plus.jobs.subflows import pii_screening_subflow

@task
def my_analyze_task(prev):
    ctx = ...  # build / rehydrate the runtime context
    # ... your stats / type-inference work ...
    pii = pii_screening_subflow(
        csv_path=ctx.tmp, resource=ctx.resource, temp_dir=ctx.temp_dir,
    )
    if pii["pii_found"]:
        ...  # gate / suspend / route as you like

The default DP+ flow does NOT call these subflows — it inlines the underlying helpers inside its existing task bodies to avoid restructuring the working pipeline. The subflows are entry points for custom-flow authors who want the independent-observability win.
| Key | Default | Purpose |
|---|---|---|
| `ckanext.datapusher_plus.prefect_deployment_name` | `datapusher-plus/datapusher-plus` | Fully-qualified Prefect deployment name (`<flow>/<deployment>`). |
| `ckanext.datapusher_plus.prefect_work_pool` | `datapusher-plus` | Work-pool name workers subscribe to. |
| `ckanext.datapusher_plus.prefect_flow` | (unset) | `module.path:flow_name` entrypoint of a custom flow. |
| `ckanext.datapusher_plus.prefect_ui_base` | (unset) | Base URL of the Prefect UI (e.g. `http://prefect-server:4200`) — when set, `datapusher_status` returns a `job_url` that deep-links into the run page. |
| `ckanext.datapusher_plus.flow_timeout` | `7200` | Outer flow-run timeout in seconds. Replaces v1's `ckan.datapusher.timeout`. |
| `ckanext.datapusher_plus.max_quarantine_pct` | `5.0` | Maximum percentage of rows that may be quarantined before the flow fails. |
| `ckanext.datapusher_plus.pii_review_threshold` | `0` | When > 0 and PII screening detects this many sensitive fields, the flow suspends for human approval via the Prefect UI. |
| `ckanext.datapusher_plus.result_storage_block` | `local-file-system/datapusher-plus-results` | Prefect Block for task-result persistence. Swap to S3/GCS for multi-host worker pools. |
| `ckanext.datapusher_plus.use_truncate_freeze` | `true` | When `true`, run `TRUNCATE` + `COPY ... WITH FREEZE` in one transaction (faster ingestion via `FREEZE`, but holds an `AccessExclusive` lock for the full `COPY`). Set to `false` on read-heavy datastores to release the lock between `TRUNCATE` and `COPY` — concurrent `SELECT`s keep working at the cost of losing the `FREEZE` speedup. |
Worker-process env-var fallbacks (read when CKAN config isn't loaded in the worker):
- `DATAPUSHER_PLUS_FLOW_TIMEOUT_SECONDS`
- `DATAPUSHER_PLUS_DOWNLOAD_RETRIES`
- `DATAPUSHER_PLUS_DATABASE_RETRIES`
- `DATAPUSHER_PLUS_CACHE_TTL_HOURS` (default 24)
- `DATAPUSHER_PLUS_MAX_PERSIST_FILE_MB` (default 512; `0` disables the cap)
- `DATAPUSHER_PLUS_MAX_QUARANTINE_PCT`
- `DATAPUSHER_PLUS_RESULT_STORAGE_BLOCK`
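In a worker process where CKAN config isn't loaded, these fallbacks reduce to ordinary environment parsing. A minimal sketch of how such a fallback could be read, with the documented defaults for the cache TTL and persist-file cap (the helper name is hypothetical, not DP+'s actual code):

```python
import os

def env_int(name: str, default: int) -> int:
    """Read an integer env var, falling back to the default when the
    variable is absent or not parseable as an int."""
    try:
        return int(os.environ[name])
    except (KeyError, ValueError):
        return default

cache_ttl_hours = env_int("DATAPUSHER_PLUS_CACHE_TTL_HOURS", 24)
max_persist_mb = env_int("DATAPUSHER_PLUS_MAX_PERSIST_FILE_MB", 512)
persist_cap_disabled = max_persist_mb == 0  # 0 disables the cap
```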
Prefect Variables (optional, highest priority): the flow looks up these
Variable names at run start and uses the value when set. Set or change them
from the Prefect UI (Variables tab) or with `prefect variable set NAME VALUE`;
changes take effect on the next flow run without a worker restart.
| Variable | Overrides |
|---|---|
| `dpp_flow_timeout` | `ckanext.datapusher_plus.flow_timeout` / `DATAPUSHER_PLUS_FLOW_TIMEOUT_SECONDS` |
| `dpp_download_retries` | `ckanext.datapusher_plus.download_retries` / `DATAPUSHER_PLUS_DOWNLOAD_RETRIES` |
Resolution order: Prefect Variable → env var → ckan.ini → built-in default. Variable lookup failures (Prefect server unreachable, name absent, value not int-parseable) silently fall through to the next priority — operators with no Prefect Variables set see no behaviour change.
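The resolution order above can be sketched as a chain of fallbacks. This is an illustrative reconstruction, not DP+'s actual code; the function signature is an assumption, and the real flow obtains the first value via Prefect's Variable API:

```python
import os

def resolve_int_setting(
    variable_value,   # value from a Prefect Variable, or None if lookup failed
    env_name: str,    # worker env-var fallback name
    ini_value,        # value from ckan.ini, or None if unset
    default: int,     # built-in default
) -> int:
    """Prefect Variable -> env var -> ckan.ini -> built-in default,
    silently falling through on any missing or unparseable value."""
    for candidate in (variable_value, os.environ.get(env_name), ini_value):
        try:
            return int(candidate)
        except (TypeError, ValueError):
            continue
    return default

# With no Prefect Variable and no env var set, ckan.ini wins:
resolve_int_setting(None, "DPP_UNSET_EXAMPLE", "7200", 3600)  # -> 7200
```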
| Symptom | Likely cause | Fix |
|---|---|---|
| `datapusher_submit` returns `False` with a Prefect connection error in the CKAN log | The Prefect server is unreachable from CKAN | Check `PREFECT_API_URL` and that the Prefect server is healthy at `<API>/health`. |
| Flow run sits in `Scheduled` forever | No worker is polling the configured work pool | Start `prefect worker start -p datapusher-plus` on a host with the datapusher-plus package installed. |
| Flow run goes straight to `Failed` with "QSV binary not found" | The worker process can't see the `qsv` binary | Set `ckanext.datapusher_plus.qsv_bin` in the CKAN config the worker reads, or install `qsv` in the worker's `PATH`. |
| Re-run from a failed task re-downloads the file | Result storage block isn't registered, so persisted results aren't being read | Re-run `ckan datapusher_plus prefect-deploy` — it calls `ensure_result_storage_block`. |
| Pre-existing partial table after a failed run | Database `on_rollback` couldn't drop it (e.g., Postgres unreachable at rollback time) | Drop manually with `datastore_delete`; the next submit recreates from scratch. |
This material is copyright (c) 2025, datHere, Open Knowledge Foundation, and other contributors.
It is open and licensed under the GNU Affero General Public License (AGPL) v3.0, whose full text may be found at:
http://www.fsf.org/licensing/licenses/agpl-3.0.html
Footnotes
1. Why use qsv instead of a "proper" Python data analysis library like pandas? ↩
2. It takes 0.16 seconds with an index to run `qsv stats` against the qsv whirlwind tour sample file on a Ryzen 4800H (8 physical/16 logical cores) with 32 GB memory and a 1 TB SSD. Without an index, it takes 1.3 seconds. ↩
3. Imagine you have a 1M-row CSV, and the last row has an invalid value for a numeric column (e.g. "N/A" instead of a number). After spending hours pushing the data very slowly, legacy Datapusher will abort on the last row and the ENTIRE job is invalid. Ok, that's bad, but what makes it worse is that the old table has already been deleted, and Datapusher doesn't tell you what caused the job to fail! YIKES! ↩

