Axios refresh token
axios刷新token插件,支持自动刷新token,且刷新成功后自动重新发起请求并返回结果。可无缝集成到axios中。
实现源码
ts
import type {
AxiosError,
AxiosRequestConfig,
AxiosResponse,
InternalAxiosRequestConfig
} from 'axios';
/**
 * Function used to (re)send a request from its config. The plugin calls it
 * to replay requests that were queued while a token refresh was running.
 */
type IRequest = (config: AxiosRequestConfig) => Promise<AxiosResponse>;
/**
 * User-supplied refresh routine. The plugin treats a falsy resolved value
 * as "refresh failed" and a truthy one as "refresh succeeded".
 */
type IRefreshTokenFn = () => Promise<boolean>;
/**
 * Predicate deciding whether a token refresh is needed. The plugin calls it
 * with only `error` from the rejected-response interceptor, and with
 * `(undefined, res)` from the fulfilled-response interceptor.
 */
type IsRefreshToken = (error?: AxiosError, res?: AxiosResponse) => boolean;
/** An `AxiosError` whose `config` is declared as always present. */
interface IError extends AxiosError {
config: InternalAxiosRequestConfig;
}
/**
 * Axios interceptor plugin that refreshes a token when a request is judged
 * unauthorized, then replays the affected requests and hands their results
 * back to the original callers.
 *
 * While a refresh is running, the first request passing through
 * `requestInterceptor` is let through (presumably the refresh call itself —
 * confirm against how `refreshTokenFn` is implemented); every later request
 * is rejected with code `ERR_REFRESH`, which `responseInterceptorRejected`
 * turns into a queued replay.
 */
export class AxiosRefreshTokenPlugin {
  /** Code put on errors used to park a request while a refresh is running. */
  static CODE = 'ERR_REFRESH';
  /** Code put on errors rejected because the refresh did not succeed. */
  static CODE_FAILED = 'ERR_REFRESH_FAILED';

  /** True from the moment a refresh starts until `refreshTokenFn` settles. */
  private isRefreshing = false;
  /** True once one request has been let through during the current refresh. */
  private isStartedRefresh = false;
  /** Requests waiting for the running refresh to finish. */
  private pendingQueue: Array<{ replay: () => void; abort: () => void }> = [];

  request: IRequest;
  refreshTokenFn: IRefreshTokenFn;

  /**
   * Default refresh predicate: HTTP status 401 on an error, or a `code`
   * field equal to 401 in the response body.
   */
  isRefreshToken = (error?: AxiosError, res?: AxiosResponse) => {
    if (error) {
      return error.response?.status === 401;
    }
    if (res) {
      return res.data?.code === 401;
    }
    return false;
  };

  /**
   * @param refreshTokenFn Performs the refresh; a falsy result or a rejection means failure.
   * @param request Sends a request from a config; used to replay queued requests.
   * @param isRefreshToken Optional replacement for the default refresh predicate.
   */
  constructor(
    refreshTokenFn: IRefreshTokenFn,
    request: IRequest,
    isRefreshToken?: IsRefreshToken
  ) {
    this.request = request;
    this.refreshTokenFn = refreshTokenFn;
    if (isRefreshToken) {
      this.isRefreshToken = isRefreshToken;
    }
  }

  /** Builds a plugin-originated error carrying `code` and the request `config`. */
  private createError(
    message: string,
    code: string,
    config: InternalAxiosRequestConfig
  ) {
    const error = new Error(message) as IError;
    error.code = code;
    error.config = config;
    error.isAxiosError = false;
    error.toJSON = () => ({});
    error.name = 'RefreshTokenPluginError';
    return error;
  }

  /** Empties the queue and returns the entries that were in it. */
  private drainPending() {
    const drained = this.pendingQueue;
    this.pendingQueue = [];
    return drained;
  }

