Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ require 'rake/testtask'
Rake::TestTask.new 'test' do |t|
t.libs = %w(lib test)
t.pattern = "test/*_test.rb"
t.verbose = true
t.warning = true
end

# ==========================================================
Expand Down
5 changes: 2 additions & 3 deletions ext/semian/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,12 @@ semian_resource_reset_workers(VALUE self)
VALUE
semian_resource_unregister_worker(VALUE self)
{
int ret;
semian_resource_t *res = NULL;

TypedData_Get_Struct(self, semian_resource_t, &semian_resource_type, res);

sem_meta_lock(res->sem_id);
ret = perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, -1, IPC_NOWAIT | SEM_UNDO, NULL);
dprintf("Unregistering worker for sem_id:%d", res->sem_id);
int ret = perform_semop(res->sem_id, SI_SEM_REGISTERED_WORKERS, -1, IPC_NOWAIT | SEM_UNDO, NULL);
sem_meta_unlock(res->sem_id);

if ( ret == -1) {
Expand Down
109 changes: 67 additions & 42 deletions ext/semian/sliding_window.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ static void init_fn(void* ptr)
res->max_size = 0;
res->length = 0;
res->start = 0;
res->end = 0;
}

static int
Expand Down Expand Up @@ -92,16 +91,17 @@ check_scale_factor_arg(VALUE scale_factor)
}

static VALUE
grow_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
grow_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size)
{
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil;

int end = window->max_size ? (window->start + window->length) % window->max_size : 0;
dprintf("Growing window - sem_id:%d start:%d end:%d length:%d max_size:%d new_max_size:%d", sem_id, window->start, end, window->length, window->max_size, new_max_size);

if (window->length == 0) {
window->start = 0;
window->end = 0;
} else if (window->end > window->start) {
} else if (end > window->start) {
// Easy case - the window doesn't wrap around
window->end = window->start + window->length;
} else {
// Hard case - the window wraps, and data might need to move
int offset = new_max_size - window->max_size;
Expand All @@ -125,24 +125,25 @@ static void swap(int *a, int *b) {
}

static VALUE
shrink_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
shrink_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size)
{
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil;

int new_length = (new_max_size > window->length) ? window->length : new_max_size;

dprintf("Shrinking window - start:%d end:%d length:%d max_size:%d", window->start, window->end, window->length, window->max_size);
int end = window->max_size ? (window->start + window->length) % window->max_size : 0;
dprintf("Shrinking window - sem_id:%d start:%d end:%d length:%d max_size:%d new_max_size:%d", sem_id, window->start, end, window->length, window->max_size, new_max_size);

if (window->length == 0) {
window->start = 0;
window->end = 0;
} else if (window->end > window->start) {
} else if (end > window->start) {
// Easy case - the window doesn't wrap around
window->start = window->start + new_length;
} else {
// Hard case - the window wraps, so re-index the data
// Adapted from http://www.cplusplus.com/reference/algorithm/rotate/
int first = 0;
int middle = (window->end - new_max_size + window->max_size) % window->max_size;
int middle = (end - new_max_size + window->max_size) % window->max_size;
int last = window->max_size;
int next = middle;
while (first != next) {
Expand All @@ -154,7 +155,6 @@ shrink_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
}
}
window->start = 0;
window->end = new_length;
}

window->max_size = new_max_size;
Expand All @@ -164,18 +164,14 @@ shrink_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
}

