简介
本专题将带你使用 axum 和 gRPC 构建一个分布式的博客系统数据结构与Protobuf
本章对我们项目的数据结构和proto进行定义实现分类服务
本章我们实现分类服务,即 `category-srv`实现文章服务
本章将带你实现文章的 gPRC 服务。实现前台web服务
本章将通过使用 axum 调用分类和文章的 gRPC 服务,来实现博客前台Web服务实现管理员服务
本章我们将实现管理员服务实现后台管理web服务
本章将使用 axum 调用 gRPC 服务来实现后台管理的 web 服务安全与鉴权
本章将讨论使用jwt进行鉴权服务扩容、注册、发现和编排
本章将讨论服务管理相关的话题配置中心服务
本章讨论配置中心的实现总结
本专题试图通过一个分布式博客的案例来探讨使用 rust 实现 gRPC 微服务架构的可行性
实现文章服务
首先,创建项目并将到 workspace中:
cargo new topic-srv
实现 TopicService
以下代码位于 topic-srv/src/server.rs
定义自己的结构体
pub struct Topic {
pool: Arc<PgPool>,
}
impl Topic {
pub fn new(pool: PgPool) -> Self {
Self {
pool: Arc::new(pool),
}
}
}
#[tonic::async_trait]
impl TopicService for Topic {
// ...
}
创建文章 create_topic 的实现
async fn create_topic(
&self,
request: tonic::Request<CreateTopicRequest>,
) -> Result<tonic::Response<CreateTopicReply>, tonic::Status> {
let CreateTopicRequest {
title,
category_id,
content,
summary,
} = request.into_inner();
let summary = match summary {
Some(summary) => summary,
None => get_summary(&content),
};
let row = sqlx::query("INSERT INTO topics (title,category_id,content,summary) VALUES($1,$2,$3,$4) RETURNING id")
.bind(title)
.bind(category_id)
.bind(content)
.bind(summary)
.fetch_one(&*self.pool)
.await.map_err(|err|tonic::Status::internal(err.to_string()))?;
let reply = CreateTopicReply { id: row.get("id") };
Ok(tonic::Response::new(reply))
}
截取内容的函数:
fn get_summary(content: &str) -> String {
if content.len() <= 255 {
return String::from(content);
}
content.chars().into_iter().take(255).collect()
}
修改文章 edit_topic 的实现
async fn edit_topic(
&self,
request: tonic::Request<EditTopicRequest>,
) -> Result<tonic::Response<EditTopicReply>, tonic::Status> {
let r = request.into_inner();
let summary = match r.summary {
Some(s) => s,
None => get_summary(&r.content),
};
let rows_affected = sqlx::query(
"UPDATE topics SET title=$1,content=$2,summary=$3,category_id=$4 WHERE id=$5",
)
.bind(r.title)
.bind(r.content)
.bind(summary)
.bind(r.category_id)
.bind(r.id)
.execute(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?
.rows_affected();
Ok(tonic::Response::new(EditTopicReply {
id: r.id,
ok: rows_affected > 0,
}))
}
删除/恢复文章 toggle_topic 的实现
async fn toggle_topic(
&self,
request: tonic::Request<ToggleTopicRequest>,
) -> Result<tonic::Response<ToggleTopicReply>, tonic::Status> {
let ToggleTopicRequest { id } = request.into_inner();
let row = sqlx::query("UPDATE topics SET is_del=(NOT is_del) WHERE id=$1 RETURNING is_del")
.bind(id)
.fetch_optional(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
if row.is_none() {
return Err(tonic::Status::not_found("不存在的文章"));
}
Ok(tonic::Response::new(ToggleTopicReply {
id,
is_del: row.unwrap().get("is_del"),
}))
}
获取文章 get_topic 的实现
async fn get_topic(
&self,
request: tonic::Request<GetTopicRequest>,
) -> Result<tonic::Response<GetTopicReply>, tonic::Status> {
let GetTopicRequest {
id,
is_del,
inc_hit,
} = request.into_inner();
let inc_hit = inc_hit.unwrap_or(false); // 增加点击量
if inc_hit {
sqlx::query("UPDATE topics SET hit=hit+1 WHERE id=$1")
.bind(id)
.execute(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
}
let query = match is_del {
Some(is_del) => sqlx::query("SELECT id,title,content,summary,is_del,category_id,dateline,hit FROM topics WHERE id=$1 AND is_del=$2")
.bind(id).bind(is_del),
None => sqlx::query("SELECT id,title,content,summary,is_del,category_id,dateline,hit FROM topics WHERE id=$1")
.bind(id),
};
let row = query
.fetch_optional(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
if row.is_none() {
return Err(tonic::Status::not_found("不存在的文章"));
}
let row = row.unwrap();
let dt: DateTime<Local> = row.get("dateline");
let dateline = dt_conver(&dt);
Ok(tonic::Response::new(GetTopicReply {
topic: Some(blog_proto::Topic {
id: row.get("id"),
title: row.get("title"),
category_id: row.get("category_id"),
content: row.get("content"),
summary: row.get("summary"),
hit: row.get("hit"),
is_del: row.get("is_del"),
dateline,
}),
}))
}
- 从传入的参数中判断是否需要同时对点击量进行递增,如果需要,则执行对应的 SQL
- 从数据库中获取对应的记录
- 将结果返回
时间的处理
- 在 proto 中,使用 google 定义的 timestamp
- 在将 proto 生成 rust 代码时,使用了
prost_types::Timestamp - 在 PostgreSQL 中,使用的是
TIMESTAMP WITH TIME ZONE,通常会简写成TIMESTAMPTZ
如果将这些不同的定义进行统一处理?
prost_types::Timestamp 本身就是对 proto 中 google 定义的 timestamp 的映射,所以它自然提供了转换功能,它们本质其实就是 i64。而且 PostgreSQL 中的 TIMESTAMPTZ 更为丰富,从它展示的结果来看,更像是 DateTime,所以我们使用的方法是:
将 chrono::DateTime转为prost_types::Timestamp的dt_conver函数:
fn dt_conver(dt: &DateTime<Local>) -> Option<prost_types::Timestamp> {
if let Ok(dt) = prost_types::Timestamp::date_time(
dt.year().into(),
dt.month() as u8,
dt.day() as u8,
dt.hour() as u8,
dt.minute() as u8,
dt.second() as u8,
) {
Some(dt)
} else {
None
}
}
fn tm_cover(tm: Option<prost_types::Timestamp>) -> Option<DateTime<Local>> {
match tm {
Some(tm) => Some(Local.timestamp(tm.seconds, 0)),
None => None,
}
}
文章列表 list_topic 的实现
async fn list_topic(
&self,
request: tonic::Request<ListTopicRequest>,
) -> Result<tonic::Response<ListTopicReply>, tonic::Status> {
let ListTopicRequest {
page,
category_id,
keyword,
is_del,
dateline_range,
} = request.into_inner();
let page = page.unwrap_or(0);
let page_size = 30;
let offset = page * page_size;
let mut start = None;
let mut end = None;
if let Some(dr) = dateline_range {
start = tm_cover(dr.start);
end = tm_cover(dr.end);
}
let row = sqlx::query(
r#"
SELECT
COUNT(*)
FROM
topics
WHERE 1=1
AND ($1::int IS NULL OR category_id = $1::int)
AND ($2::text IS NULL OR title ILIKE CONCAT('%',$2::text,'%'))
AND ($3::boolean IS NULL OR is_del = $3::boolean)
AND (
($4::TIMESTAMPTZ IS NULL OR $5::TIMESTAMPTZ IS NULL)
OR
(dateline BETWEEN $4::TIMESTAMPTZ AND $5::TIMESTAMPTZ)
)"#,
)
.bind(&category_id)
.bind(&keyword)
.bind(&is_del)
.bind(&start)
.bind(&end)
.fetch_one(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let record_total: i64 = row.get(0);
let page_totoal = f64::ceil(record_total as f64 / page_size as f64) as i64;
let rows = sqlx::query(
r#"
SELECT
id,title,content,summary,is_del,category_id,dateline,hit FROM topics
WHERE 1=1
AND ($3::int IS NULL OR category_id = $3::int)
AND ($4::text IS NULL OR title ILIKE CONCAT('%',$4::text,'%'))
AND ($5::boolean IS NULL OR is_del = $5::boolean)
AND (
($6::TIMESTAMPTZ IS NULL OR $7::TIMESTAMPTZ IS NULL)
OR
(dateline BETWEEN $6::TIMESTAMPTZ AND $7::TIMESTAMPTZ)
)
ORDER BY
id DESC
LIMIT
$1
OFFSET
$2
"#,
)
.bind(page_size)
.bind(offset)
.bind(&category_id)
.bind(&keyword)
.bind(&is_del)
.bind(&start)
.bind(&end)
.fetch_all(&*self.pool)
.await
.map_err(|err| tonic::Status::internal(err.to_string()))?;
let mut topics = Vec::with_capacity(rows.len());
for row in rows {
let dt: DateTime<Local> = row.get("dateline");
let dateline = dt_conver(&dt);
topics.push(blog_proto::Topic {
id: row.get("id"),
title: row.get("title"),
category_id: row.get("category_id"),
content: row.get("content"),
summary: row.get("summary"),
hit: row.get("hit"),
is_del: row.get("is_del"),
dateline,
});
}
Ok(tonic::Response::new(ListTopicReply {
page,
page_size,
topics,
record_total,
page_totoal,
}))
}
}
由于该方法接收的筛选条件过于复杂,如果使用 rust 来拼接 SQL 将会非常麻烦,我们利用 PostgreSQL 来处理。
以 AND ($4::text IS NULL OR title ILIKE CONCAT('%',$4::text,'%')) 为例:
$4::text:将绑定的第4号位的参数转换成TEXT。由于我们传入的4号位的参数是keyword,它是一个Option<String>,则:- 如果是
Some(s),转换成功,且参数的值是s,比如传入Some("axum.rs".to_string()),那么$4::text将转换成'axum.rs' - 如果是
None,则无法转换成TEXT,结果为NULL
- 如果是
ILIKE:不区分大小写的模糊匹配CONCAT:字符串拼接。CONCAT('%', 'axum.rs', '%')的结果是:'%axum.rs%'
运行和测试
本章代码位于03/实现文章服务分支
