# 主要目的
防止镜像站或在传输的途中有人“加料”
# 检测脚本(DeepSeek 生成)
import docker
import requests
from tqdm import tqdm
# ============== 用户配置区域 ==============
PROXIES = { # 设置代理(若无代理则置空)
"http": "http://your-proxy:8080",
"https": "http://your-proxy:8080",
}
# ========================================
def parse_repository(repo):
parts = repo.split('/')
if not parts:
return None, None, None
# 检查是否是registry(包含.或:)
if '.' in parts[0] or ':' in parts[0]:
registry = parts[0]
remaining = parts[1:]
else:
registry = 'docker.io'
remaining = parts
# 处理剩余部分
if len(remaining) == 0:
return registry, None, None
elif len(remaining) == 1:
namespace = 'library'
image = remaining[0]
else:
namespace = '/'.join(remaining[:-1])
image = remaining[-1]
return registry, namespace, image
def get_auth_token(repository):
auth_url = f"https://auth.docker.io/token?service=registry.docker.io&scope=repository:{repository}:pull"
try:
response = requests.get(auth_url, proxies=PROXIES)
response.raise_for_status()
return response.json().get('token')
except requests.exceptions.RequestException as e:
print(f"获取认证token失败:{e}")
return None
def get_hub_digest(repository, tag):
token = get_auth_token(repository)
if not token:
return None
headers = {
'Authorization': f'Bearer {token}',
'Accept': 'application/vnd.docker.distribution.manifest.v2+json'
}
url = f"https://registry-1.docker.io/v2/{repository}/manifests/{tag}"
try:
response = requests.head(url, headers=headers, proxies=PROXIES)
response.raise_for_status()
return response.headers.get('Docker-Content-Digest')
except requests.exceptions.RequestException as e:
print(f"获取Docker Hub上的manifest失败:{e}")
return None
def main():
client = docker.from_env()
images = client.images.list()
result_pass = []
result_warn = []
# 使用tqdm添加进度条
with tqdm(total=len(images), desc="扫描镜像", unit="image") as pbar:
for image in images:
repoTags = image.attrs.get('RepoTags', [])
repoDigests = image.attrs.get('RepoDigests', [])
for repo_tag in repoTags:
# 更新进度条描述
pbar.set_postfix_str(f"正在检查:{repo_tag[:30]}")
if '@' in repo_tag:
continue # 跳过含digest的repo_tag
# 分割repository和tag
repo_tag_parts = repo_tag.rsplit(':', 1)
if len(repo_tag_parts) == 2:
repository, tag = repo_tag_parts
else:
repository = repo_tag_parts[0]
tag = 'latest'
# 解析repository
registry, namespace, image_name = parse_repository(repository)
if not registry or not namespace or not image_name:
continue
# 仅处理docker.io/library的官方镜像
#if registry == 'docker.io' and namespace == 'library':
# 仅处理docker.io的官方镜像
if registry == 'docker.io':
digest = None
# 在repoDigests中查找匹配的digest
for d in repoDigests:
if '@' not in d:
continue
repo_part, digest_part = d.split('@', 1)
d_registry, d_namespace, d_image = parse_repository(repo_part)
if d_registry == registry and d_namespace == namespace and d_image == image_name:
digest = digest_part
break
if not digest:
print(f"镜像 {repo_tag} 没有找到对应的Repo Digest,跳过")
result_pass.append(repo_tag)
continue
# 构建Docker Hub的仓库名
hub_repo = f"{namespace}/{image_name}"
# 获取Docker Hub的digest
hub_digest = get_hub_digest(hub_repo, tag)
if not hub_digest:
print(f"无法获取 {hub_repo}:{tag} 的Docker Hub digest")
result_warn.append(f"无法获取 {hub_repo}:{tag} 的Docker Hub digest")
continue
# 比较digest
if hub_digest == digest:
print(f"镜像 {repo_tag} 的digest匹配")
else:
print(f"镜像 {repo_tag} 的digest不匹配。本地: {digest}, 远程: {hub_digest}")
result_warn.append(f"镜像 {repo_tag} 的digest不匹配")
else:
print(f"镜像 {repo_tag} registry {registry},跳过")
result_pass.append(repo_tag)
# 更新进度条
pbar.update(1)
pbar.refresh()
print('-' * 10)
print()
print()
print('跳过' + ', '.join(result_pass))
print()
print('\n'.join(result_warn))
if __name__ == "__main__":
main()
# 安装依赖
pip install docker requests tqdm
# 运行效果
执行脚本后,它将遍历本地所有Docker镜像,识别官方镜像,并对比本地与Docker Hub的Digest。
此脚本能够有效地检查本地官方镜像的Digest是否与Docker Hub一致,帮助确认镜像的完整性和更新状态。
# 注意事项
-
认证限制:Docker Hub API有请求频率限制,频繁请求可能导致临时封禁。
-
网络连接:确保运行环境可以访问Docker Hub(可通过更改
PROXIES来配置代理)。 -
官方镜像判断:脚本严格判断镜像属于
docker.io/library仓库,其他仓库的镜像将被排除。 -
错误处理:脚本包含基础错误处理,但部分异常可能需要根据实际情况扩展。