• 文档
  • 控制台
  • 登录
  • 立即注册
    目前不支持用户自主注册,如需注册账号,请联系400-080-1100
微服务平台CSP开发指南
最近更新时间:

6 在 CSP 上使用分布式调度功能(兼容 xxljob)

6.1 前提

 开通 CSP 平台服务。

 请确保您的机器上已经安装了 Java 和 Maven。

 目前版本只支持容器应用和 Spring Cloud 开发框架。

6.2 创建Demo应用

(1) 登录 CSP 平台。

(2) 在“应用管理 > 命名空间”列表页面,创建命名空间 job。

(3) 在“应用管理 > 应用组”页面,选中创建的命名空间 job,创建应用组 jobGrp。

(4) 在“应用管理 > 容器应用”页面,单击“创建”,创建应用 jobdemo1,单击“开启分布式调度”,Demo 应用示例如下:

1.png

(5) Demo 编程指导:

 Xxljob 模式:兼容 xxljob 的 bean 模式,使用@Xxljob 注解,开发定时任务,注解的值即后续“创建任务”中的“JobHandler”参数。

示例代码:

package com.example.xxljobexecutor.job;


import com.xxl.job.core.context.XxlJobHelper;

import com.xxl.job.core.handler.annotation.XxlJob;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;


import java.io.BufferedInputStream;

import java.io.BufferedReader;

import java.io.DataOutputStream;

import java.io.InputStreamReader;

import java.net.HttpURLConnection;

import java.net.URL;

import java.util.Arrays;

import java.util.concurrent.TimeUnit;


@Slf4j

@Component

public class SampleXxlJob {



        /**

            * 1、简单任务示例(Bean 模式)

            */

            @XxlJob("demoJobHandler")

            public void demoJobHandler() throws Exception {

                    XxlJobHelper.log("XXL-JOB, Hello World.");


                     for (int i = 0; i < 5; i++) {

                        XxlJobHelper.log("beat at:" + i);

                        TimeUnit.SECONDS.sleep(2);

                    }

                    // default success

        }



        /**

            * 2、分片广播任务

            */

         @XxlJob("shardingJobHandler")

         public void shardingJobHandler() throws Exception {


                // 分片参数

                 int shardIndex = XxlJobHelper.getShardIndex();

                 int shardTotal = XxlJobHelper.getShardTotal();


                 XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex,

shardTotal);


                // 业务逻辑

