
Collect And Process Data Using DataWorks

1. Experiment

1.1 Knowledge points

This experiment uses Alibaba Cloud’s DataWorks, an important PaaS platform product of Alibaba Cloud DTplus. It provides comprehensive product services, such as data integration, development, management, governance, and sharing. DataWorks provides an all-in-one development and management interface to help enterprises focus on mining and exploring the value of their data. This experiment demonstrates the process of synchronizing data from OSS and RDS data sources to DataWorks, as well as the process for processing the data.

1.2 Experiment process

  • Create a DataWorks project
  • Data collection: Upload log data
  • Data processing: Conduct user profiling

1.3 Cloud resources required

  • DataWorks

1.4 Prerequisites

  • Before starting the experiment, confirm that the previous experiment has been properly closed and exited.

2. Start the experiment environment

Click Start Lab in the upper right corner of the page to start the experiment.

image desc

After the experiment environment is successfully started, the system has deployed resources required by this experiment in the background, including the ECS instance, RDS instance, Server Load Balancer instance, and OSS bucket. An account consisting of the username and password for logging on to the Web console of Alibaba Cloud is also provided.

image desc

After the experiment environment is started and related resources are properly deployed, the experiment starts a countdown. You have two hours to perform experimental operations. After the countdown ends, the experiment stops, and related resources are released. During the experiment, pay attention to the remaining time and arrange your time wisely. Next, use the username and password provided by the system to log on to the Web console of Alibaba Cloud and view related resources:


Go to the logon page of Alibaba Cloud console.

image desc

Fill in the sub-user account and click Next.

image desc

Fill in the sub-user password and click Log on.

image desc

After you successfully log on to the console, the following page is displayed.

image desc

3. Create a DataWorks project

In the Alibaba Cloud console, choose DataWorks.

image desc

On the Workspace List page, choose US West 1 and click Create Workspace.

image desc

Set the project name, select Standard Mode for Mode, and click Next.

image desc

Select the MaxCompute engine and click Next.

image desc

Set the Instance Display Name, and click Create Workspace.

image desc

The creation is complete.

image desc

After a while, the status is displayed as Normal, indicating that the workspace has been created successfully.

image desc

4. Data collection: Upload log data

4.1 Create a data source

Return to the DataWorks console and click Data Integration.

image desc

Choose Data Sources, as shown in the following figure.

image desc

image desc

After configuring the settings as shown in the following figure, click Test Connectivity. After the connectivity test succeeds, click Complete to save the data source.

Data Source Name: oss_workshop_log
bucket: new-dataworks-workshop
Endpoint: http://oss-cn-shanghai-internal.aliyuncs.com
AK ID: LTAI4FvGT3iU4xjKotpUMAjS
AK Secret: 9RSUoRmNxpRC9EhC4m9PjuG7Jzy7px

image desc

Click Advanced.

image desc

Click Confirm.

image desc

Click Test Connectivity.

image desc

After the OSS data source is created, choose the MySQL data source, as shown in the following figure.

image desc

After configuring the settings as shown in the following figure, click Test Connectivity. After the connectivity test succeeds, click Complete to save the data source.

Data Source Name: rds_workshop_log
RDS Instance ID: rm-2ev0681lc7042g16u
RDS Instance Account: 5600815724958382
Database Name: workshop
Username: workshop
Password: workshop#2017

image desc

image desc

The data source has now been created.

4.2 Create a Workflow

Click Data Analytics, as shown in the following figure.

image desc

Create a Workflow, as shown in the following figure.

image desc

As shown in the following figure, set the business flow name and click Create.

image desc

Next, go to the Workflow development panel and drag a Zero-Load node (workshop_start) and two data synchronization nodes (rds_rsync_data and oss_rsync_data) onto the panel.

image desc

The following figure shows the Zero-Load node name settings.

image desc

The settings of both data synchronization nodes are shown in the following figures.

image desc

image desc

Drag and drop lines to set the workshop_start node as the upstream node of the other two nodes, as shown in the following figure.

image desc

4.3 Configure the workshop_start node

The new version of DataWorks requires an input and an output to be set for each node, so you must set an input for the workshop_start node. For a virtual node at the start of a business flow, the project root node is generally set as its upstream node. The project root node is usually named “project name_root”.

Double-click the workshop_start node and modify it according to the following figure. Then, click Save in the upper-left corner.

image desc

4.4 Create tables

Next, create two tables (ods_raw_log_d and ods_user_info_d).

First, create the ods_raw_log_d table. To do this, click Create Table, as shown in the following figure.

image desc

image desc

Choose DDL Statement, copy the following content into the dialog box, and then click Generate Table Structure.

CREATE TABLE IF NOT EXISTS ods_raw_log_d (
  col STRING
)
PARTITIONED BY (
  dt STRING
);

image desc

image desc

Set the table name as the display name and add it to the development and production environments, as shown in the following figure.

image desc

Create the ods_user_info_d table in the same way by running the following creation statement.

CREATE TABLE IF NOT EXISTS ods_user_info_d (
  uid STRING COMMENT 'User ID',
  gender STRING COMMENT 'Gender',
  age_range STRING COMMENT 'Age range',
  zodiac STRING COMMENT 'Zodiac sign'
)
PARTITIONED BY (
  dt STRING
);
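
Optionally, once you have created a temporary query (as described in section 4.7), you can confirm that both tables exist by describing them. This is only a sanity check; DESC is a standard MaxCompute command:

DESC ods_raw_log_d;
DESC ods_user_info_d;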

4.5 Configure a data synchronization task

Double-click the OSS data synchronization node to open the node configuration page.

image desc

Select a data source, as shown in the following figure.

image desc

Select the data destination, as shown in the following figure.

image desc

Configure the field mapping and delete the other columns.

image desc

image desc

image desc

Configure channel control with a maximum operating rate of 10 MB/s.

image desc

Click Resource Group configuration.

image desc

Click Save in the upper-left corner.

image desc

Next, configure the RDS data synchronization node. To do this, double-click the node to open the node configuration page.

image desc

Select the data source and destination, as shown in the following figure.

image desc

Configure field mapping.

image desc

Configure channel control with a maximum operating rate of 10 MB/s.

image desc

Refer to the figure below to set up a common resource group.

image desc

Click Save in the upper-left corner.

image desc

4.6 Submit a workflow task

Click the submit button in the upper-left corner.

image desc

Click Commit, as shown in the following figure.

image desc

The nodes have been submitted.

image desc

Click the run button in the upper-left corner.

image desc

You can view the running log during operation.

image desc

4.7 Confirm that the data is imported successfully

Create a temporary query, as shown in the following figure.

image desc

Click Commit.

image desc

Run the following SQL statements, replacing DATETIME with your own business date. For example, if the task runs on 20180717, the business date is 20180716. If you are in China, the business date may be up to two days earlier than the current date because of the time zone difference.

select count(*) from ods_raw_log_d where dt=DATETIME;
select count(*) from ods_user_info_d where dt=DATETIME;
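
For reference, a filled-in version might look like the following, assuming a business date of 20180716 (substitute your own date):

-- assuming the business date is 20180716
select count(*) from ods_raw_log_d where dt=20180716;
select count(*) from ods_user_info_d where dt=20180716;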

Click Save, and then run the statement.

image desc

image desc

The running results are as follows:

image desc

image desc

Enter and save the following command, and then run it to view the content of the ods_raw_log_d table. Replace DATETIME with your own business date. For example, if the task runs on 20180717, the business date is 20180716.

SELECT SPLIT(col, '##@@')[0] AS ip
, SPLIT(col, '##@@')[1] AS uid
, SPLIT(col, '##@@')[2] AS time
, SPLIT(col, '##@@')[3] AS request
, SPLIT(col, '##@@')[4] AS status
, SPLIT(col, '##@@')[5] AS bytes
, SPLIT(col, '##@@')[6] AS referer
, SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d
WHERE dt = DATETIME;

image desc

5. Data processing: Conduct user profiling

Next, process the log data that has been collected in MaxCompute and conduct user profiling.

5.1 Create correlation tables

Create the ods_log_info_d table.

image desc

CREATE TABLE IF NOT EXISTS ods_log_info_d (
  ip STRING COMMENT 'ip',
  uid STRING COMMENT 'user ID',
  time STRING COMMENT 'yyyymmddhh:mi:ss',
  status STRING COMMENT 'server response status code',
  bytes STRING COMMENT 'Number of bytes returned to the client',
  region STRING COMMENT 'region',
  method STRING COMMENT 'HTTP request type',
  url STRING COMMENT 'url',
  protocol STRING COMMENT 'HTTP protocol version number',
  referer STRING COMMENT 'Source url',
  device STRING COMMENT 'Terminal type',
  identity STRING COMMENT 'Access type crawler feed user unknown'
)
PARTITIONED BY (
  dt STRING
);

image desc

image desc

After setting the table name, submit it to the development environment and production environment in that order.

image desc

Create the table dw_user_info_all_d in the same way.

The following is an example table creation statement:

CREATE TABLE IF NOT EXISTS dw_user_info_all_d (
  uid STRING COMMENT 'user ID',
  gender STRING COMMENT 'sex',
  age_range STRING COMMENT 'age range',
  zodiac STRING COMMENT 'constellation',
  region STRING COMMENT 'region',
  device STRING COMMENT 'terminal type',
  identity STRING COMMENT 'Access type crawler feed user unknown',
  method STRING COMMENT 'http request type',
  url STRING COMMENT 'url',
  referer STRING COMMENT 'source url',
  time STRING COMMENT 'yyyymmddhh:mi:ss'
)
PARTITIONED BY (
  dt STRING
);

Create the table rpt_user_info_d in the same way.

The following is an example table creation statement:

CREATE TABLE IF NOT EXISTS rpt_user_info_d (
  uid STRING COMMENT 'user ID',
  region STRING COMMENT 'region',
  device STRING COMMENT 'terminal type',
  pv BIGINT COMMENT 'pv',
  gender STRING COMMENT 'sex',
  age_range STRING COMMENT 'age range',
  zodiac STRING COMMENT 'constellation'
)
PARTITIONED BY (
  dt STRING
);

After the tables are created, drag three ODPS SQL nodes onto the canvas, as shown in the following figure. These nodes are named ods_log_info_d, dw_user_info_all_d, and rpt_user_info_d. Configure the dependencies as follows.

image desc

5.2 Create a user-defined function

Go to the following link in your browser to download an ip2region.jar file.

http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/85298/cn_zh/1532163718650/ip2region.jar

Create a resource, as shown in the following figure.

image desc

Select the downloaded file to upload it and click OK.

image desc

Once uploaded, click the submit button in the upper-left corner.

image desc

image desc

Create a user-defined function (UDF), as shown in the following figure.

image desc

Name it getregion.

image desc

Configure the function in the Register Function dialog box by specifying the class name, description, command format, and parameter description. After the configuration is complete, click Save and Submit in the upper-left corner.

Function Name: getregion
Class Name: org.alidata.odps.udf.Ip2Region
Resources: ip2region.jar
Description: IP address translation region
Expression Syntax: getregion('ip')

image desc

image desc
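
Once the function is submitted, you can optionally verify it in a temporary query. The following is a minimal sketch; the IP address is only an illustrative value, and if your project does not support SELECT without a FROM clause, test the function inside the ods_log_info_d query in the next section instead.

-- quick check of the registered UDF; 1.2.3.4 is an arbitrary sample IP
SELECT getregion('1.2.3.4');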

5.3 Configure an ODPS SQL node

Double-click the ods_log_info_d node to open the node configuration page and compose the processing logic.

image desc

INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt=${bdp.system.bizdate})
SELECT ip
  , uid
  , time
  , status
  , bytes
  --Use the custom UDF getregion to derive the region from the IP address
  , getregion(ip) AS region
  --Split the request field into method, url, and protocol with regular expressions
  , regexp_substr(request, '(^[^ ]+ )') AS method
  , regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
  , regexp_substr(request, '([^ ]+$)') AS protocol
  --Derive a cleaner referer url with a regular expression
  , regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer
  --Derive the device type and access form from the agent field
  , CASE
    WHEN TOLOWER(agent) RLIKE 'android' THEN 'android'
    WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone'
    WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad'
    WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh'
    WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone'
    WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc'
    ELSE 'unknown'
  END AS device
  , CASE
    WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN TOLOWER(agent) RLIKE 'feed'
    OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
    WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
    AND agent RLIKE '^(Mozilla|Opera)'
    AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
    ELSE 'unknown'
  END AS identity
  FROM (
    SELECT SPLIT(col, '##@@')[0] AS ip
    , SPLIT(col, '##@@')[1] AS uid
    , SPLIT(col, '##@@')[2] AS time
    , SPLIT(col, '##@@')[3] AS request
    , SPLIT(col, '##@@')[4] AS status
    , SPLIT(col, '##@@')[5] AS bytes
    , SPLIT(col, '##@@')[6] AS referer
    , SPLIT(col, '##@@')[7] AS agent
  FROM ods_raw_log_d
  WHERE dt = ${bdp.system.bizdate}
) a;

Copy the preceding content to the node and click Save in the upper-left corner.

image desc

Double-click the dw_user_info_all_d node.

INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt='${bdp.system.bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
  , b.gender
  , b.age_range
  , b.zodiac
  , a.region
  , a.device
  , a.identity
  , a.method
  , a.url
  , a.referer
  , a.time
FROM (
  SELECT *
  FROM ods_log_info_d
  WHERE dt = ${bdp.system.bizdate}
) a
LEFT OUTER JOIN (
  SELECT *
  FROM ods_user_info_d
  WHERE dt = ${bdp.system.bizdate}
) b
ON a.uid = b.uid;

Copy the preceding content to the node and click Save in the upper-left corner.

image desc

Double-click the rpt_user_info_d node.

INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt='${bdp.system.bizdate}')
SELECT uid
  , MAX(region)
  , MAX(device)
  , COUNT(0) AS pv
  , MAX(gender)
  , MAX(age_range)
  , MAX(zodiac)
FROM dw_user_info_all_d
WHERE dt = ${bdp.system.bizdate}
GROUP BY uid;

Copy the preceding content to the node and click Save in the upper-left corner.

image desc

5.4 Submit the business flow

Click Commit to submit the node tasks that have been configured in the business flow, as shown in the following figure.

image desc

The nodes have been submitted.

image desc

5.5 Run the business flow

See the following figure:

image desc

It may take five or six minutes to run the business flow.

Then, create a temporary query to test it.

image desc

image desc

Enter the following command, save it, and then run it. Replace DATETIME with your own business date. For example, if the task runs on 20180717, the business date is 20180716.

select * from rpt_user_info_d where dt=DATETIME limit 100;
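
For example, assuming a business date of 20180716, the query would be:

-- assuming the business date is 20180716
select * from rpt_user_info_d where dt=20180716 limit 100;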

image desc

image desc

<font color='red'>You can take a screenshot of the above result during the experiment and send it to the teacher to show that this part of the experiment has been completed.</font>

5.6 Publish the business flow

Submitting the business flow means that the tasks have entered the development environment. However, tasks in the development environment are not scheduled automatically, so you must publish the configured tasks to the production environment.

image desc

Choose the task to be published and add all nodes to the Nodes to Publish list, as shown in the following figure.

image desc

Open the Nodes to Publish list.

image desc

Click Publish All.

image desc

image desc

View the published items on the Release Package list page. Click Operation Center in the upper-right corner.

image desc

As shown in the following figure, select the workshop_start node and its downstream nodes for data population.

image desc

Select the task for data population and click OK. The data population task instance page appears automatically.

image desc

image desc

After a while, the task status changes to successful.

image desc

<font color='red'>You can take a screenshot of the above result during the experiment and send it to the teacher to show that the experiment has been completed.</font>

Reminder:
Before you leave this lab, remember to log out of your Alibaba Cloud RAM account before you click the Stop button of your lab. Otherwise, you may encounter issues when opening a new lab session in the same browser:

image desc

image desc

6. Experiment summary

This experiment demonstrates the process of synchronizing data from OSS and RDS data sources to DataWorks, as well as the process for processing the data. DataWorks allows you to transmit, convert, and integrate data. You can import data from different storage services, convert and develop the data, and ultimately transmit the data to other data systems. Using MaxCompute as its core computing and storage engine, DataWorks provides users with the capabilities to perform offline processing, analysis, and data mining for massive data.