vue-element-admin+flask实现数据查询项目的实例代码

Vue
378
0
0
2023-06-12
标签   Flask
目录
  • 前端
  • 1、添加路由
  • 2、添加页面元素
  • 3、添加请求
  • 后端
  • 1、添加路由
  • 2、添加函数

当今前端世界最流行的三个框架Vue,React,Angular。这三个框架可谓是各有千秋,可以满足不同场景的需求。我们这套实践分享内容主要是使用Vue框架,这个框架上学习起来非常容易,很容易上手。相对React陡峭的学习曲线,Vue显得格外简单和友好。好了,选好了框架后,我们还需要选择UI组件库。基于Vue框架开发的UI组件库也有很多,例如:View UI ,Element UI等等。我们这里使用Element UI做为我们的UI组件库。它的生态很丰富,有很多开源的集成了很多日常开发中常用的功能。在这个项目中,我们采用vue-element-admin。vue-element-admin 是一个后台前端解决方案,它基于 vue 和 element-ui实现。它使用了最新的前端技术栈,内置了 i18n 国际化解决方案,动态路由,权限验证,提炼了典型的业务模型,提供了丰富的功能组件,可以帮助我们快速搭建企业级中后台产品。

本文分享一个使用vue-element-admin+flask实现的一个数据查询项目,

填写数据库连接信息和查询语句,即可展示查询到的数据。

前提:已下载vue-element-admin并编译成功

前端

1、添加路由

src-router-index.js添加路由

{
    path: '/idata',
    component: Layout,
    redirect: '/idata/index',
    hidden: false,
    children: [
      {
        path: 'index',
        component: () => import('@/views/idata/index'),
        name: 'idata',
        meta: { title: 'iData设置', icon: 'list', noCache: true }
      }
    ]
  },

2、添加页面元素

在src-views下新建idata目录,目录下新建index.vue

<template>
  <div class="app-container">
    <el-card class="box-card">
      <el-form :inline="true" ref="formData" :rules="rules" :model="formData" label-width="px">
        <el-form-item label="域名" prop="db_host">
          <el-input
          v-model="formData.db_host"></el-input>
        </el-form-item>
        <el-form-item label="端口" prop="db_port" >
          <el-input
          v-model="formData.db_port" style="width:px;"></el-input>
        </el-form-item>
        <el-form-item label="用户名" prop="db_user">
          <el-input
          v-model="formData.db_user" style="width:px;"></el-input>
        </el-form-item>
        <el-form-item label="密码" prop="db_passwd">
          <el-input
          v-model="formData.db_passwd" show-password></el-input>
        </el-form-item>
        <el-form-item label="数据库" prop="db_name">
          <el-input
          v-model="formData.db_name"></el-input>
        </el-form-item>
       
  
          <el-row>
      <el-input
        type="textarea"
        :rows=""
        placeholder="请输入SQL语句"
        v-model="formData.sql">
      </el-input>
    </el-row>
  <el-row>
          <el-button type="primary" @click="onConnect('formData')">测试连接</el-button>
          <el-button type="primary" @click="onExec('formData')">执行SQL</el-button>
          <p>{{message}}</p>
        </el-row>
    </el-form>
    </el-card>
    <el-card class="box-card">
      <el-table
      :data="tableData"
     
      style="width:%">
        <el-table-column v-for="h in tableHead" :key="h"
          :prop="h" :label="h">
        </el-table-column>
      </el-table>
    </el-card>
  </div>
</template>
 
