Aliyun Log Python SDK¶
User Guide¶
Introduction¶
This is the open source version of the Python SDK for Alibaba Cloud Log Service. It provides Python programming interfaces for Alibaba Cloud Log Service, wrapping the Log Service REST API, and helps Python developers connect to Alibaba Cloud Log Service more efficiently.
Refer to the doc: http://aliyun-log-python-sdk.readthedocs.io/.
Don't want to write code? Try the CLI, which covers almost all features of this SDK.
Features¶
- Wraps the full SLS REST API (management, data manipulation, consumer groups, etc.)
- High-level consumer group support
- Python logging handler utilities
- High-level operations: auto paging, auto retry until complete, copying project/logstore settings, arrange_shard, resource usage, etc.
- Elasticsearch data migration
- ETL features: copy data across logstores, transform data across logstores with powerful ETL configuration
Supported Python:¶
- Python 2.6
- Python 2.7
- Python 3.3
- Python 3.4
- Python 3.5
- Python 3.6
- Python 3.7
- Pypy2
- Pypy3
Supported Log Service API¶
- Log Service API 0.6
Change Logs¶
Installation¶
pip install -U aliyun-log-python-sdk
Sample Code:¶
Complete API Reference¶
Other resources¶
- Alicloud Log Service homepage:https://www.alibabacloud.com/product/log-service
- Alicloud Log Service doc:https://www.alibabacloud.com/help/product/28958.htm
- for any issues, please submit support tickets
User Guide (中文)¶
基本介绍¶
这是Log Service SDK for Python的开源版本。Log Service SDK for Python是阿里云日志服务 (Log Service)API的Python编程接口,提供了对于Log Service Rest API所有接口的封装 和支持,帮助Python开发人员更快编程使用阿里云Log Service服务。
参考文档: http://aliyun-log-python-sdk.readthedocs.io/
不想写代码? 可以试一下命令行控制台, 其覆盖了这个SDK的几乎所有功能.
具体功能¶
- 封装所有Rest API (管理, 数据操作, 消费组等)
- 消费组高阶类支持
- Python日志模块的Handler
- 高阶操作支持: 自动分页, 自动未完成重试, 复制Project/Logstore配置, 调整更大的Shard读写数, 查看资源使用等.
- Elasticsearch 数据迁移
- 数据ETL功能: 按照shard/时间高速跨logstore复制数据, 根据灵活的配置规则, 对数据进行批量或持续可并发的数据ETL转换.
支持Python版本¶
- Python 2.6
- Python 2.7
- Python 3.3
- Python 3.4
- Python 3.5
- Python 3.6
- Python 3.7
- Pypy2
- Pypy3
支持API版本¶
- Log Service API 0.6
安装¶
pip install -U aliyun-log-python-sdk
如果提示time-out之类的错误,表示网络不通,建议加上国内清华大学的镜像索引试一试:
pip install -U aliyun-log-python-sdk -i https://pypi.tuna.tsinghua.edu.cn/simple
如果安装Regex失败,可以参考使用yum/apt-get或者手动安装python-devel: https://rpmfind.net/linux/rpm2html/search.php?query=python-devel
配置SDK¶
参考SDK配置 获得访问秘钥的ID和Key以及访问入口Endpoint, 构建一个LogClient的客户端.
from aliyun.log import LogClient
# “华东 1 (杭州)” Region 的日志服务入口。
endpoint = 'cn-hangzhou.log.aliyuncs.com'
# 用户访问秘钥对中的 AccessKeyId。
accessKeyId = 'ABCDEFGHIJKLJMN'
# 用户访问秘钥对中的 AccessKeySecret。
accessKey = 'OPQRSTUVWXYZ'
client = LogClient(endpoint, accessKeyId, accessKey)
# 使用client的方法来操作日志服务
数据采集配置¶
管理日志项目¶
- 获取列表
列出本region下面的所有可见项目:
res = client.list_project()
res.log_print()
注意: 默认获取100个项目,通过传入参数offset和size来获取更多.
- 获取信息
获取单个项目的较为详细的信息.
res = client.get_project('project1')
res.log_print()
- 创建
res = client.create_project("project1", "a simple project")
res.log_print()
- 删除
res = client.delete_project("project1")
res.log_print()
注意: 只能删除空的项目.
- 复制
复制一个项目的所有日志库和相应的配置(包括机器组合索引等), 要求目标项目不存在.
res = client.copy_project("project1", "project2")
res.log_print()
管理日志库(logstore)¶
日志库属于某一个项目, 所有的操作都需要传入项目名称.
- 获取列表
获取一个项目下的所有日志库:
from aliyun.log import ListLogstoresRequest
request = ListLogstoresRequest('project1')
res = client.list_logstores(request)
res.log_print()
- 创建
创建一个日志库:
res = client.create_logstore('project1', 'logstore1', ttl=30, shard_count=3)
res.log_print()
注意: 参数ttl和shard_count分别表示日志保存天数和分区数量.
- 获取信息
获取单个日志库较为详细的信息.
res = client.get_logstore('project1', 'logstore1')
res.log_print()
- 删除
通过delete_logstore删除日志库.
- 更新
通过update_logstore更新日志库的配置(示例见下).
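下面是一个简单示意(假设已按前文构建好client; update_logstore支持的具体参数请以API参考为准):
# 更新日志库配置, 例如调整保存时间ttl(天)
res = client.update_logstore('project1', 'logstore1', ttl=60)
res.log_print()
# 删除日志库
res = client.delete_logstore('project1', 'logstore1')
res.log_print()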
管理日志库分区(shard)¶
分区属于某一个日志库, 所有的操作都需要传入项目名称和日志库名称.
- 获取列表
通过list_shards获取分区列表.
- 分裂
通过split_shard分裂分区.
- 合并
通过merge_shard合并分区(示例见下).
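下面是一个简单示意(假设已构建好client; split_shard的MD5分裂点仅为示例值, 具体签名以API参考为准):
# 列出分区
res = client.list_shards('project1', 'logstore1')
res.log_print()
# 分裂分区0, 传入一个MD5分裂点(示例值)
res = client.split_shard('project1', 'logstore1', 0, "7fffffffffffffffffffffffffffffff")
res.log_print()
# 合并分区1(与相邻的下一个分区合并)
res = client.merge_shard('project1', 'logstore1', 1)
res.log_print()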
管理日志库Logtail配置¶
Logtail的配置拥有独立的名字, 但其与日志库(logstore)一般是一一对应的关系.
获取列表
列出本项目下所有Logtail的配置名单:
res = client.list_logtail_config('project1')
res.log_print()
注意: 默认获取100个配置项,通过传入参数offset和size来获取更多.
输出:
{"count": 2, "configs": ["config_name1", "config_name2"], "total": 2}
- 创建
创建一个Logtail配置, 并关联到日志库上:
from aliyun.log import LogtailConfigGenerator as helper
import json
config_detail_json = """{
"configName": "json_1",
"inputDetail": {
"logType": "json_log",
"filePattern": "test.log",
"logPath": "/json_1"
},
"inputType": "file",
"outputDetail": {
"logstoreName": "logstore3"
}
}"""
request = helper.generate_config(json.loads(config_detail_json))
res = client.create_logtail_config('project1', request)
res.log_print()
注意:
- 创建的配置的名字configName和关联的日志库名字logstoreName都是放在传入的request中.
- 不同类型的Logtail配置参数不一样,可以参考这篇文章了解更多业务逻辑。
- 更多的JSON样例,请参考这里。
- 创建的Logtail配置还没有应用到任何一个机器组, 需要调用后面的API apply_config_to_machine_group 来进行配置.
- 获取信息
获取Logtail配置的具体信息:
res = client.get_logtail_config('project1', 'config1')
res.log_print()
- 修改
通过update_logtail_config来修改Logtail配置.
- 删除
通过delete_logtail_config来删除Logtail配置(示例见下).
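示意代码如下(假设已构建好client以及前文生成的配置对象request; 具体签名以API参考为准):
# 修改Logtail配置, 传入更新后的配置对象
res = client.update_logtail_config('project1', request)
res.log_print()
# 删除名为config1的Logtail配置
res = client.delete_logtail_config('project1', 'config1')
res.log_print()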
管理机器组¶
机器组(MachineGroup)主要用于应用Logtail配置. 其与Logtail配置是多对多的关系: 一个Logtail配置可以应用到多个机器组上, 反之一个机器组也可以应用多个Logtail配置.
- 获取列表
列出本项目下所有机器组的名单:
res = client.list_machine_group('project1')
res.log_print()
注意: 默认获取100个机器组,通过传入参数offset和size来获取更多.
输出:
{"count": 2, "machinegroups": ["group_name1", "group_name2"], "total": 2}
- 创建
创建一个机器组:
from aliyun.log import MachineGroupDetail
config_detail_json = {
"group_name": "group_name1",
"machine_list": [
"machine1",
"machine2"
],
"machine_type": "userdefined",
"group_type": "Armory",
"group_attribute": {
"externalName": "ex name",
"groupTopic": "topic x"
}
}
request = MachineGroupDetail()
request.from_json(config_detail_json)
res = client.create_machine_group('project1', request)
res.log_print()
注意:
- 创建的机器组的名字group_name是放在传入的request中.
- 创建的机器组还没有应用到任何一个Logtail配置, 需要调用后面的API apply_config_to_machine_group 来进行配置.
- 获取信息
获取机器组的具体信息:
res = client.get_machine_group('project1', 'group1')
res.log_print()
- 修改
通过update_machine_group来修改机器组.
- 删除
通过delete_machine_group来删除机器组(示例见下).
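示意代码如下(假设已构建好client以及前文的MachineGroupDetail对象request; 具体签名以API参考为准):
# 修改机器组, 传入更新后的机器组对象
res = client.update_machine_group('project1', request)
res.log_print()
# 删除名为group1的机器组
res = client.delete_machine_group('project1', 'group1')
res.log_print()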
关联Logtail配置到机器组¶
机器组与Logtail配置的关系是多对多的关系. 一个Logtail配置可以应用到多个机器组上, 反之一个机器组也可以应用多个Logtail配置.
- 应用Logtail配置到特定机器组
res = client.apply_config_to_machine_group('project1', 'config1', 'group1')
res.log_print()
- 去除机器组的Logtail配置
res = client.remove_config_to_machine_group('project1', 'config1', 'group1')
res.log_print()
- 获取Logtail配置应用到的机器组名单
res = client.get_config_applied_machine_groups('project1', 'config1')
res.log_print()
输出:
{"count": 2, "machinegroups": ["group1", "group2"]}
- 获取机器组应用的Logtail配置名单
res = client.get_machine_group_applied_configs('project1', 'group1')
res.log_print()
输出:
{"count": 2, "configs": ["config1", "config2"]}
日志库索引管理¶
只有配置了索引的日志库才能使用SQL查询日志.
- 创建
给一个日志库创建索引
from aliyun.log import IndexConfig
request_json = {
"keys": {
"f1": {
"caseSensitive": False,
"token": [
",", " ", "\"", "\"", ";", "=", "(", ")", "[", "]",
"{", "}", "?", "@", "&", "<", ">", "/", ":", "\n", "\t"
],
"type": "text",
"doc_value": True
},
"f2": {
"doc_value": True,
"type": "long"
}
},
"storage": "pg",
"ttl": 2,
"index_mode": "v2",
"line": {
"caseSensitive": False,
"token": [
",", " ", "\"", "\"", ";", "=", "(", ")", "[", "]", "{",
"}", "?", "@", "&", "<", ">", "/", ":", "\n", "\t"
]
}
}
request = IndexConfig()
request.from_json(request_json)
res = client.create_index('project1', 'logstore1', request)
res.log_print()
更多索引样例,可以参考这里,注意,使用SDK时,需要将true/false改成Python对应的True/False。
- 修改
通过update_index修改日志库的索引.
- 获取
通过get_index_config获得日志库的索引配置.
- 删除
通过delete_index删除日志库的索引(示例见下).
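示意代码如下(假设已构建好client以及前文的IndexConfig对象request; 具体签名以API参考为准):
# 获取索引配置
res = client.get_index_config('project1', 'logstore1')
res.log_print()
# 修改索引, 传入更新后的IndexConfig对象
res = client.update_index('project1', 'logstore1', request)
res.log_print()
# 删除索引
res = client.delete_index('project1', 'logstore1')
res.log_print()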
其他操作¶
- 获取日志库主题列表
from aliyun.log import ListTopicsRequest
request = ListTopicsRequest('project1', 'logstore1')
res = client.list_topic(request)
res.log_print()
日志消费¶
有三种方式消费日志:
- 拉取数据(PullLog): 根据分区游标来消费日志: 需要指定分区(Shard)以及游标.
- 日志库查询数据(GetLog): 通过索引查询来消费特定日志库日志: 需要指定日志库与日志时间以及(或)查询条件.
- 项目统计查询数据(GetProjectLog): 通过索引查询来消费整个项目组日志: 需要指定日志时间以及(或)统计查询查询条件.
- 实时消费(Consumer Group): 第一种的高级方式, 通过服务器支持的消费组, 来并发可靠的快速拉取日志.
游标操作¶
拉取数据需要传入游标和分区,
获取分区可以参考前面的管理日志库分区(shard)
, 这里介绍游标操作.
- 获取开头游标
获取日志库特定分区的最开头的游标.
res = client.get_begin_cursor('project1', 'logstore1', shard_id=0)
print(res.get_cursor())
- 获取结尾游标
获取日志库特定分区的结尾的游标.
res = client.get_end_cursor('project1', 'logstore1', shard_id=0)
print(res.get_cursor())
- 获取特定时间的游标
可以获得特定日志库分区中与特定接受时间最接近的一个游标.
res = client.get_cursor('project1', 'logstore1', shard_id=0, start_time="2018-1-1 10:10:10")
print(res.get_cursor())
- 这里的start_time指的是服务器接受日志的时间. 也可以是begin或者end.
- 获取游标时间
获得特定日志库分区的某个游标所对应的服务器时间, 如果是结尾游标, 一般对应于服务器的当前时间.
res = client.get_begin_cursor('project1', 'logstore1', shard_id=0)
res = client.get_cursor_time('project1', 'logstore1', shard_id=0, cursor=res.get_cursor())
print(res.get_cursor_time())
- 获取上一个游标的时间
获得特定日志库分区的某个游标的上一个游标所对应的服务器时间, 如果是开头游标, 则对应于服务器开头游标的时间.
res = client.get_end_cursor('project1', 'logstore1', shard_id=0)
res = client.get_previous_cursor_time('project1', 'logstore1', shard_id=0, cursor=res.get_cursor())
print(res.get_cursor_time())
拉取(Pull)数据¶
根据游标获取数据, 需要传入分区.
下面例子消费分区0
一个小时前收集到的数据.
from time import time
res = client.get_cursor('project1', 'logstore1', shard_id=0, start_time=int(time())-3600)
res = client.pull_logs('project1', 'logstore1', shard_id=0, cursor=res.get_cursor())
res.log_print()
# 或者
it = client.pull_log('project1', 'logstore1', shard_id=0, from_time="2018-1-1 10:10:10", to_time="2018-1-1 10:20:10")
for res in it:
res.log_print()
# 或者大并发直接下载在本地
it = client.pull_log('project1', 'logstore1', from_time="2018-1-1 10:10:10", to_time="2018-1-1 10:20:10", file_path="/data/dump_{}.data")
for res in it:
res.log_print()
注意: 默认获取1000条, 可以通过参数count来调节. 也可以通过参数end_cursor来设定一个结束的游标.
获取(Get)日志库数据¶
消费特定日志库, 根据索引获取数据, 需要传入时间范围, 也可以传入查询语句. 下面的例子查询时间是过去一小时特定日志库的前100条日志.
from time import time
from aliyun.log import GetLogsRequest
request = GetLogsRequest("project1", "logstore1", fromTime=int(time()-3600), toTime=int(time()), topic='', query="*", line=100, offset=0, reverse=False)
# 或者
request = GetLogsRequest("project1", "logstore1", fromTime="2018-1-1 10:10:10", toTime="2018-1-1 10:20:10", topic='', query="*", line=100, offset=0, reverse=False)
res = client.get_logs(request)
res.log_print()
也可以通过接口get_log
来获取
from time import time
res = client.get_log("project1", "logstore1", from_time=int(time()-3600), to_time=int(time()), size=-1)
# 或者
res = client.get_log("project1", "logstore1", from_time="2018-1-1 10:10:10", to_time="2018-1-1 10:20:10", size=-1)
res.log_print()
这里传入size=-1
可以获取所有.
获取数据分布图¶
通过get_histograms根据索引获取特定日志库在给定时间范围内的数据分布图(示例见下).
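下面是一个简单示意(假设已构建好client; 具体参数以API参考为准):
from time import time
from aliyun.log import GetHistogramsRequest
# 查询过去一小时内匹配query的日志数量分布
request = GetHistogramsRequest('project1', 'logstore1', fromTime=int(time()-3600), toTime=int(time()), topic='', query='*')
res = client.get_histograms(request)
res.log_print()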
获取(Get)项目组数据¶
跨日志库的对整个项目组查询, 根据索引获取数据, 需要传入时间范围和查询语句. 注意: 因为跨项目组传输数据可能非常大从而影响实际的性能, 推荐使用统计查询SQL来降低传输数据量.
下面的例子查询时间是一段时间内的项目组的几个日志库的日志数量.
from aliyun.log import GetProjectLogsRequest
req = GetProjectLogsRequest("project1","select count(1) from logstore1, logstore2, logstore3 where __date__ >'2017-11-10 00:00:00' and __date__ < '2017-11-13 00:00:00'")
res = client.get_project_logs(req)
res.log_print();
注意:SQL中使用的字段需要在索引中配置好,例如上面例子中的__date__。
实时消费¶
通过消费组(Consumer Group)可以获得可保障的自动扩展的日志消费服务.
高级接口¶
可以参考这几篇实战文章与样例:
- 日志服务与SIEM(如Splunk)集成方案
- 使用消费组实时分发数据
- 使用消费组实时跨域监测多日志库数据
- 相关样例代码
基础接口¶
高级接口已经对基础接口进行了封装. 个别情况下也可以通过基础接口进行一些特定的操作.
- 获取列表
通过list_consumer_group获得当前消费组列表.
- 创建
通过create_consumer_group创建一个消费组.
- 更新
通过update_consumer_group更新一个消费组, 例如延迟和消费顺序等.
- 删除
通过delete_consumer_group删除一个消费组.
- 获取消费进度
可以通过get_check_point获得消费组的消费检查点(Checkpoint), 来了解消费进度信息.
- 更新消费进度
消费者需要通过update_check_point来存储和更新消费检查点(Checkpoint). 基础接口的简单示例见下.
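下面是一个简单示意(假设已构建好client; create_consumer_group的参数为超时时间(秒)与是否按序消费, 具体签名以API参考为准):
# 创建一个消费组, 超时时间300秒
res = client.create_consumer_group('project1', 'logstore1', 'consumer_group1', 300, in_order=False)
res.log_print()
# 列出消费组
res = client.list_consumer_group('project1', 'logstore1')
res.log_print()
# 获取消费进度(Checkpoint)
res = client.get_check_point('project1', 'logstore1', 'consumer_group1')
res.log_print()
# 删除消费组
res = client.delete_consumer_group('project1', 'logstore1', 'consumer_group1')
res.log_print()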
报表管理¶
管理报表
- 获取列表
通过list_dashboard获取报表的列表.
- 创建报表
通过create_dashboard创建一个报表. 传入的结构是一个字典对象,如下:
{
"charts": [
{
"display": {
"displayName": "",
"height": 5,
"width": 5,
"xAxis": [
"province"
],
"xPos": 0,
"yAxis": [
"pv"
],
"yPos": 0
},
"search": {
"end": "now",
"logstore": "access-log",
"query": "method: GET | select ip_to_province(remote_addr) as province , count(1) as pv group by province order by pv desc ",
"start": "-86400s",
"topic": ""
},
"title": "map",
"type": "map"
},
{
"display": {
"displayName": "",
"height": 5,
"width": 5,
"xAxis": [
"province"
],
"xPos": 5,
"yAxis": [
"pv"
],
"yPos": 0
},
"search": {
"end": "now",
"logstore": "access-log",
"query": "method: POST | select ip_to_province(remote_addr) as province , count(1) as pv group by province order by pv desc ",
"start": "-86400s",
"topic": ""
},
"title": "post_map",
"type": "map"
}
],
"dashboardName": 'dashboard_1',
"description": ""
}
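调用方式示意如下(假设上面的字典已赋值给变量dashboard_detail, 且已按前文构建好client; 具体签名以API参考为准):
res = client.create_dashboard('project1', dashboard_detail)
res.log_print()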
- 获取报表
通过get_dashboard获取一个报表的具体信息.
- 更新报表
通过update_dashboard更新一个报表,传入的结构与创建时一样。
- 删除报表
通过delete_dashboard删除一个报表.
报警管理¶
管理报警
- 获取报警列表
通过list_alert获取报警的列表.
- 创建报警
通过create_alert创建一个报警. 传入的结构是一个字典对象,如下:
{
"name": 'alert_1',
"displayName": "Alert for testing",
"description": "",
"type": "Alert",
"state": "Enabled",
"schedule": {
"type": "FixedRate",
"interval": "5m",
},
"configuration": {
"condition": "total >= 100",
"dashboard": "dashboard-test",
"queryList": [
{
"logStore": "test-logstore",
"start": "-120s",
"end": "now",
"timeSpanType": "Custom",
"chartTitle": "chart-test",
"query": "* | select count(1) as total",
}
],
"notificationList": [
{
"type": "DingTalk",
"serviceUri": "http://xxxx",
"content": "Message",
},
{
"type": "MessageCenter",
"content": "Message",
},
{
"type": "Email",
"emailList": ["abc@test.com"],
"content": "Email Message",
},
{
"type": "SMS",
"mobileList": ["132373830xx"],
"content": "Cellphone message"
}
],
"muteUntil": 1544582517,
"notifyThreshold": 1,
"throttling": "5m",
}
}
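调用方式示意如下(假设上面的字典已赋值给变量alert_detail, 且已按前文构建好client; 具体签名以API参考为准):
res = client.create_alert('project1', alert_detail)
res.log_print()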
- 获取报警
通过get_alert获取一个报警的具体信息.
- 更新报警
通过update_alert更新一个报警,传入的结构与创建时一样。
- 删除报警
通过delete_alert删除一个报警.
快速查询管理¶
管理快速查询
- 获取快速查询列表
通过list_savedsearch获取快速查询的列表.
- 创建快速查询
通过create_savedsearch创建一个快速查询. 传入的结构是一个字典对象,如下:
{
"logstore": "test2",
"savedsearchName": 'search_1',
"searchQuery": "boy | select sex, count() as Count group by sex",
"topic": ""
}
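调用方式示意如下(假设上面的字典已赋值给变量savedsearch_detail, 且已按前文构建好client; 具体签名以API参考为准):
res = client.create_savedsearch('project1', savedsearch_detail)
res.log_print()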
- 获取快速查询
通过get_savedsearch获取一个快速查询的具体信息.
- 更新快速查询
通过update_savedsearch更新一个快速查询,传入的结构与创建时一样。
- 删除快速查询
通过delete_savedsearch删除一个快速查询.
外部存储管理¶
管理外部存储, 参考文章
- 获取外部存储列表
通过list_external_store获取外部存储的列表.
- 创建外部存储
通过create_external_store创建一个外部存储. 传入的结构是一个字典对象,如下:
{
"externalStoreName": "rds_store4",
"storeType": "rds-vpc",
"parameter": {
"vpc-id": "vpc-m5eq4irc1pucpk85frr5j",
"instance-id": "i-m5eeo2whsnfg4kzq54ah",
"host": "1.2.3.4",
"port": "3306",
"username": "root",
"password": "123",
"db": "meta",
"table": "join_meta",
"region": "cn-qingdao"
}
}
- 获取外部存储
通过get_external_store获取一个外部存储的具体信息.
- 更新外部存储
通过update_external_store更新一个外部存储,传入的结构与创建时一样。
- 删除外部存储
通过delete_external_store删除一个外部存储.
投递管理¶
投递的配置一般称为Job, 包含了投递的具体配置以及调度日程安排. 而某一个具体时间的运行实例称为Task.
- 获取配置列表
通过list_shipper获取投递配置的列表.
- 创建配置
通过create_shipper创建一个投递配置. 可以传入一个json字符串(如下)或者对应的dict对象(如get_shipper获得的对象结果),如下:
OSS以CSV格式投递:
{
"shipperName": "oss-ship-test1",
"targetConfiguration": {
"bufferInterval": 300,
"bufferSize": 256,
"compressType": "none",
"enable": true,
"ossBucket": "bucket1",
"ossPrefix": "bucket_folder1",
"pathFormat": "%Y/%m/%d/%H/%M",
"roleArn": "acs:ram::1234:role/aliyunlogdefaultrole",
"storage": {
"detail": {
"columns": [
"k3",
"k4"
],
"delimiter": "|",
"header": false,
"nullIdentifier": "",
"quote": "\""
},
"format": "csv"
}
},
"targetType": "oss"
}
OSS以JSON格式投递(注意storage.detail.columns为空是必须的,这个和get_shipper有所差别):
{
"shipperName": "oss-ship-test1",
"targetConfiguration": {
"bufferInterval": 300,
"bufferSize": 256,
"compressType": "snappy",
"enable": true,
"ossBucket": "bucket1",
"ossPrefix": "bucket_folder1",
"pathFormat": "%Y/%m/%d/%H/%M",
"roleArn": "acs:ram::1234:role/aliyunlogdefaultrole",
"storage": {
"detail": {
"columns": [
]
},
"format": "json"
}
},
"targetType": "oss"
}
- 获取配置
通过get_shipper_config获取一个投递配置的具体信息.
- 更新配置
通过update_shipper更新一个投递配置.
- 删除配置
通过delete_shipper删除一个投递配置.
- 获取运行实例列表
通过get_shipper_tasks获取投递运行实例.
- 重试运行实例
通过retry_shipper_tasks重试某一个运行实例(示例见下).
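下面是一个简单示意(假设已构建好client; 时间参数为Unix时间戳, 具体签名与返回结构以API参考为准):
from time import time
# 获取过去一天内投递配置oss-ship-test1的运行实例
res = client.get_shipper_tasks('project1', 'logstore1', 'oss-ship-test1', int(time()) - 86400, int(time()))
res.log_print()
# 如有失败实例, 可按需重试(task_list为实例ID列表, 此处仅为示意)
# res = client.retry_shipper_tasks('project1', 'logstore1', 'oss-ship-test1', ['task_id_1'])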
Elasticsearch 数据迁移¶
用于将 Elasticsearch 中的数据迁移至日志服务。
将 hosts 为 localhost:9200 的 Elasticsearch 中的所有文档导入日志服务的项目 project1 中。
migration_manager = MigrationManager(hosts="localhost:9200", endpoint=endpoint, project_name="project1", access_key_id=access_key_id, access_key=access_key)
migration_manager.migrate()
指定将 Elasticsearch 中索引名以 myindex_ 开头的数据写入日志库 logstore1,将索引 index1,index2 中的数据写入日志库 logstore2 中。
migration_manager = MigrationManager(hosts="localhost:9200,other_host:9200", endpoint=endpoint, project_name="project1", access_key_id=access_key_id, access_key=access_key, logstore_index_mappings='{"logstore1": "myindex_*", "logstore2": "index1,index2"}')
migration_manager.migrate()
使用参数 query 指定从 Elasticsearch 中抓取 title 字段等于 python 的文档,并使用文档中的字段 date1 作为日志的 time 字段。
migration_manager = MigrationManager(hosts="localhost:9200", endpoint=endpoint, project_name="project1", access_key_id=access_key_id, access_key=access_key, query='{"query": {"match": {"title": "python"}}}', time_reference="date1")
migration_manager.migrate()
最佳实践¶
其他资源:¶
- 日志服务产品介绍:http://www.aliyun.com/product/sls/
- 日志服务产品文档:https://help.aliyun.com/product/28958.html
- 其他问题请提工单
最佳实践¶
日志服务与SIEM(如Splunk)集成方案¶
背景信息¶
目标¶
本文主要介绍如何让阿里云日志服务与您的SIEM方案(如Splunk)对接, 以便确保阿里云上的所有法规、审计、与其他相关日志能够导入到您的安全运维中心(SOC)中。
名词解释¶
LOG(SLS) - 阿里云日志服务,简写SLS表示(Simple Log Service)。 SIEM - 安全信息与事件管理系统(Security Information and Event Management),如Splunk, QRadar等。 Splunk HEC - Splunk的Http事件接收器(Splunk Http Event Collector), 一个 HTTP(s)接口,用于接收日志。
审计相关日志¶
安全运维团队一般对阿里云相关的审计日志感兴趣,如下列出所有存在于所有目前在日志服务中可用的相关日志(但不限于):
* Regions化 - 时刻更新,请以最新的产品文档为准。
集成方案建议¶
概念¶
项目(Project) 项目(Project)是日志服务中的资源管理单元,用于资源隔离和控制。您可以通过项目来管理某一个应用的所有日志及相关的日志源。它管理着用户的所有日志库(Logstore),采集日志的机器配置等信息,同时它也是用户访问日志服务资源的入口。
日志库(Logstore) 日志库(Logstore)是日志服务中日志数据的收集、存储和查询单元。每个日志库隶属于一个项目,且每个项目可以创建多个日志库。
分区(Shard) 每个日志库分若干个分区(Shard),每个分区由MD5左闭右开区间组成,每个区间范围不会相互覆盖,并且所有的区间的范围是MD5整个取值范围。
服务入口(Endpoint) 日志服务入口是访问一个项目(Project)及其内部日志数据的 URL。它和 Project 所在的阿里云区域(Region)及 Project 名称相关。 https://help.aliyun.com/document_detail/29008.html
访问秘钥(AccessKey) 阿里云访问秘钥是阿里云为用户使用 API(非控制台)来访问其云资源设计的“安全口令”。您可以用它来签名 API 请求内容以通过服务端的安全验证。 https://help.aliyun.com/document_detail/29009.html
假设¶
这里假设您的SIEM(如Splunk)位于组织内部环境(on-premise)中,而不是云端。为了安全考虑,没有任何端口开放让外界环境来访问此SIEM。
使用消费组编程¶
协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心。
Spark Streaming、Storm 以及Flink Connector都以Consumer Library作为基础实现。
基本概念¶
消费组(Consumer Group) - 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据。 消费者(Consumer) - 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。
在日志服务中,一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者,分配方式遵循以下原则: - 每个shard只会分配到一个消费者。 - 一个消费者可以同时拥有多个shard。 新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明。
协同消费库的另一个功能是保存checkpoint,方便程序故障恢复时能接着从断点继续消费,从而保证数据不会被重复消费。
部署建议¶
硬件建议¶
硬件参数: 需要一台机器运行程序,安装一个Linux(如Ubuntu x64),推荐硬件参数如下: - 2.0+ GHZ X 8核 - 16GB 内存,推荐32GB - 1 Gbps网卡 - 至少2GB可用磁盘空间,建议10GB以上
网络参数: 从组织内的环境到阿里云的带宽应该大于数据在阿里云端产生的速度,否则日志无法实时消费。假设数据产生一般速度均匀,峰值在2倍左右,每天100TB原始日志。5倍压缩的场景下,推荐带宽应该在4MB/s(32Mbps)左右。
使用(Python)¶
这里我们描述用Python使用消费组进行编程。对于Java语言用法,可以参考这篇文章.
注意:本篇文章的代码可能会更新,最新版本在这里可以找到:Github样例.
安装¶
环境 1. 强烈推荐PyPy3来运行本程序,而不是使用标准CPython解释器。 2. 日志服务的Python SDK可以如下安装:
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手册,可以参考这里
程序配置¶
如下展示如何配置程序: 1. 配置程序日志文件,以便后续测试或者诊断可能的问题。 2. 基本的日志服务连接与消费组的配置选项。 3. 消费组的一些高级选项(性能调参,不推荐修改)。 4. SIEM(Splunk)的相关参数与选项。
请仔细阅读代码中相关注释并根据需要调整选项:
#encoding: utf8
import os
import logging
from logging.handlers import RotatingFileHandler
from multiprocessing import current_process
from aliyun.log.consumer import LogHubConfig, CursorPosition
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())
logger = logging.getLogger(__name__)
def get_option():
##########################
# 基本选项
##########################
# 从环境变量中加载SLS参数与选项
endpoint = os.environ.get('SLS_ENDPOINT', '')
accessKeyId = os.environ.get('SLS_AK_ID', '')
accessKey = os.environ.get('SLS_AK_KEY', '')
project = os.environ.get('SLS_PROJECT', '')
logstore = os.environ.get('SLS_LOGSTORE', '')
consumer_group = os.environ.get('SLS_CG', '')
# 消费的起点。这个参数在第一次跑程序的时候有效,后续再次运行将从上一次消费的保存点继续。
# 可以是"begin","end",或者特定的ISO时间格式。
cursor_start_time = "2018-12-26 0:0:0"
##########################
# 一些高级选项
##########################
# 一般不要修改消费者名,尤其是需要并发跑时
consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
# 心跳时长,当服务器在2倍时间内没有收到特定Shard的心跳报告时,服务器会认为对应消费者离线并重新调配任务。
# 所以当网络不是特别好的时候,不要调整的特别小。
heartbeat_interval = 20
# 消费数据的最大间隔,如果数据生成的速度很快,并不需要调整这个参数。
data_fetch_interval = 1
# 构建一个消费组和消费者
option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
cursor_start_time=cursor_start_time,
heartbeat_interval=heartbeat_interval,
data_fetch_interval=data_fetch_interval)
# Splunk选项
settings = {
"host": "10.1.2.3",
"port": 80,
"token": "a023nsdu123123123",
'https': False, # 可选, bool
'timeout': 120, # 可选, int
'ssl_verify': True, # 可选, bool
"sourcetype": "", # 可选, sourcetype
"index": "", # 可选, index
"source": "", # 可选, source
}
return option, settings
数据消费与转发¶
如下代码展示如何从SLS拿到数据后转发给Splunk。
from aliyun.log.consumer import *
from aliyun.log.pulllog_response import PullLogResponse
from multiprocessing import current_process
import time
import json
import socket
import requests
class SyncData(ConsumerProcessorBase):
"""
这个消费者从SLS消费数据并发送给Splunk
"""
def __init__(self, splunk_setting):
"""初始化并验证Splunk连通性"""
super(SyncData, self).__init__()
assert splunk_setting, ValueError("You need to configure settings of remote target")
assert isinstance(splunk_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")
self.option = splunk_setting
self.timeout = self.option.get("timeout", 120)
# 测试Splunk连通性
s = socket.socket()
s.settimeout(self.timeout)
s.connect((self.option["host"], self.option['port']))
self.r = requests.session()
self.r.max_redirects = 1
self.r.verify = self.option.get("ssl_verify", True)
self.r.headers['Authorization'] = "Splunk {}".format(self.option['token'])
self.url = "{0}://{1}:{2}/services/collector/event".format("http" if not self.option.get('https') else "https", self.option['host'], self.option['port'])
self.default_fields = {}
if self.option.get("sourcetype"):
self.default_fields['sourcetype'] = self.option.get("sourcetype")
if self.option.get("source"):
self.default_fields['source'] = self.option.get("source")
if self.option.get("index"):
self.default_fields['index'] = self.option.get("index")
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
for log in logs:
# 发送数据到Splunk
# 如下代码只是一个样例(注意:所有字符串都是unicode)
# Python2: {u"__time__": u"12312312", u"__topic__": u"topic", u"field1": u"value1", u"field2": u"value2"}
# Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
event = {}
event.update(self.default_fields)
if log.get(u"__topic__") == 'audit_log':
# suppose we only care about audit log
event['time'] = log[u'__time__']
event['fields'] = {}
del log['__time__']
event['fields'].update(log)
data = json.dumps(event, sort_keys=True)
try:
req = self.r.post(self.url, data=data, timeout=self.timeout)
req.raise_for_status()
except Exception as err:
logger.debug("Failed to connect to remote Splunk server ({0}). Exception: {1}", self.url, err)
# TODO: 根据需要,添加一些重试或者报告的逻辑
logger.info("Complete send data to remote")
self.save_checkpoint(check_point_tracker)
主逻辑¶
如下代码展示主程序控制逻辑:
def main():
option, settings = get_option()
logger.info("*** start to consume data...")
worker = ConsumerWorker(SyncData, option, args=(settings,) )
worker.start(join=True)
if __name__ == '__main__':
main()
启动¶
假设程序命名为”sync_data.py”,可以如下启动:
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<消费组名,可以简单命名为"syc_data">
pypy3 sync_data.py
限制与约束
每一个日志库(logstore)最多可以配置10个消费组,如果遇到错误ConsumerGroupQuotaExceed则表示遇到限制,建议在控制台端删除一些不用的消费组。
性能考虑¶
启动多个消费者¶
基于消费组的程序可以直接启动多次以便达到并发作用:
nohup pypy3 sync_data.py &
nohup pypy3 sync_data.py &
nohup pypy3 sync_data.py &
...
注意: 所有消费者使用了同一个消费组的名字和不同的消费者名字(因为消费者名以进程ID为后缀)。 因为一个分区(Shard)只能被一个消费者消费,假设一个日志库有10个分区,那么最多有10个消费者同时消费。
Https¶
如果服务入口(endpoint)配置为https://前缀,如https://cn-beijing.log.aliyuncs.com,程序与SLS的连接将自动使用HTTPS加密。
服务器证书*.aliyuncs.com是GlobalSign签发的,默认大多数Linux/Windows机器会自动信任此证书。如果某些特殊情况下机器不信任此证书,可以参考这里下载并安装此证书。
性能吞吐¶
基于测试,在没有带宽限制、接收端速率限制(如Splunk端)的情况下,以推荐硬件用pypy3运行上述样例,单个消费者在占用大约10%的单核CPU的情况下,可以达到消费5 MB/s原始日志的速率。因此,理论上每个CPU核可以达到50 MB/s原始日志,也就是每个CPU核每天可以消费4TB原始日志。
注意: 这个数据依赖带宽、硬件参数和SIEM接收端(如Splunk)是否能够较快接收数据。
高可用性¶
消费组会将检测点(check-point)保存在服务器端,当一个消费者停止,另外一个消费者将自动接管并从断点继续消费。
可以在不同机器上启动消费者,这样当一台机器停止或者损坏的清下,其他机器上的消费者可以自动接管并从断点进行消费。
理论上,为了备用,也可以启动大于shard数量的消费者。
更多案例¶
- 日志服务Python消费组实战(一):日志服务与SIEM(如Splunk)集成实战
- 日志服务Python消费组实战(二):实时日志分发
- 日志服务Python消费组实战(三):实时跨域监测多日志库数据
- 日志服务Python消费组实战(三):日志服务与SIEM集成实战(二):syslog篇
- 本文Github样例
日志服务与SIEM(如IBM QRadar)集成方案:syslog方式¶
背景信息¶
背景¶
在日志服务与SIEM(如Splunk)集成方案(一)中,我们介绍了如何使用消费组的技术,实现稳定、高性能与可扩展的数据传输,并使用了SIEM的接口(例如Splunk的HEC)来对接。
在现实中,syslog也是一个常见的日志通道:大部分物理设备、交换机路由器以及服务器等,都支持通过syslog来发送日志,因此几乎所有的SIEM(如IBM QRadar, HP ArcSight等)也支持通过syslog渠道接收日志。
本文将重点介绍如何使用syslog与SIEM(如IBM QRadar, HP ArcSight等)对接,确保传输的性能与可靠性,以便确保阿里云上的所有法规、审计、与其他相关日志能够导入到您的安全运维中心(SOC)中。
概念¶
syslog的格式规范主要由标准协议RFC5424和RFC3164定义,协议RFC5424(2009年发布)是升级版本,并废弃了RFC3164(2001年发布)。因为新版兼容旧版,且解决了很多问题,因此推荐使用RFC5424协议。
syslog over TCP/TLS:syslog只是规定了日志格式,理论上TCP和UDP都可以传输syslog,TCP可以较好地确保数据传输的稳定性。协议RFC5425还定义了TLS的安全传输层,如果您的SIEM支持TCP通道,甚至TLS,那么建议优先使用。
syslog facility:早期Unix定义的程序组件,一般如下定义,这里我们可以选择user作为默认组件。
Facility代码 | 关键字 | 描述 |
---|---|---|
0 | kern | Kernel messages |
1 | user | User-level messages |
2 | mail | Mail system |
3 | daemon | System daemons |
4 | auth | Security/authentication messages |
5 | syslog | Messages generated internally by syslogd |
6 | lpr | Line printer subsystem |
7 | news | Network news subsystem |
8 | uucp | UUCP subsystem |
9 | cron | Clock daemon |
10 | authpriv | Security/authentication messages |
11 | ftp | FTP daemon |
12 | ntp | NTP subsystem |
13 | security | Log audit |
14 | console | Log alert |
15 | solaris-cron | Scheduling daemon |
16–23 | local0 – local7 | Locally used facilities |
syslog severity:定义了日志的级别,可以根据需要设置特定内容的日志为较高的级别。默认一般用info。
值 | 严重度 | 关键字 | 描述 |
---|---|---|---|
0 | Emergency | emerg | System is unusable |
1 | Alert | alert | A condition that should be corrected immediately, such as a corrupted system database. |
2 | Critical | crit | Critical conditions |
3 | Error | err | Error conditions |
4 | Warning | warning | Warning conditions |
5 | Notice | notice | Conditions that are not error conditions, but that may require special handling. |
6 | Informational | info | Informational messages |
7 | Debug | debug | Messages that contain information normally of use only when debugging a program. |
集成方案建议¶
假设¶
这里假设您的SIEM(如IBM QRadar)位于组织内部环境(on-premise)中,而不是云端。为了安全考虑,没有任何端口开放让外界环境来访问此SIEM。
概览¶
推荐使用SLS消费组构建程序来从SLS进行实时消费,然后通过syslog over TCP/TLS来发送日志给SIEM。
关于消费组的相关概念、以及程序部署相关的环境需求等,可以直接参考日志服务与SIEM(如Splunk)集成方案(一)。
本文着重介绍与SIEM通讯的syslog部分。如果您的SIEM支持TCP通道,甚至TLS,那么建议优先使用。

使用消费组编程¶
如下展示如何配置程序: 1. 配置程序日志文件,以便后续测试或者诊断可能的问题。 2. 基本的日志服务连接与消费组的配置选项。 3. 消费组的一些高级选项(性能调参,不推荐修改)。 4. SIEM的syslog server相关参数与选项。
请仔细阅读代码中相关注释并根据需要调整选项:
#encoding: utf8
import os
import logging
from logging.handlers import RotatingFileHandler
from multiprocessing import current_process
from aliyun.log.consumer import LogHubConfig, CursorPosition
root = logging.getLogger()
handler = RotatingFileHandler("{0}_{1}.log".format(os.path.basename(__file__), current_process().pid), maxBytes=100*1024*1024, backupCount=5)
handler.setFormatter(logging.Formatter(fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())
logger = logging.getLogger(__name__)
def get_option():
##########################
# 基本选项
##########################
# 从环境变量中加载SLS参数与选项
endpoint = os.environ.get('SLS_ENDPOINT', '')
accessKeyId = os.environ.get('SLS_AK_ID', '')
accessKey = os.environ.get('SLS_AK_KEY', '')
project = os.environ.get('SLS_PROJECT', '')
logstore = os.environ.get('SLS_LOGSTORE', '')
consumer_group = os.environ.get('SLS_CG', '')
# 消费的起点。这个参数在第一次跑程序的时候有效,后续再次运行将从上一次消费的保存点继续。
# 可以是"begin","end",或者特定的ISO时间格式。
cursor_start_time = "2018-12-26 0:0:0"
##########################
# 一些高级选项
##########################
# 一般不要修改消费者名,尤其是需要并发跑时
consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
# 心跳时长,当服务器在2倍时间内没有收到特定Shard的心跳报告时,服务器会认为对应消费者离线并重新调配任务。
# 所以当网络不是特别好的时候,不要调整的特别小。
heartbeat_interval = 20
# 消费数据的最大间隔,如果数据生成的速度很快,并不需要调整这个参数。
data_fetch_interval = 1
# 构建一个消费组和消费者
option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name,
cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
cursor_start_time=cursor_start_time,
heartbeat_interval=heartbeat_interval,
data_fetch_interval=data_fetch_interval)
# syslog options
settings = {
"host": "1.2.3.4", # 必选
"port": 514, # 必选, 端口
"protocol": "tcp", # 必选, tcp, udp或 tls (仅Python3)
"sep": "||", # 必选, key=value键值对的分隔符,这里用||分隔
"cert_path": None, # 可选, TLS的证书位置
"timeout": 120, # 可选, 超时时间,默认120秒
"facility": syslogclient.FAC_USER, # 可选, 可以参考其他syslogclient.FAC_*的值
"severity": syslogclient.SEV_INFO, # 可选, 可以参考其他syslogclient.SEV_*的值
"hostname": None, # 可选,机器名,默认选择本机机器名
"tag": None # 可选,标签,默认是 -
}
return option, settings
如下代码展示如何从SLS拿到数据后转发给SIEM的syslog服务器。注意这里依赖了一个库syslogclient.py(在我们的样例代码库中可以找到),它是一个标准syslog协议的实现版本。
请仔细阅读代码中相关注释并根据需要调整格式化方式:
from datetime import datetime
import six
from aliyun.log.consumer import ConsumerProcessorBase
from aliyun.log.pulllog_response import PullLogResponse
from syslogclient import SyslogClientRFC5424 as SyslogClient
class SyncData(ConsumerProcessorBase):
"""
这个消费者从SLS消费数据并发送给syslog server
"""
def __init__(self, target_setting):
"""初始化并验证syslog server连通性"""
super(SyncData, self).__init__() # remember to call base's init
assert target_setting, ValueError("You need to configure settings of remote target")
assert isinstance(target_setting, dict), ValueError("The settings should be dict to include necessary address and confidentials.")
self.option = target_setting
self.protocol = self.option['protocol']
self.timeout = int(self.option.get('timeout', 120))
self.sep = self.option.get('sep', "||")
self.host = self.option["host"]
self.port = int(self.option.get('port', 514))
self.cert_path=self.option.get('cert_path', None)
# try connection
with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
pass
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
logger.info("Get data from shard {0}, log count: {1}".format(self.shard_id, len(logs)))
try:
with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout, cert_path=self.cert_path) as client:
for log in logs:
# Put your sync code here to send to remote.
# the format of log is just a dict with example as below (Note, all strings are unicode):
# Python2: {"__time__": "12312312", "__topic__": "topic", u"field1": u"value1", u"field2": u"value2"}
# Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1", "field2": "value2"}
# suppose we only care about audit log
timestamp = datetime.fromtimestamp(int(log[u'__time__']))
del log['__time__']
io = six.StringIO()
first = True
# TODO: 这里可以根据需要修改格式化内容,这里使用Key=Value传输,并使用默认的||进行分割
for k, v in six.iteritems(log):
io.write("{0}{1}={2}".format(self.sep, k, v))
data = io.getvalue()
# TODO:这里可以根据需要修改facility或者severity
client.log(data, facility=self.option.get("facility", None), severity=self.option.get("severity", None), timestamp=timestamp, program=self.option.get("tag", None), hostname=self.option.get("hostname", None))
except Exception as err:
logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(self.option, err))
# TODO: 需要添加一些错误处理的代码,例如重试或者通知等
raise err
logger.info("Complete send data to remote")
self.save_checkpoint(check_point_tracker)
如下代码展示主程序控制逻辑:
def main():
option, settings = get_option()
logger.info("*** start to consume data...")
worker = ConsumerWorker(SyncData, option, args=(settings,) )
worker.start(join=True)
if __name__ == '__main__':
main()
假设程序命名为”sync_data.py”,可以如下启动:
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_CG=<消费组名,可以简单命名为"syc_data">
pypy3 sync_data.py
限制与约束
每一个日志库(logstore)最多可以配置10个消费组,如果遇到错误ConsumerGroupQuotaExceed则表示遇到限制,建议在控制台端删除一些不用的消费组。
部署、管理、监控、性能、安全性相关¶
请参考日志服务与SIEM(如Splunk)集成方案(一)中相关内容。
更多案例¶
- 日志服务Python消费组实战(一):日志服务与SIEM(如Splunk)集成实战
- 日志服务Python消费组实战(二):实时日志分发
- 日志服务Python消费组实战(三):实时跨域监测多日志库数据
- 日志服务Python消费组实战(三):日志服务与SIEM集成实战(二):syslog篇
- 本文Github样例:sync_data_to_syslog.py、syslogclient.py
使用消费组实时分发数据¶
场景目标¶
使用日志服务的Web-tracking、logtail(文件极简)、syslog等收集上来的日志经常存在各种各样的格式,我们需要针对特定的日志(例如topic)进行一定的分发到特定的logstore中处理和索引,本文主要介绍如何使用消费组实时分发日志到不同的目标日志库中,并且利用消费组的特点,达到自动平衡、负载均衡和高可用性。

基本概念¶
协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心。
消费组(Consumer Group) - 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据。 消费者(Consumer) - 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。
在日志服务中,一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者,分配方式遵循以下原则: - 每个shard只会分配到一个消费者。 - 一个消费者可以同时拥有多个shard。 新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明。
协同消费库的另一个功能是保存checkpoint,方便程序故障恢复时能接着从断点继续消费,从而保证数据不会被重复消费。
使用消费组进行实时分发¶
这里我们描述用Python使用消费组进行编程,实时根据数据的topic进行分发。 注意:本篇文章的相关代码可能会更新,最新版本在这里可以找到:Github样例.

安装¶
环境 1. 建议程序运行在源日志库同Region下的ECS上,并使用局域网服务入口,这样好处是网络速度最快,其次是读取没有外网费用产生。 2. 强烈推荐PyPy3来运行本程序,而不是使用标准CPython解释器。 3. 日志服务的Python SDK可以如下安装:
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手册,可以参考这里
程序配置¶
如下展示如何配置程序: 1. 配置程序日志文件,以便后续测试或者诊断可能的问题(跳过,具体参考样例)。 2. 基本的日志服务连接与消费组的配置选项。 3. 目标Logstore的一些连接信息
请仔细阅读代码中相关注释并根据需要调整选项:
#encoding: utf8
import os
from functools import partial
from multiprocessing import current_process
from aliyun.log import LogClient
from aliyun.log.consumer import LogHubConfig, CursorPosition
def get_option():
##########################
# 基本选项
##########################
# 从环境变量中加载SLS参数与选项,根据需要可以配置多个目标
accessKeyId = os.environ.get('SLS_AK_ID', '')
accessKey = os.environ.get('SLS_AK_KEY', '')
endpoint = os.environ.get('SLS_ENDPOINT', '')
project = os.environ.get('SLS_PROJECT', '')
logstore = os.environ.get('SLS_LOGSTORE', '')
to_endpoint = os.environ.get('SLS_ENDPOINT_TO', endpoint)
to_project = os.environ.get('SLS_PROJECT_TO', project)
to_logstore1 = os.environ.get('SLS_LOGSTORE_TO1', '')
to_logstore2 = os.environ.get('SLS_LOGSTORE_TO2', '')
to_logstore3 = os.environ.get('SLS_LOGSTORE_TO3', '')
consumer_group = os.environ.get('SLS_CG', '')
# 消费的起点。这个参数在第一次跑程序的时候有效,后续再次运行将从上一次消费的保存点继续。
# 可以是"begin","end",或者特定的ISO时间格式。
cursor_start_time = "2018-12-26 0:0:0"
# 一般不要修改消费者名,尤其是需要并发跑时
consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
# 构建一个消费组和消费者
option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time)
# bind put_log_raw which is faster
to_client = LogClient(to_endpoint, accessKeyId, accessKey)
put_method1 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore1)
put_method2 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore2)
put_method3 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore3)
return option, {u'ngnix': put_method1, u'sql_audit': put_method2, u'click': put_method3}
注意,这里使用了functools.partial对put_log_raw进行绑定,以便后续调用方便。
数据消费与分发¶
如下代码展示如何从SLS拿到数据后根据topic进行转发。
if __name__ == '__main__':
option, put_methods = get_option()
def copy_data(shard_id, log_groups):
for log_group in log_groups.LogGroups:
# update topic
if log_group.Topic in put_methods:
put_methods[log_group.Topic](log_group=log_group)
logger.info("*** start to consume data...")
worker = ConsumerWorker(ConsumerProcessorAdaptor, option, args=(copy_data, ))
worker.start(join=True)
启动¶
假设程序命名为”dispatch_data.py”,可以如下启动:
export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore1 Name>
export SLS_LOGSTORE_TO2=<SLS To Logstore2 Name>
export SLS_LOGSTORE_TO3=<SLS To Logstore3 Name>
export SLS_CG=<消费组名,可以简单命名为"dispatch_data">
pypy3 dispatch_data.py
性能考虑¶
启动多个消费者¶
基于消费组的程序可以直接启动多次以便达到并发作用:
nohup pypy3 dispatch_data.py &
nohup pypy3 dispatch_data.py &
nohup pypy3 dispatch_data.py &
...
注意: 所有消费者使用了同一个消费组的名字和不同的消费者名字(因为消费者名以进程ID为后缀)。 因为一个分区(Shard)只能被一个消费者消费,假设一个日志库有10个分区,那么最多有10个消费者同时消费。
性能吞吐¶
基于测试,在没有带宽限制、接收端速率限制(如Splunk端)的情况下,以推荐硬件用pypy3运行上述样例,单个消费者在占用大约10%的单核CPU的情况下,可以达到消费5 MB/s原始日志的速率。因此,理论上每个CPU核可以达到50 MB/s原始日志,也就是每个CPU核每天可以消费4TB原始日志。
注意: 这个数据依赖带宽、硬件参数和SIEM接收端(如Splunk)是否能够较快接收数据。
高可用性¶
消费组会将检测点(check-point)保存在服务器端,当一个消费者停止,另外一个消费者将自动接管并从断点继续消费。
可以在不同机器上启动消费者,这样当一台机器停止或者损坏的清下,其他机器上的消费者可以自动接管并从断点进行消费。
理论上,为了备用,也可以启动大于shard数量的消费者。
更多案例¶
- 日志服务Python消费组实战(一):日志服务与SIEM(如Splunk)集成实战
- 日志服务Python消费组实战(二):实时日志分发
- 日志服务Python消费组实战(三):实时跨域监测多日志库数据
- 日志服务Python消费组实战(三):日志服务与SIEM集成实战(二):syslog篇
- 本文Github样例
使用消费组实时跨域监测多日志库数据¶
解决问题¶
使用日志服务进行数据处理与传递的过程中,你是否遇到如下监测场景不能很好的解决: 1. 特定数据上传到日志服务中需要检查数据内的异常情况,而没有现成监控工具? 2. 需要检索数据里面的关键字,但数据没有建立索引,无法使用日志服务的告警功能? 3. 数据监测要求实时性(<5秒,例如Web访问500错误),而特定功能都有一定延迟(1分钟以上)? 4. 存在多个域的多个日志库(例如每个Region的错误文件对应的日志库),数据量不大,但监控逻辑类似,每个目标都要监控与配置,比较繁琐?
如果是的,您可以考虑使用日志服务Python消费组进行跨域实时数据监控,本文主要介绍如何使用消费组实时监控多个域中的多个日志库中的异常数据,并进行下一步告警动作。可以很好解决以上问题,并利用消费组的特点,达到自动平衡、负载均衡和高可用性。

基本概念¶
协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心。
消费组(Consumer Group) - 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据。 消费者(Consumer) - 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。
在日志服务中,一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者,分配方式遵循以下原则: - 每个shard只会分配到一个消费者。 - 一个消费者可以同时拥有多个shard。 新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明。
协同消费库的另一个功能是保存checkpoint,方便程序故障恢复时能接着从断点继续消费,从而保证数据不会被重复消费。
使用消费组进行实时分发¶
这里我们描述用Python使用消费组进行编程,实时跨域监测多个域的多个日志库,全文或特定字段检查 注意:本篇文章的相关代码可能会更新,最新版本在这里可以找到:Github样例.
安装¶
环境 1. 建议程序运行在靠近源日志库同Region下的ECS上,并使用局域网服务入口,这样好处是网络速度最快,其次是读取没有外网费用产生。 2. 强烈推荐PyPy3来运行本程序,而不是使用标准CPython解释器。 3. 日志服务的Python SDK可以如下安装:
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手册,可以参考这里
程序配置¶
如下展示如何配置程序: 1. 配置程序日志文件,以便后续测试或者诊断可能的问题(跳过,具体参考样例)。 2. 基本的日志服务连接与消费组的配置选项。 3. 目标Logstore的一些连接信息
请仔细阅读代码中相关注释并根据需要调整选项:
#encoding: utf8
import os
from multiprocessing import current_process
from concurrent.futures import ThreadPoolExecutor
from aliyun.log.consumer import LogHubConfig, CursorPosition
def get_option():
##########################
# 基本选项
##########################
# 从环境变量中加载SLS参数与选项,endpoint、project、logstore可以多个并配对
endpoints = os.environ.get('SLS_ENDPOINTS', '').split(";") # ;分隔
projects = os.environ.get('SLS_PROJECTS', '').split(";") # ;分隔
logstores = os.environ.get('SLS_LOGSTORES', '').split(";") # ;分隔,同一个Project下的用,分隔
accessKeyId = os.environ.get('SLS_AK_ID', '')
accessKey = os.environ.get('SLS_AK_KEY', '')
consumer_group = os.environ.get('SLS_CG', '')
# 消费的起点。这个参数在第一次跑程序的时候有效,后续再次运行将从上一次消费的保存点继续。
# 可以是"begin","end",或者特定的ISO时间格式。
cursor_start_time = "2018-12-26 0:0:0"
# 一般不要修改消费者名,尤其是需要并发跑时
consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
# 设定共享执行器
exeuctor = ThreadPoolExecutor(max_workers=2)
# 构建多个消费组(每个logstore一个)
options = []
for i in range(len(endpoints)):
endpoint = endpoints[i].strip()
project = projects[i].strip()
if not endpoint or not project:
logger.error("project: {0} or endpoint {1} is empty, skip".format(project, endpoint))
continue
logstore_list = logstores[i].split(",")
for logstore in logstore_list:
logstore = logstore.strip()
if not logstore:
logger.error("logstore for project: {0} or endpoint {1} is empty, skip".format(project, endpoint))
continue
option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
cursor_start_time=cursor_start_time, shared_executor=exeuctor)
options.append(option)
# 设定检测目标字段与目标值,例如这里是检测status字段是否有500等错误
keywords = {'status': r'5\d{2}'}
return exeuctor, options, keywords
注意,配置了多个endpoint、project、logstore,需要用分号分隔,并且一一对应;如果一个project下有多个logstore需要检测,可以将他们直接用逗号分隔。如下是一个检测3个Region下的4个Logstore的配置:
export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com
export SLS_PROJECTS=project1;project2;project3
export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2
数据监测¶
如下代码展示如何构建一个关键字检测器,针对数据中的目标字段进行检测,您也可以修改逻辑设定为符合需要的场景(例如多个字段的组合关系等)。
import re
from aliyun.log.consumer import ConsumerProcessorBase
from aliyun.log.pulllog_response import PullLogResponse
class KeywordMonitor(ConsumerProcessorBase):
"""
this consumer will keep monitor with k-v fields. like {"content": "error"}
"""
def __init__(self, keywords=None, logstore=None):
super(KeywordMonitor, self).__init__() # remember to call base init
self.keywords = keywords
self.kw_check = {}
for k, v in self.keywords.items():
self.kw_check[k] = re.compile(v)
self.logstore = logstore
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups)
match_count = 0
sample_error_log = ""
for log in logs:
m = None
for k, c in self.kw_check.items():
if k in log:
m = c.search(log[k])
if m:
logger.debug('Keyword detected for shard "{0}" with keyword: "{1}" in field "{2}", log: {3}'
.format(self.shard_id, log[k], k, log))
if m:
match_count += 1
sample_error_log = log
if match_count:
logger.info("Keyword detected for shard {0}, count: {1}, example: {2}".format(self.shard_id, match_count, sample_error_log))
# TODO: 这里添加通知下游的代码
else:
logger.debug("No keyword detected for shard {0}".format(self.shard_id))
self.save_checkpoint(check_point_tracker)
控制逻辑¶
如下展示如何控制多个消费者,并管理退出命令:
def main():
exeuctor, options, keywords = get_option()
logger.info("*** start to consume data...")
workers = []
for option in options:
worker = ConsumerWorker(KeywordMonitor, option, args=(keywords,) )
workers.append(worker)
worker.start()
try:
for i, worker in enumerate(workers):
while worker.is_alive():
worker.join(timeout=60)
logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format(
options[i].project, options[i].logstore))
worker.shutdown()
except KeyboardInterrupt:
logger.info("*** try to exit **** ")
for worker in workers:
worker.shutdown()
# wait for all workers to shutdown before shutting down executor
for worker in workers:
while worker.is_alive():
worker.join(timeout=60)
exeuctor.shutdown()
if __name__ == '__main__':
main()
启动¶
假设程序命名为”monitor_keyword.py”,可以如下启动:
export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com
export SLS_PROJECTS=project1;project2;project3
export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_CG=<消费组名,可以简单命名为"dispatch_data">
pypy3 monitor_keyword.py
性能考虑¶
启动多个消费者¶
如果您的目标logstore存在多个shard,或者您的目标监测日志库较多,您可以进行一定划分并启动多次程序:
# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &
# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &
# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &
...
注意: 所有消费者使用了同一个消费组的名字和不同的消费者名字(因为消费者名以进程ID为后缀)。 当数据量较大或者目标日志库较多时,单个消费者的速度可能无法满足需求,且因为Python的GIL的原因,只能用到一个CPU核。强烈建议您根据目标日志库的Shard数以及CPU的数量进行划分,启动多次以便充分利用CPU资源。
性能吞吐¶
基于测试,在没有带宽限制、接收端速率限制(如Splunk端)的情况下,以推荐硬件用pypy3运行上述样例,单个消费者在占用大约10%的单核CPU的情况下,可以达到消费5 MB/s原始日志的速率。因此,理论上每个CPU核可以达到50 MB/s原始日志,也就是每个CPU核每天可以消费4TB原始日志。
注意: 这个数据依赖带宽、硬件参数等。
高可用性¶
消费组会将检测点(check-point)保存在服务器端,当一个消费者停止,另外一个消费者将自动接管并从断点继续消费。
可以在不同机器上启动消费者,这样当一台机器停止或者损坏的清下,其他机器上的消费者可以自动接管并从断点进行消费。
理论上,为了备用,也可以启动大于shard数量的消费者。
更多案例¶
- 日志服务Python消费组实战(一):日志服务与SIEM(如Splunk)集成实战
- 日志服务Python消费组实战(二):实时日志分发
- 日志服务Python消费组实战(三):实时跨域监测多日志库数据
- 日志服务Python消费组实战(三):日志服务与SIEM集成实战(二):syslog篇
- 本文Github样例
使用Log Handler自动上传Python日志¶
概述¶
使用Python SDK提供的Log Handler可以实现每一条Python程序的日志在不落盘的情况下自动上传到日志服务上。与写到文件再通过各种方式上传比起来,有如下优势:
- 实时性:主动直接发送,不落盘
- 吞吐量大,异步发送
- 配置简单:无需修改程序,无需知道机器位置,修改程序配置文件即可生效
- 智能解析: 自动解析日志中JSON和KV格式信息
本篇主要介绍基本的配置方式, 关于如何自动解析JSON和KV格式的日志和相关配置, 参考自动解析KV格式的日志和JSON格式的日志.
配置¶
Log Handler与Python logging模块完全兼容,参考Python Logging
Python logging模块允许通过编程或者文件的形式配置日志,如下我们通过文件配置logging.conf:
[loggers]
keys=root,sls
[handlers]
keys=consoleHandler, slsHandler
[formatters]
keys=simpleFormatter, rawFormatter
[logger_root]
level=DEBUG
handlers=consoleHandler
[logger_sls]
level=INFO
handlers=consoleHandler, slsHandler
qualname=sls
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleFormatter
args=(sys.stdout,)
[handler_slsHandler]
class=aliyun.log.QueuedLogHandler
level=INFO
formatter=rawFormatter
args=(os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', ''), os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', ''), os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', ''), os.environ.get('ALIYUN_LOG_SAMPLE_TMP_PROJECT', ''), "logstore")
[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
[formatter_rawFormatter]
format=%(message)s
这里我们配置了一个root和一个sls的Log Handler, 其中sls是实例化类aliyun.log.QueuedLogHandler,并传入参数(详细参数列表)如下:
args=(os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', ''), os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', ''), os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', ''), os.environ.get('ALIYUN_LOG_SAMPLE_TMP_PROJECT', ''), "logstore")
注意:这里使用了os.environ来从环境变量中获取相关配置,也可以直接填写实际的值。
上传日志¶
使用logging配置文件并输出日志即可,日志会自动上传。
import logging
import logging.config
# 配置
logging.config.fileConfig('logging.conf')
logger = logging.getLogger('sls')
# 使用logger
logger.info("test1")
try:
1/0
except ZeroDivisionError as ex:
logger.exception(ex)
之后日志即可自动上传到日志服务,如果要使用统计查询功能,最好打开索引。
配置日志服务logstore的索引¶
将接受日志的Logstore的索引打开,将特定域进行索引。推荐使用CLI进行配置如下:
aliyunlog log update_index --project_name="project1" --logstore_name="logstore1" --index_detail="file:///Users/user1/loghandler_index.json"
调整收集日志域¶
目前支持如下的日志信息,默认会收集所有相关域:
域 | 说明 |
---|---|
message | 消息内容 |
record_name | logging handler的名字,上面例子是sls |
level | 级别,INFO、ERROR等 |
file_path | 代码文件全路径 |
func_name | 所在函数名 |
line_no | 行号 |
module | 所在模块 |
thread_id | 当前线程Id |
thread_name | 当前线程名 |
process_id | 当前进程Id |
process_name | 当前进程名 |
参考类QueuedLogHandler的参数fields, 其接受一个列表来调整想要配置的域, 进一步参考日志域列表。
下面例子中,我们修改之前的日志配置文件,只收集个别域如module、func_name等。(注意:message是一定会被收集的):
[handler_slsHandler]
class=aliyun.log.QueuedLogHandler
level=INFO
formatter=rawFormatter
args=('cn-beijing.log.aliyuncs.com', 'ak_id', 'ak_key', 'project1', "logstore1", 'mytopic', ['level', 'func_name', 'module', 'line_no'] )
注意: 也可以通过参数buildin_fields_prefix / buildin_fields_suffix给这些内置域增加前缀和后缀, 例如__level__等.
使用JSON配置¶
如果期望更加灵活的配置, 也可以使用代码配置, 如下
#encoding: utf8
import logging, logging.config, os
# 配置
conf = {'version': 1,
'formatters': {'rawformatter': {'class': 'logging.Formatter',
'format': '%(message)s'}
},
'handlers': {'sls_handler': {'()':
'aliyun.log.QueuedLogHandler',
'level': 'INFO',
'formatter': 'rawformatter',
# custom args:
'end_point': os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', ''),
'access_key_id': os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', ''),
'access_key': os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', ''),
'project': 'project1',
'log_store': "logstore1"
}
},
'loggers': {'sls': {'handlers': ['sls_handler', ],
'level': 'INFO',
'propagate': False}
}
}
logging.config.dictConfig(conf)
# 使用
logger = logging.getLogger('sls')
logger.info("Hello world")
需要注意里面QueuedLogHandler的初始化方式, 用的是传入命名参数的方式. 具体参数列表可以参考这里. 更多关于Python的dictConfig, 参考这里.
UWSGI下使用Python Logging Handler¶
这里主要介绍了QueuedLogHandler, 但是在UWSGI下因为进程调度模型的原因, 这个类无法正常工作. 因此提供了另外2个Handler, 如下:
- UwsgiQueuedLogHandler - 建议使用这个类, 功能和配置完全一样, 但是需要额外安装一个第三方库uwsgidecorators.
- SimpleLogHandler - 即时发送的简单Logging Handler, 配置完全一样. 用于特殊场景下的测试更方便一些, 一般情况下不推荐.
使用Python Log Handler自动上传并解析KV格式的日志¶
概述¶
使用Python SDK提供的Log Handler可以实现每一条Python程序的日志在不落盘的情况下自动上传到日志服务上。与写到文件再通过各种方式上传比起来,有如下优势:
- 实时性:主动直接发送,不落盘
- 吞吐量大,异步发送
- 配置简单:无需修改程序,无需知道机器位置,修改程序配置文件即可生效
- 智能解析: 自动解析日志中JSON和KV格式信息
本篇主要介绍如何打开自动解析KV格式的功能, 关于如何配置并使用的基本信息, 请参考使用Log Handler自动上传Python日志.
解决的问题¶
在程序中, 有时我们需要将特定数据输出到日志中以便跟踪, 例如:
data = {'name':'xiao ming', 'score': 100.0}
一般情况下, 我们会格式化数据内容, 附加其他信息并输出:
data = {'name':'xiao ming', 'score': 100.0}
logger.error('get some error when parsing data. name="{}" score={}'.format(data['name'], data['score']))
这样会输出的消息为:
get some error when parsing data. name="xiao ming" score=100.0
我们期望在上传到日志服务时可以自动解析出域name和score字段. 使用Python Handler的简单配置即可做到, 如下.
通过Logging的配置文件¶
参考Logging Handler的详细配置, 将其中参数列表修改为:
args=(os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', ''), os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', ''), os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', ''), os.environ.get('ALIYUN_LOG_SAMPLE_TMP_PROJECT', ''), "logstore", None, None, None, None, None, None, None, None, None, None, None, None, True)
最后一个参数对应了Logging Handler的详细参数的extract_kv参数.
注意, 受限于Python Logging的限制, 这里只能用无名参数, 依次传入. 对于不改的参数, 用None占位.
通过代码以JSON形式配置¶
如果期望更加灵活的配置, 也可以使用代码配置, 如下将参数extract_kv设置为True即可.
#encoding: utf8
import logging, logging.config, os
# 配置
conf = {'version': 1,
'formatters': {'rawformatter': {'class': 'logging.Formatter',
'format': '%(message)s'}
},
'handlers': {'sls_handler': {'()':
'aliyun.log.QueuedLogHandler',
'level': 'INFO',
'formatter': 'rawformatter',
# custom args:
'end_point': os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', ''),
'access_key_id': os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', ''),
'access_key': os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', ''),
'project': 'project1',
'log_store': "logstore1",
'extract_kv': True
}
},
'loggers': {'sls': {'handlers': ['sls_handler', ],
'level': 'INFO',
'propagate': False}
}
}
logging.config.dictConfig(conf)
# 使用
logger = logging.getLogger('sls')
logger.error("get error, reason=103 return_code=333 agent_type=ios")
支持KV的格式¶
默认支持key=value的格式, 也就是等号=分隔的值. 其中关键字key的范围是: 中日文, 字母数字, 下划线, 点和横线. 值value在有双引号括起来的情况下是除了双引号的任意字符; 在没有双引号括起来的情况下和关键字是一样的. 如下都是支持的:
c1 = "i=c1, k1=v1,k2=v2 k3=v3"
c2 = 'i=c2, k1=" v 1 ", k2="v 2" k3="~!@#=`;.>"' # 双引号
c3 = 'i=c3, k1=你好 k2=他们' # utf8
c4 = u'i=c4, 姓名=小明 年龄=中文 ' # utf8
c5 = u'i=c5, 姓名="小明" 年龄="中文"'# utf8
c6 = u'i=c6, 姓名=中文 年龄=中文' # unicode
c7 = u'i=c7, 姓名="小明" 年龄=中文 ' # unicode
c8 = """i=c8, k1="hello # 换行
world" k2="good
morning"
"""
自定义分隔符¶
默认通过等号=分隔, 也可以通过参数extract_kv_sep修改, 例如冒号:
c9 = 'k1:v1 k2:v2'
有时我们的分隔符是混合的, 有时为=有时为:, 如下:
c10 = 'k1=v1 k2:v2'
c11 = "k3 = v3"
c12 = "k4 : v4"
可以传入一个正则表达式给参数extract_kv_sep即可, 例如上面的情况可以传入(?:=|:), 这里使用了非捕获分组(?:), 再用|将各种可能的分隔符写入即可.
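例如, 在前文dictConfig配置的sls_handler基础上, 可以这样同时打开KV解析并自定义分隔符(示意片段, 参数名以本节参数表为准):
import os
handler_conf = {
    '()': 'aliyun.log.QueuedLogHandler',
    'level': 'INFO',
    'formatter': 'rawformatter',
    'end_point': os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', ''),
    'access_key_id': os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', ''),
    'access_key': os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', ''),
    'project': 'project1',
    'log_store': "logstore1",
    'extract_kv': True,
    'extract_kv_sep': r'(?:=|:)'   # 等号或冒号均作为分隔符
}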
域名冲突¶
当关键字和内置日志域冲突时, 需要做一些调整, 例如:
c1 = 'student="xiao ming" level=3'
这里的level和内建日志域中表示日志级别的域冲突了, 可以通过参数buildin_fields_prefix / buildin_fields_suffix给系统日志域添加前缀后缀; 或者通过参数extract_kv_prefix和extract_kv_suffix给抽取的域添加前缀后缀来解决.
其他定制参数¶
自动抽取KV也支持更多其他相关参数如下:
参数 | 作用 | 默认值 |
---|---|---|
extract_kv | 是否自动解析KV | False |
extract_kv_drop_message | 匹配KV后是否丢弃掉默认的message域 | False |
extract_kv_prefix | 给解析的域添加前缀 | 空串 |
extract_kv_suffix | 给解析的域添加后缀 | 空串 |
extract_kv_sep | 关键字和值的分隔符 | = |
buildin_fields_prefix | 给系统域添加前缀 | 空串 |
buildin_fields_suffix | 给系统域添加后缀 | 空串 |
使用Python Log Handler自动上传并解析JSON格式的日志¶
概述¶
使用Python SDK提供的Log Handler可以实现每一条Python程序的日志在不落盘的情况下自动上传到日志服务上。与写到文件再通过各种方式上传比起来,有如下优势:
- 实时性:主动直接发送,不落盘
- 吞吐量大,异步发送
- 配置简单:无需修改程序,无需知道机器位置,修改程序配置文件即可生效
- 智能解析: 自动解析日志中JSON和KV格式信息
本篇主要介绍如何打开自动解析JSON格式的功能, 关于如何配置并使用的基本信息, 请参考使用Log Handler自动上传Python日志.
解决的问题¶
在程序中, 有时我们需要将特定数据输出到日志中以便跟踪, 例如:
data = {'name':u"小明", 'score': 100.0}
一般情况下, 我们可以直接输出数据, 如下:
response_data = {'name':u'小明', 'score': 100.0}
logger.info(response_data)
这样会输出的消息为:
{'name':u'小明', 'score': 100.0}
因为Python格式化的原因, 数据的字符串形式并不是真正的JSON格式.
并且我们期望在上传到日志服务时可以自动解析出域name和score字段. 使用Python Handler的简单配置即可做到, 如下.
通过Logging的配置文件¶
参考Logging Handler的详细配置, 将其中参数列表修改为:
args=(os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', ''), os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', ''), os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', ''), os.environ.get('ALIYUN_LOG_SAMPLE_TMP_PROJECT', ''), "logstore", None, None, None, None, None, None, None, None, True)
最后一个参数对应了Logging Handler的详细参数的extract_json参数.
注意, 受限于Python Logging的限制, 这里只能用无名参数, 依次传入. 对于不改的参数, 用None占位.
通过代码以JSON形式配置¶
如果期望更加灵活的配置, 也可以使用代码配置, 如下将参数extract_json设置为True即可.
#encoding: utf8
import logging, logging.config, os
# 配置
conf = {'version': 1,
'formatters': {'rawformatter': {'class': 'logging.Formatter',
'format': '%(message)s'}
},
'handlers': {'sls_handler': {'()':
'aliyun.log.QueuedLogHandler',
'level': 'INFO',
'formatter': 'rawformatter',
# custom args:
'end_point': os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', ''),
'access_key_id': os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', ''),
'access_key': os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', ''),
'project': 'project1',
'log_store': "logstore1",
'extract_json': True
}
},
'loggers': {'sls': {'handlers': ['sls_handler', ],
'level': 'INFO',
'propagate': False}
}
}
logging.config.dictConfig(conf)
# 使用
logger = logging.getLogger('sls')
response_data = {'name':u'小明', 'score': 100.0}
logger.info(response_data)
域名冲突¶
当关键字和内置日志域冲突时, 需要做一些调整, 例如:
c1 = 'student="xiao ming" level=3'
这里的level和内建日志域中表示日志级别的域冲突了, 可以通过参数buildin_fields_prefix / buildin_fields_suffix给系统日志域添加前缀后缀; 或者通过参数extract_json_prefix和extract_json_suffix给抽取的域添加前缀后缀来解决.
其他定制参数¶
自动抽取JSON也支持更多其他相关参数如下:
参数 | 作用 | 默认值 |
---|---|---|
extract_json | 是否自动解析JSON | False |
extract_json_drop_message | 匹配JSON后是否丢弃掉默认的message域 | False |
extract_json_prefix | 给解析的域添加前缀 | 空串 |
extract_json_suffix | 给解析的域添加后缀 | 空串 |
buildin_fields_prefix | 给系统域添加前缀 | 空串 |
buildin_fields_suffix | 给系统域添加后缀 | 空串 |
Elasticsearch 数据迁移¶
概述¶
使用 Python SDK 提供的 MigrationManager 可以方便您快速将 Elasticsearch 中的数据导入日志服务。 MigrationManager 内部使用 Scroll API 从 Elasticsearch 中抓取数据。
配置¶
aliyun-log-python-sdk.readthedocs.io 无法正常显示表格,请参阅tutorial_es_migration.md
数据映射¶
logstore - index¶
MigrationManager 默认会将 Elasticsearch index 中的数据迁移至同名的 logstore 中,当然您也可以通过参数 logstore_index_mappings 指定将多个 index 中的数据迁移至一个 logstore。
logstore 不必事先创建,如果 MigrationManager 发现目标 logstore 未创建,会为您在指定的 project 下创建好。
数据类型映射¶
MigrationManager 会根据 Elasticsearch 的数据类型 在index 对应的 logstore 中创建好索引。
- Core datatypes
Elasticsearch | 日志服务 |
---|---|
text | text |
keyword | text,不分词 |
long | long |
integer | long |
short | long |
byte | long |
double | double |
float | double |
half_float | double |
scaled_float | double |
date | text |
boolean | text,不分词 |
binary | n/a |
integer_range | json |
float_range | json |
long_range | json |
double_range | json |
date_range | json |
ip_range | text,不分词 |
- Complex datatypes
Elasticsearch | 日志服务 |
---|---|
Array datatype | n/a |
Object datatype | json |
Nested datatype | n/a |
- Geo datatypes
Elasticsearch | 日志服务 |
---|---|
Geo-point datatype | text |
Geo-Shape datatype | text |
- Specialised datatypes
Elasticsearch | 日志服务 |
---|---|
IP datatype | text,不分词 |
Completion datatype | n/a |
Token count datatype | n/a |
mapper-murmur3 | n/a |
Percolator type | n/a |
join datatype | n/a |
抓取模式¶
- 为了提高吞吐量,MigrationManager 会为每个 index 的每个 shard 创建一个数据迁移任务,并提交到内部进程池中执行。
- 当全部任务执行完成后,migrate 方法才会退出。
任务执行情况展示¶
MigrationManager 使用 logging 记录任务的执行情况,您可以通过如下配置指定将结果输出至控制台。
logger = logging.getLogger()
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
logger.addHandler(ch)
- 单个迁移任务执行结果展示。
========Tasks Info========
...
task_id=1, slice_id=1, slice_max=10, hosts=localhost:9200, indexes=None, query=None, project=test-project, time_cost_in_seconds=128.71100688, status=CollectionTaskStatus.SUCCESS, count=129330, message=None
...
编号为 1 的迁移任务执行成功,耗时 128.7s,迁移文档数量 129330。
- 迁移任务执行结果汇总信息。
========Summary========
Total started task count: 10
Successful task count: 10
Failed task count: 0
Total collected documentation count: 1000000
MigrationManager 总共启动了 10 个数据迁移任务,全部执行成功。迁移文档总数 1000000。
使用样例¶
- 将 hosts 为 localhost:9200 的 Elasticsearch 中的所有文档导入日志服务的项目 project1 中。
migration_manager = MigrationManager(hosts="localhost:9200",
endpoint=endpoint,
project_name="project1",
access_key_id=access_key_id,
access_key=access_key)
migration_manager.migrate()
- 指定将 Elasticsearch 中索引名以 myindex_ 开头的数据写入日志库 logstore1,将索引 index1,index2 中的数据写入日志库 logstore2 中。
migration_manager = MigrationManager(hosts="localhost:9200,other_host:9200",
endpoint=endpoint,
project_name="project1",
access_key_id=access_key_id,
access_key=access_key,
logstore_index_mappings='{"logstore1": "myindex_*", "logstore2": "index1,index2"}')
migration_manager.migrate()
- 使用参数 query 指定从 Elasticsearch 中抓取 title 字段等于 python 的文档,并使用文档中的字段 date1 作为日志的 time 字段。
migration_manager = MigrationManager(hosts="localhost:9200",
endpoint=endpoint,
project_name="project1",
access_key_id=access_key_id,
access_key=access_key,
query='{"query": {"match": {"title": "python"}}}',
time_reference="date1")
migration_manager.migrate()
- Migrate data from Elasticsearch using HTTP basic authentication user:secret@localhost:9200.
migration_manager = MigrationManager(hosts="user:secret@localhost:9200",
endpoint=endpoint,
project_name="project1",
access_key_id=access_key_id,
access_key=access_key)
migration_manager.migrate()
Using the Log Service Jupyter Notebook Extension¶
Background¶
IPython/Jupyter is popular¶
Jupyter evolved from IPython Notebook, which in turn evolved from IPython.
The popularity of IPython/Jupyter shows in three ways:
- Python's dominance in data science is an established fact: according to Kaggle's 2018 survey of the data science and machine learning community, more than 92% of respondents use Python, and IPython/Jupyter has become the undisputed entry point of the Python scientific ecosystem for anyone doing data analysis in Python.
- IPython/Jupyter Notebook is not limited to Python; as an open platform it already supports more than 50 languages, such as Go and Java.
- All major cloud vendors provide Notebook support, and the SaaS ecosystem offers many useful Notebook tools, such as GitHub and NBViewer.
Log Service support for IPython/Jupyter¶
Alibaba Cloud Log Service is a one-stop service for log data: it lets you collect, consume, ship, query, and analyze massive amounts of log data without writing any collection code. Through the Log Service extension for IPython/Jupyter, you can easily use Python to do deep processing (ETL), interactive analysis (via SQL or DataFrame), machine learning, and visualization on massive data:
Feature Overview¶
Installation¶
Quick Installation¶
Jupyter Notebook:
1. pip install aliyun-log-python-sdk>=0.6.43 pandas odps ipywidgets -U
Configure enhanced DataFrame interaction (Notebook only):
1. jupyter --path
Go into the first directory listed under data (the second also works), e.g. C:\Users\Administrator\AppData\Roaming\jupyter, and create a subdirectory named nbextensions inside it if it does not already exist.
2. python -c "import odps;print(odps);"
From the output, locate the directory of the odps module and go into its subdirectory static > ui, e.g.: C:\ProgramData\Anaconda3\Lib\site-packages\odps\static\ui
3. Copy the target directory found in step 2 into the directory from step 1 and rename it to pyodps.
For example: C:\ProgramData\Anaconda3\Lib\site-packages\odps\static\ui\target ==> C:\Users\Administrator\AppData\Roaming\jupyter\nbextensions\pyodps
4. Before starting Jupyter, enable and verify the extensions:
jupyter nbextension enable --py --sys-prefix widgetsnbextension
jupyter nbextension enable pyodps/main
IPython Shell/Jupyter Lab:
1. pip install aliyun-log-python-sdk>=0.6.43 pandas -U
Configuration¶
Load the magic commands:
%load_ext aliyun.log.ext.jupyter_magic
Pass the configuration parameters as follows:
%manage_log <endpoint> <access key id> <access key secret> <project name> <logstore name>
In Jupyter Notebook, you can also run it without arguments and configure everything through the GUI:
%manage_log
For details about the service endpoint, access keys, etc., refer to the SDK configuration section.
Where the configuration is stored¶
The above stores the AccessKey, Endpoint, Project, and Logstore in ~/.aliyunlogcli, using the section name __jupyter_magic__:
[__jupyter_magic__]
access-id=
access-key=
region-endpoint=
project=
logstore=
Supported Scenarios¶
1. Query and Statistics¶
General query and statistics (with a time range)¶
The first line specifies the time range in the form from_time ~ to_time. Note that the cell magic uses two percent signs (%%).
%%log -1day ~ now
* |
select date_format(date_trunc('hour', __time__), '%H:%i') as dt,
count(1)%100 as pv,
round(sum(if(status < 400, 1, 0))*100.0/count(1), 1) AS ratio
group by date_trunc('hour', __time__)
order by dt limit 1000
Note: if the statement contains only the query part (no SQL), all logs in the time range are pulled automatically (with automatic paging).
For the supported time formats, see here.
2. Pulling the Full Data¶
If the raw data is not indexed and query/statistics cannot be used, or if no filtering is needed, use the fetch command.
%fetch 2019-1-31 10:0:0+8:00 ~ 2019-1-31 10:00:10+8:00
Note:
- The time range refers to the time the server received the logs, which is not the same as the timestamps inside the logs themselves.
- If you cancel while fetching, the data that has already been pulled is placed in log_df_part.
3. DataFrame Operations¶
The query result is available through log_df, which is a standard Pandas DataFrame. A short example of operating on it is sketched below.
For DataFrame operations in general, refer to the Pandas DataFrame documentation.
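A brief sketch of typical operations, assuming a query like the earlier example has populated log_df with columns such as dt and pv:
# peek at the result and its schema
print(log_df.head())
print(log_df.dtypes)

# cast pv to int in case the column came back as strings
log_df['pv'] = log_df['pv'].astype(int)
print(log_df.sort_values('pv', ascending=False).head(10))
print(log_df['pv'].describe())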
Notes¶
- Magic commands do not support comments
Due to framework limitations, magic commands do not support comments (such as the one below); put comments in a separate cell instead.
# count PV for host
%%log -1 day ~ now
* | select host, count(1) as pv group by host
Problem¶
Log Service does not require data to follow a uniform schema; each log entry may have a different set of keys, for example:
{"city": "123", "province": "vvv"}
{"city": "shanghai", "pop": "2000"}
{"name": "xiao ming", "home": "shanghai"}
Therefore, when downloading with the Log Service CLI commands get_log_all or pull_log_dump, the output is one JSON object per line, which preserves this flexibility.
In most cases, however, the overall key set of a logstore is stable; on the other hand, Excel (or the simpler CSV) format is friendlier to business applications and manual handling than JSON.
So what can you do if you want the download in Excel or CSV format, with inconsistent fields handled automatically?
This section shows how to do that easily with the Log Service IPython/Jupyter extension.
Prerequisites¶
Install the Log Service extension¶
First, install the Log Service IPython/Jupyter extension as described above (IPython Shell, IPython/Jupyter Notebook, and Jupyter Lab are all supported).
Install the Excel components¶
In the environment where IPython runs, install the components for reading and writing Excel files:
pip install openpyxl xlrd xlwt XlsxWriter
- openpyxl - read/write Excel 2010 xlsx/xlsm files
- xlrd - read Excel (xls) files
- xlwt - write Excel (xls) files
- XlsxWriter - write Excel (xlsx) files
Scenarios¶
1. Save the result to an Excel file¶
Run a query with the %%log command to get a Pandas DataFrame, then call to_excel on it.
Example:
%%log -1day ~ now
* | select date_format(date_trunc('hour', __time__), '%H:%i') as dt,
count(1)%100 as pv,
round(sum(if(status < 400, 1, 0))*100.0/count(1), 1) AS ratio
group by date_trunc('hour', __time__)
order by dt limit 1000
df1 = log_df
df1.to_excel('output.xlsx')
2. Save results to multiple sheets of one Excel file¶
After obtaining several results in different DataFrames via %log or %%log, write them as in the following example:
import pandas as pd
writer = pd.ExcelWriter('output2.xlsx')
df1.to_excel(writer, sheet_name='data1')
df2.to_excel(writer, sheet_name='data2')
writer.save()
3. Customize Excel formatting details¶
By default, Pandas uses the xlwt module to write xls files and the openpyxl module to write xlsx files. XlsxWriter is more complete and flexible for writing xlsx files, but requires the configuration below.
For example, just add an extra argument when constructing the ExcelWriter from the previous example:
writer = pd.ExcelWriter('output2.xlsx', engine='xlsxwriter')
You can then customize the format and style of specific columns, or even draw Excel charts directly; see the referenced article for details, and the brief sketch below.
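A minimal sketch of column formatting with the xlsxwriter engine (the sheet name, column and width follow the earlier example and are only placeholders):
import pandas as pd

writer = pd.ExcelWriter('output2.xlsx', engine='xlsxwriter')
df1.to_excel(writer, sheet_name='data1')

workbook = writer.book
worksheet = writer.sheets['data1']

# widen column B and apply a one-decimal number format to it
num_fmt = workbook.add_format({'num_format': '0.0'})
worksheet.set_column('B:B', 18, num_fmt)

writer.save()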
API¶
Main Class¶
LogClient (endpoint, accessKeyId, accessKey) |
Construct the LogClient with endpoint, accessKeyId, accessKey. |
LogException (errorCode, errorMessage[, …]) |
The Exception of the log request & response. |
LogResponse (headers[, body]) |
The base response class of all log response. |
Logging Handler Class¶
SimpleLogHandler (end_point, access_key_id, …) |
SimpleLogHandler, a blocking handler that sends each log synchronously; just for simple test purposes |
QueuedLogHandler (end_point, access_key_id, …) |
Queued Log Handler, tuned async log handler. |
UwsgiQueuedLogHandler (*args, **kwargs) |
Queued Log Handler for uWSGI; depends on the library uwsgidecorators, which needs to be deployed separately. |
LogFields |
fields used to upload automatically Possible fields: record_name, level, func_name, module, file_path, line_no, process_id, process_name, thread_id, thread_name |
Request and Config Class¶
GetHistogramsRequest ([project, logstore, …]) |
The request used to get histograms of a query from log. |
GetLogsRequest ([project, logstore, …]) |
The request used to get logs by a query from log. |
GetProjectLogsRequest ([project, query]) |
The request used to get logs by a query from log cross multiple logstores. |
ListTopicsRequest ([project, logstore, …]) |
The request used to get topics of a query from log. |
ListLogstoresRequest ([project]) |
The request used to list log store from log. |
PutLogsRequest ([project, logstore, topic, …]) |
The request used to send data to log. |
LogtailConfigGenerator |
Generator of Logtail config |
PluginConfigDetail (logstoreName, configName, …) |
The logtail config for simple mode |
SeperatorFileConfigDetail (logstoreName, …) |
The logtail config for separator mode |
SimpleFileConfigDetail (logstoreName, …[, …]) |
The logtail config for simple mode |
FullRegFileConfigDetail (logstoreName, …[, …]) |
The logtail config for full regex mode |
JsonFileConfigDetail (logstoreName, …[, …]) |
The logtail config for json mode |
ApsaraFileConfigDetail (logstoreName, …[, …]) |
The logtail config for Apsara mode |
SyslogConfigDetail (logstoreName, configName, tag) |
The logtail config for syslog mode |
MachineGroupDetail ([group_name, …]) |
The machine group detail info |
IndexConfig ([ttl, line_config, …]) |
The index config of a logstore |
OssShipperConfig (oss_bucket, oss_prefix, …) |
An OSS shipper config |
OdpsShipperConfig (odps_endpoint, …[, …]) |
Odps shipper config |
ShipperTask (task_id, task_status, …) |
A shipper task |
Response Class¶
CreateProjectResponse (header[, resp]) |
Response of create_project |
DeleteProjectResponse (header[, resp]) |
|
GetProjectResponse (resp, header) |
|
ListProjectResponse (resp, header) |
GetLogsResponse (resp, header) |
The response of the GetLog API from log. |
ListLogstoresResponse (resp, header) |
The response of the ListLogstores API from log. |
ListTopicsResponse (resp, header) |
The response of the ListTopic API from log. |
GetCursorResponse (resp, header) |
The response of the get_cursor API from log. |
GetCursorTimeResponse (resp, header) |
The response of the get_cursor_time API from log. |
ListShardResponse (resp, header) |
The response of the list_shard API from log. |
DeleteShardResponse (header[, resp]) |
The response of the create_logstore API from log. |
GetHistogramsResponse (resp, header) |
The response of the GetHistograms API from log. |
Histogram (fromTime, toTime, count, progress) |
The class used to present the result of log histogram status. |
GetLogsResponse (resp, header) |
The response of the GetLog API from log. |
QueriedLog (timestamp, source, contents) |
The QueriedLog is a log of the GetLogsResponse which obtained from the log. |
PullLogResponse (resp, header) |
The response of the pull_logs API from log. |
CreateIndexResponse (header[, resp]) |
The response of the create_index API from log. |
UpdateIndexResponse (header[, resp]) |
The response of the update_index API from log. |
DeleteIndexResponse (header[, resp]) |
The response of the delete_index API from log. |
GetIndexResponse (resp, header) |
The response of the get_index_config API from log. |
CreateLogtailConfigResponse (header[, resp]) |
The response of the create_logtail_config API from log. |
DeleteLogtailConfigResponse (header[, resp]) |
The response of the delete_logtail_config API from log. |
GetLogtailConfigResponse (resp, header) |
The response of the get_logtail_config API from log. |
UpdateLogtailConfigResponse (header[, resp]) |
The response of the update_logtail_config API from log. |
ListLogtailConfigResponse (resp, header) |
The response of the list_logtail_config API from log. |
CreateMachineGroupResponse (header[, resp]) |
The response of the create_machine_group API from log. |
DeleteMachineGroupResponse (header[, resp]) |
The response of the delete_machine_group API from log. |
GetMachineGroupResponse (resp, header) |
The response of the get_machine_group API from log. |
UpdateMachineGroupResponse (header[, resp]) |
The response of the update_machine_group API from log. |
ListMachineGroupResponse (resp, header) |
The response of the list_machine_group API from log. |
ListMachinesResponse (resp, header) |
The response of the list_machines API from log. |
ApplyConfigToMachineGroupResponse (header[, resp]) |
The response of the apply_config_to_machine_group API from log. |
RemoveConfigToMachineGroupResponse (header[, …]) |
The response of the remove_config_to_machine_group API from log. |
GetMachineGroupAppliedConfigResponse (resp, …) |
The response of the get_machine_group_applied_config API from log. |
GetConfigAppliedMachineGroupsResponse (resp, …) |
The response of the get_config_applied_machine_group API from log. |
CreateShipperResponse (header[, resp]) |
|
UpdateShipperResponse (header[, resp]) |
|
DeleteShipperResponse (header[, resp]) |
|
GetShipperConfigResponse (resp, header) |
|
ListShipperResponse (resp, header) |
|
GetShipperTasksResponse (resp, header) |
|
RetryShipperTasksResponse (header[, resp]) |
ConsumerGroupEntity (consumer_group_name, timeout) |
|
CreateConsumerGroupResponse (headers[, resp]) |
|
ConsumerGroupCheckPointResponse (resp, headers) |
|
ConsumerGroupHeartBeatResponse (resp, headers) |
|
ConsumerGroupUpdateCheckPointResponse (headers) |
|
DeleteConsumerGroupResponse (headers[, resp]) |
|
ListConsumerGroupResponse (resp, headers) |
|
UpdateConsumerGroupResponse (headers, resp) |
CreateEntityResponse (headers[, body]) |
|
UpdateEntityResponse (headers[, body]) |
|
DeleteEntityResponse (headers[, body]) |
|
GetEntityResponse (headers[, body]) |
|
ListEntityResponse (header, resp[, …]) |
ES Migration Class¶
MigrationManager ([hosts, indexes, query, …]) |
MigrationManager, migrate data from elasticsearch to aliyun log service |
Project¶
list_project ([offset, size]) |
list the project Unsuccessful opertaion will cause an LogException. |
create_project (project_name, project_des) |
Create a project Unsuccessful opertaion will cause an LogException. |
get_project (project_name) |
get project Unsuccessful opertaion will cause an LogException. |
delete_project (project_name) |
delete project Unsuccessful opertaion will cause an LogException. |
copy_project (from_project, to_project[, …]) |
copy project, logstore, machine group and logtail config to target project, expecting the target project doesn’t contain same named logstores as source project |
Logstore¶
copy_logstore (from_project, from_logstore, …) |
copy logstore, index, logtail config to target logstore, machine group are not included yet. |
list_logstore (project_name[, …]) |
list the logstores in a project Unsuccessful operation will cause a LogException. |
create_logstore (project_name, logstore_name) |
create log store Unsuccessful opertaion will cause an LogException. |
get_logstore (project_name, logstore_name) |
get the logstore meta info Unsuccessful opertaion will cause an LogException. |
update_logstore (project_name, logstore_name) |
update the logstore meta info Unsuccessful opertaion will cause an LogException. |
delete_logstore (project_name, logstore_name) |
delete log store Unsuccessful opertaion will cause an LogException. |
list_topics (request) |
List all topics in a logstore. |
Index¶
create_index (project_name, logstore_name, …) |
create index for a logstore Unsuccessful opertaion will cause an LogException. |
update_index (project_name, logstore_name, …) |
update index for a logstore Unsuccessful opertaion will cause an LogException. |
delete_index (project_name, logstore_name) |
delete index of a logstore Unsuccessful opertaion will cause an LogException. |
get_index_config (project_name, logstore_name) |
get index config detail of a logstore Unsuccessful opertaion will cause an LogException. |
Logtail Config¶
create_logtail_config (project_name, …) |
create logtail config in a project Unsuccessful opertaion will cause an LogException. |
update_logtail_config (project_name, …) |
update logtail config in a project Unsuccessful opertaion will cause an LogException. |
delete_logtail_config (project_name, config_name) |
delete logtail config in a project Unsuccessful opertaion will cause an LogException. |
get_logtail_config (project_name, config_name) |
get logtail config in a project Unsuccessful opertaion will cause an LogException. |
list_logtail_config (project_name[, offset, size]) |
list logtail config name in a project Unsuccessful opertaion will cause an LogException. |
Machine Group¶
create_machine_group (project_name, group_detail) |
create machine group in a project Unsuccessful opertaion will cause an LogException. |
delete_machine_group (project_name, group_name) |
delete machine group in a project Unsuccessful opertaion will cause an LogException. |
update_machine_group (project_name, group_detail) |
update machine group in a project Unsuccessful opertaion will cause an LogException. |
get_machine_group (project_name, group_name) |
get machine group in a project Unsuccessful opertaion will cause an LogException. |
list_machine_group (project_name[, offset, size]) |
list machine group names in a project Unsuccessful opertaion will cause an LogException. |
list_machines (project_name, group_name[, …]) |
list machines in a machine group Unsuccessful opertaion will cause an LogException. |
Apply Logtail Config¶
apply_config_to_machine_group (project_name, …) |
apply a logtail config to a machine group Unsuccessful opertaion will cause an LogException. |
remove_config_to_machine_group (project_name, …) |
remove a logtail config to a machine group Unsuccessful opertaion will cause an LogException. |
get_machine_group_applied_configs (…) |
get the logtail config names applied in a machine group Unsuccessful opertaion will cause an LogException. |
get_config_applied_machine_groups (…) |
get machine group names where the logtail config applies to Unsuccessful opertaion will cause an LogException. |
Shard¶
list_shards (project_name, logstore_name) |
list the shard meta of a logstore Unsuccessful opertaion will cause an LogException. |
split_shard (project_name, logstore_name, …) |
split a readwrite shard into two shards Unsuccessful opertaion will cause an LogException. |
merge_shard (project_name, logstore_name, shardId) |
merge two adjacent readwrite shards into one shard Unsuccessful operation will cause a LogException. |
Cursor¶
get_cursor (project_name, logstore_name, …) |
Get cursor from log service for batch pull logs Unsuccessful opertaion will cause an LogException. |
get_cursor_time (project_name, logstore_name, …) |
Get cursor time from log service Unsuccessful opertaion will cause an LogException. |
get_previous_cursor_time (project_name, …) |
Get previous cursor time from log service. |
get_begin_cursor (project_name, …) |
Get begin cursor from log service for batch pull logs Unsuccessful opertaion will cause an LogException. |
get_end_cursor (project_name, logstore_name, …) |
Get end cursor from log service for batch pull logs Unsuccessful opertaion will cause an LogException. |
Logs¶
put_logs (request) |
Put logs to log service. |
pull_logs (project_name, logstore_name, …) |
batch pull log data from log service Unsuccessful opertaion will cause an LogException. |
pull_log (project_name, logstore_name, …[, …]) |
batch pull log data from log service using time-range Unsuccessful opertaion will cause an LogException. |
pull_log_dump (project_name, logstore_name, …) |
dump all logs line by line into file_path; the time parameters are the log received time on the server side. |
get_log (project, logstore, from_time, to_time) |
Get logs from log service. |
get_logs (request) |
Get logs from log service. |
get_log_all (project, logstore, from_time, …) |
Get logs from log service. |
get_histograms (request) |
Get histograms of requested query from log service. |
get_project_logs (request) |
Get logs from log service. |
Consumer group¶
create_consumer_group (project, logstore, …) |
create consumer group |
update_consumer_group (project, logstore, …) |
Update consumer group |
delete_consumer_group (project, logstore, …) |
Delete consumer group |
list_consumer_group (project, logstore) |
List consumer group |
update_check_point (project, logstore, …[, …]) |
Update check point |
get_check_point (project, logstore, …[, shard]) |
Get check point |
Dashboard¶
list_dashboard (project[, offset, size]) |
list the Dashboard, get first 100 items by default Unsuccessful opertaion will cause an LogException. |
create_dashboard (project, detail) |
Create Dashboard. |
get_dashboard (project, entity) |
Get Dashboard. |
update_dashboard (project, detail) |
Update Dashboard. |
delete_dashboard (project, entity) |
Delete Dashboard. |
Saved search¶
list_savedsearch (project[, offset, size]) |
list the Savedsearch, get first 100 items by default Unsuccessful opertaion will cause an LogException. |
create_savedsearch (project, detail) |
Create Savedsearch. |
get_savedsearch (project, entity) |
Get Savedsearch. |
update_savedsearch (project, detail) |
Update Savedsearch. |
delete_savedsearch (project, entity) |
Delete Savedsearch. |
Alert¶
list_alert (project[, offset, size]) |
list the Alert, get first 100 items by default Unsuccessful opertaion will cause an LogException. |
create_alert (project, detail) |
Create Alert. |
get_alert (project, entity) |
Get Alert. |
update_alert (project, detail) |
Update Alert. |
delete_alert (project, entity) |
Delete Alert. |
Shipper¶
create_shipper (project, logstore, detail) |
Create Shipper. |
update_shipper (project, logstore, detail) |
Update Shipper. |
delete_shipper (project, logstore, entity) |
Delete Shipper. |
get_shipper (project, logstore, entity) |
Get Shipper. |
list_shipper (project, logstore[, offset, size]) |
list the Shipper, get first 100 items by default Unsuccessful opertaion will cause an LogException. |
get_shipper_tasks (project_name, …[, …]) |
get odps/oss shipper tasks in a certain time range Unsuccessful opertaion will cause an LogException. |
retry_shipper_tasks (project_name, …) |
retry failed tasks , only the failed task can be retried Unsuccessful opertaion will cause an LogException. |
Definitions¶
-
class
aliyun.log.
LogClient
(endpoint, accessKeyId, accessKey, securityToken=None, source=None)[source]¶ Construct the LogClient with endpoint, accessKeyId, accessKey.
Parameters: - endpoint (string) – log service host name, for example, cn-hangzhou.log.aliyuncs.com or https://cn-beijing.log.aliyuncs.com
- accessKeyId (string) – aliyun accessKeyId
- accessKey (string) – aliyun accessKey
-
apply_config_to_machine_group
(project_name, config_name, group_name)[source]¶ apply a logtail config to a machine group Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- config_name (string) – the logtail config name to apply
- group_name (string) – the machine group name
Returns: ApplyConfigToMachineGroupResponse
Raise: LogException
-
arrange_shard
(project, logstore, count)[source]¶ arrange shard to the expected read-write count to a larger one.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- count (int) – expected read-write shard count. should be larger than the current one.
Returns: ’‘
Raise: LogException
-
copy_data
(project, logstore, from_time, to_time=None, to_client=None, to_project=None, to_logstore=None, shard_list=None, batch_size=None, compress=None, new_topic=None, new_source=None)[source]¶ copy data from one logstore to another one (could be the same or in different region), the time is log received time on server side.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- from_time (string/int) – curosr value, could be begin, timestamp or readable time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- to_time (string/int) – curosr value, default is “end”, could be begin, timestamp or readable time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- to_client (LogClient) – logclient instance, if empty will use source client
- to_project (string) – project name, if empty will use source project
- to_logstore (string) – logstore name, if empty will use source logstore
- shard_list (string) – shard number list. could be comma seperated list or range: 1,20,31-40
- batch_size (int) – batch size to fetch the data in each iteration. by default it’s 500
- compress (bool) – if use compression, by default it’s True
- new_topic (string) – overwrite the copied topic with the passed one
- new_source (string) – overwrite the copied source with the passed one
Returns: LogResponse {“total_count”: 30, “shards”: {0: 10, 1: 20} })
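A short sketch following the signature above; the project, logstore names and the one-hour window are placeholders:
res = client.copy_data('project1', 'logstore1', '1 hour ago', 'now',
                       to_logstore='logstore2')
res.log_print()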
-
copy_logstore
(from_project, from_logstore, to_logstore, to_project=None, to_client=None)[source]¶ copy logstore, index, and logtail config to the target logstore; machine groups are not included yet. The target logstore will be created if it does not exist.
Parameters: - from_project (string) – project name
- from_logstore (string) – logstore name
- to_logstore (string) – target logstore name
- to_project (string) – target project name, copy to same project if not being specified, will try to create it if not being specified
- to_client (LogClient) – logclient instance, use it to operate on the “to_project” if being specified for cross region purpose
Returns:
-
copy_project
(from_project, to_project, to_client=None, copy_machine_group=False)[source]¶ copy project, logstore, machine group and logtail config to target project, expecting the target project doesn’t contain same named logstores as source project
Parameters: - from_project (string) – project name
- to_project (string) – project name
- to_client (LogClient) – logclient instance
- copy_machine_group (bool) – if copy machine group resources, False by default.
Returns: None
-
create_alert
(project, detail)¶ Create Alert. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- detail (dict/string) – json string
Returns: CreateEntityResponse
Raise: LogException
-
create_consumer_group
(project, logstore, consumer_group, timeout, in_order=False)[source]¶ create consumer group
Parameters: - project (string) – project name
- logstore (string) – logstore name
- consumer_group (string) – consumer group name
- timeout (int) – time-out
- in_order (bool) – if consume in order, default is False
Returns: CreateConsumerGroupResponse
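A one-line sketch following the signature above (the 300-second timeout is an arbitrary placeholder):
res = client.create_consumer_group('project1', 'logstore1', 'consumer-group-1', 300, in_order=False)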
-
create_dashboard
(project, detail)¶ Create Dashboard. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- detail (dict/string) – json string
Returns: CreateEntityResponse
Raise: LogException
-
create_external_store
(project_name, config)[source]¶ create log store Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- config (ExternalStoreConfig) – the external store config
Returns: CreateExternalStoreResponse Raise: LogException
-
create_index
(project_name, logstore_name, index_detail)[source]¶ create index for a logstore Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- index_detail (IndexConfig) – the index config detail used to create index
Returns: CreateIndexResponse
Raise: LogException
-
create_logstore
(project_name, logstore_name, ttl=30, shard_count=2, enable_tracking=False, append_meta=False, auto_split=True, max_split_shard=64, preserve_storage=False)[source]¶ create log store Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- ttl (int) – the life cycle of log in the logstore in days, default 30, up to 3650
- shard_count (int) – the shard count of the logstore to create, default 2
- enable_tracking (bool) – enable web tracking, default is False
- append_meta (bool) – allow to append meta info (server received time and IP for external IP to each received log)
- auto_split (bool) – automatically split shards, True by default (max_split_shard defaults to 64)
- max_split_shard (int) – max shard to split, up to 64
- preserve_storage (bool) – if always persist data, TTL will be ignored.
Returns: CreateLogStoreResponse
Raise: LogException
-
create_logtail_config
(project_name, config_detail)[source]¶ create logtail config in a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- config_detail (LogtailConfigGenerator or SeperatorFileConfigDetail or SimpleFileConfigDetail or FullRegFileConfigDetail or JsonFileConfigDetail or ApsaraFileConfigDetail or SyslogConfigDetail or CommonRegLogConfigDetail) – the logtail config detail info, use LogtailConfigGenerator.from_json to generate config: SeperatorFileConfigDetail or SimpleFileConfigDetail or FullRegFileConfigDetail or JsonFileConfigDetail or ApsaraFileConfigDetail or SyslogConfigDetail, Note: CommonRegLogConfigDetail is deprecated.
Returns: CreateLogtailConfigResponse
Raise: LogException
-
create_machine_group
(project_name, group_detail)[source]¶ create machine group in a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- group_detail (MachineGroupDetail) – the machine group detail config
Returns: CreateMachineGroupResponse
Raise: LogException
-
create_project
(project_name, project_des)[source]¶ Create a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- project_des (string) – the description of a project
Returns: CreateProjectResponse
Raise: LogException
-
create_savedsearch
(project, detail)¶ Create Savedsearch. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- detail (dict/string) – json string
Returns: CreateEntityResponse
Raise: LogException
-
create_shipper
(project, logstore, detail)¶ Create Shipper. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- detail (dict/string) – json string
Returns: CreateEntityResponse
Raise: LogException
-
delete_alert
(project, entity)¶ Delete Alert. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- entity (string) – alert name
Returns: DeleteEntityResponse
Raise: LogException
-
delete_consumer_group
(project, logstore, consumer_group)[source]¶ Delete consumer group
Parameters: - project (string) – project name
- logstore (string) – logstore name
- consumer_group (string) – consumer group name
Returns: None
-
delete_dashboard
(project, entity)¶ Delete Dashboard. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- entity (string) – dashboard name
Returns: DeleteEntityResponse
Raise: LogException
-
delete_external_store
(project_name, store_name)[source]¶ delete log store Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- store_name (string) – the external store name
Returns: DeleteExternalStoreResponse
Raise: LogException
-
delete_index
(project_name, logstore_name)[source]¶ delete index of a logstore Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
Returns: DeleteIndexResponse
Raise: LogException
-
delete_logstore
(project_name, logstore_name)[source]¶ delete log store Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
Returns: DeleteLogStoreResponse
Raise: LogException
-
delete_logtail_config
(project_name, config_name)[source]¶ delete logtail config in a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- config_name (string) – the logtail config name
Returns: DeleteLogtailConfigResponse
Raise: LogException
-
delete_machine_group
(project_name, group_name)[source]¶ delete machine group in a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- group_name (string) – the group name
Returns: DeleteMachineGroupResponse
Raise: LogException
-
delete_project
(project_name)[source]¶ delete project Unsuccessful opertaion will cause an LogException.
Parameters: project_name (string) – the Project name Returns: DeleteProjectResponse Raise: LogException
-
delete_savedsearch
(project, entity)¶ Delete Savedsearch. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- entity (string) – savedsearch name
Returns: DeleteEntityResponse
Raise: LogException
-
delete_shard
(project_name, logstore_name, shardId)[source]¶ delete a readonly shard Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shardId (int) – the read only shard id
Returns: ListShardResponse
Raise: LogException
-
delete_shipper
(project, logstore, entity)¶ Delete Shipper. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- entity (string) – shipper name
Returns: DeleteEntityResponse
Raise: LogException
-
es_migration
(hosts, project_name, indexes=None, query=None, scroll='5m', logstore_index_mappings=None, pool_size=10, time_reference=None, source=None, topic=None, wait_time_in_secs=60, auto_creation=True)[source]¶ migrate data from elasticsearch to aliyun log service
Parameters: - hosts (string) – a comma-separated list of source ES nodes. e.g. “localhost:9200,other_host:9200”
- project_name (string) – specify the project_name of your log services. e.g. “your_project”
- indexes (string) – a comma-separated list of source index names. e.g. “index1,index2”
- query (string) – used to filter docs, so that you can specify the docs you want to migrate. e.g. ‘{“query”: {“match”: {“title”: “python”}}}’
- scroll (string) – specify how long a consistent view of the index should be maintained for scrolled search. e.g. “5m”
- logstore_index_mappings (string) – specify the mappings of log service logstore and ES index. e.g. ‘{“logstore1”: “my_index*”, “logstore2”: “index1,index2”, “logstore3”: “index3”}’
- pool_size (int) – specify the size of process pool. e.g. 10
- time_reference (string) – specify what ES doc’s field to use as log’s time field. e.g. “field1”
- source (string) – specify the value of log’s source field. e.g. “your_source”
- topic (string) – specify the value of log’s topic field. e.g. “your_topic”
- wait_time_in_secs (int) – specify the waiting time between initialize aliyun log and executing data migration task. e.g. 60
- auto_creation (bool) – specify whether to let the tool create logstore and index automatically for you. e.g. True
Returns: MigrationResponse
Raise: Exception
-
get_alert
(project, entity)¶ Get Alert. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- entity (string) – alert name
Returns: GetEntityResponse
Raise: LogException
-
get_begin_cursor
(project_name, logstore_name, shard_id)[source]¶ Get begin cursor from log service for batch pull logs Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shard_id (int) – the shard id
Returns: GetLogsResponse
Raise: LogException
-
get_check_point
(project, logstore, consumer_group, shard=-1)[source]¶ Get check point
Parameters: - project (string) – project name
- logstore (string) – logstore name
- consumer_group (string) – consumer group name
- shard (int) – shard id
Returns: ConsumerGroupCheckPointResponse
-
get_check_point_fixed
(project, logstore, consumer_group, shard=-1)[source]¶ Get check point
Parameters: - project (string) – project name
- logstore (string) – logstore name
- consumer_group (string) – consumer group name
- shard (int) – shard id
Returns: ConsumerGroupCheckPointResponse
-
get_config_applied_machine_groups
(project_name, config_name)[source]¶ get machine group names where the logtail config applies to Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- config_name (string) – the logtail config name used to apply
Returns: GetConfigAppliedMachineGroupsResponse
Raise: LogException
-
get_cursor
(project_name, logstore_name, shard_id, start_time)[source]¶ Get cursor from log service for batch pull logs Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shard_id (int) – the shard id
- start_time (string/int) – the start time of cursor, e.g 1441093445 or “begin”/”end”, or readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
Returns: GetCursorResponse
Raise: LogException
-
get_cursor_time
(project_name, logstore_name, shard_id, cursor)[source]¶ Get cursor time from log service Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shard_id (int) – the shard id
- cursor (string) – the cursor to get its service receive time
Returns: GetCursorTimeResponse
Raise: LogException
-
get_dashboard
(project, entity)¶ Get Dashboard. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- entity (string) – dashboard name
Returns: GetEntityResponse
Raise: LogException
-
get_end_cursor
(project_name, logstore_name, shard_id)[source]¶ Get end cursor from log service for batch pull logs Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shard_id (int) – the shard id
Returns: GetLogsResponse
Raise: LogException
-
get_external_store
(project_name, store_name)[source]¶ get the logstore meta info Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- store_name (string) – the logstore name
Returns: GetLogStoreResponse
Raise: LogException
-
get_histograms
(request)[source]¶ Get histograms of requested query from log service. Unsuccessful opertaion will cause an LogException.
Parameters: request (GetHistogramsRequest) – the GetHistograms request parameters class. Returns: GetHistogramsResponse Raise: LogException
-
get_index_config
(project_name, logstore_name)[source]¶ get index config detail of a logstore Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
Returns: GetIndexResponse
Raise: LogException
-
get_log
(project, logstore, from_time, to_time, topic=None, query=None, reverse=False, offset=0, size=100)[source]¶ Get logs from log service. will retry when incomplete. Unsuccessful opertaion will cause an LogException. Note: for larger volume of data (e.g. > 1 million logs), use get_log_all
Parameters: - project (string) – project name
- logstore (string) – logstore name
- from_time (int/string) – the begin timestamp or format of time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- to_time (int/string) – the end timestamp or format of time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- topic (string) – topic name of logs, could be None
- query (string) – user defined query, could be None
- reverse (bool) – if reverse is set to true, the query will return the latest logs first, default is false
- offset (int) – line offset of return logs
- size (int) – max line number of return logs, -1 means get all
Returns: GetLogsResponse
Raise: LogException
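A short sketch following the signature above; project, logstore and query are placeholders:
res = client.get_log('project1', 'logstore1', '1 hour ago', 'now', query='*', size=100)
res.log_print()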
-
get_log_all
(project, logstore, from_time, to_time, topic=None, query=None, reverse=False, offset=0)[source]¶ Get logs from log service. will retry when incomplete. Unsuccessful opertaion will cause an LogException. different with get_log with size=-1, It will try to iteratively fetch all data every 100 items and yield them, in CLI, it could apply jmes filter to each batch and make it possible to fetch larger volume of data.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- from_time (int/string) – the begin timestamp or format of time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- to_time (int/string) – the end timestamp or format of time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- topic (string) – topic name of logs, could be None
- query (string) – user defined query, could be None
- reverse (bool) – if reverse is set to true, the query will return the latest logs first, default is false
- offset (int) – offset to start, by default is 0
Returns: GetLogsResponse iterator
Raise: LogException
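A short sketch of iterating over every batch yielded by get_log_all (placeholders as above):
for res in client.get_log_all('project1', 'logstore1', '1 hour ago', 'now', query='*'):
    res.log_print()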
-
get_logs
(request)[source]¶ Get logs from log service. Unsuccessful opertaion will cause an LogException. Note: for larger volume of data (e.g. > 1 million logs), use get_log_all
Parameters: request (GetLogsRequest) – the GetLogs request parameters class. Returns: GetLogsResponse Raise: LogException
-
get_logstore
(project_name, logstore_name)[source]¶ get the logstore meta info Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
Returns: GetLogStoreResponse
Raise: LogException
-
get_logtail_config
(project_name, config_name)[source]¶ get logtail config in a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- config_name (string) – the logtail config name
Returns: GetLogtailConfigResponse
Raise: LogException
-
get_machine_group
(project_name, group_name)[source]¶ get machine group in a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- group_name (string) – the group name to get
Returns: GetMachineGroupResponse
Raise: LogException
-
get_machine_group_applied_configs
(project_name, group_name)[source]¶ get the logtail config names applied in a machine group Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- group_name (string) – the group name list
Returns: GetMachineGroupAppliedConfigResponse
Raise: LogException
-
get_previous_cursor_time
(project_name, logstore_name, shard_id, cursor, normalize=True)[source]¶ Get previous cursor time from log service. Note: when normalize is true, a cursor that is out of range will be normalized to the nearest cursor. Unsuccessful operation will cause a LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shard_id (int) – the shard id
- cursor (string) – the cursor to get its service receive time
- normalize (bool) – fix the cursor or not if it’s out of scope
Returns: GetCursorTimeResponse
Raise: LogException
-
get_project
(project_name)[source]¶ get project Unsuccessful opertaion will cause an LogException.
Parameters: project_name (string) – the Project name Returns: GetProjectResponse Raise: LogException
-
get_project_logs
(request)[source]¶ Get logs from log service. Unsuccessful opertaion will cause an LogException.
Parameters: request (GetProjectLogsRequest) – the GetProjectLogs request parameters class. Returns: GetLogsResponse Raise: LogException
-
get_resource_usage
(project)[source]¶ get resource usage of the project Unsuccessful operation will cause a LogException.
Parameters: project (string) – project name Returns: dict Raise: LogException
-
get_savedsearch
(project, entity)¶ Get Savedsearch. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- entity (string) – savedsearch name
Returns: GetEntityResponse
Raise: LogException
-
get_shipper
(project, logstore, entity)¶ Get Shipper. Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- entity (string) – shipper name
Returns: GetEntityResponse
Raise: LogException
-
get_shipper_tasks
(project_name, logstore_name, shipper_name, start_time, end_time, status_type='', offset=0, size=100)[source]¶ get odps/oss shipper tasks in a certain time range Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shipper_name (string) – the shipper name
- start_time (int) – the start timestamp
- end_time (int) – the end timestamp
- status_type (string) – support one of [‘’, ‘fail’, ‘success’, ‘running’] , if the status_type = ‘’ , return all kinds of status type
- offset (int) – the begin task offset, -1 means all
- size (int) – the needed tasks count
Returns: GetShipperTasksResponse
Raise: LogException
-
heart_beat
(project, logstore, consumer_group, consumer, shards=None)[source]¶ Heartbeat the consumer group
Parameters: - project (string) – project name
- logstore (string) – logstore name
- consumer_group (string) – consumer group name
- consumer (string) – consumer name
- shards (int list) – shard id list e.g. [0,1,2]
Returns: None
-
list_alert
(project, offset=0, size=100)¶ list the Alert, get first 100 items by default Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – the Project name
- offset (int) – the offset of all the matched names
- size (int) – the max return names count, -1 means all
Returns: ListLogStoreResponse
Raise: LogException
-
list_consumer_group
(project, logstore)[source]¶ List consumer group
Parameters: - project (string) – project name
- logstore (string) – logstore name
Returns: ListConsumerGroupResponse
-
list_dashboard
(project, offset=0, size=100)¶ list the Dashboard, get first 100 items by default Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – the Project name
- offset (int) – the offset of all the matched names
- size (int) – the max return names count, -1 means all
Returns: ListLogStoreResponse
Raise: LogException
-
list_external_store
(project_name, external_store_name_pattern=None, offset=0, size=100)[source]¶ list the external stores in a project Unsuccessful operation will cause a LogException.
Parameters: - project_name (string) – the Project name
- external_store_name_pattern (string) – sub-name pattern; the server returns external store names that contain this sub-name
- offset (int) – the offset of all the matched names
- size (int) – the max return names count, -1 means all
Returns: ListLogStoreResponse
Raise: LogException
-
list_logstore
(project_name, logstore_name_pattern=None, offset=0, size=100)[source]¶ list the logstores in a project Unsuccessful operation will cause a LogException.
Parameters: - project_name (string) – the Project name
- logstore_name_pattern (string) – sub-name pattern; the server returns logstore names that contain this sub-name
- offset (int) – the offset of all the matched names
- size (int) – the max return names count, -1 means all
Returns: ListLogStoreResponse
Raise: LogException
-
list_logstores
(request)[source]¶ List all logstores of requested project. Unsuccessful opertaion will cause an LogException.
Parameters: request (ListLogstoresRequest) – the ListLogstores request parameters class. Returns: ListLogStoresResponse Raise: LogException
-
list_logtail_config
(project_name, offset=0, size=100)[source]¶ list logtail config name in a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- offset (int) – the offset of all config names
- size (int) – the max return names count, -1 means all
Returns: ListLogtailConfigResponse
Raise: LogException
-
list_machine_group
(project_name, offset=0, size=100)[source]¶ list machine group names in a project Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- offset (int) – the offset of all group name
- size (int) – the max return names count, -1 means all
Returns: ListMachineGroupResponse
Raise: LogException
-
list_machines
(project_name, group_name, offset=0, size=100)[source]¶ list machines in a machine group Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- group_name (string) – the group name to list
- offset (int) – the offset of all group name
- size (int) – the max return names count, -1 means all
Returns: ListMachinesResponse
Raise: LogException
-
list_project
(offset=0, size=100)[source]¶ list the project Unsuccessful opertaion will cause an LogException.
Parameters: - offset (int) – the offset of all the matched names
- size (int) – the max return names count, -1 means return all data
Returns: ListProjectResponse
Raise: LogException
-
list_savedsearch
(project, offset=0, size=100)¶ list the Savedsearch, get first 100 items by default Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – the Project name
- offset (int) – the offset of all the matched names
- size (int) – the max return names count, -1 means all
Returns: ListLogStoreResponse
Raise: LogException
-
list_shards
(project_name, logstore_name)[source]¶ list the shard meta of a logstore Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
Returns: ListShardResponse
Raise: LogException
-
list_shipper
(project, logstore, offset=0, size=100)¶ list the Shipper, get first 100 items by default Unsuccessful opertaion will cause an LogException.
Parameters: - project (string) – the Project name
- logstore (string) – the logstore name
- offset (int) – the offset of all the matched names
- size (int) – the max return names count, -1 means all
Returns: ListLogStoreResponse
Raise: LogException
-
list_topics
(request)[source]¶ List all topics in a logstore. Unsuccessful opertaion will cause an LogException.
Parameters: request (ListTopicsRequest) – the ListTopics request parameters class. Returns: ListTopicsResponse Raise: LogException
-
merge_shard
(project_name, logstore_name, shardId)[source]¶ merge two adjacent readwrite shards into one shard Unsuccessful operation will cause a LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shardId (int) – the shard id of the left shard, server will determine the right adjacent shardId
Returns: ListShardResponse
Raise: LogException
-
pull_log
(project_name, logstore_name, shard_id, from_time, to_time, batch_size=None, compress=None)[source]¶ batch pull log data from log service using time-range Unsuccessful opertaion will cause an LogException. the time parameter means the time when server receives the logs
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shard_id (int) – the shard id
- from_time (string/int) – curosr value, could be begin, timestamp or readable time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- to_time (string/int) – curosr value, could be begin, timestamp or readable time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- batch_size (int) – batch size to fetch the data in each iteration. by default it’s 1000
- compress (bool) – if use compression, by default it’s True
Returns: PullLogResponse
Raise: LogException
-
pull_log_dump
(project_name, logstore_name, from_time, to_time, file_path, batch_size=None, compress=None, encodings=None, shard_list=None, no_escape=None)[source]¶ dump all logs seperatedly line into file_path, file_path, the time parameters are log received time on server side.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- from_time (string/int) – curosr value, could be begin, timestamp or readable time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- to_time (string/int) – curosr value, could be begin, timestamp or readable time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- file_path (string) – file path with {} for shard id. e.g. “/data/dump_{}.data”, {} will be replaced with each partition.
- batch_size (int) – batch size to fetch the data in each iteration. by default it’s 500
- compress (bool) – if use compression, by default it’s True
- encodings (string list) – encoding like [“utf8”, “latin1”] etc to dumps the logs in json format to file. default is [“utf8”,]
- shard_list (string) – shard number list. could be comma seperated list or range: 1,20,31-40
- no_escape (bool) – if not_escape the non-ANSI, default is to escape, set it to True if don’t want it.
Returns: LogResponse {“total_count”: 30, “files”: {‘file_path_1’: 10, “file_path_2”: 20} })
Raise: LogException
-
pull_logs
(project_name, logstore_name, shard_id, cursor, count=None, end_cursor=None, compress=None)[source]¶ batch pull log data from log service Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shard_id (int) – the shard id
- cursor (string) – the start to cursor to get data
- count (int) – the required pull log package count, default 1000 packages
- end_cursor (string) – the end cursor position to get data
- compress (boolean) – if use zip compress for transfer data, default is True
Returns: PullLogResponse
Raise: LogException
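A hedged sketch of a cursor-based pull loop over a single shard, assuming the response accessors get_cursor() and get_next_cursor() behave as their names suggest (the shard id, names and time are placeholders):
shard_id = 0
cursor = client.get_cursor('project1', 'logstore1', shard_id, '1 hour ago').get_cursor()
end_cursor = client.get_end_cursor('project1', 'logstore1', shard_id).get_cursor()

while cursor != end_cursor:
    res = client.pull_logs('project1', 'logstore1', shard_id, cursor, count=1000, end_cursor=end_cursor)
    res.log_print()
    cursor = res.get_next_cursor()   # advance to the next batch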
-
put_log_raw
(project, logstore, log_group, compress=None)[source]¶ Put logs to log service. using raw data in protobuf
Parameters: - project (string) – the Project name
- logstore (string) – the logstore name
- log_group (LogGroup) – log group structure
- compress (boolean) – compress or not, by default is True
Returns: PutLogsResponse
Raise: LogException
-
put_logs
(request)[source]¶ Put logs to log service. up to 512000 logs up to 10MB size Unsuccessful opertaion will cause an LogException.
Parameters: request (PutLogsRequest) – the PutLogs request parameters class Returns: PutLogsResponse Raise: LogException
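A minimal sketch of writing a couple of key/value pairs, assuming LogItem exposes set_time/set_contents and that PutLogsRequest takes (project, logstore, topic, source, logitems) as suggested by the summaries above; all names are placeholders:
import time
from aliyun.log import LogItem, PutLogsRequest

item = LogItem()
item.set_time(int(time.time()))                              # log time in seconds
item.set_contents([('status', '200'), ('path', '/index')])   # key/value pairs

req = PutLogsRequest('project1', 'logstore1', 'topic1', '', [item])
res = client.put_logs(req)
res.log_print()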
-
remove_config_to_machine_group
(project_name, config_name, group_name)[source]¶ remove a logtail config to a machine group Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- config_name (string) – the logtail config name to apply
- group_name (string) – the machine group name
Returns: RemoveConfigToMachineGroupResponse
Raise: LogException
-
retry_shipper_tasks
(project_name, logstore_name, shipper_name, task_list)[source]¶ retry failed tasks , only the failed task can be retried Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shipper_name (string) – the shipper name
- task_list (string array) – the failed task_id list, e.g [‘failed_task_id_1’, ‘failed_task_id_2’,…], currently the max retry task count 10 every time
Returns: RetryShipperTasksResponse
Raise: LogException
-
set_source
(source)[source]¶ Set the source of the log client
Parameters: source (string) – new source Returns: None
-
set_user_agent
(user_agent)[source]¶ set user agent
Parameters: user_agent (string) – user agent Returns: None
-
split_shard
(project_name, logstore_name, shardId, split_hash)[source]¶ split a readwrite shard into two shards Unsuccessful opertaion will cause an LogException.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- shardId (int) – the shard id
- split_hash (string) – the internal hash between the shard begin and end hash
Returns: ListShardResponse
Raise: LogException
-
transform_data
(project, logstore, config, from_time, to_time=None, to_client=None, to_project=None, to_logstore=None, shard_list=None, batch_size=None, compress=None, cg_name=None, c_name=None, cg_heartbeat_interval=None, cg_data_fetch_interval=None, cg_in_order=None, cg_worker_pool_size=None)[source]¶ transform data from one logstore to another one (could be the same or in a different region); the time passed is the log received time on the server side. There are two modes, batch mode and consumer group mode. For batch mode, just leave cg_name and the later options as None.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- config (string) – transform config imported or path of config (in python)
- from_time (string/int) – curosr value, could be begin, timestamp or readable time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- to_time (string/int) – curosr value, could be begin, timestamp or readable time in readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00”, also support human readable string, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- to_client (LogClient) – logclient instance, if empty will use source client
- to_project (string) – project name, if empty will use source project
- to_logstore (string) – logstore name, if empty will use source logstore
- shard_list (string) – shard number list; could be a comma separated list or range, e.g. 1,20,31-40
- batch_size (int) – batch size to fetch the data in each iteration. by default it’s 500
- compress (bool) – if use compression, by default it’s True
- cg_name (string) – consumer group name, to enable scalability and availability support.
- c_name (string) – consumer name for consumer group mode, must be different for each consumer in one group; normally leave it as the default: CLI-transform-data-${process_id}
- cg_heartbeat_interval (int) – cg_heartbeat_interval, default 20
- cg_data_fetch_interval (int) – cg_data_fetch_interval, default 2
- cg_in_order (bool) – cg_in_order, default False
- cg_worker_pool_size (int) – cg_worker_pool_size, default 2
Returns: LogResponse, e.g. {“total_count”: 30, “shards”: {0: {“count”: 10, “removed”: 1}, 2: {“count”: 20, “removed”: 1}}}
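A batch-mode sketch of transform_data under assumed names; './transform_config.py' is a hypothetical ETL config file, and cg_name is left as None so the call runs in batch mode.

# copy and transform the last hour of data into another logstore of the same project
res = client.transform_data('project1', 'logstore1',
                            config='./transform_config.py',
                            from_time='1 hour ago', to_time='now',
                            to_logstore='logstore2')
res.log_print()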
-
update_alert
(project, detail)¶ Update Alert. An unsuccessful operation will cause a LogException.
Parameters: - project (string) – project name
- detail (dict/string) – the alert detail config, as a dict or JSON string
Returns: UpdateEntityResponse
Raise: LogException
-
update_check_point
(project, logstore, consumer_group, shard, check_point, consumer='', force_success=True)[source]¶ Update check point
Parameters: - project (string) – project name
- logstore (string) – logstore name
- consumer_group (string) – consumer group name
- shard (int) – shard id
- check_point (string) – the check point (cursor) value to save
- consumer (string) – consumer name
- force_success (bool) – whether to force the update to succeed
Returns: None
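A sketch of saving a checkpoint for a consumer group, assuming the cursor is first fetched with get_cursor; all names are placeholders.

# fetch the earliest cursor of shard 0 and store it as the group's checkpoint
cursor = client.get_cursor('project1', 'logstore1', 0, 'begin').get_cursor()
client.update_check_point('project1', 'logstore1', 'consumer_group1', 0, cursor)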
-
update_consumer_group
(project, logstore, consumer_group, timeout=None, in_order=None)[source]¶ Update consumer group
Parameters: - project (string) – project name
- logstore (string) – logstore name
- consumer_group (string) – consumer group name
- timeout (int) – the consumer heartbeat timeout in seconds
- in_order (bool) – whether to consume logs in order
Returns: None
-
update_dashboard
(project, detail)¶ Update Dashboard. An unsuccessful operation will cause a LogException.
Parameters: - project (string) – project name
- detail (dict/string) – the dashboard detail config, as a dict or JSON string
Returns: UpdateEntityResponse
Raise: LogException
-
update_external_store
(project_name, config)[source]¶ update the external store config. An unsuccessful operation will cause a LogException.
Parameters: - project_name (string) – the Project name
- config – the external store config
Returns: UpdateExternalStoreResponse Raise: LogException
-
update_index
(project_name, logstore_name, index_detail)[source]¶ update the index for a logstore. An unsuccessful operation will cause a LogException. A usage sketch follows the parameter list.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- index_detail (IndexConfig) – the index config detail used to update index
Returns: UpdateIndexResponse
Raise: LogException
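A minimal update_index sketch. It assumes IndexLineConfig is importable from aliyun.log and accepts a token_list of separators; check both assumptions against the installed SDK version.

from aliyun.log import IndexConfig, IndexLineConfig

# full-text index on the whole line, split on the assumed token list
index_detail = IndexConfig(line_config=IndexLineConfig(token_list=[' ', ',', ';']))
res = client.update_index('project1', 'logstore1', index_detail)
res.log_print()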
-
update_logstore
(project_name, logstore_name, ttl=None, enable_tracking=None, shard_count=None, append_meta=None, auto_split=None, max_split_shard=None, preserve_storage=None)[source]¶ update the logstore meta info. An unsuccessful operation will cause a LogException. A usage sketch follows the parameter list.
Parameters: - project_name (string) – the Project name
- logstore_name (string) – the logstore name
- ttl (int) – the life cycle of log in the logstore in days
- enable_tracking (bool) – enable web tracking
- shard_count (int) – deprecated, the shard count could only be updated by split & merge
- append_meta (bool) – allow appending meta info (server receive time and client external IP) to each received log
- auto_split (bool) – automatically split shards; when True, max_split_shard defaults to 64
- max_split_shard (int) – max shard to split, up to 64
- preserve_storage (bool) – if True, data is persisted permanently and TTL is ignored
Returns: UpdateLogStoreResponse
Raise: LogException
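A minimal update_logstore sketch with placeholder names: extend retention to 60 days and turn on web tracking for an existing logstore.

res = client.update_logstore('project1', 'logstore1', ttl=60, enable_tracking=True)
res.log_print()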
-
update_logtail_config
(project_name, config_detail)[source]¶ update a logtail config in a project. An unsuccessful operation will cause a LogException.
Parameters: - project_name (string) – the Project name
- config_detail (SeperatorFileConfigDetail or SimpleFileConfigDetail or FullRegFileConfigDetail or JsonFileConfigDetail or ApsaraFileConfigDetail or SyslogConfigDetail or CommonRegLogConfigDetail) – the logtail config detail info; use LogtailConfigGenerator.from_json to generate one of these config detail classes from a JSON definition
Returns: UpdateLogtailConfigResponse
Raise: LogException
-
update_machine_group
(project_name, group_detail)[source]¶ update a machine group in a project. An unsuccessful operation will cause a LogException. A usage sketch follows the parameter list.
Parameters: - project_name (string) – the Project name
- group_detail (MachineGroupDetail) – the machine group detail config
Returns: UpdateMachineGroupResponse
Raise: LogException
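A minimal sketch that rebuilds an IP-based machine group definition and pushes it with update_machine_group; the group name and IPs are placeholders.

from aliyun.log import MachineGroupDetail

# replace the IP list of an existing machine group
group = MachineGroupDetail(group_name='group1', machine_type='ip',
                           machine_list=['192.168.1.1', '192.168.1.2'])
res = client.update_machine_group('project1', group)
res.log_print()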
-
update_savedsearch
(project, detail)¶ Update Savedsearch. An unsuccessful operation will cause a LogException.
Parameters: - project (string) – project name
- detail (dict/string) – the savedsearch detail config, as a dict or JSON string
Returns: UpdateEntityResponse
Raise: LogException
-
update_shipper
(project, logstore, detail)¶ Update Shipper. An unsuccessful operation will cause a LogException.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- detail (dict/string) – the shipper detail config, as a dict or JSON string
Returns: UpdateEntityResponse
Raise: LogException
-
class
aliyun.log.
LogException
(errorCode, errorMessage, requestId='', resp_status=200, resp_header='', resp_body='')[source]¶ The Exception of the log request & response.
Parameters: - errorCode (string) – log service error code
- errorMessage (string) – detailed information for the exception
- requestId (string) – the request id of the response; ‘’ is set for client-side errors
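A typical way to handle LogException around any client call; the getter names below (get_error_code etc.) follow the usual accessor convention of this SDK, and the logstore name is a placeholder.

from aliyun.log import LogException

try:
    res = client.get_logstore('project1', 'logstore_not_exist')
    res.log_print()
except LogException as ex:
    # structured error information carried by the exception
    print(ex.get_error_code(), ex.get_error_message(), ex.get_request_id())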
-
class
aliyun.log.
GetHistogramsRequest
(project=None, logstore=None, fromTime=None, toTime=None, topic=None, query=None)[source]¶ The request used to get histograms of a query from log.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- fromTime (int/string) – the begin time: a timestamp, or a readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00” or “2018-01-02 12:12:10”; also supports human readable strings, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- toTime (int/string) – the end time: a timestamp, or a readable time like “%Y-%m-%d %H:%M:%S<time_zone>” e.g. “2018-01-02 12:12:10+8:00” or “2018-01-02 12:12:10”; also supports human readable strings, e.g. “1 hour ago”, “now”, “yesterday 0:0:0”, refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
- topic (string) – topic name of logs
- query (string) – user defined query
-
class
aliyun.log.
GetLogsRequest
(project=None, logstore=None, fromTime=None, toTime=None, topic=None, query=None, line=100, offset=0, reverse=False)[source]¶ The request used to get logs by a query from log.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- fromTime (int/string) – the begin time: a timestamp, or a time string in the format “%Y-%m-%d %H:%M:%S”, e.g. “2018-01-02 12:12:10”
- toTime (int/string) – the end time: a timestamp, or a time string in the format “%Y-%m-%d %H:%M:%S”, e.g. “2018-01-02 12:12:10”
- topic (string) – topic name of logs
- query (string) – user defined query
- line (int) – max line number of return logs
- offset (int) – line offset of return logs
- reverse (bool) – if reverse is set to true, the query will return the latest logs first
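A minimal query sketch using GetLogsRequest with client.get_logs; the project, logstore, time range and query string are placeholders.

from aliyun.log import GetLogsRequest

# fetch up to 100 matching logs in the time window, newest first
request = GetLogsRequest('project1', 'logstore1',
                         fromTime='2018-01-02 12:12:10', toTime='2018-01-02 13:12:10',
                         query='level: ERROR', line=100, reverse=True)
res = client.get_logs(request)
res.log_print()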
-
class
aliyun.log.
GetProjectLogsRequest
(project=None, query=None)[source]¶ The request used to get logs by a query from log cross multiple logstores.
Parameters: - project (string) – project name
- query (string) – user defined query
-
class
aliyun.log.
IndexConfig
(ttl=1, line_config=None, key_config_list=None, all_keys_config=None, log_reduce=None)[source]¶ The index config of a logstore
Parameters: - ttl (int) – this parameter is deprecated, the ttl is same as logstore’s ttl
- line_config (IndexLineConfig) – the index config of the whole log line
- key_config_list (dict) – dict (string => IndexKeyConfig), the index key configs of the keys
- all_keys_config (IndexKeyConfig) – the key config of all keys, the new create logstore should never user this param, it only used to compatible with old config
- log_reduce (bool) – if to enable logreduce
-
class
aliyun.log.
ListTopicsRequest
(project=None, logstore=None, token=None, line=None)[source]¶ The request used to get topics of a query from log.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- token (string) – the start token to list topics
- line (int) – max topic counts to return
-
class
aliyun.log.
ListLogstoresRequest
(project=None)[source]¶ The request used to list log store from log.
Parameters: project (string) – project name
-
class
aliyun.log.
PluginConfigDetail
(logstoreName, configName, plugin, **extended_items)[source]¶ The logtail config for plugin mode
Parameters: - logstoreName (string) – the logstore name
- configName (string) – the config name
- logPath (string) – folder of the log path, e.g. /apsara/nuwa/
- filePattern (string) – file name pattern, e.g. *.log; files matching /apsara/nuwa/…/*.log will be collected
- localStorage (bool) – whether to use up to 1GB of local cache when logtail is offline; default is True
- enableRawLog (bool) – whether to upload the raw data in the content field; default is False
- topicFormat (string) – “none”, “group_topic”, or a regex to extract the topic from the file path, e.g. “/test/(\w+).log” will use each file name as the topic; default is “none”
- fileEncoding (string) – “utf8” or “gbk” so far
- maxDepth (int) – max depth of folders to scan; by default it’s 100, 0 means only scan the root folder
- preserve (bool) – if preserve time-out, by default is False, 30 min time-out if set it as True
- preserveDepth (int) – time-out folder depth. 1-3
- filterKey (string list) – only keep logs whose values for these keys match the corresponding filterRegex, e.g. [“city”, “location”]
- filterRegex (string list) – the regex values matched against filterKey, e.g. [“shanghai|beijing|nanjing”, “east”]; note it’s a list of regex values
- createTime (int) – timestamp of created, only useful when getting data from REST
- modifyTime (int) – timestamp of last modified time, only useful when getting data from REST
- extended_items (dict) – extended items
-
class
aliyun.log.
SeperatorFileConfigDetail
(logstoreName, configName, logPath, filePattern, logSample, separator, key, timeKey='', timeFormat=None, localStorage=None, enableRawLog=None, topicFormat=None, fileEncoding=None, maxDepth=None, preserve=None, preserveDepth=None, filterKey=None, filterRegex=None, createTime=None, modifyTime=None, **extended_items)[source]¶ The logtail config for separator mode
Parameters: - logstoreName (string) – the logstore name
- configName (string) – the config name
- logPath (string) – folder of the log path, e.g. /apsara/nuwa/
- filePattern (string) – file name pattern, e.g. *.log; files matching /apsara/nuwa/…/*.log will be collected
- logSample (string) – log sample. e.g. shanghai|2000|east
- separator (string) – ‘\t’ for tab, ‘ ‘ for space, ‘|’ etc.; up to 3 chars like “&&&” or “||”
- key (string list) – keys to map the fields like [“city”, “population”, “location”]
- timeKey (string) – one key name among key to use as the log time, or None to use the system time
- timeFormat (string) – when timeKey is not None, set its format, refer to https://help.aliyun.com/document_detail/28980.html?spm=5176.2020520112.113.4.2243b18eHkxdNB
- localStorage (bool) – whether to use up to 1GB of local cache when logtail is offline; default is True
- enableRawLog (bool) – whether to upload the raw data in the content field; default is False
- topicFormat (string) – “none”, “group_topic”, or a regex to extract the topic from the file path, e.g. “/test/(\w+).log” will use each file name as the topic; default is “none”
- fileEncoding (string) – “utf8” or “gbk” so far
- maxDepth (int) – max depth of folders to scan; by default it’s 100, 0 means only scan the root folder
- preserve (bool) – if preserve time-out, by default is False, 30 min time-out if set it as True
- preserveDepth (int) – time-out folder depth. 1-3
- filterKey (string list) – only keep logs whose values for these keys match the corresponding filterRegex, e.g. [“city”, “location”]
- filterRegex (string list) – the regex values matched against filterKey, e.g. [“shanghai|beijing|nanjing”, “east”]; note it’s a list of regex values
- createTime (int) – timestamp of created, only useful when getting data from REST
- modifyTime (int) – timestamp of last modified time, only useful when getting data from REST
- extended_items (dict) – extended items
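A sketch that builds a separator-mode config and registers it with create_logtail_config; the project, logstore, config name, log path and sample are placeholders.

from aliyun.log import SeperatorFileConfigDetail

# collect pipe-delimited logs such as "shanghai|2000|east"
config = SeperatorFileConfigDetail('logstore1', 'config1',
                                   logPath='/var/log/app', filePattern='*.log',
                                   logSample='shanghai|2000|east', separator='|',
                                   key=['city', 'population', 'location'])
res = client.create_logtail_config('project1', config)
res.log_print()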
-
class
aliyun.log.
SimpleFileConfigDetail
(logstoreName, configName, logPath, filePattern, localStorage=None, enableRawLog=None, topicFormat=None, fileEncoding=None, maxDepth=None, preserve=None, preserveDepth=None, filterKey=None, filterRegex=None, **extended_items)[source]¶ The logtail config for simple mode
Parameters: - logstoreName (string) – the logstore name
- configName (string) – the config name
- logPath (string) – folder of the log path, e.g. /apsara/nuwa/
- filePattern (string) – file name pattern, e.g. *.log; files matching /apsara/nuwa/…/*.log will be collected
- localStorage (bool) – whether to use up to 1GB of local cache when logtail is offline; default is True
- enableRawLog (bool) – whether to upload the raw data in the content field; default is False
- topicFormat (string) – “none”, “group_topic”, or a regex to extract the topic from the file path, e.g. “/test/(\w+).log” will use each file name as the topic; default is “none”
- fileEncoding (string) – “utf8” or “gbk” so far
- maxDepth (int) – max depth of folders to scan; by default it’s 100, 0 means only scan the root folder
- preserve (bool) – if preserve time-out, by default is False, 30 min time-out if set it as True
- preserveDepth (int) – time-out folder depth. 1-3
- filterKey (string list) – only keep logs whose values for these keys match the corresponding filterRegex, e.g. [“city”, “location”]
- filterRegex (string list) – the regex values matched against filterKey, e.g. [“shanghai|beijing|nanjing”, “east”]; note it’s a list of regex values
- createTime (int) – timestamp of created, only useful when getting data from REST
- modifyTime (int) – timestamp of last modified time, only useful when getting data from REST
- extended_items (dict) – extended items
-
class
aliyun.log.
FullRegFileConfigDetail
(logstoreName, configName, logPath, filePattern, logSample, logBeginRegex=None, regex=None, key=None, timeFormat=None, localStorage=None, enableRawLog=None, topicFormat=None, fileEncoding=None, maxDepth=None, preserve=None, preserveDepth=None, filterKey=None, filterRegex=None, **extended_items)[source]¶ The logtail config for full regex mode
Parameters: - logstoreName (string) – the logstore name
- configName (string) – the config name
- logPath (string) – folder of the log path, e.g. /apsara/nuwa/
- filePattern (string) – file name pattern, e.g. *.log; files matching /apsara/nuwa/…/*.log will be collected
- logSample (string) – log sample. e.g. shanghai|2000|east
- logBeginRegex (string) – regex to match the beginning line of a log entry; None means ‘.*’, i.e. single line mode
- regex (string) – regex to extract fields from the log; None means (.*), which captures the whole line
- key (string list) – keys to map the fields like [“city”, “population”, “location”]. None means [“content”]
- timeFormat (string) – when timeKey is not None, set its format, refer to https://help.aliyun.com/document_detail/28980.html?spm=5176.2020520112.113.4.2243b18eHkxdNB
- localStorage (bool) – whether to use up to 1GB of local cache when logtail is offline; default is True
- enableRawLog (bool) – whether to upload the raw data in the content field; default is False
- topicFormat (string) – “none”, “group_topic”, or a regex to extract the topic from the file path, e.g. “/test/(\w+).log” will use each file name as the topic; default is “none”
- fileEncoding (string) – “utf8” or “gbk” so far
- maxDepth (int) – max depth of folders to scan; by default it’s 100, 0 means only scan the root folder
- preserve (bool) – if preserve time-out, by default is False, 30 min time-out if set it as True
- preserveDepth (int) – time-out folder depth. 1-3
- filterKey (string list) – only keep logs whose values for these keys match the corresponding filterRegex, e.g. [“city”, “location”]
- filterRegex (string list) – the regex values matched against filterKey, e.g. [“shanghai|beijing|nanjing”, “east”]; note it’s a list of regex values
- createTime (int) – timestamp of created, only useful when getting data from REST
- modifyTime (int) – timestamp of last modified time, only useful when getting data from REST
- extended_items (dict) – extended items
-
class
aliyun.log.
JsonFileConfigDetail
(logstoreName, configName, logPath, filePattern, timeKey='', timeFormat=None, localStorage=None, enableRawLog=None, topicFormat=None, fileEncoding=None, maxDepth=None, preserve=None, preserveDepth=None, filterKey=None, filterRegex=None, createTime=None, modifyTime=None, **extended_items)[source]¶ The logtail config for json mode
Parameters: - logstoreName (string) – the logstore name
- configName (string) – the config name
- logPath (string) – folder of the log path, e.g. /apsara/nuwa/
- filePattern (string) – file name pattern, e.g. *.log; files matching /apsara/nuwa/…/*.log will be collected
- timeKey (string) – one key name among the parsed keys to use as the log time, or None to use the system time
- timeFormat (string) – when timeKey is not None, set its format, refer to https://help.aliyun.com/document_detail/28980.html?spm=5176.2020520112.113.4.2243b18eHkxdNB
- localStorage (bool) – whether to use up to 1GB of local cache when logtail is offline; default is True
- enableRawLog (bool) – whether to upload the raw data in the content field; default is False
- topicFormat (string) – “none”, “group_topic”, or a regex to extract the topic from the file path, e.g. “/test/(\w+).log” will use each file name as the topic; default is “none”
- fileEncoding (string) – “utf8” or “gbk” so far
- maxDepth (int) – max depth of folders to scan; by default it’s 100, 0 means only scan the root folder
- preserve (bool) – if preserve time-out, by default is False, 30 min time-out if set it as True
- preserveDepth (int) – time-out folder depth. 1-3
- filterKey (string list) – only keep logs whose values for these keys match the corresponding filterRegex, e.g. [“city”, “location”]
- filterRegex (string list) – the regex values matched against filterKey, e.g. [“shanghai|beijing|nanjing”, “east”]; note it’s a list of regex values
- createTime (int) – timestamp of created, only useful when getting data from REST
- modifyTime (int) – timestamp of last modified time, only useful when getting data from REST
- extended_items (dict) – extended items
-
class
aliyun.log.
ApsaraFileConfigDetail
(logstoreName, configName, logPath, filePattern, logBeginRegex, localStorage=None, enableRawLog=None, topicFormat=None, fileEncoding=None, maxDepth=None, preserve=None, preserveDepth=None, filterKey=None, filterRegex=None, createTime=None, modifyTime=None, **extended_items)[source]¶ The logtail config for Apsara mode
Parameters: - logstoreName (string) – the logstore name
- configName (string) – the config name
- logPath (string) – folder of the log path, e.g. /apsara/nuwa/
- filePattern (string) – file name pattern, e.g. *.log; files matching /apsara/nuwa/…/*.log will be collected
- logBeginRegex (string) – regex to match the beginning line of a log entry; None means ‘.*’, i.e. single line mode
- localStorage (bool) – whether to use up to 1GB of local cache when logtail is offline; default is True
- enableRawLog (bool) – whether to upload the raw data in the content field; default is False
- topicFormat (string) – “none”, “group_topic”, or a regex to extract the topic from the file path, e.g. “/test/(\w+).log” will use each file name as the topic; default is “none”
- fileEncoding (string) – “utf8” or “gbk” so far
- maxDepth (int) – max depth of folders to scan; by default it’s 100, 0 means only scan the root folder
- preserve (bool) – if preserve time-out, by default is False, 30 min time-out if set it as True
- preserveDepth (int) – time-out folder depth. 1-3
- filterKey (string list) – only keep logs whose values for these keys match the corresponding filterRegex, e.g. [“city”, “location”]
- filterRegex (string list) – the regex values matched against filterKey, e.g. [“shanghai|beijing|nanjing”, “east”]; note it’s a list of regex values
- createTime (int) – timestamp of created, only useful when getting data from REST
- modifyTime (int) – timestamp of last modified time, only useful when getting data from REST
- extended_items (dict) – extended items
-
class
aliyun.log.
SyslogConfigDetail
(logstoreName, configName, tag, localStorage=None, createTime=None, modifyTime=None, **extended_items)[source]¶ The logtail config for syslog mode
Parameters: - logstoreName (string) – the logstore name
- configName (string) – the config name
- tag (string) – tag for the log captured
- localStorage (bool) – whether to use up to 1GB of local cache when logtail is offline; default is True
- createTime (int) – timestamp of created, only useful when getting data from REST
- modifyTime (int) – timestamp of last modified time, only useful when getting data from REST
- extended_items (dict) – extended items
-
class
aliyun.log.
MachineGroupDetail
(group_name=None, machine_type=None, machine_list=None, group_type='', group_attribute=None)[source]¶ The machine group detail info
Parameters: - group_name (string) – group name
- machine_type (string) – “ip” or “userdefined”
- machine_list (string list) – the list of machine IPs or user-defined identities, e.g. [“127.0.0.1”, “127.0.0.2”]
- group_type (string) – the machine group type, “” or “Armory”
- group_attribute (dict) – the attributes of the group; it contains two optional keys: 1. “externalName”: only used if the group_type is “Armory”, it’s the Armory name; 2. “groupTopic”: the group topic value
-
class
aliyun.log.
PutLogsRequest
(project=None, logstore=None, topic=None, source=None, logitems=None, hashKey=None, compress=True, logtags=None)[source]¶ The request used to send data to log.
Parameters: - project (string) – project name
- logstore (string) – logstore name
- topic (string) – topic name
- source (string) – source of the logs
- logitems (list<LogItem>) – log data
- hashKey (String) – put data with a specified hash; the data will be sent to the shard whose range contains the hashKey
- compress (bool) – whether to compress the logs
- logtags (list) – list of key:value tag pairs, e.g. [(tag_key_1, tag_value_1), (tag_key_2, tag_value_2)]
-
class
aliyun.log.
ShipperTask
(task_id, task_status, task_message, task_create_time, task_last_data_receive_time, task_finish_time)[source]¶ A shipper task
Parameters: - task_id (string) – the task id
- task_status (string) – one of [‘success’, ‘running’, ‘fail’]
- task_message (string) – the error message when task_status is ‘fail’
- task_create_time (int) – the task create time (timestamp from 1970.1.1)
- task_last_data_receive_time (int) – last log data receive time (timestamp)
- task_finish_time (int) – the task finish time (timestamp)
-
class
aliyun.log.
LogResponse
(headers, body='')[source]¶ The base response class of all log response.
Parameters: headers (dict) – HTTP response header
-
class
aliyun.log.
GetLogsResponse
(resp, header)[source]¶ The response of the GetLog API from log.
Parameters: - resp (dict) – GetLogsResponse HTTP response body
- header (dict) – GetLogsResponse HTTP response header
-
class
aliyun.log.
ListLogstoresResponse
(resp, header)[source]¶ The response of the ListLogstores API from log.
Parameters: - resp (dict) – ListLogstoresResponse HTTP response body
- header (dict) – ListLogstoresResponse HTTP response header
-
class
aliyun.log.
ListTopicsResponse
(resp, header)[source]¶ The response of the ListTopic API from log.
Parameters: - resp (dict) – ListTopicsResponse HTTP response body
- header (dict) – ListTopicsResponse HTTP response header
-
get_count
()[source]¶ Get the number of all the topics from the response
Returns: int, the number of all the topics from the response
-
class
aliyun.log.
GetCursorResponse
(resp, header)[source]¶ The response of the get_cursor API from log.
Parameters: - header (dict) – GetCursorResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
GetCursorTimeResponse
(resp, header)[source]¶ The response of the get_cursor_time API from log.
Parameters: - header (dict) – GetCursorTimeResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
ListShardResponse
(resp, header)[source]¶ The response of the list_shard API from log.
Parameters: - header (dict) – ListShardResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
DeleteShardResponse
(header, resp='')[source]¶ The response of the delete_shard API from log.
Parameters: header (dict) – DeleteShardResponse HTTP response header
-
class
aliyun.log.
GetHistogramsResponse
(resp, header)[source]¶ The response of the GetHistograms API from log.
Parameters: - resp (dict) – GetHistogramsResponse HTTP response body
- header (dict) – GetHistogramsResponse HTTP response header
-
get_histograms
()[source]¶ Get histograms on the requested time range: [from, to)
Returns: Histogram list, histograms on the requested time range: [from, to)
-
class
aliyun.log.
Histogram
(fromTime, toTime, count, progress)[source]¶ The class used to present the result of log histogram status. For every log histogram, it contains the from/to time range, the hit log count, and the query completion status.
Parameters: - fromTime (int) – the begin time
- toTime (int) – the end time
- count (int) – log count of histogram that query hits
- progress (string) – histogram query status(Complete or InComplete)
-
class
aliyun.log.
GetLogsResponse
(resp, header)[source] The response of the GetLog API from log.
Parameters: - resp (dict) – GetLogsResponse HTTP response body
- header (dict) – GetLogsResponse HTTP response header
-
get_count
()[source] Get log number from the response
Returns: int, log number
-
get_logs
()[source] Get all logs from the response
Returns: QueriedLog list, all log data
-
is_completed
()[source] Check if the get logs query is completed
Returns: bool, true if this logs query is completed
-
class
aliyun.log.
QueriedLog
(timestamp, source, contents)[source]¶ The QueriedLog is a single log of the GetLogsResponse obtained from the log service.
Parameters: - timestamp (int) – log timestamp
- source (string) – log source
- contents (dict) – log contents, containing many key/value pairs
-
class
aliyun.log.
PullLogResponse
(resp, header)[source]¶ The response of the pull_logs API from log.
Parameters: - header (dict) – PullLogResponse HTTP response header
- resp (string) – the HTTP response body
-
class
aliyun.log.
CreateIndexResponse
(header, resp='')[source]¶ The response of the create_index API from log.
Parameters: header (dict) – CreateIndexResponse HTTP response header
-
class
aliyun.log.
UpdateIndexResponse
(header, resp='')[source]¶ The response of the update_index API from log.
Parameters: header (dict) – UpdateIndexResponse HTTP response header
-
class
aliyun.log.
DeleteIndexResponse
(header, resp='')[source]¶ The response of the delete_index API from log.
Parameters: header (dict) – DeleteIndexResponse HTTP response header
-
class
aliyun.log.
GetIndexResponse
(resp, header)[source]¶ The response of the get_index_config API from log.
Parameters: - header (dict) – GetIndexResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
CreateLogtailConfigResponse
(header, resp='')[source]¶ The response of the create_logtail_config API from log.
Parameters: header (dict) – CreateLogtailConfigResponse HTTP response header
-
class
aliyun.log.
DeleteLogtailConfigResponse
(header, resp='')[source]¶ The response of the delete_logtail_config API from log.
Parameters: header (dict) – DeleteLogtailConfigResponse HTTP response header
-
class
aliyun.log.
GetLogtailConfigResponse
(resp, header)[source]¶ The response of the get_logtail_config API from log.
Parameters: - header (dict) – GetLogtailConfigResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
UpdateLogtailConfigResponse
(header, resp='')[source]¶ The response of the update_logtail_config API from log.
Parameters: header (dict) – UpdateLogtailConfigResponse HTTP response header
-
class
aliyun.log.
ListLogtailConfigResponse
(resp, header)[source]¶ The response of the list_logtail_config API from log.
Parameters: - header (dict) – ListLogtailConfigResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
CreateMachineGroupResponse
(header, resp='')[source]¶ The response of the create_machine_group API from log.
Parameters: header (dict) – CreateMachineGroupResponse HTTP response header
-
class
aliyun.log.
DeleteMachineGroupResponse
(header, resp='')[source]¶ The response of the delete_machine_group API from log.
Parameters: header (dict) – DeleteMachineGroupResponse HTTP response header
-
class
aliyun.log.
GetMachineGroupResponse
(resp, header)[source]¶ The response of the get_machine_group API from log.
Parameters: - header (dict) – GetMachineGroupResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
UpdateMachineGroupResponse
(header, resp='')[source]¶ The response of the update_machine_group API from log.
Parameters: header (dict) – UpdateMachineGroupResponse HTTP response header
-
class
aliyun.log.
ListMachineGroupResponse
(resp, header)[source]¶ The response of the list_machine_group API from log.
Parameters: - header (dict) – ListMachineGroupResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
ListMachinesResponse
(resp, header)[source]¶ The response of the list_machines API from log.
Parameters: - header (dict) – ListMachinesResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
ApplyConfigToMachineGroupResponse
(header, resp='')[source]¶ The response of the apply_config_to_machine_group API from log.
Parameters: header (dict) – ApplyConfigToMachineGroupResponse HTTP response header
-
class
aliyun.log.
RemoveConfigToMachineGroupResponse
(header, resp='')[source]¶ The response of the remove_config_to_machine_group API from log.
Parameters: header (dict) – RemoveConfigToMachineGroupResponse HTTP response header
-
class
aliyun.log.
GetMachineGroupAppliedConfigResponse
(resp, header)[source]¶ The response of the get_machine_group_applied_config API from log.
Parameters: - header (dict) – GetMachineGroupAppliedConfigResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
GetConfigAppliedMachineGroupsResponse
(resp, header)[source]¶ The response of the get_config_applied_machine_group API from log.
Parameters: - header (dict) – GetConfigAppliedMachineGroupsResponse HTTP response header
- resp (dict) – the HTTP response body
-
class
aliyun.log.
ConsumerGroupCheckPointResponse
(resp, headers)[source]¶
-
class
aliyun.log.
ListEntityResponse
(header, resp, resource_name=None, entities_key=None)[source]¶
-
class
aliyun.log.
SimpleLogHandler
(end_point, access_key_id, access_key, project, log_store, topic=None, fields=None, buildin_fields_prefix=None, buildin_fields_suffix=None, extract_json=None, extract_json_drop_message=None, extract_json_prefix=None, extract_json_suffix=None, extract_kv=None, extract_kv_drop_message=None, extract_kv_prefix=None, extract_kv_suffix=None, extract_kv_sep=None, extra=None, **kwargs)[source]¶ SimpleLogHandler, a synchronous handler that blocks while sending each log; intended mainly for simple or test purposes. A usage sketch follows the parameter list.
Parameters: - end_point – log service endpoint
- access_key_id – access key id
- access_key – access key
- project – project name
- log_store – logstore name
- topic – topic, by default is empty
- fields – list of LogFields or list of names of LogFields, default is LogFields.record_name, LogFields.level, LogFields.func_name, LogFields.module, LogFields.file_path, LogFields.line_no, LogFields.process_id, LogFields.process_name, LogFields.thread_id, LogFields.thread_name; you could also just use the string name like ‘thread_name’. It is also possible to customize extra fields by disabling the extra option and putting a whitelist of field names here.
- buildin_fields_prefix – prefix of builtin fields, default is empty. suggest using “__” when extract json is True to prevent conflict.
- buildin_fields_suffix – suffix of builtin fields, default is empty. suggest using “__” when extract json is True to prevent conflict.
- extract_json – if extract json automatically, default is False
- extract_json_drop_message – if drop message fields if it’s JSON and extract_json is True, default is False
- extract_json_prefix – prefix of fields extracted from json when extract_json is True. default is “”
- extract_json_suffix – suffix of fields extracted from json when extract_json is True. default is empty
- extract_kv – if extract kv like k1=v1 k2=”v 2” automatically, default is False
- extract_kv_drop_message – if drop message fields if it’s kv and extract_kv is True, default is False
- extract_kv_prefix – prefix of fields extracted from KV when extract_json is True. default is “”
- extract_kv_suffix – suffix of fields extracted from KV when extract_json is True. default is “”
- extract_kv_sep – separator for the KV case, default is ‘=’, e.g. k1=v1
- extra – whether to show extra info; default is True (show all). Note: the extra fields will also be handled with buildin_fields_prefix/suffix
- kwargs – other parameters passed to logging.Handler
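A minimal sketch of attaching SimpleLogHandler to the standard logging module; the endpoint, keys, project and logstore names are placeholders.

import logging
from aliyun.log import SimpleLogHandler

handler = SimpleLogHandler('cn-hangzhou.log.aliyuncs.com',
                           'your_access_key_id', 'your_access_key',
                           'project1', 'logstore1')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info('hello from SimpleLogHandler')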
-
class
aliyun.log.
QueuedLogHandler
(end_point, access_key_id, access_key, project, log_store, topic=None, fields=None, queue_size=None, put_wait=None, close_wait=None, batch_size=None, buildin_fields_prefix=None, buildin_fields_suffix=None, extract_json=None, extract_json_drop_message=None, extract_json_prefix=None, extract_json_suffix=None, extract_kv=None, extract_kv_drop_message=None, extract_kv_prefix=None, extract_kv_suffix=None, extract_kv_sep=None, extra=None, **kwargs)[source]¶ Queued Log Handler, a tuned asynchronous log handler. A usage sketch follows the parameter list.
Parameters: - end_point – log service endpoint
- access_key_id – access key id
- access_key – access key
- project – project name
- log_store – logstore name
- topic – topic, default is empty
- fields – list of LogFields, default is LogFields.record_name, LogFields.level, LogFields.func_name, LogFields.module, LogFields.file_path, LogFields.line_no, LogFields.process_id, LogFields.process_name, LogFields.thread_id, LogFields.thread_name
- queue_size – queue size, default is 40960 logs, about 10MB ~ 40MB
- put_wait – maximum delay before sending the logs, by default 2 seconds; it waits double that time when the queue is full
- close_wait – when the program exits, it will try to send all logs in the queue within this time period, by default 5 seconds
- batch_size – merge this count of logs and send them in one batch, by default min(1024, queue_size)
- buildin_fields_prefix – prefix of builtin fields, default is empty. suggest using “__” when extract json is True to prevent conflict.
- buildin_fields_suffix – suffix of builtin fields, default is empty. suggest using “__” when extract json is True to prevent conflict.
- extract_json – if extract json automatically, default is False
- extract_json_drop_message – if drop message fields if it’s JSON and extract_json is True, default is False
- extract_json_prefix – prefix of fields extracted from json when extract_json is True. default is “”
- extract_json_suffix – suffix of fields extracted from json when extract_json is True. default is empty
- extract_kv – if extract kv like k1=v1 k2=”v 2” automatically, default is False
- extract_kv_drop_message – if drop message fields if it’s kv and extract_kv is True, default is False
- extract_kv_prefix – prefix of fields extracted from KV when extract_json is True. default is “”
- extract_kv_suffix – suffix of fields extracted from KV when extract_json is True. default is “”
- extract_kv_sep – separator for the KV case, default is ‘=’, e.g. k1=v1
- extra – whether to show extra info; default is True (show all)
- kwargs – other parameters passed to logging.Handler
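A QueuedLogHandler sketch with JSON extraction enabled, so a JSON message body is split into separate fields; all names and keys are placeholders.

import logging
from aliyun.log import QueuedLogHandler

handler = QueuedLogHandler('cn-hangzhou.log.aliyuncs.com',
                           'your_access_key_id', 'your_access_key',
                           'project1', 'logstore1', extract_json=True)
logger = logging.getLogger('my_app')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info('{"status": 200, "path": "/index"}')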
-
class
aliyun.log.
UwsgiQueuedLogHandler
(*args, **kwargs)[source]¶ Queued Log Handler for uWSGI; it depends on the library uwsgidecorators, which needs to be installed separately.
Parameters: - end_point – log service endpoint
- access_key_id – access key id
- access_key – access key
- project – project name
- log_store – logstore name
- topic – topic, default is empty
- fields – list of LogFields, default is LogFields.record_name, LogFields.level, LogFields.func_name, LogFields.module, LogFields.file_path, LogFields.line_no, LogFields.process_id, LogFields.process_name, LogFields.thread_id, LogFields.thread_name
- queue_size – queue size, default is 40960 logs, about 10MB ~ 40MB
- put_wait – maximum delay before sending the logs, by default 2 seconds; it waits double that time when the queue is full
- close_wait – when the program exits, it will try to send all logs in the queue within this time period, by default 2 seconds
- batch_size – merge this count of logs and send them in one batch, by default min(1024, queue_size)
- buildin_fields_prefix – prefix of builtin fields, default is empty. suggest using “__” when extract json is True to prevent conflict.
- buildin_fields_suffix – suffix of builtin fields, default is empty. suggest using “__” when extract json is True to prevent conflict.
- extract_json – if extract json automatically, default is False
- extract_json_drop_message – if drop message fields if it’s JSON and extract_json is True, default is False
- extract_json_prefix – prefix of fields extracted from json when extract_json is True. default is “”
- extract_json_suffix – suffix of fields extracted from json when extract_json is True. default is empty
- extract_kv – if extract kv like k1=v1 k2=”v 2” automatically, default is False
- extract_kv_drop_message – if drop message fields if it’s kv and extract_kv is True, default is False
- extract_kv_prefix – prefix of fields extracted from KV when extract_json is True. default is “”
- extract_kv_suffix – suffix of fields extracted from KV when extract_json is True. default is “”
- extract_kv_sep – separator for the KV case, default is ‘=’, e.g. k1=v1
- extra – whether to show extra info; default is True (show all)
- kwargs – other parameters passed to logging.Handler
-
class
aliyun.log.
LogFields
[source]¶ Fields that can be uploaded automatically. Possible fields: record_name, level, func_name, module, file_path, line_no, process_id, process_name, thread_id, thread_name
-
class
aliyun.log.es_migration.
MigrationManager
(hosts=None, indexes=None, query=None, scroll='5m', endpoint=None, project_name=None, access_key_id=None, access_key=None, logstore_index_mappings=None, pool_size=10, time_reference=None, source=None, topic=None, wait_time_in_secs=60, auto_creation=True)[source]¶ MigrationManager, migrate data from elasticsearch to aliyun log service
Parameters: - hosts (string) – a comma-separated list of source ES nodes. e.g. “localhost:9200,other_host:9200”
- indexes (string) – a comma-separated list of source index names. e.g. “index1,index2”
- query (string) – used to filter docs, so that you can specify the docs you want to migrate. e.g. ‘{“query”: {“match”: {“title”: “python”}}}’
- scroll (string) – specify how long a consistent view of the index should be maintained for scrolled search. e.g. “5m”
- endpoint (string) – specify the endpoint of your log services. e.g. “cn-beijing.log.aliyuncs.com”
- project_name (string) – specify the project_name of your log services. e.g. “your_project”
- access_key_id (string) – specify the access_key_id of your account.
- access_key (string) – specify the access_key of your account.
- logstore_index_mappings (string) – specify the mappings between log service logstores and ES indexes, e.g. ‘{“logstore1”: “my_index*”, “logstore2”: “index1,index2”, “logstore3”: “index3”}’
- pool_size (int) – specify the size of process pool. e.g. 10
- time_reference (string) – specify what ES doc’s field to use as log’s time field. e.g. “field1”
- source (string) – specify the value of log’s source field. e.g. “your_source”
- topic (string) – specify the value of log’s topic field. e.g. “your_topic”
- wait_time_in_secs (int) – specify the waiting time, in seconds, between initializing the aliyun log resources and executing the data migration task, e.g. 60
- auto_creation (bool) – specify whether to let the tool create logstore and index automatically for you. e.g. True
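A migration sketch under assumed names; the hosts, indexes, endpoint, project and keys are placeholders, and the migrate() entry point is assumed to be the manager's run method.

from aliyun.log.es_migration import MigrationManager

mgr = MigrationManager(hosts='localhost:9200',
                       indexes='index1,index2',
                       endpoint='cn-beijing.log.aliyuncs.com',
                       project_name='your_project',
                       access_key_id='your_access_key_id',
                       access_key='your_access_key')
mgr.migrate()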