Get Dagster running in 5 minutes
Dagster describes itself as ‘the data orchestration tool designed for productivity’. And it’s a great tool. It’s also one with a steep learning curve. While learning Dagster, I found myself hunting for a simple example of a deployable data pipeline. One Minimum Viable Product that just works. I couldn’t find one, so I built it myself.
Here’s the GitHub repo: https://github.com/bakerwho/dagster-mvp. If you follow this 5-minute read, you will know what’s good about Dagster and how to get a simple Dagster pipeline running.
Hopefully this will save you a lot of trial-and-error learning by giving you an example that works, from start to finish.
The embarrassingly simple pipeline we will run today performs three steps or `op`s:

- Select 1 of 2 sentences based on an input `key`
- Apply upper- or lower-case ‘normalization’
- Strip the sentence of all punctuation

We build the pipeline as a Dagster `graph` that chains these 3 `op`s. We then configure the `graph` into a runnable `job`, and run the `job`. We also build a `schedule` that can run this job every minute.
Key Dagster concepts
Dagster lets you build data pipelines and orchestrate their execution.
A data pipeline is a set of compute operations that gets data from a source, transforms it to increase its value, and stores the finished ‘data product’ somewhere where it will be useful.
- The source could be a database, a file, or even the output of another pipeline
- The transformation could be one or more of data cleaning, visualization, normalization, ML training/inference, etc.
- The ‘data product’ could be ML predictions, BI dashboards, or simply a clean, well-documented dataset that other teams can build on
It’s useful to think of a data product as the end result because products have functions and any data pipeline should have a very clear function.
Next, a key concept in Dagster is the DAG.
A DAG (directed acyclic graph) is a set of nodes (say, A, B, C) and directed edges (directed edge A→B does not imply directed edge B→A) between them with no cycles (edges A→B→C implies no edge C→A).
If you want to know more, go to Wikipedia.
Putting it together
The point is, data pipelines should always be designed as DAGs. Why? Because data should flow in one direction: a cycle would mean an operation ultimately depends on its own output, which can never resolve.
Dagster lets you design your data `graph` as a DAG of `op`s. You then configure a `graph` into a runnable `job`. You can run a `job` with a direct call from Python or the CLI - you can also orchestrate it with complicated dependencies.
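To make this concrete, here’s a minimal sketch of the op → graph → job flow. This isn’t the pipeline from the repo - the names (`get_name`, `say_hello`, `hello_job`) are made up for illustration:

```python
from dagster import graph, op


@op
def get_name() -> str:
    # An op is a single unit of compute.
    return "dagster"


@op
def say_hello(name: str) -> str:
    # Ops are chained by passing outputs as inputs, forming a DAG.
    return f"Hello, {name}!"


@graph
def hello_graph():
    # The graph only wires ops together; no work happens here.
    say_hello(get_name())


# A graph becomes runnable once it is turned into a job.
hello_job = hello_graph.to_job()

if __name__ == "__main__":
    result = hello_job.execute_in_process()
    print(result.success)
```

The same pattern scales to any number of ops, as long as the dependencies stay acyclic.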
Other tools that do something similar to Dagster are Apache Airflow, Spotify’s Luigi, Lyft’s Flyte, and Prefect. If you’ve used any of these and prefer it to Dagster, let us know why (except Prefect, I don’t care about that one). Type a comment below!
Run the Dagster job
Step 0, and the most important step, is to `pip install dagster dagit`.
- Clone the repo and `cd` into it using the following terminal commands:

```
git clone https://github.com/bakerwho/dagster-mvp
cd dagster-mvp
ls
```
It should look something like this:
```
.
├── README.md
├── dagster-exec
│   ├── dagster.yaml
│   └── workspace.yaml
└── pipeline_1.py
```
`dagster-exec` is where we will store all execution information, logs, etc. Everything outside of it can be ML code. Note that `dagster-exec` can have any name and can be stored anywhere on the machine. The only requirement is that it contains a `workspace.yaml`.

- Edit 2 crucial lines to point to the correct filepaths.
Step 2A: `.example_envrc` is a file containing the environment variables you need to set to run your Dagster project. Some of these, like the AWS and Snowflake credentials, are for illustrative purposes. However, Dagster does prefer that you set the `DAGSTER_HOME` variable with the path to your local `dagster-mvp/dagster-exec` directory:

```
export DAGSTER_HOME="/path/to/dagster-mvp/dagster-exec"
```

Once you have done this, activate these environment variables by running:

```
source .example_envrc
```

Pro tip: you can use `direnv` (see https://direnv.net) to automatically set environment variables in a `.envrc` file that persist within the scope of the parent directory and all its subdirectories.

Step 2B: `dagster-exec/workspace.yaml` contains the following lines. Edit them to point to the correct `.py` file and the repo within it as an `attribute`. In this case, it is `repo_1` defined in `pipeline_1.py`:

```yaml
load_from:
  - python_file:
      relative_path: "/path/to/dagster-mvp/pipeline_1.py"
      attribute: repo_1
```
Over time, your codebase will grow. You might introduce support for different encodings. Or, y’know, build an actual ML application. When that happens, you can load more repos from other locations, or even from installed Python packages.
- Go through the `dagster` constructs in `pipeline_1.py`. There are a few useful ones (a rough sketch of how they fit together follows this list):
  - a fake `resource` called `connection`: it takes a string `credentials` and turns it to uppercase. In a real use case, you could use a `resource` for a database connection.
  - 3 `op`s: `get_string()` selects one of two strings, `normalize_string()` turns an input string into upper or lower case, and `clean_string()` strips an input string of punctuation.
  - a custom `IOManager`: an IOManager is attached to an `op` output, and it runs between the end of an upstream op and the start of a downstream op. This is a slightly tricky concept, but basically Dagster always wants to persist op results - it is concerned that you will lose track of them unless they are saved to disk. The default IOManager pickles the results, whatever they may be. Like civilized ML engineers, we want to write our string outputs to `.txt`, so we build something called `CustomIOManager`.
  - the resource definitions and configuration needed to turn `clean_string_graph` into `clean_string_job` using `clean_string_graph.to_job(resource_defs, config)`
  - a `schedule` with the appropriate cron string to run `clean_string_job` every minute
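Here is that sketch - a rough approximation of how the constructs in `pipeline_1.py` fit together, using the names from the list above but with simplified bodies. Treat it as illustrative, not the repo’s exact code:

```python
import string

from dagster import ScheduleDefinition, graph, op, repository, resource


@resource(config_schema={"credentials": str})
def connection(init_context):
    # Fake resource: just uppercases the supplied credentials string.
    return init_context.resource_config["credentials"].upper()


@op(config_schema={"key": str}, required_resource_keys={"connection"})
def get_string(context) -> str:
    # Select 1 of 2 sentences based on an input key.
    sentences = {"a": "Hello, World!", "b": "Goodbye, World?"}
    context.log.info(f"Connection: {context.resources.connection}")
    return sentences[context.op_config["key"]]


@op(config_schema={"mode": str})
def normalize_string(context, sent: str) -> str:
    # Apply upper- or lower-case 'normalization'.
    return sent.upper() if context.op_config["mode"] == "upper" else sent.lower()


@op
def clean_string(sent: str) -> str:
    # Strip the sentence of all punctuation.
    return sent.translate(str.maketrans("", "", string.punctuation))


@graph
def clean_string_graph():
    # The graph only wires the three ops into a DAG.
    clean_string(normalize_string(get_string()))


# Configure the graph into a runnable job, wiring in resources and config.
clean_string_job = clean_string_graph.to_job(
    resource_defs={"connection": connection},
    config={
        "ops": {
            "get_string": {"config": {"key": "a"}},
            "normalize_string": {"config": {"mode": "lower"}},
        },
        "resources": {"connection": {"config": {"credentials": "dummy"}}},
    },
)

# Cron string "* * * * *" means 'run every minute'.
clean_string_schedule = ScheduleDefinition(
    job=clean_string_job, cron_schedule="* * * * *"
)


@repository
def repo_1():
    # The repository is what workspace.yaml points at (attribute: repo_1).
    return [clean_string_job, clean_string_schedule]
```

The custom `IOManager` mentioned above is sketched separately in the IOManagers section below.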
3 ways to run this pipeline
- Run it in Python

Start a Python shell in `dagster-mvp` and run:

```python
from pipeline_1 import clean_string_job
clean_string_job.execute_in_process()
```
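The call returns a result object you can inspect - a small illustrative addition, not something the repo requires:

```python
from pipeline_1 import clean_string_job

result = clean_string_job.execute_in_process()
print(result.success)  # True if every op completed without an error
```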
- Run it from the command line

```
dagster job execute clean_string_job
```

If this doesn’t work, double-check the env variable `DAGSTER_HOME`.
- Run it from a pretty UI

Run `dagit` to spin up a pretty local orchestration server:

```
dagit
```

Go to localhost:3000 in your browser.

You should be able to see your pipelines and jobs. Click on `clean_string_job` and go to the ‘Launchpad’. From here, you should be able to do a number of things, like edit the config, run a job, or start a schedule.
- (Optional) Play with the scheduler

Dagit’s UI also has a Schedules page. Click on it and you’ll see settings that let you turn the scheduler on and off. The scheduler is currently set to run every minute, but you can play with the cron string in the schedule definition in `pipeline_1.py`.

Extra credit assignment: changing code between or during scheduled runs leads to interesting results - play with the orchestrator to see what happens!
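For instance, assuming the schedule is built with `ScheduleDefinition` as sketched earlier, switching it from every minute to every hour is a one-line change (illustrative, not the repo’s exact code):

```python
from dagster import ScheduleDefinition

# In pipeline_1.py, where clean_string_job is already defined:
# "0 * * * *" fires at minute 0 of every hour instead of every minute.
clean_string_schedule = ScheduleDefinition(job=clean_string_job, cron_schedule="0 * * * *")
```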
Proof it ran: check the logs
Once you run a job, you will notice some interesting changes to your `dagster-exec` folder.

Three subdirectories will suddenly appear: `data`, `logs` and `storage`. If you explore the structure, it should look like this:
```
├── data
│   └── c47d674e-b547-4c81-807a-b2cd823a674b
│       ├── sent.txt
│       └── sent_clean.txt
├── logs
│   ├── events
│   │   ├── c47d674e-b547-4c81-807a-b2cd823a674b.db
│   │   └── index.db
│   ├── runs
│   │   └── runs.db
│   └── schedules
│       └── schedules.db
├── storage
│   └── c47d674e-b547-4c81-807a-b2cd823a674b
│       ├── compute_logs
│       │   ├── clean_string.complete
│       │   ├── clean_string.err
│       │   ├── clean_string.out
│       │   ├── get_string.complete
│       │   ├── get_string.err
│       │   ├── get_string.out
│       │   ├── normalize_string.complete
│       │   ├── normalize_string.err
│       │   └── normalize_string.out
│       └── normalize_string
│           └── sent_norm
```
Here, `c47d674e-b547-4c81-807a-b2cd823a674b` is the `run_id` for your run.
Logs
Dagster logs pretty much everything about your run in a nice SQLite `.db` format under `logs`. It also stores `.err`, `.out` and `.complete` compute logs for each op.
The `logs` and `storage` locations are set in YAML in `dagster-exec/dagster.yaml`. You don’t need to set them; if you don’t, Dagster will store them in its default location.
IOManagers
The output from `normalize_string` is handled by the default IOManager. That’s why it is stored as a pickled object, `sent_norm`.

The string outputs from our other 2 ops are handled by our custom `CustomIOManager`. It stores them in `.txt` files in a dir called `data`, under a sub-dir with the same name as the `run_id`.
An IOManager can be useful for finer control over your op outputs (e.g. you can choose which outputs to cache, where to store them, etc.).
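For reference, here is a rough sketch of what a custom IOManager along these lines can look like. It is an approximation of the idea, not the exact `CustomIOManager` from the repo: subclass `IOManager`, write each output to a `.txt` file under a run-specific directory, and read it back in `load_input`.

```python
import os

from dagster import IOManager, io_manager


class CustomIOManager(IOManager):
    """Writes string op outputs to .txt files under data/<run_id>/."""

    def __init__(self, base_dir: str = "data"):
        self.base_dir = base_dir

    def _path(self, context) -> str:
        # One sub-directory per run, one file per output name.
        run_dir = os.path.join(self.base_dir, context.run_id)
        os.makedirs(run_dir, exist_ok=True)
        return os.path.join(run_dir, f"{context.name}.txt")

    def handle_output(self, context, obj: str):
        # Called after an op finishes, with that op's output.
        with open(self._path(context), "w") as f:
            f.write(obj)

    def load_input(self, context):
        # Called before a downstream op runs, to supply its input.
        with open(self._path(context.upstream_output), "r") as f:
            return f.read()


@io_manager
def custom_io_manager(init_context):
    return CustomIOManager()
```

To use it selectively (so `normalize_string` can keep the default), the usual pattern is to register `custom_io_manager` under a resource key in `to_job(resource_defs=...)` and point the relevant ops’ outputs at it with an `io_manager_key`.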
In conclusion
That’s it. This is a minimal Dagster pipeline that just works. If you dig around, you’ll find lots of other features I’ve played with. For example, I am currently uselessly passing an empty dict `hyperparameters` to an op that doesn’t use it.
Two closing points of advice:
- With a tool like Dagster, it’s easy to get carried away building non-essential nice-to-haves. Strip your pipeline to the essentials and build the simplest thing that just works. You can add complexity and sophistication later, and if anything breaks, always revert to the simpler version that just works. But if you start trying to build the perfect ML code project, you may struggle with execution. Be brutal with cutting out redundant concepts. For example, you don’t really need the Dagster `resource` concept at all. It’s just a nice-to-have, and I used it to configure paths and database connections. But in most cases, it’s non-essential.
- The value-add from Dagster for reliable, deployed machine learning should be self-evident. If you have a complicated set of ML pipelines in different repos, Dagster empowers your team to develop independently but still orchestrate in sync. You can use logic to chain operations using Dagster, launching runs of certain pipelines based on dependencies, periodic `schedule`s or even custom `sensor`s that respond to an event. You can leverage multiprocessing to distribute big computations, and wait to collect the results before starting the next step. You can define smart retry conditions and error handling. You can keep your dev process clean by writing simple tests using dummy `config`s for predictable edge-cases.
Hopefully, you can now see the big picture, and have a tangible way to get started. Anything you can do in Python, you can do in Dagster - but also now you can orchestrate it reliably in production.