Overview
This article describes how to create repeatable, multi-step jobs in Magpie. Before walking through an example, it helps to review the components that make up job management in Magpie. The core elements that define a job are:
Projects: All jobs live within a particular project. The project context in which a job will be created can be set with the USE PROJECT command.
Jobs: A job is a collection of tasks that run in succession. Jobs are created using the CREATE JOB command.
Tasks: Tasks are individual units of activity within a larger job.
Tasks are further categorized into types based on how the task is specified.
Task Type | Description |
---|---|
SQL Mapper | This is a task that executes a SQL query and pushes the result into a target table. |
Magpie Script | This type of task simply executes a series of Magpie DSL commands in sequence. |
Scala Script | Scala scripts can be embedded in jobs as tasks. These tasks have access to all of the Magpie metadata and the Magpie context as well as all of the data processing, statistical, and machine learning APIs built into Magpie's underlying Spark processing engine. |
Python Script | Similar to Scala scripts, Python scripts can be executed as tasks. |
Nested Job | Existing jobs can be nested within other jobs as tasks. This is useful for making job definitions modular and for chaining multiple interdependent jobs together, ensuring that one job executes before another starts. |
Once a job has been defined, it can be run interactively in the Magpie Notebook using the EXECUTE command, or it can be scheduled to run on a recurring basis.
Example
The following example specifies and executes a job interactively. The job integrates data from a database source with data pulled from an internet URL source and populates a table with the results. Scheduling and notifications are covered in a separate article.
The first step in creating a job is setting the project context in which the job will execute.
use project my_project;
Once the project is set, the next step is to create the job itself.
drop job if exists refresh_property_table;
create job {
  "name": "refresh_property_table",
  "description": "Refreshes a table storing information about real estate properties, supplementing the data with publicly available crime statistics."
};
After the job is created, tasks can be added to it using the ALTER JOB ADD TASK command. Note that every task except the initial one must reference a predecessor task, as the predecessorName field does in the second task below.
alter job refresh_property_table add magpie script task {
  "name": "stage_data",
  "description": "Stage the data from the source CSV web source containing crime statistics."
} with script """
  drop table if exists staging_crime with delete;
  save url "https://opendata.my_city.gov/crime_stats.csv" as table staging_crime with infer schema with header;
""";

alter job refresh_property_table add sql mapper task {
  "name": "populate_merged_table",
  "description": "Join the crime incident data to the property table containing the addresses for each property and save to a merged table.",
  "predecessorName": "stage_data",
  "targetTableName": "properties_and_crime"
} with sql """
  select p.*, a.crime_count
  from (
    select count(*) as crime_count, zip_code
    from staging_crime
    where year >= 2018
    group by zip_code
  ) a
  join properties p on a.zip_code = p.zip_code
""";
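To make the effect of the populate_merged_table query concrete, the following Python sketch runs the same SQL text against small in-memory SQLite tables. SQLite stands in for Magpie's Spark engine here, and the two-column schemas and sample rows are made up for illustration; the real tables are not described in this article.

```python
import sqlite3

# Query text taken from the populate_merged_table task.
MERGE_SQL = """
select p.*, a.crime_count
from (
    select count(*) as crime_count, zip_code
    from staging_crime
    where year >= 2018
    group by zip_code
) a
join properties p on a.zip_code = p.zip_code
"""


def merged_rows(properties, crimes):
    """Run the task's query over in-memory toy tables and return the rows."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical minimal schemas; the real tables likely have more columns.
    conn.execute("create table properties (address text, zip_code text)")
    conn.execute("create table staging_crime (zip_code text, year integer)")
    conn.executemany("insert into properties values (?, ?)", properties)
    conn.executemany("insert into staging_crime values (?, ?)", crimes)
    # An order by is appended only to make the toy output deterministic.
    rows = conn.execute(MERGE_SQL + " order by p.address").fetchall()
    conn.close()
    return rows


# Made-up sample data: (address, zip_code) and (zip_code, year).
properties = [("1 Main St", "10001"), ("2 Oak Ave", "10002"), ("3 Elm Rd", "10003")]
crimes = [("10001", 2017), ("10001", 2018), ("10001", 2019), ("10002", 2016)]

rows = merged_rows(properties, crimes)
print(rows)
```

With this sample data only the property in zip code 10001 comes back, with a crime_count of 2. Because the query uses an inner join, properties in zip codes with no incidents from 2018 onward do not appear in the target table; an outer join that preserves the properties side would be needed to keep them.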
With the tasks added, it is now possible to execute the job directly using the EXECUTE command.
execute job refresh_property_table;
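Since a job's tasks run in succession and every task after the first names its predecessor, the run order can be recovered by following the predecessorName links. The Python sketch below is a conceptual model only, not Magpie's implementation; execution_order is a hypothetical helper, and it assumes a simple linear chain in which each task has at most one successor.

```python
def execution_order(tasks):
    """Return task names in run order by following predecessorName links.

    `tasks` is a list of dicts shaped like the JSON task specifications
    used in this example. Assumes a linear chain (an assumption of this
    sketch, not documented Magpie behavior).
    """
    successor = {}  # predecessor name -> name of the task that follows it
    first = None
    for task in tasks:
        pred = task.get("predecessorName")
        if pred is None:
            if first is not None:
                raise ValueError("only the initial task may omit predecessorName")
            first = task["name"]
        else:
            if pred in successor:
                raise ValueError(f"two tasks name {pred!r} as predecessor")
            successor[pred] = task["name"]
    if first is None:
        raise ValueError("no initial task found")

    order = [first]
    # The length guard stops the walk if the links happen to form a cycle.
    while order[-1] in successor and len(order) <= len(tasks):
        order.append(successor[order[-1]])
    if len(order) != len(tasks):
        raise ValueError("a task references an unknown or cyclic predecessor")
    return order


# The two tasks from this example, deliberately listed out of order.
tasks = [
    {"name": "populate_merged_table", "predecessorName": "stage_data"},
    {"name": "stage_data"},
]
order = execution_order(tasks)
print(order)
```

In this model stage_data runs first because it has no predecessor, and populate_merged_table follows it, which matches the order in which the tasks were added to the job.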
Once a job has been created, it can also be reviewed using the DESCRIBE command.