生产者缓存
⑴ ActiveMQ是什么是干什么用的
1、ActiviMq消息队列,解决了服务解耦合的动作,缓解了服务并发量很大,造成服务器无法处理的状况。(kafka、rabbitMQ、activiMQ)
其他作用:异步处理、消息通讯、流量消峰、应用解耦
应用场景:
1、用户注册的时候,重点内容是将用户信息保存到数据库中,发短信验证或者是发邮件增加了业务的复杂度。这时使用MQ将发短信、发邮件通知MQ由另外的服务平台完成。
2、搜索平台、缓存平台
查询数据,建立缓存、索引,不从数据库查询,从缓存或者索引库查询,当数据库发生增加、修改、删除操作时发消息给MQ,缓存平台或者是索引平台从MQ获取到这个消息,更新缓存或者索引。
ActiveMQ使用的是标准的生产者(完成生产消息并发送消息)和消费者(获取消息,完成自己的业务逻辑)模型
有两种数据结构
Topic(发布订阅) 一个生产者对应多个消费者,消息默认不会持久化,需要手动配置持久化。如果A服务器挂了,再生产一条消息的话,会被B服务器拿去使用,就算重新启动,A服务器也不会再拿到消息了
商品系统、库存系统、生成商品详情页面的系统,现在要添加一个商品信息,消息肯定是需要让库存系统以及商品信息详情页面系统知道的。
Queue(点对点)一个生产者对应一个消费者,默认消息持久化
StringMessage
mapMessage
byteMessage
objectMessage
要完成topic模式的消息持久化,需要保证每个消费者有唯一的clientID(本文来自北大青鸟)
⑵ 如何在C/C++程序中运用双缓存双线程等大规模数据处理的技巧 或者要怎么做可以一次调入一块数据进行处理
线程技术主要是用来并行处理一些任务,这些任务之间一般少有逻辑顺序上的关联,所以用线程技术可以提高程序整体的运行速度,特别在其中一些子线程运行速度有很大差距的情况下。
各类软件使用缓存的方式都不一样。双缓存或者多个缓存、缓存池等等方式都有。关键在于你的程序需要使用怎样的缓存结构。比如说你是类似生产者消费者模型的软件,你也许会使用多个缓存做成队列,一头在不断填充,一头则不断消耗,这样能大大提高整体的数据吞吐速度。
fread不输入整块调入,它底层是使用的read之类的函数,对文件句柄进行操作。gets函数则是对指针指向的内存地址操作。这些都是上层逻辑了,离磁盘寄存器很远。真正加快文件读取速度的方法有很多,比如把整个文件映射到内存里,又比如跳过磁盘缓存直接大块读取内容。这些有的有专门的API函数可用,有的则需要你自己改写系统底层代码。
建议你多看看操作系统原理方面的书,可以去试着学习下linux内核代码和原理,这样你对这些问题就会有更深的认识。
希望这些建议能帮助你。
⑶ 镎崭綔绯荤粺淇″彿閲忔带鍒堕梾棰桡纴缂揿瓨
涓銆侀梾棰樻弿杩
鐢熶骇钥-娑堣垂钥呴梾棰樻槸涓涓缁忓吀镄勮繘绋嫔悓姝ラ梾棰桡纴璇ラ梾棰樻渶镞╃敱Dijkstra鎻愬嚭锛岀敤浠ユ紨绀轰粬鎻愬嚭镄勪俊鍙烽噺链哄埗銆
浠栬佹眰璁捐″湪钖屼竴涓杩涚▼鍦板潃绌洪棿鍐呮墽琛岀殑涓や釜绾跨▼銆
鐢熶骇钥呯嚎绋嬬敓浜х墿鍝侊纴铹跺悗灏嗙墿鍝佹斁缃鍦ㄤ竴涓绌虹紦鍐插尯涓渚涙秷璐硅呯嚎绋嬫秷璐广
娑堣垂钥呯嚎绋嬩粠缂揿啿鍖轰腑銮峰缑鐗╁搧锛岀劧钖庨喷鏀剧紦鍐插尯銆
褰撶敓浜ц呯嚎绋嬬敓浜х墿鍝佹椂锛屽傛灉娌℃湁绌虹紦鍐插尯鍙鐢锛岄偅涔堢敓浜ц呯嚎绋嫔繀椤荤瓑寰呮秷璐硅呯嚎绋嬮喷鏀惧嚭涓涓绌虹紦鍐插尯銆
褰撴秷璐硅呯嚎绋嬫秷璐圭墿鍝佹椂锛屽傛灉娌℃湁婊$殑缂揿啿鍖猴纴闾d箞娑堣垂钥呯嚎绋嫔皢琚阒诲烇纴鐩村埌鏂扮殑鐗╁搧琚鐢熶骇鍑烘潵銆
钖岀悊 链闂棰树篃鏄瑕佹眰璁捐″湪钖屼竴涓杩涚▼鍦板潃绌洪棿鍐呮墽琛岀殑涓や釜绾跨▼銆
褰撹溅绔椤敭绁ㄥ巺杈惧埌20浜轰笂绾挎椂锛屽繀椤荤瓑链変汉璧板嚭杞︾珯鍞绁ㄥ巺锛岃溅绔椤敭绁ㄥ巺澶栫殑璐绁ㄨ呮墠鍙杩涘叆銆傚氨濡备笂闱
鐢熶骇钥呯嚎绋嬬敓浜х墿鍝佹椂锛屽傛灉娌℃湁绌虹紦鍐插尯鍙鐢锛岄偅涔堢敓浜ц呯嚎绋嫔繀椤荤瓑寰呮秷璐硅呯嚎绋嬮喷鏀惧嚭涓涓绌虹紦鍐插尯銆傛ゆ椂铡呭栬喘绁ㄨ呰繘鍏ュぇ铡呰繖涓浜嬩欢鍙浠ョ湅锅氭槸涓涓鐢熶骇钥呯嚎绋嬶纴钥屽巺鍐呰喘绁ㄨ呰蛋鍑哄巺澶栬繖涓浜嬩欢灏卞彲浠ョ湅锅氭槸涓涓娑堣垂钥呯嚎绋嬨傚傛ら梾棰樿繋鍒冭岃В銆
浜屻佸疄鐜颁唬镰
#include <windows.h>
#include <iostream>
const unsigned short SIZE_OF_BUFFER = 10; //缂揿啿鍖洪暱搴
unsigned short ProctID = 0; //浜у搧鍙
unsigned short ConsumeID = 0; //灏呜娑堣楃殑浜у搧鍙
unsigned short in = 0; //浜у搧杩涚紦鍐插尯镞剁殑缂揿啿鍖轰笅镙
unsigned short out = 0; //浜у搧鍑虹紦鍐插尯镞剁殑缂揿啿鍖轰笅镙
int g_buffer[SIZE_OF_BUFFER]; //缂揿啿鍖烘槸涓寰鐜阒熷垪
bool g_continue = true; //鎺у埗绋嫔簭缁撴潫
HANDLE g_hMutex; //鐢ㄤ簬绾跨▼闂寸殑浜掓枼
HANDLE g_hFullSemaphore; //褰撶紦鍐插尯婊℃椂杩浣跨敓浜ц呯瓑寰
HANDLE g_hEmptySemaphore; //褰撶紦鍐插尯绌烘椂杩浣挎秷璐硅呯瓑寰
DWORD WINAPI Procer(LPVOID); //鐢熶骇钥呯嚎绋
DWORD WINAPI Consumer(LPVOID); //娑堣垂钥呯嚎绋
int main()
{
//鍒涘缓钖勪釜浜掓枼淇″彿
g_hMutex = CreateMutex(NULL,FALSE,NULL);
g_hEmptySemaphore = CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL);
//璋冩暣涓嬮溃镄勬暟鍊硷纴鍙浠ュ彂鐜帮纴褰撶敓浜ц呬釜鏁板氢簬娑堣垂钥呬釜鏁版椂锛
//鐢熶骇阃熷害蹇锛岀敓浜ц呯粡甯哥瓑寰呮秷璐硅咃绂鍙崭箣锛屾秷璐硅呯粡甯哥瓑寰
const unsigned short PRODUCERS_COUNT = 3; //鐢熶骇钥呯殑涓鏁
const unsigned short CONSUMERS_COUNT = 1; //娑堣垂钥呯殑涓鏁
//镐荤殑绾跨▼鏁
const unsigned short THREADS_COUNT = PRODUCERS_COUNT+CONSUMERS_COUNT;
DWORD procerID[CONSUMERS_COUNT]; //鐢熶骇钥呯嚎绋嬬殑镙囱瘑绗
DWORD consumerID[THREADS_COUNT]; //娑堣垂钥呯嚎绋嬬殑镙囱瘑绗
//鍒涘缓鐢熶骇钥呯嚎绋
for (int i=0;i<PRODUCERS_COUNT;++i){
hThreads[i]=CreateThread(NULL,0,Procer,NULL,0,&procerID[i]);
if (hThreads[i]==NULL) return -1;
}
//鍒涘缓娑堣垂钥呯嚎绋
for (int i=0;i<CONSUMERS_COUNT;++i){
hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]);
if (hThreads[i]==NULL) return -1;
}
while(g_continue){
if(getchar()){ //鎸夊洖杞﹀悗缁堟㈢▼搴忚繍琛
g_continue = false;
}
}
return 0;
}
//鐢熶骇涓涓浜у搧銆傜亩鍗曟ā𨰾熶简涓涓嬶纴浠呰緭鍑烘柊浜у搧镄処D鍙
void Proce()
{
std::cerr << "Procing " << ++ProctID << " ... ";
std::cerr << "Succeed" << std::endl;
}
//鎶婃柊鐢熶骇镄勪骇鍝佹斁鍏ョ紦鍐插尯
void Append()
{
std::cerr << "Appending a proct ... ";
g_buffer[in] = ProctID;
in = (in+1)%SIZE_OF_BUFFER;
std::cerr << "Succeed" << std::endl;
//杈揿嚭缂揿啿鍖哄綋鍓岖殑鐘舵
for (int i=0;i<SIZE_OF_BUFFER;++i){
std::cout << i <<": " << g_buffer[i];
if (i==in) std::cout << " <-- 鐢熶骇";
if (i==out) std::cout << " <-- 娑堣垂";
std::cout << std::endl;
}
}
//浠庣紦鍐插尯涓鍙栧嚭涓涓浜у搧
void Take()
{
std::cerr << "Taking a proct ... ";
ConsumeID = g_buffer[out];
out = (out+1)%SIZE_OF_BUFFER;
std::cerr << "Succeed" << std::endl;
//杈揿嚭缂揿啿鍖哄綋鍓岖殑鐘舵
for (int i=0;i<SIZE_OF_BUFFER;++i){
std::cout << i <<": " << g_buffer[i];
if (i==in) std::cout << " <-- 鐢熶骇";
if (i==out) std::cout << " <-- 娑堣垂";
std::cout << std::endl;
}
}
//娑堣椾竴涓浜у搧
void Consume()
{
std::cerr << "Consuming " << ConsumeID << " ... ";
std::cerr << "Succeed" << std::endl;
}
//鐢熶骇钥
DWORD WINAPI Procer(LPVOID lpPara)
{
while(g_continue){
WaitForSingleObject(g_hFullSemaphore,INFINITE);
WaitForSingleObject(g_hMutex,INFINITE);
Proce();
Append();
Sleep(1500);
ReleaseMutex(g_hMutex);
ReleaseSemaphore(g_hEmptySemaphore,1,NULL);
}
return 0;
}
//娑堣垂钥
DWORD WINAPI Consumer(LPVOID lpPara)
{
while(g_continue){
WaitForSingleObject(g_hEmptySemaphore,INFINITE);
WaitForSingleObject(g_hMutex,INFINITE);
Take();
Consume();
Sleep(1500);
ReleaseMutex(g_hMutex);
ReleaseSemaphore(g_hFullSemaphore,1,NULL);
}
return 0;
}
⑷ 线程池-参数篇:2.队列
多线程环境中,通过队列可以很容易实现线程间数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享;同时作为BlockingQueue的使用者,我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue的实现者都给一手包办了。
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,另外还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。
基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。
对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。如果没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于罩伍消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
DelayQueue用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。
Delayed 是一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定宴闷冲值(final)。DelayQueue内部是使用PriorityQueue实现的。
考虑以下场景:
一种笨笨的办法就是,使用一个后台线程,遍历所有对象,挨个检查。这种笨笨的办法简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,多小则存在效率问题。而且做不到按超时的时间顺序处理。
这场景,使用DelayQueue最适合了,详情查看 DelayedQueue学习笔记 ; 精巧好用的DelayQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),需要注意PriorityBlockingQueue并不会阻塞数据生产者晌歼,而只会在没有可消费的数据时,阻塞数据的消费者。
使用时,若生产者生产数据的速度快于消费者消费数据的速度,随着长时间的运行,可能会耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
SynchronousQueue是一个内部只能包含零个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取元素。同样,如果线程尝试获取元素并且当前没有线程在插入元素,则该线程将被阻塞,直到有线程将元素插入队列
声明一个SynchronousQueue有公平模式和非公平模式,区别如下:
参考: Java多线程-工具篇-BlockingQueue
12. SynchronousQueue