python调用subprocess模块实现命令行操作控制SVN的方法

Python
696
0
0
2023-02-10
标签   SVN

使用python的subprocess模块实现对SVN的相关操作。

设置GitSvn类,在该类下自定义执行SVN常规操作的方法。

SVN的常规操作包括:

(1)获取SVN当前版本,通过方法get_version()实现;

(2)下载SVN指定仓库,通过方法download()实现,实际是通过调用SVN的命令行操作指令svn checkout实现的下载整个仓库功能;

(3)获取SVN某个仓库下的所有文件列表,通过方法search_dir()实现,实际是通过调用SVN的命令行操作指令svn list实现的获取仓库文件列表功能;

(4)在SVN指定位置创建新的文件夹,通过方法mkdir_command()实现,实际是通过调用SVN的命令行操作指令svn mkdir实现的创建文件夹或者目录功能;

(5)将本地仓库文件添加到SVN仓库,通过方法upload_file()实现,实际是通过调用SVN的命令行操作指令svn add实现的添加文件功能;

(6)将本地仓库已添加的文件提交SVN指定仓库,通过方法commit_command()实现,实际是通过调用SVN的命令行操作指令svn commit实现的提交文件功能;

(7)删除SVN仓库的目录,通过方法delete_url()实现,实际是通过调用SVN的命令行操作指令svn delete实现的删除目录功能;

(8)锁定SVN仓库的文件,通过方法lock_file()实现,实际是通过调用SVN的命令行操作指令svn lock实现的锁定文件功能;

(9)将SVN仓库的文件解除锁定,通过方法unlock_file()实现,实际是通过调用SVN的命令行操作指令svn unlock实现的解除文件锁定功能;

(10)查看SVN仓库文件当前状态,通过方法check_status()实现,实际是通过调用SVN的命令行操作指令svn status实现的查看文件状态功能;

(11)更新SVN仓库的某个文件,通过方法update_file()实现,实际是通过调用SVN的命令行操作指令svn up实现的更新文件功能;

GitSvn类定义如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: Logintern09

import os
import time
import subprocess
from log_manage import logger

rq = time.strftime('%Y%m%d', time.localtime(time.time()))
file_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp_files")
tempfile = os.path.join(file_path, rq + '.log')