<script>
import { getConnect, doConnect, exec } from '@/api/idata'
export default {
  name: 'idata',
  data() {
    return {
      rules: {
        db_host: [
          { required: true, message: '请输入域名', trigger: 'blur' }
        ],
        db_port: [
          { required: true, message: '请输入端口', trigger: 'blur' }
        ],
        db_user: [
          { required: true, message: '请输入用户名', trigger: 'blur' }
        ],
        db_passwd: [
          { required: true, message: '请输入密码', trigger: 'blur' }
        ],
        db_name: [
          { required: true, message: '请输入数据库', trigger: 'blur' }
        ]
      },
 
      title: 'iData设置',
      formData: {
        db_charset: 'utf',
        db_host: null,
        db_port: null,
        db_user: null,
        db_passwd: null,
        db_name: null,
        sql: null,
        param: {},
        key: 'all'
      },
      tableData: [],
      time:,
      uniqKey: null,
      maxCount:,
      sql: null,
      message:null,
      tableHead: [],
      result:[]
    }
  },
  methods: {
    onConnect (formName) {
        this.$refs[formName].validate((valid) =>{
        if (valid) {
        
         doConnect({
        ...this.formData
      }).then(res => {
        
        this.uniqKey = res.uniq_key
        this.message='连接成功'
        this.$message({
          message: '连接测试成功!',
          type: 'success'
        })
      })
          }
         else {    
          this.$message({
            message: '请检查必填字段!',
            type: 'error'
          })
           
            return false
          }
      }
      )//验证函数结尾
    },
    onExec (formName) {
      this.$refs[formName].validate((valid) =>{
      if (valid) {
          if (!this.formData.sql) {
            this.$message({
              message: 'SQL 语句不能为空。',
              type: 'warning'
            })
            return
          }
      
      exec({                      
            ...this.formData,           
           
          }).then(res => {
           
              if (res.data.length >) {  
              this.tableHead = []
              for (let i in res.data[]) {
                this.tableHead.push(i)
              }
              this.tableData = res.data  
              this.time = res.time_cost
            } else {                      
              this.tableHead = []
              this.tableData = []
            }
            this.message='查询成功'
            this.loading = false
            this.$message({
              message: '查询成功',
              type: 'success'
            }
 
            )
         
       
  }
           ).catch(err => { 
            this.loading = false
            console.log(err)
            this.$message({
            
              type: 'error'
            })
          })
        }
       else {    
        this.$message({
          message: '请检查必填字段!',
          type: 'error'
        })
          console.log('error submit!!')
          return false
        }
    }
    )//验证函数结尾
  }//onconnect函数结尾
}
}
</script>

3、添加请求

在src-api新建idata.js

import request from '@/utils/request'
export function doConnect (data) {
  return request({
    url: '/api/test',
    method: 'post',
    data
  })
}
export function exec (data) {
  return request({
    url: '/api/idata',
    method: 'post',
    data
  })
}
----拿到ret结果,根据结果来显示默认按钮/成功按钮。
----如何展示查询到的数据呢? 前端实际有一个table,动态读取结果字段显示。
----查询数据时,前端一直报错,显示error,控制台显示uncanght promise报错。
查了一会,才发现是后端返回的ret没有code这个字段,加上就可以了。
----如何将textraea输入框宽度增加
----弹框消息
 this.$message({
     message: '连接测试成功!',
     type: 'success'
    })
----前端方法里的response.data实际是取的返回结果data这个key的值
---- this.$refs 用来给标签或者组件添加属性。需要这个标签或者组件中有ref=
 <el-form :inline="true" ref="formData" :rules="rules" :model="formData"
label-width="70px">
----控制台可以调试前端代码
-----form加:rules="rules" 并且在return里写上规则,并且每个el-form-item要有prop,可以实现
校验必填 其实在el-form这个组件有写对于表单的校验
-----加了校验后报错 Cannot read properties of undefined (reading 'validate')
传参需要加引号submit('formName')。
 this.$refs[formName].validate 这一行是对的

后端

这里只贴主要代码,有时间再继续贴。

1、添加路由

app.py添加

app.route('/api/idata', methods=['POST'])(idata)
    app.route('/api/test',methods=['POST'])(onconnect)

2、添加函数

controller下data.py

import json
from flask import request, current_app as app
from constant import SQLTYPE, DBKEYTYPE
from lib.dbpool import DBPool
 
 
def idata():
    return query(request.json)
 
def onconnect():
    data=request.json
    conn_info = warp_db_conn_info(data)
    print(conn_info)
    db_conn = DBPool.get_conn(**conn_info)
    ret = {
        "success": False,
        "msg": None,
        'code':
    }
    if not db_conn:
        ret['msg'] = '连接目标DB失败,请确认连接信息是否正确'
        ret['code']=-
    else:
        ret['msg'] = '连接成功'
        ret['success']=True
        ret['code']=
    return ret
 
 
