Lesson 16 - Parallel Loop

Goal

In this lesson we’ll learn how to loop in parallel. When looping in parallel, a new branch is created for each value in a list and the action associated with the step is run for each branch in parallel.

Get Started

We’ll create a new flow that calls the new_hire flow we built in previous lessons as a subflow. Let’s begin by creating a new file named hire_all.sl in the tutorials/hiring folder for our new flow. We’ll also need new_hire.sl, because we’re going to make a minor change to it. Finally, we’ll pass our flow inputs using a file, so let’s create a tutorials/inputs folder and add a hires.yaml file to it.

Outputs

Since we’ll be using the new_hire flow as a subflow, it will be helpful to add some flow outputs that a parent flow can use. We’ll simply add an outputs section at the bottom of the flow to output a bit of information. The outputs key belongs directly under flow, at the same indentation level as inputs and workflow. Because it sits a long way from the flow key, take extra care to indent it correctly.

outputs:
  - address
  - final_cost: ${total_cost}

Parent Flow

Our new hire_all flow is going to take in a list of names of people being hired and will call the new_hire flow for each one of them. It will be looping in parallel, so all the new_hire flows will be running simultaneously.

In hire_all.sl we can start off as usual by declaring a namespace, specifying the imports and taking in the inputs, which in our case is a list of names.

namespace: tutorials.hiring

imports:
  base: tutorials.base

flow:
  name: hire_all

  inputs:
    - names_list

  workflow:

Loop Syntax

A parallel loop looks pretty similar to a normal for loop, but with a few key differences.

Let’s create a new step named process_all in which we’ll do our looping. Each branch of the loop will call the new_hire flow.

- process_all:
    parallel_loop:
      for: name in eval(names_list)
      do:
        new_hire:
          - first_name: ${name["first"]}
          - middle_name: ${name.get("middle","")}
          - last_name: ${name["last"]}

As you can see, so far it is almost identical to a regular for loop, except the loop key has been replaced by parallel_loop.

The names_list input will be a list of dictionaries containing name information with the keys first, middle and last. For each name in names_list the new_hire flow will be called and passed the corresponding name values. The various branches running the new_hire flow will run in parallel and the rest of the flow will continue only after all the branches have completed.

For more information, see parallel_loop in the DSL reference.
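
As a rough analogy only, the behavior described above (one branch per list item, all branches running at once, and the flow continuing only when every branch is done) can be sketched in plain Python. This is not how CloudSlang implements parallel_loop. The new_hire function, the address format and the cost value below are hypothetical stand-ins for the subflow and its outputs.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical stand-in for the new_hire subflow; returns its outputs as a dict.
# The address format and the cost are made-up placeholder values.
def new_hire(first_name, middle_name, last_name):
    address = (first_name[0] + "." + last_name + "@example.com").lower()
    return {"address": address, "final_cost": "1800"}

names = [
    {"first": "joe", "middle": "p", "last": "bloggs"},
    {"first": "jane", "last": "doe"},
    {"first": "juan", "last": "perez"},
]

def run_parallel_loop(names):
    """Start one branch per name, then wait until every branch is done."""
    results = []
    with ThreadPoolExecutor(max_workers=len(names)) as pool:
        futures = [
            pool.submit(new_hire, n["first"], n.get("middle", ""), n["last"])
            for n in names
        ]
        # as_completed yields branches in the order they finish,
        # which is not necessarily the order they were started in.
        for future in as_completed(futures):
            results.append(future.result())
    return results

branch_outputs = run_parallel_loop(names)
print(len(branch_outputs))
```

The list returned here plays a role similar to the collected branch outputs: it has one entry per branch, but the order of the entries depends on which branch finishes first.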

Publish

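Next we perform aggregation in the publish section, much as we do in a normal for loop (see Lesson 11 - Loop Aggregation). Note that here the publish key sits at the same level as parallel_loop, whereas in a normal loop it is nested inside the loop key. Publish occurs only after all branches have completed.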

In most cases the publish will make use of the branches_context list. This is a list that is populated with all of the outputs from all of the branches. For example, in our case, branches_context[0] will contain keys address and final_cost, corresponding to the values output by the first branch to complete. Similarly, branches_context[1] will contain the keys address and final_cost mapped to the values output by the second branch to complete.

There is no way to predict the order in which branches will complete, so branches_context is rarely accessed by a particular index. Instead, Python expressions are used to compute the desired aggregations over the whole list.

- process_all:
    parallel_loop:
      for: name in eval(names_list)
      do:
        new_hire:
          - first_name: ${name["first"]}
          - middle_name: ${name.get("middle","")}
          - last_name: ${name["last"]}
    publish:
      - email_list: "${', '.join(filter(lambda x : x != '', map(lambda x : str(x['address']), branches_context)))}"
      - cost: "${str(sum(map(lambda x : int(x['final_cost']), branches_context)))}"

In our case we use the map(), filter() and sum() Python functions to build a comma-separated string of all the email addresses that were created and to total all the equipment costs.

Let’s look more closely at the cost aggregation to better understand what’s going on. Each time a branch of the parallel loop finishes running the new_hire subflow, it outputs a final_cost value. That value is added to the branches_context list at index n, under the final_cost key, where n reflects the order in which the branches finish. So branches_context[n]['final_cost'] holds the final_cost value output by the nth new_hire subflow to finish running. Instead of looping through branches_context explicitly, we use a Python lambda expression together with the map function to extract each final_cost value into a new list. Because final_cost is a string, the lambda converts each value with int(). Finally, we use the Python sum function to add up the extracted values, convert the total back to a string with str(), and publish it as cost.

For more information, see publish and branches_context in the DSL reference.

For more information on the Python constructs used here, see lambda, map and sum in the Python documentation.
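
To see the two publish expressions at work outside of a flow, here is a small standalone Python sketch. The branches_context list is hypothetical sample data, and the empty address in the second entry is an assumed placeholder for a branch that produced no address. The expressions themselves mirror the ones used in the publish section.

```python
# Hypothetical branches_context, in the order the branches happened to finish.
# The empty address models (by assumption) a branch that created no address.
branches_context = [
    {"address": "jd@somecompany.com", "final_cost": "1300"},
    {"address": "", "final_cost": "0"},
    {"address": "jp@somecompany.com", "final_cost": "1800"},
]

# email_list: pull out every address, drop the empty ones, join the rest.
addresses = map(lambda x: str(x["address"]), branches_context)
non_empty = filter(lambda x: x != "", addresses)
email_list = ", ".join(non_empty)

# cost: convert each final_cost string to an int, add them up,
# then convert the total back to a string.
costs = map(lambda x: int(x["final_cost"]), branches_context)
cost = str(sum(costs))

print(email_list)  # jd@somecompany.com, jp@somecompany.com
print(cost)        # 3100
```

Neither result depends on the order of the entries in the list, which is why this style of aggregation suits branches that finish in an unpredictable order.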

Input File

We’ll use an input file to send the flow our list of names. An input file is very similar to a system properties file. It is written in plain YAML and therefore ends with the .yaml extension.

Here are the contents of the hires.yaml input file that we created in the tutorials/inputs folder.

names_list: '[{"first": "joe", "middle": "p", "last": "bloggs"}, {"first": "jane", "last": "doe"}, {"first": "juan", "last": "perez"}]'

The file contains a names_list key that maps to a stringified list of name information. Remember, all inputs must be strings, so here we must use a string as well. The flow turns the string back into a list by calling eval(names_list) in the parallel loop’s for expression.

For more information, see Using an Inputs File in the CLI documentation.
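
The following Python sketch shows, under stated assumptions, what happens to the names_list string once it is evaluated. It uses ast.literal_eval as a safer stand-in for the eval() call in the flow. That substitution is an assumption of this sketch, not something the flow does. The middles variable is introduced purely for illustration.

```python
import ast

# The same string value that hires.yaml assigns to names_list.
names_list = '[{"first": "joe", "middle": "p", "last": "bloggs"}, {"first": "jane", "last": "doe"}, {"first": "juan", "last": "perez"}]'

# literal_eval accepts only Python literals; used here in place of eval().
names = ast.literal_eval(names_list)

# Entries without a "middle" key fall back to an empty string,
# just like name.get("middle","") in the process_all step.
middles = [name.get("middle", "") for name in names]

for name in names:
    print(name["first"], repr(name.get("middle", "")), name["last"])
```

Using get() with a default is what lets the entries for jane and juan omit the middle key without causing an error.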

Steps

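Finally, we have to add the print_success and print_failure steps that process_all navigates to on SUCCESS and FAILURE (see its navigate section in the complete hire_all.sl below). We can put them right after the process_all step.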

- print_success:
    do:
      base.print:
        - text: >
            ${"All addresses were created successfully.\nEmail addresses created: "
            + email_list + "\nTotal cost: " + cost}
    navigate:
      - SUCCESS: SUCCESS

- on_failure:
    - print_failure:
        do:
          base.print:
            - text: >
                ${"Some addresses were not created or there is an email issue.\nEmail addresses created: "
                + email_list + "\nTotal cost: " + cost}

Run It

We can save the files and run the flow. It’s a bit harder to track what has happened now because there are quite a few things happening at once. On careful inspection you will see that each step in the new_hire flow, and in each of its subflows, is run for each of the people in the names_list input.

run --f <folder path>/tutorials/hiring/hire_all.sl --cp <folder path>/tutorials,<content folder path>/base --if <folder path>/tutorials/inputs/hires.yaml --spf <folder path>/tutorials/properties/bcompany.prop.sl

Download the Code

Lesson 16 - Complete code

New Code - Complete

new_hire.sl

namespace: tutorials.hiring

imports:
  base: tutorials.base
  mail: io.cloudslang.base.mail

flow:
  name: new_hire

  inputs:
    - first_name
    - middle_name:
        required: false
    - last_name
    - all_missing:
        default: ""
        required: false
        private: true
    - total_cost:
        default: '0'
        private: true
    - order_map:
        default: '{"laptop": 1000, "docking station": 200, "monitor": 500, "phone": 100}'

  workflow:
    - print_start:
        do:
          base.print:
            - text: "Starting new hire process"
        navigate:
          - SUCCESS: create_email_address
    - create_email_address:
        loop:
          for: attempt in range(1,5)
          do:
            create_user_email:
              - first_name
              - middle_name
              - last_name
              - attempt: ${str(attempt)}
          publish:
            - address
            - password
          break:
            - CREATED
            - FAILURE
        navigate:
          - CREATED: get_equipment
          - UNAVAILABLE: print_fail
          - FAILURE: print_fail
    - get_equipment:
        loop:
          for: item, price in eval(order_map)
          do:
            order:
              - item
              - price: ${str(price)}
              - missing: ${all_missing}
              - cost: ${total_cost}
          publish:
            - all_missing: ${missing + not_ordered}
            - total_cost: ${str(int(cost) + int(spent))}
          break: []
        navigate:
          - AVAILABLE: check_min_reqs
          - UNAVAILABLE: check_min_reqs
    - check_min_reqs:
        do:
          base.contains:
            - container: ${all_missing}
            - sub: 'laptop'
        navigate:
          - DOES_NOT_CONTAIN: print_finish
          - CONTAINS: print_warning
    - print_warning:
        do:
          base.print:
            - text: >
                ${first_name + ' ' + last_name +
                ' did not receive all the required equipment\n'}
        navigate:
          - SUCCESS: print_finish
    - print_finish:
        do:
          base.print:
            - text: >
                ${'Created address: ' + address + ' for: ' + first_name + ' ' + last_name + '\n' +
                'Missing items: ' + all_missing + ' Cost of ordered items: ' + total_cost}
        navigate:
          - SUCCESS: fancy_name
    - fancy_name:
        do:
          fancy_text:
            - text: ${first_name + ' ' + last_name}
        publish:
          - fancy_text: ${fancy}
        navigate:
          - SUCCESS: send_mail
    - send_mail:
        do:
          mail.send_mail:
            - hostname: ${get_sp('tutorials.properties.hostname')}
            - port: ${get_sp('tutorials.properties.port')}
            - from: ${get_sp('tutorials.properties.system_address')}
            - to: ${get_sp('tutorials.properties.hr_address')}
            - subject: "${'New Hire: ' + first_name + ' ' + last_name}"
            - body: >
                ${fancy_text + '<br>' +
                'Created address: ' + address + ' for: ' + first_name + ' ' + last_name + '<br>' +
                'Missing items: ' + all_missing + ' Cost of ordered items: ' + total_cost + '<br>' +
                'Temporary password: ' + password}
        navigate:
          - FAILURE: FAILURE
          - SUCCESS: SUCCESS
    - on_failure:
      - print_fail:
          do:
            base.print:
              - text: "${'Failed to create address for: ' + first_name + ' ' + last_name}"

  outputs:
    - address
    - final_cost: ${total_cost}

hire_all.sl

namespace: tutorials.hiring

imports:
  base: tutorials.base

flow:
  name: hire_all

  inputs:
    - names_list

  workflow:
    - process_all:
        parallel_loop:
          for: name in eval(names_list)
          do:
            new_hire:
              - first_name: ${name["first"]}
              - middle_name: ${name.get("middle","")}
              - last_name: ${name["last"]}
        publish:
          - email_list: "${', '.join(filter(lambda x : x != '', map(lambda x : str(x['address']), branches_context)))}"
          - cost: "${str(sum(map(lambda x : int(x['final_cost']), branches_context)))}"
        navigate:
          - SUCCESS: print_success
          - FAILURE: print_failure

    - print_success:
        do:
          base.print:
            - text: >
                ${"All addresses were created successfully.\nEmail addresses created: "
                + email_list + "\nTotal cost: " + cost}
        navigate:
          - SUCCESS: SUCCESS

    - on_failure:
        - print_failure:
            do:
              base.print:
                - text: >
                    ${"Some addresses were not created or there is an email issue.\nEmail addresses created: "
                    + email_list + "\nTotal cost: " + cost}

hires.yaml

names_list: '[{"first": "joe", "middle": "p", "last": "bloggs"}, {"first": "jane", "last": "doe"}, {"first": "juan", "last": "perez"}]'