                for (int i = 0; i < shardTotal; i++) {

                        if (i == shardIndex) {

                            XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);

                        } else {

                            XxlJobHelper.log("第 {} 片, 忽略", i);

                        }

                }

        }



         /**

            * 3、命令行任务

        */

         @XxlJob("commandJobHandler")

         public void commandJobHandler() throws Exception {

                String command = XxlJobHelper.getJobParam();

                 int exitValue = -1;


                BufferedReader bufferedReader = null;

                 try {

                         // command process

                        ProcessBuilder processBuilder = new ProcessBuilder();

                        processBuilder.command(command);

                        processBuilder.redirectErrorStream(true);


                         Process process = processBuilder.start();

                        //Process process = Runtime.getRuntime().exec(command);

                        BufferedInputStream bufferedInputStream = new

BufferedInputStream(process.getInputStream());

                        bufferedReader = new BufferedReader(new

InputStreamReader(bufferedInputStream));


                         // command log

                         String line;

                         while ((line = bufferedReader.readLine()) != null) {

                                 XxlJobHelper.log(line);

                        }


                         // command exit

                         process.waitFor();

                         exitValue = process.exitValue();

                 } catch (Exception e) {

                         XxlJobHelper.log(e);

                } finally {

                         if (bufferedReader != null) {

                        bufferedReader.close();

                        }

                 }


                if (exitValue == 0) {

                        // default success

                 } else {

                         XxlJobHelper.handleFail("command exit value("+exitValue+") is failed");

                 }

        }


         /**

            * 4、跨平台 Http 任务

             * 参数示例:

            * "url: http://www.baidu.com\n" +

             * "method: get\n" +

            * "data: content\n";

            */

         @XxlJob("httpJobHandler")

        public void httpJobHandler() throws Exception {

                 // param parse

                String param = XxlJobHelper.getJobParam();

                 if (param==null || param.trim().length()==0) {

                         XxlJobHelper.log("param["+ param +"] invalid.");


                         XxlJobHelper.handleFail();

                 }


                 String[] httpParams = param.split("\n");

                 String url = null;

                 String method = null;

                 String data = null;

                 for (String httpParam: httpParams) {

                         if (httpParam.startsWith("url:")) {

                                 url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();

                         }

                         if (httpParam.startsWith("method:")) {

                                 method = httpParam.substring(httpParam.indexOf("method:") +

 7).trim().toUpperCase();

                         }

                         if (httpParam.startsWith("data:")) {

                                data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();

                         }

                }


                 // param valid

                 if (url==null || url.trim().length()==0) {

                         XxlJobHelper.log("url["+ url +"] invalid.");


                        XxlJobHelper.handleFail();

                         return;

                 }

                 if (method==null || !Arrays.asList("GET", "POST").contains(method)) {

                         XxlJobHelper.log("method["+ method +"] invalid.");

                         XxlJobHelper.handleFail();


                         return;

                 }

                 boolean isPostMethod = method.equals("POST");


                // request

                HttpURLConnection connection = null;

                 BufferedReader bufferedReader = null;

                 try {

                         // connection

                 URL realUrl = new URL(url);

                 connection = (HttpURLConnection) realUrl.openConnection();


                 // connection setting

                connection.setRequestMethod(method);

                 connection.setDoOutput(isPostMethod);

                 connection.setDoInput(true);

                 connection.setUseCaches(false);

                 connection.setReadTimeout(5 * 1000);

                 connection.setConnectTimeout(3 * 1000);

                 connection.setRequestProperty("connection", "Keep-Alive");

                 connection.setRequestProperty("Content-Type",

"application/json;charset=UTF-8");

                connection.setRequestProperty("Accept-Charset",

"application/json;charset=UTF-8");

                

                 // do connection

                 connection.connect();

                

                 // data

                 if (isPostMethod && data!=null && data.trim().length()>0) {

                         DataOutputStream dataOutputStream = new

DataOutputStream(connection.getOutputStream());

                         dataOutputStream.write(data.getBytes("UTF-8"));

                         dataOutputStream.flush();

                         dataOutputStream.close();

                 }

                

                 // valid StatusCode

                 int statusCode = connection.getResponseCode();

                 if (statusCode != 200) {

                             throw new RuntimeException("Http Request StatusCode(" + statusCode

+ ") Invalid.");

                 }


                 // result

                 bufferedReader = new BufferedReader(new

InputStreamReader(connection.getInputStream(), "UTF-8"));

                 StringBuilder result = new StringBuilder();


                String line;

                while ((line = bufferedReader.readLine()) != null) {

                         result.append(line);

                 }

                 String responseMsg = result.toString();

                 XxlJobHelper.log(responseMsg);


                 return;

         } catch (Exception e) {

                 XxlJobHelper.log(e);


                 XxlJobHelper.handleFail();

                 return;

         } finally {

                 try {

                         if (bufferedReader != null) {

                             bufferedReader.close();

                         }

                         if (connection != null) {

                             connection.disconnect();

                         }

                } catch (Exception e2) {

                         XxlJobHelper.log(e2);

                 }

           }

     }

    /**

    * 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;

    */

    @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")

    public void demoJobHandler2() throws Exception {

        XxlJobHelper.log("XXL-JOB, Hello World.");

    }


    public void init(){

             log.info("init");

        }

        public void destroy(){

             log.info("destory");

         }

}

 Shell、Python、PHP、Nodejs 等脚本语言:利用执行器所在机器的环境执行脚本,即在 GLUE 代码框中,自己输入代码。前提是执行器所在的机器有对应的环境,如果是容器应用,使用镜像部署,自己准备镜像,确保镜像中有对应的环境;如果是 ECS 应用,请在 ECS 中安装相应的环境。

 Go 语言代码:

package main

import (

"fmt"

xxl "github.com/xxl-job/xxl-job-executor-go"

"github.com/xxl-job/xxl-job-executor-go/example/task"

"log"

)


