Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 65 additions & 6 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
book_loglevel/2,
book_addlogs/2,
book_removelogs/2,
book_headstatus/1
book_headstatus/1,
book_status/1
]).

%% folding API
Expand Down Expand Up @@ -1316,6 +1317,24 @@ book_removelogs(Pid, ForcedLogs) ->
%% Synchronous call returning the bookie's head-mode configuration; the
%% head_status handler replies with {HeadOnly, HeadLookup} taken from the
%% server state.  Blocks indefinitely (infinity timeout).
book_headstatus(Pid) ->
    gen_server:call(Pid, head_status, infinity).

-spec book_status(pid()) -> map().
%% @doc
%% Return a map containing the following items:
%% * current size of the ledger cache;
%% * number of active journal files;
%% * average compaction score for the journal;
%% * current distribution of files across the ledger (e.g. count of files by level);
%% * current size of the penciller in-memory cache;
%% * penciller work backlog status;
%% * last merge time (penciller);
%% * last compaction time (journal);
%% * last compaction result (journal) e.g. files compacted and compaction score;
%% * ratio of metadata to object size (recent PUTs);
%% * PUT/GET/HEAD recent time/count metrics;
%% * mean level for recent fetches.
%% Any item not yet gathered by the monitor is present with the value
%% 'undefined'; an empty map is returned when no monitor is configured.
%% Blocks indefinitely (infinity timeout).
book_status(Pid) ->
    gen_server:call(Pid, status, infinity).

%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
Expand Down Expand Up @@ -1475,7 +1494,8 @@ handle_call(
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{ok, Cache} ->
Expand Down Expand Up @@ -1509,7 +1529,8 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) when
State#state.cache_size,
State#state.cache_multiple,
Cache0,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{ok, Cache} ->
Expand Down Expand Up @@ -1686,7 +1707,8 @@ handle_call({compact_journal, Timeout}, From, State) when
State#state.cache_size,
State#state.cache_multiple,
State#state.ledger_cache,
State#state.penciller
State#state.penciller,
State#state.monitor
)
of
{_, NewCache} ->
Expand Down Expand Up @@ -1740,6 +1762,8 @@ handle_call(return_actors, _From, State) ->
{reply, {ok, State#state.inker, State#state.penciller}, State};
handle_call(head_status, _From, State) ->
{reply, {State#state.head_only, State#state.head_lookup}, State};
handle_call(status, _From, State) ->
{reply, status(State), State};
handle_call(Msg, _From, State) ->
{reply, {unsupported_message, element(1, Msg)}, State}.

Expand Down Expand Up @@ -2877,7 +2901,11 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
end.

-spec maybepush_ledgercache(
pos_integer(), pos_integer(), ledger_cache(), pid()
pos_integer(),
pos_integer(),
ledger_cache(),
pid(),
leveled_monitor:monitor()
) ->
{ok | returned, ledger_cache()}.
%% @doc
Expand All @@ -2890,9 +2918,12 @@ check_in_ledgercache(PK, Hash, Cache, loader) ->
%% in the reply. Try again later when it isn't busy (and also potentially
%% implement a slow_offer state to slow down the pace at which PUTs are being
%% received)
maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) ->
maybepush_ledgercache(
MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _}
) ->
Tab = Cache#ledger_cache.mem,
CacheSize = ets:info(Tab, size),
leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult),
if
TimeToPush ->
Expand Down Expand Up @@ -3048,6 +3079,34 @@ maybelog_snap_timing({Pid, _StatsFreq}, BookieTime, PCLTime) when
maybelog_snap_timing(_Monitor, _, _) ->
ok.

