using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
using System.IO;
using System.Transactions;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Data.SQLite;
namespace RDH.PharmacyPlatform.Sync.Core
{
public class Bootstrap
{
#region Fields
#region Rabbitmq fields
private static ConnectionFactory _connectionFactory;
//客户端到MQ服务器的连接
private static IConnection _connection;
//与连接对象关联的通信通道
private static IModel _channel;
private static IBasicProperties _channelProperties;
//表示通信的消息队列
private static QueueDeclareOk _queue;
private static EventingBasicConsumer _cusumer;
#endregion
private static Config _config;
private static IMessageHandler _messageHandler;
private static ILogger _logger;
private static String _dbFileFullName;
private static String _dbConnectionString;
private static Thread _threadKeepMessageOut;
private static Thread _threadKeepMessageIn;
private static Boolean _isInited;
private const String _ConfigFilePath = "sync.config";
private const String _DbFilePath = "sync.db";
#endregion
#region Constructors
static Bootstrap()
{
_dbFileFullName = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, _DbFilePath);
_dbConnectionString = $"Data Source={_dbFileFullName};Version=3;";
}
~Bootstrap()
{
Exit();
}
#endregion
#region Properties
public static Config Config
{
get => _config;
internal set
{
_config = value;
}
}
#region Internal
internal static ConnectionFactory MqConnectionFacotry
{
get => _connectionFactory;
}
internal static IModel MqChannel
{
get => _channel;
}
internal static IBasicProperties ChannelProperties
{
get => _channelProperties;
}
internal static IMessageHandler MessageHandler
{
get => _messageHandler;
}
internal static String DbFileFullName
{
get => _dbFileFullName;
}
internal static String DbConnectionString
{
get => _dbConnectionString;
}
#endregion
#endregion
#region Methods
#region Public
///
/// 根据配置文件进行初始化
///
/// 错误信息
public static String Init()
{
if (!File.Exists(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, _ConfigFilePath)))
{
return "配置文件不存在";
}
String configContent = File.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, _ConfigFilePath));
Config config = null;
//try
{
config = JsonSerializer.DeserializeObject(configContent);
}
//catch (Exception ex)
//{
// Log($"{ex.Message}\n{ex.StackTrace}");
// return "配置格式不正确:" + ex.Message;
//}
return Init(config);
}
///
/// 根据指定的配置进行初始化
///
/// 错误信息
public static String Init(Config config)
{
InitDatabase();
if (_config != null)
{
return null;
}
if (config == null)
{
return "参数不能为空";
}
_config = config;
#region 初始化连接基础参数
if (_connectionFactory == null)
{
_connectionFactory = new ConnectionFactory();
_connectionFactory.UserName = config.ServerUserName;
_connectionFactory.Password = config.ServerUserPassword;
_connectionFactory.HostName = config.ServerIpHost;
_connectionFactory.Port = config.ServerSyncPort;
//开启自动恢复连接
_connectionFactory.AutomaticRecoveryEnabled = true;
}
#endregion
try
{
#region 创建连接对象并初始化参数、订阅等
_connection = Bootstrap.MqConnectionFacotry.CreateConnection();
_channel = _connection.CreateModel();
_channelProperties = _channel.CreateBasicProperties();
_channelProperties.DeliveryMode = 2;
_channelProperties.ContentType = "text/plain";
_channel.BasicReturn += _channel_BasicReturn;
_channel.BasicAcks += _channel_BasicAcks;
_channel.BasicNacks += _channel_BasicNacks;
_queue = _channel.QueueDeclare(Bootstrap.Config.LocalNodeId, true, true, false);
_cusumer = new EventingBasicConsumer(_channel);
//autoAck
// true:表示消息服务器收到消息后,自动生成消息回复
// false:表示消费端收到信息后,需要主动进行回复
_channel.BasicConsume(_queue.QueueName, false, _cusumer);
_cusumer.Received += _cusumer_Received;
#endregion
_isInited = true;
}
catch (Exception ex)
{
_config = null;
Log($"{ex.Message}\n{ex.StackTrace}");
return ex.Message;
}
_threadKeepMessageIn = new Thread(ThreadKeepMessageInHandler);
_threadKeepMessageIn.IsBackground = true;
_threadKeepMessageIn.Start();
_threadKeepMessageOut = new Thread(ThreadKeepMessageOutHandler);
_threadKeepMessageOut.IsBackground = true;
_threadKeepMessageOut.Start();
return null;
}
public static void Exit()
{
_isInited = false;
CloseConnection();
}
///
/// 注册消息处理器
///
public static void RegisteMessageHandler(IMessageHandler handler)
{
_messageHandler = handler;
}
///
/// 注册日志处理器
///
public static void RegisteLogger(ILogger logger)
{
_logger = logger;
}
#endregion
#region Internal
//初始化本地数据库
internal static void InitDatabase()
{
if (!File.Exists(_dbFileFullName))
{
Boolean createFlag = true;
Log("创建消息数据库");
try
{
#region 创建数据库
SQLiteConnection.CreateFile(_dbFileFullName);
Log("创建消息表");
using (SQLiteConnection conn = new SQLiteConnection(_dbConnectionString))
{
conn.Open();
SQLiteCommand cmd = conn.CreateCommand();
cmd.CommandType = System.Data.CommandType.Text;
cmd.CommandText = "CREATE TABLE T_Message_Data_In ("
+ " Create_Time DATETIME NOT NULL,"
+ " Source_Id VARCHAR(50) NOT NULL,"
+ " Message_Content VARCHAR(5000),"
+ " Message_Content_Type VARCHAR(50),"
+ " Filter_Type INTEGER(1) NOT NULL,"
+ " Target_Id VARCHAR(50) NOT NULL)";
cmd.ExecuteNonQuery();
cmd.CommandText = "CREATE TABLE T_Message_Data_Out ("
+ " Create_Time DATETIME NOT NULL,"
+ " Source_Id VARCHAR(50) NOT NULL,"
+ " Message_Content VARCHAR(5000),"
+ " Message_Content_Type VARCHAR(50),"
+ " Filter_Type INTEGER(1) NOT NULL,"
+ " Target_Id VARCHAR(50) NOT NULL)";
cmd.ExecuteNonQuery();
cmd.CommandText = "CREATE TABLE T_Nodes ("
+ " Node_Id VARCHAR(50) PRIMARY KEY,"
+ " Group_Name VARCHAR(50),"
+ " Create_Time DATETIME)";
cmd.ExecuteNonQuery();
}
#endregion
}
catch (Exception ex)
{
createFlag = false;
Log($"{ex.Message}\n{ex.StackTrace}");
}
if (!createFlag)
{
try
{
if (File.Exists(_dbFileFullName))
{
File.Delete(_dbFileFullName);
}
}
catch
{
}
}
}
}
internal static void Log(String content)
{
if (_logger != null)
{
_logger.Log(content);
}
else
{
Debug.WriteLine(String.Format("{0} TID:{1} Log:{2}",
DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
Thread.CurrentThread.ManagedThreadId.ToString(),
content));
}
}
#endregion
#region Private
private async static void _cusumer_Received(object sender, BasicDeliverEventArgs e)
{
Log("_cusumer_Received");
if (Bootstrap.MessageHandler != null)
{
try
{
String content = Encoding.UTF8.GetString(e.Body.ToArray());
Log($"Content:{content}");
MessageData data = JsonSerializer.DeserializeObject(content);
//接收到消息后即时将消息持久化到本地
using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
{
MessageDataInDAL messageDataInDAL = new MessageDataInDAL();
messageDataInDAL.InsertModel(data);
}
_channel.BasicAck(e.DeliveryTag, false);
}
catch (Exception ex)
{
Bootstrap.Log($"{ex.Message}\n{ex.StackTrace}");
}
}
else
{
Log("未注册MessageHandler");
}
}
private static void _channel_BasicAcks(object sender, BasicAckEventArgs e)
{
Log("_channel_BasicAcks");
}
private static void _channel_BasicNacks(object sender, BasicNackEventArgs e)
{
Log("_channel_BasicNacks");
}
//处理消息发送后,未找到目标的处理
private static void _channel_BasicReturn(object sender, BasicReturnEventArgs e)
{
Log("_channel_BasicReturn");
MessageData data = JsonSerializer.DeserializeObject(Encoding.UTF8.GetString(e.Body.ToArray()));
data.CreateTime = DateTime.Now;
MessageClient.SendMessage(data);
}
//本地消息发送的线程处理
private static void ThreadKeepMessageOutHandler()
{
MessageDataOutDAL messageDataOutDAL = new MessageDataOutDAL();
while (_isInited)
{
MessageData messageOut = null;
//Debug.WriteLine($"打开连接, id:{Thread.CurrentThread.ManagedThreadId}");
using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
{
try
{
do
{
messageOut = messageDataOutDAL.Peek();
if (messageOut != null)
{
Debug.WriteLine($"读取到待发送的消息, id:{Thread.CurrentThread.ManagedThreadId}");
String sendState = MessageClient.InternalSendMessage(messageOut);
if (sendState == null)
{
Debug.WriteLine($"发送成功,删除消息, id:{Thread.CurrentThread.ManagedThreadId}");
//Boolean state = Bootstrap.MqChannel.WaitForConfirms();
messageDataOutDAL.DeleteModel(messageOut);
}
else
{
Log("发送消息失败:" + sendState);
}
Thread.Sleep(1);
}
else
{
//Debug.WriteLine($"没有待发送的消息, id:{Thread.CurrentThread.ManagedThreadId}");
break;
}
} while (true);
}
catch (Exception ex)
{
Log($"{ex.Message}\n{ex.StackTrace}");
}
}
Thread.Sleep(1000);
}
}
//本地消息响应的线程处理
private static void ThreadKeepMessageInHandler()
{
Debug.WriteLine($"ThreadKeepMessageInHandler start, id:{Thread.CurrentThread.ManagedThreadId}");
MessageDataInDAL messageDataInDAL = new MessageDataInDAL();
while (_isInited)
{
MessageData messageIn = null;
try
{
using (ConnectionSessionScope conn = new ConnectionSessionScope(_dbConnectionString))
{
do
{
messageIn = messageDataInDAL.Peek();
if (messageIn != null)
{
Debug.WriteLine($"读取到待处理的消息, id:{Thread.CurrentThread.ManagedThreadId}");
String handleState = _messageHandler.ReceivedMessage(messageIn).Result;
Debug.WriteLine($"处理消息 end, id:{Thread.CurrentThread.ManagedThreadId}");
if (handleState == null)
{
Debug.WriteLine($"处理成功,删除消息, id:{Thread.CurrentThread.ManagedThreadId}");
messageDataInDAL.DeleteModel(messageIn);
}
else
{
MessageData copy = new MessageData()
{
CreateTime = DateTime.Now,
MessageContent = messageIn.MessageContent,
MessageContentType = messageIn.MessageContentType,
FilterType = messageIn.FilterType,
SourceId = messageIn.SourceId,
TargetId = messageIn.TargetId,
};
conn.BeginTransaction();
messageDataInDAL.DeleteModel(messageIn);
messageDataInDAL.InsertModel(copy);
conn.CommitTransaction();
}
Thread.Sleep(1);
}
else
{
//Debug.WriteLine($"没有待处理的消息, id:{Thread.CurrentThread.ManagedThreadId}");
break;
}
} while (true);
}
}
catch (Exception ex)
{
Log($"{ex.Message}\n{ex.StackTrace}");
}
Thread.Sleep(1000);
}
}
private static void CloseConnection()
{
if (_channel != null)
{
_channel.Close();
_channel = null;
}
if (_connection != null)
{
_connection.Close();
_connection = null;
}
}
#endregion
#endregion
}
}