DataX TSDBReader


1 快速介绍

TSDBReader插件实现了从阿里云TSDB读取数据。阿里云时间序列数据库(Time Series Database , 简称 TSDB)是一种集时序数据高效读写,压缩存储,实时计算能力为一体的数据库服务,可广泛应用于物联网和互联网领域,实现对设备及业务服务的实时监控,实时预测告警。

2 实现原理

在底层实现上,TSDBReader通过HTTP请求链接到阿里云TSDB实例,利用 /api/query或者/api/mquery接口将数据点扫描出来(更多细节详见:时序数据库TSDB – HTTP API概览)。而整个同步的过程,是通过时间线和查询时间线范围进行切分。

3 功能说明

3.1 配置样例

配置一个从 阿里云TSDB数据库同步抽取数据到本地的作业,并以时序数据的格式输出:

时序数据样例:

{"metric":"m","tags":{"app":"a19","cluster":"c5","group":"g10","ip":"i999","zone":"z1"},"timestamp":1546272263,"value":1}
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tsdbreader",
          "parameter": {
            "sinkDbType": "TSDB",
            "endpoint": "http://localhost:8242",
            "column": [
              "m"
            ],
            "splitIntervalMs": 60000,
            "beginDateTime": "2019-01-01 00:00:00",
            "endDateTime": "2019-01-01 01:00:00"
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3
      }
    }
  }
}

配置一个从阿里云 TSDB数据库同步抽取数据到本地的作业,并以关系型数据的格式输出:

关系型数据样例:

m	1546272125	a1	c1	g2	i3021	z4	1.0
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tsdbreader",
          "parameter": {
            "sinkDbType": "RDB",
            "endpoint": "http://localhost:8242",
            "column": [
              "__metric__",
              "__ts__",
              "app",
              "cluster",
              "group",
              "ip",
              "zone",
              "__value__"
            ],
            "metric": [
              "m"
            ],
            "splitIntervalMs": 60000,
            "beginDateTime": "2019-01-01 00:00:00",
            "endDateTime": "2019-01-01 01:00:00"
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3
      }
    }
  }
}

配置一个从阿里云TSDB数据库同步抽取单值数据到ADB的作业:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tsdbreader",
          "parameter": {
            "sinkDbType": "RDB",
            "endpoint": "http://localhost:8242",
            "column": [
              "__metric__",
              "__ts__",
              "app",
              "cluster",
              "group",
              "ip",
              "zone",
              "__value__"
            ],
            "metric": [
              "m"
            ],
            "splitIntervalMs": 60000,
            "beginDateTime": "2019-01-01 00:00:00",
            "endDateTime": "2019-01-01 01:00:00"
          }
        },
        "writer": {
          "name": "adswriter",
          "parameter": {
            "username": "******",
            "password": "******",
            "column": [
              "`metric`",
              "`ts`",
              "`app`",
              "`cluster`",
              "`group`",
              "`ip`",
              "`zone`",
              "`value`"
            ],
            "url": "http://localhost:3306",
            "schema": "datax_test",
            "table": "datax_test",
            "writeMode": "insert",
            "opIndex": "0",
            "batchSize": "2"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3
      }
    }
  }
}

配置一个从阿里云TSDB数据库同步抽取多值数据到ADB的作业:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tsdbreader",
          "parameter": {
            "sinkDbType": "RDB",
            "endpoint": "http://localhost:8242",
            "column": [
              "__metric__",
              "__ts__",
              "app",
              "cluster",
              "group",
              "ip",
              "zone",
              "load",
              "memory",
              "cpu"
            ],
            "metric": [
              "m_field"
            ],
            "field": {
              "m_field": [
                "load",
                "memory",
                "cpu"
              ]
            },
            "splitIntervalMs": 60000,
            "beginDateTime": "2019-01-01 00:00:00",
            "endDateTime": "2019-01-01 01:00:00"
          }
        },
        "writer": {
          "name": "adswriter",
          "parameter": {
            "username": "******",
            "password": "******",
            "column": [
              "`metric`",
              "`ts`",
              "`app`",
              "`cluster`",
              "`group`",
              "`ip`",
              "`zone`",
              "`load`",
              "`memory`",
              "`cpu`"
            ],
            "url": "http://localhost:3306",
            "schema": "datax_test",
            "table": "datax_test_multi_field",
            "writeMode": "insert",
            "opIndex": "0",
            "batchSize": "2"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3
      }
    }
  }
}

