Collect And Process Data Using DataWorks
1. Experiment
1.1 Knowledge points
This experiment uses Alibaba Cloud DataWorks, an important PaaS product of the Alibaba Cloud DTplus platform. It provides comprehensive product services, such as data integration, development, management, governance, and sharing. DataWorks offers an all-in-one development and management interface to help enterprises focus on mining and exploring the value of their data. This experiment demonstrates how to synchronize data from OSS and RDS data sources to DataWorks and how to process the collected data.
1.2 Experiment process
- Create a DataWorks project
- Data collection: Upload log data
- Data processing: Conduct user profiling
1.3 Cloud resources required
1.4 Prerequisites
- Before starting this experiment, confirm that the previous experiment has been properly closed and exited.
2. Start the experiment environment
Click Start Lab in the upper right corner of the page to start the experiment.
After the experiment environment is successfully started, the system has deployed the resources required by this experiment in the background, including an ECS instance, an RDS instance, a Server Load Balancer instance, and an OSS bucket. An account (username and password) for logging on to the Alibaba Cloud web console is also provided.
After the experiment environment is started and the related resources are properly deployed, a countdown begins. You have two hours to perform the experiment operations. When the countdown ends, the experiment stops and the related resources are released. During the experiment, pay attention to the remaining time and arrange your work wisely. Next, use the username and password provided by the system to log on to the Alibaba Cloud web console and view the related resources:
Go to the logon page of Alibaba Cloud console.
Fill in the sub-user account and click Next.
Fill in the sub-user password and click Log on.
After you successfully log on to the console, the following page is displayed.
3. Create a DataWorks project
In the Alibaba Cloud console, choose DataWorks.
On the Workspace List page, choose US West 1 and click Create Workspace.
Set the project name, select Standard Mode for Mode, and click Next.
Select the MaxCompute engine and click Next.
Set the Instance Display Name, and click Create Workspace.
The creation is complete.
After a while, the status is displayed as Normal, indicating that the workspace was created successfully.
4. Data collection: Upload log data
4.1 Create a data source
Return to the DataWorks console and click Data Integration.
Choose Data Sources, as shown in the following figure.
After configuring the settings as shown in the following figure, click Test Connectivity. After the connectivity test succeeds, click Complete to save the data source.
Data Source Name: oss_workshop_log
bucket: new-dataworks-workshop
Endpoint: http://oss-cn-shanghai-internal.aliyuncs.com
AK ID: LTAI4FvGT3iU4xjKotpUMAjS
AK Secret: 9RSUoRmNxpRC9EhC4m9PjuG7Jzy7px
Click Advanced.
Click Confirm.
Click Test Connectivity.
After the OSS data source is created, choose the MySQL data source, as shown in the following figure.
After configuring the settings as shown in the following figure, click Test Connectivity. After the connectivity test succeeds, click Complete to save the data source.
Data Source Name: rds_workshop_log
RDS Instance ID: rm-2ev0681lc7042g16u
RDS Instance Account: 5600815724958382
Database Name: workshop
Username: workshop
Password: workshop#2017
The data source has now been created.
4.2 Create a Workflow
Click Data Analytics, as shown in the following figure.
Create a Workflow, as shown in the following figure.
As shown in the following figure, set the business flow name and click Create.
Next, go to the Workflow development panel and drag a Zero-Load node (workshop_start) and two data synchronization nodes (rds_rsync_data and oss_rsync_data) onto the panel.
The following figure shows the Zero-Load node name settings.
The settings of both data synchronization nodes are shown in the following figures.
Drag and drop lines to set the workshop_start node as the upstream node of the other two nodes, as shown in the following figure.
In the new version of DataWorks, input and output must be configured for every node, so you must set an input for the workshop_start node. For a virtual node in a business flow, you can set the project root node as its upstream node. The project root node is generally named “project name_root”.
Double-click the workshop_start node and modify it according to the following figure. Then, click Save in the upper-left corner.
4.4 Create tables
Next, create two tables (ods_raw_log_d and ods_user_info_d).
First, create the ods_raw_log_d table. To do this, click Create Table, as shown in the following figure.
Choose DDL Statement, copy the following content into the dialog box, and then click Generate Table Structure.
CREATE TABLE IF NOT EXISTS ods_raw_log_d (
col STRING
)
PARTITIONED BY (
dt STRING
);
Set the table name as the display name and add it to the development and production environments, as shown in the following figure.
Create the ods_user_info_d table in the same way by running the following creation statement.
CREATE TABLE IF NOT EXISTS ods_user_info_d (
uid STRING COMMENT 'User ID',
gender STRING COMMENT 'Gender',
age_range STRING COMMENT 'Age range',
zodiac STRING COMMENT 'Zodiac sign'
)
PARTITIONED BY (
dt STRING
);
Double-click the OSS data synchronization node to open the node configuration page.
Select a data source, as shown in the following figure.
Select the data destination, as shown in the following figure.
Configure the field mapping and delete the other columns.
Configure channel control with a maximum operating rate of 10 MB/s.
Click Resource Group configuration.
Click Save in the upper-left corner.
Next, configure the RDS data synchronization node. To do this, double-click the node to open the node configuration page.
Select the data source and destination, as shown in the following figure.
Configure field mapping.
Configure channel control with a maximum operating rate of 10 MB/s.
Refer to the figure below to set up a common resource group.
Click Save in the upper-left corner.
4.6 Submit a workflow task
Click the submit button in the upper-left corner.
Click Commit, as shown in the following figure.
The nodes have been submitted.
Click the run button in the upper-left corner.
You can view the running log during operation.
4.7 Confirm that the data is imported successfully
Create a temporary query, as shown in the following figure.
Click Commit.
Run the following SQL statements. Replace DATETIME with your own business date, enclosed in quotation marks because dt is a STRING partition column (for example, dt='20180716'). If the task running date is 20180717, the business date is 20180716. Depending on your time zone (for example, if you are in China), the business date may be up to two days earlier than the current date.
select count(*) from ods_raw_log_d where dt=DATETIME;
select count(*) from ods_user_info_d where dt=DATETIME;
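For reference, the following is a hypothetical version of the same queries with the placeholder already substituted, assuming a business date of 20180716 (use your own date):
select count(*) from ods_raw_log_d where dt='20180716';
select count(*) from ods_user_info_d where dt='20180716';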
Click Save, and then run the statements.
The running results are as follows:
Enter and save the following command, and then run it to view the content of the ods_raw_log_d table. Replace DATETIME with your own business date. If the task running date is 20180717, the business date is 20180716.
SELECT SPLIT(col, '##@@')[0] AS ip
, SPLIT(col, '##@@')[1] AS uid
, SPLIT(col, '##@@')[2] AS time
, SPLIT(col, '##@@')[3] AS request
, SPLIT(col, '##@@')[4] AS status
, SPLIT(col, '##@@')[5] AS bytes
, SPLIT(col, '##@@')[6] AS referer
, SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d
WHERE dt = DATETIME;
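To illustrate how this parsing works, here is a minimal sketch with a made-up log record (not actual workshop data); depending on your MaxCompute version, a bare SELECT without a FROM clause may not be supported, in which case you can apply the same expression to ods_raw_log_d instead:
-- A made-up raw record, delimited by '##@@':
-- 1.2.3.4##@@0016359810821##@@2018-07-16 10:00:00##@@GET /index.html HTTP/1.1##@@200##@@1024##@@-##@@Mozilla/5.0
-- SPLIT returns an array; index 3 holds the full request line.
SELECT SPLIT('1.2.3.4##@@0016359810821##@@2018-07-16 10:00:00##@@GET /index.html HTTP/1.1##@@200##@@1024##@@-##@@Mozilla/5.0', '##@@')[3] AS request;
-- Expected result: GET /index.html HTTP/1.1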
5. Data processing: Conduct user profiling
Next, process the log data that has been collected in MaxCompute and conduct user profiling.
5.1 Create correlation tables
Create the ods_log_info_d table.
CREATE TABLE IF NOT EXISTS ods_log_info_d (
ip STRING COMMENT 'ip',
uid STRING COMMENT 'user ID',
time STRING COMMENT 'yyyymmddhh:mi:ss',
status STRING COMMENT 'server response status code',
bytes STRING COMMENT 'Number of bytes returned to the client',
region STRING COMMENT 'region',
method STRING COMMENT 'HTTP request type',
url STRING COMMENT 'url',
protocol STRING COMMENT 'HTTP protocol version number',
referer STRING COMMENT 'Source url',
device STRING COMMENT 'Terminal type',
identity STRING COMMENT 'Access type: crawler, feed, user, or unknown'
)
PARTITIONED BY (
dt STRING
);
After setting the table name, submit it to the development environment and production environment in that order.
Create the table dw_user_info_all_d in the same way.
The following is an example table creation statement:
CREATE TABLE IF NOT EXISTS dw_user_info_all_d (
uid STRING COMMENT 'user ID',
gender STRING COMMENT 'gender',
age_range STRING COMMENT 'age range',
zodiac STRING COMMENT 'zodiac sign',
region STRING COMMENT 'region',
device STRING COMMENT 'terminal type',
identity STRING COMMENT 'Access type: crawler, feed, user, or unknown',
method STRING COMMENT 'http request type',
url STRING COMMENT 'url',
referer STRING COMMENT 'source url',
time STRING COMMENT 'yyyymmddhh:mi:ss'
)
PARTITIONED BY (
dt STRING
);
Create the table rpt_user_info_d in the same way.
The following is an example table creation statement:
CREATE TABLE IF NOT EXISTS rpt_user_info_d (
uid STRING COMMENT 'user ID',
region STRING COMMENT 'region',
device STRING COMMENT 'terminal type',
pv BIGINT COMMENT 'pv',
gender STRING COMMENT 'gender',
age_range STRING COMMENT 'age range',
zodiac STRING COMMENT 'zodiac sign'
)
PARTITIONED BY (
dt STRING
);
After the tables are created, drag three ODPS SQL nodes onto the canvas, as shown in the following figure. These nodes are named ods_log_info_d, dw_user_info_all_d, and rpt_user_info_d. Configure the dependencies as follows.
5.2 Create a user-defined function
Go to the following link in your browser to download an ip2region.jar file.
http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/85298/cn_zh/1532163718650/ip2region.jar
Create a resource, as shown in the following figure.
Select the downloaded file to upload it and click OK.
Once uploaded, click the submit button in the upper-left corner.
Create a user-defined function (UDF), as shown in the following figure.
Name it getregion.
Configure the function in the Register Function dialog box by specifying the class name, description, command format, and parameter description. After the configuration is complete, click Save and Submit in the upper-left corner.
Function Name: getregion
Class Name: org.alidata.odps.udf.Ip2Region
Resources: ip2region.jar
Description: IP address translation region
Expression Syntax: getregion('ip')
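Optionally, once the function has been submitted, you can sanity-check it with a minimal temporary query such as the one below; the IP address is only an illustrative value, and the region returned depends on the ip2region data (as above, some MaxCompute versions may require a FROM clause):
-- Hypothetical check of the getregion UDF; replace the IP with any address you want to look up.
SELECT getregion('1.2.3.4');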
Double-click the ods_log_info_d node to open the node configuration page and compose the processing logic.
INSERT OVERWRITE TABLE ods_log_info_d PARTITION (dt='${bdp.system.bizdate}')
SELECT ip
, uid
, time
, status
, bytes
-- Use the custom UDF to get the region from the IP address
, getregion(ip) AS region
-- Split the request into three fields (method, url, protocol) using regular expressions
, regexp_substr(request, '(^[^ ]+ )') AS method
, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
, regexp_substr(request, '([^ ]+$)') AS protocol
-- Clean the referer with a regular expression to get a more accurate source URL
, regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer
-- Derive the device type and access identity from the user agent
, CASE
WHEN TOLOWER(agent) RLIKE 'android' THEN 'android'
WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone'
WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad'
WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh'
WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone'
WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc'
ELSE 'unknown'
END AS device
, CASE
WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
WHEN TOLOWER(agent) RLIKE 'feed'
OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
AND agent RLIKE '^[Mozilla|Opera]'
AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
ELSE 'unknown'
END AS identity
FROM (
SELECT SPLIT(col, '##@@')[0] AS ip
, SPLIT(col, '##@@')[1] AS uid
, SPLIT(col, '##@@')[2] AS time
, SPLIT(col, '##@@')[3] AS request
, SPLIT(col, '##@@')[4] AS status
, SPLIT(col, '##@@')[5] AS bytes
, SPLIT(col, '##@@')[6] AS referer
, SPLIT(col, '##@@')[7] AS agent
FROM ods_raw_log_d
WHERE dt = ${bdp.system.bizdate}
) a;
Copy the preceding content to the node and click Save in the upper-left corner.
Double-click the dw_user_info_all_d node.
INSERT OVERWRITE TABLE dw_user_info_all_d PARTITION (dt='${bdp.system.bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
, b.gender
, b.age_range
, b.zodiac
, a.region
, a.device
, a.identity
, a.method
, a.url
, a.referer
, a.time
FROM (
SELECT *
FROM ods_log_info_d
WHERE dt = ${bdp.system.bizdate}
) a
LEFT OUTER JOIN (
SELECT *
FROM ods_user_info_d
WHERE dt = ${bdp.system.bizdate}
) b
ON a.uid = b.uid;
Copy the preceding content to the node and click Save in the upper-left corner.
Double-click the rpt_user_info_d node.
INSERT OVERWRITE TABLE rpt_user_info_d PARTITION (dt='${bdp.system.bizdate}')
SELECT uid
, MAX(region)
, MAX(device)
, COUNT(0) AS pv
, MAX(gender)
, MAX(age_range)
, MAX(zodiac)
FROM dw_user_info_all_d
WHERE dt = ${bdp.system.bizdate}
GROUP BY uid;
Copy the preceding content to the node and click Save in the upper-left corner.
5.4 Submit the business flow
Click Commit to submit the node tasks that have been configured in the business flow, as shown in the following figure.
The nodes have been submitted.
5.5 Run the business flow
Run the business flow, as shown in the following figure:
It may take five or six minutes to run the business flow.
Then, create a temporary query to test it.
Enter the following command, save it, and then run it. Replace DATETIME with your own business date. For example, if the task runs on 20180717, the business date is 20180716.
select * from rpt_user_info_d where dt=DATETIME limit 100;
<font color='red'>When doing the experiment, take a screenshot of the result above and send it to the teacher to show that this part of the experiment has been completed.</font>
5.6 Publish the business flow
Submitting the business flow only means that the tasks have entered the development environment. Tasks in the development environment are not automatically scheduled, so you must publish the configured tasks to the production environment.
Choose the task to be published and add all nodes to the Nodes to Publish list, as shown in the following figure.
Open the Nodes to Publish list.
Click Publish All.
View the published items on the Release Package list page. Click Operation Center in the upper-right corner.
As shown in the following figure, select the workshop_start node and its downstream nodes for data population.
Select the task for data population and click OK. The data population task instance page appears automatically.
After a while, the task status changes to Successful.
<font color='red'>When doing the experiment, take a screenshot of the result above and send it to the teacher to show that the experiment has been completed.</font>
Reminder:
Before you leave this lab, remember to log out of your Alibaba Cloud RAM account before you click the Stop button of your lab. Otherwise, you may encounter issues when opening a new lab session in the same browser.
6. Experiment summary
This experiment demonstrates how to synchronize data from OSS and RDS data sources to DataWorks and how to process that data. DataWorks allows you to transmit, convert, and integrate data: you can import data from different storage services, convert and develop it, and ultimately transmit it to other data systems. Using MaxCompute as its core computing and storage engine, DataWorks provides the capabilities to perform offline processing, analysis, and data mining on massive amounts of data.