Use a connection pool instead of creating new connections

pull/956/head
Bond-009 5 years ago committed by Bond_009
parent cec22ad10d
commit c30ba14c1f

@ -43,7 +43,8 @@ namespace Emby.Server.Implementations.Activity
private void InitializeInternal()
{
using (var connection = CreateConnection())
CreateConnections().GetAwaiter().GetResult();
using (var connection = GetConnection())
{
RunDefaultInitialization(connection);
@ -57,7 +58,7 @@ namespace Emby.Server.Implementations.Activity
}
}
private void TryMigrate(SQLiteDatabaseConnection connection)
private void TryMigrate(ManagedConnection connection)
{
try
{
@ -85,7 +86,7 @@ namespace Emby.Server.Implementations.Activity
throw new ArgumentNullException(nameof(entry));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -123,7 +124,7 @@ namespace Emby.Server.Implementations.Activity
throw new ArgumentNullException(nameof(entry));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -157,7 +158,7 @@ namespace Emby.Server.Implementations.Activity
public QueryResult<ActivityLogEntry> GetActivityLogEntries(DateTime? minDate, bool? hasUserId, int? startIndex, int? limit)
{
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
var commandText = BaseActivitySelectText;
var whereClauses = new List<string>();

@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
@ -27,98 +28,93 @@ namespace Emby.Server.Implementations.Data
internal static int ThreadSafeMode { get; set; }
protected virtual ConnectionFlags DefaultConnectionFlags => ConnectionFlags.SharedCached | ConnectionFlags.FullMutex;
private readonly SemaphoreSlim WriteLock = new SemaphoreSlim(1, 1);
private SQLiteDatabaseConnection WriteConnection;
private readonly BlockingCollection<SQLiteDatabaseConnection> ReadConnectionPool = new BlockingCollection<SQLiteDatabaseConnection>();
static BaseSqliteRepository()
{
ThreadSafeMode = raw.sqlite3_threadsafe();
}
private static bool _versionLogged;
private string _defaultWal;
protected SQLiteDatabaseConnection CreateConnection(bool isReadOnly = false)
protected async Task CreateConnections()
{
if (!_versionLogged)
{
_versionLogged = true;
Logger.LogInformation("Sqlite version: " + SQLite3.Version);
Logger.LogInformation("Sqlite compiler options: " + string.Join(",", SQLite3.CompilerOptions));
}
ConnectionFlags connectionFlags;
if (isReadOnly)
{
//Logger.LogInformation("Opening read connection");
//connectionFlags = ConnectionFlags.Create;
connectionFlags = ConnectionFlags.ReadOnly;
}
else
{
//Logger.LogInformation("Opening write connection");
connectionFlags = ConnectionFlags.Create;
connectionFlags |= ConnectionFlags.ReadWrite;
}
connectionFlags |= ConnectionFlags.SharedCached;
connectionFlags |= ConnectionFlags.FullMutex;
var db = SQLite3.Open(DbFilePath, connectionFlags, null);
await WriteLock.WaitAsync().ConfigureAwait(false);
try
{
if (string.IsNullOrWhiteSpace(_defaultWal))
if (WriteConnection == null)
{
_defaultWal = db.Query("PRAGMA journal_mode").SelectScalarString().First();
Logger.LogInformation("Default journal_mode for {0} is {1}", DbFilePath, _defaultWal);
WriteConnection = SQLite3.Open(
DbFilePath,
DefaultConnectionFlags | ConnectionFlags.Create | ConnectionFlags.ReadWrite,
null);
}
var queries = new List<string>
if (string.IsNullOrWhiteSpace(_defaultWal))
{
//"PRAGMA cache size=-10000"
//"PRAGMA read_uncommitted = true",
//"PRAGMA synchronous=Normal"
};
_defaultWal = WriteConnection.Query("PRAGMA journal_mode").SelectScalarString().First();
if (CacheSize.HasValue)
{
queries.Add("PRAGMA cache_size=" + CacheSize.Value.ToString(CultureInfo.InvariantCulture));
Logger.LogInformation("Default journal_mode for {0} is {1}", DbFilePath, _defaultWal);
}
if (EnableTempStoreMemory)
{
queries.Add("PRAGMA temp_store = memory");
WriteConnection.Execute("PRAGMA temp_store = memory");
}
else
{
queries.Add("PRAGMA temp_store = file");
}
foreach (var query in queries)
{
db.Execute(query);
WriteConnection.Execute("PRAGMA temp_store = file");
}
}
catch
{
using (db)
{
}
throw;
}
finally
{
WriteLock.Release();
}
return db;
// Add one reading connection for each thread
int threads = System.Environment.ProcessorCount;
for (int i = 0; i <= threads; i++)
{
ReadConnectionPool.Add(SQLite3.Open(DbFilePath, DefaultConnectionFlags | ConnectionFlags.ReadOnly, null));
}
}
public IStatement PrepareStatement(SQLiteDatabaseConnection connection, string sql)
protected ManagedConnection GetConnection(bool isReadOnly = false)
{
if (isReadOnly)
{
return new ManagedConnection(ReadConnectionPool.Take(), ReadConnectionPool);
}
else
{
if (WriteConnection == null)
{
throw new InvalidOperationException("Can't access the write connection at this time.");
}
WriteLock.Wait();
return new ManagedConnection(WriteConnection, WriteLock);
}
}
public IStatement PrepareStatement(ManagedConnection connection, string sql)
{
return connection.PrepareStatement(sql);
}
public IStatement PrepareStatementSafe(SQLiteDatabaseConnection connection, string sql)
public IStatement PrepareStatementSafe(ManagedConnection connection, string sql)
{
return connection.PrepareStatement(sql);
}
@ -143,7 +139,7 @@ namespace Emby.Server.Implementations.Data
return sql.Select(connection.PrepareStatement).ToList();
}
protected bool TableExists(SQLiteDatabaseConnection connection, string name)
protected bool TableExists(ManagedConnection connection, string name)
{
return connection.RunInTransaction(db =>
{
@ -163,7 +159,7 @@ namespace Emby.Server.Implementations.Data
}, ReadTransactionMode);
}
protected void RunDefaultInitialization(SQLiteDatabaseConnection db)
protected void RunDefaultInitialization(ManagedConnection db)
{
var queries = new List<string>
{
@ -192,9 +188,7 @@ namespace Emby.Server.Implementations.Data
Logger.LogInformation("PRAGMA synchronous=" + db.Query("PRAGMA synchronous").SelectScalarString().First());
}
protected virtual bool EnableTempStoreMemory => false;
protected virtual int? CacheSize => null;
protected virtual bool EnableTempStoreMemory => true;
private bool _disposed;
protected void CheckDisposed()

@ -0,0 +1,87 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using SQLitePCL.pretty;
namespace Emby.Server.Implementations.Data
{
public class ManagedConnection : IDisposable
{
private SQLiteDatabaseConnection _db;
private SemaphoreSlim _writeLock;
private BlockingCollection<SQLiteDatabaseConnection> _readConPool;
private bool _disposed = false;
public ManagedConnection(SQLiteDatabaseConnection db, SemaphoreSlim writeLock)
{
_db = db;
_writeLock = writeLock;
}
public ManagedConnection(SQLiteDatabaseConnection db, BlockingCollection<SQLiteDatabaseConnection> queue)
{
_db = db;
_readConPool = queue;
}
public IStatement PrepareStatement(string sql)
{
return _db.PrepareStatement(sql);
}
public IEnumerable<IStatement> PrepareAll(string sql)
{
return _db.PrepareAll(sql);
}
public void ExecuteAll(string sql)
{
_db.ExecuteAll(sql);
}
public void Execute(string sql, params object[] values)
{
_db.Execute(sql, values);
}
public void RunQueries(string[] sql)
{
_db.RunQueries(sql);
}
public void RunInTransaction(Action<IDatabaseConnection> action, TransactionMode mode)
{
_db.RunInTransaction(action, mode);
}
public T RunInTransaction<T>(Func<IDatabaseConnection, T> action, TransactionMode mode)
{
return _db.RunInTransaction(action, mode);
}
public IEnumerable<IReadOnlyList<IResultSetValue>> Query(string sql)
{
return _db.Query(sql);
}
public IEnumerable<IReadOnlyList<IResultSetValue>> Query(string sql, params object[] values)
{
return _db.Query(sql, values);
}
public void Dispose()
{
if (_disposed)
{
return;
}
_writeLock?.Release();
_readConPool?.Add(_db);
_db = null; // Don't dispose it
_disposed = true;
}
}
}

@ -61,7 +61,8 @@ namespace Emby.Server.Implementations.Data
/// <returns>Task.</returns>
private void InitializeInternal()
{
using (var connection = CreateConnection())
CreateConnections().GetAwaiter().GetResult();
using (var connection = GetConnection())
{
RunDefaultInitialization(connection);
@ -98,7 +99,7 @@ namespace Emby.Server.Implementations.Data
cancellationToken.ThrowIfCancellationRequested();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -139,7 +140,7 @@ namespace Emby.Server.Implementations.Data
cancellationToken.ThrowIfCancellationRequested();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -168,7 +169,7 @@ namespace Emby.Server.Implementations.Data
var guidId = displayPreferencesId.GetMD5();
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
using (var statement = connection.PrepareStatement("select data from userdisplaypreferences where id = @id and userId=@userId and client=@client"))
{
@ -199,7 +200,7 @@ namespace Emby.Server.Implementations.Data
{
var list = new List<DisplayPreferences>();
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
using (var statement = connection.PrepareStatement("select data from userdisplaypreferences where userId=@userId"))
{

@ -92,8 +92,6 @@ namespace Emby.Server.Implementations.Data
private const string ChaptersTableName = "Chapters2";
protected override int? CacheSize => 20000;
protected override bool EnableTempStoreMemory => true;
/// <summary>
@ -101,7 +99,8 @@ namespace Emby.Server.Implementations.Data
/// </summary>
public void Initialize(SqliteUserDataRepository userDataRepo, IUserManager userManager)
{
using (var connection = CreateConnection())
CreateConnections().GetAwaiter().GetResult();
using (var connection = GetConnection())
{
RunDefaultInitialization(connection);
@ -551,7 +550,7 @@ namespace Emby.Server.Implementations.Data
CheckDisposed();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -602,7 +601,7 @@ namespace Emby.Server.Implementations.Data
tuples.Add((item, ancestorIds, topParent, userdataKey, inheritedTags));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -1186,7 +1185,7 @@ namespace Emby.Server.Implementations.Data
CheckDisposed();
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
using (var statement = PrepareStatementSafe(connection, "select " + string.Join(",", _retriveItemColumns) + " from TypedBaseItems where guid = @guid"))
{
@ -1899,7 +1898,7 @@ namespace Emby.Server.Implementations.Data
{
CheckDisposed();
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
var list = new List<ChapterInfo>();
@ -1928,7 +1927,7 @@ namespace Emby.Server.Implementations.Data
{
CheckDisposed();
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
using (var statement = PrepareStatementSafe(connection, "select StartPositionTicks,Name,ImagePath,ImageDateModified from " + ChaptersTableName + " where ItemId = @ItemId and ChapterIndex=@ChapterIndex"))
{
@ -1997,7 +1996,7 @@ namespace Emby.Server.Implementations.Data
throw new ArgumentNullException(nameof(chapters));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -2533,7 +2532,7 @@ namespace Emby.Server.Implementations.Data
commandText += " where " + string.Join(" AND ", whereClauses);
}
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
using (var statement = PrepareStatementSafe(connection, commandText))
{
@ -2602,7 +2601,7 @@ namespace Emby.Server.Implementations.Data
}
}
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
var list = new List<BaseItem>();
@ -2820,7 +2819,7 @@ namespace Emby.Server.Implementations.Data
statementTexts.Add(commandText);
}
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
return connection.RunInTransaction(db =>
{
@ -3052,7 +3051,7 @@ namespace Emby.Server.Implementations.Data
}
}
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
var list = new List<Guid>();
@ -3119,7 +3118,7 @@ namespace Emby.Server.Implementations.Data
}
var list = new List<Tuple<Guid, string>>();
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
using (var statement = PrepareStatementSafe(connection, commandText))
{
@ -3231,7 +3230,7 @@ namespace Emby.Server.Implementations.Data
statementTexts.Add(commandText);
}
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
return connection.RunInTransaction(db =>
{
@ -4862,7 +4861,7 @@ namespace Emby.Server.Implementations.Data
private void UpdateInheritedTags(CancellationToken cancellationToken)
{
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -4925,7 +4924,7 @@ where AncestorIdText not null and ItemValues.Value not null and ItemValues.Type
CheckDisposed();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -4982,7 +4981,7 @@ where AncestorIdText not null and ItemValues.Value not null and ItemValues.Type
commandText += " order by ListOrder";
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
var list = new List<string>();
using (var statement = PrepareStatementSafe(connection, commandText))
@ -5019,7 +5018,7 @@ where AncestorIdText not null and ItemValues.Value not null and ItemValues.Type
commandText += " order by ListOrder";
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
var list = new List<PersonInfo>();
@ -5245,7 +5244,7 @@ where AncestorIdText not null and ItemValues.Value not null and ItemValues.Type
commandText += " Group By CleanValue";
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
var list = new List<string>();
@ -5431,7 +5430,7 @@ where AncestorIdText not null and ItemValues.Value not null and ItemValues.Type
statementTexts.Add(countText);
}
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
return connection.RunInTransaction(db =>
{
@ -5698,7 +5697,7 @@ where AncestorIdText not null and ItemValues.Value not null and ItemValues.Type
CheckDisposed();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -5815,7 +5814,7 @@ where AncestorIdText not null and ItemValues.Value not null and ItemValues.Type
cmdText += " order by StreamIndex ASC";
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
var list = new List<MediaStream>();
@ -5859,7 +5858,7 @@ where AncestorIdText not null and ItemValues.Value not null and ItemValues.Type
cancellationToken.ThrowIfCancellationRequested();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{

@ -34,7 +34,8 @@ namespace Emby.Server.Implementations.Data
/// <returns>Task.</returns>
public void Initialize(IUserManager userManager)
{
using (var connection = CreateConnection())
CreateConnections().GetAwaiter().GetResult();
using (var connection = GetConnection())
{
var userDatasTableExists = TableExists(connection, "UserDatas");
var userDataTableExists = TableExists(connection, "userdata");
@ -172,7 +173,7 @@ namespace Emby.Server.Implementations.Data
{
cancellationToken.ThrowIfCancellationRequested();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -240,7 +241,7 @@ namespace Emby.Server.Implementations.Data
{
cancellationToken.ThrowIfCancellationRequested();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -275,7 +276,7 @@ namespace Emby.Server.Implementations.Data
throw new ArgumentNullException(nameof(key));
}
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
using (var statement = connection.PrepareStatement("select key,userid,rating,played,playCount,isFavorite,playbackPositionTicks,lastPlayedDate,AudioStreamIndex,SubtitleStreamIndex from UserDatas where key =@Key and userId=@UserId"))
{
@ -321,7 +322,7 @@ namespace Emby.Server.Implementations.Data
var list = new List<UserItemData>();
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
using (var statement = connection.PrepareStatement("select key,userid,rating,played,playCount,isFavorite,playbackPositionTicks,lastPlayedDate,AudioStreamIndex,SubtitleStreamIndex from UserDatas where userId=@UserId"))
{

@ -40,7 +40,8 @@ namespace Emby.Server.Implementations.Data
/// <returns>Task.</returns>
public void Initialize()
{
using (var connection = CreateConnection())
CreateConnections().GetAwaiter().GetResult();
using (var connection = GetConnection())
{
RunDefaultInitialization(connection);
@ -60,7 +61,7 @@ namespace Emby.Server.Implementations.Data
}
}
private void TryMigrateToLocalUsersTable(SQLiteDatabaseConnection connection)
private void TryMigrateToLocalUsersTable(ManagedConnection connection)
{
try
{
@ -119,7 +120,7 @@ namespace Emby.Server.Implementations.Data
var serialized = _jsonSerializer.SerializeToBytes(user);
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -153,7 +154,7 @@ namespace Emby.Server.Implementations.Data
var serialized = _jsonSerializer.SerializeToBytes(user);
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -168,7 +169,7 @@ namespace Emby.Server.Implementations.Data
}
}
private User GetUser(Guid guid, SQLiteDatabaseConnection connection)
private User GetUser(Guid guid, ManagedConnection connection)
{
using (var statement = connection.PrepareStatement("select id,guid,data from LocalUsersv2 where guid=@guid"))
{
@ -206,7 +207,7 @@ namespace Emby.Server.Implementations.Data
{
var list = new List<User>();
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
foreach (var row in connection.Query("select id,guid,data from LocalUsersv2"))
{
@ -230,7 +231,7 @@ namespace Emby.Server.Implementations.Data
throw new ArgumentNullException(nameof(user));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{

@ -23,7 +23,8 @@ namespace Emby.Server.Implementations.Security
public void Initialize()
{
using (var connection = CreateConnection())
CreateConnections().GetAwaiter().GetResult();
using (var connection = GetConnection())
{
RunDefaultInitialization(connection);
@ -48,7 +49,7 @@ namespace Emby.Server.Implementations.Security
}
}
private void TryMigrate(SQLiteDatabaseConnection connection, bool tableNewlyCreated)
private void TryMigrate(ManagedConnection connection, bool tableNewlyCreated)
{
try
{
@ -87,7 +88,7 @@ namespace Emby.Server.Implementations.Security
throw new ArgumentNullException(nameof(info));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -119,7 +120,7 @@ namespace Emby.Server.Implementations.Security
throw new ArgumentNullException(nameof(info));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -151,7 +152,7 @@ namespace Emby.Server.Implementations.Security
throw new ArgumentNullException(nameof(info));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{
@ -248,7 +249,7 @@ namespace Emby.Server.Implementations.Security
var list = new List<AuthenticationInfo>();
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
return connection.RunInTransaction(db =>
{
@ -346,7 +347,7 @@ namespace Emby.Server.Implementations.Security
public DeviceOptions GetDeviceOptions(string deviceId)
{
using (var connection = CreateConnection(true))
using (var connection = GetConnection(true))
{
return connection.RunInTransaction(db =>
{
@ -378,7 +379,7 @@ namespace Emby.Server.Implementations.Security
throw new ArgumentNullException(nameof(options));
}
using (var connection = CreateConnection())
using (var connection = GetConnection())
{
connection.RunInTransaction(db =>
{

Loading…
Cancel
Save