%% Build the status report returned by book_status/1.  With no monitor
%% configured the report is empty; otherwise the monitor's gathered
%% statistics are merged over a default map so that every expected key is
%% present, carrying 'undefined' where the monitor has no data yet.
status(#state{monitor = {no_monitor, 0}}) ->
    #{};
status(#state{monitor = {MonitorPid, _LogFrequency}}) ->
    StatusKeys =
        [
            ledger_cache_size,
            n_active_journal_files,
            avg_compaction_score,
            level_files_count,
            penciller_inmem_cache_size,
            penciller_work_backlog_status,
            penciller_last_merge_time,
            journal_last_compaction_time,
            journal_last_compaction_result,
            get_sample_count,
            get_body_time,
            head_sample_count,
            head_rsp_time,
            put_sample_count,
            put_prep_time,
            put_ink_time,
            put_mem_time,
            fetch_count_by_level
        ],
    Defaults = maps:from_list([{Key, undefined} || Key <- StatusKeys]),
    maps:merge(Defaults, leveled_monitor:get_bookie_status(MonitorPid)).

%%%============================================================================
%%% Test
%%%============================================================================
Expand Down
10 changes: 10 additions & 0 deletions src/leveled_cdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ starting({call, From}, {open_writer, Filename}, State) ->
{next_state, writer, State0, [{reply, From, ok}, hibernate]};
starting({call, From}, {open_reader, Filename}, State) ->
leveled_log:save(State#state.log_options),
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
?STD_LOG(cdb02, [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
State0 = State#state{
Expand All @@ -504,6 +506,8 @@ starting({call, From}, {open_reader, Filename}, State) ->
{next_state, reader, State0, [{reply, From, ok}, hibernate]};
starting({call, From}, {open_reader, Filename, LastKey}, State) ->
leveled_log:save(State#state.log_options),
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}),
?STD_LOG(cdb02, [Filename]),
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
State0 = State#state{
Expand Down Expand Up @@ -880,6 +884,8 @@ delete_pending(
) when
?IS_DEF(FN), ?IS_DEF(IO)
->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}),
?STD_LOG(cdb04, [FN, State#state.delete_point]),
close_pendingdelete(IO, FN, State#state.waste_path),
{stop, normal};
Expand All @@ -906,6 +912,10 @@ delete_pending(
),
{keep_state_and_data, [?DELETE_TIMEOUT]};
false ->
{Monitor, _} = State#state.monitor,
leveled_monitor:add_stat(
Monitor, {n_active_journal_files_update, -1}
),
?STD_LOG(cdb04, [FN, ManSQN]),
close_pendingdelete(IO, FN, State#state.waste_path),
{stop, normal}
Expand Down
46 changes: 36 additions & 10 deletions src/leveled_iclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ handle_cast(
{noreply, State#state{scored_files = [], scoring_state = ScoringState}};
handle_cast(
{score_filelist, [Entry | Tail]},
State = #state{scoring_state = ScoringState}
State = #state{scoring_state = ScoringState, cdb_options = CDBOpts}
) when
?IS_DEF(ScoringState)
->
Expand All @@ -376,7 +376,8 @@ handle_cast(
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE,
State#state.reload_strategy
State#state.reload_strategy,
CDBOpts#cdb_options.monitor
);
{CachedScore, true, _ScoreOneIn} ->
% If caches are used roll the score towards the current score
Expand All @@ -391,7 +392,8 @@ handle_cast(
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE,
State#state.reload_strategy
State#state.reload_strategy,
CDBOpts#cdb_options.monitor
),
(NewScore + CachedScore) / 2;
{CachedScore, false, _ScoreOneIn} ->
Expand Down Expand Up @@ -424,6 +426,11 @@ handle_cast(
{MaxRunLength, State#state.maxrunlength_compactionperc,
State#state.singlefile_compactionperc},
{BestRun0, Score} = assess_candidates(Candidates, ScoreParams),
{Monitor, _} = CDBopts#cdb_options.monitor,
leveled_monitor:add_stat(
Monitor,
{journal_last_compaction_result_update, {length(BestRun0), Score}}
),
?TMR_LOG(ic003, [Score, length(BestRun0)], SW),
case Score > 0.0 of
true ->
Expand Down Expand Up @@ -469,6 +476,12 @@ handle_cast(
->
FilesToDelete =
leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList),
CDBopts = State#state.cdb_options,
{Monitor, _} = CDBopts#cdb_options.monitor,
leveled_monitor:add_stat(
Monitor,
{journal_last_compaction_time_update, os:system_time(millisecond)}
),
?STD_LOG(ic007, []),
ok = leveled_inker:ink_clerkcomplete(Ink, [], FilesToDelete),
{noreply, State};
Expand Down Expand Up @@ -591,7 +604,8 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
leveled_codec:sqn(),
non_neg_integer(),
non_neg_integer(),
leveled_codec:compaction_strategy()
leveled_codec:compaction_strategy(),
leveled_monitor:monitor()
) ->
float().
%% @doc
Expand All @@ -612,7 +626,8 @@ check_single_file(
MaxSQN,
SampleSize,
BatchSize,
ReloadStrategy
ReloadStrategy,
{Monitor, _}
) ->
FN = leveled_cdb:cdb_filename(CDB),
SW = os:timestamp(),
Expand All @@ -626,6 +641,7 @@ check_single_file(
MaxSQN,
ReloadStrategy
),
leveled_monitor:add_stat(Monitor, {avg_compaction_score_update, Score}),
safely_log_filescore(PositionList, FN, Score, SW),
Score.

Expand Down Expand Up @@ -1262,14 +1278,22 @@ check_single_file_test() ->
replaced
end
end,
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
Score1 = check_single_file(
CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}
),
?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS),
Score2 = check_single_file(
CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}
),
?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS),
Score3 = check_single_file(
CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS, {no_monitor, 0}
),
?assertMatch(37.5, Score3),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS),
Score4 = check_single_file(
CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS, {no_monitor, 0}
),
?assertMatch(75.0, Score4),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
Expand Down Expand Up @@ -1414,7 +1438,9 @@ compact_empty_file_test() ->
{3, {o, "Bucket", "Key3", null}}
],
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
Score1 = check_single_file(
CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}
),
?assert((+0.0 =:= Score1) orelse (-0.0 =:= Score1)),
ok = leveled_cdb:cdb_deletepending(CDB2),
ok = leveled_cdb:cdb_destroy(CDB2).
Expand Down
Loading