static VALUE
resize_window(semian_simple_sliding_window_shared_t* window, int new_max_size)
resize_window(int sem_id, semian_simple_sliding_window_shared_t* window, int new_max_size)
{
if (new_max_size > SLIDING_WINDOW_MAX_SIZE) return Qnil;

if (window->max_size < new_max_size) {
dprintf("Growing window to %d", new_max_size);
return grow_window(window, new_max_size);
return grow_window(sem_id, window, new_max_size);
} else if (window->max_size > new_max_size) {
dprintf("Shrinking window to %d", new_max_size);
return shrink_window(window, new_max_size);
} else {
dprintf("Not re-sizing window");
return shrink_window(sem_id, window, new_max_size);
}

return Qnil;
Expand All @@ -202,6 +198,7 @@ Init_SlidingWindow()
rb_define_alloc_func(cSlidingWindow, semian_simple_sliding_window_alloc);
rb_define_method(cSlidingWindow, "initialize_sliding_window", semian_simple_sliding_window_initialize, 3);
rb_define_method(cSlidingWindow, "size", semian_simple_sliding_window_size, 0);
rb_define_method(cSlidingWindow, "length", semian_simple_sliding_window_size, 0); // Alias
rb_define_method(cSlidingWindow, "resize_to", semian_simple_sliding_window_resize_to, 1);
rb_define_method(cSlidingWindow, "max_size", semian_simple_sliding_window_max_size_get, 0);
rb_define_method(cSlidingWindow, "max_size=", semian_simple_sliding_window_max_size_set, 1);
Expand Down Expand Up @@ -270,7 +267,7 @@ semian_simple_sliding_window_initialize(VALUE self, VALUE name, VALUE max_size,
int error_threshold = max(res->error_threshold, (int) ceil(workers * scale * res->error_threshold));

dprintf(" workers:%d scale:%0.2f error_threshold:%d", workers, scale, error_threshold);
resize_window(res->shmem, error_threshold);
resize_window(res->sem_id, res->shmem, error_threshold);
}
sem_meta_unlock(res->sem_id);

Expand Down Expand Up @@ -305,7 +302,7 @@ semian_simple_sliding_window_resize_to(VALUE self, VALUE new_size)

sem_meta_lock(res->sem_id);
{
retval = resize_window(res->shmem, new_max_size);
retval = resize_window(res->sem_id, res->shmem, new_max_size);
}
sem_meta_unlock(res->sem_id);

Expand Down Expand Up @@ -340,7 +337,7 @@ semian_simple_sliding_window_max_size_set(VALUE self, VALUE new_size)

sem_meta_lock(res->sem_id);
{
retval = resize_window(res->shmem, new_max_size);
retval = resize_window(res->sem_id, res->shmem, new_max_size);
}
sem_meta_unlock(res->sem_id);

Expand Down Expand Up @@ -393,13 +390,25 @@ semian_simple_sliding_window_clear(VALUE self)
dprintf("Clearing sliding window");
res->shmem->length = 0;
res->shmem->start = 0;
res->shmem->end = 0;
}
sem_meta_unlock(res->sem_id);

return self;
}

// Handy for debugging the sliding window, but too noisy for regular debugging.
/*
static void dprint_window(semian_simple_sliding_window_shared_t *window)
{
dprintf("---");
for (int i = 0; i < window->length; ++i) {
const int index = (window->start + i) % window->max_size;
dprintf(" %0d: data[%d] = %d", i, index, window->data[index]);
}
dprintf("---");
}
*/