class GitSvn(object):
    def __init__(self, bill_addr):
        self.bill_addr = bill_addr

    def get_version(self):
        cmd = "svn info %s" % self.bill_addr
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        logger.info(result)
        error = error.decode(encoding="gbk")
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)

    def download(self, dir_path):
        with open(tempfile, "r") as f:
            result = f.readlines()
            for line in result:
                if line.startswith("Revision"):
                    laster_version = int(line.split(":")[-1].strip())
                    logger.info(line)
                    break
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        cmd = "svn checkout %s %s -r r%s" % (dir_path, id_path, laster_version)
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)
            if (error.startswith("svn: E155004:")) or (error.startswith("svn: E155037:")):
                cur_path = os.path.dirname(os.path.realpath(__file__))
                file_path = os.path.join(cur_path, default_sys_name)
                cmd = "svn cleanup"
                logger.info(cmd)
                output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                          cwd=file_path)
                (result, error) = output.communicate()
                result = result.decode(encoding="gbk")
                error = error.decode(encoding="gbk")
                logger.info(result)
                logger.error(error)

    def search_dir(self):
        cmd = "svn list %s" % self.bill_addr
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)

    def mkdir_command(self, id_num):
        cmd = "svn mkdir %s" % id_num
        logger.info(cmd)
        cur_path = os.path.dirname(os.path.realpath(__file__))
        file_path = os.path.join(cur_path, default_sys_name)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=file_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)

    def upload_file(self, file_path):
        cmd = "svn add %s --force" % file_path
        root_path, file_name = os.path.split(file_path)
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=root_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)

    def commit_command(self, file):
        # 执行了锁定的用户执行了提交操作(提交操作将自动解锁)
        if os.path.isfile(file):
            file_path, file_name = os.path.split(file)
        else:
            file_path = file
        cmd = "svn commit %s -m 'update_files'" % file
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                  cwd=file_path)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)
            elif error.startswith("svn: E730053:"):
                res = "数据上传失败,请重新提交数据!!!"
                raise SystemError(res)
            else:
                res = "数据上传失败,请重新提交数据!!!"
                raise SystemError(res)

    def delete_url(self, url):
        cmd = "svn delete %s" % url
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)

    def lock_file(self, file_name):
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn lock %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)
            elif error.startswith("svn: warning: W160042:"):
                res = "系统资源已被其他用户锁定,请稍后重试!"
                raise SystemError(res)

    def unlock_file(self, file_name):
        # 不使用--force 参数 可以解锁被自己锁定的文集 即普通的release lock
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn unlock %s --force" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)

    def check_status(self, file_name):
        # ?:不在svn的控制中;M:内容被修改;C:发生冲突;A:预定加入到版本库;K:被锁定
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn status -v %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        # 获取状态结果解析
        status_list = result.split(" ")
        status_flag_list = []
        for i in status_list:
            if i.isalpha():
                status_flag_list.append(i)
            else:
                break
        if result.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)
        elif "C" in status_flag_list:
            return "C"
        elif "K" in status_flag_list:
            return "K"

    def update_file(self, file_name):
        cur_path = os.path.dirname(os.path.realpath(__file__))
        id_path = os.path.join(cur_path, default_sys_name)
        file_path = os.path.join(id_path, file_name)
        cmd = "svn up %s" % file_path
        logger.info(cmd)
        output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (result, error) = output.communicate()
        result = result.decode(encoding="gbk")
        error = error.decode(encoding="gbk")
        logger.info(result)
        logger.error(error)
        with open(tempfile, "w") as f:
            f.write(result)
        with open(tempfile, "r") as f:
            result = f.readlines()
            count = -1
            for line in result:
                count += 1
                if (line.strip() != "") and (count == 1):
                    if line.startswith("C"):
                        res = "更新系统资源时发现存在冲突,请稍后重试!"
                        raise SystemError(res)
        if error.strip() != "":
            if error.startswith("svn: E170013:"):
                res = "网络异常,暂时连接不上系统,请检查网络或其他配置!"
                raise SystemError(res)
            elif (error.startswith("svn: E155037:")) or (error.startswith("svn: E155004:")):
                cur_path = os.path.dirname(os.path.realpath(__file__))
                file_path = os.path.join(cur_path, default_sys_name)
                cmd = "svn cleanup"
                logger.info(cmd)
                output = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                          cwd=file_path)
                (result, error) = output.communicate()
                result = result.decode(encoding="gbk")
                error = error.decode(encoding="gbk")
                logger.info(result)
                logger.error(error)

上述类GitSvn的应用实例如下:

if __name__ == '__main__':
    default_url = svn:***  # 用户本地SVN客户端的URL地址
    git_class = GitSvn(default_url)
    git_class.get_version()
    git_class.download()
    # 验证查看目录文件列表功能
    git_class.search_dir()
    # 验证删除目录功能
    cur_path = os.path.dirname(os.path.realpath(__file__))
    file_path = os.path.join(cur_path, default_sys_name)
    id_path = os.path.join(file_path, 'SCR202202100002')
    git_class.delete_url(id_path)
    # 验证文件加锁功能
    git_class.lock_file(file_name="单号总表.xlsx")
    # 验证文件解锁功能
    git_class.unlock_file(file_name="单号总表.xlsx")
    # 检查文件状态
    git_class.check_status(file_name="单号总表.xlsx")
    # 验证创建目录功能
    git_class.mkdir_command("SCR202203280001")
    # 验证提交文件功能
    cur_path = os.path.dirname(os.path.realpath(__file__))
    sys_path = os.path.join(cur_path, default_sys_name)
    target_dir = os.path.join(sys_path, "SCR202203280001")
    git_class.upload_file(target_dir)
    git_class.commit_command(target_dir)