  /**
   * Queues a request until the running refresh finishes.
   *
   * @param config Config of the request to replay later.
   * @returns A promise settled with the replayed request's outcome. It
   *   rejects with `ERR_CANCELED` if `config.timeout` elapses first, or with
   *   `ERR_REFRESH_FAILED` if the refresh fails.
   */
  addPending(config: InternalAxiosRequestConfig): Promise<AxiosResponse> {
    return new Promise((resolve, reject) => {
      let timer: ReturnType<typeof setTimeout> | undefined;
      const stopTimer = () => {
        if (timer !== undefined) {
          clearTimeout(timer);
          timer = undefined;
        }
      };
      const entry = {
        replay: () => {
          stopTimer();
          try {
            this.request(config).then(resolve, reject);
          } catch (thrown) {
            // A synchronously throwing `request` must not strand this caller.
            reject(thrown);
          }
        },
        abort: () => {
          stopTimer();
          reject(
            this.createError(
              'Refresh token failed',
              AxiosRefreshTokenPlugin.CODE_FAILED,
              config
            )
          );
        }
      };
      if (config.timeout) {
        timer = setTimeout(() => {
          timer = undefined;
          // Drop the entry so a request whose caller already saw a timeout
          // is not sent anyway once the refresh completes.
          const position = this.pendingQueue.indexOf(entry);
          if (position !== -1) {
            this.pendingQueue.splice(position, 1);
          }
          reject(this.createError('Request timeout', 'ERR_CANCELED', config));
        }, config.timeout);
      }
      this.pendingQueue.push(entry);
    });
  }

  /**
   * Runs the token refresh for an unauthorized request and replays it.
   *
   * If a refresh is already running, the request is only queued, so
   * `refreshTokenFn` is invoked once per refresh cycle. On success every
   * queued request is replayed. On failure (falsy result, rejection or
   * throw) every queued request is rejected with `ERR_REFRESH_FAILED` and
   * `error` itself is rejected with its message and code overwritten.
   *
   * @param error The error describing the unauthorized request.
   * @returns The response of the replayed request.
   */
  async refresh(error: AxiosError): Promise<AxiosResponse> {
    const config = error.config;
    if (!config) {
      // Nothing to replay without the original request config.
      throw error;
    }
    if (this.isRefreshing) {
      return this.addPending(config);
    }

    this.isRefreshing = true;
    let refreshed = false;
    let failure: unknown;
    try {
      refreshed = await this.refreshTokenFn();
    } catch (reason) {
      failure = reason;
    } finally {
      // Always leave the refreshing state, even when the refresh blew up,
      // otherwise every later request would be parked forever.
      this.isRefreshing = false;
      this.isStartedRefresh = false;
    }

    if (!refreshed) {
      const abandoned = this.drainPending();
      error.message = 'Refresh token failed';
      error.code = AxiosRefreshTokenPlugin.CODE_FAILED;
      if (failure instanceof Error) {
        error.cause = failure;
      }
      for (const entry of abandoned) {
        entry.abort();
      }
      throw error;
    }

    // Queue the triggering request last, then replay everything in order.
    const replayed = this.addPending(config);
    for (const entry of this.drainPending()) {
      entry.replay();
    }
    return replayed;
  }

  /**
   * Request interceptor.
   *
   * @returns `config` unchanged, or a promise rejected with code
   *   `ERR_REFRESH` when a refresh is running and one request has already
   *   been let through.
   */
  requestInterceptor(config: InternalAxiosRequestConfig) {
    if (this.isRefreshing && this.isStartedRefresh) {
      const error = this.createError(
        'Refreshing token',
        AxiosRefreshTokenPlugin.CODE,
        config
      );
      return Promise.reject(error);
    }
    if (this.isRefreshing) {
      this.isStartedRefresh = true;
    }
    return config;
  }

  /**
   * Fulfilled-response interceptor: starts a refresh when the predicate
   * flags the response, otherwise passes the response through.
   */
  responseInterceptorFulfilled(response: AxiosResponse) {
    if (this.isRefreshToken(undefined, response)) {
      const error = this.createError(
        'Unauthorized',
        'ERR_UNAUTHORIZED',
        response.config
      );
      // Keep the offending response available to whoever handles a failure.
      error.response = response;
      return this.refresh(error);
    }
    return response;
  }