VALUE
semian_simple_sliding_window_reject(VALUE self)
{
Expand All @@ -409,24 +418,40 @@ semian_simple_sliding_window_reject(VALUE self)

sem_meta_lock(res->sem_id);
{
// Store these values because we're going to be modifying the buffer.
int start = res->shmem->start;
int length = res->shmem->length;
dprintf("reject! - start:%d end:%d length:%d max_size:%d", res->shmem->start, res->shmem->end, res->shmem->length, res->shmem->max_size);

int cleared = 0;
for (int i = 0; i < length; ++i) {
int index = (start + i) % length;
int value = res->shmem->data[index];
VALUE y = rb_yield(RB_INT2NUM(value));
if (RTEST(y)) {
if (cleared++ != i) {
sem_meta_unlock(res->sem_id);
rb_raise(rb_eArgError, "reject! must delete monotonically");
semian_simple_sliding_window_shared_t *window = res->shmem;
const int start = window->start;
const int length = window->length;
const int max_size = window->max_size;

if (max_size && length) {
int wptr = (start + length + max_size - 1) % max_size;

// Walk the sliding window backward, from the last element to the first,
// pushing the entries to the back of the ring. When we've gone through
// every element, set the start pointer to the new location.
//
// Example, deleting "2":
// S E S E
// [x,x,0,1,2,3,x,x] --> [x,x,x,0,1,3,x,x]
// 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
//
// The runtime of this algorithm is theta(n), but n tends to be small.
//
dprintf("Before reject! start:%d length:%d max_size:%d", window->start, window->length, window->max_size);
for (int i = 0; i < length; ++i) {
const int rptr = (start + length + max_size - i - 1) % max_size;

const int value = window->data[rptr];
if (RTEST(rb_yield(RB_INT2NUM(value)))) {
window->length--;
} else {
window->data[wptr] = value;
wptr = (wptr + max_size - 1) % max_size;
}
res->shmem->start = (res->shmem->start + 1) % res->shmem->length;
res->shmem->length--;
}

window->start = (wptr + 1) % max_size;
dprintf("After reject! start:%d length:%d max_size:%d", window->start, window->length, window->max_size);
}
}
sem_meta_unlock(res->sem_id);
Expand All @@ -441,18 +466,18 @@ semian_simple_sliding_window_push(VALUE self, VALUE value)

sem_meta_lock(res->sem_id);
{
dprintf("Before: start:%d end:%d length:%d max_size:%d", res->shmem->start, res->shmem->end, res->shmem->length, res->shmem->max_size);
dprintf("Before: start:%d length:%d max_size:%d", res->shmem->start, res->shmem->length, res->shmem->max_size);
// If the window is full, make room by popping off the front.
if (res->shmem->length == res->shmem->max_size) {
res->shmem->length--;
res->shmem->start = (res->shmem->start + 1) % res->shmem->max_size;
}

// Push onto the back of the window.
int index = (res->shmem->start + res->shmem->length) % res->shmem->max_size;
res->shmem->length++;
res->shmem->data[res->shmem->end] = RB_NUM2INT(value);
dprintf("Pushed %d onto data[%d] (length %d)", RB_NUM2INT(value), res->shmem->end, res->shmem->length);
res->shmem->end = (res->shmem->end + 1) % res->shmem->max_size;
res->shmem->data[index] = RB_NUM2INT(value);
dprintf("Pushed %d onto data[%d] (length %d)", RB_NUM2INT(value), index, res->shmem->length);
}
sem_meta_unlock(res->sem_id);

Expand Down
1 change: 0 additions & 1 deletion ext/semian/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ typedef struct {
int max_size;
int length;
int start;
int end;
int data[SLIDING_WINDOW_MAX_SIZE];
} semian_simple_sliding_window_shared_t;

Expand Down
7 changes: 6 additions & 1 deletion ext/semian/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <stdarg.h>
#include <stdio.h>
#include <time.h>

#include <openssl/sha.h>
#include <ruby.h>
Expand All @@ -16,7 +17,11 @@
#define dprintf(fmt, ...) \
do { \
if (DEBUG_TEST) { \
printf("[DEBUG] %s:%d - " fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
const pid_t pid = getpid(); \
struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); \
struct tm t; localtime_r(&(ts.tv_sec), &t); \
char buf[128]; strftime(buf, sizeof(buf), "%H:%M:%S", &t); \
printf("%s.%ld [DEBUG] (%d): %s:%d - " fmt "\n", buf, ts.tv_nsec, pid, __FILE__, __LINE__, ##__VA_ARGS__); \
} \
} while (0)

Expand Down
1 change: 1 addition & 0 deletions lib/semian/circuit_breaker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ class CircuitBreaker #:nodoc:
extend Forwardable

def_delegators :@state, :closed?, :open?, :half_open?
def_delegators :@errors, :size, :max_size, :values

attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error

Expand Down
2 changes: 1 addition & 1 deletion lib/semian/protected_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class ProtectedResource

def_delegators :@bulkhead, :destroy, :count, :semid, :tickets, :registered_workers
def_delegators :@circuit_breaker, :reset, :mark_failed, :mark_success, :request_allowed?,
:open?, :closed?, :half_open?
:open?, :closed?, :half_open?, :size, :max_size, :values

attr_reader :bulkhead, :circuit_breaker, :name
attr_accessor :updated_at
Expand Down
12 changes: 12 additions & 0 deletions lib/semian/resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ def count
0
end

def size
0
end

def max_size
0
end

def values
[]
end

def tickets
0
end
Expand Down
12 changes: 12 additions & 0 deletions lib/semian/unprotected_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ def count
0
end

def size
0
end

def max_size
0
end

def values
[]
end

def semid
0
end
Expand Down
30 changes: 30 additions & 0 deletions test/circuit_breaker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,35 @@ def setup
end
Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1)
@resource = Semian[id]
@resource.reset
end

def test_destroy
id = Time.now.strftime('%H:%M:%S.%N')

# Create the resource and check that it was reset.
Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1)
resource = Semian[id]
assert_equal(0, resource.size)
assert_equal(2, resource.max_size)
assert_equal([], resource.values)

# Open the circuit.
open_circuit!(resource, 2)
assert_equal(2, resource.size)
assert_equal(2, resource.max_size)

# Destroy the resource and check that it was destroyed.
Semian.destroy(id)
resource = Semian[id]
assert_nil(resource, "Resource was not destroyed")

# Re-create the resource and check that it was reset.
Semian.register(id, tickets: 1, exceptions: [SomeError], error_threshold: 2, error_timeout: 5, success_threshold: 1)
resource = Semian[id]
assert_equal(0, resource.size)
assert_equal(2, resource.max_size)
assert_equal([], resource.values)
end

def test_acquire_yield_when_the_circuit_is_closed
Expand Down Expand Up @@ -41,6 +70,7 @@ def test_after_error_timeout_is_elapsed_requests_are_attempted_again
end

def test_until_success_threshold_is_reached_a_single_error_will_reopen_the_circuit
assert_equal(0, @resource.size)
half_open_cicuit!
trigger_error!
assert_circuit_opened
Expand Down
4 changes: 3 additions & 1 deletion test/resource_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,12 @@ def test_sem_undo
end
end

# TODO(michaelkipper): Shouldn't need to rescue InternalError, this test
# should deterministically throw SyscallError.
def test_destroy
resource = create_resource :testing, tickets: 1
resource.destroy
assert_raises Semian::SyscallError do
assert_raises(Semian::InternalError, Semian::SyscallError) do
resource.acquire {}
end
end
Expand Down
Loading