on
API 서버 개발하기
API 서버 개발하기
728x90
메모장 서비스 API 서버 개발
1. app.py
from flask import Flask from flask_restful import Api from flask_jwt_extended import JWTManager from db.db import get_mysql_connection from config.config import Config from resources.user import UserRegister, UserLoginResource, UserLogoutResource, jwt_blocklist from resources.memo import MemoListResource, MemoResource app = Flask(__name__) # 환경 변수 설정. app.config.from_object(Config) # JWT 환경 설정. jwt = JWTManager(app) @jwt.token_in_blocklist_loader def check_if_token_is_revoked(jwt_header, jwt_payload) : jti = jwt_payload['jti'] return jti in jwt_blocklist # api 설정. api = Api(app) # 경로랑 리소스 연결. api.add_resource(MemoListResource, '/v1/memo') api.add_resource(MemoResource, '/v1/memo/') # 경로에 변수 처리 api.add_resource(UserRegister, '/v1/users') api.add_resource(UserLoginResource, '/v1/users/login', '/v1/users//my') api.add_resource(UserLogoutResource, '/v1/users/logout') if __name__ == '__main__' : app.run()
2-1. resources/user.py
from flask import request from flask_restful import Resource from http import HTTPStatus from db.db import get_mysql_connection from mysql.connector import Error from utils import hash_password, check_password from email_validator import EmailNotValidError, validate_email from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt from flask_jwt_extended import get_jti jwt_blocklist = set() # 회원가입 API class UserRegister(Resource) : def post(self) : data = request.get_json() # 0. 데이터가 전부 다 있는지 체크 if 'nick_name' not in data or 'email' not in data or 'password' not in data : return {'err_code' : 1}, HTTPStatus.BAD_REQUEST # 1. 이메일이 유효한지 체크 try : validate_email(data['email']) except EmailNotValidError as e : print(str(e)) return {'err_code' : 3}, HTTPStatus.BAD_REQUEST # 2. 비밀번호 길이체크 및 암호화 if len(data['password']) < 4 or len(data['password']) > 16 : return {'err_code' : 2}, HTTPStatus.BAD_REQUEST password = hash_password(data['password']) print(password) # 3. 데이터베이스에 저장. try : connection = get_mysql_connection() cursor = connection.cursor() query = """ insert into memo_user (email, password, nick_name) values(%s, %s, %s);""" param = (data['email'], password, data['nick_name']) cursor.execute(query, param) connection.commit() # 디비에 데이터를 저장한 후, 저장된 아이디값을 받아온다. user_id = cursor.getlastrowid() print(user_id) except Error as e : print(str(e)) return {'err_code' : 4 }, HTTPStatus.NOT_ACCEPTABLE cursor.close() connection.close() # JWT를 이용해서 인증토큰을 생성해준다. access_token = create_access_token(identity=user_id) return {'token' : access_token}, HTTPStatus.OK class UserLoginResource(Resource) : # 로그인 API def post(self) : # 1. 클라이언트로부터 이메일과 비밀번호를 받아온다. data = request.get_json() if 'email' not in data or 'password' not in data : return {'err_code' : 1}, HTTPStatus.BAD_REQUEST # 2. 이메일 벨리데이션 체크 try : validate_email(data['email']) except EmailNotValidError as e : print(str(e)) return {'err_code' : 2}, HTTPStatus.BAD_REQUEST # 3. 비밀번호가 맞는지 체크하기 위해서 데이터 베이스에서 위의 이메일로 # 유저 정보를 가져온다. (select) connection = get_mysql_connection() cursor = connection.cursor(dictionary=True) query = """select id, password from memo_user where email = %s;""" param = (data['email'], ) cursor.execute(query, param) records = cursor.fetchall() print(records) # 3-1. 회원가입이 안된 이메일로 요청했을때는, record에 데이터가 없으니까 # 클라이언트에게 응답한다. if len(records) == 0 : return {'err_code' : 3}, HTTPStatus.BAD_REQUEST # 4. 위에서 가져온 디비에 저장되어있는 비밀번호와, 클라이언트로부터 # 받은 비밀번호를 암호화 한것과 비교한다. password = data['password'] hashed = records[0]['password'] ret = check_password(password, hashed) # 5. 같으면 클라이언트에 200 리턴 if ret is True : user_id = records[0]['id'] access_token = create_access_token(identity = user_id) return {'token' : access_token}, HTTPStatus.OK # 6. 다르면, 에러 리턴 else : return {'err_code' : 4}, HTTPStatus.BAD_REQUEST # 내정보 가져오는 API @jwt_required() def get(self, user_id) : # 토큰에서 유저 아이디 가져온다. token_user_id = get_jwt_identity() if token_user_id != user_id : return {'err_code' : 1}, HTTPStatus.UNAUTHORIZED # 1. 데이터베이스에서 쿼리해서 유저정보 가져온다. connection = get_mysql_connection() cursor = connection.cursor(dictionary=True) query = """ select id, email, nick_name from memo_user where id = %s; """ param = (user_id, ) cursor.execute(query, param) records = cursor.fetchall() cursor.close() connection.close() # 2. 유저정보 없으면, 클라이언트에 유저정보 없다고 리턴 if len(records) == 0 : return {'err_code' : 2}, HTTPStatus.BAD_REQUEST # 3. 있으면, 해당유저 정보를 리턴 else : return {'ret' : records[0]}, HTTPStatus.OK # 로그아웃 API 만들기 class UserLogoutResource(Resource) : @jwt_required() def post(self) : jti = get_jwt()['jti'] jwt_blocklist.add(jti) return {'message' : 'Successful Logout!'}, HTTPStatus.OK
2-2. resources/memo.py
from flask import request from flask_restful import Resource from http import HTTPStatus from db.db import get_mysql_connection # JWT 라이브러리 from flask_jwt_extended import jwt_required, get_jwt_identity class MemoListResource(Resource) : # get 메소드로 연결시킬 함수 작성. def get(self) : # recipe 테이블에 저장되어 있는 모든 레시피 정보를 가져오는 함수 # 1. DB 커넥션을 가져온다. connection = get_mysql_connection() print(connection) # 2. 커넥션에서 커서를 가져온다. cursor = connection.cursor(dictionary=True) # 3. 쿼리문을 작성한다. query = """select * from memo;""" # 4. SQL 실행 cursor.execute(query) # 5. 데이터를 페치한다. records = cursor.fetchall() print(records) ret = [] for row in records : row['created_at'] = row['created_at'].isoformat() row['updated_at'] = row['updated_at'].isoformat() ret.append(row) # 6. 커서와 커넥션을 닫아준다. cursor.close() connection.close() # 7. 클라이언트에 리스폰스 한다. return {'count' : len(ret), 'ret':ret}, HTTPStatus.OK @jwt_required() # 로그인한 유저만 이 API 이용 할 수 있다. def post(self) : # 1. 클라이언트가 요청한 request의 body에 있는 json 데이터를 가져오기 data = request.get_json() # 2. 필수 항목이 있는지 체크 if 'title' not in data : return { 'message' : '필수값이 없습니다.'}, HTTPStatus.BAD_REQUEST # JWT 인증토큰에서 유저아이디 뽑아온다. user_id = get_jwt_identity() # 3. 데이터베이스 커넥션 연결 connection = get_mysql_connection() # 4. 커서 가져오기 cursor = connection.cursor(dictionary=True) # 5. 쿼리문 만들기 query = """insert into memo(title, content, user_id) values (%s,%s,%s);""" param = (data['title'], data['content'], user_id) # 6. 쿼리문 실행 c_result = cursor.execute(query,param) print(c_result) connection.commit() # 7. 커서와 커넥션 닫기 cursor.close() connection.close() # 8. 클라이언트에 response 하기 return {'err_code':0 }, HTTPStatus.OK class MemoResource(Resource) : def get(self, memo_id) : # 1. 경로에 붙어있는 값(메모테이블의 아이디)을 가져와야 한다. # 위의 get 함수의 파라미터로 지정해준다. 따라서 recipe_id에 값이 들어있다. # 2. 디비 커넥션 가져온다. connection = get_mysql_connection() # 3. 커서를 가져온다. cursor = connection.cursor(dictionary=True) # 4. 쿼리문을 만든다. query = """ select * from memo where id = %s ; """ param = (memo_id,) # 5. 쿼리문을 실행한다. cursor.execute(query, param) records = cursor.fetchall() # 6. 커서, 커넥션을 닫아준다. cursor.close() connection.close() # 7. 실행 결과를 클라이언트에 보내준다. if records == 0 : return {'message' : '패스로 셋팅한 레시피 아이디는 없습니다.'}, HTTPStatus.BAD_REQUEST result = [] for row in records : row['created_at'] = row['created_at'].isoformat() row['updated_at'] = row['updated_at'].isoformat() result.append(row) return {'count' : len(result), 'ret' : result[0]} @jwt_required() def put(self, memo_id) : data = request.get_json() # 업데이트 함수 작성 . if 'title' not in data or 'content' not in data : return { 'err_code' : 1}, HTTPStatus.BAD_REQUEST connection = get_mysql_connection() cursor = connection.cursor(dictionary=True) # 유저아이디가, 이 레시피 만든 유적 맞는지 확인해 봐야 한다. user_id = get_jwt_identity() query = """select user_id from memo where id = %s;""" param = (memo_id, ) cursor.execute(query, param) records = cursor.fetchall() if len(records) == 0 : return {'err_code' : 2}, HTTPStatus.BAD_REQUEST if user_id != records[0]['user_id'] : return {'err_code' : 3}, HTTPStatus.NOT_ACCEPTABLE # 업데이트 쿼리 query = """ update memo set title = %s, content = %s where id = %s """ param = (data['title'],data['content'],memo_id) cursor.execute(query, param) connection.commit() cursor.close() connection.close() return {'err_code':0 }, HTTPStatus.OK @jwt_required() def delete(self, memo_id) : connection = get_mysql_connection() cursor = connection.cursor(dictionary=True) # 유저아이디가, 이 레시피 만든 유적 맞는지 확인해 봐야 한다. user_id = get_jwt_identity() # 데이터 베이스에 저장된 유저아이디와 같은지 비교한다. query = """select user_id from memo where id = %s;""" param = (memo_id, ) cursor.execute(query, param) records = cursor.fetchall() if len(records) == 0 : return {'err_code' : 1}, HTTPStatus.BAD_REQUEST if user_id != records[0]['user_id'] : return {'err_code' : 2}, HTTPStatus.UNAUTHORIZED # 삭제 쿼리 query = """ delete from memo where id = %s """ param = (memo_id, ) cursor.execute(query, param) connection.commit() cursor.close() connection.close() return {'err_code':0 }, HTTPStatus.OK
3. utils.py
from passlib.hash import pbkdf2_sha256 from config.config import salt # 회원가입시 사용할 함수로써, 유저가 입력한 비밀번호를 암호화 해주는 함수. def hash_password(password) : return pbkdf2_sha256.hash(password + salt) # 로그인시 사용할 함수 로써 유저가 입력한 비밀번호와, # DB에 저장된 비밀번호가 같은지 확인해 주는 함수 def check_password(password, hashed) : return pbkdf2_sha256.verify(password + salt, hashed)
4. db/db.py
import mysql.connector from mysql.connector import Error from config.config import db_config def get_mysql_connection(): try : connection = mysql.connector.connect( **db_config ) if connection.is_connected() : print('coneection OK!') return connection except Error as e: print('Error while connectiong to MySQL', e) return None
5. config/config.py
# MySQL 접속 정보를, 딕셔너리 형태로 저장한다. db_config = { 'host' : 'USER_DB_URL', 'database' : 'USER_DB_NAME', 'user' : 'USER_ID', 'password' : 'USER_PASSWORD' } class Config : DEBUG = True # JWT용 시크릿 키 SECRET_KEY = '자신이 기억할 수 있게 설정(해킹 방지 위한 키)' # 비밀번호 암호화를 위한 salt 설정 => 해킹방지를 위해서 salt = 'hello_test'
Reference : https://github.com/serverless/components
728x90
from http://jaechang3456.tistory.com/47 by ccl(A) rewrite - 2021-07-18 14:00:29