def warp_db_conn_info(data):
    d = {}
    keys = ['db_host', 'db_port', 'db_name', 'db_user', 'db_passwd', 'db_charset']
    for key in keys:
        if key in data and data[key]:
            d[key] = data[key].strip() if isinstance(data[key], (str, bytes)) else data[key]
 
    return d
 
 
def query(data):
    """
    根据DB和SQL信息执行远程查询并返回结果
    :param data:     {
        "db_host": "x.x.x.x",
        "db_port":,
        "db_name": "test",
        "db_user": "root",
        "db_passwd": "root",
        "db_charset": "utf",
        "sql": "select * from tbl_key_info where id=:id limit",
        "param": {
            "id":
        },
        "key": "all"
    }
    :return: {
        "success": True,
        "data": [{"id":, "xxx", "xxx"}],
        "type": "SELECT",
        "msg": ""
    }
    """
    app.logger.info(f'query data {json.dumps(data)}')
    ret = {
        "success": False,
        "data": [],
        "type": None,
        "msg": None,
        "code":
    }
 
    if not data:
        return ret
 
    conn_info = warp_db_conn_info(data)
    # print(conn_info)
    db_conn = DBPool.get_conn(**conn_info)
    if not db_conn:
        ret['msg'] = '连接目标DB失败,请确认连接信息是否正确'
        return ret
 
    sql = data.get('sql')
    if not sql:
        ret['msg'] = '查询语句不能为空'
        return ret
 
    # app.logger.info(f'查询语句: {sql}, 查询参数: {data.get("param")}')
    app.logger.info(f'查询语句: {sql}, 查询参数: {data.get("param")}')
    # rows = db_conn.query(sql, **data.get('param'))
    rows = db_conn.query(sql)
    results = []
 
    upper_sql = sql.upper().strip()
    if upper_sql.startswith(SQLTYPE.INSERT):
        sql_type = SQLTYPE.INSERT
    elif upper_sql.startswith(SQLTYPE.UPDATE):
        sql_type = SQLTYPE.UPDATE
    elif upper_sql.startswith(SQLTYPE.DELETE):
        sql_type = SQLTYPE.DELETE
    elif upper_sql.startswith(SQLTYPE.SELECT):
        sql_type = SQLTYPE.SELECT
        key = data.get('key', DBKEYTYPE.FIRST)
        if key == 'all':
            results = rows.as_dict()
        else:  # 为了减少网络传输,默认只查询一行
            first = rows.first()
            results = [first.as_dict()] if first else []
    else:
        sql_type = SQLTYPE.UNKNOWN
 
    ret.update(
        {
            "success": True,
            "data": results,
            "type": sql_type,
            "msg": '',
            'code':
 
        }
    )
 
    app.logger.info(f'查询结果: {results}')
    return ret

dbpool.py用来进行数据库连接

import records
from urllib import parse
class DBPool:
    db_pool = {}
 
    @staticmethod
    def get_conn(db_host=None, db_name=None, db_port="", db_user='root',
                 db_passwd='root', db_charset='utf'):
        if db_host is None or db_name is None:
            raise ValueError("host and db_name can't be None.")
 
        key = DBPool.make_uniq_key(db_host, db_port, db_name)
        if key not in DBPool.db_pool:
            sql_str = DBPool.make_conn_str(db_host, db_name, db_port, db_user, db_passwd, db_charset)
            print(sql_str)
            DBPool.db_pool[key] = records.Database(sql_str, pool_pre_ping=True)
 
        try:
            db = DBPool.db_pool[key]
            db.query('''select''')
        except:
            db = None
            del DBPool.db_pool[key]
 
        return db
 
    @staticmethod
    def make_uniq_key(db_host, db_name, db_port):
        return f'{db_host}:{db_port}:{db_name}'
 
    @staticmethod
    def make_conn_str(db_host, db_name, db_port, db_user, db_passwd, db_charset):
        return f'mysql+pymysql://{db_user}:{parse.quote_plus(db_passwd)}@{db_host}:{db_port}/{db_name}?charset={db_charset}'

今天晚了,改天有时间再详细讲解。