func main() {

    exec := xxl.NewExecutor(

            xxl.ServerAddr("http://127.0.0.1/xxl-job-admin"),

             xxl.AccessToken(""), //请求令牌(默认为空)

             xxl.ExecutorIp("127.0.0.1"), //可自动获取

             xxl.ExecutorPort("9999"), //默认 9999(非必填)

             xxl.RegistryKey("golang-jobs"), //执行器名称

             xxl.SetLogger(&logger{}), //自定义日志

)

exec.Init()

//设置日志查看 handler

exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {

            return &xxl.LogRes{Code: xxl.SuccessCode, Msg: "", Content: xxl.LogResContent{

                     FromLineNum: req.FromLineNum,

                     ToLineNum: 2,

                     LogContent: "这个是自定义日志 handler",

                     IsEnd: true,

             }}

})

//注册任务 handler

exec.RegTask("task.test", task.Test)

exec.RegTask("task.test2", task.Test2)

exec.RegTask("task.panic", task.Panic)

log.Fatal(exec.Run())

}


//xxl.Logger 接口实现

type logger struct{}


func (l *logger) Info(format string, a ...interface{}) {

 fmt.Println(fmt.Sprintf("自定义日志 - "+format, a...))

}

func (l *logger) Error(format string, a ...interface{}) {

 log.Println(fmt.Sprintf("自定义日志 - "+format, a...))

}

(6) Demo 配置文件配置项说明:

1.png

 xxl.job.admin.addresses:xxljob 调度中心地址,默认值即可,通过 CSP 部署容器应用时会被替换为 CSP 下的 xxljob 调度中心地址。

 xxl.job.accessToken:访问调度中心携带的 token,默认为空即可。

 xxl.job.executor.appname:执行器名称,默认值即可,通过 CSP 部署容器应用时会和容器应用名保持一致。

 xxl.job.executor.address:执行器地址,建议为空,为空时会自动获取本机的 IP 地址,通过 CSP 部署容器应用时会被替换为可调度的地址。

 xxl.job.executor.ip:执行器 IP,默认为空即可。

 xxl.job.executor.port:执行器端口,默认 9999 即可,通过 CSP 部署容器应用时会被替换,暂时固定为 9999。

 xxl.job.executor.logpath:执行器日志路径,默认值即可。

 xxl.job.executor.logretentiondays:日志过期时间,默认值即可。

(7) 在“应用管理 > 容器应用”页面,创建开启了分布式调度开关的应用后,会默认在“分布式调度 > 应用管理”页面创建同名执行器应用。

1.png

(8) 使用 DEMO 生成的 jar 包,部署容器应用。暴露两个端口,其中 8082 为业务端口,9999 为执行器内置服务器端口,用于和调度中心通信使用。

2.png

(9) 部署成功后,DEMO 应用同时作为执行器会自动注册到调度中心。

1.png

(10) 在“分布式调度 > 任务管理”页面,选中刚才创建的执行器应用,创建任务。

2.png

1.png

(11) 执行任务:执行一次或根据“调度类型”下的调度规则启动定时任务。

1.png

(12) 查看日志。

2.png

6.3 创建多脚本语言应用

场景简介

是否支持多语言,取决于执行器应用所在机器的环境。

 如果是容器应用,就是 Pod 中安装的脚本语言环境。

 如果是 ECS 应用,就是虚拟机中安装的脚本语言环境。

下面以 Spring 应用为例,开启分布式调度,创建多脚本语言应用。 

操作步骤

(1) 在容器应用页,单击“创建”,根据下图中指示,完成创建。

1.png

(2) 在“应用详情 > 版本管理”页,单击“新建版本”,文件上传方式选择“镜像”。

2.png

(3) 准备基础镜像:安装 Python(这里以 Python2 为例,可按需换成 Python3)、PHP、Ndejs 等脚本语言。

1.png

(4) 准备 Dockerfile。

1.png

FROM image.cestc.cn/csp/cdp-jdk-agent:8-mutiLanguage

ENV CAM_OPTS ''

ENV CSG_OPTS ''

ENV JAVA_OPTS ''

ENV CSP_JOB_OPTS ''

ADD xxl-job-executor-sample-springboot-2.3.0.jar /workdir/

ADD agent /workdir/

CMD cd /workdir && java -javaagent:/workdir/agent/skywalking-agent.jar $JAVA_OPTS

$CAM_OPTS -jar /workdir/xxl-job-executor-sample-springboot-2.3.0.jar $CSG_OPTS

$CSP_JOB_OPTS

SM2.png