配置一个从阿里云TSDB数据库同步抽取单值数据到ADB的作业,并指定过滤部分时间线:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tsdbreader",
          "parameter": {
            "sinkDbType": "RDB",
            "endpoint": "http://localhost:8242",
            "column": [
              "__metric__",
              "__ts__",
              "app",
              "cluster",
              "group",
              "ip",
              "zone",
              "__value__"
            ],
            "metric": [
              "m"
            ],
            "tag": {
              "m": {
                "app": "a1",
                "cluster": "c1"
              }
            },
            "splitIntervalMs": 60000,
            "beginDateTime": "2019-01-01 00:00:00",
            "endDateTime": "2019-01-01 01:00:00"
          }
        },
        "writer": {
          "name": "adswriter",
          "parameter": {
            "username": "******",
            "password": "******",
            "column": [
              "`metric`",
              "`ts`",
              "`app`",
              "`cluster`",
              "`group`",
              "`ip`",
              "`zone`",
              "`value`"
            ],
            "url": "http://localhost:3306",
            "schema": "datax_test",
            "table": "datax_test",
            "writeMode": "insert",
            "opIndex": "0",
            "batchSize": "2"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3
      }
    }
  }
}

配置一个从阿里云TSDB数据库同步抽取多值数据到ADB的作业,并指定过滤部分时间线:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tsdbreader",
          "parameter": {
            "sinkDbType": "RDB",
            "endpoint": "http://localhost:8242",
            "column": [
              "__metric__",
              "__ts__",
              "app",
              "cluster",
              "group",
              "ip",
              "zone",
              "load",
              "memory",
              "cpu"
            ],
            "metric": [
              "m_field"
            ],
            "field": {
              "m_field": [
                "load",
                "memory",
                "cpu"
              ]
            },
            "tag": {
              "m_field": {
                "ip": "i999"
              }
            },
            "splitIntervalMs": 60000,
            "beginDateTime": "2019-01-01 00:00:00",
            "endDateTime": "2019-01-01 01:00:00"
          }
        },
        "writer": {
          "name": "adswriter",
          "parameter": {
            "username": "******",
            "password": "******",
            "column": [
              "`metric`",
              "`ts`",
              "`app`",
              "`cluster`",
              "`group`",
              "`ip`",
              "`zone`",
              "`load`",
              "`memory`",
              "`cpu`"
            ],
            "url": "http://localhost:3306",
            "schema": "datax_test",
            "table": "datax_test_multi_field",
            "writeMode": "insert",
            "opIndex": "0",
            "batchSize": "2"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3
      }
    }
  }
}

配置一个从阿里云TSDB数据库同步抽取单值数据到另一个阿里云TSDB数据库 的作业:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tsdbreader",
          "parameter": {
            "sinkDbType": "TSDB",
            "endpoint": "http://localhost:8242",
            "column": [
              "m"
            ],
            "splitIntervalMs": 60000,
            "beginDateTime": "2019-01-01 00:00:00",
            "endDateTime": "2019-01-01 01:00:00"
          }
        },
        "writer": {
          "name": "tsdbwriter",
          "parameter": {
            "endpoint": "http://localhost:8240"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3
      }
    }
  }
}

