Use Hierarchical Data

This tutorial provides you with an example of how to work with hierarchical data.

Sample Data

The following sample data contains two hierarchy levels:

  • the Product level and, for each product,
    • a Customer level.
[
  {
    "Id": "Aa001",
    "Product": "General Corp Purpose",
    "Customer": [
      {
        "CustomerId": "1",
        "name": "Protea Hospitality Group (Pty) Limited",
        "Amount": 759215
      },
      {
        "CustomerId": "2",
        "name": "National Airways Corporation (Pty) Limited",
        "Amount": 176236
      },
      {
        "CustomerId": "3",
        "name": "MILLERS POINT PARKING PTY LTD",
        "Amount": 599413
      }
    ]
  },
  {
    "Id": "Aa002",
    "Product": "RLC",
    "Customer": [
      {
        "CustomerId": "4",
        "name": "Riverside Resource Recovery Limited",
        "Amount": 120994
      },
      {
        "CustomerId": "5",
        "name": "Millmerran Finance Company Pty Ltd",
        "Amount": 260978
      }
    ]
  }
]

The data is easily modeled in a JSON structure, using aggregate data types such as maps and lists. However, it cannot be stored in a cube without prior processing.

In the following sections, you will learn how to process this data to be able to use it in a FusionFabric.cloud application.

Process the Data with the Data Manipulation Tool

In this section, you are guided through an example of using the Data Manipulation tool to process a dataset that contains hierarchies. You will create a pipeline that imports the sample data, processes it, and stores it in a cube.

To create the Data Manipulation pipeline

  1. Copy the following code, paste it in an empty text file, and save the file as JSON, for example Hierarchies.json.
{
  "pipelineConfig" : {
    "schemaVersion" : 5,
    "version" : 9,
    "pipelineId" : "Hierarchies690fe02e-dc6a-49a4-b553-c0dcd510f2ea",
    "title" : "Hierarchies",
    "description" : "",
    "uuid" : "300b8e6b-2a18-4856-8825-25e6f8290d73",
    "configuration" : [ {
      "name" : "executionMode",
      "value" : "STANDALONE"
    }, {
      "name" : "deliveryGuarantee",
      "value" : "AT_LEAST_ONCE"
    }, {
      "name" : "startEventStage",
      "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1"
    }, {
      "name" : "stopEventStage",
      "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1"
    }, {
      "name" : "shouldRetry",
      "value" : true
    }, {
      "name" : "retryAttempts",
      "value" : -1
    }, {
      "name" : "memoryLimit",
      "value" : "${jvm:maxMemoryMB() * 0.85}"
    }, {
      "name" : "memoryLimitExceeded",
      "value" : "LOG"
    }, {
      "name" : "notifyOnStates",
      "value" : [ "RUN_ERROR", "STOPPED", "FINISHED" ]
    }, {
      "name" : "emailIDs",
      "value" : [ ]
    }, {
      "name" : "constants",
      "value" : [ ]
    }, {
      "name" : "badRecordsHandling",
      "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1"
    }, {
      "name" : "errorRecordPolicy",
      "value" : "ORIGINAL_RECORD"
    }, {
      "name" : "workerCount",
      "value" : 0
    }, {
      "name" : "clusterSlaveMemory",
      "value" : 2048
    }, {
      "name" : "clusterSlaveJavaOpts",
      "value" : "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Dlog4j.debug"
    }, {
      "name" : "clusterLauncherEnv",
      "value" : [ ]
    }, {
      "name" : "mesosDispatcherURL",
      "value" : null
    }, {
      "name" : "hdfsS3ConfDir",
      "value" : null
    }, {
      "name" : "rateLimit",
      "value" : 0
    }, {
      "name" : "maxRunners",
      "value" : 0
    }, {
      "name" : "shouldCreateFailureSnapshot",
      "value" : true
    }, {
      "name" : "webhookConfigs",
      "value" : [ ]
    }, {
      "name" : "sparkConfigs",
      "value" : [ ]
    }, {
      "name" : "statsAggregatorStage",
      "value" : ""
    }, {
      "name" : "runnerIdleTIme",
      "value" : 60
    }, {
      "name" : "edgeHttpUrl",
      "value" : "http://localhost:18633"
    } ],
    "uiInfo" : {
      "previewConfig" : {
        "previewSource" : "CONFIGURED_SOURCE",
        "batchSize" : 10,
        "timeout" : 10000,
        "writeToDestinations" : false,
        "executeLifecycleEvents" : false,
        "showHeader" : false,
        "showFieldType" : true,
        "rememberMe" : false
      }
    },
    "fragments" : [ ],
    "stages" : [ {
      "instanceName" : "HTTPClient_01",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_origin_http_HttpClientDSource",
      "stageVersion" : "14",
      "configuration" : [ {
        "name" : "conf.basic.maxBatchSize",
        "value" : 1000
      }, {
        "name" : "conf.basic.maxWaitTime",
        "value" : 2000
      }, {
        "name" : "conf.dataFormatConfig.compression",
        "value" : "NONE"
      }, {
        "name" : "conf.dataFormatConfig.filePatternInArchive",
        "value" : "*"
      }, {
        "name" : "conf.dataFormatConfig.charset",
        "value" : "UTF-8"
      }, {
        "name" : "conf.dataFormatConfig.removeCtrlChars",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.textMaxLineLen",
        "value" : 1024
      }, {
        "name" : "conf.dataFormatConfig.useCustomDelimiter",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.customDelimiter",
        "value" : "\\r\\n"
      }, {
        "name" : "conf.dataFormatConfig.includeCustomDelimiterInTheText",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.jsonContent",
        "value" : "ARRAY_OBJECTS"
      }, {
        "name" : "conf.dataFormatConfig.jsonMaxObjectLen",
        "value" : 4096
      }, {
        "name" : "conf.dataFormatConfig.csvFileFormat",
        "value" : "CSV"
      }, {
        "name" : "conf.dataFormatConfig.csvHeader",
        "value" : "NO_HEADER"
      }, {
        "name" : "conf.dataFormatConfig.csvAllowExtraColumns",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.csvExtraColumnPrefix",
        "value" : "_extra_"
      }, {
        "name" : "conf.dataFormatConfig.csvMaxObjectLen",
        "value" : 1024
      }, {
        "name" : "conf.dataFormatConfig.csvCustomDelimiter",
        "value" : "|"
      }, {
        "name" : "conf.dataFormatConfig.csvCustomEscape",
        "value" : "\\"
      }, {
        "name" : "conf.dataFormatConfig.csvCustomQuote",
        "value" : "\""
      }, {
        "name" : "conf.dataFormatConfig.csvEnableComments",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.csvCommentMarker",
        "value" : "#"
      }, {
        "name" : "conf.dataFormatConfig.csvIgnoreEmptyLines",
        "value" : true
      }, {
        "name" : "conf.dataFormatConfig.csvRecordType",
        "value" : "LIST_MAP"
      }, {
        "name" : "conf.dataFormatConfig.csvSkipStartLines",
        "value" : 0
      }, {
        "name" : "conf.dataFormatConfig.parseNull",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.nullConstant",
        "value" : "\\\\N"
      }, {
        "name" : "conf.dataFormatConfig.xmlRecordElement",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.includeFieldXpathAttributes",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.xPathNamespaceContext",
        "value" : [ ]
      }, {
        "name" : "conf.dataFormatConfig.outputFieldAttributes",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.xmlMaxObjectLen",
        "value" : 4096
      }, {
        "name" : "conf.dataFormatConfig.logMode",
        "value" : "COMMON_LOG_FORMAT"
      }, {
        "name" : "conf.dataFormatConfig.logMaxObjectLen",
        "value" : 1024
      }, {
        "name" : "conf.dataFormatConfig.retainOriginalLine",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.customLogFormat",
        "value" : "%h %l %u %t \"%r\" %>s %b"
      }, {
        "name" : "conf.dataFormatConfig.regex",
        "value" : "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)"
      }, {
        "name" : "conf.dataFormatConfig.fieldPathsToGroupName",
        "value" : [ {
          "fieldPath" : "/",
          "group" : 1
        } ]
      }, {
        "name" : "conf.dataFormatConfig.grokPatternDefinition",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.grokPattern",
        "value" : "%{COMMONAPACHELOG}"
      }, {
        "name" : "conf.dataFormatConfig.onParseError",
        "value" : "ERROR"
      }, {
        "name" : "conf.dataFormatConfig.maxStackTraceLines",
        "value" : 50
      }, {
        "name" : "conf.dataFormatConfig.enableLog4jCustomLogFormat",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.log4jCustomLogFormat",
        "value" : "%r [%t] %-5p %c %x - %m%n"
      }, {
        "name" : "conf.dataFormatConfig.avroSchemaSource",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.avroSchema",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.schemaRegistryUrls",
        "value" : [ ]
      }, {
        "name" : "conf.dataFormatConfig.schemaLookupMode",
        "value" : "SUBJECT"
      }, {
        "name" : "conf.dataFormatConfig.subject",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.schemaId",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.protoDescriptorFile",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.messageType",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.isDelimited",
        "value" : true
      }, {
        "name" : "conf.dataFormatConfig.binaryMaxObjectLen",
        "value" : 1024
      }, {
        "name" : "conf.dataFormatConfig.datagramMode",
        "value" : "SYSLOG"
      }, {
        "name" : "conf.dataFormatConfig.typesDbPath",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.convertTime",
        "value" : false
      }, {
        "name" : "conf.dataFormatConfig.excludeInterval",
        "value" : true
      }, {
        "name" : "conf.dataFormatConfig.authFilePath",
        "value" : null
      }, {
        "name" : "conf.dataFormatConfig.netflowOutputValuesMode",
        "value" : "RAW_AND_INTERPRETED"
      }, {
        "name" : "conf.dataFormatConfig.maxTemplateCacheSize",
        "value" : -1
      }, {
        "name" : "conf.dataFormatConfig.templateCacheTimeoutMs",
        "value" : -1
      }, {
        "name" : "conf.dataFormatConfig.netflowOutputValuesModeDatagram",
        "value" : "RAW_AND_INTERPRETED"
      }, {
        "name" : "conf.dataFormatConfig.maxTemplateCacheSizeDatagram",
        "value" : -1
      }, {
        "name" : "conf.dataFormatConfig.templateCacheTimeoutMsDatagram",
        "value" : -1
      }, {
        "name" : "conf.dataFormatConfig.wholeFileMaxObjectLen",
        "value" : 8192
      }, {
        "name" : "conf.dataFormatConfig.rateLimit",
        "value" : "-1"
      }, {
        "name" : "conf.dataFormatConfig.verifyChecksum",
        "value" : false
      }, {
        "name" : "conf.resourceUrl",
        "value" : "https://api.myjson.com/bins/a9v84"
      }, {
        "name" : "conf.headers",
        "value" : [ ]
      }, {
        "name" : "conf.httpMethod",
        "value" : "GET"
      }, {
        "name" : "conf.timeZoneID",
        "value" : "UTC"
      }, {
        "name" : "conf.requestBody",
        "value" : null
      }, {
        "name" : "conf.defaultRequestContentType",
        "value" : "application/json"
      }, {
        "name" : "conf.client.transferEncoding",
        "value" : "CHUNKED"
      }, {
        "name" : "conf.client.httpCompression",
        "value" : "NONE"
      }, {
        "name" : "conf.client.connectTimeoutMillis",
        "value" : 0
      }, {
        "name" : "conf.client.readTimeoutMillis",
        "value" : 0
      }, {
        "name" : "conf.client.authType",
        "value" : "NONE"
      }, {
        "name" : "conf.client.useOAuth2",
        "value" : false
      }, {
        "name" : "conf.client.oauth.consumerKey",
        "value" : null
      }, {
        "name" : "conf.client.oauth.consumerSecret",
        "value" : null
      }, {
        "name" : "conf.client.oauth.token",
        "value" : null
      }, {
        "name" : "conf.client.oauth.tokenSecret",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.credentialsGrantType",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.tokenUrl",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.clientId",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.clientSecret",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.username",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.password",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.resourceOwnerClientId",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.resourceOwnerClientSecret",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.algorithm",
        "value" : "NONE"
      }, {
        "name" : "conf.client.oauth2.key",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.jwtClaims",
        "value" : null
      }, {
        "name" : "conf.client.oauth2.transferEncoding",
        "value" : "BUFFERED"
      }, {
        "name" : "conf.client.oauth2.additionalValues",
        "value" : [ { } ]
      }, {
        "name" : "conf.client.basicAuth.username",
        "value" : null
      }, {
        "name" : "conf.client.basicAuth.password",
        "value" : null
      }, {
        "name" : "conf.client.useProxy",
        "value" : false
      }, {
        "name" : "conf.client.proxy.uri",
        "value" : null
      }, {
        "name" : "conf.client.proxy.username",
        "value" : null
      }, {
        "name" : "conf.client.proxy.password",
        "value" : null
      }, {
        "name" : "conf.client.tlsConfig.tlsEnabled",
        "value" : false
      }, {
        "name" : "conf.client.tlsConfig.keyStoreFilePath",
        "value" : null
      }, {
        "name" : "conf.client.tlsConfig.keyStoreType",
        "value" : "JKS"
      }, {
        "name" : "conf.client.tlsConfig.keyStorePassword",
        "value" : null
      }, {
        "name" : "conf.client.tlsConfig.keyStoreAlgorithm",
        "value" : "SunX509"
      }, {
        "name" : "conf.client.tlsConfig.trustStoreFilePath",
        "value" : null
      }, {
        "name" : "conf.client.tlsConfig.trustStoreType",
        "value" : "JKS"
      }, {
        "name" : "conf.client.tlsConfig.trustStorePassword",
        "value" : null
      }, {
        "name" : "conf.client.tlsConfig.trustStoreAlgorithm",
        "value" : "SunX509"
      }, {
        "name" : "conf.client.tlsConfig.useDefaultProtocols",
        "value" : true
      }, {
        "name" : "conf.client.tlsConfig.protocols",
        "value" : [ ]
      }, {
        "name" : "conf.client.tlsConfig.useDefaultCiperSuites",
        "value" : true
      }, {
        "name" : "conf.client.tlsConfig.cipherSuites",
        "value" : [ ]
      }, {
        "name" : "conf.client.requestLoggingConfig.enableRequestLogging",
        "value" : false
      }, {
        "name" : "conf.client.requestLoggingConfig.logLevel",
        "value" : "FINE"
      }, {
        "name" : "conf.client.requestLoggingConfig.verbosity",
        "value" : "HEADERS_ONLY"
      }, {
        "name" : "conf.client.requestLoggingConfig.maxEntitySize",
        "value" : 0
      }, {
        "name" : "conf.httpMode",
        "value" : "BATCH"
      }, {
        "name" : "conf.pollingInterval",
        "value" : 5000
      }, {
        "name" : "conf.dataFormat",
        "value" : "JSON"
      }, {
        "name" : "conf.responseStatusActionConfigs",
        "value" : [ {
          "statusCode" : 500,
          "action" : "RETRY_EXPONENTIAL_BACKOFF",
          "backoffInterval" : 1000,
          "maxNumRetries" : 10
        } ]
      }, {
        "name" : "conf.responseTimeoutActionConfig.action",
        "value" : "RETRY_IMMEDIATELY"
      }, {
        "name" : "conf.responseTimeoutActionConfig.backoffInterval",
        "value" : 1000
      }, {
        "name" : "conf.responseTimeoutActionConfig.maxNumRetries",
        "value" : 10
      }, {
        "name" : "conf.pagination.mode",
        "value" : "NONE"
      }, {
        "name" : "conf.pagination.nextPageFieldPath",
        "value" : null
      }, {
        "name" : "conf.pagination.stopCondition",
        "value" : null
      }, {
        "name" : "conf.pagination.startAt",
        "value" : null
      }, {
        "name" : "conf.pagination.resultFieldPath",
        "value" : null
      }, {
        "name" : "conf.pagination.keepAllFields",
        "value" : false
      }, {
        "name" : "conf.pagination.rateLimit",
        "value" : 2000
      }, {
        "name" : "stageOnRecordError",
        "value" : "TO_ERROR"
      } ],
      "uiInfo" : {
        "description" : "",
        "label" : "HTTP Client 1",
        "xPos" : 43,
        "yPos" : 27,
        "stageType" : "SOURCE"
      },
      "inputLanes" : [ ],
      "outputLanes" : [ "HTTPClient_01OutputLane15377776587880" ],
      "eventLanes" : [ ],
      "services" : [ ]
    }, {
      "instanceName" : "FieldPivoter_01",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_processor_listpivot_ListPivotDProcessor",
      "stageVersion" : "2",
      "configuration" : [ {
        "name" : "listPath",
        "value" : "/Customer"
      }, {
        "name" : "copyFields",
        "value" : true
      }, {
        "name" : "newPath",
        "value" : ""
      }, {
        "name" : "saveOriginalFieldName",
        "value" : false
      }, {
        "name" : "originalFieldNamePath",
        "value" : null
      }, {
        "name" : "onStagePreConditionFailure",
        "value" : "TO_ERROR"
      }, {
        "name" : "stageOnRecordError",
        "value" : "TO_ERROR"
      }, {
        "name" : "stageRequiredFields",
        "value" : [ ]
      }, {
        "name" : "stageRecordPreconditions",
        "value" : [ ]
      } ],
      "uiInfo" : {
        "description" : "",
        "label" : "Field Pivoter 1",
        "xPos" : 217,
        "yPos" : 143,
        "stageType" : "PROCESSOR"
      },
      "inputLanes" : [ "HTTPClient_01OutputLane15377776587880" ],
      "outputLanes" : [ "FieldPivoter_01OutputLane15330463693150" ],
      "eventLanes" : [ ],
      "services" : [ ]
    }, {
      "instanceName" : "FieldFlattener_01",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_processor_fieldflattener_FieldFlattenerDProcessor",
      "stageVersion" : "1",
      "configuration" : [ {
        "name" : "config.flattenType",
        "value" : "ENTIRE_RECORD"
      }, {
        "name" : "config.fields",
        "value" : [ ]
      }, {
        "name" : "config.flattenInPlace",
        "value" : true
      }, {
        "name" : "config.flattenTargetField",
        "value" : null
      }, {
        "name" : "config.collisionFieldAction",
        "value" : "TO_ERROR"
      }, {
        "name" : "config.removeFlattenedField",
        "value" : true
      }, {
        "name" : "config.nameSeparator",
        "value" : "."
      }, {
        "name" : "stageOnRecordError",
        "value" : "TO_ERROR"
      }, {
        "name" : "stageRequiredFields",
        "value" : [ ]
      }, {
        "name" : "stageRecordPreconditions",
        "value" : [ ]
      } ],
      "uiInfo" : {
        "description" : "",
        "label" : "Field Flattener 1",
        "xPos" : 389,
        "yPos" : 10,
        "stageType" : "PROCESSOR"
      },
      "inputLanes" : [ "FieldPivoter_01OutputLane15330463693150" ],
      "outputLanes" : [ "FieldFlattener_01OutputLane15330462886790" ],
      "eventLanes" : [ ],
      "services" : [ ]
    }, {
      "instanceName" : "FieldRenamer_01",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_processor_fieldrenamer_FieldRenamerDProcessor",
      "stageVersion" : "2",
      "configuration" : [ {
        "name" : "renameMapping",
        "value" : [ {
          "fromFieldExpression" : "/'Customer.Amount'",
          "toFieldExpression" : "/Amount"
        }, {
          "fromFieldExpression" : "/'Customer.CustomerId'",
          "toFieldExpression" : "/CustomerId"
        }, {
          "fromFieldExpression" : "/'Customer.name'",
          "toFieldExpression" : "/Customer"
        } ]
      }, {
        "name" : "errorHandler.nonExistingFromFieldHandling",
        "value" : "TO_ERROR"
      }, {
        "name" : "errorHandler.existingToFieldHandling",
        "value" : "TO_ERROR"
      }, {
        "name" : "errorHandler.multipleFromFieldsMatching",
        "value" : "TO_ERROR"
      }, {
        "name" : "stageOnRecordError",
        "value" : "TO_ERROR"
      }, {
        "name" : "stageRequiredFields",
        "value" : [ ]
      }, {
        "name" : "stageRecordPreconditions",
        "value" : [ ]
      } ],
      "uiInfo" : {
        "description" : "",
        "label" : "Field Renamer 1",
        "xPos" : 570,
        "yPos" : 141,
        "stageType" : "PROCESSOR"
      },
      "inputLanes" : [ "FieldFlattener_01OutputLane15330462886790" ],
      "outputLanes" : [ "FieldRenamer_01OutputLane15330479399830" ],
      "eventLanes" : [ ],
      "services" : [ ]
    }, {
      "instanceName" : "ExpressionEvaluator_01",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_processor_expression_ExpressionDProcessor",
      "stageVersion" : "2",
      "configuration" : [ {
        "name" : "expressionProcessorConfigs",
        "value" : [ {
          "fieldToSet" : "/UUID",
          "expression" : "${record:value('/Id')}_${record:value('/CustomerId')}"
        } ]
      }, {
        "name" : "headerAttributeConfigs",
        "value" : [ { } ]
      }, {
        "name" : "fieldAttributeConfigs",
        "value" : [ {
          "fieldToSet" : "/"
        } ]
      }, {
        "name" : "stageOnRecordError",
        "value" : "TO_ERROR"
      }, {
        "name" : "stageRequiredFields",
        "value" : [ ]
      }, {
        "name" : "stageRecordPreconditions",
        "value" : [ ]
      } ],
      "uiInfo" : {
        "description" : "",
        "label" : "Expression Evaluator 1",
        "xPos" : 743,
        "yPos" : 12,
        "stageType" : "PROCESSOR"
      },
      "inputLanes" : [ "FieldRenamer_01OutputLane15330479399830" ],
      "outputLanes" : [ "ExpressionEvaluator_01OutputLane15330480416520" ],
      "eventLanes" : [ ],
      "services" : [ ]
    }, {
      "instanceName" : "CubeInsert_01",
      "library" : "streamsets-com-trmsys-bi-lib",
      "stageName" : "com_trmsys_bi_streamsets_cube_destination_CubeDestination",
      "stageVersion" : "2",
      "configuration" : [ {
        "name" : "cubeDestinationConfig.cubeName",
        "value" : "Hierarchies Data"
      }, {
        "name" : "cubeDestinationConfig.deleteCubeBeforeInserting",
        "value" : true
      }, {
        "name" : "cubeDestinationConfig.createReport",
        "value" : "IF_DOES_NOT_EXIST"
      }, {
        "name" : "cubeDestinationConfig.generateMissingColumns",
        "value" : true
      }, {
        "name" : "cubeDestinationConfig.primaryKeys",
        "value" : [ {
          "columnType" : "AUTOMATIC",
          "enumValues" : [ ],
          "defaultValue" : "NULL_VALUE",
          "fieldName" : "/UUID"
        } ]
      }, {
        "name" : "cubeDestinationConfig.dimensions",
        "value" : [ ]
      }, {
        "name" : "cubeDestinationConfig.measures",
        "value" : [ ]
      }, {
        "name" : "cubeDestinationConfig.useDefaultServer",
        "value" : true
      }, {
        "name" : "cubeDestinationConfig.serverUrl",
        "value" : null
      }, {
        "name" : "cubeDestinationConfig.userName",
        "value" : null
      }, {
        "name" : "cubeDestinationConfig.password",
        "value" : null
      }, {
        "name" : "stageOnRecordError",
        "value" : "TO_ERROR"
      }, {
        "name" : "stageRequiredFields",
        "value" : [ ]
      }, {
        "name" : "stageRecordPreconditions",
        "value" : [ ]
      } ],
      "uiInfo" : {
        "description" : "",
        "label" : "Cube Insert 1",
        "xPos" : 916,
        "yPos" : 144,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ "ExpressionEvaluator_01OutputLane15330480416520" ],
      "outputLanes" : [ ],
      "eventLanes" : [ ],
      "services" : [ ]
    } ],
    "errorStage" : {
      "instanceName" : "Discard_ErrorStage",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget",
      "stageVersion" : "1",
      "configuration" : [ ],
      "uiInfo" : {
        "description" : "",
        "label" : "Error Records - Discard",
        "xPos" : 583,
        "yPos" : 50,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ ],
      "outputLanes" : [ ],
      "eventLanes" : [ ],
      "services" : [ ]
    },
    "info" : {
      "pipelineId" : "Hierarchies690fe02e-dc6a-49a4-b553-c0dcd510f2ea",
      "title" : "Hierarchies",
      "description" : "",
      "created" : 1537795289382,
      "lastModified" : 1539682063195,
      "creator" : "Mihai.Terente.finastra@creator.fusionfabric.cloud",
      "lastModifier" : "Mihai.Terente.finastra@creator.fusionfabric.cloud",
      "lastRev" : "0",
      "uuid" : "300b8e6b-2a18-4856-8825-25e6f8290d73",
      "valid" : true,
      "metadata" : {
        "labels" : [ ]
      },
      "name" : "Hierarchies690fe02e-dc6a-49a4-b553-c0dcd510f2ea",
      "sdcVersion" : "3.2.0.0",
      "sdcId" : "8a9bf0c7-ad04-11e8-b587-1b764aaaea38"
    },
    "metadata" : {
      "labels" : [ ]
    },
    "statsAggregatorStage" : null,
    "startEventStages" : [ {
      "instanceName" : "Discard_StartEventStage",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget",
      "stageVersion" : "1",
      "configuration" : [ ],
      "uiInfo" : {
        "description" : "",
        "label" : "Start Event - Discard",
        "xPos" : 280,
        "yPos" : 50,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ ],
      "outputLanes" : [ ],
      "eventLanes" : [ ],
      "services" : [ ]
    } ],
    "stopEventStages" : [ {
      "instanceName" : "Discard_StopEventStage",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget",
      "stageVersion" : "1",
      "configuration" : [ ],
      "uiInfo" : {
        "description" : "",
        "label" : "Stop Event - Discard",
        "xPos" : 280,
        "yPos" : 50,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ ],
      "outputLanes" : [ ],
      "eventLanes" : [ ],
      "services" : [ ]
    } ],
    "valid" : true,
    "issues" : {
      "pipelineIssues" : [ ],
      "stageIssues" : { },
      "issueCount" : 0
    },
    "previewable" : true
  },
  "pipelineRules" : {
    "schemaVersion" : 3,
    "version" : 2,
    "metricsRuleDefinitions" : [ {
      "id" : "badRecordsAlertID",
      "alertText" : "High incidence of Error Records",
      "metricId" : "pipeline.batchErrorRecords.counter",
      "metricType" : "COUNTER",
      "metricElement" : "COUNTER_COUNT",
      "condition" : "${value() > 100}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1533044082147,
      "valid" : true
    }, {
      "id" : "stageErrorAlertID",
      "alertText" : "High incidence of Stage Errors",
      "metricId" : "pipeline.batchErrorMessages.counter",
      "metricType" : "COUNTER",
      "metricElement" : "COUNTER_COUNT",
      "condition" : "${value() > 100}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1533044082147,
      "valid" : true
    }, {
      "id" : "idleGaugeID",
      "alertText" : "Pipeline is Idle",
      "metricId" : "RuntimeStatsGauge.gauge",
      "metricType" : "GAUGE",
      "metricElement" : "TIME_OF_LAST_RECEIVED_RECORD",
      "condition" : "${time:now() - value() > 120000}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1533044082147,
      "valid" : true
    }, {
      "id" : "batchTimeAlertID",
      "alertText" : "Batch taking more time to process",
      "metricId" : "RuntimeStatsGauge.gauge",
      "metricType" : "GAUGE",
      "metricElement" : "CURRENT_BATCH_AGE",
      "condition" : "${value() > 200}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1533044082147,
      "valid" : true
    }, {
      "id" : "memoryLimitAlertID",
      "alertText" : "Memory limit for pipeline exceeded",
      "metricId" : "pipeline.memoryConsumed.counter",
      "metricType" : "COUNTER",
      "metricElement" : "COUNTER_COUNT",
      "condition" : "${value() > (jvm:maxMemoryMB() * 0.65)}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1533044082147,
      "valid" : true
    } ],
    "dataRuleDefinitions" : [ ],
    "driftRuleDefinitions" : [ ],
    "uuid" : "41812cd1-2363-43d0-8308-5a363738ff62",
    "configuration" : [ {
      "name" : "emailIDs",
      "value" : [ ]
    }, {
      "name" : "webhookConfigs",
      "value" : [ ]
    } ],
    "ruleIssues" : [ ],
    "configIssues" : [ ]
  },
  "libraryDefinitions" : null
}
  2. (Optional) Create a new workspace.
  3. From the Tools hub, click Data source editor, and then open the DATA MANIPULATION tool.
  4. From the Pipelines Menu, click the drop-down button and select Import Pipeline.
Fig. 129: Import the pipeline definition.

  5. Enter the title of the pipeline: Hierarchies.
  6. Click Browse, and then locate and select the Hierarchies.json file that you created at step 1.
  7. Click Import. The sample pipeline is added to your list in the Data Manipulation tool. It also opens in the editor.
Fig. 130: Overview of a pipeline to process a hierarchical dataset.
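Before opening the editor, it can help to check which stages the imported definition contains. The following Python snippet is a minimal sketch of that check, not part of the Data Manipulation tool. It reads only the pipelineConfig, stages, instanceName, and uiInfo.stageType keys that appear in the definition above. The list_stages helper and the trimmed excerpt are illustrative assumptions.

```python
import json


def list_stages(pipeline: dict) -> list[tuple[str, str]]:
    """Return (instanceName, stageType) for each stage, in file order."""
    stages = pipeline["pipelineConfig"]["stages"]
    return [(stage["instanceName"], stage["uiInfo"]["stageType"]) for stage in stages]


# Trimmed stand-in for Hierarchies.json: only the keys read above are kept,
# and only three of the six stages are listed.
excerpt = json.loads("""
{"pipelineConfig": {"stages": [
  {"instanceName": "HTTPClient_01", "uiInfo": {"stageType": "SOURCE"}},
  {"instanceName": "FieldPivoter_01", "uiInfo": {"stageType": "PROCESSOR"}},
  {"instanceName": "CubeInsert_01", "uiInfo": {"stageType": "TARGET"}}
]}}
""")

for name, stage_type in list_stages(excerpt):
    print(f"{stage_type:<10}{name}")
```

To run the same check against the full file, load it with json.load(open("Hierarchies.json")) and pass the result to list_stages.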

Hierarchies Pipeline Stages

Now that you have imported the pipeline, you can explore its stages. Run the pipeline in preview mode to see how the data is transformed at each stage.

  • HTTP Client

The first stage is the HTTP Client 1 origin, configured to get the sample data from the following URL: https://api.myjson.com/bins/a9v84. The data is downloaded as a nested JSON structure that models the hierarchies in the data.

Fig. 131: The data contains hierarchies in a nested JSON structure.
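The origin is configured with the JSON data format and the ARRAY_OBJECTS content type. The sketch below illustrates what that amounts to, under the assumption that each top-level array element becomes one record. It parses a shortened inline copy of the sample data rather than calling the URL, so it makes no claim about what the endpoint returns today.

```python
import json

# Shortened inline copy of the sample data; no HTTP request is made.
payload = """[
  {"Id": "Aa001", "Product": "General Corp Purpose", "Customer": [
    {"CustomerId": "1", "name": "Protea Hospitality Group (Pty) Limited", "Amount": 759215},
    {"CustomerId": "2", "name": "National Airways Corporation (Pty) Limited", "Amount": 176236}]},
  {"Id": "Aa002", "Product": "RLC", "Customer": [
    {"CustomerId": "4", "name": "Riverside Resource Recovery Limited", "Amount": 120994}]}
]"""

# Assumption: one record per element of the top-level array.
records = json.loads(payload)

for record in records:
    # Each record still carries its customers as a nested list of maps.
    print(record["Id"], record["Product"], len(record["Customer"]), "customers")
```

At this point there is one record per product, and the customer level is still nested inside it.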

  • Field Pivoter

To explode the hierarchies, a Field Pivoter is used: Field Pivoter 1 explodes the /Customer hierarchy.

The stage is configured in the Field Pivot tab of the stage Configuration section. The data is now exploded to the lowest hierarchical level: the customer.

Fig. 132: The Field Pivoter processor is used to explode the nested JSON structure to the lowest level.
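The effect of pivoting on /Customer can be sketched in plain Python. This is an illustration of the idea, not the processor's implementation. It assumes that, with copyFields enabled and an empty newPath as in the definition above, each list item replaces the list and the remaining fields are copied into every output record. The pivot_list name is hypothetical.

```python
def pivot_list(record: dict, list_field: str) -> list[dict]:
    """Emit one record per list item; the item replaces the list, other fields are copied."""
    rows = []
    for item in record[list_field]:
        row = {key: value for key, value in record.items() if key != list_field}
        row[list_field] = item
        rows.append(row)
    return rows


product = {
    "Id": "Aa002",
    "Product": "RLC",
    "Customer": [
        {"CustomerId": "4", "name": "Riverside Resource Recovery Limited", "Amount": 120994},
        {"CustomerId": "5", "name": "Millmerran Finance Company Pty Ltd", "Amount": 260978},
    ],
}

rows = pivot_list(product, "Customer")
for row in rows:
    print(row)
```

One product record with two customers becomes two records, each holding a single customer map under the Customer field.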

  • Field Flattener

Next, the data is processed in the Field Flattener 1 stage to remove aggregate fields such as maps or lists. The stage is configured to flatten the entire record and to use the . character as the separator when it builds the flattened field names. Each customer record now contains only attributes stored as simple data types: strings, numbers, booleans, or nulls.

Fig. 133: The Field Flattener processor is used to remove the aggregated fields from the JSON structure.
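As a rough illustration of flattening with the . separator, the sketch below walks a pivoted record and joins nested keys with a dot. It handles nested maps only, which is all that remains after the pivot. The flatten function is a hypothetical helper, not the processor's code.

```python
def flatten(value: dict, prefix: str = "", sep: str = ".") -> dict:
    """Flatten nested maps into one level, joining key names with sep."""
    flat = {}
    for key, val in value.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(val, dict):
            flat.update(flatten(val, name, sep))
        else:
            flat[name] = val
    return flat


# One record as it looks after the pivot: Customer is a single map.
pivoted = {
    "Id": "Aa001",
    "Product": "General Corp Purpose",
    "Customer": {
        "CustomerId": "1",
        "name": "Protea Hospitality Group (Pty) Limited",
        "Amount": 759215,
    },
}

flat_row = flatten(pivoted)
print(sorted(flat_row))
```

The resulting field names, such as Customer.Amount, are the ones that the renaming stage takes as its source fields.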

  • Field Renamer

The Field Renamer 1 stage gives the fields more meaningful names, which are set as pairs of Source Field Expression and Target Field Expression in the Rename section of the stage configuration.

Fig. 134: The fields are given meaningful names in the Field Renamer 1 processor.
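The three rename pairs from the pipeline definition can be mirrored in a small Python sketch. The mapping values are taken from the renameMapping entry above. The rename_fields helper and its error behavior are assumptions that loosely imitate the TO_ERROR settings by raising an exception.

```python
# Source field -> target field, as listed in the renameMapping configuration.
RENAMES = {
    "Customer.Amount": "Amount",
    "Customer.CustomerId": "CustomerId",
    "Customer.name": "Customer",
}


def rename_fields(row: dict, renames: dict) -> dict:
    """Return a copy of row with the given fields renamed."""
    missing = [source for source in renames if source not in row]
    if missing:
        raise KeyError(f"source fields not found: {missing}")
    renamed = {}
    for key, value in row.items():
        target = renames.get(key, key)
        if target in renamed:
            raise ValueError(f"target field already exists: {target}")
        renamed[target] = value
    return renamed


flat_row = {
    "Id": "Aa001",
    "Product": "General Corp Purpose",
    "Customer.CustomerId": "1",
    "Customer.name": "Protea Hospitality Group (Pty) Limited",
    "Customer.Amount": 759215,
}

renamed_row = rename_fields(flat_row, RENAMES)
print(renamed_row)
```

After this step, Customer holds the customer name as a plain string instead of a nested map.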

  • Expression Evaluator

A last processor stage is required to generate a unique ID for each record. The Field Expression used is ${record:value('/Id')}_${record:value('/CustomerId')}, and the result is stored in the UUID field. The data is now ready to be stored in a cube.

Fig. 135: A unique ID is added to the records in the Expression Evaluator 1 stage.
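The expression joins the product Id and the CustomerId with an underscore. A Python equivalent, given only as a sketch with the hypothetical add_uuid helper, looks like this:

```python
def add_uuid(row: dict) -> dict:
    """Return a copy of row with a UUID field built as <Id>_<CustomerId>."""
    return {**row, "UUID": f"{row['Id']}_{row['CustomerId']}"}


row = {
    "Id": "Aa001",
    "Product": "General Corp Purpose",
    "CustomerId": "1",
    "Customer": "Protea Hospitality Group (Pty) Limited",
    "Amount": 759215,
}

print(add_uuid(row)["UUID"])  # prints Aa001_1
```

Combining both identifiers presumably keeps the key unique even if the same customer were to appear under more than one product, although the sample data contains no such case.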

  • Cube Insert

The final stage stores the data in the cube. The Cube Insert 1 stage is configured to store the data in the Hierarchies Data cube, with the UUID field as the primary key. It is also configured to create the corresponding report if it does not already exist.
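Because the pipeline definition sets /UUID as the primary key of the cube, it can be useful to confirm that the processed rows carry no repeated keys before they are inserted. The sketch below shows one way to do that outside the tool. The duplicate_keys helper is hypothetical, and the rows are a shortened version of the processed sample data.

```python
from collections import Counter


def duplicate_keys(rows: list[dict], key: str = "UUID") -> list[str]:
    """Return the key values that occur more than once, sorted."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, count in counts.items() if count > 1)


rows = [
    {"UUID": "Aa001_1", "Product": "General Corp Purpose", "Amount": 759215},
    {"UUID": "Aa001_2", "Product": "General Corp Purpose", "Amount": 176236},
    {"UUID": "Aa002_4", "Product": "RLC", "Amount": 120994},
]

print(duplicate_keys(rows))  # prints []
```

An empty list means every row has a distinct key. How the cube itself treats repeated keys is not covered by this tutorial, so the check is only a precaution.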