  /**
   * Rejected-response interceptor: queues requests parked by
   * `requestInterceptor`, starts a refresh when the predicate flags the
   * error, and re-rejects anything else.
   */
  responseInterceptorRejected(error: AxiosError) {
    if (error.code === AxiosRefreshTokenPlugin.CODE && error.config) {
      return this.addPending(error.config);
    }
    if (this.isRefreshToken(error)) {
      return this.refresh(error);
    }
    return Promise.reject(error);
  }
}
/**
 * Creates an `AxiosRefreshTokenPlugin` and exposes its three interceptor
 * methods already bound to that instance, so they can be passed straight to
 * `interceptors.request.use` / `interceptors.response.use`.
 *
 * @param refreshToken Performs the token refresh and resolves to a boolean.
 * @param request Sends a request from a config; used to replay requests.
 * @param isRefreshToken Optional predicate deciding when to refresh.
 * @returns The bound request, fulfilled-response and rejected-response interceptors.
 */
export default function createRefreshTokenPlugin(
  refreshToken: IRefreshTokenFn,
  request: IRequest,
  isRefreshToken?: IsRefreshToken
) {
  const plugin = new AxiosRefreshTokenPlugin(
    refreshToken,
    request,
    isRefreshToken
  );
  // Bind once so the methods keep their `this` when handed to axios.
  const requestInterceptor = plugin.requestInterceptor.bind(plugin);
  const responseInterceptorFulfilled =
    plugin.responseInterceptorFulfilled.bind(plugin);
  const responseInterceptorRejected =
    plugin.responseInterceptorRejected.bind(plugin);
  return {
    requestInterceptor,
    responseInterceptorFulfilled,
    responseInterceptorRejected
  };
}

基本用法
简单用法
先创建插件实例,然后传入刷新token方法与axios实例请求方法,最后将插件实例的拦截器方法注册到axios中即可。
ts
import axios from 'axios'
import createRefreshTokenPlugin from '...'
const instance = axios.create()

// Refresh routine handed to the plugin: resolves to true only when a new
// token pair was obtained and stored.
const refreshAccessToken = async (): Promise<boolean> => {
  // Read the stored refresh token
  const token = getRefreshToken()
  if (!token) return false

  // Ask the backend for a new token pair
  const [, res] = await refreshToken(token)
  if (!res) return false

  // Persist the new tokens
  saveToken(res.data.token, res.data.refreshToken)
  return true
}

const twinTokenPlugin = createRefreshTokenPlugin(refreshAccessToken, instance.request)

// Register the interceptors
instance.interceptors.request.use(twinTokenPlugin.requestInterceptor)
instance.interceptors.response.use(undefined, twinTokenPlugin.responseInterceptorRejected)

自定义何时刷新
可以通过第三个参数传入一个判断函数,该函数接收两个参数:第一个参数为 AxiosError 实例(请求失败时传入),第二个参数为 AxiosResponse 响应对象(请求成功时传入),可以根据这两个参数来自定义何时刷新 token。注意:基于响应数据的判断只有在同时注册了 responseInterceptorFulfilled 拦截器时才会生效。
ts
import axios from 'axios'
import createRefreshTokenPlugin from '...'
const instance = axios.create()

const twinTokenPlugin = createRefreshTokenPlugin(
  async () => {
    // TODO: obtain the new token here
    return true
  },
  instance.request,
  // Custom rule for when to refresh the token: an HTTP 401 error, or a
  // response whose body carries code 401
  (error, response) =>
    error?.response?.status === 401 || response?.data?.code === 401
)

// Register the interceptors
instance.interceptors.request.use(twinTokenPlugin.requestInterceptor)
instance.interceptors.response.use(twinTokenPlugin.responseInterceptorFulfilled, twinTokenPlugin.responseInterceptorRejected)

配置项
refreshToken
- 类型:`() => Promise<boolean>`
- 默认值:`undefined`
- 说明:刷新 token 的方法
- 必填:是
request
- 类型:`(config: AxiosRequestConfig) => Promise<AxiosResponse>`
- 默认值:`undefined`
- 说明:刷新 token 成功后,重新发起请求的方法。建议传入 axios 实例的 `request` 方法
- 必填:是
isRefreshToken
- 类型:`(error?: AxiosError, response?: AxiosResponse) => boolean`
- 默认值:`(error, res) => { if (error) { return error.response?.status === 401; } if (res) { return res.data?.code === 401; } return false; }`
- 说明:判断是否需要刷新 token 的方法
- 必填:否