- Consumer 消費者
- Producer 生産者
- Request-Response 請求-響應
在 MassTransit 中,一個消費者可以消費一種或多種消息
消費者的類型包括:普通消費者,saga,saga 狀态機,路由活動(分布式追蹤),處理器 handlers,工作消費者 job comsumers
- Consumer
- Instance
- Handler
- Others
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Consumer<SubmitOrderConsumer>();
});
});
}
}
繼承 IConsumer,實現 Consume 方法
class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
public async Task Consume(ConsumeContext<SubmitOrder> context)
{
await context.Publish<OrderSubmitted>(new
{
context.Message.OrderId
});
}
}
三個原則:
- 擁抱 The Hollywood Principle, which states, "Dont't call us, we'll call you."
- Consume 方法是一個被等待的方法,在執行中時其他消費者無法接收到這個消息,當這個方法完成的時候,消息被 ack,并且從隊列中移除
- Task 方法異常會導緻消息觸發 retry,如果沒有配置重試,消息将被投遞到失敗隊列
public class Program
{
public static async Task Main()
{
var submitOrderConsumer = new SubmitOrderConsumer();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Instance(submitOrderConsumer);
});
});
}
}
所有接收到的消息都由一個消費者來實例來處理(請确保這個消費者類是線程安全)
Consumer 每次接收到消息都會 new 一個實例
Handler
public class Program
{
public static async Task Main()
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint("order-service", e =>
{
e.Handler<SubmitOrder>(async context =>
{
await Console.Out.WriteLineAsync($"Submit Order Received: {context.Message.OrderId}");
});
});
});
}
}
通過一個委托 Lambda 方法,來消費消息
Others- Saga<>
- StateMachineSaga<>
消息的生産可以通過兩種方式産生:發送和發布
發送的時候需要指定一個具體的地址 DestinationAddress,發布的時候消息會被廣播給所有訂閱了這個消息類型的消費者
基于這兩種規則,消息被定義為:命令 command 和事件 event
- send
- publish
可以調用以下對象的 send 方法來發送 command:
- ConsumeContext (在 Consumer 的 Consumer 方法參數中傳遞)
- ISendEndpointProvider(可以從 DI 中獲取)
- IBusControl(最頂層的控制對象,用來啟動和停止 masstransit 的控制器)
public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
private readonly IOrderSubmitter _orderSubmitter;
public SubmitOrderConsumer(IOrderSubmitter submitter)
=> _orderSubmitter = submitter;
public async Task Consume(IConsumeContext<SubmitOrder> context)
{
await _orderSubmitter.Process(context.Message);
await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
}
}
public async Task SendOrder(ISendEndpointProvider sendEndpointProvider)
{
var endpoint = await sendEndpointProvider.GetSendEndpoint(_serviceAddress);
await endpoint.Send(new SubmitOrder { OrderId = "123" });
}
- 發送地址
- 短地址
- Convention Map
- rabbitmq://localhost/input-queue
- rabbitmq://localhost/input-queue?durable=false
- GetSendEndpoint(new Uri("queue:input-queue"))
Convention Map
在配置文件中指定 map 規則
EndpointConvention.Map<StartDelivery>(new Uri(ConfigurationManager.AppSettings["deliveryServiceQueue"]));
直接發送
public class SubmitOrderConsumer :
IConsumer<SubmitOrder>
{
private readonly IOrderSubmitter _orderSubmitter;
public SubmitOrderConsumer(IOrderSubmitter submitter)
=> _orderSubmitter = submitter;
public async Task Consume(IConsumeContext<SubmitOrder> context)
{
await _orderSubmitter.Process(context.Message);
await context.Send(new StartDelivery(context.Message.OrderId, DateTime.UtcNow));
}
}
可以調用以下對象的 publish 方法來發送 event:
- ConsumeContext (在 Consumer 的 Consumer 方法參數中傳遞)
- IPublishEndpoint(可以從 DI 中獲取)
- IBusControl(最頂層的控制對象,用來啟動和停止 masstransit 的控制器)
IPublishEndpoint
public async Task NotifyOrderSubmitted(IPublishEndpoint publishEndpoint)
{
await publishEndpoint.Publish<OrderSubmitted>(new
{
OrderId = "27",
OrderDate = DateTime.UtcNow,
});
}
Request-Response 模式讓應用程序之間解耦之後,依然采用同步的方式
- Consumer
- IClientFactory
- IRequestClient
- Send a request
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
var order = await _orderRepository.Get(context.Message.OrderId);
if (order == null)
throw new InvalidOperationException("Order not found");
await context.RespondAsync<OrderStatusResult>(new
{
OrderId = order.Id,
order.Timestamp,
order.StatusCode,
order.StatusText
});
}
需要處理返回類型 OrderStatusResult,異步方式模拟同步,實際上同樣有消息隊列,消費者處理過程
IClientFactory
public interface IClientFactory
{
IRequestClient<T> CreateRequestClient<T>(ConsumeContext context, Uri destinationAddress, RequestTimeout timeout);
IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout);
RequestHandle<T> CreateRequest<T>(T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
RequestHandle<T> CreateRequest<T>(ConsumeContext context, T request, Uri destinationAddress, CancellationToken cancellationToken, RequestTimeout timeout);
}
通過 IBusControl 的 CreateClientFactory 方法可以得到 ClientFactory
IRequestClient
public interface IRequestClient<TRequest>
where TRequest : class
{
RequestHandle<TRequest> Create(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
Task<Response<T>> GetResponse<T>(TRequest request, CancellationToken cancellationToken, RequestTimeout timeout);
}
RequestClient 可以創建請求,或者直接獲得響應
Send a request
var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});