说明:

 Jar 包可以使用 6.2-(4)中的 xxljob demo 包,也可以使用接入了 xxljob SDK 的其他 Jar包。

 image.cestc.cn/csp/cdp-jdk-agent:8-mutiLanguage:6.3-(4)打的基础镜像。

 CAM_OPTS、CSG_OPTS 为监控参数和治理参数,详情参考章节 4 镜像部署。

 CSP_JOB_OPTS:分布式调度参数,加在环境变量和 java -jar 启动参数,部署应用后会

    覆盖掉该参数的值。

SM2.png

(5) 制作镜像并保存成 tar 包(docker build & docker save 命令,详情见章节 4)。

1.png

(6) 上传至应用所在的 CKS 下的 ECS 节点。

1.png

1.png

(7) 按照镜像部署的提示,Push 镜像到租户的 CCR 镜像仓库。

1.png

(8) 部署应用。

使用 CSP 制作的镜像都是双架构的,自己制作的镜像是单架构的,部署的时候可以加上亲和性,指定部署到对应架构的机器上。

kubectl get node --show-labels 查看标签,找到架构相关的标签。

(9) 创建四个任务,运行模式分别选择 Shell、Python、Nodejs、PHP,Glue 源代码框可以输入脚本语言定时任务代码,可以参考默认代码

1.png

(10) 执行任务:执行结果显示成功即可。

1.png

6.4 创建Go语言应用

场景简介

Go 语言的支持和 Java 一样,采用 SDK 的形式。创建任务的时候,需要选择 xxljob 模式执行任务,xxljob 模式即是 SDK 模式。 

限制与指导

只支持普通应用。

创建时,需开启分布式调度。 

操作步骤

(1) 在容器应用页,单击“创建”,创建应用。

(2) 在版本管理页,单击“新建版本”,文件上传方式选择“镜像”。

(3) 准备 Dockerfile。附件 11 为 Go 语言 Demo 代码,内含 Dockerfile。

1.png

(4) 设置参数:

        exec := xxl.NewExecutor(

                //xxl.ServerAddr(viper.GetString("xxl.job.admin.addresses")),

                xxl.ServerAddr(os.Getenv("XXL_JOB_ADMIN_ADDRESSES")),

                xxl.AccessToken(viper.GetString("xxl.job.accessToken")), //请求令牌(默认为空)

                //xxl.ExecutorIp(viper.GetString("xxl.job.executor.ip")), //可自动获取

                xxl.ExecutorPort(viper.GetString("xxl.job.executor.port")), //默认 9999(非必填)

                //xxl.RegistryKey(viper.GetString("xxl.job.executor.appname")), //执行器名称

                xxl.RegistryKey(os.Getenv("XXL_JOB_EXECUTOR_APPNAME")),

                xxl.SetLogger(&logger{}), //自定义日志

        )

SM2.png

注意:

 XXL_JOB_EXECUTOR_APPNAME:执行器名,从环境变量获取,部署应用会被替换。

 XXL_JOB_ADMIN_ADDRESSES:调度中心地址,从环境变量获取,部署应用会被替换。

 ExecutorIp:不要设置,自动获取所在环境的 IP。

 AccessToken:建议为空,CSP 使用了另一套鉴权机制。

 ExecutorPort:执行器内部服务器端口,可以随意设置。

SM2.png

1.png

(5) 制作镜像并保存成 tar 包:Go 镜像使用多阶段构建,Docker 17.05 以上版本支持。

        FROM image.cestc.cn/csm/golang:1.16.3 as builder

        WORKDIR /app

        ADD . /app

        RUN make build-local


        FROM image.cestc.cn/csm/alpine:base

        WORKDIR /

        COPY --from=builder /app/xxl-job-sample-golang /xxl-job-sample-golang

        COPY --from=builder /app/conf /conf

        CMD ["/xxl-job-sample-golang", "--conf", "conf/application.yaml"]

        1.png

(7) 加载镜像,按照镜像部署提示,Push 镜像到租户的 CCR 镜像仓库。

(8) 部署应用:放开业务端口和执行器内置服务器端口。

1.png

(9) 创建任务,运行模式选择 xxljob,jobHandler 任务处理器填写 main 函数中注册的任务名,例如:demo 代码中是“ask.test”、“task.test2”、“task.painc”。

1.png

2.png

(10) 执行任务,执行结果显示成功即可。

1.png

意见反馈

文档内容是否对您有帮助?

如您有其他疑问,您也可以通过在线客服来与我们联系探讨 在线客服

联系我们
回到顶部