手动添加解析太过于麻烦,于是就写了一个python脚本来优化添加解析的流程。这里使用的是dnspod作为域名解析商,调用的其API。也算个dnspod的token利用工具吧。
环境:python 3.7
使用方法
- 修改第15行的LOGIN_TOKEN , API Token 生成方法详见:https://support.dnspod.cn/Kb/showarticle/tsid/227/ ,完整的 API Token 是由 ID,Token 组合而成的,用英文的逗号分割.
- python dnspod.py运行,Ctrl+Z退出。或python -c “from dnspod import *;command()” 执行单条命令
函数介绍:
- login()
尝试登陆,判断token有效性。返回值0表示登录成功。 - user_log(show = True)
获取用户日志,如添加域名,删除域名等操作记录。当show为True时整理并展示结果,False表示只返回字典,默认显示,含义下同。 - domain_log(domain, show = True)
获取域名日志,如添加A记录,删除MX记录等操作。domain为域名值,如domain = “for-get.com” ,所有domain参数都是必填,含义下同。 - domains(show = True)
获取域名列表,即查询用户账户下所有域名。 - records(domain, sub_domain = “”, record_type = “”, show = True)
查询域名下记录值,sub_domain为二级域名,如sub_domian = “@”表示根域名,含义下同。record_type表示记录类型,如record_type = “A” 表示A记录(不区分大小写)。当show为True时整理并展示结果。
Eg. records(“for-get.com”,”@”,”A”) 表示仅查询for-get.com下所有A记录
records(“for-get.com”,”@”) 表示查询for-get.com下所有类型记录
records(“for-get.com”,””,”A”) 表示查询for-get.com下所有A记录(包括所有子域名)。
records(“for-get.com”)表示查询for-get.com下所有记录。 - record(domain, record_id, show = True)
获取记录信息。record_id即记录ID,表示域名下的一条记录。 - create(domain, sub_domain, record_type, value, mx = “”, record_line_id = “0”)
创建记录。mx表示mx优先级,当且仅当类型为MX的时候有效。record_line_id表示解析线路,0为默认,其他线路请参考官方说明文档。无返回,但创建成功时会展示record id。
Eg. create(“for-get.com”,”sub”,”A”,”127.0.0.1”)表示创建sub.for-get.com的一条A记录,值为127.0.0.1。 - modify(domain, record_id, sub_domain, record_type, value, mx = “”, record_line_id = “0”)
修改记录。record id定位,即修改record_id对应的记录。其余参数作用与create参数完全相同。
Eg. modify(“for-get.com”,”233”,”sub”,”A”,”127.0.0.1”) 表示将for-get.com下编号为233的记录修改为sub.for-get.com的一条A记录,值为127.0.0.1 - remove(domain, record_id):
删除记录。 - batch_remove(domain, record_id):
批量删除记录,此时record_id为列表或元组,或不含空格、逗号隔开的字符串,如“233,666,777”,可以接受get_record_id()的返回值。
Eg. batch_remove(“for-get.com”, “233,666,777”)表示删除for-get.com下记录id为233,666和777的记录。 - ddns(domain, record_id = “”, sub_domain = “”, record_line_id = “0”)
更新动态DNS记录。Record_id和sub_domain二选一。推荐填写record_id。该命令只能动态更新本机公网ipv4地址到DNS,如果本地没有分配公网ip。此命令没有太多用途。 当仅填写sub_domain时,如果解析中含有多条A记录,默认修改第一条。命令含有检查本地ip。
Eg. ddns(“for-get.com”, “233”) 更新for-get.com下233记录为A记录,记录为本地公网ip。
ddns(“for-get.com”, “”,”sub”) 更新sub.for-get.com下的一条A记录为本地公网IP。
ddns(“for-get.com”, “233”,”sub”) 更新for-get.com下233记录为A记录,二级域名为sub,记录为本地公网IP。 - get_record_id(domain, sub_domain = “”, record_type = “”, value = “”,show = True)
查询记录的记录ID。 相当于records()命令,不过返回含有id的列表,且可以单独查询记录值。返回列表第一个元素即[0]为记录id的数量。(忘记了len可以判断数量了23333)。另外填写value,且类型为NS,MX,CNAME等时,需要注意结尾有点(.),如“for-get.com.”(PS创建时不需要刻意添加,会自动生成,但是查询时需要)。 - get_ipv4()
获取ipv4地址,返回ipv4地址。调用ip.sb查询。 - get_ipv6()
获取ipv6地址,返回ipv6地址。调用ip.sb查询。 - info()
展示用户下所有域名,及所有域名的记录。 - ipinfo(show = True)
展示ip信息,调用get_ipv4()、get_ipv6(),返回字典。
返回示例 {“ipv4”:”127.0.0.1”,“ipv6”:”::1”} - command(s = “”):
命令模式。
命令模式
使用command()函数。即python dnspod.py的程序界面(类似于CLI shell)。原理是将字符串使用split()拆分,通过函数映射执行命令。具体方法1、建立function_mappings字典,键对应函数名。2、拆分字符串s = s.split() 3、执行命令function_mappings[s[0]](*s[1 : ]) 。
command() 使输入变得简单。在本脚本中,如果遇到留空参数,需要使用“?“(不含双引号)代替。
支持的命令(全小写)
- login
即login() - userlog
即user_log(),别名user_log - domainlog
即domain_log(),别名domain_log - domains
即domains() - records
即records(),别名find
Eg. find for-get.com
find for-get.com @
find for-get.com ? A
…… - record
即record(),别名id
Eg. record for-get.com 2333 - create
即create(),别名add
Eg. add for-get.com sub A 127.0.0.1 - modify
即modify(),别名change
Eg. modify for-get.com 2333 @ A 127.0.0.1 - remove
即remove(),别名del,delete
Eg. del for-get.com 2333 - batch_remove
即batch_remove(),别名batchremove, batchdel, batch_del
Eg. batchremove for-get.com 233,666,777 - ddns
即ddns()
Eg. ddns for-get.com 233
ddns for-get.com ? sub - getid
即get_record_id(),别名get_record_id, getrecordid
Eg. getid for-get.com @
getid for-get.com ? A
getid for-get.com ? ? 127.0.0.1
…… - info
即info(),别名show - ip
即ipinfo(),别名getip,ipinfo
后记
- 异常情况尝试删除目录下“__pycache__”如果没有请掠过。
- print有点多,自行删除优化。
- dns简介https://wp.for-get.com/202.html
源码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#Author: for-get
#Site: https://blog.for-get.com
import re
import requests
import os
import time
import sys
try:
import json
except Exception:
import simplejson as json
global LOGIN_TOKEN
LOGIN_TOKEN = "123456,aabbccddeeffaabbccddeeffaabbccddeeff"
def login():
print(time.strftime("[%Y/%m/%d %H:%M:%S]\tlogin()"))
print("Login token: " + LOGIN_TOKEN)
api = "https://dnsapi.cn/User.Detail"
data = {
"login_token" : LOGIN_TOKEN,
"format": "json"
}
try:
r = requests.post(api, data, timeout=5)
except:
print("Network Error!")
exit(-1)
if int(r.json()["status"]["code"]) is 1:
print(time.strftime("[%Y/%m/%d %H:%M:%S]\tLogin Success."))
else:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
exit(-1)
def user_log(show = True):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"user_log({show})")
api = "https://dnsapi.cn/User.Log"
data = {
"login_token" : LOGIN_TOKEN,
"format": "json",
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is not 1:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
elif show:
for i in range(1, len(r.json()["log"]) + 1):
print(r.json()["log"][len(r.json()["log"]) - i])
return r.json()["log"]
def domain_log(domain, show = True):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"user_log(\"{domain}\", {show})")
api = "https://dnsapi.cn/Domain.Log"
data = {
"login_token" : LOGIN_TOKEN,
"domain" : domain,
"offset" : 0,
"length" : 3000,
"format": "json",
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is not 1:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
elif show:
for i in range(1, len(r.json()["log"]) + 1):
print(r.json()["log"][len(r.json()["log"]) - i])
return r.json()["log"]
def domains(show = True):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"domains({show})")
api = "https://dnsapi.cn/Domain.List"
data = {
"login_token" : LOGIN_TOKEN,
"format": "json",
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is not 1:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
elif show:
print("Domain_ID\tDomain")
for i in range(0, int(r.json()["info"]["domain_total"])):
print(str(r.json()["domains"][i]["id"]) + "\t" + r.json()["domains"][i]["name"])
return r.json()
def records(domain, sub_domain = "", record_type = "", show = True):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"records(\"{domain}\", \"{sub_domain}\", \"{record_type}\", {show})")
api = "https://dnsapi.cn/Record.List"
record_type = record_type.upper()
sub_domain = sub_domain.lower()
data = {
"login_token" : LOGIN_TOKEN,
"format": "json",
"domain" : domain,
"offset" : 0,
"length" : 3000,
"sub_domain" : sub_domain,
"record_type" : record_type
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is not 1:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
elif show:
print("Record_ID\tName\tType\tValue")
for i in range(0, int(r.json()["info"]["records_num"])):
if str(r.json()["records"][i]["type"]) == "MX":
print(str(r.json()["records"][i]["id"]) + "\t" + r.json()["records"][i]["name"] + "\t" + "MX: " + r.json()["records"][i]["mx"] + "\t" + r.json()["records"][i]["value"])
else:
print(str(r.json()["records"][i]["id"]) + "\t" + r.json()["records"][i]["name"] + "\t" + r.json()["records"][i]["type"] + "\t" + r.json()["records"][i]["value"])
return r.json()
def record(domain, record_id, show = True):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"record(\"{domain}\", \"{record_id}\", {show})")
api = "https://dnsapi.cn/Record.Info"
record_id = str(record_id)
data = {
"login_token" : LOGIN_TOKEN,
"format": "json",
"domain" : domain,
"record_id" : record_id
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is not 1:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
elif show:
if str(r.json()["record"]["record_type"]) == "MX":
print("Record_ID\t" + str(r.json()["record"]["id"]))
print("Sub_Domain\t" + r.json()["record"]["sub_domain"])
print("MX\t" + r.json()["record"]["mx"])
print("Value\t" + r.json()["record"]["value"])
else:
print("Record_ID\t" + str(r.json()["record"]["id"]))
print("Sub_Domain\t" + r.json()["record"]["sub_domain"])
print("Type\t" + r.json()["record"]["record_type"])
print("Value\t" + r.json()["record"]["value"])
return r.json()
def create(domain, sub_domain, record_type, value, mx = "", record_line_id = "0"):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"create(\"{domain}\", \"{sub_domain}\", \"{record_type}\", \"{value}\", \"{mx}\", \"{record_line_id}\")")
api = "https://dnsapi.cn/Record.Create"
record_type = record_type.upper()
sub_domain = sub_domain.lower()
data = {
"login_token" : LOGIN_TOKEN,
"format": "json",
"domain" : domain,
"sub_domain" : sub_domain,
"record_type" : record_type,
"record_line_id" : record_line_id, #10=0,Telecom;10=1,Unicom
"value" : value,
"mx" : mx # mx = [1,20], only works in mx, 0
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is 1:
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Create completed successful, Record id: " + r.json()["record"]["id"])
else:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
def modify(domain, record_id, sub_domain, record_type, value, mx = "", record_line_id = "0"):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"modify(\"{domain}\", \"{record_id}\", \"{sub_domain}\", \"{record_type}\", \"{value}\", \"{mx}\", \"{record_line_id}\")")
value = value.lower()
record_type = record_type.upper()
sub_domain = sub_domain.lower()
api = "https://dnsapi.cn/Record.Modify"
data = {
"login_token" : LOGIN_TOKEN,
"format": "json",
"domain" : domain,
"record_id" : record_id,
"sub_domain" : sub_domain,
"record_type" : record_type,
"record_line_id" : record_line_id, #10=0,Telecom;10=1,Unicom
"value" : value,
"mx" : mx # mx = [1,20], only works in mx
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is 1:
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Modify completed successful")
else:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
def remove(domain, record_id):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"remove(\"{domain}\", \"{record_id}\")")
api = "https://dnsapi.cn/Record.Remove"
record_id = int(record_id)
data = {
"login_token" : LOGIN_TOKEN,
"format": "json",
"domain" : domain,
"record_id" : record_id,
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is 1:
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Remove completed successful")
else:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
def batch_remove(domain, record_id):
if type(record_id) is str:
record_id = record_id.split(',')
if len(record_id) == int(record_id[0]) + 1:
for i in range(0, int(record_id[0])):
remove(domain, int(record_id[i + 1]))
else:
for i in range(0, len(record_id)):
remove(domain, int(record_id[i]))
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Batch_remove completed successful")
def ddns(domain, record_id = "", sub_domain = "", record_line_id = "0"):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"ddns(\"{domain}\", \"{record_id}\", \"{sub_domain}\", \"{record_line_id}\")")
if (sub_domain == "") and (record_id==""):
print("Too few parameters!")
return 0
old_ip = ""
new_ip = ""
if sub_domain == "":
try:
tamp = record(domain, record_id, False)
sub_domain = tamp["record"]["sub_domain"]
old_ip = tamp["record"]["value"]
except:
print("Record ID Error!")
return 0
if record_id == "":
record_id = get_record_id(domain, sub_domain, "A", "", False)
if record_id[0] == 0:
print("No sub_domain in A record, Please create first.")
return 0
else:
record_id = record_id[1]
###IP Check###
try:
if old_ip == "":
old_ip = record(domain, record_id, False)["record"]["value"]
except:
print("Network Error or Record ID Error")
return 0
try:
new_ip = get_ipv4().decode(encoding="utf-8",errors="strict")
if str(old_ip) == str(new_ip):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "No Change!")
return 0
except:
print("Get ip timeout! Can't Check Change!")
##############
api = "https://dnsapi.cn/Record.Ddns"
record_id = int(record_id)
sub_domain = sub_domain.lower()
data = {
"login_token" : LOGIN_TOKEN,
"format": "json",
"domain" : domain,
"record_id" : record_id,
"sub_domain" : sub_domain,
"record_line_id" : record_line_id, #10=0,Telecom;10=1,Unicom
}
r = requests.post(api, data, timeout=5)
if int(r.json()["status"]["code"]) is 1:
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + "Ddns completed successful")
else:
print("Error:", r.json()["status"]["code"], r.json()["status"]["message"])
def get_record_id(domain, sub_domain = "", record_type = "", value = "",show = True):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\t") + f"get_record_id(\"{domain}\", \"{sub_domain}\", \"{record_type}\", \"{value}\", {show})")
value = value.lower()
r = records(domain, sub_domain, record_type, False)
id = []
try:
id.append(int(r["info"]["records_num"]))
except:
id.append(0)
records_num = int(id[0])
if records_num is 0:
print("Record Not Found, Maybe sub_domain Error!")
return id
elif value != "":
id[0] = 0
for i in range(0, records_num):
if value == str(r["records"][i]["value"]):
id.append(int(r["records"][i]["id"]))
sum = len(id)
id[0] = sum - 1
#if show == True:
# record(domain, id[sum - 1]) #record detail
if id[0] == 0:
print("Value Error!")
elif show == True:
print("Records_NUM\t" + str(id[0]))
print("ID\t" + ''.join(str(id[1:]).split())[1:-1])
return id
else:
for i in range(0, records_num):
id.append(int(r["records"][i]["id"]))
#if show == True:
# record(domain, id[i + 1]) #record detail
if show == True:
print("Records_NUM\t" + str(id[0]))
print("ID\t" + ''.join(str(id[1:]).split())[1:-1])
return id
def get_ipv4():
api = "https://api-ipv4.ip.sb/ip"
try:
r = requests.get(api, timeout=5)
return r.content.strip()
except:
r = "Error"
return r.encode('utf-8')
def get_ipv6():
api = "https://api-ipv6.ip.sb/ip"
try:
r = requests.get(api, timeout=5)
return r.content.strip()
except:
r = "Error"
return r.encode('utf-8')
def info():
print(time.strftime("[%Y/%m/%d %H:%M:%S]\tShow all domains information..."))
domainslist = domains()
for i in range(0, int(domainslist["info"]["domain_total"])):
domain = domainslist["domains"][i]["punycode"]
records(domain)
def ipinfo(show = True):
print(time.strftime("[%Y/%m/%d %H:%M:%S]\tTry to get IP..."))
ipv4 = get_ipv4().decode(encoding="utf-8",errors="strict")
ipv6 = get_ipv6().decode(encoding="utf-8",errors="strict")
ipaddr = { "ipv4" : ipv4, "ipv6" : ipv6 }
if show:
print("Your IPv4 Address:\t", ipv4)
print("Your IPv6 Address:\t", ipv6)
return ipaddr
def command(s = ""):
function_mappings = {
'login': login,
'userlog': user_log,
'user_log': user_log,
'domain_log': domain_log,
'domainlog': domain_log,
'domains': domains,
'records': records,
'find': records,
'record': record,
'id': record,
'create': create,
'add': create,
'modify': modify,
'change': modify,
'remove': remove,
'batch_remove': batch_remove,
'batchremove': batch_remove,
'batchdel': batch_remove,
'batch_del': batch_remove,
'del': remove,
'delete': remove,
'ddns': ddns,
'getid' : get_record_id,
'getrecordid' : get_record_id,
'get_record_id' : get_record_id,
'info': info,
'show': info,
'ipinfo': ipinfo,
'getip': ipinfo,
'ip': ipinfo
}
if s == "":
s = input(">>> ")
s = s.lower()
s = s.split()
keyword = s[0]
arg = len(s) - 1
for i in range(0, len(s)):
if s[i] == "?":
s[i] = ""
try:
if arg <= int(function_mappings[keyword].__code__.co_argcount):
function_mappings[keyword](*s[1 : arg + 1])
return 0
else:
print("Too Many Arguments!")
return -1
except KeyError:
print("Invalid function, try again.")
return -1
except:
print("Too Few Parameters or Timeout or Unknown Error!")
return -1
if __name__ == "__main__":
print("DNSPOD SAMPLE CLIENT\n")
try:
login()
print("\nDnspod Sample Shell")
print("Input Ctrl + Z to exit!")
while True:
command()
except:
print("Error!")
exit(-1)