配置一个从阿里云TSDB数据库同步抽取多值数据到另一个阿里云TSDB数据库的作业:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tsdbreader",
          "parameter": {
            "sinkDbType": "TSDB",
            "endpoint": "http://localhost:8242",
            "column": [
              "m_field"
            ],
            "field": {
              "m_field": [
                "load",
                "memory",
                "cpu"
              ]
            },
            "splitIntervalMs": 60000,
            "beginDateTime": "2019-01-01 00:00:00",
            "endDateTime": "2019-01-01 01:00:00"
          }
        },
        "writer": {
          "name": "tsdbwriter",
          "parameter": {
            "multiField": true,
            "endpoint": "http://localhost:8240"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3
      }
    }
  }
}

3.2 参数说明

  • name
    • 描述:本插件的名称
    • 必选:是
    • 默认值:tsdbreader
  • parameter
    • sinkDbType
      • 描述:目标数据库的类型
      • 必选:否
      • 默认值:TSDB

注意:目前支持TSDB和RDB两个取值。其中,TSDB包括阿里云TSDB、OpenTSDB、InfluxDB、Prometheus和TimeScale。RDB包括ADB、MySQL、Oracle、PostgreSQL和DRDS等。

    • endpoint
      • 描述:阿里云TSDB的HTTP连接地址
      • 必选:是
      • 格式:http://IP:Port
      • 默认值:无
    • column
      • 描述:TSDB场景下:数据迁移任务需要迁移的Metric列表;RDB 场景下:映射到关系型数据库中的表字段,且增加 __metric__、__ts__ 和 __value__ 三个字段,其中 __metric__ 用于映射度量字段,__ts__ 用于映射 timestamp 字段,而 __value__ 仅适用于单值场景,用于映射度量值,多值场景下,直接指定 field 字段即可
      • 必选:是
      • 默认值:无
    • metric
      • 描述:仅适用于RDB场景下,表示数据迁移任务需要迁移的 Metric 列表
      • 必选:否
      • 默认值:无
    • field
      • 描述:仅适用于多值场景下,表示数据迁移任务需要迁移的Field 列表
      • 必选:否
      • 默认值:无
    • tag
      • 描述:数据迁移任务需要迁移的TagK和TagV,用于进一步过滤时间线
      • 必选:否
      • 默认值:无
    • splitIntervalMs
      • 描述:用于DataX内部切分Task,每个Task只查询一小部分的时间段
      • 必选:是
      • 默认值:无

注意:单位是 ms 毫秒

  • beginDateTime
    • 描述:和endDateTime配合使用,用于指定哪个时间段内的数据点,需要被迁移
    • 必选:是
    • 格式:yyyy-MM-dd HH:mm:ss
    • 默认值:无

注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如2019-4-18 的 [3:35, 4:55)会被转为 [3:00, 4:00)

  • endDateTime
    • 描述:和beginDateTime配合使用,用于指定哪个时间段内的数据点,需要被迁移
    • 必选:是
    • 格式:yyyy-MM-dd HH:mm:ss
    • 默认值:无

注意:指定起止时间会自动忽略分钟和秒,转为整点时刻,例如2019-4-18 的 [3:35, 4:55) 会被转为 [3:00, 4:00)

3.3 类型转换

DataX内部类型

TSDB数据类型

string

TSDB数据点序列化字符串,包括timestamp、metric、tags、fields和value

4 约束限制

4.2 如果存在某一个 Metric 下在一个小时范围内的数据量过大,可能需要通过 -j 参数调整 JVM 内存大小

考虑到下游Writer如果写入速度不及TSDB Reader的查询数据,可能会存在积压的情况,因此需要适当地调整JVM参数。以“从阿里云TSDB数据库同步抽取数据到本地的作业”为例,启动命令如下:

 python datax/bin/datax.py tsdb2stream.json -j "-Xms4096m -Xmx4096m"

4.3 指定起止时间会自动被转为整点时刻

指定起止时间会自动被转为整点时刻,例如 2019-4-18 的 [3:35, 3:55) 会被转为 [3:00, 4:00)。


评论区(0)

评论